1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/engine/sync_scheduler_impl.h" 6 7 #include <algorithm> 8 #include <cstring> 9 10 #include "base/auto_reset.h" 11 #include "base/bind.h" 12 #include "base/bind_helpers.h" 13 #include "base/compiler_specific.h" 14 #include "base/location.h" 15 #include "base/logging.h" 16 #include "base/message_loop/message_loop.h" 17 #include "sync/engine/backoff_delay_provider.h" 18 #include "sync/engine/syncer.h" 19 #include "sync/protocol/proto_enum_conversions.h" 20 #include "sync/protocol/sync.pb.h" 21 #include "sync/util/data_type_histogram.h" 22 #include "sync/util/logging.h" 23 24 using base::TimeDelta; 25 using base::TimeTicks; 26 27 namespace syncer { 28 29 using sessions::SyncSession; 30 using sessions::SyncSessionSnapshot; 31 using sync_pb::GetUpdatesCallerInfo; 32 33 namespace { 34 35 bool ShouldRequestEarlyExit(const SyncProtocolError& error) { 36 switch (error.error_type) { 37 case SYNC_SUCCESS: 38 case MIGRATION_DONE: 39 case THROTTLED: 40 case TRANSIENT_ERROR: 41 return false; 42 case NOT_MY_BIRTHDAY: 43 case CLEAR_PENDING: 44 case DISABLED_BY_ADMIN: 45 // If we send terminate sync early then |sync_cycle_ended| notification 46 // would not be sent. If there were no actions then |ACTIONABLE_ERROR| 47 // notification wouldnt be sent either. Then the UI layer would be left 48 // waiting forever. So assert we would send something. 49 DCHECK_NE(error.action, UNKNOWN_ACTION); 50 return true; 51 case INVALID_CREDENTIAL: 52 // The notification for this is handled by PostAndProcessHeaders|. 53 // Server does no have to send any action for this. 54 return true; 55 // Make the default a NOTREACHED. So if a new error is introduced we 56 // think about its expected functionality. 57 default: 58 NOTREACHED(); 59 return false; 60 } 61 } 62 63 bool IsActionableError( 64 const SyncProtocolError& error) { 65 return (error.action != UNKNOWN_ACTION); 66 } 67 } // namespace 68 69 ConfigurationParams::ConfigurationParams() 70 : source(GetUpdatesCallerInfo::UNKNOWN) {} 71 ConfigurationParams::ConfigurationParams( 72 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, 73 ModelTypeSet types_to_download, 74 const ModelSafeRoutingInfo& routing_info, 75 const base::Closure& ready_task) 76 : source(source), 77 types_to_download(types_to_download), 78 routing_info(routing_info), 79 ready_task(ready_task) { 80 DCHECK(!ready_task.is_null()); 81 } 82 ConfigurationParams::~ConfigurationParams() {} 83 84 SyncSchedulerImpl::WaitInterval::WaitInterval() 85 : mode(UNKNOWN) {} 86 87 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 88 : mode(mode), length(length) {} 89 90 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 91 92 #define ENUM_CASE(x) case x: return #x; break; 93 94 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 95 switch (mode) { 96 ENUM_CASE(UNKNOWN); 97 ENUM_CASE(EXPONENTIAL_BACKOFF); 98 ENUM_CASE(THROTTLED); 99 } 100 NOTREACHED(); 101 return ""; 102 } 103 104 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 105 NudgeSource source) { 106 switch (source) { 107 case NUDGE_SOURCE_NOTIFICATION: 108 return GetUpdatesCallerInfo::NOTIFICATION; 109 case NUDGE_SOURCE_LOCAL: 110 return GetUpdatesCallerInfo::LOCAL; 111 case NUDGE_SOURCE_LOCAL_REFRESH: 112 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 113 case NUDGE_SOURCE_UNKNOWN: 114 return GetUpdatesCallerInfo::UNKNOWN; 115 default: 116 NOTREACHED(); 117 return GetUpdatesCallerInfo::UNKNOWN; 118 } 119 } 120 121 // Helper macros to log with the syncer thread name; useful when there 122 // are multiple syncer threads involved. 123 124 #define SLOG(severity) LOG(severity) << name_ << ": " 125 126 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " 127 128 #define SDVLOG_LOC(from_here, verbose_level) \ 129 DVLOG_LOC(from_here, verbose_level) << name_ << ": " 130 131 namespace { 132 133 const int kDefaultSessionsCommitDelaySeconds = 10; 134 135 bool IsConfigRelatedUpdateSourceValue( 136 GetUpdatesCallerInfo::GetUpdatesSource source) { 137 switch (source) { 138 case GetUpdatesCallerInfo::RECONFIGURATION: 139 case GetUpdatesCallerInfo::MIGRATION: 140 case GetUpdatesCallerInfo::NEW_CLIENT: 141 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE: 142 return true; 143 default: 144 return false; 145 } 146 } 147 148 } // namespace 149 150 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, 151 BackoffDelayProvider* delay_provider, 152 sessions::SyncSessionContext* context, 153 Syncer* syncer) 154 : weak_ptr_factory_(this), 155 weak_ptr_factory_for_weak_handle_(this), 156 weak_handle_this_(MakeWeakHandle( 157 weak_ptr_factory_for_weak_handle_.GetWeakPtr())), 158 name_(name), 159 started_(false), 160 syncer_short_poll_interval_seconds_( 161 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 162 syncer_long_poll_interval_seconds_( 163 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 164 sessions_commit_delay_( 165 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 166 mode_(NORMAL_MODE), 167 delay_provider_(delay_provider), 168 syncer_(syncer), 169 session_context_(context), 170 no_scheduling_allowed_(false), 171 do_poll_after_credentials_updated_(false) { 172 } 173 174 SyncSchedulerImpl::~SyncSchedulerImpl() { 175 DCHECK(CalledOnValidThread()); 176 StopImpl(); 177 } 178 179 void SyncSchedulerImpl::OnCredentialsUpdated() { 180 DCHECK(CalledOnValidThread()); 181 182 if (HttpResponse::SYNC_AUTH_ERROR == 183 session_context_->connection_manager()->server_status()) { 184 OnServerConnectionErrorFixed(); 185 } 186 } 187 188 void SyncSchedulerImpl::OnConnectionStatusChange() { 189 if (HttpResponse::CONNECTION_UNAVAILABLE == 190 session_context_->connection_manager()->server_status()) { 191 // Optimistically assume that the connection is fixed and try 192 // connecting. 193 OnServerConnectionErrorFixed(); 194 } 195 } 196 197 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 198 // There could be a pending nudge or configuration job in several cases: 199 // 200 // 1. We're in exponential backoff. 201 // 2. We're silenced / throttled. 202 // 3. A nudge was saved previously due to not having a valid auth token. 203 // 4. A nudge was scheduled + saved while in configuration mode. 204 // 205 // In all cases except (2), we want to retry contacting the server. We 206 // call DoCanaryJob to achieve this, and note that nothing -- not even a 207 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that 208 // has the authority to do that is the Unthrottle timer. 209 TryCanaryJob(); 210 } 211 212 void SyncSchedulerImpl::Start(Mode mode) { 213 DCHECK(CalledOnValidThread()); 214 std::string thread_name = base::MessageLoop::current()->thread_name(); 215 if (thread_name.empty()) 216 thread_name = "<Main thread>"; 217 SDVLOG(2) << "Start called from thread " 218 << thread_name << " with mode " << GetModeString(mode); 219 if (!started_) { 220 started_ = true; 221 SendInitialSnapshot(); 222 } 223 224 DCHECK(!session_context_->account_name().empty()); 225 DCHECK(syncer_.get()); 226 Mode old_mode = mode_; 227 mode_ = mode; 228 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed. 229 230 if (old_mode != mode_ && 231 mode_ == NORMAL_MODE && 232 nudge_tracker_.IsSyncRequired() && 233 CanRunNudgeJobNow(NORMAL_PRIORITY)) { 234 // We just got back to normal mode. Let's try to run the work that was 235 // queued up while we were configuring. 236 DoNudgeSyncSessionJob(NORMAL_PRIORITY); 237 } 238 } 239 240 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() { 241 ModelTypeSet enabled_types = 242 GetRoutingInfoTypes(session_context_->routing_info()); 243 ModelTypeSet throttled_types = 244 nudge_tracker_.GetThrottledTypes(); 245 return Difference(enabled_types, throttled_types); 246 } 247 248 void SyncSchedulerImpl::SendInitialSnapshot() { 249 DCHECK(CalledOnValidThread()); 250 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this)); 251 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 252 event.snapshot = dummy->TakeSnapshot(); 253 session_context_->NotifyListeners(event); 254 } 255 256 namespace { 257 258 // Helper to extract the routing info corresponding to types in 259 // |types_to_download| from |current_routes|. 260 void BuildModelSafeParams( 261 ModelTypeSet types_to_download, 262 const ModelSafeRoutingInfo& current_routes, 263 ModelSafeRoutingInfo* result_routes) { 264 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); 265 iter.Inc()) { 266 ModelType type = iter.Get(); 267 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); 268 DCHECK(route != current_routes.end()); 269 ModelSafeGroup group = route->second; 270 (*result_routes)[type] = group; 271 } 272 } 273 274 } // namespace. 275 276 bool SyncSchedulerImpl::ScheduleConfiguration( 277 const ConfigurationParams& params) { 278 DCHECK(CalledOnValidThread()); 279 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 280 DCHECK_EQ(CONFIGURATION_MODE, mode_); 281 DCHECK(!params.ready_task.is_null()); 282 CHECK(started_) << "Scheduler must be running to configure."; 283 SDVLOG(2) << "Reconfiguring syncer."; 284 285 // Only one configuration is allowed at a time. Verify we're not waiting 286 // for a pending configure job. 287 DCHECK(!pending_configure_params_); 288 289 ModelSafeRoutingInfo restricted_routes; 290 BuildModelSafeParams(params.types_to_download, 291 params.routing_info, 292 &restricted_routes); 293 session_context_->set_routing_info(restricted_routes); 294 295 // Only reconfigure if we have types to download. 296 if (!params.types_to_download.Empty()) { 297 pending_configure_params_.reset(new ConfigurationParams(params)); 298 bool succeeded = DoConfigurationSyncSessionJob(NORMAL_PRIORITY); 299 300 // If we failed, the job would have been saved as the pending configure 301 // job and a wait interval would have been set. 302 if (!succeeded) { 303 DCHECK(pending_configure_params_); 304 } else { 305 DCHECK(!pending_configure_params_); 306 } 307 return succeeded; 308 } else { 309 SDVLOG(2) << "No change in routing info, calling ready task directly."; 310 params.ready_task.Run(); 311 } 312 313 return true; 314 } 315 316 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { 317 DCHECK(CalledOnValidThread()); 318 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) { 319 SDVLOG(1) << "Unable to run a job because we're throttled."; 320 return false; 321 } 322 323 if (wait_interval_ 324 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF 325 && priority != CANARY_PRIORITY) { 326 SDVLOG(1) << "Unable to run a job because we're backing off."; 327 return false; 328 } 329 330 if (session_context_->connection_manager()->HasInvalidAuthToken()) { 331 SDVLOG(1) << "Unable to run a job because we have no valid auth token."; 332 return false; 333 } 334 335 return true; 336 } 337 338 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) { 339 DCHECK(CalledOnValidThread()); 340 341 if (!CanRunJobNow(priority)) { 342 SDVLOG(1) << "Unable to run a nudge job right now"; 343 return false; 344 } 345 346 const ModelTypeSet enabled_types = 347 GetRoutingInfoTypes(session_context_->routing_info()); 348 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) { 349 SDVLOG(1) << "Not running a nudge because we're fully type throttled."; 350 return false; 351 } 352 353 if (mode_ == CONFIGURATION_MODE) { 354 SDVLOG(1) << "Not running nudge because we're in configuration mode."; 355 return false; 356 } 357 358 return true; 359 } 360 361 void SyncSchedulerImpl::ScheduleLocalNudge( 362 const TimeDelta& desired_delay, 363 ModelTypeSet types, 364 const tracked_objects::Location& nudge_location) { 365 DCHECK(CalledOnValidThread()); 366 DCHECK(!types.Empty()); 367 368 SDVLOG_LOC(nudge_location, 2) 369 << "Scheduling sync because of local change to " 370 << ModelTypeSetToString(types); 371 UpdateNudgeTimeRecords(types); 372 nudge_tracker_.RecordLocalChange(types); 373 ScheduleNudgeImpl(desired_delay, nudge_location); 374 } 375 376 void SyncSchedulerImpl::ScheduleLocalRefreshRequest( 377 const TimeDelta& desired_delay, 378 ModelTypeSet types, 379 const tracked_objects::Location& nudge_location) { 380 DCHECK(CalledOnValidThread()); 381 DCHECK(!types.Empty()); 382 383 SDVLOG_LOC(nudge_location, 2) 384 << "Scheduling sync because of local refresh request for " 385 << ModelTypeSetToString(types); 386 nudge_tracker_.RecordLocalRefreshRequest(types); 387 ScheduleNudgeImpl(desired_delay, nudge_location); 388 } 389 390 void SyncSchedulerImpl::ScheduleInvalidationNudge( 391 const TimeDelta& desired_delay, 392 const ModelTypeInvalidationMap& invalidation_map, 393 const tracked_objects::Location& nudge_location) { 394 DCHECK(CalledOnValidThread()); 395 DCHECK(!invalidation_map.empty()); 396 397 SDVLOG_LOC(nudge_location, 2) 398 << "Scheduling sync because we received invalidation for " 399 << ModelTypeInvalidationMapToString(invalidation_map); 400 nudge_tracker_.RecordRemoteInvalidation(invalidation_map); 401 ScheduleNudgeImpl(desired_delay, nudge_location); 402 } 403 404 // TODO(zea): Consider adding separate throttling/backoff for datatype 405 // refresh requests. 406 void SyncSchedulerImpl::ScheduleNudgeImpl( 407 const TimeDelta& delay, 408 const tracked_objects::Location& nudge_location) { 409 DCHECK(CalledOnValidThread()); 410 411 if (no_scheduling_allowed_) { 412 NOTREACHED() << "Illegal to schedule job while session in progress."; 413 return; 414 } 415 416 if (!started_) { 417 SDVLOG_LOC(nudge_location, 2) 418 << "Dropping nudge, scheduler is not running."; 419 return; 420 } 421 422 SDVLOG_LOC(nudge_location, 2) 423 << "In ScheduleNudgeImpl with delay " 424 << delay.InMilliseconds() << " ms"; 425 426 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) 427 return; 428 429 if (!started_) { 430 SDVLOG_LOC(nudge_location, 2) 431 << "Schedule not started; not running a nudge."; 432 return; 433 } 434 435 TimeTicks incoming_run_time = TimeTicks::Now() + delay; 436 if (!scheduled_nudge_time_.is_null() && 437 (scheduled_nudge_time_ < incoming_run_time)) { 438 // Old job arrives sooner than this one. Don't reschedule it. 439 return; 440 } 441 442 // Either there is no existing nudge in flight or the incoming nudge should be 443 // made to arrive first (preempt) the existing nudge. We reschedule in either 444 // case. 445 SDVLOG_LOC(nudge_location, 2) 446 << "Scheduling a nudge with " 447 << delay.InMilliseconds() << " ms delay"; 448 scheduled_nudge_time_ = incoming_run_time; 449 pending_wakeup_timer_.Start( 450 nudge_location, 451 delay, 452 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge, 453 weak_ptr_factory_.GetWeakPtr())); 454 } 455 456 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 457 switch (mode) { 458 ENUM_CASE(CONFIGURATION_MODE); 459 ENUM_CASE(NORMAL_MODE); 460 } 461 return ""; 462 } 463 464 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { 465 DCHECK(CalledOnValidThread()); 466 DCHECK(CanRunNudgeJobNow(priority)); 467 468 DVLOG(2) << "Will run normal mode sync cycle with routing info " 469 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 470 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 471 bool premature_exit = !syncer_->NormalSyncShare( 472 GetEnabledAndUnthrottledTypes(), 473 nudge_tracker_, 474 session.get()); 475 AdjustPolling(FORCE_RESET); 476 // Don't run poll job till the next time poll timer fires. 477 do_poll_after_credentials_updated_ = false; 478 479 bool success = !premature_exit 480 && !sessions::HasSyncerError( 481 session->status_controller().model_neutral_state()); 482 483 if (success) { 484 // That cycle took care of any outstanding work we had. 485 SDVLOG(2) << "Nudge succeeded."; 486 nudge_tracker_.RecordSuccessfulSyncCycle(); 487 scheduled_nudge_time_ = base::TimeTicks(); 488 489 // If we're here, then we successfully reached the server. End all backoff. 490 wait_interval_.reset(); 491 NotifyRetryTime(base::Time()); 492 return; 493 } else { 494 HandleFailure(session->status_controller().model_neutral_state()); 495 } 496 } 497 498 bool SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { 499 DCHECK(CalledOnValidThread()); 500 DCHECK_EQ(mode_, CONFIGURATION_MODE); 501 502 if (!CanRunJobNow(priority)) { 503 SDVLOG(2) << "Unable to run configure job right now."; 504 return false; 505 } 506 507 SDVLOG(2) << "Will run configure SyncShare with routes " 508 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 509 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 510 bool premature_exit = !syncer_->ConfigureSyncShare( 511 GetRoutingInfoTypes(session_context_->routing_info()), 512 pending_configure_params_->source, 513 session.get()); 514 AdjustPolling(FORCE_RESET); 515 // Don't run poll job till the next time poll timer fires. 516 do_poll_after_credentials_updated_ = false; 517 518 bool success = !premature_exit 519 && !sessions::HasSyncerError( 520 session->status_controller().model_neutral_state()); 521 522 if (success) { 523 SDVLOG(2) << "Configure succeeded."; 524 pending_configure_params_->ready_task.Run(); 525 pending_configure_params_.reset(); 526 527 // If we're here, then we successfully reached the server. End all backoff. 528 wait_interval_.reset(); 529 NotifyRetryTime(base::Time()); 530 return true; 531 } else { 532 HandleFailure(session->status_controller().model_neutral_state()); 533 return false; 534 } 535 } 536 537 void SyncSchedulerImpl::HandleFailure( 538 const sessions::ModelNeutralState& model_neutral_state) { 539 if (IsCurrentlyThrottled()) { 540 SDVLOG(2) << "Was throttled during previous sync cycle."; 541 RestartWaiting(); 542 } else if (!IsBackingOff()) { 543 // Setup our backoff if this is our first such failure. 544 TimeDelta length = delay_provider_->GetDelay( 545 delay_provider_->GetInitialDelay(model_neutral_state)); 546 wait_interval_.reset( 547 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 548 SDVLOG(2) << "Sync cycle failed. Will back off for " 549 << wait_interval_->length.InMilliseconds() << "ms."; 550 RestartWaiting(); 551 } 552 } 553 554 void SyncSchedulerImpl::DoPollSyncSessionJob() { 555 ModelSafeRoutingInfo r; 556 ModelTypeInvalidationMap invalidation_map = 557 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); 558 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 559 560 if (!CanRunJobNow(NORMAL_PRIORITY)) { 561 SDVLOG(2) << "Unable to run a poll job right now."; 562 return; 563 } 564 565 if (mode_ != NORMAL_MODE) { 566 SDVLOG(2) << "Not running poll job in configure mode."; 567 return; 568 } 569 570 SDVLOG(2) << "Polling with routes " 571 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 572 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 573 syncer_->PollSyncShare( 574 GetEnabledAndUnthrottledTypes(), 575 session.get()); 576 577 AdjustPolling(UPDATE_INTERVAL); 578 579 if (IsCurrentlyThrottled()) { 580 SDVLOG(2) << "Poll request got us throttled."; 581 // The OnSilencedUntil() call set up the WaitInterval for us. All we need 582 // to do is start the timer. 583 RestartWaiting(); 584 } 585 } 586 587 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { 588 DCHECK(CalledOnValidThread()); 589 base::TimeTicks now = TimeTicks::Now(); 590 // Update timing information for how often datatypes are triggering nudges. 591 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { 592 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; 593 last_local_nudges_by_model_type_[iter.Get()] = now; 594 if (previous.is_null()) 595 continue; 596 597 #define PER_DATA_TYPE_MACRO(type_str) \ 598 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 599 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); 600 #undef PER_DATA_TYPE_MACRO 601 } 602 } 603 604 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { 605 DCHECK(CalledOnValidThread()); 606 607 TimeDelta poll = (!session_context_->notifications_enabled() || 608 !session_context_->ShouldFetchUpdatesBeforeCommit()) ? 609 syncer_short_poll_interval_seconds_ : 610 syncer_long_poll_interval_seconds_; 611 bool rate_changed = !poll_timer_.IsRunning() || 612 poll != poll_timer_.GetCurrentDelay(); 613 614 if (type == FORCE_RESET && !rate_changed) 615 poll_timer_.Reset(); 616 617 if (!rate_changed) 618 return; 619 620 // Adjust poll rate. 621 poll_timer_.Stop(); 622 poll_timer_.Start(FROM_HERE, poll, this, 623 &SyncSchedulerImpl::PollTimerCallback); 624 } 625 626 void SyncSchedulerImpl::RestartWaiting() { 627 CHECK(wait_interval_.get()); 628 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 629 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 630 SDVLOG(2) << "Starting WaitInterval timer of length " 631 << wait_interval_->length.InMilliseconds() << "ms."; 632 if (wait_interval_->mode == WaitInterval::THROTTLED) { 633 pending_wakeup_timer_.Start( 634 FROM_HERE, 635 wait_interval_->length, 636 base::Bind(&SyncSchedulerImpl::Unthrottle, 637 weak_ptr_factory_.GetWeakPtr())); 638 } else { 639 pending_wakeup_timer_.Start( 640 FROM_HERE, 641 wait_interval_->length, 642 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry, 643 weak_ptr_factory_.GetWeakPtr())); 644 } 645 } 646 647 void SyncSchedulerImpl::RequestStop() { 648 syncer_->RequestEarlyExit(); // Safe to call from any thread. 649 DCHECK(weak_handle_this_.IsInitialized()); 650 SDVLOG(3) << "Posting StopImpl"; 651 weak_handle_this_.Call(FROM_HERE, 652 &SyncSchedulerImpl::StopImpl); 653 } 654 655 void SyncSchedulerImpl::StopImpl() { 656 DCHECK(CalledOnValidThread()); 657 SDVLOG(2) << "StopImpl called"; 658 659 // Kill any in-flight method calls. 660 weak_ptr_factory_.InvalidateWeakPtrs(); 661 wait_interval_.reset(); 662 NotifyRetryTime(base::Time()); 663 poll_timer_.Stop(); 664 pending_wakeup_timer_.Stop(); 665 pending_configure_params_.reset(); 666 if (started_) 667 started_ = false; 668 } 669 670 // This is the only place where we invoke DoSyncSessionJob with canary 671 // privileges. Everyone else should use NORMAL_PRIORITY. 672 void SyncSchedulerImpl::TryCanaryJob() { 673 DCHECK(CalledOnValidThread()); 674 675 if (mode_ == CONFIGURATION_MODE && pending_configure_params_) { 676 SDVLOG(2) << "Found pending configure job; will run as canary"; 677 DoConfigurationSyncSessionJob(CANARY_PRIORITY); 678 } else if (mode_ == NORMAL_MODE && nudge_tracker_.IsSyncRequired() && 679 CanRunNudgeJobNow(CANARY_PRIORITY)) { 680 SDVLOG(2) << "Found pending nudge job; will run as canary"; 681 DoNudgeSyncSessionJob(CANARY_PRIORITY); 682 } else if (mode_ == NORMAL_MODE && CanRunJobNow(CANARY_PRIORITY) && 683 do_poll_after_credentials_updated_) { 684 // Retry poll if poll timer recently fired and ProfileSyncService received 685 // fresh access token. 686 DoPollSyncSessionJob(); 687 } else { 688 SDVLOG(2) << "Found no work to do; will not run a canary"; 689 } 690 // Don't run poll job till the next time poll timer fires. 691 do_poll_after_credentials_updated_ = false; 692 } 693 694 void SyncSchedulerImpl::PollTimerCallback() { 695 DCHECK(CalledOnValidThread()); 696 if (no_scheduling_allowed_) { 697 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in 698 // functions that are called only on the sync thread. This function is also 699 // called only on the sync thread, and only when it is posted by an expiring 700 // timer. If we find that no_scheduling_allowed_ is set here, then 701 // something is very wrong. Maybe someone mistakenly called us directly, or 702 // mishandled the book-keeping for no_scheduling_allowed_. 703 NOTREACHED() << "Illegal to schedule job while session in progress."; 704 return; 705 } 706 707 DoPollSyncSessionJob(); 708 // Poll timer fires infrequently. Usually by this time access token is already 709 // expired and poll job will fail with auth error. Set flag to retry poll once 710 // ProfileSyncService gets new access token, TryCanaryJob will be called in 711 // this case. 712 if (HttpResponse::SYNC_AUTH_ERROR == 713 session_context_->connection_manager()->server_status()) { 714 do_poll_after_credentials_updated_ = true; 715 } 716 } 717 718 void SyncSchedulerImpl::Unthrottle() { 719 DCHECK(CalledOnValidThread()); 720 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 721 722 // We're no longer throttled, so clear the wait interval. 723 wait_interval_.reset(); 724 NotifyRetryTime(base::Time()); 725 726 // We treat this as a 'canary' in the sense that it was originally scheduled 727 // to run some time ago, failed, and we now want to retry, versus a job that 728 // was just created (e.g via ScheduleNudgeImpl). The main implication is 729 // that we're careful to update routing info (etc) with such potentially 730 // stale canary jobs. 731 TryCanaryJob(); 732 } 733 734 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) { 735 DCHECK(CalledOnValidThread()); 736 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time); 737 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 738 739 if (nudge_tracker_.IsAnyTypeThrottled()) { 740 base::TimeDelta time_until_next_unthrottle = 741 nudge_tracker_.GetTimeUntilNextUnthrottle(unthrottle_time); 742 type_unthrottle_timer_.Start( 743 FROM_HERE, 744 time_until_next_unthrottle, 745 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, 746 weak_ptr_factory_.GetWeakPtr(), 747 unthrottle_time + time_until_next_unthrottle)); 748 } 749 750 // Maybe this is a good time to run a nudge job. Let's try it. 751 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) 752 DoNudgeSyncSessionJob(NORMAL_PRIORITY); 753 } 754 755 void SyncSchedulerImpl::PerformDelayedNudge() { 756 // Circumstances may have changed since we scheduled this delayed nudge. 757 // We must check to see if it's OK to run the job before we do so. 758 if (CanRunNudgeJobNow(NORMAL_PRIORITY)) 759 DoNudgeSyncSessionJob(NORMAL_PRIORITY); 760 761 // We're not responsible for setting up any retries here. The functions that 762 // first put us into a state that prevents successful sync cycles (eg. global 763 // throttling, type throttling, network errors, transient errors) will also 764 // setup the appropriate retry logic (eg. retry after timeout, exponential 765 // backoff, retry when the network changes). 766 } 767 768 void SyncSchedulerImpl::ExponentialBackoffRetry() { 769 TryCanaryJob(); 770 771 if (IsBackingOff()) { 772 // If we succeeded, our wait interval would have been cleared. If it hasn't 773 // been cleared, then we should increase our backoff interval and schedule 774 // another retry. 775 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); 776 wait_interval_.reset( 777 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 778 SDVLOG(2) << "Sync cycle failed. Will back off for " 779 << wait_interval_->length.InMilliseconds() << "ms."; 780 RestartWaiting(); 781 } 782 } 783 784 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { 785 DCHECK(CalledOnValidThread()); 786 session_context_->NotifyListeners(SyncEngineEvent(cause)); 787 } 788 789 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { 790 SyncEngineEvent event(SyncEngineEvent::RETRY_TIME_CHANGED); 791 event.retry_time = retry_time; 792 session_context_->NotifyListeners(event); 793 } 794 795 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) { 796 SyncEngineEvent event(SyncEngineEvent::THROTTLED_TYPES_CHANGED); 797 event.throttled_types = types; 798 session_context_->NotifyListeners(event); 799 } 800 801 bool SyncSchedulerImpl::IsBackingOff() const { 802 DCHECK(CalledOnValidThread()); 803 return wait_interval_.get() && wait_interval_->mode == 804 WaitInterval::EXPONENTIAL_BACKOFF; 805 } 806 807 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) { 808 DCHECK(CalledOnValidThread()); 809 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 810 throttle_duration)); 811 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 812 } 813 814 void SyncSchedulerImpl::OnTypesThrottled( 815 ModelTypeSet types, 816 const base::TimeDelta& throttle_duration) { 817 base::TimeTicks now = base::TimeTicks::Now(); 818 819 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); 820 base::TimeDelta time_until_next_unthrottle = 821 nudge_tracker_.GetTimeUntilNextUnthrottle(now); 822 type_unthrottle_timer_.Start( 823 FROM_HERE, 824 time_until_next_unthrottle, 825 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, 826 weak_ptr_factory_.GetWeakPtr(), 827 now + time_until_next_unthrottle)); 828 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 829 } 830 831 bool SyncSchedulerImpl::IsCurrentlyThrottled() { 832 DCHECK(CalledOnValidThread()); 833 return wait_interval_.get() && wait_interval_->mode == 834 WaitInterval::THROTTLED; 835 } 836 837 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 838 const base::TimeDelta& new_interval) { 839 DCHECK(CalledOnValidThread()); 840 syncer_short_poll_interval_seconds_ = new_interval; 841 } 842 843 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( 844 const base::TimeDelta& new_interval) { 845 DCHECK(CalledOnValidThread()); 846 syncer_long_poll_interval_seconds_ = new_interval; 847 } 848 849 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay( 850 const base::TimeDelta& new_delay) { 851 DCHECK(CalledOnValidThread()); 852 sessions_commit_delay_ = new_delay; 853 } 854 855 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { 856 if (size > 0) 857 nudge_tracker_.SetHintBufferSize(size); 858 else 859 NOTREACHED() << "Hint buffer size should be > 0."; 860 } 861 862 void SyncSchedulerImpl::OnShouldStopSyncingPermanently() { 863 DCHECK(CalledOnValidThread()); 864 SDVLOG(2) << "OnShouldStopSyncingPermanently"; 865 syncer_->RequestEarlyExit(); // Thread-safe. 866 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); 867 } 868 869 void SyncSchedulerImpl::OnActionableError( 870 const sessions::SyncSessionSnapshot& snap) { 871 DCHECK(CalledOnValidThread()); 872 SDVLOG(2) << "OnActionableError"; 873 SyncEngineEvent event(SyncEngineEvent::ACTIONABLE_ERROR); 874 event.snapshot = snap; 875 session_context_->NotifyListeners(event); 876 } 877 878 void SyncSchedulerImpl::OnSyncProtocolError( 879 const sessions::SyncSessionSnapshot& snapshot) { 880 DCHECK(CalledOnValidThread()); 881 if (ShouldRequestEarlyExit( 882 snapshot.model_neutral_state().sync_protocol_error)) { 883 SDVLOG(2) << "Sync Scheduler requesting early exit."; 884 syncer_->RequestEarlyExit(); // Thread-safe. 885 } 886 if (IsActionableError(snapshot.model_neutral_state().sync_protocol_error)) 887 OnActionableError(snapshot); 888 } 889 890 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { 891 DCHECK(CalledOnValidThread()); 892 session_context_->set_notifications_enabled(notifications_enabled); 893 if (notifications_enabled) 894 nudge_tracker_.OnInvalidationsEnabled(); 895 else 896 nudge_tracker_.OnInvalidationsDisabled(); 897 } 898 899 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const { 900 DCHECK(CalledOnValidThread()); 901 return sessions_commit_delay_; 902 } 903 904 #undef SDVLOG_LOC 905 906 #undef SDVLOG 907 908 #undef SLOG 909 910 #undef ENUM_CASE 911 912 } // namespace syncer 913