1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_stream_sequencer.h" 6 7 #include <algorithm> 8 #include <limits> 9 10 #include "base/logging.h" 11 #include "base/metrics/sparse_histogram.h" 12 #include "net/quic/reliable_quic_stream.h" 13 14 using std::make_pair; 15 using std::min; 16 using std::numeric_limits; 17 18 namespace net { 19 20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) 21 : stream_(quic_stream), 22 num_bytes_consumed_(0), 23 close_offset_(numeric_limits<QuicStreamOffset>::max()), 24 blocked_(false), 25 num_bytes_buffered_(0), 26 num_frames_received_(0), 27 num_duplicate_frames_received_(0) { 28 } 29 30 QuicStreamSequencer::~QuicStreamSequencer() { 31 } 32 33 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { 34 ++num_frames_received_; 35 if (IsDuplicate(frame)) { 36 ++num_duplicate_frames_received_; 37 // Silently ignore duplicates. 38 return true; 39 } 40 41 if (FrameOverlapsBufferedData(frame)) { 42 stream_->CloseConnectionWithDetails( 43 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); 44 return false; 45 } 46 47 QuicStreamOffset byte_offset = frame.offset; 48 size_t data_len = frame.data.TotalBufferSize(); 49 if (data_len == 0 && !frame.fin) { 50 // Stream frames must have data or a fin flag. 51 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, 52 "Empty stream frame without FIN set."); 53 return false; 54 } 55 56 if (frame.fin) { 57 CloseStreamAtOffset(frame.offset + data_len); 58 if (data_len == 0) { 59 return true; 60 } 61 } 62 63 IOVector data; 64 data.AppendIovec(frame.data.iovec(), frame.data.Size()); 65 66 // If the frame has arrived in-order then we can process it immediately, only 67 // buffering if the stream is unable to process it. 68 if (!blocked_ && byte_offset == num_bytes_consumed_) { 69 DVLOG(1) << "Processing byte offset " << byte_offset; 70 size_t bytes_consumed = 0; 71 for (size_t i = 0; i < data.Size(); ++i) { 72 bytes_consumed += stream_->ProcessRawData( 73 static_cast<char*>(data.iovec()[i].iov_base), 74 data.iovec()[i].iov_len); 75 } 76 num_bytes_consumed_ += bytes_consumed; 77 stream_->AddBytesConsumed(bytes_consumed); 78 79 if (MaybeCloseStream()) { 80 return true; 81 } 82 if (bytes_consumed > data_len) { 83 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); 84 return false; 85 } else if (bytes_consumed == data_len) { 86 FlushBufferedFrames(); 87 return true; // it's safe to ack this frame. 88 } else { 89 // Set ourselves up to buffer what's left. 90 data_len -= bytes_consumed; 91 data.Consume(bytes_consumed); 92 byte_offset += bytes_consumed; 93 } 94 } 95 96 // Buffer any remaining data to be consumed by the stream when ready. 97 for (size_t i = 0; i < data.Size(); ++i) { 98 DVLOG(1) << "Buffering stream data at offset " << byte_offset; 99 const iovec& iov = data.iovec()[i]; 100 buffered_frames_.insert(make_pair( 101 byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len))); 102 byte_offset += iov.iov_len; 103 num_bytes_buffered_ += iov.iov_len; 104 } 105 return true; 106 } 107 108 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { 109 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); 110 111 // If we have a scheduled termination or close, any new offset should match 112 // it. 113 if (close_offset_ != kMaxOffset && offset != close_offset_) { 114 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); 115 return; 116 } 117 118 close_offset_ = offset; 119 120 MaybeCloseStream(); 121 } 122 123 bool QuicStreamSequencer::MaybeCloseStream() { 124 if (!blocked_ && IsClosed()) { 125 DVLOG(1) << "Passing up termination, as we've processed " 126 << num_bytes_consumed_ << " of " << close_offset_ 127 << " bytes."; 128 // Technically it's an error if num_bytes_consumed isn't exactly 129 // equal, but error handling seems silly at this point. 130 stream_->OnFinRead(); 131 buffered_frames_.clear(); 132 num_bytes_buffered_ = 0; 133 return true; 134 } 135 return false; 136 } 137 138 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) { 139 DCHECK(!blocked_); 140 FrameMap::iterator it = buffered_frames_.begin(); 141 size_t index = 0; 142 QuicStreamOffset offset = num_bytes_consumed_; 143 while (it != buffered_frames_.end() && index < iov_len) { 144 if (it->first != offset) return index; 145 146 iov[index].iov_base = static_cast<void*>( 147 const_cast<char*>(it->second.data())); 148 iov[index].iov_len = it->second.size(); 149 offset += it->second.size(); 150 151 ++index; 152 ++it; 153 } 154 return index; 155 } 156 157 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { 158 DCHECK(!blocked_); 159 FrameMap::iterator it = buffered_frames_.begin(); 160 size_t iov_index = 0; 161 size_t iov_offset = 0; 162 size_t frame_offset = 0; 163 size_t initial_bytes_consumed = num_bytes_consumed_; 164 165 while (iov_index < iov_len && 166 it != buffered_frames_.end() && 167 it->first == num_bytes_consumed_) { 168 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, 169 it->second.size() - frame_offset); 170 171 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; 172 memcpy(iov_ptr, 173 it->second.data() + frame_offset, bytes_to_read); 174 frame_offset += bytes_to_read; 175 iov_offset += bytes_to_read; 176 177 if (iov[iov_index].iov_len == iov_offset) { 178 // We've filled this buffer. 179 iov_offset = 0; 180 ++iov_index; 181 } 182 if (it->second.size() == frame_offset) { 183 // We've copied this whole frame 184 RecordBytesConsumed(it->second.size()); 185 buffered_frames_.erase(it); 186 it = buffered_frames_.begin(); 187 frame_offset = 0; 188 } 189 } 190 // We've finished copying. If we have a partial frame, update it. 191 if (frame_offset != 0) { 192 buffered_frames_.insert( 193 make_pair(it->first + frame_offset, it->second.substr(frame_offset))); 194 buffered_frames_.erase(buffered_frames_.begin()); 195 RecordBytesConsumed(frame_offset); 196 } 197 return num_bytes_consumed_ - initial_bytes_consumed; 198 } 199 200 bool QuicStreamSequencer::HasBytesToRead() const { 201 FrameMap::const_iterator it = buffered_frames_.begin(); 202 203 return it != buffered_frames_.end() && it->first == num_bytes_consumed_; 204 } 205 206 bool QuicStreamSequencer::IsClosed() const { 207 return num_bytes_consumed_ >= close_offset_; 208 } 209 210 bool QuicStreamSequencer::FrameOverlapsBufferedData( 211 const QuicStreamFrame& frame) const { 212 if (buffered_frames_.empty()) { 213 return false; 214 } 215 216 FrameMap::const_iterator next_frame = 217 buffered_frames_.lower_bound(frame.offset); 218 // Duplicate frames should have been dropped in IsDuplicate. 219 DCHECK(next_frame == buffered_frames_.end() || 220 next_frame->first != frame.offset); 221 222 // If there is a buffered frame with a higher starting offset, then we check 223 // to see if the new frame runs into the higher frame. 224 if (next_frame != buffered_frames_.end() && 225 (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) { 226 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + " 227 << frame.data.TotalBufferSize() << " > " << next_frame->first; 228 return true; 229 } 230 231 // If there is a buffered frame with a lower starting offset, then we check 232 // to see if the buffered frame runs into the new frame. 233 if (next_frame != buffered_frames_.begin()) { 234 FrameMap::const_iterator preceeding_frame = --next_frame; 235 QuicStreamOffset offset = preceeding_frame->first; 236 uint64 data_length = preceeding_frame->second.length(); 237 if ((offset + data_length) > frame.offset) { 238 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + " 239 << data_length << " > " << frame.offset; 240 return true; 241 } 242 } 243 return false; 244 } 245 246 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { 247 // A frame is duplicate if the frame offset is smaller than our bytes consumed 248 // or we have stored the frame in our map. 249 // TODO(pwestin): Is it possible that a new frame contain more data even if 250 // the offset is the same? 251 return frame.offset < num_bytes_consumed_ || 252 buffered_frames_.find(frame.offset) != buffered_frames_.end(); 253 } 254 255 void QuicStreamSequencer::SetBlockedUntilFlush() { 256 blocked_ = true; 257 } 258 259 void QuicStreamSequencer::FlushBufferedFrames() { 260 blocked_ = false; 261 FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_); 262 while (it != buffered_frames_.end()) { 263 DVLOG(1) << "Flushing buffered packet at offset " << it->first; 264 string* data = &it->second; 265 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(), 266 data->size()); 267 RecordBytesConsumed(bytes_consumed); 268 if (MaybeCloseStream()) { 269 return; 270 } 271 if (bytes_consumed > data->size()) { 272 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error 273 return; 274 } else if (bytes_consumed == data->size()) { 275 buffered_frames_.erase(it); 276 it = buffered_frames_.find(num_bytes_consumed_); 277 } else { 278 string new_data = it->second.substr(bytes_consumed); 279 buffered_frames_.erase(it); 280 buffered_frames_.insert(make_pair(num_bytes_consumed_, new_data)); 281 return; 282 } 283 } 284 MaybeCloseStream(); 285 } 286 287 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { 288 num_bytes_consumed_ += bytes_consumed; 289 num_bytes_buffered_ -= bytes_consumed; 290 291 stream_->AddBytesConsumed(bytes_consumed); 292 } 293 294 } // namespace net 295