1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "components/invalidation/sync_system_resources.h" 6 7 #include <cstdlib> 8 #include <cstring> 9 #include <string> 10 11 #include "base/bind.h" 12 #include "base/logging.h" 13 #include "base/message_loop/message_loop.h" 14 #include "base/stl_util.h" 15 #include "base/strings/string_util.h" 16 #include "base/strings/stringprintf.h" 17 #include "components/invalidation/gcm_network_channel.h" 18 #include "components/invalidation/gcm_network_channel_delegate.h" 19 #include "components/invalidation/invalidation_util.h" 20 #include "components/invalidation/push_client_channel.h" 21 #include "google/cacheinvalidation/deps/callback.h" 22 #include "google/cacheinvalidation/include/types.h" 23 #include "jingle/notifier/listener/push_client.h" 24 25 namespace syncer { 26 27 SyncLogger::SyncLogger() {} 28 SyncLogger::~SyncLogger() {} 29 30 void SyncLogger::Log(LogLevel level, const char* file, int line, 31 const char* format, ...) { 32 logging::LogSeverity log_severity = -2; // VLOG(2) 33 bool emit_log = false; 34 switch (level) { 35 case FINE_LEVEL: 36 log_severity = -2; // VLOG(2) 37 emit_log = VLOG_IS_ON(2); 38 break; 39 case INFO_LEVEL: 40 log_severity = -1; // VLOG(1) 41 emit_log = VLOG_IS_ON(1); 42 break; 43 case WARNING_LEVEL: 44 log_severity = logging::LOG_WARNING; 45 emit_log = LOG_IS_ON(WARNING); 46 break; 47 case SEVERE_LEVEL: 48 log_severity = logging::LOG_ERROR; 49 emit_log = LOG_IS_ON(ERROR); 50 break; 51 } 52 if (emit_log) { 53 va_list ap; 54 va_start(ap, format); 55 std::string result; 56 base::StringAppendV(&result, format, ap); 57 logging::LogMessage(file, line, log_severity).stream() << result; 58 va_end(ap); 59 } 60 } 61 62 void SyncLogger::SetSystemResources(invalidation::SystemResources* resources) { 63 // Do nothing. 64 } 65 66 SyncInvalidationScheduler::SyncInvalidationScheduler() 67 : created_on_loop_(base::MessageLoop::current()), 68 is_started_(false), 69 is_stopped_(false), 70 weak_factory_(this) { 71 CHECK(created_on_loop_); 72 } 73 74 SyncInvalidationScheduler::~SyncInvalidationScheduler() { 75 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 76 CHECK(is_stopped_); 77 } 78 79 void SyncInvalidationScheduler::Start() { 80 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 81 CHECK(!is_started_); 82 is_started_ = true; 83 is_stopped_ = false; 84 weak_factory_.InvalidateWeakPtrs(); 85 } 86 87 void SyncInvalidationScheduler::Stop() { 88 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 89 is_stopped_ = true; 90 is_started_ = false; 91 weak_factory_.InvalidateWeakPtrs(); 92 STLDeleteElements(&posted_tasks_); 93 posted_tasks_.clear(); 94 } 95 96 void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay, 97 invalidation::Closure* task) { 98 DCHECK(invalidation::IsCallbackRepeatable(task)); 99 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 100 101 if (!is_started_) { 102 delete task; 103 return; 104 } 105 106 posted_tasks_.insert(task); 107 base::MessageLoop::current()->PostDelayedTask( 108 FROM_HERE, base::Bind(&SyncInvalidationScheduler::RunPostedTask, 109 weak_factory_.GetWeakPtr(), task), 110 delay); 111 } 112 113 bool SyncInvalidationScheduler::IsRunningOnThread() const { 114 return created_on_loop_ == base::MessageLoop::current(); 115 } 116 117 invalidation::Time SyncInvalidationScheduler::GetCurrentTime() const { 118 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 119 return base::Time::Now(); 120 } 121 122 void SyncInvalidationScheduler::SetSystemResources( 123 invalidation::SystemResources* resources) { 124 // Do nothing. 125 } 126 127 void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure* task) { 128 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 129 task->Run(); 130 posted_tasks_.erase(task); 131 delete task; 132 } 133 134 SyncNetworkChannel::SyncNetworkChannel() 135 : last_network_status_(false), 136 received_messages_count_(0) {} 137 138 SyncNetworkChannel::~SyncNetworkChannel() { 139 STLDeleteElements(&network_status_receivers_); 140 } 141 142 void SyncNetworkChannel::SetMessageReceiver( 143 invalidation::MessageCallback* incoming_receiver) { 144 incoming_receiver_.reset(incoming_receiver); 145 } 146 147 void SyncNetworkChannel::AddNetworkStatusReceiver( 148 invalidation::NetworkStatusCallback* network_status_receiver) { 149 network_status_receiver->Run(last_network_status_); 150 network_status_receivers_.push_back(network_status_receiver); 151 } 152 153 void SyncNetworkChannel::SetSystemResources( 154 invalidation::SystemResources* resources) { 155 // Do nothing. 156 } 157 158 void SyncNetworkChannel::AddObserver(Observer* observer) { 159 observers_.AddObserver(observer); 160 } 161 162 void SyncNetworkChannel::RemoveObserver(Observer* observer) { 163 observers_.RemoveObserver(observer); 164 } 165 166 scoped_ptr<SyncNetworkChannel> SyncNetworkChannel::CreatePushClientChannel( 167 const notifier::NotifierOptions& notifier_options) { 168 scoped_ptr<notifier::PushClient> push_client( 169 notifier::PushClient::CreateDefaultOnIOThread(notifier_options)); 170 return scoped_ptr<SyncNetworkChannel>( 171 new PushClientChannel(push_client.Pass())); 172 } 173 174 scoped_ptr<SyncNetworkChannel> SyncNetworkChannel::CreateGCMNetworkChannel( 175 scoped_refptr<net::URLRequestContextGetter> request_context_getter, 176 scoped_ptr<GCMNetworkChannelDelegate> delegate) { 177 return scoped_ptr<SyncNetworkChannel>(new GCMNetworkChannel( 178 request_context_getter, delegate.Pass())); 179 } 180 181 void SyncNetworkChannel::NotifyNetworkStatusChange(bool online) { 182 // Remember network state for future NetworkStatusReceivers. 183 last_network_status_ = online; 184 // Notify NetworkStatusReceivers in cacheinvalidation. 185 for (NetworkStatusReceiverList::const_iterator it = 186 network_status_receivers_.begin(); 187 it != network_status_receivers_.end(); ++it) { 188 (*it)->Run(online); 189 } 190 } 191 192 void SyncNetworkChannel::NotifyChannelStateChange( 193 InvalidatorState invalidator_state) { 194 FOR_EACH_OBSERVER(Observer, observers_, 195 OnNetworkChannelStateChanged(invalidator_state)); 196 } 197 198 bool SyncNetworkChannel::DeliverIncomingMessage(const std::string& message) { 199 if (!incoming_receiver_) { 200 DLOG(ERROR) << "No receiver for incoming notification"; 201 return false; 202 } 203 received_messages_count_++; 204 incoming_receiver_->Run(message); 205 return true; 206 } 207 208 int SyncNetworkChannel::GetReceivedMessagesCount() const { 209 return received_messages_count_; 210 } 211 212 SyncStorage::SyncStorage(StateWriter* state_writer, 213 invalidation::Scheduler* scheduler) 214 : state_writer_(state_writer), 215 scheduler_(scheduler) { 216 DCHECK(state_writer_); 217 DCHECK(scheduler_); 218 } 219 220 SyncStorage::~SyncStorage() {} 221 222 void SyncStorage::WriteKey(const std::string& key, const std::string& value, 223 invalidation::WriteKeyCallback* done) { 224 CHECK(state_writer_); 225 // TODO(ghc): actually write key,value associations, and don't invoke the 226 // callback until the operation completes. 227 state_writer_->WriteState(value); 228 cached_state_ = value; 229 // According to the cache invalidation API folks, we can do this as 230 // long as we make sure to clear the persistent state that we start 231 // up the cache invalidation client with. However, we musn't do it 232 // right away, as we may be called under a lock that the callback 233 // uses. 234 scheduler_->Schedule( 235 invalidation::Scheduler::NoDelay(), 236 invalidation::NewPermanentCallback( 237 this, &SyncStorage::RunAndDeleteWriteKeyCallback, 238 done)); 239 } 240 241 void SyncStorage::ReadKey(const std::string& key, 242 invalidation::ReadKeyCallback* done) { 243 DCHECK(scheduler_->IsRunningOnThread()) << "not running on scheduler thread"; 244 RunAndDeleteReadKeyCallback(done, cached_state_); 245 } 246 247 void SyncStorage::DeleteKey(const std::string& key, 248 invalidation::DeleteKeyCallback* done) { 249 // TODO(ghc): Implement. 250 LOG(WARNING) << "ignoring call to DeleteKey(" << key << ", callback)"; 251 } 252 253 void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback* done) { 254 // TODO(ghc): Implement. 255 LOG(WARNING) << "ignoring call to ReadAllKeys(callback)"; 256 } 257 258 void SyncStorage::SetSystemResources( 259 invalidation::SystemResources* resources) { 260 // Do nothing. 261 } 262 263 void SyncStorage::RunAndDeleteWriteKeyCallback( 264 invalidation::WriteKeyCallback* callback) { 265 callback->Run( 266 invalidation::Status(invalidation::Status::SUCCESS, std::string())); 267 delete callback; 268 } 269 270 void SyncStorage::RunAndDeleteReadKeyCallback( 271 invalidation::ReadKeyCallback* callback, const std::string& value) { 272 callback->Run(std::make_pair( 273 invalidation::Status(invalidation::Status::SUCCESS, std::string()), 274 value)); 275 delete callback; 276 } 277 278 SyncSystemResources::SyncSystemResources( 279 SyncNetworkChannel* sync_network_channel, 280 StateWriter* state_writer) 281 : is_started_(false), 282 logger_(new SyncLogger()), 283 internal_scheduler_(new SyncInvalidationScheduler()), 284 listener_scheduler_(new SyncInvalidationScheduler()), 285 storage_(new SyncStorage(state_writer, internal_scheduler_.get())), 286 sync_network_channel_(sync_network_channel) { 287 } 288 289 SyncSystemResources::~SyncSystemResources() { 290 Stop(); 291 } 292 293 void SyncSystemResources::Start() { 294 internal_scheduler_->Start(); 295 listener_scheduler_->Start(); 296 is_started_ = true; 297 } 298 299 void SyncSystemResources::Stop() { 300 internal_scheduler_->Stop(); 301 listener_scheduler_->Stop(); 302 } 303 304 bool SyncSystemResources::IsStarted() const { 305 return is_started_; 306 } 307 308 void SyncSystemResources::set_platform(const std::string& platform) { 309 platform_ = platform; 310 } 311 312 std::string SyncSystemResources::platform() const { 313 return platform_; 314 } 315 316 SyncLogger* SyncSystemResources::logger() { 317 return logger_.get(); 318 } 319 320 SyncStorage* SyncSystemResources::storage() { 321 return storage_.get(); 322 } 323 324 SyncNetworkChannel* SyncSystemResources::network() { 325 return sync_network_channel_; 326 } 327 328 SyncInvalidationScheduler* SyncSystemResources::internal_scheduler() { 329 return internal_scheduler_.get(); 330 } 331 332 SyncInvalidationScheduler* SyncSystemResources::listener_scheduler() { 333 return listener_scheduler_.get(); 334 } 335 336 } // namespace syncer 337