// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/quic/quic_session.h"

#include "base/stl_util.h"
#include "net/quic/crypto/proof_verifier.h"
#include "net/quic/quic_connection.h"
#include "net/ssl/ssl_info.h"

using base::StringPiece;
using base::hash_map;
using base::hash_set;
using std::make_pair;
using std::vector;

namespace net {

// Cap on the number of entries kept in prematurely_closed_streams_.
// QuicSession::CloseStream() drops one existing entry before inserting a new
// one whenever the container already holds this many.
const size_t kMaxPrematurelyClosedStreamsTracked = 20;

// Prefix for log messages. Expands to an expression that reads is_server_, so
// it can only be used where that member is in scope.
#define ENDPOINT (is_server_ ? "Server: " : " Client: ")

// We want to make sure we delete any closed streams in a safe manner.
// To avoid deleting a stream in mid-operation, we have a simple shim between
// us and the stream, so we can delete any streams when we return from
// processing.
//
// We could just override the base methods, but this makes it easier to make
// sure we don't miss any.
31 class VisitorShim : public QuicConnectionVisitorInterface { 32 public: 33 explicit VisitorShim(QuicSession* session) : session_(session) {} 34 35 virtual bool OnPacket(const IPEndPoint& self_address, 36 const IPEndPoint& peer_address, 37 const QuicPacketHeader& header, 38 const vector<QuicStreamFrame>& frame) OVERRIDE { 39 bool accepted = session_->OnPacket(self_address, peer_address, header, 40 frame); 41 session_->PostProcessAfterData(); 42 return accepted; 43 } 44 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 45 session_->OnRstStream(frame); 46 session_->PostProcessAfterData(); 47 } 48 49 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 50 session_->OnGoAway(frame); 51 session_->PostProcessAfterData(); 52 } 53 54 virtual void OnAck(const SequenceNumberSet& acked_packets) OVERRIDE { 55 session_->OnAck(acked_packets); 56 session_->PostProcessAfterData(); 57 } 58 59 virtual bool OnCanWrite() OVERRIDE { 60 bool rc = session_->OnCanWrite(); 61 session_->PostProcessAfterData(); 62 return rc; 63 } 64 65 virtual void ConnectionClose(QuicErrorCode error, bool from_peer) OVERRIDE { 66 session_->ConnectionClose(error, from_peer); 67 // The session will go away, so don't bother with cleanup. 68 } 69 70 private: 71 QuicSession* session_; 72 }; 73 74 QuicSession::QuicSession(QuicConnection* connection, 75 const QuicConfig& config, 76 bool is_server) 77 : connection_(connection), 78 visitor_shim_(new VisitorShim(this)), 79 config_(config), 80 max_open_streams_(config_.max_streams_per_connection()), 81 next_stream_id_(is_server ? 
2 : 3), 82 is_server_(is_server), 83 largest_peer_created_stream_id_(0), 84 error_(QUIC_NO_ERROR), 85 goaway_received_(false), 86 goaway_sent_(false) { 87 88 connection_->set_visitor(visitor_shim_.get()); 89 connection_->SetIdleNetworkTimeout(config_.idle_connection_state_lifetime()); 90 if (connection_->connected()) { 91 connection_->SetOverallConnectionTimeout( 92 config_.max_time_before_crypto_handshake()); 93 } 94 // TODO(satyamshekhar): Set congestion control and ICSL also. 95 } 96 97 QuicSession::~QuicSession() { 98 STLDeleteElements(&closed_streams_); 99 STLDeleteValues(&stream_map_); 100 } 101 102 bool QuicSession::OnPacket(const IPEndPoint& self_address, 103 const IPEndPoint& peer_address, 104 const QuicPacketHeader& header, 105 const vector<QuicStreamFrame>& frames) { 106 if (header.public_header.guid != connection()->guid()) { 107 DLOG(INFO) << ENDPOINT << "Got packet header for invalid GUID: " 108 << header.public_header.guid; 109 return false; 110 } 111 112 for (size_t i = 0; i < frames.size(); ++i) { 113 // TODO(rch) deal with the error case of stream id 0 114 if (IsClosedStream(frames[i].stream_id)) { 115 // If we get additional frames for a stream where we didn't process 116 // headers, it's highly likely our compression context will end up 117 // permanently out of sync with the peer's, so we give up and close the 118 // connection. 119 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) { 120 connection()->SendConnectionClose( 121 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 122 return false; 123 } 124 continue; 125 } 126 127 ReliableQuicStream* stream = GetStream(frames[i].stream_id); 128 if (stream == NULL) return false; 129 if (!stream->WillAcceptStreamFrame(frames[i])) return false; 130 131 // TODO(alyssar) check against existing connection address: if changed, make 132 // sure we update the connection. 
133 } 134 135 for (size_t i = 0; i < frames.size(); ++i) { 136 ReliableQuicStream* stream = GetStream(frames[i].stream_id); 137 if (stream) { 138 stream->OnStreamFrame(frames[i]); 139 } 140 } 141 142 while (!decompression_blocked_streams_.empty()) { 143 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first; 144 if (header_id != decompressor_.current_header_id()) { 145 break; 146 } 147 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second; 148 decompression_blocked_streams_.erase(header_id); 149 ReliableQuicStream* stream = GetStream(stream_id); 150 if (!stream) { 151 connection()->SendConnectionClose( 152 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 153 return false; 154 } 155 stream->OnDecompressorAvailable(); 156 } 157 return true; 158 } 159 160 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 161 ReliableQuicStream* stream = GetStream(frame.stream_id); 162 if (!stream) { 163 return; // Errors are handled by GetStream. 164 } 165 stream->OnStreamReset(frame.error_code); 166 } 167 168 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 169 DCHECK(frame.last_good_stream_id < next_stream_id_); 170 goaway_received_ = true; 171 } 172 173 void QuicSession::ConnectionClose(QuicErrorCode error, bool from_peer) { 174 if (error_ == QUIC_NO_ERROR) { 175 error_ = error; 176 } 177 178 while (stream_map_.size() != 0) { 179 ReliableStreamMap::iterator it = stream_map_.begin(); 180 QuicStreamId id = it->first; 181 it->second->ConnectionClose(error, from_peer); 182 // The stream should call CloseStream as part of ConnectionClose. 183 if (stream_map_.find(id) != stream_map_.end()) { 184 LOG(DFATAL) << ENDPOINT << "Stream failed to close under ConnectionClose"; 185 CloseStream(id); 186 } 187 } 188 } 189 190 bool QuicSession::OnCanWrite() { 191 // We latch this here rather than doing a traditional loop, because streams 192 // may be modifying the list as we loop. 
193 int remaining_writes = write_blocked_streams_.NumObjects(); 194 195 while (!connection_->HasQueuedData() && 196 remaining_writes > 0) { 197 DCHECK(!write_blocked_streams_.IsEmpty()); 198 ReliableQuicStream* stream = 199 GetStream(write_blocked_streams_.GetNextBlockedObject()); 200 if (stream != NULL) { 201 // If the stream can't write all bytes, it'll re-add itself to the blocked 202 // list. 203 stream->OnCanWrite(); 204 } 205 --remaining_writes; 206 } 207 208 return write_blocked_streams_.IsEmpty(); 209 } 210 211 QuicConsumedData QuicSession::WriteData(QuicStreamId id, 212 StringPiece data, 213 QuicStreamOffset offset, 214 bool fin) { 215 return connection_->SendStreamData(id, data, offset, fin); 216 } 217 218 void QuicSession::SendRstStream(QuicStreamId id, 219 QuicRstStreamErrorCode error) { 220 connection_->SendRstStream(id, error); 221 CloseStream(id); 222 } 223 224 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 225 goaway_sent_ = true; 226 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 227 } 228 229 void QuicSession::CloseStream(QuicStreamId stream_id) { 230 DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id; 231 232 ReliableStreamMap::iterator it = stream_map_.find(stream_id); 233 if (it == stream_map_.end()) { 234 DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id; 235 return; 236 } 237 ReliableQuicStream* stream = it->second; 238 if (!stream->headers_decompressed()) { 239 if (prematurely_closed_streams_.size() == 240 kMaxPrematurelyClosedStreamsTracked) { 241 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin()); 242 } 243 prematurely_closed_streams_.insert(make_pair(stream->id(), true)); 244 } 245 closed_streams_.push_back(it->second); 246 stream_map_.erase(it); 247 stream->OnClose(); 248 } 249 250 bool QuicSession::IsEncryptionEstablished() { 251 return GetCryptoStream()->encryption_established(); 252 } 253 254 bool 
QuicSession::IsCryptoHandshakeConfirmed() { 255 return GetCryptoStream()->handshake_confirmed(); 256 } 257 258 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 259 switch (event) { 260 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 261 // to QuicSession since it is the glue. 262 case ENCRYPTION_FIRST_ESTABLISHED: 263 break; 264 265 case ENCRYPTION_REESTABLISHED: 266 // Retransmit originally packets that were sent, since they can't be 267 // decrypted by the peer. 268 connection_->RetransmitUnackedPackets( 269 QuicConnection::INITIAL_ENCRYPTION_ONLY); 270 break; 271 272 case HANDSHAKE_CONFIRMED: 273 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 274 << "Handshake confirmed without parameter negotiation."; 275 connection_->SetIdleNetworkTimeout( 276 config_.idle_connection_state_lifetime()); 277 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 278 max_open_streams_ = config_.max_streams_per_connection(); 279 break; 280 281 default: 282 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 283 } 284 } 285 286 QuicConfig* QuicSession::config() { 287 return &config_; 288 } 289 290 void QuicSession::ActivateStream(ReliableQuicStream* stream) { 291 DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size() 292 << ". 
activating " << stream->id(); 293 DCHECK(stream_map_.count(stream->id()) == 0); 294 stream_map_[stream->id()] = stream; 295 } 296 297 QuicStreamId QuicSession::GetNextStreamId() { 298 QuicStreamId id = next_stream_id_; 299 next_stream_id_ += 2; 300 return id; 301 } 302 303 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 304 if (stream_id == kCryptoStreamId) { 305 return GetCryptoStream(); 306 } 307 308 ReliableStreamMap::iterator it = stream_map_.find(stream_id); 309 if (it != stream_map_.end()) { 310 return it->second; 311 } 312 313 if (IsClosedStream(stream_id)) { 314 return NULL; 315 } 316 317 if (stream_id % 2 == next_stream_id_ % 2) { 318 // We've received a frame for a locally-created stream that is not 319 // currently active. This is an error. 320 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 321 return NULL; 322 } 323 324 return GetIncomingReliableStream(stream_id); 325 } 326 327 ReliableQuicStream* QuicSession::GetIncomingReliableStream( 328 QuicStreamId stream_id) { 329 if (IsClosedStream(stream_id)) { 330 return NULL; 331 } 332 333 if (goaway_sent_) { 334 // We've already sent a GoAway 335 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY); 336 return NULL; 337 } 338 339 implicitly_created_streams_.erase(stream_id); 340 if (stream_id > largest_peer_created_stream_id_) { 341 // TODO(rch) add unit test for this 342 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 343 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 344 return NULL; 345 } 346 if (largest_peer_created_stream_id_ != 0) { 347 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 348 id < stream_id; 349 id += 2) { 350 implicitly_created_streams_.insert(id); 351 } 352 } 353 largest_peer_created_stream_id_ = stream_id; 354 } 355 ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id); 356 if (stream == NULL) { 357 return NULL; 358 } 359 ActivateStream(stream); 360 return stream; 361 } 362 
363 bool QuicSession::IsClosedStream(QuicStreamId id) { 364 DCHECK_NE(0u, id); 365 if (id == kCryptoStreamId) { 366 return false; 367 } 368 if (stream_map_.count(id) != 0) { 369 // Stream is active 370 return false; 371 } 372 if (id % 2 == next_stream_id_ % 2) { 373 // Locally created streams are strictly in-order. If the id is in the 374 // range of created streams and it's not active, it must have been closed. 375 return id < next_stream_id_; 376 } 377 // For peer created streams, we also need to consider implicitly created 378 // streams. 379 return id <= largest_peer_created_stream_id_ && 380 implicitly_created_streams_.count(id) == 0; 381 } 382 383 size_t QuicSession::GetNumOpenStreams() const { 384 return stream_map_.size() + implicitly_created_streams_.size(); 385 } 386 387 void QuicSession::MarkWriteBlocked(QuicStreamId id) { 388 write_blocked_streams_.AddBlockedObject(id); 389 } 390 391 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id, 392 QuicStreamId stream_id) { 393 decompression_blocked_streams_[header_id] = stream_id; 394 } 395 396 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) { 397 NOTIMPLEMENTED(); 398 return false; 399 } 400 401 void QuicSession::PostProcessAfterData() { 402 STLDeleteElements(&closed_streams_); 403 closed_streams_.clear(); 404 } 405 406 } // namespace net 407