1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/engine/sync_scheduler_impl.h" 6 7 #include <algorithm> 8 #include <cstring> 9 10 #include "base/auto_reset.h" 11 #include "base/bind.h" 12 #include "base/bind_helpers.h" 13 #include "base/compiler_specific.h" 14 #include "base/location.h" 15 #include "base/logging.h" 16 #include "base/message_loop/message_loop.h" 17 #include "sync/engine/backoff_delay_provider.h" 18 #include "sync/engine/syncer.h" 19 #include "sync/notifier/object_id_invalidation_map.h" 20 #include "sync/protocol/proto_enum_conversions.h" 21 #include "sync/protocol/sync.pb.h" 22 #include "sync/util/data_type_histogram.h" 23 #include "sync/util/logging.h" 24 25 using base::TimeDelta; 26 using base::TimeTicks; 27 28 namespace syncer { 29 30 using sessions::SyncSession; 31 using sessions::SyncSessionSnapshot; 32 using sync_pb::GetUpdatesCallerInfo; 33 34 namespace { 35 36 bool ShouldRequestEarlyExit(const SyncProtocolError& error) { 37 switch (error.error_type) { 38 case SYNC_SUCCESS: 39 case MIGRATION_DONE: 40 case THROTTLED: 41 case TRANSIENT_ERROR: 42 return false; 43 case NOT_MY_BIRTHDAY: 44 case CLEAR_PENDING: 45 case DISABLED_BY_ADMIN: 46 case USER_ROLLBACK: 47 // If we send terminate sync early then |sync_cycle_ended| notification 48 // would not be sent. If there were no actions then |ACTIONABLE_ERROR| 49 // notification wouldnt be sent either. Then the UI layer would be left 50 // waiting forever. So assert we would send something. 51 DCHECK_NE(error.action, UNKNOWN_ACTION); 52 return true; 53 case INVALID_CREDENTIAL: 54 // The notification for this is handled by PostAndProcessHeaders|. 55 // Server does no have to send any action for this. 56 return true; 57 // Make the default a NOTREACHED. So if a new error is introduced we 58 // think about its expected functionality. 59 default: 60 NOTREACHED(); 61 return false; 62 } 63 } 64 65 bool IsActionableError( 66 const SyncProtocolError& error) { 67 return (error.action != UNKNOWN_ACTION); 68 } 69 } // namespace 70 71 ConfigurationParams::ConfigurationParams() 72 : source(GetUpdatesCallerInfo::UNKNOWN) {} 73 ConfigurationParams::ConfigurationParams( 74 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source, 75 ModelTypeSet types_to_download, 76 const ModelSafeRoutingInfo& routing_info, 77 const base::Closure& ready_task, 78 const base::Closure& retry_task) 79 : source(source), 80 types_to_download(types_to_download), 81 routing_info(routing_info), 82 ready_task(ready_task), 83 retry_task(retry_task) { 84 DCHECK(!ready_task.is_null()); 85 DCHECK(!retry_task.is_null()); 86 } 87 ConfigurationParams::~ConfigurationParams() {} 88 89 SyncSchedulerImpl::WaitInterval::WaitInterval() 90 : mode(UNKNOWN) {} 91 92 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 93 : mode(mode), length(length) {} 94 95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 96 97 #define ENUM_CASE(x) case x: return #x; break; 98 99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 100 switch (mode) { 101 ENUM_CASE(UNKNOWN); 102 ENUM_CASE(EXPONENTIAL_BACKOFF); 103 ENUM_CASE(THROTTLED); 104 } 105 NOTREACHED(); 106 return ""; 107 } 108 109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 110 NudgeSource source) { 111 switch (source) { 112 case NUDGE_SOURCE_NOTIFICATION: 113 return GetUpdatesCallerInfo::NOTIFICATION; 114 case NUDGE_SOURCE_LOCAL: 115 return GetUpdatesCallerInfo::LOCAL; 116 case NUDGE_SOURCE_LOCAL_REFRESH: 117 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 118 case NUDGE_SOURCE_UNKNOWN: 119 return GetUpdatesCallerInfo::UNKNOWN; 120 default: 121 NOTREACHED(); 122 return GetUpdatesCallerInfo::UNKNOWN; 123 } 124 } 125 126 // Helper macros to log with the syncer thread name; useful when there 127 // are multiple syncer threads involved. 128 129 #define SLOG(severity) LOG(severity) << name_ << ": " 130 131 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " 132 133 #define SDVLOG_LOC(from_here, verbose_level) \ 134 DVLOG_LOC(from_here, verbose_level) << name_ << ": " 135 136 namespace { 137 138 const int kDefaultSessionsCommitDelaySeconds = 10; 139 140 bool IsConfigRelatedUpdateSourceValue( 141 GetUpdatesCallerInfo::GetUpdatesSource source) { 142 switch (source) { 143 case GetUpdatesCallerInfo::RECONFIGURATION: 144 case GetUpdatesCallerInfo::MIGRATION: 145 case GetUpdatesCallerInfo::NEW_CLIENT: 146 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE: 147 return true; 148 default: 149 return false; 150 } 151 } 152 153 } // namespace 154 155 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name, 156 BackoffDelayProvider* delay_provider, 157 sessions::SyncSessionContext* context, 158 Syncer* syncer) 159 : name_(name), 160 started_(false), 161 syncer_short_poll_interval_seconds_( 162 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 163 syncer_long_poll_interval_seconds_( 164 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 165 sessions_commit_delay_( 166 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 167 mode_(NORMAL_MODE), 168 delay_provider_(delay_provider), 169 syncer_(syncer), 170 session_context_(context), 171 no_scheduling_allowed_(false), 172 do_poll_after_credentials_updated_(false), 173 next_sync_session_job_priority_(NORMAL_PRIORITY), 174 weak_ptr_factory_(this), 175 weak_ptr_factory_for_weak_handle_(this) { 176 weak_handle_this_ = MakeWeakHandle( 177 weak_ptr_factory_for_weak_handle_.GetWeakPtr()); 178 } 179 180 SyncSchedulerImpl::~SyncSchedulerImpl() { 181 DCHECK(CalledOnValidThread()); 182 Stop(); 183 } 184 185 void SyncSchedulerImpl::OnCredentialsUpdated() { 186 DCHECK(CalledOnValidThread()); 187 188 if (HttpResponse::SYNC_AUTH_ERROR == 189 session_context_->connection_manager()->server_status()) { 190 OnServerConnectionErrorFixed(); 191 } 192 } 193 194 void SyncSchedulerImpl::OnConnectionStatusChange() { 195 if (HttpResponse::CONNECTION_UNAVAILABLE == 196 session_context_->connection_manager()->server_status()) { 197 // Optimistically assume that the connection is fixed and try 198 // connecting. 199 OnServerConnectionErrorFixed(); 200 } 201 } 202 203 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 204 // There could be a pending nudge or configuration job in several cases: 205 // 206 // 1. We're in exponential backoff. 207 // 2. We're silenced / throttled. 208 // 3. A nudge was saved previously due to not having a valid auth token. 209 // 4. A nudge was scheduled + saved while in configuration mode. 210 // 211 // In all cases except (2), we want to retry contacting the server. We 212 // call TryCanaryJob to achieve this, and note that nothing -- not even a 213 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that 214 // has the authority to do that is the Unthrottle timer. 215 TryCanaryJob(); 216 } 217 218 void SyncSchedulerImpl::Start(Mode mode) { 219 DCHECK(CalledOnValidThread()); 220 std::string thread_name = base::MessageLoop::current()->thread_name(); 221 if (thread_name.empty()) 222 thread_name = "<Main thread>"; 223 SDVLOG(2) << "Start called from thread " 224 << thread_name << " with mode " << GetModeString(mode); 225 if (!started_) { 226 started_ = true; 227 SendInitialSnapshot(); 228 } 229 230 DCHECK(!session_context_->account_name().empty()); 231 DCHECK(syncer_.get()); 232 Mode old_mode = mode_; 233 mode_ = mode; 234 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed. 235 236 if (old_mode != mode_ && mode_ == NORMAL_MODE) { 237 // We just got back to normal mode. Let's try to run the work that was 238 // queued up while we were configuring. 239 240 // Update our current time before checking IsRetryRequired(). 241 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 242 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) { 243 TrySyncSessionJob(); 244 } 245 } 246 } 247 248 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() { 249 ModelTypeSet enabled_types = session_context_->GetEnabledTypes(); 250 ModelTypeSet enabled_protocol_types = 251 Intersection(ProtocolTypes(), enabled_types); 252 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes(); 253 return Difference(enabled_protocol_types, throttled_types); 254 } 255 256 void SyncSchedulerImpl::SendInitialSnapshot() { 257 DCHECK(CalledOnValidThread()); 258 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this)); 259 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED); 260 event.snapshot = dummy->TakeSnapshot(); 261 FOR_EACH_OBSERVER(SyncEngineEventListener, 262 *session_context_->listeners(), 263 OnSyncCycleEvent(event)); 264 } 265 266 namespace { 267 268 // Helper to extract the routing info corresponding to types in 269 // |types_to_download| from |current_routes|. 270 void BuildModelSafeParams( 271 ModelTypeSet types_to_download, 272 const ModelSafeRoutingInfo& current_routes, 273 ModelSafeRoutingInfo* result_routes) { 274 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good(); 275 iter.Inc()) { 276 ModelType type = iter.Get(); 277 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type); 278 DCHECK(route != current_routes.end()); 279 ModelSafeGroup group = route->second; 280 (*result_routes)[type] = group; 281 } 282 } 283 284 } // namespace. 285 286 void SyncSchedulerImpl::ScheduleConfiguration( 287 const ConfigurationParams& params) { 288 DCHECK(CalledOnValidThread()); 289 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 290 DCHECK_EQ(CONFIGURATION_MODE, mode_); 291 DCHECK(!params.ready_task.is_null()); 292 CHECK(started_) << "Scheduler must be running to configure."; 293 SDVLOG(2) << "Reconfiguring syncer."; 294 295 // Only one configuration is allowed at a time. Verify we're not waiting 296 // for a pending configure job. 297 DCHECK(!pending_configure_params_); 298 299 ModelSafeRoutingInfo restricted_routes; 300 BuildModelSafeParams(params.types_to_download, 301 params.routing_info, 302 &restricted_routes); 303 session_context_->SetRoutingInfo(restricted_routes); 304 305 // Only reconfigure if we have types to download. 306 if (!params.types_to_download.Empty()) { 307 pending_configure_params_.reset(new ConfigurationParams(params)); 308 TrySyncSessionJob(); 309 } else { 310 SDVLOG(2) << "No change in routing info, calling ready task directly."; 311 params.ready_task.Run(); 312 } 313 } 314 315 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) { 316 DCHECK(CalledOnValidThread()); 317 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) { 318 SDVLOG(1) << "Unable to run a job because we're throttled."; 319 return false; 320 } 321 322 if (wait_interval_ 323 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF 324 && priority != CANARY_PRIORITY) { 325 SDVLOG(1) << "Unable to run a job because we're backing off."; 326 return false; 327 } 328 329 if (session_context_->connection_manager()->HasInvalidAuthToken()) { 330 SDVLOG(1) << "Unable to run a job because we have no valid auth token."; 331 return false; 332 } 333 334 return true; 335 } 336 337 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) { 338 DCHECK(CalledOnValidThread()); 339 340 if (!CanRunJobNow(priority)) { 341 SDVLOG(1) << "Unable to run a nudge job right now"; 342 return false; 343 } 344 345 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes(); 346 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) { 347 SDVLOG(1) << "Not running a nudge because we're fully type throttled."; 348 return false; 349 } 350 351 if (mode_ == CONFIGURATION_MODE) { 352 SDVLOG(1) << "Not running nudge because we're in configuration mode."; 353 return false; 354 } 355 356 return true; 357 } 358 359 void SyncSchedulerImpl::ScheduleLocalNudge( 360 const TimeDelta& desired_delay, 361 ModelTypeSet types, 362 const tracked_objects::Location& nudge_location) { 363 DCHECK(CalledOnValidThread()); 364 DCHECK(!types.Empty()); 365 366 SDVLOG_LOC(nudge_location, 2) 367 << "Scheduling sync because of local change to " 368 << ModelTypeSetToString(types); 369 UpdateNudgeTimeRecords(types); 370 nudge_tracker_.RecordLocalChange(types); 371 ScheduleNudgeImpl(desired_delay, nudge_location); 372 } 373 374 void SyncSchedulerImpl::ScheduleLocalRefreshRequest( 375 const TimeDelta& desired_delay, 376 ModelTypeSet types, 377 const tracked_objects::Location& nudge_location) { 378 DCHECK(CalledOnValidThread()); 379 DCHECK(!types.Empty()); 380 381 SDVLOG_LOC(nudge_location, 2) 382 << "Scheduling sync because of local refresh request for " 383 << ModelTypeSetToString(types); 384 nudge_tracker_.RecordLocalRefreshRequest(types); 385 ScheduleNudgeImpl(desired_delay, nudge_location); 386 } 387 388 void SyncSchedulerImpl::ScheduleInvalidationNudge( 389 const TimeDelta& desired_delay, 390 const ObjectIdInvalidationMap& invalidation_map, 391 const tracked_objects::Location& nudge_location) { 392 DCHECK(CalledOnValidThread()); 393 DCHECK(!invalidation_map.Empty()); 394 395 SDVLOG_LOC(nudge_location, 2) 396 << "Scheduling sync because we received invalidation for " 397 << ModelTypeSetToString( 398 ObjectIdSetToModelTypeSet(invalidation_map.GetObjectIds())); 399 nudge_tracker_.RecordRemoteInvalidation(invalidation_map); 400 ScheduleNudgeImpl(desired_delay, nudge_location); 401 } 402 403 // TODO(zea): Consider adding separate throttling/backoff for datatype 404 // refresh requests. 405 void SyncSchedulerImpl::ScheduleNudgeImpl( 406 const TimeDelta& delay, 407 const tracked_objects::Location& nudge_location) { 408 DCHECK(CalledOnValidThread()); 409 410 if (no_scheduling_allowed_) { 411 NOTREACHED() << "Illegal to schedule job while session in progress."; 412 return; 413 } 414 415 if (!started_) { 416 SDVLOG_LOC(nudge_location, 2) 417 << "Dropping nudge, scheduler is not running."; 418 return; 419 } 420 421 SDVLOG_LOC(nudge_location, 2) 422 << "In ScheduleNudgeImpl with delay " 423 << delay.InMilliseconds() << " ms"; 424 425 if (!CanRunNudgeJobNow(NORMAL_PRIORITY)) 426 return; 427 428 TimeTicks incoming_run_time = TimeTicks::Now() + delay; 429 if (!scheduled_nudge_time_.is_null() && 430 (scheduled_nudge_time_ < incoming_run_time)) { 431 // Old job arrives sooner than this one. Don't reschedule it. 432 return; 433 } 434 435 // Either there is no existing nudge in flight or the incoming nudge should be 436 // made to arrive first (preempt) the existing nudge. We reschedule in either 437 // case. 438 SDVLOG_LOC(nudge_location, 2) 439 << "Scheduling a nudge with " 440 << delay.InMilliseconds() << " ms delay"; 441 scheduled_nudge_time_ = incoming_run_time; 442 pending_wakeup_timer_.Start( 443 nudge_location, 444 delay, 445 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge, 446 weak_ptr_factory_.GetWeakPtr())); 447 } 448 449 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 450 switch (mode) { 451 ENUM_CASE(CONFIGURATION_MODE); 452 ENUM_CASE(NORMAL_MODE); 453 } 454 return ""; 455 } 456 457 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) { 458 DCHECK(CalledOnValidThread()); 459 DCHECK(CanRunNudgeJobNow(priority)); 460 461 DVLOG(2) << "Will run normal mode sync cycle with types " 462 << ModelTypeSetToString(session_context_->GetEnabledTypes()); 463 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 464 bool premature_exit = !syncer_->NormalSyncShare( 465 GetEnabledAndUnthrottledTypes(), 466 nudge_tracker_, 467 session.get()); 468 AdjustPolling(FORCE_RESET); 469 // Don't run poll job till the next time poll timer fires. 470 do_poll_after_credentials_updated_ = false; 471 472 bool success = !premature_exit 473 && !sessions::HasSyncerError( 474 session->status_controller().model_neutral_state()); 475 476 if (success) { 477 // That cycle took care of any outstanding work we had. 478 SDVLOG(2) << "Nudge succeeded."; 479 nudge_tracker_.RecordSuccessfulSyncCycle(); 480 scheduled_nudge_time_ = base::TimeTicks(); 481 482 // If we're here, then we successfully reached the server. End all backoff. 483 wait_interval_.reset(); 484 NotifyRetryTime(base::Time()); 485 } else { 486 HandleFailure(session->status_controller().model_neutral_state()); 487 } 488 } 489 490 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) { 491 DCHECK(CalledOnValidThread()); 492 DCHECK_EQ(mode_, CONFIGURATION_MODE); 493 DCHECK(pending_configure_params_ != NULL); 494 495 if (!CanRunJobNow(priority)) { 496 SDVLOG(2) << "Unable to run configure job right now."; 497 if (!pending_configure_params_->retry_task.is_null()) { 498 pending_configure_params_->retry_task.Run(); 499 pending_configure_params_->retry_task.Reset(); 500 } 501 return; 502 } 503 504 SDVLOG(2) << "Will run configure SyncShare with types " 505 << ModelTypeSetToString(session_context_->GetEnabledTypes()); 506 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 507 bool premature_exit = !syncer_->ConfigureSyncShare( 508 pending_configure_params_->types_to_download, 509 pending_configure_params_->source, 510 session.get()); 511 AdjustPolling(FORCE_RESET); 512 // Don't run poll job till the next time poll timer fires. 513 do_poll_after_credentials_updated_ = false; 514 515 bool success = !premature_exit 516 && !sessions::HasSyncerError( 517 session->status_controller().model_neutral_state()); 518 519 if (success) { 520 SDVLOG(2) << "Configure succeeded."; 521 pending_configure_params_->ready_task.Run(); 522 pending_configure_params_.reset(); 523 524 // If we're here, then we successfully reached the server. End all backoff. 525 wait_interval_.reset(); 526 NotifyRetryTime(base::Time()); 527 } else { 528 HandleFailure(session->status_controller().model_neutral_state()); 529 // Sync cycle might receive response from server that causes scheduler to 530 // stop and draws pending_configure_params_ invalid. 531 if (started_ && !pending_configure_params_->retry_task.is_null()) { 532 pending_configure_params_->retry_task.Run(); 533 pending_configure_params_->retry_task.Reset(); 534 } 535 } 536 } 537 538 void SyncSchedulerImpl::HandleFailure( 539 const sessions::ModelNeutralState& model_neutral_state) { 540 if (IsCurrentlyThrottled()) { 541 SDVLOG(2) << "Was throttled during previous sync cycle."; 542 RestartWaiting(); 543 } else if (!IsBackingOff()) { 544 // Setup our backoff if this is our first such failure. 545 TimeDelta length = delay_provider_->GetDelay( 546 delay_provider_->GetInitialDelay(model_neutral_state)); 547 wait_interval_.reset( 548 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 549 SDVLOG(2) << "Sync cycle failed. Will back off for " 550 << wait_interval_->length.InMilliseconds() << "ms."; 551 RestartWaiting(); 552 } 553 } 554 555 void SyncSchedulerImpl::DoPollSyncSessionJob() { 556 base::AutoReset<bool> protector(&no_scheduling_allowed_, true); 557 558 SDVLOG(2) << "Polling with types " 559 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes()); 560 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this)); 561 syncer_->PollSyncShare( 562 GetEnabledAndUnthrottledTypes(), 563 session.get()); 564 565 AdjustPolling(FORCE_RESET); 566 567 if (IsCurrentlyThrottled()) { 568 SDVLOG(2) << "Poll request got us throttled."; 569 // The OnSilencedUntil() call set up the WaitInterval for us. All we need 570 // to do is start the timer. 571 RestartWaiting(); 572 } 573 } 574 575 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) { 576 DCHECK(CalledOnValidThread()); 577 base::TimeTicks now = TimeTicks::Now(); 578 // Update timing information for how often datatypes are triggering nudges. 579 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) { 580 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()]; 581 last_local_nudges_by_model_type_[iter.Get()] = now; 582 if (previous.is_null()) 583 continue; 584 585 #define PER_DATA_TYPE_MACRO(type_str) \ 586 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 587 SYNC_DATA_TYPE_HISTOGRAM(iter.Get()); 588 #undef PER_DATA_TYPE_MACRO 589 } 590 } 591 592 TimeDelta SyncSchedulerImpl::GetPollInterval() { 593 return (!session_context_->notifications_enabled() || 594 !session_context_->ShouldFetchUpdatesBeforeCommit()) ? 595 syncer_short_poll_interval_seconds_ : 596 syncer_long_poll_interval_seconds_; 597 } 598 599 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) { 600 DCHECK(CalledOnValidThread()); 601 602 TimeDelta poll = GetPollInterval(); 603 bool rate_changed = !poll_timer_.IsRunning() || 604 poll != poll_timer_.GetCurrentDelay(); 605 606 if (type == FORCE_RESET) { 607 last_poll_reset_ = base::TimeTicks::Now(); 608 if (!rate_changed) 609 poll_timer_.Reset(); 610 } 611 612 if (!rate_changed) 613 return; 614 615 // Adjust poll rate. 616 poll_timer_.Stop(); 617 poll_timer_.Start(FROM_HERE, poll, this, 618 &SyncSchedulerImpl::PollTimerCallback); 619 } 620 621 void SyncSchedulerImpl::RestartWaiting() { 622 CHECK(wait_interval_.get()); 623 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0)); 624 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 625 SDVLOG(2) << "Starting WaitInterval timer of length " 626 << wait_interval_->length.InMilliseconds() << "ms."; 627 if (wait_interval_->mode == WaitInterval::THROTTLED) { 628 pending_wakeup_timer_.Start( 629 FROM_HERE, 630 wait_interval_->length, 631 base::Bind(&SyncSchedulerImpl::Unthrottle, 632 weak_ptr_factory_.GetWeakPtr())); 633 } else { 634 pending_wakeup_timer_.Start( 635 FROM_HERE, 636 wait_interval_->length, 637 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry, 638 weak_ptr_factory_.GetWeakPtr())); 639 } 640 } 641 642 void SyncSchedulerImpl::Stop() { 643 DCHECK(CalledOnValidThread()); 644 SDVLOG(2) << "Stop called"; 645 646 // Kill any in-flight method calls. 647 weak_ptr_factory_.InvalidateWeakPtrs(); 648 wait_interval_.reset(); 649 NotifyRetryTime(base::Time()); 650 poll_timer_.Stop(); 651 pending_wakeup_timer_.Stop(); 652 pending_configure_params_.reset(); 653 if (started_) 654 started_ = false; 655 } 656 657 // This is the only place where we invoke DoSyncSessionJob with canary 658 // privileges. Everyone else should use NORMAL_PRIORITY. 659 void SyncSchedulerImpl::TryCanaryJob() { 660 next_sync_session_job_priority_ = CANARY_PRIORITY; 661 TrySyncSessionJob(); 662 } 663 664 void SyncSchedulerImpl::TrySyncSessionJob() { 665 // Post call to TrySyncSessionJobImpl on current thread. Later request for 666 // access token will be here. 667 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind( 668 &SyncSchedulerImpl::TrySyncSessionJobImpl, 669 weak_ptr_factory_.GetWeakPtr())); 670 } 671 672 void SyncSchedulerImpl::TrySyncSessionJobImpl() { 673 JobPriority priority = next_sync_session_job_priority_; 674 next_sync_session_job_priority_ = NORMAL_PRIORITY; 675 676 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now()); 677 678 DCHECK(CalledOnValidThread()); 679 if (mode_ == CONFIGURATION_MODE) { 680 if (pending_configure_params_) { 681 SDVLOG(2) << "Found pending configure job"; 682 DoConfigurationSyncSessionJob(priority); 683 } 684 } else if (CanRunNudgeJobNow(priority)) { 685 if (nudge_tracker_.IsSyncRequired()) { 686 SDVLOG(2) << "Found pending nudge job"; 687 DoNudgeSyncSessionJob(priority); 688 } else if (do_poll_after_credentials_updated_ || 689 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) { 690 DoPollSyncSessionJob(); 691 // Poll timer fires infrequently. Usually by this time access token is 692 // already expired and poll job will fail with auth error. Set flag to 693 // retry poll once ProfileSyncService gets new access token, TryCanaryJob 694 // will be called after access token is retrieved. 695 if (HttpResponse::SYNC_AUTH_ERROR == 696 session_context_->connection_manager()->server_status()) { 697 do_poll_after_credentials_updated_ = true; 698 } 699 } 700 } 701 702 if (priority == CANARY_PRIORITY) { 703 // If this is canary job then whatever result was don't run poll job till 704 // the next time poll timer fires. 705 do_poll_after_credentials_updated_ = false; 706 } 707 708 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) { 709 // If we succeeded, our wait interval would have been cleared. If it hasn't 710 // been cleared, then we should increase our backoff interval and schedule 711 // another retry. 712 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length); 713 wait_interval_.reset( 714 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length)); 715 SDVLOG(2) << "Sync cycle failed. Will back off for " 716 << wait_interval_->length.InMilliseconds() << "ms."; 717 RestartWaiting(); 718 } 719 } 720 721 void SyncSchedulerImpl::PollTimerCallback() { 722 DCHECK(CalledOnValidThread()); 723 if (no_scheduling_allowed_) { 724 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in 725 // functions that are called only on the sync thread. This function is also 726 // called only on the sync thread, and only when it is posted by an expiring 727 // timer. If we find that no_scheduling_allowed_ is set here, then 728 // something is very wrong. Maybe someone mistakenly called us directly, or 729 // mishandled the book-keeping for no_scheduling_allowed_. 730 NOTREACHED() << "Illegal to schedule job while session in progress."; 731 return; 732 } 733 734 TrySyncSessionJob(); 735 } 736 737 void SyncSchedulerImpl::RetryTimerCallback() { 738 TrySyncSessionJob(); 739 } 740 741 void SyncSchedulerImpl::Unthrottle() { 742 DCHECK(CalledOnValidThread()); 743 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 744 745 // We're no longer throttled, so clear the wait interval. 746 wait_interval_.reset(); 747 NotifyRetryTime(base::Time()); 748 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 749 750 // We treat this as a 'canary' in the sense that it was originally scheduled 751 // to run some time ago, failed, and we now want to retry, versus a job that 752 // was just created (e.g via ScheduleNudgeImpl). The main implication is 753 // that we're careful to update routing info (etc) with such potentially 754 // stale canary jobs. 755 TryCanaryJob(); 756 } 757 758 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) { 759 DCHECK(CalledOnValidThread()); 760 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time); 761 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 762 763 if (nudge_tracker_.IsAnyTypeThrottled()) { 764 const base::TimeTicks now = base::TimeTicks::Now(); 765 base::TimeDelta time_until_next_unthrottle = 766 nudge_tracker_.GetTimeUntilNextUnthrottle(now); 767 type_unthrottle_timer_.Start( 768 FROM_HERE, 769 time_until_next_unthrottle, 770 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, 771 weak_ptr_factory_.GetWeakPtr(), 772 now + time_until_next_unthrottle)); 773 } 774 775 // Maybe this is a good time to run a nudge job. Let's try it. 776 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) 777 TrySyncSessionJob(); 778 } 779 780 void SyncSchedulerImpl::PerformDelayedNudge() { 781 // Circumstances may have changed since we scheduled this delayed nudge. 782 // We must check to see if it's OK to run the job before we do so. 783 if (CanRunNudgeJobNow(NORMAL_PRIORITY)) 784 TrySyncSessionJob(); 785 786 // We're not responsible for setting up any retries here. The functions that 787 // first put us into a state that prevents successful sync cycles (eg. global 788 // throttling, type throttling, network errors, transient errors) will also 789 // setup the appropriate retry logic (eg. retry after timeout, exponential 790 // backoff, retry when the network changes). 791 } 792 793 void SyncSchedulerImpl::ExponentialBackoffRetry() { 794 TryCanaryJob(); 795 } 796 797 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) { 798 FOR_EACH_OBSERVER(SyncEngineEventListener, 799 *session_context_->listeners(), 800 OnRetryTimeChanged(retry_time)); 801 } 802 803 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) { 804 FOR_EACH_OBSERVER(SyncEngineEventListener, 805 *session_context_->listeners(), 806 OnThrottledTypesChanged(types)); 807 } 808 809 bool SyncSchedulerImpl::IsBackingOff() const { 810 DCHECK(CalledOnValidThread()); 811 return wait_interval_.get() && wait_interval_->mode == 812 WaitInterval::EXPONENTIAL_BACKOFF; 813 } 814 815 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) { 816 DCHECK(CalledOnValidThread()); 817 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 818 throttle_duration)); 819 NotifyRetryTime(base::Time::Now() + wait_interval_->length); 820 NotifyThrottledTypesChanged(ModelTypeSet::All()); 821 } 822 823 void SyncSchedulerImpl::OnTypesThrottled( 824 ModelTypeSet types, 825 const base::TimeDelta& throttle_duration) { 826 base::TimeTicks now = base::TimeTicks::Now(); 827 828 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now); 829 base::TimeDelta time_until_next_unthrottle = 830 nudge_tracker_.GetTimeUntilNextUnthrottle(now); 831 type_unthrottle_timer_.Start( 832 FROM_HERE, 833 time_until_next_unthrottle, 834 base::Bind(&SyncSchedulerImpl::TypeUnthrottle, 835 weak_ptr_factory_.GetWeakPtr(), 836 now + time_until_next_unthrottle)); 837 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes()); 838 } 839 840 bool SyncSchedulerImpl::IsCurrentlyThrottled() { 841 DCHECK(CalledOnValidThread()); 842 return wait_interval_.get() && wait_interval_->mode == 843 WaitInterval::THROTTLED; 844 } 845 846 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 847 const base::TimeDelta& new_interval) { 848 DCHECK(CalledOnValidThread()); 849 syncer_short_poll_interval_seconds_ = new_interval; 850 } 851 852 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate( 853 const base::TimeDelta& new_interval) { 854 DCHECK(CalledOnValidThread()); 855 syncer_long_poll_interval_seconds_ = new_interval; 856 } 857 858 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay( 859 const base::TimeDelta& new_delay) { 860 DCHECK(CalledOnValidThread()); 861 sessions_commit_delay_ = new_delay; 862 } 863 864 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) { 865 if (size > 0) 866 nudge_tracker_.SetHintBufferSize(size); 867 else 868 NOTREACHED() << "Hint buffer size should be > 0."; 869 } 870 871 void SyncSchedulerImpl::OnSyncProtocolError( 872 const SyncProtocolError& sync_protocol_error) { 873 DCHECK(CalledOnValidThread()); 874 if (ShouldRequestEarlyExit(sync_protocol_error)) { 875 SDVLOG(2) << "Sync Scheduler requesting early exit."; 876 Stop(); 877 } 878 if (IsActionableError(sync_protocol_error)) { 879 SDVLOG(2) << "OnActionableError"; 880 FOR_EACH_OBSERVER(SyncEngineEventListener, 881 *session_context_->listeners(), 882 OnActionableError(sync_protocol_error)); 883 } 884 } 885 886 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) { 887 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay); 888 retry_timer_.Start(FROM_HERE, delay, this, 889 &SyncSchedulerImpl::RetryTimerCallback); 890 } 891 892 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) { 893 FOR_EACH_OBSERVER(SyncEngineEventListener, 894 *session_context_->listeners(), 895 OnMigrationRequested(types)); 896 } 897 898 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) { 899 DCHECK(CalledOnValidThread()); 900 session_context_->set_notifications_enabled(notifications_enabled); 901 if (notifications_enabled) 902 nudge_tracker_.OnInvalidationsEnabled(); 903 else 904 nudge_tracker_.OnInvalidationsDisabled(); 905 } 906 907 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const { 908 DCHECK(CalledOnValidThread()); 909 return sessions_commit_delay_; 910 } 911 912 #undef SDVLOG_LOC 913 914 #undef SDVLOG 915 916 #undef SLOG 917 918 #undef ENUM_CASE 919 920 } // namespace syncer 921