// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/spdy/spdy_session.h"

#include <algorithm>
#include <map>

#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/field_trial.h"
#include "base/metrics/histogram.h"
#include "base/metrics/sparse_histogram.h"
#include "base/metrics/stats_counters.h"
#include "base/stl_util.h"
#include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/strings/utf_string_conversions.h"
#include "base/time/time.h"
#include "base/values.h"
#include "crypto/ec_private_key.h"
#include "crypto/ec_signature_creator.h"
#include "net/base/connection_type_histograms.h"
#include "net/base/net_log.h"
#include "net/base/net_util.h"
#include "net/cert/asn1_util.h"
#include "net/cert/cert_verify_result.h"
#include "net/http/http_log_util.h"
#include "net/http/http_network_session.h"
#include "net/http/http_server_properties.h"
#include "net/http/http_util.h"
#include "net/http/transport_security_state.h"
#include "net/socket/ssl_client_socket.h"
#include "net/spdy/spdy_buffer_producer.h"
#include "net/spdy/spdy_frame_builder.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
#include "net/spdy/spdy_stream.h"
#include "net/ssl/channel_id_service.h"
#include "net/ssl/ssl_cipher_suite_names.h"
#include "net/ssl/ssl_connection_status_flags.h"

namespace net {

namespace {

// Size passed to the IOBuffer that the SpdySession constructor allocates
// for |read_buffer_|.
const int kReadBufferSize = 8 * 1024;
// Number of seconds used to initialize |connection_at_risk_of_loss_time_|
// in the SpdySession constructor.
const int kDefaultConnectionAtRiskOfLossSeconds = 10;
// Number of seconds used to initialize |hung_interval_| in the SpdySession
// constructor.
const int kHungIntervalSeconds = 10;

57 // Minimum seconds that unclaimed pushed streams will be kept in memory. 58 const int kMinPushedStreamLifetimeSeconds = 300; 59 60 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue( 61 const SpdyHeaderBlock& headers, 62 net::NetLog::LogLevel log_level) { 63 scoped_ptr<base::ListValue> headers_list(new base::ListValue()); 64 for (SpdyHeaderBlock::const_iterator it = headers.begin(); 65 it != headers.end(); ++it) { 66 headers_list->AppendString( 67 it->first + ": " + 68 ElideHeaderValueForNetLog(log_level, it->first, it->second)); 69 } 70 return headers_list.Pass(); 71 } 72 73 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers, 74 bool fin, 75 bool unidirectional, 76 SpdyPriority spdy_priority, 77 SpdyStreamId stream_id, 78 NetLog::LogLevel log_level) { 79 base::DictionaryValue* dict = new base::DictionaryValue(); 80 dict->Set("headers", 81 SpdyHeaderBlockToListValue(*headers, log_level).release()); 82 dict->SetBoolean("fin", fin); 83 dict->SetBoolean("unidirectional", unidirectional); 84 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority)); 85 dict->SetInteger("stream_id", stream_id); 86 return dict; 87 } 88 89 base::Value* NetLogSpdySynStreamReceivedCallback( 90 const SpdyHeaderBlock* headers, 91 bool fin, 92 bool unidirectional, 93 SpdyPriority spdy_priority, 94 SpdyStreamId stream_id, 95 SpdyStreamId associated_stream, 96 NetLog::LogLevel log_level) { 97 base::DictionaryValue* dict = new base::DictionaryValue(); 98 dict->Set("headers", 99 SpdyHeaderBlockToListValue(*headers, log_level).release()); 100 dict->SetBoolean("fin", fin); 101 dict->SetBoolean("unidirectional", unidirectional); 102 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority)); 103 dict->SetInteger("stream_id", stream_id); 104 dict->SetInteger("associated_stream", associated_stream); 105 return dict; 106 } 107 108 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback( 109 const SpdyHeaderBlock* headers, 110 bool fin, 111 SpdyStreamId 
stream_id, 112 NetLog::LogLevel log_level) { 113 base::DictionaryValue* dict = new base::DictionaryValue(); 114 dict->Set("headers", 115 SpdyHeaderBlockToListValue(*headers, log_level).release()); 116 dict->SetBoolean("fin", fin); 117 dict->SetInteger("stream_id", stream_id); 118 return dict; 119 } 120 121 base::Value* NetLogSpdySessionCloseCallback(int net_error, 122 const std::string* description, 123 NetLog::LogLevel /* log_level */) { 124 base::DictionaryValue* dict = new base::DictionaryValue(); 125 dict->SetInteger("net_error", net_error); 126 dict->SetString("description", *description); 127 return dict; 128 } 129 130 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair, 131 NetLog::LogLevel /* log_level */) { 132 base::DictionaryValue* dict = new base::DictionaryValue(); 133 dict->SetString("host", host_pair->first.ToString()); 134 dict->SetString("proxy", host_pair->second.ToPacString()); 135 return dict; 136 } 137 138 base::Value* NetLogSpdyInitializedCallback(NetLog::Source source, 139 const NextProto protocol_version, 140 NetLog::LogLevel /* log_level */) { 141 base::DictionaryValue* dict = new base::DictionaryValue(); 142 if (source.IsValid()) { 143 source.AddToEventParameters(dict); 144 } 145 dict->SetString("protocol", 146 SSLClientSocket::NextProtoToString(protocol_version)); 147 return dict; 148 } 149 150 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair, 151 bool clear_persisted, 152 NetLog::LogLevel /* log_level */) { 153 base::DictionaryValue* dict = new base::DictionaryValue(); 154 dict->SetString("host", host_port_pair.ToString()); 155 dict->SetBoolean("clear_persisted", clear_persisted); 156 return dict; 157 } 158 159 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id, 160 const SpdyMajorVersion protocol_version, 161 SpdySettingsFlags flags, 162 uint32 value, 163 NetLog::LogLevel /* log_level */) { 164 base::DictionaryValue* dict = new base::DictionaryValue(); 165 dict->SetInteger("id", 166 
SpdyConstants::SerializeSettingId(protocol_version, id)); 167 dict->SetInteger("flags", flags); 168 dict->SetInteger("value", value); 169 return dict; 170 } 171 172 base::Value* NetLogSpdySendSettingsCallback( 173 const SettingsMap* settings, 174 const SpdyMajorVersion protocol_version, 175 NetLog::LogLevel /* log_level */) { 176 base::DictionaryValue* dict = new base::DictionaryValue(); 177 base::ListValue* settings_list = new base::ListValue(); 178 for (SettingsMap::const_iterator it = settings->begin(); 179 it != settings->end(); ++it) { 180 const SpdySettingsIds id = it->first; 181 const SpdySettingsFlags flags = it->second.first; 182 const uint32 value = it->second.second; 183 settings_list->Append(new base::StringValue(base::StringPrintf( 184 "[id:%u flags:%u value:%u]", 185 SpdyConstants::SerializeSettingId(protocol_version, id), 186 flags, 187 value))); 188 } 189 dict->Set("settings", settings_list); 190 return dict; 191 } 192 193 base::Value* NetLogSpdyWindowUpdateFrameCallback( 194 SpdyStreamId stream_id, 195 uint32 delta, 196 NetLog::LogLevel /* log_level */) { 197 base::DictionaryValue* dict = new base::DictionaryValue(); 198 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 199 dict->SetInteger("delta", delta); 200 return dict; 201 } 202 203 base::Value* NetLogSpdySessionWindowUpdateCallback( 204 int32 delta, 205 int32 window_size, 206 NetLog::LogLevel /* log_level */) { 207 base::DictionaryValue* dict = new base::DictionaryValue(); 208 dict->SetInteger("delta", delta); 209 dict->SetInteger("window_size", window_size); 210 return dict; 211 } 212 213 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id, 214 int size, 215 bool fin, 216 NetLog::LogLevel /* log_level */) { 217 base::DictionaryValue* dict = new base::DictionaryValue(); 218 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 219 dict->SetInteger("size", size); 220 dict->SetBoolean("fin", fin); 221 return dict; 222 } 223 224 base::Value* 
NetLogSpdyRstCallback(SpdyStreamId stream_id, 225 int status, 226 const std::string* description, 227 NetLog::LogLevel /* log_level */) { 228 base::DictionaryValue* dict = new base::DictionaryValue(); 229 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 230 dict->SetInteger("status", status); 231 dict->SetString("description", *description); 232 return dict; 233 } 234 235 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id, 236 bool is_ack, 237 const char* type, 238 NetLog::LogLevel /* log_level */) { 239 base::DictionaryValue* dict = new base::DictionaryValue(); 240 dict->SetInteger("unique_id", unique_id); 241 dict->SetString("type", type); 242 dict->SetBoolean("is_ack", is_ack); 243 return dict; 244 } 245 246 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id, 247 int active_streams, 248 int unclaimed_streams, 249 SpdyGoAwayStatus status, 250 NetLog::LogLevel /* log_level */) { 251 base::DictionaryValue* dict = new base::DictionaryValue(); 252 dict->SetInteger("last_accepted_stream_id", 253 static_cast<int>(last_stream_id)); 254 dict->SetInteger("active_streams", active_streams); 255 dict->SetInteger("unclaimed_streams", unclaimed_streams); 256 dict->SetInteger("status", static_cast<int>(status)); 257 return dict; 258 } 259 260 base::Value* NetLogSpdyPushPromiseReceivedCallback( 261 const SpdyHeaderBlock* headers, 262 SpdyStreamId stream_id, 263 SpdyStreamId promised_stream_id, 264 NetLog::LogLevel log_level) { 265 base::DictionaryValue* dict = new base::DictionaryValue(); 266 dict->Set("headers", 267 SpdyHeaderBlockToListValue(*headers, log_level).release()); 268 dict->SetInteger("id", stream_id); 269 dict->SetInteger("promised_stream_id", promised_stream_id); 270 return dict; 271 } 272 273 // Helper function to return the total size of an array of objects 274 // with .size() member functions. 
275 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { 276 size_t total_size = 0; 277 for (size_t i = 0; i < N; ++i) { 278 total_size += arr[i].size(); 279 } 280 return total_size; 281 } 282 283 // Helper class for std:find_if on STL container containing 284 // SpdyStreamRequest weak pointers. 285 class RequestEquals { 286 public: 287 RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request) 288 : request_(request) {} 289 290 bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const { 291 return request_.get() == request.get(); 292 } 293 294 private: 295 const base::WeakPtr<SpdyStreamRequest> request_; 296 }; 297 298 // The maximum number of concurrent streams we will ever create. Even if 299 // the server permits more, we will never exceed this limit. 300 const size_t kMaxConcurrentStreamLimit = 256; 301 302 } // namespace 303 304 SpdyProtocolErrorDetails MapFramerErrorToProtocolError( 305 SpdyFramer::SpdyError err) { 306 switch(err) { 307 case SpdyFramer::SPDY_NO_ERROR: 308 return SPDY_ERROR_NO_ERROR; 309 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME: 310 return SPDY_ERROR_INVALID_CONTROL_FRAME; 311 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE: 312 return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE; 313 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE: 314 return SPDY_ERROR_ZLIB_INIT_FAILURE; 315 case SpdyFramer::SPDY_UNSUPPORTED_VERSION: 316 return SPDY_ERROR_UNSUPPORTED_VERSION; 317 case SpdyFramer::SPDY_DECOMPRESS_FAILURE: 318 return SPDY_ERROR_DECOMPRESS_FAILURE; 319 case SpdyFramer::SPDY_COMPRESS_FAILURE: 320 return SPDY_ERROR_COMPRESS_FAILURE; 321 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT: 322 return SPDY_ERROR_GOAWAY_FRAME_CORRUPT; 323 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT: 324 return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT; 325 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS: 326 return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS; 327 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS: 328 return 
SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS; 329 case SpdyFramer::SPDY_UNEXPECTED_FRAME: 330 return SPDY_ERROR_UNEXPECTED_FRAME; 331 default: 332 NOTREACHED(); 333 return static_cast<SpdyProtocolErrorDetails>(-1); 334 } 335 } 336 337 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) { 338 switch (err) { 339 case SpdyFramer::SPDY_NO_ERROR: 340 return OK; 341 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME: 342 return ERR_SPDY_PROTOCOL_ERROR; 343 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE: 344 return ERR_SPDY_FRAME_SIZE_ERROR; 345 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE: 346 return ERR_SPDY_COMPRESSION_ERROR; 347 case SpdyFramer::SPDY_UNSUPPORTED_VERSION: 348 return ERR_SPDY_PROTOCOL_ERROR; 349 case SpdyFramer::SPDY_DECOMPRESS_FAILURE: 350 return ERR_SPDY_COMPRESSION_ERROR; 351 case SpdyFramer::SPDY_COMPRESS_FAILURE: 352 return ERR_SPDY_COMPRESSION_ERROR; 353 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT: 354 return ERR_SPDY_PROTOCOL_ERROR; 355 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT: 356 return ERR_SPDY_PROTOCOL_ERROR; 357 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS: 358 return ERR_SPDY_PROTOCOL_ERROR; 359 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS: 360 return ERR_SPDY_PROTOCOL_ERROR; 361 case SpdyFramer::SPDY_UNEXPECTED_FRAME: 362 return ERR_SPDY_PROTOCOL_ERROR; 363 default: 364 NOTREACHED(); 365 return ERR_SPDY_PROTOCOL_ERROR; 366 } 367 } 368 369 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError( 370 SpdyRstStreamStatus status) { 371 switch(status) { 372 case RST_STREAM_PROTOCOL_ERROR: 373 return STATUS_CODE_PROTOCOL_ERROR; 374 case RST_STREAM_INVALID_STREAM: 375 return STATUS_CODE_INVALID_STREAM; 376 case RST_STREAM_REFUSED_STREAM: 377 return STATUS_CODE_REFUSED_STREAM; 378 case RST_STREAM_UNSUPPORTED_VERSION: 379 return STATUS_CODE_UNSUPPORTED_VERSION; 380 case RST_STREAM_CANCEL: 381 return STATUS_CODE_CANCEL; 382 case RST_STREAM_INTERNAL_ERROR: 383 return STATUS_CODE_INTERNAL_ERROR; 384 case RST_STREAM_FLOW_CONTROL_ERROR: 385 return 
STATUS_CODE_FLOW_CONTROL_ERROR; 386 case RST_STREAM_STREAM_IN_USE: 387 return STATUS_CODE_STREAM_IN_USE; 388 case RST_STREAM_STREAM_ALREADY_CLOSED: 389 return STATUS_CODE_STREAM_ALREADY_CLOSED; 390 case RST_STREAM_INVALID_CREDENTIALS: 391 return STATUS_CODE_INVALID_CREDENTIALS; 392 case RST_STREAM_FRAME_SIZE_ERROR: 393 return STATUS_CODE_FRAME_SIZE_ERROR; 394 case RST_STREAM_SETTINGS_TIMEOUT: 395 return STATUS_CODE_SETTINGS_TIMEOUT; 396 case RST_STREAM_CONNECT_ERROR: 397 return STATUS_CODE_CONNECT_ERROR; 398 case RST_STREAM_ENHANCE_YOUR_CALM: 399 return STATUS_CODE_ENHANCE_YOUR_CALM; 400 default: 401 NOTREACHED(); 402 return static_cast<SpdyProtocolErrorDetails>(-1); 403 } 404 } 405 406 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) { 407 switch (err) { 408 case OK: 409 return GOAWAY_NO_ERROR; 410 case ERR_SPDY_PROTOCOL_ERROR: 411 return GOAWAY_PROTOCOL_ERROR; 412 case ERR_SPDY_FLOW_CONTROL_ERROR: 413 return GOAWAY_FLOW_CONTROL_ERROR; 414 case ERR_SPDY_FRAME_SIZE_ERROR: 415 return GOAWAY_FRAME_SIZE_ERROR; 416 case ERR_SPDY_COMPRESSION_ERROR: 417 return GOAWAY_COMPRESSION_ERROR; 418 case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY: 419 return GOAWAY_INADEQUATE_SECURITY; 420 default: 421 return GOAWAY_PROTOCOL_ERROR; 422 } 423 } 424 425 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers, 426 SpdyMajorVersion protocol_version, 427 SpdyHeaderBlock* request_headers, 428 SpdyHeaderBlock* response_headers) { 429 DCHECK(response_headers); 430 DCHECK(request_headers); 431 for (SpdyHeaderBlock::const_iterator it = headers.begin(); 432 it != headers.end(); 433 ++it) { 434 SpdyHeaderBlock* to_insert = response_headers; 435 if (protocol_version == SPDY2) { 436 if (it->first == "url") 437 to_insert = request_headers; 438 } else { 439 const char* host = protocol_version >= SPDY4 ? 
":authority" : ":host"; 440 static const char* scheme = ":scheme"; 441 static const char* path = ":path"; 442 if (it->first == host || it->first == scheme || it->first == path) 443 to_insert = request_headers; 444 } 445 to_insert->insert(*it); 446 } 447 } 448 449 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) { 450 Reset(); 451 } 452 453 SpdyStreamRequest::~SpdyStreamRequest() { 454 CancelRequest(); 455 } 456 457 int SpdyStreamRequest::StartRequest( 458 SpdyStreamType type, 459 const base::WeakPtr<SpdySession>& session, 460 const GURL& url, 461 RequestPriority priority, 462 const BoundNetLog& net_log, 463 const CompletionCallback& callback) { 464 DCHECK(session); 465 DCHECK(!session_); 466 DCHECK(!stream_); 467 DCHECK(callback_.is_null()); 468 469 type_ = type; 470 session_ = session; 471 url_ = url; 472 priority_ = priority; 473 net_log_ = net_log; 474 callback_ = callback; 475 476 base::WeakPtr<SpdyStream> stream; 477 int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream); 478 if (rv == OK) { 479 Reset(); 480 stream_ = stream; 481 } 482 return rv; 483 } 484 485 void SpdyStreamRequest::CancelRequest() { 486 if (session_) 487 session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr()); 488 Reset(); 489 // Do this to cancel any pending CompleteStreamRequest() tasks. 
490 weak_ptr_factory_.InvalidateWeakPtrs(); 491 } 492 493 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() { 494 DCHECK(!session_); 495 base::WeakPtr<SpdyStream> stream = stream_; 496 DCHECK(stream); 497 Reset(); 498 return stream; 499 } 500 501 void SpdyStreamRequest::OnRequestCompleteSuccess( 502 const base::WeakPtr<SpdyStream>& stream) { 503 DCHECK(session_); 504 DCHECK(!stream_); 505 DCHECK(!callback_.is_null()); 506 CompletionCallback callback = callback_; 507 Reset(); 508 DCHECK(stream); 509 stream_ = stream; 510 callback.Run(OK); 511 } 512 513 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) { 514 DCHECK(session_); 515 DCHECK(!stream_); 516 DCHECK(!callback_.is_null()); 517 CompletionCallback callback = callback_; 518 Reset(); 519 DCHECK_NE(rv, OK); 520 callback.Run(rv); 521 } 522 523 void SpdyStreamRequest::Reset() { 524 type_ = SPDY_BIDIRECTIONAL_STREAM; 525 session_.reset(); 526 stream_.reset(); 527 url_ = GURL(); 528 priority_ = MINIMUM_PRIORITY; 529 net_log_ = BoundNetLog(); 530 callback_.Reset(); 531 } 532 533 SpdySession::ActiveStreamInfo::ActiveStreamInfo() 534 : stream(NULL), 535 waiting_for_syn_reply(false) {} 536 537 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream) 538 : stream(stream), 539 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) { 540 } 541 542 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {} 543 544 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {} 545 546 SpdySession::PushedStreamInfo::PushedStreamInfo( 547 SpdyStreamId stream_id, 548 base::TimeTicks creation_time) 549 : stream_id(stream_id), 550 creation_time(creation_time) {} 551 552 SpdySession::PushedStreamInfo::~PushedStreamInfo() {} 553 554 // static 555 bool SpdySession::CanPool(TransportSecurityState* transport_security_state, 556 const SSLInfo& ssl_info, 557 const std::string& old_hostname, 558 const std::string& new_hostname) { 559 // Pooling is prohibited if the server cert is not valid for the new 
domain, 560 // and for connections on which client certs were sent. It is also prohibited 561 // when channel ID was sent if the hosts are from different eTLDs+1. 562 if (IsCertStatusError(ssl_info.cert_status)) 563 return false; 564 565 if (ssl_info.client_cert_sent) 566 return false; 567 568 if (ssl_info.channel_id_sent && 569 ChannelIDService::GetDomainForHost(new_hostname) != 570 ChannelIDService::GetDomainForHost(old_hostname)) { 571 return false; 572 } 573 574 bool unused = false; 575 if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused)) 576 return false; 577 578 std::string pinning_failure_log; 579 if (!transport_security_state->CheckPublicKeyPins( 580 new_hostname, 581 ssl_info.is_issued_by_known_root, 582 ssl_info.public_key_hashes, 583 &pinning_failure_log)) { 584 return false; 585 } 586 587 return true; 588 } 589 590 SpdySession::SpdySession( 591 const SpdySessionKey& spdy_session_key, 592 const base::WeakPtr<HttpServerProperties>& http_server_properties, 593 TransportSecurityState* transport_security_state, 594 bool verify_domain_authentication, 595 bool enable_sending_initial_data, 596 bool enable_compression, 597 bool enable_ping_based_connection_checking, 598 NextProto default_protocol, 599 size_t stream_initial_recv_window_size, 600 size_t initial_max_concurrent_streams, 601 size_t max_concurrent_streams_limit, 602 TimeFunc time_func, 603 const HostPortPair& trusted_spdy_proxy, 604 NetLog* net_log) 605 : in_io_loop_(false), 606 spdy_session_key_(spdy_session_key), 607 pool_(NULL), 608 http_server_properties_(http_server_properties), 609 transport_security_state_(transport_security_state), 610 read_buffer_(new IOBuffer(kReadBufferSize)), 611 stream_hi_water_mark_(kFirstStreamId), 612 last_accepted_push_stream_id_(0), 613 num_pushed_streams_(0u), 614 num_active_pushed_streams_(0u), 615 in_flight_write_frame_type_(DATA), 616 in_flight_write_frame_size_(0), 617 is_secure_(false), 618 certificate_error_code_(OK), 619 
availability_state_(STATE_AVAILABLE), 620 read_state_(READ_STATE_DO_READ), 621 write_state_(WRITE_STATE_IDLE), 622 error_on_close_(OK), 623 max_concurrent_streams_(initial_max_concurrent_streams == 0 624 ? kInitialMaxConcurrentStreams 625 : initial_max_concurrent_streams), 626 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 627 ? kMaxConcurrentStreamLimit 628 : max_concurrent_streams_limit), 629 max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams), 630 streams_initiated_count_(0), 631 streams_pushed_count_(0), 632 streams_pushed_and_claimed_count_(0), 633 streams_abandoned_count_(0), 634 total_bytes_received_(0), 635 sent_settings_(false), 636 received_settings_(false), 637 stalled_streams_(0), 638 pings_in_flight_(0), 639 next_ping_id_(1), 640 last_activity_time_(time_func()), 641 last_compressed_frame_len_(0), 642 check_ping_status_pending_(false), 643 send_connection_header_prefix_(false), 644 flow_control_state_(FLOW_CONTROL_NONE), 645 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize), 646 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 647 ? 
kDefaultInitialRecvWindowSize 648 : stream_initial_recv_window_size), 649 session_send_window_size_(0), 650 session_recv_window_size_(0), 651 session_unacked_recv_window_bytes_(0), 652 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)), 653 verify_domain_authentication_(verify_domain_authentication), 654 enable_sending_initial_data_(enable_sending_initial_data), 655 enable_compression_(enable_compression), 656 enable_ping_based_connection_checking_( 657 enable_ping_based_connection_checking), 658 protocol_(default_protocol), 659 connection_at_risk_of_loss_time_( 660 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)), 661 hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)), 662 trusted_spdy_proxy_(trusted_spdy_proxy), 663 time_func_(time_func), 664 weak_factory_(this) { 665 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion); 666 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 667 DCHECK(HttpStreamFactory::spdy_enabled()); 668 net_log_.BeginEvent( 669 NetLog::TYPE_SPDY_SESSION, 670 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair())); 671 next_unclaimed_push_stream_sweep_time_ = time_func_() + 672 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 673 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 674 } 675 676 SpdySession::~SpdySession() { 677 CHECK(!in_io_loop_); 678 DcheckDraining(); 679 680 // TODO(akalin): Check connection->is_initialized() instead. This 681 // requires re-working CreateFakeSpdySession(), though. 682 DCHECK(connection_->socket()); 683 // With SPDY we can't recycle sockets. 
684 connection_->socket()->Disconnect(); 685 686 RecordHistograms(); 687 688 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 689 } 690 691 void SpdySession::InitializeWithSocket( 692 scoped_ptr<ClientSocketHandle> connection, 693 SpdySessionPool* pool, 694 bool is_secure, 695 int certificate_error_code) { 696 CHECK(!in_io_loop_); 697 DCHECK_EQ(availability_state_, STATE_AVAILABLE); 698 DCHECK_EQ(read_state_, READ_STATE_DO_READ); 699 DCHECK_EQ(write_state_, WRITE_STATE_IDLE); 700 DCHECK(!connection_); 701 702 DCHECK(certificate_error_code == OK || 703 certificate_error_code < ERR_IO_PENDING); 704 // TODO(akalin): Check connection->is_initialized() instead. This 705 // requires re-working CreateFakeSpdySession(), though. 706 DCHECK(connection->socket()); 707 708 base::StatsCounter spdy_sessions("spdy.sessions"); 709 spdy_sessions.Increment(); 710 711 connection_ = connection.Pass(); 712 is_secure_ = is_secure; 713 certificate_error_code_ = certificate_error_code; 714 715 NextProto protocol_negotiated = 716 connection_->socket()->GetNegotiatedProtocol(); 717 if (protocol_negotiated != kProtoUnknown) { 718 protocol_ = protocol_negotiated; 719 } 720 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion); 721 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 722 723 if (protocol_ == kProtoSPDY4) 724 send_connection_header_prefix_ = true; 725 726 if (protocol_ >= kProtoSPDY31) { 727 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION; 728 session_send_window_size_ = kSpdySessionInitialWindowSize; 729 session_recv_window_size_ = kSpdySessionInitialWindowSize; 730 } else if (protocol_ >= kProtoSPDY3) { 731 flow_control_state_ = FLOW_CONTROL_STREAM; 732 } else { 733 flow_control_state_ = FLOW_CONTROL_NONE; 734 } 735 736 buffered_spdy_framer_.reset( 737 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_), 738 enable_compression_)); 739 buffered_spdy_framer_->set_visitor(this); 740 buffered_spdy_framer_->set_debug_visitor(this); 741 UMA_HISTOGRAM_ENUMERATION( 742 
"Net.SpdyVersion2", 743 protocol_ - kProtoSPDYMinimumVersion, 744 kProtoSPDYMaximumVersion - kProtoSPDYMinimumVersion + 1); 745 746 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_INITIALIZED, 747 base::Bind(&NetLogSpdyInitializedCallback, 748 connection_->socket()->NetLog().source(), 749 protocol_)); 750 751 DCHECK_EQ(availability_state_, STATE_AVAILABLE); 752 connection_->AddHigherLayeredPool(this); 753 if (enable_sending_initial_data_) 754 SendInitialData(); 755 pool_ = pool; 756 757 // Bootstrap the read loop. 758 base::MessageLoop::current()->PostTask( 759 FROM_HERE, 760 base::Bind(&SpdySession::PumpReadLoop, 761 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 762 } 763 764 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 765 if (!verify_domain_authentication_) 766 return true; 767 768 if (availability_state_ == STATE_DRAINING) 769 return false; 770 771 SSLInfo ssl_info; 772 bool was_npn_negotiated; 773 NextProto protocol_negotiated = kProtoUnknown; 774 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated)) 775 return true; // This is not a secure session, so all domains are okay. 776 777 return CanPool(transport_security_state_, ssl_info, 778 host_port_pair().host(), domain); 779 } 780 781 int SpdySession::GetPushStream( 782 const GURL& url, 783 base::WeakPtr<SpdyStream>* stream, 784 const BoundNetLog& stream_net_log) { 785 CHECK(!in_io_loop_); 786 787 stream->reset(); 788 789 if (availability_state_ == STATE_DRAINING) 790 return ERR_CONNECTION_CLOSED; 791 792 Error err = TryAccessStream(url); 793 if (err != OK) 794 return err; 795 796 *stream = GetActivePushStream(url); 797 if (*stream) { 798 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_); 799 streams_pushed_and_claimed_count_++; 800 } 801 return OK; 802 } 803 804 // {,Try}CreateStream() and TryAccessStream() can be called with 805 // |in_io_loop_| set if a stream is being created in response to 806 // another being closed due to received data. 
807 808 Error SpdySession::TryAccessStream(const GURL& url) { 809 if (is_secure_ && certificate_error_code_ != OK && 810 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 811 RecordProtocolErrorHistogram( 812 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION); 813 DoDrainSession( 814 static_cast<Error>(certificate_error_code_), 815 "Tried to get SPDY stream for secure content over an unauthenticated " 816 "session."); 817 return ERR_SPDY_PROTOCOL_ERROR; 818 } 819 return OK; 820 } 821 822 int SpdySession::TryCreateStream( 823 const base::WeakPtr<SpdyStreamRequest>& request, 824 base::WeakPtr<SpdyStream>* stream) { 825 DCHECK(request); 826 827 if (availability_state_ == STATE_GOING_AWAY) 828 return ERR_FAILED; 829 830 if (availability_state_ == STATE_DRAINING) 831 return ERR_CONNECTION_CLOSED; 832 833 Error err = TryAccessStream(request->url()); 834 if (err != OK) 835 return err; 836 837 if (!max_concurrent_streams_ || 838 (active_streams_.size() + created_streams_.size() - num_pushed_streams_ < 839 max_concurrent_streams_)) { 840 return CreateStream(*request, stream); 841 } 842 843 stalled_streams_++; 844 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS); 845 RequestPriority priority = request->priority(); 846 CHECK_GE(priority, MINIMUM_PRIORITY); 847 CHECK_LE(priority, MAXIMUM_PRIORITY); 848 pending_create_stream_queues_[priority].push_back(request); 849 return ERR_IO_PENDING; 850 } 851 852 int SpdySession::CreateStream(const SpdyStreamRequest& request, 853 base::WeakPtr<SpdyStream>* stream) { 854 DCHECK_GE(request.priority(), MINIMUM_PRIORITY); 855 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY); 856 857 if (availability_state_ == STATE_GOING_AWAY) 858 return ERR_FAILED; 859 860 if (availability_state_ == STATE_DRAINING) 861 return ERR_CONNECTION_CLOSED; 862 863 Error err = TryAccessStream(request.url()); 864 if (err != OK) { 865 // This should have been caught in TryCreateStream(). 
866 NOTREACHED(); 867 return err; 868 } 869 870 DCHECK(connection_->socket()); 871 DCHECK(connection_->socket()->IsConnected()); 872 if (connection_->socket()) { 873 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected", 874 connection_->socket()->IsConnected()); 875 if (!connection_->socket()->IsConnected()) { 876 DoDrainSession( 877 ERR_CONNECTION_CLOSED, 878 "Tried to create SPDY stream for a closed socket connection."); 879 return ERR_CONNECTION_CLOSED; 880 } 881 } 882 883 scoped_ptr<SpdyStream> new_stream( 884 new SpdyStream(request.type(), GetWeakPtr(), request.url(), 885 request.priority(), 886 stream_initial_send_window_size_, 887 stream_initial_recv_window_size_, 888 request.net_log())); 889 *stream = new_stream->GetWeakPtr(); 890 InsertCreatedStream(new_stream.Pass()); 891 892 UMA_HISTOGRAM_CUSTOM_COUNTS( 893 "Net.SpdyPriorityCount", 894 static_cast<int>(request.priority()), 0, 10, 11); 895 896 return OK; 897 } 898 899 void SpdySession::CancelStreamRequest( 900 const base::WeakPtr<SpdyStreamRequest>& request) { 901 DCHECK(request); 902 RequestPriority priority = request->priority(); 903 CHECK_GE(priority, MINIMUM_PRIORITY); 904 CHECK_LE(priority, MAXIMUM_PRIORITY); 905 906 #if DCHECK_IS_ON 907 // |request| should not be in a queue not matching its priority. 908 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { 909 if (priority == i) 910 continue; 911 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i]; 912 DCHECK(std::find_if(queue->begin(), 913 queue->end(), 914 RequestEquals(request)) == queue->end()); 915 } 916 #endif 917 918 PendingStreamRequestQueue* queue = 919 &pending_create_stream_queues_[priority]; 920 // Remove |request| from |queue| while preserving the order of the 921 // other elements. 922 PendingStreamRequestQueue::iterator it = 923 std::find_if(queue->begin(), queue->end(), RequestEquals(request)); 924 // The request may already be removed if there's a 925 // CompleteStreamRequest() in flight. 
926 if (it != queue->end()) { 927 it = queue->erase(it); 928 // |request| should be in the queue at most once, and if it is 929 // present, should not be pending completion. 930 DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) == 931 queue->end()); 932 } 933 } 934 935 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() { 936 for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) { 937 if (pending_create_stream_queues_[j].empty()) 938 continue; 939 940 base::WeakPtr<SpdyStreamRequest> pending_request = 941 pending_create_stream_queues_[j].front(); 942 DCHECK(pending_request); 943 pending_create_stream_queues_[j].pop_front(); 944 return pending_request; 945 } 946 return base::WeakPtr<SpdyStreamRequest>(); 947 } 948 949 void SpdySession::ProcessPendingStreamRequests() { 950 // Like |max_concurrent_streams_|, 0 means infinite for 951 // |max_requests_to_process|. 952 size_t max_requests_to_process = 0; 953 if (max_concurrent_streams_ != 0) { 954 max_requests_to_process = 955 max_concurrent_streams_ - 956 (active_streams_.size() + created_streams_.size()); 957 } 958 for (size_t i = 0; 959 max_requests_to_process == 0 || i < max_requests_to_process; ++i) { 960 base::WeakPtr<SpdyStreamRequest> pending_request = 961 GetNextPendingStreamRequest(); 962 if (!pending_request) 963 break; 964 965 // Note that this post can race with other stream creations, and it's 966 // possible that the un-stalled stream will be stalled again if it loses. 967 // TODO(jgraettinger): Provide stronger ordering guarantees. 
968 base::MessageLoop::current()->PostTask( 969 FROM_HERE, 970 base::Bind(&SpdySession::CompleteStreamRequest, 971 weak_factory_.GetWeakPtr(), 972 pending_request)); 973 } 974 } 975 976 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) { 977 pooled_aliases_.insert(alias_key); 978 } 979 980 SpdyMajorVersion SpdySession::GetProtocolVersion() const { 981 DCHECK(buffered_spdy_framer_.get()); 982 return buffered_spdy_framer_->protocol_version(); 983 } 984 985 bool SpdySession::HasAcceptableTransportSecurity() const { 986 // If we're not even using TLS, we have no standards to meet. 987 if (!is_secure_) { 988 return true; 989 } 990 991 // We don't enforce transport security standards for older SPDY versions. 992 if (GetProtocolVersion() < SPDY4) { 993 return true; 994 } 995 996 SSLInfo ssl_info; 997 CHECK(connection_->socket()->GetSSLInfo(&ssl_info)); 998 999 // HTTP/2 requires TLS 1.2+ 1000 if (SSLConnectionStatusToVersion(ssl_info.connection_status) < 1001 SSL_CONNECTION_VERSION_TLS1_2) { 1002 return false; 1003 } 1004 1005 if (!IsSecureTLSCipherSuite( 1006 SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) { 1007 return false; 1008 } 1009 1010 return true; 1011 } 1012 1013 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() { 1014 return weak_factory_.GetWeakPtr(); 1015 } 1016 1017 bool SpdySession::CloseOneIdleConnection() { 1018 CHECK(!in_io_loop_); 1019 DCHECK(pool_); 1020 if (active_streams_.empty()) { 1021 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); 1022 } 1023 // Return false as the socket wasn't immediately closed. 
1024 return false; 1025 } 1026 1027 void SpdySession::EnqueueStreamWrite( 1028 const base::WeakPtr<SpdyStream>& stream, 1029 SpdyFrameType frame_type, 1030 scoped_ptr<SpdyBufferProducer> producer) { 1031 DCHECK(frame_type == HEADERS || 1032 frame_type == DATA || 1033 frame_type == CREDENTIAL || 1034 frame_type == SYN_STREAM); 1035 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); 1036 } 1037 1038 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( 1039 SpdyStreamId stream_id, 1040 RequestPriority priority, 1041 SpdyControlFlags flags, 1042 const SpdyHeaderBlock& block) { 1043 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 1044 CHECK(it != active_streams_.end()); 1045 CHECK_EQ(it->second.stream->stream_id(), stream_id); 1046 1047 SendPrefacePingIfNoneInFlight(); 1048 1049 DCHECK(buffered_spdy_framer_.get()); 1050 SpdyPriority spdy_priority = 1051 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()); 1052 1053 scoped_ptr<SpdyFrame> syn_frame; 1054 // TODO(hkhalil): Avoid copy of |block|. 
1055 if (GetProtocolVersion() <= SPDY3) { 1056 SpdySynStreamIR syn_stream(stream_id); 1057 syn_stream.set_associated_to_stream_id(0); 1058 syn_stream.set_priority(spdy_priority); 1059 syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0); 1060 syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0); 1061 syn_stream.set_name_value_block(block); 1062 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream)); 1063 } else { 1064 SpdyHeadersIR headers(stream_id); 1065 headers.set_priority(spdy_priority); 1066 headers.set_has_priority(true); 1067 headers.set_fin((flags & CONTROL_FLAG_FIN) != 0); 1068 headers.set_name_value_block(block); 1069 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers)); 1070 } 1071 1072 base::StatsCounter spdy_requests("spdy.requests"); 1073 spdy_requests.Increment(); 1074 streams_initiated_count_++; 1075 1076 if (net_log().IsLogging()) { 1077 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 1078 base::Bind(&NetLogSpdySynStreamSentCallback, 1079 &block, 1080 (flags & CONTROL_FLAG_FIN) != 0, 1081 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0, 1082 spdy_priority, 1083 stream_id)); 1084 } 1085 1086 return syn_frame.Pass(); 1087 } 1088 1089 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, 1090 IOBuffer* data, 1091 int len, 1092 SpdyDataFlags flags) { 1093 if (availability_state_ == STATE_DRAINING) { 1094 return scoped_ptr<SpdyBuffer>(); 1095 } 1096 1097 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 1098 CHECK(it != active_streams_.end()); 1099 SpdyStream* stream = it->second.stream; 1100 CHECK_EQ(stream->stream_id(), stream_id); 1101 1102 if (len < 0) { 1103 NOTREACHED(); 1104 return scoped_ptr<SpdyBuffer>(); 1105 } 1106 1107 int effective_len = std::min(len, kMaxSpdyFrameChunkSize); 1108 1109 bool send_stalled_by_stream = 1110 (flow_control_state_ >= FLOW_CONTROL_STREAM) && 1111 (stream->send_window_size() <= 0); 1112 bool send_stalled_by_session = 
IsSendStalled(); 1113 1114 // NOTE: There's an enum of the same name in histograms.xml. 1115 enum SpdyFrameFlowControlState { 1116 SEND_NOT_STALLED, 1117 SEND_STALLED_BY_STREAM, 1118 SEND_STALLED_BY_SESSION, 1119 SEND_STALLED_BY_STREAM_AND_SESSION, 1120 }; 1121 1122 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED; 1123 if (send_stalled_by_stream) { 1124 if (send_stalled_by_session) { 1125 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION; 1126 } else { 1127 frame_flow_control_state = SEND_STALLED_BY_STREAM; 1128 } 1129 } else if (send_stalled_by_session) { 1130 frame_flow_control_state = SEND_STALLED_BY_SESSION; 1131 } 1132 1133 if (flow_control_state_ == FLOW_CONTROL_STREAM) { 1134 UMA_HISTOGRAM_ENUMERATION( 1135 "Net.SpdyFrameStreamFlowControlState", 1136 frame_flow_control_state, 1137 SEND_STALLED_BY_STREAM + 1); 1138 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1139 UMA_HISTOGRAM_ENUMERATION( 1140 "Net.SpdyFrameStreamAndSessionFlowControlState", 1141 frame_flow_control_state, 1142 SEND_STALLED_BY_STREAM_AND_SESSION + 1); 1143 } 1144 1145 // Obey send window size of the stream if stream flow control is 1146 // enabled. 1147 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 1148 if (send_stalled_by_stream) { 1149 stream->set_send_stalled_by_flow_control(true); 1150 // Even though we're currently stalled only by the stream, we 1151 // might end up being stalled by the session also. 1152 QueueSendStalledStream(*stream); 1153 net_log().AddEvent( 1154 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW, 1155 NetLog::IntegerCallback("stream_id", stream_id)); 1156 return scoped_ptr<SpdyBuffer>(); 1157 } 1158 1159 effective_len = std::min(effective_len, stream->send_window_size()); 1160 } 1161 1162 // Obey send window size of the session if session flow control is 1163 // enabled. 
1164 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1165 if (send_stalled_by_session) { 1166 stream->set_send_stalled_by_flow_control(true); 1167 QueueSendStalledStream(*stream); 1168 net_log().AddEvent( 1169 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW, 1170 NetLog::IntegerCallback("stream_id", stream_id)); 1171 return scoped_ptr<SpdyBuffer>(); 1172 } 1173 1174 effective_len = std::min(effective_len, session_send_window_size_); 1175 } 1176 1177 DCHECK_GE(effective_len, 0); 1178 1179 // Clear FIN flag if only some of the data will be in the data 1180 // frame. 1181 if (effective_len < len) 1182 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 1183 1184 if (net_log().IsLogging()) { 1185 net_log().AddEvent( 1186 NetLog::TYPE_SPDY_SESSION_SEND_DATA, 1187 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len, 1188 (flags & DATA_FLAG_FIN) != 0)); 1189 } 1190 1191 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 1192 if (effective_len > 0) 1193 SendPrefacePingIfNoneInFlight(); 1194 1195 // TODO(mbelshe): reduce memory copies here. 1196 DCHECK(buffered_spdy_framer_.get()); 1197 scoped_ptr<SpdyFrame> frame( 1198 buffered_spdy_framer_->CreateDataFrame( 1199 stream_id, data->data(), 1200 static_cast<uint32>(effective_len), flags)); 1201 1202 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass())); 1203 1204 // Send window size is based on payload size, so nothing to do if this is 1205 // just a FIN with no payload. 
1206 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && 1207 effective_len != 0) { 1208 DecreaseSendWindowSize(static_cast<int32>(effective_len)); 1209 data_buffer->AddConsumeCallback( 1210 base::Bind(&SpdySession::OnWriteBufferConsumed, 1211 weak_factory_.GetWeakPtr(), 1212 static_cast<size_t>(effective_len))); 1213 } 1214 1215 return data_buffer.Pass(); 1216 } 1217 1218 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) { 1219 DCHECK_NE(stream_id, 0u); 1220 1221 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1222 if (it == active_streams_.end()) { 1223 NOTREACHED(); 1224 return; 1225 } 1226 1227 CloseActiveStreamIterator(it, status); 1228 } 1229 1230 void SpdySession::CloseCreatedStream( 1231 const base::WeakPtr<SpdyStream>& stream, int status) { 1232 DCHECK_EQ(stream->stream_id(), 0u); 1233 1234 CreatedStreamSet::iterator it = created_streams_.find(stream.get()); 1235 if (it == created_streams_.end()) { 1236 NOTREACHED(); 1237 return; 1238 } 1239 1240 CloseCreatedStreamIterator(it, status); 1241 } 1242 1243 void SpdySession::ResetStream(SpdyStreamId stream_id, 1244 SpdyRstStreamStatus status, 1245 const std::string& description) { 1246 DCHECK_NE(stream_id, 0u); 1247 1248 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1249 if (it == active_streams_.end()) { 1250 NOTREACHED(); 1251 return; 1252 } 1253 1254 ResetStreamIterator(it, status, description); 1255 } 1256 1257 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { 1258 return ContainsKey(active_streams_, stream_id); 1259 } 1260 1261 LoadState SpdySession::GetLoadState() const { 1262 // Just report that we're idle since the session could be doing 1263 // many things concurrently. 
1264 return LOAD_STATE_IDLE; 1265 } 1266 1267 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, 1268 int status) { 1269 // TODO(mbelshe): We should send a RST_STREAM control frame here 1270 // so that the server can cancel a large send. 1271 1272 scoped_ptr<SpdyStream> owned_stream(it->second.stream); 1273 active_streams_.erase(it); 1274 1275 // TODO(akalin): When SpdyStream was ref-counted (and 1276 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this 1277 // was only done when status was not OK. This meant that pushed 1278 // streams can still be claimed after they're closed. This is 1279 // probably something that we still want to support, although server 1280 // push is hardly used. Write tests for this and fix this. (See 1281 // http://crbug.com/261712 .) 1282 if (owned_stream->type() == SPDY_PUSH_STREAM) { 1283 unclaimed_pushed_streams_.erase(owned_stream->url()); 1284 num_pushed_streams_--; 1285 if (!owned_stream->IsReservedRemote()) 1286 num_active_pushed_streams_--; 1287 } 1288 1289 DeleteStream(owned_stream.Pass(), status); 1290 MaybeFinishGoingAway(); 1291 1292 // If there are no active streams and the socket pool is stalled, close the 1293 // session to free up a socket slot. 1294 if (active_streams_.empty() && connection_->IsPoolStalled()) { 1295 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); 1296 } 1297 } 1298 1299 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it, 1300 int status) { 1301 scoped_ptr<SpdyStream> owned_stream(*it); 1302 created_streams_.erase(it); 1303 DeleteStream(owned_stream.Pass(), status); 1304 } 1305 1306 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it, 1307 SpdyRstStreamStatus status, 1308 const std::string& description) { 1309 // Send the RST_STREAM frame first as CloseActiveStreamIterator() 1310 // may close us. 
1311 SpdyStreamId stream_id = it->first; 1312 RequestPriority priority = it->second.stream->priority(); 1313 EnqueueResetStreamFrame(stream_id, priority, status, description); 1314 1315 // Removes any pending writes for the stream except for possibly an 1316 // in-flight one. 1317 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 1318 } 1319 1320 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id, 1321 RequestPriority priority, 1322 SpdyRstStreamStatus status, 1323 const std::string& description) { 1324 DCHECK_NE(stream_id, 0u); 1325 1326 net_log().AddEvent( 1327 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM, 1328 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description)); 1329 1330 DCHECK(buffered_spdy_framer_.get()); 1331 scoped_ptr<SpdyFrame> rst_frame( 1332 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 1333 1334 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass()); 1335 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status)); 1336 } 1337 1338 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) { 1339 CHECK(!in_io_loop_); 1340 if (availability_state_ == STATE_DRAINING) { 1341 return; 1342 } 1343 ignore_result(DoReadLoop(expected_read_state, result)); 1344 } 1345 1346 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { 1347 CHECK(!in_io_loop_); 1348 CHECK_EQ(read_state_, expected_read_state); 1349 1350 in_io_loop_ = true; 1351 1352 int bytes_read_without_yielding = 0; 1353 1354 // Loop until the session is draining, the read becomes blocked, or 1355 // the read limit is exceeded. 
1356 while (true) { 1357 switch (read_state_) { 1358 case READ_STATE_DO_READ: 1359 CHECK_EQ(result, OK); 1360 result = DoRead(); 1361 break; 1362 case READ_STATE_DO_READ_COMPLETE: 1363 if (result > 0) 1364 bytes_read_without_yielding += result; 1365 result = DoReadComplete(result); 1366 break; 1367 default: 1368 NOTREACHED() << "read_state_: " << read_state_; 1369 break; 1370 } 1371 1372 if (availability_state_ == STATE_DRAINING) 1373 break; 1374 1375 if (result == ERR_IO_PENDING) 1376 break; 1377 1378 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { 1379 read_state_ = READ_STATE_DO_READ; 1380 base::MessageLoop::current()->PostTask( 1381 FROM_HERE, 1382 base::Bind(&SpdySession::PumpReadLoop, 1383 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 1384 result = ERR_IO_PENDING; 1385 break; 1386 } 1387 } 1388 1389 CHECK(in_io_loop_); 1390 in_io_loop_ = false; 1391 1392 return result; 1393 } 1394 1395 int SpdySession::DoRead() { 1396 CHECK(in_io_loop_); 1397 1398 CHECK(connection_); 1399 CHECK(connection_->socket()); 1400 read_state_ = READ_STATE_DO_READ_COMPLETE; 1401 return connection_->socket()->Read( 1402 read_buffer_.get(), 1403 kReadBufferSize, 1404 base::Bind(&SpdySession::PumpReadLoop, 1405 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); 1406 } 1407 1408 int SpdySession::DoReadComplete(int result) { 1409 CHECK(in_io_loop_); 1410 1411 // Parse a frame. For now this code requires that the frame fit into our 1412 // buffer (kReadBufferSize). 1413 // TODO(mbelshe): support arbitrarily large frames! 
1414 1415 if (result == 0) { 1416 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", 1417 total_bytes_received_, 1, 100000000, 50); 1418 DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed"); 1419 1420 return ERR_CONNECTION_CLOSED; 1421 } 1422 1423 if (result < 0) { 1424 DoDrainSession(static_cast<Error>(result), "result is < 0."); 1425 return result; 1426 } 1427 CHECK_LE(result, kReadBufferSize); 1428 total_bytes_received_ += result; 1429 1430 last_activity_time_ = time_func_(); 1431 1432 DCHECK(buffered_spdy_framer_.get()); 1433 char* data = read_buffer_->data(); 1434 while (result > 0) { 1435 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); 1436 result -= bytes_processed; 1437 data += bytes_processed; 1438 1439 if (availability_state_ == STATE_DRAINING) { 1440 return ERR_CONNECTION_CLOSED; 1441 } 1442 1443 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); 1444 } 1445 1446 read_state_ = READ_STATE_DO_READ; 1447 return OK; 1448 } 1449 1450 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) { 1451 CHECK(!in_io_loop_); 1452 DCHECK_EQ(write_state_, expected_write_state); 1453 1454 DoWriteLoop(expected_write_state, result); 1455 1456 if (availability_state_ == STATE_DRAINING && !in_flight_write_ && 1457 write_queue_.IsEmpty()) { 1458 pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|. 1459 return; 1460 } 1461 } 1462 1463 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { 1464 CHECK(!in_io_loop_); 1465 DCHECK_NE(write_state_, WRITE_STATE_IDLE); 1466 DCHECK_EQ(write_state_, expected_write_state); 1467 1468 in_io_loop_ = true; 1469 1470 // Loop until the session is closed or the write becomes blocked. 
1471 while (true) { 1472 switch (write_state_) { 1473 case WRITE_STATE_DO_WRITE: 1474 DCHECK_EQ(result, OK); 1475 result = DoWrite(); 1476 break; 1477 case WRITE_STATE_DO_WRITE_COMPLETE: 1478 result = DoWriteComplete(result); 1479 break; 1480 case WRITE_STATE_IDLE: 1481 default: 1482 NOTREACHED() << "write_state_: " << write_state_; 1483 break; 1484 } 1485 1486 if (write_state_ == WRITE_STATE_IDLE) { 1487 DCHECK_EQ(result, ERR_IO_PENDING); 1488 break; 1489 } 1490 1491 if (result == ERR_IO_PENDING) 1492 break; 1493 } 1494 1495 CHECK(in_io_loop_); 1496 in_io_loop_ = false; 1497 1498 return result; 1499 } 1500 1501 int SpdySession::DoWrite() { 1502 CHECK(in_io_loop_); 1503 1504 DCHECK(buffered_spdy_framer_); 1505 if (in_flight_write_) { 1506 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1507 } else { 1508 // Grab the next frame to send. 1509 SpdyFrameType frame_type = DATA; 1510 scoped_ptr<SpdyBufferProducer> producer; 1511 base::WeakPtr<SpdyStream> stream; 1512 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { 1513 write_state_ = WRITE_STATE_IDLE; 1514 return ERR_IO_PENDING; 1515 } 1516 1517 if (stream.get()) 1518 CHECK(!stream->IsClosed()); 1519 1520 // Activate the stream only when sending the SYN_STREAM frame to 1521 // guarantee monotonically-increasing stream IDs. 1522 if (frame_type == SYN_STREAM) { 1523 CHECK(stream.get()); 1524 CHECK_EQ(stream->stream_id(), 0u); 1525 scoped_ptr<SpdyStream> owned_stream = 1526 ActivateCreatedStream(stream.get()); 1527 InsertActivatedStream(owned_stream.Pass()); 1528 1529 if (stream_hi_water_mark_ > kLastStreamId) { 1530 CHECK_EQ(stream->stream_id(), kLastStreamId); 1531 // We've exhausted the stream ID space, and no new streams may be 1532 // created after this one. 
1533 MakeUnavailable(); 1534 StartGoingAway(kLastStreamId, ERR_ABORTED); 1535 } 1536 } 1537 1538 in_flight_write_ = producer->ProduceBuffer(); 1539 if (!in_flight_write_) { 1540 NOTREACHED(); 1541 return ERR_UNEXPECTED; 1542 } 1543 in_flight_write_frame_type_ = frame_type; 1544 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); 1545 DCHECK_GE(in_flight_write_frame_size_, 1546 buffered_spdy_framer_->GetFrameMinimumSize()); 1547 in_flight_write_stream_ = stream; 1548 } 1549 1550 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE; 1551 1552 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems 1553 // with Socket implementations that don't store their IOBuffer 1554 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). 1555 scoped_refptr<IOBuffer> write_io_buffer = 1556 in_flight_write_->GetIOBufferForRemainingData(); 1557 return connection_->socket()->Write( 1558 write_io_buffer.get(), 1559 in_flight_write_->GetRemainingSize(), 1560 base::Bind(&SpdySession::PumpWriteLoop, 1561 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); 1562 } 1563 1564 int SpdySession::DoWriteComplete(int result) { 1565 CHECK(in_io_loop_); 1566 DCHECK_NE(result, ERR_IO_PENDING); 1567 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1568 1569 last_activity_time_ = time_func_(); 1570 1571 if (result < 0) { 1572 DCHECK_NE(result, ERR_IO_PENDING); 1573 in_flight_write_.reset(); 1574 in_flight_write_frame_type_ = DATA; 1575 in_flight_write_frame_size_ = 0; 1576 in_flight_write_stream_.reset(); 1577 write_state_ = WRITE_STATE_DO_WRITE; 1578 DoDrainSession(static_cast<Error>(result), "Write error"); 1579 return OK; 1580 } 1581 1582 // It should not be possible to have written more bytes than our 1583 // in_flight_write_. 
1584 DCHECK_LE(static_cast<size_t>(result), 1585 in_flight_write_->GetRemainingSize()); 1586 1587 if (result > 0) { 1588 in_flight_write_->Consume(static_cast<size_t>(result)); 1589 1590 // We only notify the stream when we've fully written the pending frame. 1591 if (in_flight_write_->GetRemainingSize() == 0) { 1592 // It is possible that the stream was cancelled while we were 1593 // writing to the socket. 1594 if (in_flight_write_stream_.get()) { 1595 DCHECK_GT(in_flight_write_frame_size_, 0u); 1596 in_flight_write_stream_->OnFrameWriteComplete( 1597 in_flight_write_frame_type_, 1598 in_flight_write_frame_size_); 1599 } 1600 1601 // Cleanup the write which just completed. 1602 in_flight_write_.reset(); 1603 in_flight_write_frame_type_ = DATA; 1604 in_flight_write_frame_size_ = 0; 1605 in_flight_write_stream_.reset(); 1606 } 1607 } 1608 1609 write_state_ = WRITE_STATE_DO_WRITE; 1610 return OK; 1611 } 1612 1613 void SpdySession::DcheckGoingAway() const { 1614 #if DCHECK_IS_ON 1615 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1616 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { 1617 DCHECK(pending_create_stream_queues_[i].empty()); 1618 } 1619 DCHECK(created_streams_.empty()); 1620 #endif 1621 } 1622 1623 void SpdySession::DcheckDraining() const { 1624 DcheckGoingAway(); 1625 DCHECK_EQ(availability_state_, STATE_DRAINING); 1626 DCHECK(active_streams_.empty()); 1627 DCHECK(unclaimed_pushed_streams_.empty()); 1628 } 1629 1630 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, 1631 Error status) { 1632 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1633 1634 // The loops below are carefully written to avoid reentrancy problems. 
1635 1636 while (true) { 1637 size_t old_size = GetTotalSize(pending_create_stream_queues_); 1638 base::WeakPtr<SpdyStreamRequest> pending_request = 1639 GetNextPendingStreamRequest(); 1640 if (!pending_request) 1641 break; 1642 // No new stream requests should be added while the session is 1643 // going away. 1644 DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_)); 1645 pending_request->OnRequestCompleteFailure(ERR_ABORTED); 1646 } 1647 1648 while (true) { 1649 size_t old_size = active_streams_.size(); 1650 ActiveStreamMap::iterator it = 1651 active_streams_.lower_bound(last_good_stream_id + 1); 1652 if (it == active_streams_.end()) 1653 break; 1654 LogAbandonedActiveStream(it, status); 1655 CloseActiveStreamIterator(it, status); 1656 // No new streams should be activated while the session is going 1657 // away. 1658 DCHECK_GT(old_size, active_streams_.size()); 1659 } 1660 1661 while (!created_streams_.empty()) { 1662 size_t old_size = created_streams_.size(); 1663 CreatedStreamSet::iterator it = created_streams_.begin(); 1664 LogAbandonedStream(*it, status); 1665 CloseCreatedStreamIterator(it, status); 1666 // No new streams should be created while the session is going 1667 // away. 1668 DCHECK_GT(old_size, created_streams_.size()); 1669 } 1670 1671 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id); 1672 1673 DcheckGoingAway(); 1674 } 1675 1676 void SpdySession::MaybeFinishGoingAway() { 1677 if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) { 1678 DoDrainSession(OK, "Finished going away"); 1679 } 1680 } 1681 1682 void SpdySession::DoDrainSession(Error err, const std::string& description) { 1683 if (availability_state_ == STATE_DRAINING) { 1684 return; 1685 } 1686 MakeUnavailable(); 1687 1688 // If |err| indicates an error occurred, inform the peer that we're closing 1689 // and why. Don't GOAWAY on a graceful or idle close, as that may 1690 // unnecessarily wake the radio. 
We could technically GOAWAY on network errors 1691 // (we'll probably fail to actually write it, but that's okay), however many 1692 // unit-tests would need to be updated. 1693 if (err != OK && 1694 err != ERR_ABORTED && // Used by SpdySessionPool to close idle sessions. 1695 err != ERR_NETWORK_CHANGED && // Used to deprecate sessions on IP change. 1696 err != ERR_SOCKET_NOT_CONNECTED && 1697 err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) { 1698 // Enqueue a GOAWAY to inform the peer of why we're closing the connection. 1699 SpdyGoAwayIR goaway_ir(last_accepted_push_stream_id_, 1700 MapNetErrorToGoAwayStatus(err), 1701 description); 1702 EnqueueSessionWrite(HIGHEST, 1703 GOAWAY, 1704 scoped_ptr<SpdyFrame>( 1705 buffered_spdy_framer_->SerializeFrame(goaway_ir))); 1706 } 1707 1708 availability_state_ = STATE_DRAINING; 1709 error_on_close_ = err; 1710 1711 net_log_.AddEvent( 1712 NetLog::TYPE_SPDY_SESSION_CLOSE, 1713 base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); 1714 1715 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); 1716 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", 1717 total_bytes_received_, 1, 100000000, 50); 1718 1719 if (err == OK) { 1720 // We ought to be going away already, as this is a graceful close. 1721 DcheckGoingAway(); 1722 } else { 1723 StartGoingAway(0, err); 1724 } 1725 DcheckDraining(); 1726 MaybePostWriteLoop(); 1727 } 1728 1729 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { 1730 DCHECK(stream); 1731 std::string description = base::StringPrintf( 1732 "ABANDONED (stream_id=%d): ", stream->stream_id()) + 1733 stream->url().spec(); 1734 stream->LogStreamError(status, description); 1735 // We don't increment the streams abandoned counter here. If the 1736 // stream isn't active (i.e., it hasn't written anything to the wire 1737 // yet) then it's as if it never existed. 
If it is active, then 1738 // LogAbandonedActiveStream() will increment the counters. 1739 } 1740 1741 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it, 1742 Error status) { 1743 DCHECK_GT(it->first, 0u); 1744 LogAbandonedStream(it->second.stream, status); 1745 ++streams_abandoned_count_; 1746 base::StatsCounter abandoned_streams("spdy.abandoned_streams"); 1747 abandoned_streams.Increment(); 1748 if (it->second.stream->type() == SPDY_PUSH_STREAM && 1749 unclaimed_pushed_streams_.find(it->second.stream->url()) != 1750 unclaimed_pushed_streams_.end()) { 1751 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams"); 1752 abandoned_push_streams.Increment(); 1753 } 1754 } 1755 1756 SpdyStreamId SpdySession::GetNewStreamId() { 1757 CHECK_LE(stream_hi_water_mark_, kLastStreamId); 1758 SpdyStreamId id = stream_hi_water_mark_; 1759 stream_hi_water_mark_ += 2; 1760 return id; 1761 } 1762 1763 void SpdySession::CloseSessionOnError(Error err, 1764 const std::string& description) { 1765 DCHECK_LT(err, ERR_IO_PENDING); 1766 DoDrainSession(err, description); 1767 } 1768 1769 void SpdySession::MakeUnavailable() { 1770 if (availability_state_ == STATE_AVAILABLE) { 1771 availability_state_ = STATE_GOING_AWAY; 1772 pool_->MakeSessionUnavailable(GetWeakPtr()); 1773 } 1774 } 1775 1776 base::Value* SpdySession::GetInfoAsValue() const { 1777 base::DictionaryValue* dict = new base::DictionaryValue(); 1778 1779 dict->SetInteger("source_id", net_log_.source().id); 1780 1781 dict->SetString("host_port_pair", host_port_pair().ToString()); 1782 if (!pooled_aliases_.empty()) { 1783 base::ListValue* alias_list = new base::ListValue(); 1784 for (std::set<SpdySessionKey>::const_iterator it = 1785 pooled_aliases_.begin(); 1786 it != pooled_aliases_.end(); it++) { 1787 alias_list->Append(new base::StringValue( 1788 it->host_port_pair().ToString())); 1789 } 1790 dict->Set("aliases", alias_list); 1791 } 1792 dict->SetString("proxy", 
host_port_proxy_pair().second.ToURI()); 1793 1794 dict->SetInteger("active_streams", active_streams_.size()); 1795 1796 dict->SetInteger("unclaimed_pushed_streams", 1797 unclaimed_pushed_streams_.size()); 1798 1799 dict->SetBoolean("is_secure", is_secure_); 1800 1801 dict->SetString("protocol_negotiated", 1802 SSLClientSocket::NextProtoToString( 1803 connection_->socket()->GetNegotiatedProtocol())); 1804 1805 dict->SetInteger("error", error_on_close_); 1806 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); 1807 1808 dict->SetInteger("streams_initiated_count", streams_initiated_count_); 1809 dict->SetInteger("streams_pushed_count", streams_pushed_count_); 1810 dict->SetInteger("streams_pushed_and_claimed_count", 1811 streams_pushed_and_claimed_count_); 1812 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_); 1813 DCHECK(buffered_spdy_framer_.get()); 1814 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received()); 1815 1816 dict->SetBoolean("sent_settings", sent_settings_); 1817 dict->SetBoolean("received_settings", received_settings_); 1818 1819 dict->SetInteger("send_window_size", session_send_window_size_); 1820 dict->SetInteger("recv_window_size", session_recv_window_size_); 1821 dict->SetInteger("unacked_recv_window_bytes", 1822 session_unacked_recv_window_bytes_); 1823 return dict; 1824 } 1825 1826 bool SpdySession::IsReused() const { 1827 return buffered_spdy_framer_->frames_received() > 0 || 1828 connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE; 1829 } 1830 1831 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id, 1832 LoadTimingInfo* load_timing_info) const { 1833 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId, 1834 load_timing_info); 1835 } 1836 1837 int SpdySession::GetPeerAddress(IPEndPoint* address) const { 1838 int rv = ERR_SOCKET_NOT_CONNECTED; 1839 if (connection_->socket()) { 1840 rv = connection_->socket()->GetPeerAddress(address); 1841 } 1842 1843 
UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress", 1844 rv == ERR_SOCKET_NOT_CONNECTED); 1845 1846 return rv; 1847 } 1848 1849 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1850 int rv = ERR_SOCKET_NOT_CONNECTED; 1851 if (connection_->socket()) { 1852 rv = connection_->socket()->GetLocalAddress(address); 1853 } 1854 1855 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress", 1856 rv == ERR_SOCKET_NOT_CONNECTED); 1857 1858 return rv; 1859 } 1860 1861 void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1862 SpdyFrameType frame_type, 1863 scoped_ptr<SpdyFrame> frame) { 1864 DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS || 1865 frame_type == WINDOW_UPDATE || frame_type == PING || 1866 frame_type == GOAWAY); 1867 EnqueueWrite( 1868 priority, frame_type, 1869 scoped_ptr<SpdyBufferProducer>( 1870 new SimpleBufferProducer( 1871 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), 1872 base::WeakPtr<SpdyStream>()); 1873 } 1874 1875 void SpdySession::EnqueueWrite(RequestPriority priority, 1876 SpdyFrameType frame_type, 1877 scoped_ptr<SpdyBufferProducer> producer, 1878 const base::WeakPtr<SpdyStream>& stream) { 1879 if (availability_state_ == STATE_DRAINING) 1880 return; 1881 1882 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1883 MaybePostWriteLoop(); 1884 } 1885 1886 void SpdySession::MaybePostWriteLoop() { 1887 if (write_state_ == WRITE_STATE_IDLE) { 1888 CHECK(!in_flight_write_); 1889 write_state_ = WRITE_STATE_DO_WRITE; 1890 base::MessageLoop::current()->PostTask( 1891 FROM_HERE, 1892 base::Bind(&SpdySession::PumpWriteLoop, 1893 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); 1894 } 1895 } 1896 1897 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { 1898 CHECK_EQ(stream->stream_id(), 0u); 1899 CHECK(created_streams_.find(stream.get()) == created_streams_.end()); 1900 created_streams_.insert(stream.release()); 1901 } 1902 1903 
scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) { 1904 CHECK_EQ(stream->stream_id(), 0u); 1905 CHECK(created_streams_.find(stream) != created_streams_.end()); 1906 stream->set_stream_id(GetNewStreamId()); 1907 scoped_ptr<SpdyStream> owned_stream(stream); 1908 created_streams_.erase(stream); 1909 return owned_stream.Pass(); 1910 } 1911 1912 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) { 1913 SpdyStreamId stream_id = stream->stream_id(); 1914 CHECK_NE(stream_id, 0u); 1915 std::pair<ActiveStreamMap::iterator, bool> result = 1916 active_streams_.insert( 1917 std::make_pair(stream_id, ActiveStreamInfo(stream.get()))); 1918 CHECK(result.second); 1919 ignore_result(stream.release()); 1920 } 1921 1922 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { 1923 if (in_flight_write_stream_.get() == stream.get()) { 1924 // If we're deleting the stream for the in-flight write, we still 1925 // need to let the write complete, so we clear 1926 // |in_flight_write_stream_| and let the write finish on its own 1927 // without notifying |in_flight_write_stream_|. 
1928 in_flight_write_stream_.reset(); 1929 } 1930 1931 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); 1932 stream->OnClose(status); 1933 1934 if (availability_state_ == STATE_AVAILABLE) { 1935 ProcessPendingStreamRequests(); 1936 } 1937 } 1938 1939 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) { 1940 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1941 1942 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url); 1943 if (unclaimed_it == unclaimed_pushed_streams_.end()) 1944 return base::WeakPtr<SpdyStream>(); 1945 1946 SpdyStreamId stream_id = unclaimed_it->second.stream_id; 1947 unclaimed_pushed_streams_.erase(unclaimed_it); 1948 1949 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 1950 if (active_it == active_streams_.end()) { 1951 NOTREACHED(); 1952 return base::WeakPtr<SpdyStream>(); 1953 } 1954 1955 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM); 1956 used_push_streams.Increment(); 1957 return active_it->second.stream->GetWeakPtr(); 1958 } 1959 1960 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, 1961 bool* was_npn_negotiated, 1962 NextProto* protocol_negotiated) { 1963 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated(); 1964 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol(); 1965 return connection_->socket()->GetSSLInfo(ssl_info); 1966 } 1967 1968 bool SpdySession::GetSSLCertRequestInfo( 1969 SSLCertRequestInfo* cert_request_info) { 1970 if (!is_secure_) 1971 return false; 1972 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info); 1973 return true; 1974 } 1975 1976 void SpdySession::OnError(SpdyFramer::SpdyError error_code) { 1977 CHECK(in_io_loop_); 1978 1979 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code)); 1980 std::string description = 1981 base::StringPrintf("Framer error: %d (%s).", 1982 error_code, 1983 SpdyFramer::ErrorCodeToString(error_code)); 1984 
DoDrainSession(MapFramerErrorToNetError(error_code), description); 1985 } 1986 1987 void SpdySession::OnStreamError(SpdyStreamId stream_id, 1988 const std::string& description) { 1989 CHECK(in_io_loop_); 1990 1991 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1992 if (it == active_streams_.end()) { 1993 // We still want to send a frame to reset the stream even if we 1994 // don't know anything about it. 1995 EnqueueResetStreamFrame( 1996 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description); 1997 return; 1998 } 1999 2000 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description); 2001 } 2002 2003 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id, 2004 size_t length, 2005 bool fin) { 2006 CHECK(in_io_loop_); 2007 2008 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2009 2010 // By the time data comes in, the stream may already be inactive. 2011 if (it == active_streams_.end()) 2012 return; 2013 2014 SpdyStream* stream = it->second.stream; 2015 CHECK_EQ(stream->stream_id(), stream_id); 2016 2017 DCHECK(buffered_spdy_framer_); 2018 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize(); 2019 stream->IncrementRawReceivedBytes(header_len); 2020 } 2021 2022 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, 2023 const char* data, 2024 size_t len, 2025 bool fin) { 2026 CHECK(in_io_loop_); 2027 2028 if (data == NULL && len != 0) { 2029 // This is notification of consumed data padding. 2030 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames. 2031 // See crbug.com/353012. 
2032 return; 2033 } 2034 2035 DCHECK_LT(len, 1u << 24); 2036 if (net_log().IsLogging()) { 2037 net_log().AddEvent( 2038 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 2039 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); 2040 } 2041 2042 // Build the buffer as early as possible so that we go through the 2043 // session flow control checks and update 2044 // |unacked_recv_window_bytes_| properly even when the stream is 2045 // inactive (since the other side has still reduced its session send 2046 // window). 2047 scoped_ptr<SpdyBuffer> buffer; 2048 if (data) { 2049 DCHECK_GT(len, 0u); 2050 CHECK_LE(len, static_cast<size_t>(kReadBufferSize)); 2051 buffer.reset(new SpdyBuffer(data, len)); 2052 2053 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 2054 DecreaseRecvWindowSize(static_cast<int32>(len)); 2055 buffer->AddConsumeCallback( 2056 base::Bind(&SpdySession::OnReadBufferConsumed, 2057 weak_factory_.GetWeakPtr())); 2058 } 2059 } else { 2060 DCHECK_EQ(len, 0u); 2061 } 2062 2063 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2064 2065 // By the time data comes in, the stream may already be inactive. 
2066 if (it == active_streams_.end()) 2067 return; 2068 2069 SpdyStream* stream = it->second.stream; 2070 CHECK_EQ(stream->stream_id(), stream_id); 2071 2072 stream->IncrementRawReceivedBytes(len); 2073 2074 if (it->second.waiting_for_syn_reply) { 2075 const std::string& error = "Data received before SYN_REPLY."; 2076 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2077 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2078 return; 2079 } 2080 2081 stream->OnDataReceived(buffer.Pass()); 2082 } 2083 2084 void SpdySession::OnSettings(bool clear_persisted) { 2085 CHECK(in_io_loop_); 2086 2087 if (clear_persisted) 2088 http_server_properties_->ClearSpdySettings(host_port_pair()); 2089 2090 if (net_log_.IsLogging()) { 2091 net_log_.AddEvent( 2092 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 2093 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(), 2094 clear_persisted)); 2095 } 2096 2097 if (GetProtocolVersion() >= SPDY4) { 2098 // Send an acknowledgment of the setting. 2099 SpdySettingsIR settings_ir; 2100 settings_ir.set_is_ack(true); 2101 EnqueueSessionWrite( 2102 HIGHEST, 2103 SETTINGS, 2104 scoped_ptr<SpdyFrame>( 2105 buffered_spdy_framer_->SerializeFrame(settings_ir))); 2106 } 2107 } 2108 2109 void SpdySession::OnSetting(SpdySettingsIds id, 2110 uint8 flags, 2111 uint32 value) { 2112 CHECK(in_io_loop_); 2113 2114 HandleSetting(id, value); 2115 http_server_properties_->SetSpdySetting( 2116 host_port_pair(), 2117 id, 2118 static_cast<SpdySettingsFlags>(flags), 2119 value); 2120 received_settings_ = true; 2121 2122 // Log the setting. 
2123 const SpdyMajorVersion protocol_version = GetProtocolVersion(); 2124 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_SETTING, 2125 base::Bind(&NetLogSpdySettingCallback, 2126 id, 2127 protocol_version, 2128 static_cast<SpdySettingsFlags>(flags), 2129 value)); 2130 } 2131 2132 void SpdySession::OnSendCompressedFrame( 2133 SpdyStreamId stream_id, 2134 SpdyFrameType type, 2135 size_t payload_len, 2136 size_t frame_len) { 2137 if (type != SYN_STREAM && type != HEADERS) 2138 return; 2139 2140 DCHECK(buffered_spdy_framer_.get()); 2141 size_t compressed_len = 2142 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize(); 2143 2144 if (payload_len) { 2145 // Make sure we avoid early decimal truncation. 2146 int compression_pct = 100 - (100 * compressed_len) / payload_len; 2147 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", 2148 compression_pct); 2149 } 2150 } 2151 2152 void SpdySession::OnReceiveCompressedFrame( 2153 SpdyStreamId stream_id, 2154 SpdyFrameType type, 2155 size_t frame_len) { 2156 last_compressed_frame_len_ = frame_len; 2157 } 2158 2159 int SpdySession::OnInitialResponseHeadersReceived( 2160 const SpdyHeaderBlock& response_headers, 2161 base::Time response_time, 2162 base::TimeTicks recv_first_byte_time, 2163 SpdyStream* stream) { 2164 CHECK(in_io_loop_); 2165 SpdyStreamId stream_id = stream->stream_id(); 2166 2167 if (stream->type() == SPDY_PUSH_STREAM) { 2168 DCHECK(stream->IsReservedRemote()); 2169 if (max_concurrent_pushed_streams_ && 2170 num_active_pushed_streams_ >= max_concurrent_pushed_streams_) { 2171 ResetStream(stream_id, 2172 RST_STREAM_REFUSED_STREAM, 2173 "Stream concurrency limit reached."); 2174 return STATUS_CODE_REFUSED_STREAM; 2175 } 2176 } 2177 2178 if (stream->type() == SPDY_PUSH_STREAM) { 2179 // Will be balanced in DeleteStream. 2180 num_active_pushed_streams_++; 2181 } 2182 2183 // May invalidate |stream|. 
2184 int rv = stream->OnInitialResponseHeadersReceived( 2185 response_headers, response_time, recv_first_byte_time); 2186 if (rv < 0) { 2187 DCHECK_NE(rv, ERR_IO_PENDING); 2188 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2189 } 2190 2191 return rv; 2192 } 2193 2194 void SpdySession::OnSynStream(SpdyStreamId stream_id, 2195 SpdyStreamId associated_stream_id, 2196 SpdyPriority priority, 2197 bool fin, 2198 bool unidirectional, 2199 const SpdyHeaderBlock& headers) { 2200 CHECK(in_io_loop_); 2201 2202 if (GetProtocolVersion() >= SPDY4) { 2203 DCHECK_EQ(0u, associated_stream_id); 2204 OnHeaders(stream_id, fin, headers); 2205 return; 2206 } 2207 2208 base::Time response_time = base::Time::Now(); 2209 base::TimeTicks recv_first_byte_time = time_func_(); 2210 2211 if (net_log_.IsLogging()) { 2212 net_log_.AddEvent( 2213 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 2214 base::Bind(&NetLogSpdySynStreamReceivedCallback, 2215 &headers, fin, unidirectional, priority, 2216 stream_id, associated_stream_id)); 2217 } 2218 2219 // Split headers to simulate push promise and response. 
2220 SpdyHeaderBlock request_headers; 2221 SpdyHeaderBlock response_headers; 2222 SplitPushedHeadersToRequestAndResponse( 2223 headers, GetProtocolVersion(), &request_headers, &response_headers); 2224 2225 if (!TryCreatePushStream( 2226 stream_id, associated_stream_id, priority, request_headers)) 2227 return; 2228 2229 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 2230 if (active_it == active_streams_.end()) { 2231 NOTREACHED(); 2232 return; 2233 } 2234 2235 if (OnInitialResponseHeadersReceived(response_headers, 2236 response_time, 2237 recv_first_byte_time, 2238 active_it->second.stream) != OK) 2239 return; 2240 2241 base::StatsCounter push_requests("spdy.pushed_streams"); 2242 push_requests.Increment(); 2243 } 2244 2245 void SpdySession::DeleteExpiredPushedStreams() { 2246 if (unclaimed_pushed_streams_.empty()) 2247 return; 2248 2249 // Check that adequate time has elapsed since the last sweep. 2250 if (time_func_() < next_unclaimed_push_stream_sweep_time_) 2251 return; 2252 2253 // Gather old streams to delete. 2254 base::TimeTicks minimum_freshness = time_func_() - 2255 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2256 std::vector<SpdyStreamId> streams_to_close; 2257 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); 2258 it != unclaimed_pushed_streams_.end(); ++it) { 2259 if (minimum_freshness > it->second.creation_time) 2260 streams_to_close.push_back(it->second.stream_id); 2261 } 2262 2263 for (std::vector<SpdyStreamId>::const_iterator to_close_it = 2264 streams_to_close.begin(); 2265 to_close_it != streams_to_close.end(); ++to_close_it) { 2266 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it); 2267 if (active_it == active_streams_.end()) 2268 continue; 2269 2270 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM); 2271 // CloseActiveStreamIterator() will remove the stream from 2272 // |unclaimed_pushed_streams_|. 
2273 ResetStreamIterator( 2274 active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed."); 2275 } 2276 2277 next_unclaimed_push_stream_sweep_time_ = time_func_() + 2278 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2279 } 2280 2281 void SpdySession::OnSynReply(SpdyStreamId stream_id, 2282 bool fin, 2283 const SpdyHeaderBlock& headers) { 2284 CHECK(in_io_loop_); 2285 2286 base::Time response_time = base::Time::Now(); 2287 base::TimeTicks recv_first_byte_time = time_func_(); 2288 2289 if (net_log().IsLogging()) { 2290 net_log().AddEvent( 2291 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 2292 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2293 &headers, fin, stream_id)); 2294 } 2295 2296 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2297 if (it == active_streams_.end()) { 2298 // NOTE: it may just be that the stream was cancelled. 2299 return; 2300 } 2301 2302 SpdyStream* stream = it->second.stream; 2303 CHECK_EQ(stream->stream_id(), stream_id); 2304 2305 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2306 last_compressed_frame_len_ = 0; 2307 2308 if (GetProtocolVersion() >= SPDY4) { 2309 const std::string& error = 2310 "SPDY4 wasn't expecting SYN_REPLY."; 2311 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2312 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2313 return; 2314 } 2315 if (!it->second.waiting_for_syn_reply) { 2316 const std::string& error = 2317 "Received duplicate SYN_REPLY for stream."; 2318 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2319 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2320 return; 2321 } 2322 it->second.waiting_for_syn_reply = false; 2323 2324 ignore_result(OnInitialResponseHeadersReceived( 2325 headers, response_time, recv_first_byte_time, stream)); 2326 } 2327 2328 void SpdySession::OnHeaders(SpdyStreamId stream_id, 2329 bool fin, 2330 const SpdyHeaderBlock& headers) { 2331 CHECK(in_io_loop_); 2332 2333 if (net_log().IsLogging()) { 
2334 net_log().AddEvent( 2335 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, 2336 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2337 &headers, fin, stream_id)); 2338 } 2339 2340 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2341 if (it == active_streams_.end()) { 2342 // NOTE: it may just be that the stream was cancelled. 2343 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id; 2344 return; 2345 } 2346 2347 SpdyStream* stream = it->second.stream; 2348 CHECK_EQ(stream->stream_id(), stream_id); 2349 2350 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2351 last_compressed_frame_len_ = 0; 2352 2353 base::Time response_time = base::Time::Now(); 2354 base::TimeTicks recv_first_byte_time = time_func_(); 2355 2356 if (it->second.waiting_for_syn_reply) { 2357 if (GetProtocolVersion() < SPDY4) { 2358 const std::string& error = 2359 "Was expecting SYN_REPLY, not HEADERS."; 2360 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2361 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2362 return; 2363 } 2364 2365 it->second.waiting_for_syn_reply = false; 2366 ignore_result(OnInitialResponseHeadersReceived( 2367 headers, response_time, recv_first_byte_time, stream)); 2368 } else if (it->second.stream->IsReservedRemote()) { 2369 ignore_result(OnInitialResponseHeadersReceived( 2370 headers, response_time, recv_first_byte_time, stream)); 2371 } else { 2372 int rv = stream->OnAdditionalResponseHeadersReceived(headers); 2373 if (rv < 0) { 2374 DCHECK_NE(rv, ERR_IO_PENDING); 2375 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2376 } 2377 } 2378 } 2379 2380 bool SpdySession::OnUnknownFrame(SpdyStreamId stream_id, int frame_type) { 2381 // Validate stream id. 2382 // Was the frame sent on a stream id that has not been used in this session? 
2383 if (stream_id % 2 == 1 && stream_id > stream_hi_water_mark_) 2384 return false; 2385 2386 if (stream_id % 2 == 0 && stream_id > last_accepted_push_stream_id_) 2387 return false; 2388 2389 return true; 2390 } 2391 2392 void SpdySession::OnRstStream(SpdyStreamId stream_id, 2393 SpdyRstStreamStatus status) { 2394 CHECK(in_io_loop_); 2395 2396 std::string description; 2397 net_log().AddEvent( 2398 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 2399 base::Bind(&NetLogSpdyRstCallback, 2400 stream_id, status, &description)); 2401 2402 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2403 if (it == active_streams_.end()) { 2404 // NOTE: it may just be that the stream was cancelled. 2405 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 2406 return; 2407 } 2408 2409 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2410 2411 if (status == 0) { 2412 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); 2413 } else if (status == RST_STREAM_REFUSED_STREAM) { 2414 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM); 2415 } else { 2416 RecordProtocolErrorHistogram( 2417 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 2418 it->second.stream->LogStreamError( 2419 ERR_SPDY_PROTOCOL_ERROR, 2420 base::StringPrintf("SPDY stream closed with status: %d", status)); 2421 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 2422 // For now, it doesn't matter much - it is a protocol error. 2423 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 2424 } 2425 } 2426 2427 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, 2428 SpdyGoAwayStatus status) { 2429 CHECK(in_io_loop_); 2430 2431 // TODO(jgraettinger): UMA histogram on |status|. 
2432 2433 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, 2434 base::Bind(&NetLogSpdyGoAwayCallback, 2435 last_accepted_stream_id, 2436 active_streams_.size(), 2437 unclaimed_pushed_streams_.size(), 2438 status)); 2439 MakeUnavailable(); 2440 StartGoingAway(last_accepted_stream_id, ERR_ABORTED); 2441 // This is to handle the case when we already don't have any active 2442 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have 2443 // active streams and so the last one being closed will finish the 2444 // going away process (see DeleteStream()). 2445 MaybeFinishGoingAway(); 2446 } 2447 2448 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) { 2449 CHECK(in_io_loop_); 2450 2451 net_log_.AddEvent( 2452 NetLog::TYPE_SPDY_SESSION_PING, 2453 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received")); 2454 2455 // Send response to a PING from server. 2456 if ((protocol_ >= kProtoSPDY4 && !is_ack) || 2457 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) { 2458 WritePingFrame(unique_id, true); 2459 return; 2460 } 2461 2462 --pings_in_flight_; 2463 if (pings_in_flight_ < 0) { 2464 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); 2465 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); 2466 pings_in_flight_ = 0; 2467 return; 2468 } 2469 2470 if (pings_in_flight_ > 0) 2471 return; 2472 2473 // We will record RTT in histogram when there are no more client sent 2474 // pings_in_flight_. 
2475 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_); 2476 } 2477 2478 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, 2479 uint32 delta_window_size) { 2480 CHECK(in_io_loop_); 2481 2482 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); 2483 net_log_.AddEvent( 2484 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, 2485 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2486 stream_id, delta_window_size)); 2487 2488 if (stream_id == kSessionFlowControlStreamId) { 2489 // WINDOW_UPDATE for the session. 2490 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { 2491 LOG(WARNING) << "Received WINDOW_UPDATE for session when " 2492 << "session flow control is not turned on"; 2493 // TODO(akalin): Record an error and close the session. 2494 return; 2495 } 2496 2497 if (delta_window_size < 1u) { 2498 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2499 DoDrainSession( 2500 ERR_SPDY_PROTOCOL_ERROR, 2501 "Received WINDOW_UPDATE with an invalid delta_window_size " + 2502 base::UintToString(delta_window_size)); 2503 return; 2504 } 2505 2506 IncreaseSendWindowSize(static_cast<int32>(delta_window_size)); 2507 } else { 2508 // WINDOW_UPDATE for a stream. 2509 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2510 // TODO(akalin): Record an error and close the session. 2511 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id 2512 << " when flow control is not turned on"; 2513 return; 2514 } 2515 2516 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2517 2518 if (it == active_streams_.end()) { 2519 // NOTE: it may just be that the stream was cancelled. 
2520 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; 2521 return; 2522 } 2523 2524 SpdyStream* stream = it->second.stream; 2525 CHECK_EQ(stream->stream_id(), stream_id); 2526 2527 if (delta_window_size < 1u) { 2528 ResetStreamIterator(it, 2529 RST_STREAM_FLOW_CONTROL_ERROR, 2530 base::StringPrintf( 2531 "Received WINDOW_UPDATE with an invalid " 2532 "delta_window_size %ud", delta_window_size)); 2533 return; 2534 } 2535 2536 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2537 it->second.stream->IncreaseSendWindowSize( 2538 static_cast<int32>(delta_window_size)); 2539 } 2540 } 2541 2542 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id, 2543 SpdyStreamId associated_stream_id, 2544 SpdyPriority priority, 2545 const SpdyHeaderBlock& headers) { 2546 // Server-initiated streams should have even sequence numbers. 2547 if ((stream_id & 0x1) != 0) { 2548 LOG(WARNING) << "Received invalid push stream id " << stream_id; 2549 if (GetProtocolVersion() > SPDY2) 2550 CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Odd push stream id."); 2551 return false; 2552 } 2553 2554 if (GetProtocolVersion() > SPDY2) { 2555 if (stream_id <= last_accepted_push_stream_id_) { 2556 LOG(WARNING) << "Received push stream id lesser or equal to the last " 2557 << "accepted before " << stream_id; 2558 CloseSessionOnError( 2559 ERR_SPDY_PROTOCOL_ERROR, 2560 "New push stream id must be greater than the last accepted."); 2561 return false; 2562 } 2563 } 2564 2565 if (IsStreamActive(stream_id)) { 2566 // For SPDY3 and higher we should not get here, we'll start going away 2567 // earlier on |last_seen_push_stream_id_| check. 
2568 CHECK_GT(SPDY3, GetProtocolVersion()); 2569 LOG(WARNING) << "Received push for active stream " << stream_id; 2570 return false; 2571 } 2572 2573 last_accepted_push_stream_id_ = stream_id; 2574 2575 RequestPriority request_priority = 2576 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion()); 2577 2578 if (availability_state_ == STATE_GOING_AWAY) { 2579 // TODO(akalin): This behavior isn't in the SPDY spec, although it 2580 // probably should be. 2581 EnqueueResetStreamFrame(stream_id, 2582 request_priority, 2583 RST_STREAM_REFUSED_STREAM, 2584 "push stream request received when going away"); 2585 return false; 2586 } 2587 2588 if (associated_stream_id == 0) { 2589 // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and 2590 // session going away. We should never get here. 2591 CHECK_GT(SPDY4, GetProtocolVersion()); 2592 std::string description = base::StringPrintf( 2593 "Received invalid associated stream id %d for pushed stream %d", 2594 associated_stream_id, 2595 stream_id); 2596 EnqueueResetStreamFrame( 2597 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description); 2598 return false; 2599 } 2600 2601 streams_pushed_count_++; 2602 2603 // TODO(mbelshe): DCHECK that this is a GET method? 2604 2605 // Verify that the response had a URL for us. 2606 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true); 2607 if (!gurl.is_valid()) { 2608 EnqueueResetStreamFrame(stream_id, 2609 request_priority, 2610 RST_STREAM_PROTOCOL_ERROR, 2611 "Pushed stream url was invalid: " + gurl.spec()); 2612 return false; 2613 } 2614 2615 // Verify we have a valid stream association. 
2616 ActiveStreamMap::iterator associated_it = 2617 active_streams_.find(associated_stream_id); 2618 if (associated_it == active_streams_.end()) { 2619 EnqueueResetStreamFrame( 2620 stream_id, 2621 request_priority, 2622 RST_STREAM_INVALID_STREAM, 2623 base::StringPrintf("Received push for inactive associated stream %d", 2624 associated_stream_id)); 2625 return false; 2626 } 2627 2628 // Check that the pushed stream advertises the same origin as its associated 2629 // stream. Bypass this check if and only if this session is with a SPDY proxy 2630 // that is trusted explicitly via the --trusted-spdy-proxy switch. 2631 if (trusted_spdy_proxy_.Equals(host_port_pair())) { 2632 // Disallow pushing of HTTPS content. 2633 if (gurl.SchemeIs("https")) { 2634 EnqueueResetStreamFrame( 2635 stream_id, 2636 request_priority, 2637 RST_STREAM_REFUSED_STREAM, 2638 base::StringPrintf("Rejected push of Cross Origin HTTPS content %d", 2639 associated_stream_id)); 2640 } 2641 } else { 2642 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders()); 2643 if (associated_url.GetOrigin() != gurl.GetOrigin()) { 2644 EnqueueResetStreamFrame( 2645 stream_id, 2646 request_priority, 2647 RST_STREAM_REFUSED_STREAM, 2648 base::StringPrintf("Rejected Cross Origin Push Stream %d", 2649 associated_stream_id)); 2650 return false; 2651 } 2652 } 2653 2654 // There should not be an existing pushed stream with the same path. 
2655 PushedStreamMap::iterator pushed_it = 2656 unclaimed_pushed_streams_.lower_bound(gurl); 2657 if (pushed_it != unclaimed_pushed_streams_.end() && 2658 pushed_it->first == gurl) { 2659 EnqueueResetStreamFrame( 2660 stream_id, 2661 request_priority, 2662 RST_STREAM_PROTOCOL_ERROR, 2663 "Received duplicate pushed stream with url: " + gurl.spec()); 2664 return false; 2665 } 2666 2667 scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM, 2668 GetWeakPtr(), 2669 gurl, 2670 request_priority, 2671 stream_initial_send_window_size_, 2672 stream_initial_recv_window_size_, 2673 net_log_)); 2674 stream->set_stream_id(stream_id); 2675 2676 // In spdy4/http2 PUSH_PROMISE arrives on associated stream. 2677 if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) { 2678 associated_it->second.stream->IncrementRawReceivedBytes( 2679 last_compressed_frame_len_); 2680 } else { 2681 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2682 } 2683 2684 last_compressed_frame_len_ = 0; 2685 2686 DeleteExpiredPushedStreams(); 2687 PushedStreamMap::iterator inserted_pushed_it = 2688 unclaimed_pushed_streams_.insert( 2689 pushed_it, 2690 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_()))); 2691 DCHECK(inserted_pushed_it != pushed_it); 2692 2693 InsertActivatedStream(stream.Pass()); 2694 2695 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 2696 if (active_it == active_streams_.end()) { 2697 NOTREACHED(); 2698 return false; 2699 } 2700 2701 active_it->second.stream->OnPushPromiseHeadersReceived(headers); 2702 DCHECK(active_it->second.stream->IsReservedRemote()); 2703 num_pushed_streams_++; 2704 return true; 2705 } 2706 2707 void SpdySession::OnPushPromise(SpdyStreamId stream_id, 2708 SpdyStreamId promised_stream_id, 2709 const SpdyHeaderBlock& headers) { 2710 CHECK(in_io_loop_); 2711 2712 if (net_log_.IsLogging()) { 2713 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE, 2714 
base::Bind(&NetLogSpdyPushPromiseReceivedCallback, 2715 &headers, 2716 stream_id, 2717 promised_stream_id)); 2718 } 2719 2720 // Any priority will do. 2721 // TODO(baranovich): pass parent stream id priority? 2722 if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers)) 2723 return; 2724 2725 base::StatsCounter push_requests("spdy.pushed_streams"); 2726 push_requests.Increment(); 2727 } 2728 2729 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, 2730 uint32 delta_window_size) { 2731 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2732 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2733 CHECK(it != active_streams_.end()); 2734 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2735 SendWindowUpdateFrame( 2736 stream_id, delta_window_size, it->second.stream->priority()); 2737 } 2738 2739 void SpdySession::SendInitialData() { 2740 DCHECK(enable_sending_initial_data_); 2741 2742 if (send_connection_header_prefix_) { 2743 DCHECK_EQ(protocol_, kProtoSPDY4); 2744 scoped_ptr<SpdyFrame> connection_header_prefix_frame( 2745 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix), 2746 kHttp2ConnectionHeaderPrefixSize, 2747 false /* take_ownership */)); 2748 // Count the prefix as part of the subsequent SETTINGS frame. 2749 EnqueueSessionWrite(HIGHEST, SETTINGS, 2750 connection_header_prefix_frame.Pass()); 2751 } 2752 2753 // First, notify the server about the settings they should use when 2754 // communicating with us. 2755 SettingsMap settings_map; 2756 // Create a new settings frame notifying the server of our 2757 // max concurrent streams and initial window size. 
2758 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] = 2759 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams); 2760 if (flow_control_state_ >= FLOW_CONTROL_STREAM && 2761 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) { 2762 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = 2763 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, 2764 stream_initial_recv_window_size_); 2765 } 2766 SendSettings(settings_map); 2767 2768 // Next, notify the server about our initial recv window size. 2769 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 2770 // Bump up the receive window size to the real initial value. This 2771 // has to go here since the WINDOW_UPDATE frame sent by 2772 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|. 2773 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_); 2774 // This condition implies that |kDefaultInitialRecvWindowSize| - 2775 // |session_recv_window_size_| doesn't overflow. 2776 DCHECK_GT(session_recv_window_size_, 0); 2777 IncreaseRecvWindowSize( 2778 kDefaultInitialRecvWindowSize - session_recv_window_size_); 2779 } 2780 2781 if (protocol_ <= kProtoSPDY31) { 2782 // Finally, notify the server about the settings they have 2783 // previously told us to use when communicating with them (after 2784 // applying them). 2785 const SettingsMap& server_settings_map = 2786 http_server_properties_->GetSpdySettings(host_port_pair()); 2787 if (server_settings_map.empty()) 2788 return; 2789 2790 SettingsMap::const_iterator it = 2791 server_settings_map.find(SETTINGS_CURRENT_CWND); 2792 uint32 cwnd = (it != server_settings_map.end()) ? 
it->second.second : 0; 2793 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100); 2794 2795 for (SettingsMap::const_iterator it = server_settings_map.begin(); 2796 it != server_settings_map.end(); ++it) { 2797 const SpdySettingsIds new_id = it->first; 2798 const uint32 new_val = it->second.second; 2799 HandleSetting(new_id, new_val); 2800 } 2801 2802 SendSettings(server_settings_map); 2803 } 2804 } 2805 2806 2807 void SpdySession::SendSettings(const SettingsMap& settings) { 2808 const SpdyMajorVersion protocol_version = GetProtocolVersion(); 2809 net_log_.AddEvent( 2810 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 2811 base::Bind(&NetLogSpdySendSettingsCallback, &settings, protocol_version)); 2812 // Create the SETTINGS frame and send it. 2813 DCHECK(buffered_spdy_framer_.get()); 2814 scoped_ptr<SpdyFrame> settings_frame( 2815 buffered_spdy_framer_->CreateSettings(settings)); 2816 sent_settings_ = true; 2817 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass()); 2818 } 2819 2820 void SpdySession::HandleSetting(uint32 id, uint32 value) { 2821 switch (id) { 2822 case SETTINGS_MAX_CONCURRENT_STREAMS: 2823 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 2824 kMaxConcurrentStreamLimit); 2825 ProcessPendingStreamRequests(); 2826 break; 2827 case SETTINGS_INITIAL_WINDOW_SIZE: { 2828 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2829 net_log().AddEvent( 2830 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL); 2831 return; 2832 } 2833 2834 if (value > static_cast<uint32>(kint32max)) { 2835 net_log().AddEvent( 2836 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE, 2837 NetLog::IntegerCallback("initial_window_size", value)); 2838 return; 2839 } 2840 2841 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only. 
2842 int32 delta_window_size = 2843 static_cast<int32>(value) - stream_initial_send_window_size_; 2844 stream_initial_send_window_size_ = static_cast<int32>(value); 2845 UpdateStreamsSendWindowSize(delta_window_size); 2846 net_log().AddEvent( 2847 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE, 2848 NetLog::IntegerCallback("delta_window_size", delta_window_size)); 2849 break; 2850 } 2851 } 2852 } 2853 2854 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { 2855 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2856 for (ActiveStreamMap::iterator it = active_streams_.begin(); 2857 it != active_streams_.end(); ++it) { 2858 it->second.stream->AdjustSendWindowSize(delta_window_size); 2859 } 2860 2861 for (CreatedStreamSet::const_iterator it = created_streams_.begin(); 2862 it != created_streams_.end(); it++) { 2863 (*it)->AdjustSendWindowSize(delta_window_size); 2864 } 2865 } 2866 2867 void SpdySession::SendPrefacePingIfNoneInFlight() { 2868 if (pings_in_flight_ || !enable_ping_based_connection_checking_) 2869 return; 2870 2871 base::TimeTicks now = time_func_(); 2872 // If there is no activity in the session, then send a preface-PING. 
2873 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_) 2874 SendPrefacePing(); 2875 } 2876 2877 void SpdySession::SendPrefacePing() { 2878 WritePingFrame(next_ping_id_, false); 2879 } 2880 2881 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, 2882 uint32 delta_window_size, 2883 RequestPriority priority) { 2884 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2885 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2886 if (it != active_streams_.end()) { 2887 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2888 } else { 2889 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2890 CHECK_EQ(stream_id, kSessionFlowControlStreamId); 2891 } 2892 2893 net_log_.AddEvent( 2894 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, 2895 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2896 stream_id, delta_window_size)); 2897 2898 DCHECK(buffered_spdy_framer_.get()); 2899 scoped_ptr<SpdyFrame> window_update_frame( 2900 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 2901 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass()); 2902 } 2903 2904 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) { 2905 DCHECK(buffered_spdy_framer_.get()); 2906 scoped_ptr<SpdyFrame> ping_frame( 2907 buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack)); 2908 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass()); 2909 2910 if (net_log().IsLogging()) { 2911 net_log().AddEvent( 2912 NetLog::TYPE_SPDY_SESSION_PING, 2913 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent")); 2914 } 2915 if (!is_ack) { 2916 next_ping_id_ += 2; 2917 ++pings_in_flight_; 2918 PlanToCheckPingStatus(); 2919 last_ping_sent_time_ = time_func_(); 2920 } 2921 } 2922 2923 void SpdySession::PlanToCheckPingStatus() { 2924 if (check_ping_status_pending_) 2925 return; 2926 2927 check_ping_status_pending_ = true; 2928 base::MessageLoop::current()->PostDelayedTask( 2929 FROM_HERE, 2930 
base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2931 time_func_()), hung_interval_); 2932 } 2933 2934 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 2935 CHECK(!in_io_loop_); 2936 2937 // Check if we got a response back for all PINGs we had sent. 2938 if (pings_in_flight_ == 0) { 2939 check_ping_status_pending_ = false; 2940 return; 2941 } 2942 2943 DCHECK(check_ping_status_pending_); 2944 2945 base::TimeTicks now = time_func_(); 2946 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_); 2947 2948 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { 2949 // Track all failed PING messages in a separate bucket. 2950 RecordPingRTTHistogram(base::TimeDelta::Max()); 2951 DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping."); 2952 return; 2953 } 2954 2955 // Check the status of connection after a delay. 2956 base::MessageLoop::current()->PostDelayedTask( 2957 FROM_HERE, 2958 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2959 now), 2960 delay); 2961 } 2962 2963 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) { 2964 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration); 2965 } 2966 2967 void SpdySession::RecordProtocolErrorHistogram( 2968 SpdyProtocolErrorDetails details) { 2969 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details, 2970 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2971 if (EndsWith(host_port_pair().host(), "google.com", false)) { 2972 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details, 2973 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2974 } 2975 } 2976 2977 void SpdySession::RecordHistograms() { 2978 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 2979 streams_initiated_count_, 2980 0, 300, 50); 2981 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 2982 streams_pushed_count_, 2983 0, 300, 50); 2984 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 2985 
streams_pushed_and_claimed_count_, 2986 0, 300, 50); 2987 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 2988 streams_abandoned_count_, 2989 0, 300, 50); 2990 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent", 2991 sent_settings_ ? 1 : 0, 2); 2992 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived", 2993 received_settings_ ? 1 : 0, 2); 2994 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession", 2995 stalled_streams_, 2996 0, 300, 50); 2997 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls", 2998 stalled_streams_ > 0 ? 1 : 0, 2); 2999 3000 if (received_settings_) { 3001 // Enumerate the saved settings, and set histograms for it. 3002 const SettingsMap& settings_map = 3003 http_server_properties_->GetSpdySettings(host_port_pair()); 3004 3005 SettingsMap::const_iterator it; 3006 for (it = settings_map.begin(); it != settings_map.end(); ++it) { 3007 const SpdySettingsIds id = it->first; 3008 const uint32 val = it->second.second; 3009 switch (id) { 3010 case SETTINGS_CURRENT_CWND: 3011 // Record several different histograms to see if cwnd converges 3012 // for larger volumes of data being sent. 
3013 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd", 3014 val, 1, 200, 100); 3015 if (total_bytes_received_ > 10 * 1024) { 3016 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K", 3017 val, 1, 200, 100); 3018 if (total_bytes_received_ > 25 * 1024) { 3019 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K", 3020 val, 1, 200, 100); 3021 if (total_bytes_received_ > 50 * 1024) { 3022 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K", 3023 val, 1, 200, 100); 3024 if (total_bytes_received_ > 100 * 1024) { 3025 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K", 3026 val, 1, 200, 100); 3027 } 3028 } 3029 } 3030 } 3031 break; 3032 case SETTINGS_ROUND_TRIP_TIME: 3033 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT", 3034 val, 1, 1200, 100); 3035 break; 3036 case SETTINGS_DOWNLOAD_RETRANS_RATE: 3037 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate", 3038 val, 1, 100, 50); 3039 break; 3040 default: 3041 break; 3042 } 3043 } 3044 } 3045 } 3046 3047 void SpdySession::CompleteStreamRequest( 3048 const base::WeakPtr<SpdyStreamRequest>& pending_request) { 3049 // Abort if the request has already been cancelled. 
3050 if (!pending_request) 3051 return; 3052 3053 base::WeakPtr<SpdyStream> stream; 3054 int rv = TryCreateStream(pending_request, &stream); 3055 3056 if (rv == OK) { 3057 DCHECK(stream); 3058 pending_request->OnRequestCompleteSuccess(stream); 3059 return; 3060 } 3061 DCHECK(!stream); 3062 3063 if (rv != ERR_IO_PENDING) { 3064 pending_request->OnRequestCompleteFailure(rv); 3065 } 3066 } 3067 3068 SSLClientSocket* SpdySession::GetSSLClientSocket() const { 3069 if (!is_secure_) 3070 return NULL; 3071 SSLClientSocket* ssl_socket = 3072 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 3073 DCHECK(ssl_socket); 3074 return ssl_socket; 3075 } 3076 3077 void SpdySession::OnWriteBufferConsumed( 3078 size_t frame_payload_size, 3079 size_t consume_size, 3080 SpdyBuffer::ConsumeSource consume_source) { 3081 // We can be called with |in_io_loop_| set if a write SpdyBuffer is 3082 // deleted (e.g., a stream is closed due to incoming data). 3083 3084 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3085 3086 if (consume_source == SpdyBuffer::DISCARD) { 3087 // If we're discarding a frame or part of it, increase the send 3088 // window by the number of discarded bytes. (Although if we're 3089 // discarding part of a frame, it's probably because of a write 3090 // error and we'll be tearing down the session soon.) 3091 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 3092 DCHECK_GT(remaining_payload_bytes, 0u); 3093 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 3094 } 3095 // For consumed bytes, the send window is increased when we receive 3096 // a WINDOW_UPDATE frame. 3097 } 3098 3099 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { 3100 // We can be called with |in_io_loop_| set if a SpdyBuffer is 3101 // deleted (e.g., a stream is closed due to incoming data). 
3102 3103 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3104 DCHECK_GE(delta_window_size, 1); 3105 3106 // Check for overflow. 3107 int32 max_delta_window_size = kint32max - session_send_window_size_; 3108 if (delta_window_size > max_delta_window_size) { 3109 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 3110 DoDrainSession( 3111 ERR_SPDY_PROTOCOL_ERROR, 3112 "Received WINDOW_UPDATE [delta: " + 3113 base::IntToString(delta_window_size) + 3114 "] for session overflows session_send_window_size_ [current: " + 3115 base::IntToString(session_send_window_size_) + "]"); 3116 return; 3117 } 3118 3119 session_send_window_size_ += delta_window_size; 3120 3121 net_log_.AddEvent( 3122 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 3123 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3124 delta_window_size, session_send_window_size_)); 3125 3126 DCHECK(!IsSendStalled()); 3127 ResumeSendStalledStreams(); 3128 } 3129 3130 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { 3131 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3132 3133 // We only call this method when sending a frame. Therefore, 3134 // |delta_window_size| should be within the valid frame size range. 3135 DCHECK_GE(delta_window_size, 1); 3136 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 3137 3138 // |send_window_size_| should have been at least |delta_window_size| for 3139 // this call to happen. 
3140 DCHECK_GE(session_send_window_size_, delta_window_size); 3141 3142 session_send_window_size_ -= delta_window_size; 3143 3144 net_log_.AddEvent( 3145 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 3146 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3147 -delta_window_size, session_send_window_size_)); 3148 } 3149 3150 void SpdySession::OnReadBufferConsumed( 3151 size_t consume_size, 3152 SpdyBuffer::ConsumeSource consume_source) { 3153 // We can be called with |in_io_loop_| set if a read SpdyBuffer is 3154 // deleted (e.g., discarded by a SpdyReadQueue). 3155 3156 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3157 DCHECK_GE(consume_size, 1u); 3158 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 3159 3160 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 3161 } 3162 3163 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { 3164 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3165 DCHECK_GE(session_unacked_recv_window_bytes_, 0); 3166 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); 3167 DCHECK_GE(delta_window_size, 1); 3168 // Check for overflow. 
3169 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); 3170 3171 session_recv_window_size_ += delta_window_size; 3172 net_log_.AddEvent( 3173 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 3174 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3175 delta_window_size, session_recv_window_size_)); 3176 3177 session_unacked_recv_window_bytes_ += delta_window_size; 3178 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { 3179 SendWindowUpdateFrame(kSessionFlowControlStreamId, 3180 session_unacked_recv_window_bytes_, 3181 HIGHEST); 3182 session_unacked_recv_window_bytes_ = 0; 3183 } 3184 } 3185 3186 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { 3187 CHECK(in_io_loop_); 3188 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3189 DCHECK_GE(delta_window_size, 1); 3190 3191 // Since we never decrease the initial receive window size, 3192 // |delta_window_size| should never cause |recv_window_size_| to go 3193 // negative. If we do, the receive window isn't being respected. 
3194 if (delta_window_size > session_recv_window_size_) { 3195 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); 3196 DoDrainSession( 3197 ERR_SPDY_FLOW_CONTROL_ERROR, 3198 "delta_window_size is " + base::IntToString(delta_window_size) + 3199 " in DecreaseRecvWindowSize, which is larger than the receive " + 3200 "window size of " + base::IntToString(session_recv_window_size_)); 3201 return; 3202 } 3203 3204 session_recv_window_size_ -= delta_window_size; 3205 net_log_.AddEvent( 3206 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 3207 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3208 -delta_window_size, session_recv_window_size_)); 3209 } 3210 3211 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { 3212 DCHECK(stream.send_stalled_by_flow_control()); 3213 RequestPriority priority = stream.priority(); 3214 CHECK_GE(priority, MINIMUM_PRIORITY); 3215 CHECK_LE(priority, MAXIMUM_PRIORITY); 3216 stream_send_unstall_queue_[priority].push_back(stream.stream_id()); 3217 } 3218 3219 void SpdySession::ResumeSendStalledStreams() { 3220 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3221 3222 // We don't have to worry about new streams being queued, since 3223 // doing so would cause IsSendStalled() to return true. But we do 3224 // have to worry about streams being closed, as well as ourselves 3225 // being closed. 3226 3227 while (!IsSendStalled()) { 3228 size_t old_size = 0; 3229 #if DCHECK_IS_ON 3230 old_size = GetTotalSize(stream_send_unstall_queue_); 3231 #endif 3232 3233 SpdyStreamId stream_id = PopStreamToPossiblyResume(); 3234 if (stream_id == 0) 3235 break; 3236 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 3237 // The stream may actually still be send-stalled after this (due 3238 // to its own send window) but that's okay -- it'll then be 3239 // resumed once its send window increases. 
3240 if (it != active_streams_.end()) 3241 it->second.stream->PossiblyResumeIfSendStalled(); 3242 3243 // The size should decrease unless we got send-stalled again. 3244 if (!IsSendStalled()) 3245 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); 3246 } 3247 } 3248 3249 SpdyStreamId SpdySession::PopStreamToPossiblyResume() { 3250 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) { 3251 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; 3252 if (!queue->empty()) { 3253 SpdyStreamId stream_id = queue->front(); 3254 queue->pop_front(); 3255 return stream_id; 3256 } 3257 } 3258 return 0; 3259 } 3260 3261 } // namespace net 3262