Home | History | Annotate | Download | only in quic
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/quic_data_stream.h"
      6 
      7 #include "base/logging.h"
      8 #include "net/quic/quic_session.h"
      9 #include "net/quic/quic_spdy_decompressor.h"
     10 #include "net/spdy/write_blocked_list.h"
     11 
     12 using base::StringPiece;
     13 using std::min;
     14 
     15 namespace net {
     16 
// Log-message prefix naming the side of the connection, selected by
// session()->is_server().  Only usable where session() is callable.  Note
// that the two literals are not symmetric: the client one carries a leading
// space.
#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ")
     18 
     19 namespace {
     20 
     21 // This is somewhat arbitrary.  It's possible, but unlikely, we will either fail
     22 // to set a priority client-side, or cancel a stream before stripping the
     23 // priority from the wire server-side.  In either case, start out with a
     24 // priority in the middle.
     25 QuicPriority kDefaultPriority = 3;
     26 
     27 // Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
     28 // reaches 4 bytes, copies the data into 'result' and clears
     29 // partial_data_buffer.
     30 // Returns the number of bytes consumed.
     31 uint32 StripUint32(const char* data, uint32 data_len,
     32                    string* partial_data_buffer,
     33                    uint32* result) {
     34   DCHECK_GT(4u, partial_data_buffer->length());
     35   size_t missing_size = 4 - partial_data_buffer->length();
     36   if (data_len < missing_size) {
     37     StringPiece(data, data_len).AppendToString(partial_data_buffer);
     38     return data_len;
     39   }
     40   StringPiece(data, missing_size).AppendToString(partial_data_buffer);
     41   DCHECK_EQ(4u, partial_data_buffer->length());
     42   memcpy(result, partial_data_buffer->data(), 4);
     43   partial_data_buffer->clear();
     44   return missing_size;
     45 }
     46 
     47 }  // namespace
     48 
// Creates a data stream with the given |id| on |session|.  The stream starts
// with no visitor, the default priority, a header id of 0 (i.e. not yet
// read), and with the header-decompressed, decompression-failed and
// priority-parsed flags all cleared.
QuicDataStream::QuicDataStream(QuicStreamId id,
                               QuicSession* session)
    : ReliableQuicStream(id, session),
      visitor_(NULL),
      headers_decompressed_(false),
      priority_(kDefaultPriority),
      headers_id_(0),
      decompression_failed_(false),
      priority_parsed_(false) {
  // The crypto stream id must not be used for a data stream.
  DCHECK_NE(kCryptoStreamId, id);
}
     60 
// The destructor body is intentionally empty.
QuicDataStream::~QuicDataStream() {
}
     63 
     64 size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) {
     65   if (FinishedReadingHeaders()) {
     66     // If the headers have been read, simply delegate to the sequencer's
     67     // Readv method.
     68     return sequencer()->Readv(iov, iov_len);
     69   }
     70   // Otherwise, copy decompressed header data into |iov|.
     71   size_t bytes_consumed = 0;
     72   size_t iov_index = 0;
     73   while (iov_index < iov_len &&
     74          decompressed_headers_.length() > bytes_consumed) {
     75     size_t bytes_to_read = min(iov[iov_index].iov_len,
     76                                decompressed_headers_.length() - bytes_consumed);
     77     char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
     78     memcpy(iov_ptr,
     79            decompressed_headers_.data() + bytes_consumed, bytes_to_read);
     80     bytes_consumed += bytes_to_read;
     81     ++iov_index;
     82   }
     83   decompressed_headers_.erase(0, bytes_consumed);
     84   return bytes_consumed;
     85 }
     86 
     87 int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) {
     88   if (FinishedReadingHeaders()) {
     89     return sequencer()->GetReadableRegions(iov, iov_len);
     90   }
     91   if (iov_len == 0) {
     92     return 0;
     93   }
     94   iov[0].iov_base = static_cast<void*>(
     95       const_cast<char*>(decompressed_headers_.data()));
     96   iov[0].iov_len = decompressed_headers_.length();
     97   return 1;
     98 }
     99 
    100 bool QuicDataStream::IsDoneReading() const {
    101   if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    102     return false;
    103   }
    104   return sequencer()->IsClosed();
    105 }
    106 
    107 bool QuicDataStream::HasBytesToRead() const {
    108   return !decompressed_headers_.empty() || sequencer()->HasBytesToRead();
    109 }
    110 
// Sets the priority of this stream.  Expected to be called before any stream
// bytes have been written; this is DCHECKed via stream_bytes_written().
void QuicDataStream::set_priority(QuicPriority priority) {
  DCHECK_EQ(0u, stream_bytes_written());
  priority_ = priority;
}
    115 
// Returns the value of priority() unchanged.
QuicPriority QuicDataStream::EffectivePriority() const {
  return priority();
}
    119 
// Consumes raw bytes received for this stream and returns how many of the
// |data_len| bytes in |data| were consumed.  |data_len| must be non-zero.
//
// The input is handled in stages:
//   1. While headers_id_ is still 0, the priority/header-id prefix is
//      stripped from the front of the data.
//   2. If the headers were already decompressed, buffered header data is
//      passed to ProcessData() first and, once none remains, |data| itself.
//   3. Otherwise headers_id_ is validated against the decompressor's current
//      header id, the data is run through the decompressor, and the
//      decompressed headers (plus any trailing raw data) are passed on to
//      ProcessData().
// An invalid header id or a decompression failure closes the connection.
uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) {
  DCHECK_NE(0u, data_len);

  uint32 total_bytes_consumed = 0;
  if (headers_id_ == 0u) {
    // The header id has not been read yet; strip whatever part of the
    // priority/header-id prefix is available.
    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
    data += total_bytes_consumed;
    data_len -= total_bytes_consumed;
    // Stop if the prefix used up all of the input, or if nothing was consumed
    // (StripPriorityAndHeaderId returns 0 after closing the connection on an
    // invalid priority).
    if (data_len == 0 || total_bytes_consumed == 0) {
      return total_bytes_consumed;
    }
  }
  DCHECK_NE(0u, headers_id_);

  // Once the headers are finished, we simply pass the data through.
  if (headers_decompressed_) {
    // Some buffered header data remains.
    if (!decompressed_headers_.empty()) {
      ProcessHeaderData();
    }
    // Only hand over new data once every buffered header byte was accepted.
    if (decompressed_headers_.empty()) {
      DVLOG(1) << "Delegating procesing to ProcessData";
      total_bytes_consumed += ProcessData(data, data_len);
    }
    return total_bytes_consumed;
  }

  QuicHeaderId current_header_id =
      session()->decompressor()->current_header_id();
  // Ensure that this header id looks sane: it must not be behind the
  // decompressor, nor more than kMaxHeaderIdDelta ahead of it.
  if (headers_id_ < current_header_id ||
      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
    DVLOG(1) << ENDPOINT
             << "Invalid headers for stream: " << id()
             << " header_id: " << headers_id_
             << " current_header_id: " << current_header_id;
    session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
    return total_bytes_consumed;
  }

  // If we are head-of-line blocked on decompression, then back up.  The
  // remaining data is not consumed; only the stripped prefix is reported.
  if (current_header_id != headers_id_) {
    session()->MarkDecompressionBlocked(headers_id_, id());
    DVLOG(1) << ENDPOINT
             << "Unable to decompress header data for stream: " << id()
             << " header_id: " << headers_id_;
    return total_bytes_consumed;
  }

  // Decompressed data will be delivered to decompressed_headers_.
  size_t bytes_consumed = session()->decompressor()->DecompressData(
      StringPiece(data, data_len), this);
  DCHECK_NE(0u, bytes_consumed);
  // Guard against a consumed count larger than the input before using it
  // to advance |data|.
  if (bytes_consumed > data_len) {
    DCHECK(false) << "DecompressData returned illegal value";
    OnDecompressionError();
    return total_bytes_consumed;
  }
  total_bytes_consumed += bytes_consumed;
  data += bytes_consumed;
  data_len -= bytes_consumed;

  if (decompression_failed_) {
    // The session will have been closed in OnDecompressionError.
    return total_bytes_consumed;
  }

  // Headers are complete if the decompressor has moved on to the
  // next stream.
  headers_decompressed_ =
      session()->decompressor()->current_header_id() != headers_id_;
  if (!headers_decompressed_) {
    // Incomplete headers are expected to have used up all of the input.
    DCHECK_EQ(0u, data_len);
  }

  ProcessHeaderData();

  // Wait for more input if the headers are incomplete, or for the buffered
  // header bytes to be accepted before passing on any further data.
  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
    return total_bytes_consumed;
  }

  // We have processed all of the decompressed data but we might
  // have some more raw data to process.
  if (data_len > 0) {
    total_bytes_consumed += ProcessData(data, data_len);
  }

  // The sequencer will push any additional buffered frames if this data
  // has been completely consumed.
  return total_bytes_consumed;
}
    211 
// Returns the peer address as reported by the owning session.
const IPEndPoint& QuicDataStream::GetPeerAddress() {
  return session()->peer_address();
}
    215 
// Returns the compressor obtained from the owning session.
QuicSpdyCompressor* QuicDataStream::compressor() {
  return session()->compressor();
}
    219 
// Forwards to the owning session's GetSSLInfo() with |ssl_info| and returns
// its result.
bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) {
  return session()->GetSSLInfo(ssl_info);
}
    223 
    224 uint32 QuicDataStream::ProcessHeaderData() {
    225   if (decompressed_headers_.empty()) {
    226     return 0;
    227   }
    228 
    229   size_t bytes_processed = ProcessData(decompressed_headers_.data(),
    230                                        decompressed_headers_.length());
    231   if (bytes_processed == decompressed_headers_.length()) {
    232     decompressed_headers_.clear();
    233   } else {
    234     decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
    235   }
    236   return bytes_processed;
    237 }
    238 
    239 void QuicDataStream::OnDecompressorAvailable() {
    240   DCHECK_EQ(headers_id_,
    241             session()->decompressor()->current_header_id());
    242   DCHECK(!headers_decompressed_);
    243   DCHECK(!decompression_failed_);
    244   DCHECK_EQ(0u, decompressed_headers_.length());
    245 
    246   while (!headers_decompressed_) {
    247     struct iovec iovec;
    248     if (sequencer()->GetReadableRegions(&iovec, 1) == 0) {
    249       return;
    250     }
    251 
    252     size_t bytes_consumed = session()->decompressor()->DecompressData(
    253         StringPiece(static_cast<char*>(iovec.iov_base),
    254                     iovec.iov_len),
    255         this);
    256     DCHECK_LE(bytes_consumed, iovec.iov_len);
    257     if (decompression_failed_) {
    258       return;
    259     }
    260     sequencer()->MarkConsumed(bytes_consumed);
    261 
    262     headers_decompressed_ =
    263         session()->decompressor()->current_header_id() != headers_id_;
    264   }
    265 
    266   // Either the headers are complete, or the all data as been consumed.
    267   ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
    268   if (IsDoneReading()) {
    269     OnFinRead();
    270   } else if (FinishedReadingHeaders()) {
    271     sequencer()->FlushBufferedFrames();
    272   }
    273 }
    274 
// Appends |data| to the buffer of decompressed header bytes.  Always returns
// true.
bool QuicDataStream::OnDecompressedData(StringPiece data) {
  data.AppendToString(&decompressed_headers_);
  return true;
}
    279 
// Records that header decompression failed and closes the connection with
// QUIC_DECOMPRESSION_FAILURE.  Expected to be called at most once per stream
// (DCHECKed).
void QuicDataStream::OnDecompressionError() {
  DCHECK(!decompression_failed_);
  decompression_failed_ = true;
  session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
}
    285 
    286 void QuicDataStream::OnClose() {
    287   ReliableQuicStream::OnClose();
    288 
    289   if (visitor_) {
    290     Visitor* visitor = visitor_;
    291     // Calling Visitor::OnClose() may result the destruction of the visitor,
    292     // so we need to ensure we don't call it again.
    293     visitor_ = NULL;
    294     visitor->OnClose(this);
    295   }
    296 }
    297 
    298 uint32 QuicDataStream::StripPriorityAndHeaderId(
    299     const char* data, uint32 data_len) {
    300   uint32 total_bytes_parsed = 0;
    301 
    302   if (!priority_parsed_ && session()->connection()->is_server()) {
    303     QuicPriority temporary_priority = priority_;
    304     total_bytes_parsed = StripUint32(
    305         data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
    306     if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
    307       priority_parsed_ = true;
    308 
    309       // Spdy priorities are inverted, so the highest numerical value is the
    310       // lowest legal priority.
    311       if (temporary_priority > QuicUtils::LowestPriority()) {
    312         session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
    313         return 0;
    314       }
    315       priority_ = temporary_priority;
    316     }
    317     data += total_bytes_parsed;
    318     data_len -= total_bytes_parsed;
    319   }
    320   if (data_len > 0 && headers_id_ == 0u) {
    321     // The headers ID has not yet been read.  Strip it from the beginning of
    322     // the data stream.
    323     total_bytes_parsed += StripUint32(
    324         data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
    325   }
    326   return total_bytes_parsed;
    327 }
    328 
// Returns true once the headers have been decompressed and none of the
// decompressed header bytes remain buffered.
bool QuicDataStream::FinishedReadingHeaders() {
  return headers_decompressed_ && decompressed_headers_.empty();
}
    332 
    333 }  // namespace net
    334