1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/reliable_quic_stream.h" 6 7 #include "net/quic/quic_session.h" 8 #include "net/quic/quic_spdy_decompressor.h" 9 #include "net/spdy/write_blocked_list.h" 10 11 using base::StringPiece; 12 using std::min; 13 14 namespace net { 15 16 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") 17 18 namespace { 19 20 struct iovec MakeIovec(StringPiece data) { 21 struct iovec iov = {const_cast<char*>(data.data()), 22 static_cast<size_t>(data.size())}; 23 return iov; 24 } 25 26 } // namespace 27 28 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 29 QuicSession* session) 30 : sequencer_(this), 31 id_(id), 32 session_(session), 33 stream_bytes_read_(0), 34 stream_bytes_written_(0), 35 stream_error_(QUIC_STREAM_NO_ERROR), 36 connection_error_(QUIC_NO_ERROR), 37 read_side_closed_(false), 38 write_side_closed_(false), 39 fin_buffered_(false), 40 fin_sent_(false), 41 is_server_(session_->is_server()) { 42 } 43 44 ReliableQuicStream::~ReliableQuicStream() { 45 } 46 47 bool ReliableQuicStream::WillAcceptStreamFrame( 48 const QuicStreamFrame& frame) const { 49 if (read_side_closed_) { 50 return true; 51 } 52 if (frame.stream_id != id_) { 53 LOG(ERROR) << "Error!"; 54 return false; 55 } 56 return sequencer_.WillAcceptStreamFrame(frame); 57 } 58 59 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { 60 DCHECK_EQ(frame.stream_id, id_); 61 if (read_side_closed_) { 62 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 63 // We don't want to be reading: blackhole the data. 64 return true; 65 } 66 // Note: This count include duplicate data received. 67 stream_bytes_read_ += frame.data.TotalBufferSize(); 68 69 bool accepted = sequencer_.OnStreamFrame(frame); 70 71 return accepted; 72 } 73 74 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) { 75 stream_error_ = error; 76 CloseWriteSide(); 77 CloseReadSide(); 78 } 79 80 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, 81 bool from_peer) { 82 if (read_side_closed_ && write_side_closed_) { 83 return; 84 } 85 if (error != QUIC_NO_ERROR) { 86 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; 87 connection_error_ = error; 88 } 89 90 CloseWriteSide(); 91 CloseReadSide(); 92 } 93 94 void ReliableQuicStream::OnFinRead() { 95 DCHECK(sequencer_.IsClosed()); 96 CloseReadSide(); 97 } 98 99 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { 100 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); 101 stream_error_ = error; 102 // Sending a RstStream results in calling CloseStream. 103 session()->SendRstStream(id(), error); 104 } 105 106 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 107 session()->connection()->SendConnectionClose(error); 108 } 109 110 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 111 const string& details) { 112 session()->connection()->SendConnectionCloseWithDetails(error, details); 113 } 114 115 QuicVersion ReliableQuicStream::version() { 116 return session()->connection()->version(); 117 } 118 119 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { 120 DCHECK(data.size() > 0 || fin); 121 DCHECK(!fin_buffered_); 122 123 QuicConsumedData consumed_data(0, false); 124 fin_buffered_ = fin; 125 126 if (queued_data_.empty()) { 127 struct iovec iov(MakeIovec(data)); 128 consumed_data = WritevData(&iov, 1, fin, NULL); 129 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 130 } 131 132 // If there's unconsumed data or an unconsumed fin, queue it. 133 if (consumed_data.bytes_consumed < data.length() || 134 (fin && !consumed_data.fin_consumed)) { 135 queued_data_.push_back( 136 string(data.data() + consumed_data.bytes_consumed, 137 data.length() - consumed_data.bytes_consumed)); 138 } 139 } 140 141 void ReliableQuicStream::OnCanWrite() { 142 bool fin = false; 143 while (!queued_data_.empty()) { 144 const string& data = queued_data_.front(); 145 if (queued_data_.size() == 1 && fin_buffered_) { 146 fin = true; 147 } 148 struct iovec iov(MakeIovec(data)); 149 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL); 150 if (consumed_data.bytes_consumed == data.size() && 151 fin == consumed_data.fin_consumed) { 152 queued_data_.pop_front(); 153 } else { 154 queued_data_.front().erase(0, consumed_data.bytes_consumed); 155 break; 156 } 157 } 158 } 159 160 QuicConsumedData ReliableQuicStream::WritevData( 161 const struct iovec* iov, 162 int iov_count, 163 bool fin, 164 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 165 if (write_side_closed_) { 166 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 167 return QuicConsumedData(0, false); 168 } 169 170 size_t write_length = 0u; 171 for (int i = 0; i < iov_count; ++i) { 172 write_length += iov[i].iov_len; 173 } 174 QuicConsumedData consumed_data = session()->WritevData( 175 id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate); 176 stream_bytes_written_ += consumed_data.bytes_consumed; 177 if (consumed_data.bytes_consumed == write_length) { 178 if (fin && consumed_data.fin_consumed) { 179 fin_sent_ = true; 180 CloseWriteSide(); 181 } else if (fin && !consumed_data.fin_consumed) { 182 session_->MarkWriteBlocked(id(), EffectivePriority()); 183 } 184 } else { 185 session_->MarkWriteBlocked(id(), EffectivePriority()); 186 } 187 return consumed_data; 188 } 189 190 void ReliableQuicStream::CloseReadSide() { 191 if (read_side_closed_) { 192 return; 193 } 194 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 195 196 read_side_closed_ = true; 197 if (write_side_closed_) { 198 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 199 session_->CloseStream(id()); 200 } 201 } 202 203 void ReliableQuicStream::CloseWriteSide() { 204 if (write_side_closed_) { 205 return; 206 } 207 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 208 209 write_side_closed_ = true; 210 if (read_side_closed_) { 211 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 212 session_->CloseStream(id()); 213 } 214 } 215 216 bool ReliableQuicStream::HasBufferedData() { 217 return !queued_data_.empty(); 218 } 219 220 void ReliableQuicStream::OnClose() { 221 CloseReadSide(); 222 CloseWriteSide(); 223 } 224 225 } // namespace net 226