Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/quic_session.h"
      6 
      7 #include "base/stl_util.h"
      8 #include "net/quic/crypto/proof_verifier.h"
      9 #include "net/quic/quic_connection.h"
     10 #include "net/quic/quic_flags.h"
     11 #include "net/quic/quic_flow_controller.h"
     12 #include "net/quic/quic_headers_stream.h"
     13 #include "net/ssl/ssl_info.h"
     14 
     15 using base::StringPiece;
     16 using base::hash_map;
     17 using base::hash_set;
     18 using std::make_pair;
     19 using std::vector;
     20 
     21 namespace net {
     22 
// Log-line prefix identifying which side of the connection is speaking.
// The expansion calls is_server(), so the macro is only usable in code where
// that name resolves (e.g. inside QuicSession member functions).
#define ENDPOINT (is_server() ? "Server: " : " Client: ")
     24 
     25 // We want to make sure we delete any closed streams in a safe manner.
     26 // To avoid deleting a stream in mid-operation, we have a simple shim between
     27 // us and the stream, so we can delete any streams when we return from
     28 // processing.
     29 //
     30 // We could just override the base methods, but this makes it easier to make
     31 // sure we don't miss any.
     32 class VisitorShim : public QuicConnectionVisitorInterface {
     33  public:
     34   explicit VisitorShim(QuicSession* session) : session_(session) {}
     35 
     36   virtual void OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
     37     session_->OnStreamFrames(frames);
     38     session_->PostProcessAfterData();
     39   }
     40   virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
     41     session_->OnRstStream(frame);
     42     session_->PostProcessAfterData();
     43   }
     44 
     45   virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
     46     session_->OnGoAway(frame);
     47     session_->PostProcessAfterData();
     48   }
     49 
     50   virtual void OnWindowUpdateFrames(const vector<QuicWindowUpdateFrame>& frames)
     51       OVERRIDE {
     52     session_->OnWindowUpdateFrames(frames);
     53     session_->PostProcessAfterData();
     54   }
     55 
     56   virtual void OnBlockedFrames(const vector<QuicBlockedFrame>& frames)
     57       OVERRIDE {
     58     session_->OnBlockedFrames(frames);
     59     session_->PostProcessAfterData();
     60   }
     61 
     62   virtual void OnCanWrite() OVERRIDE {
     63     session_->OnCanWrite();
     64     session_->PostProcessAfterData();
     65   }
     66 
     67   virtual void OnCongestionWindowChange(QuicTime now) OVERRIDE {
     68     session_->OnCongestionWindowChange(now);
     69   }
     70 
     71   virtual void OnSuccessfulVersionNegotiation(
     72       const QuicVersion& version) OVERRIDE {
     73     session_->OnSuccessfulVersionNegotiation(version);
     74   }
     75 
     76   virtual void OnConnectionClosed(
     77       QuicErrorCode error, bool from_peer) OVERRIDE {
     78     session_->OnConnectionClosed(error, from_peer);
     79     // The session will go away, so don't bother with cleanup.
     80   }
     81 
     82   virtual void OnWriteBlocked() OVERRIDE {
     83     session_->OnWriteBlocked();
     84   }
     85 
     86   virtual bool WillingAndAbleToWrite() const OVERRIDE {
     87     return session_->WillingAndAbleToWrite();
     88   }
     89 
     90   virtual bool HasPendingHandshake() const OVERRIDE {
     91     return session_->HasPendingHandshake();
     92   }
     93 
     94   virtual bool HasOpenDataStreams() const OVERRIDE {
     95     return session_->HasOpenDataStreams();
     96   }
     97 
     98  private:
     99   QuicSession* session_;
    100 };
    101 
    102 QuicSession::QuicSession(QuicConnection* connection, const QuicConfig& config)
    103     : connection_(connection),
    104       visitor_shim_(new VisitorShim(this)),
    105       config_(config),
    106       max_open_streams_(config_.max_streams_per_connection()),
    107       next_stream_id_(is_server() ? 2 : 5),
    108       largest_peer_created_stream_id_(0),
    109       error_(QUIC_NO_ERROR),
    110       goaway_received_(false),
    111       goaway_sent_(false),
    112       has_pending_handshake_(false) {
    113   if (connection_->version() <= QUIC_VERSION_19) {
    114     flow_controller_.reset(new QuicFlowController(
    115         connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow,
    116         config_.GetInitialFlowControlWindowToSend(),
    117         config_.GetInitialFlowControlWindowToSend()));
    118   } else {
    119     flow_controller_.reset(new QuicFlowController(
    120         connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow,
    121         config_.GetInitialSessionFlowControlWindowToSend(),
    122         config_.GetInitialSessionFlowControlWindowToSend()));
    123   }
    124 }
    125 
// Second-stage initialisation: hooks the session up to its connection.
//  - Installs the visitor shim as the connection's visitor.
//  - Applies |config_| to the connection.
//  - While the connection is still connected, sets the overall connection
//    timeout to the config's max_time_before_crypto_handshake().
//  - Creates the headers stream.
void QuicSession::InitializeSession() {
  connection_->set_visitor(visitor_shim_.get());
  connection_->SetFromConfig(config_);
  if (connection_->connected()) {
    connection_->SetOverallConnectionTimeout(
        config_.max_time_before_crypto_handshake());
  }
  headers_stream_.reset(new QuicHeadersStream(this));
}
    135 
// Deletes every stream the session still holds: those already closed and
// waiting for deletion as well as those still in the active stream map.
QuicSession::~QuicSession() {
  STLDeleteElements(&closed_streams_);
  STLDeleteValues(&stream_map_);

  // Debug-only diagnostic: more locally closed streams than
  // |max_open_streams_| are still waiting for the peer's final byte offset.
  DLOG_IF(WARNING,
          locally_closed_streams_highest_offset_.size() > max_open_streams_)
      << "Surprisingly high number of locally closed streams still waiting for "
         "final byte offset: " << locally_closed_streams_highest_offset_.size();
}
    145 
    146 void QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
    147   for (size_t i = 0; i < frames.size(); ++i) {
    148     // TODO(rch) deal with the error case of stream id 0.
    149     const QuicStreamFrame& frame = frames[i];
    150     QuicStreamId stream_id = frame.stream_id;
    151     ReliableQuicStream* stream = GetStream(stream_id);
    152     if (!stream) {
    153       // The stream no longer exists, but we may still be interested in the
    154       // final stream byte offset sent by the peer. A frame with a FIN can give
    155       // us this offset.
    156       if (frame.fin) {
    157         QuicStreamOffset final_byte_offset =
    158             frame.offset + frame.data.TotalBufferSize();
    159         UpdateFlowControlOnFinalReceivedByteOffset(stream_id,
    160                                                    final_byte_offset);
    161       }
    162 
    163       continue;
    164     }
    165     stream->OnStreamFrame(frames[i]);
    166   }
    167 }
    168 
    169 void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
    170                                   StringPiece headers_data) {
    171   QuicDataStream* stream = GetDataStream(stream_id);
    172   if (!stream) {
    173     // It's quite possible to receive headers after a stream has been reset.
    174     return;
    175   }
    176   stream->OnStreamHeaders(headers_data);
    177 }
    178 
    179 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
    180                                           QuicPriority priority) {
    181   QuicDataStream* stream = GetDataStream(stream_id);
    182   if (!stream) {
    183     // It's quite possible to receive headers after a stream has been reset.
    184     return;
    185   }
    186   stream->OnStreamHeadersPriority(priority);
    187 }
    188 
    189 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
    190                                           bool fin,
    191                                           size_t frame_len) {
    192   QuicDataStream* stream = GetDataStream(stream_id);
    193   if (!stream) {
    194     // It's quite possible to receive headers after a stream has been reset.
    195     return;
    196   }
    197   stream->OnStreamHeadersComplete(fin, frame_len);
    198 }
    199 
    200 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
    201   if (frame.stream_id == kCryptoStreamId) {
    202     connection()->SendConnectionCloseWithDetails(
    203         QUIC_INVALID_STREAM_ID,
    204         "Attempt to reset the crypto stream");
    205     return;
    206   }
    207   if (frame.stream_id == kHeadersStreamId) {
    208     connection()->SendConnectionCloseWithDetails(
    209         QUIC_INVALID_STREAM_ID,
    210         "Attempt to reset the headers stream");
    211     return;
    212   }
    213 
    214   QuicDataStream* stream = GetDataStream(frame.stream_id);
    215   if (!stream) {
    216     // The RST frame contains the final byte offset for the stream: we can now
    217     // update the connection level flow controller if needed.
    218     UpdateFlowControlOnFinalReceivedByteOffset(frame.stream_id,
    219                                                frame.byte_offset);
    220     return;  // Errors are handled by GetStream.
    221   }
    222 
    223   stream->OnStreamReset(frame);
    224 }
    225 
// Records that a GOAWAY frame was received from the peer.
// In DCHECK-enabled builds this also verifies that the frame's
// |last_good_stream_id| is below the next locally assigned stream id.
void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
  DCHECK(frame.last_good_stream_id < next_stream_id_);
  goaway_received_ = true;
}
    230 
    231 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
    232   DCHECK(!connection_->connected());
    233   if (error_ == QUIC_NO_ERROR) {
    234     error_ = error;
    235   }
    236 
    237   while (!stream_map_.empty()) {
    238     DataStreamMap::iterator it = stream_map_.begin();
    239     QuicStreamId id = it->first;
    240     it->second->OnConnectionClosed(error, from_peer);
    241     // The stream should call CloseStream as part of OnConnectionClosed.
    242     if (stream_map_.find(id) != stream_map_.end()) {
    243       LOG(DFATAL) << ENDPOINT
    244                   << "Stream failed to close under OnConnectionClosed";
    245       CloseStream(id);
    246     }
    247   }
    248 }
    249 
    250 void QuicSession::OnWindowUpdateFrames(
    251     const vector<QuicWindowUpdateFrame>& frames) {
    252   bool connection_window_updated = false;
    253   for (size_t i = 0; i < frames.size(); ++i) {
    254     // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
    255     // assume that it still exists.
    256     QuicStreamId stream_id = frames[i].stream_id;
    257     if (stream_id == kConnectionLevelId) {
    258       // This is a window update that applies to the connection, rather than an
    259       // individual stream.
    260       DVLOG(1) << ENDPOINT
    261                << "Received connection level flow control window update with "
    262                   "byte offset: " << frames[i].byte_offset;
    263       if (flow_controller_->UpdateSendWindowOffset(frames[i].byte_offset)) {
    264         connection_window_updated = true;
    265       }
    266       continue;
    267     }
    268 
    269     if (connection_->version() < QUIC_VERSION_21 &&
    270         (stream_id == kCryptoStreamId || stream_id == kHeadersStreamId)) {
    271       DLOG(DFATAL) << "WindowUpdate for stream " << stream_id << " in version "
    272                    << QuicVersionToString(connection_->version());
    273       return;
    274     }
    275 
    276     ReliableQuicStream* stream = GetStream(stream_id);
    277     if (stream) {
    278       stream->OnWindowUpdateFrame(frames[i]);
    279     }
    280   }
    281 
    282   // Connection level flow control window has increased, so blocked streams can
    283   // write again.
    284   if (connection_window_updated) {
    285     OnCanWrite();
    286   }
    287 }
    288 
    289 void QuicSession::OnBlockedFrames(const vector<QuicBlockedFrame>& frames) {
    290   for (size_t i = 0; i < frames.size(); ++i) {
    291     // TODO(rjshade): Compare our flow control receive windows for specified
    292     //                streams: if we have a large window then maybe something
    293     //                had gone wrong with the flow control accounting.
    294     DVLOG(1) << ENDPOINT << "Received BLOCKED frame with stream id: "
    295              << frames[i].stream_id;
    296   }
    297 }
    298 
    299 void QuicSession::OnCanWrite() {
    300   // We limit the number of writes to the number of pending streams. If more
    301   // streams become pending, WillingAndAbleToWrite will be true, which will
    302   // cause the connection to request resumption before yielding to other
    303   // connections.
    304   size_t num_writes = write_blocked_streams_.NumBlockedStreams();
    305   if (flow_controller_->IsBlocked()) {
    306     // If we are connection level flow control blocked, then only allow the
    307     // crypto and headers streams to try writing as all other streams will be
    308     // blocked.
    309     num_writes = 0;
    310     if (write_blocked_streams_.crypto_stream_blocked()) {
    311       num_writes += 1;
    312     }
    313     if (write_blocked_streams_.headers_stream_blocked()) {
    314       num_writes += 1;
    315     }
    316   }
    317   if (num_writes == 0) {
    318     return;
    319   }
    320 
    321   QuicConnection::ScopedPacketBundler ack_bundler(
    322       connection_.get(), QuicConnection::NO_ACK);
    323   for (size_t i = 0; i < num_writes; ++i) {
    324     if (!(write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
    325           write_blocked_streams_.HasWriteBlockedDataStreams())) {
    326       // Writing one stream removed another!? Something's broken.
    327       LOG(DFATAL) << "WriteBlockedStream is missing";
    328       connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
    329       return;
    330     }
    331     if (!connection_->CanWriteStreamData()) {
    332       return;
    333     }
    334     QuicStreamId stream_id = write_blocked_streams_.PopFront();
    335     if (stream_id == kCryptoStreamId) {
    336       has_pending_handshake_ = false;  // We just popped it.
    337     }
    338     ReliableQuicStream* stream = GetStream(stream_id);
    339     if (stream != NULL && !stream->flow_controller()->IsBlocked()) {
    340       // If the stream can't write all bytes, it'll re-add itself to the blocked
    341       // list.
    342       stream->OnCanWrite();
    343     }
    344   }
    345 }
    346 
    347 bool QuicSession::WillingAndAbleToWrite() const {
    348   // If the crypto or headers streams are blocked, we want to schedule a write -
    349   // they don't get blocked by connection level flow control. Otherwise only
    350   // schedule a write if we are not flow control blocked at the connection
    351   // level.
    352   return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
    353          (!flow_controller_->IsBlocked() &&
    354           write_blocked_streams_.HasWriteBlockedDataStreams());
    355 }
    356 
// Returns the |has_pending_handshake_| flag.  The flag is set when the crypto
// stream is marked write-blocked and cleared when OnCanWrite() pops it.
bool QuicSession::HasPendingHandshake() const {
  return has_pending_handshake_;
}
    360 
    361 bool QuicSession::HasOpenDataStreams() const {
    362   return GetNumOpenStreams() > 0;
    363 }
    364 
// Passes stream data straight to the connection's SendStreamData() with all
// arguments unchanged and returns whatever the connection reports.
QuicConsumedData QuicSession::WritevData(
    QuicStreamId id,
    const IOVector& data,
    QuicStreamOffset offset,
    bool fin,
    FecProtection fec_protection,
    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
  return connection_->SendStreamData(id, data, offset, fin, fec_protection,
                                     ack_notifier_delegate);
}
    375 
// Writes |headers| for stream |id| through the headers stream, passing all
// arguments unchanged, and returns the headers stream's result.
size_t QuicSession::WriteHeaders(
    QuicStreamId id,
    const SpdyHeaderBlock& headers,
    bool fin,
    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
  return headers_stream_->WriteHeaders(id, headers, fin, ack_notifier_delegate);
}
    383 
    384 void QuicSession::SendRstStream(QuicStreamId id,
    385                                 QuicRstStreamErrorCode error,
    386                                 QuicStreamOffset bytes_written) {
    387   if (connection()->connected()) {
    388     // Only send a RST_STREAM frame if still connected.
    389     connection_->SendRstStream(id, error, bytes_written);
    390   }
    391   CloseStreamInner(id, true);
    392 }
    393 
    394 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
    395   if (goaway_sent_) {
    396     return;
    397   }
    398   goaway_sent_ = true;
    399   connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
    400 }
    401 
// Closes stream |stream_id| without marking it as locally reset.
void QuicSession::CloseStream(QuicStreamId stream_id) {
  CloseStreamInner(stream_id, false);
}
    405 
    406 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
    407                                    bool locally_reset) {
    408   DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
    409 
    410   DataStreamMap::iterator it = stream_map_.find(stream_id);
    411   if (it == stream_map_.end()) {
    412     DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
    413     return;
    414   }
    415   QuicDataStream* stream = it->second;
    416 
    417   // Tell the stream that a RST has been sent.
    418   if (locally_reset) {
    419     stream->set_rst_sent(true);
    420   }
    421 
    422   closed_streams_.push_back(it->second);
    423 
    424   // If we haven't received a FIN or RST for this stream, we need to keep track
    425   // of the how many bytes the stream's flow controller believes it has
    426   // received, for accurate connection level flow control accounting.
    427   if (!stream->HasFinalReceivedByteOffset() &&
    428       stream->flow_controller()->IsEnabled()) {
    429     locally_closed_streams_highest_offset_[stream_id] =
    430         stream->flow_controller()->highest_received_byte_offset();
    431   }
    432 
    433   stream_map_.erase(it);
    434   stream->OnClose();
    435 }
    436 
    437 void QuicSession::UpdateFlowControlOnFinalReceivedByteOffset(
    438     QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
    439   map<QuicStreamId, QuicStreamOffset>::iterator it =
    440       locally_closed_streams_highest_offset_.find(stream_id);
    441   if (it == locally_closed_streams_highest_offset_.end()) {
    442     return;
    443   }
    444 
    445   DVLOG(1) << ENDPOINT << "Received final byte offset " << final_byte_offset
    446            << " for stream " << stream_id;
    447   uint64 offset_diff = final_byte_offset - it->second;
    448   if (flow_controller_->UpdateHighestReceivedOffset(
    449       flow_controller_->highest_received_byte_offset() + offset_diff)) {
    450     // If the final offset violates flow control, close the connection now.
    451     if (flow_controller_->FlowControlViolation()) {
    452       connection_->SendConnectionClose(
    453           QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA);
    454       return;
    455     }
    456   }
    457 
    458   flow_controller_->AddBytesConsumed(offset_diff);
    459   locally_closed_streams_highest_offset_.erase(it);
    460 }
    461 
// Reports the crypto stream's encryption_established() state.
bool QuicSession::IsEncryptionEstablished() {
  return GetCryptoStream()->encryption_established();
}
    465 
// Reports the crypto stream's handshake_confirmed() state.
bool QuicSession::IsCryptoHandshakeConfirmed() {
  return GetCryptoStream()->handshake_confirmed();
}
    469 
    470 void QuicSession::OnConfigNegotiated() {
    471   connection_->SetFromConfig(config_);
    472   QuicVersion version = connection()->version();
    473 
    474   // A server should accept a small number of additional streams beyond the
    475   // limit sent to the client. This helps avoid early connection termination
    476   // when FIN/RSTs for old streams are lost or arrive out of order.
    477   if (FLAGS_quic_allow_more_open_streams) {
    478     set_max_open_streams((is_server() ? kMaxStreamsMultiplier : 1.0) *
    479                          config_.max_streams_per_connection());
    480   }
    481 
    482   if (version <= QUIC_VERSION_16) {
    483     return;
    484   }
    485 
    486   if (version <= QUIC_VERSION_19) {
    487     // QUIC_VERSION_17,18,19 don't support independent stream/session flow
    488     // control windows.
    489     if (config_.HasReceivedInitialFlowControlWindowBytes()) {
    490       // Streams which were created before the SHLO was received (0-RTT
    491       // requests) are now informed of the peer's initial flow control window.
    492       uint32 new_window = config_.ReceivedInitialFlowControlWindowBytes();
    493       OnNewStreamFlowControlWindow(new_window);
    494       OnNewSessionFlowControlWindow(new_window);
    495     }
    496 
    497     return;
    498   }
    499 
    500   // QUIC_VERSION_21 and higher can have independent stream and session flow
    501   // control windows.
    502   if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
    503     // Streams which were created before the SHLO was received (0-RTT
    504     // requests) are now informed of the peer's initial flow control window.
    505     OnNewStreamFlowControlWindow(
    506         config_.ReceivedInitialStreamFlowControlWindowBytes());
    507   }
    508   if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
    509     OnNewSessionFlowControlWindow(
    510         config_.ReceivedInitialSessionFlowControlWindowBytes());
    511   }
    512 }
    513 
    514 void QuicSession::OnNewStreamFlowControlWindow(uint32 new_window) {
    515   if (new_window < kDefaultFlowControlSendWindow) {
    516     LOG(ERROR)
    517         << "Peer sent us an invalid stream flow control send window: "
    518         << new_window << ", below default: " << kDefaultFlowControlSendWindow;
    519     if (connection_->connected()) {
    520       connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW);
    521     }
    522     return;
    523   }
    524 
    525   // Inform all existing streams about the new window.
    526   if (connection_->version() >= QUIC_VERSION_21) {
    527     GetCryptoStream()->UpdateSendWindowOffset(new_window);
    528     headers_stream_->UpdateSendWindowOffset(new_window);
    529   }
    530   for (DataStreamMap::iterator it = stream_map_.begin();
    531        it != stream_map_.end(); ++it) {
    532     it->second->UpdateSendWindowOffset(new_window);
    533   }
    534 }
    535 
    536 void QuicSession::OnNewSessionFlowControlWindow(uint32 new_window) {
    537   if (new_window < kDefaultFlowControlSendWindow) {
    538     LOG(ERROR)
    539         << "Peer sent us an invalid session flow control send window: "
    540         << new_window << ", below default: " << kDefaultFlowControlSendWindow;
    541     if (connection_->connected()) {
    542       connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW);
    543     }
    544     return;
    545   }
    546 
    547   flow_controller_->UpdateSendWindowOffset(new_window);
    548 }
    549 
    550 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
    551   switch (event) {
    552     // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
    553     // to QuicSession since it is the glue.
    554     case ENCRYPTION_FIRST_ESTABLISHED:
    555       break;
    556 
    557     case ENCRYPTION_REESTABLISHED:
    558       // Retransmit originally packets that were sent, since they can't be
    559       // decrypted by the peer.
    560       connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
    561       break;
    562 
    563     case HANDSHAKE_CONFIRMED:
    564       LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
    565           << "Handshake confirmed without parameter negotiation.";
    566       // Discard originally encrypted packets, since they can't be decrypted by
    567       // the peer.
    568       connection_->NeuterUnencryptedPackets();
    569       connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
    570       if (!FLAGS_quic_allow_more_open_streams) {
    571         max_open_streams_ = config_.max_streams_per_connection();
    572       }
    573       break;
    574 
    575     default:
    576       LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
    577   }
    578 }
    579 
// Intentionally empty: this implementation ignores |message|.
void QuicSession::OnCryptoHandshakeMessageSent(
    const CryptoHandshakeMessage& message) {
}
    583 
// Intentionally empty: this implementation ignores |message|.
void QuicSession::OnCryptoHandshakeMessageReceived(
    const CryptoHandshakeMessage& message) {
}
    587 
// Returns a pointer to the session's own config object (|config_|), through
// which callers can modify it.
QuicConfig* QuicSession::config() {
  return &config_;
}
    591 
    592 void QuicSession::ActivateStream(QuicDataStream* stream) {
    593   DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
    594            << ". activating " << stream->id();
    595   DCHECK_EQ(stream_map_.count(stream->id()), 0u);
    596   stream_map_[stream->id()] = stream;
    597 }
    598 
    599 QuicStreamId QuicSession::GetNextStreamId() {
    600   QuicStreamId id = next_stream_id_;
    601   next_stream_id_ += 2;
    602   return id;
    603 }
    604 
    605 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
    606   if (stream_id == kCryptoStreamId) {
    607     return GetCryptoStream();
    608   }
    609   if (stream_id == kHeadersStreamId) {
    610     return headers_stream_.get();
    611   }
    612   return GetDataStream(stream_id);
    613 }
    614 
    615 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
    616   if (stream_id == kCryptoStreamId) {
    617     DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
    618     return NULL;
    619   }
    620   if (stream_id == kHeadersStreamId) {
    621     DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
    622     return NULL;
    623   }
    624 
    625   DataStreamMap::iterator it = stream_map_.find(stream_id);
    626   if (it != stream_map_.end()) {
    627     return it->second;
    628   }
    629 
    630   if (IsClosedStream(stream_id)) {
    631     return NULL;
    632   }
    633 
    634   if (stream_id % 2 == next_stream_id_ % 2) {
    635     // We've received a frame for a locally-created stream that is not
    636     // currently active.  This is an error.
    637     if (connection()->connected()) {
    638       connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
    639     }
    640     return NULL;
    641   }
    642 
    643   return GetIncomingDataStream(stream_id);
    644 }
    645 
    646 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
    647   if (IsClosedStream(stream_id)) {
    648     return NULL;
    649   }
    650 
    651   implicitly_created_streams_.erase(stream_id);
    652   if (stream_id > largest_peer_created_stream_id_) {
    653     if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
    654       // We may already have sent a connection close due to multiple reset
    655       // streams in the same packet.
    656       if (connection()->connected()) {
    657         LOG(ERROR) << "Trying to get stream: " << stream_id
    658                    << ", largest peer created stream: "
    659                    << largest_peer_created_stream_id_
    660                    << ", max delta: " << kMaxStreamIdDelta;
    661         connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
    662       }
    663       return NULL;
    664     }
    665     if (largest_peer_created_stream_id_ == 0) {
    666       if (is_server()) {
    667         largest_peer_created_stream_id_= 3;
    668       } else {
    669         largest_peer_created_stream_id_= 1;
    670       }
    671     }
    672     for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
    673          id < stream_id;
    674          id += 2) {
    675       implicitly_created_streams_.insert(id);
    676     }
    677     largest_peer_created_stream_id_ = stream_id;
    678   }
    679   QuicDataStream* stream = CreateIncomingDataStream(stream_id);
    680   if (stream == NULL) {
    681     return NULL;
    682   }
    683   ActivateStream(stream);
    684   return stream;
    685 }
    686 
// Sets the session's open-stream limit, logging the new value at verbose
// level 1.
void QuicSession::set_max_open_streams(size_t max_open_streams) {
  DVLOG(1) << "Setting max_open_streams_ to " << max_open_streams;
  max_open_streams_ = max_open_streams;
}
    691 
    692 bool QuicSession::IsClosedStream(QuicStreamId id) {
    693   DCHECK_NE(0u, id);
    694   if (id == kCryptoStreamId) {
    695     return false;
    696   }
    697   if (id == kHeadersStreamId) {
    698     return false;
    699   }
    700   if (ContainsKey(stream_map_, id)) {
    701     // Stream is active
    702     return false;
    703   }
    704   if (id % 2 == next_stream_id_ % 2) {
    705     // Locally created streams are strictly in-order.  If the id is in the
    706     // range of created streams and it's not active, it must have been closed.
    707     return id < next_stream_id_;
    708   }
    709   // For peer created streams, we also need to consider implicitly created
    710   // streams.
    711   return id <= largest_peer_created_stream_id_ &&
    712       implicitly_created_streams_.count(id) == 0;
    713 }
    714 
    715 size_t QuicSession::GetNumOpenStreams() const {
    716   return stream_map_.size() + implicitly_created_streams_.size();
    717 }
    718 
    719 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
    720 #ifndef NDEBUG
    721   ReliableQuicStream* stream = GetStream(id);
    722   if (stream != NULL) {
    723     LOG_IF(DFATAL, priority != stream->EffectivePriority())
    724         << ENDPOINT << "Stream " << id
    725         << "Priorities do not match.  Got: " << priority
    726         << " Expected: " << stream->EffectivePriority();
    727   } else {
    728     LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
    729   }
    730 #endif
    731 
    732   if (id == kCryptoStreamId) {
    733     DCHECK(!has_pending_handshake_);
    734     has_pending_handshake_ = true;
    735     // TODO(jar): Be sure to use the highest priority for the crypto stream,
    736     // perhaps by adding a "special" priority for it that is higher than
    737     // kHighestPriority.
    738     priority = kHighestPriority;
    739   }
    740   write_blocked_streams_.PushBack(id, priority);
    741 }
    742 
    743 bool QuicSession::HasDataToWrite() const {
    744   return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
    745          write_blocked_streams_.HasWriteBlockedDataStreams() ||
    746          connection_->HasQueuedData();
    747 }
    748 
// Not implemented in this class: reports NOTIMPLEMENTED(), leaves |ssl_info|
// untouched and returns false.
bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) const {
  NOTIMPLEMENTED();
  return false;
}
    753 
    754 void QuicSession::PostProcessAfterData() {
    755   STLDeleteElements(&closed_streams_);
    756   closed_streams_.clear();
    757 
    758   if (FLAGS_close_quic_connection_unfinished_streams_2 &&
    759       connection()->connected() &&
    760       locally_closed_streams_highest_offset_.size() > max_open_streams_) {
    761     // A buggy client may fail to send FIN/RSTs. Don't tolerate this.
    762     connection_->SendConnectionClose(QUIC_TOO_MANY_UNFINISHED_STREAMS);
    763   }
    764 }
    765 
    766 void QuicSession::OnSuccessfulVersionNegotiation(const QuicVersion& version) {
    767   if (version < QUIC_VERSION_19) {
    768     flow_controller_->Disable();
    769   }
    770 
    771   // Disable stream level flow control based on negotiated version. Streams may
    772   // have been created with a different version.
    773   if (version < QUIC_VERSION_21) {
    774     GetCryptoStream()->flow_controller()->Disable();
    775     headers_stream_->flow_controller()->Disable();
    776   }
    777   for (DataStreamMap::iterator it = stream_map_.begin();
    778        it != stream_map_.end(); ++it) {
    779     if (version <= QUIC_VERSION_16) {
    780       it->second->flow_controller()->Disable();
    781     }
    782   }
    783 }
    784 
    785 }  // namespace net
    786