1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_session.h" 6 7 #include <algorithm> 8 #include <map> 9 10 #include "base/basictypes.h" 11 #include "base/bind.h" 12 #include "base/compiler_specific.h" 13 #include "base/logging.h" 14 #include "base/message_loop/message_loop.h" 15 #include "base/metrics/field_trial.h" 16 #include "base/metrics/histogram.h" 17 #include "base/metrics/sparse_histogram.h" 18 #include "base/metrics/stats_counters.h" 19 #include "base/stl_util.h" 20 #include "base/strings/string_number_conversions.h" 21 #include "base/strings/string_util.h" 22 #include "base/strings/stringprintf.h" 23 #include "base/strings/utf_string_conversions.h" 24 #include "base/time/time.h" 25 #include "base/values.h" 26 #include "crypto/ec_private_key.h" 27 #include "crypto/ec_signature_creator.h" 28 #include "net/base/connection_type_histograms.h" 29 #include "net/base/net_log.h" 30 #include "net/base/net_util.h" 31 #include "net/cert/asn1_util.h" 32 #include "net/http/http_log_util.h" 33 #include "net/http/http_network_session.h" 34 #include "net/http/http_server_properties.h" 35 #include "net/http/http_util.h" 36 #include "net/spdy/spdy_buffer_producer.h" 37 #include "net/spdy/spdy_frame_builder.h" 38 #include "net/spdy/spdy_http_utils.h" 39 #include "net/spdy/spdy_protocol.h" 40 #include "net/spdy/spdy_session_pool.h" 41 #include "net/spdy/spdy_stream.h" 42 #include "net/ssl/server_bound_cert_service.h" 43 #include "net/ssl/ssl_cipher_suite_names.h" 44 #include "net/ssl/ssl_connection_status_flags.h" 45 46 namespace net { 47 48 namespace { 49 50 const int kReadBufferSize = 8 * 1024; 51 const int kDefaultConnectionAtRiskOfLossSeconds = 10; 52 const int kHungIntervalSeconds = 10; 53 54 // Minimum seconds that unclaimed pushed streams will be kept in memory. 55 const int kMinPushedStreamLifetimeSeconds = 300; 56 57 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue( 58 const SpdyHeaderBlock& headers, 59 net::NetLog::LogLevel log_level) { 60 scoped_ptr<base::ListValue> headers_list(new base::ListValue()); 61 for (SpdyHeaderBlock::const_iterator it = headers.begin(); 62 it != headers.end(); ++it) { 63 headers_list->AppendString( 64 it->first + ": " + 65 ElideHeaderValueForNetLog(log_level, it->first, it->second)); 66 } 67 return headers_list.Pass(); 68 } 69 70 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers, 71 bool fin, 72 bool unidirectional, 73 SpdyPriority spdy_priority, 74 SpdyStreamId stream_id, 75 NetLog::LogLevel log_level) { 76 base::DictionaryValue* dict = new base::DictionaryValue(); 77 dict->Set("headers", 78 SpdyHeaderBlockToListValue(*headers, log_level).release()); 79 dict->SetBoolean("fin", fin); 80 dict->SetBoolean("unidirectional", unidirectional); 81 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority)); 82 dict->SetInteger("stream_id", stream_id); 83 return dict; 84 } 85 86 base::Value* NetLogSpdySynStreamReceivedCallback( 87 const SpdyHeaderBlock* headers, 88 bool fin, 89 bool unidirectional, 90 SpdyPriority spdy_priority, 91 SpdyStreamId stream_id, 92 SpdyStreamId associated_stream, 93 NetLog::LogLevel log_level) { 94 base::DictionaryValue* dict = new base::DictionaryValue(); 95 dict->Set("headers", 96 SpdyHeaderBlockToListValue(*headers, log_level).release()); 97 dict->SetBoolean("fin", fin); 98 dict->SetBoolean("unidirectional", unidirectional); 99 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority)); 100 dict->SetInteger("stream_id", stream_id); 101 dict->SetInteger("associated_stream", associated_stream); 102 return dict; 103 } 104 105 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback( 106 const SpdyHeaderBlock* headers, 107 bool fin, 108 SpdyStreamId stream_id, 109 NetLog::LogLevel log_level) { 110 base::DictionaryValue* dict = new base::DictionaryValue(); 111 dict->Set("headers", 112 SpdyHeaderBlockToListValue(*headers, log_level).release()); 113 dict->SetBoolean("fin", fin); 114 dict->SetInteger("stream_id", stream_id); 115 return dict; 116 } 117 118 base::Value* NetLogSpdySessionCloseCallback(int net_error, 119 const std::string* description, 120 NetLog::LogLevel /* log_level */) { 121 base::DictionaryValue* dict = new base::DictionaryValue(); 122 dict->SetInteger("net_error", net_error); 123 dict->SetString("description", *description); 124 return dict; 125 } 126 127 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair, 128 NetLog::LogLevel /* log_level */) { 129 base::DictionaryValue* dict = new base::DictionaryValue(); 130 dict->SetString("host", host_pair->first.ToString()); 131 dict->SetString("proxy", host_pair->second.ToPacString()); 132 return dict; 133 } 134 135 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair, 136 bool clear_persisted, 137 NetLog::LogLevel /* log_level */) { 138 base::DictionaryValue* dict = new base::DictionaryValue(); 139 dict->SetString("host", host_port_pair.ToString()); 140 dict->SetBoolean("clear_persisted", clear_persisted); 141 return dict; 142 } 143 144 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id, 145 SpdySettingsFlags flags, 146 uint32 value, 147 NetLog::LogLevel /* log_level */) { 148 base::DictionaryValue* dict = new base::DictionaryValue(); 149 dict->SetInteger("id", id); 150 dict->SetInteger("flags", flags); 151 dict->SetInteger("value", value); 152 return dict; 153 } 154 155 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings, 156 NetLog::LogLevel /* log_level */) { 157 base::DictionaryValue* dict = new base::DictionaryValue(); 158 base::ListValue* settings_list = new base::ListValue(); 159 for (SettingsMap::const_iterator it = settings->begin(); 160 it != settings->end(); ++it) { 161 const SpdySettingsIds id = it->first; 162 const SpdySettingsFlags flags = it->second.first; 163 const uint32 value = it->second.second; 164 settings_list->Append(new base::StringValue( 165 base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value))); 166 } 167 dict->Set("settings", settings_list); 168 return dict; 169 } 170 171 base::Value* NetLogSpdyWindowUpdateFrameCallback( 172 SpdyStreamId stream_id, 173 uint32 delta, 174 NetLog::LogLevel /* log_level */) { 175 base::DictionaryValue* dict = new base::DictionaryValue(); 176 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 177 dict->SetInteger("delta", delta); 178 return dict; 179 } 180 181 base::Value* NetLogSpdySessionWindowUpdateCallback( 182 int32 delta, 183 int32 window_size, 184 NetLog::LogLevel /* log_level */) { 185 base::DictionaryValue* dict = new base::DictionaryValue(); 186 dict->SetInteger("delta", delta); 187 dict->SetInteger("window_size", window_size); 188 return dict; 189 } 190 191 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id, 192 int size, 193 bool fin, 194 NetLog::LogLevel /* log_level */) { 195 base::DictionaryValue* dict = new base::DictionaryValue(); 196 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 197 dict->SetInteger("size", size); 198 dict->SetBoolean("fin", fin); 199 return dict; 200 } 201 202 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id, 203 int status, 204 const std::string* description, 205 NetLog::LogLevel /* log_level */) { 206 base::DictionaryValue* dict = new base::DictionaryValue(); 207 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 208 dict->SetInteger("status", status); 209 dict->SetString("description", *description); 210 return dict; 211 } 212 213 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id, 214 bool is_ack, 215 const char* type, 216 NetLog::LogLevel /* log_level */) { 217 base::DictionaryValue* dict = new base::DictionaryValue(); 218 dict->SetInteger("unique_id", unique_id); 219 dict->SetString("type", type); 220 dict->SetBoolean("is_ack", is_ack); 221 return dict; 222 } 223 224 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id, 225 int active_streams, 226 int unclaimed_streams, 227 SpdyGoAwayStatus status, 228 NetLog::LogLevel /* log_level */) { 229 base::DictionaryValue* dict = new base::DictionaryValue(); 230 dict->SetInteger("last_accepted_stream_id", 231 static_cast<int>(last_stream_id)); 232 dict->SetInteger("active_streams", active_streams); 233 dict->SetInteger("unclaimed_streams", unclaimed_streams); 234 dict->SetInteger("status", static_cast<int>(status)); 235 return dict; 236 } 237 238 base::Value* NetLogSpdyPushPromiseReceivedCallback( 239 const SpdyHeaderBlock* headers, 240 SpdyStreamId stream_id, 241 SpdyStreamId promised_stream_id, 242 NetLog::LogLevel log_level) { 243 base::DictionaryValue* dict = new base::DictionaryValue(); 244 dict->Set("headers", 245 SpdyHeaderBlockToListValue(*headers, log_level).release()); 246 dict->SetInteger("id", stream_id); 247 dict->SetInteger("promised_stream_id", promised_stream_id); 248 return dict; 249 } 250 251 // Helper function to return the total size of an array of objects 252 // with .size() member functions. 253 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) { 254 size_t total_size = 0; 255 for (size_t i = 0; i < N; ++i) { 256 total_size += arr[i].size(); 257 } 258 return total_size; 259 } 260 261 // Helper class for std:find_if on STL container containing 262 // SpdyStreamRequest weak pointers. 263 class RequestEquals { 264 public: 265 RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request) 266 : request_(request) {} 267 268 bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const { 269 return request_.get() == request.get(); 270 } 271 272 private: 273 const base::WeakPtr<SpdyStreamRequest> request_; 274 }; 275 276 // The maximum number of concurrent streams we will ever create. Even if 277 // the server permits more, we will never exceed this limit. 278 const size_t kMaxConcurrentStreamLimit = 256; 279 280 } // namespace 281 282 SpdyProtocolErrorDetails MapFramerErrorToProtocolError( 283 SpdyFramer::SpdyError err) { 284 switch(err) { 285 case SpdyFramer::SPDY_NO_ERROR: 286 return SPDY_ERROR_NO_ERROR; 287 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME: 288 return SPDY_ERROR_INVALID_CONTROL_FRAME; 289 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE: 290 return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE; 291 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE: 292 return SPDY_ERROR_ZLIB_INIT_FAILURE; 293 case SpdyFramer::SPDY_UNSUPPORTED_VERSION: 294 return SPDY_ERROR_UNSUPPORTED_VERSION; 295 case SpdyFramer::SPDY_DECOMPRESS_FAILURE: 296 return SPDY_ERROR_DECOMPRESS_FAILURE; 297 case SpdyFramer::SPDY_COMPRESS_FAILURE: 298 return SPDY_ERROR_COMPRESS_FAILURE; 299 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT: 300 return SPDY_ERROR_GOAWAY_FRAME_CORRUPT; 301 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT: 302 return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT; 303 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS: 304 return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS; 305 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS: 306 return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS; 307 case SpdyFramer::SPDY_UNEXPECTED_FRAME: 308 return SPDY_ERROR_UNEXPECTED_FRAME; 309 default: 310 NOTREACHED(); 311 return static_cast<SpdyProtocolErrorDetails>(-1); 312 } 313 } 314 315 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) { 316 switch (err) { 317 case SpdyFramer::SPDY_NO_ERROR: 318 return OK; 319 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME: 320 return ERR_SPDY_PROTOCOL_ERROR; 321 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE: 322 return ERR_SPDY_FRAME_SIZE_ERROR; 323 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE: 324 return ERR_SPDY_COMPRESSION_ERROR; 325 case SpdyFramer::SPDY_UNSUPPORTED_VERSION: 326 return ERR_SPDY_PROTOCOL_ERROR; 327 case SpdyFramer::SPDY_DECOMPRESS_FAILURE: 328 return ERR_SPDY_COMPRESSION_ERROR; 329 case SpdyFramer::SPDY_COMPRESS_FAILURE: 330 return ERR_SPDY_COMPRESSION_ERROR; 331 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT: 332 return ERR_SPDY_PROTOCOL_ERROR; 333 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT: 334 return ERR_SPDY_PROTOCOL_ERROR; 335 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS: 336 return ERR_SPDY_PROTOCOL_ERROR; 337 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS: 338 return ERR_SPDY_PROTOCOL_ERROR; 339 case SpdyFramer::SPDY_UNEXPECTED_FRAME: 340 return ERR_SPDY_PROTOCOL_ERROR; 341 default: 342 NOTREACHED(); 343 return ERR_SPDY_PROTOCOL_ERROR; 344 } 345 } 346 347 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError( 348 SpdyRstStreamStatus status) { 349 switch(status) { 350 case RST_STREAM_PROTOCOL_ERROR: 351 return STATUS_CODE_PROTOCOL_ERROR; 352 case RST_STREAM_INVALID_STREAM: 353 return STATUS_CODE_INVALID_STREAM; 354 case RST_STREAM_REFUSED_STREAM: 355 return STATUS_CODE_REFUSED_STREAM; 356 case RST_STREAM_UNSUPPORTED_VERSION: 357 return STATUS_CODE_UNSUPPORTED_VERSION; 358 case RST_STREAM_CANCEL: 359 return STATUS_CODE_CANCEL; 360 case RST_STREAM_INTERNAL_ERROR: 361 return STATUS_CODE_INTERNAL_ERROR; 362 case RST_STREAM_FLOW_CONTROL_ERROR: 363 return STATUS_CODE_FLOW_CONTROL_ERROR; 364 case RST_STREAM_STREAM_IN_USE: 365 return STATUS_CODE_STREAM_IN_USE; 366 case RST_STREAM_STREAM_ALREADY_CLOSED: 367 return STATUS_CODE_STREAM_ALREADY_CLOSED; 368 case RST_STREAM_INVALID_CREDENTIALS: 369 return STATUS_CODE_INVALID_CREDENTIALS; 370 case RST_STREAM_FRAME_SIZE_ERROR: 371 return STATUS_CODE_FRAME_SIZE_ERROR; 372 case RST_STREAM_SETTINGS_TIMEOUT: 373 return STATUS_CODE_SETTINGS_TIMEOUT; 374 case RST_STREAM_CONNECT_ERROR: 375 return STATUS_CODE_CONNECT_ERROR; 376 case RST_STREAM_ENHANCE_YOUR_CALM: 377 return STATUS_CODE_ENHANCE_YOUR_CALM; 378 default: 379 NOTREACHED(); 380 return static_cast<SpdyProtocolErrorDetails>(-1); 381 } 382 } 383 384 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) { 385 switch (err) { 386 case OK: 387 return GOAWAY_NO_ERROR; 388 case ERR_SPDY_PROTOCOL_ERROR: 389 return GOAWAY_PROTOCOL_ERROR; 390 case ERR_SPDY_FLOW_CONTROL_ERROR: 391 return GOAWAY_FLOW_CONTROL_ERROR; 392 case ERR_SPDY_FRAME_SIZE_ERROR: 393 return GOAWAY_FRAME_SIZE_ERROR; 394 case ERR_SPDY_COMPRESSION_ERROR: 395 return GOAWAY_COMPRESSION_ERROR; 396 case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY: 397 return GOAWAY_INADEQUATE_SECURITY; 398 default: 399 return GOAWAY_PROTOCOL_ERROR; 400 } 401 } 402 403 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers, 404 SpdyMajorVersion protocol_version, 405 SpdyHeaderBlock* request_headers, 406 SpdyHeaderBlock* response_headers) { 407 DCHECK(response_headers); 408 DCHECK(request_headers); 409 for (SpdyHeaderBlock::const_iterator it = headers.begin(); 410 it != headers.end(); 411 ++it) { 412 SpdyHeaderBlock* to_insert = response_headers; 413 if (protocol_version == SPDY2) { 414 if (it->first == "url") 415 to_insert = request_headers; 416 } else { 417 const char* host = protocol_version >= SPDY4 ? ":authority" : ":host"; 418 static const char* scheme = ":scheme"; 419 static const char* path = ":path"; 420 if (it->first == host || it->first == scheme || it->first == path) 421 to_insert = request_headers; 422 } 423 to_insert->insert(*it); 424 } 425 } 426 427 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) { 428 Reset(); 429 } 430 431 SpdyStreamRequest::~SpdyStreamRequest() { 432 CancelRequest(); 433 } 434 435 int SpdyStreamRequest::StartRequest( 436 SpdyStreamType type, 437 const base::WeakPtr<SpdySession>& session, 438 const GURL& url, 439 RequestPriority priority, 440 const BoundNetLog& net_log, 441 const CompletionCallback& callback) { 442 DCHECK(session); 443 DCHECK(!session_); 444 DCHECK(!stream_); 445 DCHECK(callback_.is_null()); 446 447 type_ = type; 448 session_ = session; 449 url_ = url; 450 priority_ = priority; 451 net_log_ = net_log; 452 callback_ = callback; 453 454 base::WeakPtr<SpdyStream> stream; 455 int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream); 456 if (rv == OK) { 457 Reset(); 458 stream_ = stream; 459 } 460 return rv; 461 } 462 463 void SpdyStreamRequest::CancelRequest() { 464 if (session_) 465 session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr()); 466 Reset(); 467 // Do this to cancel any pending CompleteStreamRequest() tasks. 468 weak_ptr_factory_.InvalidateWeakPtrs(); 469 } 470 471 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() { 472 DCHECK(!session_); 473 base::WeakPtr<SpdyStream> stream = stream_; 474 DCHECK(stream); 475 Reset(); 476 return stream; 477 } 478 479 void SpdyStreamRequest::OnRequestCompleteSuccess( 480 const base::WeakPtr<SpdyStream>& stream) { 481 DCHECK(session_); 482 DCHECK(!stream_); 483 DCHECK(!callback_.is_null()); 484 CompletionCallback callback = callback_; 485 Reset(); 486 DCHECK(stream); 487 stream_ = stream; 488 callback.Run(OK); 489 } 490 491 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) { 492 DCHECK(session_); 493 DCHECK(!stream_); 494 DCHECK(!callback_.is_null()); 495 CompletionCallback callback = callback_; 496 Reset(); 497 DCHECK_NE(rv, OK); 498 callback.Run(rv); 499 } 500 501 void SpdyStreamRequest::Reset() { 502 type_ = SPDY_BIDIRECTIONAL_STREAM; 503 session_.reset(); 504 stream_.reset(); 505 url_ = GURL(); 506 priority_ = MINIMUM_PRIORITY; 507 net_log_ = BoundNetLog(); 508 callback_.Reset(); 509 } 510 511 SpdySession::ActiveStreamInfo::ActiveStreamInfo() 512 : stream(NULL), 513 waiting_for_syn_reply(false) {} 514 515 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream) 516 : stream(stream), 517 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) { 518 } 519 520 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {} 521 522 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {} 523 524 SpdySession::PushedStreamInfo::PushedStreamInfo( 525 SpdyStreamId stream_id, 526 base::TimeTicks creation_time) 527 : stream_id(stream_id), 528 creation_time(creation_time) {} 529 530 SpdySession::PushedStreamInfo::~PushedStreamInfo() {} 531 532 SpdySession::SpdySession( 533 const SpdySessionKey& spdy_session_key, 534 const base::WeakPtr<HttpServerProperties>& http_server_properties, 535 bool verify_domain_authentication, 536 bool enable_sending_initial_data, 537 bool enable_compression, 538 bool enable_ping_based_connection_checking, 539 NextProto default_protocol, 540 size_t stream_initial_recv_window_size, 541 size_t initial_max_concurrent_streams, 542 size_t max_concurrent_streams_limit, 543 TimeFunc time_func, 544 const HostPortPair& trusted_spdy_proxy, 545 NetLog* net_log) 546 : in_io_loop_(false), 547 spdy_session_key_(spdy_session_key), 548 pool_(NULL), 549 http_server_properties_(http_server_properties), 550 read_buffer_(new IOBuffer(kReadBufferSize)), 551 stream_hi_water_mark_(kFirstStreamId), 552 in_flight_write_frame_type_(DATA), 553 in_flight_write_frame_size_(0), 554 is_secure_(false), 555 certificate_error_code_(OK), 556 availability_state_(STATE_AVAILABLE), 557 read_state_(READ_STATE_DO_READ), 558 write_state_(WRITE_STATE_IDLE), 559 error_on_close_(OK), 560 max_concurrent_streams_(initial_max_concurrent_streams == 0 561 ? kInitialMaxConcurrentStreams 562 : initial_max_concurrent_streams), 563 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 564 ? kMaxConcurrentStreamLimit 565 : max_concurrent_streams_limit), 566 streams_initiated_count_(0), 567 streams_pushed_count_(0), 568 streams_pushed_and_claimed_count_(0), 569 streams_abandoned_count_(0), 570 total_bytes_received_(0), 571 sent_settings_(false), 572 received_settings_(false), 573 stalled_streams_(0), 574 pings_in_flight_(0), 575 next_ping_id_(1), 576 last_activity_time_(time_func()), 577 last_compressed_frame_len_(0), 578 check_ping_status_pending_(false), 579 send_connection_header_prefix_(false), 580 flow_control_state_(FLOW_CONTROL_NONE), 581 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize), 582 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 583 ? kDefaultInitialRecvWindowSize 584 : stream_initial_recv_window_size), 585 session_send_window_size_(0), 586 session_recv_window_size_(0), 587 session_unacked_recv_window_bytes_(0), 588 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)), 589 verify_domain_authentication_(verify_domain_authentication), 590 enable_sending_initial_data_(enable_sending_initial_data), 591 enable_compression_(enable_compression), 592 enable_ping_based_connection_checking_( 593 enable_ping_based_connection_checking), 594 protocol_(default_protocol), 595 connection_at_risk_of_loss_time_( 596 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)), 597 hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)), 598 trusted_spdy_proxy_(trusted_spdy_proxy), 599 time_func_(time_func), 600 weak_factory_(this) { 601 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion); 602 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 603 DCHECK(HttpStreamFactory::spdy_enabled()); 604 net_log_.BeginEvent( 605 NetLog::TYPE_SPDY_SESSION, 606 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair())); 607 next_unclaimed_push_stream_sweep_time_ = time_func_() + 608 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 609 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 610 } 611 612 SpdySession::~SpdySession() { 613 CHECK(!in_io_loop_); 614 DcheckDraining(); 615 616 // TODO(akalin): Check connection->is_initialized() instead. This 617 // requires re-working CreateFakeSpdySession(), though. 618 DCHECK(connection_->socket()); 619 // With SPDY we can't recycle sockets. 620 connection_->socket()->Disconnect(); 621 622 RecordHistograms(); 623 624 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION); 625 } 626 627 void SpdySession::InitializeWithSocket( 628 scoped_ptr<ClientSocketHandle> connection, 629 SpdySessionPool* pool, 630 bool is_secure, 631 int certificate_error_code) { 632 CHECK(!in_io_loop_); 633 DCHECK_EQ(availability_state_, STATE_AVAILABLE); 634 DCHECK_EQ(read_state_, READ_STATE_DO_READ); 635 DCHECK_EQ(write_state_, WRITE_STATE_IDLE); 636 DCHECK(!connection_); 637 638 DCHECK(certificate_error_code == OK || 639 certificate_error_code < ERR_IO_PENDING); 640 // TODO(akalin): Check connection->is_initialized() instead. This 641 // requires re-working CreateFakeSpdySession(), though. 642 DCHECK(connection->socket()); 643 644 base::StatsCounter spdy_sessions("spdy.sessions"); 645 spdy_sessions.Increment(); 646 647 connection_ = connection.Pass(); 648 is_secure_ = is_secure; 649 certificate_error_code_ = certificate_error_code; 650 651 NextProto protocol_negotiated = 652 connection_->socket()->GetNegotiatedProtocol(); 653 if (protocol_negotiated != kProtoUnknown) { 654 protocol_ = protocol_negotiated; 655 } 656 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion); 657 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion); 658 659 if (protocol_ == kProtoSPDY4) 660 send_connection_header_prefix_ = true; 661 662 if (protocol_ >= kProtoSPDY31) { 663 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION; 664 session_send_window_size_ = kSpdySessionInitialWindowSize; 665 session_recv_window_size_ = kSpdySessionInitialWindowSize; 666 } else if (protocol_ >= kProtoSPDY3) { 667 flow_control_state_ = FLOW_CONTROL_STREAM; 668 } else { 669 flow_control_state_ = FLOW_CONTROL_NONE; 670 } 671 672 buffered_spdy_framer_.reset( 673 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_), 674 enable_compression_)); 675 buffered_spdy_framer_->set_visitor(this); 676 buffered_spdy_framer_->set_debug_visitor(this); 677 UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion); 678 #if defined(SPDY_PROXY_AUTH_ORIGIN) 679 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy", 680 host_port_pair().Equals(HostPortPair::FromURL( 681 GURL(SPDY_PROXY_AUTH_ORIGIN)))); 682 #endif 683 684 net_log_.AddEvent( 685 NetLog::TYPE_SPDY_SESSION_INITIALIZED, 686 connection_->socket()->NetLog().source().ToEventParametersCallback()); 687 688 DCHECK_EQ(availability_state_, STATE_AVAILABLE); 689 connection_->AddHigherLayeredPool(this); 690 if (enable_sending_initial_data_) 691 SendInitialData(); 692 pool_ = pool; 693 694 // Bootstrap the read loop. 695 base::MessageLoop::current()->PostTask( 696 FROM_HERE, 697 base::Bind(&SpdySession::PumpReadLoop, 698 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 699 } 700 701 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 702 if (!verify_domain_authentication_) 703 return true; 704 705 if (availability_state_ == STATE_DRAINING) 706 return false; 707 708 SSLInfo ssl_info; 709 bool was_npn_negotiated; 710 NextProto protocol_negotiated = kProtoUnknown; 711 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated)) 712 return true; // This is not a secure session, so all domains are okay. 713 714 // Disable pooling for secure sessions. 715 // TODO(rch): re-enable this. 716 return false; 717 #if 0 718 bool unused = false; 719 return 720 !ssl_info.client_cert_sent && 721 (!ssl_info.channel_id_sent || 722 (ServerBoundCertService::GetDomainForHost(domain) == 723 ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) && 724 ssl_info.cert->VerifyNameMatch(domain, &unused); 725 #endif 726 } 727 728 int SpdySession::GetPushStream( 729 const GURL& url, 730 base::WeakPtr<SpdyStream>* stream, 731 const BoundNetLog& stream_net_log) { 732 CHECK(!in_io_loop_); 733 734 stream->reset(); 735 736 if (availability_state_ == STATE_DRAINING) 737 return ERR_CONNECTION_CLOSED; 738 739 Error err = TryAccessStream(url); 740 if (err != OK) 741 return err; 742 743 *stream = GetActivePushStream(url); 744 if (*stream) { 745 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_); 746 streams_pushed_and_claimed_count_++; 747 } 748 return OK; 749 } 750 751 // {,Try}CreateStream() and TryAccessStream() can be called with 752 // |in_io_loop_| set if a stream is being created in response to 753 // another being closed due to received data. 754 755 Error SpdySession::TryAccessStream(const GURL& url) { 756 if (is_secure_ && certificate_error_code_ != OK && 757 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 758 RecordProtocolErrorHistogram( 759 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION); 760 DoDrainSession( 761 static_cast<Error>(certificate_error_code_), 762 "Tried to get SPDY stream for secure content over an unauthenticated " 763 "session."); 764 return ERR_SPDY_PROTOCOL_ERROR; 765 } 766 return OK; 767 } 768 769 int SpdySession::TryCreateStream( 770 const base::WeakPtr<SpdyStreamRequest>& request, 771 base::WeakPtr<SpdyStream>* stream) { 772 DCHECK(request); 773 774 if (availability_state_ == STATE_GOING_AWAY) 775 return ERR_FAILED; 776 777 if (availability_state_ == STATE_DRAINING) 778 return ERR_CONNECTION_CLOSED; 779 780 Error err = TryAccessStream(request->url()); 781 if (err != OK) 782 return err; 783 784 if (!max_concurrent_streams_ || 785 (active_streams_.size() + created_streams_.size() < 786 max_concurrent_streams_)) { 787 return CreateStream(*request, stream); 788 } 789 790 stalled_streams_++; 791 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS); 792 RequestPriority priority = request->priority(); 793 CHECK_GE(priority, MINIMUM_PRIORITY); 794 CHECK_LE(priority, MAXIMUM_PRIORITY); 795 pending_create_stream_queues_[priority].push_back(request); 796 return ERR_IO_PENDING; 797 } 798 799 int SpdySession::CreateStream(const SpdyStreamRequest& request, 800 base::WeakPtr<SpdyStream>* stream) { 801 DCHECK_GE(request.priority(), MINIMUM_PRIORITY); 802 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY); 803 804 if (availability_state_ == STATE_GOING_AWAY) 805 return ERR_FAILED; 806 807 if (availability_state_ == STATE_DRAINING) 808 return ERR_CONNECTION_CLOSED; 809 810 Error err = TryAccessStream(request.url()); 811 if (err != OK) { 812 // This should have been caught in TryCreateStream(). 813 NOTREACHED(); 814 return err; 815 } 816 817 DCHECK(connection_->socket()); 818 DCHECK(connection_->socket()->IsConnected()); 819 if (connection_->socket()) { 820 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected", 821 connection_->socket()->IsConnected()); 822 if (!connection_->socket()->IsConnected()) { 823 DoDrainSession( 824 ERR_CONNECTION_CLOSED, 825 "Tried to create SPDY stream for a closed socket connection."); 826 return ERR_CONNECTION_CLOSED; 827 } 828 } 829 830 scoped_ptr<SpdyStream> new_stream( 831 new SpdyStream(request.type(), GetWeakPtr(), request.url(), 832 request.priority(), 833 stream_initial_send_window_size_, 834 stream_initial_recv_window_size_, 835 request.net_log())); 836 *stream = new_stream->GetWeakPtr(); 837 InsertCreatedStream(new_stream.Pass()); 838 839 UMA_HISTOGRAM_CUSTOM_COUNTS( 840 "Net.SpdyPriorityCount", 841 static_cast<int>(request.priority()), 0, 10, 11); 842 843 return OK; 844 } 845 846 void SpdySession::CancelStreamRequest( 847 const base::WeakPtr<SpdyStreamRequest>& request) { 848 DCHECK(request); 849 RequestPriority priority = request->priority(); 850 CHECK_GE(priority, MINIMUM_PRIORITY); 851 CHECK_LE(priority, MAXIMUM_PRIORITY); 852 853 #if DCHECK_IS_ON 854 // |request| should not be in a queue not matching its priority. 855 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { 856 if (priority == i) 857 continue; 858 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i]; 859 DCHECK(std::find_if(queue->begin(), 860 queue->end(), 861 RequestEquals(request)) == queue->end()); 862 } 863 #endif 864 865 PendingStreamRequestQueue* queue = 866 &pending_create_stream_queues_[priority]; 867 // Remove |request| from |queue| while preserving the order of the 868 // other elements. 869 PendingStreamRequestQueue::iterator it = 870 std::find_if(queue->begin(), queue->end(), RequestEquals(request)); 871 // The request may already be removed if there's a 872 // CompleteStreamRequest() in flight. 873 if (it != queue->end()) { 874 it = queue->erase(it); 875 // |request| should be in the queue at most once, and if it is 876 // present, should not be pending completion. 877 DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) == 878 queue->end()); 879 } 880 } 881 882 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() { 883 for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) { 884 if (pending_create_stream_queues_[j].empty()) 885 continue; 886 887 base::WeakPtr<SpdyStreamRequest> pending_request = 888 pending_create_stream_queues_[j].front(); 889 DCHECK(pending_request); 890 pending_create_stream_queues_[j].pop_front(); 891 return pending_request; 892 } 893 return base::WeakPtr<SpdyStreamRequest>(); 894 } 895 896 void SpdySession::ProcessPendingStreamRequests() { 897 // Like |max_concurrent_streams_|, 0 means infinite for 898 // |max_requests_to_process|. 899 size_t max_requests_to_process = 0; 900 if (max_concurrent_streams_ != 0) { 901 max_requests_to_process = 902 max_concurrent_streams_ - 903 (active_streams_.size() + created_streams_.size()); 904 } 905 for (size_t i = 0; 906 max_requests_to_process == 0 || i < max_requests_to_process; ++i) { 907 base::WeakPtr<SpdyStreamRequest> pending_request = 908 GetNextPendingStreamRequest(); 909 if (!pending_request) 910 break; 911 912 // Note that this post can race with other stream creations, and it's 913 // possible that the un-stalled stream will be stalled again if it loses. 914 // TODO(jgraettinger): Provide stronger ordering guarantees. 915 base::MessageLoop::current()->PostTask( 916 FROM_HERE, 917 base::Bind(&SpdySession::CompleteStreamRequest, 918 weak_factory_.GetWeakPtr(), 919 pending_request)); 920 } 921 } 922 923 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) { 924 pooled_aliases_.insert(alias_key); 925 } 926 927 SpdyMajorVersion SpdySession::GetProtocolVersion() const { 928 DCHECK(buffered_spdy_framer_.get()); 929 return buffered_spdy_framer_->protocol_version(); 930 } 931 932 bool SpdySession::HasAcceptableTransportSecurity() const { 933 // If we're not even using TLS, we have no standards to meet. 934 if (!is_secure_) { 935 return true; 936 } 937 938 // We don't enforce transport security standards for older SPDY versions. 939 if (GetProtocolVersion() < SPDY4) { 940 return true; 941 } 942 943 SSLInfo ssl_info; 944 CHECK(connection_->socket()->GetSSLInfo(&ssl_info)); 945 946 // HTTP/2 requires TLS 1.2+ 947 if (SSLConnectionStatusToVersion(ssl_info.connection_status) < 948 SSL_CONNECTION_VERSION_TLS1_2) { 949 return false; 950 } 951 952 if (!IsSecureTLSCipherSuite( 953 SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) { 954 return false; 955 } 956 957 return true; 958 } 959 960 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() { 961 return weak_factory_.GetWeakPtr(); 962 } 963 964 bool SpdySession::CloseOneIdleConnection() { 965 CHECK(!in_io_loop_); 966 DCHECK(pool_); 967 if (active_streams_.empty()) { 968 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); 969 } 970 // Return false as the socket wasn't immediately closed. 971 return false; 972 } 973 974 void SpdySession::EnqueueStreamWrite( 975 const base::WeakPtr<SpdyStream>& stream, 976 SpdyFrameType frame_type, 977 scoped_ptr<SpdyBufferProducer> producer) { 978 DCHECK(frame_type == HEADERS || 979 frame_type == DATA || 980 frame_type == CREDENTIAL || 981 frame_type == SYN_STREAM); 982 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream); 983 } 984 985 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( 986 SpdyStreamId stream_id, 987 RequestPriority priority, 988 SpdyControlFlags flags, 989 const SpdyHeaderBlock& headers) { 990 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 991 CHECK(it != active_streams_.end()); 992 CHECK_EQ(it->second.stream->stream_id(), stream_id); 993 994 SendPrefacePingIfNoneInFlight(); 995 996 DCHECK(buffered_spdy_framer_.get()); 997 SpdyPriority spdy_priority = 998 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()); 999 scoped_ptr<SpdyFrame> syn_frame( 1000 buffered_spdy_framer_->CreateSynStream(stream_id, 0, spdy_priority, flags, 1001 &headers)); 1002 1003 base::StatsCounter spdy_requests("spdy.requests"); 1004 spdy_requests.Increment(); 1005 streams_initiated_count_++; 1006 1007 if (net_log().IsLogging()) { 1008 net_log().AddEvent( 1009 NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 1010 base::Bind(&NetLogSpdySynStreamSentCallback, &headers, 1011 (flags & CONTROL_FLAG_FIN) != 0, 1012 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0, 1013 spdy_priority, 1014 stream_id)); 1015 } 1016 1017 return syn_frame.Pass(); 1018 } 1019 1020 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, 1021 IOBuffer* data, 1022 int len, 1023 SpdyDataFlags flags) { 1024 if (availability_state_ == STATE_DRAINING) { 1025 return scoped_ptr<SpdyBuffer>(); 1026 } 1027 1028 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 1029 CHECK(it != active_streams_.end()); 1030 SpdyStream* stream = it->second.stream; 1031 CHECK_EQ(stream->stream_id(), stream_id); 1032 1033 if (len < 0) { 1034 NOTREACHED(); 1035 return scoped_ptr<SpdyBuffer>(); 1036 } 1037 1038 int effective_len = std::min(len, kMaxSpdyFrameChunkSize); 1039 1040 bool send_stalled_by_stream = 1041 (flow_control_state_ >= FLOW_CONTROL_STREAM) && 1042 (stream->send_window_size() <= 0); 1043 bool send_stalled_by_session = IsSendStalled(); 1044 1045 // NOTE: There's an enum of the same name in histograms.xml. 1046 enum SpdyFrameFlowControlState { 1047 SEND_NOT_STALLED, 1048 SEND_STALLED_BY_STREAM, 1049 SEND_STALLED_BY_SESSION, 1050 SEND_STALLED_BY_STREAM_AND_SESSION, 1051 }; 1052 1053 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED; 1054 if (send_stalled_by_stream) { 1055 if (send_stalled_by_session) { 1056 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION; 1057 } else { 1058 frame_flow_control_state = SEND_STALLED_BY_STREAM; 1059 } 1060 } else if (send_stalled_by_session) { 1061 frame_flow_control_state = SEND_STALLED_BY_SESSION; 1062 } 1063 1064 if (flow_control_state_ == FLOW_CONTROL_STREAM) { 1065 UMA_HISTOGRAM_ENUMERATION( 1066 "Net.SpdyFrameStreamFlowControlState", 1067 frame_flow_control_state, 1068 SEND_STALLED_BY_STREAM + 1); 1069 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1070 UMA_HISTOGRAM_ENUMERATION( 1071 "Net.SpdyFrameStreamAndSessionFlowControlState", 1072 frame_flow_control_state, 1073 SEND_STALLED_BY_STREAM_AND_SESSION + 1); 1074 } 1075 1076 // Obey send window size of the stream if stream flow control is 1077 // enabled. 1078 if (flow_control_state_ >= FLOW_CONTROL_STREAM) { 1079 if (send_stalled_by_stream) { 1080 stream->set_send_stalled_by_flow_control(true); 1081 // Even though we're currently stalled only by the stream, we 1082 // might end up being stalled by the session also. 1083 QueueSendStalledStream(*stream); 1084 net_log().AddEvent( 1085 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW, 1086 NetLog::IntegerCallback("stream_id", stream_id)); 1087 return scoped_ptr<SpdyBuffer>(); 1088 } 1089 1090 effective_len = std::min(effective_len, stream->send_window_size()); 1091 } 1092 1093 // Obey send window size of the session if session flow control is 1094 // enabled. 1095 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1096 if (send_stalled_by_session) { 1097 stream->set_send_stalled_by_flow_control(true); 1098 QueueSendStalledStream(*stream); 1099 net_log().AddEvent( 1100 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW, 1101 NetLog::IntegerCallback("stream_id", stream_id)); 1102 return scoped_ptr<SpdyBuffer>(); 1103 } 1104 1105 effective_len = std::min(effective_len, session_send_window_size_); 1106 } 1107 1108 DCHECK_GE(effective_len, 0); 1109 1110 // Clear FIN flag if only some of the data will be in the data 1111 // frame. 1112 if (effective_len < len) 1113 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN); 1114 1115 if (net_log().IsLogging()) { 1116 net_log().AddEvent( 1117 NetLog::TYPE_SPDY_SESSION_SEND_DATA, 1118 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len, 1119 (flags & DATA_FLAG_FIN) != 0)); 1120 } 1121 1122 // Send PrefacePing for DATA_FRAMEs with nonzero payload size. 1123 if (effective_len > 0) 1124 SendPrefacePingIfNoneInFlight(); 1125 1126 // TODO(mbelshe): reduce memory copies here. 1127 DCHECK(buffered_spdy_framer_.get()); 1128 scoped_ptr<SpdyFrame> frame( 1129 buffered_spdy_framer_->CreateDataFrame( 1130 stream_id, data->data(), 1131 static_cast<uint32>(effective_len), flags)); 1132 1133 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass())); 1134 1135 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1136 DecreaseSendWindowSize(static_cast<int32>(effective_len)); 1137 data_buffer->AddConsumeCallback( 1138 base::Bind(&SpdySession::OnWriteBufferConsumed, 1139 weak_factory_.GetWeakPtr(), 1140 static_cast<size_t>(effective_len))); 1141 } 1142 1143 return data_buffer.Pass(); 1144 } 1145 1146 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) { 1147 DCHECK_NE(stream_id, 0u); 1148 1149 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1150 if (it == active_streams_.end()) { 1151 NOTREACHED(); 1152 return; 1153 } 1154 1155 CloseActiveStreamIterator(it, status); 1156 } 1157 1158 void SpdySession::CloseCreatedStream( 1159 const base::WeakPtr<SpdyStream>& stream, int status) { 1160 DCHECK_EQ(stream->stream_id(), 0u); 1161 1162 CreatedStreamSet::iterator it = created_streams_.find(stream.get()); 1163 if (it == created_streams_.end()) { 1164 NOTREACHED(); 1165 return; 1166 } 1167 1168 CloseCreatedStreamIterator(it, status); 1169 } 1170 1171 void SpdySession::ResetStream(SpdyStreamId stream_id, 1172 SpdyRstStreamStatus status, 1173 const std::string& description) { 1174 DCHECK_NE(stream_id, 0u); 1175 1176 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1177 if (it == active_streams_.end()) { 1178 NOTREACHED(); 1179 return; 1180 } 1181 1182 ResetStreamIterator(it, status, description); 1183 } 1184 1185 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const { 1186 return ContainsKey(active_streams_, stream_id); 1187 } 1188 1189 LoadState SpdySession::GetLoadState() const { 1190 // Just report that we're idle since the session could be doing 1191 // many things concurrently. 1192 return LOAD_STATE_IDLE; 1193 } 1194 1195 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it, 1196 int status) { 1197 // TODO(mbelshe): We should send a RST_STREAM control frame here 1198 // so that the server can cancel a large send. 1199 1200 scoped_ptr<SpdyStream> owned_stream(it->second.stream); 1201 active_streams_.erase(it); 1202 1203 // TODO(akalin): When SpdyStream was ref-counted (and 1204 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this 1205 // was only done when status was not OK. This meant that pushed 1206 // streams can still be claimed after they're closed. This is 1207 // probably something that we still want to support, although server 1208 // push is hardly used. Write tests for this and fix this. (See 1209 // http://crbug.com/261712 .) 1210 if (owned_stream->type() == SPDY_PUSH_STREAM) 1211 unclaimed_pushed_streams_.erase(owned_stream->url()); 1212 1213 DeleteStream(owned_stream.Pass(), status); 1214 MaybeFinishGoingAway(); 1215 1216 // If there are no active streams and the socket pool is stalled, close the 1217 // session to free up a socket slot. 1218 if (active_streams_.empty() && connection_->IsPoolStalled()) { 1219 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection."); 1220 } 1221 } 1222 1223 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it, 1224 int status) { 1225 scoped_ptr<SpdyStream> owned_stream(*it); 1226 created_streams_.erase(it); 1227 DeleteStream(owned_stream.Pass(), status); 1228 } 1229 1230 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it, 1231 SpdyRstStreamStatus status, 1232 const std::string& description) { 1233 // Send the RST_STREAM frame first as CloseActiveStreamIterator() 1234 // may close us. 1235 SpdyStreamId stream_id = it->first; 1236 RequestPriority priority = it->second.stream->priority(); 1237 EnqueueResetStreamFrame(stream_id, priority, status, description); 1238 1239 // Removes any pending writes for the stream except for possibly an 1240 // in-flight one. 1241 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 1242 } 1243 1244 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id, 1245 RequestPriority priority, 1246 SpdyRstStreamStatus status, 1247 const std::string& description) { 1248 DCHECK_NE(stream_id, 0u); 1249 1250 net_log().AddEvent( 1251 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM, 1252 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description)); 1253 1254 DCHECK(buffered_spdy_framer_.get()); 1255 scoped_ptr<SpdyFrame> rst_frame( 1256 buffered_spdy_framer_->CreateRstStream(stream_id, status)); 1257 1258 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass()); 1259 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status)); 1260 } 1261 1262 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) { 1263 CHECK(!in_io_loop_); 1264 if (availability_state_ == STATE_DRAINING) { 1265 return; 1266 } 1267 ignore_result(DoReadLoop(expected_read_state, result)); 1268 } 1269 1270 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) { 1271 CHECK(!in_io_loop_); 1272 CHECK_EQ(read_state_, expected_read_state); 1273 1274 in_io_loop_ = true; 1275 1276 int bytes_read_without_yielding = 0; 1277 1278 // Loop until the session is draining, the read becomes blocked, or 1279 // the read limit is exceeded. 1280 while (true) { 1281 switch (read_state_) { 1282 case READ_STATE_DO_READ: 1283 CHECK_EQ(result, OK); 1284 result = DoRead(); 1285 break; 1286 case READ_STATE_DO_READ_COMPLETE: 1287 if (result > 0) 1288 bytes_read_without_yielding += result; 1289 result = DoReadComplete(result); 1290 break; 1291 default: 1292 NOTREACHED() << "read_state_: " << read_state_; 1293 break; 1294 } 1295 1296 if (availability_state_ == STATE_DRAINING) 1297 break; 1298 1299 if (result == ERR_IO_PENDING) 1300 break; 1301 1302 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) { 1303 read_state_ = READ_STATE_DO_READ; 1304 base::MessageLoop::current()->PostTask( 1305 FROM_HERE, 1306 base::Bind(&SpdySession::PumpReadLoop, 1307 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK)); 1308 result = ERR_IO_PENDING; 1309 break; 1310 } 1311 } 1312 1313 CHECK(in_io_loop_); 1314 in_io_loop_ = false; 1315 1316 return result; 1317 } 1318 1319 int SpdySession::DoRead() { 1320 CHECK(in_io_loop_); 1321 1322 CHECK(connection_); 1323 CHECK(connection_->socket()); 1324 read_state_ = READ_STATE_DO_READ_COMPLETE; 1325 return connection_->socket()->Read( 1326 read_buffer_.get(), 1327 kReadBufferSize, 1328 base::Bind(&SpdySession::PumpReadLoop, 1329 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE)); 1330 } 1331 1332 int SpdySession::DoReadComplete(int result) { 1333 CHECK(in_io_loop_); 1334 1335 // Parse a frame. For now this code requires that the frame fit into our 1336 // buffer (kReadBufferSize). 1337 // TODO(mbelshe): support arbitrarily large frames! 1338 1339 if (result == 0) { 1340 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF", 1341 total_bytes_received_, 1, 100000000, 50); 1342 DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed"); 1343 1344 return ERR_CONNECTION_CLOSED; 1345 } 1346 1347 if (result < 0) { 1348 DoDrainSession(static_cast<Error>(result), "result is < 0."); 1349 return result; 1350 } 1351 CHECK_LE(result, kReadBufferSize); 1352 total_bytes_received_ += result; 1353 1354 last_activity_time_ = time_func_(); 1355 1356 DCHECK(buffered_spdy_framer_.get()); 1357 char* data = read_buffer_->data(); 1358 while (result > 0) { 1359 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result); 1360 result -= bytes_processed; 1361 data += bytes_processed; 1362 1363 if (availability_state_ == STATE_DRAINING) { 1364 return ERR_CONNECTION_CLOSED; 1365 } 1366 1367 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR); 1368 } 1369 1370 read_state_ = READ_STATE_DO_READ; 1371 return OK; 1372 } 1373 1374 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) { 1375 CHECK(!in_io_loop_); 1376 DCHECK_EQ(write_state_, expected_write_state); 1377 1378 DoWriteLoop(expected_write_state, result); 1379 1380 if (availability_state_ == STATE_DRAINING && !in_flight_write_ && 1381 write_queue_.IsEmpty()) { 1382 pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|. 1383 return; 1384 } 1385 } 1386 1387 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) { 1388 CHECK(!in_io_loop_); 1389 DCHECK_NE(write_state_, WRITE_STATE_IDLE); 1390 DCHECK_EQ(write_state_, expected_write_state); 1391 1392 in_io_loop_ = true; 1393 1394 // Loop until the session is closed or the write becomes blocked. 1395 while (true) { 1396 switch (write_state_) { 1397 case WRITE_STATE_DO_WRITE: 1398 DCHECK_EQ(result, OK); 1399 result = DoWrite(); 1400 break; 1401 case WRITE_STATE_DO_WRITE_COMPLETE: 1402 result = DoWriteComplete(result); 1403 break; 1404 case WRITE_STATE_IDLE: 1405 default: 1406 NOTREACHED() << "write_state_: " << write_state_; 1407 break; 1408 } 1409 1410 if (write_state_ == WRITE_STATE_IDLE) { 1411 DCHECK_EQ(result, ERR_IO_PENDING); 1412 break; 1413 } 1414 1415 if (result == ERR_IO_PENDING) 1416 break; 1417 } 1418 1419 CHECK(in_io_loop_); 1420 in_io_loop_ = false; 1421 1422 return result; 1423 } 1424 1425 int SpdySession::DoWrite() { 1426 CHECK(in_io_loop_); 1427 1428 DCHECK(buffered_spdy_framer_); 1429 if (in_flight_write_) { 1430 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1431 } else { 1432 // Grab the next frame to send. 1433 SpdyFrameType frame_type = DATA; 1434 scoped_ptr<SpdyBufferProducer> producer; 1435 base::WeakPtr<SpdyStream> stream; 1436 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) { 1437 write_state_ = WRITE_STATE_IDLE; 1438 return ERR_IO_PENDING; 1439 } 1440 1441 if (stream.get()) 1442 CHECK(!stream->IsClosed()); 1443 1444 // Activate the stream only when sending the SYN_STREAM frame to 1445 // guarantee monotonically-increasing stream IDs. 1446 if (frame_type == SYN_STREAM) { 1447 CHECK(stream.get()); 1448 CHECK_EQ(stream->stream_id(), 0u); 1449 scoped_ptr<SpdyStream> owned_stream = 1450 ActivateCreatedStream(stream.get()); 1451 InsertActivatedStream(owned_stream.Pass()); 1452 1453 if (stream_hi_water_mark_ > kLastStreamId) { 1454 CHECK_EQ(stream->stream_id(), kLastStreamId); 1455 // We've exhausted the stream ID space, and no new streams may be 1456 // created after this one. 1457 MakeUnavailable(); 1458 StartGoingAway(kLastStreamId, ERR_ABORTED); 1459 } 1460 } 1461 1462 in_flight_write_ = producer->ProduceBuffer(); 1463 if (!in_flight_write_) { 1464 NOTREACHED(); 1465 return ERR_UNEXPECTED; 1466 } 1467 in_flight_write_frame_type_ = frame_type; 1468 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); 1469 DCHECK_GE(in_flight_write_frame_size_, 1470 buffered_spdy_framer_->GetFrameMinimumSize()); 1471 in_flight_write_stream_ = stream; 1472 } 1473 1474 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE; 1475 1476 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems 1477 // with Socket implementations that don't store their IOBuffer 1478 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345). 1479 scoped_refptr<IOBuffer> write_io_buffer = 1480 in_flight_write_->GetIOBufferForRemainingData(); 1481 return connection_->socket()->Write( 1482 write_io_buffer.get(), 1483 in_flight_write_->GetRemainingSize(), 1484 base::Bind(&SpdySession::PumpWriteLoop, 1485 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE)); 1486 } 1487 1488 int SpdySession::DoWriteComplete(int result) { 1489 CHECK(in_io_loop_); 1490 DCHECK_NE(result, ERR_IO_PENDING); 1491 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); 1492 1493 last_activity_time_ = time_func_(); 1494 1495 if (result < 0) { 1496 DCHECK_NE(result, ERR_IO_PENDING); 1497 in_flight_write_.reset(); 1498 in_flight_write_frame_type_ = DATA; 1499 in_flight_write_frame_size_ = 0; 1500 in_flight_write_stream_.reset(); 1501 write_state_ = WRITE_STATE_DO_WRITE; 1502 DoDrainSession(static_cast<Error>(result), "Write error"); 1503 return OK; 1504 } 1505 1506 // It should not be possible to have written more bytes than our 1507 // in_flight_write_. 1508 DCHECK_LE(static_cast<size_t>(result), 1509 in_flight_write_->GetRemainingSize()); 1510 1511 if (result > 0) { 1512 in_flight_write_->Consume(static_cast<size_t>(result)); 1513 1514 // We only notify the stream when we've fully written the pending frame. 1515 if (in_flight_write_->GetRemainingSize() == 0) { 1516 // It is possible that the stream was cancelled while we were 1517 // writing to the socket. 1518 if (in_flight_write_stream_.get()) { 1519 DCHECK_GT(in_flight_write_frame_size_, 0u); 1520 in_flight_write_stream_->OnFrameWriteComplete( 1521 in_flight_write_frame_type_, 1522 in_flight_write_frame_size_); 1523 } 1524 1525 // Cleanup the write which just completed. 1526 in_flight_write_.reset(); 1527 in_flight_write_frame_type_ = DATA; 1528 in_flight_write_frame_size_ = 0; 1529 in_flight_write_stream_.reset(); 1530 } 1531 } 1532 1533 write_state_ = WRITE_STATE_DO_WRITE; 1534 return OK; 1535 } 1536 1537 void SpdySession::DcheckGoingAway() const { 1538 #if DCHECK_IS_ON 1539 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1540 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) { 1541 DCHECK(pending_create_stream_queues_[i].empty()); 1542 } 1543 DCHECK(created_streams_.empty()); 1544 #endif 1545 } 1546 1547 void SpdySession::DcheckDraining() const { 1548 DcheckGoingAway(); 1549 DCHECK_EQ(availability_state_, STATE_DRAINING); 1550 DCHECK(active_streams_.empty()); 1551 DCHECK(unclaimed_pushed_streams_.empty()); 1552 } 1553 1554 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id, 1555 Error status) { 1556 DCHECK_GE(availability_state_, STATE_GOING_AWAY); 1557 1558 // The loops below are carefully written to avoid reentrancy problems. 1559 1560 while (true) { 1561 size_t old_size = GetTotalSize(pending_create_stream_queues_); 1562 base::WeakPtr<SpdyStreamRequest> pending_request = 1563 GetNextPendingStreamRequest(); 1564 if (!pending_request) 1565 break; 1566 // No new stream requests should be added while the session is 1567 // going away. 1568 DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_)); 1569 pending_request->OnRequestCompleteFailure(ERR_ABORTED); 1570 } 1571 1572 while (true) { 1573 size_t old_size = active_streams_.size(); 1574 ActiveStreamMap::iterator it = 1575 active_streams_.lower_bound(last_good_stream_id + 1); 1576 if (it == active_streams_.end()) 1577 break; 1578 LogAbandonedActiveStream(it, status); 1579 CloseActiveStreamIterator(it, status); 1580 // No new streams should be activated while the session is going 1581 // away. 1582 DCHECK_GT(old_size, active_streams_.size()); 1583 } 1584 1585 while (!created_streams_.empty()) { 1586 size_t old_size = created_streams_.size(); 1587 CreatedStreamSet::iterator it = created_streams_.begin(); 1588 LogAbandonedStream(*it, status); 1589 CloseCreatedStreamIterator(it, status); 1590 // No new streams should be created while the session is going 1591 // away. 1592 DCHECK_GT(old_size, created_streams_.size()); 1593 } 1594 1595 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id); 1596 1597 DcheckGoingAway(); 1598 } 1599 1600 void SpdySession::MaybeFinishGoingAway() { 1601 if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) { 1602 DoDrainSession(OK, "Finished going away"); 1603 } 1604 } 1605 1606 void SpdySession::DoDrainSession(Error err, const std::string& description) { 1607 if (availability_state_ == STATE_DRAINING) { 1608 return; 1609 } 1610 MakeUnavailable(); 1611 1612 // If |err| indicates an error occurred, inform the peer that we're closing 1613 // and why. Don't GOAWAY on a graceful or idle close, as that may 1614 // unnecessarily wake the radio. We could technically GOAWAY on network errors 1615 // (we'll probably fail to actually write it, but that's okay), however many 1616 // unit-tests would need to be updated. 1617 if (err != OK && 1618 err != ERR_ABORTED && // Used by SpdySessionPool to close idle sessions. 1619 err != ERR_NETWORK_CHANGED && // Used to deprecate sessions on IP change. 1620 err != ERR_SOCKET_NOT_CONNECTED && 1621 err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) { 1622 // Enqueue a GOAWAY to inform the peer of why we're closing the connection. 1623 SpdyGoAwayIR goaway_ir(0, // Last accepted stream ID. 1624 MapNetErrorToGoAwayStatus(err), 1625 description); 1626 EnqueueSessionWrite(HIGHEST, 1627 GOAWAY, 1628 scoped_ptr<SpdyFrame>( 1629 buffered_spdy_framer_->SerializeFrame(goaway_ir))); 1630 } 1631 1632 availability_state_ = STATE_DRAINING; 1633 error_on_close_ = err; 1634 1635 net_log_.AddEvent( 1636 NetLog::TYPE_SPDY_SESSION_CLOSE, 1637 base::Bind(&NetLogSpdySessionCloseCallback, err, &description)); 1638 1639 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err); 1640 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors", 1641 total_bytes_received_, 1, 100000000, 50); 1642 1643 if (err == OK) { 1644 // We ought to be going away already, as this is a graceful close. 1645 DcheckGoingAway(); 1646 } else { 1647 StartGoingAway(0, err); 1648 } 1649 DcheckDraining(); 1650 MaybePostWriteLoop(); 1651 } 1652 1653 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) { 1654 DCHECK(stream); 1655 std::string description = base::StringPrintf( 1656 "ABANDONED (stream_id=%d): ", stream->stream_id()) + 1657 stream->url().spec(); 1658 stream->LogStreamError(status, description); 1659 // We don't increment the streams abandoned counter here. If the 1660 // stream isn't active (i.e., it hasn't written anything to the wire 1661 // yet) then it's as if it never existed. If it is active, then 1662 // LogAbandonedActiveStream() will increment the counters. 1663 } 1664 1665 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it, 1666 Error status) { 1667 DCHECK_GT(it->first, 0u); 1668 LogAbandonedStream(it->second.stream, status); 1669 ++streams_abandoned_count_; 1670 base::StatsCounter abandoned_streams("spdy.abandoned_streams"); 1671 abandoned_streams.Increment(); 1672 if (it->second.stream->type() == SPDY_PUSH_STREAM && 1673 unclaimed_pushed_streams_.find(it->second.stream->url()) != 1674 unclaimed_pushed_streams_.end()) { 1675 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams"); 1676 abandoned_push_streams.Increment(); 1677 } 1678 } 1679 1680 SpdyStreamId SpdySession::GetNewStreamId() { 1681 CHECK_LE(stream_hi_water_mark_, kLastStreamId); 1682 SpdyStreamId id = stream_hi_water_mark_; 1683 stream_hi_water_mark_ += 2; 1684 return id; 1685 } 1686 1687 void SpdySession::CloseSessionOnError(Error err, 1688 const std::string& description) { 1689 DCHECK_LT(err, ERR_IO_PENDING); 1690 DoDrainSession(err, description); 1691 } 1692 1693 void SpdySession::MakeUnavailable() { 1694 if (availability_state_ == STATE_AVAILABLE) { 1695 availability_state_ = STATE_GOING_AWAY; 1696 pool_->MakeSessionUnavailable(GetWeakPtr()); 1697 } 1698 } 1699 1700 base::Value* SpdySession::GetInfoAsValue() const { 1701 base::DictionaryValue* dict = new base::DictionaryValue(); 1702 1703 dict->SetInteger("source_id", net_log_.source().id); 1704 1705 dict->SetString("host_port_pair", host_port_pair().ToString()); 1706 if (!pooled_aliases_.empty()) { 1707 base::ListValue* alias_list = new base::ListValue(); 1708 for (std::set<SpdySessionKey>::const_iterator it = 1709 pooled_aliases_.begin(); 1710 it != pooled_aliases_.end(); it++) { 1711 alias_list->Append(new base::StringValue( 1712 it->host_port_pair().ToString())); 1713 } 1714 dict->Set("aliases", alias_list); 1715 } 1716 dict->SetString("proxy", host_port_proxy_pair().second.ToURI()); 1717 1718 dict->SetInteger("active_streams", active_streams_.size()); 1719 1720 dict->SetInteger("unclaimed_pushed_streams", 1721 unclaimed_pushed_streams_.size()); 1722 1723 dict->SetBoolean("is_secure", is_secure_); 1724 1725 dict->SetString("protocol_negotiated", 1726 SSLClientSocket::NextProtoToString( 1727 connection_->socket()->GetNegotiatedProtocol())); 1728 1729 dict->SetInteger("error", error_on_close_); 1730 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); 1731 1732 dict->SetInteger("streams_initiated_count", streams_initiated_count_); 1733 dict->SetInteger("streams_pushed_count", streams_pushed_count_); 1734 dict->SetInteger("streams_pushed_and_claimed_count", 1735 streams_pushed_and_claimed_count_); 1736 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_); 1737 DCHECK(buffered_spdy_framer_.get()); 1738 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received()); 1739 1740 dict->SetBoolean("sent_settings", sent_settings_); 1741 dict->SetBoolean("received_settings", received_settings_); 1742 1743 dict->SetInteger("send_window_size", session_send_window_size_); 1744 dict->SetInteger("recv_window_size", session_recv_window_size_); 1745 dict->SetInteger("unacked_recv_window_bytes", 1746 session_unacked_recv_window_bytes_); 1747 return dict; 1748 } 1749 1750 bool SpdySession::IsReused() const { 1751 return buffered_spdy_framer_->frames_received() > 0 || 1752 connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE; 1753 } 1754 1755 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id, 1756 LoadTimingInfo* load_timing_info) const { 1757 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId, 1758 load_timing_info); 1759 } 1760 1761 int SpdySession::GetPeerAddress(IPEndPoint* address) const { 1762 int rv = ERR_SOCKET_NOT_CONNECTED; 1763 if (connection_->socket()) { 1764 rv = connection_->socket()->GetPeerAddress(address); 1765 } 1766 1767 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress", 1768 rv == ERR_SOCKET_NOT_CONNECTED); 1769 1770 return rv; 1771 } 1772 1773 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 1774 int rv = ERR_SOCKET_NOT_CONNECTED; 1775 if (connection_->socket()) { 1776 rv = connection_->socket()->GetLocalAddress(address); 1777 } 1778 1779 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress", 1780 rv == ERR_SOCKET_NOT_CONNECTED); 1781 1782 return rv; 1783 } 1784 1785 void SpdySession::EnqueueSessionWrite(RequestPriority priority, 1786 SpdyFrameType frame_type, 1787 scoped_ptr<SpdyFrame> frame) { 1788 DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS || 1789 frame_type == WINDOW_UPDATE || frame_type == PING || 1790 frame_type == GOAWAY); 1791 EnqueueWrite( 1792 priority, frame_type, 1793 scoped_ptr<SpdyBufferProducer>( 1794 new SimpleBufferProducer( 1795 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), 1796 base::WeakPtr<SpdyStream>()); 1797 } 1798 1799 void SpdySession::EnqueueWrite(RequestPriority priority, 1800 SpdyFrameType frame_type, 1801 scoped_ptr<SpdyBufferProducer> producer, 1802 const base::WeakPtr<SpdyStream>& stream) { 1803 if (availability_state_ == STATE_DRAINING) 1804 return; 1805 1806 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); 1807 MaybePostWriteLoop(); 1808 } 1809 1810 void SpdySession::MaybePostWriteLoop() { 1811 if (write_state_ == WRITE_STATE_IDLE) { 1812 CHECK(!in_flight_write_); 1813 write_state_ = WRITE_STATE_DO_WRITE; 1814 base::MessageLoop::current()->PostTask( 1815 FROM_HERE, 1816 base::Bind(&SpdySession::PumpWriteLoop, 1817 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK)); 1818 } 1819 } 1820 1821 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) { 1822 CHECK_EQ(stream->stream_id(), 0u); 1823 CHECK(created_streams_.find(stream.get()) == created_streams_.end()); 1824 created_streams_.insert(stream.release()); 1825 } 1826 1827 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) { 1828 CHECK_EQ(stream->stream_id(), 0u); 1829 CHECK(created_streams_.find(stream) != created_streams_.end()); 1830 stream->set_stream_id(GetNewStreamId()); 1831 scoped_ptr<SpdyStream> owned_stream(stream); 1832 created_streams_.erase(stream); 1833 return owned_stream.Pass(); 1834 } 1835 1836 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) { 1837 SpdyStreamId stream_id = stream->stream_id(); 1838 CHECK_NE(stream_id, 0u); 1839 std::pair<ActiveStreamMap::iterator, bool> result = 1840 active_streams_.insert( 1841 std::make_pair(stream_id, ActiveStreamInfo(stream.get()))); 1842 CHECK(result.second); 1843 ignore_result(stream.release()); 1844 } 1845 1846 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) { 1847 if (in_flight_write_stream_.get() == stream.get()) { 1848 // If we're deleting the stream for the in-flight write, we still 1849 // need to let the write complete, so we clear 1850 // |in_flight_write_stream_| and let the write finish on its own 1851 // without notifying |in_flight_write_stream_|. 1852 in_flight_write_stream_.reset(); 1853 } 1854 1855 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr()); 1856 stream->OnClose(status); 1857 1858 if (availability_state_ == STATE_AVAILABLE) { 1859 ProcessPendingStreamRequests(); 1860 } 1861 } 1862 1863 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) { 1864 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1865 1866 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url); 1867 if (unclaimed_it == unclaimed_pushed_streams_.end()) 1868 return base::WeakPtr<SpdyStream>(); 1869 1870 SpdyStreamId stream_id = unclaimed_it->second.stream_id; 1871 unclaimed_pushed_streams_.erase(unclaimed_it); 1872 1873 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 1874 if (active_it == active_streams_.end()) { 1875 NOTREACHED(); 1876 return base::WeakPtr<SpdyStream>(); 1877 } 1878 1879 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM); 1880 used_push_streams.Increment(); 1881 return active_it->second.stream->GetWeakPtr(); 1882 } 1883 1884 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, 1885 bool* was_npn_negotiated, 1886 NextProto* protocol_negotiated) { 1887 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated(); 1888 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol(); 1889 return connection_->socket()->GetSSLInfo(ssl_info); 1890 } 1891 1892 bool SpdySession::GetSSLCertRequestInfo( 1893 SSLCertRequestInfo* cert_request_info) { 1894 if (!is_secure_) 1895 return false; 1896 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info); 1897 return true; 1898 } 1899 1900 void SpdySession::OnError(SpdyFramer::SpdyError error_code) { 1901 CHECK(in_io_loop_); 1902 1903 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code)); 1904 std::string description = 1905 base::StringPrintf("Framer error: %d (%s).", 1906 error_code, 1907 SpdyFramer::ErrorCodeToString(error_code)); 1908 DoDrainSession(MapFramerErrorToNetError(error_code), description); 1909 } 1910 1911 void SpdySession::OnStreamError(SpdyStreamId stream_id, 1912 const std::string& description) { 1913 CHECK(in_io_loop_); 1914 1915 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1916 if (it == active_streams_.end()) { 1917 // We still want to send a frame to reset the stream even if we 1918 // don't know anything about it. 1919 EnqueueResetStreamFrame( 1920 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description); 1921 return; 1922 } 1923 1924 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description); 1925 } 1926 1927 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id, 1928 size_t length, 1929 bool fin) { 1930 CHECK(in_io_loop_); 1931 1932 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1933 1934 // By the time data comes in, the stream may already be inactive. 1935 if (it == active_streams_.end()) 1936 return; 1937 1938 SpdyStream* stream = it->second.stream; 1939 CHECK_EQ(stream->stream_id(), stream_id); 1940 1941 DCHECK(buffered_spdy_framer_); 1942 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize(); 1943 stream->IncrementRawReceivedBytes(header_len); 1944 } 1945 1946 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, 1947 const char* data, 1948 size_t len, 1949 bool fin) { 1950 CHECK(in_io_loop_); 1951 1952 if (data == NULL && len != 0) { 1953 // This is notification of consumed data padding. 1954 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames. 1955 // See crbug.com/353012. 1956 return; 1957 } 1958 1959 DCHECK_LT(len, 1u << 24); 1960 if (net_log().IsLogging()) { 1961 net_log().AddEvent( 1962 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1963 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); 1964 } 1965 1966 // Build the buffer as early as possible so that we go through the 1967 // session flow control checks and update 1968 // |unacked_recv_window_bytes_| properly even when the stream is 1969 // inactive (since the other side has still reduced its session send 1970 // window). 1971 scoped_ptr<SpdyBuffer> buffer; 1972 if (data) { 1973 DCHECK_GT(len, 0u); 1974 CHECK_LE(len, static_cast<size_t>(kReadBufferSize)); 1975 buffer.reset(new SpdyBuffer(data, len)); 1976 1977 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 1978 DecreaseRecvWindowSize(static_cast<int32>(len)); 1979 buffer->AddConsumeCallback( 1980 base::Bind(&SpdySession::OnReadBufferConsumed, 1981 weak_factory_.GetWeakPtr())); 1982 } 1983 } else { 1984 DCHECK_EQ(len, 0u); 1985 } 1986 1987 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 1988 1989 // By the time data comes in, the stream may already be inactive. 1990 if (it == active_streams_.end()) 1991 return; 1992 1993 SpdyStream* stream = it->second.stream; 1994 CHECK_EQ(stream->stream_id(), stream_id); 1995 1996 stream->IncrementRawReceivedBytes(len); 1997 1998 if (it->second.waiting_for_syn_reply) { 1999 const std::string& error = "Data received before SYN_REPLY."; 2000 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2001 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2002 return; 2003 } 2004 2005 stream->OnDataReceived(buffer.Pass()); 2006 } 2007 2008 void SpdySession::OnSettings(bool clear_persisted) { 2009 CHECK(in_io_loop_); 2010 2011 if (clear_persisted) 2012 http_server_properties_->ClearSpdySettings(host_port_pair()); 2013 2014 if (net_log_.IsLogging()) { 2015 net_log_.AddEvent( 2016 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 2017 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(), 2018 clear_persisted)); 2019 } 2020 2021 if (GetProtocolVersion() >= SPDY4) { 2022 // Send an acknowledgment of the setting. 2023 SpdySettingsIR settings_ir; 2024 settings_ir.set_is_ack(true); 2025 EnqueueSessionWrite( 2026 HIGHEST, 2027 SETTINGS, 2028 scoped_ptr<SpdyFrame>( 2029 buffered_spdy_framer_->SerializeFrame(settings_ir))); 2030 } 2031 } 2032 2033 void SpdySession::OnSetting(SpdySettingsIds id, 2034 uint8 flags, 2035 uint32 value) { 2036 CHECK(in_io_loop_); 2037 2038 HandleSetting(id, value); 2039 http_server_properties_->SetSpdySetting( 2040 host_port_pair(), 2041 id, 2042 static_cast<SpdySettingsFlags>(flags), 2043 value); 2044 received_settings_ = true; 2045 2046 // Log the setting. 2047 net_log_.AddEvent( 2048 NetLog::TYPE_SPDY_SESSION_RECV_SETTING, 2049 base::Bind(&NetLogSpdySettingCallback, 2050 id, static_cast<SpdySettingsFlags>(flags), value)); 2051 } 2052 2053 void SpdySession::OnSendCompressedFrame( 2054 SpdyStreamId stream_id, 2055 SpdyFrameType type, 2056 size_t payload_len, 2057 size_t frame_len) { 2058 if (type != SYN_STREAM) 2059 return; 2060 2061 DCHECK(buffered_spdy_framer_.get()); 2062 size_t compressed_len = 2063 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize(); 2064 2065 if (payload_len) { 2066 // Make sure we avoid early decimal truncation. 2067 int compression_pct = 100 - (100 * compressed_len) / payload_len; 2068 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", 2069 compression_pct); 2070 } 2071 } 2072 2073 void SpdySession::OnReceiveCompressedFrame( 2074 SpdyStreamId stream_id, 2075 SpdyFrameType type, 2076 size_t frame_len) { 2077 last_compressed_frame_len_ = frame_len; 2078 } 2079 2080 int SpdySession::OnInitialResponseHeadersReceived( 2081 const SpdyHeaderBlock& response_headers, 2082 base::Time response_time, 2083 base::TimeTicks recv_first_byte_time, 2084 SpdyStream* stream) { 2085 CHECK(in_io_loop_); 2086 SpdyStreamId stream_id = stream->stream_id(); 2087 // May invalidate |stream|. 2088 int rv = stream->OnInitialResponseHeadersReceived( 2089 response_headers, response_time, recv_first_byte_time); 2090 if (rv < 0) { 2091 DCHECK_NE(rv, ERR_IO_PENDING); 2092 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2093 } 2094 return rv; 2095 } 2096 2097 void SpdySession::OnSynStream(SpdyStreamId stream_id, 2098 SpdyStreamId associated_stream_id, 2099 SpdyPriority priority, 2100 bool fin, 2101 bool unidirectional, 2102 const SpdyHeaderBlock& headers) { 2103 CHECK(in_io_loop_); 2104 2105 if (GetProtocolVersion() >= SPDY4) { 2106 DCHECK_EQ(0u, associated_stream_id); 2107 OnHeaders(stream_id, fin, headers); 2108 return; 2109 } 2110 2111 base::Time response_time = base::Time::Now(); 2112 base::TimeTicks recv_first_byte_time = time_func_(); 2113 2114 if (net_log_.IsLogging()) { 2115 net_log_.AddEvent( 2116 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 2117 base::Bind(&NetLogSpdySynStreamReceivedCallback, 2118 &headers, fin, unidirectional, priority, 2119 stream_id, associated_stream_id)); 2120 } 2121 2122 // Split headers to simulate push promise and response. 2123 SpdyHeaderBlock request_headers; 2124 SpdyHeaderBlock response_headers; 2125 SplitPushedHeadersToRequestAndResponse( 2126 headers, GetProtocolVersion(), &request_headers, &response_headers); 2127 2128 if (!TryCreatePushStream( 2129 stream_id, associated_stream_id, priority, request_headers)) 2130 return; 2131 2132 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 2133 if (active_it == active_streams_.end()) { 2134 NOTREACHED(); 2135 return; 2136 } 2137 2138 if (OnInitialResponseHeadersReceived(response_headers, 2139 response_time, 2140 recv_first_byte_time, 2141 active_it->second.stream) != OK) 2142 return; 2143 2144 base::StatsCounter push_requests("spdy.pushed_streams"); 2145 push_requests.Increment(); 2146 } 2147 2148 void SpdySession::DeleteExpiredPushedStreams() { 2149 if (unclaimed_pushed_streams_.empty()) 2150 return; 2151 2152 // Check that adequate time has elapsed since the last sweep. 2153 if (time_func_() < next_unclaimed_push_stream_sweep_time_) 2154 return; 2155 2156 // Gather old streams to delete. 2157 base::TimeTicks minimum_freshness = time_func_() - 2158 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2159 std::vector<SpdyStreamId> streams_to_close; 2160 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); 2161 it != unclaimed_pushed_streams_.end(); ++it) { 2162 if (minimum_freshness > it->second.creation_time) 2163 streams_to_close.push_back(it->second.stream_id); 2164 } 2165 2166 for (std::vector<SpdyStreamId>::const_iterator to_close_it = 2167 streams_to_close.begin(); 2168 to_close_it != streams_to_close.end(); ++to_close_it) { 2169 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it); 2170 if (active_it == active_streams_.end()) 2171 continue; 2172 2173 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM); 2174 // CloseActiveStreamIterator() will remove the stream from 2175 // |unclaimed_pushed_streams_|. 2176 ResetStreamIterator( 2177 active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed."); 2178 } 2179 2180 next_unclaimed_push_stream_sweep_time_ = time_func_() + 2181 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds); 2182 } 2183 2184 void SpdySession::OnSynReply(SpdyStreamId stream_id, 2185 bool fin, 2186 const SpdyHeaderBlock& headers) { 2187 CHECK(in_io_loop_); 2188 2189 base::Time response_time = base::Time::Now(); 2190 base::TimeTicks recv_first_byte_time = time_func_(); 2191 2192 if (net_log().IsLogging()) { 2193 net_log().AddEvent( 2194 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 2195 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2196 &headers, fin, stream_id)); 2197 } 2198 2199 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2200 if (it == active_streams_.end()) { 2201 // NOTE: it may just be that the stream was cancelled. 2202 return; 2203 } 2204 2205 SpdyStream* stream = it->second.stream; 2206 CHECK_EQ(stream->stream_id(), stream_id); 2207 2208 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2209 last_compressed_frame_len_ = 0; 2210 2211 if (GetProtocolVersion() >= SPDY4) { 2212 const std::string& error = 2213 "SPDY4 wasn't expecting SYN_REPLY."; 2214 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2215 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2216 return; 2217 } 2218 if (!it->second.waiting_for_syn_reply) { 2219 const std::string& error = 2220 "Received duplicate SYN_REPLY for stream."; 2221 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2222 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2223 return; 2224 } 2225 it->second.waiting_for_syn_reply = false; 2226 2227 ignore_result(OnInitialResponseHeadersReceived( 2228 headers, response_time, recv_first_byte_time, stream)); 2229 } 2230 2231 void SpdySession::OnHeaders(SpdyStreamId stream_id, 2232 bool fin, 2233 const SpdyHeaderBlock& headers) { 2234 CHECK(in_io_loop_); 2235 2236 if (net_log().IsLogging()) { 2237 net_log().AddEvent( 2238 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS, 2239 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback, 2240 &headers, fin, stream_id)); 2241 } 2242 2243 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2244 if (it == active_streams_.end()) { 2245 // NOTE: it may just be that the stream was cancelled. 2246 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id; 2247 return; 2248 } 2249 2250 SpdyStream* stream = it->second.stream; 2251 CHECK_EQ(stream->stream_id(), stream_id); 2252 2253 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2254 last_compressed_frame_len_ = 0; 2255 2256 base::Time response_time = base::Time::Now(); 2257 base::TimeTicks recv_first_byte_time = time_func_(); 2258 2259 if (it->second.waiting_for_syn_reply) { 2260 if (GetProtocolVersion() < SPDY4) { 2261 const std::string& error = 2262 "Was expecting SYN_REPLY, not HEADERS."; 2263 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error); 2264 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error); 2265 return; 2266 } 2267 2268 it->second.waiting_for_syn_reply = false; 2269 ignore_result(OnInitialResponseHeadersReceived( 2270 headers, response_time, recv_first_byte_time, stream)); 2271 } else if (it->second.stream->IsReservedRemote()) { 2272 ignore_result(OnInitialResponseHeadersReceived( 2273 headers, response_time, recv_first_byte_time, stream)); 2274 } else { 2275 int rv = stream->OnAdditionalResponseHeadersReceived(headers); 2276 if (rv < 0) { 2277 DCHECK_NE(rv, ERR_IO_PENDING); 2278 DCHECK(active_streams_.find(stream_id) == active_streams_.end()); 2279 } 2280 } 2281 } 2282 2283 void SpdySession::OnRstStream(SpdyStreamId stream_id, 2284 SpdyRstStreamStatus status) { 2285 CHECK(in_io_loop_); 2286 2287 std::string description; 2288 net_log().AddEvent( 2289 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 2290 base::Bind(&NetLogSpdyRstCallback, 2291 stream_id, status, &description)); 2292 2293 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2294 if (it == active_streams_.end()) { 2295 // NOTE: it may just be that the stream was cancelled. 2296 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 2297 return; 2298 } 2299 2300 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2301 2302 if (status == 0) { 2303 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); 2304 } else if (status == RST_STREAM_REFUSED_STREAM) { 2305 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM); 2306 } else { 2307 RecordProtocolErrorHistogram( 2308 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM); 2309 it->second.stream->LogStreamError( 2310 ERR_SPDY_PROTOCOL_ERROR, 2311 base::StringPrintf("SPDY stream closed with status: %d", status)); 2312 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 2313 // For now, it doesn't matter much - it is a protocol error. 2314 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR); 2315 } 2316 } 2317 2318 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id, 2319 SpdyGoAwayStatus status) { 2320 CHECK(in_io_loop_); 2321 2322 // TODO(jgraettinger): UMA histogram on |status|. 2323 2324 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY, 2325 base::Bind(&NetLogSpdyGoAwayCallback, 2326 last_accepted_stream_id, 2327 active_streams_.size(), 2328 unclaimed_pushed_streams_.size(), 2329 status)); 2330 MakeUnavailable(); 2331 StartGoingAway(last_accepted_stream_id, ERR_ABORTED); 2332 // This is to handle the case when we already don't have any active 2333 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have 2334 // active streams and so the last one being closed will finish the 2335 // going away process (see DeleteStream()). 2336 MaybeFinishGoingAway(); 2337 } 2338 2339 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) { 2340 CHECK(in_io_loop_); 2341 2342 net_log_.AddEvent( 2343 NetLog::TYPE_SPDY_SESSION_PING, 2344 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received")); 2345 2346 // Send response to a PING from server. 2347 if ((protocol_ >= kProtoSPDY4 && !is_ack) || 2348 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) { 2349 WritePingFrame(unique_id, true); 2350 return; 2351 } 2352 2353 --pings_in_flight_; 2354 if (pings_in_flight_ < 0) { 2355 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING); 2356 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0."); 2357 pings_in_flight_ = 0; 2358 return; 2359 } 2360 2361 if (pings_in_flight_ > 0) 2362 return; 2363 2364 // We will record RTT in histogram when there are no more client sent 2365 // pings_in_flight_. 2366 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_); 2367 } 2368 2369 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id, 2370 uint32 delta_window_size) { 2371 CHECK(in_io_loop_); 2372 2373 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max)); 2374 net_log_.AddEvent( 2375 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME, 2376 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2377 stream_id, delta_window_size)); 2378 2379 if (stream_id == kSessionFlowControlStreamId) { 2380 // WINDOW_UPDATE for the session. 2381 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) { 2382 LOG(WARNING) << "Received WINDOW_UPDATE for session when " 2383 << "session flow control is not turned on"; 2384 // TODO(akalin): Record an error and close the session. 2385 return; 2386 } 2387 2388 if (delta_window_size < 1u) { 2389 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2390 DoDrainSession( 2391 ERR_SPDY_PROTOCOL_ERROR, 2392 "Received WINDOW_UPDATE with an invalid delta_window_size " + 2393 base::UintToString(delta_window_size)); 2394 return; 2395 } 2396 2397 IncreaseSendWindowSize(static_cast<int32>(delta_window_size)); 2398 } else { 2399 // WINDOW_UPDATE for a stream. 2400 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2401 // TODO(akalin): Record an error and close the session. 2402 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id 2403 << " when flow control is not turned on"; 2404 return; 2405 } 2406 2407 ActiveStreamMap::iterator it = active_streams_.find(stream_id); 2408 2409 if (it == active_streams_.end()) { 2410 // NOTE: it may just be that the stream was cancelled. 2411 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; 2412 return; 2413 } 2414 2415 SpdyStream* stream = it->second.stream; 2416 CHECK_EQ(stream->stream_id(), stream_id); 2417 2418 if (delta_window_size < 1u) { 2419 ResetStreamIterator(it, 2420 RST_STREAM_FLOW_CONTROL_ERROR, 2421 base::StringPrintf( 2422 "Received WINDOW_UPDATE with an invalid " 2423 "delta_window_size %ud", delta_window_size)); 2424 return; 2425 } 2426 2427 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2428 it->second.stream->IncreaseSendWindowSize( 2429 static_cast<int32>(delta_window_size)); 2430 } 2431 } 2432 2433 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id, 2434 SpdyStreamId associated_stream_id, 2435 SpdyPriority priority, 2436 const SpdyHeaderBlock& headers) { 2437 // Server-initiated streams should have even sequence numbers. 2438 if ((stream_id & 0x1) != 0) { 2439 LOG(WARNING) << "Received invalid push stream id " << stream_id; 2440 return false; 2441 } 2442 2443 if (IsStreamActive(stream_id)) { 2444 LOG(WARNING) << "Received push for active stream " << stream_id; 2445 return false; 2446 } 2447 2448 RequestPriority request_priority = 2449 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion()); 2450 2451 if (availability_state_ == STATE_GOING_AWAY) { 2452 // TODO(akalin): This behavior isn't in the SPDY spec, although it 2453 // probably should be. 2454 EnqueueResetStreamFrame(stream_id, 2455 request_priority, 2456 RST_STREAM_REFUSED_STREAM, 2457 "push stream request received when going away"); 2458 return false; 2459 } 2460 2461 if (associated_stream_id == 0) { 2462 // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and 2463 // session going away. We should never get here. 2464 CHECK_GT(SPDY4, GetProtocolVersion()); 2465 std::string description = base::StringPrintf( 2466 "Received invalid associated stream id %d for pushed stream %d", 2467 associated_stream_id, 2468 stream_id); 2469 EnqueueResetStreamFrame( 2470 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description); 2471 return false; 2472 } 2473 2474 streams_pushed_count_++; 2475 2476 // TODO(mbelshe): DCHECK that this is a GET method? 2477 2478 // Verify that the response had a URL for us. 2479 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true); 2480 if (!gurl.is_valid()) { 2481 EnqueueResetStreamFrame(stream_id, 2482 request_priority, 2483 RST_STREAM_PROTOCOL_ERROR, 2484 "Pushed stream url was invalid: " + gurl.spec()); 2485 return false; 2486 } 2487 2488 // Verify we have a valid stream association. 2489 ActiveStreamMap::iterator associated_it = 2490 active_streams_.find(associated_stream_id); 2491 if (associated_it == active_streams_.end()) { 2492 EnqueueResetStreamFrame( 2493 stream_id, 2494 request_priority, 2495 RST_STREAM_INVALID_STREAM, 2496 base::StringPrintf("Received push for inactive associated stream %d", 2497 associated_stream_id)); 2498 return false; 2499 } 2500 2501 // Check that the pushed stream advertises the same origin as its associated 2502 // stream. Bypass this check if and only if this session is with a SPDY proxy 2503 // that is trusted explicitly via the --trusted-spdy-proxy switch. 2504 if (trusted_spdy_proxy_.Equals(host_port_pair())) { 2505 // Disallow pushing of HTTPS content. 2506 if (gurl.SchemeIs("https")) { 2507 EnqueueResetStreamFrame( 2508 stream_id, 2509 request_priority, 2510 RST_STREAM_REFUSED_STREAM, 2511 base::StringPrintf("Rejected push of Cross Origin HTTPS content %d", 2512 associated_stream_id)); 2513 } 2514 } else { 2515 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders()); 2516 if (associated_url.GetOrigin() != gurl.GetOrigin()) { 2517 EnqueueResetStreamFrame( 2518 stream_id, 2519 request_priority, 2520 RST_STREAM_REFUSED_STREAM, 2521 base::StringPrintf("Rejected Cross Origin Push Stream %d", 2522 associated_stream_id)); 2523 return false; 2524 } 2525 } 2526 2527 // There should not be an existing pushed stream with the same path. 2528 PushedStreamMap::iterator pushed_it = 2529 unclaimed_pushed_streams_.lower_bound(gurl); 2530 if (pushed_it != unclaimed_pushed_streams_.end() && 2531 pushed_it->first == gurl) { 2532 EnqueueResetStreamFrame( 2533 stream_id, 2534 request_priority, 2535 RST_STREAM_PROTOCOL_ERROR, 2536 "Received duplicate pushed stream with url: " + gurl.spec()); 2537 return false; 2538 } 2539 2540 scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM, 2541 GetWeakPtr(), 2542 gurl, 2543 request_priority, 2544 stream_initial_send_window_size_, 2545 stream_initial_recv_window_size_, 2546 net_log_)); 2547 stream->set_stream_id(stream_id); 2548 2549 // In spdy4/http2 PUSH_PROMISE arrives on associated stream. 2550 if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) { 2551 associated_it->second.stream->IncrementRawReceivedBytes( 2552 last_compressed_frame_len_); 2553 } else { 2554 stream->IncrementRawReceivedBytes(last_compressed_frame_len_); 2555 } 2556 2557 last_compressed_frame_len_ = 0; 2558 2559 DeleteExpiredPushedStreams(); 2560 PushedStreamMap::iterator inserted_pushed_it = 2561 unclaimed_pushed_streams_.insert( 2562 pushed_it, 2563 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_()))); 2564 DCHECK(inserted_pushed_it != pushed_it); 2565 2566 InsertActivatedStream(stream.Pass()); 2567 2568 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id); 2569 if (active_it == active_streams_.end()) { 2570 NOTREACHED(); 2571 return false; 2572 } 2573 2574 active_it->second.stream->OnPushPromiseHeadersReceived(headers); 2575 DCHECK(active_it->second.stream->IsReservedRemote()); 2576 return true; 2577 } 2578 2579 void SpdySession::OnPushPromise(SpdyStreamId stream_id, 2580 SpdyStreamId promised_stream_id, 2581 const SpdyHeaderBlock& headers) { 2582 CHECK(in_io_loop_); 2583 2584 if (net_log_.IsLogging()) { 2585 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE, 2586 base::Bind(&NetLogSpdyPushPromiseReceivedCallback, 2587 &headers, 2588 stream_id, 2589 promised_stream_id)); 2590 } 2591 2592 // Any priority will do. 2593 // TODO(baranovich): pass parent stream id priority? 2594 if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers)) 2595 return; 2596 2597 base::StatsCounter push_requests("spdy.pushed_streams"); 2598 push_requests.Increment(); 2599 } 2600 2601 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id, 2602 uint32 delta_window_size) { 2603 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2604 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2605 CHECK(it != active_streams_.end()); 2606 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2607 SendWindowUpdateFrame( 2608 stream_id, delta_window_size, it->second.stream->priority()); 2609 } 2610 2611 void SpdySession::SendInitialData() { 2612 DCHECK(enable_sending_initial_data_); 2613 2614 if (send_connection_header_prefix_) { 2615 DCHECK_EQ(protocol_, kProtoSPDY4); 2616 scoped_ptr<SpdyFrame> connection_header_prefix_frame( 2617 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix), 2618 kHttp2ConnectionHeaderPrefixSize, 2619 false /* take_ownership */)); 2620 // Count the prefix as part of the subsequent SETTINGS frame. 2621 EnqueueSessionWrite(HIGHEST, SETTINGS, 2622 connection_header_prefix_frame.Pass()); 2623 } 2624 2625 // First, notify the server about the settings they should use when 2626 // communicating with us. 2627 SettingsMap settings_map; 2628 // Create a new settings frame notifying the server of our 2629 // max concurrent streams and initial window size. 2630 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] = 2631 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams); 2632 if (flow_control_state_ >= FLOW_CONTROL_STREAM && 2633 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) { 2634 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] = 2635 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, 2636 stream_initial_recv_window_size_); 2637 } 2638 SendSettings(settings_map); 2639 2640 // Next, notify the server about our initial recv window size. 2641 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { 2642 // Bump up the receive window size to the real initial value. This 2643 // has to go here since the WINDOW_UPDATE frame sent by 2644 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|. 2645 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_); 2646 // This condition implies that |kDefaultInitialRecvWindowSize| - 2647 // |session_recv_window_size_| doesn't overflow. 2648 DCHECK_GT(session_recv_window_size_, 0); 2649 IncreaseRecvWindowSize( 2650 kDefaultInitialRecvWindowSize - session_recv_window_size_); 2651 } 2652 2653 // Finally, notify the server about the settings they have 2654 // previously told us to use when communicating with them (after 2655 // applying them). 2656 const SettingsMap& server_settings_map = 2657 http_server_properties_->GetSpdySettings(host_port_pair()); 2658 if (server_settings_map.empty()) 2659 return; 2660 2661 SettingsMap::const_iterator it = 2662 server_settings_map.find(SETTINGS_CURRENT_CWND); 2663 uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0; 2664 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100); 2665 2666 for (SettingsMap::const_iterator it = server_settings_map.begin(); 2667 it != server_settings_map.end(); ++it) { 2668 const SpdySettingsIds new_id = it->first; 2669 const uint32 new_val = it->second.second; 2670 HandleSetting(new_id, new_val); 2671 } 2672 2673 SendSettings(server_settings_map); 2674 } 2675 2676 2677 void SpdySession::SendSettings(const SettingsMap& settings) { 2678 net_log_.AddEvent( 2679 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 2680 base::Bind(&NetLogSpdySendSettingsCallback, &settings)); 2681 2682 // Create the SETTINGS frame and send it. 2683 DCHECK(buffered_spdy_framer_.get()); 2684 scoped_ptr<SpdyFrame> settings_frame( 2685 buffered_spdy_framer_->CreateSettings(settings)); 2686 sent_settings_ = true; 2687 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass()); 2688 } 2689 2690 void SpdySession::HandleSetting(uint32 id, uint32 value) { 2691 switch (id) { 2692 case SETTINGS_MAX_CONCURRENT_STREAMS: 2693 max_concurrent_streams_ = std::min(static_cast<size_t>(value), 2694 kMaxConcurrentStreamLimit); 2695 ProcessPendingStreamRequests(); 2696 break; 2697 case SETTINGS_INITIAL_WINDOW_SIZE: { 2698 if (flow_control_state_ < FLOW_CONTROL_STREAM) { 2699 net_log().AddEvent( 2700 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL); 2701 return; 2702 } 2703 2704 if (value > static_cast<uint32>(kint32max)) { 2705 net_log().AddEvent( 2706 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE, 2707 NetLog::IntegerCallback("initial_window_size", value)); 2708 return; 2709 } 2710 2711 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only. 2712 int32 delta_window_size = 2713 static_cast<int32>(value) - stream_initial_send_window_size_; 2714 stream_initial_send_window_size_ = static_cast<int32>(value); 2715 UpdateStreamsSendWindowSize(delta_window_size); 2716 net_log().AddEvent( 2717 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE, 2718 NetLog::IntegerCallback("delta_window_size", delta_window_size)); 2719 break; 2720 } 2721 } 2722 } 2723 2724 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { 2725 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2726 for (ActiveStreamMap::iterator it = active_streams_.begin(); 2727 it != active_streams_.end(); ++it) { 2728 it->second.stream->AdjustSendWindowSize(delta_window_size); 2729 } 2730 2731 for (CreatedStreamSet::const_iterator it = created_streams_.begin(); 2732 it != created_streams_.end(); it++) { 2733 (*it)->AdjustSendWindowSize(delta_window_size); 2734 } 2735 } 2736 2737 void SpdySession::SendPrefacePingIfNoneInFlight() { 2738 if (pings_in_flight_ || !enable_ping_based_connection_checking_) 2739 return; 2740 2741 base::TimeTicks now = time_func_(); 2742 // If there is no activity in the session, then send a preface-PING. 2743 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_) 2744 SendPrefacePing(); 2745 } 2746 2747 void SpdySession::SendPrefacePing() { 2748 WritePingFrame(next_ping_id_, false); 2749 } 2750 2751 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, 2752 uint32 delta_window_size, 2753 RequestPriority priority) { 2754 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM); 2755 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 2756 if (it != active_streams_.end()) { 2757 CHECK_EQ(it->second.stream->stream_id(), stream_id); 2758 } else { 2759 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2760 CHECK_EQ(stream_id, kSessionFlowControlStreamId); 2761 } 2762 2763 net_log_.AddEvent( 2764 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME, 2765 base::Bind(&NetLogSpdyWindowUpdateFrameCallback, 2766 stream_id, delta_window_size)); 2767 2768 DCHECK(buffered_spdy_framer_.get()); 2769 scoped_ptr<SpdyFrame> window_update_frame( 2770 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); 2771 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass()); 2772 } 2773 2774 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) { 2775 DCHECK(buffered_spdy_framer_.get()); 2776 scoped_ptr<SpdyFrame> ping_frame( 2777 buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack)); 2778 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass()); 2779 2780 if (net_log().IsLogging()) { 2781 net_log().AddEvent( 2782 NetLog::TYPE_SPDY_SESSION_PING, 2783 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent")); 2784 } 2785 if (!is_ack) { 2786 next_ping_id_ += 2; 2787 ++pings_in_flight_; 2788 PlanToCheckPingStatus(); 2789 last_ping_sent_time_ = time_func_(); 2790 } 2791 } 2792 2793 void SpdySession::PlanToCheckPingStatus() { 2794 if (check_ping_status_pending_) 2795 return; 2796 2797 check_ping_status_pending_ = true; 2798 base::MessageLoop::current()->PostDelayedTask( 2799 FROM_HERE, 2800 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2801 time_func_()), hung_interval_); 2802 } 2803 2804 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 2805 CHECK(!in_io_loop_); 2806 2807 // Check if we got a response back for all PINGs we had sent. 2808 if (pings_in_flight_ == 0) { 2809 check_ping_status_pending_ = false; 2810 return; 2811 } 2812 2813 DCHECK(check_ping_status_pending_); 2814 2815 base::TimeTicks now = time_func_(); 2816 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_); 2817 2818 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) { 2819 // Track all failed PING messages in a separate bucket. 2820 RecordPingRTTHistogram(base::TimeDelta::Max()); 2821 DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping."); 2822 return; 2823 } 2824 2825 // Check the status of connection after a delay. 2826 base::MessageLoop::current()->PostDelayedTask( 2827 FROM_HERE, 2828 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(), 2829 now), 2830 delay); 2831 } 2832 2833 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) { 2834 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration); 2835 } 2836 2837 void SpdySession::RecordProtocolErrorHistogram( 2838 SpdyProtocolErrorDetails details) { 2839 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details, 2840 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2841 if (EndsWith(host_port_pair().host(), "google.com", false)) { 2842 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details, 2843 NUM_SPDY_PROTOCOL_ERROR_DETAILS); 2844 } 2845 } 2846 2847 void SpdySession::RecordHistograms() { 2848 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 2849 streams_initiated_count_, 2850 0, 300, 50); 2851 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 2852 streams_pushed_count_, 2853 0, 300, 50); 2854 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 2855 streams_pushed_and_claimed_count_, 2856 0, 300, 50); 2857 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 2858 streams_abandoned_count_, 2859 0, 300, 50); 2860 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent", 2861 sent_settings_ ? 1 : 0, 2); 2862 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived", 2863 received_settings_ ? 1 : 0, 2); 2864 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession", 2865 stalled_streams_, 2866 0, 300, 50); 2867 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls", 2868 stalled_streams_ > 0 ? 1 : 0, 2); 2869 2870 if (received_settings_) { 2871 // Enumerate the saved settings, and set histograms for it. 2872 const SettingsMap& settings_map = 2873 http_server_properties_->GetSpdySettings(host_port_pair()); 2874 2875 SettingsMap::const_iterator it; 2876 for (it = settings_map.begin(); it != settings_map.end(); ++it) { 2877 const SpdySettingsIds id = it->first; 2878 const uint32 val = it->second.second; 2879 switch (id) { 2880 case SETTINGS_CURRENT_CWND: 2881 // Record several different histograms to see if cwnd converges 2882 // for larger volumes of data being sent. 2883 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd", 2884 val, 1, 200, 100); 2885 if (total_bytes_received_ > 10 * 1024) { 2886 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K", 2887 val, 1, 200, 100); 2888 if (total_bytes_received_ > 25 * 1024) { 2889 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K", 2890 val, 1, 200, 100); 2891 if (total_bytes_received_ > 50 * 1024) { 2892 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K", 2893 val, 1, 200, 100); 2894 if (total_bytes_received_ > 100 * 1024) { 2895 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K", 2896 val, 1, 200, 100); 2897 } 2898 } 2899 } 2900 } 2901 break; 2902 case SETTINGS_ROUND_TRIP_TIME: 2903 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT", 2904 val, 1, 1200, 100); 2905 break; 2906 case SETTINGS_DOWNLOAD_RETRANS_RATE: 2907 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate", 2908 val, 1, 100, 50); 2909 break; 2910 default: 2911 break; 2912 } 2913 } 2914 } 2915 } 2916 2917 void SpdySession::CompleteStreamRequest( 2918 const base::WeakPtr<SpdyStreamRequest>& pending_request) { 2919 // Abort if the request has already been cancelled. 2920 if (!pending_request) 2921 return; 2922 2923 base::WeakPtr<SpdyStream> stream; 2924 int rv = TryCreateStream(pending_request, &stream); 2925 2926 if (rv == OK) { 2927 DCHECK(stream); 2928 pending_request->OnRequestCompleteSuccess(stream); 2929 return; 2930 } 2931 DCHECK(!stream); 2932 2933 if (rv != ERR_IO_PENDING) { 2934 pending_request->OnRequestCompleteFailure(rv); 2935 } 2936 } 2937 2938 SSLClientSocket* SpdySession::GetSSLClientSocket() const { 2939 if (!is_secure_) 2940 return NULL; 2941 SSLClientSocket* ssl_socket = 2942 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 2943 DCHECK(ssl_socket); 2944 return ssl_socket; 2945 } 2946 2947 void SpdySession::OnWriteBufferConsumed( 2948 size_t frame_payload_size, 2949 size_t consume_size, 2950 SpdyBuffer::ConsumeSource consume_source) { 2951 // We can be called with |in_io_loop_| set if a write SpdyBuffer is 2952 // deleted (e.g., a stream is closed due to incoming data). 2953 2954 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2955 2956 if (consume_source == SpdyBuffer::DISCARD) { 2957 // If we're discarding a frame or part of it, increase the send 2958 // window by the number of discarded bytes. (Although if we're 2959 // discarding part of a frame, it's probably because of a write 2960 // error and we'll be tearing down the session soon.) 2961 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 2962 DCHECK_GT(remaining_payload_bytes, 0u); 2963 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 2964 } 2965 // For consumed bytes, the send window is increased when we receive 2966 // a WINDOW_UPDATE frame. 2967 } 2968 2969 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) { 2970 // We can be called with |in_io_loop_| set if a SpdyBuffer is 2971 // deleted (e.g., a stream is closed due to incoming data). 2972 2973 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 2974 DCHECK_GE(delta_window_size, 1); 2975 2976 // Check for overflow. 2977 int32 max_delta_window_size = kint32max - session_send_window_size_; 2978 if (delta_window_size > max_delta_window_size) { 2979 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE); 2980 DoDrainSession( 2981 ERR_SPDY_PROTOCOL_ERROR, 2982 "Received WINDOW_UPDATE [delta: " + 2983 base::IntToString(delta_window_size) + 2984 "] for session overflows session_send_window_size_ [current: " + 2985 base::IntToString(session_send_window_size_) + "]"); 2986 return; 2987 } 2988 2989 session_send_window_size_ += delta_window_size; 2990 2991 net_log_.AddEvent( 2992 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 2993 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 2994 delta_window_size, session_send_window_size_)); 2995 2996 DCHECK(!IsSendStalled()); 2997 ResumeSendStalledStreams(); 2998 } 2999 3000 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) { 3001 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3002 3003 // We only call this method when sending a frame. Therefore, 3004 // |delta_window_size| should be within the valid frame size range. 3005 DCHECK_GE(delta_window_size, 1); 3006 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 3007 3008 // |send_window_size_| should have been at least |delta_window_size| for 3009 // this call to happen. 3010 DCHECK_GE(session_send_window_size_, delta_window_size); 3011 3012 session_send_window_size_ -= delta_window_size; 3013 3014 net_log_.AddEvent( 3015 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW, 3016 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3017 -delta_window_size, session_send_window_size_)); 3018 } 3019 3020 void SpdySession::OnReadBufferConsumed( 3021 size_t consume_size, 3022 SpdyBuffer::ConsumeSource consume_source) { 3023 // We can be called with |in_io_loop_| set if a read SpdyBuffer is 3024 // deleted (e.g., discarded by a SpdyReadQueue). 3025 3026 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3027 DCHECK_GE(consume_size, 1u); 3028 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 3029 3030 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 3031 } 3032 3033 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) { 3034 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3035 DCHECK_GE(session_unacked_recv_window_bytes_, 0); 3036 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_); 3037 DCHECK_GE(delta_window_size, 1); 3038 // Check for overflow. 3039 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_); 3040 3041 session_recv_window_size_ += delta_window_size; 3042 net_log_.AddEvent( 3043 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 3044 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3045 delta_window_size, session_recv_window_size_)); 3046 3047 session_unacked_recv_window_bytes_ += delta_window_size; 3048 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) { 3049 SendWindowUpdateFrame(kSessionFlowControlStreamId, 3050 session_unacked_recv_window_bytes_, 3051 HIGHEST); 3052 session_unacked_recv_window_bytes_ = 0; 3053 } 3054 } 3055 3056 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) { 3057 CHECK(in_io_loop_); 3058 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3059 DCHECK_GE(delta_window_size, 1); 3060 3061 // Since we never decrease the initial receive window size, 3062 // |delta_window_size| should never cause |recv_window_size_| to go 3063 // negative. If we do, the receive window isn't being respected. 3064 if (delta_window_size > session_recv_window_size_) { 3065 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION); 3066 DoDrainSession( 3067 ERR_SPDY_FLOW_CONTROL_ERROR, 3068 "delta_window_size is " + base::IntToString(delta_window_size) + 3069 " in DecreaseRecvWindowSize, which is larger than the receive " + 3070 "window size of " + base::IntToString(session_recv_window_size_)); 3071 return; 3072 } 3073 3074 session_recv_window_size_ -= delta_window_size; 3075 net_log_.AddEvent( 3076 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW, 3077 base::Bind(&NetLogSpdySessionWindowUpdateCallback, 3078 -delta_window_size, session_recv_window_size_)); 3079 } 3080 3081 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) { 3082 DCHECK(stream.send_stalled_by_flow_control()); 3083 RequestPriority priority = stream.priority(); 3084 CHECK_GE(priority, MINIMUM_PRIORITY); 3085 CHECK_LE(priority, MAXIMUM_PRIORITY); 3086 stream_send_unstall_queue_[priority].push_back(stream.stream_id()); 3087 } 3088 3089 void SpdySession::ResumeSendStalledStreams() { 3090 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION); 3091 3092 // We don't have to worry about new streams being queued, since 3093 // doing so would cause IsSendStalled() to return true. But we do 3094 // have to worry about streams being closed, as well as ourselves 3095 // being closed. 3096 3097 while (!IsSendStalled()) { 3098 size_t old_size = 0; 3099 #if DCHECK_IS_ON 3100 old_size = GetTotalSize(stream_send_unstall_queue_); 3101 #endif 3102 3103 SpdyStreamId stream_id = PopStreamToPossiblyResume(); 3104 if (stream_id == 0) 3105 break; 3106 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); 3107 // The stream may actually still be send-stalled after this (due 3108 // to its own send window) but that's okay -- it'll then be 3109 // resumed once its send window increases. 3110 if (it != active_streams_.end()) 3111 it->second.stream->PossiblyResumeIfSendStalled(); 3112 3113 // The size should decrease unless we got send-stalled again. 3114 if (!IsSendStalled()) 3115 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size); 3116 } 3117 } 3118 3119 SpdyStreamId SpdySession::PopStreamToPossiblyResume() { 3120 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) { 3121 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i]; 3122 if (!queue->empty()) { 3123 SpdyStreamId stream_id = queue->front(); 3124 queue->pop_front(); 3125 return stream_id; 3126 } 3127 } 3128 return 0; 3129 } 3130 3131 } // namespace net 3132