1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_session.h" 6 7 #include <algorithm> 8 #include <map> 9 10 #include "base/basictypes.h" 11 #include "base/bind.h" 12 #include "base/compiler_specific.h" 13 #include "base/logging.h" 14 #include "base/message_loop/message_loop.h" 15 #include "base/metrics/field_trial.h" 16 #include "base/metrics/histogram.h" 17 #include "base/metrics/sparse_histogram.h" 18 #include "base/metrics/stats_counters.h" 19 #include "base/stl_util.h" 20 #include "base/strings/string_number_conversions.h" 21 #include "base/strings/string_util.h" 22 #include "base/strings/stringprintf.h" 23 #include "base/strings/utf_string_conversions.h" 24 #include "base/time/time.h" 25 #include "base/values.h" 26 #include "crypto/ec_private_key.h" 27 #include "crypto/ec_signature_creator.h" 28 #include "net/base/connection_type_histograms.h" 29 #include "net/base/net_log.h" 30 #include "net/base/net_util.h" 31 #include "net/cert/asn1_util.h" 32 #include "net/http/http_network_session.h" 33 #include "net/http/http_server_properties.h" 34 #include "net/spdy/spdy_buffer_producer.h" 35 #include "net/spdy/spdy_frame_builder.h" 36 #include "net/spdy/spdy_http_utils.h" 37 #include "net/spdy/spdy_protocol.h" 38 #include "net/spdy/spdy_session_pool.h" 39 #include "net/spdy/spdy_stream.h" 40 #include "net/ssl/server_bound_cert_service.h" 41 42 namespace net { 43 44 namespace { 45 46 const int kReadBufferSize = 8 * 1024; 47 const int kDefaultConnectionAtRiskOfLossSeconds = 10; 48 const int kHungIntervalSeconds = 10; 49 50 // Always start at 1 for the first stream id. 51 const SpdyStreamId kFirstStreamId = 1; 52 53 // Minimum seconds that unclaimed pushed streams will be kept in memory. 
54 const int kMinPushedStreamLifetimeSeconds = 300; 55 56 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue( 57 const SpdyHeaderBlock& headers) { 58 scoped_ptr<base::ListValue> headers_list(new base::ListValue()); 59 for (SpdyHeaderBlock::const_iterator it = headers.begin(); 60 it != headers.end(); ++it) { 61 headers_list->AppendString( 62 it->first + ": " + 63 (ShouldShowHttpHeaderValue(it->first) ? it->second : "[elided]")); 64 } 65 return headers_list.Pass(); 66 } 67 68 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers, 69 bool fin, 70 bool unidirectional, 71 SpdyPriority spdy_priority, 72 SpdyStreamId stream_id, 73 NetLog::LogLevel /* log_level */) { 74 base::DictionaryValue* dict = new base::DictionaryValue(); 75 dict->Set("headers", SpdyHeaderBlockToListValue(*headers).release()); 76 dict->SetBoolean("fin", fin); 77 dict->SetBoolean("unidirectional", unidirectional); 78 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority)); 79 dict->SetInteger("stream_id", stream_id); 80 return dict; 81 } 82 83 base::Value* NetLogSpdySynStreamReceivedCallback( 84 const SpdyHeaderBlock* headers, 85 bool fin, 86 bool unidirectional, 87 SpdyPriority spdy_priority, 88 SpdyStreamId stream_id, 89 SpdyStreamId associated_stream, 90 NetLog::LogLevel /* log_level */) { 91 base::DictionaryValue* dict = new base::DictionaryValue(); 92 dict->Set("headers", SpdyHeaderBlockToListValue(*headers).release()); 93 dict->SetBoolean("fin", fin); 94 dict->SetBoolean("unidirectional", unidirectional); 95 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority)); 96 dict->SetInteger("stream_id", stream_id); 97 dict->SetInteger("associated_stream", associated_stream); 98 return dict; 99 } 100 101 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback( 102 const SpdyHeaderBlock* headers, 103 bool fin, 104 SpdyStreamId stream_id, 105 NetLog::LogLevel /* log_level */) { 106 base::DictionaryValue* dict = new base::DictionaryValue(); 107 
dict->Set("headers", SpdyHeaderBlockToListValue(*headers).release()); 108 dict->SetBoolean("fin", fin); 109 dict->SetInteger("stream_id", stream_id); 110 return dict; 111 } 112 113 base::Value* NetLogSpdySessionCloseCallback(int net_error, 114 const std::string* description, 115 NetLog::LogLevel /* log_level */) { 116 base::DictionaryValue* dict = new base::DictionaryValue(); 117 dict->SetInteger("net_error", net_error); 118 dict->SetString("description", *description); 119 return dict; 120 } 121 122 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair, 123 NetLog::LogLevel /* log_level */) { 124 base::DictionaryValue* dict = new base::DictionaryValue(); 125 dict->SetString("host", host_pair->first.ToString()); 126 dict->SetString("proxy", host_pair->second.ToPacString()); 127 return dict; 128 } 129 130 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair, 131 bool clear_persisted, 132 NetLog::LogLevel /* log_level */) { 133 base::DictionaryValue* dict = new base::DictionaryValue(); 134 dict->SetString("host", host_port_pair.ToString()); 135 dict->SetBoolean("clear_persisted", clear_persisted); 136 return dict; 137 } 138 139 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id, 140 SpdySettingsFlags flags, 141 uint32 value, 142 NetLog::LogLevel /* log_level */) { 143 base::DictionaryValue* dict = new base::DictionaryValue(); 144 dict->SetInteger("id", id); 145 dict->SetInteger("flags", flags); 146 dict->SetInteger("value", value); 147 return dict; 148 } 149 150 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings, 151 NetLog::LogLevel /* log_level */) { 152 base::DictionaryValue* dict = new base::DictionaryValue(); 153 base::ListValue* settings_list = new base::ListValue(); 154 for (SettingsMap::const_iterator it = settings->begin(); 155 it != settings->end(); ++it) { 156 const SpdySettingsIds id = it->first; 157 const SpdySettingsFlags flags = it->second.first; 158 const uint32 value = 
it->second.second; 159 settings_list->Append(new base::StringValue( 160 base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value))); 161 } 162 dict->Set("settings", settings_list); 163 return dict; 164 } 165 166 base::Value* NetLogSpdyWindowUpdateFrameCallback( 167 SpdyStreamId stream_id, 168 uint32 delta, 169 NetLog::LogLevel /* log_level */) { 170 base::DictionaryValue* dict = new base::DictionaryValue(); 171 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 172 dict->SetInteger("delta", delta); 173 return dict; 174 } 175 176 base::Value* NetLogSpdySessionWindowUpdateCallback( 177 int32 delta, 178 int32 window_size, 179 NetLog::LogLevel /* log_level */) { 180 base::DictionaryValue* dict = new base::DictionaryValue(); 181 dict->SetInteger("delta", delta); 182 dict->SetInteger("window_size", window_size); 183 return dict; 184 } 185 186 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id, 187 int size, 188 bool fin, 189 NetLog::LogLevel /* log_level */) { 190 base::DictionaryValue* dict = new base::DictionaryValue(); 191 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 192 dict->SetInteger("size", size); 193 dict->SetBoolean("fin", fin); 194 return dict; 195 } 196 197 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id, 198 int status, 199 const std::string* description, 200 NetLog::LogLevel /* log_level */) { 201 base::DictionaryValue* dict = new base::DictionaryValue(); 202 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 203 dict->SetInteger("status", status); 204 dict->SetString("description", *description); 205 return dict; 206 } 207 208 base::Value* NetLogSpdyPingCallback(uint32 unique_id, 209 const char* type, 210 NetLog::LogLevel /* log_level */) { 211 base::DictionaryValue* dict = new base::DictionaryValue(); 212 dict->SetInteger("unique_id", unique_id); 213 dict->SetString("type", type); 214 return dict; 215 } 216 217 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id, 218 int 
active_streams, 219 int unclaimed_streams, 220 SpdyGoAwayStatus status, 221 NetLog::LogLevel /* log_level */) { 222 base::DictionaryValue* dict = new base::DictionaryValue(); 223 dict->SetInteger("last_accepted_stream_id", 224 static_cast<int>(last_stream_id)); 225 dict->SetInteger("active_streams", active_streams); 226 dict->SetInteger("unclaimed_streams", unclaimed_streams); 227 dict->SetInteger("status", static_cast<int>(status)); 228 return dict; 229 } 230 231 // Helper function to return the total size of an array of objects 232 // with .size() member functions. 233 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { 234 size_t total_size = 0; 235 for (size_t i = 0; i < N; ++i) { 236 total_size += arr[i].size(); 237 } 238 return total_size; 239 } 240 241 // Helper class for std:find_if on STL container containing 242 // SpdyStreamRequest weak pointers. 243 class RequestEquals { 244 public: 245 RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request) 246 : request_(request) {} 247 248 bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const { 249 return request_.get() == request.get(); 250 } 251 252 private: 253 const base::WeakPtr<SpdyStreamRequest> request_; 254 }; 255 256 // The maximum number of concurrent streams we will ever create. Even if 257 // the server permits more, we will never exceed this limit. 
258 const size_t kMaxConcurrentStreamLimit = 256; 259 260 } // namespace 261 262 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) { 263 Reset(); 264 } 265 266 SpdyStreamRequest::~SpdyStreamRequest() { 267 CancelRequest(); 268 } 269 270 int SpdyStreamRequest::StartRequest( 271 SpdyStreamType type, 272 const base::WeakPtr<SpdySession>& session, 273 const GURL& url, 274 RequestPriority priority, 275 const BoundNetLog& net_log, 276 const CompletionCallback& callback) { 277 DCHECK(session); 278 DCHECK(!session_); 279 DCHECK(!stream_); 280 DCHECK(callback_.is_null()); 281 282 type_ = type; 283 session_ = session; 284 url_ = url; 285 priority_ = priority; 286 net_log_ = net_log; 287 callback_ = callback; 288 289 base::WeakPtr<SpdyStream> stream; 290 int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream); 291 if (rv == OK) { 292 Reset(); 293 stream_ = stream; 294 } 295 return rv; 296 } 297 298 void SpdyStreamRequest::CancelRequest() { 299 if (session_) 300 session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr()); 301 Reset(); 302 // Do this to cancel any pending CompleteStreamRequest() tasks. 
303 weak_ptr_factory_.InvalidateWeakPtrs(); 304 } 305 306 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() { 307 DCHECK(!session_); 308 base::WeakPtr<SpdyStream> stream = stream_; 309 DCHECK(stream); 310 Reset(); 311 return stream; 312 } 313 314 void SpdyStreamRequest::OnRequestCompleteSuccess( 315 const base::WeakPtr<SpdyStream>& stream) { 316 DCHECK(session_); 317 DCHECK(!stream_); 318 DCHECK(!callback_.is_null()); 319 CompletionCallback callback = callback_; 320 Reset(); 321 DCHECK(stream); 322 stream_ = stream; 323 callback.Run(OK); 324 } 325 326 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) { 327 DCHECK(session_); 328 DCHECK(!stream_); 329 DCHECK(!callback_.is_null()); 330 CompletionCallback callback = callback_; 331 Reset(); 332 DCHECK_NE(rv, OK); 333 callback.Run(rv); 334 } 335 336 void SpdyStreamRequest::Reset() { 337 type_ = SPDY_BIDIRECTIONAL_STREAM; 338 session_.reset(); 339 stream_.reset(); 340 url_ = GURL(); 341 priority_ = MINIMUM_PRIORITY; 342 net_log_ = BoundNetLog(); 343 callback_.Reset(); 344 } 345 346 SpdySession::ActiveStreamInfo::ActiveStreamInfo() 347 : stream(NULL), 348 waiting_for_syn_reply(false) {} 349 350 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream) 351 : stream(stream), 352 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {} 353 354 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {} 355 356 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {} 357 358 SpdySession::PushedStreamInfo::PushedStreamInfo( 359 SpdyStreamId stream_id, 360 base::TimeTicks creation_time) 361 : stream_id(stream_id), 362 creation_time(creation_time) {} 363 364 SpdySession::PushedStreamInfo::~PushedStreamInfo() {} 365 366 SpdySession::SpdySession( 367 const SpdySessionKey& spdy_session_key, 368 const base::WeakPtr<HttpServerProperties>& http_server_properties, 369 bool verify_domain_authentication, 370 bool enable_sending_initial_data, 371 bool enable_compression, 372 bool 
enable_ping_based_connection_checking, 373 NextProto default_protocol, 374 size_t stream_initial_recv_window_size, 375 size_t initial_max_concurrent_streams, 376 size_t max_concurrent_streams_limit, 377 TimeFunc time_func, 378 const HostPortPair& trusted_spdy_proxy, 379 NetLog* net_log) 380 : weak_factory_(this), 381 in_io_loop_(false), 382 spdy_session_key_(spdy_session_key), 383 pool_(NULL), 384 http_server_properties_(http_server_properties), 385 read_buffer_(new IOBuffer(kReadBufferSize)), 386 stream_hi_water_mark_(kFirstStreamId), 387 in_flight_write_frame_type_(DATA), 388 in_flight_write_frame_size_(0), 389 is_secure_(false), 390 certificate_error_code_(OK), 391 availability_state_(STATE_AVAILABLE), 392 read_state_(READ_STATE_DO_READ), 393 write_state_(WRITE_STATE_IDLE), 394 error_on_close_(OK), 395 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? 396 kInitialMaxConcurrentStreams : 397 initial_max_concurrent_streams), 398 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? 399 kMaxConcurrentStreamLimit : 400 max_concurrent_streams_limit), 401 streams_initiated_count_(0), 402 streams_pushed_count_(0), 403 streams_pushed_and_claimed_count_(0), 404 streams_abandoned_count_(0), 405 total_bytes_received_(0), 406 sent_settings_(false), 407 received_settings_(false), 408 stalled_streams_(0), 409 pings_in_flight_(0), 410 next_ping_id_(1), 411 last_activity_time_(time_func()), 412 last_compressed_frame_len_(0), 413 check_ping_status_pending_(false), 414 send_connection_header_prefix_(false), 415 flow_control_state_(FLOW_CONTROL_NONE), 416 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize), 417 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ? 
418 kDefaultInitialRecvWindowSize : 419 stream_initial_recv_window_size), 420 session_send_window_size_(0), 421 session_recv_window_size_(0), 422 session_unacked_recv_window_bytes_(0), 423 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)), 424 verify_domain_authentication_(verify_domain_authentication), 425 enable_sending_initial_data_(enable_sending_initial_data), 426 enable_compression_(enable_compression), 427 enable_ping_based_connection_checking_( 428 enable_ping_based_connection_checking), 429 protocol_(default_protocol), 430 connection_at_risk_of_loss_time_( 431 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)), 432 hung_interval_( 433 base::TimeDelta::FromSeconds(kHungIntervalSeconds)), 434 trusted_spdy_proxy_(trusted_spdy_proxy), 435 time_func_(time_func) { 436 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion); 437 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 438 DCHECK(HttpStreamFactory::spdy_enabled()); 439 net_log_.BeginEvent( 440 NetLog::TYPE_SPDY_SESSION, 441 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair())); 442 next_unclaimed_push_stream_sweep_time_ = time_func_() + 443 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 444 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 445 } 446 447 SpdySession::~SpdySession() { 448 CHECK(!in_io_loop_); 449 DCHECK(!pool_); 450 DcheckClosed(); 451 452 // TODO(akalin): Check connection->is_initialized() instead. This 453 // requires re-working CreateFakeSpdySession(), though. 454 DCHECK(connection_->socket()); 455 // With SPDY we can't recycle sockets. 
456 connection_->socket()->Disconnect(); 457 458 RecordHistograms(); 459 460 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 461 } 462 463 Error SpdySession::InitializeWithSocket( 464 scoped_ptr<ClientSocketHandle> connection, 465 SpdySessionPool* pool, 466 bool is_secure, 467 int certificate_error_code) { 468 CHECK(!in_io_loop_); 469 DCHECK_EQ(availability_state_, STATE_AVAILABLE); 470 DCHECK_EQ(read_state_, READ_STATE_DO_READ); 471 DCHECK_EQ(write_state_, WRITE_STATE_IDLE); 472 DCHECK(!connection_); 473 474 DCHECK(certificate_error_code == OK || 475 certificate_error_code < ERR_IO_PENDING); 476 // TODO(akalin): Check connection->is_initialized() instead. This 477 // requires re-working CreateFakeSpdySession(), though. 478 DCHECK(connection->socket()); 479 480 base::StatsCounter spdy_sessions("spdy.sessions"); 481 spdy_sessions.Increment(); 482 483 connection_ = connection.Pass(); 484 is_secure_ = is_secure; 485 certificate_error_code_ = certificate_error_code; 486 487 NextProto protocol_negotiated = 488 connection_->socket()->GetNegotiatedProtocol(); 489 if (protocol_negotiated != kProtoUnknown) { 490 protocol_ = protocol_negotiated; 491 } 492 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion); 493 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 494 495 if (protocol_ == kProtoHTTP2Draft04) 496 send_connection_header_prefix_ = true; 497 498 if (protocol_ >= kProtoSPDY31) { 499 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION; 500 session_send_window_size_ = kSpdySessionInitialWindowSize; 501 session_recv_window_size_ = kSpdySessionInitialWindowSize; 502 } else if (protocol_ >= kProtoSPDY3) { 503 flow_control_state_ = FLOW_CONTROL_STREAM; 504 } else { 505 flow_control_state_ = FLOW_CONTROL_NONE; 506 } 507 508 buffered_spdy_framer_.reset( 509 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_), 510 enable_compression_)); 511 buffered_spdy_framer_->set_visitor(this); 512 buffered_spdy_framer_->set_debug_visitor(this); 513 
UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion); 514 #if defined(SPDY_PROXY_AUTH_ORIGIN) 515 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy", 516 host_port_pair().Equals(HostPortPair::FromURL( 517 GURL(SPDY_PROXY_AUTH_ORIGIN)))); 518 #endif 519 520 net_log_.AddEvent( 521 NetLog::TYPE_SPDY_SESSION_INITIALIZED, 522 connection_->socket()->NetLog().source().ToEventParametersCallback()); 523 524 int error = DoReadLoop(READ_STATE_DO_READ, OK); 525 if (error == ERR_IO_PENDING) 526 error = OK; 527 if (error == OK) { 528 DCHECK_NE(availability_state_, STATE_CLOSED); 529 connection_->AddHigherLayeredPool(this); 530 if (enable_sending_initial_data_) 531 SendInitialData(); 532 pool_ = pool; 533 } else { 534 DcheckClosed(); 535 } 536 return static_cast<Error>(error); 537 } 538 539 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 540 if (!verify_domain_authentication_) 541 return true; 542 543 if (availability_state_ == STATE_CLOSED) 544 return false; 545 546 SSLInfo ssl_info; 547 bool was_npn_negotiated; 548 NextProto protocol_negotiated = kProtoUnknown; 549 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated)) 550 return true; // This is not a secure session, so all domains are okay. 551 552 bool unused = false; 553 return 554 !ssl_info.client_cert_sent && 555 (!ssl_info.channel_id_sent || 556 (ServerBoundCertService::GetDomainForHost(domain) == 557 ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) && 558 ssl_info.cert->VerifyNameMatch(domain, &unused); 559 } 560 561 int SpdySession::GetPushStream( 562 const GURL& url, 563 base::WeakPtr<SpdyStream>* stream, 564 const BoundNetLog& stream_net_log) { 565 CHECK(!in_io_loop_); 566 567 stream->reset(); 568 569 // TODO(akalin): Add unit test exercising this code path. 
570 if (availability_state_ == STATE_CLOSED) 571 return ERR_CONNECTION_CLOSED; 572 573 Error err = TryAccessStream(url); 574 if (err != OK) 575 return err; 576 577 *stream = GetActivePushStream(url); 578 if (*stream) { 579 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_); 580 streams_pushed_and_claimed_count_++; 581 } 582 return OK; 583 } 584 585 // {,Try}CreateStream() and TryAccessStream() can be called with 586 // |in_io_loop_| set if a stream is being created in response to 587 // another being closed due to received data. 588 589 Error SpdySession::TryAccessStream(const GURL& url) { 590 DCHECK_NE(availability_state_, STATE_CLOSED); 591 592 if (is_secure_ && certificate_error_code_ != OK && 593 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 594 RecordProtocolErrorHistogram( 595 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION); 596 CloseSessionResult result = DoCloseSession( 597 static_cast<Error>(certificate_error_code_), 598 "Tried to get SPDY stream for secure content over an unauthenticated " 599 "session."); 600 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); 601 return ERR_SPDY_PROTOCOL_ERROR; 602 } 603 return OK; 604 } 605 606 int SpdySession::TryCreateStream( 607 const base::WeakPtr<SpdyStreamRequest>& request, 608 base::WeakPtr<SpdyStream>* stream) { 609 DCHECK(request); 610 611 if (availability_state_ == STATE_GOING_AWAY) 612 return ERR_FAILED; 613 614 // TODO(akalin): Add unit test exercising this code path. 
615 if (availability_state_ == STATE_CLOSED) 616 return ERR_CONNECTION_CLOSED; 617 618 Error err = TryAccessStream(request->url()); 619 if (err != OK) 620 return err; 621 622 if (!max_concurrent_streams_ || 623 (active_streams_.size() + created_streams_.size() < 624 max_concurrent_streams_)) { 625 return CreateStream(*request, stream); 626 } 627 628 stalled_streams_++; 629 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS); 630 RequestPriority priority = request->priority(); 631 CHECK_GE(priority, MINIMUM_PRIORITY); 632 CHECK_LE(priority, MAXIMUM_PRIORITY); 633 pending_create_stream_queues_[priority].push_back(request); 634 return ERR_IO_PENDING; 635 } 636 637 int SpdySession::CreateStream(const SpdyStreamRequest& request, 638 base::WeakPtr<SpdyStream>* stream) { 639 DCHECK_GE(request.priority(), MINIMUM_PRIORITY); 640 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY); 641 642 if (availability_state_ == STATE_GOING_AWAY) 643 return ERR_FAILED; 644 645 // TODO(akalin): Add unit test exercising this code path. 646 if (availability_state_ == STATE_CLOSED) 647 return ERR_CONNECTION_CLOSED; 648 649 Error err = TryAccessStream(request.url()); 650 if (err != OK) { 651 // This should have been caught in TryCreateStream(). 
652 NOTREACHED(); 653 return err; 654 } 655 656 DCHECK(connection_->socket()); 657 DCHECK(connection_->socket()->IsConnected()); 658 if (connection_->socket()) { 659 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected", 660 connection_->socket()->IsConnected()); 661 if (!connection_->socket()->IsConnected()) { 662 CloseSessionResult result = DoCloseSession( 663 ERR_CONNECTION_CLOSED, 664 "Tried to create SPDY stream for a closed socket connection."); 665 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); 666 return ERR_CONNECTION_CLOSED; 667 } 668 } 669 670 scoped_ptr<SpdyStream> new_stream( 671 new SpdyStream(request.type(), GetWeakPtr(), request.url(), 672 request.priority(), 673 stream_initial_send_window_size_, 674 stream_initial_recv_window_size_, 675 request.net_log())); 676 *stream = new_stream->GetWeakPtr(); 677 InsertCreatedStream(new_stream.Pass()); 678 679 UMA_HISTOGRAM_CUSTOM_COUNTS( 680 "Net.SpdyPriorityCount", 681 static_cast<int>(request.priority()), 0, 10, 11); 682 683 return OK; 684 } 685 686 void SpdySession::CancelStreamRequest( 687 const base::WeakPtr<SpdyStreamRequest>& request) { 688 DCHECK(request); 689 RequestPriority priority = request->priority(); 690 CHECK_GE(priority, MINIMUM_PRIORITY); 691 CHECK_LE(priority, MAXIMUM_PRIORITY); 692 693 if (DCHECK_IS_ON()) { 694 // |request| should not be in a queue not matching its priority. 695 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { 696 if (priority == i) 697 continue; 698 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i]; 699 DCHECK(std::find_if(queue->begin(), 700 queue->end(), 701 RequestEquals(request)) == queue->end()); 702 } 703 } 704 705 PendingStreamRequestQueue* queue = 706 &pending_create_stream_queues_[priority]; 707 // Remove |request| from |queue| while preserving the order of the 708 // other elements. 
709 PendingStreamRequestQueue::iterator it = 710 std::find_if(queue->begin(), queue->end(), RequestEquals(request)); 711 // The request may already be removed if there's a 712 // CompleteStreamRequest() in flight. 713 if (it != queue->end()) { 714 it = queue->erase(it); 715 // |request| should be in the queue at most once, and if it is 716 // present, should not be pending completion. 717 DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) == 718 queue->end()); 719 } 720 } 721 722 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() { 723 for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) { 724 if (pending_create_stream_queues_[j].empty()) 725 continue; 726 727 base::WeakPtr<SpdyStreamRequest> pending_request = 728 pending_create_stream_queues_[j].front(); 729 DCHECK(pending_request); 730 pending_create_stream_queues_[j].pop_front(); 731 return pending_request; 732 } 733 return base::WeakPtr<SpdyStreamRequest>(); 734 } 735 736 void SpdySession::ProcessPendingStreamRequests() { 737 // Like |max_concurrent_streams_|, 0 means infinite for 738 // |max_requests_to_process|. 
739 size_t max_requests_to_process = 0; 740 if (max_concurrent_streams_ != 0) { 741 max_requests_to_process = 742 max_concurrent_streams_ - 743 (active_streams_.size() + created_streams_.size()); 744 } 745 for (size_t i = 0; 746 max_requests_to_process == 0 || i < max_requests_to_process; ++i) { 747 base::WeakPtr<SpdyStreamRequest> pending_request = 748 GetNextPendingStreamRequest(); 749 if (!pending_request) 750 break; 751 752 base::MessageLoop::current()->PostTask( 753 FROM_HERE, 754 base::Bind(&SpdySession::CompleteStreamRequest, 755 weak_factory_.GetWeakPtr(), 756 pending_request)); 757 } 758 } 759 760 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) { 761 pooled_aliases_.insert(alias_key); 762 } 763 764 SpdyMajorVersion SpdySession::GetProtocolVersion() const { 765 DCHECK(buffered_spdy_framer_.get()); 766 return buffered_spdy_framer_->protocol_version(); 767 } 768 769 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() { 770 return weak_factory_.GetWeakPtr(); 771 } 772 773 bool SpdySession::CloseOneIdleConnection() { 774 CHECK(!in_io_loop_); 775 DCHECK_NE(availability_state_, STATE_CLOSED); 776 DCHECK(pool_); 777 if (!active_streams_.empty()) 778 return false; 779 CloseSessionResult result = 780 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection."); 781 if (result != SESSION_CLOSED_AND_REMOVED) { 782 NOTREACHED(); 783 return false; 784 } 785 return true; 786 } 787 788 void SpdySession::EnqueueStreamWrite( 789 const base::WeakPtr<SpdyStream>& stream, 790 SpdyFrameType frame_type, 791 scoped_ptr<SpdyBufferProducer> producer) { 792 DCHECK(frame_type == HEADERS || 793 frame_type == DATA || 794 frame_type == CREDENTIAL || 795 frame_type == SYN_STREAM); 796 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); 797 } 798 799 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( 800 SpdyStreamId stream_id, 801 RequestPriority priority, 802 uint8 credential_slot, 803 SpdyControlFlags flags, 804 const SpdyHeaderBlock& 
headers) { 805 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 806 CHECK(it != active_streams_.end()); 807 CHECK_EQ(it->second.stream->stream_id(), stream_id); 808 809 SendPrefacePingIfNoneInFlight(); 810 811 DCHECK(buffered_spdy_framer_.get()); 812 SpdyPriority spdy_priority = 813 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()); 814 scoped_ptr<SpdyFrame> syn_frame( 815 buffered_spdy_framer_->CreateSynStream( 816 stream_id, 0, spdy_priority, 817 credential_slot, flags, &headers)); 818 819 base::StatsCounter spdy_requests("spdy.requests"); 820 spdy_requests.Increment(); 821 streams_initiated_count_++; 822 823 if (net_log().IsLoggingAllEvents()) { 824 net_log().AddEvent( 825 NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 826 base::Bind(&NetLogSpdySynStreamSentCallback, &headers, 827 (flags & CONTROL_FLAG_FIN) != 0, 828 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0, 829 spdy_priority, 830 stream_id)); 831 } 832 833 return syn_frame.Pass(); 834 } 835 836 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, 837 IOBuffer* data, 838 int len, 839 SpdyDataFlags flags) { 840 if (availability_state_ == STATE_CLOSED) { 841 NOTREACHED(); 842 return scoped_ptr<SpdyBuffer>(); 843 } 844 845 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 846 CHECK(it != active_streams_.end()); 847 SpdyStream* stream = it->second.stream; 848 CHECK_EQ(stream->stream_id(), stream_id); 849 850 if (len < 0) { 851 NOTREACHED(); 852 return scoped_ptr<SpdyBuffer>(); 853 } 854 855 int effective_len = std::min(len, kMaxSpdyFrameChunkSize); 856 857 bool send_stalled_by_stream = 858 (flow_control_state_ >= FLOW_CONTROL_STREAM) && 859 (stream->send_window_size() <= 0); 860 bool send_stalled_by_session = IsSendStalled(); 861 862 // NOTE: There's an enum of the same name in histograms.xml. 
863 enum SpdyFrameFlowControlState { 864 SEND_NOT_STALLED, 865 SEND_STALLED_BY_STREAM, 866 SEND_STALLED_BY_SESSION, 867 SEND_STALLED_BY_STREAM_AND_SESSION, 868 }; 869 870 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED; 871 if (send_stalled_by_stream) { 872 if (send_stalled_by_session) { 873 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION; 874 } else { 875 frame_flow_control_state = SEND_STALLED_BY_STREAM; 876 } 877 } else if (send_stalled_by_session) { 878 frame_flow_control_state = SEND_STALLED_BY_SESSION; 879 } 880 881 if (flow_control_state_ == FLOW_CONTROL_STREAM) { 882 UMA_HISTOGRAM_ENUMERATION( 883 "Net.SpdyFrameStreamFlowControlState", 884 frame_flow_control_state, 885 SEND_STALLED_BY_STREAM + 1); 886 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 887 UMA_HISTOGRAM_ENUMERATION( 888 "Net.SpdyFrameStreamAndSessionFlowControlState", 889 frame_flow_control_state, 890 SEND_STALLED_BY_STREAM_AND_SESSION + 1); 891 } 892 893 // Obey send window size of the stream if stream flow control is 894 // enabled. 895 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 896 if (send_stalled_by_stream) { 897 stream->set_send_stalled_by_flow_control(true); 898 // Even though we're currently stalled only by the stream, we 899 // might end up being stalled by the session also. 900 QueueSendStalledStream(*stream); 901 net_log().AddEvent( 902 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW, 903 NetLog::IntegerCallback("stream_id", stream_id)); 904 return scoped_ptr<SpdyBuffer>(); 905 } 906 907 effective_len = std::min(effective_len, stream->send_window_size()); 908 } 909 910 // Obey send window size of the session if session flow control is 911 // enabled. 
912 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 913 if (send_stalled_by_session) { 914 stream->set_send_stalled_by_flow_control(true); 915 QueueSendStalledStream(*stream); 916 net_log().AddEvent( 917 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW, 918 NetLog::IntegerCallback("stream_id", stream_id)); 919 return scoped_ptr<SpdyBuffer>(); 920 } 921 922 effective_len = std::min(effective_len, session_send_window_size_); 923 } 924 925 DCHECK_GE(effective_len, 0); 926 927 // Clear FIN flag if only some of the data will be in the data 928 // frame. 929 if (effective_len < len) 930 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 931 932 if (net_log().IsLoggingAllEvents()) { 933 net_log().AddEvent( 934 NetLog::TYPE_SPDY_SESSION_SEND_DATA, 935 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len, 936 (flags & DATA_FLAG_FIN) != 0)); 937 } 938 939 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 940 if (effective_len > 0) 941 SendPrefacePingIfNoneInFlight(); 942 943 // TODO(mbelshe): reduce memory copies here. 
944 DCHECK(buffered_spdy_framer_.get()); 945 scoped_ptr<SpdyFrame> frame( 946 buffered_spdy_framer_->CreateDataFrame( 947 stream_id, data->data(), 948 static_cast<uint32>(effective_len), flags)); 949 950 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass())); 951 952 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 953 DecreaseSendWindowSize(static_cast<int32>(effective_len)); 954 data_buffer->AddConsumeCallback( 955 base::Bind(&SpdySession::OnWriteBufferConsumed, 956 weak_factory_.GetWeakPtr(), 957 static_cast<size_t>(effective_len))); 958 } 959 960 return data_buffer.Pass(); 961 } 962 963 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) { 964 DCHECK_NE(stream_id, 0u); 965 966 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 967 if (it == active_streams_.end()) { 968 NOTREACHED(); 969 return; 970 } 971 972 CloseActiveStreamIterator(it, status); 973 } 974 975 void SpdySession::CloseCreatedStream( 976 const base::WeakPtr<SpdyStream>& stream, int status) { 977 DCHECK_EQ(stream->stream_id(), 0u); 978 979 CreatedStreamSet::iterator it = created_streams_.find(stream.get()); 980 if (it == created_streams_.end()) { 981 NOTREACHED(); 982 return; 983 } 984 985 CloseCreatedStreamIterator(it, status); 986 } 987 988 void SpdySession::ResetStream(SpdyStreamId stream_id, 989 SpdyRstStreamStatus status, 990 const std::string& description) { 991 DCHECK_NE(stream_id, 0u); 992 993 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 994 if (it == active_streams_.end()) { 995 NOTREACHED(); 996 return; 997 } 998 999 ResetStreamIterator(it, status, description); 1000 } 1001 1002 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { 1003 return ContainsKey(active_streams_, stream_id); 1004 } 1005 1006 LoadState SpdySession::GetLoadState() const { 1007 // Just report that we're idle since the session could be doing 1008 // many things concurrently. 
1009 return LOAD_STATE_IDLE; 1010 } 1011 1012 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, 1013 int status) { 1014 // TODO(mbelshe): We should send a RST_STREAM control frame here 1015 // so that the server can cancel a large send. 1016 1017 scoped_ptr<SpdyStream> owned_stream(it->second.stream); 1018 active_streams_.erase(it); 1019 1020 // TODO(akalin): When SpdyStream was ref-counted (and 1021 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this 1022 // was only done when status was not OK. This meant that pushed 1023 // streams can still be claimed after they're closed. This is 1024 // probably something that we still want to support, although server 1025 // push is hardly used. Write tests for this and fix this. (See 1026 // http://crbug.com/261712 .) 1027 if (owned_stream->type() == SPDY_PUSH_STREAM) 1028 unclaimed_pushed_streams_.erase(owned_stream->url()); 1029 1030 base::WeakPtr<SpdySession> weak_this = GetWeakPtr(); 1031 1032 DeleteStream(owned_stream.Pass(), status); 1033 1034 if (!weak_this) 1035 return; 1036 1037 if (availability_state_ == STATE_CLOSED) 1038 return; 1039 1040 // If there are no active streams and the socket pool is stalled, close the 1041 // session to free up a socket slot. 1042 if (active_streams_.empty() && connection_->IsPoolStalled()) { 1043 CloseSessionResult result = 1044 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); 1045 DCHECK_NE(result, SESSION_ALREADY_CLOSED); 1046 } 1047 } 1048 1049 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it, 1050 int status) { 1051 scoped_ptr<SpdyStream> owned_stream(*it); 1052 created_streams_.erase(it); 1053 DeleteStream(owned_stream.Pass(), status); 1054 } 1055 1056 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it, 1057 SpdyRstStreamStatus status, 1058 const std::string& description) { 1059 // Send the RST_STREAM frame first as CloseActiveStreamIterator() 1060 // may close us. 
1061 SpdyStreamId stream_id = it->first; 1062 RequestPriority priority = it->second.stream->priority(); 1063 EnqueueResetStreamFrame(stream_id, priority, status, description); 1064 1065 // Removes any pending writes for the stream except for possibly an 1066 // in-flight one. 1067 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 1068 } 1069 1070 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id, 1071 RequestPriority priority, 1072 SpdyRstStreamStatus status, 1073 const std::string& description) { 1074 DCHECK_NE(stream_id, 0u); 1075 1076 net_log().AddEvent( 1077 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM, 1078 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description)); 1079 1080 DCHECK(buffered_spdy_framer_.get()); 1081 scoped_ptr<SpdyFrame> rst_frame( 1082 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 1083 1084 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass()); 1085 RecordProtocolErrorHistogram( 1086 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); 1087 } 1088 1089 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) { 1090 CHECK(!in_io_loop_); 1091 DCHECK_NE(availability_state_, STATE_CLOSED); 1092 DCHECK_EQ(read_state_, expected_read_state); 1093 1094 result = DoReadLoop(expected_read_state, result); 1095 1096 if (availability_state_ == STATE_CLOSED) { 1097 DCHECK_EQ(result, error_on_close_); 1098 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1099 RemoveFromPool(); 1100 return; 1101 } 1102 1103 DCHECK(result == OK || result == ERR_IO_PENDING); 1104 } 1105 1106 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { 1107 CHECK(!in_io_loop_); 1108 DCHECK_NE(availability_state_, STATE_CLOSED); 1109 DCHECK_EQ(read_state_, expected_read_state); 1110 1111 in_io_loop_ = true; 1112 1113 int bytes_read_without_yielding = 0; 1114 1115 // Loop until the session is closed, the read becomes blocked, or 1116 // the read limit is exceeded. 
while (true) {
    switch (read_state_) {
      case READ_STATE_DO_READ:
        // Start the next socket read; a new read must begin from OK.
        DCHECK_EQ(result, OK);
        result = DoRead();
        break;
      case READ_STATE_DO_READ_COMPLETE:
        // Only successfully read bytes count toward the yield budget.
        if (result > 0)
          bytes_read_without_yielding += result;
        result = DoReadComplete(result);
        break;
      default:
        NOTREACHED() << "read_state_: " << read_state_;
        break;
    }

    // The session was closed while handling this step; |result| is
    // expected to carry the close error.
    if (availability_state_ == STATE_CLOSED) {
      DCHECK_EQ(result, error_on_close_);
      DCHECK_LT(result, ERR_IO_PENDING);
      break;
    }

    // The read is still pending; stop looping until it completes.
    if (result == ERR_IO_PENDING)
      break;

    // Too many bytes were processed without returning to the message loop:
    // post a task that resumes reading and report ERR_IO_PENDING for now.
    if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
      read_state_ = READ_STATE_DO_READ;
      base::MessageLoop::current()->PostTask(
          FROM_HERE,
          base::Bind(&SpdySession::PumpReadLoop,
                     weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
      result = ERR_IO_PENDING;
      break;
    }
  }

  CHECK(in_io_loop_);
  in_io_loop_ = false;

  return result;
}

// Issues a socket read of up to kReadBufferSize bytes into |read_buffer_|
// and advances the read state to READ_STATE_DO_READ_COMPLETE. Must be
// called from inside the IO loop on a session that is not closed. Returns
// whatever the socket's Read() returns; PumpReadLoop() is bound as the
// completion callback.
int SpdySession::DoRead() {
  CHECK(in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);

  CHECK(connection_);
  CHECK(connection_->socket());
  read_state_ = READ_STATE_DO_READ_COMPLETE;
  return connection_->socket()->Read(
      read_buffer_.get(),
      kReadBufferSize,
      base::Bind(&SpdySession::PumpReadLoop,
                 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
}

// Handles the |result| of a socket read. Must be called from inside the IO
// loop on a session that is not closed.
int SpdySession::DoReadComplete(int result) {
  CHECK(in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);

  // Parse a frame.  For now this code requires that the frame fit into our
  // buffer (kReadBufferSize).
  // TODO(mbelshe): support arbitrarily large frames!
if (result == 0) {
    // A zero-byte read is handled as the connection having been closed.
    UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
                                total_bytes_received_, 1, 100000000, 50);
    CloseSessionResult close_session_result =
        DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed");
    DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
    DCHECK_EQ(availability_state_, STATE_CLOSED);
    DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
    return ERR_CONNECTION_CLOSED;
  }

  if (result < 0) {
    // A negative result is a read error; close the session with it.
    CloseSessionResult close_session_result =
        DoCloseSession(static_cast<Error>(result), "result is < 0.");
    DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
    DCHECK_EQ(availability_state_, STATE_CLOSED);
    DCHECK_EQ(error_on_close_, result);
    return result;
  }

  total_bytes_received_ += result;

  last_activity_time_ = time_func_();

  DCHECK(buffered_spdy_framer_.get());
  char* data = read_buffer_->data();
  // Feed the framer until all |result| bytes have been consumed; each call
  // reports how many bytes it processed.
  // NOTE(review): if ProcessInput() could return 0 without the session
  // being closed this loop would not terminate -- presumably the framer
  // always makes progress or reports an error; confirm.
  while (result > 0) {
    uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
    result -= bytes_processed;
    data += bytes_processed;

    // The session may have been closed while the input was processed;
    // stop and report the close error.
    if (availability_state_ == STATE_CLOSED) {
      DCHECK_LT(error_on_close_, ERR_IO_PENDING);
      return error_on_close_;
    }

    DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
  }

  // Everything was consumed; the next step is another read.
  read_state_ = READ_STATE_DO_READ;
  return OK;
}

// Entry point for driving the write state machine from outside the IO loop.
// Runs DoWriteLoop() and, if the session ended up closed, removes it from
// the pool; otherwise the loop result must be OK or ERR_IO_PENDING.
void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
  CHECK(!in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);
  DCHECK_EQ(write_state_, expected_write_state);

  result = DoWriteLoop(expected_write_state, result);

  if (availability_state_ == STATE_CLOSED) {
    DCHECK_EQ(result, error_on_close_);
    DCHECK_LT(error_on_close_, ERR_IO_PENDING);
    RemoveFromPool();
    return;
  }

  DCHECK(result == OK || result == ERR_IO_PENDING);
}

int
SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
  // Runs the write state machine, alternating DoWrite() and
  // DoWriteComplete(), with |in_io_loop_| set for the duration. Must not be
  // entered re-entrantly, on a closed session, or while the write state is
  // idle. Returns the result of the last step executed.
  CHECK(!in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);
  DCHECK_NE(write_state_, WRITE_STATE_IDLE);
  DCHECK_EQ(write_state_, expected_write_state);

  in_io_loop_ = true;

  // Loop until the session is closed or the write becomes blocked.
  while (true) {
    switch (write_state_) {
      case WRITE_STATE_DO_WRITE:
        // A new write must begin from OK.
        DCHECK_EQ(result, OK);
        result = DoWrite();
        break;
      case WRITE_STATE_DO_WRITE_COMPLETE:
        result = DoWriteComplete(result);
        break;
      case WRITE_STATE_IDLE:
      default:
        NOTREACHED() << "write_state_: " << write_state_;
        break;
    }

    // The session was closed while handling this step; |result| is
    // expected to carry the close error.
    if (availability_state_ == STATE_CLOSED) {
      DCHECK_EQ(result, error_on_close_);
      DCHECK_LT(result, ERR_IO_PENDING);
      break;
    }

    // DoWrite() goes idle (and returns ERR_IO_PENDING) when the write
    // queue has nothing left to dequeue.
    if (write_state_ == WRITE_STATE_IDLE) {
      DCHECK_EQ(result, ERR_IO_PENDING);
      break;
    }

    // The socket write is still pending; stop until it completes.
    if (result == ERR_IO_PENDING)
      break;
  }

  CHECK(in_io_loop_);
  in_io_loop_ = false;

  return result;
}

// Writes the remaining data of the in-flight buffer to the socket, first
// dequeuing a new frame from |write_queue_| when no write is in flight.
// Must be called from inside the IO loop on a session that is not closed.
int SpdySession::DoWrite() {
  CHECK(in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);

  DCHECK(buffered_spdy_framer_);
  if (in_flight_write_) {
    // A partially written buffer must still have data left to send.
    DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
  } else {
    // Grab the next frame to send.
    SpdyFrameType frame_type = DATA;
    scoped_ptr<SpdyBufferProducer> producer;
    base::WeakPtr<SpdyStream> stream;
    if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
      // Nothing queued: go idle and report the write as blocked.
      write_state_ = WRITE_STATE_IDLE;
      return ERR_IO_PENDING;
    }

    if (stream.get())
      DCHECK(!stream->IsClosed());

    // Activate the stream only when sending the SYN_STREAM frame to
    // guarantee monotonically-increasing stream IDs.
if (frame_type == SYN_STREAM) {
      // A SYN_STREAM must belong to a live stream that has not yet been
      // assigned an id; anything else is treated as an internal error.
      if (stream.get() && stream->stream_id() == 0) {
        scoped_ptr<SpdyStream> owned_stream =
            ActivateCreatedStream(stream.get());
        InsertActivatedStream(owned_stream.Pass());
      } else {
        NOTREACHED();
        return ERR_UNEXPECTED;
      }
    }

    // Produce the buffer to send and remember its type, total size and
    // owning stream for use when the write completes.
    in_flight_write_ = producer->ProduceBuffer();
    if (!in_flight_write_) {
      NOTREACHED();
      return ERR_UNEXPECTED;
    }
    in_flight_write_frame_type_ = frame_type;
    in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
    DCHECK_GE(in_flight_write_frame_size_,
              buffered_spdy_framer_->GetFrameMinimumSize());
    in_flight_write_stream_ = stream;
  }

  write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;

  // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
  // with Socket implementations that don't store their IOBuffer
  // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
  scoped_refptr<IOBuffer> write_io_buffer =
      in_flight_write_->GetIOBufferForRemainingData();
  return connection_->socket()->Write(
      write_io_buffer.get(),
      in_flight_write_->GetRemainingSize(),
      base::Bind(&SpdySession::PumpWriteLoop,
                 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
}

// Handles the |result| of a socket write for the in-flight buffer. Must be
// called from inside the IO loop on a session that is not closed, with a
// completed (non-pending) result.
int SpdySession::DoWriteComplete(int result) {
  CHECK(in_io_loop_);
  DCHECK_NE(availability_state_, STATE_CLOSED);
  DCHECK_NE(result, ERR_IO_PENDING);
  DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);

  last_activity_time_ = time_func_();

  if (result < 0) {
    // Write error: drop all in-flight write state and close the session
    // with that error.
    DCHECK_NE(result, ERR_IO_PENDING);
    in_flight_write_.reset();
    in_flight_write_frame_type_ = DATA;
    in_flight_write_frame_size_ = 0;
    in_flight_write_stream_.reset();
    CloseSessionResult close_session_result =
        DoCloseSession(static_cast<Error>(result), "Write error");
    DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
DCHECK_EQ(availability_state_, STATE_CLOSED);
    DCHECK_EQ(error_on_close_, result);
    return result;
  }

  // It should not be possible to have written more bytes than our
  // in_flight_write_.
  DCHECK_LE(static_cast<size_t>(result),
            in_flight_write_->GetRemainingSize());

  if (result > 0) {
    in_flight_write_->Consume(static_cast<size_t>(result));

    // We only notify the stream when we've fully written the pending frame.
    if (in_flight_write_->GetRemainingSize() == 0) {
      // It is possible that the stream was cancelled while we were
      // writing to the socket.
      if (in_flight_write_stream_.get()) {
        DCHECK_GT(in_flight_write_frame_size_, 0u);
        in_flight_write_stream_->OnFrameWriteComplete(
            in_flight_write_frame_type_,
            in_flight_write_frame_size_);
      }

      // Cleanup the write which just completed.
      in_flight_write_.reset();
      in_flight_write_frame_type_ = DATA;
      in_flight_write_frame_size_ = 0;
      in_flight_write_stream_.reset();
    }
  }

  // Either continue the partially written buffer or pick up the next
  // queued frame.
  write_state_ = WRITE_STATE_DO_WRITE;
  return OK;
}

// Debug-only consistency check for a session that is going away (or
// closed): there must be no pending stream requests at any priority and
// no created-but-inactive streams.
void SpdySession::DcheckGoingAway() const {
  DCHECK_GE(availability_state_, STATE_GOING_AWAY);
  // Skip the loop entirely when DCHECKs are compiled out.
  if (DCHECK_IS_ON()) {
    for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
      DCHECK(pending_create_stream_queues_[i].empty());
    }
  }
  DCHECK(created_streams_.empty());
}

// Debug-only consistency check for a closed session: in addition to the
// going-away invariants, a close error must be recorded and there must be
// no active streams, unclaimed pushed streams, or queued writes.
void SpdySession::DcheckClosed() const {
  DcheckGoingAway();
  DCHECK_EQ(availability_state_, STATE_CLOSED);
  DCHECK_LT(error_on_close_, ERR_IO_PENDING);
  DCHECK(active_streams_.empty());
  DCHECK(unclaimed_pushed_streams_.empty());
  DCHECK(write_queue_.IsEmpty());
}

// Fails pending stream requests and closes streams that will not be
// processed, using |status|. Active streams with ids greater than
// |last_good_stream_id| are closed; the session must already be going
// away or closed.
void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
                                 Error status) {
  DCHECK_GE(availability_state_, STATE_GOING_AWAY);

  // The loops below are carefully written to avoid reentrancy problems.
while (true) {
    // Fail pending stream requests one at a time with ERR_ABORTED. The
    // next request is fetched afresh on every iteration rather than
    // iterating over the queues.
    size_t old_size = GetTotalSize(pending_create_stream_queues_);
    base::WeakPtr<SpdyStreamRequest> pending_request =
        GetNextPendingStreamRequest();
    if (!pending_request)
      break;
    // No new stream requests should be added while the session is
    // going away.
    DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
    pending_request->OnRequestCompleteFailure(ERR_ABORTED);
  }

  while (true) {
    // Close active streams whose id is greater than
    // |last_good_stream_id|. The iterator is looked up again on every
    // iteration instead of being advanced across the close call.
    size_t old_size = active_streams_.size();
    ActiveStreamMap::iterator it =
        active_streams_.lower_bound(last_good_stream_id + 1);
    if (it == active_streams_.end())
      break;
    LogAbandonedActiveStream(it, status);
    CloseActiveStreamIterator(it, status);
    // No new streams should be activated while the session is going
    // away.
    DCHECK_GT(old_size, active_streams_.size());
  }

  while (!created_streams_.empty()) {
    // Close every created-but-not-yet-activated stream, always taking the
    // current first element.
    size_t old_size = created_streams_.size();
    CreatedStreamSet::iterator it = created_streams_.begin();
    LogAbandonedStream(*it, status);
    CloseCreatedStreamIterator(it, status);
    // No new streams should be created while the session is going
    // away.
1455 DCHECK_GT(old_size, created_streams_.size()); 1456 } 1457 1458 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id); 1459 1460 DcheckGoingAway(); 1461 } 1462 1463 void SpdySession::MaybeFinishGoingAway() { 1464 DcheckGoingAway(); 1465 if (active_streams_.empty() && availability_state_ != STATE_CLOSED) { 1466 CloseSessionResult result = 1467 DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away"); 1468 DCHECK_NE(result, SESSION_ALREADY_CLOSED); 1469 } 1470 } 1471 1472 SpdySession::CloseSessionResult SpdySession::DoCloseSession( 1473 Error err, 1474 const std::string& description) { 1475 DCHECK_LT(err, ERR_IO_PENDING); 1476 1477 if (availability_state_ == STATE_CLOSED) 1478 return SESSION_ALREADY_CLOSED; 1479 1480 net_log_.AddEvent( 1481 NetLog::TYPE_SPDY_SESSION_CLOSE, 1482 base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); 1483 1484 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); 1485 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", 1486 total_bytes_received_, 1, 100000000, 50); 1487 1488 // |pool_| will be NULL when |InitializeWithSocket()| is in the 1489 // call stack. 
1490 if (pool_ && availability_state_ != STATE_GOING_AWAY) 1491 pool_->MakeSessionUnavailable(GetWeakPtr()); 1492 1493 availability_state_ = STATE_CLOSED; 1494 error_on_close_ = err; 1495 1496 StartGoingAway(0, err); 1497 write_queue_.Clear(); 1498 1499 DcheckClosed(); 1500 1501 if (in_io_loop_) 1502 return SESSION_CLOSED_BUT_NOT_REMOVED; 1503 1504 RemoveFromPool(); 1505 return SESSION_CLOSED_AND_REMOVED; 1506 } 1507 1508 void SpdySession::RemoveFromPool() { 1509 DcheckClosed(); 1510 CHECK(pool_); 1511 1512 SpdySessionPool* pool = pool_; 1513 pool_ = NULL; 1514 pool->RemoveUnavailableSession(GetWeakPtr()); 1515 } 1516 1517 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { 1518 DCHECK(stream); 1519 std::string description = base::StringPrintf( 1520 "ABANDONED (stream_id=%d): ", stream->stream_id()) + 1521 stream->url().spec(); 1522 stream->LogStreamError(status, description); 1523 // We don't increment the streams abandoned counter here. If the 1524 // stream isn't active (i.e., it hasn't written anything to the wire 1525 // yet) then it's as if it never existed. If it is active, then 1526 // LogAbandonedActiveStream() will increment the counters. 
1527 } 1528 1529 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it, 1530 Error status) { 1531 DCHECK_GT(it->first, 0u); 1532 LogAbandonedStream(it->second.stream, status); 1533 ++streams_abandoned_count_; 1534 base::StatsCounter abandoned_streams("spdy.abandoned_streams"); 1535 abandoned_streams.Increment(); 1536 if (it->second.stream->type() == SPDY_PUSH_STREAM && 1537 unclaimed_pushed_streams_.find(it->second.stream->url()) != 1538 unclaimed_pushed_streams_.end()) { 1539 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams"); 1540 abandoned_push_streams.Increment(); 1541 } 1542 } 1543 1544 int SpdySession::GetNewStreamId() { 1545 int id = stream_hi_water_mark_; 1546 stream_hi_water_mark_ += 2; 1547 if (stream_hi_water_mark_ > 0x7fff) 1548 stream_hi_water_mark_ = 1; 1549 return id; 1550 } 1551 1552 void SpdySession::CloseSessionOnError(Error err, 1553 const std::string& description) { 1554 // We may be called from anywhere, so we can't expect a particular 1555 // return value. 
1556 ignore_result(DoCloseSession(err, description)); 1557 } 1558 1559 base::Value* SpdySession::GetInfoAsValue() const { 1560 base::DictionaryValue* dict = new base::DictionaryValue(); 1561 1562 dict->SetInteger("source_id", net_log_.source().id); 1563 1564 dict->SetString("host_port_pair", host_port_pair().ToString()); 1565 if (!pooled_aliases_.empty()) { 1566 base::ListValue* alias_list = new base::ListValue(); 1567 for (std::set<SpdySessionKey>::const_iterator it = 1568 pooled_aliases_.begin(); 1569 it != pooled_aliases_.end(); it++) { 1570 alias_list->Append(new base::StringValue( 1571 it->host_port_pair().ToString())); 1572 } 1573 dict->Set("aliases", alias_list); 1574 } 1575 dict->SetString("proxy", host_port_proxy_pair().second.ToURI()); 1576 1577 dict->SetInteger("active_streams", active_streams_.size()); 1578 1579 dict->SetInteger("unclaimed_pushed_streams", 1580 unclaimed_pushed_streams_.size()); 1581 1582 dict->SetBoolean("is_secure", is_secure_); 1583 1584 dict->SetString("protocol_negotiated", 1585 SSLClientSocket::NextProtoToString( 1586 connection_->socket()->GetNegotiatedProtocol())); 1587 1588 dict->SetInteger("error", error_on_close_); 1589 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); 1590 1591 dict->SetInteger("streams_initiated_count", streams_initiated_count_); 1592 dict->SetInteger("streams_pushed_count", streams_pushed_count_); 1593 dict->SetInteger("streams_pushed_and_claimed_count", 1594 streams_pushed_and_claimed_count_); 1595 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_); 1596 DCHECK(buffered_spdy_framer_.get()); 1597 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received()); 1598 1599 dict->SetBoolean("sent_settings", sent_settings_); 1600 dict->SetBoolean("received_settings", received_settings_); 1601 1602 dict->SetInteger("send_window_size", session_send_window_size_); 1603 dict->SetInteger("recv_window_size", session_recv_window_size_); 1604 
dict->SetInteger("unacked_recv_window_bytes", 1605 session_unacked_recv_window_bytes_); 1606 return dict; 1607 } 1608 1609 bool SpdySession::IsReused() const { 1610 return buffered_spdy_framer_->frames_received() > 0; 1611 } 1612 1613 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id, 1614 LoadTimingInfo* load_timing_info) const { 1615 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId, 1616 load_timing_info); 1617 } 1618 1619 int SpdySession::GetPeerAddress(IPEndPoint* address) const { 1620 int rv = ERR_SOCKET_NOT_CONNECTED; 1621 if (connection_->socket()) { 1622 rv = connection_->socket()->GetPeerAddress(address); 1623 } 1624 1625 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress", 1626 rv == ERR_SOCKET_NOT_CONNECTED); 1627 1628 return rv; 1629 } 1630 1631 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1632 int rv = ERR_SOCKET_NOT_CONNECTED; 1633 if (connection_->socket()) { 1634 rv = connection_->socket()->GetLocalAddress(address); 1635 } 1636 1637 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress", 1638 rv == ERR_SOCKET_NOT_CONNECTED); 1639 1640 return rv; 1641 } 1642 1643 void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1644 SpdyFrameType frame_type, 1645 scoped_ptr<SpdyFrame> frame) { 1646 DCHECK(frame_type == RST_STREAM || 1647 frame_type == SETTINGS || 1648 frame_type == WINDOW_UPDATE || 1649 frame_type == PING); 1650 EnqueueWrite( 1651 priority, frame_type, 1652 scoped_ptr<SpdyBufferProducer>( 1653 new SimpleBufferProducer( 1654 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), 1655 base::WeakPtr<SpdyStream>()); 1656 } 1657 1658 void SpdySession::EnqueueWrite(RequestPriority priority, 1659 SpdyFrameType frame_type, 1660 scoped_ptr<SpdyBufferProducer> producer, 1661 const base::WeakPtr<SpdyStream>& stream) { 1662 if (availability_state_ == STATE_CLOSED) 1663 return; 1664 1665 bool was_idle = write_queue_.IsEmpty(); 1666 write_queue_.Enqueue(priority, 
frame_type, producer.Pass(), stream); 1667 if (write_state_ == WRITE_STATE_IDLE) { 1668 DCHECK(was_idle); 1669 DCHECK(!in_flight_write_); 1670 write_state_ = WRITE_STATE_DO_WRITE; 1671 base::MessageLoop::current()->PostTask( 1672 FROM_HERE, 1673 base::Bind(&SpdySession::PumpWriteLoop, 1674 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); 1675 } 1676 } 1677 1678 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { 1679 DCHECK_EQ(stream->stream_id(), 0u); 1680 DCHECK(created_streams_.find(stream.get()) == created_streams_.end()); 1681 created_streams_.insert(stream.release()); 1682 } 1683 1684 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) { 1685 DCHECK_EQ(stream->stream_id(), 0u); 1686 DCHECK(created_streams_.find(stream) != created_streams_.end()); 1687 stream->set_stream_id(GetNewStreamId()); 1688 scoped_ptr<SpdyStream> owned_stream(stream); 1689 created_streams_.erase(stream); 1690 return owned_stream.Pass(); 1691 } 1692 1693 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) { 1694 SpdyStreamId stream_id = stream->stream_id(); 1695 DCHECK_NE(stream_id, 0u); 1696 std::pair<ActiveStreamMap::iterator, bool> result = 1697 active_streams_.insert( 1698 std::make_pair(stream_id, ActiveStreamInfo(stream.get()))); 1699 if (result.second) { 1700 ignore_result(stream.release()); 1701 } else { 1702 NOTREACHED(); 1703 } 1704 } 1705 1706 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { 1707 if (in_flight_write_stream_.get() == stream.get()) { 1708 // If we're deleting the stream for the in-flight write, we still 1709 // need to let the write complete, so we clear 1710 // |in_flight_write_stream_| and let the write finish on its own 1711 // without notifying |in_flight_write_stream_|. 
1712 in_flight_write_stream_.reset(); 1713 } 1714 1715 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); 1716 1717 // |stream->OnClose()| may end up closing |this|, so detect that. 1718 base::WeakPtr<SpdySession> weak_this = GetWeakPtr(); 1719 1720 stream->OnClose(status); 1721 1722 if (!weak_this) 1723 return; 1724 1725 switch (availability_state_) { 1726 case STATE_AVAILABLE: 1727 ProcessPendingStreamRequests(); 1728 break; 1729 case STATE_GOING_AWAY: 1730 DcheckGoingAway(); 1731 MaybeFinishGoingAway(); 1732 break; 1733 case STATE_CLOSED: 1734 // Do nothing. 1735 break; 1736 } 1737 } 1738 1739 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) { 1740 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1741 1742 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url); 1743 if (unclaimed_it == unclaimed_pushed_streams_.end()) 1744 return base::WeakPtr<SpdyStream>(); 1745 1746 SpdyStreamId stream_id = unclaimed_it->second.stream_id; 1747 unclaimed_pushed_streams_.erase(unclaimed_it); 1748 1749 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 1750 if (active_it == active_streams_.end()) { 1751 NOTREACHED(); 1752 return base::WeakPtr<SpdyStream>(); 1753 } 1754 1755 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM); 1756 used_push_streams.Increment(); 1757 return active_it->second.stream->GetWeakPtr(); 1758 } 1759 1760 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, 1761 bool* was_npn_negotiated, 1762 NextProto* protocol_negotiated) { 1763 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated(); 1764 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol(); 1765 return connection_->socket()->GetSSLInfo(ssl_info); 1766 } 1767 1768 bool SpdySession::GetSSLCertRequestInfo( 1769 SSLCertRequestInfo* cert_request_info) { 1770 if (!is_secure_) 1771 return false; 1772 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info); 1773 return 
true; 1774 } 1775 1776 void SpdySession::OnError(SpdyFramer::SpdyError error_code) { 1777 CHECK(in_io_loop_); 1778 1779 if (availability_state_ == STATE_CLOSED) 1780 return; 1781 1782 RecordProtocolErrorHistogram( 1783 static_cast<SpdyProtocolErrorDetails>(error_code)); 1784 std::string description = base::StringPrintf( 1785 "SPDY_ERROR error_code: %d.", error_code); 1786 CloseSessionResult result = 1787 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description); 1788 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 1789 } 1790 1791 void SpdySession::OnStreamError(SpdyStreamId stream_id, 1792 const std::string& description) { 1793 CHECK(in_io_loop_); 1794 1795 if (availability_state_ == STATE_CLOSED) 1796 return; 1797 1798 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1799 if (it == active_streams_.end()) { 1800 // We still want to send a frame to reset the stream even if we 1801 // don't know anything about it. 1802 EnqueueResetStreamFrame( 1803 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description); 1804 return; 1805 } 1806 1807 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description); 1808 } 1809 1810 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id, 1811 size_t length, 1812 bool fin) { 1813 CHECK(in_io_loop_); 1814 1815 if (availability_state_ == STATE_CLOSED) 1816 return; 1817 1818 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1819 1820 // By the time data comes in, the stream may already be inactive. 
if (it == active_streams_.end())
    return;

  SpdyStream* stream = it->second.stream;
  CHECK_EQ(stream->stream_id(), stream_id);

  // Charge the size of the data frame header (the framer's minimum data
  // frame size) to the stream's raw received byte count.
  DCHECK(buffered_spdy_framer_);
  size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
  stream->IncrementRawReceivedBytes(header_len);
}

// Framer callback delivering |len| bytes of payload for |stream_id|. A
// NULL |data| must come with |len| == 0. Must be called from inside the IO
// loop; ignored if the session is already closed.
void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
                                    const char* data,
                                    size_t len,
                                    bool fin) {
  CHECK(in_io_loop_);

  if (availability_state_ == STATE_CLOSED)
    return;

  DCHECK_LT(len, 1u << 24);
  if (net_log().IsLoggingAllEvents()) {
    net_log().AddEvent(
        NetLog::TYPE_SPDY_SESSION_RECV_DATA,
        base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
  }

  // Build the buffer as early as possible so that we go through the
  // session flow control checks and update
  // |unacked_recv_window_bytes_| properly even when the stream is
  // inactive (since the other side has still reduced its session send
  // window).
  scoped_ptr<SpdyBuffer> buffer;
  if (data) {
    DCHECK_GT(len, 0u);
    buffer.reset(new SpdyBuffer(data, len));

    if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
      // Shrink the session receive window now; OnReadBufferConsumed() is
      // registered to run as the buffer is consumed.
      DecreaseRecvWindowSize(static_cast<int32>(len));
      buffer->AddConsumeCallback(
          base::Bind(&SpdySession::OnReadBufferConsumed,
                     weak_factory_.GetWeakPtr()));
    }
  } else {
    DCHECK_EQ(len, 0u);
  }

  ActiveStreamMap::iterator it = active_streams_.find(stream_id);

  // By the time data comes in, the stream may already be inactive.
if (it == active_streams_.end())
    return;

  SpdyStream* stream = it->second.stream;
  CHECK_EQ(stream->stream_id(), stream_id);

  stream->IncrementRawReceivedBytes(len);

  // Data arriving while the stream is still waiting for its SYN_REPLY is
  // a protocol error: log it and reset the stream.
  if (it->second.waiting_for_syn_reply) {
    const std::string& error = "Data received before SYN_REPLY.";
    stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
    ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
    return;
  }

  // |buffer| may be null here when the frame carried no data.
  stream->OnDataReceived(buffer.Pass());
}

// Framer callback for the start of a SETTINGS frame. When
// |clear_persisted| is set, the settings stored for this host are cleared
// first. Must be called from inside the IO loop; ignored if the session is
// already closed.
void SpdySession::OnSettings(bool clear_persisted) {
  CHECK(in_io_loop_);

  if (availability_state_ == STATE_CLOSED)
    return;

  if (clear_persisted)
    http_server_properties_->ClearSpdySettings(host_port_pair());

  if (net_log_.IsLoggingAllEvents()) {
    net_log_.AddEvent(
        NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
        base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
                   clear_persisted));
  }
}

// Framer callback for one setting (|id|, |flags|, |value|). The setting is
// applied to this session via HandleSetting() and stored in
// |http_server_properties_| for this host. Must be called from inside the
// IO loop; ignored if the session is already closed.
void SpdySession::OnSetting(SpdySettingsIds id,
                            uint8 flags,
                            uint32 value) {
  CHECK(in_io_loop_);

  if (availability_state_ == STATE_CLOSED)
    return;

  HandleSetting(id, value);
  http_server_properties_->SetSpdySetting(
      host_port_pair(),
      id,
      static_cast<SpdySettingsFlags>(flags),
      value);
  received_settings_ = true;

  // Log the setting.
1923 net_log_.AddEvent( 1924 NetLog::TYPE_SPDY_SESSION_RECV_SETTING, 1925 base::Bind(&NetLogSpdySettingCallback, 1926 id, static_cast<SpdySettingsFlags>(flags), value)); 1927 } 1928 1929 void SpdySession::OnSendCompressedFrame( 1930 SpdyStreamId stream_id, 1931 SpdyFrameType type, 1932 size_t payload_len, 1933 size_t frame_len) { 1934 if (type != SYN_STREAM) 1935 return; 1936 1937 DCHECK(buffered_spdy_framer_.get()); 1938 size_t compressed_len = 1939 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize(); 1940 1941 if (payload_len) { 1942 // Make sure we avoid early decimal truncation. 1943 int compression_pct = 100 - (100 * compressed_len) / payload_len; 1944 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", 1945 compression_pct); 1946 } 1947 } 1948 1949 void SpdySession::OnReceiveCompressedFrame( 1950 SpdyStreamId stream_id, 1951 SpdyFrameType type, 1952 size_t frame_len) { 1953 last_compressed_frame_len_ = frame_len; 1954 } 1955 1956 int SpdySession::OnInitialResponseHeadersReceived( 1957 const SpdyHeaderBlock& response_headers, 1958 base::Time response_time, 1959 base::TimeTicks recv_first_byte_time, 1960 SpdyStream* stream) { 1961 CHECK(in_io_loop_); 1962 SpdyStreamId stream_id = stream->stream_id(); 1963 // May invalidate |stream|. 
int rv = stream->OnInitialResponseHeadersReceived(
      response_headers, response_time, recv_first_byte_time);
  if (rv < 0) {
    // On failure the stream is expected to have been removed from
    // |active_streams_| already; |stream_id| was copied beforehand so this
    // check does not touch |stream|.
    DCHECK_NE(rv, ERR_IO_PENDING);
    DCHECK(active_streams_.find(stream_id) == active_streams_.end());
  }
  return rv;
}

// Framer callback for a SYN_STREAM received from the peer, i.e. a
// server-pushed stream. Must be called from inside the IO loop; ignored if
// the session is already closed.
void SpdySession::OnSynStream(SpdyStreamId stream_id,
                              SpdyStreamId associated_stream_id,
                              SpdyPriority priority,
                              uint8 credential_slot,
                              bool fin,
                              bool unidirectional,
                              const SpdyHeaderBlock& headers) {
  CHECK(in_io_loop_);

  if (availability_state_ == STATE_CLOSED)
    return;

  // Capture the arrival times before any further processing.
  base::Time response_time = base::Time::Now();
  base::TimeTicks recv_first_byte_time = time_func_();

  if (net_log_.IsLoggingAllEvents()) {
    net_log_.AddEvent(
        NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
        base::Bind(&NetLogSpdySynStreamReceivedCallback,
                   &headers, fin, unidirectional, priority,
                   stream_id, associated_stream_id));
  }

  // Server-initiated streams should have even sequence numbers.
  if ((stream_id & 0x1) != 0) {
    LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
    return;
  }

  // A SYN_STREAM for an id that is already active is dropped with a
  // warning only.
  if (IsStreamActive(stream_id)) {
    LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
    return;
  }

  RequestPriority request_priority =
      ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());

  if (availability_state_ == STATE_GOING_AWAY) {
    // TODO(akalin): This behavior isn't in the SPDY spec, although it
    // probably should be.
2013 EnqueueResetStreamFrame(stream_id, request_priority, 2014 RST_STREAM_REFUSED_STREAM, 2015 "OnSyn received when going away"); 2016 return; 2017 } 2018 2019 if (associated_stream_id == 0) { 2020 std::string description = base::StringPrintf( 2021 "Received invalid OnSyn associated stream id %d for stream %d", 2022 associated_stream_id, stream_id); 2023 EnqueueResetStreamFrame(stream_id, request_priority, 2024 RST_STREAM_REFUSED_STREAM, description); 2025 return; 2026 } 2027 2028 streams_pushed_count_++; 2029 2030 // TODO(mbelshe): DCHECK that this is a GET method? 2031 2032 // Verify that the response had a URL for us. 2033 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true); 2034 if (!gurl.is_valid()) { 2035 EnqueueResetStreamFrame( 2036 stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR, 2037 "Pushed stream url was invalid: " + gurl.spec()); 2038 return; 2039 } 2040 2041 // Verify we have a valid stream association. 2042 ActiveStreamMap::iterator associated_it = 2043 active_streams_.find(associated_stream_id); 2044 if (associated_it == active_streams_.end()) { 2045 EnqueueResetStreamFrame( 2046 stream_id, request_priority, RST_STREAM_INVALID_STREAM, 2047 base::StringPrintf( 2048 "Received OnSyn with inactive associated stream %d", 2049 associated_stream_id)); 2050 return; 2051 } 2052 2053 // Check that the SYN advertises the same origin as its associated stream. 2054 // Bypass this check if and only if this session is with a SPDY proxy that 2055 // is trusted explicitly via the --trusted-spdy-proxy switch. 2056 if (trusted_spdy_proxy_.Equals(host_port_pair())) { 2057 // Disallow pushing of HTTPS content. 
2058 if (gurl.SchemeIs("https")) { 2059 EnqueueResetStreamFrame( 2060 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, 2061 base::StringPrintf( 2062 "Rejected push of Cross Origin HTTPS content %d", 2063 associated_stream_id)); 2064 } 2065 } else { 2066 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders()); 2067 if (associated_url.GetOrigin() != gurl.GetOrigin()) { 2068 EnqueueResetStreamFrame( 2069 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, 2070 base::StringPrintf( 2071 "Rejected Cross Origin Push Stream %d", 2072 associated_stream_id)); 2073 return; 2074 } 2075 } 2076 2077 // There should not be an existing pushed stream with the same path. 2078 PushedStreamMap::iterator pushed_it = 2079 unclaimed_pushed_streams_.lower_bound(gurl); 2080 if (pushed_it != unclaimed_pushed_streams_.end() && 2081 pushed_it->first == gurl) { 2082 EnqueueResetStreamFrame( 2083 stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR, 2084 "Received duplicate pushed stream with url: " + 2085 gurl.spec()); 2086 return; 2087 } 2088 2089 scoped_ptr<SpdyStream> stream( 2090 new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl, 2091 request_priority, 2092 stream_initial_send_window_size_, 2093 stream_initial_recv_window_size_, 2094 net_log_)); 2095 stream->set_stream_id(stream_id); 2096 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2097 last_compressed_frame_len_ = 0; 2098 2099 DeleteExpiredPushedStreams(); 2100 PushedStreamMap::iterator inserted_pushed_it = 2101 unclaimed_pushed_streams_.insert( 2102 pushed_it, 2103 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_()))); 2104 DCHECK(inserted_pushed_it != pushed_it); 2105 2106 InsertActivatedStream(stream.Pass()); 2107 2108 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 2109 if (active_it == active_streams_.end()) { 2110 NOTREACHED(); 2111 return; 2112 } 2113 2114 // Parse the headers. 
2115 if (OnInitialResponseHeadersReceived( 2116 headers, response_time, 2117 recv_first_byte_time, active_it->second.stream) != OK) 2118 return; 2119 2120 base::StatsCounter push_requests("spdy.pushed_streams"); 2121 push_requests.Increment(); 2122 } 2123 2124 void SpdySession::DeleteExpiredPushedStreams() { 2125 if (unclaimed_pushed_streams_.empty()) 2126 return; 2127 2128 // Check that adequate time has elapsed since the last sweep. 2129 if (time_func_() < next_unclaimed_push_stream_sweep_time_) 2130 return; 2131 2132 // Gather old streams to delete. 2133 base::TimeTicks minimum_freshness = time_func_() - 2134 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2135 std::vector<SpdyStreamId> streams_to_close; 2136 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); 2137 it != unclaimed_pushed_streams_.end(); ++it) { 2138 if (minimum_freshness > it->second.creation_time) 2139 streams_to_close.push_back(it->second.stream_id); 2140 } 2141 2142 for (std::vector<SpdyStreamId>::const_iterator to_close_it = 2143 streams_to_close.begin(); 2144 to_close_it != streams_to_close.end(); ++to_close_it) { 2145 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it); 2146 if (active_it == active_streams_.end()) 2147 continue; 2148 2149 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM); 2150 // CloseActiveStreamIterator() will remove the stream from 2151 // |unclaimed_pushed_streams_|. 
2152 CloseActiveStreamIterator(active_it, ERR_INVALID_SPDY_STREAM); 2153 } 2154 2155 next_unclaimed_push_stream_sweep_time_ = time_func_() + 2156 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2157 } 2158 2159 void SpdySession::OnSynReply(SpdyStreamId stream_id, 2160 bool fin, 2161 const SpdyHeaderBlock& headers) { 2162 CHECK(in_io_loop_); 2163 2164 if (availability_state_ == STATE_CLOSED) 2165 return; 2166 2167 base::Time response_time = base::Time::Now(); 2168 base::TimeTicks recv_first_byte_time = time_func_(); 2169 2170 if (net_log().IsLoggingAllEvents()) { 2171 net_log().AddEvent( 2172 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 2173 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2174 &headers, fin, stream_id)); 2175 } 2176 2177 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2178 if (it == active_streams_.end()) { 2179 // NOTE: it may just be that the stream was cancelled. 2180 return; 2181 } 2182 2183 SpdyStream* stream = it->second.stream; 2184 CHECK_EQ(stream->stream_id(), stream_id); 2185 2186 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2187 last_compressed_frame_len_ = 0; 2188 2189 if (!it->second.waiting_for_syn_reply) { 2190 const std::string& error = 2191 "Received duplicate SYN_REPLY for stream."; 2192 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2193 ResetStreamIterator(it, RST_STREAM_STREAM_IN_USE, error); 2194 return; 2195 } 2196 it->second.waiting_for_syn_reply = false; 2197 2198 ignore_result(OnInitialResponseHeadersReceived( 2199 headers, response_time, recv_first_byte_time, stream)); 2200 } 2201 2202 void SpdySession::OnHeaders(SpdyStreamId stream_id, 2203 bool fin, 2204 const SpdyHeaderBlock& headers) { 2205 CHECK(in_io_loop_); 2206 2207 if (availability_state_ == STATE_CLOSED) 2208 return; 2209 2210 if (net_log().IsLoggingAllEvents()) { 2211 net_log().AddEvent( 2212 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, 2213 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2214 
&headers, fin, stream_id)); 2215 } 2216 2217 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2218 if (it == active_streams_.end()) { 2219 // NOTE: it may just be that the stream was cancelled. 2220 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id; 2221 return; 2222 } 2223 2224 SpdyStream* stream = it->second.stream; 2225 CHECK_EQ(stream->stream_id(), stream_id); 2226 2227 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2228 last_compressed_frame_len_ = 0; 2229 2230 int rv = stream->OnAdditionalResponseHeadersReceived(headers); 2231 if (rv < 0) { 2232 DCHECK_NE(rv, ERR_IO_PENDING); 2233 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2234 } 2235 } 2236 2237 void SpdySession::OnRstStream(SpdyStreamId stream_id, 2238 SpdyRstStreamStatus status) { 2239 CHECK(in_io_loop_); 2240 2241 if (availability_state_ == STATE_CLOSED) 2242 return; 2243 2244 std::string description; 2245 net_log().AddEvent( 2246 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 2247 base::Bind(&NetLogSpdyRstCallback, 2248 stream_id, status, &description)); 2249 2250 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2251 if (it == active_streams_.end()) { 2252 // NOTE: it may just be that the stream was cancelled. 2253 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 2254 return; 2255 } 2256 2257 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2258 2259 if (status == 0) { 2260 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); 2261 } else if (status == RST_STREAM_REFUSED_STREAM) { 2262 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM); 2263 } else { 2264 RecordProtocolErrorHistogram( 2265 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 2266 it->second.stream->LogStreamError( 2267 ERR_SPDY_PROTOCOL_ERROR, 2268 base::StringPrintf("SPDY stream closed with status: %d", status)); 2269 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 
2270 // For now, it doesn't matter much - it is a protocol error. 2271 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 2272 } 2273 } 2274 2275 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, 2276 SpdyGoAwayStatus status) { 2277 CHECK(in_io_loop_); 2278 2279 if (availability_state_ == STATE_CLOSED) 2280 return; 2281 2282 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, 2283 base::Bind(&NetLogSpdyGoAwayCallback, 2284 last_accepted_stream_id, 2285 active_streams_.size(), 2286 unclaimed_pushed_streams_.size(), 2287 status)); 2288 if (availability_state_ < STATE_GOING_AWAY) { 2289 availability_state_ = STATE_GOING_AWAY; 2290 // |pool_| will be NULL when |InitializeWithSocket()| is in the 2291 // call stack. 2292 if (pool_) 2293 pool_->MakeSessionUnavailable(GetWeakPtr()); 2294 } 2295 StartGoingAway(last_accepted_stream_id, ERR_ABORTED); 2296 // This is to handle the case when we already don't have any active 2297 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have 2298 // active streams and so the last one being closed will finish the 2299 // going away process (see DeleteStream()). 2300 MaybeFinishGoingAway(); 2301 } 2302 2303 void SpdySession::OnPing(uint32 unique_id) { 2304 CHECK(in_io_loop_); 2305 2306 if (availability_state_ == STATE_CLOSED) 2307 return; 2308 2309 net_log_.AddEvent( 2310 NetLog::TYPE_SPDY_SESSION_PING, 2311 base::Bind(&NetLogSpdyPingCallback, unique_id, "received")); 2312 2313 // Send response to a PING from server. 
2314 if (unique_id % 2 == 0) { 2315 WritePingFrame(unique_id); 2316 return; 2317 } 2318 2319 --pings_in_flight_; 2320 if (pings_in_flight_ < 0) { 2321 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); 2322 CloseSessionResult result = 2323 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); 2324 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2325 pings_in_flight_ = 0; 2326 return; 2327 } 2328 2329 if (pings_in_flight_ > 0) 2330 return; 2331 2332 // We will record RTT in histogram when there are no more client sent 2333 // pings_in_flight_. 2334 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_); 2335 } 2336 2337 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, 2338 uint32 delta_window_size) { 2339 CHECK(in_io_loop_); 2340 2341 if (availability_state_ == STATE_CLOSED) 2342 return; 2343 2344 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); 2345 net_log_.AddEvent( 2346 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, 2347 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2348 stream_id, delta_window_size)); 2349 2350 if (stream_id == kSessionFlowControlStreamId) { 2351 // WINDOW_UPDATE for the session. 2352 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { 2353 LOG(WARNING) << "Received WINDOW_UPDATE for session when " 2354 << "session flow control is not turned on"; 2355 // TODO(akalin): Record an error and close the session. 2356 return; 2357 } 2358 2359 if (delta_window_size < 1u) { 2360 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2361 CloseSessionResult result = DoCloseSession( 2362 ERR_SPDY_PROTOCOL_ERROR, 2363 "Received WINDOW_UPDATE with an invalid delta_window_size " + 2364 base::UintToString(delta_window_size)); 2365 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2366 return; 2367 } 2368 2369 IncreaseSendWindowSize(static_cast<int32>(delta_window_size)); 2370 } else { 2371 // WINDOW_UPDATE for a stream. 
2372 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2373 // TODO(akalin): Record an error and close the session. 2374 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id 2375 << " when flow control is not turned on"; 2376 return; 2377 } 2378 2379 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2380 2381 if (it == active_streams_.end()) { 2382 // NOTE: it may just be that the stream was cancelled. 2383 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; 2384 return; 2385 } 2386 2387 SpdyStream* stream = it->second.stream; 2388 CHECK_EQ(stream->stream_id(), stream_id); 2389 2390 if (delta_window_size < 1u) { 2391 ResetStreamIterator(it, 2392 RST_STREAM_FLOW_CONTROL_ERROR, 2393 base::StringPrintf( 2394 "Received WINDOW_UPDATE with an invalid " 2395 "delta_window_size %ud", delta_window_size)); 2396 return; 2397 } 2398 2399 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2400 it->second.stream->IncreaseSendWindowSize( 2401 static_cast<int32>(delta_window_size)); 2402 } 2403 } 2404 2405 void SpdySession::OnPushPromise(SpdyStreamId stream_id, 2406 SpdyStreamId promised_stream_id) { 2407 // TODO(akalin): Handle PUSH_PROMISE frames. 
2408 } 2409 2410 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, 2411 uint32 delta_window_size) { 2412 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2413 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2414 CHECK(it != active_streams_.end()); 2415 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2416 SendWindowUpdateFrame( 2417 stream_id, delta_window_size, it->second.stream->priority()); 2418 } 2419 2420 void SpdySession::SendInitialData() { 2421 DCHECK(enable_sending_initial_data_); 2422 DCHECK_NE(availability_state_, STATE_CLOSED); 2423 2424 if (send_connection_header_prefix_) { 2425 DCHECK_EQ(protocol_, kProtoHTTP2Draft04); 2426 scoped_ptr<SpdyFrame> connection_header_prefix_frame( 2427 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix), 2428 kHttp2ConnectionHeaderPrefixSize, 2429 false /* take_ownership */)); 2430 // Count the prefix as part of the subsequent SETTINGS frame. 2431 EnqueueSessionWrite(HIGHEST, SETTINGS, 2432 connection_header_prefix_frame.Pass()); 2433 } 2434 2435 // First, notify the server about the settings they should use when 2436 // communicating with us. 2437 SettingsMap settings_map; 2438 // Create a new settings frame notifying the server of our 2439 // max concurrent streams and initial window size. 2440 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] = 2441 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams); 2442 if (flow_control_state_ >= FLOW_CONTROL_STREAM && 2443 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) { 2444 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = 2445 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, 2446 stream_initial_recv_window_size_); 2447 } 2448 SendSettings(settings_map); 2449 2450 // Next, notify the server about our initial recv window size. 2451 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 2452 // Bump up the receive window size to the real initial value. 
This 2453 // has to go here since the WINDOW_UPDATE frame sent by 2454 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|. 2455 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_); 2456 // This condition implies that |kDefaultInitialRecvWindowSize| - 2457 // |session_recv_window_size_| doesn't overflow. 2458 DCHECK_GT(session_recv_window_size_, 0); 2459 IncreaseRecvWindowSize( 2460 kDefaultInitialRecvWindowSize - session_recv_window_size_); 2461 } 2462 2463 // Finally, notify the server about the settings they have 2464 // previously told us to use when communicating with them (after 2465 // applying them). 2466 const SettingsMap& server_settings_map = 2467 http_server_properties_->GetSpdySettings(host_port_pair()); 2468 if (server_settings_map.empty()) 2469 return; 2470 2471 SettingsMap::const_iterator it = 2472 server_settings_map.find(SETTINGS_CURRENT_CWND); 2473 uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0; 2474 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100); 2475 2476 for (SettingsMap::const_iterator it = server_settings_map.begin(); 2477 it != server_settings_map.end(); ++it) { 2478 const SpdySettingsIds new_id = it->first; 2479 const uint32 new_val = it->second.second; 2480 HandleSetting(new_id, new_val); 2481 } 2482 2483 SendSettings(server_settings_map); 2484 } 2485 2486 2487 void SpdySession::SendSettings(const SettingsMap& settings) { 2488 DCHECK_NE(availability_state_, STATE_CLOSED); 2489 2490 net_log_.AddEvent( 2491 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 2492 base::Bind(&NetLogSpdySendSettingsCallback, &settings)); 2493 2494 // Create the SETTINGS frame and send it. 
2495 DCHECK(buffered_spdy_framer_.get()); 2496 scoped_ptr<SpdyFrame> settings_frame( 2497 buffered_spdy_framer_->CreateSettings(settings)); 2498 sent_settings_ = true; 2499 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass()); 2500 } 2501 2502 void SpdySession::HandleSetting(uint32 id, uint32 value) { 2503 switch (id) { 2504 case SETTINGS_MAX_CONCURRENT_STREAMS: 2505 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 2506 kMaxConcurrentStreamLimit); 2507 ProcessPendingStreamRequests(); 2508 break; 2509 case SETTINGS_INITIAL_WINDOW_SIZE: { 2510 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2511 net_log().AddEvent( 2512 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL); 2513 return; 2514 } 2515 2516 if (value > static_cast<uint32>(kint32max)) { 2517 net_log().AddEvent( 2518 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE, 2519 NetLog::IntegerCallback("initial_window_size", value)); 2520 return; 2521 } 2522 2523 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only. 
2524 int32 delta_window_size = 2525 static_cast<int32>(value) - stream_initial_send_window_size_; 2526 stream_initial_send_window_size_ = static_cast<int32>(value); 2527 UpdateStreamsSendWindowSize(delta_window_size); 2528 net_log().AddEvent( 2529 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE, 2530 NetLog::IntegerCallback("delta_window_size", delta_window_size)); 2531 break; 2532 } 2533 } 2534 } 2535 2536 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { 2537 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2538 for (ActiveStreamMap::iterator it = active_streams_.begin(); 2539 it != active_streams_.end(); ++it) { 2540 it->second.stream->AdjustSendWindowSize(delta_window_size); 2541 } 2542 2543 for (CreatedStreamSet::const_iterator it = created_streams_.begin(); 2544 it != created_streams_.end(); it++) { 2545 (*it)->AdjustSendWindowSize(delta_window_size); 2546 } 2547 } 2548 2549 void SpdySession::SendPrefacePingIfNoneInFlight() { 2550 if (pings_in_flight_ || !enable_ping_based_connection_checking_) 2551 return; 2552 2553 base::TimeTicks now = time_func_(); 2554 // If there is no activity in the session, then send a preface-PING. 
2555 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_) 2556 SendPrefacePing(); 2557 } 2558 2559 void SpdySession::SendPrefacePing() { 2560 WritePingFrame(next_ping_id_); 2561 } 2562 2563 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, 2564 uint32 delta_window_size, 2565 RequestPriority priority) { 2566 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2567 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2568 if (it != active_streams_.end()) { 2569 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2570 } else { 2571 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2572 CHECK_EQ(stream_id, kSessionFlowControlStreamId); 2573 } 2574 2575 net_log_.AddEvent( 2576 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, 2577 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2578 stream_id, delta_window_size)); 2579 2580 DCHECK(buffered_spdy_framer_.get()); 2581 scoped_ptr<SpdyFrame> window_update_frame( 2582 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 2583 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass()); 2584 } 2585 2586 void SpdySession::WritePingFrame(uint32 unique_id) { 2587 DCHECK(buffered_spdy_framer_.get()); 2588 scoped_ptr<SpdyFrame> ping_frame( 2589 buffered_spdy_framer_->CreatePingFrame(unique_id)); 2590 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass()); 2591 2592 if (net_log().IsLoggingAllEvents()) { 2593 net_log().AddEvent( 2594 NetLog::TYPE_SPDY_SESSION_PING, 2595 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent")); 2596 } 2597 if (unique_id % 2 != 0) { 2598 next_ping_id_ += 2; 2599 ++pings_in_flight_; 2600 PlanToCheckPingStatus(); 2601 last_ping_sent_time_ = time_func_(); 2602 } 2603 } 2604 2605 void SpdySession::PlanToCheckPingStatus() { 2606 if (check_ping_status_pending_) 2607 return; 2608 2609 check_ping_status_pending_ = true; 2610 base::MessageLoop::current()->PostDelayedTask( 2611 FROM_HERE, 2612 
base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2613 time_func_()), hung_interval_); 2614 } 2615 2616 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 2617 CHECK(!in_io_loop_); 2618 DCHECK_NE(availability_state_, STATE_CLOSED); 2619 2620 // Check if we got a response back for all PINGs we had sent. 2621 if (pings_in_flight_ == 0) { 2622 check_ping_status_pending_ = false; 2623 return; 2624 } 2625 2626 DCHECK(check_ping_status_pending_); 2627 2628 base::TimeTicks now = time_func_(); 2629 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_); 2630 2631 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { 2632 // Track all failed PING messages in a separate bucket. 2633 const base::TimeDelta kFailedPing = 2634 base::TimeDelta::FromInternalValue(INT_MAX); 2635 RecordPingRTTHistogram(kFailedPing); 2636 CloseSessionResult result = 2637 DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping."); 2638 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); 2639 return; 2640 } 2641 2642 // Check the status of connection after a delay. 
2643 base::MessageLoop::current()->PostDelayedTask( 2644 FROM_HERE, 2645 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2646 now), 2647 delay); 2648 } 2649 2650 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) { 2651 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration); 2652 } 2653 2654 void SpdySession::RecordProtocolErrorHistogram( 2655 SpdyProtocolErrorDetails details) { 2656 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details, 2657 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2658 if (EndsWith(host_port_pair().host(), "google.com", false)) { 2659 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details, 2660 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2661 } 2662 } 2663 2664 void SpdySession::RecordHistograms() { 2665 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 2666 streams_initiated_count_, 2667 0, 300, 50); 2668 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 2669 streams_pushed_count_, 2670 0, 300, 50); 2671 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 2672 streams_pushed_and_claimed_count_, 2673 0, 300, 50); 2674 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 2675 streams_abandoned_count_, 2676 0, 300, 50); 2677 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent", 2678 sent_settings_ ? 1 : 0, 2); 2679 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived", 2680 received_settings_ ? 1 : 0, 2); 2681 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession", 2682 stalled_streams_, 2683 0, 300, 50); 2684 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls", 2685 stalled_streams_ > 0 ? 1 : 0, 2); 2686 2687 if (received_settings_) { 2688 // Enumerate the saved settings, and set histograms for it. 
2689 const SettingsMap& settings_map = 2690 http_server_properties_->GetSpdySettings(host_port_pair()); 2691 2692 SettingsMap::const_iterator it; 2693 for (it = settings_map.begin(); it != settings_map.end(); ++it) { 2694 const SpdySettingsIds id = it->first; 2695 const uint32 val = it->second.second; 2696 switch (id) { 2697 case SETTINGS_CURRENT_CWND: 2698 // Record several different histograms to see if cwnd converges 2699 // for larger volumes of data being sent. 2700 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd", 2701 val, 1, 200, 100); 2702 if (total_bytes_received_ > 10 * 1024) { 2703 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K", 2704 val, 1, 200, 100); 2705 if (total_bytes_received_ > 25 * 1024) { 2706 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K", 2707 val, 1, 200, 100); 2708 if (total_bytes_received_ > 50 * 1024) { 2709 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K", 2710 val, 1, 200, 100); 2711 if (total_bytes_received_ > 100 * 1024) { 2712 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K", 2713 val, 1, 200, 100); 2714 } 2715 } 2716 } 2717 } 2718 break; 2719 case SETTINGS_ROUND_TRIP_TIME: 2720 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT", 2721 val, 1, 1200, 100); 2722 break; 2723 case SETTINGS_DOWNLOAD_RETRANS_RATE: 2724 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate", 2725 val, 1, 100, 50); 2726 break; 2727 default: 2728 break; 2729 } 2730 } 2731 } 2732 } 2733 2734 void SpdySession::CompleteStreamRequest( 2735 const base::WeakPtr<SpdyStreamRequest>& pending_request) { 2736 // Abort if the request has already been cancelled. 
2737 if (!pending_request) 2738 return; 2739 2740 base::WeakPtr<SpdyStream> stream; 2741 int rv = CreateStream(*pending_request, &stream); 2742 2743 if (rv == OK) { 2744 DCHECK(stream); 2745 pending_request->OnRequestCompleteSuccess(stream); 2746 } else { 2747 DCHECK(!stream); 2748 pending_request->OnRequestCompleteFailure(rv); 2749 } 2750 } 2751 2752 SSLClientSocket* SpdySession::GetSSLClientSocket() const { 2753 if (!is_secure_) 2754 return NULL; 2755 SSLClientSocket* ssl_socket = 2756 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 2757 DCHECK(ssl_socket); 2758 return ssl_socket; 2759 } 2760 2761 void SpdySession::OnWriteBufferConsumed( 2762 size_t frame_payload_size, 2763 size_t consume_size, 2764 SpdyBuffer::ConsumeSource consume_source) { 2765 // We can be called with |in_io_loop_| set if a write SpdyBuffer is 2766 // deleted (e.g., a stream is closed due to incoming data). 2767 2768 if (availability_state_ == STATE_CLOSED) 2769 return; 2770 2771 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2772 2773 if (consume_source == SpdyBuffer::DISCARD) { 2774 // If we're discarding a frame or part of it, increase the send 2775 // window by the number of discarded bytes. (Although if we're 2776 // discarding part of a frame, it's probably because of a write 2777 // error and we'll be tearing down the session soon.) 2778 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 2779 DCHECK_GT(remaining_payload_bytes, 0u); 2780 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 2781 } 2782 // For consumed bytes, the send window is increased when we receive 2783 // a WINDOW_UPDATE frame. 2784 } 2785 2786 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { 2787 // We can be called with |in_io_loop_| set if a SpdyBuffer is 2788 // deleted (e.g., a stream is closed due to incoming data). 
2789 2790 DCHECK_NE(availability_state_, STATE_CLOSED); 2791 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2792 DCHECK_GE(delta_window_size, 1); 2793 2794 // Check for overflow. 2795 int32 max_delta_window_size = kint32max - session_send_window_size_; 2796 if (delta_window_size > max_delta_window_size) { 2797 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2798 CloseSessionResult result = DoCloseSession( 2799 ERR_SPDY_PROTOCOL_ERROR, 2800 "Received WINDOW_UPDATE [delta: " + 2801 base::IntToString(delta_window_size) + 2802 "] for session overflows session_send_window_size_ [current: " + 2803 base::IntToString(session_send_window_size_) + "]"); 2804 DCHECK_NE(result, SESSION_ALREADY_CLOSED); 2805 return; 2806 } 2807 2808 session_send_window_size_ += delta_window_size; 2809 2810 net_log_.AddEvent( 2811 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2812 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2813 delta_window_size, session_send_window_size_)); 2814 2815 DCHECK(!IsSendStalled()); 2816 ResumeSendStalledStreams(); 2817 } 2818 2819 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { 2820 DCHECK_NE(availability_state_, STATE_CLOSED); 2821 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2822 2823 // We only call this method when sending a frame. Therefore, 2824 // |delta_window_size| should be within the valid frame size range. 2825 DCHECK_GE(delta_window_size, 1); 2826 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 2827 2828 // |send_window_size_| should have been at least |delta_window_size| for 2829 // this call to happen. 
2830 DCHECK_GE(session_send_window_size_, delta_window_size); 2831 2832 session_send_window_size_ -= delta_window_size; 2833 2834 net_log_.AddEvent( 2835 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2836 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2837 -delta_window_size, session_send_window_size_)); 2838 } 2839 2840 void SpdySession::OnReadBufferConsumed( 2841 size_t consume_size, 2842 SpdyBuffer::ConsumeSource consume_source) { 2843 // We can be called with |in_io_loop_| set if a read SpdyBuffer is 2844 // deleted (e.g., discarded by a SpdyReadQueue). 2845 2846 if (availability_state_ == STATE_CLOSED) 2847 return; 2848 2849 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2850 DCHECK_GE(consume_size, 1u); 2851 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 2852 2853 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 2854 } 2855 2856 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { 2857 DCHECK_NE(availability_state_, STATE_CLOSED); 2858 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2859 DCHECK_GE(session_unacked_recv_window_bytes_, 0); 2860 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); 2861 DCHECK_GE(delta_window_size, 1); 2862 // Check for overflow. 
2863 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); 2864 2865 session_recv_window_size_ += delta_window_size; 2866 net_log_.AddEvent( 2867 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 2868 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2869 delta_window_size, session_recv_window_size_)); 2870 2871 session_unacked_recv_window_bytes_ += delta_window_size; 2872 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { 2873 SendWindowUpdateFrame(kSessionFlowControlStreamId, 2874 session_unacked_recv_window_bytes_, 2875 HIGHEST); 2876 session_unacked_recv_window_bytes_ = 0; 2877 } 2878 } 2879 2880 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { 2881 CHECK(in_io_loop_); 2882 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2883 DCHECK_GE(delta_window_size, 1); 2884 2885 // Since we never decrease the initial receive window size, 2886 // |delta_window_size| should never cause |recv_window_size_| to go 2887 // negative. If we do, the receive window isn't being respected. 
2888 if (delta_window_size > session_recv_window_size_) { 2889 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); 2890 CloseSessionResult result = DoCloseSession( 2891 ERR_SPDY_PROTOCOL_ERROR, 2892 "delta_window_size is " + base::IntToString(delta_window_size) + 2893 " in DecreaseRecvWindowSize, which is larger than the receive " + 2894 "window size of " + base::IntToString(session_recv_window_size_)); 2895 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2896 return; 2897 } 2898 2899 session_recv_window_size_ -= delta_window_size; 2900 net_log_.AddEvent( 2901 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2902 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2903 -delta_window_size, session_recv_window_size_)); 2904 } 2905 2906 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { 2907 DCHECK(stream.send_stalled_by_flow_control()); 2908 RequestPriority priority = stream.priority(); 2909 CHECK_GE(priority, MINIMUM_PRIORITY); 2910 CHECK_LE(priority, MAXIMUM_PRIORITY); 2911 stream_send_unstall_queue_[priority].push_back(stream.stream_id()); 2912 } 2913 2914 void SpdySession::ResumeSendStalledStreams() { 2915 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2916 2917 // We don't have to worry about new streams being queued, since 2918 // doing so would cause IsSendStalled() to return true. But we do 2919 // have to worry about streams being closed, as well as ourselves 2920 // being closed. 
2921 2922 while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { 2923 size_t old_size = 0; 2924 if (DCHECK_IS_ON()) 2925 old_size = GetTotalSize(stream_send_unstall_queue_); 2926 2927 SpdyStreamId stream_id = PopStreamToPossiblyResume(); 2928 if (stream_id == 0) 2929 break; 2930 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2931 // The stream may actually still be send-stalled after this (due 2932 // to its own send window) but that's okay -- it'll then be 2933 // resumed once its send window increases. 2934 if (it != active_streams_.end()) 2935 it->second.stream->PossiblyResumeIfSendStalled(); 2936 2937 // The size should decrease unless we got send-stalled again. 2938 if (!IsSendStalled()) 2939 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); 2940 } 2941 } 2942 2943 SpdyStreamId SpdySession::PopStreamToPossiblyResume() { 2944 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) { 2945 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; 2946 if (!queue->empty()) { 2947 SpdyStreamId stream_id = queue->front(); 2948 queue->pop_front(); 2949 return stream_id; 2950 } 2951 } 2952 return 0; 2953 } 2954 2955 } // namespace net 2956