Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/quic_stream_sequencer.h"
      6 
      7 #include <algorithm>
      8 #include <limits>
      9 
     10 #include "base/logging.h"
     11 #include "base/metrics/sparse_histogram.h"
     12 #include "net/quic/reliable_quic_stream.h"
     13 
     14 using std::make_pair;
     15 using std::min;
     16 using std::numeric_limits;
     17 
     18 namespace net {
     19 
     20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
     21     : stream_(quic_stream),
     22       num_bytes_consumed_(0),
     23       close_offset_(numeric_limits<QuicStreamOffset>::max()),
     24       blocked_(false),
     25       num_bytes_buffered_(0),
     26       num_frames_received_(0),
     27       num_duplicate_frames_received_(0) {
     28 }
     29 
     30 QuicStreamSequencer::~QuicStreamSequencer() {
     31 }
     32 
     33 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
     34   ++num_frames_received_;
     35   if (IsDuplicate(frame)) {
     36     ++num_duplicate_frames_received_;
     37     // Silently ignore duplicates.
     38     return;
     39   }
     40 
     41   if (FrameOverlapsBufferedData(frame)) {
     42     stream_->CloseConnectionWithDetails(
     43         QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
     44     return;
     45   }
     46 
     47   QuicStreamOffset byte_offset = frame.offset;
     48   size_t data_len = frame.data.TotalBufferSize();
     49   if (data_len == 0 && !frame.fin) {
     50     // Stream frames must have data or a fin flag.
     51     stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
     52                                         "Empty stream frame without FIN set.");
     53     return;
     54   }
     55 
     56   if (frame.fin) {
     57     CloseStreamAtOffset(frame.offset + data_len);
     58     if (data_len == 0) {
     59       return;
     60     }
     61   }
     62 
     63   IOVector data;
     64   data.AppendIovec(frame.data.iovec(), frame.data.Size());
     65 
     66   // If the frame has arrived in-order then we can process it immediately, only
     67   // buffering if the stream is unable to process it.
     68   if (!blocked_ && byte_offset == num_bytes_consumed_) {
     69     DVLOG(1) << "Processing byte offset " << byte_offset;
     70     size_t bytes_consumed = 0;
     71     for (size_t i = 0; i < data.Size(); ++i) {
     72       bytes_consumed += stream_->ProcessRawData(
     73           static_cast<char*>(data.iovec()[i].iov_base),
     74           data.iovec()[i].iov_len);
     75     }
     76     num_bytes_consumed_ += bytes_consumed;
     77     stream_->AddBytesConsumed(bytes_consumed);
     78 
     79     if (MaybeCloseStream()) {
     80       return;
     81     }
     82     if (bytes_consumed > data_len) {
     83       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
     84       return;
     85     } else if (bytes_consumed == data_len) {
     86       FlushBufferedFrames();
     87       return;  // it's safe to ack this frame.
     88     } else {
     89       // Set ourselves up to buffer what's left.
     90       data_len -= bytes_consumed;
     91       data.Consume(bytes_consumed);
     92       byte_offset += bytes_consumed;
     93     }
     94   }
     95 
     96   // Buffer any remaining data to be consumed by the stream when ready.
     97   for (size_t i = 0; i < data.Size(); ++i) {
     98     DVLOG(1) << "Buffering stream data at offset " << byte_offset;
     99     const iovec& iov = data.iovec()[i];
    100     buffered_frames_.insert(make_pair(
    101         byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
    102     byte_offset += iov.iov_len;
    103     num_bytes_buffered_ += iov.iov_len;
    104   }
    105   return;
    106 }
    107 
    108 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
    109   const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
    110 
    111   // If we have a scheduled termination or close, any new offset should match
    112   // it.
    113   if (close_offset_ != kMaxOffset && offset != close_offset_) {
    114     stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
    115     return;
    116   }
    117 
    118   close_offset_ = offset;
    119 
    120   MaybeCloseStream();
    121 }
    122 
    123 bool QuicStreamSequencer::MaybeCloseStream() {
    124   if (!blocked_ && IsClosed()) {
    125     DVLOG(1) << "Passing up termination, as we've processed "
    126              << num_bytes_consumed_ << " of " << close_offset_
    127              << " bytes.";
    128     // Technically it's an error if num_bytes_consumed isn't exactly
    129     // equal, but error handling seems silly at this point.
    130     stream_->OnFinRead();
    131     buffered_frames_.clear();
    132     num_bytes_buffered_ = 0;
    133     return true;
    134   }
    135   return false;
    136 }
    137 
    138 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
    139   DCHECK(!blocked_);
    140   FrameMap::iterator it = buffered_frames_.begin();
    141   size_t index = 0;
    142   QuicStreamOffset offset = num_bytes_consumed_;
    143   while (it != buffered_frames_.end() && index < iov_len) {
    144     if (it->first != offset) return index;
    145 
    146     iov[index].iov_base = static_cast<void*>(
    147         const_cast<char*>(it->second.data()));
    148     iov[index].iov_len = it->second.size();
    149     offset += it->second.size();
    150 
    151     ++index;
    152     ++it;
    153   }
    154   return index;
    155 }
    156 
    157 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
    158   DCHECK(!blocked_);
    159   FrameMap::iterator it = buffered_frames_.begin();
    160   size_t iov_index = 0;
    161   size_t iov_offset = 0;
    162   size_t frame_offset = 0;
    163   size_t initial_bytes_consumed = num_bytes_consumed_;
    164 
    165   while (iov_index < iov_len &&
    166          it != buffered_frames_.end() &&
    167          it->first == num_bytes_consumed_) {
    168     int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
    169                             it->second.size() - frame_offset);
    170 
    171     char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
    172     memcpy(iov_ptr,
    173            it->second.data() + frame_offset, bytes_to_read);
    174     frame_offset += bytes_to_read;
    175     iov_offset += bytes_to_read;
    176 
    177     if (iov[iov_index].iov_len == iov_offset) {
    178       // We've filled this buffer.
    179       iov_offset = 0;
    180       ++iov_index;
    181     }
    182     if (it->second.size() == frame_offset) {
    183       // We've copied this whole frame
    184       RecordBytesConsumed(it->second.size());
    185       buffered_frames_.erase(it);
    186       it = buffered_frames_.begin();
    187       frame_offset = 0;
    188     }
    189   }
    190   // We've finished copying.  If we have a partial frame, update it.
    191   if (frame_offset != 0) {
    192     buffered_frames_.insert(
    193         make_pair(it->first + frame_offset, it->second.substr(frame_offset)));
    194     buffered_frames_.erase(buffered_frames_.begin());
    195     RecordBytesConsumed(frame_offset);
    196   }
    197   return num_bytes_consumed_ - initial_bytes_consumed;
    198 }
    199 
    200 bool QuicStreamSequencer::HasBytesToRead() const {
    201   FrameMap::const_iterator it = buffered_frames_.begin();
    202 
    203   return it != buffered_frames_.end() && it->first == num_bytes_consumed_;
    204 }
    205 
    206 bool QuicStreamSequencer::IsClosed() const {
    207   return num_bytes_consumed_ >= close_offset_;
    208 }
    209 
    210 bool QuicStreamSequencer::FrameOverlapsBufferedData(
    211     const QuicStreamFrame& frame) const {
    212   if (buffered_frames_.empty()) {
    213     return false;
    214   }
    215 
    216   FrameMap::const_iterator next_frame =
    217       buffered_frames_.lower_bound(frame.offset);
    218   // Duplicate frames should have been dropped in IsDuplicate.
    219   DCHECK(next_frame == buffered_frames_.end() ||
    220          next_frame->first != frame.offset);
    221 
    222   // If there is a buffered frame with a higher starting offset, then we check
    223   // to see if the new frame runs into the higher frame.
    224   if (next_frame != buffered_frames_.end() &&
    225       (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) {
    226     DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + "
    227              << frame.data.TotalBufferSize() << " > " << next_frame->first;
    228     return true;
    229   }
    230 
    231   // If there is a buffered frame with a lower starting offset, then we check
    232   // to see if the buffered frame runs into the new frame.
    233   if (next_frame != buffered_frames_.begin()) {
    234     FrameMap::const_iterator preceeding_frame = --next_frame;
    235     QuicStreamOffset offset = preceeding_frame->first;
    236     uint64 data_length = preceeding_frame->second.length();
    237     if ((offset + data_length) > frame.offset) {
    238       DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + "
    239                << data_length << " > " << frame.offset;
    240       return true;
    241     }
    242   }
    243   return false;
    244 }
    245 
    246 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
    247   // A frame is duplicate if the frame offset is smaller than our bytes consumed
    248   // or we have stored the frame in our map.
    249   // TODO(pwestin): Is it possible that a new frame contain more data even if
    250   // the offset is the same?
    251   return frame.offset < num_bytes_consumed_ ||
    252       buffered_frames_.find(frame.offset) != buffered_frames_.end();
    253 }
    254 
    255 void QuicStreamSequencer::SetBlockedUntilFlush() {
    256   blocked_ = true;
    257 }
    258 
    259 void QuicStreamSequencer::FlushBufferedFrames() {
    260   blocked_ = false;
    261   FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_);
    262   while (it != buffered_frames_.end()) {
    263     DVLOG(1) << "Flushing buffered packet at offset " << it->first;
    264     string* data = &it->second;
    265     size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
    266                                                     data->size());
    267     RecordBytesConsumed(bytes_consumed);
    268     if (MaybeCloseStream()) {
    269       return;
    270     }
    271     if (bytes_consumed > data->size()) {
    272       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);  // Programming error
    273       return;
    274     } else if (bytes_consumed == data->size()) {
    275       buffered_frames_.erase(it);
    276       it = buffered_frames_.find(num_bytes_consumed_);
    277     } else {
    278       string new_data = it->second.substr(bytes_consumed);
    279       buffered_frames_.erase(it);
    280       buffered_frames_.insert(make_pair(num_bytes_consumed_, new_data));
    281       return;
    282     }
    283   }
    284   MaybeCloseStream();
    285 }
    286 
    287 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
    288   num_bytes_consumed_ += bytes_consumed;
    289   num_bytes_buffered_ -= bytes_consumed;
    290 
    291   stream_->AddBytesConsumed(bytes_consumed);
    292 }
    293 
    294 }  // namespace net
    295