1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "sync/notifier/sync_system_resources.h" 6 7 #include <cstdlib> 8 #include <cstring> 9 #include <string> 10 11 #include "base/bind.h" 12 #include "base/logging.h" 13 #include "base/message_loop/message_loop.h" 14 #include "base/stl_util.h" 15 #include "base/strings/string_util.h" 16 #include "base/strings/stringprintf.h" 17 #include "google/cacheinvalidation/client_gateway.pb.h" 18 #include "google/cacheinvalidation/deps/callback.h" 19 #include "google/cacheinvalidation/include/types.h" 20 #include "sync/notifier/invalidation_util.h" 21 22 namespace syncer { 23 24 SyncLogger::SyncLogger() {} 25 SyncLogger::~SyncLogger() {} 26 27 void SyncLogger::Log(LogLevel level, const char* file, int line, 28 const char* format, ...) { 29 logging::LogSeverity log_severity = -2; // VLOG(2) 30 bool emit_log = false; 31 switch (level) { 32 case FINE_LEVEL: 33 log_severity = -2; // VLOG(2) 34 emit_log = VLOG_IS_ON(2); 35 break; 36 case INFO_LEVEL: 37 log_severity = -1; // VLOG(1) 38 emit_log = VLOG_IS_ON(1); 39 break; 40 case WARNING_LEVEL: 41 log_severity = logging::LOG_WARNING; 42 emit_log = LOG_IS_ON(WARNING); 43 break; 44 case SEVERE_LEVEL: 45 log_severity = logging::LOG_ERROR; 46 emit_log = LOG_IS_ON(ERROR); 47 break; 48 } 49 if (emit_log) { 50 va_list ap; 51 va_start(ap, format); 52 std::string result; 53 base::StringAppendV(&result, format, ap); 54 logging::LogMessage(file, line, log_severity).stream() << result; 55 va_end(ap); 56 } 57 } 58 59 void SyncLogger::SetSystemResources(invalidation::SystemResources* resources) { 60 // Do nothing. 61 } 62 63 SyncInvalidationScheduler::SyncInvalidationScheduler() 64 : created_on_loop_(base::MessageLoop::current()), 65 is_started_(false), 66 is_stopped_(false), 67 weak_factory_(this) { 68 CHECK(created_on_loop_); 69 } 70 71 SyncInvalidationScheduler::~SyncInvalidationScheduler() { 72 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 73 CHECK(is_stopped_); 74 } 75 76 void SyncInvalidationScheduler::Start() { 77 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 78 CHECK(!is_started_); 79 is_started_ = true; 80 is_stopped_ = false; 81 weak_factory_.InvalidateWeakPtrs(); 82 } 83 84 void SyncInvalidationScheduler::Stop() { 85 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 86 is_stopped_ = true; 87 is_started_ = false; 88 weak_factory_.InvalidateWeakPtrs(); 89 STLDeleteElements(&posted_tasks_); 90 posted_tasks_.clear(); 91 } 92 93 void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay, 94 invalidation::Closure* task) { 95 DCHECK(invalidation::IsCallbackRepeatable(task)); 96 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 97 98 if (!is_started_) { 99 delete task; 100 return; 101 } 102 103 posted_tasks_.insert(task); 104 base::MessageLoop::current()->PostDelayedTask( 105 FROM_HERE, base::Bind(&SyncInvalidationScheduler::RunPostedTask, 106 weak_factory_.GetWeakPtr(), task), 107 delay); 108 } 109 110 bool SyncInvalidationScheduler::IsRunningOnThread() const { 111 return created_on_loop_ == base::MessageLoop::current(); 112 } 113 114 invalidation::Time SyncInvalidationScheduler::GetCurrentTime() const { 115 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 116 return base::Time::Now(); 117 } 118 119 void SyncInvalidationScheduler::SetSystemResources( 120 invalidation::SystemResources* resources) { 121 // Do nothing. 122 } 123 124 void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure* task) { 125 CHECK_EQ(created_on_loop_, base::MessageLoop::current()); 126 task->Run(); 127 posted_tasks_.erase(task); 128 delete task; 129 } 130 131 SyncNetworkChannel::SyncNetworkChannel() 132 : invalidator_state_(DEFAULT_INVALIDATION_ERROR), 133 scheduling_hash_(0) { 134 } 135 136 SyncNetworkChannel::~SyncNetworkChannel() { 137 STLDeleteElements(&network_status_receivers_); 138 } 139 140 void SyncNetworkChannel::SendMessage(const std::string& outgoing_message) { 141 std::string encoded_message; 142 EncodeMessage(&encoded_message, outgoing_message, service_context_, 143 scheduling_hash_); 144 SendEncodedMessage(encoded_message); 145 } 146 147 void SyncNetworkChannel::SetMessageReceiver( 148 invalidation::MessageCallback* incoming_receiver) { 149 incoming_receiver_.reset(incoming_receiver); 150 } 151 152 void SyncNetworkChannel::AddNetworkStatusReceiver( 153 invalidation::NetworkStatusCallback* network_status_receiver) { 154 network_status_receiver->Run(invalidator_state_ == INVALIDATIONS_ENABLED); 155 network_status_receivers_.push_back(network_status_receiver); 156 } 157 158 void SyncNetworkChannel::SetSystemResources( 159 invalidation::SystemResources* resources) { 160 // Do nothing. 161 } 162 163 void SyncNetworkChannel::AddObserver(Observer* observer) { 164 observers_.AddObserver(observer); 165 } 166 167 void SyncNetworkChannel::RemoveObserver(Observer* observer) { 168 observers_.RemoveObserver(observer); 169 } 170 171 const std::string& SyncNetworkChannel::GetServiceContextForTest() const { 172 return service_context_; 173 } 174 175 int64 SyncNetworkChannel::GetSchedulingHashForTest() const { 176 return scheduling_hash_; 177 } 178 179 std::string SyncNetworkChannel::EncodeMessageForTest( 180 const std::string& message, const std::string& service_context, 181 int64 scheduling_hash) { 182 std::string encoded_message; 183 EncodeMessage(&encoded_message, message, service_context, scheduling_hash); 184 return encoded_message; 185 } 186 187 bool SyncNetworkChannel::DecodeMessageForTest( 188 const std::string& data, 189 std::string* message, 190 std::string* service_context, 191 int64* scheduling_hash) { 192 return DecodeMessage(data, message, service_context, scheduling_hash); 193 } 194 195 void SyncNetworkChannel::NotifyStateChange(InvalidatorState invalidator_state) { 196 // Remember state for future NetworkStatusReceivers. 197 invalidator_state_ = invalidator_state; 198 // Notify NetworkStatusReceivers in cacheinvalidation. 199 for (NetworkStatusReceiverList::const_iterator it = 200 network_status_receivers_.begin(); 201 it != network_status_receivers_.end(); ++it) { 202 (*it)->Run(invalidator_state_ == INVALIDATIONS_ENABLED); 203 } 204 // Notify observers. 205 FOR_EACH_OBSERVER(Observer, observers_, 206 OnNetworkChannelStateChanged(invalidator_state_)); 207 } 208 209 void SyncNetworkChannel::DeliverIncomingMessage(const std::string& data) { 210 if (!incoming_receiver_) { 211 DLOG(ERROR) << "No receiver for incoming notification"; 212 return; 213 } 214 std::string message; 215 if (!DecodeMessage(data, 216 &message, &service_context_, &scheduling_hash_)) { 217 DLOG(ERROR) << "Could not parse ClientGatewayMessage"; 218 return; 219 } 220 incoming_receiver_->Run(message); 221 } 222 223 void SyncNetworkChannel::EncodeMessage( 224 std::string* encoded_message, 225 const std::string& message, 226 const std::string& service_context, 227 int64 scheduling_hash) { 228 ipc::invalidation::ClientGatewayMessage envelope; 229 envelope.set_is_client_to_server(true); 230 if (!service_context.empty()) { 231 envelope.set_service_context(service_context); 232 envelope.set_rpc_scheduling_hash(scheduling_hash); 233 } 234 envelope.set_network_message(message); 235 envelope.SerializeToString(encoded_message); 236 } 237 238 239 bool SyncNetworkChannel::DecodeMessage( 240 const std::string& data, 241 std::string* message, 242 std::string* service_context, 243 int64* scheduling_hash) { 244 ipc::invalidation::ClientGatewayMessage envelope; 245 if (!envelope.ParseFromString(data)) { 246 return false; 247 } 248 *message = envelope.network_message(); 249 if (envelope.has_service_context()) { 250 *service_context = envelope.service_context(); 251 } 252 if (envelope.has_rpc_scheduling_hash()) { 253 *scheduling_hash = envelope.rpc_scheduling_hash(); 254 } 255 return true; 256 } 257 258 259 SyncStorage::SyncStorage(StateWriter* state_writer, 260 invalidation::Scheduler* scheduler) 261 : state_writer_(state_writer), 262 scheduler_(scheduler) { 263 DCHECK(state_writer_); 264 DCHECK(scheduler_); 265 } 266 267 SyncStorage::~SyncStorage() {} 268 269 void SyncStorage::WriteKey(const std::string& key, const std::string& value, 270 invalidation::WriteKeyCallback* done) { 271 CHECK(state_writer_); 272 // TODO(ghc): actually write key,value associations, and don't invoke the 273 // callback until the operation completes. 274 state_writer_->WriteState(value); 275 cached_state_ = value; 276 // According to the cache invalidation API folks, we can do this as 277 // long as we make sure to clear the persistent state that we start 278 // up the cache invalidation client with. However, we musn't do it 279 // right away, as we may be called under a lock that the callback 280 // uses. 281 scheduler_->Schedule( 282 invalidation::Scheduler::NoDelay(), 283 invalidation::NewPermanentCallback( 284 this, &SyncStorage::RunAndDeleteWriteKeyCallback, 285 done)); 286 } 287 288 void SyncStorage::ReadKey(const std::string& key, 289 invalidation::ReadKeyCallback* done) { 290 DCHECK(scheduler_->IsRunningOnThread()) << "not running on scheduler thread"; 291 RunAndDeleteReadKeyCallback(done, cached_state_); 292 } 293 294 void SyncStorage::DeleteKey(const std::string& key, 295 invalidation::DeleteKeyCallback* done) { 296 // TODO(ghc): Implement. 297 LOG(WARNING) << "ignoring call to DeleteKey(" << key << ", callback)"; 298 } 299 300 void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback* done) { 301 // TODO(ghc): Implement. 302 LOG(WARNING) << "ignoring call to ReadAllKeys(callback)"; 303 } 304 305 void SyncStorage::SetSystemResources( 306 invalidation::SystemResources* resources) { 307 // Do nothing. 308 } 309 310 void SyncStorage::RunAndDeleteWriteKeyCallback( 311 invalidation::WriteKeyCallback* callback) { 312 callback->Run( 313 invalidation::Status(invalidation::Status::SUCCESS, std::string())); 314 delete callback; 315 } 316 317 void SyncStorage::RunAndDeleteReadKeyCallback( 318 invalidation::ReadKeyCallback* callback, const std::string& value) { 319 callback->Run(std::make_pair( 320 invalidation::Status(invalidation::Status::SUCCESS, std::string()), 321 value)); 322 delete callback; 323 } 324 325 SyncSystemResources::SyncSystemResources( 326 SyncNetworkChannel* sync_network_channel, 327 StateWriter* state_writer) 328 : is_started_(false), 329 logger_(new SyncLogger()), 330 internal_scheduler_(new SyncInvalidationScheduler()), 331 listener_scheduler_(new SyncInvalidationScheduler()), 332 storage_(new SyncStorage(state_writer, internal_scheduler_.get())), 333 sync_network_channel_(sync_network_channel) { 334 } 335 336 SyncSystemResources::~SyncSystemResources() { 337 Stop(); 338 } 339 340 void SyncSystemResources::Start() { 341 internal_scheduler_->Start(); 342 listener_scheduler_->Start(); 343 is_started_ = true; 344 } 345 346 void SyncSystemResources::Stop() { 347 internal_scheduler_->Stop(); 348 listener_scheduler_->Stop(); 349 } 350 351 bool SyncSystemResources::IsStarted() const { 352 return is_started_; 353 } 354 355 void SyncSystemResources::set_platform(const std::string& platform) { 356 platform_ = platform; 357 } 358 359 std::string SyncSystemResources::platform() const { 360 return platform_; 361 } 362 363 SyncLogger* SyncSystemResources::logger() { 364 return logger_.get(); 365 } 366 367 SyncStorage* SyncSystemResources::storage() { 368 return storage_.get(); 369 } 370 371 SyncNetworkChannel* SyncSystemResources::network() { 372 return sync_network_channel_; 373 } 374 375 SyncInvalidationScheduler* SyncSystemResources::internal_scheduler() { 376 return internal_scheduler_.get(); 377 } 378 379 SyncInvalidationScheduler* SyncSystemResources::listener_scheduler() { 380 return listener_scheduler_.get(); 381 } 382 383 } // namespace syncer 384