Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/reliable_quic_stream.h"
      6 
      7 #include "net/quic/quic_session.h"
      8 #include "net/quic/quic_spdy_decompressor.h"
      9 #include "net/spdy/write_blocked_list.h"
     10 
     11 using base::StringPiece;
     12 using std::min;
     13 
     14 namespace net {
     15 
     16 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
     17 
     18 namespace {
     19 
     20 struct iovec MakeIovec(StringPiece data) {
     21   struct iovec iov = {const_cast<char*>(data.data()),
     22                       static_cast<size_t>(data.size())};
     23   return iov;
     24 }
     25 
     26 }  // namespace
     27 
     28 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
     29                                        QuicSession* session)
     30     : sequencer_(this),
     31       id_(id),
     32       session_(session),
     33       stream_bytes_read_(0),
     34       stream_bytes_written_(0),
     35       stream_error_(QUIC_STREAM_NO_ERROR),
     36       connection_error_(QUIC_NO_ERROR),
     37       read_side_closed_(false),
     38       write_side_closed_(false),
     39       fin_buffered_(false),
     40       fin_sent_(false),
     41       is_server_(session_->is_server()) {
     42 }
     43 
     44 ReliableQuicStream::~ReliableQuicStream() {
     45 }
     46 
     47 bool ReliableQuicStream::WillAcceptStreamFrame(
     48     const QuicStreamFrame& frame) const {
     49   if (read_side_closed_) {
     50     return true;
     51   }
     52   if (frame.stream_id != id_) {
     53     LOG(ERROR) << "Error!";
     54     return false;
     55   }
     56   return sequencer_.WillAcceptStreamFrame(frame);
     57 }
     58 
     59 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
     60   DCHECK_EQ(frame.stream_id, id_);
     61   if (read_side_closed_) {
     62     DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
     63     // We don't want to be reading: blackhole the data.
     64     return true;
     65   }
     66   // Note: This count include duplicate data received.
     67   stream_bytes_read_ += frame.data.TotalBufferSize();
     68 
     69   bool accepted = sequencer_.OnStreamFrame(frame);
     70 
     71   return accepted;
     72 }
     73 
     74 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
     75   stream_error_ = error;
     76   CloseWriteSide();
     77   CloseReadSide();
     78 }
     79 
     80 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
     81                                             bool from_peer) {
     82   if (read_side_closed_ && write_side_closed_) {
     83     return;
     84   }
     85   if (error != QUIC_NO_ERROR) {
     86     stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
     87     connection_error_ = error;
     88   }
     89 
     90   CloseWriteSide();
     91   CloseReadSide();
     92 }
     93 
     94 void ReliableQuicStream::OnFinRead() {
     95   DCHECK(sequencer_.IsClosed());
     96   CloseReadSide();
     97 }
     98 
     99 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
    100   DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
    101   stream_error_ = error;
    102   // Sending a RstStream results in calling CloseStream.
    103   session()->SendRstStream(id(), error);
    104 }
    105 
    106 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
    107   session()->connection()->SendConnectionClose(error);
    108 }
    109 
    110 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
    111                                                     const string& details) {
    112   session()->connection()->SendConnectionCloseWithDetails(error, details);
    113 }
    114 
    115 QuicVersion ReliableQuicStream::version() {
    116   return session()->connection()->version();
    117 }
    118 
    119 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
    120   DCHECK(data.size() > 0 || fin);
    121   DCHECK(!fin_buffered_);
    122 
    123   QuicConsumedData consumed_data(0, false);
    124   fin_buffered_ = fin;
    125 
    126   if (queued_data_.empty()) {
    127     struct iovec iov(MakeIovec(data));
    128     consumed_data = WritevData(&iov, 1, fin, NULL);
    129     DCHECK_LE(consumed_data.bytes_consumed, data.length());
    130   }
    131 
    132   // If there's unconsumed data or an unconsumed fin, queue it.
    133   if (consumed_data.bytes_consumed < data.length() ||
    134       (fin && !consumed_data.fin_consumed)) {
    135     queued_data_.push_back(
    136         string(data.data() + consumed_data.bytes_consumed,
    137                data.length() - consumed_data.bytes_consumed));
    138   }
    139 }
    140 
    141 void ReliableQuicStream::OnCanWrite() {
    142   bool fin = false;
    143   while (!queued_data_.empty()) {
    144     const string& data = queued_data_.front();
    145     if (queued_data_.size() == 1 && fin_buffered_) {
    146       fin = true;
    147     }
    148     struct iovec iov(MakeIovec(data));
    149     QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL);
    150     if (consumed_data.bytes_consumed == data.size() &&
    151         fin == consumed_data.fin_consumed) {
    152       queued_data_.pop_front();
    153     } else {
    154       queued_data_.front().erase(0, consumed_data.bytes_consumed);
    155       break;
    156     }
    157   }
    158 }
    159 
    160 QuicConsumedData ReliableQuicStream::WritevData(
    161     const struct iovec* iov,
    162     int iov_count,
    163     bool fin,
    164     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
    165   if (write_side_closed_) {
    166     DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
    167     return QuicConsumedData(0, false);
    168   }
    169 
    170   size_t write_length = 0u;
    171   for (int i = 0; i < iov_count; ++i) {
    172     write_length += iov[i].iov_len;
    173   }
    174   QuicConsumedData consumed_data = session()->WritevData(
    175       id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate);
    176   stream_bytes_written_ += consumed_data.bytes_consumed;
    177   if (consumed_data.bytes_consumed == write_length) {
    178     if (fin && consumed_data.fin_consumed) {
    179       fin_sent_ = true;
    180       CloseWriteSide();
    181     } else if (fin && !consumed_data.fin_consumed) {
    182       session_->MarkWriteBlocked(id(), EffectivePriority());
    183     }
    184   } else {
    185     session_->MarkWriteBlocked(id(), EffectivePriority());
    186   }
    187   return consumed_data;
    188 }
    189 
    190 void ReliableQuicStream::CloseReadSide() {
    191   if (read_side_closed_) {
    192     return;
    193   }
    194   DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
    195 
    196   read_side_closed_ = true;
    197   if (write_side_closed_) {
    198     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    199     session_->CloseStream(id());
    200   }
    201 }
    202 
    203 void ReliableQuicStream::CloseWriteSide() {
    204   if (write_side_closed_) {
    205     return;
    206   }
    207   DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
    208 
    209   write_side_closed_ = true;
    210   if (read_side_closed_) {
    211     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    212     session_->CloseStream(id());
    213   }
    214 }
    215 
    216 bool ReliableQuicStream::HasBufferedData() {
    217   return !queued_data_.empty();
    218 }
    219 
    220 void ReliableQuicStream::OnClose() {
    221   CloseReadSide();
    222   CloseWriteSide();
    223 }
    224 
    225 }  // namespace net
    226