Home | History | Annotate | Download | only in base
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "google_apis/gcm/base/socket_stream.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/callback.h"
      9 #include "net/base/io_buffer.h"
     10 #include "net/socket/stream_socket.h"
     11 
     12 namespace gcm {
     13 
     14 namespace {
     15 
     16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
     17 // expensive.
     18 const uint32 kDefaultBufferSize = 8*1024;
     19 
     20 }  // namespace
     21 
     22 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
     23     : socket_(socket),
     24       io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
     25       read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
     26                                               kDefaultBufferSize)),
     27       next_pos_(0),
     28       last_error_(net::OK),
     29       weak_ptr_factory_(this) {
     30   DCHECK(socket->IsConnected());
     31 }
     32 
     33 SocketInputStream::~SocketInputStream() {
     34 }
     35 
     36 bool SocketInputStream::Next(const void** data, int* size) {
     37   if (GetState() != EMPTY && GetState() != READY) {
     38     NOTREACHED() << "Invalid input stream read attempt.";
     39     return false;
     40   }
     41 
     42   if (GetState() == EMPTY) {
     43     DVLOG(1) << "No unread data remaining, ending read.";
     44     return false;
     45   }
     46 
     47   DCHECK_EQ(GetState(), READY)
     48       << " Input stream must have pending data before reading.";
     49   DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
     50   *data = io_buffer_->data() + next_pos_;
     51   *size = UnreadByteCount();
     52   next_pos_ = read_buffer_->BytesConsumed();
     53   DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
     54   return true;
     55 }
     56 
     57 void SocketInputStream::BackUp(int count) {
     58   DCHECK(GetState() == READY || GetState() == EMPTY);
     59   DCHECK_GT(count, 0);
     60   DCHECK_LE(count, next_pos_);
     61 
     62   next_pos_ -= count;
     63   DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
     64            << "Current position now at " << next_pos_
     65            << " of " << read_buffer_->BytesConsumed();
     66 }
     67 
     68 bool SocketInputStream::Skip(int count) {
     69   NOTIMPLEMENTED();
     70   return false;
     71 }
     72 
     73 int64 SocketInputStream::ByteCount() const {
     74   DCHECK_NE(GetState(), CLOSED);
     75   DCHECK_NE(GetState(), READING);
     76   return next_pos_;
     77 }
     78 
     79 size_t SocketInputStream::UnreadByteCount() const {
     80   DCHECK_NE(GetState(), CLOSED);
     81   DCHECK_NE(GetState(), READING);
     82   return read_buffer_->BytesConsumed() - next_pos_;
     83 }
     84 
     85 net::Error SocketInputStream::Refresh(const base::Closure& callback,
     86                                       int byte_limit) {
     87   DCHECK_NE(GetState(), CLOSED);
     88   DCHECK_NE(GetState(), READING);
     89   DCHECK_GT(byte_limit, 0);
     90 
     91   if (byte_limit > read_buffer_->BytesRemaining()) {
     92     NOTREACHED() << "Out of buffer space, closing input stream.";
     93     CloseStream(net::ERR_UNEXPECTED, base::Closure());
     94     return net::OK;
     95   }
     96 
     97   if (!socket_->IsConnected()) {
     98     LOG(ERROR) << "Socket was disconnected, closing input stream";
     99     CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
    100     return net::OK;
    101   }
    102 
    103   DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
    104   int result = socket_->Read(
    105       read_buffer_,
    106       byte_limit,
    107       base::Bind(&SocketInputStream::RefreshCompletionCallback,
    108                  weak_ptr_factory_.GetWeakPtr(),
    109                  callback));
    110   DVLOG(1) << "Read returned " << result;
    111   if (result == net::ERR_IO_PENDING) {
    112     last_error_ = net::ERR_IO_PENDING;
    113     return net::ERR_IO_PENDING;
    114   }
    115 
    116   RefreshCompletionCallback(base::Closure(), result);
    117   return net::OK;
    118 }
    119 
    120 void SocketInputStream::RebuildBuffer() {
    121   DVLOG(1) << "Rebuilding input stream, consumed "
    122            << next_pos_ << " bytes.";
    123   DCHECK_NE(GetState(), READING);
    124   DCHECK_NE(GetState(), CLOSED);
    125 
    126   int unread_data_size = 0;
    127   const void* unread_data_ptr = NULL;
    128   Next(&unread_data_ptr, &unread_data_size);
    129   ResetInternal();
    130 
    131   if (unread_data_ptr != io_buffer_->data()) {
    132     DVLOG(1) << "Have " << unread_data_size
    133              << " unread bytes remaining, shifting.";
    134     // Move any remaining unread data to the start of the buffer;
    135     std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
    136   } else {
    137     DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
    138   }
    139   read_buffer_->DidConsume(unread_data_size);
    140 }
    141 
    142 net::Error SocketInputStream::last_error() const {
    143   return last_error_;
    144 }
    145 
    146 SocketInputStream::State SocketInputStream::GetState() const {
    147   if (last_error_ < net::ERR_IO_PENDING)
    148     return CLOSED;
    149 
    150   if (last_error_ == net::ERR_IO_PENDING)
    151     return READING;
    152 
    153   DCHECK_EQ(last_error_, net::OK);
    154   if (read_buffer_->BytesConsumed() == next_pos_)
    155     return EMPTY;
    156 
    157   return READY;
    158 }
    159 
    160 void SocketInputStream::RefreshCompletionCallback(
    161     const base::Closure& callback, int result) {
    162   // If an error occurred before the completion callback could complete, ignore
    163   // the result.
    164   if (GetState() == CLOSED)
    165     return;
    166 
    167   // Result == 0 implies EOF, which is treated as an error.
    168   if (result == 0)
    169     result = net::ERR_CONNECTION_CLOSED;
    170 
    171   DCHECK_NE(result, net::ERR_IO_PENDING);
    172 
    173   if (result < net::OK) {
    174     DVLOG(1) << "Failed to refresh socket: " << result;
    175     CloseStream(static_cast<net::Error>(result), callback);
    176     return;
    177   }
    178 
    179   DCHECK_GT(result, 0);
    180   last_error_ = net::OK;
    181   read_buffer_->DidConsume(result);
    182 
    183   DVLOG(1) << "Refresh complete with " << result << " new bytes. "
    184            << "Current position " << next_pos_
    185            << " of " << read_buffer_->BytesConsumed() << ".";
    186 
    187   if (!callback.is_null())
    188     callback.Run();
    189 }
    190 
    191 void SocketInputStream::ResetInternal() {
    192   read_buffer_->SetOffset(0);
    193   next_pos_ = 0;
    194   last_error_ = net::OK;
    195   weak_ptr_factory_.InvalidateWeakPtrs();  // Invalidate any callbacks.
    196 }
    197 
    198 void SocketInputStream::CloseStream(net::Error error,
    199                                     const base::Closure& callback) {
    200   DCHECK_LT(error, net::ERR_IO_PENDING);
    201   ResetInternal();
    202   last_error_ = error;
    203   LOG(ERROR) << "Closing stream with result " << error;
    204   if (!callback.is_null())
    205     callback.Run();
    206 }
    207 
    208 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
    209     : socket_(socket),
    210       io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
    211       write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
    212                                                kDefaultBufferSize)),
    213       next_pos_(0),
    214       last_error_(net::OK),
    215       weak_ptr_factory_(this) {
    216   DCHECK(socket->IsConnected());
    217 }
    218 
    219 SocketOutputStream::~SocketOutputStream() {
    220 }
    221 
    222 bool SocketOutputStream::Next(void** data, int* size) {
    223   DCHECK_NE(GetState(), CLOSED);
    224   DCHECK_NE(GetState(), FLUSHING);
    225   if (next_pos_ == write_buffer_->size())
    226     return false;
    227 
    228   *data = write_buffer_->data() + next_pos_;
    229   *size = write_buffer_->size() - next_pos_;
    230   next_pos_ = write_buffer_->size();
    231   return true;
    232 }
    233 
    234 void SocketOutputStream::BackUp(int count) {
    235   DCHECK_GE(count, 0);
    236   if (count > next_pos_)
    237     next_pos_ = 0;
    238   next_pos_ -= count;
    239   DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
    240            << next_pos_ << " bytes used.";
    241 }
    242 
    243 int64 SocketOutputStream::ByteCount() const {
    244   DCHECK_NE(GetState(), CLOSED);
    245   DCHECK_NE(GetState(), FLUSHING);
    246   return next_pos_;
    247 }
    248 
    249 net::Error SocketOutputStream::Flush(const base::Closure& callback) {
    250   DCHECK_EQ(GetState(), READY);
    251 
    252   if (!socket_->IsConnected()) {
    253     LOG(ERROR) << "Socket was disconnected, closing output stream";
    254     last_error_ = net::ERR_CONNECTION_CLOSED;
    255     return net::OK;
    256   }
    257 
    258   DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
    259   int result = socket_->Write(
    260       write_buffer_,
    261       next_pos_,
    262       base::Bind(&SocketOutputStream::FlushCompletionCallback,
    263                  weak_ptr_factory_.GetWeakPtr(),
    264                  callback));
    265   DVLOG(1) << "Write returned " << result;
    266   if (result == net::ERR_IO_PENDING) {
    267     last_error_ = net::ERR_IO_PENDING;
    268     return net::ERR_IO_PENDING;
    269   }
    270 
    271   FlushCompletionCallback(base::Closure(), result);
    272   return net::OK;
    273 }
    274 
    275 SocketOutputStream::State SocketOutputStream::GetState() const{
    276   if (last_error_ < net::ERR_IO_PENDING)
    277     return CLOSED;
    278 
    279   if (last_error_ == net::ERR_IO_PENDING)
    280     return FLUSHING;
    281 
    282   DCHECK_EQ(last_error_, net::OK);
    283   if (next_pos_ == 0)
    284     return EMPTY;
    285 
    286   return READY;
    287 }
    288 
    289 net::Error SocketOutputStream::last_error() const {
    290   return last_error_;
    291 }
    292 
    293 void SocketOutputStream::FlushCompletionCallback(
    294     const base::Closure& callback, int result) {
    295   // If an error occurred before the completion callback could complete, ignore
    296   // the result.
    297   if (GetState() == CLOSED)
    298     return;
    299 
    300   // Result == 0 implies EOF, which is treated as an error.
    301   if (result == 0)
    302     result = net::ERR_CONNECTION_CLOSED;
    303 
    304   DCHECK_NE(result, net::ERR_IO_PENDING);
    305 
    306   if (result < net::OK) {
    307     LOG(ERROR) << "Failed to flush socket.";
    308     last_error_ = static_cast<net::Error>(result);
    309     if (!callback.is_null())
    310       callback.Run();
    311     return;
    312   }
    313 
    314   DCHECK_GT(result, net::OK);
    315   last_error_ = net::OK;
    316 
    317   if (write_buffer_->BytesConsumed() + result < next_pos_) {
    318     DVLOG(1) << "Partial flush complete. Retrying.";
    319      // Only a partial write was completed. Flush again to finish the write.
    320     write_buffer_->DidConsume(result);
    321     Flush(callback);
    322     return;
    323   }
    324 
    325   DVLOG(1) << "Socket flush complete.";
    326   write_buffer_->SetOffset(0);
    327   next_pos_ = 0;
    328   if (!callback.is_null())
    329     callback.Run();
    330 }
    331 
    332 }  // namespace gcm
    333