Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/quic_stream_sequencer.h"
      6 
      7 #include <algorithm>
      8 #include <limits>
      9 
     10 #include "base/logging.h"
     11 #include "net/quic/reliable_quic_stream.h"
     12 
     13 using std::min;
     14 using std::numeric_limits;
     15 
     16 namespace net {
     17 
     18 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
     19     : stream_(quic_stream),
     20       num_bytes_consumed_(0),
     21       max_frame_memory_(numeric_limits<size_t>::max()),
     22       close_offset_(numeric_limits<QuicStreamOffset>::max()) {
     23 }
     24 
     25 QuicStreamSequencer::QuicStreamSequencer(size_t max_frame_memory,
     26                                          ReliableQuicStream* quic_stream)
     27     : stream_(quic_stream),
     28       num_bytes_consumed_(0),
     29       max_frame_memory_(max_frame_memory),
     30       close_offset_(numeric_limits<QuicStreamOffset>::max()) {
     31   if (max_frame_memory < kMaxPacketSize) {
     32     LOG(DFATAL) << "Setting max frame memory to " << max_frame_memory
     33                 << ".  Some frames will be impossible to handle.";
     34   }
     35 }
     36 
     37 QuicStreamSequencer::~QuicStreamSequencer() {
     38 }
     39 
     40 bool QuicStreamSequencer::WillAcceptStreamFrame(
     41     const QuicStreamFrame& frame) const {
     42   size_t data_len = frame.data.TotalBufferSize();
     43   DCHECK_LE(data_len, max_frame_memory_);
     44 
     45   if (IsDuplicate(frame)) {
     46     return true;
     47   }
     48   QuicStreamOffset byte_offset = frame.offset;
     49   if (data_len > max_frame_memory_) {
     50     // We're never going to buffer this frame and we can't pass it up.
     51     // The stream might only consume part of it and we'd need a partial ack.
     52     //
     53     // Ideally this should never happen, as we check that
     54     // max_frame_memory_ > kMaxPacketSize and lower levels should reject
     55     // frames larger than that.
     56     return false;
     57   }
     58   if (byte_offset + data_len - num_bytes_consumed_ > max_frame_memory_) {
     59     // We can buffer this but not right now.  Toss it.
     60     // It might be worth trying an experiment where we try best-effort buffering
     61     return false;
     62   }
     63   return true;
     64 }
     65 
     66 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
     67   if (!WillAcceptStreamFrame(frame)) {
     68     // This should not happen, as WillAcceptFrame should be called before
     69     // OnStreamFrame.  Error handling should be done by the caller.
     70     return false;
     71   }
     72   if (IsDuplicate(frame)) {
     73     // Silently ignore duplicates.
     74     return true;
     75   }
     76 
     77   QuicStreamOffset byte_offset = frame.offset;
     78   size_t data_len = frame.data.TotalBufferSize();
     79   if (data_len == 0 && !frame.fin) {
     80     // Stream frames must have data or a fin flag.
     81     stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
     82                                         "Empty stream frame without FIN set.");
     83     return false;
     84   }
     85 
     86   if (frame.fin) {
     87     CloseStreamAtOffset(frame.offset + data_len);
     88     if (data_len == 0) {
     89       return true;
     90     }
     91   }
     92 
     93   IOVector data;
     94   data.AppendIovec(frame.data.iovec(), frame.data.Size());
     95   if (byte_offset == num_bytes_consumed_) {
     96     DVLOG(1) << "Processing byte offset " << byte_offset;
     97     size_t bytes_consumed = 0;
     98     for (size_t i = 0; i < data.Size(); ++i) {
     99       bytes_consumed += stream_->ProcessRawData(
    100           static_cast<char*>(data.iovec()[i].iov_base),
    101           data.iovec()[i].iov_len);
    102     }
    103     num_bytes_consumed_ += bytes_consumed;
    104     if (MaybeCloseStream()) {
    105       return true;
    106     }
    107     if (bytes_consumed > data_len) {
    108       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
    109       return false;
    110     } else if (bytes_consumed == data_len) {
    111       FlushBufferedFrames();
    112       return true;  // it's safe to ack this frame.
    113     } else {
    114       // Set ourselves up to buffer what's left
    115       data_len -= bytes_consumed;
    116       data.Consume(bytes_consumed);
    117       byte_offset += bytes_consumed;
    118     }
    119   }
    120   for (size_t i = 0; i < data.Size(); ++i) {
    121     DVLOG(1) << "Buffering stream data at offset " << byte_offset;
    122     frames_.insert(make_pair(byte_offset, string(
    123         static_cast<char*>(data.iovec()[i].iov_base),
    124         data.iovec()[i].iov_len)));
    125     byte_offset += data.iovec()[i].iov_len;
    126   }
    127   return true;
    128 }
    129 
    130 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
    131   const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
    132 
    133   // If we have a scheduled termination or close, any new offset should match
    134   // it.
    135   if (close_offset_ != kMaxOffset && offset != close_offset_) {
    136     stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
    137     return;
    138   }
    139 
    140   close_offset_ = offset;
    141 
    142   MaybeCloseStream();
    143 }
    144 
    145 bool QuicStreamSequencer::MaybeCloseStream() {
    146   if (IsClosed()) {
    147     DVLOG(1) << "Passing up termination, as we've processed "
    148              << num_bytes_consumed_ << " of " << close_offset_
    149              << " bytes.";
    150     // Technically it's an error if num_bytes_consumed isn't exactly
    151     // equal, but error handling seems silly at this point.
    152     stream_->OnFinRead();
    153     return true;
    154   }
    155   return false;
    156 }
    157 
    158 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
    159   FrameMap::iterator it = frames_.begin();
    160   size_t index = 0;
    161   QuicStreamOffset offset = num_bytes_consumed_;
    162   while (it != frames_.end() && index < iov_len) {
    163     if (it->first != offset) return index;
    164 
    165     iov[index].iov_base = static_cast<void*>(
    166         const_cast<char*>(it->second.data()));
    167     iov[index].iov_len = it->second.size();
    168     offset += it->second.size();
    169 
    170     ++index;
    171     ++it;
    172   }
    173   return index;
    174 }
    175 
    176 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
    177   FrameMap::iterator it = frames_.begin();
    178   size_t iov_index = 0;
    179   size_t iov_offset = 0;
    180   size_t frame_offset = 0;
    181   size_t initial_bytes_consumed = num_bytes_consumed_;
    182 
    183   while (iov_index < iov_len &&
    184          it != frames_.end() &&
    185          it->first == num_bytes_consumed_) {
    186     int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
    187                             it->second.size() - frame_offset);
    188 
    189     char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
    190     memcpy(iov_ptr,
    191            it->second.data() + frame_offset, bytes_to_read);
    192     frame_offset += bytes_to_read;
    193     iov_offset += bytes_to_read;
    194 
    195     if (iov[iov_index].iov_len == iov_offset) {
    196       // We've filled this buffer.
    197       iov_offset = 0;
    198       ++iov_index;
    199     }
    200     if (it->second.size() == frame_offset) {
    201       // We've copied this whole frame
    202       num_bytes_consumed_ += it->second.size();
    203       frames_.erase(it);
    204       it = frames_.begin();
    205       frame_offset = 0;
    206     }
    207   }
    208   // We've finished copying.  If we have a partial frame, update it.
    209   if (frame_offset != 0) {
    210     frames_.insert(make_pair(it->first + frame_offset,
    211                              it->second.substr(frame_offset)));
    212     frames_.erase(frames_.begin());
    213     num_bytes_consumed_ += frame_offset;
    214   }
    215   return num_bytes_consumed_ - initial_bytes_consumed;
    216 }
    217 
    218 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
    219   size_t end_offset = num_bytes_consumed_ + num_bytes_consumed;
    220   while (!frames_.empty() && end_offset != num_bytes_consumed_) {
    221     FrameMap::iterator it = frames_.begin();
    222     if (it->first != num_bytes_consumed_) {
    223       LOG(DFATAL) << "Invalid argument to MarkConsumed. "
    224                   << " num_bytes_consumed_: " << num_bytes_consumed_
    225                   << " end_offset: " << end_offset
    226                   << " offset: " << it->first
    227                   << " length: " << it->second.length();
    228       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
    229       return;
    230     }
    231 
    232     if (it->first + it->second.length() <= end_offset) {
    233       num_bytes_consumed_ += it->second.length();
    234       // This chunk is entirely consumed.
    235       frames_.erase(it);
    236       continue;
    237     }
    238 
    239     // Partially consume this frame.
    240     size_t delta = end_offset - it->first;
    241     num_bytes_consumed_ += delta;
    242     frames_.insert(make_pair(end_offset, it->second.substr(delta)));
    243     frames_.erase(it);
    244     break;
    245   }
    246 }
    247 
    248 bool QuicStreamSequencer::HasBytesToRead() const {
    249   FrameMap::const_iterator it = frames_.begin();
    250 
    251   return it != frames_.end() && it->first == num_bytes_consumed_;
    252 }
    253 
    254 bool QuicStreamSequencer::IsClosed() const {
    255   return num_bytes_consumed_ >= close_offset_;
    256 }
    257 
    258 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
    259   // A frame is duplicate if the frame offset is smaller than our bytes consumed
    260   // or we have stored the frame in our map.
    261   // TODO(pwestin): Is it possible that a new frame contain more data even if
    262   // the offset is the same?
    263   return frame.offset < num_bytes_consumed_ ||
    264       frames_.find(frame.offset) != frames_.end();
    265 }
    266 
    267 void QuicStreamSequencer::FlushBufferedFrames() {
    268   FrameMap::iterator it = frames_.find(num_bytes_consumed_);
    269   while (it != frames_.end()) {
    270     DVLOG(1) << "Flushing buffered packet at offset " << it->first;
    271     string* data = &it->second;
    272     size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
    273                                                     data->size());
    274     num_bytes_consumed_ += bytes_consumed;
    275     if (MaybeCloseStream()) {
    276       return;
    277     }
    278     if (bytes_consumed > data->size()) {
    279       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);  // Programming error
    280       return;
    281     } else if (bytes_consumed == data->size()) {
    282       frames_.erase(it);
    283       it = frames_.find(num_bytes_consumed_);
    284     } else {
    285       string new_data = it->second.substr(bytes_consumed);
    286       frames_.erase(it);
    287       frames_.insert(make_pair(num_bytes_consumed_, new_data));
    288       return;
    289     }
    290   }
    291 }
    292 
    293 }  // namespace net
    294