Home | History | Annotate | Download | only in sessions
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/sessions/model_type_registry.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/observer_list.h"
      9 #include "base/thread_task_runner_handle.h"
     10 #include "sync/engine/directory_commit_contributor.h"
     11 #include "sync/engine/directory_update_handler.h"
     12 #include "sync/engine/model_type_sync_proxy.h"
     13 #include "sync/engine/model_type_sync_proxy_impl.h"
     14 #include "sync/engine/model_type_sync_worker.h"
     15 #include "sync/engine/model_type_sync_worker_impl.h"
     16 #include "sync/internal_api/public/non_blocking_sync_common.h"
     17 #include "sync/sessions/directory_type_debug_info_emitter.h"
     18 #include "sync/util/cryptographer.h"
     19 
     20 namespace syncer {
     21 
     22 namespace {
     23 
     24 class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
     25  public:
     26   ModelTypeSyncProxyWrapper(
     27       const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
     28       const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
     29   virtual ~ModelTypeSyncProxyWrapper();
     30 
     31   virtual void OnCommitCompleted(
     32       const DataTypeState& type_state,
     33       const CommitResponseDataList& response_list) OVERRIDE;
     34   virtual void OnUpdateReceived(
     35       const DataTypeState& type_state,
     36       const UpdateResponseDataList& response_list,
     37       const UpdateResponseDataList& pending_updates) OVERRIDE;
     38 
     39  private:
     40   base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
     41   scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
     42 };
     43 
     44 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
     45     const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
     46     const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
     47     : processor_(proxy), processor_task_runner_(processor_task_runner) {
     48 }
     49 
     50 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
     51 }
     52 
     53 void ModelTypeSyncProxyWrapper::OnCommitCompleted(
     54     const DataTypeState& type_state,
     55     const CommitResponseDataList& response_list) {
     56   processor_task_runner_->PostTask(
     57       FROM_HERE,
     58       base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompleted,
     59                  processor_,
     60                  type_state,
     61                  response_list));
     62 }
     63 
     64 void ModelTypeSyncProxyWrapper::OnUpdateReceived(
     65     const DataTypeState& type_state,
     66     const UpdateResponseDataList& response_list,
     67     const UpdateResponseDataList& pending_updates) {
     68   processor_task_runner_->PostTask(
     69       FROM_HERE,
     70       base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
     71                  processor_,
     72                  type_state,
     73                  response_list,
     74                  pending_updates));
     75 }
     76 
     77 class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
     78  public:
     79   ModelTypeSyncWorkerWrapper(
     80       const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
     81       const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
     82   virtual ~ModelTypeSyncWorkerWrapper();
     83 
     84   virtual void EnqueueForCommit(const CommitRequestDataList& list) OVERRIDE;
     85 
     86  private:
     87   base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
     88   scoped_refptr<base::SequencedTaskRunner> sync_thread_;
     89 };
     90 
     91 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
     92     const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
     93     const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
     94     : worker_(worker), sync_thread_(sync_thread) {
     95 }
     96 
     97 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
     98 }
     99 
    100 void ModelTypeSyncWorkerWrapper::EnqueueForCommit(
    101     const CommitRequestDataList& list) {
    102   sync_thread_->PostTask(
    103       FROM_HERE,
    104       base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
    105 }
    106 
    107 }  // namespace
    108 
    109 ModelTypeRegistry::ModelTypeRegistry(
    110     const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
    111     syncable::Directory* directory,
    112     NudgeHandler* nudge_handler)
    113     : directory_(directory),
    114       nudge_handler_(nudge_handler),
    115       weak_ptr_factory_(this) {
    116   for (size_t i = 0u; i < workers.size(); ++i) {
    117     workers_map_.insert(
    118         std::make_pair(workers[i]->GetModelSafeGroup(), workers[i]));
    119   }
    120 }
    121 
    122 ModelTypeRegistry::~ModelTypeRegistry() {
    123 }
    124 
    125 void ModelTypeRegistry::SetEnabledDirectoryTypes(
    126     const ModelSafeRoutingInfo& routing_info) {
    127   // Remove all existing directory processors and delete them.  The
    128   // DebugInfoEmitters are not deleted here, since we want to preserve their
    129   // counters.
    130   for (ModelTypeSet::Iterator it = enabled_directory_types_.First();
    131        it.Good(); it.Inc()) {
    132     size_t result1 = update_handler_map_.erase(it.Get());
    133     size_t result2 = commit_contributor_map_.erase(it.Get());
    134     DCHECK_EQ(1U, result1);
    135     DCHECK_EQ(1U, result2);
    136   }
    137 
    138   // Clear the old instances of directory update handlers and commit
    139   // contributors, deleting their contents in the processs.
    140   directory_update_handlers_.clear();
    141   directory_commit_contributors_.clear();
    142 
    143   // Create new ones and add them to the appropriate containers.
    144   for (ModelSafeRoutingInfo::const_iterator routing_iter = routing_info.begin();
    145        routing_iter != routing_info.end(); ++routing_iter) {
    146     ModelType type = routing_iter->first;
    147     ModelSafeGroup group = routing_iter->second;
    148     std::map<ModelSafeGroup, scoped_refptr<ModelSafeWorker> >::iterator
    149         worker_it = workers_map_.find(group);
    150     DCHECK(worker_it != workers_map_.end());
    151     scoped_refptr<ModelSafeWorker> worker = worker_it->second;
    152 
    153     // DebugInfoEmitters are never deleted.  Use existing one if we have it.
    154     DirectoryTypeDebugInfoEmitter* emitter = NULL;
    155     DirectoryTypeDebugInfoEmitterMap::iterator it =
    156         directory_type_debug_info_emitter_map_.find(type);
    157     if (it != directory_type_debug_info_emitter_map_.end()) {
    158       emitter = it->second;
    159     } else {
    160       emitter = new DirectoryTypeDebugInfoEmitter(directory_, type,
    161                                                   &type_debug_info_observers_);
    162       directory_type_debug_info_emitter_map_.insert(
    163           std::make_pair(type, emitter));
    164       directory_type_debug_info_emitters_.push_back(emitter);
    165     }
    166 
    167     DirectoryCommitContributor* committer =
    168         new DirectoryCommitContributor(directory_, type, emitter);
    169     DirectoryUpdateHandler* updater =
    170         new DirectoryUpdateHandler(directory_, type, worker, emitter);
    171 
    172     // These containers take ownership of their contents.
    173     directory_commit_contributors_.push_back(committer);
    174     directory_update_handlers_.push_back(updater);
    175 
    176     bool inserted1 =
    177         update_handler_map_.insert(std::make_pair(type, updater)).second;
    178     DCHECK(inserted1) << "Attempt to override existing type handler in map";
    179 
    180     bool inserted2 =
    181         commit_contributor_map_.insert(std::make_pair(type, committer)).second;
    182     DCHECK(inserted2) << "Attempt to override existing type handler in map";
    183   }
    184 
    185   enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
    186   DCHECK(Intersection(GetEnabledDirectoryTypes(),
    187                       GetEnabledNonBlockingTypes()).Empty());
    188 }
    189 
    190 void ModelTypeRegistry::ConnectSyncTypeToWorker(
    191     ModelType type,
    192     const DataTypeState& data_type_state,
    193     const UpdateResponseDataList& saved_pending_updates,
    194     const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
    195     const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
    196   DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
    197 
    198   // Initialize Worker -> Proxy communication channel.
    199   scoped_ptr<ModelTypeSyncProxy> proxy(
    200       new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
    201   scoped_ptr<Cryptographer> cryptographer_copy;
    202   if (encrypted_types_.Has(type))
    203     cryptographer_copy.reset(new Cryptographer(*cryptographer_));
    204 
    205   scoped_ptr<ModelTypeSyncWorkerImpl> worker(
    206       new ModelTypeSyncWorkerImpl(type,
    207                                   data_type_state,
    208                                   saved_pending_updates,
    209                                   cryptographer_copy.Pass(),
    210                                   nudge_handler_,
    211                                   proxy.Pass()));
    212 
    213   // Initialize Proxy -> Worker communication channel.
    214   scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
    215       new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
    216                                      scoped_refptr<base::SequencedTaskRunner>(
    217                                          base::ThreadTaskRunnerHandle::Get())));
    218   type_task_runner->PostTask(FROM_HERE,
    219                              base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
    220                                         proxy_impl,
    221                                         base::Passed(&wrapped_worker)));
    222 
    223   DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
    224   DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
    225 
    226   update_handler_map_.insert(std::make_pair(type, worker.get()));
    227   commit_contributor_map_.insert(std::make_pair(type, worker.get()));
    228 
    229   // The container takes ownership.
    230   model_type_sync_workers_.push_back(worker.release());
    231 
    232   DCHECK(Intersection(GetEnabledDirectoryTypes(),
    233                       GetEnabledNonBlockingTypes()).Empty());
    234 }
    235 
    236 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
    237   DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
    238   DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
    239   DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
    240 
    241   size_t updaters_erased = update_handler_map_.erase(type);
    242   size_t committers_erased = commit_contributor_map_.erase(type);
    243 
    244   DCHECK_EQ(1U, updaters_erased);
    245   DCHECK_EQ(1U, committers_erased);
    246 
    247   // Remove from the ScopedVector, deleting the worker in the process.
    248   for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
    249            model_type_sync_workers_.begin();
    250        it != model_type_sync_workers_.end();
    251        ++it) {
    252     if ((*it)->GetModelType() == type) {
    253       model_type_sync_workers_.erase(it);
    254       break;
    255     }
    256   }
    257 }
    258 
    259 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
    260   return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
    261 }
    262 
    263 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
    264   return &update_handler_map_;
    265 }
    266 
    267 CommitContributorMap* ModelTypeRegistry::commit_contributor_map() {
    268   return &commit_contributor_map_;
    269 }
    270 
    271 DirectoryTypeDebugInfoEmitterMap*
    272 ModelTypeRegistry::directory_type_debug_info_emitter_map() {
    273   return &directory_type_debug_info_emitter_map_;
    274 }
    275 
    276 void ModelTypeRegistry::RegisterDirectoryTypeDebugInfoObserver(
    277     syncer::TypeDebugInfoObserver* observer) {
    278   if (!type_debug_info_observers_.HasObserver(observer))
    279     type_debug_info_observers_.AddObserver(observer);
    280 }
    281 
    282 void ModelTypeRegistry::UnregisterDirectoryTypeDebugInfoObserver(
    283     syncer::TypeDebugInfoObserver* observer) {
    284   type_debug_info_observers_.RemoveObserver(observer);
    285 }
    286 
    287 bool ModelTypeRegistry::HasDirectoryTypeDebugInfoObserver(
    288     syncer::TypeDebugInfoObserver* observer) {
    289   return type_debug_info_observers_.HasObserver(observer);
    290 }
    291 
    292 void ModelTypeRegistry::RequestEmitDebugInfo() {
    293   for (DirectoryTypeDebugInfoEmitterMap::iterator it =
    294        directory_type_debug_info_emitter_map_.begin();
    295        it != directory_type_debug_info_emitter_map_.end(); ++it) {
    296     it->second->EmitCommitCountersUpdate();
    297     it->second->EmitUpdateCountersUpdate();
    298     it->second->EmitStatusCountersUpdate();
    299   }
    300 }
    301 
    302 base::WeakPtr<SyncContext> ModelTypeRegistry::AsWeakPtr() {
    303   return weak_ptr_factory_.GetWeakPtr();
    304 }
    305 
    306 void ModelTypeRegistry::OnPassphraseRequired(
    307     PassphraseRequiredReason reason,
    308     const sync_pb::EncryptedData& pending_keys) {
    309 }
    310 
    311 void ModelTypeRegistry::OnPassphraseAccepted() {
    312 }
    313 
    314 void ModelTypeRegistry::OnBootstrapTokenUpdated(
    315     const std::string& bootstrap_token,
    316     BootstrapTokenType type) {
    317 }
    318 
    319 void ModelTypeRegistry::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
    320                                                 bool encrypt_everything) {
    321   encrypted_types_ = encrypted_types;
    322   OnEncryptionStateChanged();
    323 }
    324 
    325 void ModelTypeRegistry::OnEncryptionComplete() {
    326 }
    327 
    328 void ModelTypeRegistry::OnCryptographerStateChanged(
    329     Cryptographer* cryptographer) {
    330   cryptographer_.reset(new Cryptographer(*cryptographer));
    331   OnEncryptionStateChanged();
    332 }
    333 
    334 void ModelTypeRegistry::OnPassphraseTypeChanged(PassphraseType type,
    335                                                 base::Time passphrase_time) {
    336 }
    337 
    338 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
    339   return enabled_directory_types_;
    340 }
    341 
    342 void ModelTypeRegistry::OnEncryptionStateChanged() {
    343   for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
    344            model_type_sync_workers_.begin();
    345        it != model_type_sync_workers_.end();
    346        ++it) {
    347     if (encrypted_types_.Has((*it)->GetModelType())) {
    348       (*it)->UpdateCryptographer(
    349           make_scoped_ptr(new Cryptographer(*cryptographer_)));
    350     }
    351   }
    352 }
    353 
    354 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
    355   ModelTypeSet enabled_off_thread_types;
    356   for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
    357            model_type_sync_workers_.begin();
    358        it != model_type_sync_workers_.end();
    359        ++it) {
    360     enabled_off_thread_types.Put((*it)->GetModelType());
    361   }
    362   return enabled_off_thread_types;
    363 }
    364 
    365 }  // namespace syncer
    366