Home | History | Annotate | Download | only in engine
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "google_apis/gcm/engine/mcs_client.h"
      6 
      7 #include "base/basictypes.h"
      8 #include "base/message_loop/message_loop.h"
      9 #include "base/strings/string_number_conversions.h"
     10 #include "google_apis/gcm/base/mcs_util.h"
     11 #include "google_apis/gcm/base/socket_stream.h"
     12 #include "google_apis/gcm/engine/connection_factory.h"
     13 #include "google_apis/gcm/engine/rmq_store.h"
     14 
     15 using namespace google::protobuf::io;
     16 
     17 namespace gcm {
     18 
     19 namespace {
     20 
     21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
     22 
     23 // TODO(zea): get these values from MCS settings.
     24 const int64 kHeartbeatDefaultSeconds = 60 * 15;  // 15 minutes.
     25 
     26 // The category of messages intended for the GCM client itself from MCS.
     27 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
     28 
     29 // The from field for messages originating in the GCM client.
     30 const char kGCMFromField[] = "gcm (at) android.com";
     31 
     32 // MCS status message types.
     33 const char kIdleNotification[] = "IdleNotification";
     34 // TODO(zea): consume the following message types:
     35 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
     36 // const char kPowerNotification[] = "PowerNotification";
     37 // const char kDataActiveNotification[] = "DataActiveNotification";
     38 
     39 // The number of unacked messages to allow before sending a stream ack.
     40 // Applies to both incoming and outgoing messages.
     41 // TODO(zea): make this server configurable.
     42 const int kUnackedMessageBeforeStreamAck = 10;
     43 
     44 // The global maximum number of pending messages to have in the send queue.
     45 const size_t kMaxSendQueueSize = 10 * 1024;
     46 
     47 // The maximum message size that can be sent to the server.
     48 const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
     49 
     50 // Helper for converting a proto persistent id list to a vector of strings.
     51 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
     52                                     std::vector<std::string>* id_list) {
     53   mcs_proto::SelectiveAck selective_ack;
     54   if (!selective_ack.ParseFromString(bytes))
     55     return false;
     56   std::vector<std::string> new_list;
     57   for (int i = 0; i < selective_ack.id_size(); ++i) {
     58     DCHECK(!selective_ack.id(i).empty());
     59     new_list.push_back(selective_ack.id(i));
     60   }
     61   id_list->swap(new_list);
     62   return true;
     63 }
     64 
     65 }  // namespace
     66 
     67 struct ReliablePacketInfo {
     68   ReliablePacketInfo();
     69   ~ReliablePacketInfo();
     70 
     71   // The stream id with which the message was sent.
     72   uint32 stream_id;
     73 
     74   // If reliable delivery was requested, the persistent id of the message.
     75   std::string persistent_id;
     76 
     77   // The type of message itself (for easier lookup).
     78   uint8 tag;
     79 
     80   // The protobuf of the message itself.
     81   MCSProto protobuf;
     82 };
     83 
     84 ReliablePacketInfo::ReliablePacketInfo()
     85   : stream_id(0), tag(0) {
     86 }
     87 ReliablePacketInfo::~ReliablePacketInfo() {}
     88 
     89 MCSClient::MCSClient(
     90     const base::FilePath& rmq_path,
     91     ConnectionFactory* connection_factory,
     92     scoped_refptr<base::SequencedTaskRunner> blocking_task_runner)
     93     : state_(UNINITIALIZED),
     94       android_id_(0),
     95       security_token_(0),
     96       connection_factory_(connection_factory),
     97       connection_handler_(NULL),
     98       last_device_to_server_stream_id_received_(0),
     99       last_server_to_device_stream_id_received_(0),
    100       stream_id_out_(0),
    101       stream_id_in_(0),
    102       rmq_store_(rmq_path, blocking_task_runner),
    103       heartbeat_interval_(
    104           base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)),
    105       heartbeat_timer_(true, true),
    106       blocking_task_runner_(blocking_task_runner),
    107       weak_ptr_factory_(this) {
    108 }
    109 
    110 MCSClient::~MCSClient() {
    111 }
    112 
    113 void MCSClient::Initialize(
    114     const InitializationCompleteCallback& initialization_callback,
    115     const OnMessageReceivedCallback& message_received_callback,
    116     const OnMessageSentCallback& message_sent_callback) {
    117   DCHECK_EQ(state_, UNINITIALIZED);
    118   initialization_callback_ = initialization_callback;
    119   message_received_callback_ = message_received_callback;
    120   message_sent_callback_ = message_sent_callback;
    121 
    122   state_ = LOADING;
    123   rmq_store_.Load(base::Bind(&MCSClient::OnRMQLoadFinished,
    124                              weak_ptr_factory_.GetWeakPtr()));
    125 
    126   connection_factory_->Initialize(
    127       base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
    128                  weak_ptr_factory_.GetWeakPtr()),
    129       base::Bind(&MCSClient::HandlePacketFromWire,
    130                  weak_ptr_factory_.GetWeakPtr()),
    131       base::Bind(&MCSClient::MaybeSendMessage,
    132                  weak_ptr_factory_.GetWeakPtr()));
    133   connection_handler_ = connection_factory_->GetConnectionHandler();
    134 }
    135 
    136 void MCSClient::Login(uint64 android_id, uint64 security_token) {
    137   DCHECK_EQ(state_, LOADED);
    138   if (android_id != android_id_ && security_token != security_token_) {
    139     DCHECK(android_id);
    140     DCHECK(security_token);
    141     DCHECK(restored_unackeds_server_ids_.empty());
    142     android_id_ = android_id;
    143     security_token_ = security_token;
    144     rmq_store_.SetDeviceCredentials(android_id_,
    145                                     security_token_,
    146                                     base::Bind(&MCSClient::OnRMQUpdateFinished,
    147                                                weak_ptr_factory_.GetWeakPtr()));
    148   }
    149 
    150   state_ = CONNECTING;
    151   connection_factory_->Connect();
    152 }
    153 
    154 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
    155   DCHECK_EQ(state_, CONNECTED);
    156   if (to_send_.size() > kMaxSendQueueSize) {
    157     base::MessageLoop::current()->PostTask(
    158         FROM_HERE,
    159         base::Bind(message_sent_callback_, "Message queue full."));
    160     return;
    161   }
    162   if (message.size() > kMaxMessageBytes) {
    163     base::MessageLoop::current()->PostTask(
    164         FROM_HERE,
    165         base::Bind(message_sent_callback_, "Message too large."));
    166     return;
    167   }
    168 
    169   ReliablePacketInfo* packet_info = new ReliablePacketInfo();
    170   packet_info->protobuf = message.CloneProtobuf();
    171 
    172   if (use_rmq) {
    173     PersistentId persistent_id = GetNextPersistentId();
    174     DVLOG(1) << "Setting persistent id to " << persistent_id;
    175     packet_info->persistent_id = persistent_id;
    176     SetPersistentId(persistent_id,
    177                     packet_info->protobuf.get());
    178     rmq_store_.AddOutgoingMessage(persistent_id,
    179                                   MCSMessage(message.tag(),
    180                                              *(packet_info->protobuf)),
    181                                   base::Bind(&MCSClient::OnRMQUpdateFinished,
    182                                              weak_ptr_factory_.GetWeakPtr()));
    183   } else {
    184     // Check that there is an active connection to the endpoint.
    185     if (!connection_handler_->CanSendMessage()) {
    186       base::MessageLoop::current()->PostTask(
    187           FROM_HERE,
    188           base::Bind(message_sent_callback_, "Unable to reach endpoint"));
    189       return;
    190     }
    191   }
    192   to_send_.push_back(make_linked_ptr(packet_info));
    193   MaybeSendMessage();
    194 }
    195 
    196 void MCSClient::Destroy() {
    197   rmq_store_.Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished,
    198                                 weak_ptr_factory_.GetWeakPtr()));
    199 }
    200 
    201 void MCSClient::ResetStateAndBuildLoginRequest(
    202     mcs_proto::LoginRequest* request) {
    203   DCHECK(android_id_);
    204   DCHECK(security_token_);
    205   stream_id_in_ = 0;
    206   stream_id_out_ = 1;
    207   last_device_to_server_stream_id_received_ = 0;
    208   last_server_to_device_stream_id_received_ = 0;
    209 
    210   // TODO(zea): expire all messages older than their TTL.
    211 
    212   // Add any pending acknowledgments to the list of ids.
    213   for (StreamIdToPersistentIdMap::const_iterator iter =
    214            unacked_server_ids_.begin();
    215        iter != unacked_server_ids_.end(); ++iter) {
    216     restored_unackeds_server_ids_.push_back(iter->second);
    217   }
    218   unacked_server_ids_.clear();
    219 
    220   // Any acknowledged server ids which have not been confirmed by the server
    221   // are treated like unacknowledged ids.
    222   for (std::map<StreamId, PersistentIdList>::const_iterator iter =
    223            acked_server_ids_.begin();
    224        iter != acked_server_ids_.end(); ++iter) {
    225     restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
    226                                          iter->second.begin(),
    227                                          iter->second.end());
    228   }
    229   acked_server_ids_.clear();
    230 
    231   // Then build the request, consuming all pending acknowledgments.
    232   request->Swap(BuildLoginRequest(android_id_, security_token_).get());
    233   for (PersistentIdList::const_iterator iter =
    234            restored_unackeds_server_ids_.begin();
    235        iter != restored_unackeds_server_ids_.end(); ++iter) {
    236     request->add_received_persistent_id(*iter);
    237   }
    238   acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
    239   restored_unackeds_server_ids_.clear();
    240 
    241   // Push all unacknowledged messages to front of send queue. No need to save
    242   // to RMQ, as all messages that reach this point should already have been
    243   // saved as necessary.
    244   while (!to_resend_.empty()) {
    245     to_send_.push_front(to_resend_.back());
    246     to_resend_.pop_back();
    247   }
    248   DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
    249            << " incoming acks pending, and " << to_send_.size()
    250            << " pending outgoing messages.";
    251 
    252   heartbeat_timer_.Stop();
    253 
    254   state_ = CONNECTING;
    255 }
    256 
    257 void MCSClient::SendHeartbeat() {
    258   SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()),
    259               false);
    260 }
    261 
    262 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) {
    263   if (!result.success) {
    264     state_ = UNINITIALIZED;
    265     LOG(ERROR) << "Failed to load/create RMQ state. Not connecting.";
    266     initialization_callback_.Run(false, 0, 0);
    267     return;
    268   }
    269   state_ = LOADED;
    270   stream_id_out_ = 1;  // Login request is hardcoded to id 1.
    271 
    272   if (result.device_android_id == 0 || result.device_security_token == 0) {
    273     DVLOG(1) << "No device credentials found, assuming new client.";
    274     initialization_callback_.Run(true, 0, 0);
    275     return;
    276   }
    277 
    278   android_id_ = result.device_android_id;
    279   security_token_ = result.device_security_token;
    280 
    281   DVLOG(1) << "RMQ Load finished with " << result.incoming_messages.size()
    282            << " incoming acks pending and " << result.outgoing_messages.size()
    283            << " outgoing messages pending.";
    284 
    285   restored_unackeds_server_ids_ = result.incoming_messages;
    286 
    287   // First go through and order the outgoing messages by recency.
    288   std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
    289   for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
    290            iter = result.outgoing_messages.begin();
    291        iter != result.outgoing_messages.end(); ++iter) {
    292     uint64 timestamp = 0;
    293     if (!base::StringToUint64(iter->first, &timestamp)) {
    294       LOG(ERROR) << "Invalid restored message.";
    295       return;
    296     }
    297     ordered_messages[timestamp] = iter->second;
    298   }
    299 
    300   // Now go through and add the outgoing messages to the send queue in their
    301   // appropriate order (oldest at front, most recent at back).
    302   for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
    303            iter = ordered_messages.begin();
    304        iter != ordered_messages.end(); ++iter) {
    305     ReliablePacketInfo* packet_info = new ReliablePacketInfo();
    306     packet_info->protobuf.reset(iter->second);
    307     packet_info->persistent_id = base::Uint64ToString(iter->first);
    308     to_send_.push_back(make_linked_ptr(packet_info));
    309   }
    310 
    311   initialization_callback_.Run(true, android_id_, security_token_);
    312 }
    313 
    314 void MCSClient::OnRMQUpdateFinished(bool success) {
    315   LOG_IF(ERROR, !success) << "RMQ Update failed!";
    316   // TODO(zea): Rebuild the store from scratch in case of persistence failure?
    317 }
    318 
    319 void MCSClient::MaybeSendMessage() {
    320   if (to_send_.empty())
    321     return;
    322 
    323   if (!connection_handler_->CanSendMessage())
    324     return;
    325 
    326   // TODO(zea): drop messages older than their TTL.
    327 
    328   DVLOG(1) << "Pending output message found, sending.";
    329   MCSPacketInternal packet = to_send_.front();
    330   to_send_.pop_front();
    331   if (!packet->persistent_id.empty())
    332     to_resend_.push_back(packet);
    333   SendPacketToWire(packet.get());
    334 }
    335 
    336 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
    337   // Reset the heartbeat interval.
    338   heartbeat_timer_.Reset();
    339   packet_info->stream_id = ++stream_id_out_;
    340   DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
    341 
    342   // Set the proper last received stream id to acknowledge received server
    343   // packets.
    344   DVLOG(1) << "Setting last stream id received to "
    345            << stream_id_in_;
    346   SetLastStreamIdReceived(stream_id_in_,
    347                           packet_info->protobuf.get());
    348   if (stream_id_in_ != last_server_to_device_stream_id_received_) {
    349     last_server_to_device_stream_id_received_ = stream_id_in_;
    350     // Mark all acknowledged server messages as such. Note: they're not dropped,
    351     // as it may be that they'll need to be re-acked if this message doesn't
    352     // make it.
    353     PersistentIdList persistent_id_list;
    354     for (StreamIdToPersistentIdMap::const_iterator iter =
    355              unacked_server_ids_.begin();
    356          iter != unacked_server_ids_.end(); ++iter) {
    357       DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
    358       persistent_id_list.push_back(iter->second);
    359     }
    360     unacked_server_ids_.clear();
    361     acked_server_ids_[stream_id_out_] = persistent_id_list;
    362   }
    363 
    364   connection_handler_->SendMessage(*packet_info->protobuf);
    365 }
    366 
    367 void MCSClient::HandleMCSDataMesssage(
    368     scoped_ptr<google::protobuf::MessageLite> protobuf) {
    369   mcs_proto::DataMessageStanza* data_message =
    370       reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
    371   // TODO(zea): implement a proper status manager rather than hardcoding these
    372   // values.
    373   scoped_ptr<mcs_proto::DataMessageStanza> response(
    374       new mcs_proto::DataMessageStanza());
    375   response->set_from(kGCMFromField);
    376   bool send = false;
    377   for (int i = 0; i < data_message->app_data_size(); ++i) {
    378     const mcs_proto::AppData& app_data = data_message->app_data(i);
    379     if (app_data.key() == kIdleNotification) {
    380       // Tell the MCS server the client is not idle.
    381       send = true;
    382       mcs_proto::AppData data;
    383       data.set_key(kIdleNotification);
    384       data.set_value("false");
    385       response->add_app_data()->CopyFrom(data);
    386       response->set_category(kMCSCategory);
    387     }
    388   }
    389 
    390   if (send) {
    391     SendMessage(
    392         MCSMessage(kDataMessageStanzaTag,
    393                    response.PassAs<const google::protobuf::MessageLite>()),
    394         false);
    395   }
    396 }
    397 
    398 void MCSClient::HandlePacketFromWire(
    399     scoped_ptr<google::protobuf::MessageLite> protobuf) {
    400   if (!protobuf.get())
    401     return;
    402   uint8 tag = GetMCSProtoTag(*protobuf);
    403   PersistentId persistent_id = GetPersistentId(*protobuf);
    404   StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
    405 
    406   if (last_stream_id_received != 0) {
    407     last_device_to_server_stream_id_received_ = last_stream_id_received;
    408 
    409     // Process device to server messages that have now been acknowledged by the
    410     // server. Because messages are stored in order, just pop off all that have
    411     // a stream id lower than server's last received stream id.
    412     HandleStreamAck(last_stream_id_received);
    413 
    414     // Process server_to_device_messages that the server now knows were
    415     // acknowledged. Again, they're in order, so just keep going until the
    416     // stream id is reached.
    417     StreamIdList acked_stream_ids_to_remove;
    418     for (std::map<StreamId, PersistentIdList>::iterator iter =
    419              acked_server_ids_.begin();
    420          iter != acked_server_ids_.end() &&
    421              iter->first <= last_stream_id_received; ++iter) {
    422       acked_stream_ids_to_remove.push_back(iter->first);
    423     }
    424     for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
    425          iter != acked_stream_ids_to_remove.end(); ++iter) {
    426       acked_server_ids_.erase(*iter);
    427     }
    428   }
    429 
    430   ++stream_id_in_;
    431   if (!persistent_id.empty()) {
    432     unacked_server_ids_[stream_id_in_] = persistent_id;
    433     rmq_store_.AddIncomingMessage(persistent_id,
    434                                   base::Bind(&MCSClient::OnRMQUpdateFinished,
    435                                              weak_ptr_factory_.GetWeakPtr()));
    436   }
    437 
    438   DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
    439            << " with persistent id "
    440            << (persistent_id.empty() ? "NULL" : persistent_id)
    441            << ", stream id " << stream_id_in_ << " and last stream id received "
    442            << last_stream_id_received;
    443 
    444   if (unacked_server_ids_.size() > 0 &&
    445       unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
    446     SendMessage(MCSMessage(kIqStanzaTag,
    447                            BuildStreamAck().
    448                                PassAs<const google::protobuf::MessageLite>()),
    449                 false);
    450   }
    451 
    452   switch (tag) {
    453     case kLoginResponseTag: {
    454       mcs_proto::LoginResponse* login_response =
    455           reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
    456       DVLOG(1) << "Received login response:";
    457       DVLOG(1) << "  Id: " << login_response->id();
    458       DVLOG(1) << "  Timestamp: " << login_response->server_timestamp();
    459       if (login_response->has_error()) {
    460         state_ = UNINITIALIZED;
    461         DVLOG(1) << "  Error code: " << login_response->error().code();
    462         DVLOG(1) << "  Error message: " << login_response->error().message();
    463         initialization_callback_.Run(false, 0, 0);
    464         return;
    465       }
    466 
    467       state_ = CONNECTED;
    468       stream_id_in_ = 1;  // To account for the login response.
    469       DCHECK_EQ(1U, stream_id_out_);
    470 
    471       // Pass the login response on up.
    472       base::MessageLoop::current()->PostTask(
    473           FROM_HERE,
    474           base::Bind(message_received_callback_,
    475                      MCSMessage(tag,
    476                                 protobuf.PassAs<
    477                                     const google::protobuf::MessageLite>())));
    478 
    479       // If there are pending messages, attempt to send one.
    480       if (!to_send_.empty()) {
    481         base::MessageLoop::current()->PostTask(
    482             FROM_HERE,
    483             base::Bind(&MCSClient::MaybeSendMessage,
    484                        weak_ptr_factory_.GetWeakPtr()));
    485       }
    486 
    487       heartbeat_timer_.Start(FROM_HERE,
    488                              heartbeat_interval_,
    489                              base::Bind(&MCSClient::SendHeartbeat,
    490                                         weak_ptr_factory_.GetWeakPtr()));
    491       return;
    492     }
    493     case kHeartbeatPingTag:
    494       DCHECK_GE(stream_id_in_, 1U);
    495       DVLOG(1) << "Received heartbeat ping, sending ack.";
    496       SendMessage(
    497           MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false);
    498       return;
    499     case kHeartbeatAckTag:
    500       DCHECK_GE(stream_id_in_, 1U);
    501       DVLOG(1) << "Received heartbeat ack.";
    502       // TODO(zea): add logic to reconnect if no ack received within a certain
    503       // timeout (with backoff).
    504       return;
    505     case kCloseTag:
    506       LOG(ERROR) << "Received close command, closing connection.";
    507       state_ = UNINITIALIZED;
    508       initialization_callback_.Run(false, 0, 0);
    509       // TODO(zea): should this happen in non-error cases? Reconnect?
    510       return;
    511     case kIqStanzaTag: {
    512       DCHECK_GE(stream_id_in_, 1U);
    513       mcs_proto::IqStanza* iq_stanza =
    514           reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
    515       const mcs_proto::Extension& iq_extension = iq_stanza->extension();
    516       switch (iq_extension.id()) {
    517         case kSelectiveAck: {
    518           PersistentIdList acked_ids;
    519           if (BuildPersistentIdListFromProto(iq_extension.data(),
    520                                              &acked_ids)) {
    521             HandleSelectiveAck(acked_ids);
    522           }
    523           return;
    524         }
    525         case kStreamAck:
    526           // Do nothing. The last received stream id is always processed if it's
    527           // present.
    528           return;
    529         default:
    530           LOG(WARNING) << "Received invalid iq stanza extension "
    531                        << iq_extension.id();
    532           return;
    533       }
    534     }
    535     case kDataMessageStanzaTag: {
    536       DCHECK_GE(stream_id_in_, 1U);
    537       mcs_proto::DataMessageStanza* data_message =
    538           reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
    539       if (data_message->category() == kMCSCategory) {
    540         HandleMCSDataMesssage(protobuf.Pass());
    541         return;
    542       }
    543 
    544       DCHECK(protobuf.get());
    545       base::MessageLoop::current()->PostTask(
    546           FROM_HERE,
    547           base::Bind(message_received_callback_,
    548                      MCSMessage(tag,
    549                                 protobuf.PassAs<
    550                                     const google::protobuf::MessageLite>())));
    551       return;
    552     }
    553     default:
    554       LOG(ERROR) << "Received unexpected message of type "
    555                  << static_cast<int>(tag);
    556       return;
    557   }
    558 }
    559 
    560 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
    561   PersistentIdList acked_outgoing_persistent_ids;
    562   StreamIdList acked_outgoing_stream_ids;
    563   while (!to_resend_.empty() &&
    564          to_resend_.front()->stream_id <= last_stream_id_received) {
    565     const MCSPacketInternal& outgoing_packet = to_resend_.front();
    566     acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
    567     acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
    568     to_resend_.pop_front();
    569   }
    570 
    571   DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
    572            << " outgoing messages, " << to_resend_.size()
    573            << " remaining unacked";
    574   rmq_store_.RemoveOutgoingMessages(acked_outgoing_persistent_ids,
    575                                     base::Bind(&MCSClient::OnRMQUpdateFinished,
    576                                                weak_ptr_factory_.GetWeakPtr()));
    577 
    578   HandleServerConfirmedReceipt(last_stream_id_received);
    579 }
    580 
    581 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
    582   // First check the to_resend_ queue. Acknowledgments should always happen
    583   // in the order they were sent, so if messages are present they should match
    584   // the acknowledge list.
    585   PersistentIdList::const_iterator iter = id_list.begin();
    586   for (; iter != id_list.end() && !to_resend_.empty(); ++iter) {
    587     const MCSPacketInternal& outgoing_packet = to_resend_.front();
    588     DCHECK_EQ(outgoing_packet->persistent_id, *iter);
    589 
    590     // No need to re-acknowledge any server messages this message already
    591     // acknowledged.
    592     StreamId device_stream_id = outgoing_packet->stream_id;
    593     HandleServerConfirmedReceipt(device_stream_id);
    594 
    595     to_resend_.pop_front();
    596   }
    597 
    598   // If the acknowledged ids aren't all there, they might be in the to_send_
    599   // queue (typically when a StreamAck confirms messages as part of a login
    600   // response).
    601   for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
    602     const MCSPacketInternal& outgoing_packet = to_send_.front();
    603     DCHECK_EQ(outgoing_packet->persistent_id, *iter);
    604 
    605     // No need to re-acknowledge any server messages this message already
    606     // acknowledged.
    607     StreamId device_stream_id = outgoing_packet->stream_id;
    608     HandleServerConfirmedReceipt(device_stream_id);
    609 
    610     to_send_.pop_front();
    611   }
    612 
    613   DCHECK(iter == id_list.end());
    614 
    615   DVLOG(1) << "Server acked " << id_list.size()
    616            << " messages, " << to_resend_.size() << " remaining unacked.";
    617   rmq_store_.RemoveOutgoingMessages(id_list,
    618                                     base::Bind(&MCSClient::OnRMQUpdateFinished,
    619                                                weak_ptr_factory_.GetWeakPtr()));
    620 
    621   // Resend any remaining outgoing messages, as they were not received by the
    622   // server.
    623   DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
    624   while (!to_resend_.empty()) {
    625     to_send_.push_front(to_resend_.back());
    626     to_resend_.pop_back();
    627   }
    628 }
    629 
    630 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
    631   // TODO(zea): use a message id the sender understands.
    632   base::MessageLoop::current()->PostTask(
    633       FROM_HERE,
    634       base::Bind(message_sent_callback_,
    635                  "Message " + base::UintToString(device_stream_id) + " sent."));
    636 
    637   PersistentIdList acked_incoming_ids;
    638   for (std::map<StreamId, PersistentIdList>::iterator iter =
    639            acked_server_ids_.begin();
    640        iter != acked_server_ids_.end() &&
    641            iter->first <= device_stream_id;) {
    642     acked_incoming_ids.insert(acked_incoming_ids.end(),
    643                               iter->second.begin(),
    644                               iter->second.end());
    645     acked_server_ids_.erase(iter++);
    646   }
    647 
    648   DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
    649            << " acknowledged server messages.";
    650   rmq_store_.RemoveIncomingMessages(acked_incoming_ids,
    651                                     base::Bind(&MCSClient::OnRMQUpdateFinished,
    652                                                weak_ptr_factory_.GetWeakPtr()));
    653 }
    654 
    655 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
    656   return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
    657 }
    658 
    659 } // namespace gcm
    660