1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "google_apis/gcm/engine/mcs_client.h" 6 7 #include <set> 8 9 #include "base/basictypes.h" 10 #include "base/message_loop/message_loop.h" 11 #include "base/metrics/histogram.h" 12 #include "base/strings/string_number_conversions.h" 13 #include "base/time/clock.h" 14 #include "base/time/time.h" 15 #include "google_apis/gcm/base/mcs_util.h" 16 #include "google_apis/gcm/base/socket_stream.h" 17 #include "google_apis/gcm/engine/connection_factory.h" 18 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h" 19 20 using namespace google::protobuf::io; 21 22 namespace gcm { 23 24 namespace { 25 26 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; 27 28 // The category of messages intended for the GCM client itself from MCS. 29 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; 30 31 // The from field for messages originating in the GCM client. 32 const char kGCMFromField[] = "gcm (at) android.com"; 33 34 // MCS status message types. 35 // TODO(zea): handle these at the GCMClient layer. 36 const char kIdleNotification[] = "IdleNotification"; 37 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; 38 // const char kPowerNotification[] = "PowerNotification"; 39 // const char kDataActiveNotification[] = "DataActiveNotification"; 40 41 // The number of unacked messages to allow before sending a stream ack. 42 // Applies to both incoming and outgoing messages. 43 // TODO(zea): make this server configurable. 44 const int kUnackedMessageBeforeStreamAck = 10; 45 46 // The global maximum number of pending messages to have in the send queue. 47 const size_t kMaxSendQueueSize = 10 * 1024; 48 49 // The maximum message size that can be sent to the server. 50 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. 51 52 // Helper for converting a proto persistent id list to a vector of strings. 53 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, 54 std::vector<std::string>* id_list) { 55 mcs_proto::SelectiveAck selective_ack; 56 if (!selective_ack.ParseFromString(bytes)) 57 return false; 58 std::vector<std::string> new_list; 59 for (int i = 0; i < selective_ack.id_size(); ++i) { 60 DCHECK(!selective_ack.id(i).empty()); 61 new_list.push_back(selective_ack.id(i)); 62 } 63 id_list->swap(new_list); 64 return true; 65 } 66 67 } // namespace 68 69 class CollapseKey { 70 public: 71 explicit CollapseKey(const mcs_proto::DataMessageStanza& message); 72 ~CollapseKey(); 73 74 // Comparison operator for use in maps. 75 bool operator<(const CollapseKey& right) const; 76 77 // Whether the message had a valid collapse key. 78 bool IsValid() const; 79 80 std::string token() const { return token_; } 81 std::string app_id() const { return app_id_; } 82 int64 device_user_id() const { return device_user_id_; } 83 84 private: 85 const std::string token_; 86 const std::string app_id_; 87 const int64 device_user_id_; 88 }; 89 90 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) 91 : token_(message.token()), 92 app_id_(message.category()), 93 device_user_id_(message.device_user_id()) {} 94 95 CollapseKey::~CollapseKey() {} 96 97 bool CollapseKey::IsValid() const { 98 // Device user id is optional, but the application id and token are not. 99 return !token_.empty() && !app_id_.empty(); 100 } 101 102 bool CollapseKey::operator<(const CollapseKey& right) const { 103 if (device_user_id_ != right.device_user_id()) 104 return device_user_id_ < right.device_user_id(); 105 if (app_id_ != right.app_id()) 106 return app_id_ < right.app_id(); 107 return token_ < right.token(); 108 } 109 110 struct ReliablePacketInfo { 111 ReliablePacketInfo(); 112 ~ReliablePacketInfo(); 113 114 // The stream id with which the message was sent. 115 uint32 stream_id; 116 117 // If reliable delivery was requested, the persistent id of the message. 118 std::string persistent_id; 119 120 // The type of message itself (for easier lookup). 121 uint8 tag; 122 123 // The protobuf of the message itself. 124 MCSProto protobuf; 125 }; 126 127 ReliablePacketInfo::ReliablePacketInfo() 128 : stream_id(0), tag(0) { 129 } 130 ReliablePacketInfo::~ReliablePacketInfo() {} 131 132 int MCSClient::GetSendQueueSize() const { 133 return to_send_.size(); 134 } 135 136 int MCSClient::GetResendQueueSize() const { 137 return to_resend_.size(); 138 } 139 140 std::string MCSClient::GetStateString() const { 141 switch(state_) { 142 case UNINITIALIZED: 143 return "UNINITIALIZED"; 144 case LOADED: 145 return "LOADED"; 146 case CONNECTING: 147 return "CONNECTING"; 148 case CONNECTED: 149 return "CONNECTED"; 150 default: 151 NOTREACHED(); 152 return std::string(); 153 } 154 } 155 156 MCSClient::MCSClient(const std::string& version_string, 157 base::Clock* clock, 158 ConnectionFactory* connection_factory, 159 GCMStore* gcm_store, 160 GCMStatsRecorder* recorder) 161 : version_string_(version_string), 162 clock_(clock), 163 state_(UNINITIALIZED), 164 android_id_(0), 165 security_token_(0), 166 connection_factory_(connection_factory), 167 connection_handler_(NULL), 168 last_device_to_server_stream_id_received_(0), 169 last_server_to_device_stream_id_received_(0), 170 stream_id_out_(0), 171 stream_id_in_(0), 172 gcm_store_(gcm_store), 173 recorder_(recorder), 174 weak_ptr_factory_(this) { 175 } 176 177 MCSClient::~MCSClient() { 178 } 179 180 void MCSClient::Initialize( 181 const ErrorCallback& error_callback, 182 const OnMessageReceivedCallback& message_received_callback, 183 const OnMessageSentCallback& message_sent_callback, 184 scoped_ptr<GCMStore::LoadResult> load_result) { 185 DCHECK_EQ(state_, UNINITIALIZED); 186 187 state_ = LOADED; 188 mcs_error_callback_ = error_callback; 189 message_received_callback_ = message_received_callback; 190 message_sent_callback_ = message_sent_callback; 191 192 connection_factory_->Initialize( 193 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, 194 weak_ptr_factory_.GetWeakPtr()), 195 base::Bind(&MCSClient::HandlePacketFromWire, 196 weak_ptr_factory_.GetWeakPtr()), 197 base::Bind(&MCSClient::MaybeSendMessage, 198 weak_ptr_factory_.GetWeakPtr())); 199 connection_handler_ = connection_factory_->GetConnectionHandler(); 200 201 stream_id_out_ = 1; // Login request is hardcoded to id 1. 202 203 android_id_ = load_result->device_android_id; 204 security_token_ = load_result->device_security_token; 205 206 if (android_id_ == 0) { 207 DVLOG(1) << "No device credentials found, assuming new client."; 208 // No need to try and load RMQ data in that case. 209 return; 210 } 211 212 // |android_id_| is non-zero, so should |security_token_|. 213 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id" 214 << " is non-zero."; 215 216 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size() 217 << " incoming acks pending and " 218 << load_result->outgoing_messages.size() 219 << " outgoing messages pending."; 220 221 restored_unackeds_server_ids_ = load_result->incoming_messages; 222 223 // First go through and order the outgoing messages by recency. 224 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; 225 std::vector<PersistentId> expired_ttl_ids; 226 for (GCMStore::OutgoingMessageMap::iterator iter = 227 load_result->outgoing_messages.begin(); 228 iter != load_result->outgoing_messages.end(); ++iter) { 229 uint64 timestamp = 0; 230 if (!base::StringToUint64(iter->first, ×tamp)) { 231 LOG(ERROR) << "Invalid restored message."; 232 // TODO(fgorski): Error: data unreadable 233 mcs_error_callback_.Run(); 234 return; 235 } 236 237 // Check if the TTL has expired for this message. 238 if (HasTTLExpired(*iter->second, clock_)) { 239 expired_ttl_ids.push_back(iter->first); 240 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED); 241 continue; 242 } 243 244 ordered_messages[timestamp] = iter->second.release(); 245 } 246 247 if (!expired_ttl_ids.empty()) { 248 gcm_store_->RemoveOutgoingMessages( 249 expired_ttl_ids, 250 base::Bind(&MCSClient::OnGCMUpdateFinished, 251 weak_ptr_factory_.GetWeakPtr())); 252 } 253 254 // Now go through and add the outgoing messages to the send queue in their 255 // appropriate order (oldest at front, most recent at back). 256 for (std::map<uint64, google::protobuf::MessageLite*>::iterator 257 iter = ordered_messages.begin(); 258 iter != ordered_messages.end(); ++iter) { 259 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 260 packet_info->protobuf.reset(iter->second); 261 packet_info->tag = GetMCSProtoTag(*iter->second); 262 packet_info->persistent_id = base::Uint64ToString(iter->first); 263 to_send_.push_back(make_linked_ptr(packet_info)); 264 265 if (packet_info->tag == kDataMessageStanzaTag) { 266 mcs_proto::DataMessageStanza* data_message = 267 reinterpret_cast<mcs_proto::DataMessageStanza*>( 268 packet_info->protobuf.get()); 269 CollapseKey collapse_key(*data_message); 270 if (collapse_key.IsValid()) 271 collapse_key_map_[collapse_key] = packet_info; 272 } 273 } 274 } 275 276 void MCSClient::Login(uint64 android_id, uint64 security_token) { 277 DCHECK_EQ(state_, LOADED); 278 DCHECK(android_id_ == 0 || android_id_ == android_id); 279 DCHECK(security_token_ == 0 || security_token_ == security_token); 280 281 if (android_id != android_id_ && security_token != security_token_) { 282 DCHECK(android_id); 283 DCHECK(security_token); 284 android_id_ = android_id; 285 security_token_ = security_token; 286 } 287 288 DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty()); 289 290 state_ = CONNECTING; 291 connection_factory_->Connect(); 292 } 293 294 void MCSClient::SendMessage(const MCSMessage& message) { 295 int ttl = GetTTL(message.GetProtobuf()); 296 DCHECK_GE(ttl, 0); 297 if (to_send_.size() > kMaxSendQueueSize) { 298 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED); 299 return; 300 } 301 if (message.size() > kMaxMessageBytes) { 302 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); 303 return; 304 } 305 306 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); 307 packet_info->tag = message.tag(); 308 packet_info->protobuf = message.CloneProtobuf(); 309 310 if (ttl > 0) { 311 DCHECK_EQ(message.tag(), kDataMessageStanzaTag); 312 313 // First check if this message should replace a pending message with the 314 // same collapse key. 315 mcs_proto::DataMessageStanza* data_message = 316 reinterpret_cast<mcs_proto::DataMessageStanza*>( 317 packet_info->protobuf.get()); 318 CollapseKey collapse_key(*data_message); 319 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { 320 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; 321 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " 322 << original_packet->persistent_id; 323 original_packet->protobuf = packet_info->protobuf.Pass(); 324 SetPersistentId(original_packet->persistent_id, 325 original_packet->protobuf.get()); 326 gcm_store_->OverwriteOutgoingMessage( 327 original_packet->persistent_id, 328 message, 329 base::Bind(&MCSClient::OnGCMUpdateFinished, 330 weak_ptr_factory_.GetWeakPtr())); 331 332 // The message is already queued, return. 333 return; 334 } else { 335 PersistentId persistent_id = GetNextPersistentId(); 336 DVLOG(1) << "Setting persistent id to " << persistent_id; 337 packet_info->persistent_id = persistent_id; 338 SetPersistentId(persistent_id, packet_info->protobuf.get()); 339 if (!gcm_store_->AddOutgoingMessage( 340 persistent_id, 341 MCSMessage(message.tag(), *(packet_info->protobuf)), 342 base::Bind(&MCSClient::OnGCMUpdateFinished, 343 weak_ptr_factory_.GetWeakPtr()))) { 344 NotifyMessageSendStatus(message.GetProtobuf(), 345 APP_QUEUE_SIZE_LIMIT_REACHED); 346 return; 347 } 348 } 349 350 if (collapse_key.IsValid()) 351 collapse_key_map_[collapse_key] = packet_info.get(); 352 } else if (!connection_factory_->IsEndpointReachable()) { 353 DVLOG(1) << "No active connection, dropping message."; 354 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); 355 return; 356 } 357 358 to_send_.push_back(make_linked_ptr(packet_info.release())); 359 360 // Notify that the messages has been succsfully queued for sending. 361 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. 362 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED); 363 364 MaybeSendMessage(); 365 } 366 367 void MCSClient::ResetStateAndBuildLoginRequest( 368 mcs_proto::LoginRequest* request) { 369 DCHECK(android_id_); 370 DCHECK(security_token_); 371 stream_id_in_ = 0; 372 stream_id_out_ = 1; 373 last_device_to_server_stream_id_received_ = 0; 374 last_server_to_device_stream_id_received_ = 0; 375 376 heartbeat_manager_.Stop(); 377 378 // Add any pending acknowledgments to the list of ids. 379 for (StreamIdToPersistentIdMap::const_iterator iter = 380 unacked_server_ids_.begin(); 381 iter != unacked_server_ids_.end(); ++iter) { 382 restored_unackeds_server_ids_.push_back(iter->second); 383 } 384 unacked_server_ids_.clear(); 385 386 // Any acknowledged server ids which have not been confirmed by the server 387 // are treated like unacknowledged ids. 388 for (std::map<StreamId, PersistentIdList>::const_iterator iter = 389 acked_server_ids_.begin(); 390 iter != acked_server_ids_.end(); ++iter) { 391 restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(), 392 iter->second.begin(), 393 iter->second.end()); 394 } 395 acked_server_ids_.clear(); 396 397 // Then build the request, consuming all pending acknowledgments. 398 request->Swap(BuildLoginRequest(android_id_, 399 security_token_, 400 version_string_).get()); 401 for (PersistentIdList::const_iterator iter = 402 restored_unackeds_server_ids_.begin(); 403 iter != restored_unackeds_server_ids_.end(); ++iter) { 404 request->add_received_persistent_id(*iter); 405 } 406 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; 407 restored_unackeds_server_ids_.clear(); 408 409 // Push all unacknowledged messages to front of send queue. No need to save 410 // to RMQ, as all messages that reach this point should already have been 411 // saved as necessary. 412 while (!to_resend_.empty()) { 413 to_send_.push_front(to_resend_.back()); 414 to_resend_.pop_back(); 415 } 416 417 // Drop all TTL == 0 or expired TTL messages from the queue. 418 std::deque<MCSPacketInternal> new_to_send; 419 std::vector<PersistentId> expired_ttl_ids; 420 while (!to_send_.empty()) { 421 MCSPacketInternal packet = PopMessageForSend(); 422 if (GetTTL(*packet->protobuf) > 0 && 423 !HasTTLExpired(*packet->protobuf, clock_)) { 424 new_to_send.push_back(packet); 425 } else { 426 // If the TTL was 0 there is no persistent id, so no need to remove the 427 // message from the persistent store. 428 if (!packet->persistent_id.empty()) 429 expired_ttl_ids.push_back(packet->persistent_id); 430 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); 431 } 432 } 433 434 if (!expired_ttl_ids.empty()) { 435 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() 436 << " messages expired."; 437 gcm_store_->RemoveOutgoingMessages( 438 expired_ttl_ids, 439 base::Bind(&MCSClient::OnGCMUpdateFinished, 440 weak_ptr_factory_.GetWeakPtr())); 441 } 442 443 to_send_.swap(new_to_send); 444 445 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() 446 << " incoming acks pending, and " << to_send_.size() 447 << " pending outgoing messages."; 448 449 state_ = CONNECTING; 450 } 451 452 void MCSClient::SendHeartbeat() { 453 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); 454 } 455 456 void MCSClient::OnGCMUpdateFinished(bool success) { 457 LOG_IF(ERROR, !success) << "GCM Update failed!"; 458 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success); 459 // TODO(zea): Rebuild the store from scratch in case of persistence failure? 460 } 461 462 void MCSClient::MaybeSendMessage() { 463 if (to_send_.empty()) 464 return; 465 466 // If the connection has been reset, do nothing. On reconnection 467 // MaybeSendMessage will be automatically invoked again. 468 // TODO(zea): consider doing TTL expiration at connection reset time, rather 469 // than reconnect time. 470 if (!connection_factory_->IsEndpointReachable()) 471 return; 472 473 MCSPacketInternal packet = PopMessageForSend(); 474 if (HasTTLExpired(*packet->protobuf, clock_)) { 475 DCHECK(!packet->persistent_id.empty()); 476 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; 477 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); 478 gcm_store_->RemoveOutgoingMessage( 479 packet->persistent_id, 480 base::Bind(&MCSClient::OnGCMUpdateFinished, 481 weak_ptr_factory_.GetWeakPtr())); 482 base::MessageLoop::current()->PostTask( 483 FROM_HERE, 484 base::Bind(&MCSClient::MaybeSendMessage, 485 weak_ptr_factory_.GetWeakPtr())); 486 return; 487 } 488 DVLOG(1) << "Pending output message found, sending."; 489 if (!packet->persistent_id.empty()) 490 to_resend_.push_back(packet); 491 SendPacketToWire(packet.get()); 492 } 493 494 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { 495 packet_info->stream_id = ++stream_id_out_; 496 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); 497 498 // Set the queued time as necessary. 499 if (packet_info->tag == kDataMessageStanzaTag) { 500 mcs_proto::DataMessageStanza* data_message = 501 reinterpret_cast<mcs_proto::DataMessageStanza*>( 502 packet_info->protobuf.get()); 503 uint64 sent = data_message->sent(); 504 DCHECK_GT(sent, 0U); 505 int queued = (clock_->Now().ToInternalValue() / 506 base::Time::kMicrosecondsPerSecond) - sent; 507 DVLOG(1) << "Message was queued for " << queued << " seconds."; 508 data_message->set_queued(queued); 509 recorder_->RecordDataSentToWire( 510 data_message->category(), 511 data_message->to(), 512 data_message->id(), 513 queued); 514 } 515 516 // Set the proper last received stream id to acknowledge received server 517 // packets. 518 DVLOG(1) << "Setting last stream id received to " 519 << stream_id_in_; 520 SetLastStreamIdReceived(stream_id_in_, 521 packet_info->protobuf.get()); 522 if (stream_id_in_ != last_server_to_device_stream_id_received_) { 523 last_server_to_device_stream_id_received_ = stream_id_in_; 524 // Mark all acknowledged server messages as such. Note: they're not dropped, 525 // as it may be that they'll need to be re-acked if this message doesn't 526 // make it. 527 PersistentIdList persistent_id_list; 528 for (StreamIdToPersistentIdMap::const_iterator iter = 529 unacked_server_ids_.begin(); 530 iter != unacked_server_ids_.end(); ++iter) { 531 DCHECK_LE(iter->first, last_server_to_device_stream_id_received_); 532 persistent_id_list.push_back(iter->second); 533 } 534 unacked_server_ids_.clear(); 535 acked_server_ids_[stream_id_out_] = persistent_id_list; 536 } 537 538 connection_handler_->SendMessage(*packet_info->protobuf); 539 } 540 541 void MCSClient::HandleMCSDataMesssage( 542 scoped_ptr<google::protobuf::MessageLite> protobuf) { 543 mcs_proto::DataMessageStanza* data_message = 544 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); 545 // TODO(zea): implement a proper status manager rather than hardcoding these 546 // values. 547 scoped_ptr<mcs_proto::DataMessageStanza> response( 548 new mcs_proto::DataMessageStanza()); 549 response->set_from(kGCMFromField); 550 response->set_sent(clock_->Now().ToInternalValue() / 551 base::Time::kMicrosecondsPerSecond); 552 response->set_ttl(0); 553 bool send = false; 554 for (int i = 0; i < data_message->app_data_size(); ++i) { 555 const mcs_proto::AppData& app_data = data_message->app_data(i); 556 if (app_data.key() == kIdleNotification) { 557 // Tell the MCS server the client is not idle. 558 send = true; 559 mcs_proto::AppData data; 560 data.set_key(kIdleNotification); 561 data.set_value("false"); 562 response->add_app_data()->CopyFrom(data); 563 response->set_category(kMCSCategory); 564 } 565 } 566 567 if (send) { 568 SendMessage( 569 MCSMessage(kDataMessageStanzaTag, 570 response.PassAs<const google::protobuf::MessageLite>())); 571 } 572 } 573 574 void MCSClient::HandlePacketFromWire( 575 scoped_ptr<google::protobuf::MessageLite> protobuf) { 576 if (!protobuf.get()) 577 return; 578 uint8 tag = GetMCSProtoTag(*protobuf); 579 PersistentId persistent_id = GetPersistentId(*protobuf); 580 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); 581 582 if (last_stream_id_received != 0) { 583 last_device_to_server_stream_id_received_ = last_stream_id_received; 584 585 // Process device to server messages that have now been acknowledged by the 586 // server. Because messages are stored in order, just pop off all that have 587 // a stream id lower than server's last received stream id. 588 HandleStreamAck(last_stream_id_received); 589 590 // Process server_to_device_messages that the server now knows were 591 // acknowledged. Again, they're in order, so just keep going until the 592 // stream id is reached. 593 StreamIdList acked_stream_ids_to_remove; 594 for (std::map<StreamId, PersistentIdList>::iterator iter = 595 acked_server_ids_.begin(); 596 iter != acked_server_ids_.end() && 597 iter->first <= last_stream_id_received; ++iter) { 598 acked_stream_ids_to_remove.push_back(iter->first); 599 } 600 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); 601 iter != acked_stream_ids_to_remove.end(); ++iter) { 602 acked_server_ids_.erase(*iter); 603 } 604 } 605 606 ++stream_id_in_; 607 if (!persistent_id.empty()) { 608 unacked_server_ids_[stream_id_in_] = persistent_id; 609 gcm_store_->AddIncomingMessage(persistent_id, 610 base::Bind(&MCSClient::OnGCMUpdateFinished, 611 weak_ptr_factory_.GetWeakPtr())); 612 } 613 614 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() 615 << " with persistent id " 616 << (persistent_id.empty() ? "NULL" : persistent_id) 617 << ", stream id " << stream_id_in_ << " and last stream id received " 618 << last_stream_id_received; 619 620 if (unacked_server_ids_.size() > 0 && 621 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { 622 SendMessage(MCSMessage(kIqStanzaTag, 623 BuildStreamAck(). 624 PassAs<const google::protobuf::MessageLite>())); 625 } 626 627 // The connection is alive, treat this message as a heartbeat ack. 628 heartbeat_manager_.OnHeartbeatAcked(); 629 630 switch (tag) { 631 case kLoginResponseTag: { 632 DCHECK_EQ(CONNECTING, state_); 633 mcs_proto::LoginResponse* login_response = 634 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); 635 DVLOG(1) << "Received login response:"; 636 DVLOG(1) << " Id: " << login_response->id(); 637 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); 638 if (login_response->has_error() && login_response->error().code() != 0) { 639 state_ = UNINITIALIZED; 640 DVLOG(1) << " Error code: " << login_response->error().code(); 641 DVLOG(1) << " Error message: " << login_response->error().message(); 642 LOG(ERROR) << "Failed to log in to GCM, resetting connection."; 643 connection_factory_->SignalConnectionReset( 644 ConnectionFactory::LOGIN_FAILURE); 645 mcs_error_callback_.Run(); 646 return; 647 } 648 649 if (login_response->has_heartbeat_config()) { 650 heartbeat_manager_.UpdateHeartbeatConfig( 651 login_response->heartbeat_config()); 652 } 653 654 state_ = CONNECTED; 655 stream_id_in_ = 1; // To account for the login response. 656 DCHECK_EQ(1U, stream_id_out_); 657 658 // Pass the login response on up. 659 base::MessageLoop::current()->PostTask( 660 FROM_HERE, 661 base::Bind(message_received_callback_, 662 MCSMessage(tag, 663 protobuf.PassAs< 664 const google::protobuf::MessageLite>()))); 665 666 // If there are pending messages, attempt to send one. 667 if (!to_send_.empty()) { 668 base::MessageLoop::current()->PostTask( 669 FROM_HERE, 670 base::Bind(&MCSClient::MaybeSendMessage, 671 weak_ptr_factory_.GetWeakPtr())); 672 } 673 674 heartbeat_manager_.Start( 675 base::Bind(&MCSClient::SendHeartbeat, 676 weak_ptr_factory_.GetWeakPtr()), 677 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, 678 weak_ptr_factory_.GetWeakPtr())); 679 return; 680 } 681 case kHeartbeatPingTag: 682 DCHECK_GE(stream_id_in_, 1U); 683 DVLOG(1) << "Received heartbeat ping, sending ack."; 684 SendMessage( 685 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck())); 686 return; 687 case kHeartbeatAckTag: 688 DCHECK_GE(stream_id_in_, 1U); 689 DVLOG(1) << "Received heartbeat ack."; 690 // Do nothing else, all messages act as heartbeat acks. 691 return; 692 case kCloseTag: 693 LOG(ERROR) << "Received close command, resetting connection."; 694 state_ = LOADED; 695 connection_factory_->SignalConnectionReset( 696 ConnectionFactory::CLOSE_COMMAND); 697 return; 698 case kIqStanzaTag: { 699 DCHECK_GE(stream_id_in_, 1U); 700 mcs_proto::IqStanza* iq_stanza = 701 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); 702 const mcs_proto::Extension& iq_extension = iq_stanza->extension(); 703 switch (iq_extension.id()) { 704 case kSelectiveAck: { 705 PersistentIdList acked_ids; 706 if (BuildPersistentIdListFromProto(iq_extension.data(), 707 &acked_ids)) { 708 HandleSelectiveAck(acked_ids); 709 } 710 return; 711 } 712 case kStreamAck: 713 // Do nothing. The last received stream id is always processed if it's 714 // present. 715 return; 716 default: 717 LOG(WARNING) << "Received invalid iq stanza extension " 718 << iq_extension.id(); 719 return; 720 } 721 } 722 case kDataMessageStanzaTag: { 723 DCHECK_GE(stream_id_in_, 1U); 724 mcs_proto::DataMessageStanza* data_message = 725 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); 726 if (data_message->category() == kMCSCategory) { 727 HandleMCSDataMesssage(protobuf.Pass()); 728 return; 729 } 730 731 DCHECK(protobuf.get()); 732 base::MessageLoop::current()->PostTask( 733 FROM_HERE, 734 base::Bind(message_received_callback_, 735 MCSMessage(tag, 736 protobuf.PassAs< 737 const google::protobuf::MessageLite>()))); 738 return; 739 } 740 default: 741 LOG(ERROR) << "Received unexpected message of type " 742 << static_cast<int>(tag); 743 return; 744 } 745 } 746 747 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) { 748 PersistentIdList acked_outgoing_persistent_ids; 749 StreamIdList acked_outgoing_stream_ids; 750 while (!to_resend_.empty() && 751 to_resend_.front()->stream_id <= last_stream_id_received) { 752 const MCSPacketInternal& outgoing_packet = to_resend_.front(); 753 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); 754 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); 755 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); 756 to_resend_.pop_front(); 757 } 758 759 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() 760 << " outgoing messages, " << to_resend_.size() 761 << " remaining unacked"; 762 gcm_store_->RemoveOutgoingMessages( 763 acked_outgoing_persistent_ids, 764 base::Bind(&MCSClient::OnGCMUpdateFinished, 765 weak_ptr_factory_.GetWeakPtr())); 766 767 HandleServerConfirmedReceipt(last_stream_id_received); 768 } 769 770 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { 771 std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end()); 772 773 StreamId last_stream_id_received = -1; 774 775 // First check the to_resend_ queue. Acknowledgments are always contiguous, 776 // so if there's a pending message that hasn't been acked, all newer messages 777 // must also be unacked. 778 while(!to_resend_.empty() && !remaining_ids.empty()) { 779 const MCSPacketInternal& outgoing_packet = to_resend_.front(); 780 if (remaining_ids.count(outgoing_packet->persistent_id) == 0) 781 break; // Newer message must be unacked too. 782 remaining_ids.erase(outgoing_packet->persistent_id); 783 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); 784 785 // No need to re-acknowledge any server messages this message already 786 // acknowledged. 787 StreamId device_stream_id = outgoing_packet->stream_id; 788 if (device_stream_id > last_stream_id_received) 789 last_stream_id_received = device_stream_id; 790 to_resend_.pop_front(); 791 } 792 793 // If the acknowledged ids aren't all there, they might be in the to_send_ 794 // queue (typically when a SelectiveAck confirms messages as part of a login 795 // response). 796 while (!to_send_.empty() && !remaining_ids.empty()) { 797 const MCSPacketInternal& outgoing_packet = to_send_.front(); 798 if (remaining_ids.count(outgoing_packet->persistent_id) == 0) 799 break; // Newer messages must be unacked too. 800 remaining_ids.erase(outgoing_packet->persistent_id); 801 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); 802 803 // No need to re-acknowledge any server messages this message already 804 // acknowledged. 805 StreamId device_stream_id = outgoing_packet->stream_id; 806 if (device_stream_id > last_stream_id_received) 807 last_stream_id_received = device_stream_id; 808 PopMessageForSend(); 809 } 810 811 // Only handle the largest stream id value. All other stream ids are 812 // implicitly handled. 813 if (last_stream_id_received > 0) 814 HandleServerConfirmedReceipt(last_stream_id_received); 815 816 // At this point, all remaining acked ids are redundant. 817 PersistentIdList acked_ids; 818 if (remaining_ids.size() > 0) { 819 for (size_t i = 0; i < id_list.size(); ++i) { 820 if (remaining_ids.count(id_list[i]) > 0) 821 continue; 822 acked_ids.push_back(id_list[i]); 823 } 824 } else { 825 acked_ids = id_list; 826 } 827 828 DVLOG(1) << "Server acked " << acked_ids.size() 829 << " messages, " << to_resend_.size() << " remaining unacked."; 830 gcm_store_->RemoveOutgoingMessages( 831 acked_ids, 832 base::Bind(&MCSClient::OnGCMUpdateFinished, 833 weak_ptr_factory_.GetWeakPtr())); 834 835 // Resend any remaining outgoing messages, as they were not received by the 836 // server. 837 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; 838 while (!to_resend_.empty()) { 839 to_send_.push_front(to_resend_.back()); 840 to_resend_.pop_back(); 841 } 842 } 843 844 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) { 845 PersistentIdList acked_incoming_ids; 846 for (std::map<StreamId, PersistentIdList>::iterator iter = 847 acked_server_ids_.begin(); 848 iter != acked_server_ids_.end() && 849 iter->first <= device_stream_id;) { 850 acked_incoming_ids.insert(acked_incoming_ids.end(), 851 iter->second.begin(), 852 iter->second.end()); 853 acked_server_ids_.erase(iter++); 854 } 855 856 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() 857 << " acknowledged server messages."; 858 gcm_store_->RemoveIncomingMessages( 859 acked_incoming_ids, 860 base::Bind(&MCSClient::OnGCMUpdateFinished, 861 weak_ptr_factory_.GetWeakPtr())); 862 } 863 864 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 865 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 866 } 867 868 void MCSClient::OnConnectionResetByHeartbeat() { 869 connection_factory_->SignalConnectionReset( 870 ConnectionFactory::HEARTBEAT_FAILURE); 871 } 872 873 void MCSClient::NotifyMessageSendStatus( 874 const google::protobuf::MessageLite& protobuf, 875 MessageSendStatus status) { 876 if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag) 877 return; 878 879 const mcs_proto::DataMessageStanza* data_message_stanza = 880 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); 881 recorder_->RecordNotifySendStatus( 882 data_message_stanza->category(), 883 data_message_stanza->to(), 884 data_message_stanza->id(), 885 status, 886 protobuf.ByteSize(), 887 data_message_stanza->ttl()); 888 message_sent_callback_.Run( 889 data_message_stanza->device_user_id(), 890 data_message_stanza->category(), 891 data_message_stanza->id(), 892 status); 893 } 894 895 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() { 896 MCSPacketInternal packet = to_send_.front(); 897 to_send_.pop_front(); 898 899 if (packet->tag == kDataMessageStanzaTag) { 900 mcs_proto::DataMessageStanza* data_message = 901 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); 902 CollapseKey collapse_key(*data_message); 903 if (collapse_key.IsValid()) 904 collapse_key_map_.erase(collapse_key); 905 } 906 907 return packet; 908 } 909 910 } // namespace gcm 911