1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/engine/sync_scheduler_impl.h" 6 7 #include <algorithm> 8 #include <cstring> 9 10 #include "base/auto_reset.h" 11 #include "base/bind.h" 12 #include "base/bind_helpers.h" 13 #include "base/compiler_specific.h" 14 #include "base/location.h" 15 #include "base/logging.h" 16 #include "base/message_loop/message_loop.h" 17 #include "sync/engine/backoff_delay_provider.h" 18 #include "sync/engine/syncer.h" 19 #include "sync/protocol/proto_enum_conversions.h" 20 #include "sync/protocol/sync.pb.h" 21 #include "sync/util/data_type_histogram.h" 22 #include "sync/util/logging.h" 23 24 using base::TimeDelta; 25 using base::TimeTicks; 26 27 namespace syncer { 28 29 using sessions::SyncSession; 30 using sessions::SyncSessionSnapshot; 31 using sync_pb::GetUpdatesCallerInfo; 32 33 namespace { 34 35 bool IsConfigRelatedUpdateSourceValue( 36 GetUpdatesCallerInfo::GetUpdatesSource source) { 37 switch (source) { 38 case GetUpdatesCallerInfo::RECONFIGURATION: 39 case GetUpdatesCallerInfo::MIGRATION: 40 case GetUpdatesCallerInfo::NEW_CLIENT: 41 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE: 42 case GetUpdatesCallerInfo::PROGRAMMATIC: 43 return true; 44 default: 45 return false; 46 } 47 } 48 49 bool ShouldRequestEarlyExit(const SyncProtocolError& error) { 50 switch (error.error_type) { 51 case SYNC_SUCCESS: 52 case MIGRATION_DONE: 53 case THROTTLED: 54 case TRANSIENT_ERROR: 55 return false; 56 case NOT_MY_BIRTHDAY: 57 case CLEAR_PENDING: 58 case DISABLED_BY_ADMIN: 59 case USER_ROLLBACK: 60 // If we send terminate sync early then |sync_cycle_ended| notification 61 // would not be sent. If there were no actions then |ACTIONABLE_ERROR| 62 // notification wouldnt be sent either. Then the UI layer would be left 63 // waiting forever. So assert we would send something. 
64 DCHECK_NE(error.action, UNKNOWN_ACTION); 65 return true; 66 case INVALID_CREDENTIAL: 67 // The notification for this is handled by PostAndProcessHeaders|. 68 // Server does no have to send any action for this. 69 return true; 70 // Make the default a NOTREACHED. So if a new error is introduced we 71 // think about its expected functionality. 72 default: 73 NOTREACHED(); 74 return false; 75 } 76 } 77 78 bool IsActionableError( 79 const SyncProtocolError& error) { 80 return (error.action != UNKNOWN_ACTION); 81 } 82 83 } // namespace 84 85 ConfigurationParams::ConfigurationParams() 86 : source(GetUpdatesCallerInfo::UNKNOWN) {} 87 ConfigurationParams::ConfigurationParams( 88 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, 89 ModelTypeSet types_to_download, 90 const ModelSafeRoutingInfo& routing_info, 91 const base::Closure& ready_task, 92 const base::Closure& retry_task) 93 : source(source), 94 types_to_download(types_to_download), 95 routing_info(routing_info), 96 ready_task(ready_task), 97 retry_task(retry_task) { 98 DCHECK(!ready_task.is_null()); 99 DCHECK(!retry_task.is_null()); 100 } 101 ConfigurationParams::~ConfigurationParams() {} 102 103 SyncSchedulerImpl::WaitInterval::WaitInterval() 104 : mode(UNKNOWN) {} 105 106 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 107 : mode(mode), length(length) {} 108 109 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 110 111 #define ENUM_CASE(x) case x: return #x; break; 112 113 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 114 switch (mode) { 115 ENUM_CASE(UNKNOWN); 116 ENUM_CASE(EXPONENTIAL_BACKOFF); 117 ENUM_CASE(THROTTLED); 118 } 119 NOTREACHED(); 120 return ""; 121 } 122 123 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 124 NudgeSource source) { 125 switch (source) { 126 case NUDGE_SOURCE_NOTIFICATION: 127 return GetUpdatesCallerInfo::NOTIFICATION; 128 case NUDGE_SOURCE_LOCAL: 129 return GetUpdatesCallerInfo::LOCAL; 130 case 
NUDGE_SOURCE_LOCAL_REFRESH: 131 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 132 case NUDGE_SOURCE_UNKNOWN: 133 return GetUpdatesCallerInfo::UNKNOWN; 134 default: 135 NOTREACHED(); 136 return GetUpdatesCallerInfo::UNKNOWN; 137 } 138 } 139 140 // Helper macros to log with the syncer thread name; useful when there 141 // are multiple syncer threads involved. 142 143 #define SLOG(severity) LOG(severity) << name_ << ": " 144 145 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " 146 147 #define SDVLOG_LOC(from_here, verbose_level) \ 148 DVLOG_LOC(from_here, verbose_level) << name_ << ": " 149 150 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, 151 BackoffDelayProvider* delay_provider, 152 sessions::SyncSessionContext* context, 153 Syncer* syncer) 154 : name_(name), 155 started_(false), 156 syncer_short_poll_interval_seconds_( 157 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 158 syncer_long_poll_interval_seconds_( 159 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 160 mode_(NORMAL_MODE), 161 delay_provider_(delay_provider), 162 syncer_(syncer), 163 session_context_(context), 164 no_scheduling_allowed_(false), 165 do_poll_after_credentials_updated_(false), 166 next_sync_session_job_priority_(NORMAL_PRIORITY), 167 weak_ptr_factory_(this), 168 weak_ptr_factory_for_weak_handle_(this) { 169 weak_handle_this_ = MakeWeakHandle( 170 weak_ptr_factory_for_weak_handle_.GetWeakPtr()); 171 } 172 173 SyncSchedulerImpl::~SyncSchedulerImpl() { 174 DCHECK(CalledOnValidThread()); 175 Stop(); 176 } 177 178 void SyncSchedulerImpl::OnCredentialsUpdated() { 179 DCHECK(CalledOnValidThread()); 180 181 if (HttpResponse::SYNC_AUTH_ERROR == 182 session_context_->connection_manager()->server_status()) { 183 OnServerConnectionErrorFixed(); 184 } 185 } 186 187 void SyncSchedulerImpl::OnConnectionStatusChange() { 188 if (HttpResponse::CONNECTION_UNAVAILABLE == 189 session_context_->connection_manager()->server_status()) { 190 // Optimistically 
assume that the connection is fixed and try 191 // connecting. 192 OnServerConnectionErrorFixed(); 193 } 194 } 195 196 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 197 // There could be a pending nudge or configuration job in several cases: 198 // 199 // 1. We're in exponential backoff. 200 // 2. We're silenced / throttled. 201 // 3. A nudge was saved previously due to not having a valid auth token. 202 // 4. A nudge was scheduled + saved while in configuration mode. 203 // 204 // In all cases except (2), we want to retry contacting the server. We 205 // call TryCanaryJob to achieve this, and note that nothing -- not even a 206 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that 207 // has the authority to do that is the Unthrottle timer. 208 TryCanaryJob(); 209 } 210 211 void SyncSchedulerImpl::Start(Mode mode) { 212 DCHECK(CalledOnValidThread()); 213 std::string thread_name = base::MessageLoop::current()->thread_name(); 214 if (thread_name.empty()) 215 thread_name = "<Main thread>"; 216 SDVLOG(2) << "Start called from thread " 217 << thread_name << " with mode " << GetModeString(mode); 218 if (!started_) { 219 started_ = true; 220 SendInitialSnapshot(); 221 } 222 223 DCHECK(!session_context_->account_name().empty()); 224 DCHECK(syncer_.get()); 225 Mode old_mode = mode_; 226 mode_ = mode; 227 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed. 228 229 if (old_mode != mode_ && mode_ == NORMAL_MODE) { 230 // We just got back to normal mode. Let's try to run the work that was 231 // queued up while we were configuring. 232 233 // Update our current time before checking IsRetryRequired(). 
234 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 235 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) { 236 TrySyncSessionJob(); 237 } 238 } 239 } 240 241 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() { 242 ModelTypeSet enabled_types = session_context_->GetEnabledTypes(); 243 ModelTypeSet enabled_protocol_types = 244 Intersection(ProtocolTypes(), enabled_types); 245 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes(); 246 return Difference(enabled_protocol_types, throttled_types); 247 } 248 249 void SyncSchedulerImpl::SendInitialSnapshot() { 250 DCHECK(CalledOnValidThread()); 251 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this)); 252 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED); 253 event.snapshot = dummy->TakeSnapshot(); 254 FOR_EACH_OBSERVER(SyncEngineEventListener, 255 *session_context_->listeners(), 256 OnSyncCycleEvent(event)); 257 } 258 259 namespace { 260 261 // Helper to extract the routing info corresponding to types in 262 // |types_to_download| from |current_routes|. 263 void BuildModelSafeParams( 264 ModelTypeSet types_to_download, 265 const ModelSafeRoutingInfo& current_routes, 266 ModelSafeRoutingInfo* result_routes) { 267 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); 268 iter.Inc()) { 269 ModelType type = iter.Get(); 270 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); 271 DCHECK(route != current_routes.end()); 272 ModelSafeGroup group = route->second; 273 (*result_routes)[type] = group; 274 } 275 } 276 277 } // namespace. 
278 279 void SyncSchedulerImpl::ScheduleConfiguration( 280 const ConfigurationParams& params) { 281 DCHECK(CalledOnValidThread()); 282 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 283 DCHECK_EQ(CONFIGURATION_MODE, mode_); 284 DCHECK(!params.ready_task.is_null()); 285 CHECK(started_) << "Scheduler must be running to configure."; 286 SDVLOG(2) << "Reconfiguring syncer."; 287 288 // Only one configuration is allowed at a time. Verify we're not waiting 289 // for a pending configure job. 290 DCHECK(!pending_configure_params_); 291 292 ModelSafeRoutingInfo restricted_routes; 293 BuildModelSafeParams(params.types_to_download, 294 params.routing_info, 295 &restricted_routes); 296 session_context_->SetRoutingInfo(restricted_routes); 297 298 // Only reconfigure if we have types to download. 299 if (!params.types_to_download.Empty()) { 300 pending_configure_params_.reset(new ConfigurationParams(params)); 301 TrySyncSessionJob(); 302 } else { 303 SDVLOG(2) << "No change in routing info, calling ready task directly."; 304 params.ready_task.Run(); 305 } 306 } 307 308 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { 309 DCHECK(CalledOnValidThread()); 310 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) { 311 SDVLOG(1) << "Unable to run a job because we're throttled."; 312 return false; 313 } 314 315 if (wait_interval_ 316 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF 317 && priority != CANARY_PRIORITY) { 318 SDVLOG(1) << "Unable to run a job because we're backing off."; 319 return false; 320 } 321 322 if (session_context_->connection_manager()->HasInvalidAuthToken()) { 323 SDVLOG(1) << "Unable to run a job because we have no valid auth token."; 324 return false; 325 } 326 327 return true; 328 } 329 330 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) { 331 DCHECK(CalledOnValidThread()); 332 333 if (!CanRunJobNow(priority)) { 334 SDVLOG(1) << "Unable to run a nudge job right now"; 335 return false; 336 } 
337 338 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes(); 339 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) { 340 SDVLOG(1) << "Not running a nudge because we're fully type throttled."; 341 return false; 342 } 343 344 if (mode_ == CONFIGURATION_MODE) { 345 SDVLOG(1) << "Not running nudge because we're in configuration mode."; 346 return false; 347 } 348 349 return true; 350 } 351 352 void SyncSchedulerImpl::ScheduleLocalNudge( 353 ModelTypeSet types, 354 const tracked_objects::Location& nudge_location) { 355 DCHECK(CalledOnValidThread()); 356 DCHECK(!types.Empty()); 357 358 SDVLOG_LOC(nudge_location, 2) 359 << "Scheduling sync because of local change to " 360 << ModelTypeSetToString(types); 361 UpdateNudgeTimeRecords(types); 362 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalChange(types); 363 ScheduleNudgeImpl(nudge_delay, nudge_location); 364 } 365 366 void SyncSchedulerImpl::ScheduleLocalRefreshRequest( 367 ModelTypeSet types, 368 const tracked_objects::Location& nudge_location) { 369 DCHECK(CalledOnValidThread()); 370 DCHECK(!types.Empty()); 371 372 SDVLOG_LOC(nudge_location, 2) 373 << "Scheduling sync because of local refresh request for " 374 << ModelTypeSetToString(types); 375 base::TimeDelta nudge_delay = nudge_tracker_.RecordLocalRefreshRequest(types); 376 ScheduleNudgeImpl(nudge_delay, nudge_location); 377 } 378 379 void SyncSchedulerImpl::ScheduleInvalidationNudge( 380 syncer::ModelType model_type, 381 scoped_ptr<InvalidationInterface> invalidation, 382 const tracked_objects::Location& nudge_location) { 383 DCHECK(CalledOnValidThread()); 384 385 SDVLOG_LOC(nudge_location, 2) 386 << "Scheduling sync because we received invalidation for " 387 << ModelTypeToString(model_type); 388 base::TimeDelta nudge_delay = 389 nudge_tracker_.RecordRemoteInvalidation(model_type, invalidation.Pass()); 390 ScheduleNudgeImpl(nudge_delay, nudge_location); 391 } 392 393 void 
SyncSchedulerImpl::ScheduleInitialSyncNudge(syncer::ModelType model_type) { 394 DCHECK(CalledOnValidThread()); 395 396 SDVLOG(2) << "Scheduling non-blocking initial sync for " 397 << ModelTypeToString(model_type); 398 nudge_tracker_.RecordInitialSyncRequired(model_type); 399 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), FROM_HERE); 400 } 401 402 // TODO(zea): Consider adding separate throttling/backoff for datatype 403 // refresh requests. 404 void SyncSchedulerImpl::ScheduleNudgeImpl( 405 const TimeDelta& delay, 406 const tracked_objects::Location& nudge_location) { 407 DCHECK(CalledOnValidThread()); 408 409 if (no_scheduling_allowed_) { 410 NOTREACHED() << "Illegal to schedule job while session in progress."; 411 return; 412 } 413 414 if (!started_) { 415 SDVLOG_LOC(nudge_location, 2) 416 << "Dropping nudge, scheduler is not running."; 417 return; 418 } 419 420 SDVLOG_LOC(nudge_location, 2) 421 << "In ScheduleNudgeImpl with delay " 422 << delay.InMilliseconds() << " ms"; 423 424 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) 425 return; 426 427 TimeTicks incoming_run_time = TimeTicks::Now() + delay; 428 if (!scheduled_nudge_time_.is_null() && 429 (scheduled_nudge_time_ < incoming_run_time)) { 430 // Old job arrives sooner than this one. Don't reschedule it. 431 return; 432 } 433 434 // Either there is no existing nudge in flight or the incoming nudge should be 435 // made to arrive first (preempt) the existing nudge. We reschedule in either 436 // case. 
437 SDVLOG_LOC(nudge_location, 2) 438 << "Scheduling a nudge with " 439 << delay.InMilliseconds() << " ms delay"; 440 scheduled_nudge_time_ = incoming_run_time; 441 pending_wakeup_timer_.Start( 442 nudge_location, 443 delay, 444 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge, 445 weak_ptr_factory_.GetWeakPtr())); 446 } 447 448 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 449 switch (mode) { 450 ENUM_CASE(CONFIGURATION_MODE); 451 ENUM_CASE(NORMAL_MODE); 452 } 453 return ""; 454 } 455 456 void SyncSchedulerImpl::SetDefaultNudgeDelay(base::TimeDelta delay_ms) { 457 DCHECK(CalledOnValidThread()); 458 nudge_tracker_.SetDefaultNudgeDelay(delay_ms); 459 } 460 461 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { 462 DCHECK(CalledOnValidThread()); 463 DCHECK(CanRunNudgeJobNow(priority)); 464 465 DVLOG(2) << "Will run normal mode sync cycle with types " 466 << ModelTypeSetToString(session_context_->GetEnabledTypes()); 467 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 468 bool premature_exit = !syncer_->NormalSyncShare( 469 GetEnabledAndUnthrottledTypes(), 470 nudge_tracker_, 471 session.get()); 472 AdjustPolling(FORCE_RESET); 473 // Don't run poll job till the next time poll timer fires. 474 do_poll_after_credentials_updated_ = false; 475 476 bool success = !premature_exit 477 && !sessions::HasSyncerError( 478 session->status_controller().model_neutral_state()); 479 480 if (success) { 481 // That cycle took care of any outstanding work we had. 482 SDVLOG(2) << "Nudge succeeded."; 483 nudge_tracker_.RecordSuccessfulSyncCycle(); 484 scheduled_nudge_time_ = base::TimeTicks(); 485 486 // If we're here, then we successfully reached the server. End all backoff. 
487 wait_interval_.reset(); 488 NotifyRetryTime(base::Time()); 489 } else { 490 HandleFailure(session->status_controller().model_neutral_state()); 491 } 492 } 493 494 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { 495 DCHECK(CalledOnValidThread()); 496 DCHECK_EQ(mode_, CONFIGURATION_MODE); 497 DCHECK(pending_configure_params_ != NULL); 498 499 if (!CanRunJobNow(priority)) { 500 SDVLOG(2) << "Unable to run configure job right now."; 501 if (!pending_configure_params_->retry_task.is_null()) { 502 pending_configure_params_->retry_task.Run(); 503 pending_configure_params_->retry_task.Reset(); 504 } 505 return; 506 } 507 508 SDVLOG(2) << "Will run configure SyncShare with types " 509 << ModelTypeSetToString(session_context_->GetEnabledTypes()); 510 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 511 bool premature_exit = !syncer_->ConfigureSyncShare( 512 pending_configure_params_->types_to_download, 513 pending_configure_params_->source, 514 session.get()); 515 AdjustPolling(FORCE_RESET); 516 // Don't run poll job till the next time poll timer fires. 517 do_poll_after_credentials_updated_ = false; 518 519 bool success = !premature_exit 520 && !sessions::HasSyncerError( 521 session->status_controller().model_neutral_state()); 522 523 if (success) { 524 SDVLOG(2) << "Configure succeeded."; 525 pending_configure_params_->ready_task.Run(); 526 pending_configure_params_.reset(); 527 528 // If we're here, then we successfully reached the server. End all backoff. 529 wait_interval_.reset(); 530 NotifyRetryTime(base::Time()); 531 } else { 532 HandleFailure(session->status_controller().model_neutral_state()); 533 // Sync cycle might receive response from server that causes scheduler to 534 // stop and draws pending_configure_params_ invalid. 
535 if (started_ && !pending_configure_params_->retry_task.is_null()) { 536 pending_configure_params_->retry_task.Run(); 537 pending_configure_params_->retry_task.Reset(); 538 } 539 } 540 } 541 542 void SyncSchedulerImpl::HandleFailure( 543 const sessions::ModelNeutralState& model_neutral_state) { 544 if (IsCurrentlyThrottled()) { 545 SDVLOG(2) << "Was throttled during previous sync cycle."; 546 RestartWaiting(); 547 } else if (!IsBackingOff()) { 548 // Setup our backoff if this is our first such failure. 549 TimeDelta length = delay_provider_->GetDelay( 550 delay_provider_->GetInitialDelay(model_neutral_state)); 551 wait_interval_.reset( 552 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 553 SDVLOG(2) << "Sync cycle failed. Will back off for " 554 << wait_interval_->length.InMilliseconds() << "ms."; 555 RestartWaiting(); 556 } 557 } 558 559 void SyncSchedulerImpl::DoPollSyncSessionJob() { 560 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 561 562 SDVLOG(2) << "Polling with types " 563 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes()); 564 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 565 syncer_->PollSyncShare( 566 GetEnabledAndUnthrottledTypes(), 567 session.get()); 568 569 AdjustPolling(FORCE_RESET); 570 571 if (IsCurrentlyThrottled()) { 572 SDVLOG(2) << "Poll request got us throttled."; 573 // The OnSilencedUntil() call set up the WaitInterval for us. All we need 574 // to do is start the timer. 575 RestartWaiting(); 576 } 577 } 578 579 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { 580 DCHECK(CalledOnValidThread()); 581 base::TimeTicks now = TimeTicks::Now(); 582 // Update timing information for how often datatypes are triggering nudges. 
583 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { 584 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; 585 last_local_nudges_by_model_type_[iter.Get()] = now; 586 if (previous.is_null()) 587 continue; 588 589 #define PER_DATA_TYPE_MACRO(type_str) \ 590 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 591 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); 592 #undef PER_DATA_TYPE_MACRO 593 } 594 } 595 596 TimeDelta SyncSchedulerImpl::GetPollInterval() { 597 return (!session_context_->notifications_enabled() || 598 !session_context_->ShouldFetchUpdatesBeforeCommit()) ? 599 syncer_short_poll_interval_seconds_ : 600 syncer_long_poll_interval_seconds_; 601 } 602 603 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { 604 DCHECK(CalledOnValidThread()); 605 606 TimeDelta poll = GetPollInterval(); 607 bool rate_changed = !poll_timer_.IsRunning() || 608 poll != poll_timer_.GetCurrentDelay(); 609 610 if (type == FORCE_RESET) { 611 last_poll_reset_ = base::TimeTicks::Now(); 612 if (!rate_changed) 613 poll_timer_.Reset(); 614 } 615 616 if (!rate_changed) 617 return; 618 619 // Adjust poll rate. 
620 poll_timer_.Stop(); 621 poll_timer_.Start(FROM_HERE, poll, this, 622 &SyncSchedulerImpl::PollTimerCallback); 623 } 624 625 void SyncSchedulerImpl::RestartWaiting() { 626 CHECK(wait_interval_.get()); 627 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 628 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 629 SDVLOG(2) << "Starting WaitInterval timer of length " 630 << wait_interval_->length.InMilliseconds() << "ms."; 631 if (wait_interval_->mode == WaitInterval::THROTTLED) { 632 pending_wakeup_timer_.Start( 633 FROM_HERE, 634 wait_interval_->length, 635 base::Bind(&SyncSchedulerImpl::Unthrottle, 636 weak_ptr_factory_.GetWeakPtr())); 637 } else { 638 pending_wakeup_timer_.Start( 639 FROM_HERE, 640 wait_interval_->length, 641 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry, 642 weak_ptr_factory_.GetWeakPtr())); 643 } 644 } 645 646 void SyncSchedulerImpl::Stop() { 647 DCHECK(CalledOnValidThread()); 648 SDVLOG(2) << "Stop called"; 649 650 // Kill any in-flight method calls. 651 weak_ptr_factory_.InvalidateWeakPtrs(); 652 wait_interval_.reset(); 653 NotifyRetryTime(base::Time()); 654 poll_timer_.Stop(); 655 pending_wakeup_timer_.Stop(); 656 pending_configure_params_.reset(); 657 if (started_) 658 started_ = false; 659 } 660 661 // This is the only place where we invoke DoSyncSessionJob with canary 662 // privileges. Everyone else should use NORMAL_PRIORITY. 663 void SyncSchedulerImpl::TryCanaryJob() { 664 next_sync_session_job_priority_ = CANARY_PRIORITY; 665 TrySyncSessionJob(); 666 } 667 668 void SyncSchedulerImpl::TrySyncSessionJob() { 669 // Post call to TrySyncSessionJobImpl on current thread. Later request for 670 // access token will be here. 
671 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind( 672 &SyncSchedulerImpl::TrySyncSessionJobImpl, 673 weak_ptr_factory_.GetWeakPtr())); 674 } 675 676 void SyncSchedulerImpl::TrySyncSessionJobImpl() { 677 JobPriority priority = next_sync_session_job_priority_; 678 next_sync_session_job_priority_ = NORMAL_PRIORITY; 679 680 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 681 682 DCHECK(CalledOnValidThread()); 683 if (mode_ == CONFIGURATION_MODE) { 684 if (pending_configure_params_) { 685 SDVLOG(2) << "Found pending configure job"; 686 DoConfigurationSyncSessionJob(priority); 687 } 688 } else if (CanRunNudgeJobNow(priority)) { 689 if (nudge_tracker_.IsSyncRequired()) { 690 SDVLOG(2) << "Found pending nudge job"; 691 DoNudgeSyncSessionJob(priority); 692 } else if (do_poll_after_credentials_updated_ || 693 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) { 694 DoPollSyncSessionJob(); 695 // Poll timer fires infrequently. Usually by this time access token is 696 // already expired and poll job will fail with auth error. Set flag to 697 // retry poll once ProfileSyncService gets new access token, TryCanaryJob 698 // will be called after access token is retrieved. 699 if (HttpResponse::SYNC_AUTH_ERROR == 700 session_context_->connection_manager()->server_status()) { 701 do_poll_after_credentials_updated_ = true; 702 } 703 } 704 } 705 706 if (priority == CANARY_PRIORITY) { 707 // If this is canary job then whatever result was don't run poll job till 708 // the next time poll timer fires. 709 do_poll_after_credentials_updated_ = false; 710 } 711 712 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) { 713 // If we succeeded, our wait interval would have been cleared. If it hasn't 714 // been cleared, then we should increase our backoff interval and schedule 715 // another retry. 
716 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); 717 wait_interval_.reset( 718 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 719 SDVLOG(2) << "Sync cycle failed. Will back off for " 720 << wait_interval_->length.InMilliseconds() << "ms."; 721 RestartWaiting(); 722 } 723 } 724 725 void SyncSchedulerImpl::PollTimerCallback() { 726 DCHECK(CalledOnValidThread()); 727 if (no_scheduling_allowed_) { 728 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in 729 // functions that are called only on the sync thread. This function is also 730 // called only on the sync thread, and only when it is posted by an expiring 731 // timer. If we find that no_scheduling_allowed_ is set here, then 732 // something is very wrong. Maybe someone mistakenly called us directly, or 733 // mishandled the book-keeping for no_scheduling_allowed_. 734 NOTREACHED() << "Illegal to schedule job while session in progress."; 735 return; 736 } 737 738 TrySyncSessionJob(); 739 } 740 741 void SyncSchedulerImpl::RetryTimerCallback() { 742 TrySyncSessionJob(); 743 } 744 745 void SyncSchedulerImpl::Unthrottle() { 746 DCHECK(CalledOnValidThread()); 747 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 748 749 // We're no longer throttled, so clear the wait interval. 750 wait_interval_.reset(); 751 NotifyRetryTime(base::Time()); 752 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 753 754 // We treat this as a 'canary' in the sense that it was originally scheduled 755 // to run some time ago, failed, and we now want to retry, versus a job that 756 // was just created (e.g via ScheduleNudgeImpl). The main implication is 757 // that we're careful to update routing info (etc) with such potentially 758 // stale canary jobs. 
759 TryCanaryJob(); 760 } 761 762 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) { 763 DCHECK(CalledOnValidThread()); 764 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time); 765 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 766 767 if (nudge_tracker_.IsAnyTypeThrottled()) { 768 const base::TimeTicks now = base::TimeTicks::Now(); 769 base::TimeDelta time_until_next_unthrottle = 770 nudge_tracker_.GetTimeUntilNextUnthrottle(now); 771 type_unthrottle_timer_.Start( 772 FROM_HERE, 773 time_until_next_unthrottle, 774 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, 775 weak_ptr_factory_.GetWeakPtr(), 776 now + time_until_next_unthrottle)); 777 } 778 779 // Maybe this is a good time to run a nudge job. Let's try it. 780 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) 781 TrySyncSessionJob(); 782 } 783 784 void SyncSchedulerImpl::PerformDelayedNudge() { 785 // Circumstances may have changed since we scheduled this delayed nudge. 786 // We must check to see if it's OK to run the job before we do so. 787 if (CanRunNudgeJobNow(NORMAL_PRIORITY)) 788 TrySyncSessionJob(); 789 790 // We're not responsible for setting up any retries here. The functions that 791 // first put us into a state that prevents successful sync cycles (eg. global 792 // throttling, type throttling, network errors, transient errors) will also 793 // setup the appropriate retry logic (eg. retry after timeout, exponential 794 // backoff, retry when the network changes). 
795 } 796 797 void SyncSchedulerImpl::ExponentialBackoffRetry() { 798 TryCanaryJob(); 799 } 800 801 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { 802 FOR_EACH_OBSERVER(SyncEngineEventListener, 803 *session_context_->listeners(), 804 OnRetryTimeChanged(retry_time)); 805 } 806 807 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) { 808 FOR_EACH_OBSERVER(SyncEngineEventListener, 809 *session_context_->listeners(), 810 OnThrottledTypesChanged(types)); 811 } 812 813 bool SyncSchedulerImpl::IsBackingOff() const { 814 DCHECK(CalledOnValidThread()); 815 return wait_interval_.get() && wait_interval_->mode == 816 WaitInterval::EXPONENTIAL_BACKOFF; 817 } 818 819 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) { 820 DCHECK(CalledOnValidThread()); 821 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 822 throttle_duration)); 823 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 824 NotifyThrottledTypesChanged(ModelTypeSet::All()); 825 } 826 827 void SyncSchedulerImpl::OnTypesThrottled( 828 ModelTypeSet types, 829 const base::TimeDelta& throttle_duration) { 830 base::TimeTicks now = base::TimeTicks::Now(); 831 832 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); 833 base::TimeDelta time_until_next_unthrottle = 834 nudge_tracker_.GetTimeUntilNextUnthrottle(now); 835 type_unthrottle_timer_.Start( 836 FROM_HERE, 837 time_until_next_unthrottle, 838 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, 839 weak_ptr_factory_.GetWeakPtr(), 840 now + time_until_next_unthrottle)); 841 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 842 } 843 844 bool SyncSchedulerImpl::IsCurrentlyThrottled() { 845 DCHECK(CalledOnValidThread()); 846 return wait_interval_.get() && wait_interval_->mode == 847 WaitInterval::THROTTLED; 848 } 849 850 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 851 const base::TimeDelta& new_interval) { 852 DCHECK(CalledOnValidThread()); 
853 syncer_short_poll_interval_seconds_ = new_interval; 854 } 855 856 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( 857 const base::TimeDelta& new_interval) { 858 DCHECK(CalledOnValidThread()); 859 syncer_long_poll_interval_seconds_ = new_interval; 860 } 861 862 void SyncSchedulerImpl::OnReceivedCustomNudgeDelays( 863 const std::map<ModelType, base::TimeDelta>& nudge_delays) { 864 DCHECK(CalledOnValidThread()); 865 nudge_tracker_.OnReceivedCustomNudgeDelays(nudge_delays); 866 } 867 868 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { 869 if (size > 0) 870 nudge_tracker_.SetHintBufferSize(size); 871 else 872 NOTREACHED() << "Hint buffer size should be > 0."; 873 } 874 875 void SyncSchedulerImpl::OnSyncProtocolError( 876 const SyncProtocolError& sync_protocol_error) { 877 DCHECK(CalledOnValidThread()); 878 if (ShouldRequestEarlyExit(sync_protocol_error)) { 879 SDVLOG(2) << "Sync Scheduler requesting early exit."; 880 Stop(); 881 } 882 if (IsActionableError(sync_protocol_error)) { 883 SDVLOG(2) << "OnActionableError"; 884 FOR_EACH_OBSERVER(SyncEngineEventListener, 885 *session_context_->listeners(), 886 OnActionableError(sync_protocol_error)); 887 } 888 } 889 890 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) { 891 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay); 892 retry_timer_.Start(FROM_HERE, delay, this, 893 &SyncSchedulerImpl::RetryTimerCallback); 894 } 895 896 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) { 897 FOR_EACH_OBSERVER(SyncEngineEventListener, 898 *session_context_->listeners(), 899 OnMigrationRequested(types)); 900 } 901 902 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { 903 DCHECK(CalledOnValidThread()); 904 session_context_->set_notifications_enabled(notifications_enabled); 905 if (notifications_enabled) 906 nudge_tracker_.OnInvalidationsEnabled(); 907 else 908 nudge_tracker_.OnInvalidationsDisabled(); 909 } 910 
// Drop the file-local helper macros so they do not leak past this point.
#undef SDVLOG_LOC

#undef SDVLOG

#undef SLOG

#undef ENUM_CASE

}  // namespace syncer