Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/reliable_quic_stream.h"
      6 
      7 #include "base/logging.h"
      8 #include "net/quic/iovector.h"
      9 #include "net/quic/quic_flow_controller.h"
     10 #include "net/quic/quic_session.h"
     11 #include "net/quic/quic_write_blocked_list.h"
     12 
     13 using base::StringPiece;
     14 using std::min;
     15 
     16 namespace net {
     17 
     18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
     19 
     20 namespace {
     21 
     22 struct iovec MakeIovec(StringPiece data) {
     23   struct iovec iov = {const_cast<char*>(data.data()),
     24                       static_cast<size_t>(data.size())};
     25   return iov;
     26 }
     27 
     28 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
     29   QuicVersion version = session->connection()->version();
     30   if (version <= QUIC_VERSION_19) {
     31     return session->config()->GetInitialFlowControlWindowToSend();
     32   }
     33 
     34   return session->config()->GetInitialStreamFlowControlWindowToSend();
     35 }
     36 
     37 size_t GetReceivedFlowControlWindow(QuicSession* session) {
     38   QuicVersion version = session->connection()->version();
     39   if (version <= QUIC_VERSION_19) {
     40     if (session->config()->HasReceivedInitialFlowControlWindowBytes()) {
     41       return session->config()->ReceivedInitialFlowControlWindowBytes();
     42     }
     43 
     44     return kDefaultFlowControlSendWindow;
     45   }
     46 
     47   // Version must be >= QUIC_VERSION_21, so we check for stream specific flow
     48   // control window.
     49   if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
     50     return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
     51   }
     52 
     53   return kDefaultFlowControlSendWindow;
     54 }
     55 
     56 }  // namespace
     57 
     58 // Wrapper that aggregates OnAckNotifications for packets sent using
     59 // WriteOrBufferData and delivers them to the original
     60 // QuicAckNotifier::DelegateInterface after all bytes written using
     61 // WriteOrBufferData are acked.  This level of indirection is
     62 // necessary because the delegate interface provides no mechanism that
     63 // WriteOrBufferData can use to inform it that the write required
     64 // multiple WritevData calls or that only part of the data has been
     65 // sent out by the time ACKs start arriving.
     66 class ReliableQuicStream::ProxyAckNotifierDelegate
     67     : public QuicAckNotifier::DelegateInterface {
     68  public:
     69   explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
     70       : delegate_(delegate),
     71         pending_acks_(0),
     72         wrote_last_data_(false),
     73         num_original_packets_(0),
     74         num_original_bytes_(0),
     75         num_retransmitted_packets_(0),
     76         num_retransmitted_bytes_(0) {
     77   }
     78 
     79   virtual void OnAckNotification(int num_original_packets,
     80                                  int num_original_bytes,
     81                                  int num_retransmitted_packets,
     82                                  int num_retransmitted_bytes,
     83                                  QuicTime::Delta delta_largest_observed)
     84       OVERRIDE {
     85     DCHECK_LT(0, pending_acks_);
     86     --pending_acks_;
     87     num_original_packets_ += num_original_packets;
     88     num_original_bytes_ += num_original_bytes;
     89     num_retransmitted_packets_ += num_retransmitted_packets;
     90     num_retransmitted_bytes_ += num_retransmitted_bytes;
     91 
     92     if (wrote_last_data_ && pending_acks_ == 0) {
     93       delegate_->OnAckNotification(num_original_packets_,
     94                                    num_original_bytes_,
     95                                    num_retransmitted_packets_,
     96                                    num_retransmitted_bytes_,
     97                                    delta_largest_observed);
     98     }
     99   }
    100 
    101   void WroteData(bool last_data) {
    102     DCHECK(!wrote_last_data_);
    103     ++pending_acks_;
    104     wrote_last_data_ = last_data;
    105   }
    106 
    107  protected:
    108   // Delegates are ref counted.
    109   virtual ~ProxyAckNotifierDelegate() OVERRIDE {
    110   }
    111 
    112  private:
    113   // Original delegate.  delegate_->OnAckNotification will be called when:
    114   //   wrote_last_data_ == true and pending_acks_ == 0
    115   scoped_refptr<DelegateInterface> delegate_;
    116 
    117   // Number of outstanding acks.
    118   int pending_acks_;
    119 
    120   // True if no pending writes remain.
    121   bool wrote_last_data_;
    122 
    123   // Accumulators.
    124   int num_original_packets_;
    125   int num_original_bytes_;
    126   int num_retransmitted_packets_;
    127   int num_retransmitted_bytes_;
    128 
    129   DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
    130 };
    131 
    132 ReliableQuicStream::PendingData::PendingData(
    133     string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
    134     : data(data_in), delegate(delegate_in) {
    135 }
    136 
    137 ReliableQuicStream::PendingData::~PendingData() {
    138 }
    139 
    140 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
    141     : sequencer_(this),
    142       id_(id),
    143       session_(session),
    144       stream_bytes_read_(0),
    145       stream_bytes_written_(0),
    146       stream_error_(QUIC_STREAM_NO_ERROR),
    147       connection_error_(QUIC_NO_ERROR),
    148       read_side_closed_(false),
    149       write_side_closed_(false),
    150       fin_buffered_(false),
    151       fin_sent_(false),
    152       fin_received_(false),
    153       rst_sent_(false),
    154       rst_received_(false),
    155       fec_policy_(FEC_PROTECT_OPTIONAL),
    156       is_server_(session_->is_server()),
    157       flow_controller_(
    158           session_->connection(), id_, is_server_,
    159           GetReceivedFlowControlWindow(session),
    160           GetInitialStreamFlowControlWindowToSend(session),
    161           GetInitialStreamFlowControlWindowToSend(session)),
    162       connection_flow_controller_(session_->flow_controller()),
    163       stream_contributes_to_connection_flow_control_(true) {
    164 }
    165 
    166 ReliableQuicStream::~ReliableQuicStream() {
    167 }
    168 
    169 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
    170   if (read_side_closed_) {
    171     DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
    172     // We don't want to be reading: blackhole the data.
    173     return;
    174   }
    175 
    176   if (frame.stream_id != id_) {
    177     session_->connection()->SendConnectionClose(QUIC_INTERNAL_ERROR);
    178     return;
    179   }
    180 
    181   if (frame.fin) {
    182     fin_received_ = true;
    183   }
    184 
    185   // This count include duplicate data received.
    186   size_t frame_payload_size = frame.data.TotalBufferSize();
    187   stream_bytes_read_ += frame_payload_size;
    188 
    189   // Flow control is interested in tracking highest received offset.
    190   if (MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
    191     // As the highest received offset has changed, we should check to see if
    192     // this is a violation of flow control.
    193     if (flow_controller_.FlowControlViolation() ||
    194         connection_flow_controller_->FlowControlViolation()) {
    195       session_->connection()->SendConnectionClose(
    196           QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA);
    197       return;
    198     }
    199   }
    200 
    201   sequencer_.OnStreamFrame(frame);
    202 }
    203 
    204 int ReliableQuicStream::num_frames_received() const {
    205   return sequencer_.num_frames_received();
    206 }
    207 
    208 int ReliableQuicStream::num_duplicate_frames_received() const {
    209   return sequencer_.num_duplicate_frames_received();
    210 }
    211 
    212 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
    213   rst_received_ = true;
    214   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
    215 
    216   stream_error_ = frame.error_code;
    217   CloseWriteSide();
    218   CloseReadSide();
    219 }
    220 
    221 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
    222                                             bool from_peer) {
    223   if (read_side_closed_ && write_side_closed_) {
    224     return;
    225   }
    226   if (error != QUIC_NO_ERROR) {
    227     stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    228     connection_error_ = error;
    229   }
    230 
    231   CloseWriteSide();
    232   CloseReadSide();
    233 }
    234 
    235 void ReliableQuicStream::OnFinRead() {
    236   DCHECK(sequencer_.IsClosed());
    237   fin_received_ = true;
    238   CloseReadSide();
    239 }
    240 
    241 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
    242   DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
    243   stream_error_ = error;
    244   // Sending a RstStream results in calling CloseStream.
    245   session()->SendRstStream(id(), error, stream_bytes_written_);
    246   rst_sent_ = true;
    247 }
    248 
    249 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
    250   session()->connection()->SendConnectionClose(error);
    251 }
    252 
    253 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
    254                                                     const string& details) {
    255   session()->connection()->SendConnectionCloseWithDetails(error, details);
    256 }
    257 
    258 QuicVersion ReliableQuicStream::version() const {
    259   return session()->connection()->version();
    260 }
    261 
    262 void ReliableQuicStream::WriteOrBufferData(
    263     StringPiece data,
    264     bool fin,
    265     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
    266   if (data.empty() && !fin) {
    267     LOG(DFATAL) << "data.empty() && !fin";
    268     return;
    269   }
    270 
    271   if (fin_buffered_) {
    272     LOG(DFATAL) << "Fin already buffered";
    273     return;
    274   }
    275 
    276   scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
    277   if (ack_notifier_delegate != NULL) {
    278     proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
    279   }
    280 
    281   QuicConsumedData consumed_data(0, false);
    282   fin_buffered_ = fin;
    283 
    284   if (queued_data_.empty()) {
    285     struct iovec iov(MakeIovec(data));
    286     consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
    287     DCHECK_LE(consumed_data.bytes_consumed, data.length());
    288   }
    289 
    290   bool write_completed;
    291   // If there's unconsumed data or an unconsumed fin, queue it.
    292   if (consumed_data.bytes_consumed < data.length() ||
    293       (fin && !consumed_data.fin_consumed)) {
    294     StringPiece remainder(data.substr(consumed_data.bytes_consumed));
    295     queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
    296     write_completed = false;
    297   } else {
    298     write_completed = true;
    299   }
    300 
    301   if ((proxy_delegate.get() != NULL) &&
    302       (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
    303     proxy_delegate->WroteData(write_completed);
    304   }
    305 }
    306 
    307 void ReliableQuicStream::OnCanWrite() {
    308   bool fin = false;
    309   while (!queued_data_.empty()) {
    310     PendingData* pending_data = &queued_data_.front();
    311     ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
    312     if (queued_data_.size() == 1 && fin_buffered_) {
    313       fin = true;
    314     }
    315     struct iovec iov(MakeIovec(pending_data->data));
    316     QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
    317     if (consumed_data.bytes_consumed == pending_data->data.size() &&
    318         fin == consumed_data.fin_consumed) {
    319       queued_data_.pop_front();
    320       if (delegate != NULL) {
    321         delegate->WroteData(true);
    322       }
    323     } else {
    324       if (consumed_data.bytes_consumed > 0) {
    325         pending_data->data.erase(0, consumed_data.bytes_consumed);
    326         if (delegate != NULL) {
    327           delegate->WroteData(false);
    328         }
    329       }
    330       break;
    331     }
    332   }
    333 }
    334 
    335 void ReliableQuicStream::MaybeSendBlocked() {
    336   flow_controller_.MaybeSendBlocked();
    337   if (!stream_contributes_to_connection_flow_control_) {
    338     return;
    339   }
    340   connection_flow_controller_->MaybeSendBlocked();
    341   // If we are connection level flow control blocked, then add the stream
    342   // to the write blocked list. It will be given a chance to write when a
    343   // connection level WINDOW_UPDATE arrives.
    344   if (connection_flow_controller_->IsBlocked() &&
    345       !flow_controller_.IsBlocked()) {
    346     session_->MarkWriteBlocked(id(), EffectivePriority());
    347   }
    348 }
    349 
    350 QuicConsumedData ReliableQuicStream::WritevData(
    351     const struct iovec* iov,
    352     int iov_count,
    353     bool fin,
    354     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
    355   if (write_side_closed_) {
    356     DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
    357     return QuicConsumedData(0, false);
    358   }
    359 
    360   // How much data we want to write.
    361   size_t write_length = TotalIovecLength(iov, iov_count);
    362 
    363   // A FIN with zero data payload should not be flow control blocked.
    364   bool fin_with_zero_data = (fin && write_length == 0);
    365 
    366   if (flow_controller_.IsEnabled()) {
    367     // How much data we are allowed to write from flow control.
    368     uint64 send_window = flow_controller_.SendWindowSize();
    369     // TODO(rjshade): Remove connection_flow_controller_->IsEnabled() check when
    370     // removing QUIC_VERSION_19.
    371     if (stream_contributes_to_connection_flow_control_ &&
    372         connection_flow_controller_->IsEnabled()) {
    373       send_window =
    374           min(send_window, connection_flow_controller_->SendWindowSize());
    375     }
    376 
    377     if (send_window == 0 && !fin_with_zero_data) {
    378       // Quick return if we can't send anything.
    379       MaybeSendBlocked();
    380       return QuicConsumedData(0, false);
    381     }
    382 
    383     if (write_length > send_window) {
    384       // Don't send the FIN if we aren't going to send all the data.
    385       fin = false;
    386 
    387       // Writing more data would be a violation of flow control.
    388       write_length = send_window;
    389     }
    390   }
    391 
    392   // Fill an IOVector with bytes from the iovec.
    393   IOVector data;
    394   data.AppendIovecAtMostBytes(iov, iov_count, write_length);
    395 
    396   QuicConsumedData consumed_data = session()->WritevData(
    397       id(), data, stream_bytes_written_, fin, GetFecProtection(),
    398       ack_notifier_delegate);
    399   stream_bytes_written_ += consumed_data.bytes_consumed;
    400 
    401   AddBytesSent(consumed_data.bytes_consumed);
    402 
    403   if (consumed_data.bytes_consumed == write_length) {
    404     if (!fin_with_zero_data) {
    405       MaybeSendBlocked();
    406     }
    407     if (fin && consumed_data.fin_consumed) {
    408       fin_sent_ = true;
    409       CloseWriteSide();
    410     } else if (fin && !consumed_data.fin_consumed) {
    411       session_->MarkWriteBlocked(id(), EffectivePriority());
    412     }
    413   } else {
    414     session_->MarkWriteBlocked(id(), EffectivePriority());
    415   }
    416   return consumed_data;
    417 }
    418 
    419 FecProtection ReliableQuicStream::GetFecProtection() {
    420   return fec_policy_ == FEC_PROTECT_ALWAYS ? MUST_FEC_PROTECT : MAY_FEC_PROTECT;
    421 }
    422 
    423 void ReliableQuicStream::CloseReadSide() {
    424   if (read_side_closed_) {
    425     return;
    426   }
    427   DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
    428 
    429   read_side_closed_ = true;
    430   if (write_side_closed_) {
    431     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    432     session_->CloseStream(id());
    433   }
    434 }
    435 
    436 void ReliableQuicStream::CloseWriteSide() {
    437   if (write_side_closed_) {
    438     return;
    439   }
    440   DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
    441 
    442   write_side_closed_ = true;
    443   if (read_side_closed_) {
    444     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    445     session_->CloseStream(id());
    446   }
    447 }
    448 
    449 bool ReliableQuicStream::HasBufferedData() const {
    450   return !queued_data_.empty();
    451 }
    452 
    453 void ReliableQuicStream::OnClose() {
    454   CloseReadSide();
    455   CloseWriteSide();
    456 
    457   if (!fin_sent_ && !rst_sent_) {
    458     // For flow control accounting, we must tell the peer how many bytes we have
    459     // written on this stream before termination. Done here if needed, using a
    460     // RST frame.
    461     DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
    462     session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
    463                             stream_bytes_written_);
    464     rst_sent_ = true;
    465   }
    466 
    467   // We are closing the stream and will not process any further incoming bytes.
    468   // As there may be more bytes in flight and we need to ensure that both
    469   // endpoints have the same connection level flow control state, mark all
    470   // unreceived or buffered bytes as consumed.
    471   uint64 bytes_to_consume = flow_controller_.highest_received_byte_offset() -
    472       flow_controller_.bytes_consumed();
    473   AddBytesConsumed(bytes_to_consume);
    474 }
    475 
    476 void ReliableQuicStream::OnWindowUpdateFrame(
    477     const QuicWindowUpdateFrame& frame) {
    478   if (!flow_controller_.IsEnabled()) {
    479     DLOG(DFATAL) << "Flow control not enabled! " << version();
    480     return;
    481   }
    482   if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
    483     // We can write again!
    484     // TODO(rjshade): This does not respect priorities (e.g. multiple
    485     //                outstanding POSTs are unblocked on arrival of
    486     //                SHLO with initial window).
    487     // As long as the connection is not flow control blocked, we can write!
    488     OnCanWrite();
    489   }
    490 }
    491 
    492 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset(uint64 new_offset) {
    493   if (!flow_controller_.IsEnabled()) {
    494     return false;
    495   }
    496   uint64 increment =
    497       new_offset - flow_controller_.highest_received_byte_offset();
    498   if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
    499     return false;
    500   }
    501 
    502   // If |new_offset| increased the stream flow controller's highest received
    503   // offset, then we need to increase the connection flow controller's value
    504   // by the incremental difference.
    505   if (stream_contributes_to_connection_flow_control_) {
    506     connection_flow_controller_->UpdateHighestReceivedOffset(
    507         connection_flow_controller_->highest_received_byte_offset() +
    508         increment);
    509   }
    510   return true;
    511 }
    512 
    513 void ReliableQuicStream::AddBytesSent(uint64 bytes) {
    514   if (flow_controller_.IsEnabled()) {
    515     flow_controller_.AddBytesSent(bytes);
    516     if (stream_contributes_to_connection_flow_control_) {
    517       connection_flow_controller_->AddBytesSent(bytes);
    518     }
    519   }
    520 }
    521 
    522 void ReliableQuicStream::AddBytesConsumed(uint64 bytes) {
    523   if (flow_controller_.IsEnabled()) {
    524     // Only adjust stream level flow controller if we are still reading.
    525     if (!read_side_closed_) {
    526       flow_controller_.AddBytesConsumed(bytes);
    527     }
    528 
    529     if (stream_contributes_to_connection_flow_control_) {
    530       connection_flow_controller_->AddBytesConsumed(bytes);
    531     }
    532   }
    533 }
    534 
    535 void ReliableQuicStream::UpdateSendWindowOffset(uint64 new_window) {
    536   if (flow_controller_.UpdateSendWindowOffset(new_window)) {
    537     OnCanWrite();
    538   }
    539 }
    540 
    541 bool ReliableQuicStream::IsFlowControlBlocked() {
    542   if (flow_controller_.IsBlocked()) {
    543     return true;
    544   }
    545   return stream_contributes_to_connection_flow_control_ &&
    546       connection_flow_controller_->IsBlocked();
    547 }
    548 
    549 }  // namespace net
    550