Home | History | Annotate | Download | only in engine
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/engine/sync_scheduler_impl.h"
      6 
      7 #include <algorithm>
      8 #include <cstring>
      9 
     10 #include "base/auto_reset.h"
     11 #include "base/bind.h"
     12 #include "base/bind_helpers.h"
     13 #include "base/compiler_specific.h"
     14 #include "base/location.h"
     15 #include "base/logging.h"
     16 #include "base/message_loop/message_loop.h"
     17 #include "sync/engine/backoff_delay_provider.h"
     18 #include "sync/engine/syncer.h"
     19 #include "sync/protocol/proto_enum_conversions.h"
     20 #include "sync/protocol/sync.pb.h"
     21 #include "sync/util/data_type_histogram.h"
     22 #include "sync/util/logging.h"
     23 
     24 using base::TimeDelta;
     25 using base::TimeTicks;
     26 
     27 namespace syncer {
     28 
     29 using sessions::SyncSession;
     30 using sessions::SyncSessionSnapshot;
     31 using sync_pb::GetUpdatesCallerInfo;
     32 
     33 namespace {
     34 
     35 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
     36   switch (error.error_type) {
     37     case SYNC_SUCCESS:
     38     case MIGRATION_DONE:
     39     case THROTTLED:
     40     case TRANSIENT_ERROR:
     41       return false;
     42     case NOT_MY_BIRTHDAY:
     43     case CLEAR_PENDING:
     44     case DISABLED_BY_ADMIN:
     45       // If we send terminate sync early then |sync_cycle_ended| notification
     46       // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
     47       // notification wouldnt be sent either. Then the UI layer would be left
     48       // waiting forever. So assert we would send something.
     49       DCHECK_NE(error.action, UNKNOWN_ACTION);
     50       return true;
     51     case INVALID_CREDENTIAL:
     52       // The notification for this is handled by PostAndProcessHeaders|.
     53       // Server does no have to send any action for this.
     54       return true;
     55     // Make the default a NOTREACHED. So if a new error is introduced we
     56     // think about its expected functionality.
     57     default:
     58       NOTREACHED();
     59       return false;
     60   }
     61 }
     62 
     63 bool IsActionableError(
     64     const SyncProtocolError& error) {
     65   return (error.action != UNKNOWN_ACTION);
     66 }
     67 }  // namespace
     68 
     69 ConfigurationParams::ConfigurationParams()
     70     : source(GetUpdatesCallerInfo::UNKNOWN) {}
     71 ConfigurationParams::ConfigurationParams(
     72     const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
     73     ModelTypeSet types_to_download,
     74     const ModelSafeRoutingInfo& routing_info,
     75     const base::Closure& ready_task)
     76     : source(source),
     77       types_to_download(types_to_download),
     78       routing_info(routing_info),
     79       ready_task(ready_task) {
     80   DCHECK(!ready_task.is_null());
     81 }
     82 ConfigurationParams::~ConfigurationParams() {}
     83 
     84 SyncSchedulerImpl::WaitInterval::WaitInterval()
     85     : mode(UNKNOWN) {}
     86 
     87 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
     88     : mode(mode), length(length) {}
     89 
     90 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
     91 
// Expands to a switch case that returns the enumerator's name as a string
// literal.  The trailing |break| is never reached because of the |return|.
#define ENUM_CASE(x) case x: return #x; break;
     93 
     94 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
     95   switch (mode) {
     96     ENUM_CASE(UNKNOWN);
     97     ENUM_CASE(EXPONENTIAL_BACKOFF);
     98     ENUM_CASE(THROTTLED);
     99   }
    100   NOTREACHED();
    101   return "";
    102 }
    103 
    104 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
    105     NudgeSource source) {
    106   switch (source) {
    107     case NUDGE_SOURCE_NOTIFICATION:
    108       return GetUpdatesCallerInfo::NOTIFICATION;
    109     case NUDGE_SOURCE_LOCAL:
    110       return GetUpdatesCallerInfo::LOCAL;
    111     case NUDGE_SOURCE_LOCAL_REFRESH:
    112       return GetUpdatesCallerInfo::DATATYPE_REFRESH;
    113     case NUDGE_SOURCE_UNKNOWN:
    114       return GetUpdatesCallerInfo::UNKNOWN;
    115     default:
    116       NOTREACHED();
    117       return GetUpdatesCallerInfo::UNKNOWN;
    118   }
    119 }
    120 
// Helper macros to log with the syncer thread name; useful when there
// are multiple syncer threads involved.
//
// Each macro forwards to the corresponding base logging macro and then
// streams |name_| followed by ": ", so they can only be used where a |name_|
// is in scope (i.e. inside SyncSchedulerImpl member functions).

// LOG(severity) prefixed with the scheduler name.
#define SLOG(severity) LOG(severity) << name_ << ": "

// DVLOG(verbose_level) prefixed with the scheduler name.
#define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "

// DVLOG_LOC(from_here, verbose_level) prefixed with the scheduler name.
#define SDVLOG_LOC(from_here, verbose_level)             \
  DVLOG_LOC(from_here, verbose_level) << name_ << ": "
    130 
    131 namespace {
    132 
// Value, in seconds, used by the SyncSchedulerImpl constructor to initialize
// |sessions_commit_delay_|.
const int kDefaultSessionsCommitDelaySeconds = 10;
    134 
    135 bool IsConfigRelatedUpdateSourceValue(
    136     GetUpdatesCallerInfo::GetUpdatesSource source) {
    137   switch (source) {
    138     case GetUpdatesCallerInfo::RECONFIGURATION:
    139     case GetUpdatesCallerInfo::MIGRATION:
    140     case GetUpdatesCallerInfo::NEW_CLIENT:
    141     case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
    142       return true;
    143     default:
    144       return false;
    145   }
    146 }
    147 
    148 }  // namespace
    149 
// Creates a scheduler identified by |name| (the prefix streamed by the
// SLOG/SDVLOG macros).  The scheduler starts out not started, in NORMAL_MODE,
// with the default short/long poll intervals and the default sessions commit
// delay.
// NOTE(review): |delay_provider| and |syncer| are stored in members that look
// like owning smart pointers (|syncer_.get()| is used in Start()) -- confirm
// the ownership contract against the header.
SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
                                     BackoffDelayProvider* delay_provider,
                                     sessions::SyncSessionContext* context,
                                     Syncer* syncer)
    : weak_ptr_factory_(this),
      weak_ptr_factory_for_weak_handle_(this),
      // The weak handle is backed by its own factory, separate from
      // |weak_ptr_factory_|, which StopImpl() invalidates.
      weak_handle_this_(MakeWeakHandle(
          weak_ptr_factory_for_weak_handle_.GetWeakPtr())),
      name_(name),
      started_(false),
      syncer_short_poll_interval_seconds_(
          TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
      syncer_long_poll_interval_seconds_(
          TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
      sessions_commit_delay_(
          TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
      mode_(NORMAL_MODE),
      delay_provider_(delay_provider),
      syncer_(syncer),
      session_context_(context),
      no_scheduling_allowed_(false),
      do_poll_after_credentials_updated_(false) {
}

// Runs the same teardown as an explicit stop: StopImpl() invalidates pending
// weak-pointer callbacks, stops the timers and drops pending work.
SyncSchedulerImpl::~SyncSchedulerImpl() {
  DCHECK(CalledOnValidThread());
  StopImpl();
}
    178 
    179 void SyncSchedulerImpl::OnCredentialsUpdated() {
    180   DCHECK(CalledOnValidThread());
    181 
    182   if (HttpResponse::SYNC_AUTH_ERROR ==
    183       session_context_->connection_manager()->server_status()) {
    184     OnServerConnectionErrorFixed();
    185   }
    186 }
    187 
    188 void SyncSchedulerImpl::OnConnectionStatusChange() {
    189   if (HttpResponse::CONNECTION_UNAVAILABLE  ==
    190       session_context_->connection_manager()->server_status()) {
    191     // Optimistically assume that the connection is fixed and try
    192     // connecting.
    193     OnServerConnectionErrorFixed();
    194   }
    195 }
    196 
// Invoked once a server connection error (auth failure or unavailable
// connection) is believed to be resolved; retries pending work as a canary.
void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
  // There could be a pending nudge or configuration job in several cases:
  //
  // 1. We're in exponential backoff.
  // 2. We're silenced / throttled.
  // 3. A nudge was saved previously due to not having a valid auth token.
  // 4. A nudge was scheduled + saved while in configuration mode.
  //
  // In all cases except (2), we want to retry contacting the server. We
  // call TryCanaryJob to achieve this, and note that nothing -- not even a
  // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
  // has the authority to do that is the Unthrottle timer.
  TryCanaryJob();
}
    211 
    212 void SyncSchedulerImpl::Start(Mode mode) {
    213   DCHECK(CalledOnValidThread());
    214   std::string thread_name = base::MessageLoop::current()->thread_name();
    215   if (thread_name.empty())
    216     thread_name = "<Main thread>";
    217   SDVLOG(2) << "Start called from thread "
    218             << thread_name << " with mode " << GetModeString(mode);
    219   if (!started_) {
    220     started_ = true;
    221     SendInitialSnapshot();
    222   }
    223 
    224   DCHECK(!session_context_->account_name().empty());
    225   DCHECK(syncer_.get());
    226   Mode old_mode = mode_;
    227   mode_ = mode;
    228   AdjustPolling(UPDATE_INTERVAL);  // Will kick start poll timer if needed.
    229 
    230   if (old_mode != mode_ &&
    231       mode_ == NORMAL_MODE &&
    232       nudge_tracker_.IsSyncRequired() &&
    233       CanRunNudgeJobNow(NORMAL_PRIORITY)) {
    234     // We just got back to normal mode.  Let's try to run the work that was
    235     // queued up while we were configuring.
    236     DoNudgeSyncSessionJob(NORMAL_PRIORITY);
    237   }
    238 }
    239 
    240 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
    241   ModelTypeSet enabled_types =
    242       GetRoutingInfoTypes(session_context_->routing_info());
    243   ModelTypeSet throttled_types =
    244       nudge_tracker_.GetThrottledTypes();
    245   return Difference(enabled_types, throttled_types);
    246 }
    247 
    248 void SyncSchedulerImpl::SendInitialSnapshot() {
    249   DCHECK(CalledOnValidThread());
    250   scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
    251   SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
    252   event.snapshot = dummy->TakeSnapshot();
    253   session_context_->NotifyListeners(event);
    254 }
    255 
    256 namespace {
    257 
    258 // Helper to extract the routing info corresponding to types in
    259 // |types_to_download| from |current_routes|.
    260 void BuildModelSafeParams(
    261     ModelTypeSet types_to_download,
    262     const ModelSafeRoutingInfo& current_routes,
    263     ModelSafeRoutingInfo* result_routes) {
    264   for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
    265        iter.Inc()) {
    266     ModelType type = iter.Get();
    267     ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
    268     DCHECK(route != current_routes.end());
    269     ModelSafeGroup group = route->second;
    270     (*result_routes)[type] = group;
    271   }
    272 }
    273 
    274 }  // namespace.
    275 
    276 bool SyncSchedulerImpl::ScheduleConfiguration(
    277     const ConfigurationParams& params) {
    278   DCHECK(CalledOnValidThread());
    279   DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
    280   DCHECK_EQ(CONFIGURATION_MODE, mode_);
    281   DCHECK(!params.ready_task.is_null());
    282   CHECK(started_) << "Scheduler must be running to configure.";
    283   SDVLOG(2) << "Reconfiguring syncer.";
    284 
    285   // Only one configuration is allowed at a time. Verify we're not waiting
    286   // for a pending configure job.
    287   DCHECK(!pending_configure_params_);
    288 
    289   ModelSafeRoutingInfo restricted_routes;
    290   BuildModelSafeParams(params.types_to_download,
    291                        params.routing_info,
    292                        &restricted_routes);
    293   session_context_->set_routing_info(restricted_routes);
    294 
    295   // Only reconfigure if we have types to download.
    296   if (!params.types_to_download.Empty()) {
    297     pending_configure_params_.reset(new ConfigurationParams(params));
    298     bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY);
    299 
    300     // If we failed, the job would have been saved as the pending configure
    301     // job and a wait interval would have been set.
    302     if (!succeeded) {
    303       DCHECK(pending_configure_params_);
    304     } else {
    305       DCHECK(!pending_configure_params_);
    306     }
    307     return succeeded;
    308   } else {
    309     SDVLOG(2) << "No change in routing info, calling ready task directly.";
    310     params.ready_task.Run();
    311   }
    312 
    313   return true;
    314 }
    315 
    316 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
    317   DCHECK(CalledOnValidThread());
    318   if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
    319     SDVLOG(1) << "Unable to run a job because we're throttled.";
    320     return false;
    321   }
    322 
    323   if (wait_interval_
    324       && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
    325       && priority != CANARY_PRIORITY) {
    326     SDVLOG(1) << "Unable to run a job because we're backing off.";
    327     return false;
    328   }
    329 
    330   if (session_context_->connection_manager()->HasInvalidAuthToken()) {
    331     SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
    332     return false;
    333   }
    334 
    335   return true;
    336 }
    337 
    338 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
    339   DCHECK(CalledOnValidThread());
    340 
    341   if (!CanRunJobNow(priority)) {
    342     SDVLOG(1) << "Unable to run a nudge job right now";
    343     return false;
    344   }
    345 
    346   const ModelTypeSet enabled_types =
    347       GetRoutingInfoTypes(session_context_->routing_info());
    348   if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
    349     SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
    350     return false;
    351   }
    352 
    353   if (mode_ == CONFIGURATION_MODE) {
    354     SDVLOG(1) << "Not running nudge because we're in configuration mode.";
    355     return false;
    356   }
    357 
    358   return true;
    359 }
    360 
    361 void SyncSchedulerImpl::ScheduleLocalNudge(
    362     const TimeDelta& desired_delay,
    363     ModelTypeSet types,
    364     const tracked_objects::Location& nudge_location) {
    365   DCHECK(CalledOnValidThread());
    366   DCHECK(!types.Empty());
    367 
    368   SDVLOG_LOC(nudge_location, 2)
    369       << "Scheduling sync because of local change to "
    370       << ModelTypeSetToString(types);
    371   UpdateNudgeTimeRecords(types);
    372   nudge_tracker_.RecordLocalChange(types);
    373   ScheduleNudgeImpl(desired_delay, nudge_location);
    374 }
    375 
    376 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
    377     const TimeDelta& desired_delay,
    378     ModelTypeSet types,
    379     const tracked_objects::Location& nudge_location) {
    380   DCHECK(CalledOnValidThread());
    381   DCHECK(!types.Empty());
    382 
    383   SDVLOG_LOC(nudge_location, 2)
    384       << "Scheduling sync because of local refresh request for "
    385       << ModelTypeSetToString(types);
    386   nudge_tracker_.RecordLocalRefreshRequest(types);
    387   ScheduleNudgeImpl(desired_delay, nudge_location);
    388 }
    389 
    390 void SyncSchedulerImpl::ScheduleInvalidationNudge(
    391     const TimeDelta& desired_delay,
    392     const ModelTypeInvalidationMap& invalidation_map,
    393     const tracked_objects::Location& nudge_location) {
    394   DCHECK(CalledOnValidThread());
    395   DCHECK(!invalidation_map.empty());
    396 
    397   SDVLOG_LOC(nudge_location, 2)
    398       << "Scheduling sync because we received invalidation for "
    399       << ModelTypeInvalidationMapToString(invalidation_map);
    400   nudge_tracker_.RecordRemoteInvalidation(invalidation_map);
    401   ScheduleNudgeImpl(desired_delay, nudge_location);
    402 }
    403 
    404 // TODO(zea): Consider adding separate throttling/backoff for datatype
    405 // refresh requests.
    406 void SyncSchedulerImpl::ScheduleNudgeImpl(
    407     const TimeDelta& delay,
    408     const tracked_objects::Location& nudge_location) {
    409   DCHECK(CalledOnValidThread());
    410 
    411   if (no_scheduling_allowed_) {
    412     NOTREACHED() << "Illegal to schedule job while session in progress.";
    413     return;
    414   }
    415 
    416   if (!started_) {
    417     SDVLOG_LOC(nudge_location, 2)
    418         << "Dropping nudge, scheduler is not running.";
    419     return;
    420   }
    421 
    422   SDVLOG_LOC(nudge_location, 2)
    423       << "In ScheduleNudgeImpl with delay "
    424       << delay.InMilliseconds() << " ms";
    425 
    426   if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
    427     return;
    428 
    429   if (!started_) {
    430     SDVLOG_LOC(nudge_location, 2)
    431         << "Schedule not started; not running a nudge.";
    432     return;
    433   }
    434 
    435   TimeTicks incoming_run_time = TimeTicks::Now() + delay;
    436   if (!scheduled_nudge_time_.is_null() &&
    437     (scheduled_nudge_time_ < incoming_run_time)) {
    438     // Old job arrives sooner than this one.  Don't reschedule it.
    439     return;
    440   }
    441 
    442   // Either there is no existing nudge in flight or the incoming nudge should be
    443   // made to arrive first (preempt) the existing nudge.  We reschedule in either
    444   // case.
    445   SDVLOG_LOC(nudge_location, 2)
    446       << "Scheduling a nudge with "
    447       << delay.InMilliseconds() << " ms delay";
    448   scheduled_nudge_time_ = incoming_run_time;
    449   pending_wakeup_timer_.Start(
    450       nudge_location,
    451       delay,
    452       base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
    453                  weak_ptr_factory_.GetWeakPtr()));
    454 }
    455 
    456 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
    457   switch (mode) {
    458     ENUM_CASE(CONFIGURATION_MODE);
    459     ENUM_CASE(NORMAL_MODE);
    460   }
    461   return "";
    462 }
    463 
    464 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
    465   DCHECK(CalledOnValidThread());
    466   DCHECK(CanRunNudgeJobNow(priority));
    467 
    468   DVLOG(2) << "Will run normal mode sync cycle with routing info "
    469            << ModelSafeRoutingInfoToString(session_context_->routing_info());
    470   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    471   bool premature_exit = !syncer_->NormalSyncShare(
    472       GetEnabledAndUnthrottledTypes(),
    473       nudge_tracker_,
    474       session.get());
    475   AdjustPolling(FORCE_RESET);
    476   // Don't run poll job till the next time poll timer fires.
    477   do_poll_after_credentials_updated_ = false;
    478 
    479   bool success = !premature_exit
    480       && !sessions::HasSyncerError(
    481           session->status_controller().model_neutral_state());
    482 
    483   if (success) {
    484     // That cycle took care of any outstanding work we had.
    485     SDVLOG(2) << "Nudge succeeded.";
    486     nudge_tracker_.RecordSuccessfulSyncCycle();
    487     scheduled_nudge_time_ = base::TimeTicks();
    488 
    489     // If we're here, then we successfully reached the server.  End all backoff.
    490     wait_interval_.reset();
    491     NotifyRetryTime(base::Time());
    492     return;
    493   } else {
    494     HandleFailure(session->status_controller().model_neutral_state());
    495   }
    496 }
    497 
    498 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
    499   DCHECK(CalledOnValidThread());
    500   DCHECK_EQ(mode_, CONFIGURATION_MODE);
    501 
    502   if (!CanRunJobNow(priority)) {
    503     SDVLOG(2) << "Unable to run configure job right now.";
    504     return false;
    505   }
    506 
    507   SDVLOG(2) << "Will run configure SyncShare with routes "
    508            << ModelSafeRoutingInfoToString(session_context_->routing_info());
    509   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    510   bool premature_exit = !syncer_->ConfigureSyncShare(
    511       GetRoutingInfoTypes(session_context_->routing_info()),
    512       pending_configure_params_->source,
    513       session.get());
    514   AdjustPolling(FORCE_RESET);
    515   // Don't run poll job till the next time poll timer fires.
    516   do_poll_after_credentials_updated_ = false;
    517 
    518   bool success = !premature_exit
    519       && !sessions::HasSyncerError(
    520           session->status_controller().model_neutral_state());
    521 
    522   if (success) {
    523     SDVLOG(2) << "Configure succeeded.";
    524     pending_configure_params_->ready_task.Run();
    525     pending_configure_params_.reset();
    526 
    527     // If we're here, then we successfully reached the server.  End all backoff.
    528     wait_interval_.reset();
    529     NotifyRetryTime(base::Time());
    530     return true;
    531   } else {
    532     HandleFailure(session->status_controller().model_neutral_state());
    533     return false;
    534   }
    535 }
    536 
    537 void SyncSchedulerImpl::HandleFailure(
    538     const sessions::ModelNeutralState& model_neutral_state) {
    539   if (IsCurrentlyThrottled()) {
    540     SDVLOG(2) << "Was throttled during previous sync cycle.";
    541     RestartWaiting();
    542   } else if (!IsBackingOff()) {
    543     // Setup our backoff if this is our first such failure.
    544     TimeDelta length = delay_provider_->GetDelay(
    545         delay_provider_->GetInitialDelay(model_neutral_state));
    546     wait_interval_.reset(
    547         new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
    548     SDVLOG(2) << "Sync cycle failed.  Will back off for "
    549         << wait_interval_->length.InMilliseconds() << "ms.";
    550     RestartWaiting();
    551   }
    552 }
    553 
    554 void SyncSchedulerImpl::DoPollSyncSessionJob() {
    555   ModelSafeRoutingInfo r;
    556   ModelTypeInvalidationMap invalidation_map =
    557       ModelSafeRoutingInfoToInvalidationMap(r, std::string());
    558   base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
    559 
    560   if (!CanRunJobNow(NORMAL_PRIORITY)) {
    561     SDVLOG(2) << "Unable to run a poll job right now.";
    562     return;
    563   }
    564 
    565   if (mode_ != NORMAL_MODE) {
    566     SDVLOG(2) << "Not running poll job in configure mode.";
    567     return;
    568   }
    569 
    570   SDVLOG(2) << "Polling with routes "
    571            << ModelSafeRoutingInfoToString(session_context_->routing_info());
    572   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    573   syncer_->PollSyncShare(
    574       GetEnabledAndUnthrottledTypes(),
    575       session.get());
    576 
    577   AdjustPolling(UPDATE_INTERVAL);
    578 
    579   if (IsCurrentlyThrottled()) {
    580     SDVLOG(2) << "Poll request got us throttled.";
    581     // The OnSilencedUntil() call set up the WaitInterval for us.  All we need
    582     // to do is start the timer.
    583     RestartWaiting();
    584   }
    585 }
    586 
// Stores the current time as the latest local-nudge time of every type in
// |types|.  For types that already had a recorded time, the elapsed time
// since that previous nudge is reported through SYNC_FREQ_HISTOGRAM.
void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
  DCHECK(CalledOnValidThread());
  base::TimeTicks now = TimeTicks::Now();
  // Update timing information for how often datatypes are triggering nudges.
  for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
    base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
    last_local_nudges_by_model_type_[iter.Get()] = now;
    // First nudge seen for this type: there is no interval to report yet.
    if (previous.is_null())
      continue;

    // NOTE(review): SYNC_DATA_TYPE_HISTOGRAM presumably expands
    // PER_DATA_TYPE_MACRO with the string for the given type -- confirm in
    // data_type_histogram.h.  The macro body refers to the locals |now| and
    // |previous| by name.
#define PER_DATA_TYPE_MACRO(type_str) \
    SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
    SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
#undef PER_DATA_TYPE_MACRO
  }
}
    603 
    604 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
    605   DCHECK(CalledOnValidThread());
    606 
    607   TimeDelta poll  = (!session_context_->notifications_enabled() ||
    608                      !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
    609       syncer_short_poll_interval_seconds_ :
    610       syncer_long_poll_interval_seconds_;
    611   bool rate_changed = !poll_timer_.IsRunning() ||
    612                        poll != poll_timer_.GetCurrentDelay();
    613 
    614   if (type == FORCE_RESET && !rate_changed)
    615     poll_timer_.Reset();
    616 
    617   if (!rate_changed)
    618     return;
    619 
    620   // Adjust poll rate.
    621   poll_timer_.Stop();
    622   poll_timer_.Start(FROM_HERE, poll, this,
    623                     &SyncSchedulerImpl::PollTimerCallback);
    624 }
    625 
    626 void SyncSchedulerImpl::RestartWaiting() {
    627   CHECK(wait_interval_.get());
    628   DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
    629   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
    630   SDVLOG(2) << "Starting WaitInterval timer of length "
    631       << wait_interval_->length.InMilliseconds() << "ms.";
    632   if (wait_interval_->mode == WaitInterval::THROTTLED) {
    633     pending_wakeup_timer_.Start(
    634         FROM_HERE,
    635         wait_interval_->length,
    636         base::Bind(&SyncSchedulerImpl::Unthrottle,
    637                    weak_ptr_factory_.GetWeakPtr()));
    638   } else {
    639     pending_wakeup_timer_.Start(
    640         FROM_HERE,
    641         wait_interval_->length,
    642         base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
    643                    weak_ptr_factory_.GetWeakPtr()));
    644   }
    645 }
    646 
    647 void SyncSchedulerImpl::RequestStop() {
    648   syncer_->RequestEarlyExit();  // Safe to call from any thread.
    649   DCHECK(weak_handle_this_.IsInitialized());
    650   SDVLOG(3) << "Posting StopImpl";
    651   weak_handle_this_.Call(FROM_HERE,
    652                          &SyncSchedulerImpl::StopImpl);
    653 }
    654 
    655 void SyncSchedulerImpl::StopImpl() {
    656   DCHECK(CalledOnValidThread());
    657   SDVLOG(2) << "StopImpl called";
    658 
    659   // Kill any in-flight method calls.
    660   weak_ptr_factory_.InvalidateWeakPtrs();
    661   wait_interval_.reset();
    662   NotifyRetryTime(base::Time());
    663   poll_timer_.Stop();
    664   pending_wakeup_timer_.Stop();
    665   pending_configure_params_.reset();
    666   if (started_)
    667     started_ = false;
    668 }
    669 
    670 // This is the only place where we invoke DoSyncSessionJob with canary
    671 // privileges.  Everyone else should use NORMAL_PRIORITY.
    672 void SyncSchedulerImpl::TryCanaryJob() {
    673   DCHECK(CalledOnValidThread());
    674 
    675   if (mode_ == CONFIGURATION_MODE && pending_configure_params_) {
    676     SDVLOG(2) << "Found pending configure job; will run as canary";
    677     DoConfigurationSyncSessionJob(CANARY_PRIORITY);
    678   } else if (mode_ == NORMAL_MODE && nudge_tracker_.IsSyncRequired() &&
    679              CanRunNudgeJobNow(CANARY_PRIORITY)) {
    680     SDVLOG(2) << "Found pending nudge job; will run as canary";
    681     DoNudgeSyncSessionJob(CANARY_PRIORITY);
    682   } else if (mode_ == NORMAL_MODE && CanRunJobNow(CANARY_PRIORITY) &&
    683              do_poll_after_credentials_updated_) {
    684     // Retry poll if poll timer recently fired and ProfileSyncService received
    685     // fresh access token.
    686     DoPollSyncSessionJob();
    687   } else {
    688     SDVLOG(2) << "Found no work to do; will not run a canary";
    689   }
    690   // Don't run poll job till the next time poll timer fires.
    691   do_poll_after_credentials_updated_ = false;
    692 }
    693 
    694 void SyncSchedulerImpl::PollTimerCallback() {
    695   DCHECK(CalledOnValidThread());
    696   if (no_scheduling_allowed_) {
    697     // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
    698     // functions that are called only on the sync thread.  This function is also
    699     // called only on the sync thread, and only when it is posted by an expiring
    700     // timer.  If we find that no_scheduling_allowed_ is set here, then
    701     // something is very wrong.  Maybe someone mistakenly called us directly, or
    702     // mishandled the book-keeping for no_scheduling_allowed_.
    703     NOTREACHED() << "Illegal to schedule job while session in progress.";
    704     return;
    705   }
    706 
    707   DoPollSyncSessionJob();
    708   // Poll timer fires infrequently. Usually by this time access token is already
    709   // expired and poll job will fail with auth error. Set flag to retry poll once
    710   // ProfileSyncService gets new access token, TryCanaryJob will be called in
    711   // this case.
    712   if (HttpResponse::SYNC_AUTH_ERROR ==
    713       session_context_->connection_manager()->server_status()) {
    714     do_poll_after_credentials_updated_ = true;
    715   }
    716 }
    717 
    718 void SyncSchedulerImpl::Unthrottle() {
    719   DCHECK(CalledOnValidThread());
    720   DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
    721 
    722   // We're no longer throttled, so clear the wait interval.
    723   wait_interval_.reset();
    724   NotifyRetryTime(base::Time());
    725 
    726   // We treat this as a 'canary' in the sense that it was originally scheduled
    727   // to run some time ago, failed, and we now want to retry, versus a job that
    728   // was just created (e.g via ScheduleNudgeImpl). The main implication is
    729   // that we're careful to update routing info (etc) with such potentially
    730   // stale canary jobs.
    731   TryCanaryJob();
    732 }
    733 
    734 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
    735   DCHECK(CalledOnValidThread());
    736   nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
    737   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    738 
    739   if (nudge_tracker_.IsAnyTypeThrottled()) {
    740     base::TimeDelta time_until_next_unthrottle =
    741         nudge_tracker_.GetTimeUntilNextUnthrottle(unthrottle_time);
    742     type_unthrottle_timer_.Start(
    743         FROM_HERE,
    744         time_until_next_unthrottle,
    745         base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
    746                    weak_ptr_factory_.GetWeakPtr(),
    747                    unthrottle_time + time_until_next_unthrottle));
    748   }
    749 
    750   // Maybe this is a good time to run a nudge job.  Let's try it.
    751   if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
    752     DoNudgeSyncSessionJob(NORMAL_PRIORITY);
    753 }
    754 
    755 void SyncSchedulerImpl::PerformDelayedNudge() {
    756   // Circumstances may have changed since we scheduled this delayed nudge.
    757   // We must check to see if it's OK to run the job before we do so.
    758   if (CanRunNudgeJobNow(NORMAL_PRIORITY))
    759     DoNudgeSyncSessionJob(NORMAL_PRIORITY);
    760 
    761   // We're not responsible for setting up any retries here.  The functions that
    762   // first put us into a state that prevents successful sync cycles (eg. global
    763   // throttling, type throttling, network errors, transient errors) will also
    764   // setup the appropriate retry logic (eg. retry after timeout, exponential
    765   // backoff, retry when the network changes).
    766 }
    767 
    768 void SyncSchedulerImpl::ExponentialBackoffRetry() {
    769   TryCanaryJob();
    770 
    771   if (IsBackingOff()) {
    772     // If we succeeded, our wait interval would have been cleared.  If it hasn't
    773     // been cleared, then we should increase our backoff interval and schedule
    774     // another retry.
    775     TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
    776     wait_interval_.reset(
    777       new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
    778     SDVLOG(2) << "Sync cycle failed.  Will back off for "
    779         << wait_interval_->length.InMilliseconds() << "ms.";
    780     RestartWaiting();
    781   }
    782 }
    783 
    784 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
    785   DCHECK(CalledOnValidThread());
    786   session_context_->NotifyListeners(SyncEngineEvent(cause));
    787 }
    788 
    789 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
    790   SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED);
    791   event.retry_time = retry_time;
    792   session_context_->NotifyListeners(event);
    793 }
    794 
    795 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
    796   SyncEngineEvent event(SyncEngineEvent::THROTTLED_TYPES_CHANGED);
    797   event.throttled_types = types;
    798   session_context_->NotifyListeners(event);
    799 }
    800 
    801 bool SyncSchedulerImpl::IsBackingOff() const {
    802   DCHECK(CalledOnValidThread());
    803   return wait_interval_.get() && wait_interval_->mode ==
    804       WaitInterval::EXPONENTIAL_BACKOFF;
    805 }
    806 
    807 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
    808   DCHECK(CalledOnValidThread());
    809   wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
    810                                         throttle_duration));
    811   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
    812 }
    813 
    814 void SyncSchedulerImpl::OnTypesThrottled(
    815     ModelTypeSet types,
    816     const base::TimeDelta& throttle_duration) {
    817   base::TimeTicks now = base::TimeTicks::Now();
    818 
    819   nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
    820   base::TimeDelta time_until_next_unthrottle =
    821       nudge_tracker_.GetTimeUntilNextUnthrottle(now);
    822   type_unthrottle_timer_.Start(
    823       FROM_HERE,
    824       time_until_next_unthrottle,
    825       base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
    826                  weak_ptr_factory_.GetWeakPtr(),
    827                  now + time_until_next_unthrottle));
    828   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    829 }
    830 
    831 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
    832   DCHECK(CalledOnValidThread());
    833   return wait_interval_.get() && wait_interval_->mode ==
    834       WaitInterval::THROTTLED;
    835 }
    836 
    837 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
    838     const base::TimeDelta& new_interval) {
    839   DCHECK(CalledOnValidThread());
    840   syncer_short_poll_interval_seconds_ = new_interval;
    841 }
    842 
    843 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
    844     const base::TimeDelta& new_interval) {
    845   DCHECK(CalledOnValidThread());
    846   syncer_long_poll_interval_seconds_ = new_interval;
    847 }
    848 
    849 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
    850     const base::TimeDelta& new_delay) {
    851   DCHECK(CalledOnValidThread());
    852   sessions_commit_delay_ = new_delay;
    853 }
    854 
    855 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
    856   if (size > 0)
    857     nudge_tracker_.SetHintBufferSize(size);
    858   else
    859     NOTREACHED() << "Hint buffer size should be > 0.";
    860 }
    861 
    862 void SyncSchedulerImpl::OnShouldStopSyncingPermanently() {
    863   DCHECK(CalledOnValidThread());
    864   SDVLOG(2) << "OnShouldStopSyncingPermanently";
    865   syncer_->RequestEarlyExit();  // Thread-safe.
    866   Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
    867 }
    868 
    869 void SyncSchedulerImpl::OnActionableError(
    870     const sessions::SyncSessionSnapshot& snap) {
    871   DCHECK(CalledOnValidThread());
    872   SDVLOG(2) << "OnActionableError";
    873   SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR);
    874   event.snapshot = snap;
    875   session_context_->NotifyListeners(event);
    876 }
    877 
    878 void SyncSchedulerImpl::OnSyncProtocolError(
    879     const sessions::SyncSessionSnapshot& snapshot) {
    880   DCHECK(CalledOnValidThread());
    881   if (ShouldRequestEarlyExit(
    882           snapshot.model_neutral_state().sync_protocol_error)) {
    883     SDVLOG(2) << "Sync Scheduler requesting early exit.";
    884     syncer_->RequestEarlyExit();  // Thread-safe.
    885   }
    886   if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error))
    887     OnActionableError(snapshot);
    888 }
    889 
    890 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
    891   DCHECK(CalledOnValidThread());
    892   session_context_->set_notifications_enabled(notifications_enabled);
    893   if (notifications_enabled)
    894     nudge_tracker_.OnInvalidationsEnabled();
    895   else
    896     nudge_tracker_.OnInvalidationsDisabled();
    897 }
    898 
    899 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
    900   DCHECK(CalledOnValidThread());
    901   return sessions_commit_delay_;
    902 }
    903 
    904 #undef SDVLOG_LOC
    905 
    906 #undef SDVLOG
    907 
    908 #undef SLOG
    909 
    910 #undef ENUM_CASE
    911 
    912 }  // namespace syncer
    913