Home | History | Annotate | Download | only in engine
      1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/sync/engine/syncer.h"
      6 
      7 #include "base/message_loop.h"
      8 #include "base/time.h"
      9 #include "chrome/browser/sync/engine/apply_updates_command.h"
     10 #include "chrome/browser/sync/engine/build_and_process_conflict_sets_command.h"
     11 #include "chrome/browser/sync/engine/build_commit_command.h"
     12 #include "chrome/browser/sync/engine/cleanup_disabled_types_command.h"
     13 #include "chrome/browser/sync/engine/clear_data_command.h"
     14 #include "chrome/browser/sync/engine/conflict_resolver.h"
     15 #include "chrome/browser/sync/engine/download_updates_command.h"
     16 #include "chrome/browser/sync/engine/get_commit_ids_command.h"
     17 #include "chrome/browser/sync/engine/net/server_connection_manager.h"
     18 #include "chrome/browser/sync/engine/post_commit_message_command.h"
     19 #include "chrome/browser/sync/engine/process_commit_response_command.h"
     20 #include "chrome/browser/sync/engine/process_updates_command.h"
     21 #include "chrome/browser/sync/engine/resolve_conflicts_command.h"
     22 #include "chrome/browser/sync/engine/store_timestamps_command.h"
     23 #include "chrome/browser/sync/engine/syncer_end_command.h"
     24 #include "chrome/browser/sync/engine/syncer_types.h"
     25 #include "chrome/browser/sync/engine/syncer_util.h"
     26 #include "chrome/browser/sync/engine/syncproto.h"
     27 #include "chrome/browser/sync/engine/verify_updates_command.h"
     28 #include "chrome/browser/sync/syncable/directory_manager.h"
     29 #include "chrome/browser/sync/syncable/syncable-inl.h"
     30 #include "chrome/browser/sync/syncable/syncable.h"
     31 
     32 using base::TimeDelta;
     33 using sync_pb::ClientCommand;
     34 using syncable::Blob;
     35 using syncable::IS_UNAPPLIED_UPDATE;
     36 using syncable::SERVER_CTIME;
     37 using syncable::SERVER_IS_DEL;
     38 using syncable::SERVER_IS_DIR;
     39 using syncable::SERVER_MTIME;
     40 using syncable::SERVER_NON_UNIQUE_NAME;
     41 using syncable::SERVER_PARENT_ID;
     42 using syncable::SERVER_POSITION_IN_PARENT;
     43 using syncable::SERVER_SPECIFICS;
     44 using syncable::SERVER_VERSION;
     45 using syncable::SYNCER;
     46 using syncable::ScopedDirLookup;
     47 using syncable::WriteTransaction;
     48 
     49 namespace browser_sync {
     50 
     51 using sessions::ScopedSessionContextConflictResolver;
     52 using sessions::StatusController;
     53 using sessions::SyncSession;
     54 using sessions::ConflictProgress;
     55 
     56 Syncer::Syncer()
     57     : early_exit_requested_(false),
     58       pre_conflict_resolution_closure_(NULL) {
     59 }
     60 
     61 Syncer::~Syncer() {}
     62 
     63 bool Syncer::ExitRequested() {
     64   base::AutoLock lock(early_exit_requested_lock_);
     65   return early_exit_requested_;
     66 }
     67 
     68 void Syncer::RequestEarlyExit() {
     69   base::AutoLock lock(early_exit_requested_lock_);
     70   early_exit_requested_ = true;
     71 }
     72 
     73 // TODO(tim): Deprecated.
     74 void Syncer::SyncShare(sessions::SyncSession* session) {
     75   ScopedDirLookup dir(session->context()->directory_manager(),
     76                       session->context()->account_name());
     77   // The directory must be good here.
     78   CHECK(dir.good());
     79 
     80   const sessions::SyncSourceInfo& source(session->source());
     81   if (sync_pb::GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA ==
     82       source.updates_source) {
     83     SyncShare(session, CLEAR_PRIVATE_DATA, SYNCER_END);
     84     return;
     85   } else {
     86     SyncShare(session, SYNCER_BEGIN, SYNCER_END);
     87   }
     88 }
     89 
     90 void Syncer::SyncShare(sessions::SyncSession* session,
     91                        const SyncerStep first_step,
     92                        const SyncerStep last_step) {
     93   ScopedDirLookup dir(session->context()->directory_manager(),
     94                       session->context()->account_name());
     95   // The directory must be good here.
     96   CHECK(dir.good());
     97 
     98   ScopedSessionContextConflictResolver scoped(session->context(),
     99                                               &resolver_);
    100   SyncerStep current_step = first_step;
    101 
    102   SyncerStep next_step = current_step;
    103   while (!ExitRequested()) {
    104     switch (current_step) {
    105       case SYNCER_BEGIN:
    106         VLOG(1) << "Syncer Begin";
    107         // This isn't perfect, as we can end up bundling extensions activity
    108         // intended for the next session into the current one.  We could do a
    109         // test-and-reset as with the source, but note that also falls short if
    110         // the commit request fails (e.g. due to lost connection), as we will
    111         // fall all the way back to the syncer thread main loop in that case,
    112         // creating a new session when a connection is established, losing the
    113         // records set here on the original attempt.  This should provide us
    114         // with the right data "most of the time", and we're only using this
    115         // for analysis purposes, so Law of Large Numbers FTW.
    116         session->context()->extensions_monitor()->GetAndClearRecords(
    117             session->mutable_extensions_activity());
    118         next_step = CLEANUP_DISABLED_TYPES;
    119         break;
    120       case CLEANUP_DISABLED_TYPES: {
    121         VLOG(1) << "Cleaning up disabled types";
    122         CleanupDisabledTypesCommand cleanup;
    123         cleanup.Execute(session);
    124         next_step = DOWNLOAD_UPDATES;
    125         break;
    126       }
    127       case DOWNLOAD_UPDATES: {
    128         VLOG(1) << "Downloading Updates";
    129         DownloadUpdatesCommand download_updates;
    130         download_updates.Execute(session);
    131         next_step = PROCESS_CLIENT_COMMAND;
    132         break;
    133       }
    134       case PROCESS_CLIENT_COMMAND: {
    135         VLOG(1) << "Processing Client Command";
    136         ProcessClientCommand(session);
    137         next_step = VERIFY_UPDATES;
    138         break;
    139       }
    140       case VERIFY_UPDATES: {
    141         VLOG(1) << "Verifying Updates";
    142         VerifyUpdatesCommand verify_updates;
    143         verify_updates.Execute(session);
    144         next_step = PROCESS_UPDATES;
    145         break;
    146       }
    147       case PROCESS_UPDATES: {
    148         VLOG(1) << "Processing Updates";
    149         ProcessUpdatesCommand process_updates;
    150         process_updates.Execute(session);
    151         next_step = STORE_TIMESTAMPS;
    152         break;
    153       }
    154       case STORE_TIMESTAMPS: {
    155         VLOG(1) << "Storing timestamps";
    156         StoreTimestampsCommand store_timestamps;
    157         store_timestamps.Execute(session);
    158         // We should download all of the updates before attempting to process
    159         // them.
    160         if (session->status_controller()->ServerSaysNothingMoreToDownload() ||
    161             !session->status_controller()->download_updates_succeeded()) {
    162           next_step = APPLY_UPDATES;
    163         } else {
    164           next_step = DOWNLOAD_UPDATES;
    165         }
    166         break;
    167       }
    168       case APPLY_UPDATES: {
    169         VLOG(1) << "Applying Updates";
    170         ApplyUpdatesCommand apply_updates;
    171         apply_updates.Execute(session);
    172         next_step = BUILD_COMMIT_REQUEST;
    173         break;
    174       }
    175       // These two steps are combined since they are executed within the same
    176       // write transaction.
    177       case BUILD_COMMIT_REQUEST: {
    178         session->status_controller()->set_syncing(true);
    179 
    180         VLOG(1) << "Processing Commit Request";
    181         ScopedDirLookup dir(session->context()->directory_manager(),
    182                             session->context()->account_name());
    183         if (!dir.good()) {
    184           LOG(ERROR) << "Scoped dir lookup failed!";
    185           return;
    186         }
    187         WriteTransaction trans(dir, SYNCER, __FILE__, __LINE__);
    188         sessions::ScopedSetSessionWriteTransaction set_trans(session, &trans);
    189 
    190         VLOG(1) << "Getting the Commit IDs";
    191         GetCommitIdsCommand get_commit_ids_command(
    192             session->context()->max_commit_batch_size());
    193         get_commit_ids_command.Execute(session);
    194 
    195         if (!session->status_controller()->commit_ids().empty()) {
    196           VLOG(1) << "Building a commit message";
    197           BuildCommitCommand build_commit_command;
    198           build_commit_command.Execute(session);
    199 
    200           next_step = POST_COMMIT_MESSAGE;
    201         } else {
    202           next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
    203         }
    204 
    205         break;
    206       }
    207       case POST_COMMIT_MESSAGE: {
    208         VLOG(1) << "Posting a commit request";
    209         PostCommitMessageCommand post_commit_command;
    210         post_commit_command.Execute(session);
    211         next_step = PROCESS_COMMIT_RESPONSE;
    212         break;
    213       }
    214       case PROCESS_COMMIT_RESPONSE: {
    215         VLOG(1) << "Processing the commit response";
    216         session->status_controller()->reset_num_conflicting_commits();
    217         ProcessCommitResponseCommand process_response_command;
    218         process_response_command.Execute(session);
    219         next_step = BUILD_AND_PROCESS_CONFLICT_SETS;
    220         break;
    221       }
    222       case BUILD_AND_PROCESS_CONFLICT_SETS: {
    223         VLOG(1) << "Building and Processing Conflict Sets";
    224         BuildAndProcessConflictSetsCommand build_process_conflict_sets;
    225         build_process_conflict_sets.Execute(session);
    226         if (session->status_controller()->conflict_sets_built())
    227           next_step = SYNCER_END;
    228         else
    229           next_step = RESOLVE_CONFLICTS;
    230         break;
    231       }
    232       case RESOLVE_CONFLICTS: {
    233         VLOG(1) << "Resolving Conflicts";
    234 
    235         // Trigger the pre_conflict_resolution_closure_, which is a testing
    236         // hook for the unit tests, if it is non-NULL.
    237         if (pre_conflict_resolution_closure_) {
    238           pre_conflict_resolution_closure_->Run();
    239         }
    240 
    241         StatusController* status = session->status_controller();
    242         status->reset_conflicts_resolved();
    243         ResolveConflictsCommand resolve_conflicts_command;
    244         resolve_conflicts_command.Execute(session);
    245         if (status->HasConflictingUpdates())
    246           next_step = APPLY_UPDATES_TO_RESOLVE_CONFLICTS;
    247         else
    248           next_step = SYNCER_END;
    249         break;
    250       }
    251       case APPLY_UPDATES_TO_RESOLVE_CONFLICTS: {
    252         StatusController* status = session->status_controller();
    253         VLOG(1) << "Applying updates to resolve conflicts";
    254         ApplyUpdatesCommand apply_updates;
    255         int before_conflicting_updates = status->TotalNumConflictingItems();
    256         apply_updates.Execute(session);
    257         int after_conflicting_updates = status->TotalNumConflictingItems();
    258         status->update_conflicts_resolved(before_conflicting_updates >
    259                                           after_conflicting_updates);
    260         if (status->conflicts_resolved())
    261           next_step = RESOLVE_CONFLICTS;
    262         else
    263           next_step = SYNCER_END;
    264         break;
    265       }
    266       case CLEAR_PRIVATE_DATA: {
    267         VLOG(1) << "Clear Private Data";
    268         ClearDataCommand clear_data_command;
    269         clear_data_command.Execute(session);
    270         next_step = SYNCER_END;
    271         break;
    272       }
    273       case SYNCER_END: {
    274         break;
    275       }
    276       default:
    277         LOG(ERROR) << "Unknown command: " << current_step;
    278     }
    279     if (last_step == current_step)
    280       break;
    281     current_step = next_step;
    282   }
    283 
    284   VLOG(1) << "Syncer End";
    285   SyncerEndCommand syncer_end_command;
    286   syncer_end_command.Execute(session);
    287   return;
    288 }
    289 
    290 void Syncer::ProcessClientCommand(sessions::SyncSession* session) {
    291   const ClientToServerResponse& response =
    292       session->status_controller()->updates_response();
    293   if (!response.has_client_command())
    294     return;
    295   const ClientCommand& command = response.client_command();
    296 
    297   // The server limits the number of items a client can commit in one batch.
    298   if (command.has_max_commit_batch_size()) {
    299     session->context()->set_max_commit_batch_size(
    300         command.max_commit_batch_size());
    301   }
    302   if (command.has_set_sync_long_poll_interval()) {
    303     session->delegate()->OnReceivedLongPollIntervalUpdate(
    304         TimeDelta::FromSeconds(command.set_sync_long_poll_interval()));
    305   }
    306   if (command.has_set_sync_poll_interval()) {
    307     session->delegate()->OnReceivedShortPollIntervalUpdate(
    308         TimeDelta::FromSeconds(command.set_sync_poll_interval()));
    309   }
    310 }
    311 
    312 void CopyServerFields(syncable::Entry* src, syncable::MutableEntry* dest) {
    313   dest->Put(SERVER_NON_UNIQUE_NAME, src->Get(SERVER_NON_UNIQUE_NAME));
    314   dest->Put(SERVER_PARENT_ID, src->Get(SERVER_PARENT_ID));
    315   dest->Put(SERVER_MTIME, src->Get(SERVER_MTIME));
    316   dest->Put(SERVER_CTIME, src->Get(SERVER_CTIME));
    317   dest->Put(SERVER_VERSION, src->Get(SERVER_VERSION));
    318   dest->Put(SERVER_IS_DIR, src->Get(SERVER_IS_DIR));
    319   dest->Put(SERVER_IS_DEL, src->Get(SERVER_IS_DEL));
    320   dest->Put(IS_UNAPPLIED_UPDATE, src->Get(IS_UNAPPLIED_UPDATE));
    321   dest->Put(SERVER_SPECIFICS, src->Get(SERVER_SPECIFICS));
    322   dest->Put(SERVER_POSITION_IN_PARENT, src->Get(SERVER_POSITION_IN_PARENT));
    323 }
    324 
    325 void ClearServerData(syncable::MutableEntry* entry) {
    326   entry->Put(SERVER_NON_UNIQUE_NAME, "");
    327   entry->Put(SERVER_PARENT_ID, syncable::kNullId);
    328   entry->Put(SERVER_MTIME, 0);
    329   entry->Put(SERVER_CTIME, 0);
    330   entry->Put(SERVER_VERSION, 0);
    331   entry->Put(SERVER_IS_DIR, false);
    332   entry->Put(SERVER_IS_DEL, false);
    333   entry->Put(IS_UNAPPLIED_UPDATE, false);
    334   entry->Put(SERVER_SPECIFICS, sync_pb::EntitySpecifics::default_instance());
    335   entry->Put(SERVER_POSITION_IN_PARENT, 0);
    336 }
    337 
    338 }  // namespace browser_sync
    339