Home | History | Annotate | Download | only in base
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 //
      5 // Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a
      6 // net::StreamSocket. Built to work with Protobuf CodedStreams.
      7 
      8 #ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
      9 #define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
     10 
     11 #include "base/basictypes.h"
     12 #include "base/callback_forward.h"
     13 #include "base/compiler_specific.h"
     14 #include "base/memory/ref_counted.h"
     15 #include "base/memory/weak_ptr.h"
     16 #include "google/protobuf/io/zero_copy_stream.h"
     17 #include "google_apis/gcm/base/gcm_export.h"
     18 #include "net/base/net_errors.h"
     19 
     20 namespace net {
     21 class DrainableIOBuffer;
     22 class IOBuffer;
     23 class StreamSocket;
     24 }  // namespace net
     25 
     26 namespace gcm {
     27 
     28 // A helper class for interacting with a net::StreamSocket that is receiving
     29 // protobuf encoded messages. A SocketInputStream does not take ownership of
     30 // the socket itself, and it is expected that the life of the input stream
     31 // should match the life of the socket itself (while the socket remains
     32 // connected). If an error is encounters, the input stream will store the error
     33 // in |last_error_|, and GetState() will be set to CLOSED.
     34 // Typical usage:
     35 // 1. Check the GetState() of the input stream before using it. If CLOSED, the
     36 //    input stream must be rebuilt (and the socket likely needs to be
     37 //    reconnected as an error was encountered).
     38 // 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size
     39 //    for a message, and wait until completion. It is invalid to attempt to
     40 //    Refresh an input stream or read data from the stream while a Refresh is
     41 //    pending.
     42 // 3. Check GetState() again to ensure the Refresh was successful.
     43 // 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of
     44 //    the SocketInputStream. Next(..) will return true until there is no data
     45 //    remaining.
     46 // 5. Call RebuildBuffer when done reading, to shift any unread data to the
     47 //    start of the buffer.
     48 // 6. Repeat as necessary.
     49 class GCM_EXPORT SocketInputStream
     50     : public google::protobuf::io::ZeroCopyInputStream {
     51  public:
     52   enum State {
     53     // No valid data to read. This means the buffer is either empty or all data
     54     // in the buffer has already been consumed.
     55     EMPTY,
     56     // Valid data to read.
     57     READY,
     58     // In the process of reading new data from the socket.
     59     READING,
     60     // An permanent error occurred and the stream is now closed.
     61     CLOSED,
     62   };
     63 
     64   // |socket| should already be connected.
     65   explicit SocketInputStream(net::StreamSocket* socket);
     66   virtual ~SocketInputStream();
     67 
     68   // ZeroCopyInputStream implementation.
     69   virtual bool Next(const void** data, int* size) OVERRIDE;
     70   virtual void BackUp(int count) OVERRIDE;
     71   virtual bool Skip(int count) OVERRIDE;  // Not implemented.
     72   virtual int64 ByteCount() const OVERRIDE;
     73 
     74   // The remaining amount of valid data available to be read.
     75   size_t UnreadByteCount() const;
     76 
     77   // Reads from the socket, appending a max of |byte_limit| bytes onto the read
     78   // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete
     79   // synchronously, in which case the callback is invoked upon completion. If
     80   // the refresh can complete synchronously, even in case of an error, returns
     81   // net::OK without invoking callback.
     82   // Note: GetState() (and possibly last_error()) should be checked upon
     83   // completion to determine whether the Refresh encountered an error.
     84   net::Error Refresh(const base::Closure& callback, int byte_limit);
     85 
     86   // Rebuilds the buffer state by copying over any unread data to the beginning
     87   // of the buffer and resetting the buffer read/write positions.
     88   // Note: it is not valid to call Rebuild() if GetState() == CLOSED. The stream
     89   // must be recreated from scratch in such a scenario.
     90   void RebuildBuffer();
     91 
     92   // Returns the last fatal error encountered. Only valid if GetState() ==
     93   // CLOSED.
     94   net::Error last_error() const;
     95 
     96   // Returns the current state.
     97   State GetState() const;
     98 
     99  private:
    100   // Clears the local state.
    101   void ResetInternal();
    102 
    103   // Callback for Socket::Read calls.
    104   void RefreshCompletionCallback(const base::Closure& callback, int result);
    105 
    106   // Permanently closes the stream.
    107   void CloseStream(net::Error error, const base::Closure& callback);
    108 
    109   // Internal net components.
    110   net::StreamSocket* const socket_;
    111   const scoped_refptr<net::IOBuffer> io_buffer_;
    112   // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
    113   // been written to yet by Socket::Read calls.
    114   const scoped_refptr<net::DrainableIOBuffer> read_buffer_;
    115 
    116   // Starting position of the data within |io_buffer_| to consume on subsequent
    117   // Next(..) call.  0 <= next_pos_ <= read_buffer_.BytesConsumed()
    118   // Note: next_pos == read_buffer_.BytesConsumed() implies GetState() == EMPTY.
    119   int next_pos_;
    120 
    121   // If < net::ERR_IO_PENDING, the last net error received.
    122   // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING.
    123   net::Error last_error_;
    124 
    125   base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_;
    126 
    127   DISALLOW_COPY_AND_ASSIGN(SocketInputStream);
    128 };
    129 
    130 // A helper class for writing to a SocketStream with protobuf encoded data.
    131 // A SocketOutputStream does not take ownership of the socket itself, and it is
    132 // expected that the life of the output stream should match the life of the
    133 // socket itself (while the socket remains connected).
    134 // Typical usage:
    135 // 1. Check the GetState() of the output stream before using it. If CLOSED, the
    136 //    output stream must be rebuilt (and the socket likely needs to be
    137 //    reconnected, as an error was encountered).
    138 // 2. If EMPTY, the output stream can be written via a CodedOutputStream using
    139 //    the ZeroCopyOutputStream interface.
    140 // 3. Once done writing, GetState() should be READY, so call Flush(..) to write
    141 //    the buffer into the StreamSocket. Wait for the callback to be invoked
    142 //    (it's invalid to write to an output stream while it's flushing).
    143 // 4. Check the GetState() again to ensure the Flush was successful. GetState()
    144 //    should be EMPTY again.
    145 // 5. Repeat.
    146 class GCM_EXPORT SocketOutputStream
    147     : public google::protobuf::io::ZeroCopyOutputStream {
    148  public:
    149   enum State {
    150     // No valid data yet.
    151     EMPTY,
    152     // Ready for flushing (some data is present).
    153     READY,
    154     // In the process of flushing into the socket.
    155     FLUSHING,
    156     // A permanent error occurred, and the stream is now closed.
    157     CLOSED,
    158   };
    159 
    160   // |socket| should already be connected.
    161   explicit SocketOutputStream(net::StreamSocket* socket);
    162   virtual ~SocketOutputStream();
    163 
    164   // ZeroCopyOutputStream implementation.
    165   virtual bool Next(void** data, int* size) OVERRIDE;
    166   virtual void BackUp(int count) OVERRIDE;
    167   virtual int64 ByteCount() const OVERRIDE;
    168 
    169   // Writes the buffer into the Socket.
    170   net::Error Flush(const base::Closure& callback);
    171 
    172   // Returns the last fatal error encountered. Only valid if GetState() ==
    173   // CLOSED.
    174   net::Error last_error() const;
    175 
    176   // Returns the current state.
    177   State GetState() const;
    178 
    179  private:
    180   void FlushCompletionCallback(const base::Closure& callback, int result);
    181 
    182   // Internal net components.
    183   net::StreamSocket* const socket_;
    184   const scoped_refptr<net::IOBuffer> io_buffer_;
    185   // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
    186   // been written to the socket yet.
    187   const scoped_refptr<net::DrainableIOBuffer> write_buffer_;
    188 
    189   // Starting position of the data within |io_buffer_| to consume on subsequent
    190   // Next(..) call.  0 <= write_buffer_.BytesConsumed() <= next_pos_
    191   // Note: next_pos == 0 implies GetState() == EMPTY.
    192   int next_pos_;
    193 
    194   // If < net::ERR_IO_PENDING, the last net error received.
    195   // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING.
    196   net::Error last_error_;
    197 
    198   base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_;
    199 
    200   DISALLOW_COPY_AND_ASSIGN(SocketOutputStream);
    201 };
    202 
    203 }  // namespace gcm
    204 
    205 #endif  // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
    206