Home | History | Annotate | Download | only in engine
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/sync/engine/syncer_thread.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/rand_util.h"
     10 #include "chrome/browser/sync/engine/syncer.h"
     11 
     12 using base::TimeDelta;
     13 using base::TimeTicks;
     14 
     15 namespace browser_sync {
     16 
     17 using sessions::SyncSession;
     18 using sessions::SyncSessionSnapshot;
     19 using sessions::SyncSourceInfo;
     20 using syncable::ModelTypePayloadMap;
     21 using syncable::ModelTypeBitSet;
     22 using sync_pb::GetUpdatesCallerInfo;
     23 
     24 SyncerThread::DelayProvider::DelayProvider() {}
     25 SyncerThread::DelayProvider::~DelayProvider() {}
     26 
     27 SyncerThread::WaitInterval::WaitInterval() {}
     28 SyncerThread::WaitInterval::~WaitInterval() {}
     29 
     30 SyncerThread::SyncSessionJob::SyncSessionJob() {}
     31 SyncerThread::SyncSessionJob::~SyncSessionJob() {}
     32 
     33 SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
     34     base::TimeTicks start,
     35     linked_ptr<sessions::SyncSession> session, bool is_canary_job,
     36     const tracked_objects::Location& nudge_location) : purpose(purpose),
     37         scheduled_start(start),
     38         session(session),
     39         is_canary_job(is_canary_job),
     40         nudge_location(nudge_location) {
     41 }
     42 
     43 TimeDelta SyncerThread::DelayProvider::GetDelay(
     44     const base::TimeDelta& last_delay) {
     45   return SyncerThread::GetRecommendedDelay(last_delay);
     46 }
     47 
     48 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
     49     NudgeSource source) {
     50   switch (source) {
     51     case NUDGE_SOURCE_NOTIFICATION:
     52       return GetUpdatesCallerInfo::NOTIFICATION;
     53     case NUDGE_SOURCE_LOCAL:
     54       return GetUpdatesCallerInfo::LOCAL;
     55     case NUDGE_SOURCE_CONTINUATION:
     56       return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
     57     case NUDGE_SOURCE_UNKNOWN:
     58       return GetUpdatesCallerInfo::UNKNOWN;
     59     default:
     60       NOTREACHED();
     61       return GetUpdatesCallerInfo::UNKNOWN;
     62   }
     63 }
     64 
     65 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
     66     : mode(mode), had_nudge(false), length(length) { }
     67 
     68 SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
     69                            Syncer* syncer)
     70     : thread_("SyncEngine_SyncerThread"),
     71       syncer_short_poll_interval_seconds_(
     72           TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
     73       syncer_long_poll_interval_seconds_(
     74           TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
     75       mode_(NORMAL_MODE),
     76       server_connection_ok_(false),
     77       delay_provider_(new DelayProvider()),
     78       syncer_(syncer),
     79       session_context_(context) {
     80 }
     81 
     82 SyncerThread::~SyncerThread() {
     83   DCHECK(!thread_.IsRunning());
     84 }
     85 
     86 void SyncerThread::CheckServerConnectionManagerStatus(
     87     HttpResponse::ServerConnectionCode code) {
     88 
     89   VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
     90           << "Old mode: " << server_connection_ok_ << " Code: " << code;
     91   // Note, be careful when adding cases here because if the SyncerThread
     92   // thinks there is no valid connection as determined by this method, it
     93   // will drop out of *all* forward progress sync loops (it won't poll and it
     94   // will queue up Talk notifications but not actually call SyncShare) until
     95   // some external action causes a ServerConnectionManager to broadcast that
     96   // a valid connection has been re-established.
     97   if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
     98       HttpResponse::SYNC_AUTH_ERROR == code) {
     99     server_connection_ok_ = false;
    100     VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
    101             << " new mode:" << server_connection_ok_;
    102   } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
    103     server_connection_ok_ = true;
    104     VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
    105             << " new mode:" << server_connection_ok_;
    106     DoCanaryJob();
    107   }
    108 }
    109 
    110 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
    111   VLOG(1) << "SyncerThread(" << this << ")" << "  Start called from thread "
    112           << MessageLoop::current()->thread_name();
    113   if (!thread_.IsRunning()) {
    114     VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode "
    115             << mode;
    116     if (!thread_.Start()) {
    117       NOTREACHED() << "Unable to start SyncerThread.";
    118       return;
    119     }
    120     WatchConnectionManager();
    121     thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
    122         this, &SyncerThread::SendInitialSnapshot));
    123   }
    124 
    125   VLOG(1) << "SyncerThread(" << this << ")" << "  Entering start with mode = "
    126           << mode;
    127 
    128   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
    129       this, &SyncerThread::StartImpl, mode, callback));
    130 }
    131 
    132 void SyncerThread::SendInitialSnapshot() {
    133   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    134   scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
    135       SyncSourceInfo(), ModelSafeRoutingInfo(),
    136       std::vector<ModelSafeWorker*>()));
    137   SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
    138   sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
    139   event.snapshot = &snapshot;
    140   session_context_->NotifyListeners(event);
    141 }
    142 
    143 void SyncerThread::WatchConnectionManager() {
    144   ServerConnectionManager* scm = session_context_->connection_manager();
    145   CheckServerConnectionManagerStatus(scm->server_status());
    146   scm->AddListener(this);
    147 }
    148 
    149 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) {
    150   VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode "
    151           << mode;
    152 
    153   // TODO(lipalani): This will leak if startimpl is never run. Fix it using a
    154   // ThreadSafeRefcounted object.
    155   scoped_ptr<ModeChangeCallback> scoped_callback(callback);
    156   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    157   DCHECK(!session_context_->account_name().empty());
    158   DCHECK(syncer_.get());
    159   mode_ = mode;
    160   AdjustPolling(NULL);  // Will kick start poll timer if needed.
    161   if (scoped_callback.get())
    162     scoped_callback->Run();
    163 
    164   // We just changed our mode. See if there are any pending jobs that we could
    165   // execute in the new mode.
    166   DoPendingJobIfPossible(false);
    167 }
    168 
    169 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
    170     const SyncSessionJob& job) {
    171 
    172   DCHECK(wait_interval_.get());
    173   DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
    174 
    175   VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : "
    176           << wait_interval_->mode << "Wait interval had nudge : "
    177           << wait_interval_->had_nudge << "is canary job : "
    178           << job.is_canary_job;
    179 
    180   if (job.purpose == SyncSessionJob::POLL)
    181     return DROP;
    182 
    183   DCHECK(job.purpose == SyncSessionJob::NUDGE ||
    184       job.purpose == SyncSessionJob::CONFIGURATION);
    185   if (wait_interval_->mode == WaitInterval::THROTTLED)
    186     return SAVE;
    187 
    188   DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
    189   if (job.purpose == SyncSessionJob::NUDGE) {
    190     if (mode_ == CONFIGURATION_MODE)
    191       return SAVE;
    192 
    193     // If we already had one nudge then just drop this nudge. We will retry
    194     // later when the timer runs out.
    195     return wait_interval_->had_nudge ? DROP : CONTINUE;
    196   }
    197   // This is a config job.
    198   return job.is_canary_job ? CONTINUE : SAVE;
    199 }
    200 
    201 SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
    202     const SyncSessionJob& job) {
    203   if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
    204     return CONTINUE;
    205 
    206   if (wait_interval_.get())
    207     return DecideWhileInWaitInterval(job);
    208 
    209   if (mode_ == CONFIGURATION_MODE) {
    210     if (job.purpose == SyncSessionJob::NUDGE)
    211       return SAVE;
    212     else if (job.purpose == SyncSessionJob::CONFIGURATION)
    213       return CONTINUE;
    214     else
    215       return DROP;
    216   }
    217 
    218   // We are in normal mode.
    219   DCHECK_EQ(mode_, NORMAL_MODE);
    220   DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
    221 
    222   // Freshness condition
    223   if (job.scheduled_start < last_sync_session_end_time_) {
    224     VLOG(1) << "SyncerThread(" << this << ")"
    225             << " Dropping job because of freshness";
    226     return DROP;
    227   }
    228 
    229   if (server_connection_ok_)
    230     return CONTINUE;
    231 
    232   VLOG(1) << "SyncerThread(" << this << ")"
    233           << " Bad server connection. Using that to decide on job.";
    234   return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
    235 }
    236 
    237 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
    238   DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
    239   if (pending_nudge_.get() == NULL) {
    240     VLOG(1) << "SyncerThread(" << this << ")"
    241             << " Creating a pending nudge job";
    242     SyncSession* s = job.session.get();
    243     scoped_ptr<SyncSession> session(new SyncSession(s->context(),
    244         s->delegate(), s->source(), s->routing_info(), s->workers()));
    245 
    246     SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
    247         make_linked_ptr(session.release()), false, job.nudge_location);
    248     pending_nudge_.reset(new SyncSessionJob(new_job));
    249 
    250     return;
    251   }
    252 
    253   VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge";
    254   pending_nudge_->session->Coalesce(*(job.session.get()));
    255   pending_nudge_->scheduled_start = job.scheduled_start;
    256 
    257   // Unfortunately the nudge location cannot be modified. So it stores the
    258   // location of the first caller.
    259 }
    260 
    261 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
    262   JobProcessDecision decision = DecideOnJob(job);
    263   VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: "
    264           << decision << " Job purpose " << job.purpose << "mode " << mode_;
    265   if (decision != SAVE)
    266     return decision == CONTINUE;
    267 
    268   DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
    269       SyncSessionJob::CONFIGURATION);
    270 
    271   SaveJob(job);
    272   return false;
    273 }
    274 
    275 void SyncerThread::SaveJob(const SyncSessionJob& job) {
    276   DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
    277   if (job.purpose == SyncSessionJob::NUDGE) {
    278     VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job";
    279     InitOrCoalescePendingJob(job);
    280   } else if (job.purpose == SyncSessionJob::CONFIGURATION){
    281     VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job";
    282     DCHECK(wait_interval_.get());
    283     DCHECK(mode_ == CONFIGURATION_MODE);
    284 
    285     SyncSession* old = job.session.get();
    286     SyncSession* s(new SyncSession(session_context_.get(), this,
    287         old->source(), old->routing_info(), old->workers()));
    288     SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
    289                           make_linked_ptr(s), false, job.nudge_location);
    290     wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
    291   } // drop the rest.
    292 }
    293 
    294 // Functor for std::find_if to search by ModelSafeGroup.
    295 struct ModelSafeWorkerGroupIs {
    296   explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
    297   bool operator()(ModelSafeWorker* w) {
    298     return group == w->GetModelSafeGroup();
    299   }
    300   ModelSafeGroup group;
    301 };
    302 
    303 void SyncerThread::ScheduleClearUserData() {
    304   if (!thread_.IsRunning()) {
    305     NOTREACHED();
    306     return;
    307   }
    308   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
    309       this, &SyncerThread::ScheduleClearUserDataImpl));
    310 }
    311 
    312 void SyncerThread::ScheduleNudge(const TimeDelta& delay,
    313     NudgeSource source, const ModelTypeBitSet& types,
    314     const tracked_objects::Location& nudge_location) {
    315   if (!thread_.IsRunning()) {
    316     NOTREACHED();
    317     return;
    318   }
    319 
    320   VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled";
    321 
    322   ModelTypePayloadMap types_with_payloads =
    323       syncable::ModelTypePayloadMapFromBitSet(types, std::string());
    324   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
    325       this, &SyncerThread::ScheduleNudgeImpl, delay,
    326       GetUpdatesFromNudgeSource(source), types_with_payloads, false,
    327       nudge_location));
    328 }
    329 
    330 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
    331     NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
    332     const tracked_objects::Location& nudge_location) {
    333   if (!thread_.IsRunning()) {
    334     NOTREACHED();
    335     return;
    336   }
    337 
    338   VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads";
    339 
    340   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
    341       this, &SyncerThread::ScheduleNudgeImpl, delay,
    342       GetUpdatesFromNudgeSource(source), types_with_payloads, false,
    343       nudge_location));
    344 }
    345 
    346 void SyncerThread::ScheduleClearUserDataImpl() {
    347   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    348   SyncSession* session = new SyncSession(session_context_.get(), this,
    349       SyncSourceInfo(), ModelSafeRoutingInfo(),
    350       std::vector<ModelSafeWorker*>());
    351   ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
    352       SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
    353 }
    354 
    355 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
    356     GetUpdatesCallerInfo::GetUpdatesSource source,
    357     const ModelTypePayloadMap& types_with_payloads,
    358     bool is_canary_job, const tracked_objects::Location& nudge_location) {
    359   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    360 
    361   VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl";
    362   // Note we currently nudge for all types regardless of the ones incurring
    363   // the nudge.  Doing different would throw off some syncer commands like
    364   // CleanupDisabledTypes.  We may want to change this in the future.
    365   SyncSourceInfo info(source, types_with_payloads);
    366 
    367   SyncSession* session(CreateSyncSession(info));
    368   SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
    369                      make_linked_ptr(session), is_canary_job,
    370                      nudge_location);
    371 
    372   session = NULL;
    373   if (!ShouldRunJob(job))
    374     return;
    375 
    376   if (pending_nudge_.get()) {
    377     if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
    378       VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because"
    379               << "we are in backoff";
    380       return;
    381     }
    382 
    383     VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge";
    384     pending_nudge_->session->Coalesce(*(job.session.get()));
    385 
    386     if (!IsBackingOff()) {
    387       VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because"
    388               << " we are not in backoff and the job was coalesced";
    389       return;
    390     } else {
    391       VLOG(1) << "SyncerThread(" << this << ")"
    392               << " Rescheduling pending nudge";
    393       SyncSession* s = pending_nudge_->session.get();
    394       job.session.reset(new SyncSession(s->context(), s->delegate(),
    395           s->source(), s->routing_info(), s->workers()));
    396       pending_nudge_.reset();
    397     }
    398   }
    399 
    400   // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
    401   ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
    402       nudge_location);
    403 }
    404 
    405 // Helper to extract the routing info and workers corresponding to types in
    406 // |types| from |registrar|.
    407 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
    408     ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
    409     std::vector<ModelSafeWorker*>* workers) {
    410   ModelSafeRoutingInfo r_tmp;
    411   std::vector<ModelSafeWorker*> w_tmp;
    412   registrar->GetModelSafeRoutingInfo(&r_tmp);
    413   registrar->GetWorkers(&w_tmp);
    414 
    415   bool passive_group_added = false;
    416 
    417   typedef std::vector<ModelSafeWorker*>::const_iterator iter;
    418   for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
    419     if (!types.test(i))
    420       continue;
    421     syncable::ModelType t = syncable::ModelTypeFromInt(i);
    422     DCHECK_EQ(1U, r_tmp.count(t));
    423     (*routes)[t] = r_tmp[t];
    424     iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
    425                            ModelSafeWorkerGroupIs(r_tmp[t]));
    426     if (it != w_tmp.end()) {
    427       iter it2 = std::find_if(workers->begin(), workers->end(),
    428                               ModelSafeWorkerGroupIs(r_tmp[t]));
    429       if (it2 == workers->end())
    430         workers->push_back(*it);
    431 
    432       if (r_tmp[t] == GROUP_PASSIVE)
    433         passive_group_added = true;
    434     } else {
    435         NOTREACHED();
    436     }
    437   }
    438 
    439   // Always add group passive.
    440   if (passive_group_added == false) {
    441     iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
    442                            ModelSafeWorkerGroupIs(GROUP_PASSIVE));
    443     if (it != w_tmp.end())
    444       workers->push_back(*it);
    445     else
    446       NOTREACHED();
    447   }
    448 }
    449 
    450 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
    451   if (!thread_.IsRunning()) {
    452     NOTREACHED();
    453     return;
    454   }
    455 
    456   VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config";
    457   ModelSafeRoutingInfo routes;
    458   std::vector<ModelSafeWorker*> workers;
    459   GetModelSafeParamsForTypes(types, session_context_->registrar(),
    460                              &routes, &workers);
    461 
    462   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
    463       this, &SyncerThread::ScheduleConfigImpl, routes, workers,
    464       GetUpdatesCallerInfo::FIRST_UPDATE));
    465 }
    466 
    467 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
    468     const std::vector<ModelSafeWorker*>& workers,
    469     const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
    470   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    471 
    472   VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl...";
    473   // TODO(tim): config-specific GetUpdatesCallerInfo value?
    474   SyncSession* session = new SyncSession(session_context_.get(), this,
    475       SyncSourceInfo(source,
    476           syncable::ModelTypePayloadMapFromRoutingInfo(
    477               routing_info, std::string())),
    478       routing_info, workers);
    479   ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
    480     SyncSessionJob::CONFIGURATION, session, FROM_HERE);
    481 }
    482 
    483 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
    484     SyncSessionJob::SyncSessionJobPurpose purpose,
    485     sessions::SyncSession* session,
    486     const tracked_objects::Location& nudge_location) {
    487   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    488 
    489   SyncSessionJob job(purpose, TimeTicks::Now() + delay,
    490                         make_linked_ptr(session), false, nudge_location);
    491   if (purpose == SyncSessionJob::NUDGE) {
    492     VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in"
    493             << " ScheduleSyncSessionJob";
    494     DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
    495     pending_nudge_.reset(new SyncSessionJob(job));
    496   }
    497   VLOG(1) << "SyncerThread(" << this << ")"
    498           << " Posting job to execute in DoSyncSessionJob. Job purpose "
    499           << job.purpose;
    500   MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
    501       &SyncerThread::DoSyncSessionJob, job),
    502       delay.InMilliseconds());
    503 }
    504 
    505 void SyncerThread::SetSyncerStepsForPurpose(
    506     SyncSessionJob::SyncSessionJobPurpose purpose,
    507     SyncerStep* start, SyncerStep* end) {
    508   *end = SYNCER_END;
    509   switch (purpose) {
    510     case SyncSessionJob::CONFIGURATION:
    511       *start = DOWNLOAD_UPDATES;
    512       *end = APPLY_UPDATES;
    513       return;
    514     case SyncSessionJob::CLEAR_USER_DATA:
    515       *start = CLEAR_PRIVATE_DATA;
    516        return;
    517     case SyncSessionJob::NUDGE:
    518     case SyncSessionJob::POLL:
    519       *start = SYNCER_BEGIN;
    520       return;
    521     default:
    522       NOTREACHED();
    523   }
    524 }
    525 
    526 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
    527   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    528   if (!ShouldRunJob(job)) {
    529     LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
    530         << job.session->source().updates_source;
    531     return;
    532   }
    533 
    534   if (job.purpose == SyncSessionJob::NUDGE) {
    535     if (pending_nudge_.get() == NULL || pending_nudge_->session != job.session)
    536       return;  // Another nudge must have been scheduled in in the meantime.
    537     pending_nudge_.reset();
    538   }
    539   VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose "
    540           << job.purpose;
    541 
    542   SyncerStep begin(SYNCER_BEGIN);
    543   SyncerStep end(SYNCER_END);
    544   SetSyncerStepsForPurpose(job.purpose, &begin, &end);
    545 
    546   bool has_more_to_sync = true;
    547   while (ShouldRunJob(job) && has_more_to_sync) {
    548     VLOG(1) << "SyncerThread(" << this << ")"
    549             << " SyncerThread: Calling SyncShare.";
    550     // Synchronously perform the sync session from this thread.
    551     syncer_->SyncShare(job.session.get(), begin, end);
    552     has_more_to_sync = job.session->HasMoreToSync();
    553     if (has_more_to_sync)
    554       job.session->ResetTransientState();
    555   }
    556   VLOG(1) << "SyncerThread(" << this << ")"
    557           << " SyncerThread: Done SyncShare looping.";
    558   FinishSyncSessionJob(job);
    559 }
    560 
    561 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
    562   if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
    563     // Whatever types were part of a configuration task will have had updates
    564     // downloaded.  For that reason, we make sure they get recorded in the
    565     // event that they get disabled at a later time.
    566     ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
    567     if (!r.empty()) {
    568       ModelSafeRoutingInfo temp_r;
    569       ModelSafeRoutingInfo old_info(old_job.session->routing_info());
    570       std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
    571           std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
    572       session_context_->set_previous_session_routing_info(temp_r);
    573     }
    574   } else {
    575     session_context_->set_previous_session_routing_info(
    576         old_job.session->routing_info());
    577   }
    578 }
    579 
    580 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
    581   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    582   // Update timing information for how often datatypes are triggering nudges.
    583   base::TimeTicks now = TimeTicks::Now();
    584   if (!last_sync_session_end_time_.is_null()) {
    585     ModelTypePayloadMap::const_iterator iter;
    586     for (iter = job.session->source().types.begin();
    587          iter != job.session->source().types.end();
    588          ++iter) {
    589       syncable::PostTimeToTypeHistogram(iter->first,
    590                                         now - last_sync_session_end_time_);
    591     }
    592   }
    593   last_sync_session_end_time_ = now;
    594   UpdateCarryoverSessionState(job);
    595   if (IsSyncingCurrentlySilenced()) {
    596     VLOG(1) << "SyncerThread(" << this << ")"
    597             << " We are currently throttled. So not scheduling the next sync.";
    598     SaveJob(job);
    599     return;  // Nothing to do.
    600   }
    601 
    602   VLOG(1) << "SyncerThread(" << this << ")"
    603           << " Updating the next polling time after SyncMain";
    604   ScheduleNextSync(job);
    605 }
    606 
    607 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
    608   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    609   DCHECK(!old_job.session->HasMoreToSync());
    610   // Note: |num_server_changes_remaining| > 0 here implies that we received a
    611   // broken response while trying to download all updates, because the Syncer
    612   // will loop until this value is exhausted. Also, if unsynced_handles exist
    613   // but HasMoreToSync is false, this implies that the Syncer determined no
    614   // forward progress was possible at this time (an error, such as an HTTP
    615   // 500, is likely to have occurred during commit).
    616   const bool work_to_do =
    617      old_job.session->status_controller()->num_server_changes_remaining() > 0
    618      || old_job.session->status_controller()->unsynced_handles().size() > 0;
    619   VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: "
    620           << work_to_do;
    621 
    622   AdjustPolling(&old_job);
    623 
    624   // TODO(tim): Old impl had special code if notifications disabled. Needed?
    625   if (!work_to_do) {
    626     // Success implies backoff relief.  Note that if this was a "one-off" job
    627     // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
    628     // work_to_do before it ran this wont have changed, as jobs like this don't
    629     // run a full sync cycle.  So we don't need special code here.
    630     wait_interval_.reset();
    631     VLOG(1) << "SyncerThread(" << this << ")"
    632             << " Job suceeded so not scheduling more jobs";
    633     return;
    634   }
    635 
    636   if (old_job.session->source().updates_source ==
    637       GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
    638     VLOG(1) << "SyncerThread(" << this << ")"
    639             << " Job failed with source continuation";
    640     // We don't seem to have made forward progress. Start or extend backoff.
    641     HandleConsecutiveContinuationError(old_job);
    642   } else if (IsBackingOff()) {
    643     VLOG(1) << "SyncerThread(" << this << ")"
    644             << " A nudge during backoff failed";
    645     // We weren't continuing but we're in backoff; must have been a nudge.
    646     DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
    647     DCHECK(!wait_interval_->had_nudge);
    648     wait_interval_->had_nudge = true;
    649     wait_interval_->timer.Reset();
    650   } else {
    651     VLOG(1) << "SyncerThread(" << this << ")"
    652             << " Failed. Schedule a job with continuation as source";
    653     // We weren't continuing and we aren't in backoff.  Schedule a normal
    654     // continuation.
    655     if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
    656       ScheduleConfigImpl(old_job.session->routing_info(),
    657           old_job.session->workers(),
    658           GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
    659     } else  {
    660       // For all other purposes(nudge and poll) we schedule a retry nudge.
    661       ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
    662                         GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
    663                         old_job.session->source().types, false, FROM_HERE);
    664     }
    665   }
    666 }
    667 
    668 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
    669   DCHECK(thread_.IsRunning());
    670   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    671 
    672   TimeDelta poll  = (!session_context_->notifications_enabled()) ?
    673       syncer_short_poll_interval_seconds_ :
    674       syncer_long_poll_interval_seconds_;
    675   bool rate_changed = !poll_timer_.IsRunning() ||
    676                        poll != poll_timer_.GetCurrentDelay();
    677 
    678   if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
    679     poll_timer_.Reset();
    680 
    681   if (!rate_changed)
    682     return;
    683 
    684   // Adjust poll rate.
    685   poll_timer_.Stop();
    686   poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
    687 }
    688 
    689 void SyncerThread::HandleConsecutiveContinuationError(
    690     const SyncSessionJob& old_job) {
    691   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    692   // This if conditions should be compiled out in retail builds.
    693   if (IsBackingOff()) {
    694     DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
    695   }
    696   SyncSession* old = old_job.session.get();
    697   SyncSession* s(new SyncSession(session_context_.get(), this,
    698       old->source(), old->routing_info(), old->workers()));
    699   TimeDelta length = delay_provider_->GetDelay(
    700       IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
    701 
    702   VLOG(1) << "SyncerThread(" << this << ")"
    703           << " In handle continuation error. Old job purpose is "
    704           << old_job.purpose;
    705   VLOG(1) << "SyncerThread(" << this << ")"
    706     << " In Handle continuation error. The time delta(ms) is: "
    707           << length.InMilliseconds();
    708 
    709   // This will reset the had_nudge variable as well.
    710   wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
    711                                         length));
    712   if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
    713     SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
    714                         make_linked_ptr(s), false, FROM_HERE);
    715     wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
    716   } else {
    717     // We are not in configuration mode. So wait_interval's pending job
    718     // should be null.
    719     DCHECK(wait_interval_->pending_configure_job.get() == NULL);
    720 
    721     // TODO(lipalani) - handle clear user data.
    722     InitOrCoalescePendingJob(old_job);
    723   }
    724   wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
    725 }
    726 
    727 // static
    728 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
    729   if (last_delay.InSeconds() >= kMaxBackoffSeconds)
    730     return TimeDelta::FromSeconds(kMaxBackoffSeconds);
    731 
    732   // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
    733   int64 backoff_s =
    734       std::max(static_cast<int64>(1),
    735                last_delay.InSeconds() * kBackoffRandomizationFactor);
    736 
    737   // Flip a coin to randomize backoff interval by +/- 50%.
    738   int rand_sign = base::RandInt(0, 1) * 2 - 1;
    739 
    740   // Truncation is adequate for rounding here.
    741   backoff_s = backoff_s +
    742       (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
    743 
    744   // Cap the backoff interval.
    745   backoff_s = std::max(static_cast<int64>(1),
    746                        std::min(backoff_s, kMaxBackoffSeconds));
    747 
    748   return TimeDelta::FromSeconds(backoff_s);
    749 }
    750 
    751 void SyncerThread::Stop() {
    752   VLOG(1) << "SyncerThread(" << this << ")" << " stop called";
    753   syncer_->RequestEarlyExit();  // Safe to call from any thread.
    754   session_context_->connection_manager()->RemoveListener(this);
    755   thread_.Stop();
    756 }
    757 
    758 void SyncerThread::DoCanaryJob() {
    759   VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job";
    760   DoPendingJobIfPossible(true);
    761 }
    762 
    763 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
    764   SyncSessionJob* job_to_execute = NULL;
    765   if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
    766       && wait_interval_->pending_configure_job.get()) {
    767     VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job";
    768     job_to_execute = wait_interval_->pending_configure_job.get();
    769   } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
    770     VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job";
    771     // Pending jobs mostly have time from the past. Reset it so this job
    772     // will get executed.
    773     if (pending_nudge_->scheduled_start < TimeTicks::Now())
    774       pending_nudge_->scheduled_start = TimeTicks::Now();
    775 
    776     scoped_ptr<SyncSession> session(CreateSyncSession(
    777         pending_nudge_->session->source()));
    778 
    779     // Also the routing info might have been changed since we cached the
    780     // pending nudge. Update it by coalescing to the latest.
    781     pending_nudge_->session->Coalesce(*(session.get()));
    782     // The pending nudge would be cleared in the DoSyncSessionJob function.
    783     job_to_execute = pending_nudge_.get();
    784   }
    785 
    786   if (job_to_execute != NULL) {
    787     VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job";
    788     SyncSessionJob copy = *job_to_execute;
    789     copy.is_canary_job = is_canary_job;
    790     DoSyncSessionJob(copy);
    791   }
    792 }
    793 
    794 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
    795   ModelSafeRoutingInfo routes;
    796   std::vector<ModelSafeWorker*> workers;
    797   session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
    798   session_context_->registrar()->GetWorkers(&workers);
    799   SyncSourceInfo info(source);
    800 
    801   SyncSession* session(new SyncSession(session_context_.get(), this, info,
    802       routes, workers));
    803 
    804   return session;
    805 }
    806 
    807 void SyncerThread::PollTimerCallback() {
    808   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    809   ModelSafeRoutingInfo r;
    810   ModelTypePayloadMap types_with_payloads =
    811       syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
    812   SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
    813   SyncSession* s = CreateSyncSession(info);
    814   ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
    815       FROM_HERE);
    816 }
    817 
    818 void SyncerThread::Unthrottle() {
    819   DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
    820   VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled..";
    821   DoCanaryJob();
    822   wait_interval_.reset();
    823 }
    824 
    825 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
    826   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    827   session_context_->NotifyListeners(SyncEngineEvent(cause));
    828 }
    829 
    830 bool SyncerThread::IsBackingOff() const {
    831   return wait_interval_.get() && wait_interval_->mode ==
    832       WaitInterval::EXPONENTIAL_BACKOFF;
    833 }
    834 
    835 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
    836   wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
    837                                         silenced_until - TimeTicks::Now()));
    838   wait_interval_->timer.Start(wait_interval_->length, this,
    839       &SyncerThread::Unthrottle);
    840 }
    841 
    842 bool SyncerThread::IsSyncingCurrentlySilenced() {
    843   return wait_interval_.get() && wait_interval_->mode ==
    844       WaitInterval::THROTTLED;
    845 }
    846 
    847 void SyncerThread::OnReceivedShortPollIntervalUpdate(
    848     const base::TimeDelta& new_interval) {
    849   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    850   syncer_short_poll_interval_seconds_ = new_interval;
    851 }
    852 
    853 void SyncerThread::OnReceivedLongPollIntervalUpdate(
    854     const base::TimeDelta& new_interval) {
    855   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
    856   syncer_long_poll_interval_seconds_ = new_interval;
    857 }
    858 
    859 void SyncerThread::OnShouldStopSyncingPermanently() {
    860   VLOG(1) << "SyncerThread(" << this << ")"
    861           << " OnShouldStopSyncingPermanently";
    862   syncer_->RequestEarlyExit();  // Thread-safe.
    863   Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
    864 }
    865 
    866 void SyncerThread::OnServerConnectionEvent(
    867     const ServerConnectionEvent2& event) {
    868   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
    869       &SyncerThread::CheckServerConnectionManagerStatus,
    870       event.connection_code));
    871 }
    872 
    873 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
    874   session_context_->set_notifications_enabled(notifications_enabled);
    875 }
    876 
    877 }  // browser_sync
    878