Home | History | Annotate | Download | only in notifier
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "sync/notifier/sync_system_resources.h"
      6 
      7 #include <cstdlib>
      8 #include <cstring>
      9 #include <string>
     10 
     11 #include "base/bind.h"
     12 #include "base/logging.h"
     13 #include "base/message_loop/message_loop.h"
     14 #include "base/stl_util.h"
     15 #include "base/strings/string_util.h"
     16 #include "base/strings/stringprintf.h"
     17 #include "google/cacheinvalidation/client_gateway.pb.h"
     18 #include "google/cacheinvalidation/deps/callback.h"
     19 #include "google/cacheinvalidation/include/types.h"
     20 #include "sync/notifier/invalidation_util.h"
     21 
     22 namespace syncer {
     23 
     24 SyncLogger::SyncLogger() {}
     25 SyncLogger::~SyncLogger() {}
     26 
     27 void SyncLogger::Log(LogLevel level, const char* file, int line,
     28                      const char* format, ...) {
     29   logging::LogSeverity log_severity = -2;  // VLOG(2)
     30   bool emit_log = false;
     31   switch (level) {
     32     case FINE_LEVEL:
     33       log_severity = -2;  // VLOG(2)
     34       emit_log = VLOG_IS_ON(2);
     35       break;
     36     case INFO_LEVEL:
     37       log_severity = -1;  // VLOG(1)
     38       emit_log = VLOG_IS_ON(1);
     39       break;
     40     case WARNING_LEVEL:
     41       log_severity = logging::LOG_WARNING;
     42       emit_log = LOG_IS_ON(WARNING);
     43       break;
     44     case SEVERE_LEVEL:
     45       log_severity = logging::LOG_ERROR;
     46       emit_log = LOG_IS_ON(ERROR);
     47       break;
     48   }
     49   if (emit_log) {
     50     va_list ap;
     51     va_start(ap, format);
     52     std::string result;
     53     base::StringAppendV(&result, format, ap);
     54     logging::LogMessage(file, line, log_severity).stream() << result;
     55     va_end(ap);
     56   }
     57 }
     58 
     59 void SyncLogger::SetSystemResources(invalidation::SystemResources* resources) {
     60   // Do nothing.
     61 }
     62 
     63 SyncInvalidationScheduler::SyncInvalidationScheduler()
     64     : created_on_loop_(base::MessageLoop::current()),
     65       is_started_(false),
     66       is_stopped_(false),
     67       weak_factory_(this) {
     68   CHECK(created_on_loop_);
     69 }
     70 
     71 SyncInvalidationScheduler::~SyncInvalidationScheduler() {
     72   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
     73   CHECK(is_stopped_);
     74 }
     75 
     76 void SyncInvalidationScheduler::Start() {
     77   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
     78   CHECK(!is_started_);
     79   is_started_ = true;
     80   is_stopped_ = false;
     81   weak_factory_.InvalidateWeakPtrs();
     82 }
     83 
     84 void SyncInvalidationScheduler::Stop() {
     85   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
     86   is_stopped_ = true;
     87   is_started_ = false;
     88   weak_factory_.InvalidateWeakPtrs();
     89   STLDeleteElements(&posted_tasks_);
     90   posted_tasks_.clear();
     91 }
     92 
     93 void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay,
     94                                          invalidation::Closure* task) {
     95   DCHECK(invalidation::IsCallbackRepeatable(task));
     96   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
     97 
     98   if (!is_started_) {
     99     delete task;
    100     return;
    101   }
    102 
    103   posted_tasks_.insert(task);
    104   base::MessageLoop::current()->PostDelayedTask(
    105       FROM_HERE, base::Bind(&SyncInvalidationScheduler::RunPostedTask,
    106                             weak_factory_.GetWeakPtr(), task),
    107       delay);
    108 }
    109 
    110 bool SyncInvalidationScheduler::IsRunningOnThread() const {
    111   return created_on_loop_ == base::MessageLoop::current();
    112 }
    113 
    114 invalidation::Time SyncInvalidationScheduler::GetCurrentTime() const {
    115   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
    116   return base::Time::Now();
    117 }
    118 
    119 void SyncInvalidationScheduler::SetSystemResources(
    120     invalidation::SystemResources* resources) {
    121   // Do nothing.
    122 }
    123 
    124 void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure* task) {
    125   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
    126   task->Run();
    127   posted_tasks_.erase(task);
    128   delete task;
    129 }
    130 
    131 SyncNetworkChannel::SyncNetworkChannel()
    132     : invalidator_state_(DEFAULT_INVALIDATION_ERROR),
    133       scheduling_hash_(0) {
    134 }
    135 
    136 SyncNetworkChannel::~SyncNetworkChannel() {
    137   STLDeleteElements(&network_status_receivers_);
    138 }
    139 
    140 void SyncNetworkChannel::SendMessage(const std::string& outgoing_message) {
    141   std::string encoded_message;
    142   EncodeMessage(&encoded_message, outgoing_message, service_context_,
    143       scheduling_hash_);
    144   SendEncodedMessage(encoded_message);
    145 }
    146 
    147 void SyncNetworkChannel::SetMessageReceiver(
    148     invalidation::MessageCallback* incoming_receiver) {
    149   incoming_receiver_.reset(incoming_receiver);
    150 }
    151 
    152 void SyncNetworkChannel::AddNetworkStatusReceiver(
    153     invalidation::NetworkStatusCallback* network_status_receiver) {
    154   network_status_receiver->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
    155   network_status_receivers_.push_back(network_status_receiver);
    156 }
    157 
    158 void SyncNetworkChannel::SetSystemResources(
    159     invalidation::SystemResources* resources) {
    160   // Do nothing.
    161 }
    162 
    163 void SyncNetworkChannel::AddObserver(Observer* observer) {
    164   observers_.AddObserver(observer);
    165 }
    166 
    167 void SyncNetworkChannel::RemoveObserver(Observer* observer) {
    168   observers_.RemoveObserver(observer);
    169 }
    170 
    171 const std::string& SyncNetworkChannel::GetServiceContextForTest() const {
    172   return service_context_;
    173 }
    174 
    175 int64 SyncNetworkChannel::GetSchedulingHashForTest() const {
    176   return scheduling_hash_;
    177 }
    178 
    179 std::string SyncNetworkChannel::EncodeMessageForTest(
    180     const std::string& message, const std::string& service_context,
    181     int64 scheduling_hash) {
    182   std::string encoded_message;
    183   EncodeMessage(&encoded_message, message, service_context, scheduling_hash);
    184   return encoded_message;
    185 }
    186 
    187 bool SyncNetworkChannel::DecodeMessageForTest(
    188     const std::string& data,
    189     std::string* message,
    190     std::string* service_context,
    191     int64* scheduling_hash) {
    192   return DecodeMessage(data, message, service_context, scheduling_hash);
    193 }
    194 
    195 void SyncNetworkChannel::NotifyStateChange(InvalidatorState invalidator_state) {
    196   // Remember state for future NetworkStatusReceivers.
    197   invalidator_state_ = invalidator_state;
    198   // Notify NetworkStatusReceivers in cacheinvalidation.
    199   for (NetworkStatusReceiverList::const_iterator it =
    200            network_status_receivers_.begin();
    201        it != network_status_receivers_.end(); ++it) {
    202     (*it)->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
    203   }
    204   // Notify observers.
    205   FOR_EACH_OBSERVER(Observer, observers_,
    206                     OnNetworkChannelStateChanged(invalidator_state_));
    207 }
    208 
    209 void SyncNetworkChannel::DeliverIncomingMessage(const std::string& data) {
    210   if (!incoming_receiver_) {
    211     DLOG(ERROR) << "No receiver for incoming notification";
    212     return;
    213   }
    214   std::string message;
    215   if (!DecodeMessage(data,
    216                      &message, &service_context_, &scheduling_hash_)) {
    217     DLOG(ERROR) << "Could not parse ClientGatewayMessage";
    218     return;
    219   }
    220   incoming_receiver_->Run(message);
    221 }
    222 
    223 void SyncNetworkChannel::EncodeMessage(
    224     std::string* encoded_message,
    225     const std::string& message,
    226     const std::string& service_context,
    227     int64 scheduling_hash) {
    228   ipc::invalidation::ClientGatewayMessage envelope;
    229   envelope.set_is_client_to_server(true);
    230   if (!service_context.empty()) {
    231     envelope.set_service_context(service_context);
    232     envelope.set_rpc_scheduling_hash(scheduling_hash);
    233   }
    234   envelope.set_network_message(message);
    235   envelope.SerializeToString(encoded_message);
    236 }
    237 
    238 
    239 bool SyncNetworkChannel::DecodeMessage(
    240     const std::string& data,
    241     std::string* message,
    242     std::string* service_context,
    243     int64* scheduling_hash) {
    244   ipc::invalidation::ClientGatewayMessage envelope;
    245   if (!envelope.ParseFromString(data)) {
    246     return false;
    247   }
    248   *message = envelope.network_message();
    249   if (envelope.has_service_context()) {
    250     *service_context = envelope.service_context();
    251   }
    252   if (envelope.has_rpc_scheduling_hash()) {
    253     *scheduling_hash = envelope.rpc_scheduling_hash();
    254   }
    255   return true;
    256 }
    257 
    258 
    259 SyncStorage::SyncStorage(StateWriter* state_writer,
    260                          invalidation::Scheduler* scheduler)
    261     : state_writer_(state_writer),
    262       scheduler_(scheduler) {
    263   DCHECK(state_writer_);
    264   DCHECK(scheduler_);
    265 }
    266 
    267 SyncStorage::~SyncStorage() {}
    268 
    269 void SyncStorage::WriteKey(const std::string& key, const std::string& value,
    270                            invalidation::WriteKeyCallback* done) {
    271   CHECK(state_writer_);
    272   // TODO(ghc): actually write key,value associations, and don't invoke the
    273   // callback until the operation completes.
    274   state_writer_->WriteState(value);
    275   cached_state_ = value;
    276   // According to the cache invalidation API folks, we can do this as
    277   // long as we make sure to clear the persistent state that we start
    278   // up the cache invalidation client with.  However, we musn't do it
    279   // right away, as we may be called under a lock that the callback
    280   // uses.
    281   scheduler_->Schedule(
    282       invalidation::Scheduler::NoDelay(),
    283       invalidation::NewPermanentCallback(
    284           this, &SyncStorage::RunAndDeleteWriteKeyCallback,
    285           done));
    286 }
    287 
    288 void SyncStorage::ReadKey(const std::string& key,
    289                           invalidation::ReadKeyCallback* done) {
    290   DCHECK(scheduler_->IsRunningOnThread()) << "not running on scheduler thread";
    291   RunAndDeleteReadKeyCallback(done, cached_state_);
    292 }
    293 
    294 void SyncStorage::DeleteKey(const std::string& key,
    295                             invalidation::DeleteKeyCallback* done) {
    296   // TODO(ghc): Implement.
    297   LOG(WARNING) << "ignoring call to DeleteKey(" << key << ", callback)";
    298 }
    299 
    300 void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback* done) {
    301   // TODO(ghc): Implement.
    302   LOG(WARNING) << "ignoring call to ReadAllKeys(callback)";
    303 }
    304 
    305 void SyncStorage::SetSystemResources(
    306     invalidation::SystemResources* resources) {
    307   // Do nothing.
    308 }
    309 
    310 void SyncStorage::RunAndDeleteWriteKeyCallback(
    311     invalidation::WriteKeyCallback* callback) {
    312   callback->Run(
    313       invalidation::Status(invalidation::Status::SUCCESS, std::string()));
    314   delete callback;
    315 }
    316 
    317 void SyncStorage::RunAndDeleteReadKeyCallback(
    318     invalidation::ReadKeyCallback* callback, const std::string& value) {
    319   callback->Run(std::make_pair(
    320       invalidation::Status(invalidation::Status::SUCCESS, std::string()),
    321       value));
    322   delete callback;
    323 }
    324 
    325 SyncSystemResources::SyncSystemResources(
    326     SyncNetworkChannel* sync_network_channel,
    327     StateWriter* state_writer)
    328     : is_started_(false),
    329       logger_(new SyncLogger()),
    330       internal_scheduler_(new SyncInvalidationScheduler()),
    331       listener_scheduler_(new SyncInvalidationScheduler()),
    332       storage_(new SyncStorage(state_writer, internal_scheduler_.get())),
    333       sync_network_channel_(sync_network_channel) {
    334 }
    335 
    336 SyncSystemResources::~SyncSystemResources() {
    337   Stop();
    338 }
    339 
    340 void SyncSystemResources::Start() {
    341   internal_scheduler_->Start();
    342   listener_scheduler_->Start();
    343   is_started_ = true;
    344 }
    345 
    346 void SyncSystemResources::Stop() {
    347   internal_scheduler_->Stop();
    348   listener_scheduler_->Stop();
    349 }
    350 
    351 bool SyncSystemResources::IsStarted() const {
    352   return is_started_;
    353 }
    354 
    355 void SyncSystemResources::set_platform(const std::string& platform) {
    356   platform_ = platform;
    357 }
    358 
    359 std::string SyncSystemResources::platform() const {
    360   return platform_;
    361 }
    362 
    363 SyncLogger* SyncSystemResources::logger() {
    364   return logger_.get();
    365 }
    366 
    367 SyncStorage* SyncSystemResources::storage() {
    368   return storage_.get();
    369 }
    370 
    371 SyncNetworkChannel* SyncSystemResources::network() {
    372   return sync_network_channel_;
    373 }
    374 
    375 SyncInvalidationScheduler* SyncSystemResources::internal_scheduler() {
    376   return internal_scheduler_.get();
    377 }
    378 
    379 SyncInvalidationScheduler* SyncSystemResources::listener_scheduler() {
    380   return listener_scheduler_.get();
    381 }
    382 
    383 }  // namespace syncer
    384