Home | History | Annotate | Download | only in engine
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/engine/model_type_sync_proxy_impl.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "sync/engine/model_type_entity.h"
     10 #include "sync/engine/model_type_sync_worker.h"
     11 #include "sync/internal_api/public/sync_context_proxy.h"
     12 #include "sync/syncable/syncable_util.h"
     13 
     14 namespace syncer {
     15 
     16 ModelTypeSyncProxyImpl::ModelTypeSyncProxyImpl(ModelType type)
     17     : type_(type),
     18       is_preferred_(false),
     19       is_connected_(false),
     20       entities_deleter_(&entities_),
     21       pending_updates_map_deleter_(&pending_updates_map_),
     22       weak_ptr_factory_for_ui_(this),
     23       weak_ptr_factory_for_sync_(this) {
     24 }
     25 
     26 ModelTypeSyncProxyImpl::~ModelTypeSyncProxyImpl() {
     27 }
     28 
     29 bool ModelTypeSyncProxyImpl::IsPreferred() const {
     30   DCHECK(CalledOnValidThread());
     31   return is_preferred_;
     32 }
     33 
     34 bool ModelTypeSyncProxyImpl::IsConnected() const {
     35   DCHECK(CalledOnValidThread());
     36   return is_connected_;
     37 }
     38 
     39 ModelType ModelTypeSyncProxyImpl::GetModelType() const {
     40   DCHECK(CalledOnValidThread());
     41   return type_;
     42 }
     43 
     44 void ModelTypeSyncProxyImpl::Enable(
     45     scoped_ptr<SyncContextProxy> sync_context_proxy) {
     46   DCHECK(CalledOnValidThread());
     47   DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
     48 
     49   is_preferred_ = true;
     50 
     51   // TODO(rlarocque): At some point, this should be loaded from storage.
     52   data_type_state_.progress_marker.set_data_type_id(
     53       GetSpecificsFieldNumberFromModelType(type_));
     54 
     55   UpdateResponseDataList saved_pending_updates = GetPendingUpdates();
     56   sync_context_proxy_ = sync_context_proxy.Pass();
     57   sync_context_proxy_->ConnectTypeToSync(
     58       GetModelType(),
     59       data_type_state_,
     60       saved_pending_updates,
     61       weak_ptr_factory_for_sync_.GetWeakPtr());
     62 }
     63 
     64 void ModelTypeSyncProxyImpl::Disable() {
     65   DCHECK(CalledOnValidThread());
     66   is_preferred_ = false;
     67   Disconnect();
     68 
     69   ClearSyncState();
     70 }
     71 
     72 void ModelTypeSyncProxyImpl::Disconnect() {
     73   DCHECK(CalledOnValidThread());
     74   DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
     75   is_connected_ = false;
     76 
     77   if (sync_context_proxy_) {
     78     sync_context_proxy_->Disconnect(GetModelType());
     79     sync_context_proxy_.reset();
     80   }
     81 
     82   weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
     83   worker_.reset();
     84 
     85   ClearTransientSyncState();
     86 }
     87 
     88 base::WeakPtr<ModelTypeSyncProxyImpl> ModelTypeSyncProxyImpl::AsWeakPtrForUI() {
     89   DCHECK(CalledOnValidThread());
     90   return weak_ptr_factory_for_ui_.GetWeakPtr();
     91 }
     92 
     93 void ModelTypeSyncProxyImpl::OnConnect(scoped_ptr<ModelTypeSyncWorker> worker) {
     94   DCHECK(CalledOnValidThread());
     95   DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
     96 
     97   is_connected_ = true;
     98   worker_ = worker.Pass();
     99 
    100   FlushPendingCommitRequests();
    101 }
    102 
    103 void ModelTypeSyncProxyImpl::Put(const std::string& client_tag,
    104                                  const sync_pb::EntitySpecifics& specifics) {
    105   DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
    106 
    107   const std::string client_tag_hash(
    108       syncable::GenerateSyncableHash(type_, client_tag));
    109 
    110   EntityMap::iterator it = entities_.find(client_tag_hash);
    111   if (it == entities_.end()) {
    112     scoped_ptr<ModelTypeEntity> entity(ModelTypeEntity::NewLocalItem(
    113         client_tag, specifics, base::Time::Now()));
    114     entities_.insert(std::make_pair(client_tag_hash, entity.release()));
    115   } else {
    116     ModelTypeEntity* entity = it->second;
    117     entity->MakeLocalChange(specifics);
    118   }
    119 
    120   FlushPendingCommitRequests();
    121 }
    122 
    123 void ModelTypeSyncProxyImpl::Delete(const std::string& client_tag) {
    124   const std::string client_tag_hash(
    125       syncable::GenerateSyncableHash(type_, client_tag));
    126 
    127   EntityMap::iterator it = entities_.find(client_tag_hash);
    128   if (it == entities_.end()) {
    129     // That's unusual, but not necessarily a bad thing.
    130     // Missing is as good as deleted as far as the model is concerned.
    131     DLOG(WARNING) << "Attempted to delete missing item."
    132                   << " client tag: " << client_tag;
    133   } else {
    134     ModelTypeEntity* entity = it->second;
    135     entity->Delete();
    136   }
    137 
    138   FlushPendingCommitRequests();
    139 }
    140 
    141 void ModelTypeSyncProxyImpl::FlushPendingCommitRequests() {
    142   CommitRequestDataList commit_requests;
    143 
    144   // Don't bother sending anything if there's no one to send to.
    145   if (!IsConnected())
    146     return;
    147 
    148   // Don't send anything if the type is not ready to handle commits.
    149   if (!data_type_state_.initial_sync_done)
    150     return;
    151 
    152   // TODO(rlarocque): Do something smarter than iterate here.
    153   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
    154        ++it) {
    155     if (it->second->RequiresCommitRequest()) {
    156       CommitRequestData request;
    157       it->second->InitializeCommitRequestData(&request);
    158       commit_requests.push_back(request);
    159       it->second->SetCommitRequestInProgress();
    160     }
    161   }
    162 
    163   if (!commit_requests.empty())
    164     worker_->EnqueueForCommit(commit_requests);
    165 }
    166 
    167 void ModelTypeSyncProxyImpl::OnCommitCompleted(
    168     const DataTypeState& type_state,
    169     const CommitResponseDataList& response_list) {
    170   data_type_state_ = type_state;
    171 
    172   for (CommitResponseDataList::const_iterator list_it = response_list.begin();
    173        list_it != response_list.end();
    174        ++list_it) {
    175     const CommitResponseData& response_data = *list_it;
    176     const std::string& client_tag_hash = response_data.client_tag_hash;
    177 
    178     EntityMap::iterator it = entities_.find(client_tag_hash);
    179     if (it == entities_.end()) {
    180       NOTREACHED() << "Received commit response for missing item."
    181                    << " type: " << type_ << " client_tag: " << client_tag_hash;
    182       return;
    183     } else {
    184       it->second->ReceiveCommitResponse(response_data.id,
    185                                         response_data.sequence_number,
    186                                         response_data.response_version,
    187                                         data_type_state_.encryption_key_name);
    188     }
    189   }
    190 }
    191 
    192 void ModelTypeSyncProxyImpl::OnUpdateReceived(
    193     const DataTypeState& data_type_state,
    194     const UpdateResponseDataList& response_list,
    195     const UpdateResponseDataList& pending_updates) {
    196   bool got_new_encryption_requirements = data_type_state_.encryption_key_name !=
    197                                          data_type_state.encryption_key_name;
    198 
    199   data_type_state_ = data_type_state;
    200 
    201   for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
    202        list_it != response_list.end();
    203        ++list_it) {
    204     const UpdateResponseData& response_data = *list_it;
    205     const std::string& client_tag_hash = response_data.client_tag_hash;
    206 
    207     UpdateMap::iterator old_it = pending_updates_map_.find(client_tag_hash);
    208     if (old_it != pending_updates_map_.end()) {
    209       // If we're being asked to apply an update to this entity, this overrides
    210       // the previous pending updates.
    211       delete old_it->second;
    212       pending_updates_map_.erase(old_it);
    213     }
    214 
    215     EntityMap::iterator it = entities_.find(client_tag_hash);
    216     if (it == entities_.end()) {
    217       scoped_ptr<ModelTypeEntity> entity =
    218           ModelTypeEntity::FromServerUpdate(response_data.id,
    219                                             response_data.client_tag_hash,
    220                                             response_data.non_unique_name,
    221                                             response_data.response_version,
    222                                             response_data.specifics,
    223                                             response_data.deleted,
    224                                             response_data.ctime,
    225                                             response_data.mtime,
    226                                             response_data.encryption_key_name);
    227       entities_.insert(std::make_pair(client_tag_hash, entity.release()));
    228     } else {
    229       ModelTypeEntity* entity = it->second;
    230       entity->ApplyUpdateFromServer(response_data.response_version,
    231                                     response_data.deleted,
    232                                     response_data.specifics,
    233                                     response_data.mtime,
    234                                     response_data.encryption_key_name);
    235 
    236       // TODO: Do something special when conflicts are detected.
    237     }
    238 
    239     // If the received entity has out of date encryption, we schedule another
    240     // commit to fix it.
    241     if (data_type_state_.encryption_key_name !=
    242         response_data.encryption_key_name) {
    243       DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit "
    244                << response_data.encryption_key_name << " -> "
    245                << data_type_state_.encryption_key_name;
    246       EntityMap::iterator it2 = entities_.find(client_tag_hash);
    247       it2->second->UpdateDesiredEncryptionKey(
    248           data_type_state_.encryption_key_name);
    249     }
    250   }
    251 
    252   // Save pending updates in the appropriate data structure.
    253   for (UpdateResponseDataList::const_iterator list_it = pending_updates.begin();
    254        list_it != pending_updates.end();
    255        ++list_it) {
    256     const UpdateResponseData& update = *list_it;
    257     const std::string& client_tag_hash = update.client_tag_hash;
    258 
    259     UpdateMap::iterator lookup_it = pending_updates_map_.find(client_tag_hash);
    260     if (lookup_it == pending_updates_map_.end()) {
    261       pending_updates_map_.insert(
    262           std::make_pair(client_tag_hash, new UpdateResponseData(update)));
    263     } else if (lookup_it->second->response_version <= update.response_version) {
    264       delete lookup_it->second;
    265       pending_updates_map_.erase(lookup_it);
    266       pending_updates_map_.insert(
    267           std::make_pair(client_tag_hash, new UpdateResponseData(update)));
    268     } else {
    269       // Received update is stale, do not overwrite existing.
    270     }
    271   }
    272 
    273   if (got_new_encryption_requirements) {
    274     for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
    275          ++it) {
    276       it->second->UpdateDesiredEncryptionKey(
    277           data_type_state_.encryption_key_name);
    278     }
    279   }
    280 
    281   // We may have new reasons to commit by the time this function is done.
    282   FlushPendingCommitRequests();
    283 
    284   // TODO: Inform the model of the new or updated data.
    285   // TODO: Persist the new data on disk.
    286 }
    287 
    288 UpdateResponseDataList ModelTypeSyncProxyImpl::GetPendingUpdates() {
    289   UpdateResponseDataList pending_updates_list;
    290   for (UpdateMap::const_iterator it = pending_updates_map_.begin();
    291        it != pending_updates_map_.end();
    292        ++it) {
    293     pending_updates_list.push_back(*it->second);
    294   }
    295   return pending_updates_list;
    296 }
    297 
    298 void ModelTypeSyncProxyImpl::ClearTransientSyncState() {
    299   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
    300        ++it) {
    301     it->second->ClearTransientSyncState();
    302   }
    303 }
    304 
    305 void ModelTypeSyncProxyImpl::ClearSyncState() {
    306   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
    307        ++it) {
    308     it->second->ClearSyncState();
    309   }
    310   STLDeleteValues(&pending_updates_map_);
    311   data_type_state_ = DataTypeState();
    312 }
    313 
    314 }  // namespace syncer
    315