1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/sessions/model_type_registry.h" 6 7 #include "base/bind.h" 8 #include "base/message_loop/message_loop_proxy.h" 9 #include "base/observer_list.h" 10 #include "sync/engine/directory_commit_contributor.h" 11 #include "sync/engine/directory_update_handler.h" 12 #include "sync/engine/non_blocking_sync_common.h" 13 #include "sync/engine/non_blocking_type_processor.h" 14 #include "sync/engine/non_blocking_type_processor_core.h" 15 #include "sync/engine/non_blocking_type_processor_core_interface.h" 16 #include "sync/engine/non_blocking_type_processor_interface.h" 17 #include "sync/sessions/directory_type_debug_info_emitter.h" 18 19 namespace syncer { 20 21 namespace { 22 23 class NonBlockingTypeProcessorWrapper 24 : public NonBlockingTypeProcessorInterface { 25 public: 26 NonBlockingTypeProcessorWrapper( 27 base::WeakPtr<NonBlockingTypeProcessor> processor, 28 scoped_refptr<base::SequencedTaskRunner> processor_task_runner); 29 virtual ~NonBlockingTypeProcessorWrapper(); 30 31 virtual void ReceiveCommitResponse( 32 const DataTypeState& type_state, 33 const CommitResponseDataList& response_list) OVERRIDE; 34 virtual void ReceiveUpdateResponse( 35 const DataTypeState& type_state, 36 const UpdateResponseDataList& response_list) OVERRIDE; 37 38 private: 39 base::WeakPtr<NonBlockingTypeProcessor> processor_; 40 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; 41 }; 42 43 NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper( 44 base::WeakPtr<NonBlockingTypeProcessor> processor, 45 scoped_refptr<base::SequencedTaskRunner> processor_task_runner) 46 : processor_(processor), processor_task_runner_(processor_task_runner) { 47 } 48 49 NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() { 50 } 51 52 void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse( 53 const DataTypeState& type_state, 54 const CommitResponseDataList& response_list) { 55 processor_task_runner_->PostTask( 56 FROM_HERE, 57 base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion, 58 processor_, 59 type_state, 60 response_list)); 61 } 62 63 void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse( 64 const DataTypeState& type_state, 65 const UpdateResponseDataList& response_list) { 66 processor_task_runner_->PostTask( 67 FROM_HERE, 68 base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived, 69 processor_, 70 type_state, 71 response_list)); 72 } 73 74 class NonBlockingTypeProcessorCoreWrapper 75 : public NonBlockingTypeProcessorCoreInterface { 76 public: 77 NonBlockingTypeProcessorCoreWrapper( 78 base::WeakPtr<NonBlockingTypeProcessorCore> core, 79 scoped_refptr<base::SequencedTaskRunner> sync_thread); 80 virtual ~NonBlockingTypeProcessorCoreWrapper(); 81 82 virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE; 83 84 private: 85 base::WeakPtr<NonBlockingTypeProcessorCore> core_; 86 scoped_refptr<base::SequencedTaskRunner> sync_thread_; 87 }; 88 89 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper( 90 base::WeakPtr<NonBlockingTypeProcessorCore> core, 91 scoped_refptr<base::SequencedTaskRunner> sync_thread) 92 : core_(core), sync_thread_(sync_thread) { 93 } 94 95 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() { 96 } 97 98 void NonBlockingTypeProcessorCoreWrapper::RequestCommits( 99 const CommitRequestDataList& list) { 100 sync_thread_->PostTask( 101 FROM_HERE, 102 base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list)); 103 } 104 105 } // namespace 106 107 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {} 108 109 ModelTypeRegistry::ModelTypeRegistry( 110 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, 111 syncable::Directory* directory) 112 : directory_(directory) { 113 for (size_t i = 0u; i < workers.size(); ++i) { 114 workers_map_.insert( 115 std::make_pair(workers[i]->GetModelSafeGroup(), workers[i])); 116 } 117 } 118 119 ModelTypeRegistry::~ModelTypeRegistry() {} 120 121 void ModelTypeRegistry::SetEnabledDirectoryTypes( 122 const ModelSafeRoutingInfo& routing_info) { 123 // Remove all existing directory processors and delete them. The 124 // DebugInfoEmitters are not deleted here, since we want to preserve their 125 // counters. 126 for (ModelTypeSet::Iterator it = enabled_directory_types_.First(); 127 it.Good(); it.Inc()) { 128 size_t result1 = update_handler_map_.erase(it.Get()); 129 size_t result2 = commit_contributor_map_.erase(it.Get()); 130 DCHECK_EQ(1U, result1); 131 DCHECK_EQ(1U, result2); 132 } 133 134 // Clear the old instances of directory update handlers and commit 135 // contributors, deleting their contents in the processs. 136 directory_update_handlers_.clear(); 137 directory_commit_contributors_.clear(); 138 139 // Create new ones and add them to the appropriate containers. 140 for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin(); 141 routing_iter != routing_info.end(); ++routing_iter) { 142 ModelType type = routing_iter->first; 143 ModelSafeGroup group = routing_iter->second; 144 std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator 145 worker_it = workers_map_.find(group); 146 DCHECK(worker_it != workers_map_.end()); 147 scoped_refptr<ModelSafeWorker> worker = worker_it->second; 148 149 // DebugInfoEmitters are never deleted. Use existing one if we have it. 150 DirectoryTypeDebugInfoEmitter* emitter = NULL; 151 DirectoryTypeDebugInfoEmitterMap::iterator it = 152 directory_type_debug_info_emitter_map_.find(type); 153 if (it != directory_type_debug_info_emitter_map_.end()) { 154 emitter = it->second; 155 } else { 156 emitter = new DirectoryTypeDebugInfoEmitter(directory_, type, 157 &type_debug_info_observers_); 158 directory_type_debug_info_emitter_map_.insert( 159 std::make_pair(type, emitter)); 160 directory_type_debug_info_emitters_.push_back(emitter); 161 } 162 163 DirectoryCommitContributor* committer = 164 new DirectoryCommitContributor(directory_, type, emitter); 165 DirectoryUpdateHandler* updater = 166 new DirectoryUpdateHandler(directory_, type, worker, emitter); 167 168 // These containers take ownership of their contents. 169 directory_commit_contributors_.push_back(committer); 170 directory_update_handlers_.push_back(updater); 171 172 bool inserted1 = 173 update_handler_map_.insert(std::make_pair(type, updater)).second; 174 DCHECK(inserted1) << "Attempt to override existing type handler in map"; 175 176 bool inserted2 = 177 commit_contributor_map_.insert(std::make_pair(type, committer)).second; 178 DCHECK(inserted2) << "Attempt to override existing type handler in map"; 179 } 180 181 enabled_directory_types_ = GetRoutingInfoTypes(routing_info); 182 DCHECK(Intersection(GetEnabledDirectoryTypes(), 183 GetEnabledNonBlockingTypes()).Empty()); 184 } 185 186 void ModelTypeRegistry::InitializeNonBlockingType( 187 ModelType type, 188 const DataTypeState& data_type_state, 189 scoped_refptr<base::SequencedTaskRunner> type_task_runner, 190 base::WeakPtr<NonBlockingTypeProcessor> processor) { 191 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); 192 193 // Initialize CoreProcessor -> Processor communication channel. 194 scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface( 195 new NonBlockingTypeProcessorWrapper(processor, type_task_runner)); 196 scoped_ptr<NonBlockingTypeProcessorCore> core( 197 new NonBlockingTypeProcessorCore( 198 type, data_type_state, processor_interface.Pass())); 199 200 // Initialize Processor -> CoreProcessor communication channel. 201 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface( 202 new NonBlockingTypeProcessorCoreWrapper( 203 core->AsWeakPtr(), 204 scoped_refptr<base::SequencedTaskRunner>( 205 base::MessageLoopProxy::current()))); 206 type_task_runner->PostTask(FROM_HERE, 207 base::Bind(&NonBlockingTypeProcessor::OnConnect, 208 processor, 209 base::Passed(&core_interface))); 210 211 DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); 212 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); 213 214 update_handler_map_.insert(std::make_pair(type, core.get())); 215 commit_contributor_map_.insert(std::make_pair(type, core.get())); 216 217 // The container takes ownership. 218 non_blocking_type_processor_cores_.push_back(core.release()); 219 220 DCHECK(Intersection(GetEnabledDirectoryTypes(), 221 GetEnabledNonBlockingTypes()).Empty()); 222 } 223 224 void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) { 225 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type); 226 DCHECK(update_handler_map_.find(type) != update_handler_map_.end()); 227 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end()); 228 229 size_t updaters_erased = update_handler_map_.erase(type); 230 size_t committers_erased = commit_contributor_map_.erase(type); 231 232 DCHECK_EQ(1U, updaters_erased); 233 DCHECK_EQ(1U, committers_erased); 234 235 // Remove from the ScopedVector, deleting the core in the process. 236 for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it = 237 non_blocking_type_processor_cores_.begin(); 238 it != non_blocking_type_processor_cores_.end(); ++it) { 239 if ((*it)->GetModelType() == type) { 240 non_blocking_type_processor_cores_.erase(it); 241 break; 242 } 243 } 244 } 245 246 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const { 247 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()); 248 } 249 250 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() { 251 return &update_handler_map_; 252 } 253 254 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() { 255 return &commit_contributor_map_; 256 } 257 258 DirectoryTypeDebugInfoEmitterMap* 259 ModelTypeRegistry::directory_type_debug_info_emitter_map() { 260 return &directory_type_debug_info_emitter_map_; 261 } 262 263 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver( 264 syncer::TypeDebugInfoObserver* observer) { 265 if (!type_debug_info_observers_.HasObserver(observer)) 266 type_debug_info_observers_.AddObserver(observer); 267 } 268 269 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver( 270 syncer::TypeDebugInfoObserver* observer) { 271 type_debug_info_observers_.RemoveObserver(observer); 272 } 273 274 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver( 275 syncer::TypeDebugInfoObserver* observer) { 276 return type_debug_info_observers_.HasObserver(observer); 277 } 278 279 void ModelTypeRegistry::RequestEmitDebugInfo() { 280 for (DirectoryTypeDebugInfoEmitterMap::iterator it = 281 directory_type_debug_info_emitter_map_.begin(); 282 it != directory_type_debug_info_emitter_map_.end(); ++it) { 283 it->second->EmitCommitCountersUpdate(); 284 it->second->EmitUpdateCountersUpdate(); 285 it->second->EmitStatusCountersUpdate(); 286 } 287 } 288 289 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const { 290 return enabled_directory_types_; 291 } 292 293 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const { 294 ModelTypeSet enabled_off_thread_types; 295 for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it = 296 non_blocking_type_processor_cores_.begin(); 297 it != non_blocking_type_processor_cores_.end(); ++it) { 298 enabled_off_thread_types.Put((*it)->GetModelType()); 299 } 300 return enabled_off_thread_types; 301 } 302 303 } // namespace syncer 304