Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/reliable_quic_stream.h"
      6 
      7 #include "base/logging.h"
      8 #include "net/quic/iovector.h"
      9 #include "net/quic/quic_flow_controller.h"
     10 #include "net/quic/quic_session.h"
     11 #include "net/quic/quic_write_blocked_list.h"
     12 
     13 using base::StringPiece;
     14 using std::min;
     15 
     16 namespace net {
     17 
     18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
     19 
     20 namespace {
     21 
     22 struct iovec MakeIovec(StringPiece data) {
     23   struct iovec iov = {const_cast<char*>(data.data()),
     24                       static_cast<size_t>(data.size())};
     25   return iov;
     26 }
     27 
     28 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
     29   QuicVersion version = session->connection()->version();
     30   if (version <= QUIC_VERSION_19) {
     31     return session->config()->GetInitialFlowControlWindowToSend();
     32   }
     33 
     34   return session->config()->GetInitialStreamFlowControlWindowToSend();
     35 }
     36 
     37 size_t GetReceivedFlowControlWindow(QuicSession* session) {
     38   QuicVersion version = session->connection()->version();
     39   if (version <= QUIC_VERSION_19) {
     40     if (session->config()->HasReceivedInitialFlowControlWindowBytes()) {
     41       return session->config()->ReceivedInitialFlowControlWindowBytes();
     42     }
     43 
     44     return kDefaultFlowControlSendWindow;
     45   }
     46 
     47   // Version must be >= QUIC_VERSION_20, so we check for stream specific flow
     48   // control window.
     49   if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
     50     return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
     51   }
     52 
     53   return kDefaultFlowControlSendWindow;
     54 }
     55 
     56 }  // namespace
     57 
     58 // Wrapper that aggregates OnAckNotifications for packets sent using
     59 // WriteOrBufferData and delivers them to the original
     60 // QuicAckNotifier::DelegateInterface after all bytes written using
     61 // WriteOrBufferData are acked.  This level of indirection is
     62 // necessary because the delegate interface provides no mechanism that
     63 // WriteOrBufferData can use to inform it that the write required
     64 // multiple WritevData calls or that only part of the data has been
     65 // sent out by the time ACKs start arriving.
     66 class ReliableQuicStream::ProxyAckNotifierDelegate
     67     : public QuicAckNotifier::DelegateInterface {
     68  public:
     69   explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
     70       : delegate_(delegate),
     71         pending_acks_(0),
     72         wrote_last_data_(false),
     73         num_original_packets_(0),
     74         num_original_bytes_(0),
     75         num_retransmitted_packets_(0),
     76         num_retransmitted_bytes_(0) {
     77   }
     78 
     79   virtual void OnAckNotification(int num_original_packets,
     80                                  int num_original_bytes,
     81                                  int num_retransmitted_packets,
     82                                  int num_retransmitted_bytes,
     83                                  QuicTime::Delta delta_largest_observed)
     84       OVERRIDE {
     85     DCHECK_LT(0, pending_acks_);
     86     --pending_acks_;
     87     num_original_packets_ += num_original_packets;
     88     num_original_bytes_ += num_original_bytes;
     89     num_retransmitted_packets_ += num_retransmitted_packets;
     90     num_retransmitted_bytes_ += num_retransmitted_bytes;
     91 
     92     if (wrote_last_data_ && pending_acks_ == 0) {
     93       delegate_->OnAckNotification(num_original_packets_,
     94                                    num_original_bytes_,
     95                                    num_retransmitted_packets_,
     96                                    num_retransmitted_bytes_,
     97                                    delta_largest_observed);
     98     }
     99   }
    100 
    101   void WroteData(bool last_data) {
    102     DCHECK(!wrote_last_data_);
    103     ++pending_acks_;
    104     wrote_last_data_ = last_data;
    105   }
    106 
    107  protected:
    108   // Delegates are ref counted.
    109   virtual ~ProxyAckNotifierDelegate() OVERRIDE {
    110   }
    111 
    112  private:
    113   // Original delegate.  delegate_->OnAckNotification will be called when:
    114   //   wrote_last_data_ == true and pending_acks_ == 0
    115   scoped_refptr<DelegateInterface> delegate_;
    116 
    117   // Number of outstanding acks.
    118   int pending_acks_;
    119 
    120   // True if no pending writes remain.
    121   bool wrote_last_data_;
    122 
    123   // Accumulators.
    124   int num_original_packets_;
    125   int num_original_bytes_;
    126   int num_retransmitted_packets_;
    127   int num_retransmitted_bytes_;
    128 
    129   DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
    130 };
    131 
    132 ReliableQuicStream::PendingData::PendingData(
    133     string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
    134     : data(data_in), delegate(delegate_in) {
    135 }
    136 
    137 ReliableQuicStream::PendingData::~PendingData() {
    138 }
    139 
    140 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
    141     : sequencer_(this),
    142       id_(id),
    143       session_(session),
    144       stream_bytes_read_(0),
    145       stream_bytes_written_(0),
    146       stream_error_(QUIC_STREAM_NO_ERROR),
    147       connection_error_(QUIC_NO_ERROR),
    148       read_side_closed_(false),
    149       write_side_closed_(false),
    150       fin_buffered_(false),
    151       fin_sent_(false),
    152       fin_received_(false),
    153       rst_sent_(false),
    154       rst_received_(false),
    155       fec_policy_(FEC_PROTECT_OPTIONAL),
    156       is_server_(session_->is_server()),
    157       flow_controller_(
    158           session_->connection(), id_, is_server_,
    159           GetReceivedFlowControlWindow(session),
    160           GetInitialStreamFlowControlWindowToSend(session),
    161           GetInitialStreamFlowControlWindowToSend(session)),
    162       connection_flow_controller_(session_->flow_controller()) {
    163 }
    164 
    165 ReliableQuicStream::~ReliableQuicStream() {
    166 }
    167 
    168 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
    169   if (read_side_closed_) {
    170     DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
    171     // We don't want to be reading: blackhole the data.
    172     return true;
    173   }
    174 
    175   if (frame.stream_id != id_) {
    176     LOG(ERROR) << "Error!";
    177     return false;
    178   }
    179 
    180   if (frame.fin) {
    181     fin_received_ = true;
    182   }
    183 
    184   // This count include duplicate data received.
    185   size_t frame_payload_size = frame.data.TotalBufferSize();
    186   stream_bytes_read_ += frame_payload_size;
    187 
    188   // Flow control is interested in tracking highest received offset.
    189   if (MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
    190     // As the highest received offset has changed, we should check to see if
    191     // this is a violation of flow control.
    192     if (flow_controller_.FlowControlViolation() ||
    193         connection_flow_controller_->FlowControlViolation()) {
    194       session_->connection()->SendConnectionClose(
    195           QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA);
    196       return false;
    197     }
    198   }
    199 
    200   return sequencer_.OnStreamFrame(frame);
    201 }
    202 
    203 int ReliableQuicStream::num_frames_received() const {
    204   return sequencer_.num_frames_received();
    205 }
    206 
    207 int ReliableQuicStream::num_duplicate_frames_received() const {
    208   return sequencer_.num_duplicate_frames_received();
    209 }
    210 
    211 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
    212   rst_received_ = true;
    213   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
    214 
    215   stream_error_ = frame.error_code;
    216   CloseWriteSide();
    217   CloseReadSide();
    218 }
    219 
    220 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
    221                                             bool from_peer) {
    222   if (read_side_closed_ && write_side_closed_) {
    223     return;
    224   }
    225   if (error != QUIC_NO_ERROR) {
    226     stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
    227     connection_error_ = error;
    228   }
    229 
    230   CloseWriteSide();
    231   CloseReadSide();
    232 }
    233 
    234 void ReliableQuicStream::OnFinRead() {
    235   DCHECK(sequencer_.IsClosed());
    236   CloseReadSide();
    237 }
    238 
    239 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
    240   DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
    241   stream_error_ = error;
    242   // Sending a RstStream results in calling CloseStream.
    243   session()->SendRstStream(id(), error, stream_bytes_written_);
    244   rst_sent_ = true;
    245 }
    246 
    247 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
    248   session()->connection()->SendConnectionClose(error);
    249 }
    250 
    251 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
    252                                                     const string& details) {
    253   session()->connection()->SendConnectionCloseWithDetails(error, details);
    254 }
    255 
    256 QuicVersion ReliableQuicStream::version() const {
    257   return session()->connection()->version();
    258 }
    259 
    260 void ReliableQuicStream::WriteOrBufferData(
    261     StringPiece data,
    262     bool fin,
    263     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
    264   if (data.empty() && !fin) {
    265     LOG(DFATAL) << "data.empty() && !fin";
    266     return;
    267   }
    268 
    269   if (fin_buffered_) {
    270     LOG(DFATAL) << "Fin already buffered";
    271     return;
    272   }
    273 
    274   scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
    275   if (ack_notifier_delegate != NULL) {
    276     proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
    277   }
    278 
    279   QuicConsumedData consumed_data(0, false);
    280   fin_buffered_ = fin;
    281 
    282   if (queued_data_.empty()) {
    283     struct iovec iov(MakeIovec(data));
    284     consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
    285     DCHECK_LE(consumed_data.bytes_consumed, data.length());
    286   }
    287 
    288   bool write_completed;
    289   // If there's unconsumed data or an unconsumed fin, queue it.
    290   if (consumed_data.bytes_consumed < data.length() ||
    291       (fin && !consumed_data.fin_consumed)) {
    292     StringPiece remainder(data.substr(consumed_data.bytes_consumed));
    293     queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
    294     write_completed = false;
    295   } else {
    296     write_completed = true;
    297   }
    298 
    299   if ((proxy_delegate.get() != NULL) &&
    300       (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
    301     proxy_delegate->WroteData(write_completed);
    302   }
    303 }
    304 
    305 void ReliableQuicStream::OnCanWrite() {
    306   bool fin = false;
    307   while (!queued_data_.empty()) {
    308     PendingData* pending_data = &queued_data_.front();
    309     ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
    310     if (queued_data_.size() == 1 && fin_buffered_) {
    311       fin = true;
    312     }
    313     struct iovec iov(MakeIovec(pending_data->data));
    314     QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
    315     if (consumed_data.bytes_consumed == pending_data->data.size() &&
    316         fin == consumed_data.fin_consumed) {
    317       queued_data_.pop_front();
    318       if (delegate != NULL) {
    319         delegate->WroteData(true);
    320       }
    321     } else {
    322       if (consumed_data.bytes_consumed > 0) {
    323         pending_data->data.erase(0, consumed_data.bytes_consumed);
    324         if (delegate != NULL) {
    325           delegate->WroteData(false);
    326         }
    327       }
    328       break;
    329     }
    330   }
    331 }
    332 
    333 void ReliableQuicStream::MaybeSendBlocked() {
    334   flow_controller_.MaybeSendBlocked();
    335   connection_flow_controller_->MaybeSendBlocked();
    336   // If we are connection level flow control blocked, then add the stream
    337   // to the write blocked list. It will be given a chance to write when a
    338   // connection level WINDOW_UPDATE arrives.
    339   if (connection_flow_controller_->IsBlocked() &&
    340       !flow_controller_.IsBlocked()) {
    341     session_->MarkWriteBlocked(id(), EffectivePriority());
    342   }
    343 }
    344 
    345 QuicConsumedData ReliableQuicStream::WritevData(
    346     const struct iovec* iov,
    347     int iov_count,
    348     bool fin,
    349     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
    350   if (write_side_closed_) {
    351     DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
    352     return QuicConsumedData(0, false);
    353   }
    354 
    355   // How much data we want to write.
    356   size_t write_length = TotalIovecLength(iov, iov_count);
    357 
    358   // A FIN with zero data payload should not be flow control blocked.
    359   bool fin_with_zero_data = (fin && write_length == 0);
    360 
    361   if (flow_controller_.IsEnabled()) {
    362     // How much data we are allowed to write from flow control.
    363     uint64 send_window = flow_controller_.SendWindowSize();
    364     if (connection_flow_controller_->IsEnabled()) {
    365       send_window =
    366           min(send_window, connection_flow_controller_->SendWindowSize());
    367     }
    368 
    369     if (send_window == 0 && !fin_with_zero_data) {
    370       // Quick return if we can't send anything.
    371       MaybeSendBlocked();
    372       return QuicConsumedData(0, false);
    373     }
    374 
    375     if (write_length > send_window) {
    376       // Don't send the FIN if we aren't going to send all the data.
    377       fin = false;
    378 
    379       // Writing more data would be a violation of flow control.
    380       write_length = send_window;
    381     }
    382   }
    383 
    384   // Fill an IOVector with bytes from the iovec.
    385   IOVector data;
    386   data.AppendIovecAtMostBytes(iov, iov_count, write_length);
    387 
    388   QuicConsumedData consumed_data = session()->WritevData(
    389       id(), data, stream_bytes_written_, fin, GetFecProtection(),
    390       ack_notifier_delegate);
    391   stream_bytes_written_ += consumed_data.bytes_consumed;
    392 
    393   AddBytesSent(consumed_data.bytes_consumed);
    394 
    395   if (consumed_data.bytes_consumed == write_length) {
    396     if (!fin_with_zero_data) {
    397       MaybeSendBlocked();
    398     }
    399     if (fin && consumed_data.fin_consumed) {
    400       fin_sent_ = true;
    401       CloseWriteSide();
    402     } else if (fin && !consumed_data.fin_consumed) {
    403       session_->MarkWriteBlocked(id(), EffectivePriority());
    404     }
    405   } else {
    406     session_->MarkWriteBlocked(id(), EffectivePriority());
    407   }
    408   return consumed_data;
    409 }
    410 
    411 FecProtection ReliableQuicStream::GetFecProtection() {
    412   return fec_policy_ == FEC_PROTECT_ALWAYS ? MUST_FEC_PROTECT : MAY_FEC_PROTECT;
    413 }
    414 
    415 void ReliableQuicStream::CloseReadSide() {
    416   if (read_side_closed_) {
    417     return;
    418   }
    419   DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
    420 
    421   read_side_closed_ = true;
    422   if (write_side_closed_) {
    423     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    424     session_->CloseStream(id());
    425   }
    426 }
    427 
    428 void ReliableQuicStream::CloseWriteSide() {
    429   if (write_side_closed_) {
    430     return;
    431   }
    432   DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
    433 
    434   write_side_closed_ = true;
    435   if (read_side_closed_) {
    436     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    437     session_->CloseStream(id());
    438   }
    439 }
    440 
    441 bool ReliableQuicStream::HasBufferedData() const {
    442   return !queued_data_.empty();
    443 }
    444 
    445 void ReliableQuicStream::OnClose() {
    446   CloseReadSide();
    447   CloseWriteSide();
    448 
    449   if (!fin_sent_ && !rst_sent_) {
    450     // For flow control accounting, we must tell the peer how many bytes we have
    451     // written on this stream before termination. Done here if needed, using a
    452     // RST frame.
    453     DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
    454     session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
    455                             stream_bytes_written_);
    456     rst_sent_ = true;
    457   }
    458 
    459   // We are closing the stream and will not process any further incoming bytes.
    460   // As there may be more bytes in flight and we need to ensure that both
    461   // endpoints have the same connection level flow control state, mark all
    462   // unreceived or buffered bytes as consumed.
    463   uint64 bytes_to_consume = flow_controller_.highest_received_byte_offset() -
    464       flow_controller_.bytes_consumed();
    465   AddBytesConsumed(bytes_to_consume);
    466 }
    467 
    468 void ReliableQuicStream::OnWindowUpdateFrame(
    469     const QuicWindowUpdateFrame& frame) {
    470   if (!flow_controller_.IsEnabled()) {
    471     DLOG(DFATAL) << "Flow control not enabled! " << version();
    472     return;
    473   }
    474 
    475   if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
    476     // We can write again!
    477     // TODO(rjshade): This does not respect priorities (e.g. multiple
    478     //                outstanding POSTs are unblocked on arrival of
    479     //                SHLO with initial window).
    480     // As long as the connection is not flow control blocked, we can write!
    481     OnCanWrite();
    482   }
    483 }
    484 
    485 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset(uint64 new_offset) {
    486   if (flow_controller_.IsEnabled()) {
    487     uint64 increment =
    488         new_offset - flow_controller_.highest_received_byte_offset();
    489     if (flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
    490       // If |new_offset| increased the stream flow controller's highest received
    491       // offset, then we need to increase the connection flow controller's value
    492       // by the incremental difference.
    493       connection_flow_controller_->UpdateHighestReceivedOffset(
    494           connection_flow_controller_->highest_received_byte_offset() +
    495           increment);
    496       return true;
    497     }
    498   }
    499   return false;
    500 }
    501 
    502 void ReliableQuicStream::AddBytesSent(uint64 bytes) {
    503   if (flow_controller_.IsEnabled()) {
    504     flow_controller_.AddBytesSent(bytes);
    505     connection_flow_controller_->AddBytesSent(bytes);
    506   }
    507 }
    508 
    509 void ReliableQuicStream::AddBytesConsumed(uint64 bytes) {
    510   if (flow_controller_.IsEnabled()) {
    511     // Only adjust stream level flow controller if we are still reading.
    512     if (!read_side_closed_) {
    513       flow_controller_.AddBytesConsumed(bytes);
    514     }
    515 
    516     connection_flow_controller_->AddBytesConsumed(bytes);
    517   }
    518 }
    519 
    520 bool ReliableQuicStream::IsFlowControlBlocked() {
    521   return flow_controller_.IsBlocked() ||
    522          connection_flow_controller_->IsBlocked();
    523 }
    524 
    525 }  // namespace net
    526