Home | History | Annotate | Download | only in engine
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "google_apis/gcm/engine/mcs_client.h"
      6 
      7 #include <set>
      8 
      9 #include "base/basictypes.h"
     10 #include "base/message_loop/message_loop.h"
     11 #include "base/metrics/histogram.h"
     12 #include "base/strings/string_number_conversions.h"
     13 #include "base/time/clock.h"
     14 #include "base/time/time.h"
     15 #include "google_apis/gcm/base/mcs_util.h"
     16 #include "google_apis/gcm/base/socket_stream.h"
     17 #include "google_apis/gcm/engine/connection_factory.h"
     18 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
     19 
     20 using namespace google::protobuf::io;
     21 
     22 namespace gcm {
     23 
     24 namespace {
     25 
     26 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
     27 
     28 // The category of messages intended for the GCM client itself from MCS.
     29 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
     30 
     31 // The from field for messages originating in the GCM client.
     32 const char kGCMFromField[] = "gcm (at) android.com";
     33 
     34 // MCS status message types.
     35 // TODO(zea): handle these at the GCMClient layer.
     36 const char kIdleNotification[] = "IdleNotification";
     37 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
     38 // const char kPowerNotification[] = "PowerNotification";
     39 // const char kDataActiveNotification[] = "DataActiveNotification";
     40 
     41 // The number of unacked messages to allow before sending a stream ack.
     42 // Applies to both incoming and outgoing messages.
     43 // TODO(zea): make this server configurable.
     44 const int kUnackedMessageBeforeStreamAck = 10;
     45 
     46 // The global maximum number of pending messages to have in the send queue.
     47 const size_t kMaxSendQueueSize = 10 * 1024;
     48 
     49 // The maximum message size that can be sent to the server.
     50 const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
     51 
     52 // Helper for converting a proto persistent id list to a vector of strings.
     53 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
     54                                     std::vector<std::string>* id_list) {
     55   mcs_proto::SelectiveAck selective_ack;
     56   if (!selective_ack.ParseFromString(bytes))
     57     return false;
     58   std::vector<std::string> new_list;
     59   for (int i = 0; i < selective_ack.id_size(); ++i) {
     60     DCHECK(!selective_ack.id(i).empty());
     61     new_list.push_back(selective_ack.id(i));
     62   }
     63   id_list->swap(new_list);
     64   return true;
     65 }
     66 
     67 }  // namespace
     68 
     69 class CollapseKey {
     70  public:
     71   explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
     72   ~CollapseKey();
     73 
     74   // Comparison operator for use in maps.
     75   bool operator<(const CollapseKey& right) const;
     76 
     77   // Whether the message had a valid collapse key.
     78   bool IsValid() const;
     79 
     80   std::string token() const { return token_; }
     81   std::string app_id() const { return app_id_; }
     82   int64 device_user_id() const { return device_user_id_; }
     83 
     84  private:
     85   const std::string token_;
     86   const std::string app_id_;
     87   const int64 device_user_id_;
     88 };
     89 
     90 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
     91     : token_(message.token()),
     92       app_id_(message.category()),
     93       device_user_id_(message.device_user_id()) {}
     94 
     95 CollapseKey::~CollapseKey() {}
     96 
     97 bool CollapseKey::IsValid() const {
     98   // Device user id is optional, but the application id and token are not.
     99   return !token_.empty() && !app_id_.empty();
    100 }
    101 
    102 bool CollapseKey::operator<(const CollapseKey& right) const {
    103   if (device_user_id_ != right.device_user_id())
    104     return device_user_id_ < right.device_user_id();
    105   if (app_id_ != right.app_id())
    106     return app_id_ < right.app_id();
    107   return token_ < right.token();
    108 }
    109 
    110 struct ReliablePacketInfo {
    111   ReliablePacketInfo();
    112   ~ReliablePacketInfo();
    113 
    114   // The stream id with which the message was sent.
    115   uint32 stream_id;
    116 
    117   // If reliable delivery was requested, the persistent id of the message.
    118   std::string persistent_id;
    119 
    120   // The type of message itself (for easier lookup).
    121   uint8 tag;
    122 
    123   // The protobuf of the message itself.
    124   MCSProto protobuf;
    125 };
    126 
    127 ReliablePacketInfo::ReliablePacketInfo()
    128   : stream_id(0), tag(0) {
    129 }
    130 ReliablePacketInfo::~ReliablePacketInfo() {}
    131 
    132 int MCSClient::GetSendQueueSize() const {
    133   return to_send_.size();
    134 }
    135 
    136 int MCSClient::GetResendQueueSize() const {
    137   return to_resend_.size();
    138 }
    139 
    140 std::string MCSClient::GetStateString() const {
    141   switch(state_) {
    142     case UNINITIALIZED:
    143       return "UNINITIALIZED";
    144     case LOADED:
    145       return "LOADED";
    146     case CONNECTING:
    147       return "CONNECTING";
    148     case CONNECTED:
    149       return "CONNECTED";
    150     default:
    151       NOTREACHED();
    152       return std::string();
    153   }
    154 }
    155 
    156 MCSClient::MCSClient(const std::string& version_string,
    157                      base::Clock* clock,
    158                      ConnectionFactory* connection_factory,
    159                      GCMStore* gcm_store,
    160                      GCMStatsRecorder* recorder)
    161     : version_string_(version_string),
    162       clock_(clock),
    163       state_(UNINITIALIZED),
    164       android_id_(0),
    165       security_token_(0),
    166       connection_factory_(connection_factory),
    167       connection_handler_(NULL),
    168       last_device_to_server_stream_id_received_(0),
    169       last_server_to_device_stream_id_received_(0),
    170       stream_id_out_(0),
    171       stream_id_in_(0),
    172       gcm_store_(gcm_store),
    173       recorder_(recorder),
    174       weak_ptr_factory_(this) {
    175 }
    176 
    177 MCSClient::~MCSClient() {
    178 }
    179 
    180 void MCSClient::Initialize(
    181     const ErrorCallback& error_callback,
    182     const OnMessageReceivedCallback& message_received_callback,
    183     const OnMessageSentCallback& message_sent_callback,
    184     scoped_ptr<GCMStore::LoadResult> load_result) {
    185   DCHECK_EQ(state_, UNINITIALIZED);
    186 
    187   state_ = LOADED;
    188   mcs_error_callback_ = error_callback;
    189   message_received_callback_ = message_received_callback;
    190   message_sent_callback_ = message_sent_callback;
    191 
    192   connection_factory_->Initialize(
    193       base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
    194                  weak_ptr_factory_.GetWeakPtr()),
    195       base::Bind(&MCSClient::HandlePacketFromWire,
    196                  weak_ptr_factory_.GetWeakPtr()),
    197       base::Bind(&MCSClient::MaybeSendMessage,
    198                  weak_ptr_factory_.GetWeakPtr()));
    199   connection_handler_ = connection_factory_->GetConnectionHandler();
    200 
    201   stream_id_out_ = 1;  // Login request is hardcoded to id 1.
    202 
    203   android_id_ = load_result->device_android_id;
    204   security_token_ = load_result->device_security_token;
    205 
    206   if (android_id_ == 0) {
    207     DVLOG(1) << "No device credentials found, assuming new client.";
    208     // No need to try and load RMQ data in that case.
    209     return;
    210   }
    211 
    212   // |android_id_| is non-zero, so should |security_token_|.
    213   DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
    214                                  << " is non-zero.";
    215 
    216   DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
    217            << " incoming acks pending and "
    218            << load_result->outgoing_messages.size()
    219            << " outgoing messages pending.";
    220 
    221   restored_unackeds_server_ids_ = load_result->incoming_messages;
    222 
    223   // First go through and order the outgoing messages by recency.
    224   std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
    225   std::vector<PersistentId> expired_ttl_ids;
    226   for (GCMStore::OutgoingMessageMap::iterator iter =
    227            load_result->outgoing_messages.begin();
    228        iter != load_result->outgoing_messages.end(); ++iter) {
    229     uint64 timestamp = 0;
    230     if (!base::StringToUint64(iter->first, &timestamp)) {
    231       LOG(ERROR) << "Invalid restored message.";
    232       // TODO(fgorski): Error: data unreadable
    233       mcs_error_callback_.Run();
    234       return;
    235     }
    236 
    237     // Check if the TTL has expired for this message.
    238     if (HasTTLExpired(*iter->second, clock_)) {
    239       expired_ttl_ids.push_back(iter->first);
    240       NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
    241       continue;
    242     }
    243 
    244     ordered_messages[timestamp] = iter->second.release();
    245   }
    246 
    247   if (!expired_ttl_ids.empty()) {
    248     gcm_store_->RemoveOutgoingMessages(
    249         expired_ttl_ids,
    250         base::Bind(&MCSClient::OnGCMUpdateFinished,
    251                    weak_ptr_factory_.GetWeakPtr()));
    252   }
    253 
    254   // Now go through and add the outgoing messages to the send queue in their
    255   // appropriate order (oldest at front, most recent at back).
    256   for (std::map<uint64, google::protobuf::MessageLite*>::iterator
    257            iter = ordered_messages.begin();
    258        iter != ordered_messages.end(); ++iter) {
    259     ReliablePacketInfo* packet_info = new ReliablePacketInfo();
    260     packet_info->protobuf.reset(iter->second);
    261     packet_info->tag = GetMCSProtoTag(*iter->second);
    262     packet_info->persistent_id = base::Uint64ToString(iter->first);
    263     to_send_.push_back(make_linked_ptr(packet_info));
    264 
    265     if (packet_info->tag == kDataMessageStanzaTag) {
    266       mcs_proto::DataMessageStanza* data_message =
    267           reinterpret_cast<mcs_proto::DataMessageStanza*>(
    268               packet_info->protobuf.get());
    269       CollapseKey collapse_key(*data_message);
    270       if (collapse_key.IsValid())
    271         collapse_key_map_[collapse_key] = packet_info;
    272     }
    273   }
    274 }
    275 
    276 void MCSClient::Login(uint64 android_id, uint64 security_token) {
    277   DCHECK_EQ(state_, LOADED);
    278   DCHECK(android_id_ == 0 || android_id_ == android_id);
    279   DCHECK(security_token_ == 0 || security_token_ == security_token);
    280 
    281   if (android_id != android_id_ && security_token != security_token_) {
    282     DCHECK(android_id);
    283     DCHECK(security_token);
    284     android_id_ = android_id;
    285     security_token_ = security_token;
    286   }
    287 
    288   DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
    289 
    290   state_ = CONNECTING;
    291   connection_factory_->Connect();
    292 }
    293 
    294 void MCSClient::SendMessage(const MCSMessage& message) {
    295   int ttl = GetTTL(message.GetProtobuf());
    296   DCHECK_GE(ttl, 0);
    297   if (to_send_.size() > kMaxSendQueueSize) {
    298     NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
    299     return;
    300   }
    301   if (message.size() > kMaxMessageBytes) {
    302     NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
    303     return;
    304   }
    305 
    306   scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
    307   packet_info->tag = message.tag();
    308   packet_info->protobuf = message.CloneProtobuf();
    309 
    310   if (ttl > 0) {
    311     DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
    312 
    313     // First check if this message should replace a pending message with the
    314     // same collapse key.
    315     mcs_proto::DataMessageStanza* data_message =
    316         reinterpret_cast<mcs_proto::DataMessageStanza*>(
    317             packet_info->protobuf.get());
    318     CollapseKey collapse_key(*data_message);
    319     if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
    320       ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
    321       DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
    322                << original_packet->persistent_id;
    323       original_packet->protobuf = packet_info->protobuf.Pass();
    324       SetPersistentId(original_packet->persistent_id,
    325                       original_packet->protobuf.get());
    326       gcm_store_->OverwriteOutgoingMessage(
    327           original_packet->persistent_id,
    328           message,
    329           base::Bind(&MCSClient::OnGCMUpdateFinished,
    330                      weak_ptr_factory_.GetWeakPtr()));
    331 
    332       // The message is already queued, return.
    333       return;
    334     } else {
    335       PersistentId persistent_id = GetNextPersistentId();
    336       DVLOG(1) << "Setting persistent id to " << persistent_id;
    337       packet_info->persistent_id = persistent_id;
    338       SetPersistentId(persistent_id, packet_info->protobuf.get());
    339       if (!gcm_store_->AddOutgoingMessage(
    340                persistent_id,
    341                MCSMessage(message.tag(), *(packet_info->protobuf)),
    342                base::Bind(&MCSClient::OnGCMUpdateFinished,
    343                           weak_ptr_factory_.GetWeakPtr()))) {
    344         NotifyMessageSendStatus(message.GetProtobuf(),
    345                                 APP_QUEUE_SIZE_LIMIT_REACHED);
    346         return;
    347       }
    348     }
    349 
    350     if (collapse_key.IsValid())
    351       collapse_key_map_[collapse_key] = packet_info.get();
    352   } else if (!connection_factory_->IsEndpointReachable()) {
    353     DVLOG(1) << "No active connection, dropping message.";
    354     NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
    355     return;
    356   }
    357 
    358   to_send_.push_back(make_linked_ptr(packet_info.release()));
    359 
    360   // Notify that the messages has been succsfully queued for sending.
    361   // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
    362   NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
    363 
    364   MaybeSendMessage();
    365 }
    366 
    367 void MCSClient::ResetStateAndBuildLoginRequest(
    368     mcs_proto::LoginRequest* request) {
    369   DCHECK(android_id_);
    370   DCHECK(security_token_);
    371   stream_id_in_ = 0;
    372   stream_id_out_ = 1;
    373   last_device_to_server_stream_id_received_ = 0;
    374   last_server_to_device_stream_id_received_ = 0;
    375 
    376   heartbeat_manager_.Stop();
    377 
    378   // Add any pending acknowledgments to the list of ids.
    379   for (StreamIdToPersistentIdMap::const_iterator iter =
    380            unacked_server_ids_.begin();
    381        iter != unacked_server_ids_.end(); ++iter) {
    382     restored_unackeds_server_ids_.push_back(iter->second);
    383   }
    384   unacked_server_ids_.clear();
    385 
    386   // Any acknowledged server ids which have not been confirmed by the server
    387   // are treated like unacknowledged ids.
    388   for (std::map<StreamId, PersistentIdList>::const_iterator iter =
    389            acked_server_ids_.begin();
    390        iter != acked_server_ids_.end(); ++iter) {
    391     restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
    392                                          iter->second.begin(),
    393                                          iter->second.end());
    394   }
    395   acked_server_ids_.clear();
    396 
    397   // Then build the request, consuming all pending acknowledgments.
    398   request->Swap(BuildLoginRequest(android_id_,
    399                                   security_token_,
    400                                   version_string_).get());
    401   for (PersistentIdList::const_iterator iter =
    402            restored_unackeds_server_ids_.begin();
    403        iter != restored_unackeds_server_ids_.end(); ++iter) {
    404     request->add_received_persistent_id(*iter);
    405   }
    406   acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
    407   restored_unackeds_server_ids_.clear();
    408 
    409   // Push all unacknowledged messages to front of send queue. No need to save
    410   // to RMQ, as all messages that reach this point should already have been
    411   // saved as necessary.
    412   while (!to_resend_.empty()) {
    413     to_send_.push_front(to_resend_.back());
    414     to_resend_.pop_back();
    415   }
    416 
    417   // Drop all TTL == 0 or expired TTL messages from the queue.
    418   std::deque<MCSPacketInternal> new_to_send;
    419   std::vector<PersistentId> expired_ttl_ids;
    420   while (!to_send_.empty()) {
    421     MCSPacketInternal packet = PopMessageForSend();
    422     if (GetTTL(*packet->protobuf) > 0 &&
    423         !HasTTLExpired(*packet->protobuf, clock_)) {
    424       new_to_send.push_back(packet);
    425     } else {
    426       // If the TTL was 0 there is no persistent id, so no need to remove the
    427       // message from the persistent store.
    428       if (!packet->persistent_id.empty())
    429         expired_ttl_ids.push_back(packet->persistent_id);
    430       NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
    431     }
    432   }
    433 
    434   if (!expired_ttl_ids.empty()) {
    435     DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
    436              << " messages expired.";
    437     gcm_store_->RemoveOutgoingMessages(
    438         expired_ttl_ids,
    439         base::Bind(&MCSClient::OnGCMUpdateFinished,
    440                    weak_ptr_factory_.GetWeakPtr()));
    441   }
    442 
    443   to_send_.swap(new_to_send);
    444 
    445   DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
    446            << " incoming acks pending, and " << to_send_.size()
    447            << " pending outgoing messages.";
    448 
    449   state_ = CONNECTING;
    450 }
    451 
    452 void MCSClient::SendHeartbeat() {
    453   SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
    454 }
    455 
    456 void MCSClient::OnGCMUpdateFinished(bool success) {
    457   LOG_IF(ERROR, !success) << "GCM Update failed!";
    458   UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
    459   // TODO(zea): Rebuild the store from scratch in case of persistence failure?
    460 }
    461 
    462 void MCSClient::MaybeSendMessage() {
    463   if (to_send_.empty())
    464     return;
    465 
    466   // If the connection has been reset, do nothing. On reconnection
    467   // MaybeSendMessage will be automatically invoked again.
    468   // TODO(zea): consider doing TTL expiration at connection reset time, rather
    469   // than reconnect time.
    470   if (!connection_factory_->IsEndpointReachable())
    471     return;
    472 
    473   MCSPacketInternal packet = PopMessageForSend();
    474   if (HasTTLExpired(*packet->protobuf, clock_)) {
    475     DCHECK(!packet->persistent_id.empty());
    476     DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
    477     NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
    478     gcm_store_->RemoveOutgoingMessage(
    479         packet->persistent_id,
    480         base::Bind(&MCSClient::OnGCMUpdateFinished,
    481                    weak_ptr_factory_.GetWeakPtr()));
    482     base::MessageLoop::current()->PostTask(
    483             FROM_HERE,
    484             base::Bind(&MCSClient::MaybeSendMessage,
    485                        weak_ptr_factory_.GetWeakPtr()));
    486     return;
    487   }
    488   DVLOG(1) << "Pending output message found, sending.";
    489   if (!packet->persistent_id.empty())
    490     to_resend_.push_back(packet);
    491   SendPacketToWire(packet.get());
    492 }
    493 
    494 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
    495   packet_info->stream_id = ++stream_id_out_;
    496   DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
    497 
    498   // Set the queued time as necessary.
    499   if (packet_info->tag == kDataMessageStanzaTag) {
    500     mcs_proto::DataMessageStanza* data_message =
    501         reinterpret_cast<mcs_proto::DataMessageStanza*>(
    502             packet_info->protobuf.get());
    503     uint64 sent = data_message->sent();
    504     DCHECK_GT(sent, 0U);
    505     int queued = (clock_->Now().ToInternalValue() /
    506         base::Time::kMicrosecondsPerSecond) - sent;
    507     DVLOG(1) << "Message was queued for " << queued << " seconds.";
    508     data_message->set_queued(queued);
    509     recorder_->RecordDataSentToWire(
    510         data_message->category(),
    511         data_message->to(),
    512         data_message->id(),
    513         queued);
    514   }
    515 
    516   // Set the proper last received stream id to acknowledge received server
    517   // packets.
    518   DVLOG(1) << "Setting last stream id received to "
    519            << stream_id_in_;
    520   SetLastStreamIdReceived(stream_id_in_,
    521                           packet_info->protobuf.get());
    522   if (stream_id_in_ != last_server_to_device_stream_id_received_) {
    523     last_server_to_device_stream_id_received_ = stream_id_in_;
    524     // Mark all acknowledged server messages as such. Note: they're not dropped,
    525     // as it may be that they'll need to be re-acked if this message doesn't
    526     // make it.
    527     PersistentIdList persistent_id_list;
    528     for (StreamIdToPersistentIdMap::const_iterator iter =
    529              unacked_server_ids_.begin();
    530          iter != unacked_server_ids_.end(); ++iter) {
    531       DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
    532       persistent_id_list.push_back(iter->second);
    533     }
    534     unacked_server_ids_.clear();
    535     acked_server_ids_[stream_id_out_] = persistent_id_list;
    536   }
    537 
    538   connection_handler_->SendMessage(*packet_info->protobuf);
    539 }
    540 
    541 void MCSClient::HandleMCSDataMesssage(
    542     scoped_ptr<google::protobuf::MessageLite> protobuf) {
    543   mcs_proto::DataMessageStanza* data_message =
    544       reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
    545   // TODO(zea): implement a proper status manager rather than hardcoding these
    546   // values.
    547   scoped_ptr<mcs_proto::DataMessageStanza> response(
    548       new mcs_proto::DataMessageStanza());
    549   response->set_from(kGCMFromField);
    550   response->set_sent(clock_->Now().ToInternalValue() /
    551                          base::Time::kMicrosecondsPerSecond);
    552   response->set_ttl(0);
    553   bool send = false;
    554   for (int i = 0; i < data_message->app_data_size(); ++i) {
    555     const mcs_proto::AppData& app_data = data_message->app_data(i);
    556     if (app_data.key() == kIdleNotification) {
    557       // Tell the MCS server the client is not idle.
    558       send = true;
    559       mcs_proto::AppData data;
    560       data.set_key(kIdleNotification);
    561       data.set_value("false");
    562       response->add_app_data()->CopyFrom(data);
    563       response->set_category(kMCSCategory);
    564     }
    565   }
    566 
    567   if (send) {
    568     SendMessage(
    569         MCSMessage(kDataMessageStanzaTag,
    570                    response.PassAs<const google::protobuf::MessageLite>()));
    571   }
    572 }
    573 
    574 void MCSClient::HandlePacketFromWire(
    575     scoped_ptr<google::protobuf::MessageLite> protobuf) {
    576   if (!protobuf.get())
    577     return;
    578   uint8 tag = GetMCSProtoTag(*protobuf);
    579   PersistentId persistent_id = GetPersistentId(*protobuf);
    580   StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
    581 
    582   if (last_stream_id_received != 0) {
    583     last_device_to_server_stream_id_received_ = last_stream_id_received;
    584 
    585     // Process device to server messages that have now been acknowledged by the
    586     // server. Because messages are stored in order, just pop off all that have
    587     // a stream id lower than server's last received stream id.
    588     HandleStreamAck(last_stream_id_received);
    589 
    590     // Process server_to_device_messages that the server now knows were
    591     // acknowledged. Again, they're in order, so just keep going until the
    592     // stream id is reached.
    593     StreamIdList acked_stream_ids_to_remove;
    594     for (std::map<StreamId, PersistentIdList>::iterator iter =
    595              acked_server_ids_.begin();
    596          iter != acked_server_ids_.end() &&
    597              iter->first <= last_stream_id_received; ++iter) {
    598       acked_stream_ids_to_remove.push_back(iter->first);
    599     }
    600     for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
    601          iter != acked_stream_ids_to_remove.end(); ++iter) {
    602       acked_server_ids_.erase(*iter);
    603     }
    604   }
    605 
    606   ++stream_id_in_;
    607   if (!persistent_id.empty()) {
    608     unacked_server_ids_[stream_id_in_] = persistent_id;
    609     gcm_store_->AddIncomingMessage(persistent_id,
    610                                    base::Bind(&MCSClient::OnGCMUpdateFinished,
    611                                               weak_ptr_factory_.GetWeakPtr()));
    612   }
    613 
    614   DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
    615            << " with persistent id "
    616            << (persistent_id.empty() ? "NULL" : persistent_id)
    617            << ", stream id " << stream_id_in_ << " and last stream id received "
    618            << last_stream_id_received;
    619 
    620   if (unacked_server_ids_.size() > 0 &&
    621       unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
    622     SendMessage(MCSMessage(kIqStanzaTag,
    623                            BuildStreamAck().
    624                                PassAs<const google::protobuf::MessageLite>()));
    625   }
    626 
    627   // The connection is alive, treat this message as a heartbeat ack.
    628   heartbeat_manager_.OnHeartbeatAcked();
    629 
    630   switch (tag) {
    631     case kLoginResponseTag: {
    632       DCHECK_EQ(CONNECTING, state_);
    633       mcs_proto::LoginResponse* login_response =
    634           reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
    635       DVLOG(1) << "Received login response:";
    636       DVLOG(1) << "  Id: " << login_response->id();
    637       DVLOG(1) << "  Timestamp: " << login_response->server_timestamp();
    638       if (login_response->has_error() && login_response->error().code() != 0) {
    639         state_ = UNINITIALIZED;
    640         DVLOG(1) << "  Error code: " << login_response->error().code();
    641         DVLOG(1) << "  Error message: " << login_response->error().message();
    642         LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
    643         connection_factory_->SignalConnectionReset(
    644             ConnectionFactory::LOGIN_FAILURE);
    645         mcs_error_callback_.Run();
    646         return;
    647       }
    648 
    649       if (login_response->has_heartbeat_config()) {
    650         heartbeat_manager_.UpdateHeartbeatConfig(
    651             login_response->heartbeat_config());
    652       }
    653 
    654       state_ = CONNECTED;
    655       stream_id_in_ = 1;  // To account for the login response.
    656       DCHECK_EQ(1U, stream_id_out_);
    657 
    658       // Pass the login response on up.
    659       base::MessageLoop::current()->PostTask(
    660           FROM_HERE,
    661           base::Bind(message_received_callback_,
    662                      MCSMessage(tag,
    663                                 protobuf.PassAs<
    664                                     const google::protobuf::MessageLite>())));
    665 
    666       // If there are pending messages, attempt to send one.
    667       if (!to_send_.empty()) {
    668         base::MessageLoop::current()->PostTask(
    669             FROM_HERE,
    670             base::Bind(&MCSClient::MaybeSendMessage,
    671                        weak_ptr_factory_.GetWeakPtr()));
    672       }
    673 
    674       heartbeat_manager_.Start(
    675           base::Bind(&MCSClient::SendHeartbeat,
    676                      weak_ptr_factory_.GetWeakPtr()),
    677           base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
    678                      weak_ptr_factory_.GetWeakPtr()));
    679       return;
    680     }
    681     case kHeartbeatPingTag:
    682       DCHECK_GE(stream_id_in_, 1U);
    683       DVLOG(1) << "Received heartbeat ping, sending ack.";
    684       SendMessage(
    685           MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
    686       return;
    687     case kHeartbeatAckTag:
    688       DCHECK_GE(stream_id_in_, 1U);
    689       DVLOG(1) << "Received heartbeat ack.";
    690       // Do nothing else, all messages act as heartbeat acks.
    691       return;
    692     case kCloseTag:
    693       LOG(ERROR) << "Received close command, resetting connection.";
    694       state_ = LOADED;
    695       connection_factory_->SignalConnectionReset(
    696           ConnectionFactory::CLOSE_COMMAND);
    697       return;
    698     case kIqStanzaTag: {
    699       DCHECK_GE(stream_id_in_, 1U);
    700       mcs_proto::IqStanza* iq_stanza =
    701           reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
    702       const mcs_proto::Extension& iq_extension = iq_stanza->extension();
    703       switch (iq_extension.id()) {
    704         case kSelectiveAck: {
    705           PersistentIdList acked_ids;
    706           if (BuildPersistentIdListFromProto(iq_extension.data(),
    707                                              &acked_ids)) {
    708             HandleSelectiveAck(acked_ids);
    709           }
    710           return;
    711         }
    712         case kStreamAck:
    713           // Do nothing. The last received stream id is always processed if it's
    714           // present.
    715           return;
    716         default:
    717           LOG(WARNING) << "Received invalid iq stanza extension "
    718                        << iq_extension.id();
    719           return;
    720       }
    721     }
    722     case kDataMessageStanzaTag: {
    723       DCHECK_GE(stream_id_in_, 1U);
    724       mcs_proto::DataMessageStanza* data_message =
    725           reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
    726       if (data_message->category() == kMCSCategory) {
    727         HandleMCSDataMesssage(protobuf.Pass());
    728         return;
    729       }
    730 
    731       DCHECK(protobuf.get());
    732       base::MessageLoop::current()->PostTask(
    733           FROM_HERE,
    734           base::Bind(message_received_callback_,
    735                      MCSMessage(tag,
    736                                 protobuf.PassAs<
    737                                     const google::protobuf::MessageLite>())));
    738       return;
    739     }
    740     default:
    741       LOG(ERROR) << "Received unexpected message of type "
    742                  << static_cast<int>(tag);
    743       return;
    744   }
    745 }
    746 
    747 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
    748   PersistentIdList acked_outgoing_persistent_ids;
    749   StreamIdList acked_outgoing_stream_ids;
    750   while (!to_resend_.empty() &&
    751          to_resend_.front()->stream_id <= last_stream_id_received) {
    752     const MCSPacketInternal& outgoing_packet = to_resend_.front();
    753     acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
    754     acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
    755     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
    756     to_resend_.pop_front();
    757   }
    758 
    759   DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
    760            << " outgoing messages, " << to_resend_.size()
    761            << " remaining unacked";
    762   gcm_store_->RemoveOutgoingMessages(
    763       acked_outgoing_persistent_ids,
    764       base::Bind(&MCSClient::OnGCMUpdateFinished,
    765                  weak_ptr_factory_.GetWeakPtr()));
    766 
    767   HandleServerConfirmedReceipt(last_stream_id_received);
    768 }
    769 
    770 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
    771   std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
    772 
    773   StreamId last_stream_id_received = 0;
    774 
    775   // First check the to_resend_ queue. Acknowledgments are always contiguous,
    776   // so if there's a pending message that hasn't been acked, all newer messages
    777   // must also be unacked.
    778   while(!to_resend_.empty() && !remaining_ids.empty()) {
    779     const MCSPacketInternal& outgoing_packet = to_resend_.front();
    780     if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
    781       break;  // Newer message must be unacked too.
    782     remaining_ids.erase(outgoing_packet->persistent_id);
    783     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
    784 
    785     // No need to re-acknowledge any server messages this message already
    786     // acknowledged.
    787     StreamId device_stream_id = outgoing_packet->stream_id;
    788     if (device_stream_id > last_stream_id_received)
    789       last_stream_id_received = device_stream_id;
    790     to_resend_.pop_front();
    791   }
    792 
    793   // If the acknowledged ids aren't all there, they might be in the to_send_
    794   // queue (typically when a SelectiveAck confirms messages as part of a login
    795   // response).
    796   while (!to_send_.empty() && !remaining_ids.empty()) {
    797     const MCSPacketInternal& outgoing_packet = to_send_.front();
    798     if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
    799       break;  // Newer messages must be unacked too.
    800     remaining_ids.erase(outgoing_packet->persistent_id);
    801     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
    802 
    803     // No need to re-acknowledge any server messages this message already
    804     // acknowledged.
    805     StreamId device_stream_id = outgoing_packet->stream_id;
    806     if (device_stream_id > last_stream_id_received)
    807       last_stream_id_received = device_stream_id;
    808     PopMessageForSend();
    809   }
    810 
    811   // Only handle the largest stream id value. All other stream ids are
    812   // implicitly handled.
    813   if (last_stream_id_received > 0)
    814     HandleServerConfirmedReceipt(last_stream_id_received);
    815 
    816   // At this point, all remaining acked ids are redundant.
    817   PersistentIdList acked_ids;
    818   if (remaining_ids.size() > 0) {
    819     for (size_t i = 0; i < id_list.size(); ++i) {
    820       if (remaining_ids.count(id_list[i]) > 0)
    821         continue;
    822       acked_ids.push_back(id_list[i]);
    823     }
    824   } else {
    825     acked_ids = id_list;
    826   }
    827 
    828   DVLOG(1) << "Server acked " << acked_ids.size()
    829            << " messages, " << to_resend_.size() << " remaining unacked.";
    830   gcm_store_->RemoveOutgoingMessages(
    831       acked_ids,
    832       base::Bind(&MCSClient::OnGCMUpdateFinished,
    833                  weak_ptr_factory_.GetWeakPtr()));
    834 
    835   // Resend any remaining outgoing messages, as they were not received by the
    836   // server.
    837   DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
    838   while (!to_resend_.empty()) {
    839     to_send_.push_front(to_resend_.back());
    840     to_resend_.pop_back();
    841   }
    842   base::MessageLoop::current()->PostTask(
    843       FROM_HERE,
    844       base::Bind(&MCSClient::MaybeSendMessage,
    845                  weak_ptr_factory_.GetWeakPtr()));
    846 }
    847 
    848 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
    849   PersistentIdList acked_incoming_ids;
    850   for (std::map<StreamId, PersistentIdList>::iterator iter =
    851            acked_server_ids_.begin();
    852        iter != acked_server_ids_.end() &&
    853            iter->first <= device_stream_id;) {
    854     acked_incoming_ids.insert(acked_incoming_ids.end(),
    855                               iter->second.begin(),
    856                               iter->second.end());
    857     acked_server_ids_.erase(iter++);
    858   }
    859 
    860   DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
    861            << " acknowledged server messages.";
    862   gcm_store_->RemoveIncomingMessages(
    863       acked_incoming_ids,
    864       base::Bind(&MCSClient::OnGCMUpdateFinished,
    865                  weak_ptr_factory_.GetWeakPtr()));
    866 }
    867 
    868 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
    869   return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
    870 }
    871 
// Signals the connection factory to reset the connection, giving
// HEARTBEAT_FAILURE as the reason.
void MCSClient::OnConnectionResetByHeartbeat() {
  connection_factory_->SignalConnectionReset(
      ConnectionFactory::HEARTBEAT_FAILURE);
}
    876 
    877 void MCSClient::NotifyMessageSendStatus(
    878     const google::protobuf::MessageLite& protobuf,
    879     MessageSendStatus status) {
    880   if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
    881     return;
    882 
    883   const mcs_proto::DataMessageStanza* data_message_stanza =
    884       reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
    885   recorder_->RecordNotifySendStatus(
    886       data_message_stanza->category(),
    887       data_message_stanza->to(),
    888       data_message_stanza->id(),
    889       status,
    890       protobuf.ByteSize(),
    891       data_message_stanza->ttl());
    892   message_sent_callback_.Run(
    893       data_message_stanza->device_user_id(),
    894       data_message_stanza->category(),
    895       data_message_stanza->id(),
    896       status);
    897 }
    898 
    899 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
    900   MCSPacketInternal packet = to_send_.front();
    901   to_send_.pop_front();
    902 
    903   if (packet->tag == kDataMessageStanzaTag) {
    904     mcs_proto::DataMessageStanza* data_message =
    905         reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
    906     CollapseKey collapse_key(*data_message);
    907     if (collapse_key.IsValid())
    908       collapse_key_map_.erase(collapse_key);
    909   }
    910 
    911   return packet;
    912 }
    913 
    914 } // namespace gcm
    915