1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "components/invalidation/non_blocking_invalidator.h" 6 7 #include <cstddef> 8 9 #include "base/location.h" 10 #include "base/logging.h" 11 #include "base/memory/scoped_ptr.h" 12 #include "base/single_thread_task_runner.h" 13 #include "base/thread_task_runner_handle.h" 14 #include "base/threading/thread.h" 15 #include "components/invalidation/gcm_network_channel_delegate.h" 16 #include "components/invalidation/invalidation_handler.h" 17 #include "components/invalidation/invalidation_notifier.h" 18 #include "components/invalidation/object_id_invalidation_map.h" 19 #include "components/invalidation/sync_system_resources.h" 20 #include "jingle/notifier/listener/push_client.h" 21 22 namespace syncer { 23 24 struct NonBlockingInvalidator::InitializeOptions { 25 InitializeOptions( 26 NetworkChannelCreator network_channel_creator, 27 const std::string& invalidator_client_id, 28 const UnackedInvalidationsMap& saved_invalidations, 29 const std::string& invalidation_bootstrap_data, 30 const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker, 31 const scoped_refptr<base::SingleThreadTaskRunner>& 32 invalidation_state_tracker_task_runner, 33 const std::string& client_info, 34 scoped_refptr<net::URLRequestContextGetter> request_context_getter) 35 : network_channel_creator(network_channel_creator), 36 invalidator_client_id(invalidator_client_id), 37 saved_invalidations(saved_invalidations), 38 invalidation_bootstrap_data(invalidation_bootstrap_data), 39 invalidation_state_tracker(invalidation_state_tracker), 40 invalidation_state_tracker_task_runner( 41 invalidation_state_tracker_task_runner), 42 client_info(client_info), 43 request_context_getter(request_context_getter) {} 44 45 NetworkChannelCreator network_channel_creator; 46 std::string invalidator_client_id; 47 UnackedInvalidationsMap saved_invalidations; 48 std::string invalidation_bootstrap_data; 49 base::WeakPtr<InvalidationStateTracker> invalidation_state_tracker; 50 scoped_refptr<base::SingleThreadTaskRunner> 51 invalidation_state_tracker_task_runner; 52 std::string client_info; 53 scoped_refptr<net::URLRequestContextGetter> request_context_getter; 54 }; 55 56 namespace { 57 // This class provides a wrapper for a logging class in order to receive 58 // callbacks across threads, without having to worry about owner threads. 59 class CallbackProxy { 60 public: 61 explicit CallbackProxy( 62 base::Callback<void(const base::DictionaryValue&)> callback); 63 ~CallbackProxy(); 64 65 void Run(const base::DictionaryValue& value); 66 67 private: 68 static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback, 69 scoped_ptr<base::DictionaryValue> value); 70 71 base::Callback<void(const base::DictionaryValue&)> callback_; 72 scoped_refptr<base::SingleThreadTaskRunner> running_thread_; 73 74 DISALLOW_COPY_AND_ASSIGN(CallbackProxy); 75 }; 76 77 CallbackProxy::CallbackProxy( 78 base::Callback<void(const base::DictionaryValue&)> callback) 79 : callback_(callback), 80 running_thread_(base::ThreadTaskRunnerHandle::Get()) {} 81 82 CallbackProxy::~CallbackProxy() {} 83 84 void CallbackProxy::DoRun( 85 base::Callback<void(const base::DictionaryValue&)> callback, 86 scoped_ptr<base::DictionaryValue> value) { 87 callback.Run(*value); 88 } 89 90 void CallbackProxy::Run(const base::DictionaryValue& value) { 91 scoped_ptr<base::DictionaryValue> copied(value.DeepCopy()); 92 running_thread_->PostTask( 93 FROM_HERE, 94 base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied))); 95 } 96 } 97 98 class NonBlockingInvalidator::Core 99 : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>, 100 // InvalidationHandler to observe the InvalidationNotifier we create. 101 public InvalidationHandler { 102 public: 103 // Called on parent thread. |delegate_observer| should be initialized. 104 Core(const base::WeakPtr<NonBlockingInvalidator>& delegate_observer, 105 const scoped_refptr<base::SingleThreadTaskRunner>& 106 delegate_observer_task_runner); 107 108 // Helpers called on I/O thread. 109 void Initialize( 110 const NonBlockingInvalidator::InitializeOptions& initialize_options); 111 void Teardown(); 112 void UpdateRegisteredIds(const ObjectIdSet& ids); 113 void UpdateCredentials(const std::string& email, const std::string& token); 114 void RequestDetailedStatus( 115 base::Callback<void(const base::DictionaryValue&)> callback) const; 116 117 // InvalidationHandler implementation (all called on I/O thread by 118 // InvalidationNotifier). 119 virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE; 120 virtual void OnIncomingInvalidation( 121 const ObjectIdInvalidationMap& invalidation_map) OVERRIDE; 122 virtual std::string GetOwnerName() const OVERRIDE; 123 124 private: 125 friend class 126 base::RefCountedThreadSafe<NonBlockingInvalidator::Core>; 127 // Called on parent or I/O thread. 128 virtual ~Core(); 129 130 // The variables below should be used only on the I/O thread. 131 const base::WeakPtr<NonBlockingInvalidator> delegate_observer_; 132 scoped_refptr<base::SingleThreadTaskRunner> delegate_observer_task_runner_; 133 scoped_ptr<InvalidationNotifier> invalidation_notifier_; 134 scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_; 135 136 DISALLOW_COPY_AND_ASSIGN(Core); 137 }; 138 139 NonBlockingInvalidator::Core::Core( 140 const base::WeakPtr<NonBlockingInvalidator>& delegate_observer, 141 const scoped_refptr<base::SingleThreadTaskRunner>& 142 delegate_observer_task_runner) 143 : delegate_observer_(delegate_observer), 144 delegate_observer_task_runner_(delegate_observer_task_runner) { 145 DCHECK(delegate_observer_); 146 DCHECK(delegate_observer_task_runner_.get()); 147 } 148 149 NonBlockingInvalidator::Core::~Core() { 150 } 151 152 void NonBlockingInvalidator::Core::Initialize( 153 const NonBlockingInvalidator::InitializeOptions& initialize_options) { 154 DCHECK(initialize_options.request_context_getter.get()); 155 network_task_runner_ = 156 initialize_options.request_context_getter->GetNetworkTaskRunner(); 157 DCHECK(network_task_runner_->BelongsToCurrentThread()); 158 scoped_ptr<SyncNetworkChannel> network_channel = 159 initialize_options.network_channel_creator.Run(); 160 invalidation_notifier_.reset(new InvalidationNotifier( 161 network_channel.Pass(), 162 initialize_options.invalidator_client_id, 163 initialize_options.saved_invalidations, 164 initialize_options.invalidation_bootstrap_data, 165 initialize_options.invalidation_state_tracker, 166 initialize_options.invalidation_state_tracker_task_runner, 167 initialize_options.client_info)); 168 invalidation_notifier_->RegisterHandler(this); 169 } 170 171 void NonBlockingInvalidator::Core::Teardown() { 172 DCHECK(network_task_runner_->BelongsToCurrentThread()); 173 invalidation_notifier_->UnregisterHandler(this); 174 invalidation_notifier_.reset(); 175 network_task_runner_ = NULL; 176 } 177 178 void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) { 179 DCHECK(network_task_runner_->BelongsToCurrentThread()); 180 invalidation_notifier_->UpdateRegisteredIds(this, ids); 181 } 182 183 void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email, 184 const std::string& token) { 185 DCHECK(network_task_runner_->BelongsToCurrentThread()); 186 invalidation_notifier_->UpdateCredentials(email, token); 187 } 188 189 void NonBlockingInvalidator::Core::RequestDetailedStatus( 190 base::Callback<void(const base::DictionaryValue&)> callback) const { 191 DCHECK(network_task_runner_->BelongsToCurrentThread()); 192 invalidation_notifier_->RequestDetailedStatus(callback); 193 } 194 195 void NonBlockingInvalidator::Core::OnInvalidatorStateChange( 196 InvalidatorState reason) { 197 DCHECK(network_task_runner_->BelongsToCurrentThread()); 198 delegate_observer_task_runner_->PostTask( 199 FROM_HERE, 200 base::Bind(&NonBlockingInvalidator::OnInvalidatorStateChange, 201 delegate_observer_, 202 reason)); 203 } 204 205 void NonBlockingInvalidator::Core::OnIncomingInvalidation( 206 const ObjectIdInvalidationMap& invalidation_map) { 207 DCHECK(network_task_runner_->BelongsToCurrentThread()); 208 delegate_observer_task_runner_->PostTask( 209 FROM_HERE, 210 base::Bind(&NonBlockingInvalidator::OnIncomingInvalidation, 211 delegate_observer_, 212 invalidation_map)); 213 } 214 215 std::string NonBlockingInvalidator::Core::GetOwnerName() const { 216 return "Sync"; 217 } 218 219 NonBlockingInvalidator::NonBlockingInvalidator( 220 NetworkChannelCreator network_channel_creator, 221 const std::string& invalidator_client_id, 222 const UnackedInvalidationsMap& saved_invalidations, 223 const std::string& invalidation_bootstrap_data, 224 InvalidationStateTracker* invalidation_state_tracker, 225 const std::string& client_info, 226 const scoped_refptr<net::URLRequestContextGetter>& request_context_getter) 227 : invalidation_state_tracker_(invalidation_state_tracker), 228 parent_task_runner_(base::ThreadTaskRunnerHandle::Get()), 229 network_task_runner_(request_context_getter->GetNetworkTaskRunner()), 230 weak_ptr_factory_(this) { 231 base::WeakPtr<NonBlockingInvalidator> weak_ptr_this = 232 weak_ptr_factory_.GetWeakPtr(); 233 weak_ptr_this.get(); // Bind to this thread. 234 235 core_ = new Core(weak_ptr_this, 236 base::MessageLoopProxy::current()); 237 238 InitializeOptions initialize_options(network_channel_creator, 239 invalidator_client_id, 240 saved_invalidations, 241 invalidation_bootstrap_data, 242 weak_ptr_this, 243 base::MessageLoopProxy::current(), 244 client_info, 245 request_context_getter); 246 247 if (!network_task_runner_->PostTask( 248 FROM_HERE, 249 base::Bind( 250 &NonBlockingInvalidator::Core::Initialize, 251 core_.get(), 252 initialize_options))) { 253 NOTREACHED(); 254 } 255 } 256 257 NonBlockingInvalidator::~NonBlockingInvalidator() { 258 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 259 if (!network_task_runner_->PostTask( 260 FROM_HERE, 261 base::Bind(&NonBlockingInvalidator::Core::Teardown, 262 core_.get()))) { 263 DVLOG(1) << "Network thread stopped before invalidator is destroyed."; 264 } 265 } 266 267 void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) { 268 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 269 registrar_.RegisterHandler(handler); 270 } 271 272 void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler, 273 const ObjectIdSet& ids) { 274 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 275 registrar_.UpdateRegisteredIds(handler, ids); 276 if (!network_task_runner_->PostTask( 277 FROM_HERE, 278 base::Bind( 279 &NonBlockingInvalidator::Core::UpdateRegisteredIds, 280 core_.get(), 281 registrar_.GetAllRegisteredIds()))) { 282 NOTREACHED(); 283 } 284 } 285 286 void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) { 287 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 288 registrar_.UnregisterHandler(handler); 289 } 290 291 InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const { 292 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 293 return registrar_.GetInvalidatorState(); 294 } 295 296 void NonBlockingInvalidator::UpdateCredentials(const std::string& email, 297 const std::string& token) { 298 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 299 if (!network_task_runner_->PostTask( 300 FROM_HERE, 301 base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials, 302 core_.get(), email, token))) { 303 NOTREACHED(); 304 } 305 } 306 307 void NonBlockingInvalidator::RequestDetailedStatus( 308 base::Callback<void(const base::DictionaryValue&)> callback) const { 309 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 310 base::Callback<void(const base::DictionaryValue&)> proxy_callback = 311 base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback))); 312 if (!network_task_runner_->PostTask( 313 FROM_HERE, 314 base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus, 315 core_.get(), 316 proxy_callback))) { 317 NOTREACHED(); 318 } 319 } 320 321 NetworkChannelCreator 322 NonBlockingInvalidator::MakePushClientChannelCreator( 323 const notifier::NotifierOptions& notifier_options) { 324 return base::Bind(SyncNetworkChannel::CreatePushClientChannel, 325 notifier_options); 326 } 327 328 NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator( 329 scoped_refptr<net::URLRequestContextGetter> request_context_getter, 330 scoped_ptr<GCMNetworkChannelDelegate> delegate) { 331 return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel, 332 request_context_getter, 333 base::Passed(&delegate)); 334 } 335 336 void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) { 337 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 338 invalidation_state_tracker_->ClearAndSetNewClientId(data); 339 } 340 341 std::string NonBlockingInvalidator::GetInvalidatorClientId() const { 342 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 343 return invalidation_state_tracker_->GetInvalidatorClientId(); 344 } 345 346 void NonBlockingInvalidator::SetBootstrapData(const std::string& data) { 347 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 348 invalidation_state_tracker_->SetBootstrapData(data); 349 } 350 351 std::string NonBlockingInvalidator::GetBootstrapData() const { 352 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 353 return invalidation_state_tracker_->GetBootstrapData(); 354 } 355 356 void NonBlockingInvalidator::SetSavedInvalidations( 357 const UnackedInvalidationsMap& states) { 358 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 359 invalidation_state_tracker_->SetSavedInvalidations(states); 360 } 361 362 UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const { 363 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 364 return invalidation_state_tracker_->GetSavedInvalidations(); 365 } 366 367 void NonBlockingInvalidator::Clear() { 368 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 369 invalidation_state_tracker_->Clear(); 370 } 371 372 void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) { 373 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 374 registrar_.UpdateInvalidatorState(state); 375 } 376 377 void NonBlockingInvalidator::OnIncomingInvalidation( 378 const ObjectIdInvalidationMap& invalidation_map) { 379 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 380 registrar_.DispatchInvalidationsToHandlers(invalidation_map); 381 } 382 383 std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; } 384 385 } // namespace syncer 386