Home | History | Annotate | Download | only in engine
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/engine/sync_scheduler_impl.h"
      6 
      7 #include <algorithm>
      8 #include <cstring>
      9 
     10 #include "base/auto_reset.h"
     11 #include "base/bind.h"
     12 #include "base/bind_helpers.h"
     13 #include "base/compiler_specific.h"
     14 #include "base/location.h"
     15 #include "base/logging.h"
     16 #include "base/message_loop/message_loop.h"
     17 #include "sync/engine/backoff_delay_provider.h"
     18 #include "sync/engine/syncer.h"
     19 #include "sync/notifier/object_id_invalidation_map.h"
     20 #include "sync/protocol/proto_enum_conversions.h"
     21 #include "sync/protocol/sync.pb.h"
     22 #include "sync/util/data_type_histogram.h"
     23 #include "sync/util/logging.h"
     24 
     25 using base::TimeDelta;
     26 using base::TimeTicks;
     27 
     28 namespace syncer {
     29 
     30 using sessions::SyncSession;
     31 using sessions::SyncSessionSnapshot;
     32 using sync_pb::GetUpdatesCallerInfo;
     33 
     34 namespace {
     35 
     36 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
     37   switch (error.error_type) {
     38     case SYNC_SUCCESS:
     39     case MIGRATION_DONE:
     40     case THROTTLED:
     41     case TRANSIENT_ERROR:
     42       return false;
     43     case NOT_MY_BIRTHDAY:
     44     case CLEAR_PENDING:
     45     case DISABLED_BY_ADMIN:
     46     case USER_ROLLBACK:
     47       // If we send terminate sync early then |sync_cycle_ended| notification
     48       // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
     49       // notification wouldnt be sent either. Then the UI layer would be left
     50       // waiting forever. So assert we would send something.
     51       DCHECK_NE(error.action, UNKNOWN_ACTION);
     52       return true;
     53     case INVALID_CREDENTIAL:
     54       // The notification for this is handled by PostAndProcessHeaders|.
     55       // Server does no have to send any action for this.
     56       return true;
     57     // Make the default a NOTREACHED. So if a new error is introduced we
     58     // think about its expected functionality.
     59     default:
     60       NOTREACHED();
     61       return false;
     62   }
     63 }
     64 
     65 bool IsActionableError(
     66     const SyncProtocolError& error) {
     67   return (error.action != UNKNOWN_ACTION);
     68 }
     69 }  // namespace
     70 
     71 ConfigurationParams::ConfigurationParams()
     72     : source(GetUpdatesCallerInfo::UNKNOWN) {}
     73 ConfigurationParams::ConfigurationParams(
     74     const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
     75     ModelTypeSet types_to_download,
     76     const ModelSafeRoutingInfo& routing_info,
     77     const base::Closure& ready_task,
     78     const base::Closure& retry_task)
     79     : source(source),
     80       types_to_download(types_to_download),
     81       routing_info(routing_info),
     82       ready_task(ready_task),
     83       retry_task(retry_task) {
     84   DCHECK(!ready_task.is_null());
     85   DCHECK(!retry_task.is_null());
     86 }
     87 ConfigurationParams::~ConfigurationParams() {}
     88 
     89 SyncSchedulerImpl::WaitInterval::WaitInterval()
     90     : mode(UNKNOWN) {}
     91 
     92 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
     93     : mode(mode), length(length) {}
     94 
     95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
     96 
     97 #define ENUM_CASE(x) case x: return #x; break;
     98 
     99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
    100   switch (mode) {
    101     ENUM_CASE(UNKNOWN);
    102     ENUM_CASE(EXPONENTIAL_BACKOFF);
    103     ENUM_CASE(THROTTLED);
    104   }
    105   NOTREACHED();
    106   return "";
    107 }
    108 
    109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
    110     NudgeSource source) {
    111   switch (source) {
    112     case NUDGE_SOURCE_NOTIFICATION:
    113       return GetUpdatesCallerInfo::NOTIFICATION;
    114     case NUDGE_SOURCE_LOCAL:
    115       return GetUpdatesCallerInfo::LOCAL;
    116     case NUDGE_SOURCE_LOCAL_REFRESH:
    117       return GetUpdatesCallerInfo::DATATYPE_REFRESH;
    118     case NUDGE_SOURCE_UNKNOWN:
    119       return GetUpdatesCallerInfo::UNKNOWN;
    120     default:
    121       NOTREACHED();
    122       return GetUpdatesCallerInfo::UNKNOWN;
    123   }
    124 }
    125 
    126 // Helper macros to log with the syncer thread name; useful when there
    127 // are multiple syncer threads involved.
    128 
    129 #define SLOG(severity) LOG(severity) << name_ << ": "
    130 
    131 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
    132 
    133 #define SDVLOG_LOC(from_here, verbose_level)             \
    134   DVLOG_LOC(from_here, verbose_level) << name_ << ": "
    135 
    136 namespace {
    137 
    138 const int kDefaultSessionsCommitDelaySeconds = 10;
    139 
    140 bool IsConfigRelatedUpdateSourceValue(
    141     GetUpdatesCallerInfo::GetUpdatesSource source) {
    142   switch (source) {
    143     case GetUpdatesCallerInfo::RECONFIGURATION:
    144     case GetUpdatesCallerInfo::MIGRATION:
    145     case GetUpdatesCallerInfo::NEW_CLIENT:
    146     case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
    147       return true;
    148     default:
    149       return false;
    150   }
    151 }
    152 
    153 }  // namespace
    154 
    155 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
    156                                      BackoffDelayProvider* delay_provider,
    157                                      sessions::SyncSessionContext* context,
    158                                      Syncer* syncer)
    159     : name_(name),
    160       started_(false),
    161       syncer_short_poll_interval_seconds_(
    162           TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
    163       syncer_long_poll_interval_seconds_(
    164           TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
    165       sessions_commit_delay_(
    166           TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
    167       mode_(NORMAL_MODE),
    168       delay_provider_(delay_provider),
    169       syncer_(syncer),
    170       session_context_(context),
    171       no_scheduling_allowed_(false),
    172       do_poll_after_credentials_updated_(false),
    173       next_sync_session_job_priority_(NORMAL_PRIORITY),
    174       weak_ptr_factory_(this),
    175       weak_ptr_factory_for_weak_handle_(this) {
    176   weak_handle_this_ = MakeWeakHandle(
    177       weak_ptr_factory_for_weak_handle_.GetWeakPtr());
    178 }
    179 
    180 SyncSchedulerImpl::~SyncSchedulerImpl() {
    181   DCHECK(CalledOnValidThread());
    182   Stop();
    183 }
    184 
    185 void SyncSchedulerImpl::OnCredentialsUpdated() {
    186   DCHECK(CalledOnValidThread());
    187 
    188   if (HttpResponse::SYNC_AUTH_ERROR ==
    189       session_context_->connection_manager()->server_status()) {
    190     OnServerConnectionErrorFixed();
    191   }
    192 }
    193 
    194 void SyncSchedulerImpl::OnConnectionStatusChange() {
    195   if (HttpResponse::CONNECTION_UNAVAILABLE  ==
    196       session_context_->connection_manager()->server_status()) {
    197     // Optimistically assume that the connection is fixed and try
    198     // connecting.
    199     OnServerConnectionErrorFixed();
    200   }
    201 }
    202 
    203 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
    204   // There could be a pending nudge or configuration job in several cases:
    205   //
    206   // 1. We're in exponential backoff.
    207   // 2. We're silenced / throttled.
    208   // 3. A nudge was saved previously due to not having a valid auth token.
    209   // 4. A nudge was scheduled + saved while in configuration mode.
    210   //
    211   // In all cases except (2), we want to retry contacting the server. We
    212   // call TryCanaryJob to achieve this, and note that nothing -- not even a
    213   // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
    214   // has the authority to do that is the Unthrottle timer.
    215   TryCanaryJob();
    216 }
    217 
    218 void SyncSchedulerImpl::Start(Mode mode) {
    219   DCHECK(CalledOnValidThread());
    220   std::string thread_name = base::MessageLoop::current()->thread_name();
    221   if (thread_name.empty())
    222     thread_name = "<Main thread>";
    223   SDVLOG(2) << "Start called from thread "
    224             << thread_name << " with mode " << GetModeString(mode);
    225   if (!started_) {
    226     started_ = true;
    227     SendInitialSnapshot();
    228   }
    229 
    230   DCHECK(!session_context_->account_name().empty());
    231   DCHECK(syncer_.get());
    232   Mode old_mode = mode_;
    233   mode_ = mode;
    234   AdjustPolling(UPDATE_INTERVAL);  // Will kick start poll timer if needed.
    235 
    236   if (old_mode != mode_ && mode_ == NORMAL_MODE) {
    237     // We just got back to normal mode.  Let's try to run the work that was
    238     // queued up while we were configuring.
    239 
    240     // Update our current time before checking IsRetryRequired().
    241     nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
    242     if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
    243       TrySyncSessionJob();
    244     }
    245   }
    246 }
    247 
    248 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
    249   ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
    250   ModelTypeSet enabled_protocol_types =
    251       Intersection(ProtocolTypes(), enabled_types);
    252   ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
    253   return Difference(enabled_protocol_types, throttled_types);
    254 }
    255 
    256 void SyncSchedulerImpl::SendInitialSnapshot() {
    257   DCHECK(CalledOnValidThread());
    258   scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
    259   SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
    260   event.snapshot = dummy->TakeSnapshot();
    261   FOR_EACH_OBSERVER(SyncEngineEventListener,
    262                     *session_context_->listeners(),
    263                     OnSyncCycleEvent(event));
    264 }
    265 
    266 namespace {
    267 
    268 // Helper to extract the routing info corresponding to types in
    269 // |types_to_download| from |current_routes|.
    270 void BuildModelSafeParams(
    271     ModelTypeSet types_to_download,
    272     const ModelSafeRoutingInfo& current_routes,
    273     ModelSafeRoutingInfo* result_routes) {
    274   for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
    275        iter.Inc()) {
    276     ModelType type = iter.Get();
    277     ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
    278     DCHECK(route != current_routes.end());
    279     ModelSafeGroup group = route->second;
    280     (*result_routes)[type] = group;
    281   }
    282 }
    283 
    284 }  // namespace.
    285 
    286 void SyncSchedulerImpl::ScheduleConfiguration(
    287     const ConfigurationParams& params) {
    288   DCHECK(CalledOnValidThread());
    289   DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
    290   DCHECK_EQ(CONFIGURATION_MODE, mode_);
    291   DCHECK(!params.ready_task.is_null());
    292   CHECK(started_) << "Scheduler must be running to configure.";
    293   SDVLOG(2) << "Reconfiguring syncer.";
    294 
    295   // Only one configuration is allowed at a time. Verify we're not waiting
    296   // for a pending configure job.
    297   DCHECK(!pending_configure_params_);
    298 
    299   ModelSafeRoutingInfo restricted_routes;
    300   BuildModelSafeParams(params.types_to_download,
    301                        params.routing_info,
    302                        &restricted_routes);
    303   session_context_->SetRoutingInfo(restricted_routes);
    304 
    305   // Only reconfigure if we have types to download.
    306   if (!params.types_to_download.Empty()) {
    307     pending_configure_params_.reset(new ConfigurationParams(params));
    308     TrySyncSessionJob();
    309   } else {
    310     SDVLOG(2) << "No change in routing info, calling ready task directly.";
    311     params.ready_task.Run();
    312   }
    313 }
    314 
    315 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
    316   DCHECK(CalledOnValidThread());
    317   if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
    318     SDVLOG(1) << "Unable to run a job because we're throttled.";
    319     return false;
    320   }
    321 
    322   if (wait_interval_
    323       && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
    324       && priority != CANARY_PRIORITY) {
    325     SDVLOG(1) << "Unable to run a job because we're backing off.";
    326     return false;
    327   }
    328 
    329   if (session_context_->connection_manager()->HasInvalidAuthToken()) {
    330     SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
    331     return false;
    332   }
    333 
    334   return true;
    335 }
    336 
    337 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
    338   DCHECK(CalledOnValidThread());
    339 
    340   if (!CanRunJobNow(priority)) {
    341     SDVLOG(1) << "Unable to run a nudge job right now";
    342     return false;
    343   }
    344 
    345   const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
    346   if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
    347     SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
    348     return false;
    349   }
    350 
    351   if (mode_ == CONFIGURATION_MODE) {
    352     SDVLOG(1) << "Not running nudge because we're in configuration mode.";
    353     return false;
    354   }
    355 
    356   return true;
    357 }
    358 
    359 void SyncSchedulerImpl::ScheduleLocalNudge(
    360     const TimeDelta& desired_delay,
    361     ModelTypeSet types,
    362     const tracked_objects::Location& nudge_location) {
    363   DCHECK(CalledOnValidThread());
    364   DCHECK(!types.Empty());
    365 
    366   SDVLOG_LOC(nudge_location, 2)
    367       << "Scheduling sync because of local change to "
    368       << ModelTypeSetToString(types);
    369   UpdateNudgeTimeRecords(types);
    370   nudge_tracker_.RecordLocalChange(types);
    371   ScheduleNudgeImpl(desired_delay, nudge_location);
    372 }
    373 
    374 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
    375     const TimeDelta& desired_delay,
    376     ModelTypeSet types,
    377     const tracked_objects::Location& nudge_location) {
    378   DCHECK(CalledOnValidThread());
    379   DCHECK(!types.Empty());
    380 
    381   SDVLOG_LOC(nudge_location, 2)
    382       << "Scheduling sync because of local refresh request for "
    383       << ModelTypeSetToString(types);
    384   nudge_tracker_.RecordLocalRefreshRequest(types);
    385   ScheduleNudgeImpl(desired_delay, nudge_location);
    386 }
    387 
    388 void SyncSchedulerImpl::ScheduleInvalidationNudge(
    389     const TimeDelta& desired_delay,
    390     const ObjectIdInvalidationMap& invalidation_map,
    391     const tracked_objects::Location& nudge_location) {
    392   DCHECK(CalledOnValidThread());
    393   DCHECK(!invalidation_map.Empty());
    394 
    395   SDVLOG_LOC(nudge_location, 2)
    396       << "Scheduling sync because we received invalidation for "
    397       << ModelTypeSetToString(
    398           ObjectIdSetToModelTypeSet(invalidation_map.GetObjectIds()));
    399   nudge_tracker_.RecordRemoteInvalidation(invalidation_map);
    400   ScheduleNudgeImpl(desired_delay, nudge_location);
    401 }
    402 
    403 // TODO(zea): Consider adding separate throttling/backoff for datatype
    404 // refresh requests.
    405 void SyncSchedulerImpl::ScheduleNudgeImpl(
    406     const TimeDelta& delay,
    407     const tracked_objects::Location& nudge_location) {
    408   DCHECK(CalledOnValidThread());
    409 
    410   if (no_scheduling_allowed_) {
    411     NOTREACHED() << "Illegal to schedule job while session in progress.";
    412     return;
    413   }
    414 
    415   if (!started_) {
    416     SDVLOG_LOC(nudge_location, 2)
    417         << "Dropping nudge, scheduler is not running.";
    418     return;
    419   }
    420 
    421   SDVLOG_LOC(nudge_location, 2)
    422       << "In ScheduleNudgeImpl with delay "
    423       << delay.InMilliseconds() << " ms";
    424 
    425   if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
    426     return;
    427 
    428   TimeTicks incoming_run_time = TimeTicks::Now() + delay;
    429   if (!scheduled_nudge_time_.is_null() &&
    430     (scheduled_nudge_time_ < incoming_run_time)) {
    431     // Old job arrives sooner than this one.  Don't reschedule it.
    432     return;
    433   }
    434 
    435   // Either there is no existing nudge in flight or the incoming nudge should be
    436   // made to arrive first (preempt) the existing nudge.  We reschedule in either
    437   // case.
    438   SDVLOG_LOC(nudge_location, 2)
    439       << "Scheduling a nudge with "
    440       << delay.InMilliseconds() << " ms delay";
    441   scheduled_nudge_time_ = incoming_run_time;
    442   pending_wakeup_timer_.Start(
    443       nudge_location,
    444       delay,
    445       base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
    446                  weak_ptr_factory_.GetWeakPtr()));
    447 }
    448 
    449 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
    450   switch (mode) {
    451     ENUM_CASE(CONFIGURATION_MODE);
    452     ENUM_CASE(NORMAL_MODE);
    453   }
    454   return "";
    455 }
    456 
    457 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
    458   DCHECK(CalledOnValidThread());
    459   DCHECK(CanRunNudgeJobNow(priority));
    460 
    461   DVLOG(2) << "Will run normal mode sync cycle with types "
    462            << ModelTypeSetToString(session_context_->GetEnabledTypes());
    463   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    464   bool premature_exit = !syncer_->NormalSyncShare(
    465       GetEnabledAndUnthrottledTypes(),
    466       nudge_tracker_,
    467       session.get());
    468   AdjustPolling(FORCE_RESET);
    469   // Don't run poll job till the next time poll timer fires.
    470   do_poll_after_credentials_updated_ = false;
    471 
    472   bool success = !premature_exit
    473       && !sessions::HasSyncerError(
    474           session->status_controller().model_neutral_state());
    475 
    476   if (success) {
    477     // That cycle took care of any outstanding work we had.
    478     SDVLOG(2) << "Nudge succeeded.";
    479     nudge_tracker_.RecordSuccessfulSyncCycle();
    480     scheduled_nudge_time_ = base::TimeTicks();
    481 
    482     // If we're here, then we successfully reached the server.  End all backoff.
    483     wait_interval_.reset();
    484     NotifyRetryTime(base::Time());
    485   } else {
    486     HandleFailure(session->status_controller().model_neutral_state());
    487   }
    488 }
    489 
    490 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
    491   DCHECK(CalledOnValidThread());
    492   DCHECK_EQ(mode_, CONFIGURATION_MODE);
    493   DCHECK(pending_configure_params_ != NULL);
    494 
    495   if (!CanRunJobNow(priority)) {
    496     SDVLOG(2) << "Unable to run configure job right now.";
    497     if (!pending_configure_params_->retry_task.is_null()) {
    498       pending_configure_params_->retry_task.Run();
    499       pending_configure_params_->retry_task.Reset();
    500     }
    501     return;
    502   }
    503 
    504   SDVLOG(2) << "Will run configure SyncShare with types "
    505             << ModelTypeSetToString(session_context_->GetEnabledTypes());
    506   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    507   bool premature_exit = !syncer_->ConfigureSyncShare(
    508       pending_configure_params_->types_to_download,
    509       pending_configure_params_->source,
    510       session.get());
    511   AdjustPolling(FORCE_RESET);
    512   // Don't run poll job till the next time poll timer fires.
    513   do_poll_after_credentials_updated_ = false;
    514 
    515   bool success = !premature_exit
    516       && !sessions::HasSyncerError(
    517           session->status_controller().model_neutral_state());
    518 
    519   if (success) {
    520     SDVLOG(2) << "Configure succeeded.";
    521     pending_configure_params_->ready_task.Run();
    522     pending_configure_params_.reset();
    523 
    524     // If we're here, then we successfully reached the server.  End all backoff.
    525     wait_interval_.reset();
    526     NotifyRetryTime(base::Time());
    527   } else {
    528     HandleFailure(session->status_controller().model_neutral_state());
    529     // Sync cycle might receive response from server that causes scheduler to
    530     // stop and draws pending_configure_params_ invalid.
    531     if (started_ && !pending_configure_params_->retry_task.is_null()) {
    532       pending_configure_params_->retry_task.Run();
    533       pending_configure_params_->retry_task.Reset();
    534     }
    535   }
    536 }
    537 
    538 void SyncSchedulerImpl::HandleFailure(
    539     const sessions::ModelNeutralState& model_neutral_state) {
    540   if (IsCurrentlyThrottled()) {
    541     SDVLOG(2) << "Was throttled during previous sync cycle.";
    542     RestartWaiting();
    543   } else if (!IsBackingOff()) {
    544     // Setup our backoff if this is our first such failure.
    545     TimeDelta length = delay_provider_->GetDelay(
    546         delay_provider_->GetInitialDelay(model_neutral_state));
    547     wait_interval_.reset(
    548         new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
    549     SDVLOG(2) << "Sync cycle failed.  Will back off for "
    550         << wait_interval_->length.InMilliseconds() << "ms.";
    551     RestartWaiting();
    552   }
    553 }
    554 
    555 void SyncSchedulerImpl::DoPollSyncSessionJob() {
    556   base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
    557 
    558   SDVLOG(2) << "Polling with types "
    559             << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
    560   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    561   syncer_->PollSyncShare(
    562       GetEnabledAndUnthrottledTypes(),
    563       session.get());
    564 
    565   AdjustPolling(FORCE_RESET);
    566 
    567   if (IsCurrentlyThrottled()) {
    568     SDVLOG(2) << "Poll request got us throttled.";
    569     // The OnSilencedUntil() call set up the WaitInterval for us.  All we need
    570     // to do is start the timer.
    571     RestartWaiting();
    572   }
    573 }
    574 
    575 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
    576   DCHECK(CalledOnValidThread());
    577   base::TimeTicks now = TimeTicks::Now();
    578   // Update timing information for how often datatypes are triggering nudges.
    579   for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
    580     base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
    581     last_local_nudges_by_model_type_[iter.Get()] = now;
    582     if (previous.is_null())
    583       continue;
    584 
    585 #define PER_DATA_TYPE_MACRO(type_str) \
    586     SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
    587     SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
    588 #undef PER_DATA_TYPE_MACRO
    589   }
    590 }
    591 
    592 TimeDelta SyncSchedulerImpl::GetPollInterval() {
    593   return (!session_context_->notifications_enabled() ||
    594           !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
    595       syncer_short_poll_interval_seconds_ :
    596       syncer_long_poll_interval_seconds_;
    597 }
    598 
    599 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
    600   DCHECK(CalledOnValidThread());
    601 
    602   TimeDelta poll = GetPollInterval();
    603   bool rate_changed = !poll_timer_.IsRunning() ||
    604                        poll != poll_timer_.GetCurrentDelay();
    605 
    606   if (type == FORCE_RESET) {
    607     last_poll_reset_ = base::TimeTicks::Now();
    608     if (!rate_changed)
    609       poll_timer_.Reset();
    610   }
    611 
    612   if (!rate_changed)
    613     return;
    614 
    615   // Adjust poll rate.
    616   poll_timer_.Stop();
    617   poll_timer_.Start(FROM_HERE, poll, this,
    618                     &SyncSchedulerImpl::PollTimerCallback);
    619 }
    620 
    621 void SyncSchedulerImpl::RestartWaiting() {
    622   CHECK(wait_interval_.get());
    623   DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
    624   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
    625   SDVLOG(2) << "Starting WaitInterval timer of length "
    626       << wait_interval_->length.InMilliseconds() << "ms.";
    627   if (wait_interval_->mode == WaitInterval::THROTTLED) {
    628     pending_wakeup_timer_.Start(
    629         FROM_HERE,
    630         wait_interval_->length,
    631         base::Bind(&SyncSchedulerImpl::Unthrottle,
    632                    weak_ptr_factory_.GetWeakPtr()));
    633   } else {
    634     pending_wakeup_timer_.Start(
    635         FROM_HERE,
    636         wait_interval_->length,
    637         base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
    638                    weak_ptr_factory_.GetWeakPtr()));
    639   }
    640 }
    641 
    642 void SyncSchedulerImpl::Stop() {
    643   DCHECK(CalledOnValidThread());
    644   SDVLOG(2) << "Stop called";
    645 
    646   // Kill any in-flight method calls.
    647   weak_ptr_factory_.InvalidateWeakPtrs();
    648   wait_interval_.reset();
    649   NotifyRetryTime(base::Time());
    650   poll_timer_.Stop();
    651   pending_wakeup_timer_.Stop();
    652   pending_configure_params_.reset();
    653   if (started_)
    654     started_ = false;
    655 }
    656 
    657 // This is the only place where we invoke DoSyncSessionJob with canary
    658 // privileges.  Everyone else should use NORMAL_PRIORITY.
    659 void SyncSchedulerImpl::TryCanaryJob() {
    660   next_sync_session_job_priority_ = CANARY_PRIORITY;
    661   TrySyncSessionJob();
    662 }
    663 
    664 void SyncSchedulerImpl::TrySyncSessionJob() {
    665   // Post call to TrySyncSessionJobImpl on current thread. Later request for
    666   // access token will be here.
    667   base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
    668       &SyncSchedulerImpl::TrySyncSessionJobImpl,
    669       weak_ptr_factory_.GetWeakPtr()));
    670 }
    671 
    672 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
    673   JobPriority priority = next_sync_session_job_priority_;
    674   next_sync_session_job_priority_ = NORMAL_PRIORITY;
    675 
    676   nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
    677 
    678   DCHECK(CalledOnValidThread());
    679   if (mode_ == CONFIGURATION_MODE) {
    680     if (pending_configure_params_) {
    681       SDVLOG(2) << "Found pending configure job";
    682       DoConfigurationSyncSessionJob(priority);
    683     }
    684   } else if (CanRunNudgeJobNow(priority)) {
    685     if (nudge_tracker_.IsSyncRequired()) {
    686       SDVLOG(2) << "Found pending nudge job";
    687       DoNudgeSyncSessionJob(priority);
    688     } else if (do_poll_after_credentials_updated_ ||
    689         ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
    690       DoPollSyncSessionJob();
    691       // Poll timer fires infrequently. Usually by this time access token is
    692       // already expired and poll job will fail with auth error. Set flag to
    693       // retry poll once ProfileSyncService gets new access token, TryCanaryJob
    694       // will be called after access token is retrieved.
    695       if (HttpResponse::SYNC_AUTH_ERROR ==
    696           session_context_->connection_manager()->server_status()) {
    697         do_poll_after_credentials_updated_ = true;
    698       }
    699     }
    700   }
    701 
    702   if (priority == CANARY_PRIORITY) {
    703     // If this is canary job then whatever result was don't run poll job till
    704     // the next time poll timer fires.
    705     do_poll_after_credentials_updated_ = false;
    706   }
    707 
    708   if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
    709     // If we succeeded, our wait interval would have been cleared.  If it hasn't
    710     // been cleared, then we should increase our backoff interval and schedule
    711     // another retry.
    712     TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
    713     wait_interval_.reset(
    714       new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
    715     SDVLOG(2) << "Sync cycle failed.  Will back off for "
    716         << wait_interval_->length.InMilliseconds() << "ms.";
    717     RestartWaiting();
    718   }
    719 }
    720 
    721 void SyncSchedulerImpl::PollTimerCallback() {
    722   DCHECK(CalledOnValidThread());
    723   if (no_scheduling_allowed_) {
    724     // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
    725     // functions that are called only on the sync thread.  This function is also
    726     // called only on the sync thread, and only when it is posted by an expiring
    727     // timer.  If we find that no_scheduling_allowed_ is set here, then
    728     // something is very wrong.  Maybe someone mistakenly called us directly, or
    729     // mishandled the book-keeping for no_scheduling_allowed_.
    730     NOTREACHED() << "Illegal to schedule job while session in progress.";
    731     return;
    732   }
    733 
    734   TrySyncSessionJob();
    735 }
    736 
    737 void SyncSchedulerImpl::RetryTimerCallback() {
    738   TrySyncSessionJob();
    739 }
    740 
    741 void SyncSchedulerImpl::Unthrottle() {
    742   DCHECK(CalledOnValidThread());
    743   DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
    744 
    745   // We're no longer throttled, so clear the wait interval.
    746   wait_interval_.reset();
    747   NotifyRetryTime(base::Time());
    748   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    749 
    750   // We treat this as a 'canary' in the sense that it was originally scheduled
    751   // to run some time ago, failed, and we now want to retry, versus a job that
    752   // was just created (e.g via ScheduleNudgeImpl). The main implication is
    753   // that we're careful to update routing info (etc) with such potentially
    754   // stale canary jobs.
    755   TryCanaryJob();
    756 }
    757 
    758 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
    759   DCHECK(CalledOnValidThread());
    760   nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
    761   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    762 
    763   if (nudge_tracker_.IsAnyTypeThrottled()) {
    764     const base::TimeTicks now = base::TimeTicks::Now();
    765     base::TimeDelta time_until_next_unthrottle =
    766         nudge_tracker_.GetTimeUntilNextUnthrottle(now);
    767     type_unthrottle_timer_.Start(
    768         FROM_HERE,
    769         time_until_next_unthrottle,
    770         base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
    771                    weak_ptr_factory_.GetWeakPtr(),
    772                    now + time_until_next_unthrottle));
    773   }
    774 
    775   // Maybe this is a good time to run a nudge job.  Let's try it.
    776   if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
    777     TrySyncSessionJob();
    778 }
    779 
    780 void SyncSchedulerImpl::PerformDelayedNudge() {
    781   // Circumstances may have changed since we scheduled this delayed nudge.
    782   // We must check to see if it's OK to run the job before we do so.
    783   if (CanRunNudgeJobNow(NORMAL_PRIORITY))
    784     TrySyncSessionJob();
    785 
    786   // We're not responsible for setting up any retries here.  The functions that
    787   // first put us into a state that prevents successful sync cycles (eg. global
    788   // throttling, type throttling, network errors, transient errors) will also
    789   // setup the appropriate retry logic (eg. retry after timeout, exponential
    790   // backoff, retry when the network changes).
    791 }
    792 
    793 void SyncSchedulerImpl::ExponentialBackoffRetry() {
    794   TryCanaryJob();
    795 }
    796 
    797 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
    798   FOR_EACH_OBSERVER(SyncEngineEventListener,
    799                     *session_context_->listeners(),
    800                     OnRetryTimeChanged(retry_time));
    801 }
    802 
    803 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
    804   FOR_EACH_OBSERVER(SyncEngineEventListener,
    805                     *session_context_->listeners(),
    806                     OnThrottledTypesChanged(types));
    807 }
    808 
    809 bool SyncSchedulerImpl::IsBackingOff() const {
    810   DCHECK(CalledOnValidThread());
    811   return wait_interval_.get() && wait_interval_->mode ==
    812       WaitInterval::EXPONENTIAL_BACKOFF;
    813 }
    814 
    815 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
    816   DCHECK(CalledOnValidThread());
    817   wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
    818                                         throttle_duration));
    819   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
    820   NotifyThrottledTypesChanged(ModelTypeSet::All());
    821 }
    822 
    823 void SyncSchedulerImpl::OnTypesThrottled(
    824     ModelTypeSet types,
    825     const base::TimeDelta& throttle_duration) {
    826   base::TimeTicks now = base::TimeTicks::Now();
    827 
    828   nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
    829   base::TimeDelta time_until_next_unthrottle =
    830       nudge_tracker_.GetTimeUntilNextUnthrottle(now);
    831   type_unthrottle_timer_.Start(
    832       FROM_HERE,
    833       time_until_next_unthrottle,
    834       base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
    835                  weak_ptr_factory_.GetWeakPtr(),
    836                  now + time_until_next_unthrottle));
    837   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    838 }
    839 
    840 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
    841   DCHECK(CalledOnValidThread());
    842   return wait_interval_.get() && wait_interval_->mode ==
    843       WaitInterval::THROTTLED;
    844 }
    845 
    846 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
    847     const base::TimeDelta& new_interval) {
    848   DCHECK(CalledOnValidThread());
    849   syncer_short_poll_interval_seconds_ = new_interval;
    850 }
    851 
    852 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
    853     const base::TimeDelta& new_interval) {
    854   DCHECK(CalledOnValidThread());
    855   syncer_long_poll_interval_seconds_ = new_interval;
    856 }
    857 
    858 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
    859     const base::TimeDelta& new_delay) {
    860   DCHECK(CalledOnValidThread());
    861   sessions_commit_delay_ = new_delay;
    862 }
    863 
    864 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
    865   if (size > 0)
    866     nudge_tracker_.SetHintBufferSize(size);
    867   else
    868     NOTREACHED() << "Hint buffer size should be > 0.";
    869 }
    870 
    871 void SyncSchedulerImpl::OnSyncProtocolError(
    872     const SyncProtocolError& sync_protocol_error) {
    873   DCHECK(CalledOnValidThread());
    874   if (ShouldRequestEarlyExit(sync_protocol_error)) {
    875     SDVLOG(2) << "Sync Scheduler requesting early exit.";
    876     Stop();
    877   }
    878   if (IsActionableError(sync_protocol_error)) {
    879     SDVLOG(2) << "OnActionableError";
    880     FOR_EACH_OBSERVER(SyncEngineEventListener,
    881                       *session_context_->listeners(),
    882                       OnActionableError(sync_protocol_error));
    883   }
    884 }
    885 
    886 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
    887   nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
    888   retry_timer_.Start(FROM_HERE, delay, this,
    889                      &SyncSchedulerImpl::RetryTimerCallback);
    890 }
    891 
    892 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
    893     FOR_EACH_OBSERVER(SyncEngineEventListener,
    894                       *session_context_->listeners(),
    895                       OnMigrationRequested(types));
    896 }
    897 
    898 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
    899   DCHECK(CalledOnValidThread());
    900   session_context_->set_notifications_enabled(notifications_enabled);
    901   if (notifications_enabled)
    902     nudge_tracker_.OnInvalidationsEnabled();
    903   else
    904     nudge_tracker_.OnInvalidationsDisabled();
    905 }
    906 
    907 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
    908   DCHECK(CalledOnValidThread());
    909   return sessions_commit_delay_;
    910 }
    911 
    912 #undef SDVLOG_LOC
    913 
    914 #undef SDVLOG
    915 
    916 #undef SLOG
    917 
    918 #undef ENUM_CASE
    919 
    920 }  // namespace syncer
    921