Home | History | Annotate | Download | only in sessions
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/sessions/model_type_registry.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/message_loop/message_loop_proxy.h"
      9 #include "base/observer_list.h"
     10 #include "sync/engine/directory_commit_contributor.h"
     11 #include "sync/engine/directory_update_handler.h"
     12 #include "sync/engine/non_blocking_sync_common.h"
     13 #include "sync/engine/non_blocking_type_processor.h"
     14 #include "sync/engine/non_blocking_type_processor_core.h"
     15 #include "sync/engine/non_blocking_type_processor_core_interface.h"
     16 #include "sync/engine/non_blocking_type_processor_interface.h"
     17 #include "sync/sessions/directory_type_debug_info_emitter.h"
     18 
     19 namespace syncer {
     20 
     21 namespace {
     22 
     23 class NonBlockingTypeProcessorWrapper
     24     : public NonBlockingTypeProcessorInterface {
     25  public:
     26   NonBlockingTypeProcessorWrapper(
     27       base::WeakPtr<NonBlockingTypeProcessor> processor,
     28       scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
     29   virtual ~NonBlockingTypeProcessorWrapper();
     30 
     31   virtual void ReceiveCommitResponse(
     32       const DataTypeState& type_state,
     33       const CommitResponseDataList& response_list) OVERRIDE;
     34   virtual void ReceiveUpdateResponse(
     35       const DataTypeState& type_state,
     36       const UpdateResponseDataList& response_list) OVERRIDE;
     37 
     38  private:
     39   base::WeakPtr<NonBlockingTypeProcessor> processor_;
     40   scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
     41 };
     42 
     43 NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
     44     base::WeakPtr<NonBlockingTypeProcessor> processor,
     45     scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
     46     : processor_(processor), processor_task_runner_(processor_task_runner) {
     47 }
     48 
     49 NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
     50 }
     51 
     52 void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
     53     const DataTypeState& type_state,
     54     const CommitResponseDataList& response_list) {
     55   processor_task_runner_->PostTask(
     56       FROM_HERE,
     57       base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
     58                  processor_,
     59                  type_state,
     60                  response_list));
     61 }
     62 
     63 void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
     64     const DataTypeState& type_state,
     65     const UpdateResponseDataList& response_list) {
     66   processor_task_runner_->PostTask(
     67       FROM_HERE,
     68       base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
     69                  processor_,
     70                  type_state,
     71                  response_list));
     72 }
     73 
     74 class NonBlockingTypeProcessorCoreWrapper
     75     : public NonBlockingTypeProcessorCoreInterface {
     76  public:
     77   NonBlockingTypeProcessorCoreWrapper(
     78       base::WeakPtr<NonBlockingTypeProcessorCore> core,
     79       scoped_refptr<base::SequencedTaskRunner> sync_thread);
     80   virtual ~NonBlockingTypeProcessorCoreWrapper();
     81 
     82   virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
     83 
     84  private:
     85   base::WeakPtr<NonBlockingTypeProcessorCore> core_;
     86   scoped_refptr<base::SequencedTaskRunner> sync_thread_;
     87 };
     88 
     89 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper(
     90     base::WeakPtr<NonBlockingTypeProcessorCore> core,
     91     scoped_refptr<base::SequencedTaskRunner> sync_thread)
     92     : core_(core), sync_thread_(sync_thread) {
     93 }
     94 
     95 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() {
     96 }
     97 
     98 void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
     99     const CommitRequestDataList& list) {
    100   sync_thread_->PostTask(
    101       FROM_HERE,
    102       base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
    103 }
    104 
    105 }  // namespace
    106 
    107 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {}
    108 
    109 ModelTypeRegistry::ModelTypeRegistry(
    110     const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
    111     syncable::Directory* directory)
    112     : directory_(directory) {
    113   for (size_t i = 0u; i < workers.size(); ++i) {
    114     workers_map_.insert(
    115         std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
    116   }
    117 }
    118 
    119 ModelTypeRegistry::~ModelTypeRegistry() {}
    120 
    121 void ModelTypeRegistry::SetEnabledDirectoryTypes(
    122     const ModelSafeRoutingInfo& routing_info) {
    123   // Remove all existing directory processors and delete them.  The
    124   // DebugInfoEmitters are not deleted here, since we want to preserve their
    125   // counters.
    126   for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
    127        it.Good(); it.Inc()) {
    128     size_t result1 = update_handler_map_.erase(it.Get());
    129     size_t result2 = commit_contributor_map_.erase(it.Get());
    130     DCHECK_EQ(1U, result1);
    131     DCHECK_EQ(1U, result2);
    132   }
    133 
    134   // Clear the old instances of directory update handlers and commit
    135   // contributors, deleting their contents in the processs.
    136   directory_update_handlers_.clear();
    137   directory_commit_contributors_.clear();
    138 
    139   // Create new ones and add them to the appropriate containers.
    140   for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
    141        routing_iter != routing_info.end(); ++routing_iter) {
    142     ModelType type = routing_iter->first;
    143     ModelSafeGroup group = routing_iter->second;
    144     std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
    145         worker_it = workers_map_.find(group);
    146     DCHECK(worker_it != workers_map_.end());
    147     scoped_refptr<ModelSafeWorker> worker = worker_it->second;
    148 
    149     // DebugInfoEmitters are never deleted.  Use existing one if we have it.
    150     DirectoryTypeDebugInfoEmitter* emitter = NULL;
    151     DirectoryTypeDebugInfoEmitterMap::iterator it =
    152         directory_type_debug_info_emitter_map_.find(type);
    153     if (it != directory_type_debug_info_emitter_map_.end()) {
    154       emitter = it->second;
    155     } else {
    156       emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
    157                                                   &type_debug_info_observers_);
    158       directory_type_debug_info_emitter_map_.insert(
    159           std::make_pair(type, emitter));
    160       directory_type_debug_info_emitters_.push_back(emitter);
    161     }
    162 
    163     DirectoryCommitContributor* committer =
    164         new DirectoryCommitContributor(directory_, type, emitter);
    165     DirectoryUpdateHandler* updater =
    166         new DirectoryUpdateHandler(directory_, type, worker, emitter);
    167 
    168     // These containers take ownership of their contents.
    169     directory_commit_contributors_.push_back(committer);
    170     directory_update_handlers_.push_back(updater);
    171 
    172     bool inserted1 =
    173         update_handler_map_.insert(std::make_pair(type, updater)).second;
    174     DCHECK(inserted1) << "Attempt to override existing type handler in map";
    175 
    176     bool inserted2 =
    177         commit_contributor_map_.insert(std::make_pair(type, committer)).second;
    178     DCHECK(inserted2) << "Attempt to override existing type handler in map";
    179   }
    180 
    181   enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
    182   DCHECK(Intersection(GetEnabledDirectoryTypes(),
    183                       GetEnabledNonBlockingTypes()).Empty());
    184 }
    185 
    186 void ModelTypeRegistry::InitializeNonBlockingType(
    187     ModelType type,
    188     const DataTypeState& data_type_state,
    189     scoped_refptr<base::SequencedTaskRunner> type_task_runner,
    190     base::WeakPtr<NonBlockingTypeProcessor> processor) {
    191   DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
    192 
    193   // Initialize CoreProcessor -> Processor communication channel.
    194   scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
    195       new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
    196   scoped_ptr<NonBlockingTypeProcessorCore> core(
    197       new NonBlockingTypeProcessorCore(
    198           type, data_type_state, processor_interface.Pass()));
    199 
    200   // Initialize Processor -> CoreProcessor communication channel.
    201   scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
    202       new NonBlockingTypeProcessorCoreWrapper(
    203           core->AsWeakPtr(),
    204           scoped_refptr<base::SequencedTaskRunner>(
    205               base::MessageLoopProxy::current())));
    206   type_task_runner->PostTask(FROM_HERE,
    207                              base::Bind(&NonBlockingTypeProcessor::OnConnect,
    208                                         processor,
    209                                         base::Passed(&core_interface)));
    210 
    211   DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
    212   DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
    213 
    214   update_handler_map_.insert(std::make_pair(type, core.get()));
    215   commit_contributor_map_.insert(std::make_pair(type, core.get()));
    216 
    217   // The container takes ownership.
    218   non_blocking_type_processor_cores_.push_back(core.release());
    219 
    220   DCHECK(Intersection(GetEnabledDirectoryTypes(),
    221                       GetEnabledNonBlockingTypes()).Empty());
    222 }
    223 
    224 void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
    225   DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
    226   DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
    227   DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
    228 
    229   size_t updaters_erased = update_handler_map_.erase(type);
    230   size_t committers_erased = commit_contributor_map_.erase(type);
    231 
    232   DCHECK_EQ(1U, updaters_erased);
    233   DCHECK_EQ(1U, committers_erased);
    234 
    235   // Remove from the ScopedVector, deleting the core in the process.
    236   for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it =
    237        non_blocking_type_processor_cores_.begin();
    238        it != non_blocking_type_processor_cores_.end(); ++it) {
    239     if ((*it)->GetModelType() == type) {
    240       non_blocking_type_processor_cores_.erase(it);
    241       break;
    242     }
    243   }
    244 }
    245 
    246 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
    247   return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
    248 }
    249 
    250 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
    251   return &update_handler_map_;
    252 }
    253 
    254 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
    255   return &commit_contributor_map_;
    256 }
    257 
    258 DirectoryTypeDebugInfoEmitterMap*
    259 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
    260   return &directory_type_debug_info_emitter_map_;
    261 }
    262 
    263 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
    264     syncer::TypeDebugInfoObserver* observer) {
    265   if (!type_debug_info_observers_.HasObserver(observer))
    266     type_debug_info_observers_.AddObserver(observer);
    267 }
    268 
    269 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
    270     syncer::TypeDebugInfoObserver* observer) {
    271   type_debug_info_observers_.RemoveObserver(observer);
    272 }
    273 
    274 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
    275     syncer::TypeDebugInfoObserver* observer) {
    276   return type_debug_info_observers_.HasObserver(observer);
    277 }
    278 
    279 void ModelTypeRegistry::RequestEmitDebugInfo() {
    280   for (DirectoryTypeDebugInfoEmitterMap::iterator it =
    281        directory_type_debug_info_emitter_map_.begin();
    282        it != directory_type_debug_info_emitter_map_.end(); ++it) {
    283     it->second->EmitCommitCountersUpdate();
    284     it->second->EmitUpdateCountersUpdate();
    285     it->second->EmitStatusCountersUpdate();
    286   }
    287 }
    288 
    289 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
    290   return enabled_directory_types_;
    291 }
    292 
    293 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
    294   ModelTypeSet enabled_off_thread_types;
    295   for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
    296            non_blocking_type_processor_cores_.begin();
    297        it != non_blocking_type_processor_cores_.end(); ++it) {
    298     enabled_off_thread_types.Put((*it)->GetModelType());
    299   }
    300   return enabled_off_thread_types;
    301 }
    302 
    303 }  // namespace syncer
    304