1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/reliable_quic_stream.h" 6 7 #include "net/quic/quic_session.h" 8 #include "net/quic/quic_spdy_decompressor.h" 9 10 using base::StringPiece; 11 using std::min; 12 13 namespace net { 14 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 16 QuicSession* session) 17 : sequencer_(this), 18 id_(id), 19 session_(session), 20 visitor_(NULL), 21 stream_bytes_read_(0), 22 stream_bytes_written_(0), 23 headers_decompressed_(false), 24 headers_id_(0), 25 decompression_failed_(false), 26 stream_error_(QUIC_STREAM_NO_ERROR), 27 connection_error_(QUIC_NO_ERROR), 28 read_side_closed_(false), 29 write_side_closed_(false), 30 fin_buffered_(false), 31 fin_sent_(false) { 32 } 33 34 ReliableQuicStream::~ReliableQuicStream() { 35 } 36 37 bool ReliableQuicStream::WillAcceptStreamFrame( 38 const QuicStreamFrame& frame) const { 39 if (read_side_closed_) { 40 return true; 41 } 42 if (frame.stream_id != id_) { 43 LOG(ERROR) << "Error!"; 44 return false; 45 } 46 return sequencer_.WillAcceptStreamFrame(frame); 47 } 48 49 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 50 DCHECK_EQ(frame.stream_id, id_); 51 if (read_side_closed_) { 52 DLOG(INFO) << "Ignoring frame " << frame.stream_id; 53 // We don't want to be reading: blackhole the data. 54 return true; 55 } 56 // Note: This count include duplicate data received. 57 stream_bytes_read_ += frame.data.length(); 58 59 bool accepted = sequencer_.OnStreamFrame(frame); 60 61 return accepted; 62 } 63 64 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 65 stream_error_ = error; 66 TerminateFromPeer(false); // Full close. 67 } 68 69 void ReliableQuicStream::ConnectionClose(QuicErrorCode error, bool from_peer) { 70 if (read_side_closed_ && write_side_closed_) { 71 return; 72 } 73 if (error != QUIC_NO_ERROR) { 74 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 75 connection_error_ = error; 76 } 77 78 if (from_peer) { 79 TerminateFromPeer(false); 80 } else { 81 CloseWriteSide(); 82 CloseReadSide(); 83 } 84 } 85 86 void ReliableQuicStream::TerminateFromPeer(bool half_close) { 87 if (!half_close) { 88 CloseWriteSide(); 89 } 90 CloseReadSide(); 91 } 92 93 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 94 stream_error_ = error; 95 if (error != QUIC_STREAM_NO_ERROR) { 96 // Sending a RstStream results in calling CloseStream. 97 session()->SendRstStream(id(), error); 98 } else { 99 session_->CloseStream(id()); 100 } 101 } 102 103 size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 104 if (headers_decompressed_ && decompressed_headers_.empty()) { 105 return sequencer_.Readv(iov, iov_len); 106 } 107 size_t bytes_consumed = 0; 108 size_t iov_index = 0; 109 while (iov_index < iov_len && 110 decompressed_headers_.length() > bytes_consumed) { 111 size_t bytes_to_read = min(iov[iov_index].iov_len, 112 decompressed_headers_.length() - bytes_consumed); 113 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); 114 memcpy(iov_ptr, 115 decompressed_headers_.data() + bytes_consumed, bytes_to_read); 116 bytes_consumed += bytes_to_read; 117 ++iov_index; 118 } 119 decompressed_headers_.erase(0, bytes_consumed); 120 return bytes_consumed; 121 } 122 123 int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { 124 if (headers_decompressed_ && decompressed_headers_.empty()) { 125 return sequencer_.GetReadableRegions(iov, iov_len); 126 } 127 if (iov_len == 0) { 128 return 0; 129 } 130 iov[0].iov_base = static_cast<void*>( 131 const_cast<char*>(decompressed_headers_.data())); 132 iov[0].iov_len = decompressed_headers_.length(); 133 return 1; 134 } 135 136 bool ReliableQuicStream::IsHalfClosed() const { 137 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 138 return false; 139 } 140 return sequencer_.IsHalfClosed(); 141 } 142 143 bool ReliableQuicStream::HasBytesToRead() const { 144 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 145 } 146 147 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 148 return session_->peer_address(); 149 } 150 151 QuicSpdyCompressor* ReliableQuicStream::compressor() { 152 return session_->compressor(); 153 } 154 155 bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { 156 return session_->GetSSLInfo(ssl_info); 157 } 158 159 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 160 DCHECK(data.size() > 0 || fin); 161 return WriteOrBuffer(data, fin); 162 } 163 164 QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { 165 DCHECK(!fin_buffered_); 166 167 QuicConsumedData consumed_data(0, false); 168 fin_buffered_ = fin; 169 170 if (queued_data_.empty()) { 171 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 172 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 173 } 174 175 // If there's unconsumed data or an unconsumed fin, queue it. 176 if (consumed_data.bytes_consumed < data.length() || 177 (fin && !consumed_data.fin_consumed)) { 178 queued_data_.push_back( 179 string(data.data() + consumed_data.bytes_consumed, 180 data.length() - consumed_data.bytes_consumed)); 181 } 182 183 return QuicConsumedData(data.size(), true); 184 } 185 186 void ReliableQuicStream::OnCanWrite() { 187 bool fin = false; 188 while (!queued_data_.empty()) { 189 const string& data = queued_data_.front(); 190 if (queued_data_.size() == 1 && fin_buffered_) { 191 fin = true; 192 } 193 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 194 if (consumed_data.bytes_consumed == data.size() && 195 fin == consumed_data.fin_consumed) { 196 queued_data_.pop_front(); 197 } else { 198 queued_data_.front().erase(0, consumed_data.bytes_consumed); 199 break; 200 } 201 } 202 } 203 204 QuicConsumedData ReliableQuicStream::WriteDataInternal( 205 StringPiece data, bool fin) { 206 if (write_side_closed_) { 207 DLOG(ERROR) << "Attempt to write when the write side is closed"; 208 return QuicConsumedData(0, false); 209 } 210 211 QuicConsumedData consumed_data = 212 session()->WriteData(id(), data, stream_bytes_written_, fin); 213 stream_bytes_written_ += consumed_data.bytes_consumed; 214 if (consumed_data.bytes_consumed == data.length()) { 215 if (fin && consumed_data.fin_consumed) { 216 fin_sent_ = true; 217 CloseWriteSide(); 218 } else if (fin && !consumed_data.fin_consumed) { 219 session_->MarkWriteBlocked(id()); 220 } 221 } else { 222 session_->MarkWriteBlocked(id()); 223 } 224 return consumed_data; 225 } 226 227 void ReliableQuicStream::CloseReadSide() { 228 if (read_side_closed_) { 229 return; 230 } 231 DLOG(INFO) << "Done reading from stream " << id(); 232 233 read_side_closed_ = true; 234 if (write_side_closed_) { 235 DLOG(INFO) << "Closing stream: " << id(); 236 session_->CloseStream(id()); 237 } 238 } 239 240 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { 241 if (id() == kCryptoStreamId) { 242 if (data_len == 0) { 243 return 0; 244 } 245 // The crypto stream does not use compression. 246 return ProcessData(data, data_len); 247 } 248 uint32 total_bytes_consumed = 0; 249 if (headers_id_ == 0u) { 250 // The headers ID has not yet been read. Strip it from the beginning of 251 // the data stream. 252 DCHECK_GT(4u, headers_id_buffer_.length()); 253 size_t missing_size = 4 - headers_id_buffer_.length(); 254 if (data_len < missing_size) { 255 StringPiece(data, data_len).AppendToString(&headers_id_buffer_); 256 return data_len; 257 } 258 total_bytes_consumed += missing_size; 259 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); 260 DCHECK_EQ(4u, headers_id_buffer_.length()); 261 memcpy(&headers_id_, headers_id_buffer_.data(), 4); 262 headers_id_buffer_.clear(); 263 data += missing_size; 264 data_len -= missing_size; 265 } 266 DCHECK_NE(0u, headers_id_); 267 if (data_len == 0) { 268 return total_bytes_consumed; 269 } 270 271 // Once the headers are finished, we simply pass the data through. 272 if (headers_decompressed_) { 273 // Some buffered header data remains. 274 if (!decompressed_headers_.empty()) { 275 ProcessHeaderData(); 276 } 277 if (decompressed_headers_.empty()) { 278 DVLOG(1) << "Delegating procesing to ProcessData"; 279 total_bytes_consumed += ProcessData(data, data_len); 280 } 281 return total_bytes_consumed; 282 } 283 284 QuicHeaderId current_header_id = 285 session_->decompressor()->current_header_id(); 286 // Ensure that this header id looks sane. 287 if (headers_id_ < current_header_id || 288 headers_id_ > kMaxHeaderIdDelta + current_header_id) { 289 DVLOG(1) << "Invalid headers for stream: " << id() 290 << " header_id: " << headers_id_ 291 << " current_header_id: " << current_header_id; 292 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); 293 return total_bytes_consumed; 294 } 295 296 // If we are head-of-line blocked on decompression, then back up. 297 if (current_header_id != headers_id_) { 298 session_->MarkDecompressionBlocked(headers_id_, id()); 299 DVLOG(1) << "Unable to decompress header data for stream: " << id() 300 << " header_id: " << headers_id_; 301 return total_bytes_consumed; 302 } 303 304 // Decompressed data will be delivered to decompressed_headers_. 305 size_t bytes_consumed = session_->decompressor()->DecompressData( 306 StringPiece(data, data_len), this); 307 DCHECK_NE(0u, bytes_consumed); 308 if (bytes_consumed > data_len) { 309 DCHECK(false) << "DecompressData returned illegal value"; 310 OnDecompressionError(); 311 return total_bytes_consumed; 312 } 313 total_bytes_consumed += bytes_consumed; 314 data += bytes_consumed; 315 data_len -= bytes_consumed; 316 317 if (decompression_failed_) { 318 // The session will have been closed in OnDecompressionError. 319 return total_bytes_consumed; 320 } 321 322 // Headers are complete if the decompressor has moved on to the 323 // next stream. 324 headers_decompressed_ = 325 session_->decompressor()->current_header_id() != headers_id_; 326 if (!headers_decompressed_) { 327 DCHECK_EQ(0u, data_len); 328 } 329 330 ProcessHeaderData(); 331 332 if (!headers_decompressed_ || !decompressed_headers_.empty()) { 333 return total_bytes_consumed; 334 } 335 336 // We have processed all of the decompressed data but we might 337 // have some more raw data to process. 338 if (data_len > 0) { 339 total_bytes_consumed += ProcessData(data, data_len); 340 } 341 342 // The sequencer will push any additional buffered frames if this data 343 // has been completely consumed. 344 return total_bytes_consumed; 345 } 346 347 uint32 ReliableQuicStream::ProcessHeaderData() { 348 if (decompressed_headers_.empty()) { 349 return 0; 350 } 351 352 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 353 decompressed_headers_.length()); 354 if (bytes_processed == decompressed_headers_.length()) { 355 decompressed_headers_.clear(); 356 } else { 357 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 358 } 359 return bytes_processed; 360 } 361 362 void ReliableQuicStream::OnDecompressorAvailable() { 363 DCHECK_EQ(headers_id_, 364 session_->decompressor()->current_header_id()); 365 DCHECK(!headers_decompressed_); 366 DCHECK(!decompression_failed_); 367 DCHECK_EQ(0u, decompressed_headers_.length()); 368 369 while (!headers_decompressed_) { 370 struct iovec iovec; 371 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { 372 return; 373 } 374 375 size_t bytes_consumed = session_->decompressor()->DecompressData( 376 StringPiece(static_cast<char*>(iovec.iov_base), 377 iovec.iov_len), 378 this); 379 DCHECK_LE(bytes_consumed, iovec.iov_len); 380 if (decompression_failed_) { 381 return; 382 } 383 sequencer_.MarkConsumed(bytes_consumed); 384 385 headers_decompressed_ = 386 session_->decompressor()->current_header_id() != headers_id_; 387 } 388 389 // Either the headers are complete, or the all data as been consumed. 390 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. 391 if (IsHalfClosed()) { 392 TerminateFromPeer(true); 393 } else if (headers_decompressed_ && decompressed_headers_.empty()) { 394 sequencer_.FlushBufferedFrames(); 395 } 396 } 397 398 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { 399 data.AppendToString(&decompressed_headers_); 400 return true; 401 } 402 403 void ReliableQuicStream::OnDecompressionError() { 404 DCHECK(!decompression_failed_); 405 decompression_failed_ = true; 406 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); 407 } 408 409 410 void ReliableQuicStream::CloseWriteSide() { 411 if (write_side_closed_) { 412 return; 413 } 414 DLOG(INFO) << "Done writing to stream " << id(); 415 416 write_side_closed_ = true; 417 if (read_side_closed_) { 418 DLOG(INFO) << "Closing stream: " << id(); 419 session_->CloseStream(id()); 420 } 421 } 422 423 void ReliableQuicStream::OnClose() { 424 CloseReadSide(); 425 CloseWriteSide(); 426 427 if (visitor_) { 428 Visitor* visitor = visitor_; 429 // Calling Visitor::OnClose() may result the destruction of the visitor, 430 // so we need to ensure we don't call it again. 431 visitor_ = NULL; 432 visitor->OnClose(this); 433 } 434 } 435 436 } // namespace net 437