1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_ 6 #define GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_ 7 8 #include <deque> 9 #include <map> 10 #include <string> 11 #include <vector> 12 13 #include "base/files/file_path.h" 14 #include "base/memory/linked_ptr.h" 15 #include "base/memory/weak_ptr.h" 16 #include "base/timer/timer.h" 17 #include "google_apis/gcm/base/gcm_export.h" 18 #include "google_apis/gcm/base/mcs_message.h" 19 #include "google_apis/gcm/engine/connection_handler.h" 20 #include "google_apis/gcm/engine/rmq_store.h" 21 22 namespace google { 23 namespace protobuf { 24 class MessageLite; 25 } // namespace protobuf 26 } // namespace google 27 28 namespace mcs_proto { 29 class LoginRequest; 30 } 31 32 namespace gcm { 33 34 class ConnectionFactory; 35 struct ReliablePacketInfo; 36 37 // An MCS client. This client is in charge of all communications with an 38 // MCS endpoint, and is capable of reliably sending/receiving GCM messages. 39 // NOTE: Not thread safe. This class should live on the same thread as that 40 // network requests are performed on. 41 class GCM_EXPORT MCSClient { 42 public: 43 enum State { 44 UNINITIALIZED, // Uninitialized. 45 LOADING, // Waiting for RMQ load to finish. 46 LOADED, // RMQ Load finished, waiting to connect. 47 CONNECTING, // Connection in progress. 48 CONNECTED, // Connected and running. 49 }; 50 51 // Callback for informing MCSClient status. It is valid for this to be 52 // invoked more than once if a permanent error is encountered after a 53 // successful login was initiated. 54 typedef base::Callback< 55 void(bool success, 56 uint64 restored_android_id, 57 uint64 restored_security_token)> InitializationCompleteCallback; 58 // Callback when a message is received. 59 typedef base::Callback<void(const MCSMessage& message)> 60 OnMessageReceivedCallback; 61 // Callback when a message is sent (and receipt has been acknowledged by 62 // the MCS endpoint). 63 // TODO(zea): pass some sort of structure containing more details about 64 // send failures. 65 typedef base::Callback<void(const std::string& message_id)> 66 OnMessageSentCallback; 67 68 MCSClient(const base::FilePath& rmq_path, 69 ConnectionFactory* connection_factory, 70 scoped_refptr<base::SequencedTaskRunner> blocking_task_runner); 71 virtual ~MCSClient(); 72 73 // Initialize the client. Will load any previous id/token information as well 74 // as unacknowledged message information from the RMQ storage, if it exists, 75 // passing the id/token information back via |initialization_callback| along 76 // with a |success == true| result. If no RMQ information is present (and 77 // this is therefore a fresh client), a clean RMQ store will be created and 78 // values of 0 will be returned via |initialization_callback| with 79 // |success == true|. 80 /// If an error loading the RMQ store is encountered, 81 // |initialization_callback| will be invoked with |success == false|. 82 void Initialize(const InitializationCompleteCallback& initialization_callback, 83 const OnMessageReceivedCallback& message_received_callback, 84 const OnMessageSentCallback& message_sent_callback); 85 86 // Logs the client into the server. Client must be initialized. 87 // |android_id| and |security_token| are optional if this is not a new 88 // client, else they must be non-zero. 89 // Successful login will result in |message_received_callback| being invoked 90 // with a valid LoginResponse. 91 // Login failure (typically invalid id/token) will shut down the client, and 92 // |initialization_callback| to be invoked with |success = false|. 93 void Login(uint64 android_id, uint64 security_token); 94 95 // Sends a message, with or without reliable message queueing (RMQ) support. 96 // Will asynchronously invoke the OnMessageSent callback regardless. 97 // TODO(zea): support TTL. 98 void SendMessage(const MCSMessage& message, bool use_rmq); 99 100 // Disconnects the client and permanently destroys the persistent RMQ store. 101 // WARNING: This is permanent, and the client must be recreated with new 102 // credentials afterwards. 103 void Destroy(); 104 105 // Returns the current state of the client. 106 State state() const { return state_; } 107 108 private: 109 typedef uint32 StreamId; 110 typedef std::string PersistentId; 111 typedef std::vector<StreamId> StreamIdList; 112 typedef std::vector<PersistentId> PersistentIdList; 113 typedef std::map<StreamId, PersistentId> StreamIdToPersistentIdMap; 114 typedef linked_ptr<ReliablePacketInfo> MCSPacketInternal; 115 116 // Resets the internal state and builds a new login request, acknowledging 117 // any pending server-to-device messages and rebuilding the send queue 118 // from all unacknowledged device-to-server messages. 119 // Should only be called when the connection has been reset. 120 void ResetStateAndBuildLoginRequest(mcs_proto::LoginRequest* request); 121 122 // Send a heartbeat to the MCS server. 123 void SendHeartbeat(); 124 125 // RMQ Store callbacks. 126 void OnRMQLoadFinished(const RMQStore::LoadResult& result); 127 void OnRMQUpdateFinished(bool success); 128 129 // Attempt to send a message. 130 void MaybeSendMessage(); 131 132 // Helper for sending a protobuf along with any unacknowledged ids to the 133 // wire. 134 void SendPacketToWire(ReliablePacketInfo* packet_info); 135 136 // Handle a data message sent to the MCS client system from the MCS server. 137 void HandleMCSDataMesssage( 138 scoped_ptr<google::protobuf::MessageLite> protobuf); 139 140 // Handle a packet received over the wire. 141 void HandlePacketFromWire(scoped_ptr<google::protobuf::MessageLite> protobuf); 142 143 // ReliableMessageQueue acknowledgment helpers. 144 // Handle a StreamAck sent by the server confirming receipt of all 145 // messages up to the message with stream id |last_stream_id_received|. 146 void HandleStreamAck(StreamId last_stream_id_received_); 147 // Handle a SelectiveAck sent by the server confirming all messages 148 // in |id_list|. 149 void HandleSelectiveAck(const PersistentIdList& id_list); 150 // Handle server confirmation of a device message, including device's 151 // acknowledgment of receipt of messages. 152 void HandleServerConfirmedReceipt(StreamId device_stream_id); 153 154 // Generates a new persistent id for messages. 155 // Virtual for testing. 156 virtual PersistentId GetNextPersistentId(); 157 158 // Client state. 159 State state_; 160 161 // Callbacks for owner. 162 InitializationCompleteCallback initialization_callback_; 163 OnMessageReceivedCallback message_received_callback_; 164 OnMessageSentCallback message_sent_callback_; 165 166 // The android id and security token in use by this device. 167 uint64 android_id_; 168 uint64 security_token_; 169 170 // Factory for creating new connections and connection handlers. 171 ConnectionFactory* connection_factory_; 172 173 // Connection handler to handle all over-the-wire protocol communication 174 // with the mobile connection server. 175 ConnectionHandler* connection_handler_; 176 177 // ----- Reliablie Message Queue section ----- 178 // Note: all queues/maps are ordered from oldest (front/begin) message to 179 // most recent (back/end). 180 181 // Send/acknowledge queues. 182 std::deque<MCSPacketInternal> to_send_; 183 std::deque<MCSPacketInternal> to_resend_; 184 185 // Last device_to_server stream id acknowledged by the server. 186 StreamId last_device_to_server_stream_id_received_; 187 // Last server_to_device stream id acknowledged by this device. 188 StreamId last_server_to_device_stream_id_received_; 189 // The stream id for the last sent message. A new message should consume 190 // stream_id_out_ + 1. 191 StreamId stream_id_out_; 192 // The stream id of the last received message. The LoginResponse will always 193 // have a stream id of 1, and stream ids increment by 1 for each received 194 // message. 195 StreamId stream_id_in_; 196 197 // The server messages that have not been acked by the device yet. Keyed by 198 // server stream id. 199 StreamIdToPersistentIdMap unacked_server_ids_; 200 201 // Those server messages that have been acked. They must remain tracked 202 // until the ack message is itself confirmed. The list of all message ids 203 // acknowledged are keyed off the device stream id of the message that 204 // acknowledged them. 205 std::map<StreamId, PersistentIdList> acked_server_ids_; 206 207 // Those server messages from a previous connection that were not fully 208 // acknowledged. They do not have associated stream ids, and will be 209 // acknowledged on the next login attempt. 210 PersistentIdList restored_unackeds_server_ids_; 211 212 // The reliable message queue persistent store. 213 RMQStore rmq_store_; 214 215 // ----- Heartbeats ----- 216 // The current heartbeat interval. 217 base::TimeDelta heartbeat_interval_; 218 // Timer for triggering heartbeats. 219 base::Timer heartbeat_timer_; 220 221 // The task runner for blocking tasks (i.e. persisting RMQ state to disk). 222 scoped_refptr<base::SequencedTaskRunner> blocking_task_runner_; 223 224 base::WeakPtrFactory<MCSClient> weak_ptr_factory_; 225 226 DISALLOW_COPY_AND_ASSIGN(MCSClient); 227 }; 228 229 } // namespace gcm 230 231 #endif // GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_ 232