1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_session.h" 6 7 #include <algorithm> 8 #include <map> 9 10 #include "base/basictypes.h" 11 #include "base/bind.h" 12 #include "base/compiler_specific.h" 13 #include "base/logging.h" 14 #include "base/message_loop/message_loop.h" 15 #include "base/metrics/field_trial.h" 16 #include "base/metrics/histogram.h" 17 #include "base/metrics/sparse_histogram.h" 18 #include "base/metrics/stats_counters.h" 19 #include "base/stl_util.h" 20 #include "base/strings/string_number_conversions.h" 21 #include "base/strings/string_util.h" 22 #include "base/strings/stringprintf.h" 23 #include "base/strings/utf_string_conversions.h" 24 #include "base/time/time.h" 25 #include "base/values.h" 26 #include "crypto/ec_private_key.h" 27 #include "crypto/ec_signature_creator.h" 28 #include "net/base/connection_type_histograms.h" 29 #include "net/base/net_log.h" 30 #include "net/base/net_util.h" 31 #include "net/cert/asn1_util.h" 32 #include "net/http/http_network_session.h" 33 #include "net/http/http_server_properties.h" 34 #include "net/spdy/spdy_buffer_producer.h" 35 #include "net/spdy/spdy_credential_builder.h" 36 #include "net/spdy/spdy_frame_builder.h" 37 #include "net/spdy/spdy_http_utils.h" 38 #include "net/spdy/spdy_protocol.h" 39 #include "net/spdy/spdy_session_pool.h" 40 #include "net/spdy/spdy_stream.h" 41 #include "net/ssl/server_bound_cert_service.h" 42 43 namespace net { 44 45 namespace { 46 47 const int kReadBufferSize = 8 * 1024; 48 const int kDefaultConnectionAtRiskOfLossSeconds = 10; 49 const int kHungIntervalSeconds = 10; 50 51 // Always start at 1 for the first stream id. 52 const SpdyStreamId kFirstStreamId = 1; 53 54 // Minimum seconds that unclaimed pushed streams will be kept in memory. 55 const int kMinPushedStreamLifetimeSeconds = 300; 56 57 base::Value* NetLogSpdySynCallback(const SpdyHeaderBlock* headers, 58 bool fin, 59 bool unidirectional, 60 SpdyStreamId stream_id, 61 SpdyStreamId associated_stream, 62 NetLog::LogLevel /* log_level */) { 63 base::DictionaryValue* dict = new base::DictionaryValue(); 64 base::ListValue* headers_list = new base::ListValue(); 65 for (SpdyHeaderBlock::const_iterator it = headers->begin(); 66 it != headers->end(); ++it) { 67 headers_list->Append(new base::StringValue(base::StringPrintf( 68 "%s: %s", it->first.c_str(), 69 (ShouldShowHttpHeaderValue( 70 it->first) ? it->second : "[elided]").c_str()))); 71 } 72 dict->SetBoolean("fin", fin); 73 dict->SetBoolean("unidirectional", unidirectional); 74 dict->Set("headers", headers_list); 75 dict->SetInteger("stream_id", stream_id); 76 if (associated_stream) 77 dict->SetInteger("associated_stream", associated_stream); 78 return dict; 79 } 80 81 base::Value* NetLogSpdyCredentialCallback(size_t slot, 82 const std::string* origin, 83 NetLog::LogLevel /* log_level */) { 84 base::DictionaryValue* dict = new base::DictionaryValue(); 85 dict->SetInteger("slot", slot); 86 dict->SetString("origin", *origin); 87 return dict; 88 } 89 90 base::Value* NetLogSpdySessionCloseCallback(int net_error, 91 const std::string* description, 92 NetLog::LogLevel /* log_level */) { 93 base::DictionaryValue* dict = new base::DictionaryValue(); 94 dict->SetInteger("net_error", net_error); 95 dict->SetString("description", *description); 96 return dict; 97 } 98 99 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair, 100 NetLog::LogLevel /* log_level */) { 101 base::DictionaryValue* dict = new base::DictionaryValue(); 102 dict->SetString("host", host_pair->first.ToString()); 103 dict->SetString("proxy", host_pair->second.ToPacString()); 104 return dict; 105 } 106 107 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair, 108 bool clear_persisted, 109 NetLog::LogLevel /* log_level */) { 110 base::DictionaryValue* dict = new base::DictionaryValue(); 111 dict->SetString("host", host_port_pair.ToString()); 112 dict->SetBoolean("clear_persisted", clear_persisted); 113 return dict; 114 } 115 116 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id, 117 SpdySettingsFlags flags, 118 uint32 value, 119 NetLog::LogLevel /* log_level */) { 120 base::DictionaryValue* dict = new base::DictionaryValue(); 121 dict->SetInteger("id", id); 122 dict->SetInteger("flags", flags); 123 dict->SetInteger("value", value); 124 return dict; 125 } 126 127 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings, 128 NetLog::LogLevel /* log_level */) { 129 base::DictionaryValue* dict = new base::DictionaryValue(); 130 base::ListValue* settings_list = new base::ListValue(); 131 for (SettingsMap::const_iterator it = settings->begin(); 132 it != settings->end(); ++it) { 133 const SpdySettingsIds id = it->first; 134 const SpdySettingsFlags flags = it->second.first; 135 const uint32 value = it->second.second; 136 settings_list->Append(new base::StringValue( 137 base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value))); 138 } 139 dict->Set("settings", settings_list); 140 return dict; 141 } 142 143 base::Value* NetLogSpdyWindowUpdateFrameCallback( 144 SpdyStreamId stream_id, 145 uint32 delta, 146 NetLog::LogLevel /* log_level */) { 147 base::DictionaryValue* dict = new base::DictionaryValue(); 148 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 149 dict->SetInteger("delta", delta); 150 return dict; 151 } 152 153 base::Value* NetLogSpdySessionWindowUpdateCallback( 154 int32 delta, 155 int32 window_size, 156 NetLog::LogLevel /* log_level */) { 157 base::DictionaryValue* dict = new base::DictionaryValue(); 158 dict->SetInteger("delta", delta); 159 dict->SetInteger("window_size", window_size); 160 return dict; 161 } 162 163 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id, 164 int size, 165 bool fin, 166 NetLog::LogLevel /* log_level */) { 167 base::DictionaryValue* dict = new base::DictionaryValue(); 168 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 169 dict->SetInteger("size", size); 170 dict->SetBoolean("fin", fin); 171 return dict; 172 } 173 174 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id, 175 int status, 176 const std::string* description, 177 NetLog::LogLevel /* log_level */) { 178 base::DictionaryValue* dict = new base::DictionaryValue(); 179 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 180 dict->SetInteger("status", status); 181 dict->SetString("description", *description); 182 return dict; 183 } 184 185 base::Value* NetLogSpdyPingCallback(uint32 unique_id, 186 const char* type, 187 NetLog::LogLevel /* log_level */) { 188 base::DictionaryValue* dict = new base::DictionaryValue(); 189 dict->SetInteger("unique_id", unique_id); 190 dict->SetString("type", type); 191 return dict; 192 } 193 194 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id, 195 int active_streams, 196 int unclaimed_streams, 197 SpdyGoAwayStatus status, 198 NetLog::LogLevel /* log_level */) { 199 base::DictionaryValue* dict = new base::DictionaryValue(); 200 dict->SetInteger("last_accepted_stream_id", 201 static_cast<int>(last_stream_id)); 202 dict->SetInteger("active_streams", active_streams); 203 dict->SetInteger("unclaimed_streams", unclaimed_streams); 204 dict->SetInteger("status", static_cast<int>(status)); 205 return dict; 206 } 207 208 // The maximum number of concurrent streams we will ever create. Even if 209 // the server permits more, we will never exceed this limit. 210 const size_t kMaxConcurrentStreamLimit = 256; 211 212 } // namespace 213 214 SpdyStreamRequest::SpdyStreamRequest() { 215 Reset(); 216 } 217 218 SpdyStreamRequest::~SpdyStreamRequest() { 219 CancelRequest(); 220 } 221 222 int SpdyStreamRequest::StartRequest( 223 SpdyStreamType type, 224 const base::WeakPtr<SpdySession>& session, 225 const GURL& url, 226 RequestPriority priority, 227 const BoundNetLog& net_log, 228 const CompletionCallback& callback) { 229 DCHECK(session.get()); 230 DCHECK(!session_.get()); 231 DCHECK(!stream_.get()); 232 DCHECK(callback_.is_null()); 233 234 type_ = type; 235 session_ = session; 236 url_ = url; 237 priority_ = priority; 238 net_log_ = net_log; 239 callback_ = callback; 240 241 base::WeakPtr<SpdyStream> stream; 242 int rv = session->TryCreateStream(this, &stream); 243 if (rv == OK) { 244 Reset(); 245 stream_ = stream; 246 } 247 return rv; 248 } 249 250 void SpdyStreamRequest::CancelRequest() { 251 if (session_.get()) 252 session_->CancelStreamRequest(this); 253 Reset(); 254 } 255 256 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() { 257 DCHECK(!session_.get()); 258 base::WeakPtr<SpdyStream> stream = stream_; 259 DCHECK(stream.get()); 260 Reset(); 261 return stream; 262 } 263 264 void SpdyStreamRequest::OnRequestCompleteSuccess( 265 base::WeakPtr<SpdyStream>* stream) { 266 DCHECK(session_.get()); 267 DCHECK(!stream_.get()); 268 DCHECK(!callback_.is_null()); 269 CompletionCallback callback = callback_; 270 Reset(); 271 DCHECK(*stream); 272 stream_ = *stream; 273 callback.Run(OK); 274 } 275 276 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) { 277 DCHECK(session_.get()); 278 DCHECK(!stream_.get()); 279 DCHECK(!callback_.is_null()); 280 CompletionCallback callback = callback_; 281 Reset(); 282 DCHECK_NE(rv, OK); 283 callback.Run(rv); 284 } 285 286 void SpdyStreamRequest::Reset() { 287 type_ = SPDY_BIDIRECTIONAL_STREAM; 288 session_.reset(); 289 stream_.reset(); 290 url_ = GURL(); 291 priority_ = MINIMUM_PRIORITY; 292 net_log_ = BoundNetLog(); 293 callback_.Reset(); 294 } 295 296 SpdySession::ActiveStreamInfo::ActiveStreamInfo() 297 : stream(NULL), 298 waiting_for_syn_reply(false) {} 299 300 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream) 301 : stream(stream), 302 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {} 303 304 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {} 305 306 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {} 307 308 SpdySession::PushedStreamInfo::PushedStreamInfo( 309 SpdyStreamId stream_id, 310 base::TimeTicks creation_time) 311 : stream_id(stream_id), 312 creation_time(creation_time) {} 313 314 SpdySession::PushedStreamInfo::~PushedStreamInfo() {} 315 316 SpdySession::SpdySession( 317 const SpdySessionKey& spdy_session_key, 318 const base::WeakPtr<HttpServerProperties>& http_server_properties, 319 bool verify_domain_authentication, 320 bool enable_sending_initial_data, 321 bool enable_credential_frames, 322 bool enable_compression, 323 bool enable_ping_based_connection_checking, 324 NextProto default_protocol, 325 size_t stream_initial_recv_window_size, 326 size_t initial_max_concurrent_streams, 327 size_t max_concurrent_streams_limit, 328 TimeFunc time_func, 329 const HostPortPair& trusted_spdy_proxy, 330 NetLog* net_log) 331 : weak_factory_(this), 332 in_io_loop_(false), 333 spdy_session_key_(spdy_session_key), 334 pool_(NULL), 335 http_server_properties_(http_server_properties), 336 read_buffer_(new IOBuffer(kReadBufferSize)), 337 stream_hi_water_mark_(kFirstStreamId), 338 in_flight_write_frame_type_(DATA), 339 in_flight_write_frame_size_(0), 340 is_secure_(false), 341 certificate_error_code_(OK), 342 availability_state_(STATE_AVAILABLE), 343 read_state_(READ_STATE_DO_READ), 344 write_state_(WRITE_STATE_IDLE), 345 error_on_close_(OK), 346 max_concurrent_streams_(initial_max_concurrent_streams == 0 ? 347 kInitialMaxConcurrentStreams : 348 initial_max_concurrent_streams), 349 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ? 350 kMaxConcurrentStreamLimit : 351 max_concurrent_streams_limit), 352 streams_initiated_count_(0), 353 streams_pushed_count_(0), 354 streams_pushed_and_claimed_count_(0), 355 streams_abandoned_count_(0), 356 total_bytes_received_(0), 357 sent_settings_(false), 358 received_settings_(false), 359 stalled_streams_(0), 360 pings_in_flight_(0), 361 next_ping_id_(1), 362 last_activity_time_(time_func()), 363 check_ping_status_pending_(false), 364 send_connection_header_prefix_(false), 365 flow_control_state_(FLOW_CONTROL_NONE), 366 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize), 367 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ? 368 kDefaultInitialRecvWindowSize : 369 stream_initial_recv_window_size), 370 session_send_window_size_(0), 371 session_recv_window_size_(0), 372 session_unacked_recv_window_bytes_(0), 373 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)), 374 verify_domain_authentication_(verify_domain_authentication), 375 enable_sending_initial_data_(enable_sending_initial_data), 376 enable_credential_frames_(enable_credential_frames), 377 enable_compression_(enable_compression), 378 enable_ping_based_connection_checking_( 379 enable_ping_based_connection_checking), 380 protocol_(default_protocol), 381 credential_state_(SpdyCredentialState::kDefaultNumSlots), 382 connection_at_risk_of_loss_time_( 383 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)), 384 hung_interval_( 385 base::TimeDelta::FromSeconds(kHungIntervalSeconds)), 386 trusted_spdy_proxy_(trusted_spdy_proxy), 387 time_func_(time_func) { 388 // TODO(akalin): Change this to kProtoSPDYMinimumVersion once we 389 // stop supporting SPDY/1. 390 DCHECK_GE(protocol_, kProtoSPDY2); 391 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 392 DCHECK(HttpStreamFactory::spdy_enabled()); 393 net_log_.BeginEvent( 394 NetLog::TYPE_SPDY_SESSION, 395 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair())); 396 next_unclaimed_push_stream_sweep_time_ = time_func_() + 397 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 398 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 399 } 400 401 SpdySession::~SpdySession() { 402 CHECK(!in_io_loop_); 403 DCHECK(!pool_); 404 DcheckClosed(); 405 406 // TODO(akalin): Check connection->is_initialized() instead. This 407 // requires re-working CreateFakeSpdySession(), though. 408 DCHECK(connection_->socket()); 409 // With SPDY we can't recycle sockets. 410 connection_->socket()->Disconnect(); 411 412 RecordHistograms(); 413 414 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 415 } 416 417 Error SpdySession::InitializeWithSocket( 418 scoped_ptr<ClientSocketHandle> connection, 419 SpdySessionPool* pool, 420 bool is_secure, 421 int certificate_error_code) { 422 CHECK(!in_io_loop_); 423 DCHECK_EQ(availability_state_, STATE_AVAILABLE); 424 DCHECK_EQ(read_state_, READ_STATE_DO_READ); 425 DCHECK_EQ(write_state_, WRITE_STATE_IDLE); 426 DCHECK(!connection_); 427 428 DCHECK(certificate_error_code == OK || 429 certificate_error_code < ERR_IO_PENDING); 430 // TODO(akalin): Check connection->is_initialized() instead. This 431 // requires re-working CreateFakeSpdySession(), though. 432 DCHECK(connection->socket()); 433 434 base::StatsCounter spdy_sessions("spdy.sessions"); 435 spdy_sessions.Increment(); 436 437 connection_ = connection.Pass(); 438 is_secure_ = is_secure; 439 certificate_error_code_ = certificate_error_code; 440 441 NextProto protocol_negotiated = 442 connection_->socket()->GetNegotiatedProtocol(); 443 if (protocol_negotiated != kProtoUnknown) { 444 protocol_ = protocol_negotiated; 445 } 446 // TODO(akalin): Change this to kProtoSPDYMinimumVersion once we 447 // stop supporting SPDY/1. 448 DCHECK_GE(protocol_, kProtoSPDY2); 449 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 450 451 SSLClientSocket* ssl_socket = GetSSLClientSocket(); 452 if (ssl_socket && ssl_socket->WasChannelIDSent()) { 453 // According to the SPDY spec, the credential associated with the TLS 454 // connection is stored in slot[1]. 455 credential_state_.SetHasCredential(GURL("https://" + 456 host_port_pair().ToString())); 457 } 458 459 if (protocol_ == kProtoHTTP2Draft04) 460 send_connection_header_prefix_ = true; 461 462 if (protocol_ >= kProtoSPDY31) { 463 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION; 464 session_send_window_size_ = kSpdySessionInitialWindowSize; 465 session_recv_window_size_ = kSpdySessionInitialWindowSize; 466 } else if (protocol_ >= kProtoSPDY3) { 467 flow_control_state_ = FLOW_CONTROL_STREAM; 468 } else { 469 flow_control_state_ = FLOW_CONTROL_NONE; 470 } 471 472 buffered_spdy_framer_.reset( 473 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_), 474 enable_compression_)); 475 buffered_spdy_framer_->set_visitor(this); 476 buffered_spdy_framer_->set_debug_visitor(this); 477 UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion); 478 #if defined(SPDY_PROXY_AUTH_ORIGIN) 479 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy", 480 host_port_pair().Equals(HostPortPair::FromURL( 481 GURL(SPDY_PROXY_AUTH_ORIGIN)))); 482 #endif 483 484 net_log_.AddEvent( 485 NetLog::TYPE_SPDY_SESSION_INITIALIZED, 486 connection_->socket()->NetLog().source().ToEventParametersCallback()); 487 488 int error = DoReadLoop(READ_STATE_DO_READ, OK); 489 if (error == ERR_IO_PENDING) 490 error = OK; 491 if (error == OK) { 492 DCHECK_NE(availability_state_, STATE_CLOSED); 493 connection_->AddLayeredPool(this); 494 if (enable_sending_initial_data_) 495 SendInitialData(); 496 pool_ = pool; 497 } else { 498 DcheckClosed(); 499 } 500 return static_cast<Error>(error); 501 } 502 503 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 504 if (!verify_domain_authentication_) 505 return true; 506 507 if (availability_state_ == STATE_CLOSED) 508 return false; 509 510 SSLInfo ssl_info; 511 bool was_npn_negotiated; 512 NextProto protocol_negotiated = kProtoUnknown; 513 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated)) 514 return true; // This is not a secure session, so all domains are okay. 515 516 return !ssl_info.client_cert_sent && 517 (enable_credential_frames_ || !ssl_info.channel_id_sent || 518 ServerBoundCertService::GetDomainForHost(domain) == 519 ServerBoundCertService::GetDomainForHost(host_port_pair().host())) && 520 ssl_info.cert->VerifyNameMatch(domain); 521 } 522 523 int SpdySession::GetPushStream( 524 const GURL& url, 525 base::WeakPtr<SpdyStream>* stream, 526 const BoundNetLog& stream_net_log) { 527 CHECK(!in_io_loop_); 528 529 stream->reset(); 530 531 // TODO(akalin): Add unit test exercising this code path. 532 if (availability_state_ == STATE_CLOSED) 533 return ERR_CONNECTION_CLOSED; 534 535 Error err = TryAccessStream(url); 536 if (err != OK) 537 return err; 538 539 *stream = GetActivePushStream(url); 540 if (*stream) { 541 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_); 542 streams_pushed_and_claimed_count_++; 543 } 544 return OK; 545 } 546 547 // {,Try}CreateStream() and TryAccessStream() can be called with 548 // |in_io_loop_| set if a stream is being created in response to 549 // another being closed due to received data. 550 551 Error SpdySession::TryAccessStream(const GURL& url) { 552 DCHECK_NE(availability_state_, STATE_CLOSED); 553 554 if (is_secure_ && certificate_error_code_ != OK && 555 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 556 RecordProtocolErrorHistogram( 557 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION); 558 CloseSessionResult result = DoCloseSession( 559 static_cast<Error>(certificate_error_code_), 560 "Tried to get SPDY stream for secure content over an unauthenticated " 561 "session."); 562 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); 563 return ERR_SPDY_PROTOCOL_ERROR; 564 } 565 return OK; 566 } 567 568 int SpdySession::TryCreateStream(SpdyStreamRequest* request, 569 base::WeakPtr<SpdyStream>* stream) { 570 CHECK(request); 571 572 if (availability_state_ == STATE_GOING_AWAY) 573 return ERR_FAILED; 574 575 // TODO(akalin): Add unit test exercising this code path. 576 if (availability_state_ == STATE_CLOSED) 577 return ERR_CONNECTION_CLOSED; 578 579 Error err = TryAccessStream(request->url()); 580 if (err != OK) 581 return err; 582 583 if (!max_concurrent_streams_ || 584 (active_streams_.size() + created_streams_.size() < 585 max_concurrent_streams_)) { 586 return CreateStream(*request, stream); 587 } 588 589 stalled_streams_++; 590 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS); 591 pending_create_stream_queues_[request->priority()].push_back(request); 592 return ERR_IO_PENDING; 593 } 594 595 int SpdySession::CreateStream(const SpdyStreamRequest& request, 596 base::WeakPtr<SpdyStream>* stream) { 597 DCHECK_GE(request.priority(), MINIMUM_PRIORITY); 598 DCHECK_LT(request.priority(), NUM_PRIORITIES); 599 600 if (availability_state_ == STATE_GOING_AWAY) 601 return ERR_FAILED; 602 603 // TODO(akalin): Add unit test exercising this code path. 604 if (availability_state_ == STATE_CLOSED) 605 return ERR_CONNECTION_CLOSED; 606 607 Error err = TryAccessStream(request.url()); 608 if (err != OK) { 609 // This should have been caught in TryCreateStream(). 610 NOTREACHED(); 611 return err; 612 } 613 614 DCHECK(connection_->socket()); 615 DCHECK(connection_->socket()->IsConnected()); 616 if (connection_->socket()) { 617 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected", 618 connection_->socket()->IsConnected()); 619 if (!connection_->socket()->IsConnected()) { 620 CloseSessionResult result = DoCloseSession( 621 ERR_CONNECTION_CLOSED, 622 "Tried to create SPDY stream for a closed socket connection."); 623 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); 624 return ERR_CONNECTION_CLOSED; 625 } 626 } 627 628 scoped_ptr<SpdyStream> new_stream( 629 new SpdyStream(request.type(), GetWeakPtr(), request.url(), 630 request.priority(), 631 stream_initial_send_window_size_, 632 stream_initial_recv_window_size_, 633 request.net_log())); 634 *stream = new_stream->GetWeakPtr(); 635 InsertCreatedStream(new_stream.Pass()); 636 637 UMA_HISTOGRAM_CUSTOM_COUNTS( 638 "Net.SpdyPriorityCount", 639 static_cast<int>(request.priority()), 0, 10, 11); 640 641 return OK; 642 } 643 644 void SpdySession::CancelStreamRequest(SpdyStreamRequest* request) { 645 CHECK(request); 646 647 if (DCHECK_IS_ON()) { 648 // |request| should not be in a queue not matching its priority. 649 for (int i = 0; i < NUM_PRIORITIES; ++i) { 650 if (request->priority() == i) 651 continue; 652 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i]; 653 DCHECK(std::find(queue->begin(), queue->end(), request) == queue->end()); 654 } 655 } 656 657 PendingStreamRequestQueue* queue = 658 &pending_create_stream_queues_[request->priority()]; 659 // Remove |request| from |queue| while preserving the order of the 660 // other elements. 661 PendingStreamRequestQueue::iterator it = 662 std::find(queue->begin(), queue->end(), request); 663 if (it != queue->end()) { 664 it = queue->erase(it); 665 // |request| should be in the queue at most once, and if it is 666 // present, should not be pending completion. 667 DCHECK(std::find(it, queue->end(), request) == queue->end()); 668 DCHECK(!ContainsKey(pending_stream_request_completions_, 669 request)); 670 return; 671 } 672 673 pending_stream_request_completions_.erase(request); 674 } 675 676 void SpdySession::ProcessPendingStreamRequests() { 677 // Like |max_concurrent_streams_|, 0 means infinite for 678 // |max_requests_to_process|. 679 size_t max_requests_to_process = 0; 680 if (max_concurrent_streams_ != 0) { 681 max_requests_to_process = 682 max_concurrent_streams_ - 683 (active_streams_.size() + created_streams_.size()); 684 } 685 for (size_t i = 0; 686 max_requests_to_process == 0 || i < max_requests_to_process; ++i) { 687 bool processed_request = false; 688 for (int j = NUM_PRIORITIES - 1; j >= MINIMUM_PRIORITY; --j) { 689 if (pending_create_stream_queues_[j].empty()) 690 continue; 691 692 SpdyStreamRequest* pending_request = 693 pending_create_stream_queues_[j].front(); 694 CHECK(pending_request); 695 pending_create_stream_queues_[j].pop_front(); 696 processed_request = true; 697 DCHECK(!ContainsKey(pending_stream_request_completions_, 698 pending_request)); 699 pending_stream_request_completions_.insert(pending_request); 700 base::MessageLoop::current()->PostTask( 701 FROM_HERE, 702 base::Bind(&SpdySession::CompleteStreamRequest, 703 weak_factory_.GetWeakPtr(), pending_request)); 704 break; 705 } 706 if (!processed_request) 707 break; 708 } 709 } 710 711 bool SpdySession::NeedsCredentials() const { 712 if (!is_secure_) 713 return false; 714 SSLClientSocket* ssl_socket = GetSSLClientSocket(); 715 if (ssl_socket->GetNegotiatedProtocol() < kProtoSPDY3) 716 return false; 717 return ssl_socket->WasChannelIDSent(); 718 } 719 720 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) { 721 pooled_aliases_.insert(alias_key); 722 } 723 724 int SpdySession::GetProtocolVersion() const { 725 DCHECK(buffered_spdy_framer_.get()); 726 return buffered_spdy_framer_->protocol_version(); 727 } 728 729 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() { 730 return weak_factory_.GetWeakPtr(); 731 } 732 733 bool SpdySession::CloseOneIdleConnection() { 734 CHECK(!in_io_loop_); 735 DCHECK_NE(availability_state_, STATE_CLOSED); 736 DCHECK(pool_); 737 if (!active_streams_.empty()) 738 return false; 739 CloseSessionResult result = 740 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection."); 741 if (result != SESSION_CLOSED_AND_REMOVED) { 742 NOTREACHED(); 743 return false; 744 } 745 return true; 746 } 747 748 void SpdySession::EnqueueStreamWrite( 749 const base::WeakPtr<SpdyStream>& stream, 750 SpdyFrameType frame_type, 751 scoped_ptr<SpdyBufferProducer> producer) { 752 DCHECK(frame_type == HEADERS || 753 frame_type == DATA || 754 frame_type == CREDENTIAL || 755 frame_type == SYN_STREAM); 756 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); 757 } 758 759 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( 760 SpdyStreamId stream_id, 761 RequestPriority priority, 762 uint8 credential_slot, 763 SpdyControlFlags flags, 764 const SpdyHeaderBlock& headers) { 765 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 766 CHECK(it != active_streams_.end()); 767 CHECK_EQ(it->second.stream->stream_id(), stream_id); 768 769 SendPrefacePingIfNoneInFlight(); 770 771 DCHECK(buffered_spdy_framer_.get()); 772 scoped_ptr<SpdyFrame> syn_frame( 773 buffered_spdy_framer_->CreateSynStream( 774 stream_id, 0, 775 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()), 776 credential_slot, flags, enable_compression_, &headers)); 777 778 base::StatsCounter spdy_requests("spdy.requests"); 779 spdy_requests.Increment(); 780 streams_initiated_count_++; 781 782 if (net_log().IsLoggingAllEvents()) { 783 net_log().AddEvent( 784 NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 785 base::Bind(&NetLogSpdySynCallback, &headers, 786 (flags & CONTROL_FLAG_FIN) != 0, 787 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0, 788 stream_id, 0)); 789 } 790 791 return syn_frame.Pass(); 792 } 793 794 int SpdySession::CreateCredentialFrame( 795 const std::string& origin, 796 const std::string& key, 797 const std::string& cert, 798 RequestPriority priority, 799 scoped_ptr<SpdyFrame>* credential_frame) { 800 DCHECK(is_secure_); 801 SSLClientSocket* ssl_socket = GetSSLClientSocket(); 802 DCHECK(ssl_socket); 803 DCHECK(ssl_socket->WasChannelIDSent()); 804 805 SpdyCredential credential; 806 std::string tls_unique; 807 ssl_socket->GetTLSUniqueChannelBinding(&tls_unique); 808 size_t slot = credential_state_.SetHasCredential(GURL(origin)); 809 int rv = SpdyCredentialBuilder::Build(tls_unique, key, cert, slot, 810 &credential); 811 DCHECK_NE(rv, ERR_IO_PENDING); 812 if (rv != OK) 813 return rv; 814 815 DCHECK(buffered_spdy_framer_.get()); 816 credential_frame->reset( 817 buffered_spdy_framer_->CreateCredentialFrame(credential)); 818 819 if (net_log().IsLoggingAllEvents()) { 820 net_log().AddEvent( 821 NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, 822 base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); 823 } 824 return OK; 825 } 826 827 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, 828 IOBuffer* data, 829 int len, 830 SpdyDataFlags flags) { 831 if (availability_state_ == STATE_CLOSED) { 832 NOTREACHED(); 833 return scoped_ptr<SpdyBuffer>(); 834 } 835 836 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 837 CHECK(it != active_streams_.end()); 838 SpdyStream* stream = it->second.stream; 839 CHECK_EQ(stream->stream_id(), stream_id); 840 841 if (len < 0) { 842 NOTREACHED(); 843 return scoped_ptr<SpdyBuffer>(); 844 } 845 846 int effective_len = std::min(len, kMaxSpdyFrameChunkSize); 847 848 bool send_stalled_by_stream = 849 (flow_control_state_ >= FLOW_CONTROL_STREAM) && 850 (stream->send_window_size() <= 0); 851 bool send_stalled_by_session = IsSendStalled(); 852 853 // NOTE: There's an enum of the same name in histograms.xml. 854 enum SpdyFrameFlowControlState { 855 SEND_NOT_STALLED, 856 SEND_STALLED_BY_STREAM, 857 SEND_STALLED_BY_SESSION, 858 SEND_STALLED_BY_STREAM_AND_SESSION, 859 }; 860 861 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED; 862 if (send_stalled_by_stream) { 863 if (send_stalled_by_session) { 864 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION; 865 } else { 866 frame_flow_control_state = SEND_STALLED_BY_STREAM; 867 } 868 } else if (send_stalled_by_session) { 869 frame_flow_control_state = SEND_STALLED_BY_SESSION; 870 } 871 872 if (flow_control_state_ == FLOW_CONTROL_STREAM) { 873 UMA_HISTOGRAM_ENUMERATION( 874 "Net.SpdyFrameStreamFlowControlState", 875 frame_flow_control_state, 876 SEND_STALLED_BY_STREAM + 1); 877 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 878 UMA_HISTOGRAM_ENUMERATION( 879 "Net.SpdyFrameStreamAndSessionFlowControlState", 880 frame_flow_control_state, 881 SEND_STALLED_BY_STREAM_AND_SESSION + 1); 882 } 883 884 // Obey send window size of the stream if stream flow control is 885 // enabled. 886 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 887 if (send_stalled_by_stream) { 888 stream->set_send_stalled_by_flow_control(true); 889 // Even though we're currently stalled only by the stream, we 890 // might end up being stalled by the session also. 891 QueueSendStalledStream(*stream); 892 net_log().AddEvent( 893 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW, 894 NetLog::IntegerCallback("stream_id", stream_id)); 895 return scoped_ptr<SpdyBuffer>(); 896 } 897 898 effective_len = std::min(effective_len, stream->send_window_size()); 899 } 900 901 // Obey send window size of the session if session flow control is 902 // enabled. 903 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 904 if (send_stalled_by_session) { 905 stream->set_send_stalled_by_flow_control(true); 906 QueueSendStalledStream(*stream); 907 net_log().AddEvent( 908 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW, 909 NetLog::IntegerCallback("stream_id", stream_id)); 910 return scoped_ptr<SpdyBuffer>(); 911 } 912 913 effective_len = std::min(effective_len, session_send_window_size_); 914 } 915 916 DCHECK_GE(effective_len, 0); 917 918 // Clear FIN flag if only some of the data will be in the data 919 // frame. 920 if (effective_len < len) 921 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 922 923 if (net_log().IsLoggingAllEvents()) { 924 net_log().AddEvent( 925 NetLog::TYPE_SPDY_SESSION_SEND_DATA, 926 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len, 927 (flags & DATA_FLAG_FIN) != 0)); 928 } 929 930 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 931 if (effective_len > 0) 932 SendPrefacePingIfNoneInFlight(); 933 934 // TODO(mbelshe): reduce memory copies here. 935 DCHECK(buffered_spdy_framer_.get()); 936 scoped_ptr<SpdyFrame> frame( 937 buffered_spdy_framer_->CreateDataFrame( 938 stream_id, data->data(), 939 static_cast<uint32>(effective_len), flags)); 940 941 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass())); 942 943 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 944 DecreaseSendWindowSize(static_cast<int32>(effective_len)); 945 data_buffer->AddConsumeCallback( 946 base::Bind(&SpdySession::OnWriteBufferConsumed, 947 weak_factory_.GetWeakPtr(), 948 static_cast<size_t>(effective_len))); 949 } 950 951 return data_buffer.Pass(); 952 } 953 954 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) { 955 DCHECK_NE(stream_id, 0u); 956 957 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 958 if (it == active_streams_.end()) { 959 NOTREACHED(); 960 return; 961 } 962 963 CloseActiveStreamIterator(it, status); 964 } 965 966 void SpdySession::CloseCreatedStream( 967 const base::WeakPtr<SpdyStream>& stream, int status) { 968 DCHECK_EQ(stream->stream_id(), 0u); 969 970 CreatedStreamSet::iterator it = created_streams_.find(stream.get()); 971 if (it == created_streams_.end()) { 972 NOTREACHED(); 973 return; 974 } 975 976 CloseCreatedStreamIterator(it, status); 977 } 978 979 void SpdySession::ResetStream(SpdyStreamId stream_id, 980 SpdyRstStreamStatus status, 981 const std::string& description) { 982 DCHECK_NE(stream_id, 0u); 983 984 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 985 if (it == active_streams_.end()) { 986 NOTREACHED(); 987 return; 988 } 989 990 ResetStreamIterator(it, status, description); 991 } 992 993 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { 994 return ContainsKey(active_streams_, stream_id); 995 } 996 997 LoadState SpdySession::GetLoadState() const { 998 // Just report that we're idle since the session could be doing 999 // many things concurrently. 1000 return LOAD_STATE_IDLE; 1001 } 1002 1003 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, 1004 int status) { 1005 // TODO(mbelshe): We should send a RST_STREAM control frame here 1006 // so that the server can cancel a large send. 1007 1008 scoped_ptr<SpdyStream> owned_stream(it->second.stream); 1009 active_streams_.erase(it); 1010 1011 // TODO(akalin): When SpdyStream was ref-counted (and 1012 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this 1013 // was only done when status was not OK. This meant that pushed 1014 // streams can still be claimed after they're closed. This is 1015 // probably something that we still want to support, although server 1016 // push is hardly used. Write tests for this and fix this. (See 1017 // http://crbug.com/261712 .) 1018 if (owned_stream->type() == SPDY_PUSH_STREAM) 1019 unclaimed_pushed_streams_.erase(owned_stream->url()); 1020 1021 DeleteStream(owned_stream.Pass(), status); 1022 } 1023 1024 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it, 1025 int status) { 1026 scoped_ptr<SpdyStream> owned_stream(*it); 1027 created_streams_.erase(it); 1028 DeleteStream(owned_stream.Pass(), status); 1029 } 1030 1031 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it, 1032 SpdyRstStreamStatus status, 1033 const std::string& description) { 1034 // Send the RST_STREAM frame first as CloseActiveStreamIterator() 1035 // may close us. 1036 SpdyStreamId stream_id = it->first; 1037 RequestPriority priority = it->second.stream->priority(); 1038 EnqueueResetStreamFrame(stream_id, priority, status, description); 1039 1040 // Removes any pending writes for the stream except for possibly an 1041 // in-flight one. 1042 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 1043 } 1044 1045 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id, 1046 RequestPriority priority, 1047 SpdyRstStreamStatus status, 1048 const std::string& description) { 1049 DCHECK_NE(stream_id, 0u); 1050 1051 net_log().AddEvent( 1052 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM, 1053 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description)); 1054 1055 DCHECK(buffered_spdy_framer_.get()); 1056 scoped_ptr<SpdyFrame> rst_frame( 1057 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 1058 1059 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass()); 1060 RecordProtocolErrorHistogram( 1061 static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); 1062 } 1063 1064 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) { 1065 CHECK(!in_io_loop_); 1066 DCHECK_NE(availability_state_, STATE_CLOSED); 1067 DCHECK_EQ(read_state_, expected_read_state); 1068 1069 result = DoReadLoop(expected_read_state, result); 1070 1071 if (availability_state_ == STATE_CLOSED) { 1072 DCHECK_EQ(result, error_on_close_); 1073 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1074 RemoveFromPool(); 1075 return; 1076 } 1077 1078 DCHECK(result == OK || result == ERR_IO_PENDING); 1079 } 1080 1081 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { 1082 CHECK(!in_io_loop_); 1083 DCHECK_NE(availability_state_, STATE_CLOSED); 1084 DCHECK_EQ(read_state_, expected_read_state); 1085 1086 in_io_loop_ = true; 1087 1088 int bytes_read_without_yielding = 0; 1089 1090 // Loop until the session is closed, the read becomes blocked, or 1091 // the read limit is exceeded. 1092 while (true) { 1093 switch (read_state_) { 1094 case READ_STATE_DO_READ: 1095 DCHECK_EQ(result, OK); 1096 result = DoRead(); 1097 break; 1098 case READ_STATE_DO_READ_COMPLETE: 1099 if (result > 0) 1100 bytes_read_without_yielding += result; 1101 result = DoReadComplete(result); 1102 break; 1103 default: 1104 NOTREACHED() << "read_state_: " << read_state_; 1105 break; 1106 } 1107 1108 if (availability_state_ == STATE_CLOSED) { 1109 DCHECK_EQ(result, error_on_close_); 1110 DCHECK_LT(result, ERR_IO_PENDING); 1111 break; 1112 } 1113 1114 if (result == ERR_IO_PENDING) 1115 break; 1116 1117 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { 1118 read_state_ = READ_STATE_DO_READ; 1119 base::MessageLoop::current()->PostTask( 1120 FROM_HERE, 1121 base::Bind(&SpdySession::PumpReadLoop, 1122 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 1123 result = ERR_IO_PENDING; 1124 break; 1125 } 1126 } 1127 1128 CHECK(in_io_loop_); 1129 in_io_loop_ = false; 1130 1131 return result; 1132 } 1133 1134 int SpdySession::DoRead() { 1135 CHECK(in_io_loop_); 1136 DCHECK_NE(availability_state_, STATE_CLOSED); 1137 1138 CHECK(connection_); 1139 CHECK(connection_->socket()); 1140 read_state_ = READ_STATE_DO_READ_COMPLETE; 1141 return connection_->socket()->Read( 1142 read_buffer_.get(), 1143 kReadBufferSize, 1144 base::Bind(&SpdySession::PumpReadLoop, 1145 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); 1146 } 1147 1148 int SpdySession::DoReadComplete(int result) { 1149 CHECK(in_io_loop_); 1150 DCHECK_NE(availability_state_, STATE_CLOSED); 1151 1152 // Parse a frame. For now this code requires that the frame fit into our 1153 // buffer (kReadBufferSize). 1154 // TODO(mbelshe): support arbitrarily large frames! 1155 1156 if (result == 0) { 1157 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", 1158 total_bytes_received_, 1, 100000000, 50); 1159 CloseSessionResult close_session_result = 1160 DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed"); 1161 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); 1162 DCHECK_EQ(availability_state_, STATE_CLOSED); 1163 DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED); 1164 return ERR_CONNECTION_CLOSED; 1165 } 1166 1167 if (result < 0) { 1168 CloseSessionResult close_session_result = 1169 DoCloseSession(static_cast<Error>(result), "result is < 0."); 1170 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); 1171 DCHECK_EQ(availability_state_, STATE_CLOSED); 1172 DCHECK_EQ(error_on_close_, result); 1173 return result; 1174 } 1175 1176 total_bytes_received_ += result; 1177 1178 last_activity_time_ = time_func_(); 1179 1180 DCHECK(buffered_spdy_framer_.get()); 1181 char* data = read_buffer_->data(); 1182 while (result > 0) { 1183 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); 1184 result -= bytes_processed; 1185 data += bytes_processed; 1186 1187 if (availability_state_ == STATE_CLOSED) { 1188 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1189 return error_on_close_; 1190 } 1191 1192 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); 1193 } 1194 1195 read_state_ = READ_STATE_DO_READ; 1196 return OK; 1197 } 1198 1199 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) { 1200 CHECK(!in_io_loop_); 1201 DCHECK_NE(availability_state_, STATE_CLOSED); 1202 DCHECK_EQ(write_state_, expected_write_state); 1203 1204 result = DoWriteLoop(expected_write_state, result); 1205 1206 if (availability_state_ == STATE_CLOSED) { 1207 DCHECK_EQ(result, error_on_close_); 1208 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1209 RemoveFromPool(); 1210 return; 1211 } 1212 1213 DCHECK(result == OK || result == ERR_IO_PENDING); 1214 } 1215 1216 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { 1217 CHECK(!in_io_loop_); 1218 DCHECK_NE(availability_state_, STATE_CLOSED); 1219 DCHECK_NE(write_state_, WRITE_STATE_IDLE); 1220 DCHECK_EQ(write_state_, expected_write_state); 1221 1222 in_io_loop_ = true; 1223 1224 // Loop until the session is closed or the write becomes blocked. 1225 while (true) { 1226 switch (write_state_) { 1227 case WRITE_STATE_DO_WRITE: 1228 DCHECK_EQ(result, OK); 1229 result = DoWrite(); 1230 break; 1231 case WRITE_STATE_DO_WRITE_COMPLETE: 1232 result = DoWriteComplete(result); 1233 break; 1234 case WRITE_STATE_IDLE: 1235 default: 1236 NOTREACHED() << "write_state_: " << write_state_; 1237 break; 1238 } 1239 1240 if (availability_state_ == STATE_CLOSED) { 1241 DCHECK_EQ(result, error_on_close_); 1242 DCHECK_LT(result, ERR_IO_PENDING); 1243 break; 1244 } 1245 1246 if (write_state_ == WRITE_STATE_IDLE) { 1247 DCHECK_EQ(result, ERR_IO_PENDING); 1248 break; 1249 } 1250 1251 if (result == ERR_IO_PENDING) 1252 break; 1253 } 1254 1255 CHECK(in_io_loop_); 1256 in_io_loop_ = false; 1257 1258 return result; 1259 } 1260 1261 int SpdySession::DoWrite() { 1262 CHECK(in_io_loop_); 1263 DCHECK_NE(availability_state_, STATE_CLOSED); 1264 1265 DCHECK(buffered_spdy_framer_); 1266 if (in_flight_write_) { 1267 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1268 } else { 1269 // Grab the next frame to send. 1270 SpdyFrameType frame_type = DATA; 1271 scoped_ptr<SpdyBufferProducer> producer; 1272 base::WeakPtr<SpdyStream> stream; 1273 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { 1274 write_state_ = WRITE_STATE_IDLE; 1275 return ERR_IO_PENDING; 1276 } 1277 1278 if (stream.get()) 1279 DCHECK(!stream->IsClosed()); 1280 1281 // Activate the stream only when sending the SYN_STREAM frame to 1282 // guarantee monotonically-increasing stream IDs. 1283 if (frame_type == SYN_STREAM) { 1284 if (stream.get() && stream->stream_id() == 0) { 1285 scoped_ptr<SpdyStream> owned_stream = 1286 ActivateCreatedStream(stream.get()); 1287 InsertActivatedStream(owned_stream.Pass()); 1288 } else { 1289 NOTREACHED(); 1290 return ERR_UNEXPECTED; 1291 } 1292 } 1293 1294 in_flight_write_ = producer->ProduceBuffer(); 1295 if (!in_flight_write_) { 1296 NOTREACHED(); 1297 return ERR_UNEXPECTED; 1298 } 1299 in_flight_write_frame_type_ = frame_type; 1300 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); 1301 DCHECK_GE(in_flight_write_frame_size_, 1302 buffered_spdy_framer_->GetFrameMinimumSize()); 1303 in_flight_write_stream_ = stream; 1304 } 1305 1306 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE; 1307 1308 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems 1309 // with Socket implementations that don't store their IOBuffer 1310 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). 1311 scoped_refptr<IOBuffer> write_io_buffer = 1312 in_flight_write_->GetIOBufferForRemainingData(); 1313 return connection_->socket()->Write( 1314 write_io_buffer.get(), 1315 in_flight_write_->GetRemainingSize(), 1316 base::Bind(&SpdySession::PumpWriteLoop, 1317 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); 1318 } 1319 1320 int SpdySession::DoWriteComplete(int result) { 1321 CHECK(in_io_loop_); 1322 DCHECK_NE(availability_state_, STATE_CLOSED); 1323 DCHECK_NE(result, ERR_IO_PENDING); 1324 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1325 1326 last_activity_time_ = time_func_(); 1327 1328 if (result < 0) { 1329 DCHECK_NE(result, ERR_IO_PENDING); 1330 in_flight_write_.reset(); 1331 in_flight_write_frame_type_ = DATA; 1332 in_flight_write_frame_size_ = 0; 1333 in_flight_write_stream_.reset(); 1334 CloseSessionResult close_session_result = 1335 DoCloseSession(static_cast<Error>(result), "Write error"); 1336 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED); 1337 DCHECK_EQ(availability_state_, STATE_CLOSED); 1338 DCHECK_EQ(error_on_close_, result); 1339 return result; 1340 } 1341 1342 // It should not be possible to have written more bytes than our 1343 // in_flight_write_. 1344 DCHECK_LE(static_cast<size_t>(result), 1345 in_flight_write_->GetRemainingSize()); 1346 1347 if (result > 0) { 1348 in_flight_write_->Consume(static_cast<size_t>(result)); 1349 1350 // We only notify the stream when we've fully written the pending frame. 1351 if (in_flight_write_->GetRemainingSize() == 0) { 1352 // It is possible that the stream was cancelled while we were 1353 // writing to the socket. 1354 if (in_flight_write_stream_.get()) { 1355 DCHECK_GT(in_flight_write_frame_size_, 0u); 1356 in_flight_write_stream_->OnFrameWriteComplete( 1357 in_flight_write_frame_type_, 1358 in_flight_write_frame_size_); 1359 } 1360 1361 // Cleanup the write which just completed. 1362 in_flight_write_.reset(); 1363 in_flight_write_frame_type_ = DATA; 1364 in_flight_write_frame_size_ = 0; 1365 in_flight_write_stream_.reset(); 1366 } 1367 } 1368 1369 write_state_ = WRITE_STATE_DO_WRITE; 1370 return OK; 1371 } 1372 1373 void SpdySession::DcheckGoingAway() const { 1374 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1375 if (DCHECK_IS_ON()) { 1376 for (int i = 0; i < NUM_PRIORITIES; ++i) { 1377 DCHECK(pending_create_stream_queues_[i].empty()); 1378 } 1379 } 1380 DCHECK(pending_stream_request_completions_.empty()); 1381 DCHECK(created_streams_.empty()); 1382 } 1383 1384 void SpdySession::DcheckClosed() const { 1385 DcheckGoingAway(); 1386 DCHECK_EQ(availability_state_, STATE_CLOSED); 1387 DCHECK_LT(error_on_close_, ERR_IO_PENDING); 1388 DCHECK(active_streams_.empty()); 1389 DCHECK(unclaimed_pushed_streams_.empty()); 1390 DCHECK(write_queue_.IsEmpty()); 1391 } 1392 1393 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, 1394 Error status) { 1395 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1396 1397 // The loops below are carefully written to avoid reentrancy problems. 1398 // 1399 // TODO(akalin): Any of the functions below can cause |this| to be 1400 // deleted, so handle that below (and add tests for it). 1401 1402 for (int i = 0; i < NUM_PRIORITIES; ++i) { 1403 PendingStreamRequestQueue queue; 1404 queue.swap(pending_create_stream_queues_[i]); 1405 for (PendingStreamRequestQueue::const_iterator it = queue.begin(); 1406 it != queue.end(); ++it) { 1407 CHECK(*it); 1408 (*it)->OnRequestCompleteFailure(ERR_ABORTED); 1409 } 1410 } 1411 1412 PendingStreamRequestCompletionSet pending_completions; 1413 pending_completions.swap(pending_stream_request_completions_); 1414 for (PendingStreamRequestCompletionSet::const_iterator it = 1415 pending_completions.begin(); 1416 it != pending_completions.end(); ++it) { 1417 (*it)->OnRequestCompleteFailure(ERR_ABORTED); 1418 } 1419 1420 while (true) { 1421 ActiveStreamMap::iterator it = 1422 active_streams_.lower_bound(last_good_stream_id + 1); 1423 if (it == active_streams_.end()) 1424 break; 1425 LogAbandonedActiveStream(it, status); 1426 CloseActiveStreamIterator(it, status); 1427 } 1428 1429 while (!created_streams_.empty()) { 1430 CreatedStreamSet::iterator it = created_streams_.begin(); 1431 LogAbandonedStream(*it, status); 1432 CloseCreatedStreamIterator(it, status); 1433 } 1434 1435 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id); 1436 1437 DcheckGoingAway(); 1438 } 1439 1440 void SpdySession::MaybeFinishGoingAway() { 1441 DcheckGoingAway(); 1442 if (active_streams_.empty() && availability_state_ != STATE_CLOSED) { 1443 CloseSessionResult result = 1444 DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away"); 1445 DCHECK_NE(result, SESSION_ALREADY_CLOSED); 1446 } 1447 } 1448 1449 SpdySession::CloseSessionResult SpdySession::DoCloseSession( 1450 Error err, 1451 const std::string& description) { 1452 DCHECK_LT(err, ERR_IO_PENDING); 1453 1454 if (availability_state_ == STATE_CLOSED) 1455 return SESSION_ALREADY_CLOSED; 1456 1457 net_log_.AddEvent( 1458 NetLog::TYPE_SPDY_SESSION_CLOSE, 1459 base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); 1460 1461 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); 1462 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", 1463 total_bytes_received_, 1, 100000000, 50); 1464 1465 // |pool_| will be NULL when |InitializeWithSocket()| is in the 1466 // call stack. 1467 if (pool_ && availability_state_ != STATE_GOING_AWAY) 1468 pool_->MakeSessionUnavailable(GetWeakPtr()); 1469 1470 availability_state_ = STATE_CLOSED; 1471 error_on_close_ = err; 1472 1473 StartGoingAway(0, err); 1474 write_queue_.Clear(); 1475 1476 DcheckClosed(); 1477 1478 if (in_io_loop_) 1479 return SESSION_CLOSED_BUT_NOT_REMOVED; 1480 1481 RemoveFromPool(); 1482 return SESSION_CLOSED_AND_REMOVED; 1483 } 1484 1485 void SpdySession::RemoveFromPool() { 1486 DcheckClosed(); 1487 CHECK(pool_); 1488 1489 SpdySessionPool* pool = pool_; 1490 pool_ = NULL; 1491 pool->RemoveUnavailableSession(GetWeakPtr()); 1492 } 1493 1494 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { 1495 DCHECK(stream); 1496 std::string description = base::StringPrintf( 1497 "ABANDONED (stream_id=%d): ", stream->stream_id()) + 1498 stream->url().spec(); 1499 stream->LogStreamError(status, description); 1500 // We don't increment the streams abandoned counter here. If the 1501 // stream isn't active (i.e., it hasn't written anything to the wire 1502 // yet) then it's as if it never existed. If it is active, then 1503 // LogAbandonedActiveStream() will increment the counters. 1504 } 1505 1506 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it, 1507 Error status) { 1508 DCHECK_GT(it->first, 0u); 1509 LogAbandonedStream(it->second.stream, status); 1510 ++streams_abandoned_count_; 1511 base::StatsCounter abandoned_streams("spdy.abandoned_streams"); 1512 abandoned_streams.Increment(); 1513 if (it->second.stream->type() == SPDY_PUSH_STREAM && 1514 unclaimed_pushed_streams_.find(it->second.stream->url()) != 1515 unclaimed_pushed_streams_.end()) { 1516 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams"); 1517 abandoned_push_streams.Increment(); 1518 } 1519 } 1520 1521 int SpdySession::GetNewStreamId() { 1522 int id = stream_hi_water_mark_; 1523 stream_hi_water_mark_ += 2; 1524 if (stream_hi_water_mark_ > 0x7fff) 1525 stream_hi_water_mark_ = 1; 1526 return id; 1527 } 1528 1529 void SpdySession::CloseSessionOnError(Error err, 1530 const std::string& description) { 1531 // We may be called from anywhere, so we can't expect a particular 1532 // return value. 1533 ignore_result(DoCloseSession(err, description)); 1534 } 1535 1536 base::Value* SpdySession::GetInfoAsValue() const { 1537 base::DictionaryValue* dict = new base::DictionaryValue(); 1538 1539 dict->SetInteger("source_id", net_log_.source().id); 1540 1541 dict->SetString("host_port_pair", host_port_pair().ToString()); 1542 if (!pooled_aliases_.empty()) { 1543 base::ListValue* alias_list = new base::ListValue(); 1544 for (std::set<SpdySessionKey>::const_iterator it = 1545 pooled_aliases_.begin(); 1546 it != pooled_aliases_.end(); it++) { 1547 alias_list->Append(new base::StringValue( 1548 it->host_port_pair().ToString())); 1549 } 1550 dict->Set("aliases", alias_list); 1551 } 1552 dict->SetString("proxy", host_port_proxy_pair().second.ToURI()); 1553 1554 dict->SetInteger("active_streams", active_streams_.size()); 1555 1556 dict->SetInteger("unclaimed_pushed_streams", 1557 unclaimed_pushed_streams_.size()); 1558 1559 dict->SetBoolean("is_secure", is_secure_); 1560 1561 dict->SetString("protocol_negotiated", 1562 SSLClientSocket::NextProtoToString( 1563 connection_->socket()->GetNegotiatedProtocol())); 1564 1565 dict->SetInteger("error", error_on_close_); 1566 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); 1567 1568 dict->SetInteger("streams_initiated_count", streams_initiated_count_); 1569 dict->SetInteger("streams_pushed_count", streams_pushed_count_); 1570 dict->SetInteger("streams_pushed_and_claimed_count", 1571 streams_pushed_and_claimed_count_); 1572 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_); 1573 DCHECK(buffered_spdy_framer_.get()); 1574 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received()); 1575 1576 dict->SetBoolean("sent_settings", sent_settings_); 1577 dict->SetBoolean("received_settings", received_settings_); 1578 1579 dict->SetInteger("send_window_size", session_send_window_size_); 1580 dict->SetInteger("recv_window_size", session_recv_window_size_); 1581 dict->SetInteger("unacked_recv_window_bytes", 1582 session_unacked_recv_window_bytes_); 1583 return dict; 1584 } 1585 1586 bool SpdySession::IsReused() const { 1587 return buffered_spdy_framer_->frames_received() > 0; 1588 } 1589 1590 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id, 1591 LoadTimingInfo* load_timing_info) const { 1592 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId, 1593 load_timing_info); 1594 } 1595 1596 int SpdySession::GetPeerAddress(IPEndPoint* address) const { 1597 int rv = ERR_SOCKET_NOT_CONNECTED; 1598 if (connection_->socket()) { 1599 rv = connection_->socket()->GetPeerAddress(address); 1600 } 1601 1602 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress", 1603 rv == ERR_SOCKET_NOT_CONNECTED); 1604 1605 return rv; 1606 } 1607 1608 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1609 int rv = ERR_SOCKET_NOT_CONNECTED; 1610 if (connection_->socket()) { 1611 rv = connection_->socket()->GetLocalAddress(address); 1612 } 1613 1614 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress", 1615 rv == ERR_SOCKET_NOT_CONNECTED); 1616 1617 return rv; 1618 } 1619 1620 void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1621 SpdyFrameType frame_type, 1622 scoped_ptr<SpdyFrame> frame) { 1623 DCHECK(frame_type == RST_STREAM || 1624 frame_type == SETTINGS || 1625 frame_type == WINDOW_UPDATE || 1626 frame_type == PING); 1627 EnqueueWrite( 1628 priority, frame_type, 1629 scoped_ptr<SpdyBufferProducer>( 1630 new SimpleBufferProducer( 1631 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), 1632 base::WeakPtr<SpdyStream>()); 1633 } 1634 1635 void SpdySession::EnqueueWrite(RequestPriority priority, 1636 SpdyFrameType frame_type, 1637 scoped_ptr<SpdyBufferProducer> producer, 1638 const base::WeakPtr<SpdyStream>& stream) { 1639 if (availability_state_ == STATE_CLOSED) 1640 return; 1641 1642 bool was_idle = write_queue_.IsEmpty(); 1643 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1644 if (write_state_ == WRITE_STATE_IDLE) { 1645 DCHECK(was_idle); 1646 DCHECK(!in_flight_write_); 1647 write_state_ = WRITE_STATE_DO_WRITE; 1648 base::MessageLoop::current()->PostTask( 1649 FROM_HERE, 1650 base::Bind(&SpdySession::PumpWriteLoop, 1651 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); 1652 } 1653 } 1654 1655 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { 1656 DCHECK_EQ(stream->stream_id(), 0u); 1657 DCHECK(created_streams_.find(stream.get()) == created_streams_.end()); 1658 created_streams_.insert(stream.release()); 1659 } 1660 1661 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) { 1662 DCHECK_EQ(stream->stream_id(), 0u); 1663 DCHECK(created_streams_.find(stream) != created_streams_.end()); 1664 stream->set_stream_id(GetNewStreamId()); 1665 scoped_ptr<SpdyStream> owned_stream(stream); 1666 created_streams_.erase(stream); 1667 return owned_stream.Pass(); 1668 } 1669 1670 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) { 1671 SpdyStreamId stream_id = stream->stream_id(); 1672 DCHECK_NE(stream_id, 0u); 1673 std::pair<ActiveStreamMap::iterator, bool> result = 1674 active_streams_.insert( 1675 std::make_pair(stream_id, ActiveStreamInfo(stream.get()))); 1676 if (result.second) { 1677 ignore_result(stream.release()); 1678 } else { 1679 NOTREACHED(); 1680 } 1681 } 1682 1683 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { 1684 if (in_flight_write_stream_.get() == stream.get()) { 1685 // If we're deleting the stream for the in-flight write, we still 1686 // need to let the write complete, so we clear 1687 // |in_flight_write_stream_| and let the write finish on its own 1688 // without notifying |in_flight_write_stream_|. 1689 in_flight_write_stream_.reset(); 1690 } 1691 1692 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); 1693 1694 // |stream->OnClose()| may end up closing |this|, so detect that. 1695 base::WeakPtr<SpdySession> weak_this = GetWeakPtr(); 1696 1697 stream->OnClose(status); 1698 1699 if (!weak_this) 1700 return; 1701 1702 switch (availability_state_) { 1703 case STATE_AVAILABLE: 1704 ProcessPendingStreamRequests(); 1705 break; 1706 case STATE_GOING_AWAY: 1707 DcheckGoingAway(); 1708 MaybeFinishGoingAway(); 1709 break; 1710 case STATE_CLOSED: 1711 // Do nothing. 1712 break; 1713 } 1714 } 1715 1716 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) { 1717 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1718 1719 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url); 1720 if (unclaimed_it == unclaimed_pushed_streams_.end()) 1721 return base::WeakPtr<SpdyStream>(); 1722 1723 SpdyStreamId stream_id = unclaimed_it->second.stream_id; 1724 unclaimed_pushed_streams_.erase(unclaimed_it); 1725 1726 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 1727 if (active_it == active_streams_.end()) { 1728 NOTREACHED(); 1729 return base::WeakPtr<SpdyStream>(); 1730 } 1731 1732 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM); 1733 used_push_streams.Increment(); 1734 return active_it->second.stream->GetWeakPtr(); 1735 } 1736 1737 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, 1738 bool* was_npn_negotiated, 1739 NextProto* protocol_negotiated) { 1740 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated(); 1741 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol(); 1742 return connection_->socket()->GetSSLInfo(ssl_info); 1743 } 1744 1745 bool SpdySession::GetSSLCertRequestInfo( 1746 SSLCertRequestInfo* cert_request_info) { 1747 if (!is_secure_) 1748 return false; 1749 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info); 1750 return true; 1751 } 1752 1753 ServerBoundCertService* SpdySession::GetServerBoundCertService() const { 1754 if (!is_secure_) 1755 return NULL; 1756 return GetSSLClientSocket()->GetServerBoundCertService(); 1757 } 1758 1759 void SpdySession::OnError(SpdyFramer::SpdyError error_code) { 1760 CHECK(in_io_loop_); 1761 1762 if (availability_state_ == STATE_CLOSED) 1763 return; 1764 1765 RecordProtocolErrorHistogram( 1766 static_cast<SpdyProtocolErrorDetails>(error_code)); 1767 std::string description = base::StringPrintf( 1768 "SPDY_ERROR error_code: %d.", error_code); 1769 CloseSessionResult result = 1770 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description); 1771 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 1772 } 1773 1774 void SpdySession::OnStreamError(SpdyStreamId stream_id, 1775 const std::string& description) { 1776 CHECK(in_io_loop_); 1777 1778 if (availability_state_ == STATE_CLOSED) 1779 return; 1780 1781 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1782 if (it == active_streams_.end()) { 1783 // We still want to send a frame to reset the stream even if we 1784 // don't know anything about it. 1785 EnqueueResetStreamFrame( 1786 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description); 1787 return; 1788 } 1789 1790 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description); 1791 } 1792 1793 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, 1794 const char* data, 1795 size_t len, 1796 bool fin) { 1797 CHECK(in_io_loop_); 1798 1799 if (availability_state_ == STATE_CLOSED) 1800 return; 1801 1802 DCHECK_LT(len, 1u << 24); 1803 if (net_log().IsLoggingAllEvents()) { 1804 net_log().AddEvent( 1805 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1806 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); 1807 } 1808 1809 // Build the buffer as early as possible so that we go through the 1810 // session flow control checks and update 1811 // |unacked_recv_window_bytes_| properly even when the stream is 1812 // inactive (since the other side has still reduced its session send 1813 // window). 1814 scoped_ptr<SpdyBuffer> buffer; 1815 if (data) { 1816 DCHECK_GT(len, 0u); 1817 buffer.reset(new SpdyBuffer(data, len)); 1818 1819 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1820 DecreaseRecvWindowSize(static_cast<int32>(len)); 1821 buffer->AddConsumeCallback( 1822 base::Bind(&SpdySession::OnReadBufferConsumed, 1823 weak_factory_.GetWeakPtr())); 1824 } 1825 } else { 1826 DCHECK_EQ(len, 0u); 1827 } 1828 1829 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1830 1831 // By the time data comes in, the stream may already be inactive. 1832 if (it == active_streams_.end()) 1833 return; 1834 1835 SpdyStream* stream = it->second.stream; 1836 CHECK_EQ(stream->stream_id(), stream_id); 1837 1838 if (it->second.waiting_for_syn_reply) { 1839 const std::string& error = "Data received before SYN_REPLY."; 1840 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 1841 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 1842 return; 1843 } 1844 1845 stream->OnDataReceived(buffer.Pass()); 1846 } 1847 1848 void SpdySession::OnSettings(bool clear_persisted) { 1849 CHECK(in_io_loop_); 1850 1851 if (availability_state_ == STATE_CLOSED) 1852 return; 1853 1854 if (clear_persisted) 1855 http_server_properties_->ClearSpdySettings(host_port_pair()); 1856 1857 if (net_log_.IsLoggingAllEvents()) { 1858 net_log_.AddEvent( 1859 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 1860 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(), 1861 clear_persisted)); 1862 } 1863 } 1864 1865 void SpdySession::OnSetting(SpdySettingsIds id, 1866 uint8 flags, 1867 uint32 value) { 1868 CHECK(in_io_loop_); 1869 1870 if (availability_state_ == STATE_CLOSED) 1871 return; 1872 1873 HandleSetting(id, value); 1874 http_server_properties_->SetSpdySetting( 1875 host_port_pair(), 1876 id, 1877 static_cast<SpdySettingsFlags>(flags), 1878 value); 1879 received_settings_ = true; 1880 1881 // Log the setting. 1882 net_log_.AddEvent( 1883 NetLog::TYPE_SPDY_SESSION_RECV_SETTING, 1884 base::Bind(&NetLogSpdySettingCallback, 1885 id, static_cast<SpdySettingsFlags>(flags), value)); 1886 } 1887 1888 void SpdySession::OnSendCompressedFrame( 1889 SpdyStreamId stream_id, 1890 SpdyFrameType type, 1891 size_t payload_len, 1892 size_t frame_len) { 1893 if (type != SYN_STREAM) 1894 return; 1895 1896 DCHECK(buffered_spdy_framer_.get()); 1897 size_t compressed_len = 1898 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize(); 1899 1900 if (payload_len) { 1901 // Make sure we avoid early decimal truncation. 1902 int compression_pct = 100 - (100 * compressed_len) / payload_len; 1903 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", 1904 compression_pct); 1905 } 1906 } 1907 1908 int SpdySession::OnInitialResponseHeadersReceived( 1909 const SpdyHeaderBlock& response_headers, 1910 base::Time response_time, 1911 base::TimeTicks recv_first_byte_time, 1912 SpdyStream* stream) { 1913 CHECK(in_io_loop_); 1914 SpdyStreamId stream_id = stream->stream_id(); 1915 // May invalidate |stream|. 1916 int rv = stream->OnInitialResponseHeadersReceived( 1917 response_headers, response_time, recv_first_byte_time); 1918 if (rv < 0) { 1919 DCHECK_NE(rv, ERR_IO_PENDING); 1920 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 1921 } 1922 return rv; 1923 } 1924 1925 void SpdySession::OnSynStream(SpdyStreamId stream_id, 1926 SpdyStreamId associated_stream_id, 1927 SpdyPriority priority, 1928 uint8 credential_slot, 1929 bool fin, 1930 bool unidirectional, 1931 const SpdyHeaderBlock& headers) { 1932 CHECK(in_io_loop_); 1933 1934 if (availability_state_ == STATE_CLOSED) 1935 return; 1936 1937 base::Time response_time = base::Time::Now(); 1938 base::TimeTicks recv_first_byte_time = time_func_(); 1939 1940 if (net_log_.IsLoggingAllEvents()) { 1941 net_log_.AddEvent( 1942 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 1943 base::Bind(&NetLogSpdySynCallback, 1944 &headers, fin, unidirectional, 1945 stream_id, associated_stream_id)); 1946 } 1947 1948 // Server-initiated streams should have even sequence numbers. 1949 if ((stream_id & 0x1) != 0) { 1950 LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id; 1951 return; 1952 } 1953 1954 if (IsStreamActive(stream_id)) { 1955 LOG(WARNING) << "Received OnSyn for active stream " << stream_id; 1956 return; 1957 } 1958 1959 RequestPriority request_priority = 1960 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion()); 1961 1962 if (availability_state_ == STATE_GOING_AWAY) { 1963 // TODO(akalin): This behavior isn't in the SPDY spec, although it 1964 // probably should be. 1965 EnqueueResetStreamFrame(stream_id, request_priority, 1966 RST_STREAM_REFUSED_STREAM, 1967 "OnSyn received when going away"); 1968 return; 1969 } 1970 1971 if (associated_stream_id == 0) { 1972 std::string description = base::StringPrintf( 1973 "Received invalid OnSyn associated stream id %d for stream %d", 1974 associated_stream_id, stream_id); 1975 EnqueueResetStreamFrame(stream_id, request_priority, 1976 RST_STREAM_REFUSED_STREAM, description); 1977 return; 1978 } 1979 1980 streams_pushed_count_++; 1981 1982 // TODO(mbelshe): DCHECK that this is a GET method? 1983 1984 // Verify that the response had a URL for us. 1985 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true); 1986 if (!gurl.is_valid()) { 1987 EnqueueResetStreamFrame( 1988 stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR, 1989 "Pushed stream url was invalid: " + gurl.spec()); 1990 return; 1991 } 1992 1993 // Verify we have a valid stream association. 1994 ActiveStreamMap::iterator associated_it = 1995 active_streams_.find(associated_stream_id); 1996 if (associated_it == active_streams_.end()) { 1997 EnqueueResetStreamFrame( 1998 stream_id, request_priority, RST_STREAM_INVALID_STREAM, 1999 base::StringPrintf( 2000 "Received OnSyn with inactive associated stream %d", 2001 associated_stream_id)); 2002 return; 2003 } 2004 2005 // Check that the SYN advertises the same origin as its associated stream. 2006 // Bypass this check if and only if this session is with a SPDY proxy that 2007 // is trusted explicitly via the --trusted-spdy-proxy switch. 2008 if (trusted_spdy_proxy_.Equals(host_port_pair())) { 2009 // Disallow pushing of HTTPS content. 2010 if (gurl.SchemeIs("https")) { 2011 EnqueueResetStreamFrame( 2012 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, 2013 base::StringPrintf( 2014 "Rejected push of Cross Origin HTTPS content %d", 2015 associated_stream_id)); 2016 } 2017 } else { 2018 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders()); 2019 if (associated_url.GetOrigin() != gurl.GetOrigin()) { 2020 EnqueueResetStreamFrame( 2021 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, 2022 base::StringPrintf( 2023 "Rejected Cross Origin Push Stream %d", 2024 associated_stream_id)); 2025 return; 2026 } 2027 } 2028 2029 // There should not be an existing pushed stream with the same path. 2030 PushedStreamMap::iterator pushed_it = 2031 unclaimed_pushed_streams_.lower_bound(gurl); 2032 if (pushed_it != unclaimed_pushed_streams_.end() && 2033 pushed_it->first == gurl) { 2034 EnqueueResetStreamFrame( 2035 stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR, 2036 "Received duplicate pushed stream with url: " + 2037 gurl.spec()); 2038 return; 2039 } 2040 2041 scoped_ptr<SpdyStream> stream( 2042 new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl, 2043 request_priority, 2044 stream_initial_send_window_size_, 2045 stream_initial_recv_window_size_, 2046 net_log_)); 2047 stream->set_stream_id(stream_id); 2048 2049 DeleteExpiredPushedStreams(); 2050 PushedStreamMap::iterator inserted_pushed_it = 2051 unclaimed_pushed_streams_.insert( 2052 pushed_it, 2053 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_()))); 2054 DCHECK(inserted_pushed_it != pushed_it); 2055 2056 InsertActivatedStream(stream.Pass()); 2057 2058 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 2059 if (active_it == active_streams_.end()) { 2060 NOTREACHED(); 2061 return; 2062 } 2063 2064 // Parse the headers. 2065 if (OnInitialResponseHeadersReceived( 2066 headers, response_time, 2067 recv_first_byte_time, active_it->second.stream) != OK) 2068 return; 2069 2070 base::StatsCounter push_requests("spdy.pushed_streams"); 2071 push_requests.Increment(); 2072 } 2073 2074 void SpdySession::DeleteExpiredPushedStreams() { 2075 if (unclaimed_pushed_streams_.empty()) 2076 return; 2077 2078 // Check that adequate time has elapsed since the last sweep. 2079 if (time_func_() < next_unclaimed_push_stream_sweep_time_) 2080 return; 2081 2082 // Gather old streams to delete. 2083 base::TimeTicks minimum_freshness = time_func_() - 2084 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2085 std::vector<SpdyStreamId> streams_to_close; 2086 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); 2087 it != unclaimed_pushed_streams_.end(); ++it) { 2088 if (minimum_freshness > it->second.creation_time) 2089 streams_to_close.push_back(it->second.stream_id); 2090 } 2091 2092 for (std::vector<SpdyStreamId>::const_iterator to_close_it = 2093 streams_to_close.begin(); 2094 to_close_it != streams_to_close.end(); ++to_close_it) { 2095 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it); 2096 if (active_it == active_streams_.end()) 2097 continue; 2098 2099 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM); 2100 // CloseActiveStreamIterator() will remove the stream from 2101 // |unclaimed_pushed_streams_|. 2102 CloseActiveStreamIterator(active_it, ERR_INVALID_SPDY_STREAM); 2103 } 2104 2105 next_unclaimed_push_stream_sweep_time_ = time_func_() + 2106 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2107 } 2108 2109 void SpdySession::OnSynReply(SpdyStreamId stream_id, 2110 bool fin, 2111 const SpdyHeaderBlock& headers) { 2112 CHECK(in_io_loop_); 2113 2114 if (availability_state_ == STATE_CLOSED) 2115 return; 2116 2117 base::Time response_time = base::Time::Now(); 2118 base::TimeTicks recv_first_byte_time = time_func_(); 2119 2120 if (net_log().IsLoggingAllEvents()) { 2121 net_log().AddEvent( 2122 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 2123 base::Bind(&NetLogSpdySynCallback, 2124 &headers, fin, false, // not unidirectional 2125 stream_id, 0)); 2126 } 2127 2128 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2129 if (it == active_streams_.end()) { 2130 // NOTE: it may just be that the stream was cancelled. 2131 return; 2132 } 2133 2134 SpdyStream* stream = it->second.stream; 2135 CHECK_EQ(stream->stream_id(), stream_id); 2136 2137 if (!it->second.waiting_for_syn_reply) { 2138 const std::string& error = 2139 "Received duplicate SYN_REPLY for stream."; 2140 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2141 ResetStreamIterator(it, RST_STREAM_STREAM_IN_USE, error); 2142 return; 2143 } 2144 it->second.waiting_for_syn_reply = false; 2145 2146 ignore_result(OnInitialResponseHeadersReceived( 2147 headers, response_time, recv_first_byte_time, stream)); 2148 } 2149 2150 void SpdySession::OnHeaders(SpdyStreamId stream_id, 2151 bool fin, 2152 const SpdyHeaderBlock& headers) { 2153 CHECK(in_io_loop_); 2154 2155 if (availability_state_ == STATE_CLOSED) 2156 return; 2157 2158 if (net_log().IsLoggingAllEvents()) { 2159 net_log().AddEvent( 2160 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, 2161 base::Bind(&NetLogSpdySynCallback, 2162 &headers, fin, /*unidirectional=*/false, 2163 stream_id, 0)); 2164 } 2165 2166 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2167 if (it == active_streams_.end()) { 2168 // NOTE: it may just be that the stream was cancelled. 2169 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id; 2170 return; 2171 } 2172 2173 SpdyStream* stream = it->second.stream; 2174 CHECK_EQ(stream->stream_id(), stream_id); 2175 2176 int rv = stream->OnAdditionalResponseHeadersReceived(headers); 2177 if (rv < 0) { 2178 DCHECK_NE(rv, ERR_IO_PENDING); 2179 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2180 } 2181 } 2182 2183 void SpdySession::OnRstStream(SpdyStreamId stream_id, 2184 SpdyRstStreamStatus status) { 2185 CHECK(in_io_loop_); 2186 2187 if (availability_state_ == STATE_CLOSED) 2188 return; 2189 2190 std::string description; 2191 net_log().AddEvent( 2192 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 2193 base::Bind(&NetLogSpdyRstCallback, 2194 stream_id, status, &description)); 2195 2196 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2197 if (it == active_streams_.end()) { 2198 // NOTE: it may just be that the stream was cancelled. 2199 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 2200 return; 2201 } 2202 2203 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2204 2205 if (status == 0) { 2206 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); 2207 } else if (status == RST_STREAM_REFUSED_STREAM) { 2208 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM); 2209 } else { 2210 RecordProtocolErrorHistogram( 2211 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 2212 it->second.stream->LogStreamError( 2213 ERR_SPDY_PROTOCOL_ERROR, 2214 base::StringPrintf("SPDY stream closed with status: %d", status)); 2215 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 2216 // For now, it doesn't matter much - it is a protocol error. 2217 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 2218 } 2219 } 2220 2221 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, 2222 SpdyGoAwayStatus status) { 2223 CHECK(in_io_loop_); 2224 2225 if (availability_state_ == STATE_CLOSED) 2226 return; 2227 2228 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, 2229 base::Bind(&NetLogSpdyGoAwayCallback, 2230 last_accepted_stream_id, 2231 active_streams_.size(), 2232 unclaimed_pushed_streams_.size(), 2233 status)); 2234 if (availability_state_ < STATE_GOING_AWAY) { 2235 availability_state_ = STATE_GOING_AWAY; 2236 // |pool_| will be NULL when |InitializeWithSocket()| is in the 2237 // call stack. 2238 if (pool_) 2239 pool_->MakeSessionUnavailable(GetWeakPtr()); 2240 } 2241 StartGoingAway(last_accepted_stream_id, ERR_ABORTED); 2242 // This is to handle the case when we already don't have any active 2243 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have 2244 // active streams and so the last one being closed will finish the 2245 // going away process (see DeleteStream()). 2246 MaybeFinishGoingAway(); 2247 } 2248 2249 void SpdySession::OnPing(uint32 unique_id) { 2250 CHECK(in_io_loop_); 2251 2252 if (availability_state_ == STATE_CLOSED) 2253 return; 2254 2255 net_log_.AddEvent( 2256 NetLog::TYPE_SPDY_SESSION_PING, 2257 base::Bind(&NetLogSpdyPingCallback, unique_id, "received")); 2258 2259 // Send response to a PING from server. 2260 if (unique_id % 2 == 0) { 2261 WritePingFrame(unique_id); 2262 return; 2263 } 2264 2265 --pings_in_flight_; 2266 if (pings_in_flight_ < 0) { 2267 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); 2268 CloseSessionResult result = 2269 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); 2270 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2271 pings_in_flight_ = 0; 2272 return; 2273 } 2274 2275 if (pings_in_flight_ > 0) 2276 return; 2277 2278 // We will record RTT in histogram when there are no more client sent 2279 // pings_in_flight_. 2280 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_); 2281 } 2282 2283 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, 2284 uint32 delta_window_size) { 2285 CHECK(in_io_loop_); 2286 2287 if (availability_state_ == STATE_CLOSED) 2288 return; 2289 2290 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); 2291 net_log_.AddEvent( 2292 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, 2293 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2294 stream_id, delta_window_size)); 2295 2296 if (stream_id == kSessionFlowControlStreamId) { 2297 // WINDOW_UPDATE for the session. 2298 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { 2299 LOG(WARNING) << "Received WINDOW_UPDATE for session when " 2300 << "session flow control is not turned on"; 2301 // TODO(akalin): Record an error and close the session. 2302 return; 2303 } 2304 2305 if (delta_window_size < 1u) { 2306 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2307 CloseSessionResult result = DoCloseSession( 2308 ERR_SPDY_PROTOCOL_ERROR, 2309 "Received WINDOW_UPDATE with an invalid delta_window_size " + 2310 base::UintToString(delta_window_size)); 2311 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2312 return; 2313 } 2314 2315 IncreaseSendWindowSize(static_cast<int32>(delta_window_size)); 2316 } else { 2317 // WINDOW_UPDATE for a stream. 2318 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2319 // TODO(akalin): Record an error and close the session. 2320 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id 2321 << " when flow control is not turned on"; 2322 return; 2323 } 2324 2325 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2326 2327 if (it == active_streams_.end()) { 2328 // NOTE: it may just be that the stream was cancelled. 2329 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; 2330 return; 2331 } 2332 2333 SpdyStream* stream = it->second.stream; 2334 CHECK_EQ(stream->stream_id(), stream_id); 2335 2336 if (delta_window_size < 1u) { 2337 ResetStreamIterator(it, 2338 RST_STREAM_FLOW_CONTROL_ERROR, 2339 base::StringPrintf( 2340 "Received WINDOW_UPDATE with an invalid " 2341 "delta_window_size %ud", delta_window_size)); 2342 return; 2343 } 2344 2345 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2346 it->second.stream->IncreaseSendWindowSize( 2347 static_cast<int32>(delta_window_size)); 2348 } 2349 } 2350 2351 void SpdySession::OnPushPromise(SpdyStreamId stream_id, 2352 SpdyStreamId promised_stream_id) { 2353 // TODO(akalin): Handle PUSH_PROMISE frames. 2354 } 2355 2356 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, 2357 uint32 delta_window_size) { 2358 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2359 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2360 CHECK(it != active_streams_.end()); 2361 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2362 SendWindowUpdateFrame( 2363 stream_id, delta_window_size, it->second.stream->priority()); 2364 } 2365 2366 void SpdySession::SendInitialData() { 2367 DCHECK(enable_sending_initial_data_); 2368 DCHECK_NE(availability_state_, STATE_CLOSED); 2369 2370 if (send_connection_header_prefix_) { 2371 DCHECK_EQ(protocol_, kProtoHTTP2Draft04); 2372 scoped_ptr<SpdyFrame> connection_header_prefix_frame( 2373 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix), 2374 kHttp2ConnectionHeaderPrefixSize, 2375 false /* take_ownership */)); 2376 // Count the prefix as part of the subsequent SETTINGS frame. 2377 EnqueueSessionWrite(HIGHEST, SETTINGS, 2378 connection_header_prefix_frame.Pass()); 2379 } 2380 2381 // First, notify the server about the settings they should use when 2382 // communicating with us. 2383 SettingsMap settings_map; 2384 // Create a new settings frame notifying the server of our 2385 // max concurrent streams and initial window size. 2386 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] = 2387 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams); 2388 if (flow_control_state_ >= FLOW_CONTROL_STREAM && 2389 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) { 2390 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = 2391 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, 2392 stream_initial_recv_window_size_); 2393 } 2394 SendSettings(settings_map); 2395 2396 // Next, notify the server about our initial recv window size. 2397 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 2398 // Bump up the receive window size to the real initial value. This 2399 // has to go here since the WINDOW_UPDATE frame sent by 2400 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|. 2401 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_); 2402 // This condition implies that |kDefaultInitialRecvWindowSize| - 2403 // |session_recv_window_size_| doesn't overflow. 2404 DCHECK_GT(session_recv_window_size_, 0); 2405 IncreaseRecvWindowSize( 2406 kDefaultInitialRecvWindowSize - session_recv_window_size_); 2407 } 2408 2409 // Finally, notify the server about the settings they have 2410 // previously told us to use when communicating with them (after 2411 // applying them). 2412 const SettingsMap& server_settings_map = 2413 http_server_properties_->GetSpdySettings(host_port_pair()); 2414 if (server_settings_map.empty()) 2415 return; 2416 2417 SettingsMap::const_iterator it = 2418 server_settings_map.find(SETTINGS_CURRENT_CWND); 2419 uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0; 2420 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100); 2421 2422 for (SettingsMap::const_iterator it = server_settings_map.begin(); 2423 it != server_settings_map.end(); ++it) { 2424 const SpdySettingsIds new_id = it->first; 2425 const uint32 new_val = it->second.second; 2426 HandleSetting(new_id, new_val); 2427 } 2428 2429 SendSettings(server_settings_map); 2430 } 2431 2432 2433 void SpdySession::SendSettings(const SettingsMap& settings) { 2434 DCHECK_NE(availability_state_, STATE_CLOSED); 2435 2436 net_log_.AddEvent( 2437 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 2438 base::Bind(&NetLogSpdySendSettingsCallback, &settings)); 2439 2440 // Create the SETTINGS frame and send it. 2441 DCHECK(buffered_spdy_framer_.get()); 2442 scoped_ptr<SpdyFrame> settings_frame( 2443 buffered_spdy_framer_->CreateSettings(settings)); 2444 sent_settings_ = true; 2445 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass()); 2446 } 2447 2448 void SpdySession::HandleSetting(uint32 id, uint32 value) { 2449 switch (id) { 2450 case SETTINGS_MAX_CONCURRENT_STREAMS: 2451 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 2452 kMaxConcurrentStreamLimit); 2453 ProcessPendingStreamRequests(); 2454 break; 2455 case SETTINGS_INITIAL_WINDOW_SIZE: { 2456 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2457 net_log().AddEvent( 2458 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL); 2459 return; 2460 } 2461 2462 if (value > static_cast<uint32>(kint32max)) { 2463 net_log().AddEvent( 2464 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE, 2465 NetLog::IntegerCallback("initial_window_size", value)); 2466 return; 2467 } 2468 2469 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only. 2470 int32 delta_window_size = 2471 static_cast<int32>(value) - stream_initial_send_window_size_; 2472 stream_initial_send_window_size_ = static_cast<int32>(value); 2473 UpdateStreamsSendWindowSize(delta_window_size); 2474 net_log().AddEvent( 2475 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE, 2476 NetLog::IntegerCallback("delta_window_size", delta_window_size)); 2477 break; 2478 } 2479 } 2480 } 2481 2482 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { 2483 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2484 for (ActiveStreamMap::iterator it = active_streams_.begin(); 2485 it != active_streams_.end(); ++it) { 2486 it->second.stream->AdjustSendWindowSize(delta_window_size); 2487 } 2488 2489 for (CreatedStreamSet::const_iterator it = created_streams_.begin(); 2490 it != created_streams_.end(); it++) { 2491 (*it)->AdjustSendWindowSize(delta_window_size); 2492 } 2493 } 2494 2495 void SpdySession::SendPrefacePingIfNoneInFlight() { 2496 if (pings_in_flight_ || !enable_ping_based_connection_checking_) 2497 return; 2498 2499 base::TimeTicks now = time_func_(); 2500 // If there is no activity in the session, then send a preface-PING. 2501 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_) 2502 SendPrefacePing(); 2503 } 2504 2505 void SpdySession::SendPrefacePing() { 2506 WritePingFrame(next_ping_id_); 2507 } 2508 2509 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, 2510 uint32 delta_window_size, 2511 RequestPriority priority) { 2512 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2513 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2514 if (it != active_streams_.end()) { 2515 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2516 } else { 2517 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2518 CHECK_EQ(stream_id, kSessionFlowControlStreamId); 2519 } 2520 2521 net_log_.AddEvent( 2522 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, 2523 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2524 stream_id, delta_window_size)); 2525 2526 DCHECK(buffered_spdy_framer_.get()); 2527 scoped_ptr<SpdyFrame> window_update_frame( 2528 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 2529 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass()); 2530 } 2531 2532 void SpdySession::WritePingFrame(uint32 unique_id) { 2533 DCHECK(buffered_spdy_framer_.get()); 2534 scoped_ptr<SpdyFrame> ping_frame( 2535 buffered_spdy_framer_->CreatePingFrame(unique_id)); 2536 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass()); 2537 2538 if (net_log().IsLoggingAllEvents()) { 2539 net_log().AddEvent( 2540 NetLog::TYPE_SPDY_SESSION_PING, 2541 base::Bind(&NetLogSpdyPingCallback, unique_id, "sent")); 2542 } 2543 if (unique_id % 2 != 0) { 2544 next_ping_id_ += 2; 2545 ++pings_in_flight_; 2546 PlanToCheckPingStatus(); 2547 last_ping_sent_time_ = time_func_(); 2548 } 2549 } 2550 2551 void SpdySession::PlanToCheckPingStatus() { 2552 if (check_ping_status_pending_) 2553 return; 2554 2555 check_ping_status_pending_ = true; 2556 base::MessageLoop::current()->PostDelayedTask( 2557 FROM_HERE, 2558 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2559 time_func_()), hung_interval_); 2560 } 2561 2562 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 2563 CHECK(!in_io_loop_); 2564 DCHECK_NE(availability_state_, STATE_CLOSED); 2565 2566 // Check if we got a response back for all PINGs we had sent. 2567 if (pings_in_flight_ == 0) { 2568 check_ping_status_pending_ = false; 2569 return; 2570 } 2571 2572 DCHECK(check_ping_status_pending_); 2573 2574 base::TimeTicks now = time_func_(); 2575 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_); 2576 2577 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { 2578 // Track all failed PING messages in a separate bucket. 2579 const base::TimeDelta kFailedPing = 2580 base::TimeDelta::FromInternalValue(INT_MAX); 2581 RecordPingRTTHistogram(kFailedPing); 2582 CloseSessionResult result = 2583 DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping."); 2584 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED); 2585 return; 2586 } 2587 2588 // Check the status of connection after a delay. 2589 base::MessageLoop::current()->PostDelayedTask( 2590 FROM_HERE, 2591 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2592 now), 2593 delay); 2594 } 2595 2596 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) { 2597 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration); 2598 } 2599 2600 void SpdySession::RecordProtocolErrorHistogram( 2601 SpdyProtocolErrorDetails details) { 2602 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details, 2603 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2604 if (EndsWith(host_port_pair().host(), "google.com", false)) { 2605 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details, 2606 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2607 } 2608 } 2609 2610 void SpdySession::RecordHistograms() { 2611 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 2612 streams_initiated_count_, 2613 0, 300, 50); 2614 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 2615 streams_pushed_count_, 2616 0, 300, 50); 2617 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 2618 streams_pushed_and_claimed_count_, 2619 0, 300, 50); 2620 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 2621 streams_abandoned_count_, 2622 0, 300, 50); 2623 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent", 2624 sent_settings_ ? 1 : 0, 2); 2625 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived", 2626 received_settings_ ? 1 : 0, 2); 2627 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession", 2628 stalled_streams_, 2629 0, 300, 50); 2630 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls", 2631 stalled_streams_ > 0 ? 1 : 0, 2); 2632 2633 if (received_settings_) { 2634 // Enumerate the saved settings, and set histograms for it. 2635 const SettingsMap& settings_map = 2636 http_server_properties_->GetSpdySettings(host_port_pair()); 2637 2638 SettingsMap::const_iterator it; 2639 for (it = settings_map.begin(); it != settings_map.end(); ++it) { 2640 const SpdySettingsIds id = it->first; 2641 const uint32 val = it->second.second; 2642 switch (id) { 2643 case SETTINGS_CURRENT_CWND: 2644 // Record several different histograms to see if cwnd converges 2645 // for larger volumes of data being sent. 2646 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd", 2647 val, 1, 200, 100); 2648 if (total_bytes_received_ > 10 * 1024) { 2649 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K", 2650 val, 1, 200, 100); 2651 if (total_bytes_received_ > 25 * 1024) { 2652 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K", 2653 val, 1, 200, 100); 2654 if (total_bytes_received_ > 50 * 1024) { 2655 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K", 2656 val, 1, 200, 100); 2657 if (total_bytes_received_ > 100 * 1024) { 2658 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K", 2659 val, 1, 200, 100); 2660 } 2661 } 2662 } 2663 } 2664 break; 2665 case SETTINGS_ROUND_TRIP_TIME: 2666 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT", 2667 val, 1, 1200, 100); 2668 break; 2669 case SETTINGS_DOWNLOAD_RETRANS_RATE: 2670 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate", 2671 val, 1, 100, 50); 2672 break; 2673 default: 2674 break; 2675 } 2676 } 2677 } 2678 } 2679 2680 void SpdySession::CompleteStreamRequest(SpdyStreamRequest* pending_request) { 2681 CHECK(pending_request); 2682 2683 PendingStreamRequestCompletionSet::iterator it = 2684 pending_stream_request_completions_.find(pending_request); 2685 2686 // Abort if the request has already been cancelled. 2687 if (it == pending_stream_request_completions_.end()) 2688 return; 2689 2690 base::WeakPtr<SpdyStream> stream; 2691 int rv = CreateStream(*pending_request, &stream); 2692 pending_stream_request_completions_.erase(it); 2693 2694 if (rv == OK) { 2695 DCHECK(stream.get()); 2696 pending_request->OnRequestCompleteSuccess(&stream); 2697 } else { 2698 DCHECK(!stream.get()); 2699 pending_request->OnRequestCompleteFailure(rv); 2700 } 2701 } 2702 2703 SSLClientSocket* SpdySession::GetSSLClientSocket() const { 2704 if (!is_secure_) 2705 return NULL; 2706 SSLClientSocket* ssl_socket = 2707 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 2708 DCHECK(ssl_socket); 2709 return ssl_socket; 2710 } 2711 2712 void SpdySession::OnWriteBufferConsumed( 2713 size_t frame_payload_size, 2714 size_t consume_size, 2715 SpdyBuffer::ConsumeSource consume_source) { 2716 // We can be called with |in_io_loop_| set if a write SpdyBuffer is 2717 // deleted (e.g., a stream is closed due to incoming data). 2718 2719 if (availability_state_ == STATE_CLOSED) 2720 return; 2721 2722 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2723 2724 if (consume_source == SpdyBuffer::DISCARD) { 2725 // If we're discarding a frame or part of it, increase the send 2726 // window by the number of discarded bytes. (Although if we're 2727 // discarding part of a frame, it's probably because of a write 2728 // error and we'll be tearing down the session soon.) 2729 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 2730 DCHECK_GT(remaining_payload_bytes, 0u); 2731 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 2732 } 2733 // For consumed bytes, the send window is increased when we receive 2734 // a WINDOW_UPDATE frame. 2735 } 2736 2737 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { 2738 // We can be called with |in_io_loop_| set if a SpdyBuffer is 2739 // deleted (e.g., a stream is closed due to incoming data). 2740 2741 DCHECK_NE(availability_state_, STATE_CLOSED); 2742 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2743 DCHECK_GE(delta_window_size, 1); 2744 2745 // Check for overflow. 2746 int32 max_delta_window_size = kint32max - session_send_window_size_; 2747 if (delta_window_size > max_delta_window_size) { 2748 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2749 CloseSessionResult result = DoCloseSession( 2750 ERR_SPDY_PROTOCOL_ERROR, 2751 "Received WINDOW_UPDATE [delta: " + 2752 base::IntToString(delta_window_size) + 2753 "] for session overflows session_send_window_size_ [current: " + 2754 base::IntToString(session_send_window_size_) + "]"); 2755 DCHECK_NE(result, SESSION_ALREADY_CLOSED); 2756 return; 2757 } 2758 2759 session_send_window_size_ += delta_window_size; 2760 2761 net_log_.AddEvent( 2762 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2763 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2764 delta_window_size, session_send_window_size_)); 2765 2766 DCHECK(!IsSendStalled()); 2767 ResumeSendStalledStreams(); 2768 } 2769 2770 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { 2771 DCHECK_NE(availability_state_, STATE_CLOSED); 2772 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2773 2774 // We only call this method when sending a frame. Therefore, 2775 // |delta_window_size| should be within the valid frame size range. 2776 DCHECK_GE(delta_window_size, 1); 2777 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 2778 2779 // |send_window_size_| should have been at least |delta_window_size| for 2780 // this call to happen. 2781 DCHECK_GE(session_send_window_size_, delta_window_size); 2782 2783 session_send_window_size_ -= delta_window_size; 2784 2785 net_log_.AddEvent( 2786 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2787 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2788 -delta_window_size, session_send_window_size_)); 2789 } 2790 2791 void SpdySession::OnReadBufferConsumed( 2792 size_t consume_size, 2793 SpdyBuffer::ConsumeSource consume_source) { 2794 // We can be called with |in_io_loop_| set if a read SpdyBuffer is 2795 // deleted (e.g., discarded by a SpdyReadQueue). 2796 2797 if (availability_state_ == STATE_CLOSED) 2798 return; 2799 2800 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2801 DCHECK_GE(consume_size, 1u); 2802 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 2803 2804 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 2805 } 2806 2807 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { 2808 DCHECK_NE(availability_state_, STATE_CLOSED); 2809 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2810 DCHECK_GE(session_unacked_recv_window_bytes_, 0); 2811 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); 2812 DCHECK_GE(delta_window_size, 1); 2813 // Check for overflow. 2814 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); 2815 2816 session_recv_window_size_ += delta_window_size; 2817 net_log_.AddEvent( 2818 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 2819 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2820 delta_window_size, session_recv_window_size_)); 2821 2822 session_unacked_recv_window_bytes_ += delta_window_size; 2823 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { 2824 SendWindowUpdateFrame(kSessionFlowControlStreamId, 2825 session_unacked_recv_window_bytes_, 2826 HIGHEST); 2827 session_unacked_recv_window_bytes_ = 0; 2828 } 2829 } 2830 2831 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { 2832 CHECK(in_io_loop_); 2833 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2834 DCHECK_GE(delta_window_size, 1); 2835 2836 // Since we never decrease the initial receive window size, 2837 // |delta_window_size| should never cause |recv_window_size_| to go 2838 // negative. If we do, the receive window isn't being respected. 2839 if (delta_window_size > session_recv_window_size_) { 2840 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); 2841 CloseSessionResult result = DoCloseSession( 2842 ERR_SPDY_PROTOCOL_ERROR, 2843 "delta_window_size is " + base::IntToString(delta_window_size) + 2844 " in DecreaseRecvWindowSize, which is larger than the receive " + 2845 "window size of " + base::IntToString(session_recv_window_size_)); 2846 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED); 2847 return; 2848 } 2849 2850 session_recv_window_size_ -= delta_window_size; 2851 net_log_.AddEvent( 2852 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 2853 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2854 -delta_window_size, session_recv_window_size_)); 2855 } 2856 2857 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { 2858 DCHECK(stream.send_stalled_by_flow_control()); 2859 stream_send_unstall_queue_[stream.priority()].push_back(stream.stream_id()); 2860 } 2861 2862 namespace { 2863 2864 // Helper function to return the total size of an array of objects 2865 // with .size() member functions. 2866 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { 2867 size_t total_size = 0; 2868 for (size_t i = 0; i < N; ++i) { 2869 total_size += arr[i].size(); 2870 } 2871 return total_size; 2872 } 2873 2874 } // namespace 2875 2876 void SpdySession::ResumeSendStalledStreams() { 2877 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2878 2879 // We don't have to worry about new streams being queued, since 2880 // doing so would cause IsSendStalled() to return true. But we do 2881 // have to worry about streams being closed, as well as ourselves 2882 // being closed. 2883 2884 while (availability_state_ != STATE_CLOSED && !IsSendStalled()) { 2885 size_t old_size = 0; 2886 if (DCHECK_IS_ON()) 2887 old_size = GetTotalSize(stream_send_unstall_queue_); 2888 2889 SpdyStreamId stream_id = PopStreamToPossiblyResume(); 2890 if (stream_id == 0) 2891 break; 2892 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2893 // The stream may actually still be send-stalled after this (due 2894 // to its own send window) but that's okay -- it'll then be 2895 // resumed once its send window increases. 2896 if (it != active_streams_.end()) 2897 it->second.stream->PossiblyResumeIfSendStalled(); 2898 2899 // The size should decrease unless we got send-stalled again. 2900 if (!IsSendStalled()) 2901 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); 2902 } 2903 } 2904 2905 SpdyStreamId SpdySession::PopStreamToPossiblyResume() { 2906 for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { 2907 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; 2908 if (!queue->empty()) { 2909 SpdyStreamId stream_id = queue->front(); 2910 queue->pop_front(); 2911 return stream_id; 2912 } 2913 } 2914 return 0; 2915 } 2916 2917 } // namespace net 2918