Home | History | Annotate | Download | only in invalidation
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "components/invalidation/non_blocking_invalidator.h"
      6 
      7 #include <cstddef>
      8 
      9 #include "base/location.h"
     10 #include "base/logging.h"
     11 #include "base/memory/scoped_ptr.h"
     12 #include "base/single_thread_task_runner.h"
     13 #include "base/thread_task_runner_handle.h"
     14 #include "base/threading/thread.h"
     15 #include "components/invalidation/gcm_network_channel_delegate.h"
     16 #include "components/invalidation/invalidation_handler.h"
     17 #include "components/invalidation/invalidation_notifier.h"
     18 #include "components/invalidation/object_id_invalidation_map.h"
     19 #include "components/invalidation/sync_system_resources.h"
     20 #include "jingle/notifier/listener/push_client.h"
     21 
     22 namespace syncer {
     23 
     24 struct NonBlockingInvalidator::InitializeOptions {
     25   InitializeOptions(
     26       NetworkChannelCreator network_channel_creator,
     27       const std::string& invalidator_client_id,
     28       const UnackedInvalidationsMap& saved_invalidations,
     29       const std::string& invalidation_bootstrap_data,
     30       const base::WeakPtr<InvalidationStateTracker>& invalidation_state_tracker,
     31       const scoped_refptr<base::SingleThreadTaskRunner>&
     32           invalidation_state_tracker_task_runner,
     33       const std::string& client_info,
     34       scoped_refptr<net::URLRequestContextGetter> request_context_getter)
     35       : network_channel_creator(network_channel_creator),
     36         invalidator_client_id(invalidator_client_id),
     37         saved_invalidations(saved_invalidations),
     38         invalidation_bootstrap_data(invalidation_bootstrap_data),
     39         invalidation_state_tracker(invalidation_state_tracker),
     40         invalidation_state_tracker_task_runner(
     41             invalidation_state_tracker_task_runner),
     42         client_info(client_info),
     43         request_context_getter(request_context_getter) {}
     44 
     45   NetworkChannelCreator network_channel_creator;
     46   std::string invalidator_client_id;
     47   UnackedInvalidationsMap saved_invalidations;
     48   std::string invalidation_bootstrap_data;
     49   base::WeakPtr<InvalidationStateTracker> invalidation_state_tracker;
     50   scoped_refptr<base::SingleThreadTaskRunner>
     51       invalidation_state_tracker_task_runner;
     52   std::string client_info;
     53   scoped_refptr<net::URLRequestContextGetter> request_context_getter;
     54 };
     55 
     56 namespace {
     57 // This class provides a wrapper for a logging class in order to receive
     58 // callbacks across threads, without having to worry about owner threads.
     59 class CallbackProxy {
     60  public:
     61   explicit CallbackProxy(
     62       base::Callback<void(const base::DictionaryValue&)> callback);
     63   ~CallbackProxy();
     64 
     65   void Run(const base::DictionaryValue& value);
     66 
     67  private:
     68   static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback,
     69                     scoped_ptr<base::DictionaryValue> value);
     70 
     71   base::Callback<void(const base::DictionaryValue&)> callback_;
     72   scoped_refptr<base::SingleThreadTaskRunner> running_thread_;
     73 
     74   DISALLOW_COPY_AND_ASSIGN(CallbackProxy);
     75 };
     76 
     77 CallbackProxy::CallbackProxy(
     78     base::Callback<void(const base::DictionaryValue&)> callback)
     79     : callback_(callback),
     80       running_thread_(base::ThreadTaskRunnerHandle::Get()) {}
     81 
     82 CallbackProxy::~CallbackProxy() {}
     83 
     84 void CallbackProxy::DoRun(
     85     base::Callback<void(const base::DictionaryValue&)> callback,
     86     scoped_ptr<base::DictionaryValue> value) {
     87   callback.Run(*value);
     88 }
     89 
     90 void CallbackProxy::Run(const base::DictionaryValue& value) {
     91   scoped_ptr<base::DictionaryValue> copied(value.DeepCopy());
     92   running_thread_->PostTask(
     93       FROM_HERE,
     94       base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied)));
     95 }
     96 }
     97 
     98 class NonBlockingInvalidator::Core
     99     : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
    100       // InvalidationHandler to observe the InvalidationNotifier we create.
    101       public InvalidationHandler {
    102  public:
    103   // Called on parent thread.  |delegate_observer| should be initialized.
    104   Core(const base::WeakPtr<NonBlockingInvalidator>& delegate_observer,
    105        const scoped_refptr<base::SingleThreadTaskRunner>&
    106            delegate_observer_task_runner);
    107 
    108   // Helpers called on I/O thread.
    109   void Initialize(
    110       const NonBlockingInvalidator::InitializeOptions& initialize_options);
    111   void Teardown();
    112   void UpdateRegisteredIds(const ObjectIdSet& ids);
    113   void UpdateCredentials(const std::string& email, const std::string& token);
    114   void RequestDetailedStatus(
    115       base::Callback<void(const base::DictionaryValue&)> callback) const;
    116 
    117   // InvalidationHandler implementation (all called on I/O thread by
    118   // InvalidationNotifier).
    119   virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE;
    120   virtual void OnIncomingInvalidation(
    121       const ObjectIdInvalidationMap& invalidation_map) OVERRIDE;
    122   virtual std::string GetOwnerName() const OVERRIDE;
    123 
    124  private:
    125   friend class
    126       base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
    127   // Called on parent or I/O thread.
    128   virtual ~Core();
    129 
    130   // The variables below should be used only on the I/O thread.
    131   const base::WeakPtr<NonBlockingInvalidator> delegate_observer_;
    132   scoped_refptr<base::SingleThreadTaskRunner> delegate_observer_task_runner_;
    133   scoped_ptr<InvalidationNotifier> invalidation_notifier_;
    134   scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
    135 
    136   DISALLOW_COPY_AND_ASSIGN(Core);
    137 };
    138 
    139 NonBlockingInvalidator::Core::Core(
    140     const base::WeakPtr<NonBlockingInvalidator>& delegate_observer,
    141     const scoped_refptr<base::SingleThreadTaskRunner>&
    142         delegate_observer_task_runner)
    143     : delegate_observer_(delegate_observer),
    144       delegate_observer_task_runner_(delegate_observer_task_runner) {
    145   DCHECK(delegate_observer_);
    146   DCHECK(delegate_observer_task_runner_.get());
    147 }
    148 
    149 NonBlockingInvalidator::Core::~Core() {
    150 }
    151 
    152 void NonBlockingInvalidator::Core::Initialize(
    153     const NonBlockingInvalidator::InitializeOptions& initialize_options) {
    154   DCHECK(initialize_options.request_context_getter.get());
    155   network_task_runner_ =
    156       initialize_options.request_context_getter->GetNetworkTaskRunner();
    157   DCHECK(network_task_runner_->BelongsToCurrentThread());
    158   scoped_ptr<SyncNetworkChannel> network_channel =
    159       initialize_options.network_channel_creator.Run();
    160   invalidation_notifier_.reset(new InvalidationNotifier(
    161       network_channel.Pass(),
    162       initialize_options.invalidator_client_id,
    163       initialize_options.saved_invalidations,
    164       initialize_options.invalidation_bootstrap_data,
    165       initialize_options.invalidation_state_tracker,
    166       initialize_options.invalidation_state_tracker_task_runner,
    167       initialize_options.client_info));
    168   invalidation_notifier_->RegisterHandler(this);
    169 }
    170 
    171 void NonBlockingInvalidator::Core::Teardown() {
    172   DCHECK(network_task_runner_->BelongsToCurrentThread());
    173   invalidation_notifier_->UnregisterHandler(this);
    174   invalidation_notifier_.reset();
    175   network_task_runner_ = NULL;
    176 }
    177 
    178 void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
    179   DCHECK(network_task_runner_->BelongsToCurrentThread());
    180   invalidation_notifier_->UpdateRegisteredIds(this, ids);
    181 }
    182 
    183 void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
    184                                                      const std::string& token) {
    185   DCHECK(network_task_runner_->BelongsToCurrentThread());
    186   invalidation_notifier_->UpdateCredentials(email, token);
    187 }
    188 
    189 void NonBlockingInvalidator::Core::RequestDetailedStatus(
    190     base::Callback<void(const base::DictionaryValue&)> callback) const {
    191   DCHECK(network_task_runner_->BelongsToCurrentThread());
    192   invalidation_notifier_->RequestDetailedStatus(callback);
    193 }
    194 
    195 void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
    196     InvalidatorState reason) {
    197   DCHECK(network_task_runner_->BelongsToCurrentThread());
    198   delegate_observer_task_runner_->PostTask(
    199       FROM_HERE,
    200       base::Bind(&NonBlockingInvalidator::OnInvalidatorStateChange,
    201                  delegate_observer_,
    202                  reason));
    203 }
    204 
    205 void NonBlockingInvalidator::Core::OnIncomingInvalidation(
    206     const ObjectIdInvalidationMap& invalidation_map) {
    207   DCHECK(network_task_runner_->BelongsToCurrentThread());
    208   delegate_observer_task_runner_->PostTask(
    209       FROM_HERE,
    210       base::Bind(&NonBlockingInvalidator::OnIncomingInvalidation,
    211                  delegate_observer_,
    212                  invalidation_map));
    213 }
    214 
    215 std::string NonBlockingInvalidator::Core::GetOwnerName() const {
    216   return "Sync";
    217 }
    218 
    219 NonBlockingInvalidator::NonBlockingInvalidator(
    220     NetworkChannelCreator network_channel_creator,
    221     const std::string& invalidator_client_id,
    222     const UnackedInvalidationsMap& saved_invalidations,
    223     const std::string& invalidation_bootstrap_data,
    224     InvalidationStateTracker* invalidation_state_tracker,
    225     const std::string& client_info,
    226     const scoped_refptr<net::URLRequestContextGetter>& request_context_getter)
    227     : invalidation_state_tracker_(invalidation_state_tracker),
    228       parent_task_runner_(base::ThreadTaskRunnerHandle::Get()),
    229       network_task_runner_(request_context_getter->GetNetworkTaskRunner()),
    230       weak_ptr_factory_(this) {
    231   base::WeakPtr<NonBlockingInvalidator> weak_ptr_this =
    232       weak_ptr_factory_.GetWeakPtr();
    233   weak_ptr_this.get();  // Bind to this thread.
    234 
    235   core_ = new Core(weak_ptr_this,
    236                    base::MessageLoopProxy::current());
    237 
    238   InitializeOptions initialize_options(network_channel_creator,
    239                                        invalidator_client_id,
    240                                        saved_invalidations,
    241                                        invalidation_bootstrap_data,
    242                                        weak_ptr_this,
    243                                        base::MessageLoopProxy::current(),
    244                                        client_info,
    245                                        request_context_getter);
    246 
    247   if (!network_task_runner_->PostTask(
    248           FROM_HERE,
    249           base::Bind(
    250               &NonBlockingInvalidator::Core::Initialize,
    251               core_.get(),
    252               initialize_options))) {
    253     NOTREACHED();
    254   }
    255 }
    256 
    257 NonBlockingInvalidator::~NonBlockingInvalidator() {
    258   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    259   if (!network_task_runner_->PostTask(
    260           FROM_HERE,
    261           base::Bind(&NonBlockingInvalidator::Core::Teardown,
    262                      core_.get()))) {
    263     DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
    264   }
    265 }
    266 
    267 void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
    268   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    269   registrar_.RegisterHandler(handler);
    270 }
    271 
    272 void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
    273                                                  const ObjectIdSet& ids) {
    274   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    275   registrar_.UpdateRegisteredIds(handler, ids);
    276   if (!network_task_runner_->PostTask(
    277           FROM_HERE,
    278           base::Bind(
    279               &NonBlockingInvalidator::Core::UpdateRegisteredIds,
    280               core_.get(),
    281               registrar_.GetAllRegisteredIds()))) {
    282     NOTREACHED();
    283   }
    284 }
    285 
    286 void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
    287   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    288   registrar_.UnregisterHandler(handler);
    289 }
    290 
    291 InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
    292   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    293   return registrar_.GetInvalidatorState();
    294 }
    295 
    296 void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
    297                                                const std::string& token) {
    298   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    299   if (!network_task_runner_->PostTask(
    300           FROM_HERE,
    301           base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
    302                      core_.get(), email, token))) {
    303     NOTREACHED();
    304   }
    305 }
    306 
    307 void NonBlockingInvalidator::RequestDetailedStatus(
    308     base::Callback<void(const base::DictionaryValue&)> callback) const {
    309   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    310   base::Callback<void(const base::DictionaryValue&)> proxy_callback =
    311       base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback)));
    312   if (!network_task_runner_->PostTask(
    313           FROM_HERE,
    314           base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus,
    315                      core_.get(),
    316                      proxy_callback))) {
    317     NOTREACHED();
    318   }
    319 }
    320 
    321 NetworkChannelCreator
    322     NonBlockingInvalidator::MakePushClientChannelCreator(
    323         const notifier::NotifierOptions& notifier_options) {
    324   return base::Bind(SyncNetworkChannel::CreatePushClientChannel,
    325       notifier_options);
    326 }
    327 
    328 NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator(
    329     scoped_refptr<net::URLRequestContextGetter> request_context_getter,
    330     scoped_ptr<GCMNetworkChannelDelegate> delegate) {
    331   return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel,
    332                     request_context_getter,
    333                     base::Passed(&delegate));
    334 }
    335 
    336 void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) {
    337   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    338   invalidation_state_tracker_->ClearAndSetNewClientId(data);
    339 }
    340 
    341 std::string NonBlockingInvalidator::GetInvalidatorClientId() const {
    342   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    343   return invalidation_state_tracker_->GetInvalidatorClientId();
    344 }
    345 
    346 void NonBlockingInvalidator::SetBootstrapData(const std::string& data) {
    347   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    348   invalidation_state_tracker_->SetBootstrapData(data);
    349 }
    350 
    351 std::string NonBlockingInvalidator::GetBootstrapData() const {
    352   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    353   return invalidation_state_tracker_->GetBootstrapData();
    354 }
    355 
    356 void NonBlockingInvalidator::SetSavedInvalidations(
    357       const UnackedInvalidationsMap& states) {
    358   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    359   invalidation_state_tracker_->SetSavedInvalidations(states);
    360 }
    361 
    362 UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const {
    363   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    364   return invalidation_state_tracker_->GetSavedInvalidations();
    365 }
    366 
    367 void NonBlockingInvalidator::Clear() {
    368   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    369   invalidation_state_tracker_->Clear();
    370 }
    371 
    372 void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
    373   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    374   registrar_.UpdateInvalidatorState(state);
    375 }
    376 
    377 void NonBlockingInvalidator::OnIncomingInvalidation(
    378         const ObjectIdInvalidationMap& invalidation_map) {
    379   DCHECK(parent_task_runner_->BelongsToCurrentThread());
    380   registrar_.DispatchInvalidationsToHandlers(invalidation_map);
    381 }
    382 
    383 std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; }
    384 
    385 }  // namespace syncer
    386