1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_session.h" 6 7 #include "base/basictypes.h" 8 #include "base/logging.h" 9 #include "base/memory/linked_ptr.h" 10 #include "base/message_loop.h" 11 #include "base/metrics/field_trial.h" 12 #include "base/metrics/stats_counters.h" 13 #include "base/stl_util-inl.h" 14 #include "base/string_number_conversions.h" 15 #include "base/string_util.h" 16 #include "base/stringprintf.h" 17 #include "base/time.h" 18 #include "base/utf_string_conversions.h" 19 #include "base/values.h" 20 #include "net/base/connection_type_histograms.h" 21 #include "net/base/net_log.h" 22 #include "net/base/net_util.h" 23 #include "net/http/http_network_session.h" 24 #include "net/socket/ssl_client_socket.h" 25 #include "net/spdy/spdy_frame_builder.h" 26 #include "net/spdy/spdy_http_utils.h" 27 #include "net/spdy/spdy_protocol.h" 28 #include "net/spdy/spdy_session_pool.h" 29 #include "net/spdy/spdy_settings_storage.h" 30 #include "net/spdy/spdy_stream.h" 31 32 namespace net { 33 34 NetLogSpdySynParameter::NetLogSpdySynParameter( 35 const linked_ptr<spdy::SpdyHeaderBlock>& headers, 36 spdy::SpdyControlFlags flags, 37 spdy::SpdyStreamId id, 38 spdy::SpdyStreamId associated_stream) 39 : headers_(headers), 40 flags_(flags), 41 id_(id), 42 associated_stream_(associated_stream) { 43 } 44 45 NetLogSpdySynParameter::~NetLogSpdySynParameter() { 46 } 47 48 Value* NetLogSpdySynParameter::ToValue() const { 49 DictionaryValue* dict = new DictionaryValue(); 50 ListValue* headers_list = new ListValue(); 51 for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin(); 52 it != headers_->end(); ++it) { 53 headers_list->Append(new StringValue(base::StringPrintf( 54 "%s: %s", it->first.c_str(), it->second.c_str()))); 55 } 56 dict->SetInteger("flags", flags_); 57 dict->Set("headers", headers_list); 58 dict->SetInteger("id", id_); 59 if (associated_stream_) 60 dict->SetInteger("associated_stream", associated_stream_); 61 return dict; 62 } 63 64 namespace { 65 66 const int kReadBufferSize = 8 * 1024; 67 68 class NetLogSpdySessionParameter : public NetLog::EventParameters { 69 public: 70 NetLogSpdySessionParameter(const HostPortProxyPair& host_pair) 71 : host_pair_(host_pair) {} 72 virtual Value* ToValue() const { 73 DictionaryValue* dict = new DictionaryValue(); 74 dict->Set("host", new StringValue(host_pair_.first.ToString())); 75 dict->Set("proxy", new StringValue(host_pair_.second.ToPacString())); 76 return dict; 77 } 78 private: 79 const HostPortProxyPair host_pair_; 80 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter); 81 }; 82 83 class NetLogSpdySettingsParameter : public NetLog::EventParameters { 84 public: 85 explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings) 86 : settings_(settings) {} 87 88 virtual Value* ToValue() const { 89 DictionaryValue* dict = new DictionaryValue(); 90 ListValue* settings = new ListValue(); 91 for (spdy::SpdySettings::const_iterator it = settings_.begin(); 92 it != settings_.end(); ++it) { 93 settings->Append(new StringValue( 94 base::StringPrintf("[%u:%u]", it->first.id(), it->second))); 95 } 96 dict->Set("settings", settings); 97 return dict; 98 } 99 100 private: 101 ~NetLogSpdySettingsParameter() {} 102 const spdy::SpdySettings settings_; 103 104 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter); 105 }; 106 107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters { 108 public: 109 NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id, 110 int delta, 111 int window_size) 112 : stream_id_(stream_id), delta_(delta), window_size_(window_size) {} 113 114 virtual Value* ToValue() const { 115 DictionaryValue* dict = new DictionaryValue(); 116 dict->SetInteger("stream_id", static_cast<int>(stream_id_)); 117 dict->SetInteger("delta", delta_); 118 dict->SetInteger("window_size", window_size_); 119 return dict; 120 } 121 122 private: 123 ~NetLogSpdyWindowUpdateParameter() {} 124 const spdy::SpdyStreamId stream_id_; 125 const int delta_; 126 const int window_size_; 127 128 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter); 129 }; 130 131 class NetLogSpdyDataParameter : public NetLog::EventParameters { 132 public: 133 NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id, 134 int size, 135 spdy::SpdyDataFlags flags) 136 : stream_id_(stream_id), size_(size), flags_(flags) {} 137 138 virtual Value* ToValue() const { 139 DictionaryValue* dict = new DictionaryValue(); 140 dict->SetInteger("stream_id", static_cast<int>(stream_id_)); 141 dict->SetInteger("size", size_); 142 dict->SetInteger("flags", static_cast<int>(flags_)); 143 return dict; 144 } 145 146 private: 147 ~NetLogSpdyDataParameter() {} 148 const spdy::SpdyStreamId stream_id_; 149 const int size_; 150 const spdy::SpdyDataFlags flags_; 151 152 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter); 153 }; 154 155 class NetLogSpdyRstParameter : public NetLog::EventParameters { 156 public: 157 NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status) 158 : stream_id_(stream_id), status_(status) {} 159 160 virtual Value* ToValue() const { 161 DictionaryValue* dict = new DictionaryValue(); 162 dict->SetInteger("stream_id", static_cast<int>(stream_id_)); 163 dict->SetInteger("status", status_); 164 return dict; 165 } 166 167 private: 168 ~NetLogSpdyRstParameter() {} 169 const spdy::SpdyStreamId stream_id_; 170 const int status_; 171 172 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter); 173 }; 174 175 class NetLogSpdyPingParameter : public NetLog::EventParameters { 176 public: 177 explicit NetLogSpdyPingParameter(uint32 unique_id) : unique_id_(unique_id) {} 178 179 virtual Value* ToValue() const { 180 DictionaryValue* dict = new DictionaryValue(); 181 dict->SetInteger("unique_id", unique_id_); 182 return dict; 183 } 184 185 private: 186 ~NetLogSpdyPingParameter() {} 187 const uint32 unique_id_; 188 189 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyPingParameter); 190 }; 191 192 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters { 193 public: 194 NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id, 195 int active_streams, 196 int unclaimed_streams) 197 : last_stream_id_(last_stream_id), 198 active_streams_(active_streams), 199 unclaimed_streams_(unclaimed_streams) {} 200 201 virtual Value* ToValue() const { 202 DictionaryValue* dict = new DictionaryValue(); 203 dict->SetInteger("last_accepted_stream_id", 204 static_cast<int>(last_stream_id_)); 205 dict->SetInteger("active_streams", active_streams_); 206 dict->SetInteger("unclaimed_streams", unclaimed_streams_); 207 return dict; 208 } 209 210 private: 211 ~NetLogSpdyGoAwayParameter() {} 212 const spdy::SpdyStreamId last_stream_id_; 213 const int active_streams_; 214 const int unclaimed_streams_; 215 216 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter); 217 }; 218 219 } // namespace 220 221 // static 222 bool SpdySession::use_ssl_ = true; 223 224 // static 225 bool SpdySession::use_flow_control_ = false; 226 227 // static 228 size_t SpdySession::max_concurrent_stream_limit_ = 256; 229 230 // static 231 bool SpdySession::enable_ping_based_connection_checking_ = true; 232 233 // static 234 int SpdySession::connection_at_risk_of_loss_ms_ = 0; 235 236 // static 237 int SpdySession::trailing_ping_delay_time_ms_ = 1000; 238 239 // static 240 int SpdySession::hung_interval_ms_ = 10000; 241 242 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, 243 SpdySessionPool* spdy_session_pool, 244 SpdySettingsStorage* spdy_settings, 245 NetLog* net_log) 246 : ALLOW_THIS_IN_INITIALIZER_LIST( 247 read_callback_(this, &SpdySession::OnReadComplete)), 248 ALLOW_THIS_IN_INITIALIZER_LIST( 249 write_callback_(this, &SpdySession::OnWriteComplete)), 250 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), 251 host_port_proxy_pair_(host_port_proxy_pair), 252 spdy_session_pool_(spdy_session_pool), 253 spdy_settings_(spdy_settings), 254 connection_(new ClientSocketHandle), 255 read_buffer_(new IOBuffer(kReadBufferSize)), 256 read_pending_(false), 257 stream_hi_water_mark_(1), // Always start at 1 for the first stream id. 258 write_pending_(false), 259 delayed_write_pending_(false), 260 is_secure_(false), 261 certificate_error_code_(OK), 262 error_(OK), 263 state_(IDLE), 264 max_concurrent_streams_(kDefaultMaxConcurrentStreams), 265 streams_initiated_count_(0), 266 streams_pushed_count_(0), 267 streams_pushed_and_claimed_count_(0), 268 streams_abandoned_count_(0), 269 frames_received_(0), 270 bytes_received_(0), 271 sent_settings_(false), 272 received_settings_(false), 273 stalled_streams_(0), 274 pings_in_flight_(0), 275 next_ping_id_(1), 276 received_data_time_(base::TimeTicks::Now()), 277 trailing_ping_pending_(false), 278 check_ping_status_pending_(false), 279 need_to_send_ping_(false), 280 initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize), 281 initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize), 282 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) { 283 DCHECK(HttpStreamFactory::spdy_enabled()); 284 net_log_.BeginEvent( 285 NetLog::TYPE_SPDY_SESSION, 286 make_scoped_refptr( 287 new NetLogSpdySessionParameter(host_port_proxy_pair_))); 288 289 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 290 291 spdy_framer_.set_visitor(this); 292 293 SendSettings(); 294 } 295 296 SpdySession::~SpdySession() { 297 if (state_ != CLOSED) { 298 state_ = CLOSED; 299 300 // Cleanup all the streams. 301 CloseAllStreams(net::ERR_ABORTED); 302 } 303 304 if (connection_->is_initialized()) { 305 // With Spdy we can't recycle sockets. 306 connection_->socket()->Disconnect(); 307 } 308 309 // Streams should all be gone now. 310 DCHECK_EQ(0u, num_active_streams()); 311 DCHECK_EQ(0u, num_unclaimed_pushed_streams()); 312 313 DCHECK(pending_callback_map_.empty()); 314 315 RecordHistograms(); 316 317 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL); 318 } 319 320 net::Error SpdySession::InitializeWithSocket( 321 ClientSocketHandle* connection, 322 bool is_secure, 323 int certificate_error_code) { 324 base::StatsCounter spdy_sessions("spdy.sessions"); 325 spdy_sessions.Increment(); 326 327 state_ = CONNECTED; 328 connection_.reset(connection); 329 is_secure_ = is_secure; 330 certificate_error_code_ = certificate_error_code; 331 332 // Write out any data that we might have to send, such as the settings frame. 333 WriteSocketLater(); 334 net::Error error = ReadSocket(); 335 if (error == ERR_IO_PENDING) 336 return OK; 337 return error; 338 } 339 340 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 341 if (state_ != CONNECTED) 342 return false; 343 344 SSLInfo ssl_info; 345 bool was_npn_negotiated; 346 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated)) 347 return true; // This is not a secure session, so all domains are okay. 348 349 return ssl_info.cert->VerifyNameMatch(domain); 350 } 351 352 int SpdySession::GetPushStream( 353 const GURL& url, 354 scoped_refptr<SpdyStream>* stream, 355 const BoundNetLog& stream_net_log) { 356 CHECK_NE(state_, CLOSED); 357 358 *stream = NULL; 359 360 // Don't allow access to secure push streams over an unauthenticated, but 361 // encrypted SSL socket. 362 if (is_secure_ && certificate_error_code_ != OK && 363 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 364 LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an " 365 << "unauthenticated session."; 366 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true); 367 return ERR_SPDY_PROTOCOL_ERROR; 368 } 369 370 *stream = GetActivePushStream(url.spec()); 371 if (stream->get()) { 372 DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_); 373 streams_pushed_and_claimed_count_++; 374 return OK; 375 } 376 return 0; 377 } 378 379 int SpdySession::CreateStream( 380 const GURL& url, 381 RequestPriority priority, 382 scoped_refptr<SpdyStream>* spdy_stream, 383 const BoundNetLog& stream_net_log, 384 CompletionCallback* callback) { 385 if (!max_concurrent_streams_ || 386 active_streams_.size() < max_concurrent_streams_) { 387 return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); 388 } 389 390 stalled_streams_++; 391 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL); 392 create_stream_queues_[priority].push( 393 PendingCreateStream(url, priority, spdy_stream, 394 stream_net_log, callback)); 395 return ERR_IO_PENDING; 396 } 397 398 void SpdySession::ProcessPendingCreateStreams() { 399 while (!max_concurrent_streams_ || 400 active_streams_.size() < max_concurrent_streams_) { 401 bool no_pending_create_streams = true; 402 for (int i = 0;i < NUM_PRIORITIES;++i) { 403 if (!create_stream_queues_[i].empty()) { 404 PendingCreateStream pending_create = create_stream_queues_[i].front(); 405 create_stream_queues_[i].pop(); 406 no_pending_create_streams = false; 407 int error = CreateStreamImpl(*pending_create.url, 408 pending_create.priority, 409 pending_create.spdy_stream, 410 *pending_create.stream_net_log); 411 scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream; 412 DCHECK(!ContainsKey(pending_callback_map_, stream)); 413 pending_callback_map_[stream] = 414 CallbackResultPair(pending_create.callback, error); 415 MessageLoop::current()->PostTask( 416 FROM_HERE, 417 method_factory_.NewRunnableMethod( 418 &SpdySession::InvokeUserStreamCreationCallback, stream)); 419 break; 420 } 421 } 422 if (no_pending_create_streams) 423 return; // there were no streams in any queue 424 } 425 } 426 427 void SpdySession::CancelPendingCreateStreams( 428 const scoped_refptr<SpdyStream>* spdy_stream) { 429 PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream); 430 if (it != pending_callback_map_.end()) { 431 pending_callback_map_.erase(it); 432 return; 433 } 434 435 for (int i = 0;i < NUM_PRIORITIES;++i) { 436 PendingCreateStreamQueue tmp; 437 // Make a copy removing this trans 438 while (!create_stream_queues_[i].empty()) { 439 PendingCreateStream pending_create = create_stream_queues_[i].front(); 440 create_stream_queues_[i].pop(); 441 if (pending_create.spdy_stream != spdy_stream) 442 tmp.push(pending_create); 443 } 444 // Now copy it back 445 while (!tmp.empty()) { 446 create_stream_queues_[i].push(tmp.front()); 447 tmp.pop(); 448 } 449 } 450 } 451 452 int SpdySession::CreateStreamImpl( 453 const GURL& url, 454 RequestPriority priority, 455 scoped_refptr<SpdyStream>* spdy_stream, 456 const BoundNetLog& stream_net_log) { 457 // Make sure that we don't try to send https/wss over an unauthenticated, but 458 // encrypted SSL socket. 459 if (is_secure_ && certificate_error_code_ != OK && 460 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 461 LOG(ERROR) << "Tried to create spdy stream for secure content over an " 462 << "unauthenticated session."; 463 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true); 464 return ERR_SPDY_PROTOCOL_ERROR; 465 } 466 467 const std::string& path = url.PathForRequest(); 468 469 const spdy::SpdyStreamId stream_id = GetNewStreamId(); 470 471 *spdy_stream = new SpdyStream(this, 472 stream_id, 473 false, 474 stream_net_log); 475 const scoped_refptr<SpdyStream>& stream = *spdy_stream; 476 477 stream->set_priority(priority); 478 stream->set_path(path); 479 stream->set_send_window_size(initial_send_window_size_); 480 stream->set_recv_window_size(initial_recv_window_size_); 481 ActivateStream(stream); 482 483 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", 484 static_cast<int>(priority), 0, 10, 11); 485 486 // TODO(mbelshe): Optimize memory allocations 487 DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES); 488 489 DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); 490 return OK; 491 } 492 493 int SpdySession::WriteSynStream( 494 spdy::SpdyStreamId stream_id, 495 RequestPriority priority, 496 spdy::SpdyControlFlags flags, 497 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 498 // Find our stream 499 if (!IsStreamActive(stream_id)) 500 return ERR_INVALID_SPDY_STREAM; 501 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; 502 CHECK_EQ(stream->stream_id(), stream_id); 503 504 SendPrefacePingIfNoneInFlight(); 505 506 scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame( 507 spdy_framer_.CreateSynStream( 508 stream_id, 0, 509 ConvertRequestPriorityToSpdyPriority(priority), 510 flags, false, headers.get())); 511 QueueFrame(syn_frame.get(), priority, stream); 512 513 base::StatsCounter spdy_requests("spdy.requests"); 514 spdy_requests.Increment(); 515 streams_initiated_count_++; 516 517 if (net_log().IsLoggingAllEvents()) { 518 net_log().AddEvent( 519 NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 520 make_scoped_refptr( 521 new NetLogSpdySynParameter(headers, flags, stream_id, 0))); 522 } 523 524 // Some servers don't like too many pings, so we limit our current sending to 525 // no more than one ping for any syn sent. To do this, we avoid ever setting 526 // this to true unless we send a syn (which we have just done). This approach 527 // may change over time as servers change their responses to pings. 528 need_to_send_ping_ = true; 529 530 return ERR_IO_PENDING; 531 } 532 533 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id, 534 net::IOBuffer* data, int len, 535 spdy::SpdyDataFlags flags) { 536 // Find our stream 537 DCHECK(IsStreamActive(stream_id)); 538 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 539 CHECK_EQ(stream->stream_id(), stream_id); 540 if (!stream) 541 return ERR_INVALID_SPDY_STREAM; 542 543 SendPrefacePingIfNoneInFlight(); 544 545 if (len > kMaxSpdyFrameChunkSize) { 546 len = kMaxSpdyFrameChunkSize; 547 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN); 548 } 549 550 // Obey send window size of the stream if flow control is enabled. 551 if (use_flow_control_) { 552 if (stream->send_window_size() <= 0) { 553 // Because we queue frames onto the session, it is possible that 554 // a stream was not flow controlled at the time it attempted the 555 // write, but when we go to fulfill the write, it is now flow 556 // controlled. This is why we need the session to mark the stream 557 // as stalled - because only the session knows for sure when the 558 // stall occurs. 559 stream->set_stalled_by_flow_control(true); 560 net_log().AddEvent( 561 NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, 562 make_scoped_refptr( 563 new NetLogIntegerParameter("stream_id", stream_id))); 564 return ERR_IO_PENDING; 565 } 566 int new_len = std::min(len, stream->send_window_size()); 567 if (new_len < len) { 568 len = new_len; 569 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN); 570 } 571 stream->DecreaseSendWindowSize(len); 572 } 573 574 if (net_log().IsLoggingAllEvents()) { 575 net_log().AddEvent( 576 NetLog::TYPE_SPDY_SESSION_SEND_DATA, 577 make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags))); 578 } 579 580 // TODO(mbelshe): reduce memory copies here. 581 scoped_ptr<spdy::SpdyDataFrame> frame( 582 spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags)); 583 QueueFrame(frame.get(), stream->priority(), stream); 584 return ERR_IO_PENDING; 585 } 586 587 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) { 588 // TODO(mbelshe): We should send a RST_STREAM control frame here 589 // so that the server can cancel a large send. 590 591 DeleteStream(stream_id, status); 592 } 593 594 void SpdySession::ResetStream( 595 spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) { 596 597 net_log().AddEvent( 598 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM, 599 make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status))); 600 601 scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame( 602 spdy_framer_.CreateRstStream(stream_id, status)); 603 604 // Default to lowest priority unless we know otherwise. 605 int priority = 3; 606 if(IsStreamActive(stream_id)) { 607 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 608 priority = stream->priority(); 609 } 610 QueueFrame(rst_frame.get(), priority, NULL); 611 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); 612 } 613 614 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const { 615 return ContainsKey(active_streams_, stream_id); 616 } 617 618 LoadState SpdySession::GetLoadState() const { 619 // NOTE: The application only queries the LoadState via the 620 // SpdyNetworkTransaction, and details are only needed when 621 // we're in the process of connecting. 622 623 // If we're connecting, defer to the connection to give us the actual 624 // LoadState. 625 if (state_ == CONNECTING) 626 return connection_->GetLoadState(); 627 628 // Just report that we're idle since the session could be doing 629 // many things concurrently. 630 return LOAD_STATE_IDLE; 631 } 632 633 void SpdySession::OnReadComplete(int bytes_read) { 634 // Parse a frame. For now this code requires that the frame fit into our 635 // buffer (32KB). 636 // TODO(mbelshe): support arbitrarily large frames! 637 638 read_pending_ = false; 639 640 if (bytes_read <= 0) { 641 // Session is tearing down. 642 net::Error error = static_cast<net::Error>(bytes_read); 643 if (bytes_read == 0) 644 error = ERR_CONNECTION_CLOSED; 645 CloseSessionOnError(error, true); 646 return; 647 } 648 649 bytes_received_ += bytes_read; 650 651 received_data_time_ = base::TimeTicks::Now(); 652 653 // The SpdyFramer will use callbacks onto |this| as it parses frames. 654 // When errors occur, those callbacks can lead to teardown of all references 655 // to |this|, so maintain a reference to self during this call for safe 656 // cleanup. 657 scoped_refptr<SpdySession> self(this); 658 659 char *data = read_buffer_->data(); 660 while (bytes_read && 661 spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) { 662 uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read); 663 bytes_read -= bytes_processed; 664 data += bytes_processed; 665 if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE) 666 spdy_framer_.Reset(); 667 } 668 669 if (state_ != CLOSED) 670 ReadSocket(); 671 } 672 673 void SpdySession::OnWriteComplete(int result) { 674 DCHECK(write_pending_); 675 DCHECK(in_flight_write_.size()); 676 677 write_pending_ = false; 678 679 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); 680 681 if (result >= 0) { 682 // It should not be possible to have written more bytes than our 683 // in_flight_write_. 684 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); 685 686 in_flight_write_.buffer()->DidConsume(result); 687 688 // We only notify the stream when we've fully written the pending frame. 689 if (!in_flight_write_.buffer()->BytesRemaining()) { 690 if (stream) { 691 // Report the number of bytes written to the caller, but exclude the 692 // frame size overhead. NOTE: if this frame was compressed the 693 // reported bytes written is the compressed size, not the original 694 // size. 695 if (result > 0) { 696 result = in_flight_write_.buffer()->size(); 697 DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size())); 698 result -= static_cast<int>(spdy::SpdyFrame::size()); 699 } 700 701 // It is possible that the stream was cancelled while we were writing 702 // to the socket. 703 if (!stream->cancelled()) 704 stream->OnWriteComplete(result); 705 } 706 707 // Cleanup the write which just completed. 708 in_flight_write_.release(); 709 } 710 711 // Write more data. We're already in a continuation, so we can 712 // go ahead and write it immediately (without going back to the 713 // message loop). 714 WriteSocketLater(); 715 } else { 716 in_flight_write_.release(); 717 718 // The stream is now errored. Close it down. 719 CloseSessionOnError(static_cast<net::Error>(result), true); 720 } 721 } 722 723 net::Error SpdySession::ReadSocket() { 724 if (read_pending_) 725 return OK; 726 727 if (state_ == CLOSED) { 728 NOTREACHED(); 729 return ERR_UNEXPECTED; 730 } 731 732 CHECK(connection_.get()); 733 CHECK(connection_->socket()); 734 int bytes_read = connection_->socket()->Read(read_buffer_.get(), 735 kReadBufferSize, 736 &read_callback_); 737 switch (bytes_read) { 738 case 0: 739 // Socket is closed! 740 CloseSessionOnError(ERR_CONNECTION_CLOSED, true); 741 return ERR_CONNECTION_CLOSED; 742 case net::ERR_IO_PENDING: 743 // Waiting for data. Nothing to do now. 744 read_pending_ = true; 745 return ERR_IO_PENDING; 746 default: 747 // Data was read, process it. 748 // Schedule the work through the message loop to avoid recursive 749 // callbacks. 750 read_pending_ = true; 751 MessageLoop::current()->PostTask( 752 FROM_HERE, 753 method_factory_.NewRunnableMethod( 754 &SpdySession::OnReadComplete, bytes_read)); 755 break; 756 } 757 return OK; 758 } 759 760 void SpdySession::WriteSocketLater() { 761 if (delayed_write_pending_) 762 return; 763 764 if (state_ < CONNECTED) 765 return; 766 767 delayed_write_pending_ = true; 768 MessageLoop::current()->PostTask( 769 FROM_HERE, 770 method_factory_.NewRunnableMethod(&SpdySession::WriteSocket)); 771 } 772 773 void SpdySession::WriteSocket() { 774 // This function should only be called via WriteSocketLater. 775 DCHECK(delayed_write_pending_); 776 delayed_write_pending_ = false; 777 778 // If the socket isn't connected yet, just wait; we'll get called 779 // again when the socket connection completes. If the socket is 780 // closed, just return. 781 if (state_ < CONNECTED || state_ == CLOSED) 782 return; 783 784 if (write_pending_) // Another write is in progress still. 785 return; 786 787 // Loop sending frames until we've sent everything or until the write 788 // returns error (or ERR_IO_PENDING). 789 while (in_flight_write_.buffer() || !queue_.empty()) { 790 if (!in_flight_write_.buffer()) { 791 // Grab the next SpdyFrame to send. 792 SpdyIOBuffer next_buffer = queue_.top(); 793 queue_.pop(); 794 795 // We've deferred compression until just before we write it to the socket, 796 // which is now. At this time, we don't compress our data frames. 797 spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); 798 size_t size; 799 if (spdy_framer_.IsCompressible(uncompressed_frame)) { 800 scoped_ptr<spdy::SpdyFrame> compressed_frame( 801 spdy_framer_.CompressFrame(uncompressed_frame)); 802 if (!compressed_frame.get()) { 803 LOG(ERROR) << "SPDY Compression failure"; 804 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true); 805 return; 806 } 807 808 size = compressed_frame->length() + spdy::SpdyFrame::size(); 809 810 DCHECK_GT(size, 0u); 811 812 // TODO(mbelshe): We have too much copying of data here. 813 IOBufferWithSize* buffer = new IOBufferWithSize(size); 814 memcpy(buffer->data(), compressed_frame->data(), size); 815 816 // Attempt to send the frame. 817 in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream()); 818 } else { 819 size = uncompressed_frame.length() + spdy::SpdyFrame::size(); 820 in_flight_write_ = next_buffer; 821 } 822 } else { 823 DCHECK(in_flight_write_.buffer()->BytesRemaining()); 824 } 825 826 write_pending_ = true; 827 int rv = connection_->socket()->Write(in_flight_write_.buffer(), 828 in_flight_write_.buffer()->BytesRemaining(), &write_callback_); 829 if (rv == net::ERR_IO_PENDING) 830 break; 831 832 // We sent the frame successfully. 833 OnWriteComplete(rv); 834 835 // TODO(mbelshe): Test this error case. Maybe we should mark the socket 836 // as in an error state. 837 if (rv < 0) 838 break; 839 } 840 } 841 842 void SpdySession::CloseAllStreams(net::Error status) { 843 base::StatsCounter abandoned_streams("spdy.abandoned_streams"); 844 base::StatsCounter abandoned_push_streams( 845 "spdy.abandoned_push_streams"); 846 847 if (!active_streams_.empty()) 848 abandoned_streams.Add(active_streams_.size()); 849 if (!unclaimed_pushed_streams_.empty()) { 850 streams_abandoned_count_ += unclaimed_pushed_streams_.size(); 851 abandoned_push_streams.Add(unclaimed_pushed_streams_.size()); 852 unclaimed_pushed_streams_.clear(); 853 } 854 855 for (int i = 0;i < NUM_PRIORITIES;++i) { 856 while (!create_stream_queues_[i].empty()) { 857 PendingCreateStream pending_create = create_stream_queues_[i].front(); 858 create_stream_queues_[i].pop(); 859 pending_create.callback->Run(ERR_ABORTED); 860 } 861 } 862 863 while (!active_streams_.empty()) { 864 ActiveStreamMap::iterator it = active_streams_.begin(); 865 const scoped_refptr<SpdyStream>& stream = it->second; 866 DCHECK(stream); 867 LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id() 868 << "): " << stream->path(); 869 DeleteStream(stream->stream_id(), status); 870 } 871 872 // We also need to drain the queue. 873 while (queue_.size()) 874 queue_.pop(); 875 } 876 877 int SpdySession::GetNewStreamId() { 878 int id = stream_hi_water_mark_; 879 stream_hi_water_mark_ += 2; 880 if (stream_hi_water_mark_ > 0x7fff) 881 stream_hi_water_mark_ = 1; 882 return id; 883 } 884 885 void SpdySession::QueueFrame(spdy::SpdyFrame* frame, 886 spdy::SpdyPriority priority, 887 SpdyStream* stream) { 888 int length = spdy::SpdyFrame::size() + frame->length(); 889 IOBuffer* buffer = new IOBuffer(length); 890 memcpy(buffer->data(), frame->data(), length); 891 queue_.push(SpdyIOBuffer(buffer, length, priority, stream)); 892 893 WriteSocketLater(); 894 } 895 896 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) { 897 // Closing all streams can have a side-effect of dropping the last reference 898 // to |this|. Hold a reference through this function. 899 scoped_refptr<SpdySession> self(this); 900 901 DCHECK_LT(err, OK); 902 net_log_.AddEvent( 903 NetLog::TYPE_SPDY_SESSION_CLOSE, 904 make_scoped_refptr(new NetLogIntegerParameter("status", err))); 905 906 // Don't close twice. This can occur because we can have both 907 // a read and a write outstanding, and each can complete with 908 // an error. 909 if (state_ != CLOSED) { 910 state_ = CLOSED; 911 error_ = err; 912 if (remove_from_pool) 913 RemoveFromPool(); 914 CloseAllStreams(err); 915 } 916 } 917 918 Value* SpdySession::GetInfoAsValue() const { 919 DictionaryValue* dict = new DictionaryValue(); 920 921 dict->SetInteger("source_id", net_log_.source().id); 922 923 dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString()); 924 dict->SetString("proxy", host_port_proxy_pair_.second.ToURI()); 925 926 dict->SetInteger("active_streams", active_streams_.size()); 927 928 dict->SetInteger("unclaimed_pushed_streams", 929 unclaimed_pushed_streams_.size()); 930 931 dict->SetBoolean("is_secure", is_secure_); 932 933 dict->SetInteger("error", error_); 934 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); 935 936 dict->SetInteger("streams_initiated_count", streams_initiated_count_); 937 dict->SetInteger("streams_pushed_count", streams_pushed_count_); 938 dict->SetInteger("streams_pushed_and_claimed_count", 939 streams_pushed_and_claimed_count_); 940 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_); 941 dict->SetInteger("frames_received", frames_received_); 942 943 dict->SetBoolean("sent_settings", sent_settings_); 944 dict->SetBoolean("received_settings", received_settings_); 945 return dict; 946 } 947 948 int SpdySession::GetPeerAddress(AddressList* address) const { 949 if (!connection_->socket()) 950 return ERR_SOCKET_NOT_CONNECTED; 951 952 return connection_->socket()->GetPeerAddress(address); 953 } 954 955 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 956 if (!connection_->socket()) 957 return ERR_SOCKET_NOT_CONNECTED; 958 959 return connection_->socket()->GetLocalAddress(address); 960 } 961 962 void SpdySession::ActivateStream(SpdyStream* stream) { 963 const spdy::SpdyStreamId id = stream->stream_id(); 964 DCHECK(!IsStreamActive(id)); 965 966 active_streams_[id] = stream; 967 } 968 969 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) { 970 // For push streams, if they are being deleted normally, we leave 971 // the stream in the unclaimed_pushed_streams_ list. However, if 972 // the stream is errored out, clean it up entirely. 973 if (status != OK) { 974 PushedStreamMap::iterator it; 975 for (it = unclaimed_pushed_streams_.begin(); 976 it != unclaimed_pushed_streams_.end(); ++it) { 977 scoped_refptr<SpdyStream> curr = it->second; 978 if (id == curr->stream_id()) { 979 unclaimed_pushed_streams_.erase(it); 980 break; 981 } 982 } 983 } 984 985 // The stream might have been deleted. 986 ActiveStreamMap::iterator it2 = active_streams_.find(id); 987 if (it2 == active_streams_.end()) 988 return; 989 990 // If this is an active stream, call the callback. 991 const scoped_refptr<SpdyStream> stream(it2->second); 992 active_streams_.erase(it2); 993 if (stream) 994 stream->OnClose(status); 995 ProcessPendingCreateStreams(); 996 } 997 998 void SpdySession::RemoveFromPool() { 999 if (spdy_session_pool_) { 1000 spdy_session_pool_->Remove(make_scoped_refptr(this)); 1001 spdy_session_pool_ = NULL; 1002 } 1003 } 1004 1005 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream( 1006 const std::string& path) { 1007 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 1008 1009 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path); 1010 if (it != unclaimed_pushed_streams_.end()) { 1011 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL); 1012 scoped_refptr<SpdyStream> stream = it->second; 1013 unclaimed_pushed_streams_.erase(it); 1014 used_push_streams.Increment(); 1015 return stream; 1016 } 1017 return NULL; 1018 } 1019 1020 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) { 1021 if (is_secure_) { 1022 SSLClientSocket* ssl_socket = 1023 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 1024 ssl_socket->GetSSLInfo(ssl_info); 1025 *was_npn_negotiated = ssl_socket->was_npn_negotiated(); 1026 return true; 1027 } 1028 return false; 1029 } 1030 1031 bool SpdySession::GetSSLCertRequestInfo( 1032 SSLCertRequestInfo* cert_request_info) { 1033 if (is_secure_) { 1034 SSLClientSocket* ssl_socket = 1035 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 1036 ssl_socket->GetSSLCertRequestInfo(cert_request_info); 1037 return true; 1038 } 1039 return false; 1040 } 1041 1042 void SpdySession::OnError(spdy::SpdyFramer* framer) { 1043 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true); 1044 } 1045 1046 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id, 1047 const char* data, 1048 size_t len) { 1049 if (net_log().IsLoggingAllEvents()) { 1050 net_log().AddEvent( 1051 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1052 make_scoped_refptr(new NetLogSpdyDataParameter( 1053 stream_id, len, spdy::SpdyDataFlags()))); 1054 } 1055 1056 if (!IsStreamActive(stream_id)) { 1057 // NOTE: it may just be that the stream was cancelled. 1058 LOG(WARNING) << "Received data frame for invalid stream " << stream_id; 1059 return; 1060 } 1061 1062 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1063 stream->OnDataReceived(data, len); 1064 } 1065 1066 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers, 1067 const scoped_refptr<SpdyStream> stream) { 1068 int rv = OK; 1069 rv = stream->OnResponseReceived(headers); 1070 if (rv < 0) { 1071 DCHECK_NE(rv, ERR_IO_PENDING); 1072 const spdy::SpdyStreamId stream_id = stream->stream_id(); 1073 DeleteStream(stream_id, rv); 1074 return false; 1075 } 1076 return true; 1077 } 1078 1079 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame, 1080 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 1081 spdy::SpdyStreamId stream_id = frame.stream_id(); 1082 spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id(); 1083 1084 if (net_log_.IsLoggingAllEvents()) { 1085 net_log_.AddEvent( 1086 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 1087 make_scoped_refptr(new NetLogSpdySynParameter( 1088 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()), 1089 stream_id, associated_stream_id))); 1090 } 1091 1092 // Server-initiated streams should have even sequence numbers. 1093 if ((stream_id & 0x1) != 0) { 1094 LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id; 1095 return; 1096 } 1097 1098 if (IsStreamActive(stream_id)) { 1099 LOG(WARNING) << "Received OnSyn for active stream " << stream_id; 1100 return; 1101 } 1102 1103 if (associated_stream_id == 0) { 1104 LOG(WARNING) << "Received invalid OnSyn associated stream id " 1105 << associated_stream_id 1106 << " for stream " << stream_id; 1107 ResetStream(stream_id, spdy::INVALID_STREAM); 1108 return; 1109 } 1110 1111 streams_pushed_count_++; 1112 1113 // TODO(mbelshe): DCHECK that this is a GET method? 1114 1115 // Verify that the response had a URL for us. 1116 const std::string& url = ContainsKey(*headers, "url") ? 1117 headers->find("url")->second : ""; 1118 if (url.empty()) { 1119 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1120 LOG(WARNING) << "Pushed stream did not contain a url."; 1121 return; 1122 } 1123 1124 GURL gurl(url); 1125 if (!gurl.is_valid()) { 1126 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1127 LOG(WARNING) << "Pushed stream url was invalid: " << url; 1128 return; 1129 } 1130 1131 // Verify we have a valid stream association. 1132 if (!IsStreamActive(associated_stream_id)) { 1133 LOG(WARNING) << "Received OnSyn with inactive associated stream " 1134 << associated_stream_id; 1135 ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM); 1136 return; 1137 } 1138 1139 scoped_refptr<SpdyStream> associated_stream = 1140 active_streams_[associated_stream_id]; 1141 GURL associated_url(associated_stream->GetUrl()); 1142 if (associated_url.GetOrigin() != gurl.GetOrigin()) { 1143 LOG(WARNING) << "Rejected Cross Origin Push Stream " 1144 << associated_stream_id; 1145 ResetStream(stream_id, spdy::REFUSED_STREAM); 1146 return; 1147 } 1148 1149 // There should not be an existing pushed stream with the same path. 1150 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url); 1151 if (it != unclaimed_pushed_streams_.end()) { 1152 LOG(WARNING) << "Received duplicate pushed stream with url: " << url; 1153 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1154 return; 1155 } 1156 1157 scoped_refptr<SpdyStream> stream( 1158 new SpdyStream(this, stream_id, true, net_log_)); 1159 1160 stream->set_path(gurl.PathForRequest()); 1161 1162 unclaimed_pushed_streams_[url] = stream; 1163 1164 ActivateStream(stream); 1165 stream->set_response_received(); 1166 1167 // Parse the headers. 1168 if (!Respond(*headers, stream)) 1169 return; 1170 1171 base::StatsCounter push_requests("spdy.pushed_streams"); 1172 push_requests.Increment(); 1173 } 1174 1175 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame, 1176 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 1177 spdy::SpdyStreamId stream_id = frame.stream_id(); 1178 1179 bool valid_stream = IsStreamActive(stream_id); 1180 if (!valid_stream) { 1181 // NOTE: it may just be that the stream was cancelled. 1182 LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id; 1183 return; 1184 } 1185 1186 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1187 CHECK_EQ(stream->stream_id(), stream_id); 1188 CHECK(!stream->cancelled()); 1189 1190 if (stream->response_received()) { 1191 LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id; 1192 CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR); 1193 return; 1194 } 1195 stream->set_response_received(); 1196 1197 if (net_log().IsLoggingAllEvents()) { 1198 net_log().AddEvent( 1199 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 1200 make_scoped_refptr(new NetLogSpdySynParameter( 1201 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()), 1202 stream_id, 0))); 1203 } 1204 1205 Respond(*headers, stream); 1206 } 1207 1208 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame, 1209 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 1210 spdy::SpdyStreamId stream_id = frame.stream_id(); 1211 1212 bool valid_stream = IsStreamActive(stream_id); 1213 if (!valid_stream) { 1214 // NOTE: it may just be that the stream was cancelled. 1215 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id; 1216 return; 1217 } 1218 1219 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1220 CHECK_EQ(stream->stream_id(), stream_id); 1221 CHECK(!stream->cancelled()); 1222 1223 if (net_log().IsLoggingAllEvents()) { 1224 net_log().AddEvent( 1225 NetLog::TYPE_SPDY_SESSION_HEADERS, 1226 make_scoped_refptr(new NetLogSpdySynParameter( 1227 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()), 1228 stream_id, 0))); 1229 } 1230 1231 int rv = stream->OnHeaders(*headers); 1232 if (rv < 0) { 1233 DCHECK_NE(rv, ERR_IO_PENDING); 1234 const spdy::SpdyStreamId stream_id = stream->stream_id(); 1235 DeleteStream(stream_id, rv); 1236 } 1237 } 1238 1239 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) { 1240 const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock); 1241 uint32 type = frame->type(); 1242 if (type == spdy::SYN_STREAM || 1243 type == spdy::SYN_REPLY || 1244 type == spdy::HEADERS) { 1245 if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) { 1246 LOG(WARNING) << "Could not parse Spdy Control Frame Header."; 1247 int stream_id = 0; 1248 if (type == spdy::SYN_STREAM) { 1249 stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*> 1250 (frame))->stream_id(); 1251 } else if (type == spdy::SYN_REPLY) { 1252 stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*> 1253 (frame))->stream_id(); 1254 } else if (type == spdy::HEADERS) { 1255 stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*> 1256 (frame))->stream_id(); 1257 } 1258 if(IsStreamActive(stream_id)) 1259 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1260 return; 1261 } 1262 } 1263 1264 frames_received_++; 1265 1266 switch (type) { 1267 case spdy::GOAWAY: 1268 OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame)); 1269 break; 1270 case spdy::PING: 1271 OnPing(*reinterpret_cast<const spdy::SpdyPingControlFrame*>(frame)); 1272 break; 1273 case spdy::SETTINGS: 1274 OnSettings( 1275 *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame)); 1276 break; 1277 case spdy::RST_STREAM: 1278 OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame)); 1279 break; 1280 case spdy::SYN_STREAM: 1281 OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame), 1282 headers); 1283 break; 1284 case spdy::HEADERS: 1285 OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame), 1286 headers); 1287 break; 1288 case spdy::SYN_REPLY: 1289 OnSynReply( 1290 *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame), 1291 headers); 1292 break; 1293 case spdy::WINDOW_UPDATE: 1294 OnWindowUpdate( 1295 *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame)); 1296 break; 1297 default: 1298 DCHECK(false); // Error! 1299 } 1300 } 1301 1302 bool SpdySession::OnControlFrameHeaderData(spdy::SpdyStreamId stream_id, 1303 const char* header_data, 1304 size_t len) { 1305 DCHECK(false); 1306 return false; 1307 } 1308 1309 void SpdySession::OnDataFrameHeader(const spdy::SpdyDataFrame* frame) { 1310 DCHECK(false); 1311 } 1312 1313 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) { 1314 spdy::SpdyStreamId stream_id = frame.stream_id(); 1315 1316 net_log().AddEvent( 1317 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 1318 make_scoped_refptr( 1319 new NetLogSpdyRstParameter(stream_id, frame.status()))); 1320 1321 bool valid_stream = IsStreamActive(stream_id); 1322 if (!valid_stream) { 1323 // NOTE: it may just be that the stream was cancelled. 1324 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 1325 return; 1326 } 1327 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1328 CHECK_EQ(stream->stream_id(), stream_id); 1329 CHECK(!stream->cancelled()); 1330 1331 if (frame.status() == 0) { 1332 stream->OnDataReceived(NULL, 0); 1333 } else { 1334 LOG(ERROR) << "Spdy stream closed: " << frame.status(); 1335 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 1336 // For now, it doesn't matter much - it is a protocol error. 1337 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); 1338 } 1339 } 1340 1341 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) { 1342 net_log_.AddEvent( 1343 NetLog::TYPE_SPDY_SESSION_GOAWAY, 1344 make_scoped_refptr( 1345 new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(), 1346 active_streams_.size(), 1347 unclaimed_pushed_streams_.size()))); 1348 RemoveFromPool(); 1349 CloseAllStreams(net::ERR_ABORTED); 1350 1351 // TODO(willchan): Cancel any streams that are past the GoAway frame's 1352 // |last_accepted_stream_id|. 1353 1354 // Don't bother killing any streams that are still reading. They'll either 1355 // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is 1356 // closed. 1357 } 1358 1359 void SpdySession::OnPing(const spdy::SpdyPingControlFrame& frame) { 1360 net_log_.AddEvent( 1361 NetLog::TYPE_SPDY_SESSION_PING, 1362 make_scoped_refptr(new NetLogSpdyPingParameter(frame.unique_id()))); 1363 1364 // Send response to a PING from server. 1365 if (frame.unique_id() % 2 == 0) { 1366 WritePingFrame(frame.unique_id()); 1367 return; 1368 } 1369 1370 --pings_in_flight_; 1371 if (pings_in_flight_ < 0) { 1372 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true); 1373 return; 1374 } 1375 1376 if (pings_in_flight_ > 0) 1377 return; 1378 1379 if (!need_to_send_ping_) 1380 return; 1381 1382 PlanToSendTrailingPing(); 1383 } 1384 1385 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) { 1386 spdy::SpdySettings settings; 1387 if (spdy_framer_.ParseSettings(&frame, &settings)) { 1388 HandleSettings(settings); 1389 spdy_settings_->Set(host_port_pair(), settings); 1390 } 1391 1392 received_settings_ = true; 1393 1394 net_log_.AddEvent( 1395 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 1396 make_scoped_refptr(new NetLogSpdySettingsParameter(settings))); 1397 } 1398 1399 void SpdySession::OnWindowUpdate( 1400 const spdy::SpdyWindowUpdateControlFrame& frame) { 1401 spdy::SpdyStreamId stream_id = frame.stream_id(); 1402 if (!IsStreamActive(stream_id)) { 1403 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; 1404 return; 1405 } 1406 1407 int delta_window_size = static_cast<int>(frame.delta_window_size()); 1408 if (delta_window_size < 1) { 1409 LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size " 1410 << delta_window_size; 1411 ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR); 1412 return; 1413 } 1414 1415 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1416 CHECK_EQ(stream->stream_id(), stream_id); 1417 CHECK(!stream->cancelled()); 1418 1419 if (use_flow_control_) 1420 stream->IncreaseSendWindowSize(delta_window_size); 1421 1422 net_log_.AddEvent( 1423 NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE, 1424 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter( 1425 stream_id, delta_window_size, stream->send_window_size()))); 1426 } 1427 1428 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id, 1429 int delta_window_size) { 1430 DCHECK(IsStreamActive(stream_id)); 1431 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1432 CHECK_EQ(stream->stream_id(), stream_id); 1433 1434 net_log_.AddEvent( 1435 NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE, 1436 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter( 1437 stream_id, delta_window_size, stream->recv_window_size()))); 1438 1439 scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame( 1440 spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size)); 1441 QueueFrame(window_update_frame.get(), stream->priority(), stream); 1442 } 1443 1444 // Given a cwnd that we would have sent to the server, modify it based on the 1445 // field trial policy. 1446 uint32 ApplyCwndFieldTrialPolicy(int cwnd) { 1447 base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd"); 1448 if (!trial) { 1449 LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList"; 1450 return cwnd; 1451 } 1452 if (trial->group_name() == "cwnd10") 1453 return 10; 1454 else if (trial->group_name() == "cwnd16") 1455 return 16; 1456 else if (trial->group_name() == "cwndMin16") 1457 return std::max(cwnd, 16); 1458 else if (trial->group_name() == "cwndMin10") 1459 return std::max(cwnd, 10); 1460 else if (trial->group_name() == "cwndDynamic") 1461 return cwnd; 1462 NOTREACHED(); 1463 return cwnd; 1464 } 1465 1466 void SpdySession::SendSettings() { 1467 // Note: we're copying the settings here, so that we can potentially modify 1468 // the settings for the field trial. When removing the field trial, make 1469 // this a reference to the const SpdySettings again. 1470 spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair()); 1471 if (settings.empty()) 1472 return; 1473 1474 // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable. 1475 for (spdy::SpdySettings::iterator i = settings.begin(), 1476 end = settings.end(); i != end; ++i) { 1477 const uint32 id = i->first.id(); 1478 const uint32 val = i->second; 1479 switch (id) { 1480 case spdy::SETTINGS_CURRENT_CWND: 1481 uint32 cwnd = 0; 1482 cwnd = ApplyCwndFieldTrialPolicy(val); 1483 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", 1484 cwnd, 1485 1, 200, 100); 1486 if (cwnd != val) { 1487 i->second = cwnd; 1488 i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST); 1489 spdy_settings_->Set(host_port_pair(), settings); 1490 } 1491 break; 1492 } 1493 } 1494 1495 HandleSettings(settings); 1496 1497 net_log_.AddEvent( 1498 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 1499 make_scoped_refptr(new NetLogSpdySettingsParameter(settings))); 1500 1501 // Create the SETTINGS frame and send it. 1502 scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame( 1503 spdy_framer_.CreateSettings(settings)); 1504 sent_settings_ = true; 1505 QueueFrame(settings_frame.get(), 0, NULL); 1506 } 1507 1508 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) { 1509 for (spdy::SpdySettings::const_iterator i = settings.begin(), 1510 end = settings.end(); i != end; ++i) { 1511 const uint32 id = i->first.id(); 1512 const uint32 val = i->second; 1513 switch (id) { 1514 case spdy::SETTINGS_MAX_CONCURRENT_STREAMS: 1515 max_concurrent_streams_ = std::min(static_cast<size_t>(val), 1516 max_concurrent_stream_limit_); 1517 ProcessPendingCreateStreams(); 1518 break; 1519 } 1520 } 1521 } 1522 1523 void SpdySession::SendPrefacePingIfNoneInFlight() { 1524 if (pings_in_flight_ || trailing_ping_pending_ || 1525 !enable_ping_based_connection_checking_) 1526 return; 1527 1528 const base::TimeDelta kConnectionAtRiskOfLoss = 1529 base::TimeDelta::FromMilliseconds(connection_at_risk_of_loss_ms_); 1530 1531 base::TimeTicks now = base::TimeTicks::Now(); 1532 // If we haven't heard from server, then send a preface-PING. 1533 if ((now - received_data_time_) > kConnectionAtRiskOfLoss) 1534 SendPrefacePing(); 1535 1536 PlanToSendTrailingPing(); 1537 } 1538 1539 void SpdySession::SendPrefacePing() { 1540 // TODO(rtenneti): Send preface pings when more servers support additional 1541 // pings. 1542 // WritePingFrame(next_ping_id_); 1543 } 1544 1545 void SpdySession::PlanToSendTrailingPing() { 1546 if (trailing_ping_pending_) 1547 return; 1548 1549 trailing_ping_pending_ = true; 1550 MessageLoop::current()->PostDelayedTask( 1551 FROM_HERE, 1552 method_factory_.NewRunnableMethod(&SpdySession::SendTrailingPing), 1553 trailing_ping_delay_time_ms_); 1554 } 1555 1556 void SpdySession::SendTrailingPing() { 1557 DCHECK(trailing_ping_pending_); 1558 trailing_ping_pending_ = false; 1559 WritePingFrame(next_ping_id_); 1560 } 1561 1562 void SpdySession::WritePingFrame(uint32 unique_id) { 1563 scoped_ptr<spdy::SpdyPingControlFrame> ping_frame( 1564 spdy_framer_.CreatePingFrame(next_ping_id_)); 1565 QueueFrame(ping_frame.get(), SPDY_PRIORITY_HIGHEST, NULL); 1566 1567 if (net_log().IsLoggingAllEvents()) { 1568 net_log().AddEvent( 1569 NetLog::TYPE_SPDY_SESSION_PING, 1570 make_scoped_refptr(new NetLogSpdyPingParameter(next_ping_id_))); 1571 } 1572 if (unique_id % 2 != 0) { 1573 next_ping_id_ += 2; 1574 ++pings_in_flight_; 1575 need_to_send_ping_ = false; 1576 PlanToCheckPingStatus(); 1577 } 1578 } 1579 1580 void SpdySession::PlanToCheckPingStatus() { 1581 if (check_ping_status_pending_) 1582 return; 1583 1584 check_ping_status_pending_ = true; 1585 MessageLoop::current()->PostDelayedTask( 1586 FROM_HERE, 1587 method_factory_.NewRunnableMethod( 1588 &SpdySession::CheckPingStatus, base::TimeTicks::Now()), 1589 hung_interval_ms_); 1590 } 1591 1592 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) { 1593 // Check if we got a response back for all PINGs we had sent. 1594 if (pings_in_flight_ == 0) { 1595 check_ping_status_pending_ = false; 1596 return; 1597 } 1598 1599 DCHECK(check_ping_status_pending_); 1600 1601 const base::TimeDelta kHungInterval = 1602 base::TimeDelta::FromMilliseconds(hung_interval_ms_); 1603 1604 base::TimeTicks now = base::TimeTicks::Now(); 1605 base::TimeDelta delay = kHungInterval - (now - received_data_time_); 1606 1607 if (delay.InMilliseconds() < 0 || received_data_time_ < last_check_time) { 1608 DCHECK(now - received_data_time_ > kHungInterval); 1609 CloseSessionOnError(net::ERR_SPDY_PING_FAILED, true); 1610 return; 1611 } 1612 1613 // Check the status of connection after a delay. 1614 MessageLoop::current()->PostDelayedTask( 1615 FROM_HERE, 1616 method_factory_.NewRunnableMethod(&SpdySession::CheckPingStatus, now), 1617 delay.InMilliseconds()); 1618 } 1619 1620 void SpdySession::RecordHistograms() { 1621 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 1622 streams_initiated_count_, 1623 0, 300, 50); 1624 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 1625 streams_pushed_count_, 1626 0, 300, 50); 1627 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 1628 streams_pushed_and_claimed_count_, 1629 0, 300, 50); 1630 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 1631 streams_abandoned_count_, 1632 0, 300, 50); 1633 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent", 1634 sent_settings_ ? 1 : 0, 2); 1635 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived", 1636 received_settings_ ? 1 : 0, 2); 1637 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession", 1638 stalled_streams_, 1639 0, 300, 50); 1640 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls", 1641 stalled_streams_ > 0 ? 1 : 0, 2); 1642 1643 if (received_settings_) { 1644 // Enumerate the saved settings, and set histograms for it. 1645 const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair()); 1646 1647 spdy::SpdySettings::const_iterator it; 1648 for (it = settings.begin(); it != settings.end(); ++it) { 1649 const spdy::SpdySetting setting = *it; 1650 switch (setting.first.id()) { 1651 case spdy::SETTINGS_CURRENT_CWND: 1652 // Record several different histograms to see if cwnd converges 1653 // for larger volumes of data being sent. 1654 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd", 1655 setting.second, 1656 1, 200, 100); 1657 if (bytes_received_ > 10 * 1024) { 1658 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K", 1659 setting.second, 1660 1, 200, 100); 1661 if (bytes_received_ > 25 * 1024) { 1662 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K", 1663 setting.second, 1664 1, 200, 100); 1665 if (bytes_received_ > 50 * 1024) { 1666 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K", 1667 setting.second, 1668 1, 200, 100); 1669 if (bytes_received_ > 100 * 1024) { 1670 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K", 1671 setting.second, 1672 1, 200, 100); 1673 } 1674 } 1675 } 1676 } 1677 break; 1678 case spdy::SETTINGS_ROUND_TRIP_TIME: 1679 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT", 1680 setting.second, 1681 1, 1200, 100); 1682 break; 1683 case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE: 1684 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate", 1685 setting.second, 1686 1, 100, 50); 1687 break; 1688 } 1689 } 1690 } 1691 } 1692 1693 void SpdySession::InvokeUserStreamCreationCallback( 1694 scoped_refptr<SpdyStream>* stream) { 1695 PendingCallbackMap::iterator it = pending_callback_map_.find(stream); 1696 1697 // Exit if the request has already been cancelled. 1698 if (it == pending_callback_map_.end()) 1699 return; 1700 1701 CompletionCallback* callback = it->second.callback; 1702 int result = it->second.result; 1703 pending_callback_map_.erase(it); 1704 callback->Run(result); 1705 } 1706 1707 } // namespace net 1708