1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/reliable_quic_stream.h" 6 7 #include "base/logging.h" 8 #include "net/quic/iovector.h" 9 #include "net/quic/quic_flow_controller.h" 10 #include "net/quic/quic_session.h" 11 #include "net/quic/quic_write_blocked_list.h" 12 13 using base::StringPiece; 14 using std::min; 15 16 namespace net { 17 18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") 19 20 namespace { 21 22 struct iovec MakeIovec(StringPiece data) { 23 struct iovec iov = {const_cast<char*>(data.data()), 24 static_cast<size_t>(data.size())}; 25 return iov; 26 } 27 28 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) { 29 QuicVersion version = session->connection()->version(); 30 if (version <= QUIC_VERSION_19) { 31 return session->config()->GetInitialFlowControlWindowToSend(); 32 } 33 34 return session->config()->GetInitialStreamFlowControlWindowToSend(); 35 } 36 37 size_t GetReceivedFlowControlWindow(QuicSession* session) { 38 QuicVersion version = session->connection()->version(); 39 if (version <= QUIC_VERSION_19) { 40 if (session->config()->HasReceivedInitialFlowControlWindowBytes()) { 41 return session->config()->ReceivedInitialFlowControlWindowBytes(); 42 } 43 44 return kDefaultFlowControlSendWindow; 45 } 46 47 // Version must be >= QUIC_VERSION_21, so we check for stream specific flow 48 // control window. 49 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { 50 return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); 51 } 52 53 return kDefaultFlowControlSendWindow; 54 } 55 56 } // namespace 57 58 // Wrapper that aggregates OnAckNotifications for packets sent using 59 // WriteOrBufferData and delivers them to the original 60 // QuicAckNotifier::DelegateInterface after all bytes written using 61 // WriteOrBufferData are acked. This level of indirection is 62 // necessary because the delegate interface provides no mechanism that 63 // WriteOrBufferData can use to inform it that the write required 64 // multiple WritevData calls or that only part of the data has been 65 // sent out by the time ACKs start arriving. 66 class ReliableQuicStream::ProxyAckNotifierDelegate 67 : public QuicAckNotifier::DelegateInterface { 68 public: 69 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) 70 : delegate_(delegate), 71 pending_acks_(0), 72 wrote_last_data_(false), 73 num_original_packets_(0), 74 num_original_bytes_(0), 75 num_retransmitted_packets_(0), 76 num_retransmitted_bytes_(0) { 77 } 78 79 virtual void OnAckNotification(int num_original_packets, 80 int num_original_bytes, 81 int num_retransmitted_packets, 82 int num_retransmitted_bytes, 83 QuicTime::Delta delta_largest_observed) 84 OVERRIDE { 85 DCHECK_LT(0, pending_acks_); 86 --pending_acks_; 87 num_original_packets_ += num_original_packets; 88 num_original_bytes_ += num_original_bytes; 89 num_retransmitted_packets_ += num_retransmitted_packets; 90 num_retransmitted_bytes_ += num_retransmitted_bytes; 91 92 if (wrote_last_data_ && pending_acks_ == 0) { 93 delegate_->OnAckNotification(num_original_packets_, 94 num_original_bytes_, 95 num_retransmitted_packets_, 96 num_retransmitted_bytes_, 97 delta_largest_observed); 98 } 99 } 100 101 void WroteData(bool last_data) { 102 DCHECK(!wrote_last_data_); 103 ++pending_acks_; 104 wrote_last_data_ = last_data; 105 } 106 107 protected: 108 // Delegates are ref counted. 109 virtual ~ProxyAckNotifierDelegate() OVERRIDE { 110 } 111 112 private: 113 // Original delegate. delegate_->OnAckNotification will be called when: 114 // wrote_last_data_ == true and pending_acks_ == 0 115 scoped_refptr<DelegateInterface> delegate_; 116 117 // Number of outstanding acks. 118 int pending_acks_; 119 120 // True if no pending writes remain. 121 bool wrote_last_data_; 122 123 // Accumulators. 124 int num_original_packets_; 125 int num_original_bytes_; 126 int num_retransmitted_packets_; 127 int num_retransmitted_bytes_; 128 129 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); 130 }; 131 132 ReliableQuicStream::PendingData::PendingData( 133 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) 134 : data(data_in), delegate(delegate_in) { 135 } 136 137 ReliableQuicStream::PendingData::~PendingData() { 138 } 139 140 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) 141 : sequencer_(this), 142 id_(id), 143 session_(session), 144 stream_bytes_read_(0), 145 stream_bytes_written_(0), 146 stream_error_(QUIC_STREAM_NO_ERROR), 147 connection_error_(QUIC_NO_ERROR), 148 read_side_closed_(false), 149 write_side_closed_(false), 150 fin_buffered_(false), 151 fin_sent_(false), 152 fin_received_(false), 153 rst_sent_(false), 154 rst_received_(false), 155 fec_policy_(FEC_PROTECT_OPTIONAL), 156 is_server_(session_->is_server()), 157 flow_controller_( 158 session_->connection(), id_, is_server_, 159 GetReceivedFlowControlWindow(session), 160 GetInitialStreamFlowControlWindowToSend(session), 161 GetInitialStreamFlowControlWindowToSend(session)), 162 connection_flow_controller_(session_->flow_controller()), 163 stream_contributes_to_connection_flow_control_(true) { 164 } 165 166 ReliableQuicStream::~ReliableQuicStream() { 167 } 168 169 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 170 if (read_side_closed_) { 171 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 172 // We don't want to be reading: blackhole the data. 173 return; 174 } 175 176 if (frame.stream_id != id_) { 177 session_->connection()->SendConnectionClose(QUIC_INTERNAL_ERROR); 178 return; 179 } 180 181 if (frame.fin) { 182 fin_received_ = true; 183 } 184 185 // This count include duplicate data received. 186 size_t frame_payload_size = frame.data.TotalBufferSize(); 187 stream_bytes_read_ += frame_payload_size; 188 189 // Flow control is interested in tracking highest received offset. 190 if (MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { 191 // As the highest received offset has changed, we should check to see if 192 // this is a violation of flow control. 193 if (flow_controller_.FlowControlViolation() || 194 connection_flow_controller_->FlowControlViolation()) { 195 session_->connection()->SendConnectionClose( 196 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); 197 return; 198 } 199 } 200 201 sequencer_.OnStreamFrame(frame); 202 } 203 204 int ReliableQuicStream::num_frames_received() const { 205 return sequencer_.num_frames_received(); 206 } 207 208 int ReliableQuicStream::num_duplicate_frames_received() const { 209 return sequencer_.num_duplicate_frames_received(); 210 } 211 212 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { 213 rst_received_ = true; 214 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); 215 216 stream_error_ = frame.error_code; 217 CloseWriteSide(); 218 CloseReadSide(); 219 } 220 221 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 222 bool from_peer) { 223 if (read_side_closed_ && write_side_closed_) { 224 return; 225 } 226 if (error != QUIC_NO_ERROR) { 227 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 228 connection_error_ = error; 229 } 230 231 CloseWriteSide(); 232 CloseReadSide(); 233 } 234 235 void ReliableQuicStream::OnFinRead() { 236 DCHECK(sequencer_.IsClosed()); 237 fin_received_ = true; 238 CloseReadSide(); 239 } 240 241 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 242 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 243 stream_error_ = error; 244 // Sending a RstStream results in calling CloseStream. 245 session()->SendRstStream(id(), error, stream_bytes_written_); 246 rst_sent_ = true; 247 } 248 249 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 250 session()->connection()->SendConnectionClose(error); 251 } 252 253 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 254 const string& details) { 255 session()->connection()->SendConnectionCloseWithDetails(error, details); 256 } 257 258 QuicVersion ReliableQuicStream::version() const { 259 return session()->connection()->version(); 260 } 261 262 void ReliableQuicStream::WriteOrBufferData( 263 StringPiece data, 264 bool fin, 265 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 266 if (data.empty() && !fin) { 267 LOG(DFATAL) << "data.empty() && !fin"; 268 return; 269 } 270 271 if (fin_buffered_) { 272 LOG(DFATAL) << "Fin already buffered"; 273 return; 274 } 275 276 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; 277 if (ack_notifier_delegate != NULL) { 278 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); 279 } 280 281 QuicConsumedData consumed_data(0, false); 282 fin_buffered_ = fin; 283 284 if (queued_data_.empty()) { 285 struct iovec iov(MakeIovec(data)); 286 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); 287 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 288 } 289 290 bool write_completed; 291 // If there's unconsumed data or an unconsumed fin, queue it. 292 if (consumed_data.bytes_consumed < data.length() || 293 (fin && !consumed_data.fin_consumed)) { 294 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); 295 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); 296 write_completed = false; 297 } else { 298 write_completed = true; 299 } 300 301 if ((proxy_delegate.get() != NULL) && 302 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { 303 proxy_delegate->WroteData(write_completed); 304 } 305 } 306 307 void ReliableQuicStream::OnCanWrite() { 308 bool fin = false; 309 while (!queued_data_.empty()) { 310 PendingData* pending_data = &queued_data_.front(); 311 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); 312 if (queued_data_.size() == 1 && fin_buffered_) { 313 fin = true; 314 } 315 struct iovec iov(MakeIovec(pending_data->data)); 316 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); 317 if (consumed_data.bytes_consumed == pending_data->data.size() && 318 fin == consumed_data.fin_consumed) { 319 queued_data_.pop_front(); 320 if (delegate != NULL) { 321 delegate->WroteData(true); 322 } 323 } else { 324 if (consumed_data.bytes_consumed > 0) { 325 pending_data->data.erase(0, consumed_data.bytes_consumed); 326 if (delegate != NULL) { 327 delegate->WroteData(false); 328 } 329 } 330 break; 331 } 332 } 333 } 334 335 void ReliableQuicStream::MaybeSendBlocked() { 336 flow_controller_.MaybeSendBlocked(); 337 if (!stream_contributes_to_connection_flow_control_) { 338 return; 339 } 340 connection_flow_controller_->MaybeSendBlocked(); 341 // If we are connection level flow control blocked, then add the stream 342 // to the write blocked list. It will be given a chance to write when a 343 // connection level WINDOW_UPDATE arrives. 344 if (connection_flow_controller_->IsBlocked() && 345 !flow_controller_.IsBlocked()) { 346 session_->MarkWriteBlocked(id(), EffectivePriority()); 347 } 348 } 349 350 QuicConsumedData ReliableQuicStream::WritevData( 351 const struct iovec* iov, 352 int iov_count, 353 bool fin, 354 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 355 if (write_side_closed_) { 356 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 357 return QuicConsumedData(0, false); 358 } 359 360 // How much data we want to write. 361 size_t write_length = TotalIovecLength(iov, iov_count); 362 363 // A FIN with zero data payload should not be flow control blocked. 364 bool fin_with_zero_data = (fin && write_length == 0); 365 366 if (flow_controller_.IsEnabled()) { 367 // How much data we are allowed to write from flow control. 368 uint64 send_window = flow_controller_.SendWindowSize(); 369 // TODO(rjshade): Remove connection_flow_controller_->IsEnabled() check when 370 // removing QUIC_VERSION_19. 371 if (stream_contributes_to_connection_flow_control_ && 372 connection_flow_controller_->IsEnabled()) { 373 send_window = 374 min(send_window, connection_flow_controller_->SendWindowSize()); 375 } 376 377 if (send_window == 0 && !fin_with_zero_data) { 378 // Quick return if we can't send anything. 379 MaybeSendBlocked(); 380 return QuicConsumedData(0, false); 381 } 382 383 if (write_length > send_window) { 384 // Don't send the FIN if we aren't going to send all the data. 385 fin = false; 386 387 // Writing more data would be a violation of flow control. 388 write_length = send_window; 389 } 390 } 391 392 // Fill an IOVector with bytes from the iovec. 393 IOVector data; 394 data.AppendIovecAtMostBytes(iov, iov_count, write_length); 395 396 QuicConsumedData consumed_data = session()->WritevData( 397 id(), data, stream_bytes_written_, fin, GetFecProtection(), 398 ack_notifier_delegate); 399 stream_bytes_written_ += consumed_data.bytes_consumed; 400 401 AddBytesSent(consumed_data.bytes_consumed); 402 403 if (consumed_data.bytes_consumed == write_length) { 404 if (!fin_with_zero_data) { 405 MaybeSendBlocked(); 406 } 407 if (fin && consumed_data.fin_consumed) { 408 fin_sent_ = true; 409 CloseWriteSide(); 410 } else if (fin && !consumed_data.fin_consumed) { 411 session_->MarkWriteBlocked(id(), EffectivePriority()); 412 } 413 } else { 414 session_->MarkWriteBlocked(id(), EffectivePriority()); 415 } 416 return consumed_data; 417 } 418 419 FecProtection ReliableQuicStream::GetFecProtection() { 420 return fec_policy_ == FEC_PROTECT_ALWAYS ? MUST_FEC_PROTECT : MAY_FEC_PROTECT; 421 } 422 423 void ReliableQuicStream::CloseReadSide() { 424 if (read_side_closed_) { 425 return; 426 } 427 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 428 429 read_side_closed_ = true; 430 if (write_side_closed_) { 431 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 432 session_->CloseStream(id()); 433 } 434 } 435 436 void ReliableQuicStream::CloseWriteSide() { 437 if (write_side_closed_) { 438 return; 439 } 440 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 441 442 write_side_closed_ = true; 443 if (read_side_closed_) { 444 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 445 session_->CloseStream(id()); 446 } 447 } 448 449 bool ReliableQuicStream::HasBufferedData() const { 450 return !queued_data_.empty(); 451 } 452 453 void ReliableQuicStream::OnClose() { 454 CloseReadSide(); 455 CloseWriteSide(); 456 457 if (!fin_sent_ && !rst_sent_) { 458 // For flow control accounting, we must tell the peer how many bytes we have 459 // written on this stream before termination. Done here if needed, using a 460 // RST frame. 461 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 462 session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING, 463 stream_bytes_written_); 464 rst_sent_ = true; 465 } 466 467 // We are closing the stream and will not process any further incoming bytes. 468 // As there may be more bytes in flight and we need to ensure that both 469 // endpoints have the same connection level flow control state, mark all 470 // unreceived or buffered bytes as consumed. 471 uint64 bytes_to_consume = flow_controller_.highest_received_byte_offset() - 472 flow_controller_.bytes_consumed(); 473 AddBytesConsumed(bytes_to_consume); 474 } 475 476 void ReliableQuicStream::OnWindowUpdateFrame( 477 const QuicWindowUpdateFrame& frame) { 478 if (!flow_controller_.IsEnabled()) { 479 DLOG(DFATAL) << "Flow control not enabled! " << version(); 480 return; 481 } 482 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { 483 // We can write again! 484 // TODO(rjshade): This does not respect priorities (e.g. multiple 485 // outstanding POSTs are unblocked on arrival of 486 // SHLO with initial window). 487 // As long as the connection is not flow control blocked, we can write! 488 OnCanWrite(); 489 } 490 } 491 492 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset(uint64 new_offset) { 493 if (!flow_controller_.IsEnabled()) { 494 return false; 495 } 496 uint64 increment = 497 new_offset - flow_controller_.highest_received_byte_offset(); 498 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { 499 return false; 500 } 501 502 // If |new_offset| increased the stream flow controller's highest received 503 // offset, then we need to increase the connection flow controller's value 504 // by the incremental difference. 505 if (stream_contributes_to_connection_flow_control_) { 506 connection_flow_controller_->UpdateHighestReceivedOffset( 507 connection_flow_controller_->highest_received_byte_offset() + 508 increment); 509 } 510 return true; 511 } 512 513 void ReliableQuicStream::AddBytesSent(uint64 bytes) { 514 if (flow_controller_.IsEnabled()) { 515 flow_controller_.AddBytesSent(bytes); 516 if (stream_contributes_to_connection_flow_control_) { 517 connection_flow_controller_->AddBytesSent(bytes); 518 } 519 } 520 } 521 522 void ReliableQuicStream::AddBytesConsumed(uint64 bytes) { 523 if (flow_controller_.IsEnabled()) { 524 // Only adjust stream level flow controller if we are still reading. 525 if (!read_side_closed_) { 526 flow_controller_.AddBytesConsumed(bytes); 527 } 528 529 if (stream_contributes_to_connection_flow_control_) { 530 connection_flow_controller_->AddBytesConsumed(bytes); 531 } 532 } 533 } 534 535 void ReliableQuicStream::UpdateSendWindowOffset(uint64 new_window) { 536 if (flow_controller_.UpdateSendWindowOffset(new_window)) { 537 OnCanWrite(); 538 } 539 } 540 541 bool ReliableQuicStream::IsFlowControlBlocked() { 542 if (flow_controller_.IsBlocked()) { 543 return true; 544 } 545 return stream_contributes_to_connection_flow_control_ && 546 connection_flow_controller_->IsBlocked(); 547 } 548 549 } // namespace net 550