Home | History | Annotate | Download | only in engine
      1 // Copyright 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/engine/process_updates_command.h"
      6 
      7 #include <vector>
      8 
      9 #include "base/basictypes.h"
     10 #include "base/location.h"
     11 #include "sync/engine/syncer.h"
     12 #include "sync/engine/syncer_proto_util.h"
     13 #include "sync/engine/syncer_util.h"
     14 #include "sync/sessions/sync_session.h"
     15 #include "sync/syncable/directory.h"
     16 #include "sync/syncable/mutable_entry.h"
     17 #include "sync/syncable/syncable_proto_util.h"
     18 #include "sync/syncable/syncable_util.h"
     19 #include "sync/syncable/syncable_write_transaction.h"
     20 #include "sync/util/cryptographer.h"
     21 
     22 using std::vector;
     23 
     24 namespace syncer {
     25 
     26 using sessions::SyncSession;
     27 using sessions::StatusController;
     28 
     29 using syncable::GET_BY_ID;
     30 
     31 ProcessUpdatesCommand::ProcessUpdatesCommand() {}
     32 ProcessUpdatesCommand::~ProcessUpdatesCommand() {}
     33 
     34 std::set<ModelSafeGroup> ProcessUpdatesCommand::GetGroupsToChange(
     35     const sessions::SyncSession& session) const {
     36   std::set<ModelSafeGroup> groups_with_updates;
     37 
     38   const sync_pb::GetUpdatesResponse& updates =
     39       session.status_controller().updates_response().get_updates();
     40   for (int i = 0; i < updates.entries().size(); i++) {
     41     groups_with_updates.insert(
     42         GetGroupForModelType(GetModelType(updates.entries(i)),
     43                              session.context()->routing_info()));
     44   }
     45 
     46   return groups_with_updates;
     47 }
     48 
     49 namespace {
     50 
     51 // This function attempts to determine whether or not this update is genuinely
     52 // new, or if it is a reflection of one of our own commits.
     53 //
     54 // There is a known inaccuracy in its implementation.  If this update ends up
     55 // being applied to a local item with a different ID, we will count the change
     56 // as being a non-reflection update.  Fortunately, the server usually updates
     57 // our IDs correctly in its commit response, so a new ID during GetUpdate should
     58 // be rare.
     59 //
     60 // The only secnarios I can think of where this might happen are:
     61 // - We commit a  new item to the server, but we don't persist the
     62 // server-returned new ID to the database before we shut down.  On the GetUpdate
     63 // following the next restart, we will receive an update from the server that
     64 // updates its local ID.
     65 // - When two attempts to create an item with identical UNIQUE_CLIENT_TAG values
     66 // collide at the server.  I have seen this in testing.  When it happens, the
     67 // test server will send one of the clients a response to upate its local ID so
     68 // that both clients will refer to the item using the same ID going forward.  In
     69 // this case, we're right to assume that the update is not a reflection.
     70 //
     71 // For more information, see FindLocalIdToUpdate().
     72 bool UpdateContainsNewVersion(syncable::BaseTransaction *trans,
     73                               const sync_pb::SyncEntity &update) {
     74   int64 existing_version = -1; // The server always sends positive versions.
     75   syncable::Entry existing_entry(trans, GET_BY_ID,
     76                                  SyncableIdFromProto(update.id_string()));
     77   if (existing_entry.good())
     78     existing_version = existing_entry.Get(syncable::BASE_VERSION);
     79 
     80   if (!existing_entry.good() && update.deleted()) {
     81     // There are several possible explanations for this.  The most common cases
     82     // will be first time sync and the redelivery of deletions we've already
     83     // synced, accepted, and purged from our database.  In either case, the
     84     // update is useless to us.  Let's count them all as "not new", even though
     85     // that may not always be entirely accurate.
     86     return false;
     87   }
     88 
     89   if (existing_entry.good() &&
     90       !existing_entry.Get(syncable::UNIQUE_CLIENT_TAG).empty() &&
     91       existing_entry.Get(syncable::IS_DEL) &&
     92       update.deleted()) {
     93     // Unique client tags will have their version set to zero when they're
     94     // deleted.  The usual version comparison logic won't be able to detect
     95     // reflections of these items.  Instead, we assume any received tombstones
     96     // are reflections.  That should be correct most of the time.
     97     return false;
     98   }
     99 
    100   return existing_version < update.version();
    101 }
    102 
    103 }  // namespace
    104 
    105 SyncerError ProcessUpdatesCommand::ModelChangingExecuteImpl(
    106     SyncSession* session) {
    107   syncable::Directory* dir = session->context()->directory();
    108 
    109   syncable::WriteTransaction trans(FROM_HERE, syncable::SYNCER, dir);
    110 
    111   sessions::StatusController* status = session->mutable_status_controller();
    112   const sync_pb::GetUpdatesResponse& updates =
    113       status->updates_response().get_updates();
    114   int update_count = updates.entries().size();
    115 
    116   ModelTypeSet requested_types = GetRoutingInfoTypes(
    117       session->context()->routing_info());
    118 
    119   DVLOG(1) << update_count << " entries to verify";
    120   for (int i = 0; i < update_count; i++) {
    121     const sync_pb::SyncEntity& update = updates.entries(i);
    122 
    123     // The current function gets executed on several different threads, but
    124     // every call iterates over the same list of items that the server returned
    125     // to us.  We're not allowed to process items unless we're on the right
    126     // thread for that type.  This check will ensure we only touch the items
    127     // that live on our current thread.
    128     // TODO(tim): Don't allow access to objects in other ModelSafeGroups.
    129     // See crbug.com/121521 .
    130     ModelSafeGroup g = GetGroupForModelType(GetModelType(update),
    131                                             session->context()->routing_info());
    132     if (g != status->group_restriction())
    133       continue;
    134 
    135     VerifyResult verify_result = VerifyUpdate(
    136         &trans, update, requested_types, session->context()->routing_info());
    137     status->increment_num_updates_downloaded_by(1);
    138     if (!UpdateContainsNewVersion(&trans, update))
    139       status->increment_num_reflected_updates_downloaded_by(1);
    140     if (update.deleted())
    141       status->increment_num_tombstone_updates_downloaded_by(1);
    142 
    143     if (verify_result != VERIFY_SUCCESS && verify_result != VERIFY_UNDELETE)
    144       continue;
    145 
    146     ServerUpdateProcessingResult process_result =
    147        ProcessUpdate(update, dir->GetCryptographer(&trans), &trans);
    148 
    149     DCHECK(process_result == SUCCESS_PROCESSED ||
    150            process_result == SUCCESS_STORED);
    151   }
    152 
    153   return SYNCER_OK;
    154 }
    155 
    156 namespace {
    157 
    158 // In the event that IDs match, but tags differ AttemptReuniteClient tag
    159 // will have refused to unify the update.
    160 // We should not attempt to apply it at all since it violates consistency
    161 // rules.
    162 VerifyResult VerifyTagConsistency(const sync_pb::SyncEntity& entry,
    163                                   const syncable::MutableEntry& same_id) {
    164   if (entry.has_client_defined_unique_tag() &&
    165       entry.client_defined_unique_tag() !=
    166           same_id.Get(syncable::UNIQUE_CLIENT_TAG)) {
    167     return VERIFY_FAIL;
    168   }
    169   return VERIFY_UNDECIDED;
    170 }
    171 
    172 }  // namespace
    173 
    174 VerifyResult ProcessUpdatesCommand::VerifyUpdate(
    175     syncable::WriteTransaction* trans, const sync_pb::SyncEntity& entry,
    176     ModelTypeSet requested_types,
    177     const ModelSafeRoutingInfo& routes) {
    178   syncable::Id id = SyncableIdFromProto(entry.id_string());
    179   VerifyResult result = VERIFY_FAIL;
    180 
    181   const bool deleted = entry.has_deleted() && entry.deleted();
    182   const bool is_directory = IsFolder(entry);
    183   const ModelType model_type = GetModelType(entry);
    184 
    185   if (!id.ServerKnows()) {
    186     LOG(ERROR) << "Illegal negative id in received updates";
    187     return result;
    188   }
    189   {
    190     const std::string name = SyncerProtoUtil::NameFromSyncEntity(entry);
    191     if (name.empty() && !deleted) {
    192       LOG(ERROR) << "Zero length name in non-deleted update";
    193       return result;
    194     }
    195   }
    196 
    197   syncable::MutableEntry same_id(trans, GET_BY_ID, id);
    198   result = VerifyNewEntry(entry, &same_id, deleted);
    199 
    200   ModelType placement_type = !deleted ? GetModelType(entry)
    201       : same_id.good() ? same_id.GetModelType() : UNSPECIFIED;
    202 
    203   if (VERIFY_UNDECIDED == result) {
    204     result = VerifyTagConsistency(entry, same_id);
    205   }
    206 
    207   if (VERIFY_UNDECIDED == result) {
    208     if (deleted) {
    209       // For deletes the server could send tombostones for items that
    210       // the client did not request. If so ignore those items.
    211       if (IsRealDataType(placement_type) &&
    212           !requested_types.Has(placement_type)) {
    213         result = VERIFY_SKIP;
    214       } else {
    215         result = VERIFY_SUCCESS;
    216       }
    217     }
    218   }
    219 
    220   // If we have an existing entry, we check here for updates that break
    221   // consistency rules.
    222   if (VERIFY_UNDECIDED == result) {
    223     result = VerifyUpdateConsistency(trans, entry, &same_id,
    224                                      deleted, is_directory, model_type);
    225   }
    226 
    227   if (VERIFY_UNDECIDED == result)
    228     result = VERIFY_SUCCESS;  // No news is good news.
    229 
    230   return result;  // This might be VERIFY_SUCCESS as well
    231 }
    232 
    233 namespace {
    234 // Returns true if the entry is still ok to process.
    235 bool ReverifyEntry(syncable::WriteTransaction* trans,
    236                    const sync_pb::SyncEntity& entry,
    237                    syncable::MutableEntry* same_id) {
    238 
    239   const bool deleted = entry.has_deleted() && entry.deleted();
    240   const bool is_directory = IsFolder(entry);
    241   const ModelType model_type = GetModelType(entry);
    242 
    243   return VERIFY_SUCCESS == VerifyUpdateConsistency(trans,
    244                                                    entry,
    245                                                    same_id,
    246                                                    deleted,
    247                                                    is_directory,
    248                                                    model_type);
    249 }
    250 }  // namespace
    251 
    252 // Process a single update. Will avoid touching global state.
    253 ServerUpdateProcessingResult ProcessUpdatesCommand::ProcessUpdate(
    254     const sync_pb::SyncEntity& update,
    255     const Cryptographer* cryptographer,
    256     syncable::WriteTransaction* const trans) {
    257   const syncable::Id& server_id = SyncableIdFromProto(update.id_string());
    258   const std::string name = SyncerProtoUtil::NameFromSyncEntity(update);
    259 
    260   // Look to see if there's a local item that should recieve this update,
    261   // maybe due to a duplicate client tag or a lost commit response.
    262   syncable::Id local_id = FindLocalIdToUpdate(trans, update);
    263 
    264   // FindLocalEntryToUpdate has veto power.
    265   if (local_id.IsNull()) {
    266     return SUCCESS_PROCESSED;  // The entry has become irrelevant.
    267   }
    268 
    269   CreateNewEntry(trans, local_id);
    270 
    271   // We take a two step approach. First we store the entries data in the
    272   // server fields of a local entry and then move the data to the local fields
    273   syncable::MutableEntry target_entry(trans, GET_BY_ID, local_id);
    274 
    275   // We need to run the Verify checks again; the world could have changed
    276   // since we last verified.
    277   if (!ReverifyEntry(trans, update, &target_entry)) {
    278     return SUCCESS_PROCESSED;  // The entry has become irrelevant.
    279   }
    280 
    281   // If we're repurposing an existing local entry with a new server ID,
    282   // change the ID now, after we're sure that the update can succeed.
    283   if (local_id != server_id) {
    284     DCHECK(!update.deleted());
    285     ChangeEntryIDAndUpdateChildren(trans, &target_entry, server_id);
    286     // When IDs change, versions become irrelevant.  Forcing BASE_VERSION
    287     // to zero would ensure that this update gets applied, but would indicate
    288     // creation or undeletion if it were committed that way.  Instead, prefer
    289     // forcing BASE_VERSION to entry.version() while also forcing
    290     // IS_UNAPPLIED_UPDATE to true.  If the item is UNSYNCED, it's committable
    291     // from the new state; it may commit before the conflict resolver gets
    292     // a crack at it.
    293     if (target_entry.Get(syncable::IS_UNSYNCED) ||
    294         target_entry.Get(syncable::BASE_VERSION) > 0) {
    295       // If either of these conditions are met, then we can expect valid client
    296       // fields for this entry.  When BASE_VERSION is positive, consistency is
    297       // enforced on the client fields at update-application time.  Otherwise,
    298       // we leave the BASE_VERSION field alone; it'll get updated the first time
    299       // we successfully apply this update.
    300       target_entry.Put(syncable::BASE_VERSION, update.version());
    301     }
    302     // Force application of this update, no matter what.
    303     target_entry.Put(syncable::IS_UNAPPLIED_UPDATE, true);
    304   }
    305 
    306   // If this is a newly received undecryptable update, and the only thing that
    307   // has changed are the specifics, store the original decryptable specifics,
    308   // (on which any current or future local changes are based) before we
    309   // overwrite SERVER_SPECIFICS.
    310   // MTIME, CTIME, and NON_UNIQUE_NAME are not enforced.
    311 
    312   bool position_matches = false;
    313   if (target_entry.ShouldMaintainPosition() && !update.deleted()) {
    314     std::string update_tag = GetUniqueBookmarkTagFromUpdate(update);
    315     if (UniquePosition::IsValidSuffix(update_tag)) {
    316       position_matches = GetUpdatePosition(update, update_tag).Equals(
    317           target_entry.Get(syncable::SERVER_UNIQUE_POSITION));
    318     } else {
    319       NOTREACHED();
    320     }
    321   } else {
    322     // If this item doesn't care about positions, then set this flag to true.
    323     position_matches = true;
    324   }
    325 
    326   if (!update.deleted() && !target_entry.Get(syncable::SERVER_IS_DEL) &&
    327       (SyncableIdFromProto(update.parent_id_string()) ==
    328           target_entry.Get(syncable::SERVER_PARENT_ID)) &&
    329       position_matches &&
    330       update.has_specifics() && update.specifics().has_encrypted() &&
    331       !cryptographer->CanDecrypt(update.specifics().encrypted())) {
    332     sync_pb::EntitySpecifics prev_specifics =
    333         target_entry.Get(syncable::SERVER_SPECIFICS);
    334     // We only store the old specifics if they were decryptable and applied and
    335     // there is no BASE_SERVER_SPECIFICS already. Else do nothing.
    336     if (!target_entry.Get(syncable::IS_UNAPPLIED_UPDATE) &&
    337         !IsRealDataType(GetModelTypeFromSpecifics(
    338             target_entry.Get(syncable::BASE_SERVER_SPECIFICS))) &&
    339         (!prev_specifics.has_encrypted() ||
    340          cryptographer->CanDecrypt(prev_specifics.encrypted()))) {
    341       DVLOG(2) << "Storing previous server specifcs: "
    342                << prev_specifics.SerializeAsString();
    343       target_entry.Put(syncable::BASE_SERVER_SPECIFICS, prev_specifics);
    344     }
    345   } else if (IsRealDataType(GetModelTypeFromSpecifics(
    346                  target_entry.Get(syncable::BASE_SERVER_SPECIFICS)))) {
    347     // We have a BASE_SERVER_SPECIFICS, but a subsequent non-specifics-only
    348     // change arrived. As a result, we can't use the specifics alone to detect
    349     // changes, so we clear BASE_SERVER_SPECIFICS.
    350     target_entry.Put(syncable::BASE_SERVER_SPECIFICS,
    351                      sync_pb::EntitySpecifics());
    352   }
    353 
    354   UpdateServerFieldsFromUpdate(&target_entry, update, name);
    355 
    356   return SUCCESS_PROCESSED;
    357 }
    358 
    359 }  // namespace syncer
    360