Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/quic_session.h"
      6 
      7 #include "base/stl_util.h"
      8 #include "net/quic/crypto/proof_verifier.h"
      9 #include "net/quic/quic_connection.h"
     10 #include "net/ssl/ssl_info.h"
     11 
     12 using base::StringPiece;
     13 using base::hash_map;
     14 using base::hash_set;
     15 using std::make_pair;
     16 using std::vector;
     17 
     18 namespace net {
     19 
     20 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
     21 
     22 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
     23 
     24 // We want to make sure we delete any closed streams in a safe manner.
     25 // To avoid deleting a stream in mid-operation, we have a simple shim between
     26 // us and the stream, so we can delete any streams when we return from
     27 // processing.
     28 //
     29 // We could just override the base methods, but this makes it easier to make
     30 // sure we don't miss any.
     31 class VisitorShim : public QuicConnectionVisitorInterface {
     32  public:
     33   explicit VisitorShim(QuicSession* session) : session_(session) {}
     34 
     35   virtual bool OnPacket(const IPEndPoint& self_address,
     36                         const IPEndPoint& peer_address,
     37                         const QuicPacketHeader& header,
     38                         const vector<QuicStreamFrame>& frame) OVERRIDE {
     39     bool accepted = session_->OnPacket(self_address, peer_address, header,
     40                                        frame);
     41     session_->PostProcessAfterData();
     42     return accepted;
     43   }
     44   virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
     45     session_->OnRstStream(frame);
     46     session_->PostProcessAfterData();
     47   }
     48 
     49   virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
     50     session_->OnGoAway(frame);
     51     session_->PostProcessAfterData();
     52   }
     53 
     54   virtual void OnAck(const SequenceNumberSet& acked_packets) OVERRIDE {
     55     session_->OnAck(acked_packets);
     56     session_->PostProcessAfterData();
     57   }
     58 
     59   virtual bool OnCanWrite() OVERRIDE {
     60     bool rc = session_->OnCanWrite();
     61     session_->PostProcessAfterData();
     62     return rc;
     63   }
     64 
     65   virtual void ConnectionClose(QuicErrorCode error, bool from_peer) OVERRIDE {
     66     session_->ConnectionClose(error, from_peer);
     67     // The session will go away, so don't bother with cleanup.
     68   }
     69 
     70  private:
     71   QuicSession* session_;
     72 };
     73 
     74 QuicSession::QuicSession(QuicConnection* connection,
     75                          const QuicConfig& config,
     76                          bool is_server)
     77     : connection_(connection),
     78       visitor_shim_(new VisitorShim(this)),
     79       config_(config),
     80       max_open_streams_(config_.max_streams_per_connection()),
     81       next_stream_id_(is_server ? 2 : 3),
     82       is_server_(is_server),
     83       largest_peer_created_stream_id_(0),
     84       error_(QUIC_NO_ERROR),
     85       goaway_received_(false),
     86       goaway_sent_(false) {
     87 
     88   connection_->set_visitor(visitor_shim_.get());
     89   connection_->SetIdleNetworkTimeout(config_.idle_connection_state_lifetime());
     90   if (connection_->connected()) {
     91     connection_->SetOverallConnectionTimeout(
     92         config_.max_time_before_crypto_handshake());
     93   }
     94   // TODO(satyamshekhar): Set congestion control and ICSL also.
     95 }
     96 
     97 QuicSession::~QuicSession() {
     98   STLDeleteElements(&closed_streams_);
     99   STLDeleteValues(&stream_map_);
    100 }
    101 
    102 bool QuicSession::OnPacket(const IPEndPoint& self_address,
    103                            const IPEndPoint& peer_address,
    104                            const QuicPacketHeader& header,
    105                            const vector<QuicStreamFrame>& frames) {
    106   if (header.public_header.guid != connection()->guid()) {
    107     DLOG(INFO) << ENDPOINT << "Got packet header for invalid GUID: "
    108                << header.public_header.guid;
    109     return false;
    110   }
    111 
    112   for (size_t i = 0; i < frames.size(); ++i) {
    113     // TODO(rch) deal with the error case of stream id 0
    114     if (IsClosedStream(frames[i].stream_id)) {
    115       // If we get additional frames for a stream where we didn't process
    116       // headers, it's highly likely our compression context will end up
    117       // permanently out of sync with the peer's, so we give up and close the
    118       // connection.
    119       if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
    120         connection()->SendConnectionClose(
    121             QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
    122         return false;
    123       }
    124       continue;
    125     }
    126 
    127     ReliableQuicStream* stream = GetStream(frames[i].stream_id);
    128     if (stream == NULL) return false;
    129     if (!stream->WillAcceptStreamFrame(frames[i])) return false;
    130 
    131     // TODO(alyssar) check against existing connection address: if changed, make
    132     // sure we update the connection.
    133   }
    134 
    135   for (size_t i = 0; i < frames.size(); ++i) {
    136     ReliableQuicStream* stream = GetStream(frames[i].stream_id);
    137     if (stream) {
    138       stream->OnStreamFrame(frames[i]);
    139     }
    140   }
    141 
    142   while (!decompression_blocked_streams_.empty()) {
    143     QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
    144     if (header_id != decompressor_.current_header_id()) {
    145       break;
    146     }
    147     QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
    148     decompression_blocked_streams_.erase(header_id);
    149     ReliableQuicStream* stream = GetStream(stream_id);
    150     if (!stream) {
    151       connection()->SendConnectionClose(
    152           QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
    153       return false;
    154     }
    155     stream->OnDecompressorAvailable();
    156   }
    157   return true;
    158 }
    159 
    160 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
    161   ReliableQuicStream* stream = GetStream(frame.stream_id);
    162   if (!stream) {
    163     return;  // Errors are handled by GetStream.
    164   }
    165   stream->OnStreamReset(frame.error_code);
    166 }
    167 
    168 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
    169   DCHECK(frame.last_good_stream_id < next_stream_id_);
    170   goaway_received_ = true;
    171 }
    172 
    173 void QuicSession::ConnectionClose(QuicErrorCode error, bool from_peer) {
    174   if (error_ == QUIC_NO_ERROR) {
    175     error_ = error;
    176   }
    177 
    178   while (stream_map_.size() != 0) {
    179     ReliableStreamMap::iterator it = stream_map_.begin();
    180     QuicStreamId id = it->first;
    181     it->second->ConnectionClose(error, from_peer);
    182     // The stream should call CloseStream as part of ConnectionClose.
    183     if (stream_map_.find(id) != stream_map_.end()) {
    184       LOG(DFATAL) << ENDPOINT << "Stream failed to close under ConnectionClose";
    185       CloseStream(id);
    186     }
    187   }
    188 }
    189 
    190 bool QuicSession::OnCanWrite() {
    191   // We latch this here rather than doing a traditional loop, because streams
    192   // may be modifying the list as we loop.
    193   int remaining_writes = write_blocked_streams_.NumObjects();
    194 
    195   while (!connection_->HasQueuedData() &&
    196          remaining_writes > 0) {
    197     DCHECK(!write_blocked_streams_.IsEmpty());
    198     ReliableQuicStream* stream =
    199         GetStream(write_blocked_streams_.GetNextBlockedObject());
    200     if (stream != NULL) {
    201       // If the stream can't write all bytes, it'll re-add itself to the blocked
    202       // list.
    203       stream->OnCanWrite();
    204     }
    205     --remaining_writes;
    206   }
    207 
    208   return write_blocked_streams_.IsEmpty();
    209 }
    210 
    211 QuicConsumedData QuicSession::WriteData(QuicStreamId id,
    212                                         StringPiece data,
    213                                         QuicStreamOffset offset,
    214                                         bool fin) {
    215   return connection_->SendStreamData(id, data, offset, fin);
    216 }
    217 
    218 void QuicSession::SendRstStream(QuicStreamId id,
    219                                 QuicRstStreamErrorCode error) {
    220   connection_->SendRstStream(id, error);
    221   CloseStream(id);
    222 }
    223 
    224 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
    225   goaway_sent_ = true;
    226   connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
    227 }
    228 
    229 void QuicSession::CloseStream(QuicStreamId stream_id) {
    230   DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id;
    231 
    232   ReliableStreamMap::iterator it = stream_map_.find(stream_id);
    233   if (it == stream_map_.end()) {
    234     DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id;
    235     return;
    236   }
    237   ReliableQuicStream* stream = it->second;
    238   if (!stream->headers_decompressed()) {
    239     if (prematurely_closed_streams_.size() ==
    240         kMaxPrematurelyClosedStreamsTracked) {
    241       prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
    242     }
    243     prematurely_closed_streams_.insert(make_pair(stream->id(), true));
    244   }
    245   closed_streams_.push_back(it->second);
    246   stream_map_.erase(it);
    247   stream->OnClose();
    248 }
    249 
    250 bool QuicSession::IsEncryptionEstablished() {
    251   return GetCryptoStream()->encryption_established();
    252 }
    253 
    254 bool QuicSession::IsCryptoHandshakeConfirmed() {
    255   return GetCryptoStream()->handshake_confirmed();
    256 }
    257 
    258 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
    259   switch (event) {
    260     // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
    261     // to QuicSession since it is the glue.
    262     case ENCRYPTION_FIRST_ESTABLISHED:
    263       break;
    264 
    265     case ENCRYPTION_REESTABLISHED:
    266       // Retransmit originally packets that were sent, since they can't be
    267       // decrypted by the peer.
    268       connection_->RetransmitUnackedPackets(
    269           QuicConnection::INITIAL_ENCRYPTION_ONLY);
    270       break;
    271 
    272     case HANDSHAKE_CONFIRMED:
    273       LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
    274           << "Handshake confirmed without parameter negotiation.";
    275       connection_->SetIdleNetworkTimeout(
    276           config_.idle_connection_state_lifetime());
    277       connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
    278       max_open_streams_ = config_.max_streams_per_connection();
    279       break;
    280 
    281     default:
    282       LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
    283   }
    284 }
    285 
    286 QuicConfig* QuicSession::config() {
    287   return &config_;
    288 }
    289 
    290 void QuicSession::ActivateStream(ReliableQuicStream* stream) {
    291   DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size()
    292              << ". activating " << stream->id();
    293   DCHECK(stream_map_.count(stream->id()) == 0);
    294   stream_map_[stream->id()] = stream;
    295 }
    296 
    297 QuicStreamId QuicSession::GetNextStreamId() {
    298   QuicStreamId id = next_stream_id_;
    299   next_stream_id_ += 2;
    300   return id;
    301 }
    302 
    303 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
    304   if (stream_id == kCryptoStreamId) {
    305     return GetCryptoStream();
    306   }
    307 
    308   ReliableStreamMap::iterator it = stream_map_.find(stream_id);
    309   if (it != stream_map_.end()) {
    310     return it->second;
    311   }
    312 
    313   if (IsClosedStream(stream_id)) {
    314     return NULL;
    315   }
    316 
    317   if (stream_id % 2 == next_stream_id_ % 2) {
    318     // We've received a frame for a locally-created stream that is not
    319     // currently active.  This is an error.
    320     connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
    321     return NULL;
    322   }
    323 
    324   return GetIncomingReliableStream(stream_id);
    325 }
    326 
    327 ReliableQuicStream* QuicSession::GetIncomingReliableStream(
    328     QuicStreamId stream_id) {
    329   if (IsClosedStream(stream_id)) {
    330     return NULL;
    331   }
    332 
    333   if (goaway_sent_) {
    334     // We've already sent a GoAway
    335     SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
    336     return NULL;
    337   }
    338 
    339   implicitly_created_streams_.erase(stream_id);
    340   if (stream_id > largest_peer_created_stream_id_) {
    341     // TODO(rch) add unit test for this
    342     if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
    343       connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
    344       return NULL;
    345     }
    346     if (largest_peer_created_stream_id_ != 0) {
    347       for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
    348            id < stream_id;
    349            id += 2) {
    350         implicitly_created_streams_.insert(id);
    351       }
    352     }
    353     largest_peer_created_stream_id_ = stream_id;
    354   }
    355   ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id);
    356   if (stream == NULL) {
    357     return NULL;
    358   }
    359   ActivateStream(stream);
    360   return stream;
    361 }
    362 
    363 bool QuicSession::IsClosedStream(QuicStreamId id) {
    364   DCHECK_NE(0u, id);
    365   if (id == kCryptoStreamId) {
    366     return false;
    367   }
    368   if (stream_map_.count(id) != 0) {
    369     // Stream is active
    370     return false;
    371   }
    372   if (id % 2 == next_stream_id_ % 2) {
    373     // Locally created streams are strictly in-order.  If the id is in the
    374     // range of created streams and it's not active, it must have been closed.
    375     return id < next_stream_id_;
    376   }
    377   // For peer created streams, we also need to consider implicitly created
    378   // streams.
    379   return id <= largest_peer_created_stream_id_ &&
    380       implicitly_created_streams_.count(id) == 0;
    381 }
    382 
    383 size_t QuicSession::GetNumOpenStreams() const {
    384   return stream_map_.size() + implicitly_created_streams_.size();
    385 }
    386 
    387 void QuicSession::MarkWriteBlocked(QuicStreamId id) {
    388   write_blocked_streams_.AddBlockedObject(id);
    389 }
    390 
    391 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
    392                                            QuicStreamId stream_id) {
    393   decompression_blocked_streams_[header_id] = stream_id;
    394 }
    395 
    396 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
    397   NOTIMPLEMENTED();
    398   return false;
    399 }
    400 
    401 void QuicSession::PostProcessAfterData() {
    402   STLDeleteElements(&closed_streams_);
    403   closed_streams_.clear();
    404 }
    405 
    406 }  // namespace net
    407