1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_session.h" 6 7 #include "base/stl_util.h" 8 #include "net/quic/crypto/proof_verifier.h" 9 #include "net/quic/quic_connection.h" 10 #include "net/quic/quic_flags.h" 11 #include "net/quic/quic_flow_controller.h" 12 #include "net/quic/quic_headers_stream.h" 13 #include "net/ssl/ssl_info.h" 14 15 using base::StringPiece; 16 using base::hash_map; 17 using base::hash_set; 18 using std::make_pair; 19 using std::vector; 20 21 namespace net { 22 23 #define ENDPOINT (is_server() ? "Server: " : " Client: ") 24 25 // We want to make sure we delete any closed streams in a safe manner. 26 // To avoid deleting a stream in mid-operation, we have a simple shim between 27 // us and the stream, so we can delete any streams when we return from 28 // processing. 29 // 30 // We could just override the base methods, but this makes it easier to make 31 // sure we don't miss any. 32 class VisitorShim : public QuicConnectionVisitorInterface { 33 public: 34 explicit VisitorShim(QuicSession* session) : session_(session) {} 35 36 virtual void OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE { 37 session_->OnStreamFrames(frames); 38 session_->PostProcessAfterData(); 39 } 40 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE { 41 session_->OnRstStream(frame); 42 session_->PostProcessAfterData(); 43 } 44 45 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE { 46 session_->OnGoAway(frame); 47 session_->PostProcessAfterData(); 48 } 49 50 virtual void OnWindowUpdateFrames(const vector<QuicWindowUpdateFrame>& frames) 51 OVERRIDE { 52 session_->OnWindowUpdateFrames(frames); 53 session_->PostProcessAfterData(); 54 } 55 56 virtual void OnBlockedFrames(const vector<QuicBlockedFrame>& frames) 57 OVERRIDE { 58 session_->OnBlockedFrames(frames); 59 session_->PostProcessAfterData(); 60 } 61 62 virtual void OnCanWrite() OVERRIDE { 63 session_->OnCanWrite(); 64 session_->PostProcessAfterData(); 65 } 66 67 virtual void OnSuccessfulVersionNegotiation( 68 const QuicVersion& version) OVERRIDE { 69 session_->OnSuccessfulVersionNegotiation(version); 70 } 71 72 virtual void OnConnectionClosed( 73 QuicErrorCode error, bool from_peer) OVERRIDE { 74 session_->OnConnectionClosed(error, from_peer); 75 // The session will go away, so don't bother with cleanup. 76 } 77 78 virtual void OnWriteBlocked() OVERRIDE { 79 session_->OnWriteBlocked(); 80 } 81 82 virtual bool WillingAndAbleToWrite() const OVERRIDE { 83 return session_->WillingAndAbleToWrite(); 84 } 85 86 virtual bool HasPendingHandshake() const OVERRIDE { 87 return session_->HasPendingHandshake(); 88 } 89 90 virtual bool HasOpenDataStreams() const OVERRIDE { 91 return session_->HasOpenDataStreams(); 92 } 93 94 private: 95 QuicSession* session_; 96 }; 97 98 QuicSession::QuicSession(QuicConnection* connection, const QuicConfig& config) 99 : connection_(connection), 100 visitor_shim_(new VisitorShim(this)), 101 config_(config), 102 max_open_streams_(config_.max_streams_per_connection()), 103 next_stream_id_(is_server() ? 2 : 3), 104 largest_peer_created_stream_id_(0), 105 error_(QUIC_NO_ERROR), 106 goaway_received_(false), 107 goaway_sent_(false), 108 has_pending_handshake_(false) { 109 if (connection_->version() <= QUIC_VERSION_19) { 110 flow_controller_.reset(new QuicFlowController( 111 connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow, 112 config_.GetInitialFlowControlWindowToSend(), 113 config_.GetInitialFlowControlWindowToSend())); 114 } else { 115 flow_controller_.reset(new QuicFlowController( 116 connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow, 117 config_.GetInitialSessionFlowControlWindowToSend(), 118 config_.GetInitialSessionFlowControlWindowToSend())); 119 } 120 121 connection_->set_visitor(visitor_shim_.get()); 122 connection_->SetFromConfig(config_); 123 if (connection_->connected()) { 124 connection_->SetOverallConnectionTimeout( 125 config_.max_time_before_crypto_handshake()); 126 } 127 headers_stream_.reset(new QuicHeadersStream(this)); 128 if (!is_server()) { 129 // For version above QUIC v12, the headers stream is stream 3, so the 130 // next available local stream ID should be 5. 131 DCHECK_EQ(kHeadersStreamId, next_stream_id_); 132 next_stream_id_ += 2; 133 } 134 } 135 136 QuicSession::~QuicSession() { 137 STLDeleteElements(&closed_streams_); 138 STLDeleteValues(&stream_map_); 139 140 DLOG_IF(WARNING, 141 locally_closed_streams_highest_offset_.size() > max_open_streams_) 142 << "Surprisingly high number of locally closed streams still waiting for " 143 "final byte offset: " << locally_closed_streams_highest_offset_.size(); 144 } 145 146 void QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) { 147 for (size_t i = 0; i < frames.size(); ++i) { 148 // TODO(rch) deal with the error case of stream id 0. 149 const QuicStreamFrame& frame = frames[i]; 150 QuicStreamId stream_id = frame.stream_id; 151 ReliableQuicStream* stream = GetStream(stream_id); 152 if (!stream) { 153 // The stream no longer exists, but we may still be interested in the 154 // final stream byte offset sent by the peer. A frame with a FIN can give 155 // us this offset. 156 if (frame.fin) { 157 QuicStreamOffset final_byte_offset = 158 frame.offset + frame.data.TotalBufferSize(); 159 UpdateFlowControlOnFinalReceivedByteOffset(stream_id, 160 final_byte_offset); 161 } 162 163 continue; 164 } 165 stream->OnStreamFrame(frames[i]); 166 } 167 } 168 169 void QuicSession::OnStreamHeaders(QuicStreamId stream_id, 170 StringPiece headers_data) { 171 QuicDataStream* stream = GetDataStream(stream_id); 172 if (!stream) { 173 // It's quite possible to receive headers after a stream has been reset. 174 return; 175 } 176 stream->OnStreamHeaders(headers_data); 177 } 178 179 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id, 180 QuicPriority priority) { 181 QuicDataStream* stream = GetDataStream(stream_id); 182 if (!stream) { 183 // It's quite possible to receive headers after a stream has been reset. 184 return; 185 } 186 stream->OnStreamHeadersPriority(priority); 187 } 188 189 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id, 190 bool fin, 191 size_t frame_len) { 192 QuicDataStream* stream = GetDataStream(stream_id); 193 if (!stream) { 194 // It's quite possible to receive headers after a stream has been reset. 195 return; 196 } 197 stream->OnStreamHeadersComplete(fin, frame_len); 198 } 199 200 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) { 201 if (frame.stream_id == kCryptoStreamId) { 202 connection()->SendConnectionCloseWithDetails( 203 QUIC_INVALID_STREAM_ID, 204 "Attempt to reset the crypto stream"); 205 return; 206 } 207 if (frame.stream_id == kHeadersStreamId) { 208 connection()->SendConnectionCloseWithDetails( 209 QUIC_INVALID_STREAM_ID, 210 "Attempt to reset the headers stream"); 211 return; 212 } 213 214 QuicDataStream* stream = GetDataStream(frame.stream_id); 215 if (!stream) { 216 // The RST frame contains the final byte offset for the stream: we can now 217 // update the connection level flow controller if needed. 218 UpdateFlowControlOnFinalReceivedByteOffset(frame.stream_id, 219 frame.byte_offset); 220 return; // Errors are handled by GetStream. 221 } 222 223 stream->OnStreamReset(frame); 224 } 225 226 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) { 227 DCHECK(frame.last_good_stream_id < next_stream_id_); 228 goaway_received_ = true; 229 } 230 231 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) { 232 DCHECK(!connection_->connected()); 233 if (error_ == QUIC_NO_ERROR) { 234 error_ = error; 235 } 236 237 while (!stream_map_.empty()) { 238 DataStreamMap::iterator it = stream_map_.begin(); 239 QuicStreamId id = it->first; 240 it->second->OnConnectionClosed(error, from_peer); 241 // The stream should call CloseStream as part of OnConnectionClosed. 242 if (stream_map_.find(id) != stream_map_.end()) { 243 LOG(DFATAL) << ENDPOINT 244 << "Stream failed to close under OnConnectionClosed"; 245 CloseStream(id); 246 } 247 } 248 } 249 250 void QuicSession::OnWindowUpdateFrames( 251 const vector<QuicWindowUpdateFrame>& frames) { 252 bool connection_window_updated = false; 253 for (size_t i = 0; i < frames.size(); ++i) { 254 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't 255 // assume that it still exists. 256 QuicStreamId stream_id = frames[i].stream_id; 257 if (stream_id == 0) { 258 // This is a window update that applies to the connection, rather than an 259 // individual stream. 260 DVLOG(1) << ENDPOINT 261 << "Received connection level flow control window update with " 262 "byte offset: " << frames[i].byte_offset; 263 if (FLAGS_enable_quic_connection_flow_control_2 && 264 flow_controller_->UpdateSendWindowOffset(frames[i].byte_offset)) { 265 connection_window_updated = true; 266 } 267 continue; 268 } 269 270 QuicDataStream* stream = GetDataStream(stream_id); 271 if (stream) { 272 stream->OnWindowUpdateFrame(frames[i]); 273 } 274 } 275 276 // Connection level flow control window has increased, so blocked streams can 277 // write again. 278 if (connection_window_updated) { 279 OnCanWrite(); 280 } 281 } 282 283 void QuicSession::OnBlockedFrames(const vector<QuicBlockedFrame>& frames) { 284 for (size_t i = 0; i < frames.size(); ++i) { 285 // TODO(rjshade): Compare our flow control receive windows for specified 286 // streams: if we have a large window then maybe something 287 // had gone wrong with the flow control accounting. 288 DVLOG(1) << ENDPOINT << "Received BLOCKED frame with stream id: " 289 << frames[i].stream_id; 290 } 291 } 292 293 void QuicSession::OnCanWrite() { 294 // We limit the number of writes to the number of pending streams. If more 295 // streams become pending, WillingAndAbleToWrite will be true, which will 296 // cause the connection to request resumption before yielding to other 297 // connections. 298 size_t num_writes = write_blocked_streams_.NumBlockedStreams(); 299 if (flow_controller_->IsBlocked()) { 300 // If we are connection level flow control blocked, then only allow the 301 // crypto and headers streams to try writing as all other streams will be 302 // blocked. 303 num_writes = 0; 304 if (write_blocked_streams_.crypto_stream_blocked()) { 305 num_writes += 1; 306 } 307 if (write_blocked_streams_.headers_stream_blocked()) { 308 num_writes += 1; 309 } 310 } 311 if (num_writes == 0) { 312 return; 313 } 314 315 QuicConnection::ScopedPacketBundler ack_bundler( 316 connection_.get(), QuicConnection::NO_ACK); 317 for (size_t i = 0; i < num_writes; ++i) { 318 if (!(write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 319 write_blocked_streams_.HasWriteBlockedDataStreams())) { 320 // Writing one stream removed another!? Something's broken. 321 LOG(DFATAL) << "WriteBlockedStream is missing"; 322 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false); 323 return; 324 } 325 if (!connection_->CanWriteStreamData()) { 326 return; 327 } 328 QuicStreamId stream_id = write_blocked_streams_.PopFront(); 329 if (stream_id == kCryptoStreamId) { 330 has_pending_handshake_ = false; // We just popped it. 331 } 332 ReliableQuicStream* stream = GetStream(stream_id); 333 if (stream != NULL && !stream->flow_controller()->IsBlocked()) { 334 // If the stream can't write all bytes, it'll re-add itself to the blocked 335 // list. 336 stream->OnCanWrite(); 337 } 338 } 339 } 340 341 bool QuicSession::WillingAndAbleToWrite() const { 342 // If the crypto or headers streams are blocked, we want to schedule a write - 343 // they don't get blocked by connection level flow control. Otherwise only 344 // schedule a write if we are not flow control blocked at the connection 345 // level. 346 return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 347 (!flow_controller_->IsBlocked() && 348 write_blocked_streams_.HasWriteBlockedDataStreams()); 349 } 350 351 bool QuicSession::HasPendingHandshake() const { 352 return has_pending_handshake_; 353 } 354 355 bool QuicSession::HasOpenDataStreams() const { 356 return GetNumOpenStreams() > 0; 357 } 358 359 QuicConsumedData QuicSession::WritevData( 360 QuicStreamId id, 361 const IOVector& data, 362 QuicStreamOffset offset, 363 bool fin, 364 FecProtection fec_protection, 365 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 366 return connection_->SendStreamData(id, data, offset, fin, fec_protection, 367 ack_notifier_delegate); 368 } 369 370 size_t QuicSession::WriteHeaders( 371 QuicStreamId id, 372 const SpdyHeaderBlock& headers, 373 bool fin, 374 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 375 return headers_stream_->WriteHeaders(id, headers, fin, ack_notifier_delegate); 376 } 377 378 void QuicSession::SendRstStream(QuicStreamId id, 379 QuicRstStreamErrorCode error, 380 QuicStreamOffset bytes_written) { 381 if (connection()->connected()) { 382 // Only send a RST_STREAM frame if still connected. 383 connection_->SendRstStream(id, error, bytes_written); 384 } 385 CloseStreamInner(id, true); 386 } 387 388 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) { 389 if (goaway_sent_) { 390 return; 391 } 392 goaway_sent_ = true; 393 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason); 394 } 395 396 void QuicSession::CloseStream(QuicStreamId stream_id) { 397 CloseStreamInner(stream_id, false); 398 } 399 400 void QuicSession::CloseStreamInner(QuicStreamId stream_id, 401 bool locally_reset) { 402 DVLOG(1) << ENDPOINT << "Closing stream " << stream_id; 403 404 DataStreamMap::iterator it = stream_map_.find(stream_id); 405 if (it == stream_map_.end()) { 406 DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id; 407 return; 408 } 409 QuicDataStream* stream = it->second; 410 411 // Tell the stream that a RST has been sent. 412 if (locally_reset) { 413 stream->set_rst_sent(true); 414 } 415 416 closed_streams_.push_back(it->second); 417 418 // If we haven't received a FIN or RST for this stream, we need to keep track 419 // of the how many bytes the stream's flow controller believes it has 420 // received, for accurate connection level flow control accounting. 421 if (!stream->HasFinalReceivedByteOffset() && 422 stream->flow_controller()->IsEnabled() && 423 FLAGS_enable_quic_connection_flow_control_2) { 424 locally_closed_streams_highest_offset_[stream_id] = 425 stream->flow_controller()->highest_received_byte_offset(); 426 } 427 428 stream_map_.erase(it); 429 stream->OnClose(); 430 } 431 432 void QuicSession::UpdateFlowControlOnFinalReceivedByteOffset( 433 QuicStreamId stream_id, QuicStreamOffset final_byte_offset) { 434 if (!FLAGS_enable_quic_connection_flow_control_2) { 435 return; 436 } 437 438 map<QuicStreamId, QuicStreamOffset>::iterator it = 439 locally_closed_streams_highest_offset_.find(stream_id); 440 if (it == locally_closed_streams_highest_offset_.end()) { 441 return; 442 } 443 444 DVLOG(1) << ENDPOINT << "Received final byte offset " << final_byte_offset 445 << " for stream " << stream_id; 446 uint64 offset_diff = final_byte_offset - it->second; 447 if (flow_controller_->UpdateHighestReceivedOffset( 448 flow_controller_->highest_received_byte_offset() + offset_diff)) { 449 // If the final offset violates flow control, close the connection now. 450 if (flow_controller_->FlowControlViolation()) { 451 connection_->SendConnectionClose( 452 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); 453 return; 454 } 455 } 456 457 flow_controller_->AddBytesConsumed(offset_diff); 458 locally_closed_streams_highest_offset_.erase(it); 459 } 460 461 bool QuicSession::IsEncryptionEstablished() { 462 return GetCryptoStream()->encryption_established(); 463 } 464 465 bool QuicSession::IsCryptoHandshakeConfirmed() { 466 return GetCryptoStream()->handshake_confirmed(); 467 } 468 469 void QuicSession::OnConfigNegotiated() { 470 connection_->SetFromConfig(config_); 471 QuicVersion version = connection()->version(); 472 if (version < QUIC_VERSION_17) { 473 return; 474 } 475 476 if (version <= QUIC_VERSION_19) { 477 // QUIC_VERSION_17,18,19 don't support independent stream/session flow 478 // control windows. 479 if (config_.HasReceivedInitialFlowControlWindowBytes()) { 480 // Streams which were created before the SHLO was received (0-RTT 481 // requests) are now informed of the peer's initial flow control window. 482 uint32 new_window = config_.ReceivedInitialFlowControlWindowBytes(); 483 OnNewStreamFlowControlWindow(new_window); 484 OnNewSessionFlowControlWindow(new_window); 485 } 486 487 return; 488 } 489 490 // QUIC_VERSION_20 and higher can have independent stream and session flow 491 // control windows. 492 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) { 493 // Streams which were created before the SHLO was received (0-RTT 494 // requests) are now informed of the peer's initial flow control window. 495 OnNewStreamFlowControlWindow( 496 config_.ReceivedInitialStreamFlowControlWindowBytes()); 497 } 498 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) { 499 OnNewSessionFlowControlWindow( 500 config_.ReceivedInitialSessionFlowControlWindowBytes()); 501 } 502 } 503 504 void QuicSession::OnNewStreamFlowControlWindow(uint32 new_window) { 505 if (new_window < kDefaultFlowControlSendWindow) { 506 LOG(ERROR) 507 << "Peer sent us an invalid stream flow control send window: " 508 << new_window << ", below default: " << kDefaultFlowControlSendWindow; 509 if (connection_->connected()) { 510 connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW); 511 } 512 return; 513 } 514 515 for (DataStreamMap::iterator it = stream_map_.begin(); 516 it != stream_map_.end(); ++it) { 517 it->second->flow_controller()->UpdateSendWindowOffset(new_window); 518 } 519 } 520 521 void QuicSession::OnNewSessionFlowControlWindow(uint32 new_window) { 522 if (new_window < kDefaultFlowControlSendWindow) { 523 LOG(ERROR) 524 << "Peer sent us an invalid session flow control send window: " 525 << new_window << ", below default: " << kDefaultFlowControlSendWindow; 526 if (connection_->connected()) { 527 connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW); 528 } 529 return; 530 } 531 532 flow_controller_->UpdateSendWindowOffset(new_window); 533 } 534 535 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 536 switch (event) { 537 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter 538 // to QuicSession since it is the glue. 539 case ENCRYPTION_FIRST_ESTABLISHED: 540 break; 541 542 case ENCRYPTION_REESTABLISHED: 543 // Retransmit originally packets that were sent, since they can't be 544 // decrypted by the peer. 545 connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY); 546 break; 547 548 case HANDSHAKE_CONFIRMED: 549 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT 550 << "Handshake confirmed without parameter negotiation."; 551 // Discard originally encrypted packets, since they can't be decrypted by 552 // the peer. 553 connection_->NeuterUnencryptedPackets(); 554 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite()); 555 max_open_streams_ = config_.max_streams_per_connection(); 556 break; 557 558 default: 559 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event; 560 } 561 } 562 563 void QuicSession::OnCryptoHandshakeMessageSent( 564 const CryptoHandshakeMessage& message) { 565 } 566 567 void QuicSession::OnCryptoHandshakeMessageReceived( 568 const CryptoHandshakeMessage& message) { 569 } 570 571 QuicConfig* QuicSession::config() { 572 return &config_; 573 } 574 575 void QuicSession::ActivateStream(QuicDataStream* stream) { 576 DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size() 577 << ". activating " << stream->id(); 578 DCHECK_EQ(stream_map_.count(stream->id()), 0u); 579 stream_map_[stream->id()] = stream; 580 } 581 582 QuicStreamId QuicSession::GetNextStreamId() { 583 QuicStreamId id = next_stream_id_; 584 next_stream_id_ += 2; 585 return id; 586 } 587 588 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) { 589 if (stream_id == kCryptoStreamId) { 590 return GetCryptoStream(); 591 } 592 if (stream_id == kHeadersStreamId) { 593 return headers_stream_.get(); 594 } 595 return GetDataStream(stream_id); 596 } 597 598 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) { 599 if (stream_id == kCryptoStreamId) { 600 DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id"; 601 return NULL; 602 } 603 if (stream_id == kHeadersStreamId) { 604 DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id"; 605 return NULL; 606 } 607 608 DataStreamMap::iterator it = stream_map_.find(stream_id); 609 if (it != stream_map_.end()) { 610 return it->second; 611 } 612 613 if (IsClosedStream(stream_id)) { 614 return NULL; 615 } 616 617 if (stream_id % 2 == next_stream_id_ % 2) { 618 // We've received a frame for a locally-created stream that is not 619 // currently active. This is an error. 620 if (connection()->connected()) { 621 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM); 622 } 623 return NULL; 624 } 625 626 return GetIncomingDataStream(stream_id); 627 } 628 629 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) { 630 if (IsClosedStream(stream_id)) { 631 return NULL; 632 } 633 634 implicitly_created_streams_.erase(stream_id); 635 if (stream_id > largest_peer_created_stream_id_) { 636 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) { 637 // We may already have sent a connection close due to multiple reset 638 // streams in the same packet. 639 if (connection()->connected()) { 640 LOG(ERROR) << "Trying to get stream: " << stream_id 641 << ", largest peer created stream: " 642 << largest_peer_created_stream_id_ 643 << ", max delta: " << kMaxStreamIdDelta; 644 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID); 645 } 646 return NULL; 647 } 648 if (largest_peer_created_stream_id_ == 0) { 649 if (is_server()) { 650 largest_peer_created_stream_id_= 3; 651 } else { 652 largest_peer_created_stream_id_= 1; 653 } 654 } 655 for (QuicStreamId id = largest_peer_created_stream_id_ + 2; 656 id < stream_id; 657 id += 2) { 658 implicitly_created_streams_.insert(id); 659 } 660 largest_peer_created_stream_id_ = stream_id; 661 } 662 QuicDataStream* stream = CreateIncomingDataStream(stream_id); 663 if (stream == NULL) { 664 return NULL; 665 } 666 ActivateStream(stream); 667 return stream; 668 } 669 670 bool QuicSession::IsClosedStream(QuicStreamId id) { 671 DCHECK_NE(0u, id); 672 if (id == kCryptoStreamId) { 673 return false; 674 } 675 if (id == kHeadersStreamId) { 676 return false; 677 } 678 if (ContainsKey(stream_map_, id)) { 679 // Stream is active 680 return false; 681 } 682 if (id % 2 == next_stream_id_ % 2) { 683 // Locally created streams are strictly in-order. If the id is in the 684 // range of created streams and it's not active, it must have been closed. 685 return id < next_stream_id_; 686 } 687 // For peer created streams, we also need to consider implicitly created 688 // streams. 689 return id <= largest_peer_created_stream_id_ && 690 implicitly_created_streams_.count(id) == 0; 691 } 692 693 size_t QuicSession::GetNumOpenStreams() const { 694 return stream_map_.size() + implicitly_created_streams_.size(); 695 } 696 697 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) { 698 #ifndef NDEBUG 699 ReliableQuicStream* stream = GetStream(id); 700 if (stream != NULL) { 701 LOG_IF(DFATAL, priority != stream->EffectivePriority()) 702 << ENDPOINT << "Stream " << id 703 << "Priorities do not match. Got: " << priority 704 << " Expected: " << stream->EffectivePriority(); 705 } else { 706 LOG(DFATAL) << "Marking unknown stream " << id << " blocked."; 707 } 708 #endif 709 710 if (id == kCryptoStreamId) { 711 DCHECK(!has_pending_handshake_); 712 has_pending_handshake_ = true; 713 // TODO(jar): Be sure to use the highest priority for the crypto stream, 714 // perhaps by adding a "special" priority for it that is higher than 715 // kHighestPriority. 716 priority = kHighestPriority; 717 } 718 write_blocked_streams_.PushBack(id, priority); 719 } 720 721 bool QuicSession::HasDataToWrite() const { 722 return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() || 723 write_blocked_streams_.HasWriteBlockedDataStreams() || 724 connection_->HasQueuedData(); 725 } 726 727 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) const { 728 NOTIMPLEMENTED(); 729 return false; 730 } 731 732 void QuicSession::PostProcessAfterData() { 733 STLDeleteElements(&closed_streams_); 734 closed_streams_.clear(); 735 } 736 737 void QuicSession::OnSuccessfulVersionNegotiation(const QuicVersion& version) { 738 if (version < QUIC_VERSION_19) { 739 flow_controller_->Disable(); 740 } 741 742 // Inform all streams about the negotiated version. They may have been created 743 // with a different version. 744 for (DataStreamMap::iterator it = stream_map_.begin(); 745 it != stream_map_.end(); ++it) { 746 if (version < QUIC_VERSION_17) { 747 it->second->flow_controller()->Disable(); 748 } 749 } 750 } 751 752 } // namespace net 753