1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_session.h" 6 7 #include "base/stl_util.h" 8 #include "net/quic/crypto/proof_verifier.h" 9 #include "net/quic/quic_connection.h" 10 #include "net/quic/quic_flags.h" 11 #include "net/quic/quic_flow_controller.h" 12 #include "net/quic/quic_headers_stream.h" 13 #include "net/ssl/ssl_info.h" 14 15 using base::StringPiece; 16 using base::hash_map; 17 using base::hash_set; 18 using std::make_pair; 19 using std::vector; 20 21 namespace net { 22 23 #define ENDPOINT (is_server() ? "Server: " : " Client: ") 24 25 // We want to make sure we delete any closed streams in a safe manner. 26 // To avoid deleting a stream in mid-operation, we have a simple shim between 27 // us and the stream, so we can delete any streams when we return from 28 // processing. 29 // 30 // We could just override the base methods, but this makes it easier to make 31 // sure we don't miss any. 32 class VisitorShim : public QuicConnectionVisitorInterface { 33 public: 34 explicit VisitorShim(QuicSession* session) : session_(session) {} 35 36 virtual void OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE { 37 session_->OnStreamFrames(frames); 38 session_->PostProcessAfterData(); 39 } 40 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 41 session_->OnRstStream(frame); 42 session_->PostProcessAfterData(); 43 } 44 45 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 46 session_->OnGoAway(frame); 47 session_->PostProcessAfterData(); 48 } 49 50 virtual void OnWindowUpdateFrames(const vector<QuicWindowUpdateFrame>& frames) 51 OVERRIDE { 52 session_->OnWindowUpdateFrames(frames); 53 session_->PostProcessAfterData(); 54 } 55 56 virtual void OnBlockedFrames(const vector<QuicBlockedFrame>& frames) 57 OVERRIDE { 58 session_->OnBlockedFrames(frames); 59 session_->PostProcessAfterData(); 60 } 61 62 virtual void OnCanWrite() OVERRIDE { 63 session_->OnCanWrite(); 64 session_->PostProcessAfterData(); 65 } 66 67 virtual void OnCongestionWindowChange(QuicTime now) OVERRIDE { 68 session_->OnCongestionWindowChange(now); 69 } 70 71 virtual void OnSuccessfulVersionNegotiation( 72 const QuicVersion& version) OVERRIDE { 73 session_->OnSuccessfulVersionNegotiation(version); 74 } 75 76 virtual void OnConnectionClosed( 77 QuicErrorCode error, bool from_peer) OVERRIDE { 78 session_->OnConnectionClosed(error, from_peer); 79 // The session will go away, so don't bother with cleanup. 80 } 81 82 virtual void OnWriteBlocked() OVERRIDE { 83 session_->OnWriteBlocked(); 84 } 85 86 virtual bool WillingAndAbleToWrite() const OVERRIDE { 87 return session_->WillingAndAbleToWrite(); 88 } 89 90 virtual bool HasPendingHandshake() const OVERRIDE { 91 return session_->HasPendingHandshake(); 92 } 93 94 virtual bool HasOpenDataStreams() const OVERRIDE { 95 return session_->HasOpenDataStreams(); 96 } 97 98 private: 99 QuicSession* session_; 100 }; 101 102 QuicSession::QuicSession(QuicConnection* connection, const QuicConfig& config) 103 : connection_(connection), 104 visitor_shim_(new VisitorShim(this)), 105 config_(config), 106 max_open_streams_(config_.max_streams_per_connection()), 107 next_stream_id_(is_server() ? 2 : 5), 108 largest_peer_created_stream_id_(0), 109 error_(QUIC_NO_ERROR), 110 goaway_received_(false), 111 goaway_sent_(false), 112 has_pending_handshake_(false) { 113 if (connection_->version() <= QUIC_VERSION_19) { 114 flow_controller_.reset(new QuicFlowController( 115 connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow, 116 config_.GetInitialFlowControlWindowToSend(), 117 config_.GetInitialFlowControlWindowToSend())); 118 } else { 119 flow_controller_.reset(new QuicFlowController( 120 connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow, 121 config_.GetInitialSessionFlowControlWindowToSend(), 122 config_.GetInitialSessionFlowControlWindowToSend())); 123 } 124 } 125 126 void QuicSession::InitializeSession() { 127 connection_->set_visitor(visitor_shim_.get()); 128 connection_->SetFromConfig(config_); 129 if (connection_->connected()) { 130 connection_->SetOverallConnectionTimeout( 131 config_.max_time_before_crypto_handshake()); 132 } 133 headers_stream_.reset(new QuicHeadersStream(this)); 134 } 135 136 QuicSession::~QuicSession() { 137 STLDeleteElements(&closed_streams_); 138 STLDeleteValues(&stream_map_); 139 140 DLOG_IF(WARNING, 141 locally_closed_streams_highest_offset_.size() > max_open_streams_) 142 << "Surprisingly high number of locally closed streams still waiting for " 143 "final byte offset: " << locally_closed_streams_highest_offset_.size(); 144 } 145 146 void QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { 147 for (size_t i = 0; i < frames.size(); ++i) { 148 // TODO(rch) deal with the error case of stream id 0. 149 const QuicStreamFrame& frame = frames[i]; 150 QuicStreamId stream_id = frame.stream_id; 151 ReliableQuicStream* stream = GetStream(stream_id); 152 if (!stream) { 153 // The stream no longer exists, but we may still be interested in the 154 // final stream byte offset sent by the peer. A frame with a FIN can give 155 // us this offset. 156 if (frame.fin) { 157 QuicStreamOffset final_byte_offset = 158 frame.offset + frame.data.TotalBufferSize(); 159 UpdateFlowControlOnFinalReceivedByteOffset(stream_id, 160 final_byte_offset); 161 } 162 163 continue; 164 } 165 stream->OnStreamFrame(frames[i]); 166 } 167 } 168 169 void QuicSession::OnStreamHeaders(QuicStreamId stream_id, 170 StringPiece headers_data) { 171 QuicDataStream* stream = GetDataStream(stream_id); 172 if (!stream) { 173 // It's quite possible to receive headers after a stream has been reset. 174 return; 175 } 176 stream->OnStreamHeaders(headers_data); 177 } 178 179 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id, 180 QuicPriority priority) { 181 QuicDataStream* stream = GetDataStream(stream_id); 182 if (!stream) { 183 // It's quite possible to receive headers after a stream has been reset. 184 return; 185 } 186 stream->OnStreamHeadersPriority(priority); 187 } 188 189 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id, 190 bool fin, 191 size_t frame_len) { 192 QuicDataStream* stream = GetDataStream(stream_id); 193 if (!stream) { 194 // It's quite possible to receive headers after a stream has been reset. 195 return; 196 } 197 stream->OnStreamHeadersComplete(fin, frame_len); 198 } 199 200 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 201 if (frame.stream_id == kCryptoStreamId) { 202 connection()->SendConnectionCloseWithDetails( 203 QUIC_INVALID_STREAM_ID, 204 "Attempt to reset the crypto stream"); 205 return; 206 } 207 if (frame.stream_id == kHeadersStreamId) { 208 connection()->SendConnectionCloseWithDetails( 209 QUIC_INVALID_STREAM_ID, 210 "Attempt to reset the headers stream"); 211 return; 212 } 213 214 QuicDataStream* stream = GetDataStream(frame.stream_id); 215 if (!stream) { 216 // The RST frame contains the final byte offset for the stream: we can now 217 // update the connection level flow controller if needed. 218 UpdateFlowControlOnFinalReceivedByteOffset(frame.stream_id, 219 frame.byte_offset); 220 return; // Errors are handled by GetStream. 221 } 222 223 stream->OnStreamReset(frame); 224 } 225 226 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 227 DCHECK(frame.last_good_stream_id < next_stream_id_); 228 goaway_received_ = true; 229 } 230 231 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) { 232 DCHECK(!connection_->connected()); 233 if (error_ == QUIC_NO_ERROR) { 234 error_ = error; 235 } 236 237 while (!stream_map_.empty()) { 238 DataStreamMap::iterator it = stream_map_.begin(); 239 QuicStreamId id = it->first; 240 it->second->OnConnectionClosed(error, from_peer); 241 // The stream should call CloseStream as part of OnConnectionClosed. 242 if (stream_map_.find(id) != stream_map_.end()) { 243 LOG(DFATAL) << ENDPOINT 244 << "Stream failed to close under OnConnectionClosed"; 245 CloseStream(id); 246 } 247 } 248 } 249 250 void QuicSession::OnWindowUpdateFrames( 251 const vector<QuicWindowUpdateFrame>& frames) { 252 bool connection_window_updated = false; 253 for (size_t i = 0; i < frames.size(); ++i) { 254 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't 255 // assume that it still exists. 256 QuicStreamId stream_id = frames[i].stream_id; 257 if (stream_id == kConnectionLevelId) { 258 // This is a window update that applies to the connection, rather than an 259 // individual stream. 260 DVLOG(1) << ENDPOINT 261 << "Received connection level flow control window update with " 262 "byte offset: " << frames[i].byte_offset; 263 if (flow_controller_->UpdateSendWindowOffset(frames[i].byte_offset)) { 264 connection_window_updated = true; 265 } 266 continue; 267 } 268 269 if (connection_->version() < QUIC_VERSION_21 && 270 (stream_id == kCryptoStreamId || stream_id == kHeadersStreamId)) { 271 DLOG(DFATAL) << "WindowUpdate for stream " << stream_id << " in version " 272 << QuicVersionToString(connection_->version()); 273 return; 274 } 275 276 ReliableQuicStream* stream = GetStream(stream_id); 277 if (stream) { 278 stream->OnWindowUpdateFrame(frames[i]); 279 } 280 } 281 282 // Connection level flow control window has increased, so blocked streams can 283 // write again. 284 if (connection_window_updated) { 285 OnCanWrite(); 286 } 287 } 288 289 void QuicSession::OnBlockedFrames(const vector<QuicBlockedFrame>& frames) { 290 for (size_t i = 0; i < frames.size(); ++i) { 291 // TODO(rjshade): Compare our flow control receive windows for specified 292 // streams: if we have a large window then maybe something 293 // had gone wrong with the flow control accounting. 294 DVLOG(1) << ENDPOINT << "Received BLOCKED frame with stream id: " 295 << frames[i].stream_id; 296 } 297 } 298 299 void QuicSession::OnCanWrite() { 300 // We limit the number of writes to the number of pending streams. If more 301 // streams become pending, WillingAndAbleToWrite will be true, which will 302 // cause the connection to request resumption before yielding to other 303 // connections. 304 size_t num_writes = write_blocked_streams_.NumBlockedStreams(); 305 if (flow_controller_->IsBlocked()) { 306 // If we are connection level flow control blocked, then only allow the 307 // crypto and headers streams to try writing as all other streams will be 308 // blocked. 309 num_writes = 0; 310 if (write_blocked_streams_.crypto_stream_blocked()) { 311 num_writes += 1; 312 } 313 if (write_blocked_streams_.headers_stream_blocked()) { 314 num_writes += 1; 315 } 316 } 317 if (num_writes == 0) { 318 return; 319 } 320 321 QuicConnection::ScopedPacketBundler ack_bundler( 322 connection_.get(), QuicConnection::NO_ACK); 323 for (size_t i = 0; i < num_writes; ++i) { 324 if (!(write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 325 write_blocked_streams_.HasWriteBlockedDataStreams())) { 326 // Writing one stream removed another!? Something's broken. 327 LOG(DFATAL) << "WriteBlockedStream is missing"; 328 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false); 329 return; 330 } 331 if (!connection_->CanWriteStreamData()) { 332 return; 333 } 334 QuicStreamId stream_id = write_blocked_streams_.PopFront(); 335 if (stream_id == kCryptoStreamId) { 336 has_pending_handshake_ = false; // We just popped it. 337 } 338 ReliableQuicStream* stream = GetStream(stream_id); 339 if (stream != NULL && !stream->flow_controller()->IsBlocked()) { 340 // If the stream can't write all bytes, it'll re-add itself to the blocked 341 // list. 342 stream->OnCanWrite(); 343 } 344 } 345 } 346 347 bool QuicSession::WillingAndAbleToWrite() const { 348 // If the crypto or headers streams are blocked, we want to schedule a write - 349 // they don't get blocked by connection level flow control. Otherwise only 350 // schedule a write if we are not flow control blocked at the connection 351 // level. 352 return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 353 (!flow_controller_->IsBlocked() && 354 write_blocked_streams_.HasWriteBlockedDataStreams()); 355 } 356 357 bool QuicSession::HasPendingHandshake() const { 358 return has_pending_handshake_; 359 } 360 361 bool QuicSession::HasOpenDataStreams() const { 362 return GetNumOpenStreams() > 0; 363 } 364 365 QuicConsumedData QuicSession::WritevData( 366 QuicStreamId id, 367 const IOVector& data, 368 QuicStreamOffset offset, 369 bool fin, 370 FecProtection fec_protection, 371 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 372 return connection_->SendStreamData(id, data, offset, fin, fec_protection, 373 ack_notifier_delegate); 374 } 375 376 size_t QuicSession::WriteHeaders( 377 QuicStreamId id, 378 const SpdyHeaderBlock& headers, 379 bool fin, 380 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 381 return headers_stream_->WriteHeaders(id, headers, fin, ack_notifier_delegate); 382 } 383 384 void QuicSession::SendRstStream(QuicStreamId id, 385 QuicRstStreamErrorCode error, 386 QuicStreamOffset bytes_written) { 387 if (connection()->connected()) { 388 // Only send a RST_STREAM frame if still connected. 389 connection_->SendRstStream(id, error, bytes_written); 390 } 391 CloseStreamInner(id, true); 392 } 393 394 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 395 if (goaway_sent_) { 396 return; 397 } 398 goaway_sent_ = true; 399 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 400 } 401 402 void QuicSession::CloseStream(QuicStreamId stream_id) { 403 CloseStreamInner(stream_id, false); 404 } 405 406 void QuicSession::CloseStreamInner(QuicStreamId stream_id, 407 bool locally_reset) { 408 DVLOG(1) << ENDPOINT << "Closing stream " << stream_id; 409 410 DataStreamMap::iterator it = stream_map_.find(stream_id); 411 if (it == stream_map_.end()) { 412 DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id; 413 return; 414 } 415 QuicDataStream* stream = it->second; 416 417 // Tell the stream that a RST has been sent. 418 if (locally_reset) { 419 stream->set_rst_sent(true); 420 } 421 422 closed_streams_.push_back(it->second); 423 424 // If we haven't received a FIN or RST for this stream, we need to keep track 425 // of the how many bytes the stream's flow controller believes it has 426 // received, for accurate connection level flow control accounting. 427 if (!stream->HasFinalReceivedByteOffset() && 428 stream->flow_controller()->IsEnabled()) { 429 locally_closed_streams_highest_offset_[stream_id] = 430 stream->flow_controller()->highest_received_byte_offset(); 431 } 432 433 stream_map_.erase(it); 434 stream->OnClose(); 435 } 436 437 void QuicSession::UpdateFlowControlOnFinalReceivedByteOffset( 438 QuicStreamId stream_id, QuicStreamOffset final_byte_offset) { 439 map<QuicStreamId, QuicStreamOffset>::iterator it = 440 locally_closed_streams_highest_offset_.find(stream_id); 441 if (it == locally_closed_streams_highest_offset_.end()) { 442 return; 443 } 444 445 DVLOG(1) << ENDPOINT << "Received final byte offset " << final_byte_offset 446 << " for stream " << stream_id; 447 uint64 offset_diff = final_byte_offset - it->second; 448 if (flow_controller_->UpdateHighestReceivedOffset( 449 flow_controller_->highest_received_byte_offset() + offset_diff)) { 450 // If the final offset violates flow control, close the connection now. 451 if (flow_controller_->FlowControlViolation()) { 452 connection_->SendConnectionClose( 453 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); 454 return; 455 } 456 } 457 458 flow_controller_->AddBytesConsumed(offset_diff); 459 locally_closed_streams_highest_offset_.erase(it); 460 } 461 462 bool QuicSession::IsEncryptionEstablished() { 463 return GetCryptoStream()->encryption_established(); 464 } 465 466 bool QuicSession::IsCryptoHandshakeConfirmed() { 467 return GetCryptoStream()->handshake_confirmed(); 468 } 469 470 void QuicSession::OnConfigNegotiated() { 471 connection_->SetFromConfig(config_); 472 QuicVersion version = connection()->version(); 473 474 // A server should accept a small number of additional streams beyond the 475 // limit sent to the client. This helps avoid early connection termination 476 // when FIN/RSTs for old streams are lost or arrive out of order. 477 if (FLAGS_quic_allow_more_open_streams) { 478 set_max_open_streams((is_server() ? kMaxStreamsMultiplier : 1.0) * 479 config_.max_streams_per_connection()); 480 } 481 482 if (version <= QUIC_VERSION_16) { 483 return; 484 } 485 486 if (version <= QUIC_VERSION_19) { 487 // QUIC_VERSION_17,18,19 don't support independent stream/session flow 488 // control windows. 489 if (config_.HasReceivedInitialFlowControlWindowBytes()) { 490 // Streams which were created before the SHLO was received (0-RTT 491 // requests) are now informed of the peer's initial flow control window. 492 uint32 new_window = config_.ReceivedInitialFlowControlWindowBytes(); 493 OnNewStreamFlowControlWindow(new_window); 494 OnNewSessionFlowControlWindow(new_window); 495 } 496 497 return; 498 } 499 500 // QUIC_VERSION_21 and higher can have independent stream and session flow 501 // control windows. 502 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) { 503 // Streams which were created before the SHLO was received (0-RTT 504 // requests) are now informed of the peer's initial flow control window. 505 OnNewStreamFlowControlWindow( 506 config_.ReceivedInitialStreamFlowControlWindowBytes()); 507 } 508 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) { 509 OnNewSessionFlowControlWindow( 510 config_.ReceivedInitialSessionFlowControlWindowBytes()); 511 } 512 } 513 514 void QuicSession::OnNewStreamFlowControlWindow(uint32 new_window) { 515 if (new_window < kDefaultFlowControlSendWindow) { 516 LOG(ERROR) 517 << "Peer sent us an invalid stream flow control send window: " 518 << new_window << ", below default: " << kDefaultFlowControlSendWindow; 519 if (connection_->connected()) { 520 connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW); 521 } 522 return; 523 } 524 525 // Inform all existing streams about the new window. 526 if (connection_->version() >= QUIC_VERSION_21) { 527 GetCryptoStream()->UpdateSendWindowOffset(new_window); 528 headers_stream_->UpdateSendWindowOffset(new_window); 529 } 530 for (DataStreamMap::iterator it = stream_map_.begin(); 531 it != stream_map_.end(); ++it) { 532 it->second->UpdateSendWindowOffset(new_window); 533 } 534 } 535 536 void QuicSession::OnNewSessionFlowControlWindow(uint32 new_window) { 537 if (new_window < kDefaultFlowControlSendWindow) { 538 LOG(ERROR) 539 << "Peer sent us an invalid session flow control send window: " 540 << new_window << ", below default: " << kDefaultFlowControlSendWindow; 541 if (connection_->connected()) { 542 connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW); 543 } 544 return; 545 } 546 547 flow_controller_->UpdateSendWindowOffset(new_window); 548 } 549 550 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 551 switch (event) { 552 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 553 // to QuicSession since it is the glue. 554 case ENCRYPTION_FIRST_ESTABLISHED: 555 break; 556 557 case ENCRYPTION_REESTABLISHED: 558 // Retransmit originally packets that were sent, since they can't be 559 // decrypted by the peer. 560 connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION); 561 break; 562 563 case HANDSHAKE_CONFIRMED: 564 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 565 << "Handshake confirmed without parameter negotiation."; 566 // Discard originally encrypted packets, since they can't be decrypted by 567 // the peer. 568 connection_->NeuterUnencryptedPackets(); 569 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 570 if (!FLAGS_quic_allow_more_open_streams) { 571 max_open_streams_ = config_.max_streams_per_connection(); 572 } 573 break; 574 575 default: 576 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 577 } 578 } 579 580 void QuicSession::OnCryptoHandshakeMessageSent( 581 const CryptoHandshakeMessage& message) { 582 } 583 584 void QuicSession::OnCryptoHandshakeMessageReceived( 585 const CryptoHandshakeMessage& message) { 586 } 587 588 QuicConfig* QuicSession::config() { 589 return &config_; 590 } 591 592 void QuicSession::ActivateStream(QuicDataStream* stream) { 593 DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size() 594 << ". activating " << stream->id(); 595 DCHECK_EQ(stream_map_.count(stream->id()), 0u); 596 stream_map_[stream->id()] = stream; 597 } 598 599 QuicStreamId QuicSession::GetNextStreamId() { 600 QuicStreamId id = next_stream_id_; 601 next_stream_id_ += 2; 602 return id; 603 } 604 605 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 606 if (stream_id == kCryptoStreamId) { 607 return GetCryptoStream(); 608 } 609 if (stream_id == kHeadersStreamId) { 610 return headers_stream_.get(); 611 } 612 return GetDataStream(stream_id); 613 } 614 615 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) { 616 if (stream_id == kCryptoStreamId) { 617 DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id"; 618 return NULL; 619 } 620 if (stream_id == kHeadersStreamId) { 621 DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id"; 622 return NULL; 623 } 624 625 DataStreamMap::iterator it = stream_map_.find(stream_id); 626 if (it != stream_map_.end()) { 627 return it->second; 628 } 629 630 if (IsClosedStream(stream_id)) { 631 return NULL; 632 } 633 634 if (stream_id % 2 == next_stream_id_ % 2) { 635 // We've received a frame for a locally-created stream that is not 636 // currently active. This is an error. 637 if (connection()->connected()) { 638 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 639 } 640 return NULL; 641 } 642 643 return GetIncomingDataStream(stream_id); 644 } 645 646 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) { 647 if (IsClosedStream(stream_id)) { 648 return NULL; 649 } 650 651 implicitly_created_streams_.erase(stream_id); 652 if (stream_id > largest_peer_created_stream_id_) { 653 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 654 // We may already have sent a connection close due to multiple reset 655 // streams in the same packet. 656 if (connection()->connected()) { 657 LOG(ERROR) << "Trying to get stream: " << stream_id 658 << ", largest peer created stream: " 659 << largest_peer_created_stream_id_ 660 << ", max delta: " << kMaxStreamIdDelta; 661 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 662 } 663 return NULL; 664 } 665 if (largest_peer_created_stream_id_ == 0) { 666 if (is_server()) { 667 largest_peer_created_stream_id_= 3; 668 } else { 669 largest_peer_created_stream_id_= 1; 670 } 671 } 672 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 673 id < stream_id; 674 id += 2) { 675 implicitly_created_streams_.insert(id); 676 } 677 largest_peer_created_stream_id_ = stream_id; 678 } 679 QuicDataStream* stream = CreateIncomingDataStream(stream_id); 680 if (stream == NULL) { 681 return NULL; 682 } 683 ActivateStream(stream); 684 return stream; 685 } 686 687 void QuicSession::set_max_open_streams(size_t max_open_streams) { 688 DVLOG(1) << "Setting max_open_streams_ to " << max_open_streams; 689 max_open_streams_ = max_open_streams; 690 } 691 692 bool QuicSession::IsClosedStream(QuicStreamId id) { 693 DCHECK_NE(0u, id); 694 if (id == kCryptoStreamId) { 695 return false; 696 } 697 if (id == kHeadersStreamId) { 698 return false; 699 } 700 if (ContainsKey(stream_map_, id)) { 701 // Stream is active 702 return false; 703 } 704 if (id % 2 == next_stream_id_ % 2) { 705 // Locally created streams are strictly in-order. If the id is in the 706 // range of created streams and it's not active, it must have been closed. 707 return id < next_stream_id_; 708 } 709 // For peer created streams, we also need to consider implicitly created 710 // streams. 711 return id <= largest_peer_created_stream_id_ && 712 implicitly_created_streams_.count(id) == 0; 713 } 714 715 size_t QuicSession::GetNumOpenStreams() const { 716 return stream_map_.size() + implicitly_created_streams_.size(); 717 } 718 719 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) { 720 #ifndef NDEBUG 721 ReliableQuicStream* stream = GetStream(id); 722 if (stream != NULL) { 723 LOG_IF(DFATAL, priority != stream->EffectivePriority()) 724 << ENDPOINT << "Stream " << id 725 << "Priorities do not match. Got: " << priority 726 << " Expected: " << stream->EffectivePriority(); 727 } else { 728 LOG(DFATAL) << "Marking unknown stream " << id << " blocked."; 729 } 730 #endif 731 732 if (id == kCryptoStreamId) { 733 DCHECK(!has_pending_handshake_); 734 has_pending_handshake_ = true; 735 // TODO(jar): Be sure to use the highest priority for the crypto stream, 736 // perhaps by adding a "special" priority for it that is higher than 737 // kHighestPriority. 738 priority = kHighestPriority; 739 } 740 write_blocked_streams_.PushBack(id, priority); 741 } 742 743 bool QuicSession::HasDataToWrite() const { 744 return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 745 write_blocked_streams_.HasWriteBlockedDataStreams() || 746 connection_->HasQueuedData(); 747 } 748 749 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) const { 750 NOTIMPLEMENTED(); 751 return false; 752 } 753 754 void QuicSession::PostProcessAfterData() { 755 STLDeleteElements(&closed_streams_); 756 closed_streams_.clear(); 757 758 if (FLAGS_close_quic_connection_unfinished_streams_2 && 759 connection()->connected() && 760 locally_closed_streams_highest_offset_.size() > max_open_streams_) { 761 // A buggy client may fail to send FIN/RSTs. Don't tolerate this. 762 connection_->SendConnectionClose(QUIC_TOO_MANY_UNFINISHED_STREAMS); 763 } 764 } 765 766 void QuicSession::OnSuccessfulVersionNegotiation(const QuicVersion& version) { 767 if (version < QUIC_VERSION_19) { 768 flow_controller_->Disable(); 769 } 770 771 // Disable stream level flow control based on negotiated version. Streams may 772 // have been created with a different version. 773 if (version < QUIC_VERSION_21) { 774 GetCryptoStream()->flow_controller()->Disable(); 775 headers_stream_->flow_controller()->Disable(); 776 } 777 for (DataStreamMap::iterator it = stream_map_.begin(); 778 it != stream_map_.end(); ++it) { 779 if (version <= QUIC_VERSION_16) { 780 it->second->flow_controller()->Disable(); 781 } 782 } 783 } 784 785 } // namespace net 786