Home | History | Annotate | Download | only in engine
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
      6 #define GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
      7 
      8 #include <deque>
      9 #include <map>
     10 #include <string>
     11 #include <vector>
     12 
     13 #include "base/files/file_path.h"
     14 #include "base/memory/linked_ptr.h"
     15 #include "base/memory/weak_ptr.h"
     16 #include "base/timer/timer.h"
     17 #include "google_apis/gcm/base/gcm_export.h"
     18 #include "google_apis/gcm/base/mcs_message.h"
     19 #include "google_apis/gcm/engine/connection_handler.h"
     20 #include "google_apis/gcm/engine/rmq_store.h"
     21 
     22 namespace google {
     23 namespace protobuf {
     24 class MessageLite;
     25 }  // namespace protobuf
     26 }  // namespace google
     27 
     28 namespace mcs_proto {
     29 class LoginRequest;
     30 }
     31 
     32 namespace gcm {
     33 
     34 class ConnectionFactory;
     35 struct ReliablePacketInfo;
     36 
     37 // An MCS client. This client is in charge of all communications with an
     38 // MCS endpoint, and is capable of reliably sending/receiving GCM messages.
     39 // NOTE: Not thread safe. This class should live on the same thread as that
     40 // network requests are performed on.
     41 class GCM_EXPORT MCSClient {
     42  public:
     43   enum State {
     44     UNINITIALIZED,    // Uninitialized.
     45     LOADING,          // Waiting for RMQ load to finish.
     46     LOADED,           // RMQ Load finished, waiting to connect.
     47     CONNECTING,       // Connection in progress.
     48     CONNECTED,        // Connected and running.
     49   };
     50 
     51   // Callback for informing MCSClient status. It is valid for this to be
     52   // invoked more than once if a permanent error is encountered after a
     53   // successful login was initiated.
     54   typedef base::Callback<
     55       void(bool success,
     56            uint64 restored_android_id,
     57            uint64 restored_security_token)> InitializationCompleteCallback;
     58   // Callback when a message is received.
     59   typedef base::Callback<void(const MCSMessage& message)>
     60       OnMessageReceivedCallback;
     61   // Callback when a message is sent (and receipt has been acknowledged by
     62   // the MCS endpoint).
     63   // TODO(zea): pass some sort of structure containing more details about
     64   // send failures.
     65   typedef base::Callback<void(const std::string& message_id)>
     66       OnMessageSentCallback;
     67 
     68   MCSClient(const base::FilePath& rmq_path,
     69             ConnectionFactory* connection_factory,
     70             scoped_refptr<base::SequencedTaskRunner> blocking_task_runner);
     71   virtual ~MCSClient();
     72 
     73   // Initialize the client. Will load any previous id/token information as well
     74   // as unacknowledged message information from the RMQ storage, if it exists,
     75   // passing the id/token information back via |initialization_callback| along
     76   // with a |success == true| result. If no RMQ information is present (and
     77   // this is therefore a fresh client), a clean RMQ store will be created and
     78   // values of 0 will be returned via |initialization_callback| with
     79   // |success == true|.
     80   /// If an error loading the RMQ store is encountered,
     81   // |initialization_callback| will be invoked with |success == false|.
     82   void Initialize(const InitializationCompleteCallback& initialization_callback,
     83                   const OnMessageReceivedCallback& message_received_callback,
     84                   const OnMessageSentCallback& message_sent_callback);
     85 
     86   // Logs the client into the server. Client must be initialized.
     87   // |android_id| and |security_token| are optional if this is not a new
     88   // client, else they must be non-zero.
     89   // Successful login will result in |message_received_callback| being invoked
     90   // with a valid LoginResponse.
     91   // Login failure (typically invalid id/token) will shut down the client, and
     92   // |initialization_callback| to be invoked with |success = false|.
     93   void Login(uint64 android_id, uint64 security_token);
     94 
     95   // Sends a message, with or without reliable message queueing (RMQ) support.
     96   // Will asynchronously invoke the OnMessageSent callback regardless.
     97   // TODO(zea): support TTL.
     98   void SendMessage(const MCSMessage& message, bool use_rmq);
     99 
    100   // Disconnects the client and permanently destroys the persistent RMQ store.
    101   // WARNING: This is permanent, and the client must be recreated with new
    102   // credentials afterwards.
    103   void Destroy();
    104 
    105   // Returns the current state of the client.
    106   State state() const { return state_; }
    107 
    108  private:
    109   typedef uint32 StreamId;
    110   typedef std::string PersistentId;
    111   typedef std::vector<StreamId> StreamIdList;
    112   typedef std::vector<PersistentId> PersistentIdList;
    113   typedef std::map<StreamId, PersistentId> StreamIdToPersistentIdMap;
    114   typedef linked_ptr<ReliablePacketInfo> MCSPacketInternal;
    115 
    116   // Resets the internal state and builds a new login request, acknowledging
    117   // any pending server-to-device messages and rebuilding the send queue
    118   // from all unacknowledged device-to-server messages.
    119   // Should only be called when the connection has been reset.
    120   void ResetStateAndBuildLoginRequest(mcs_proto::LoginRequest* request);
    121 
    122   // Send a heartbeat to the MCS server.
    123   void SendHeartbeat();
    124 
    125   // RMQ Store callbacks.
    126   void OnRMQLoadFinished(const RMQStore::LoadResult& result);
    127   void OnRMQUpdateFinished(bool success);
    128 
    129   // Attempt to send a message.
    130   void MaybeSendMessage();
    131 
    132   // Helper for sending a protobuf along with any unacknowledged ids to the
    133   // wire.
    134   void SendPacketToWire(ReliablePacketInfo* packet_info);
    135 
    136   // Handle a data message sent to the MCS client system from the MCS server.
    137   void HandleMCSDataMesssage(
    138       scoped_ptr<google::protobuf::MessageLite> protobuf);
    139 
    140   // Handle a packet received over the wire.
    141   void HandlePacketFromWire(scoped_ptr<google::protobuf::MessageLite> protobuf);
    142 
    143   // ReliableMessageQueue acknowledgment helpers.
    144   // Handle a StreamAck sent by the server confirming receipt of all
    145   // messages up to the message with stream id |last_stream_id_received|.
    146   void HandleStreamAck(StreamId last_stream_id_received_);
    147   // Handle a SelectiveAck sent by the server confirming all messages
    148   // in |id_list|.
    149   void HandleSelectiveAck(const PersistentIdList& id_list);
    150   // Handle server confirmation of a device message, including device's
    151   // acknowledgment of receipt of messages.
    152   void HandleServerConfirmedReceipt(StreamId device_stream_id);
    153 
    154   // Generates a new persistent id for messages.
    155   // Virtual for testing.
    156   virtual PersistentId GetNextPersistentId();
    157 
    158   // Client state.
    159   State state_;
    160 
    161   // Callbacks for owner.
    162   InitializationCompleteCallback initialization_callback_;
    163   OnMessageReceivedCallback message_received_callback_;
    164   OnMessageSentCallback message_sent_callback_;
    165 
    166   // The android id and security token in use by this device.
    167   uint64 android_id_;
    168   uint64 security_token_;
    169 
    170   // Factory for creating new connections and connection handlers.
    171   ConnectionFactory* connection_factory_;
    172 
    173   // Connection handler to handle all over-the-wire protocol communication
    174   // with the mobile connection server.
    175   ConnectionHandler* connection_handler_;
    176 
    177   // -----  Reliablie Message Queue section -----
    178   // Note: all queues/maps are ordered from oldest (front/begin) message to
    179   // most recent (back/end).
    180 
    181   // Send/acknowledge queues.
    182   std::deque<MCSPacketInternal> to_send_;
    183   std::deque<MCSPacketInternal> to_resend_;
    184 
    185   // Last device_to_server stream id acknowledged by the server.
    186   StreamId last_device_to_server_stream_id_received_;
    187   // Last server_to_device stream id acknowledged by this device.
    188   StreamId last_server_to_device_stream_id_received_;
    189   // The stream id for the last sent message. A new message should consume
    190   // stream_id_out_ + 1.
    191   StreamId stream_id_out_;
    192   // The stream id of the last received message. The LoginResponse will always
    193   // have a stream id of 1, and stream ids increment by 1 for each received
    194   // message.
    195   StreamId stream_id_in_;
    196 
    197   // The server messages that have not been acked by the device yet. Keyed by
    198   // server stream id.
    199   StreamIdToPersistentIdMap unacked_server_ids_;
    200 
    201   // Those server messages that have been acked. They must remain tracked
    202   // until the ack message is itself confirmed. The list of all message ids
    203   // acknowledged are keyed off the device stream id of the message that
    204   // acknowledged them.
    205   std::map<StreamId, PersistentIdList> acked_server_ids_;
    206 
    207   // Those server messages from a previous connection that were not fully
    208   // acknowledged. They do not have associated stream ids, and will be
    209   // acknowledged on the next login attempt.
    210   PersistentIdList restored_unackeds_server_ids_;
    211 
    212   // The reliable message queue persistent store.
    213   RMQStore rmq_store_;
    214 
    215   // ----- Heartbeats -----
    216   // The current heartbeat interval.
    217   base::TimeDelta heartbeat_interval_;
    218   // Timer for triggering heartbeats.
    219   base::Timer heartbeat_timer_;
    220 
    221   // The task runner for blocking tasks (i.e. persisting RMQ state to disk).
    222   scoped_refptr<base::SequencedTaskRunner> blocking_task_runner_;
    223 
    224   base::WeakPtrFactory<MCSClient> weak_ptr_factory_;
    225 
    226   DISALLOW_COPY_AND_ASSIGN(MCSClient);
    227 };
    228 
    229 } // namespace gcm
    230 
    231 #endif  // GOOGLE_APIS_GCM_ENGINE_MCS_CLIENT_H_
    232