1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/sync/engine/syncer_thread.h" 6 7 #include <algorithm> 8 9 #include "base/rand_util.h" 10 #include "chrome/browser/sync/engine/syncer.h" 11 12 using base::TimeDelta; 13 using base::TimeTicks; 14 15 namespace browser_sync { 16 17 using sessions::SyncSession; 18 using sessions::SyncSessionSnapshot; 19 using sessions::SyncSourceInfo; 20 using syncable::ModelTypePayloadMap; 21 using syncable::ModelTypeBitSet; 22 using sync_pb::GetUpdatesCallerInfo; 23 24 SyncerThread::DelayProvider::DelayProvider() {} 25 SyncerThread::DelayProvider::~DelayProvider() {} 26 27 SyncerThread::WaitInterval::WaitInterval() {} 28 SyncerThread::WaitInterval::~WaitInterval() {} 29 30 SyncerThread::SyncSessionJob::SyncSessionJob() {} 31 SyncerThread::SyncSessionJob::~SyncSessionJob() {} 32 33 SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose, 34 base::TimeTicks start, 35 linked_ptr<sessions::SyncSession> session, bool is_canary_job, 36 const tracked_objects::Location& nudge_location) : purpose(purpose), 37 scheduled_start(start), 38 session(session), 39 is_canary_job(is_canary_job), 40 nudge_location(nudge_location) { 41 } 42 43 TimeDelta SyncerThread::DelayProvider::GetDelay( 44 const base::TimeDelta& last_delay) { 45 return SyncerThread::GetRecommendedDelay(last_delay); 46 } 47 48 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 49 NudgeSource source) { 50 switch (source) { 51 case NUDGE_SOURCE_NOTIFICATION: 52 return GetUpdatesCallerInfo::NOTIFICATION; 53 case NUDGE_SOURCE_LOCAL: 54 return GetUpdatesCallerInfo::LOCAL; 55 case NUDGE_SOURCE_CONTINUATION: 56 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 57 case NUDGE_SOURCE_UNKNOWN: 58 return GetUpdatesCallerInfo::UNKNOWN; 59 default: 60 NOTREACHED(); 61 return GetUpdatesCallerInfo::UNKNOWN; 62 } 63 } 64 65 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 66 : mode(mode), had_nudge(false), length(length) { } 67 68 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, 69 Syncer* syncer) 70 : thread_("SyncEngine_SyncerThread"), 71 syncer_short_poll_interval_seconds_( 72 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 73 syncer_long_poll_interval_seconds_( 74 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 75 mode_(NORMAL_MODE), 76 server_connection_ok_(false), 77 delay_provider_(new DelayProvider()), 78 syncer_(syncer), 79 session_context_(context) { 80 } 81 82 SyncerThread::~SyncerThread() { 83 DCHECK(!thread_.IsRunning()); 84 } 85 86 void SyncerThread::CheckServerConnectionManagerStatus( 87 HttpResponse::ServerConnectionCode code) { 88 89 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." 90 << "Old mode: " << server_connection_ok_ << " Code: " << code; 91 // Note, be careful when adding cases here because if the SyncerThread 92 // thinks there is no valid connection as determined by this method, it 93 // will drop out of *all* forward progress sync loops (it won't poll and it 94 // will queue up Talk notifications but not actually call SyncShare) until 95 // some external action causes a ServerConnectionManager to broadcast that 96 // a valid connection has been re-established. 97 if (HttpResponse::CONNECTION_UNAVAILABLE == code || 98 HttpResponse::SYNC_AUTH_ERROR == code) { 99 server_connection_ok_ = false; 100 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." 101 << " new mode:" << server_connection_ok_; 102 } else if (HttpResponse::SERVER_CONNECTION_OK == code) { 103 server_connection_ok_ = true; 104 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." 105 << " new mode:" << server_connection_ok_; 106 DoCanaryJob(); 107 } 108 } 109 110 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { 111 VLOG(1) << "SyncerThread(" << this << ")" << " Start called from thread " 112 << MessageLoop::current()->thread_name(); 113 if (!thread_.IsRunning()) { 114 VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode " 115 << mode; 116 if (!thread_.Start()) { 117 NOTREACHED() << "Unable to start SyncerThread."; 118 return; 119 } 120 WatchConnectionManager(); 121 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 122 this, &SyncerThread::SendInitialSnapshot)); 123 } 124 125 VLOG(1) << "SyncerThread(" << this << ")" << " Entering start with mode = " 126 << mode; 127 128 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 129 this, &SyncerThread::StartImpl, mode, callback)); 130 } 131 132 void SyncerThread::SendInitialSnapshot() { 133 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 134 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, 135 SyncSourceInfo(), ModelSafeRoutingInfo(), 136 std::vector<ModelSafeWorker*>())); 137 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 138 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); 139 event.snapshot = &snapshot; 140 session_context_->NotifyListeners(event); 141 } 142 143 void SyncerThread::WatchConnectionManager() { 144 ServerConnectionManager* scm = session_context_->connection_manager(); 145 CheckServerConnectionManagerStatus(scm->server_status()); 146 scm->AddListener(this); 147 } 148 149 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) { 150 VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " 151 << mode; 152 153 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a 154 // ThreadSafeRefcounted object. 155 scoped_ptr<ModeChangeCallback> scoped_callback(callback); 156 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 157 DCHECK(!session_context_->account_name().empty()); 158 DCHECK(syncer_.get()); 159 mode_ = mode; 160 AdjustPolling(NULL); // Will kick start poll timer if needed. 161 if (scoped_callback.get()) 162 scoped_callback->Run(); 163 164 // We just changed our mode. See if there are any pending jobs that we could 165 // execute in the new mode. 166 DoPendingJobIfPossible(false); 167 } 168 169 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( 170 const SyncSessionJob& job) { 171 172 DCHECK(wait_interval_.get()); 173 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); 174 175 VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : " 176 << wait_interval_->mode << "Wait interval had nudge : " 177 << wait_interval_->had_nudge << "is canary job : " 178 << job.is_canary_job; 179 180 if (job.purpose == SyncSessionJob::POLL) 181 return DROP; 182 183 DCHECK(job.purpose == SyncSessionJob::NUDGE || 184 job.purpose == SyncSessionJob::CONFIGURATION); 185 if (wait_interval_->mode == WaitInterval::THROTTLED) 186 return SAVE; 187 188 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 189 if (job.purpose == SyncSessionJob::NUDGE) { 190 if (mode_ == CONFIGURATION_MODE) 191 return SAVE; 192 193 // If we already had one nudge then just drop this nudge. We will retry 194 // later when the timer runs out. 195 return wait_interval_->had_nudge ? DROP : CONTINUE; 196 } 197 // This is a config job. 198 return job.is_canary_job ? CONTINUE : SAVE; 199 } 200 201 SyncerThread::JobProcessDecision SyncerThread::DecideOnJob( 202 const SyncSessionJob& job) { 203 if (job.purpose == SyncSessionJob::CLEAR_USER_DATA) 204 return CONTINUE; 205 206 if (wait_interval_.get()) 207 return DecideWhileInWaitInterval(job); 208 209 if (mode_ == CONFIGURATION_MODE) { 210 if (job.purpose == SyncSessionJob::NUDGE) 211 return SAVE; 212 else if (job.purpose == SyncSessionJob::CONFIGURATION) 213 return CONTINUE; 214 else 215 return DROP; 216 } 217 218 // We are in normal mode. 219 DCHECK_EQ(mode_, NORMAL_MODE); 220 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); 221 222 // Freshness condition 223 if (job.scheduled_start < last_sync_session_end_time_) { 224 VLOG(1) << "SyncerThread(" << this << ")" 225 << " Dropping job because of freshness"; 226 return DROP; 227 } 228 229 if (server_connection_ok_) 230 return CONTINUE; 231 232 VLOG(1) << "SyncerThread(" << this << ")" 233 << " Bad server connection. Using that to decide on job."; 234 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; 235 } 236 237 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { 238 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); 239 if (pending_nudge_.get() == NULL) { 240 VLOG(1) << "SyncerThread(" << this << ")" 241 << " Creating a pending nudge job"; 242 SyncSession* s = job.session.get(); 243 scoped_ptr<SyncSession> session(new SyncSession(s->context(), 244 s->delegate(), s->source(), s->routing_info(), s->workers())); 245 246 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, 247 make_linked_ptr(session.release()), false, job.nudge_location); 248 pending_nudge_.reset(new SyncSessionJob(new_job)); 249 250 return; 251 } 252 253 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; 254 pending_nudge_->session->Coalesce(*(job.session.get())); 255 pending_nudge_->scheduled_start = job.scheduled_start; 256 257 // Unfortunately the nudge location cannot be modified. So it stores the 258 // location of the first caller. 259 } 260 261 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { 262 JobProcessDecision decision = DecideOnJob(job); 263 VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: " 264 << decision << " Job purpose " << job.purpose << "mode " << mode_; 265 if (decision != SAVE) 266 return decision == CONTINUE; 267 268 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == 269 SyncSessionJob::CONFIGURATION); 270 271 SaveJob(job); 272 return false; 273 } 274 275 void SyncerThread::SaveJob(const SyncSessionJob& job) { 276 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); 277 if (job.purpose == SyncSessionJob::NUDGE) { 278 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job"; 279 InitOrCoalescePendingJob(job); 280 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ 281 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job"; 282 DCHECK(wait_interval_.get()); 283 DCHECK(mode_ == CONFIGURATION_MODE); 284 285 SyncSession* old = job.session.get(); 286 SyncSession* s(new SyncSession(session_context_.get(), this, 287 old->source(), old->routing_info(), old->workers())); 288 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), 289 make_linked_ptr(s), false, job.nudge_location); 290 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); 291 } // drop the rest. 292 } 293 294 // Functor for std::find_if to search by ModelSafeGroup. 295 struct ModelSafeWorkerGroupIs { 296 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} 297 bool operator()(ModelSafeWorker* w) { 298 return group == w->GetModelSafeGroup(); 299 } 300 ModelSafeGroup group; 301 }; 302 303 void SyncerThread::ScheduleClearUserData() { 304 if (!thread_.IsRunning()) { 305 NOTREACHED(); 306 return; 307 } 308 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 309 this, &SyncerThread::ScheduleClearUserDataImpl)); 310 } 311 312 void SyncerThread::ScheduleNudge(const TimeDelta& delay, 313 NudgeSource source, const ModelTypeBitSet& types, 314 const tracked_objects::Location& nudge_location) { 315 if (!thread_.IsRunning()) { 316 NOTREACHED(); 317 return; 318 } 319 320 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled"; 321 322 ModelTypePayloadMap types_with_payloads = 323 syncable::ModelTypePayloadMapFromBitSet(types, std::string()); 324 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 325 this, &SyncerThread::ScheduleNudgeImpl, delay, 326 GetUpdatesFromNudgeSource(source), types_with_payloads, false, 327 nudge_location)); 328 } 329 330 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, 331 NudgeSource source, const ModelTypePayloadMap& types_with_payloads, 332 const tracked_objects::Location& nudge_location) { 333 if (!thread_.IsRunning()) { 334 NOTREACHED(); 335 return; 336 } 337 338 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; 339 340 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 341 this, &SyncerThread::ScheduleNudgeImpl, delay, 342 GetUpdatesFromNudgeSource(source), types_with_payloads, false, 343 nudge_location)); 344 } 345 346 void SyncerThread::ScheduleClearUserDataImpl() { 347 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 348 SyncSession* session = new SyncSession(session_context_.get(), this, 349 SyncSourceInfo(), ModelSafeRoutingInfo(), 350 std::vector<ModelSafeWorker*>()); 351 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), 352 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); 353 } 354 355 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, 356 GetUpdatesCallerInfo::GetUpdatesSource source, 357 const ModelTypePayloadMap& types_with_payloads, 358 bool is_canary_job, const tracked_objects::Location& nudge_location) { 359 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 360 361 VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; 362 // Note we currently nudge for all types regardless of the ones incurring 363 // the nudge. Doing different would throw off some syncer commands like 364 // CleanupDisabledTypes. We may want to change this in the future. 365 SyncSourceInfo info(source, types_with_payloads); 366 367 SyncSession* session(CreateSyncSession(info)); 368 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, 369 make_linked_ptr(session), is_canary_job, 370 nudge_location); 371 372 session = NULL; 373 if (!ShouldRunJob(job)) 374 return; 375 376 if (pending_nudge_.get()) { 377 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { 378 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because" 379 << "we are in backoff"; 380 return; 381 } 382 383 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; 384 pending_nudge_->session->Coalesce(*(job.session.get())); 385 386 if (!IsBackingOff()) { 387 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because" 388 << " we are not in backoff and the job was coalesced"; 389 return; 390 } else { 391 VLOG(1) << "SyncerThread(" << this << ")" 392 << " Rescheduling pending nudge"; 393 SyncSession* s = pending_nudge_->session.get(); 394 job.session.reset(new SyncSession(s->context(), s->delegate(), 395 s->source(), s->routing_info(), s->workers())); 396 pending_nudge_.reset(); 397 } 398 } 399 400 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. 401 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), 402 nudge_location); 403 } 404 405 // Helper to extract the routing info and workers corresponding to types in 406 // |types| from |registrar|. 407 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, 408 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, 409 std::vector<ModelSafeWorker*>* workers) { 410 ModelSafeRoutingInfo r_tmp; 411 std::vector<ModelSafeWorker*> w_tmp; 412 registrar->GetModelSafeRoutingInfo(&r_tmp); 413 registrar->GetWorkers(&w_tmp); 414 415 bool passive_group_added = false; 416 417 typedef std::vector<ModelSafeWorker*>::const_iterator iter; 418 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { 419 if (!types.test(i)) 420 continue; 421 syncable::ModelType t = syncable::ModelTypeFromInt(i); 422 DCHECK_EQ(1U, r_tmp.count(t)); 423 (*routes)[t] = r_tmp[t]; 424 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), 425 ModelSafeWorkerGroupIs(r_tmp[t])); 426 if (it != w_tmp.end()) { 427 iter it2 = std::find_if(workers->begin(), workers->end(), 428 ModelSafeWorkerGroupIs(r_tmp[t])); 429 if (it2 == workers->end()) 430 workers->push_back(*it); 431 432 if (r_tmp[t] == GROUP_PASSIVE) 433 passive_group_added = true; 434 } else { 435 NOTREACHED(); 436 } 437 } 438 439 // Always add group passive. 440 if (passive_group_added == false) { 441 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), 442 ModelSafeWorkerGroupIs(GROUP_PASSIVE)); 443 if (it != w_tmp.end()) 444 workers->push_back(*it); 445 else 446 NOTREACHED(); 447 } 448 } 449 450 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) { 451 if (!thread_.IsRunning()) { 452 NOTREACHED(); 453 return; 454 } 455 456 VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config"; 457 ModelSafeRoutingInfo routes; 458 std::vector<ModelSafeWorker*> workers; 459 GetModelSafeParamsForTypes(types, session_context_->registrar(), 460 &routes, &workers); 461 462 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 463 this, &SyncerThread::ScheduleConfigImpl, routes, workers, 464 GetUpdatesCallerInfo::FIRST_UPDATE)); 465 } 466 467 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, 468 const std::vector<ModelSafeWorker*>& workers, 469 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { 470 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 471 472 VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; 473 // TODO(tim): config-specific GetUpdatesCallerInfo value? 474 SyncSession* session = new SyncSession(session_context_.get(), this, 475 SyncSourceInfo(source, 476 syncable::ModelTypePayloadMapFromRoutingInfo( 477 routing_info, std::string())), 478 routing_info, workers); 479 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), 480 SyncSessionJob::CONFIGURATION, session, FROM_HERE); 481 } 482 483 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, 484 SyncSessionJob::SyncSessionJobPurpose purpose, 485 sessions::SyncSession* session, 486 const tracked_objects::Location& nudge_location) { 487 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 488 489 SyncSessionJob job(purpose, TimeTicks::Now() + delay, 490 make_linked_ptr(session), false, nudge_location); 491 if (purpose == SyncSessionJob::NUDGE) { 492 VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" 493 << " ScheduleSyncSessionJob"; 494 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); 495 pending_nudge_.reset(new SyncSessionJob(job)); 496 } 497 VLOG(1) << "SyncerThread(" << this << ")" 498 << " Posting job to execute in DoSyncSessionJob. Job purpose " 499 << job.purpose; 500 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, 501 &SyncerThread::DoSyncSessionJob, job), 502 delay.InMilliseconds()); 503 } 504 505 void SyncerThread::SetSyncerStepsForPurpose( 506 SyncSessionJob::SyncSessionJobPurpose purpose, 507 SyncerStep* start, SyncerStep* end) { 508 *end = SYNCER_END; 509 switch (purpose) { 510 case SyncSessionJob::CONFIGURATION: 511 *start = DOWNLOAD_UPDATES; 512 *end = APPLY_UPDATES; 513 return; 514 case SyncSessionJob::CLEAR_USER_DATA: 515 *start = CLEAR_PRIVATE_DATA; 516 return; 517 case SyncSessionJob::NUDGE: 518 case SyncSessionJob::POLL: 519 *start = SYNCER_BEGIN; 520 return; 521 default: 522 NOTREACHED(); 523 } 524 } 525 526 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { 527 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 528 if (!ShouldRunJob(job)) { 529 LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = " 530 << job.session->source().updates_source; 531 return; 532 } 533 534 if (job.purpose == SyncSessionJob::NUDGE) { 535 if (pending_nudge_.get() == NULL || pending_nudge_->session != job.session) 536 return; // Another nudge must have been scheduled in in the meantime. 537 pending_nudge_.reset(); 538 } 539 VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " 540 << job.purpose; 541 542 SyncerStep begin(SYNCER_BEGIN); 543 SyncerStep end(SYNCER_END); 544 SetSyncerStepsForPurpose(job.purpose, &begin, &end); 545 546 bool has_more_to_sync = true; 547 while (ShouldRunJob(job) && has_more_to_sync) { 548 VLOG(1) << "SyncerThread(" << this << ")" 549 << " SyncerThread: Calling SyncShare."; 550 // Synchronously perform the sync session from this thread. 551 syncer_->SyncShare(job.session.get(), begin, end); 552 has_more_to_sync = job.session->HasMoreToSync(); 553 if (has_more_to_sync) 554 job.session->ResetTransientState(); 555 } 556 VLOG(1) << "SyncerThread(" << this << ")" 557 << " SyncerThread: Done SyncShare looping."; 558 FinishSyncSessionJob(job); 559 } 560 561 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { 562 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 563 // Whatever types were part of a configuration task will have had updates 564 // downloaded. For that reason, we make sure they get recorded in the 565 // event that they get disabled at a later time. 566 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); 567 if (!r.empty()) { 568 ModelSafeRoutingInfo temp_r; 569 ModelSafeRoutingInfo old_info(old_job.session->routing_info()); 570 std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(), 571 std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin())); 572 session_context_->set_previous_session_routing_info(temp_r); 573 } 574 } else { 575 session_context_->set_previous_session_routing_info( 576 old_job.session->routing_info()); 577 } 578 } 579 580 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { 581 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 582 // Update timing information for how often datatypes are triggering nudges. 583 base::TimeTicks now = TimeTicks::Now(); 584 if (!last_sync_session_end_time_.is_null()) { 585 ModelTypePayloadMap::const_iterator iter; 586 for (iter = job.session->source().types.begin(); 587 iter != job.session->source().types.end(); 588 ++iter) { 589 syncable::PostTimeToTypeHistogram(iter->first, 590 now - last_sync_session_end_time_); 591 } 592 } 593 last_sync_session_end_time_ = now; 594 UpdateCarryoverSessionState(job); 595 if (IsSyncingCurrentlySilenced()) { 596 VLOG(1) << "SyncerThread(" << this << ")" 597 << " We are currently throttled. So not scheduling the next sync."; 598 SaveJob(job); 599 return; // Nothing to do. 600 } 601 602 VLOG(1) << "SyncerThread(" << this << ")" 603 << " Updating the next polling time after SyncMain"; 604 ScheduleNextSync(job); 605 } 606 607 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { 608 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 609 DCHECK(!old_job.session->HasMoreToSync()); 610 // Note: |num_server_changes_remaining| > 0 here implies that we received a 611 // broken response while trying to download all updates, because the Syncer 612 // will loop until this value is exhausted. Also, if unsynced_handles exist 613 // but HasMoreToSync is false, this implies that the Syncer determined no 614 // forward progress was possible at this time (an error, such as an HTTP 615 // 500, is likely to have occurred during commit). 616 const bool work_to_do = 617 old_job.session->status_controller()->num_server_changes_remaining() > 0 618 || old_job.session->status_controller()->unsynced_handles().size() > 0; 619 VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: " 620 << work_to_do; 621 622 AdjustPolling(&old_job); 623 624 // TODO(tim): Old impl had special code if notifications disabled. Needed? 625 if (!work_to_do) { 626 // Success implies backoff relief. Note that if this was a "one-off" job 627 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was 628 // work_to_do before it ran this wont have changed, as jobs like this don't 629 // run a full sync cycle. So we don't need special code here. 630 wait_interval_.reset(); 631 VLOG(1) << "SyncerThread(" << this << ")" 632 << " Job suceeded so not scheduling more jobs"; 633 return; 634 } 635 636 if (old_job.session->source().updates_source == 637 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { 638 VLOG(1) << "SyncerThread(" << this << ")" 639 << " Job failed with source continuation"; 640 // We don't seem to have made forward progress. Start or extend backoff. 641 HandleConsecutiveContinuationError(old_job); 642 } else if (IsBackingOff()) { 643 VLOG(1) << "SyncerThread(" << this << ")" 644 << " A nudge during backoff failed"; 645 // We weren't continuing but we're in backoff; must have been a nudge. 646 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); 647 DCHECK(!wait_interval_->had_nudge); 648 wait_interval_->had_nudge = true; 649 wait_interval_->timer.Reset(); 650 } else { 651 VLOG(1) << "SyncerThread(" << this << ")" 652 << " Failed. Schedule a job with continuation as source"; 653 // We weren't continuing and we aren't in backoff. Schedule a normal 654 // continuation. 655 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 656 ScheduleConfigImpl(old_job.session->routing_info(), 657 old_job.session->workers(), 658 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); 659 } else { 660 // For all other purposes(nudge and poll) we schedule a retry nudge. 661 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), 662 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), 663 old_job.session->source().types, false, FROM_HERE); 664 } 665 } 666 } 667 668 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { 669 DCHECK(thread_.IsRunning()); 670 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 671 672 TimeDelta poll = (!session_context_->notifications_enabled()) ? 673 syncer_short_poll_interval_seconds_ : 674 syncer_long_poll_interval_seconds_; 675 bool rate_changed = !poll_timer_.IsRunning() || 676 poll != poll_timer_.GetCurrentDelay(); 677 678 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) 679 poll_timer_.Reset(); 680 681 if (!rate_changed) 682 return; 683 684 // Adjust poll rate. 685 poll_timer_.Stop(); 686 poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); 687 } 688 689 void SyncerThread::HandleConsecutiveContinuationError( 690 const SyncSessionJob& old_job) { 691 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 692 // This if conditions should be compiled out in retail builds. 693 if (IsBackingOff()) { 694 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); 695 } 696 SyncSession* old = old_job.session.get(); 697 SyncSession* s(new SyncSession(session_context_.get(), this, 698 old->source(), old->routing_info(), old->workers())); 699 TimeDelta length = delay_provider_->GetDelay( 700 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); 701 702 VLOG(1) << "SyncerThread(" << this << ")" 703 << " In handle continuation error. Old job purpose is " 704 << old_job.purpose; 705 VLOG(1) << "SyncerThread(" << this << ")" 706 << " In Handle continuation error. The time delta(ms) is: " 707 << length.InMilliseconds(); 708 709 // This will reset the had_nudge variable as well. 710 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 711 length)); 712 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 713 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, 714 make_linked_ptr(s), false, FROM_HERE); 715 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job)); 716 } else { 717 // We are not in configuration mode. So wait_interval's pending job 718 // should be null. 719 DCHECK(wait_interval_->pending_configure_job.get() == NULL); 720 721 // TODO(lipalani) - handle clear user data. 722 InitOrCoalescePendingJob(old_job); 723 } 724 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); 725 } 726 727 // static 728 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { 729 if (last_delay.InSeconds() >= kMaxBackoffSeconds) 730 return TimeDelta::FromSeconds(kMaxBackoffSeconds); 731 732 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 733 int64 backoff_s = 734 std::max(static_cast<int64>(1), 735 last_delay.InSeconds() * kBackoffRandomizationFactor); 736 737 // Flip a coin to randomize backoff interval by +/- 50%. 738 int rand_sign = base::RandInt(0, 1) * 2 - 1; 739 740 // Truncation is adequate for rounding here. 741 backoff_s = backoff_s + 742 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); 743 744 // Cap the backoff interval. 745 backoff_s = std::max(static_cast<int64>(1), 746 std::min(backoff_s, kMaxBackoffSeconds)); 747 748 return TimeDelta::FromSeconds(backoff_s); 749 } 750 751 void SyncerThread::Stop() { 752 VLOG(1) << "SyncerThread(" << this << ")" << " stop called"; 753 syncer_->RequestEarlyExit(); // Safe to call from any thread. 754 session_context_->connection_manager()->RemoveListener(this); 755 thread_.Stop(); 756 } 757 758 void SyncerThread::DoCanaryJob() { 759 VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job"; 760 DoPendingJobIfPossible(true); 761 } 762 763 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { 764 SyncSessionJob* job_to_execute = NULL; 765 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() 766 && wait_interval_->pending_configure_job.get()) { 767 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job"; 768 job_to_execute = wait_interval_->pending_configure_job.get(); 769 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { 770 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job"; 771 // Pending jobs mostly have time from the past. Reset it so this job 772 // will get executed. 773 if (pending_nudge_->scheduled_start < TimeTicks::Now()) 774 pending_nudge_->scheduled_start = TimeTicks::Now(); 775 776 scoped_ptr<SyncSession> session(CreateSyncSession( 777 pending_nudge_->session->source())); 778 779 // Also the routing info might have been changed since we cached the 780 // pending nudge. Update it by coalescing to the latest. 781 pending_nudge_->session->Coalesce(*(session.get())); 782 // The pending nudge would be cleared in the DoSyncSessionJob function. 783 job_to_execute = pending_nudge_.get(); 784 } 785 786 if (job_to_execute != NULL) { 787 VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job"; 788 SyncSessionJob copy = *job_to_execute; 789 copy.is_canary_job = is_canary_job; 790 DoSyncSessionJob(copy); 791 } 792 } 793 794 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { 795 ModelSafeRoutingInfo routes; 796 std::vector<ModelSafeWorker*> workers; 797 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); 798 session_context_->registrar()->GetWorkers(&workers); 799 SyncSourceInfo info(source); 800 801 SyncSession* session(new SyncSession(session_context_.get(), this, info, 802 routes, workers)); 803 804 return session; 805 } 806 807 void SyncerThread::PollTimerCallback() { 808 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 809 ModelSafeRoutingInfo r; 810 ModelTypePayloadMap types_with_payloads = 811 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); 812 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); 813 SyncSession* s = CreateSyncSession(info); 814 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, 815 FROM_HERE); 816 } 817 818 void SyncerThread::Unthrottle() { 819 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 820 VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled.."; 821 DoCanaryJob(); 822 wait_interval_.reset(); 823 } 824 825 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { 826 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 827 session_context_->NotifyListeners(SyncEngineEvent(cause)); 828 } 829 830 bool SyncerThread::IsBackingOff() const { 831 return wait_interval_.get() && wait_interval_->mode == 832 WaitInterval::EXPONENTIAL_BACKOFF; 833 } 834 835 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { 836 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 837 silenced_until - TimeTicks::Now())); 838 wait_interval_->timer.Start(wait_interval_->length, this, 839 &SyncerThread::Unthrottle); 840 } 841 842 bool SyncerThread::IsSyncingCurrentlySilenced() { 843 return wait_interval_.get() && wait_interval_->mode == 844 WaitInterval::THROTTLED; 845 } 846 847 void SyncerThread::OnReceivedShortPollIntervalUpdate( 848 const base::TimeDelta& new_interval) { 849 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 850 syncer_short_poll_interval_seconds_ = new_interval; 851 } 852 853 void SyncerThread::OnReceivedLongPollIntervalUpdate( 854 const base::TimeDelta& new_interval) { 855 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 856 syncer_long_poll_interval_seconds_ = new_interval; 857 } 858 859 void SyncerThread::OnShouldStopSyncingPermanently() { 860 VLOG(1) << "SyncerThread(" << this << ")" 861 << " OnShouldStopSyncingPermanently"; 862 syncer_->RequestEarlyExit(); // Thread-safe. 863 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); 864 } 865 866 void SyncerThread::OnServerConnectionEvent( 867 const ServerConnectionEvent2& event) { 868 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, 869 &SyncerThread::CheckServerConnectionManagerStatus, 870 event.connection_code)); 871 } 872 873 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { 874 session_context_->set_notifications_enabled(notifications_enabled); 875 } 876 877 } // browser_sync 878