Home | History | Annotate | Download | only in invalidation
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "components/invalidation/non_blocking_invalidator.h"
      6 
      7 #include <cstddef>
      8 
      9 #include "base/location.h"
     10 #include "base/logging.h"
     11 #include "base/memory/scoped_ptr.h"
     12 #include "base/single_thread_task_runner.h"
     13 #include "base/thread_task_runner_handle.h"
     14 #include "base/threading/thread.h"
     15 #include "components/invalidation/gcm_network_channel_delegate.h"
     16 #include "components/invalidation/invalidation_notifier.h"
     17 #include "components/invalidation/sync_system_resources.h"
     18 #include "jingle/notifier/listener/push_client.h"
     19 #include "sync/internal_api/public/util/weak_handle.h"
     20 #include "sync/notifier/invalidation_handler.h"
     21 #include "sync/notifier/object_id_invalidation_map.h"
     22 
     23 namespace syncer {
     24 
     25 struct NonBlockingInvalidator::InitializeOptions {
     26   InitializeOptions(
     27       NetworkChannelCreator network_channel_creator,
     28       const std::string& invalidator_client_id,
     29       const UnackedInvalidationsMap& saved_invalidations,
     30       const std::string& invalidation_bootstrap_data,
     31       const WeakHandle<InvalidationStateTracker>&
     32           invalidation_state_tracker,
     33       const std::string& client_info,
     34       scoped_refptr<net::URLRequestContextGetter> request_context_getter)
     35       : network_channel_creator(network_channel_creator),
     36         invalidator_client_id(invalidator_client_id),
     37         saved_invalidations(saved_invalidations),
     38         invalidation_bootstrap_data(invalidation_bootstrap_data),
     39         invalidation_state_tracker(invalidation_state_tracker),
     40         client_info(client_info),
     41         request_context_getter(request_context_getter) {
     42   }
     43 
     44   NetworkChannelCreator network_channel_creator;
     45   std::string invalidator_client_id;
     46   UnackedInvalidationsMap saved_invalidations;
     47   std::string invalidation_bootstrap_data;
     48   WeakHandle<InvalidationStateTracker> invalidation_state_tracker;
     49   std::string client_info;
     50   scoped_refptr<net::URLRequestContextGetter> request_context_getter;
     51 };
     52 
     53 namespace {
     54 // This class provides a wrapper for a logging class in order to receive
     55 // callbacks across threads, without having to worry about owner threads.
     56 class CallbackProxy {
     57  public:
     58   explicit CallbackProxy(
     59       base::Callback<void(const base::DictionaryValue&)> callback);
     60   ~CallbackProxy();
     61 
     62   void Run(const base::DictionaryValue& value);
     63 
     64  private:
     65   static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback,
     66                     scoped_ptr<base::DictionaryValue> value);
     67 
     68   base::Callback<void(const base::DictionaryValue&)> callback_;
     69   scoped_refptr<base::SingleThreadTaskRunner> running_thread_;
     70 
     71   DISALLOW_COPY_AND_ASSIGN(CallbackProxy);
     72 };
     73 
     74 CallbackProxy::CallbackProxy(
     75     base::Callback<void(const base::DictionaryValue&)> callback)
     76     : callback_(callback),
     77       running_thread_(base::ThreadTaskRunnerHandle::Get()) {}
     78 
     79 CallbackProxy::~CallbackProxy() {}
     80 
     81 void CallbackProxy::DoRun(
     82     base::Callback<void(const base::DictionaryValue&)> callback,
     83     scoped_ptr<base::DictionaryValue> value) {
     84   callback.Run(*value);
     85 }
     86 
     87 void CallbackProxy::Run(const base::DictionaryValue& value) {
     88   scoped_ptr<base::DictionaryValue> copied(value.DeepCopy());
     89   running_thread_->PostTask(
     90       FROM_HERE,
     91       base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied)));
     92 }
     93 }
     94 
     95 class NonBlockingInvalidator::Core
     96     : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
     97       // InvalidationHandler to observe the InvalidationNotifier we create.
     98       public InvalidationHandler {
     99  public:
    100   // Called on parent thread.  |delegate_observer| should be initialized.
    101   explicit Core(
    102       const WeakHandle<NonBlockingInvalidator>& delegate_observer);
    103 
    104   // Helpers called on I/O thread.
    105   void Initialize(
    106       const NonBlockingInvalidator::InitializeOptions& initialize_options);
    107   void Teardown();
    108   void UpdateRegisteredIds(const ObjectIdSet& ids);
    109   void UpdateCredentials(const std::string& email, const std::string& token);
    110   void RequestDetailedStatus(
    111       base::Callback<void(const base::DictionaryValue&)> callback) const;
    112 
    113   // InvalidationHandler implementation (all called on I/O thread by
    114   // InvalidationNotifier).
    115   virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE;
    116   virtual void OnIncomingInvalidation(
    117       const ObjectIdInvalidationMap& invalidation_map) OVERRIDE;
    118   virtual std::string GetOwnerName() const OVERRIDE;
    119 
    120  private:
    121   friend class
    122       base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
    123   // Called on parent or I/O thread.
    124   virtual ~Core();
    125 
    126   // The variables below should be used only on the I/O thread.
    127   const WeakHandle<NonBlockingInvalidator> delegate_observer_;
    128   scoped_ptr<InvalidationNotifier> invalidation_notifier_;
    129   scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
    130 
    131   DISALLOW_COPY_AND_ASSIGN(Core);
    132 };
    133 
    134 NonBlockingInvalidator::Core::Core(
    135     const WeakHandle<NonBlockingInvalidator>& delegate_observer)
    136     : delegate_observer_(delegate_observer) {
    137   DCHECK(delegate_observer_.IsInitialized());
    138 }
    139 
    140 NonBlockingInvalidator::Core::~Core() {
    141 }
    142 
    143 void NonBlockingInvalidator::Core::Initialize(
    144     const NonBlockingInvalidator::InitializeOptions& initialize_options) {
    145   DCHECK(initialize_options.request_context_getter.get());
    146   network_task_runner_ =
    147       initialize_options.request_context_getter->GetNetworkTaskRunner();
    148   DCHECK(network_task_runner_->BelongsToCurrentThread());
    149   scoped_ptr<SyncNetworkChannel> network_channel =
    150       initialize_options.network_channel_creator.Run();
    151   invalidation_notifier_.reset(
    152       new InvalidationNotifier(
    153           network_channel.Pass(),
    154           initialize_options.invalidator_client_id,
    155           initialize_options.saved_invalidations,
    156           initialize_options.invalidation_bootstrap_data,
    157           initialize_options.invalidation_state_tracker,
    158           initialize_options.client_info));
    159   invalidation_notifier_->RegisterHandler(this);
    160 }
    161 
    162 void NonBlockingInvalidator::Core::Teardown() {
    163   DCHECK(network_task_runner_->BelongsToCurrentThread());
    164   invalidation_notifier_->UnregisterHandler(this);
    165   invalidation_notifier_.reset();
    166   network_task_runner_ = NULL;
    167 }
    168 
    169 void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
    170   DCHECK(network_task_runner_->BelongsToCurrentThread());
    171   invalidation_notifier_->UpdateRegisteredIds(this, ids);
    172 }
    173 
    174 void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
    175                                                      const std::string& token) {
    176   DCHECK(network_task_runner_->BelongsToCurrentThread());
    177   invalidation_notifier_->UpdateCredentials(email, token);
    178 }
    179 
    180 void NonBlockingInvalidator::Core::RequestDetailedStatus(
    181     base::Callback<void(const base::DictionaryValue&)> callback) const {
    182   DCHECK(network_task_runner_->BelongsToCurrentThread());
    183   invalidation_notifier_->RequestDetailedStatus(callback);
    184 }
    185 
    186 void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
    187     InvalidatorState reason) {
    188   DCHECK(network_task_runner_->BelongsToCurrentThread());
    189   delegate_observer_.Call(FROM_HERE,
    190                           &NonBlockingInvalidator::OnInvalidatorStateChange,
    191                           reason);
    192 }
    193 
    194 void NonBlockingInvalidator::Core::OnIncomingInvalidation(
    195     const ObjectIdInvalidationMap& invalidation_map) {
    196   DCHECK(network_task_runner_->BelongsToCurrentThread());
    197   delegate_observer_.Call(FROM_HERE,
    198                           &NonBlockingInvalidator::OnIncomingInvalidation,
    199                           invalidation_map);
    200 }
    201 
    202 std::string NonBlockingInvalidator::Core::GetOwnerName() const {
    203   return "Sync";
    204 }
    205 
    206 NonBlockingInvalidator::NonBlockingInvalidator(
    207     NetworkChannelCreator network_channel_creator,
    208     const std::string& invalidator_client_id,
    209     const UnackedInvalidationsMap& saved_invalidations,
    210     const std::string& invalidation_bootstrap_data,
    211     InvalidationStateTracker* invalidation_state_tracker,
    212     const std::string& client_info,
    213     const scoped_refptr<net::URLRequestContextGetter>& request_context_getter)
    214     : invalidation_state_tracker_(invalidation_state_tracker),
    215       parent_task_runner_(base::ThreadTaskRunnerHandle::Get()),
    216       network_task_runner_(request_context_getter->GetNetworkTaskRunner()),
    217       weak_ptr_factory_(this) {
    218   core_ = new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()));
    219 
    220   InitializeOptions initialize_options(
    221       network_channel_creator,
    222       invalidator_client_id,
    223       saved_invalidations,
    224       invalidation_bootstrap_data,
    225       MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
    226       client_info,
    227       request_context_getter);
    228 
    229   if (!network_task_runner_->PostTask(
    230           FROM_HERE,
    231           base::Bind(
    232               &NonBlockingInvalidator::Core::Initialize,
    233               core_.get(),
    234               initialize_options))) {
    235     NOTREACHED();
    236   }
    237 }
    238 
    239 NonBlockingInvalidator::~NonBlockingInvalidator() {
    240   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    241   if (!network_task_runner_->PostTask(
    242           FROM_HERE,
    243           base::Bind(&NonBlockingInvalidator::Core::Teardown,
    244                      core_.get()))) {
    245     DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
    246   }
    247 }
    248 
    249 void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
    250   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    251   registrar_.RegisterHandler(handler);
    252 }
    253 
    254 void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
    255                                                  const ObjectIdSet& ids) {
    256   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    257   registrar_.UpdateRegisteredIds(handler, ids);
    258   if (!network_task_runner_->PostTask(
    259           FROM_HERE,
    260           base::Bind(
    261               &NonBlockingInvalidator::Core::UpdateRegisteredIds,
    262               core_.get(),
    263               registrar_.GetAllRegisteredIds()))) {
    264     NOTREACHED();
    265   }
    266 }
    267 
    268 void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
    269   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    270   registrar_.UnregisterHandler(handler);
    271 }
    272 
    273 InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
    274   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    275   return registrar_.GetInvalidatorState();
    276 }
    277 
    278 void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
    279                                                const std::string& token) {
    280   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    281   if (!network_task_runner_->PostTask(
    282           FROM_HERE,
    283           base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
    284                      core_.get(), email, token))) {
    285     NOTREACHED();
    286   }
    287 }
    288 
    289 void NonBlockingInvalidator::RequestDetailedStatus(
    290     base::Callback<void(const base::DictionaryValue&)> callback) const {
    291   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    292   base::Callback<void(const base::DictionaryValue&)> proxy_callback =
    293       base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback)));
    294   if (!network_task_runner_->PostTask(
    295           FROM_HERE,
    296           base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus,
    297                      core_.get(),
    298                      proxy_callback))) {
    299     NOTREACHED();
    300   }
    301 }
    302 
    303 NetworkChannelCreator
    304     NonBlockingInvalidator::MakePushClientChannelCreator(
    305         const notifier::NotifierOptions& notifier_options) {
    306   return base::Bind(SyncNetworkChannel::CreatePushClientChannel,
    307       notifier_options);
    308 }
    309 
    310 NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator(
    311     scoped_refptr<net::URLRequestContextGetter> request_context_getter,
    312     scoped_ptr<GCMNetworkChannelDelegate> delegate) {
    313   return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel,
    314                     request_context_getter,
    315                     base::Passed(&delegate));
    316 }
    317 
    318 void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) {
    319   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    320   invalidation_state_tracker_->ClearAndSetNewClientId(data);
    321 }
    322 
    323 std::string NonBlockingInvalidator::GetInvalidatorClientId() const {
    324   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    325   return invalidation_state_tracker_->GetInvalidatorClientId();
    326 }
    327 
    328 void NonBlockingInvalidator::SetBootstrapData(const std::string& data) {
    329   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    330   invalidation_state_tracker_->SetBootstrapData(data);
    331 }
    332 
    333 std::string NonBlockingInvalidator::GetBootstrapData() const {
    334   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    335   return invalidation_state_tracker_->GetBootstrapData();
    336 }
    337 
    338 void NonBlockingInvalidator::SetSavedInvalidations(
    339       const UnackedInvalidationsMap& states) {
    340   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    341   invalidation_state_tracker_->SetSavedInvalidations(states);
    342 }
    343 
    344 UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const {
    345   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    346   return invalidation_state_tracker_->GetSavedInvalidations();
    347 }
    348 
    349 void NonBlockingInvalidator::Clear() {
    350   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    351   invalidation_state_tracker_->Clear();
    352 }
    353 
    354 void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
    355   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    356   registrar_.UpdateInvalidatorState(state);
    357 }
    358 
    359 void NonBlockingInvalidator::OnIncomingInvalidation(
    360         const ObjectIdInvalidationMap& invalidation_map) {
    361   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    362   registrar_.DispatchInvalidationsToHandlers(invalidation_map);
    363 }
    364 
    365 std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; }
    366 
    367 }  // namespace syncer
    368