1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "components/invalidation/non_blocking_invalidator.h" 6 7 #include <cstddef> 8 9 #include "base/location.h" 10 #include "base/logging.h" 11 #include "base/memory/scoped_ptr.h" 12 #include "base/single_thread_task_runner.h" 13 #include "base/thread_task_runner_handle.h" 14 #include "base/threading/thread.h" 15 #include "components/invalidation/gcm_network_channel_delegate.h" 16 #include "components/invalidation/invalidation_notifier.h" 17 #include "components/invalidation/sync_system_resources.h" 18 #include "jingle/notifier/listener/push_client.h" 19 #include "sync/internal_api/public/util/weak_handle.h" 20 #include "sync/notifier/invalidation_handler.h" 21 #include "sync/notifier/object_id_invalidation_map.h" 22 23 namespace syncer { 24 25 struct NonBlockingInvalidator::InitializeOptions { 26 InitializeOptions( 27 NetworkChannelCreator network_channel_creator, 28 const std::string& invalidator_client_id, 29 const UnackedInvalidationsMap& saved_invalidations, 30 const std::string& invalidation_bootstrap_data, 31 const WeakHandle<InvalidationStateTracker>& 32 invalidation_state_tracker, 33 const std::string& client_info, 34 scoped_refptr<net::URLRequestContextGetter> request_context_getter) 35 : network_channel_creator(network_channel_creator), 36 invalidator_client_id(invalidator_client_id), 37 saved_invalidations(saved_invalidations), 38 invalidation_bootstrap_data(invalidation_bootstrap_data), 39 invalidation_state_tracker(invalidation_state_tracker), 40 client_info(client_info), 41 request_context_getter(request_context_getter) { 42 } 43 44 NetworkChannelCreator network_channel_creator; 45 std::string invalidator_client_id; 46 UnackedInvalidationsMap saved_invalidations; 47 std::string invalidation_bootstrap_data; 48 WeakHandle<InvalidationStateTracker> invalidation_state_tracker; 49 std::string client_info; 50 scoped_refptr<net::URLRequestContextGetter> request_context_getter; 51 }; 52 53 namespace { 54 // This class provides a wrapper for a logging class in order to receive 55 // callbacks across threads, without having to worry about owner threads. 56 class CallbackProxy { 57 public: 58 explicit CallbackProxy( 59 base::Callback<void(const base::DictionaryValue&)> callback); 60 ~CallbackProxy(); 61 62 void Run(const base::DictionaryValue& value); 63 64 private: 65 static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback, 66 scoped_ptr<base::DictionaryValue> value); 67 68 base::Callback<void(const base::DictionaryValue&)> callback_; 69 scoped_refptr<base::SingleThreadTaskRunner> running_thread_; 70 71 DISALLOW_COPY_AND_ASSIGN(CallbackProxy); 72 }; 73 74 CallbackProxy::CallbackProxy( 75 base::Callback<void(const base::DictionaryValue&)> callback) 76 : callback_(callback), 77 running_thread_(base::ThreadTaskRunnerHandle::Get()) {} 78 79 CallbackProxy::~CallbackProxy() {} 80 81 void CallbackProxy::DoRun( 82 base::Callback<void(const base::DictionaryValue&)> callback, 83 scoped_ptr<base::DictionaryValue> value) { 84 callback.Run(*value); 85 } 86 87 void CallbackProxy::Run(const base::DictionaryValue& value) { 88 scoped_ptr<base::DictionaryValue> copied(value.DeepCopy()); 89 running_thread_->PostTask( 90 FROM_HERE, 91 base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied))); 92 } 93 } 94 95 class NonBlockingInvalidator::Core 96 : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>, 97 // InvalidationHandler to observe the InvalidationNotifier we create. 98 public InvalidationHandler { 99 public: 100 // Called on parent thread. |delegate_observer| should be initialized. 101 explicit Core( 102 const WeakHandle<NonBlockingInvalidator>& delegate_observer); 103 104 // Helpers called on I/O thread. 105 void Initialize( 106 const NonBlockingInvalidator::InitializeOptions& initialize_options); 107 void Teardown(); 108 void UpdateRegisteredIds(const ObjectIdSet& ids); 109 void UpdateCredentials(const std::string& email, const std::string& token); 110 void RequestDetailedStatus( 111 base::Callback<void(const base::DictionaryValue&)> callback) const; 112 113 // InvalidationHandler implementation (all called on I/O thread by 114 // InvalidationNotifier). 115 virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE; 116 virtual void OnIncomingInvalidation( 117 const ObjectIdInvalidationMap& invalidation_map) OVERRIDE; 118 virtual std::string GetOwnerName() const OVERRIDE; 119 120 private: 121 friend class 122 base::RefCountedThreadSafe<NonBlockingInvalidator::Core>; 123 // Called on parent or I/O thread. 124 virtual ~Core(); 125 126 // The variables below should be used only on the I/O thread. 127 const WeakHandle<NonBlockingInvalidator> delegate_observer_; 128 scoped_ptr<InvalidationNotifier> invalidation_notifier_; 129 scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_; 130 131 DISALLOW_COPY_AND_ASSIGN(Core); 132 }; 133 134 NonBlockingInvalidator::Core::Core( 135 const WeakHandle<NonBlockingInvalidator>& delegate_observer) 136 : delegate_observer_(delegate_observer) { 137 DCHECK(delegate_observer_.IsInitialized()); 138 } 139 140 NonBlockingInvalidator::Core::~Core() { 141 } 142 143 void NonBlockingInvalidator::Core::Initialize( 144 const NonBlockingInvalidator::InitializeOptions& initialize_options) { 145 DCHECK(initialize_options.request_context_getter.get()); 146 network_task_runner_ = 147 initialize_options.request_context_getter->GetNetworkTaskRunner(); 148 DCHECK(network_task_runner_->BelongsToCurrentThread()); 149 scoped_ptr<SyncNetworkChannel> network_channel = 150 initialize_options.network_channel_creator.Run(); 151 invalidation_notifier_.reset( 152 new InvalidationNotifier( 153 network_channel.Pass(), 154 initialize_options.invalidator_client_id, 155 initialize_options.saved_invalidations, 156 initialize_options.invalidation_bootstrap_data, 157 initialize_options.invalidation_state_tracker, 158 initialize_options.client_info)); 159 invalidation_notifier_->RegisterHandler(this); 160 } 161 162 void NonBlockingInvalidator::Core::Teardown() { 163 DCHECK(network_task_runner_->BelongsToCurrentThread()); 164 invalidation_notifier_->UnregisterHandler(this); 165 invalidation_notifier_.reset(); 166 network_task_runner_ = NULL; 167 } 168 169 void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) { 170 DCHECK(network_task_runner_->BelongsToCurrentThread()); 171 invalidation_notifier_->UpdateRegisteredIds(this, ids); 172 } 173 174 void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email, 175 const std::string& token) { 176 DCHECK(network_task_runner_->BelongsToCurrentThread()); 177 invalidation_notifier_->UpdateCredentials(email, token); 178 } 179 180 void NonBlockingInvalidator::Core::RequestDetailedStatus( 181 base::Callback<void(const base::DictionaryValue&)> callback) const { 182 DCHECK(network_task_runner_->BelongsToCurrentThread()); 183 invalidation_notifier_->RequestDetailedStatus(callback); 184 } 185 186 void NonBlockingInvalidator::Core::OnInvalidatorStateChange( 187 InvalidatorState reason) { 188 DCHECK(network_task_runner_->BelongsToCurrentThread()); 189 delegate_observer_.Call(FROM_HERE, 190 &NonBlockingInvalidator::OnInvalidatorStateChange, 191 reason); 192 } 193 194 void NonBlockingInvalidator::Core::OnIncomingInvalidation( 195 const ObjectIdInvalidationMap& invalidation_map) { 196 DCHECK(network_task_runner_->BelongsToCurrentThread()); 197 delegate_observer_.Call(FROM_HERE, 198 &NonBlockingInvalidator::OnIncomingInvalidation, 199 invalidation_map); 200 } 201 202 std::string NonBlockingInvalidator::Core::GetOwnerName() const { 203 return "Sync"; 204 } 205 206 NonBlockingInvalidator::NonBlockingInvalidator( 207 NetworkChannelCreator network_channel_creator, 208 const std::string& invalidator_client_id, 209 const UnackedInvalidationsMap& saved_invalidations, 210 const std::string& invalidation_bootstrap_data, 211 InvalidationStateTracker* invalidation_state_tracker, 212 const std::string& client_info, 213 const scoped_refptr<net::URLRequestContextGetter>& request_context_getter) 214 : invalidation_state_tracker_(invalidation_state_tracker), 215 parent_task_runner_(base::ThreadTaskRunnerHandle::Get()), 216 network_task_runner_(request_context_getter->GetNetworkTaskRunner()), 217 weak_ptr_factory_(this) { 218 core_ = new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr())); 219 220 InitializeOptions initialize_options( 221 network_channel_creator, 222 invalidator_client_id, 223 saved_invalidations, 224 invalidation_bootstrap_data, 225 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), 226 client_info, 227 request_context_getter); 228 229 if (!network_task_runner_->PostTask( 230 FROM_HERE, 231 base::Bind( 232 &NonBlockingInvalidator::Core::Initialize, 233 core_.get(), 234 initialize_options))) { 235 NOTREACHED(); 236 } 237 } 238 239 NonBlockingInvalidator::~NonBlockingInvalidator() { 240 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 241 if (!network_task_runner_->PostTask( 242 FROM_HERE, 243 base::Bind(&NonBlockingInvalidator::Core::Teardown, 244 core_.get()))) { 245 DVLOG(1) << "Network thread stopped before invalidator is destroyed."; 246 } 247 } 248 249 void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) { 250 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 251 registrar_.RegisterHandler(handler); 252 } 253 254 void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler, 255 const ObjectIdSet& ids) { 256 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 257 registrar_.UpdateRegisteredIds(handler, ids); 258 if (!network_task_runner_->PostTask( 259 FROM_HERE, 260 base::Bind( 261 &NonBlockingInvalidator::Core::UpdateRegisteredIds, 262 core_.get(), 263 registrar_.GetAllRegisteredIds()))) { 264 NOTREACHED(); 265 } 266 } 267 268 void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) { 269 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 270 registrar_.UnregisterHandler(handler); 271 } 272 273 InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const { 274 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 275 return registrar_.GetInvalidatorState(); 276 } 277 278 void NonBlockingInvalidator::UpdateCredentials(const std::string& email, 279 const std::string& token) { 280 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 281 if (!network_task_runner_->PostTask( 282 FROM_HERE, 283 base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials, 284 core_.get(), email, token))) { 285 NOTREACHED(); 286 } 287 } 288 289 void NonBlockingInvalidator::RequestDetailedStatus( 290 base::Callback<void(const base::DictionaryValue&)> callback) const { 291 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 292 base::Callback<void(const base::DictionaryValue&)> proxy_callback = 293 base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback))); 294 if (!network_task_runner_->PostTask( 295 FROM_HERE, 296 base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus, 297 core_.get(), 298 proxy_callback))) { 299 NOTREACHED(); 300 } 301 } 302 303 NetworkChannelCreator 304 NonBlockingInvalidator::MakePushClientChannelCreator( 305 const notifier::NotifierOptions& notifier_options) { 306 return base::Bind(SyncNetworkChannel::CreatePushClientChannel, 307 notifier_options); 308 } 309 310 NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator( 311 scoped_refptr<net::URLRequestContextGetter> request_context_getter, 312 scoped_ptr<GCMNetworkChannelDelegate> delegate) { 313 return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel, 314 request_context_getter, 315 base::Passed(&delegate)); 316 } 317 318 void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) { 319 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 320 invalidation_state_tracker_->ClearAndSetNewClientId(data); 321 } 322 323 std::string NonBlockingInvalidator::GetInvalidatorClientId() const { 324 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 325 return invalidation_state_tracker_->GetInvalidatorClientId(); 326 } 327 328 void NonBlockingInvalidator::SetBootstrapData(const std::string& data) { 329 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 330 invalidation_state_tracker_->SetBootstrapData(data); 331 } 332 333 std::string NonBlockingInvalidator::GetBootstrapData() const { 334 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 335 return invalidation_state_tracker_->GetBootstrapData(); 336 } 337 338 void NonBlockingInvalidator::SetSavedInvalidations( 339 const UnackedInvalidationsMap& states) { 340 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 341 invalidation_state_tracker_->SetSavedInvalidations(states); 342 } 343 344 UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const { 345 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 346 return invalidation_state_tracker_->GetSavedInvalidations(); 347 } 348 349 void NonBlockingInvalidator::Clear() { 350 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 351 invalidation_state_tracker_->Clear(); 352 } 353 354 void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) { 355 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 356 registrar_.UpdateInvalidatorState(state); 357 } 358 359 void NonBlockingInvalidator::OnIncomingInvalidation( 360 const ObjectIdInvalidationMap& invalidation_map) { 361 DCHECK(parent_task_runner_->BelongsToCurrentThread()); 362 registrar_.DispatchInvalidationsToHandlers(invalidation_map); 363 } 364 365 std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; } 366 367 } // namespace syncer 368