Home | History | Annotate | Download | only in engine
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/engine/get_updates_processor.h"
      6 
      7 #include <map>
      8 
      9 #include "base/debug/trace_event.h"
     10 #include "sync/engine/get_updates_delegate.h"
     11 #include "sync/engine/syncer_proto_util.h"
     12 #include "sync/engine/update_handler.h"
     13 #include "sync/internal_api/public/events/get_updates_response_event.h"
     14 #include "sync/protocol/sync.pb.h"
     15 #include "sync/sessions/status_controller.h"
     16 #include "sync/sessions/sync_session.h"
     17 #include "sync/syncable/directory.h"
     18 #include "sync/syncable/nigori_handler.h"
     19 #include "sync/syncable/syncable_read_transaction.h"
     20 
     21 typedef std::vector<const sync_pb::SyncEntity*> SyncEntityList;
     22 typedef std::map<syncer::ModelType, SyncEntityList> TypeSyncEntityMap;
     23 
     24 namespace syncer {
     25 
     26 typedef std::map<ModelType, size_t> TypeToIndexMap;
     27 
     28 namespace {
     29 
     30 bool ShouldRequestEncryptionKey(sessions::SyncSessionContext* context) {
     31   syncable::Directory* dir = context->directory();
     32   syncable::ReadTransaction trans(FROM_HERE, dir);
     33   syncable::NigoriHandler* nigori_handler = dir->GetNigoriHandler();
     34   return nigori_handler->NeedKeystoreKey(&trans);
     35 }
     36 
     37 
     38 SyncerError HandleGetEncryptionKeyResponse(
     39     const sync_pb::ClientToServerResponse& update_response,
     40     syncable::Directory* dir) {
     41   bool success = false;
     42   if (update_response.get_updates().encryption_keys_size() == 0) {
     43     LOG(ERROR) << "Failed to receive encryption key from server.";
     44     return SERVER_RESPONSE_VALIDATION_FAILED;
     45   }
     46   syncable::ReadTransaction trans(FROM_HERE, dir);
     47   syncable::NigoriHandler* nigori_handler = dir->GetNigoriHandler();
     48   success = nigori_handler->SetKeystoreKeys(
     49       update_response.get_updates().encryption_keys(),
     50       &trans);
     51 
     52   DVLOG(1) << "GetUpdates returned "
     53            << update_response.get_updates().encryption_keys_size()
     54            << "encryption keys. Nigori keystore key "
     55            << (success ? "" : "not ") << "updated.";
     56   return (success ? SYNCER_OK : SERVER_RESPONSE_VALIDATION_FAILED);
     57 }
     58 
     59 // Given a GetUpdates response, iterates over all the returned items and
     60 // divides them according to their type.  Outputs a map from model types to
     61 // received SyncEntities.  The output map will have entries (possibly empty)
     62 // for all types in |requested_types|.
     63 void PartitionUpdatesByType(const sync_pb::GetUpdatesResponse& gu_response,
     64                             ModelTypeSet requested_types,
     65                             TypeSyncEntityMap* updates_by_type) {
     66   int update_count = gu_response.entries().size();
     67   for (ModelTypeSet::Iterator it = requested_types.First();
     68        it.Good(); it.Inc()) {
     69     updates_by_type->insert(std::make_pair(it.Get(), SyncEntityList()));
     70   }
     71   for (int i = 0; i < update_count; ++i) {
     72     const sync_pb::SyncEntity& update = gu_response.entries(i);
     73     ModelType type = GetModelType(update);
     74     if (!IsRealDataType(type)) {
     75       NOTREACHED() << "Received update with invalid type.";
     76       continue;
     77     }
     78 
     79     TypeSyncEntityMap::iterator it = updates_by_type->find(type);
     80     if (it == updates_by_type->end()) {
     81       NOTREACHED() << "Received update for unexpected type "
     82                    << ModelTypeToString(type);
     83       continue;
     84     }
     85 
     86     it->second.push_back(&update);
     87   }
     88 }
     89 
     90 // Builds a map of ModelTypes to indices to progress markers in the given
     91 // |gu_response| message.  The map is returned in the |index_map| parameter.
     92 void PartitionProgressMarkersByType(
     93     const sync_pb::GetUpdatesResponse& gu_response,
     94     ModelTypeSet request_types,
     95     TypeToIndexMap* index_map) {
     96   for (int i = 0; i < gu_response.new_progress_marker_size(); ++i) {
     97     int field_number = gu_response.new_progress_marker(i).data_type_id();
     98     ModelType model_type = GetModelTypeFromSpecificsFieldNumber(field_number);
     99     if (!IsRealDataType(model_type)) {
    100       DLOG(WARNING) << "Unknown field number " << field_number;
    101       continue;
    102     }
    103     if (!request_types.Has(model_type)) {
    104       DLOG(WARNING)
    105           << "Skipping unexpected progress marker for non-enabled type "
    106           << ModelTypeToString(model_type);
    107       continue;
    108     }
    109     index_map->insert(std::make_pair(model_type, i));
    110   }
    111 }
    112 
    113 void PartitionContextMutationsByType(
    114     const sync_pb::GetUpdatesResponse& gu_response,
    115     ModelTypeSet request_types,
    116     TypeToIndexMap* index_map) {
    117   for (int i = 0; i < gu_response.context_mutations_size(); ++i) {
    118     int field_number = gu_response.context_mutations(i).data_type_id();
    119     ModelType model_type = GetModelTypeFromSpecificsFieldNumber(field_number);
    120     if (!IsRealDataType(model_type)) {
    121       DLOG(WARNING) << "Unknown field number " << field_number;
    122       continue;
    123     }
    124     if (!request_types.Has(model_type)) {
    125       DLOG(WARNING)
    126           << "Skipping unexpected context mutation for non-enabled type "
    127           << ModelTypeToString(model_type);
    128       continue;
    129     }
    130     index_map->insert(std::make_pair(model_type, i));
    131   }
    132 }
    133 
    134 // Initializes the parts of the GetUpdatesMessage that depend on shared state,
    135 // like the ShouldRequestEncryptionKey() status.  This is kept separate from the
    136 // other of the message-building functions to make the rest of the code easier
    137 // to test.
    138 void InitDownloadUpdatesContext(
    139     sessions::SyncSession* session,
    140     bool create_mobile_bookmarks_folder,
    141     sync_pb::ClientToServerMessage* message) {
    142   message->set_share(session->context()->account_name());
    143   message->set_message_contents(sync_pb::ClientToServerMessage::GET_UPDATES);
    144 
    145   sync_pb::GetUpdatesMessage* get_updates = message->mutable_get_updates();
    146 
    147   // We want folders for our associated types, always.  If we were to set
    148   // this to false, the server would send just the non-container items
    149   // (e.g. Bookmark URLs but not their containing folders).
    150   get_updates->set_fetch_folders(true);
    151 
    152   get_updates->set_create_mobile_bookmarks_folder(
    153       create_mobile_bookmarks_folder);
    154   bool need_encryption_key = ShouldRequestEncryptionKey(session->context());
    155   get_updates->set_need_encryption_key(need_encryption_key);
    156 
    157   // Set legacy GetUpdatesMessage.GetUpdatesCallerInfo information.
    158   get_updates->mutable_caller_info()->set_notifications_enabled(
    159       session->context()->notifications_enabled());
    160 }
    161 
    162 }  // namespace
    163 
    164 GetUpdatesProcessor::GetUpdatesProcessor(UpdateHandlerMap* update_handler_map,
    165                                          const GetUpdatesDelegate& delegate)
    166     : update_handler_map_(update_handler_map), delegate_(delegate) {}
    167 
    168 GetUpdatesProcessor::~GetUpdatesProcessor() {}
    169 
    170 SyncerError GetUpdatesProcessor::DownloadUpdates(
    171     ModelTypeSet request_types,
    172     sessions::SyncSession* session,
    173     bool create_mobile_bookmarks_folder) {
    174   TRACE_EVENT0("sync", "DownloadUpdates");
    175 
    176   sync_pb::ClientToServerMessage message;
    177   InitDownloadUpdatesContext(session,
    178                              create_mobile_bookmarks_folder,
    179                              &message);
    180   PrepareGetUpdates(request_types, &message);
    181 
    182   SyncerError result = ExecuteDownloadUpdates(request_types, session, &message);
    183   session->mutable_status_controller()->set_last_download_updates_result(
    184       result);
    185   return result;
    186 }
    187 
    188 void GetUpdatesProcessor::PrepareGetUpdates(
    189     ModelTypeSet gu_types,
    190     sync_pb::ClientToServerMessage* message) {
    191   sync_pb::GetUpdatesMessage* get_updates = message->mutable_get_updates();
    192 
    193   for (ModelTypeSet::Iterator it = gu_types.First(); it.Good(); it.Inc()) {
    194     UpdateHandlerMap::iterator handler_it = update_handler_map_->find(it.Get());
    195     DCHECK(handler_it != update_handler_map_->end())
    196         << "Failed to look up handler for " << ModelTypeToString(it.Get());
    197     sync_pb::DataTypeProgressMarker* progress_marker =
    198         get_updates->add_from_progress_marker();
    199     handler_it->second->GetDownloadProgress(progress_marker);
    200     progress_marker->clear_gc_directive();
    201 
    202     sync_pb::DataTypeContext context;
    203     handler_it->second->GetDataTypeContext(&context);
    204     if (!context.context().empty())
    205       get_updates->add_client_contexts()->Swap(&context);
    206   }
    207 
    208   delegate_.HelpPopulateGuMessage(get_updates);
    209 }
    210 
    211 SyncerError GetUpdatesProcessor::ExecuteDownloadUpdates(
    212     ModelTypeSet request_types,
    213     sessions::SyncSession* session,
    214     sync_pb::ClientToServerMessage* msg) {
    215   sync_pb::ClientToServerResponse update_response;
    216   sessions::StatusController* status = session->mutable_status_controller();
    217   bool need_encryption_key = ShouldRequestEncryptionKey(session->context());
    218 
    219   if (session->context()->debug_info_getter()) {
    220     sync_pb::DebugInfo* debug_info = msg->mutable_debug_info();
    221     CopyClientDebugInfo(session->context()->debug_info_getter(), debug_info);
    222   }
    223 
    224   session->SendProtocolEvent(
    225       *(delegate_.GetNetworkRequestEvent(base::Time::Now(), *msg)));
    226 
    227   SyncerError result = SyncerProtoUtil::PostClientToServerMessage(
    228       msg,
    229       &update_response,
    230       session);
    231 
    232   DVLOG(2) << SyncerProtoUtil::ClientToServerResponseDebugString(
    233       update_response);
    234 
    235   if (result != SYNCER_OK) {
    236     GetUpdatesResponseEvent response_event(
    237         base::Time::Now(), update_response, result);
    238     session->SendProtocolEvent(response_event);
    239 
    240     LOG(ERROR) << "PostClientToServerMessage() failed during GetUpdates";
    241     return result;
    242   }
    243 
    244   DVLOG(1) << "GetUpdates returned "
    245            << update_response.get_updates().entries_size()
    246            << " updates.";
    247 
    248 
    249   if (session->context()->debug_info_getter()) {
    250     // Clear debug info now that we have successfully sent it to the server.
    251     DVLOG(1) << "Clearing client debug info.";
    252     session->context()->debug_info_getter()->ClearDebugInfo();
    253   }
    254 
    255   if (need_encryption_key ||
    256       update_response.get_updates().encryption_keys_size() > 0) {
    257     syncable::Directory* dir = session->context()->directory();
    258     status->set_last_get_key_result(
    259         HandleGetEncryptionKeyResponse(update_response, dir));
    260   }
    261 
    262   SyncerError process_result = ProcessResponse(update_response.get_updates(),
    263                                               request_types,
    264                                               status);
    265 
    266   GetUpdatesResponseEvent response_event(
    267       base::Time::Now(), update_response, process_result);
    268   session->SendProtocolEvent(response_event);
    269 
    270   DVLOG(1) << "GetUpdates result: " << process_result;
    271 
    272   return process_result;
    273 }
    274 
    275 SyncerError GetUpdatesProcessor::ProcessResponse(
    276     const sync_pb::GetUpdatesResponse& gu_response,
    277     ModelTypeSet request_types,
    278     sessions::StatusController* status) {
    279   status->increment_num_updates_downloaded_by(gu_response.entries_size());
    280 
    281   // The changes remaining field is used to prevent the client from looping.  If
    282   // that field is being set incorrectly, we're in big trouble.
    283   if (!gu_response.has_changes_remaining()) {
    284     return SERVER_RESPONSE_VALIDATION_FAILED;
    285   }
    286 
    287   syncer::SyncerError result =
    288       ProcessGetUpdatesResponse(request_types, gu_response, status);
    289   if (result != syncer::SYNCER_OK)
    290     return result;
    291 
    292   if (gu_response.changes_remaining() == 0) {
    293     return SYNCER_OK;
    294   } else {
    295     return SERVER_MORE_TO_DOWNLOAD;
    296   }
    297 }
    298 
    299 syncer::SyncerError GetUpdatesProcessor::ProcessGetUpdatesResponse(
    300     ModelTypeSet gu_types,
    301     const sync_pb::GetUpdatesResponse& gu_response,
    302     sessions::StatusController* status_controller) {
    303   TypeSyncEntityMap updates_by_type;
    304   PartitionUpdatesByType(gu_response, gu_types, &updates_by_type);
    305   DCHECK_EQ(gu_types.Size(), updates_by_type.size());
    306 
    307   TypeToIndexMap progress_index_by_type;
    308   PartitionProgressMarkersByType(gu_response,
    309                                  gu_types,
    310                                  &progress_index_by_type);
    311   if (gu_types.Size() != progress_index_by_type.size()) {
    312     NOTREACHED() << "Missing progress markers in GetUpdates response.";
    313     return syncer::SERVER_RESPONSE_VALIDATION_FAILED;
    314   }
    315 
    316   TypeToIndexMap context_by_type;
    317   PartitionContextMutationsByType(gu_response, gu_types, &context_by_type);
    318 
    319   // Iterate over these maps in parallel, processing updates for each type.
    320   TypeToIndexMap::iterator progress_marker_iter =
    321       progress_index_by_type.begin();
    322   TypeSyncEntityMap::iterator updates_iter = updates_by_type.begin();
    323   for (; (progress_marker_iter != progress_index_by_type.end()
    324            && updates_iter != updates_by_type.end());
    325        ++progress_marker_iter, ++updates_iter) {
    326     DCHECK_EQ(progress_marker_iter->first, updates_iter->first);
    327     ModelType type = progress_marker_iter->first;
    328 
    329     UpdateHandlerMap::iterator update_handler_iter =
    330         update_handler_map_->find(type);
    331 
    332     sync_pb::DataTypeContext context;
    333     TypeToIndexMap::iterator context_iter = context_by_type.find(type);
    334     if (context_iter != context_by_type.end())
    335       context.CopyFrom(gu_response.context_mutations(context_iter->second));
    336 
    337     if (update_handler_iter != update_handler_map_->end()) {
    338       syncer::SyncerError result =
    339           update_handler_iter->second->ProcessGetUpdatesResponse(
    340               gu_response.new_progress_marker(progress_marker_iter->second),
    341               context,
    342               updates_iter->second,
    343               status_controller);
    344       if (result != syncer::SYNCER_OK)
    345         return result;
    346     } else {
    347       DLOG(WARNING)
    348           << "Ignoring received updates of a type we can't handle.  "
    349           << "Type is: " << ModelTypeToString(type);
    350       continue;
    351     }
    352   }
    353   DCHECK(progress_marker_iter == progress_index_by_type.end() &&
    354          updates_iter == updates_by_type.end());
    355 
    356   return syncer::SYNCER_OK;
    357 }
    358 
    359 void GetUpdatesProcessor::ApplyUpdates(
    360     ModelTypeSet gu_types,
    361     sessions::StatusController* status_controller) {
    362   delegate_.ApplyUpdates(gu_types, status_controller, update_handler_map_);
    363 }
    364 
    365 void GetUpdatesProcessor::CopyClientDebugInfo(
    366     sessions::DebugInfoGetter* debug_info_getter,
    367     sync_pb::DebugInfo* debug_info) {
    368   DVLOG(1) << "Copying client debug info to send.";
    369   debug_info_getter->GetDebugInfo(debug_info);
    370 }
    371 
    372 }  // namespace syncer
    373