1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "google_apis/gcm/engine/mcs_client.h" 6 7 #include "base/basictypes.h" 8 #include "base/message_loop/message_loop.h" 9 #include "base/strings/string_number_conversions.h" 10 #include "google_apis/gcm/base/mcs_util.h" 11 #include "google_apis/gcm/base/socket_stream.h" 12 #include "google_apis/gcm/engine/connection_factory.h" 13 #include "google_apis/gcm/engine/rmq_store.h" 14 15 using namespace google::protobuf::io; 16 17 namespace gcm { 18 19 namespace { 20 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; 22 23 // TODO(zea): get these values from MCS settings. 24 const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes. 25 26 // The category of messages intended for the GCM client itself from MCS. 27 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; 28 29 // The from field for messages originating in the GCM client. 30 const char kGCMFromField[] = "gcm (at) android.com"; 31 32 // MCS status message types. 33 const char kIdleNotification[] = "IdleNotification"; 34 // TODO(zea): consume the following message types: 35 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; 36 // const char kPowerNotification[] = "PowerNotification"; 37 // const char kDataActiveNotification[] = "DataActiveNotification"; 38 39 // The number of unacked messages to allow before sending a stream ack. 40 // Applies to both incoming and outgoing messages. 41 // TODO(zea): make this server configurable. 42 const int kUnackedMessageBeforeStreamAck = 10; 43 44 // The global maximum number of pending messages to have in the send queue. 45 const size_t kMaxSendQueueSize = 10 * 1024; 46 47 // The maximum message size that can be sent to the server. 48 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. 49 50 // Helper for converting a proto persistent id list to a vector of strings. 51 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, 52 std::vector<std::string>* id_list) { 53 mcs_proto::SelectiveAck selective_ack; 54 if (!selective_ack.ParseFromString(bytes)) 55 return false; 56 std::vector<std::string> new_list; 57 for (int i = 0; i < selective_ack.id_size(); ++i) { 58 DCHECK(!selective_ack.id(i).empty()); 59 new_list.push_back(selective_ack.id(i)); 60 } 61 id_list->swap(new_list); 62 return true; 63 } 64 65 } // namespace 66 67 struct ReliablePacketInfo { 68 ReliablePacketInfo(); 69 ~ReliablePacketInfo(); 70 71 // The stream id with which the message was sent. 72 uint32 stream_id; 73 74 // If reliable delivery was requested, the persistent id of the message. 75 std::string persistent_id; 76 77 // The type of message itself (for easier lookup). 78 uint8 tag; 79 80 // The protobuf of the message itself. 81 MCSProto protobuf; 82 }; 83 84 ReliablePacketInfo::ReliablePacketInfo() 85 : stream_id(0), tag(0) { 86 } 87 ReliablePacketInfo::~ReliablePacketInfo() {} 88 89 MCSClient::MCSClient( 90 const base::FilePath& rmq_path, 91 ConnectionFactory* connection_factory, 92 scoped_refptr<base::SequencedTaskRunner> blocking_task_runner) 93 : state_(UNINITIALIZED), 94 android_id_(0), 95 security_token_(0), 96 connection_factory_(connection_factory), 97 connection_handler_(NULL), 98 last_device_to_server_stream_id_received_(0), 99 last_server_to_device_stream_id_received_(0), 100 stream_id_out_(0), 101 stream_id_in_(0), 102 rmq_store_(rmq_path, blocking_task_runner), 103 heartbeat_interval_( 104 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), 105 heartbeat_timer_(true, true), 106 blocking_task_runner_(blocking_task_runner), 107 weak_ptr_factory_(this) { 108 } 109 110 MCSClient::~MCSClient() { 111 } 112 113 void MCSClient::Initialize( 114 const InitializationCompleteCallback& initialization_callback, 115 const OnMessageReceivedCallback& message_received_callback, 116 const OnMessageSentCallback& message_sent_callback) { 117 DCHECK_EQ(state_, UNINITIALIZED); 118 initialization_callback_ = initialization_callback; 119 message_received_callback_ = message_received_callback; 120 message_sent_callback_ = message_sent_callback; 121 122 state_ = LOADING; 123 rmq_store_.Load(base::Bind(&MCSClient::OnRMQLoadFinished, 124 weak_ptr_factory_.GetWeakPtr())); 125 126 connection_factory_->Initialize( 127 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, 128 weak_ptr_factory_.GetWeakPtr()), 129 base::Bind(&MCSClient::HandlePacketFromWire, 130 weak_ptr_factory_.GetWeakPtr()), 131 base::Bind(&MCSClient::MaybeSendMessage, 132 weak_ptr_factory_.GetWeakPtr())); 133 connection_handler_ = connection_factory_->GetConnectionHandler(); 134 } 135 136 void MCSClient::Login(uint64 android_id, uint64 security_token) { 137 DCHECK_EQ(state_, LOADED); 138 if (android_id != android_id_ && security_token != security_token_) { 139 DCHECK(android_id); 140 DCHECK(security_token); 141 DCHECK(restored_unackeds_server_ids_.empty()); 142 android_id_ = android_id; 143 security_token_ = security_token; 144 rmq_store_.SetDeviceCredentials(android_id_, 145 security_token_, 146 base::Bind(&MCSClient::OnRMQUpdateFinished, 147 weak_ptr_factory_.GetWeakPtr())); 148 } 149 150 state_ = CONNECTING; 151 connection_factory_->Connect(); 152 } 153 154 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { 155 DCHECK_EQ(state_, CONNECTED); 156 if (to_send_.size() > kMaxSendQueueSize) { 157 base::MessageLoop::current()->PostTask( 158 FROM_HERE, 159 base::Bind(message_sent_callback_, "Message queue full.")); 160 return; 161 } 162 if (message.size() > kMaxMessageBytes) { 163 base::MessageLoop::current()->PostTask( 164 FROM_HERE, 165 base::Bind(message_sent_callback_, "Message too large.")); 166 return; 167 } 168 169 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 170 packet_info->protobuf = message.CloneProtobuf(); 171 172 if (use_rmq) { 173 PersistentId persistent_id = GetNextPersistentId(); 174 DVLOG(1) << "Setting persistent id to " << persistent_id; 175 packet_info->persistent_id = persistent_id; 176 SetPersistentId(persistent_id, 177 packet_info->protobuf.get()); 178 rmq_store_.AddOutgoingMessage(persistent_id, 179 MCSMessage(message.tag(), 180 *(packet_info->protobuf)), 181 base::Bind(&MCSClient::OnRMQUpdateFinished, 182 weak_ptr_factory_.GetWeakPtr())); 183 } else { 184 // Check that there is an active connection to the endpoint. 185 if (!connection_handler_->CanSendMessage()) { 186 base::MessageLoop::current()->PostTask( 187 FROM_HERE, 188 base::Bind(message_sent_callback_, "Unable to reach endpoint")); 189 return; 190 } 191 } 192 to_send_.push_back(make_linked_ptr(packet_info)); 193 MaybeSendMessage(); 194 } 195 196 void MCSClient::Destroy() { 197 rmq_store_.Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, 198 weak_ptr_factory_.GetWeakPtr())); 199 } 200 201 void MCSClient::ResetStateAndBuildLoginRequest( 202 mcs_proto::LoginRequest* request) { 203 DCHECK(android_id_); 204 DCHECK(security_token_); 205 stream_id_in_ = 0; 206 stream_id_out_ = 1; 207 last_device_to_server_stream_id_received_ = 0; 208 last_server_to_device_stream_id_received_ = 0; 209 210 // TODO(zea): expire all messages older than their TTL. 211 212 // Add any pending acknowledgments to the list of ids. 213 for (StreamIdToPersistentIdMap::const_iterator iter = 214 unacked_server_ids_.begin(); 215 iter != unacked_server_ids_.end(); ++iter) { 216 restored_unackeds_server_ids_.push_back(iter->second); 217 } 218 unacked_server_ids_.clear(); 219 220 // Any acknowledged server ids which have not been confirmed by the server 221 // are treated like unacknowledged ids. 222 for (std::map<StreamId, PersistentIdList>::const_iterator iter = 223 acked_server_ids_.begin(); 224 iter != acked_server_ids_.end(); ++iter) { 225 restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(), 226 iter->second.begin(), 227 iter->second.end()); 228 } 229 acked_server_ids_.clear(); 230 231 // Then build the request, consuming all pending acknowledgments. 232 request->Swap(BuildLoginRequest(android_id_, security_token_).get()); 233 for (PersistentIdList::const_iterator iter = 234 restored_unackeds_server_ids_.begin(); 235 iter != restored_unackeds_server_ids_.end(); ++iter) { 236 request->add_received_persistent_id(*iter); 237 } 238 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; 239 restored_unackeds_server_ids_.clear(); 240 241 // Push all unacknowledged messages to front of send queue. No need to save 242 // to RMQ, as all messages that reach this point should already have been 243 // saved as necessary. 244 while (!to_resend_.empty()) { 245 to_send_.push_front(to_resend_.back()); 246 to_resend_.pop_back(); 247 } 248 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() 249 << " incoming acks pending, and " << to_send_.size() 250 << " pending outgoing messages."; 251 252 heartbeat_timer_.Stop(); 253 254 state_ = CONNECTING; 255 } 256 257 void MCSClient::SendHeartbeat() { 258 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), 259 false); 260 } 261 262 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) { 263 if (!result.success) { 264 state_ = UNINITIALIZED; 265 LOG(ERROR) << "Failed to load/create RMQ state. Not connecting."; 266 initialization_callback_.Run(false, 0, 0); 267 return; 268 } 269 state_ = LOADED; 270 stream_id_out_ = 1; // Login request is hardcoded to id 1. 271 272 if (result.device_android_id == 0 || result.device_security_token == 0) { 273 DVLOG(1) << "No device credentials found, assuming new client."; 274 initialization_callback_.Run(true, 0, 0); 275 return; 276 } 277 278 android_id_ = result.device_android_id; 279 security_token_ = result.device_security_token; 280 281 DVLOG(1) << "RMQ Load finished with " << result.incoming_messages.size() 282 << " incoming acks pending and " << result.outgoing_messages.size() 283 << " outgoing messages pending."; 284 285 restored_unackeds_server_ids_ = result.incoming_messages; 286 287 // First go through and order the outgoing messages by recency. 288 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; 289 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator 290 iter = result.outgoing_messages.begin(); 291 iter != result.outgoing_messages.end(); ++iter) { 292 uint64 timestamp = 0; 293 if (!base::StringToUint64(iter->first, ×tamp)) { 294 LOG(ERROR) << "Invalid restored message."; 295 return; 296 } 297 ordered_messages[timestamp] = iter->second; 298 } 299 300 // Now go through and add the outgoing messages to the send queue in their 301 // appropriate order (oldest at front, most recent at back). 302 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator 303 iter = ordered_messages.begin(); 304 iter != ordered_messages.end(); ++iter) { 305 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 306 packet_info->protobuf.reset(iter->second); 307 packet_info->persistent_id = base::Uint64ToString(iter->first); 308 to_send_.push_back(make_linked_ptr(packet_info)); 309 } 310 311 initialization_callback_.Run(true, android_id_, security_token_); 312 } 313 314 void MCSClient::OnRMQUpdateFinished(bool success) { 315 LOG_IF(ERROR, !success) << "RMQ Update failed!"; 316 // TODO(zea): Rebuild the store from scratch in case of persistence failure? 317 } 318 319 void MCSClient::MaybeSendMessage() { 320 if (to_send_.empty()) 321 return; 322 323 if (!connection_handler_->CanSendMessage()) 324 return; 325 326 // TODO(zea): drop messages older than their TTL. 327 328 DVLOG(1) << "Pending output message found, sending."; 329 MCSPacketInternal packet = to_send_.front(); 330 to_send_.pop_front(); 331 if (!packet->persistent_id.empty()) 332 to_resend_.push_back(packet); 333 SendPacketToWire(packet.get()); 334 } 335 336 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { 337 // Reset the heartbeat interval. 338 heartbeat_timer_.Reset(); 339 packet_info->stream_id = ++stream_id_out_; 340 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); 341 342 // Set the proper last received stream id to acknowledge received server 343 // packets. 344 DVLOG(1) << "Setting last stream id received to " 345 << stream_id_in_; 346 SetLastStreamIdReceived(stream_id_in_, 347 packet_info->protobuf.get()); 348 if (stream_id_in_ != last_server_to_device_stream_id_received_) { 349 last_server_to_device_stream_id_received_ = stream_id_in_; 350 // Mark all acknowledged server messages as such. Note: they're not dropped, 351 // as it may be that they'll need to be re-acked if this message doesn't 352 // make it. 353 PersistentIdList persistent_id_list; 354 for (StreamIdToPersistentIdMap::const_iterator iter = 355 unacked_server_ids_.begin(); 356 iter != unacked_server_ids_.end(); ++iter) { 357 DCHECK_LE(iter->first, last_server_to_device_stream_id_received_); 358 persistent_id_list.push_back(iter->second); 359 } 360 unacked_server_ids_.clear(); 361 acked_server_ids_[stream_id_out_] = persistent_id_list; 362 } 363 364 connection_handler_->SendMessage(*packet_info->protobuf); 365 } 366 367 void MCSClient::HandleMCSDataMesssage( 368 scoped_ptr<google::protobuf::MessageLite> protobuf) { 369 mcs_proto::DataMessageStanza* data_message = 370 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); 371 // TODO(zea): implement a proper status manager rather than hardcoding these 372 // values. 373 scoped_ptr<mcs_proto::DataMessageStanza> response( 374 new mcs_proto::DataMessageStanza()); 375 response->set_from(kGCMFromField); 376 bool send = false; 377 for (int i = 0; i < data_message->app_data_size(); ++i) { 378 const mcs_proto::AppData& app_data = data_message->app_data(i); 379 if (app_data.key() == kIdleNotification) { 380 // Tell the MCS server the client is not idle. 381 send = true; 382 mcs_proto::AppData data; 383 data.set_key(kIdleNotification); 384 data.set_value("false"); 385 response->add_app_data()->CopyFrom(data); 386 response->set_category(kMCSCategory); 387 } 388 } 389 390 if (send) { 391 SendMessage( 392 MCSMessage(kDataMessageStanzaTag, 393 response.PassAs<const google::protobuf::MessageLite>()), 394 false); 395 } 396 } 397 398 void MCSClient::HandlePacketFromWire( 399 scoped_ptr<google::protobuf::MessageLite> protobuf) { 400 if (!protobuf.get()) 401 return; 402 uint8 tag = GetMCSProtoTag(*protobuf); 403 PersistentId persistent_id = GetPersistentId(*protobuf); 404 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); 405 406 if (last_stream_id_received != 0) { 407 last_device_to_server_stream_id_received_ = last_stream_id_received; 408 409 // Process device to server messages that have now been acknowledged by the 410 // server. Because messages are stored in order, just pop off all that have 411 // a stream id lower than server's last received stream id. 412 HandleStreamAck(last_stream_id_received); 413 414 // Process server_to_device_messages that the server now knows were 415 // acknowledged. Again, they're in order, so just keep going until the 416 // stream id is reached. 417 StreamIdList acked_stream_ids_to_remove; 418 for (std::map<StreamId, PersistentIdList>::iterator iter = 419 acked_server_ids_.begin(); 420 iter != acked_server_ids_.end() && 421 iter->first <= last_stream_id_received; ++iter) { 422 acked_stream_ids_to_remove.push_back(iter->first); 423 } 424 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); 425 iter != acked_stream_ids_to_remove.end(); ++iter) { 426 acked_server_ids_.erase(*iter); 427 } 428 } 429 430 ++stream_id_in_; 431 if (!persistent_id.empty()) { 432 unacked_server_ids_[stream_id_in_] = persistent_id; 433 rmq_store_.AddIncomingMessage(persistent_id, 434 base::Bind(&MCSClient::OnRMQUpdateFinished, 435 weak_ptr_factory_.GetWeakPtr())); 436 } 437 438 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() 439 << " with persistent id " 440 << (persistent_id.empty() ? "NULL" : persistent_id) 441 << ", stream id " << stream_id_in_ << " and last stream id received " 442 << last_stream_id_received; 443 444 if (unacked_server_ids_.size() > 0 && 445 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { 446 SendMessage(MCSMessage(kIqStanzaTag, 447 BuildStreamAck(). 448 PassAs<const google::protobuf::MessageLite>()), 449 false); 450 } 451 452 switch (tag) { 453 case kLoginResponseTag: { 454 mcs_proto::LoginResponse* login_response = 455 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); 456 DVLOG(1) << "Received login response:"; 457 DVLOG(1) << " Id: " << login_response->id(); 458 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); 459 if (login_response->has_error()) { 460 state_ = UNINITIALIZED; 461 DVLOG(1) << " Error code: " << login_response->error().code(); 462 DVLOG(1) << " Error message: " << login_response->error().message(); 463 initialization_callback_.Run(false, 0, 0); 464 return; 465 } 466 467 state_ = CONNECTED; 468 stream_id_in_ = 1; // To account for the login response. 469 DCHECK_EQ(1U, stream_id_out_); 470 471 // Pass the login response on up. 472 base::MessageLoop::current()->PostTask( 473 FROM_HERE, 474 base::Bind(message_received_callback_, 475 MCSMessage(tag, 476 protobuf.PassAs< 477 const google::protobuf::MessageLite>()))); 478 479 // If there are pending messages, attempt to send one. 480 if (!to_send_.empty()) { 481 base::MessageLoop::current()->PostTask( 482 FROM_HERE, 483 base::Bind(&MCSClient::MaybeSendMessage, 484 weak_ptr_factory_.GetWeakPtr())); 485 } 486 487 heartbeat_timer_.Start(FROM_HERE, 488 heartbeat_interval_, 489 base::Bind(&MCSClient::SendHeartbeat, 490 weak_ptr_factory_.GetWeakPtr())); 491 return; 492 } 493 case kHeartbeatPingTag: 494 DCHECK_GE(stream_id_in_, 1U); 495 DVLOG(1) << "Received heartbeat ping, sending ack."; 496 SendMessage( 497 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); 498 return; 499 case kHeartbeatAckTag: 500 DCHECK_GE(stream_id_in_, 1U); 501 DVLOG(1) << "Received heartbeat ack."; 502 // TODO(zea): add logic to reconnect if no ack received within a certain 503 // timeout (with backoff). 504 return; 505 case kCloseTag: 506 LOG(ERROR) << "Received close command, closing connection."; 507 state_ = UNINITIALIZED; 508 initialization_callback_.Run(false, 0, 0); 509 // TODO(zea): should this happen in non-error cases? Reconnect? 510 return; 511 case kIqStanzaTag: { 512 DCHECK_GE(stream_id_in_, 1U); 513 mcs_proto::IqStanza* iq_stanza = 514 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); 515 const mcs_proto::Extension& iq_extension = iq_stanza->extension(); 516 switch (iq_extension.id()) { 517 case kSelectiveAck: { 518 PersistentIdList acked_ids; 519 if (BuildPersistentIdListFromProto(iq_extension.data(), 520 &acked_ids)) { 521 HandleSelectiveAck(acked_ids); 522 } 523 return; 524 } 525 case kStreamAck: 526 // Do nothing. The last received stream id is always processed if it's 527 // present. 528 return; 529 default: 530 LOG(WARNING) << "Received invalid iq stanza extension " 531 << iq_extension.id(); 532 return; 533 } 534 } 535 case kDataMessageStanzaTag: { 536 DCHECK_GE(stream_id_in_, 1U); 537 mcs_proto::DataMessageStanza* data_message = 538 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); 539 if (data_message->category() == kMCSCategory) { 540 HandleMCSDataMesssage(protobuf.Pass()); 541 return; 542 } 543 544 DCHECK(protobuf.get()); 545 base::MessageLoop::current()->PostTask( 546 FROM_HERE, 547 base::Bind(message_received_callback_, 548 MCSMessage(tag, 549 protobuf.PassAs< 550 const google::protobuf::MessageLite>()))); 551 return; 552 } 553 default: 554 LOG(ERROR) << "Received unexpected message of type " 555 << static_cast<int>(tag); 556 return; 557 } 558 } 559 560 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) { 561 PersistentIdList acked_outgoing_persistent_ids; 562 StreamIdList acked_outgoing_stream_ids; 563 while (!to_resend_.empty() && 564 to_resend_.front()->stream_id <= last_stream_id_received) { 565 const MCSPacketInternal& outgoing_packet = to_resend_.front(); 566 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); 567 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); 568 to_resend_.pop_front(); 569 } 570 571 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() 572 << " outgoing messages, " << to_resend_.size() 573 << " remaining unacked"; 574 rmq_store_.RemoveOutgoingMessages(acked_outgoing_persistent_ids, 575 base::Bind(&MCSClient::OnRMQUpdateFinished, 576 weak_ptr_factory_.GetWeakPtr())); 577 578 HandleServerConfirmedReceipt(last_stream_id_received); 579 } 580 581 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { 582 // First check the to_resend_ queue. Acknowledgments should always happen 583 // in the order they were sent, so if messages are present they should match 584 // the acknowledge list. 585 PersistentIdList::const_iterator iter = id_list.begin(); 586 for (; iter != id_list.end() && !to_resend_.empty(); ++iter) { 587 const MCSPacketInternal& outgoing_packet = to_resend_.front(); 588 DCHECK_EQ(outgoing_packet->persistent_id, *iter); 589 590 // No need to re-acknowledge any server messages this message already 591 // acknowledged. 592 StreamId device_stream_id = outgoing_packet->stream_id; 593 HandleServerConfirmedReceipt(device_stream_id); 594 595 to_resend_.pop_front(); 596 } 597 598 // If the acknowledged ids aren't all there, they might be in the to_send_ 599 // queue (typically when a StreamAck confirms messages as part of a login 600 // response). 601 for (; iter != id_list.end() && !to_send_.empty(); ++iter) { 602 const MCSPacketInternal& outgoing_packet = to_send_.front(); 603 DCHECK_EQ(outgoing_packet->persistent_id, *iter); 604 605 // No need to re-acknowledge any server messages this message already 606 // acknowledged. 607 StreamId device_stream_id = outgoing_packet->stream_id; 608 HandleServerConfirmedReceipt(device_stream_id); 609 610 to_send_.pop_front(); 611 } 612 613 DCHECK(iter == id_list.end()); 614 615 DVLOG(1) << "Server acked " << id_list.size() 616 << " messages, " << to_resend_.size() << " remaining unacked."; 617 rmq_store_.RemoveOutgoingMessages(id_list, 618 base::Bind(&MCSClient::OnRMQUpdateFinished, 619 weak_ptr_factory_.GetWeakPtr())); 620 621 // Resend any remaining outgoing messages, as they were not received by the 622 // server. 623 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; 624 while (!to_resend_.empty()) { 625 to_send_.push_front(to_resend_.back()); 626 to_resend_.pop_back(); 627 } 628 } 629 630 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) { 631 // TODO(zea): use a message id the sender understands. 632 base::MessageLoop::current()->PostTask( 633 FROM_HERE, 634 base::Bind(message_sent_callback_, 635 "Message " + base::UintToString(device_stream_id) + " sent.")); 636 637 PersistentIdList acked_incoming_ids; 638 for (std::map<StreamId, PersistentIdList>::iterator iter = 639 acked_server_ids_.begin(); 640 iter != acked_server_ids_.end() && 641 iter->first <= device_stream_id;) { 642 acked_incoming_ids.insert(acked_incoming_ids.end(), 643 iter->second.begin(), 644 iter->second.end()); 645 acked_server_ids_.erase(iter++); 646 } 647 648 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() 649 << " acknowledged server messages."; 650 rmq_store_.RemoveIncomingMessages(acked_incoming_ids, 651 base::Bind(&MCSClient::OnRMQUpdateFinished, 652 weak_ptr_factory_.GetWeakPtr())); 653 } 654 655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 657 } 658 659 } // namespace gcm 660