Home | History | Annotate | Download | only in engine
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/engine/sync_scheduler_impl.h"
      6 
      7 #include <algorithm>
      8 #include <cstring>
      9 
     10 #include "base/auto_reset.h"
     11 #include "base/bind.h"
     12 #include "base/bind_helpers.h"
     13 #include "base/compiler_specific.h"
     14 #include "base/location.h"
     15 #include "base/logging.h"
     16 #include "base/message_loop/message_loop.h"
     17 #include "sync/engine/backoff_delay_provider.h"
     18 #include "sync/engine/syncer.h"
     19 #include "sync/protocol/proto_enum_conversions.h"
     20 #include "sync/protocol/sync.pb.h"
     21 #include "sync/util/data_type_histogram.h"
     22 #include "sync/util/logging.h"
     23 
     24 using base::TimeDelta;
     25 using base::TimeTicks;
     26 
     27 namespace syncer {
     28 
     29 using sessions::SyncSession;
     30 using sessions::SyncSessionSnapshot;
     31 using sync_pb::GetUpdatesCallerInfo;
     32 
     33 namespace {
     34 
     35 bool IsConfigRelatedUpdateSourceValue(
     36     GetUpdatesCallerInfo::GetUpdatesSource source) {
     37   switch (source) {
     38     case GetUpdatesCallerInfo::RECONFIGURATION:
     39     case GetUpdatesCallerInfo::MIGRATION:
     40     case GetUpdatesCallerInfo::NEW_CLIENT:
     41     case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
     42     case GetUpdatesCallerInfo::PROGRAMMATIC:
     43       return true;
     44     default:
     45       return false;
     46   }
     47 }
     48 
     49 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
     50   switch (error.error_type) {
     51     case SYNC_SUCCESS:
     52     case MIGRATION_DONE:
     53     case THROTTLED:
     54     case TRANSIENT_ERROR:
     55       return false;
     56     case NOT_MY_BIRTHDAY:
     57     case CLEAR_PENDING:
     58     case DISABLED_BY_ADMIN:
     59     case USER_ROLLBACK:
     60       // If we send terminate sync early then |sync_cycle_ended| notification
     61       // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
     62       // notification wouldnt be sent either. Then the UI layer would be left
     63       // waiting forever. So assert we would send something.
     64       DCHECK_NE(error.action, UNKNOWN_ACTION);
     65       return true;
     66     case INVALID_CREDENTIAL:
     67       // The notification for this is handled by PostAndProcessHeaders|.
     68       // Server does no have to send any action for this.
     69       return true;
     70     // Make the default a NOTREACHED. So if a new error is introduced we
     71     // think about its expected functionality.
     72     default:
     73       NOTREACHED();
     74       return false;
     75   }
     76 }
     77 
     78 bool IsActionableError(
     79     const SyncProtocolError& error) {
     80   return (error.action != UNKNOWN_ACTION);
     81 }
     82 
     83 }  // namespace
     84 
     85 ConfigurationParams::ConfigurationParams()
     86     : source(GetUpdatesCallerInfo::UNKNOWN) {}
     87 ConfigurationParams::ConfigurationParams(
     88     const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
     89     ModelTypeSet types_to_download,
     90     const ModelSafeRoutingInfo& routing_info,
     91     const base::Closure& ready_task,
     92     const base::Closure& retry_task)
     93     : source(source),
     94       types_to_download(types_to_download),
     95       routing_info(routing_info),
     96       ready_task(ready_task),
     97       retry_task(retry_task) {
     98   DCHECK(!ready_task.is_null());
     99   DCHECK(!retry_task.is_null());
    100 }
    101 ConfigurationParams::~ConfigurationParams() {}
    102 
    103 SyncSchedulerImpl::WaitInterval::WaitInterval()
    104     : mode(UNKNOWN) {}
    105 
    106 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
    107     : mode(mode), length(length) {}
    108 
    109 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
    110 
    111 #define ENUM_CASE(x) case x: return #x; break;
    112 
    113 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
    114   switch (mode) {
    115     ENUM_CASE(UNKNOWN);
    116     ENUM_CASE(EXPONENTIAL_BACKOFF);
    117     ENUM_CASE(THROTTLED);
    118   }
    119   NOTREACHED();
    120   return "";
    121 }
    122 
    123 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
    124     NudgeSource source) {
    125   switch (source) {
    126     case NUDGE_SOURCE_NOTIFICATION:
    127       return GetUpdatesCallerInfo::NOTIFICATION;
    128     case NUDGE_SOURCE_LOCAL:
    129       return GetUpdatesCallerInfo::LOCAL;
    130     case NUDGE_SOURCE_LOCAL_REFRESH:
    131       return GetUpdatesCallerInfo::DATATYPE_REFRESH;
    132     case NUDGE_SOURCE_UNKNOWN:
    133       return GetUpdatesCallerInfo::UNKNOWN;
    134     default:
    135       NOTREACHED();
    136       return GetUpdatesCallerInfo::UNKNOWN;
    137   }
    138 }
    139 
    140 // Helper macros to log with the syncer thread name; useful when there
    141 // are multiple syncer threads involved.
    142 
    143 #define SLOG(severity) LOG(severity) << name_ << ": "
    144 
    145 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
    146 
    147 #define SDVLOG_LOC(from_here, verbose_level)             \
    148   DVLOG_LOC(from_here, verbose_level) << name_ << ": "
    149 
    150 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
    151                                      BackoffDelayProvider* delay_provider,
    152                                      sessions::SyncSessionContext* context,
    153                                      Syncer* syncer)
    154     : name_(name),
    155       started_(false),
    156       syncer_short_poll_interval_seconds_(
    157           TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
    158       syncer_long_poll_interval_seconds_(
    159           TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
    160       mode_(NORMAL_MODE),
    161       delay_provider_(delay_provider),
    162       syncer_(syncer),
    163       session_context_(context),
    164       no_scheduling_allowed_(false),
    165       do_poll_after_credentials_updated_(false),
    166       next_sync_session_job_priority_(NORMAL_PRIORITY),
    167       weak_ptr_factory_(this),
    168       weak_ptr_factory_for_weak_handle_(this) {
    169   weak_handle_this_ = MakeWeakHandle(
    170       weak_ptr_factory_for_weak_handle_.GetWeakPtr());
    171 }
    172 
    173 SyncSchedulerImpl::~SyncSchedulerImpl() {
    174   DCHECK(CalledOnValidThread());
    175   Stop();
    176 }
    177 
    178 void SyncSchedulerImpl::OnCredentialsUpdated() {
    179   DCHECK(CalledOnValidThread());
    180 
    181   if (HttpResponse::SYNC_AUTH_ERROR ==
    182       session_context_->connection_manager()->server_status()) {
    183     OnServerConnectionErrorFixed();
    184   }
    185 }
    186 
    187 void SyncSchedulerImpl::OnConnectionStatusChange() {
    188   if (HttpResponse::CONNECTION_UNAVAILABLE  ==
    189       session_context_->connection_manager()->server_status()) {
    190     // Optimistically assume that the connection is fixed and try
    191     // connecting.
    192     OnServerConnectionErrorFixed();
    193   }
    194 }
    195 
    196 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
    197   // There could be a pending nudge or configuration job in several cases:
    198   //
    199   // 1. We're in exponential backoff.
    200   // 2. We're silenced / throttled.
    201   // 3. A nudge was saved previously due to not having a valid auth token.
    202   // 4. A nudge was scheduled + saved while in configuration mode.
    203   //
    204   // In all cases except (2), we want to retry contacting the server. We
    205   // call TryCanaryJob to achieve this, and note that nothing -- not even a
    206   // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
    207   // has the authority to do that is the Unthrottle timer.
    208   TryCanaryJob();
    209 }
    210 
    211 void SyncSchedulerImpl::Start(Mode mode) {
    212   DCHECK(CalledOnValidThread());
    213   std::string thread_name = base::MessageLoop::current()->thread_name();
    214   if (thread_name.empty())
    215     thread_name = "<Main thread>";
    216   SDVLOG(2) << "Start called from thread "
    217             << thread_name << " with mode " << GetModeString(mode);
    218   if (!started_) {
    219     started_ = true;
    220     SendInitialSnapshot();
    221   }
    222 
    223   DCHECK(!session_context_->account_name().empty());
    224   DCHECK(syncer_.get());
    225   Mode old_mode = mode_;
    226   mode_ = mode;
    227   AdjustPolling(UPDATE_INTERVAL);  // Will kick start poll timer if needed.
    228 
    229   if (old_mode != mode_ && mode_ == NORMAL_MODE) {
    230     // We just got back to normal mode.  Let's try to run the work that was
    231     // queued up while we were configuring.
    232 
    233     // Update our current time before checking IsRetryRequired().
    234     nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
    235     if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
    236       TrySyncSessionJob();
    237     }
    238   }
    239 }
    240 
    241 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
    242   ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
    243   ModelTypeSet enabled_protocol_types =
    244       Intersection(ProtocolTypes(), enabled_types);
    245   ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
    246   return Difference(enabled_protocol_types, throttled_types);
    247 }
    248 
    249 void SyncSchedulerImpl::SendInitialSnapshot() {
    250   DCHECK(CalledOnValidThread());
    251   scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
    252   SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
    253   event.snapshot = dummy->TakeSnapshot();
    254   FOR_EACH_OBSERVER(SyncEngineEventListener,
    255                     *session_context_->listeners(),
    256                     OnSyncCycleEvent(event));
    257 }
    258 
    259 namespace {
    260 
    261 // Helper to extract the routing info corresponding to types in
    262 // |types_to_download| from |current_routes|.
    263 void BuildModelSafeParams(
    264     ModelTypeSet types_to_download,
    265     const ModelSafeRoutingInfo& current_routes,
    266     ModelSafeRoutingInfo* result_routes) {
    267   for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
    268        iter.Inc()) {
    269     ModelType type = iter.Get();
    270     ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
    271     DCHECK(route != current_routes.end());
    272     ModelSafeGroup group = route->second;
    273     (*result_routes)[type] = group;
    274   }
    275 }
    276 
    277 }  // namespace.
    278 
    279 void SyncSchedulerImpl::ScheduleConfiguration(
    280     const ConfigurationParams& params) {
    281   DCHECK(CalledOnValidThread());
    282   DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
    283   DCHECK_EQ(CONFIGURATION_MODE, mode_);
    284   DCHECK(!params.ready_task.is_null());
    285   CHECK(started_) << "Scheduler must be running to configure.";
    286   SDVLOG(2) << "Reconfiguring syncer.";
    287 
    288   // Only one configuration is allowed at a time. Verify we're not waiting
    289   // for a pending configure job.
    290   DCHECK(!pending_configure_params_);
    291 
    292   ModelSafeRoutingInfo restricted_routes;
    293   BuildModelSafeParams(params.types_to_download,
    294                        params.routing_info,
    295                        &restricted_routes);
    296   session_context_->SetRoutingInfo(restricted_routes);
    297 
    298   // Only reconfigure if we have types to download.
    299   if (!params.types_to_download.Empty()) {
    300     pending_configure_params_.reset(new ConfigurationParams(params));
    301     TrySyncSessionJob();
    302   } else {
    303     SDVLOG(2) << "No change in routing info, calling ready task directly.";
    304     params.ready_task.Run();
    305   }
    306 }
    307 
    308 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
    309   DCHECK(CalledOnValidThread());
    310   if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
    311     SDVLOG(1) << "Unable to run a job because we're throttled.";
    312     return false;
    313   }
    314 
    315   if (wait_interval_
    316       && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
    317       && priority != CANARY_PRIORITY) {
    318     SDVLOG(1) << "Unable to run a job because we're backing off.";
    319     return false;
    320   }
    321 
    322   if (session_context_->connection_manager()->HasInvalidAuthToken()) {
    323     SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
    324     return false;
    325   }
    326 
    327   return true;
    328 }
    329 
    330 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
    331   DCHECK(CalledOnValidThread());
    332 
    333   if (!CanRunJobNow(priority)) {
    334     SDVLOG(1) << "Unable to run a nudge job right now";
    335     return false;
    336   }
    337 
    338   const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
    339   if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
    340     SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
    341     return false;
    342   }
    343 
    344   if (mode_ == CONFIGURATION_MODE) {
    345     SDVLOG(1) << "Not running nudge because we're in configuration mode.";
    346     return false;
    347   }
    348 
    349   return true;
    350 }
    351 
    352 void SyncSchedulerImpl::ScheduleLocalNudge(
    353     ModelTypeSet types,
    354     const tracked_objects::Location& nudge_location) {
    355   DCHECK(CalledOnValidThread());
    356   DCHECK(!types.Empty());
    357 
    358   SDVLOG_LOC(nudge_location, 2)
    359       << "Scheduling sync because of local change to "
    360       << ModelTypeSetToString(types);
    361   UpdateNudgeTimeRecords(types);
    362   base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types);
    363   ScheduleNudgeImpl(nudge_delay, nudge_location);
    364 }
    365 
    366 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
    367     ModelTypeSet types,
    368     const tracked_objects::Location& nudge_location) {
    369   DCHECK(CalledOnValidThread());
    370   DCHECK(!types.Empty());
    371 
    372   SDVLOG_LOC(nudge_location, 2)
    373       << "Scheduling sync because of local refresh request for "
    374       << ModelTypeSetToString(types);
    375   base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types);
    376   ScheduleNudgeImpl(nudge_delay, nudge_location);
    377 }
    378 
    379 void SyncSchedulerImpl::ScheduleInvalidationNudge(
    380     syncer::ModelType model_type,
    381     scoped_ptr<InvalidationInterface> invalidation,
    382     const tracked_objects::Location& nudge_location) {
    383   DCHECK(CalledOnValidThread());
    384 
    385   SDVLOG_LOC(nudge_location, 2)
    386       << "Scheduling sync because we received invalidation for "
    387       << ModelTypeToString(model_type);
    388   base::TimeDelta nudge_delay =
    389       nudge_tracker_.RecordRemoteInvalidation(model_type, invalidation.Pass());
    390   ScheduleNudgeImpl(nudge_delay, nudge_location);
    391 }
    392 
    393 void SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) {
    394   DCHECK(CalledOnValidThread());
    395 
    396   SDVLOG(2) << "Scheduling non-blocking initial sync for "
    397             << ModelTypeToString(model_type);
    398   nudge_tracker_.RecordInitialSyncRequired(model_type);
    399   ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE);
    400 }
    401 
    402 // TODO(zea): Consider adding separate throttling/backoff for datatype
    403 // refresh requests.
    404 void SyncSchedulerImpl::ScheduleNudgeImpl(
    405     const TimeDelta& delay,
    406     const tracked_objects::Location& nudge_location) {
    407   DCHECK(CalledOnValidThread());
    408 
    409   if (no_scheduling_allowed_) {
    410     NOTREACHED() << "Illegal to schedule job while session in progress.";
    411     return;
    412   }
    413 
    414   if (!started_) {
    415     SDVLOG_LOC(nudge_location, 2)
    416         << "Dropping nudge, scheduler is not running.";
    417     return;
    418   }
    419 
    420   SDVLOG_LOC(nudge_location, 2)
    421       << "In ScheduleNudgeImpl with delay "
    422       << delay.InMilliseconds() << " ms";
    423 
    424   if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
    425     return;
    426 
    427   TimeTicks incoming_run_time = TimeTicks::Now() + delay;
    428   if (!scheduled_nudge_time_.is_null() &&
    429     (scheduled_nudge_time_ < incoming_run_time)) {
    430     // Old job arrives sooner than this one.  Don't reschedule it.
    431     return;
    432   }
    433 
    434   // Either there is no existing nudge in flight or the incoming nudge should be
    435   // made to arrive first (preempt) the existing nudge.  We reschedule in either
    436   // case.
    437   SDVLOG_LOC(nudge_location, 2)
    438       << "Scheduling a nudge with "
    439       << delay.InMilliseconds() << " ms delay";
    440   scheduled_nudge_time_ = incoming_run_time;
    441   pending_wakeup_timer_.Start(
    442       nudge_location,
    443       delay,
    444       base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
    445                  weak_ptr_factory_.GetWeakPtr()));
    446 }
    447 
    448 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
    449   switch (mode) {
    450     ENUM_CASE(CONFIGURATION_MODE);
    451     ENUM_CASE(NORMAL_MODE);
    452   }
    453   return "";
    454 }
    455 
    456 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) {
    457   DCHECK(CalledOnValidThread());
    458   nudge_tracker_.SetDefaultNudgeDelay(delay_ms);
    459 }
    460 
    461 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
    462   DCHECK(CalledOnValidThread());
    463   DCHECK(CanRunNudgeJobNow(priority));
    464 
    465   DVLOG(2) << "Will run normal mode sync cycle with types "
    466            << ModelTypeSetToString(session_context_->GetEnabledTypes());
    467   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    468   bool premature_exit = !syncer_->NormalSyncShare(
    469       GetEnabledAndUnthrottledTypes(),
    470       nudge_tracker_,
    471       session.get());
    472   AdjustPolling(FORCE_RESET);
    473   // Don't run poll job till the next time poll timer fires.
    474   do_poll_after_credentials_updated_ = false;
    475 
    476   bool success = !premature_exit
    477       && !sessions::HasSyncerError(
    478           session->status_controller().model_neutral_state());
    479 
    480   if (success) {
    481     // That cycle took care of any outstanding work we had.
    482     SDVLOG(2) << "Nudge succeeded.";
    483     nudge_tracker_.RecordSuccessfulSyncCycle();
    484     scheduled_nudge_time_ = base::TimeTicks();
    485 
    486     // If we're here, then we successfully reached the server.  End all backoff.
    487     wait_interval_.reset();
    488     NotifyRetryTime(base::Time());
    489   } else {
    490     HandleFailure(session->status_controller().model_neutral_state());
    491   }
    492 }
    493 
    494 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
    495   DCHECK(CalledOnValidThread());
    496   DCHECK_EQ(mode_, CONFIGURATION_MODE);
    497   DCHECK(pending_configure_params_ != NULL);
    498 
    499   if (!CanRunJobNow(priority)) {
    500     SDVLOG(2) << "Unable to run configure job right now.";
    501     if (!pending_configure_params_->retry_task.is_null()) {
    502       pending_configure_params_->retry_task.Run();
    503       pending_configure_params_->retry_task.Reset();
    504     }
    505     return;
    506   }
    507 
    508   SDVLOG(2) << "Will run configure SyncShare with types "
    509             << ModelTypeSetToString(session_context_->GetEnabledTypes());
    510   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    511   bool premature_exit = !syncer_->ConfigureSyncShare(
    512       pending_configure_params_->types_to_download,
    513       pending_configure_params_->source,
    514       session.get());
    515   AdjustPolling(FORCE_RESET);
    516   // Don't run poll job till the next time poll timer fires.
    517   do_poll_after_credentials_updated_ = false;
    518 
    519   bool success = !premature_exit
    520       && !sessions::HasSyncerError(
    521           session->status_controller().model_neutral_state());
    522 
    523   if (success) {
    524     SDVLOG(2) << "Configure succeeded.";
    525     pending_configure_params_->ready_task.Run();
    526     pending_configure_params_.reset();
    527 
    528     // If we're here, then we successfully reached the server.  End all backoff.
    529     wait_interval_.reset();
    530     NotifyRetryTime(base::Time());
    531   } else {
    532     HandleFailure(session->status_controller().model_neutral_state());
    533     // Sync cycle might receive response from server that causes scheduler to
    534     // stop and draws pending_configure_params_ invalid.
    535     if (started_ && !pending_configure_params_->retry_task.is_null()) {
    536       pending_configure_params_->retry_task.Run();
    537       pending_configure_params_->retry_task.Reset();
    538     }
    539   }
    540 }
    541 
    542 void SyncSchedulerImpl::HandleFailure(
    543     const sessions::ModelNeutralState& model_neutral_state) {
    544   if (IsCurrentlyThrottled()) {
    545     SDVLOG(2) << "Was throttled during previous sync cycle.";
    546     RestartWaiting();
    547   } else if (!IsBackingOff()) {
    548     // Setup our backoff if this is our first such failure.
    549     TimeDelta length = delay_provider_->GetDelay(
    550         delay_provider_->GetInitialDelay(model_neutral_state));
    551     wait_interval_.reset(
    552         new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
    553     SDVLOG(2) << "Sync cycle failed.  Will back off for "
    554         << wait_interval_->length.InMilliseconds() << "ms.";
    555     RestartWaiting();
    556   }
    557 }
    558 
    559 void SyncSchedulerImpl::DoPollSyncSessionJob() {
    560   base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
    561 
    562   SDVLOG(2) << "Polling with types "
    563             << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
    564   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
    565   syncer_->PollSyncShare(
    566       GetEnabledAndUnthrottledTypes(),
    567       session.get());
    568 
    569   AdjustPolling(FORCE_RESET);
    570 
    571   if (IsCurrentlyThrottled()) {
    572     SDVLOG(2) << "Poll request got us throttled.";
    573     // The OnSilencedUntil() call set up the WaitInterval for us.  All we need
    574     // to do is start the timer.
    575     RestartWaiting();
    576   }
    577 }
    578 
    579 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
    580   DCHECK(CalledOnValidThread());
    581   base::TimeTicks now = TimeTicks::Now();
    582   // Update timing information for how often datatypes are triggering nudges.
    583   for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
    584     base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
    585     last_local_nudges_by_model_type_[iter.Get()] = now;
    586     if (previous.is_null())
    587       continue;
    588 
    589 #define PER_DATA_TYPE_MACRO(type_str) \
    590     SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
    591     SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
    592 #undef PER_DATA_TYPE_MACRO
    593   }
    594 }
    595 
    596 TimeDelta SyncSchedulerImpl::GetPollInterval() {
    597   return (!session_context_->notifications_enabled() ||
    598           !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
    599       syncer_short_poll_interval_seconds_ :
    600       syncer_long_poll_interval_seconds_;
    601 }
    602 
    603 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
    604   DCHECK(CalledOnValidThread());
    605 
    606   TimeDelta poll = GetPollInterval();
    607   bool rate_changed = !poll_timer_.IsRunning() ||
    608                        poll != poll_timer_.GetCurrentDelay();
    609 
    610   if (type == FORCE_RESET) {
    611     last_poll_reset_ = base::TimeTicks::Now();
    612     if (!rate_changed)
    613       poll_timer_.Reset();
    614   }
    615 
    616   if (!rate_changed)
    617     return;
    618 
    619   // Adjust poll rate.
    620   poll_timer_.Stop();
    621   poll_timer_.Start(FROM_HERE, poll, this,
    622                     &SyncSchedulerImpl::PollTimerCallback);
    623 }
    624 
    625 void SyncSchedulerImpl::RestartWaiting() {
    626   CHECK(wait_interval_.get());
    627   DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
    628   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
    629   SDVLOG(2) << "Starting WaitInterval timer of length "
    630       << wait_interval_->length.InMilliseconds() << "ms.";
    631   if (wait_interval_->mode == WaitInterval::THROTTLED) {
    632     pending_wakeup_timer_.Start(
    633         FROM_HERE,
    634         wait_interval_->length,
    635         base::Bind(&SyncSchedulerImpl::Unthrottle,
    636                    weak_ptr_factory_.GetWeakPtr()));
    637   } else {
    638     pending_wakeup_timer_.Start(
    639         FROM_HERE,
    640         wait_interval_->length,
    641         base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
    642                    weak_ptr_factory_.GetWeakPtr()));
    643   }
    644 }
    645 
    646 void SyncSchedulerImpl::Stop() {
    647   DCHECK(CalledOnValidThread());
    648   SDVLOG(2) << "Stop called";
    649 
    650   // Kill any in-flight method calls.
    651   weak_ptr_factory_.InvalidateWeakPtrs();
    652   wait_interval_.reset();
    653   NotifyRetryTime(base::Time());
    654   poll_timer_.Stop();
    655   pending_wakeup_timer_.Stop();
    656   pending_configure_params_.reset();
    657   if (started_)
    658     started_ = false;
    659 }
    660 
    661 // This is the only place where we invoke DoSyncSessionJob with canary
    662 // privileges.  Everyone else should use NORMAL_PRIORITY.
    663 void SyncSchedulerImpl::TryCanaryJob() {
    664   next_sync_session_job_priority_ = CANARY_PRIORITY;
    665   TrySyncSessionJob();
    666 }
    667 
    668 void SyncSchedulerImpl::TrySyncSessionJob() {
    669   // Post call to TrySyncSessionJobImpl on current thread. Later request for
    670   // access token will be here.
    671   base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
    672       &SyncSchedulerImpl::TrySyncSessionJobImpl,
    673       weak_ptr_factory_.GetWeakPtr()));
    674 }
    675 
    676 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
    677   JobPriority priority = next_sync_session_job_priority_;
    678   next_sync_session_job_priority_ = NORMAL_PRIORITY;
    679 
    680   nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
    681 
    682   DCHECK(CalledOnValidThread());
    683   if (mode_ == CONFIGURATION_MODE) {
    684     if (pending_configure_params_) {
    685       SDVLOG(2) << "Found pending configure job";
    686       DoConfigurationSyncSessionJob(priority);
    687     }
    688   } else if (CanRunNudgeJobNow(priority)) {
    689     if (nudge_tracker_.IsSyncRequired()) {
    690       SDVLOG(2) << "Found pending nudge job";
    691       DoNudgeSyncSessionJob(priority);
    692     } else if (do_poll_after_credentials_updated_ ||
    693         ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
    694       DoPollSyncSessionJob();
    695       // Poll timer fires infrequently. Usually by this time access token is
    696       // already expired and poll job will fail with auth error. Set flag to
    697       // retry poll once ProfileSyncService gets new access token, TryCanaryJob
    698       // will be called after access token is retrieved.
    699       if (HttpResponse::SYNC_AUTH_ERROR ==
    700           session_context_->connection_manager()->server_status()) {
    701         do_poll_after_credentials_updated_ = true;
    702       }
    703     }
    704   }
    705 
    706   if (priority == CANARY_PRIORITY) {
    707     // If this is canary job then whatever result was don't run poll job till
    708     // the next time poll timer fires.
    709     do_poll_after_credentials_updated_ = false;
    710   }
    711 
    712   if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
    713     // If we succeeded, our wait interval would have been cleared.  If it hasn't
    714     // been cleared, then we should increase our backoff interval and schedule
    715     // another retry.
    716     TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
    717     wait_interval_.reset(
    718       new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
    719     SDVLOG(2) << "Sync cycle failed.  Will back off for "
    720         << wait_interval_->length.InMilliseconds() << "ms.";
    721     RestartWaiting();
    722   }
    723 }
    724 
    725 void SyncSchedulerImpl::PollTimerCallback() {
    726   DCHECK(CalledOnValidThread());
    727   if (no_scheduling_allowed_) {
    728     // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
    729     // functions that are called only on the sync thread.  This function is also
    730     // called only on the sync thread, and only when it is posted by an expiring
    731     // timer.  If we find that no_scheduling_allowed_ is set here, then
    732     // something is very wrong.  Maybe someone mistakenly called us directly, or
    733     // mishandled the book-keeping for no_scheduling_allowed_.
    734     NOTREACHED() << "Illegal to schedule job while session in progress.";
    735     return;
    736   }
    737 
    738   TrySyncSessionJob();
    739 }
    740 
    741 void SyncSchedulerImpl::RetryTimerCallback() {
    742   TrySyncSessionJob();
    743 }
    744 
    745 void SyncSchedulerImpl::Unthrottle() {
    746   DCHECK(CalledOnValidThread());
    747   DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
    748 
    749   // We're no longer throttled, so clear the wait interval.
    750   wait_interval_.reset();
    751   NotifyRetryTime(base::Time());
    752   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    753 
    754   // We treat this as a 'canary' in the sense that it was originally scheduled
    755   // to run some time ago, failed, and we now want to retry, versus a job that
    756   // was just created (e.g via ScheduleNudgeImpl). The main implication is
    757   // that we're careful to update routing info (etc) with such potentially
    758   // stale canary jobs.
    759   TryCanaryJob();
    760 }
    761 
    762 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
    763   DCHECK(CalledOnValidThread());
    764   nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
    765   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    766 
    767   if (nudge_tracker_.IsAnyTypeThrottled()) {
    768     const base::TimeTicks now = base::TimeTicks::Now();
    769     base::TimeDelta time_until_next_unthrottle =
    770         nudge_tracker_.GetTimeUntilNextUnthrottle(now);
    771     type_unthrottle_timer_.Start(
    772         FROM_HERE,
    773         time_until_next_unthrottle,
    774         base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
    775                    weak_ptr_factory_.GetWeakPtr(),
    776                    now + time_until_next_unthrottle));
    777   }
    778 
    779   // Maybe this is a good time to run a nudge job.  Let's try it.
    780   if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
    781     TrySyncSessionJob();
    782 }
    783 
    784 void SyncSchedulerImpl::PerformDelayedNudge() {
    785   // Circumstances may have changed since we scheduled this delayed nudge.
    786   // We must check to see if it's OK to run the job before we do so.
    787   if (CanRunNudgeJobNow(NORMAL_PRIORITY))
    788     TrySyncSessionJob();
    789 
    790   // We're not responsible for setting up any retries here.  The functions that
    791   // first put us into a state that prevents successful sync cycles (eg. global
    792   // throttling, type throttling, network errors, transient errors) will also
    793   // setup the appropriate retry logic (eg. retry after timeout, exponential
    794   // backoff, retry when the network changes).
    795 }
    796 
    797 void SyncSchedulerImpl::ExponentialBackoffRetry() {
    798   TryCanaryJob();
    799 }
    800 
    801 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
    802   FOR_EACH_OBSERVER(SyncEngineEventListener,
    803                     *session_context_->listeners(),
    804                     OnRetryTimeChanged(retry_time));
    805 }
    806 
    807 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
    808   FOR_EACH_OBSERVER(SyncEngineEventListener,
    809                     *session_context_->listeners(),
    810                     OnThrottledTypesChanged(types));
    811 }
    812 
    813 bool SyncSchedulerImpl::IsBackingOff() const {
    814   DCHECK(CalledOnValidThread());
    815   return wait_interval_.get() && wait_interval_->mode ==
    816       WaitInterval::EXPONENTIAL_BACKOFF;
    817 }
    818 
    819 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
    820   DCHECK(CalledOnValidThread());
    821   wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
    822                                         throttle_duration));
    823   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
    824   NotifyThrottledTypesChanged(ModelTypeSet::All());
    825 }
    826 
    827 void SyncSchedulerImpl::OnTypesThrottled(
    828     ModelTypeSet types,
    829     const base::TimeDelta& throttle_duration) {
    830   base::TimeTicks now = base::TimeTicks::Now();
    831 
    832   nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
    833   base::TimeDelta time_until_next_unthrottle =
    834       nudge_tracker_.GetTimeUntilNextUnthrottle(now);
    835   type_unthrottle_timer_.Start(
    836       FROM_HERE,
    837       time_until_next_unthrottle,
    838       base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
    839                  weak_ptr_factory_.GetWeakPtr(),
    840                  now + time_until_next_unthrottle));
    841   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
    842 }
    843 
    844 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
    845   DCHECK(CalledOnValidThread());
    846   return wait_interval_.get() && wait_interval_->mode ==
    847       WaitInterval::THROTTLED;
    848 }
    849 
    850 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
    851     const base::TimeDelta& new_interval) {
    852   DCHECK(CalledOnValidThread());
    853   syncer_short_poll_interval_seconds_ = new_interval;
    854 }
    855 
    856 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
    857     const base::TimeDelta& new_interval) {
    858   DCHECK(CalledOnValidThread());
    859   syncer_long_poll_interval_seconds_ = new_interval;
    860 }
    861 
    862 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays(
    863     const std::map<ModelType, base::TimeDelta>& nudge_delays) {
    864   DCHECK(CalledOnValidThread());
    865   nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays);
    866 }
    867 
    868 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
    869   if (size > 0)
    870     nudge_tracker_.SetHintBufferSize(size);
    871   else
    872     NOTREACHED() << "Hint buffer size should be > 0.";
    873 }
    874 
    875 void SyncSchedulerImpl::OnSyncProtocolError(
    876     const SyncProtocolError& sync_protocol_error) {
    877   DCHECK(CalledOnValidThread());
    878   if (ShouldRequestEarlyExit(sync_protocol_error)) {
    879     SDVLOG(2) << "Sync Scheduler requesting early exit.";
    880     Stop();
    881   }
    882   if (IsActionableError(sync_protocol_error)) {
    883     SDVLOG(2) << "OnActionableError";
    884     FOR_EACH_OBSERVER(SyncEngineEventListener,
    885                       *session_context_->listeners(),
    886                       OnActionableError(sync_protocol_error));
    887   }
    888 }
    889 
    890 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
    891   nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
    892   retry_timer_.Start(FROM_HERE, delay, this,
    893                      &SyncSchedulerImpl::RetryTimerCallback);
    894 }
    895 
    896 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
    897     FOR_EACH_OBSERVER(SyncEngineEventListener,
    898                       *session_context_->listeners(),
    899                       OnMigrationRequested(types));
    900 }
    901 
    902 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
    903   DCHECK(CalledOnValidThread());
    904   session_context_->set_notifications_enabled(notifications_enabled);
    905   if (notifications_enabled)
    906     nudge_tracker_.OnInvalidationsEnabled();
    907   else
    908     nudge_tracker_.OnInvalidationsDisabled();
    909 }
    910 
    911 #undef SDVLOG_LOC
    912 
    913 #undef SDVLOG
    914 
    915 #undef SLOG
    916 
    917 #undef ENUM_CASE
    918 
    919 }  // namespace syncer
    920