Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/reliable_quic_stream.h"
      6 
      7 #include "net/quic/quic_session.h"
      8 #include "net/quic/quic_spdy_decompressor.h"
      9 
     10 using base::StringPiece;
     11 using std::min;
     12 
     13 namespace net {
     14 
     15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
     16                                        QuicSession* session)
     17     : sequencer_(this),
     18       id_(id),
     19       session_(session),
     20       visitor_(NULL),
     21       stream_bytes_read_(0),
     22       stream_bytes_written_(0),
     23       headers_decompressed_(false),
     24       headers_id_(0),
     25       decompression_failed_(false),
     26       stream_error_(QUIC_STREAM_NO_ERROR),
     27       connection_error_(QUIC_NO_ERROR),
     28       read_side_closed_(false),
     29       write_side_closed_(false),
     30       fin_buffered_(false),
     31       fin_sent_(false) {
     32 }
     33 
     34 ReliableQuicStream::~ReliableQuicStream() {
     35 }
     36 
     37 bool ReliableQuicStream::WillAcceptStreamFrame(
     38     const QuicStreamFrame& frame) const {
     39   if (read_side_closed_) {
     40     return true;
     41   }
     42   if (frame.stream_id != id_) {
     43     LOG(ERROR) << "Error!";
     44     return false;
     45   }
     46   return sequencer_.WillAcceptStreamFrame(frame);
     47 }
     48 
     49 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
     50   DCHECK_EQ(frame.stream_id, id_);
     51   if (read_side_closed_) {
     52     DLOG(INFO) << "Ignoring frame " << frame.stream_id;
     53     // We don't want to be reading: blackhole the data.
     54     return true;
     55   }
     56   // Note: This count include duplicate data received.
     57   stream_bytes_read_ += frame.data.length();
     58 
     59   bool accepted = sequencer_.OnStreamFrame(frame);
     60 
     61   return accepted;
     62 }
     63 
     64 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
     65   stream_error_ = error;
     66   TerminateFromPeer(false);  // Full close.
     67 }
     68 
     69 void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) {
     70   if (read_side_closed_ && write_side_closed_) {
     71     return;
     72   }
     73   if (error != QUIC_NO_ERROR) {
     74     stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
     75     connection_error_ = error;
     76   }
     77 
     78   if (from_peer) {
     79     TerminateFromPeer(false);
     80   } else {
     81     CloseWriteSide();
     82     CloseReadSide();
     83   }
     84 }
     85 
     86 void ReliableQuicStream::TerminateFromPeer(bool half_close) {
     87   if (!half_close) {
     88     CloseWriteSide();
     89   }
     90   CloseReadSide();
     91 }
     92 
     93 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
     94   stream_error_ = error;
     95   if (error != QUIC_STREAM_NO_ERROR)  {
     96     // Sending a RstStream results in calling CloseStream.
     97     session()->SendRstStream(id(), error);
     98   } else {
     99     session_->CloseStream(id());
    100   }
    101 }
    102 
    103 size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
    104   if (headers_decompressed_ && decompressed_headers_.empty()) {
    105     return sequencer_.Readv(iov, iov_len);
    106   }
    107   size_t bytes_consumed = 0;
    108   size_t iov_index = 0;
    109   while (iov_index < iov_len &&
    110          decompressed_headers_.length() > bytes_consumed) {
    111     size_t bytes_to_read = min(iov[iov_index].iov_len,
    112                                decompressed_headers_.length() - bytes_consumed);
    113     char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
    114     memcpy(iov_ptr,
    115            decompressed_headers_.data() + bytes_consumed, bytes_to_read);
    116     bytes_consumed += bytes_to_read;
    117     ++iov_index;
    118   }
    119   decompressed_headers_.erase(0, bytes_consumed);
    120   return bytes_consumed;
    121 }
    122 
    123 int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
    124   if (headers_decompressed_ && decompressed_headers_.empty()) {
    125     return sequencer_.GetReadableRegions(iov, iov_len);
    126   }
    127   if (iov_len == 0) {
    128     return 0;
    129   }
    130   iov[0].iov_base = static_cast<void*>(
    131       const_cast<char*>(decompressed_headers_.data()));
    132   iov[0].iov_len = decompressed_headers_.length();
    133   return 1;
    134 }
    135 
    136 bool ReliableQuicStream::IsHalfClosed() const {
    137   if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    138     return false;
    139   }
    140   return sequencer_.IsHalfClosed();
    141 }
    142 
    143 bool ReliableQuicStream::HasBytesToRead() const {
    144   return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
    145 }
    146 
    147 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
    148   return session_->peer_address();
    149 }
    150 
    151 QuicSpdyCompressor* ReliableQuicStream::compressor() {
    152   return session_->compressor();
    153 }
    154 
    155 bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
    156   return session_->GetSSLInfo(ssl_info);
    157 }
    158 
    159 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
    160   DCHECK(data.size() > 0 || fin);
    161   return WriteOrBuffer(data, fin);
    162 }
    163 
    164 QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
    165   DCHECK(!fin_buffered_);
    166 
    167   QuicConsumedData consumed_data(0, false);
    168   fin_buffered_ = fin;
    169 
    170   if (queued_data_.empty()) {
    171     consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
    172     DCHECK_LE(consumed_data.bytes_consumed, data.length());
    173   }
    174 
    175   // If there's unconsumed data or an unconsumed fin, queue it.
    176   if (consumed_data.bytes_consumed < data.length() ||
    177       (fin && !consumed_data.fin_consumed)) {
    178     queued_data_.push_back(
    179         string(data.data() + consumed_data.bytes_consumed,
    180                data.length() - consumed_data.bytes_consumed));
    181   }
    182 
    183   return QuicConsumedData(data.size(), true);
    184 }
    185 
    186 void ReliableQuicStream::OnCanWrite() {
    187   bool fin = false;
    188   while (!queued_data_.empty()) {
    189     const string& data = queued_data_.front();
    190     if (queued_data_.size() == 1 && fin_buffered_) {
    191       fin = true;
    192     }
    193     QuicConsumedData consumed_data = WriteDataInternal(data, fin);
    194     if (consumed_data.bytes_consumed == data.size() &&
    195         fin == consumed_data.fin_consumed) {
    196       queued_data_.pop_front();
    197     } else {
    198       queued_data_.front().erase(0, consumed_data.bytes_consumed);
    199       break;
    200     }
    201   }
    202 }
    203 
    204 QuicConsumedData ReliableQuicStream::WriteDataInternal(
    205     StringPiece data, bool fin) {
    206   if (write_side_closed_) {
    207     DLOG(ERROR) << "Attempt to write when the write side is closed";
    208     return QuicConsumedData(0, false);
    209   }
    210 
    211   QuicConsumedData consumed_data =
    212       session()->WriteData(id(), data, stream_bytes_written_, fin);
    213   stream_bytes_written_ += consumed_data.bytes_consumed;
    214   if (consumed_data.bytes_consumed == data.length()) {
    215     if (fin && consumed_data.fin_consumed) {
    216       fin_sent_ = true;
    217       CloseWriteSide();
    218     } else if (fin && !consumed_data.fin_consumed) {
    219       session_->MarkWriteBlocked(id());
    220     }
    221   } else {
    222     session_->MarkWriteBlocked(id());
    223   }
    224   return consumed_data;
    225 }
    226 
    227 void ReliableQuicStream::CloseReadSide() {
    228   if (read_side_closed_) {
    229     return;
    230   }
    231   DLOG(INFO) << "Done reading from stream " << id();
    232 
    233   read_side_closed_ = true;
    234   if (write_side_closed_) {
    235     DLOG(INFO) << "Closing stream: " << id();
    236     session_->CloseStream(id());
    237   }
    238 }
    239 
    240 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
    241   if (id() == kCryptoStreamId) {
    242     if (data_len == 0) {
    243       return 0;
    244     }
    245     // The crypto stream does not use compression.
    246     return ProcessData(data, data_len);
    247   }
    248   uint32 total_bytes_consumed = 0;
    249   if (headers_id_ == 0u) {
    250     // The headers ID has not yet been read.  Strip it from the beginning of
    251     // the data stream.
    252     DCHECK_GT(4u, headers_id_buffer_.length());
    253     size_t missing_size = 4 - headers_id_buffer_.length();
    254     if (data_len < missing_size) {
    255       StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
    256       return data_len;
    257     }
    258     total_bytes_consumed += missing_size;
    259     StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
    260     DCHECK_EQ(4u, headers_id_buffer_.length());
    261     memcpy(&headers_id_, headers_id_buffer_.data(), 4);
    262     headers_id_buffer_.clear();
    263     data += missing_size;
    264     data_len -= missing_size;
    265   }
    266   DCHECK_NE(0u, headers_id_);
    267   if (data_len == 0) {
    268     return total_bytes_consumed;
    269   }
    270 
    271   // Once the headers are finished, we simply pass the data through.
    272   if (headers_decompressed_) {
    273     // Some buffered header data remains.
    274     if (!decompressed_headers_.empty()) {
    275       ProcessHeaderData();
    276     }
    277     if (decompressed_headers_.empty()) {
    278       DVLOG(1) << "Delegating procesing to ProcessData";
    279       total_bytes_consumed += ProcessData(data, data_len);
    280     }
    281     return total_bytes_consumed;
    282   }
    283 
    284   QuicHeaderId current_header_id =
    285       session_->decompressor()->current_header_id();
    286   // Ensure that this header id looks sane.
    287   if (headers_id_ < current_header_id ||
    288       headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    289     DVLOG(1) << "Invalid headers for stream: " << id()
    290              << " header_id: " << headers_id_
    291              << " current_header_id: " << current_header_id;
    292     session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    293     return total_bytes_consumed;
    294   }
    295 
    296   // If we are head-of-line blocked on decompression, then back up.
    297   if (current_header_id != headers_id_) {
    298     session_->MarkDecompressionBlocked(headers_id_, id());
    299     DVLOG(1) << "Unable to decompress header data for stream: " << id()
    300              << " header_id: " << headers_id_;
    301     return total_bytes_consumed;
    302   }
    303 
    304   // Decompressed data will be delivered to decompressed_headers_.
    305   size_t bytes_consumed = session_->decompressor()->DecompressData(
    306       StringPiece(data, data_len), this);
    307   DCHECK_NE(0u, bytes_consumed);
    308   if (bytes_consumed > data_len) {
    309     DCHECK(false) << "DecompressData returned illegal value";
    310     OnDecompressionError();
    311     return total_bytes_consumed;
    312   }
    313   total_bytes_consumed += bytes_consumed;
    314   data += bytes_consumed;
    315   data_len -= bytes_consumed;
    316 
    317   if (decompression_failed_) {
    318     // The session will have been closed in OnDecompressionError.
    319     return total_bytes_consumed;
    320   }
    321 
    322   // Headers are complete if the decompressor has moved on to the
    323   // next stream.
    324   headers_decompressed_ =
    325       session_->decompressor()->current_header_id() != headers_id_;
    326   if (!headers_decompressed_) {
    327     DCHECK_EQ(0u, data_len);
    328   }
    329 
    330   ProcessHeaderData();
    331 
    332   if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    333     return total_bytes_consumed;
    334   }
    335 
    336   // We have processed all of the decompressed data but we might
    337   // have some more raw data to process.
    338   if (data_len > 0) {
    339     total_bytes_consumed += ProcessData(data, data_len);
    340   }
    341 
    342   // The sequencer will push any additional buffered frames if this data
    343   // has been completely consumed.
    344   return total_bytes_consumed;
    345 }
    346 
    347 uint32 ReliableQuicStream::ProcessHeaderData() {
    348   if (decompressed_headers_.empty()) {
    349     return 0;
    350   }
    351 
    352   size_t bytes_processed = ProcessData(decompressed_headers_.data(),
    353                                        decompressed_headers_.length());
    354   if (bytes_processed == decompressed_headers_.length()) {
    355     decompressed_headers_.clear();
    356   } else {
    357     decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
    358   }
    359   return bytes_processed;
    360 }
    361 
    362 void ReliableQuicStream::OnDecompressorAvailable() {
    363   DCHECK_EQ(headers_id_,
    364             session_->decompressor()->current_header_id());
    365   DCHECK(!headers_decompressed_);
    366   DCHECK(!decompression_failed_);
    367   DCHECK_EQ(0u, decompressed_headers_.length());
    368 
    369   while (!headers_decompressed_) {
    370     struct iovec iovec;
    371     if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
    372       return;
    373     }
    374 
    375     size_t bytes_consumed = session_->decompressor()->DecompressData(
    376         StringPiece(static_cast<char*>(iovec.iov_base),
    377                     iovec.iov_len),
    378         this);
    379     DCHECK_LE(bytes_consumed, iovec.iov_len);
    380     if (decompression_failed_) {
    381       return;
    382     }
    383     sequencer_.MarkConsumed(bytes_consumed);
    384 
    385     headers_decompressed_ =
    386         session_->decompressor()->current_header_id() != headers_id_;
    387   }
    388 
    389   // Either the headers are complete, or the all data as been consumed.
    390   ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
    391   if (IsHalfClosed()) {
    392     TerminateFromPeer(true);
    393   } else if (headers_decompressed_ && decompressed_headers_.empty()) {
    394     sequencer_.FlushBufferedFrames();
    395   }
    396 }
    397 
    398 bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
    399   data.AppendToString(&decompressed_headers_);
    400   return true;
    401 }
    402 
    403 void ReliableQuicStream::OnDecompressionError() {
    404   DCHECK(!decompression_failed_);
    405   decompression_failed_ = true;
    406   session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
    407 }
    408 
    409 
    410 void ReliableQuicStream::CloseWriteSide() {
    411   if (write_side_closed_) {
    412     return;
    413   }
    414   DLOG(INFO) << "Done writing to stream " << id();
    415 
    416   write_side_closed_ = true;
    417   if (read_side_closed_) {
    418     DLOG(INFO) << "Closing stream: " << id();
    419     session_->CloseStream(id());
    420   }
    421 }
    422 
    423 void ReliableQuicStream::OnClose() {
    424   CloseReadSide();
    425   CloseWriteSide();
    426 
    427   if (visitor_) {
    428     Visitor* visitor = visitor_;
    429     // Calling Visitor::OnClose() may result the destruction of the visitor,
    430     // so we need to ensure we don't call it again.
    431     visitor_ = NULL;
    432     visitor->OnClose(this);
    433   }
    434 }
    435 
    436 }  // namespace net
    437