Home | History | Annotate | Download | only in engine
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/engine/non_blocking_type_processor.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/message_loop/message_loop_proxy.h"
     10 #include "sync/engine/model_thread_sync_entity.h"
     11 #include "sync/engine/non_blocking_type_processor_core_interface.h"
     12 #include "sync/internal_api/public/sync_core_proxy.h"
     13 #include "sync/syncable/syncable_util.h"
     14 
     15 namespace syncer {
     16 
     17 NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
     18     : type_(type),
     19       is_preferred_(false),
     20       is_connected_(false),
     21       entities_deleter_(&entities_),
     22       weak_ptr_factory_for_ui_(this),
     23       weak_ptr_factory_for_sync_(this) {
     24 }
     25 
     26 NonBlockingTypeProcessor::~NonBlockingTypeProcessor() {
     27 }
     28 
     29 bool NonBlockingTypeProcessor::IsPreferred() const {
     30   DCHECK(CalledOnValidThread());
     31   return is_preferred_;
     32 }
     33 
     34 bool NonBlockingTypeProcessor::IsConnected() const {
     35   DCHECK(CalledOnValidThread());
     36   return is_connected_;
     37 }
     38 
     39 ModelType NonBlockingTypeProcessor::GetModelType() const {
     40   DCHECK(CalledOnValidThread());
     41   return type_;
     42 }
     43 
     44 void NonBlockingTypeProcessor::Enable(
     45     scoped_ptr<SyncCoreProxy> sync_core_proxy) {
     46   DCHECK(CalledOnValidThread());
     47   DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
     48 
     49   is_preferred_ = true;
     50 
     51   // TODO(rlarocque): At some point, this should be loaded from storage.
     52   data_type_state_.progress_marker.set_data_type_id(
     53       GetSpecificsFieldNumberFromModelType(type_));
     54 
     55   sync_core_proxy_ = sync_core_proxy.Pass();
     56   sync_core_proxy_->ConnectTypeToCore(GetModelType(),
     57                                       data_type_state_,
     58                                       weak_ptr_factory_for_sync_.GetWeakPtr());
     59 }
     60 
     61 void NonBlockingTypeProcessor::Disable() {
     62   DCHECK(CalledOnValidThread());
     63   is_preferred_ = false;
     64   Disconnect();
     65 }
     66 
     67 void NonBlockingTypeProcessor::Disconnect() {
     68   DCHECK(CalledOnValidThread());
     69   DVLOG(1) << "Asked to disconnect " << ModelTypeToString(type_);
     70   is_connected_ = false;
     71 
     72   if (sync_core_proxy_) {
     73     sync_core_proxy_->Disconnect(GetModelType());
     74     sync_core_proxy_.reset();
     75   }
     76 
     77   weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
     78   core_interface_.reset();
     79 }
     80 
     81 base::WeakPtr<NonBlockingTypeProcessor>
     82 NonBlockingTypeProcessor::AsWeakPtrForUI() {
     83   DCHECK(CalledOnValidThread());
     84   return weak_ptr_factory_for_ui_.GetWeakPtr();
     85 }
     86 
     87 void NonBlockingTypeProcessor::OnConnect(
     88     scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
     89   DCHECK(CalledOnValidThread());
     90   DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
     91 
     92   is_connected_ = true;
     93   core_interface_ = core_interface.Pass();
     94 
     95   FlushPendingCommitRequests();
     96 }
     97 
     98 void NonBlockingTypeProcessor::Put(const std::string& client_tag,
     99                                    const sync_pb::EntitySpecifics& specifics) {
    100   DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
    101 
    102   const std::string client_tag_hash(
    103       syncable::GenerateSyncableHash(type_, client_tag));
    104 
    105   EntityMap::iterator it = entities_.find(client_tag_hash);
    106   if (it == entities_.end()) {
    107     scoped_ptr<ModelThreadSyncEntity> entity(
    108         ModelThreadSyncEntity::NewLocalItem(
    109             client_tag, specifics, base::Time::Now()));
    110     entities_.insert(std::make_pair(client_tag_hash, entity.release()));
    111   } else {
    112     ModelThreadSyncEntity* entity = it->second;
    113     entity->MakeLocalChange(specifics);
    114   }
    115 
    116   FlushPendingCommitRequests();
    117 }
    118 
    119 void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
    120   const std::string client_tag_hash(
    121       syncable::GenerateSyncableHash(type_, client_tag));
    122 
    123   EntityMap::iterator it = entities_.find(client_tag_hash);
    124   if (it == entities_.end()) {
    125     // That's unusual, but not necessarily a bad thing.
    126     // Missing is as good as deleted as far as the model is concerned.
    127     DLOG(WARNING) << "Attempted to delete missing item."
    128                   << " client tag: " << client_tag;
    129   } else {
    130     ModelThreadSyncEntity* entity = it->second;
    131     entity->Delete();
    132   }
    133 
    134   FlushPendingCommitRequests();
    135 }
    136 
    137 void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
    138   CommitRequestDataList commit_requests;
    139 
    140   // Don't bother sending anything if there's no one to send to.
    141   if (!IsConnected())
    142     return;
    143 
    144   // Don't send anything if the type is not ready to handle commits.
    145   if (!data_type_state_.initial_sync_done)
    146     return;
    147 
    148   // TODO(rlarocque): Do something smarter than iterate here.
    149   for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
    150        ++it) {
    151     if (it->second->RequiresCommitRequest()) {
    152       CommitRequestData request;
    153       it->second->InitializeCommitRequestData(&request);
    154       commit_requests.push_back(request);
    155       it->second->SetCommitRequestInProgress();
    156     }
    157   }
    158 
    159   if (!commit_requests.empty())
    160     core_interface_->RequestCommits(commit_requests);
    161 }
    162 
    163 void NonBlockingTypeProcessor::OnCommitCompletion(
    164     const DataTypeState& type_state,
    165     const CommitResponseDataList& response_list) {
    166   data_type_state_ = type_state;
    167 
    168   for (CommitResponseDataList::const_iterator list_it = response_list.begin();
    169        list_it != response_list.end();
    170        ++list_it) {
    171     const CommitResponseData& response_data = *list_it;
    172     const std::string& client_tag_hash = response_data.client_tag_hash;
    173 
    174     EntityMap::iterator it = entities_.find(client_tag_hash);
    175     if (it == entities_.end()) {
    176       NOTREACHED() << "Received commit response for missing item."
    177                    << " type: " << type_ << " client_tag: " << client_tag_hash;
    178       return;
    179     } else {
    180       it->second->ReceiveCommitResponse(response_data.id,
    181                                         response_data.sequence_number,
    182                                         response_data.response_version);
    183     }
    184   }
    185 }
    186 
    187 void NonBlockingTypeProcessor::OnUpdateReceived(
    188     const DataTypeState& data_type_state,
    189     const UpdateResponseDataList& response_list) {
    190   bool initial_sync_just_finished =
    191       !data_type_state_.initial_sync_done && data_type_state.initial_sync_done;
    192 
    193   data_type_state_ = data_type_state;
    194 
    195   for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
    196        list_it != response_list.end();
    197        ++list_it) {
    198     const UpdateResponseData& response_data = *list_it;
    199     const std::string& client_tag_hash = response_data.client_tag_hash;
    200 
    201     EntityMap::iterator it = entities_.find(client_tag_hash);
    202     if (it == entities_.end()) {
    203       scoped_ptr<ModelThreadSyncEntity> entity =
    204           ModelThreadSyncEntity::FromServerUpdate(
    205               response_data.id,
    206               response_data.client_tag_hash,
    207               response_data.non_unique_name,
    208               response_data.response_version,
    209               response_data.specifics,
    210               response_data.deleted,
    211               response_data.ctime,
    212               response_data.mtime);
    213       entities_.insert(std::make_pair(client_tag_hash, entity.release()));
    214     } else {
    215       ModelThreadSyncEntity* entity = it->second;
    216       entity->ApplyUpdateFromServer(response_data.response_version,
    217                                     response_data.deleted,
    218                                     response_data.specifics,
    219                                     response_data.mtime);
    220       // TODO: Do something special when conflicts are detected.
    221     }
    222   }
    223 
    224   if (initial_sync_just_finished)
    225     FlushPendingCommitRequests();
    226 
    227   // TODO: Inform the model of the new or updated data.
    228 }
    229 
    230 }  // namespace syncer
    231