1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 // 5 // Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a 6 // net::StreamSocket. Built to work with Protobuf CodedStreams. 7 8 #ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ 9 #define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ 10 11 #include "base/basictypes.h" 12 #include "base/callback_forward.h" 13 #include "base/compiler_specific.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/memory/weak_ptr.h" 16 #include "google/protobuf/io/zero_copy_stream.h" 17 #include "google_apis/gcm/base/gcm_export.h" 18 #include "net/base/net_errors.h" 19 20 namespace net { 21 class DrainableIOBuffer; 22 class IOBuffer; 23 class StreamSocket; 24 } // namespace net 25 26 namespace gcm { 27 28 // A helper class for interacting with a net::StreamSocket that is receiving 29 // protobuf encoded messages. A SocketInputStream does not take ownership of 30 // the socket itself, and it is expected that the life of the input stream 31 // should match the life of the socket itself (while the socket remains 32 // connected). If an error is encounters, the input stream will store the error 33 // in |last_error_|, and GetState() will be set to CLOSED. 34 // Typical usage: 35 // 1. Check the GetState() of the input stream before using it. If CLOSED, the 36 // input stream must be rebuilt (and the socket likely needs to be 37 // reconnected as an error was encountered). 38 // 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size 39 // for a message, and wait until completion. It is invalid to attempt to 40 // Refresh an input stream or read data from the stream while a Refresh is 41 // pending. 42 // 3. Check GetState() again to ensure the Refresh was successful. 43 // 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of 44 // the SocketInputStream. Next(..) will return true until there is no data 45 // remaining. 46 // 5. Call RebuildBuffer when done reading, to shift any unread data to the 47 // start of the buffer. 48 // 6. Repeat as necessary. 49 class GCM_EXPORT SocketInputStream 50 : public google::protobuf::io::ZeroCopyInputStream { 51 public: 52 enum State { 53 // No valid data to read. This means the buffer is either empty or all data 54 // in the buffer has already been consumed. 55 EMPTY, 56 // Valid data to read. 57 READY, 58 // In the process of reading new data from the socket. 59 READING, 60 // An permanent error occurred and the stream is now closed. 61 CLOSED, 62 }; 63 64 // |socket| should already be connected. 65 explicit SocketInputStream(net::StreamSocket* socket); 66 virtual ~SocketInputStream(); 67 68 // ZeroCopyInputStream implementation. 69 virtual bool Next(const void** data, int* size) OVERRIDE; 70 virtual void BackUp(int count) OVERRIDE; 71 virtual bool Skip(int count) OVERRIDE; // Not implemented. 72 virtual int64 ByteCount() const OVERRIDE; 73 74 // The remaining amount of valid data available to be read. 75 size_t UnreadByteCount() const; 76 77 // Reads from the socket, appending a max of |byte_limit| bytes onto the read 78 // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete 79 // synchronously, in which case the callback is invoked upon completion. If 80 // the refresh can complete synchronously, even in case of an error, returns 81 // net::OK without invoking callback. 82 // Note: GetState() (and possibly last_error()) should be checked upon 83 // completion to determine whether the Refresh encountered an error. 84 net::Error Refresh(const base::Closure& callback, int byte_limit); 85 86 // Rebuilds the buffer state by copying over any unread data to the beginning 87 // of the buffer and resetting the buffer read/write positions. 88 // Note: it is not valid to call Rebuild() if GetState() == CLOSED. The stream 89 // must be recreated from scratch in such a scenario. 90 void RebuildBuffer(); 91 92 // Returns the last fatal error encountered. Only valid if GetState() == 93 // CLOSED. 94 net::Error last_error() const; 95 96 // Returns the current state. 97 State GetState() const; 98 99 private: 100 // Clears the local state. 101 void ResetInternal(); 102 103 // Callback for Socket::Read calls. 104 void RefreshCompletionCallback(const base::Closure& callback, int result); 105 106 // Permanently closes the stream. 107 void CloseStream(net::Error error, const base::Closure& callback); 108 109 // Internal net components. 110 net::StreamSocket* const socket_; 111 const scoped_refptr<net::IOBuffer> io_buffer_; 112 // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't 113 // been written to yet by Socket::Read calls. 114 const scoped_refptr<net::DrainableIOBuffer> read_buffer_; 115 116 // Starting position of the data within |io_buffer_| to consume on subsequent 117 // Next(..) call. 0 <= next_pos_ <= read_buffer_.BytesConsumed() 118 // Note: next_pos == read_buffer_.BytesConsumed() implies GetState() == EMPTY. 119 int next_pos_; 120 121 // If < net::ERR_IO_PENDING, the last net error received. 122 // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING. 123 net::Error last_error_; 124 125 base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_; 126 127 DISALLOW_COPY_AND_ASSIGN(SocketInputStream); 128 }; 129 130 // A helper class for writing to a SocketStream with protobuf encoded data. 131 // A SocketOutputStream does not take ownership of the socket itself, and it is 132 // expected that the life of the output stream should match the life of the 133 // socket itself (while the socket remains connected). 134 // Typical usage: 135 // 1. Check the GetState() of the output stream before using it. If CLOSED, the 136 // output stream must be rebuilt (and the socket likely needs to be 137 // reconnected, as an error was encountered). 138 // 2. If EMPTY, the output stream can be written via a CodedOutputStream using 139 // the ZeroCopyOutputStream interface. 140 // 3. Once done writing, GetState() should be READY, so call Flush(..) to write 141 // the buffer into the StreamSocket. Wait for the callback to be invoked 142 // (it's invalid to write to an output stream while it's flushing). 143 // 4. Check the GetState() again to ensure the Flush was successful. GetState() 144 // should be EMPTY again. 145 // 5. Repeat. 146 class GCM_EXPORT SocketOutputStream 147 : public google::protobuf::io::ZeroCopyOutputStream { 148 public: 149 enum State { 150 // No valid data yet. 151 EMPTY, 152 // Ready for flushing (some data is present). 153 READY, 154 // In the process of flushing into the socket. 155 FLUSHING, 156 // A permanent error occurred, and the stream is now closed. 157 CLOSED, 158 }; 159 160 // |socket| should already be connected. 161 explicit SocketOutputStream(net::StreamSocket* socket); 162 virtual ~SocketOutputStream(); 163 164 // ZeroCopyOutputStream implementation. 165 virtual bool Next(void** data, int* size) OVERRIDE; 166 virtual void BackUp(int count) OVERRIDE; 167 virtual int64 ByteCount() const OVERRIDE; 168 169 // Writes the buffer into the Socket. 170 net::Error Flush(const base::Closure& callback); 171 172 // Returns the last fatal error encountered. Only valid if GetState() == 173 // CLOSED. 174 net::Error last_error() const; 175 176 // Returns the current state. 177 State GetState() const; 178 179 private: 180 void FlushCompletionCallback(const base::Closure& callback, int result); 181 182 // Internal net components. 183 net::StreamSocket* const socket_; 184 const scoped_refptr<net::IOBuffer> io_buffer_; 185 // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't 186 // been written to the socket yet. 187 const scoped_refptr<net::DrainableIOBuffer> write_buffer_; 188 189 // Starting position of the data within |io_buffer_| to consume on subsequent 190 // Next(..) call. 0 <= write_buffer_.BytesConsumed() <= next_pos_ 191 // Note: next_pos == 0 implies GetState() == EMPTY. 192 int next_pos_; 193 194 // If < net::ERR_IO_PENDING, the last net error received. 195 // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING. 196 net::Error last_error_; 197 198 base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_; 199 200 DISALLOW_COPY_AND_ASSIGN(SocketOutputStream); 201 }; 202 203 } // namespace gcm 204 205 #endif // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ 206