Home | History | Annotate | Download | only in socket
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/socket/buffered_write_stream_socket.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/message_loop/message_loop.h"
     10 #include "net/base/io_buffer.h"
     11 #include "net/base/net_errors.h"
     12 
     13 namespace net {
     14 
     15 namespace {
     16 
     17 void AppendBuffer(GrowableIOBuffer* dst, IOBuffer* src, int src_len) {
     18   int old_capacity = dst->capacity();
     19   dst->SetCapacity(old_capacity + src_len);
     20   memcpy(dst->StartOfBuffer() + old_capacity, src->data(), src_len);
     21 }
     22 
     23 }  // anonymous namespace
     24 
     25 BufferedWriteStreamSocket::BufferedWriteStreamSocket(
     26     StreamSocket* socket_to_wrap)
     27     : wrapped_socket_(socket_to_wrap),
     28       io_buffer_(new GrowableIOBuffer()),
     29       backup_buffer_(new GrowableIOBuffer()),
     30       weak_factory_(this),
     31       callback_pending_(false),
     32       wrapped_write_in_progress_(false),
     33       error_(0) {
     34 }
     35 
     36 BufferedWriteStreamSocket::~BufferedWriteStreamSocket() {
     37 }
     38 
     39 int BufferedWriteStreamSocket::Read(IOBuffer* buf, int buf_len,
     40                                     const CompletionCallback& callback) {
     41   return wrapped_socket_->Read(buf, buf_len, callback);
     42 }
     43 
     44 int BufferedWriteStreamSocket::Write(IOBuffer* buf, int buf_len,
     45                                      const CompletionCallback& callback) {
     46   if (error_) {
     47     return error_;
     48   }
     49   GrowableIOBuffer* idle_buffer =
     50       wrapped_write_in_progress_ ? backup_buffer_.get() : io_buffer_.get();
     51   AppendBuffer(idle_buffer, buf, buf_len);
     52   if (!callback_pending_) {
     53     base::MessageLoop::current()->PostTask(
     54         FROM_HERE,
     55         base::Bind(&BufferedWriteStreamSocket::DoDelayedWrite,
     56                    weak_factory_.GetWeakPtr()));
     57     callback_pending_ = true;
     58   }
     59   return buf_len;
     60 }
     61 
     62 bool BufferedWriteStreamSocket::SetReceiveBufferSize(int32 size) {
     63   return wrapped_socket_->SetReceiveBufferSize(size);
     64 }
     65 
     66 bool BufferedWriteStreamSocket::SetSendBufferSize(int32 size) {
     67   return wrapped_socket_->SetSendBufferSize(size);
     68 }
     69 
     70 int BufferedWriteStreamSocket::Connect(const CompletionCallback& callback) {
     71   return wrapped_socket_->Connect(callback);
     72 }
     73 
     74 void BufferedWriteStreamSocket::Disconnect() {
     75   wrapped_socket_->Disconnect();
     76 }
     77 
     78 bool BufferedWriteStreamSocket::IsConnected() const {
     79   return wrapped_socket_->IsConnected();
     80 }
     81 
     82 bool BufferedWriteStreamSocket::IsConnectedAndIdle() const {
     83   return wrapped_socket_->IsConnectedAndIdle();
     84 }
     85 
     86 int BufferedWriteStreamSocket::GetPeerAddress(IPEndPoint* address) const {
     87   return wrapped_socket_->GetPeerAddress(address);
     88 }
     89 
     90 int BufferedWriteStreamSocket::GetLocalAddress(IPEndPoint* address) const {
     91   return wrapped_socket_->GetLocalAddress(address);
     92 }
     93 
     94 const BoundNetLog& BufferedWriteStreamSocket::NetLog() const {
     95   return wrapped_socket_->NetLog();
     96 }
     97 
     98 void BufferedWriteStreamSocket::SetSubresourceSpeculation() {
     99   wrapped_socket_->SetSubresourceSpeculation();
    100 }
    101 
    102 void BufferedWriteStreamSocket::SetOmniboxSpeculation() {
    103   wrapped_socket_->SetOmniboxSpeculation();
    104 }
    105 
    106 bool BufferedWriteStreamSocket::WasEverUsed() const {
    107   return wrapped_socket_->WasEverUsed();
    108 }
    109 
    110 bool BufferedWriteStreamSocket::UsingTCPFastOpen() const {
    111   return wrapped_socket_->UsingTCPFastOpen();
    112 }
    113 
    114 bool BufferedWriteStreamSocket::WasNpnNegotiated() const {
    115   return wrapped_socket_->WasNpnNegotiated();
    116 }
    117 
    118 NextProto BufferedWriteStreamSocket::GetNegotiatedProtocol() const {
    119   return wrapped_socket_->GetNegotiatedProtocol();
    120 }
    121 
    122 bool BufferedWriteStreamSocket::GetSSLInfo(SSLInfo* ssl_info) {
    123   return wrapped_socket_->GetSSLInfo(ssl_info);
    124 }
    125 
    126 void BufferedWriteStreamSocket::DoDelayedWrite() {
    127   int result = wrapped_socket_->Write(
    128       io_buffer_.get(),
    129       io_buffer_->RemainingCapacity(),
    130       base::Bind(&BufferedWriteStreamSocket::OnIOComplete,
    131                  base::Unretained(this)));
    132   if (result == ERR_IO_PENDING) {
    133     callback_pending_ = true;
    134     wrapped_write_in_progress_ = true;
    135   } else {
    136     OnIOComplete(result);
    137   }
    138 }
    139 
    140 void BufferedWriteStreamSocket::OnIOComplete(int result) {
    141   callback_pending_ = false;
    142   wrapped_write_in_progress_ = false;
    143   if (backup_buffer_->RemainingCapacity()) {
    144     AppendBuffer(io_buffer_.get(), backup_buffer_.get(),
    145                  backup_buffer_->RemainingCapacity());
    146     backup_buffer_->SetCapacity(0);
    147   }
    148   if (result < 0) {
    149     error_ = result;
    150     io_buffer_->SetCapacity(0);
    151   } else {
    152     io_buffer_->set_offset(io_buffer_->offset() + result);
    153     if (io_buffer_->RemainingCapacity()) {
    154       DoDelayedWrite();
    155     } else {
    156       io_buffer_->SetCapacity(0);
    157     }
    158   }
    159 }
    160 
    161 }  // namespace net
    162