1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_session.h" 6 7 #include "base/stl_util.h" 8 #include "net/quic/crypto/proof_verifier.h" 9 #include "net/quic/quic_connection.h" 10 #include "net/ssl/ssl_info.h" 11 12 using base::StringPiece; 13 using base::hash_map; 14 using base::hash_set; 15 using std::make_pair; 16 using std::vector; 17 18 namespace net { 19 20 const size_t kMaxPrematurelyClosedStreamsTracked = 20; 21 const size_t kMaxZombieStreams = 20; 22 23 #define ENDPOINT (is_server() ? "Server: " : " Client: ") 24 25 // We want to make sure we delete any closed streams in a safe manner. 26 // To avoid deleting a stream in mid-operation, we have a simple shim between 27 // us and the stream, so we can delete any streams when we return from 28 // processing. 29 // 30 // We could just override the base methods, but this makes it easier to make 31 // sure we don't miss any. 32 class VisitorShim : public QuicConnectionVisitorInterface { 33 public: 34 explicit VisitorShim(QuicSession* session) : session_(session) {} 35 36 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE { 37 bool accepted = session_->OnStreamFrames(frames); 38 session_->PostProcessAfterData(); 39 return accepted; 40 } 41 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 42 session_->OnRstStream(frame); 43 session_->PostProcessAfterData(); 44 } 45 46 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 47 session_->OnGoAway(frame); 48 session_->PostProcessAfterData(); 49 } 50 51 virtual bool OnCanWrite() OVERRIDE { 52 bool rc = session_->OnCanWrite(); 53 session_->PostProcessAfterData(); 54 return rc; 55 } 56 57 virtual void OnSuccessfulVersionNegotiation( 58 const QuicVersion& version) OVERRIDE { 59 session_->OnSuccessfulVersionNegotiation(version); 60 } 61 62 virtual void OnConfigNegotiated() OVERRIDE { 63 session_->OnConfigNegotiated(); 64 } 65 66 virtual void OnConnectionClosed(QuicErrorCode error, 67 bool from_peer) OVERRIDE { 68 session_->OnConnectionClosed(error, from_peer); 69 // The session will go away, so don't bother with cleanup. 70 } 71 72 virtual bool HasPendingHandshake() const OVERRIDE { 73 return session_->HasPendingHandshake(); 74 } 75 76 private: 77 QuicSession* session_; 78 }; 79 80 QuicSession::QuicSession(QuicConnection* connection, 81 const QuicConfig& config) 82 : connection_(connection), 83 visitor_shim_(new VisitorShim(this)), 84 config_(config), 85 max_open_streams_(config_.max_streams_per_connection()), 86 next_stream_id_(is_server() ? 2 : 3), 87 largest_peer_created_stream_id_(0), 88 error_(QUIC_NO_ERROR), 89 goaway_received_(false), 90 goaway_sent_(false), 91 has_pending_handshake_(false) { 92 93 connection_->set_visitor(visitor_shim_.get()); 94 connection_->SetFromConfig(config_); 95 if (connection_->connected()) { 96 connection_->SetOverallConnectionTimeout( 97 config_.max_time_before_crypto_handshake()); 98 } 99 } 100 101 QuicSession::~QuicSession() { 102 STLDeleteElements(&closed_streams_); 103 STLDeleteValues(&stream_map_); 104 } 105 106 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { 107 for (size_t i = 0; i < frames.size(); ++i) { 108 // TODO(rch) deal with the error case of stream id 0 109 if (IsClosedStream(frames[i].stream_id)) { 110 // If we get additional frames for a stream where we didn't process 111 // headers, it's highly likely our compression context will end up 112 // permanently out of sync with the peer's, so we give up and close the 113 // connection. 114 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) { 115 connection()->SendConnectionClose( 116 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 117 return false; 118 } 119 continue; 120 } 121 122 ReliableQuicStream* stream = GetStream(frames[i].stream_id); 123 if (stream == NULL) return false; 124 if (!stream->WillAcceptStreamFrame(frames[i])) return false; 125 126 // TODO(alyssar) check against existing connection address: if changed, make 127 // sure we update the connection. 128 } 129 130 for (size_t i = 0; i < frames.size(); ++i) { 131 QuicStreamId stream_id = frames[i].stream_id; 132 ReliableQuicStream* stream = GetStream(stream_id); 133 if (!stream) { 134 continue; 135 } 136 stream->OnStreamFrame(frames[i]); 137 138 // If the stream is a data stream had been prematurely closed, and the 139 // headers are now decompressed, then we are finally finished 140 // with this stream. 141 if (ContainsKey(zombie_streams_, stream_id) && 142 static_cast<QuicDataStream*>(stream)->headers_decompressed()) { 143 CloseZombieStream(stream_id); 144 } 145 } 146 147 while (!decompression_blocked_streams_.empty()) { 148 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first; 149 if (header_id != decompressor_.current_header_id()) { 150 break; 151 } 152 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second; 153 decompression_blocked_streams_.erase(header_id); 154 QuicDataStream* stream = GetDataStream(stream_id); 155 if (!stream) { 156 connection()->SendConnectionClose( 157 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 158 return false; 159 } 160 stream->OnDecompressorAvailable(); 161 } 162 return true; 163 } 164 165 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 166 if (frame.stream_id == kCryptoStreamId) { 167 connection()->SendConnectionCloseWithDetails( 168 QUIC_INVALID_STREAM_ID, 169 "Attempt to reset the crypto stream"); 170 return; 171 } 172 QuicDataStream* stream = GetDataStream(frame.stream_id); 173 if (!stream) { 174 return; // Errors are handled by GetStream. 175 } 176 if (ContainsKey(zombie_streams_, stream->id())) { 177 // If this was a zombie stream then we close it out now. 178 CloseZombieStream(stream->id()); 179 // However, since the headers still have not been decompressed, we want to 180 // mark it a prematurely closed so that if we ever receive frames 181 // for this stream we can close the connection. 182 DCHECK(!stream->headers_decompressed()); 183 AddPrematurelyClosedStream(frame.stream_id); 184 return; 185 } 186 if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) { 187 connection()->SendConnectionClose( 188 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED); 189 } 190 stream->OnStreamReset(frame.error_code); 191 } 192 193 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 194 DCHECK(frame.last_good_stream_id < next_stream_id_); 195 goaway_received_ = true; 196 } 197 198 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) { 199 DCHECK(!connection_->connected()); 200 if (error_ == QUIC_NO_ERROR) { 201 error_ = error; 202 } 203 204 while (!stream_map_.empty()) { 205 DataStreamMap::iterator it = stream_map_.begin(); 206 QuicStreamId id = it->first; 207 it->second->OnConnectionClosed(error, from_peer); 208 // The stream should call CloseStream as part of OnConnectionClosed. 209 if (stream_map_.find(id) != stream_map_.end()) { 210 LOG(DFATAL) << ENDPOINT 211 << "Stream failed to close under OnConnectionClosed"; 212 CloseStream(id); 213 } 214 } 215 } 216 217 bool QuicSession::OnCanWrite() { 218 // We latch this here rather than doing a traditional loop, because streams 219 // may be modifying the list as we loop. 220 int remaining_writes = write_blocked_streams_.NumBlockedStreams(); 221 222 while (!connection_->HasQueuedData() && 223 remaining_writes > 0) { 224 DCHECK(write_blocked_streams_.HasWriteBlockedStreams()); 225 if (!write_blocked_streams_.HasWriteBlockedStreams()) { 226 LOG(DFATAL) << "WriteBlockedStream is missing"; 227 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false); 228 return true; // We have no write blocked streams. 229 } 230 int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList(); 231 QuicStreamId stream_id = write_blocked_streams_.PopFront(index); 232 if (stream_id == kCryptoStreamId) { 233 has_pending_handshake_ = false; // We just popped it. 234 } 235 ReliableQuicStream* stream = GetStream(stream_id); 236 if (stream != NULL) { 237 // If the stream can't write all bytes, it'll re-add itself to the blocked 238 // list. 239 stream->OnCanWrite(); 240 } 241 --remaining_writes; 242 } 243 244 return !write_blocked_streams_.HasWriteBlockedStreams(); 245 } 246 247 bool QuicSession::HasPendingHandshake() const { 248 return has_pending_handshake_; 249 } 250 251 QuicConsumedData QuicSession::WritevData( 252 QuicStreamId id, 253 const struct iovec* iov, 254 int iov_count, 255 QuicStreamOffset offset, 256 bool fin, 257 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 258 IOVector data; 259 data.AppendIovec(iov, iov_count); 260 return connection_->SendStreamData(id, data, offset, fin, 261 ack_notifier_delegate); 262 } 263 264 void QuicSession::SendRstStream(QuicStreamId id, 265 QuicRstStreamErrorCode error) { 266 connection_->SendRstStream(id, error); 267 CloseStreamInner(id, true); 268 } 269 270 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 271 goaway_sent_ = true; 272 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 273 } 274 275 void QuicSession::CloseStream(QuicStreamId stream_id) { 276 CloseStreamInner(stream_id, false); 277 } 278 279 void QuicSession::CloseStreamInner(QuicStreamId stream_id, 280 bool locally_reset) { 281 DVLOG(1) << ENDPOINT << "Closing stream " << stream_id; 282 283 DataStreamMap::iterator it = stream_map_.find(stream_id); 284 if (it == stream_map_.end()) { 285 DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id; 286 return; 287 } 288 QuicDataStream* stream = it->second; 289 if (connection_->connected() && !stream->headers_decompressed()) { 290 // If the stream is being closed locally (for example a client cancelling 291 // a request before receiving the response) then we need to make sure that 292 // we keep the stream alive long enough to process any response or 293 // RST_STREAM frames. 294 if (locally_reset && !is_server()) { 295 AddZombieStream(stream_id); 296 return; 297 } 298 299 // This stream has been closed before the headers were decompressed. 300 // This might cause problems with head of line blocking of headers. 301 // If the peer sent headers which were lost but we now close the stream 302 // we will never be able to decompress headers for other streams. 303 // To deal with this, we keep track of streams which have been closed 304 // prematurely. If we ever receive data frames for this steam, then we 305 // know there actually has been a problem and we close the connection. 306 AddPrematurelyClosedStream(stream->id()); 307 } 308 closed_streams_.push_back(it->second); 309 if (ContainsKey(zombie_streams_, stream->id())) { 310 zombie_streams_.erase(stream->id()); 311 } 312 stream_map_.erase(it); 313 stream->OnClose(); 314 } 315 316 void QuicSession::AddZombieStream(QuicStreamId stream_id) { 317 if (zombie_streams_.size() == kMaxZombieStreams) { 318 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first; 319 CloseZombieStream(oldest_zombie_stream_id); 320 // However, since the headers still have not been decompressed, we want to 321 // mark it a prematurely closed so that if we ever receive frames 322 // for this stream we can close the connection. 323 AddPrematurelyClosedStream(oldest_zombie_stream_id); 324 } 325 zombie_streams_.insert(make_pair(stream_id, true)); 326 } 327 328 void QuicSession::CloseZombieStream(QuicStreamId stream_id) { 329 DCHECK(ContainsKey(zombie_streams_, stream_id)); 330 zombie_streams_.erase(stream_id); 331 QuicDataStream* stream = GetDataStream(stream_id); 332 if (!stream) { 333 return; 334 } 335 stream_map_.erase(stream_id); 336 stream->OnClose(); 337 closed_streams_.push_back(stream); 338 } 339 340 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) { 341 if (prematurely_closed_streams_.size() == 342 kMaxPrematurelyClosedStreamsTracked) { 343 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin()); 344 } 345 prematurely_closed_streams_.insert(make_pair(stream_id, true)); 346 } 347 348 bool QuicSession::IsEncryptionEstablished() { 349 return GetCryptoStream()->encryption_established(); 350 } 351 352 bool QuicSession::IsCryptoHandshakeConfirmed() { 353 return GetCryptoStream()->handshake_confirmed(); 354 } 355 356 void QuicSession::OnConfigNegotiated() { 357 connection_->SetFromConfig(config_); 358 } 359 360 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 361 switch (event) { 362 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 363 // to QuicSession since it is the glue. 364 case ENCRYPTION_FIRST_ESTABLISHED: 365 break; 366 367 case ENCRYPTION_REESTABLISHED: 368 // Retransmit originally packets that were sent, since they can't be 369 // decrypted by the peer. 370 connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY); 371 break; 372 373 case HANDSHAKE_CONFIRMED: 374 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 375 << "Handshake confirmed without parameter negotiation."; 376 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 377 max_open_streams_ = config_.max_streams_per_connection(); 378 break; 379 380 default: 381 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 382 } 383 } 384 385 void QuicSession::OnCryptoHandshakeMessageSent( 386 const CryptoHandshakeMessage& message) { 387 } 388 389 void QuicSession::OnCryptoHandshakeMessageReceived( 390 const CryptoHandshakeMessage& message) { 391 } 392 393 QuicConfig* QuicSession::config() { 394 return &config_; 395 } 396 397 void QuicSession::ActivateStream(QuicDataStream* stream) { 398 DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size() 399 << ". activating " << stream->id(); 400 DCHECK_EQ(stream_map_.count(stream->id()), 0u); 401 stream_map_[stream->id()] = stream; 402 } 403 404 QuicStreamId QuicSession::GetNextStreamId() { 405 QuicStreamId id = next_stream_id_; 406 next_stream_id_ += 2; 407 return id; 408 } 409 410 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 411 if (stream_id == kCryptoStreamId) { 412 return GetCryptoStream(); 413 } 414 return GetDataStream(stream_id); 415 } 416 417 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) { 418 if (stream_id == kCryptoStreamId) { 419 DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id"; 420 return NULL; 421 } 422 423 DataStreamMap::iterator it = stream_map_.find(stream_id); 424 if (it != stream_map_.end()) { 425 return it->second; 426 } 427 428 if (IsClosedStream(stream_id)) { 429 return NULL; 430 } 431 432 if (stream_id % 2 == next_stream_id_ % 2) { 433 // We've received a frame for a locally-created stream that is not 434 // currently active. This is an error. 435 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 436 return NULL; 437 } 438 439 return GetIncomingReliableStream(stream_id); 440 } 441 442 QuicDataStream* QuicSession::GetIncomingReliableStream( 443 QuicStreamId stream_id) { 444 if (IsClosedStream(stream_id)) { 445 return NULL; 446 } 447 448 if (goaway_sent_) { 449 // We've already sent a GoAway 450 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY); 451 return NULL; 452 } 453 454 implicitly_created_streams_.erase(stream_id); 455 if (stream_id > largest_peer_created_stream_id_) { 456 // TODO(rch) add unit test for this 457 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 458 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 459 return NULL; 460 } 461 if (largest_peer_created_stream_id_ == 0) { 462 largest_peer_created_stream_id_= 1; 463 } 464 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 465 id < stream_id; 466 id += 2) { 467 implicitly_created_streams_.insert(id); 468 } 469 largest_peer_created_stream_id_ = stream_id; 470 } 471 QuicDataStream* stream = CreateIncomingDataStream(stream_id); 472 if (stream == NULL) { 473 return NULL; 474 } 475 ActivateStream(stream); 476 return stream; 477 } 478 479 bool QuicSession::IsClosedStream(QuicStreamId id) { 480 DCHECK_NE(0u, id); 481 if (id == kCryptoStreamId) { 482 return false; 483 } 484 if (ContainsKey(zombie_streams_, id)) { 485 return true; 486 } 487 if (ContainsKey(stream_map_, id)) { 488 // Stream is active 489 return false; 490 } 491 if (id % 2 == next_stream_id_ % 2) { 492 // Locally created streams are strictly in-order. If the id is in the 493 // range of created streams and it's not active, it must have been closed. 494 return id < next_stream_id_; 495 } 496 // For peer created streams, we also need to consider implicitly created 497 // streams. 498 return id <= largest_peer_created_stream_id_ && 499 implicitly_created_streams_.count(id) == 0; 500 } 501 502 size_t QuicSession::GetNumOpenStreams() const { 503 return stream_map_.size() + implicitly_created_streams_.size() - 504 zombie_streams_.size(); 505 } 506 507 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) { 508 if (id == kCryptoStreamId) { 509 DCHECK(!has_pending_handshake_); 510 has_pending_handshake_ = true; 511 // TODO(jar): Be sure to use the highest priority for the crypto stream, 512 // perhaps by adding a "special" priority for it that is higher than 513 // kHighestPriority. 514 priority = kHighestPriority; 515 } 516 write_blocked_streams_.PushBack(id, priority); 517 } 518 519 bool QuicSession::HasQueuedData() const { 520 return write_blocked_streams_.NumBlockedStreams() || 521 connection_->HasQueuedData(); 522 } 523 524 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id, 525 QuicStreamId stream_id) { 526 decompression_blocked_streams_[header_id] = stream_id; 527 } 528 529 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) { 530 NOTIMPLEMENTED(); 531 return false; 532 } 533 534 void QuicSession::PostProcessAfterData() { 535 STLDeleteElements(&closed_streams_); 536 closed_streams_.clear(); 537 } 538 539 } // namespace net 540