1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_session.h" 6 7 #include "base/basictypes.h" 8 #include "base/logging.h" 9 #include "base/memory/linked_ptr.h" 10 #include "base/message_loop.h" 11 #include "base/metrics/field_trial.h" 12 #include "base/metrics/stats_counters.h" 13 #include "base/stl_util-inl.h" 14 #include "base/string_number_conversions.h" 15 #include "base/string_util.h" 16 #include "base/stringprintf.h" 17 #include "base/time.h" 18 #include "base/utf_string_conversions.h" 19 #include "base/values.h" 20 #include "net/base/connection_type_histograms.h" 21 #include "net/base/net_log.h" 22 #include "net/base/net_util.h" 23 #include "net/http/http_network_session.h" 24 #include "net/socket/ssl_client_socket.h" 25 #include "net/spdy/spdy_frame_builder.h" 26 #include "net/spdy/spdy_http_utils.h" 27 #include "net/spdy/spdy_protocol.h" 28 #include "net/spdy/spdy_session_pool.h" 29 #include "net/spdy/spdy_settings_storage.h" 30 #include "net/spdy/spdy_stream.h" 31 32 namespace net { 33 34 NetLogSpdySynParameter::NetLogSpdySynParameter( 35 const linked_ptr<spdy::SpdyHeaderBlock>& headers, 36 spdy::SpdyControlFlags flags, 37 spdy::SpdyStreamId id, 38 spdy::SpdyStreamId associated_stream) 39 : headers_(headers), 40 flags_(flags), 41 id_(id), 42 associated_stream_(associated_stream) { 43 } 44 45 NetLogSpdySynParameter::~NetLogSpdySynParameter() { 46 } 47 48 Value* NetLogSpdySynParameter::ToValue() const { 49 DictionaryValue* dict = new DictionaryValue(); 50 ListValue* headers_list = new ListValue(); 51 for (spdy::SpdyHeaderBlock::const_iterator it = headers_->begin(); 52 it != headers_->end(); ++it) { 53 headers_list->Append(new StringValue(base::StringPrintf( 54 "%s: %s", it->first.c_str(), it->second.c_str()))); 55 } 56 dict->SetInteger("flags", flags_); 57 dict->Set("headers", headers_list); 58 dict->SetInteger("id", id_); 59 if (associated_stream_) 60 dict->SetInteger("associated_stream", associated_stream_); 61 return dict; 62 } 63 64 namespace { 65 66 const int kReadBufferSize = 8 * 1024; 67 68 class NetLogSpdySessionParameter : public NetLog::EventParameters { 69 public: 70 NetLogSpdySessionParameter(const HostPortProxyPair& host_pair) 71 : host_pair_(host_pair) {} 72 virtual Value* ToValue() const { 73 DictionaryValue* dict = new DictionaryValue(); 74 dict->Set("host", new StringValue(host_pair_.first.ToString())); 75 dict->Set("proxy", new StringValue(host_pair_.second.ToPacString())); 76 return dict; 77 } 78 private: 79 const HostPortProxyPair host_pair_; 80 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySessionParameter); 81 }; 82 83 class NetLogSpdySettingsParameter : public NetLog::EventParameters { 84 public: 85 explicit NetLogSpdySettingsParameter(const spdy::SpdySettings& settings) 86 : settings_(settings) {} 87 88 virtual Value* ToValue() const { 89 DictionaryValue* dict = new DictionaryValue(); 90 ListValue* settings = new ListValue(); 91 for (spdy::SpdySettings::const_iterator it = settings_.begin(); 92 it != settings_.end(); ++it) { 93 settings->Append(new StringValue( 94 base::StringPrintf("[%u:%u]", it->first.id(), it->second))); 95 } 96 dict->Set("settings", settings); 97 return dict; 98 } 99 100 private: 101 ~NetLogSpdySettingsParameter() {} 102 const spdy::SpdySettings settings_; 103 104 DISALLOW_COPY_AND_ASSIGN(NetLogSpdySettingsParameter); 105 }; 106 107 class NetLogSpdyWindowUpdateParameter : public NetLog::EventParameters { 108 public: 109 NetLogSpdyWindowUpdateParameter(spdy::SpdyStreamId stream_id, 110 int delta, 111 int window_size) 112 : stream_id_(stream_id), delta_(delta), window_size_(window_size) {} 113 114 virtual Value* ToValue() const { 115 DictionaryValue* dict = new DictionaryValue(); 116 dict->SetInteger("stream_id", static_cast<int>(stream_id_)); 117 dict->SetInteger("delta", delta_); 118 dict->SetInteger("window_size", window_size_); 119 return dict; 120 } 121 122 private: 123 ~NetLogSpdyWindowUpdateParameter() {} 124 const spdy::SpdyStreamId stream_id_; 125 const int delta_; 126 const int window_size_; 127 128 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyWindowUpdateParameter); 129 }; 130 131 class NetLogSpdyDataParameter : public NetLog::EventParameters { 132 public: 133 NetLogSpdyDataParameter(spdy::SpdyStreamId stream_id, 134 int size, 135 spdy::SpdyDataFlags flags) 136 : stream_id_(stream_id), size_(size), flags_(flags) {} 137 138 virtual Value* ToValue() const { 139 DictionaryValue* dict = new DictionaryValue(); 140 dict->SetInteger("stream_id", static_cast<int>(stream_id_)); 141 dict->SetInteger("size", size_); 142 dict->SetInteger("flags", static_cast<int>(flags_)); 143 return dict; 144 } 145 146 private: 147 ~NetLogSpdyDataParameter() {} 148 const spdy::SpdyStreamId stream_id_; 149 const int size_; 150 const spdy::SpdyDataFlags flags_; 151 152 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyDataParameter); 153 }; 154 155 class NetLogSpdyRstParameter : public NetLog::EventParameters { 156 public: 157 NetLogSpdyRstParameter(spdy::SpdyStreamId stream_id, int status) 158 : stream_id_(stream_id), status_(status) {} 159 160 virtual Value* ToValue() const { 161 DictionaryValue* dict = new DictionaryValue(); 162 dict->SetInteger("stream_id", static_cast<int>(stream_id_)); 163 dict->SetInteger("status", status_); 164 return dict; 165 } 166 167 private: 168 ~NetLogSpdyRstParameter() {} 169 const spdy::SpdyStreamId stream_id_; 170 const int status_; 171 172 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyRstParameter); 173 }; 174 175 class NetLogSpdyGoAwayParameter : public NetLog::EventParameters { 176 public: 177 NetLogSpdyGoAwayParameter(spdy::SpdyStreamId last_stream_id, 178 int active_streams, 179 int unclaimed_streams) 180 : last_stream_id_(last_stream_id), 181 active_streams_(active_streams), 182 unclaimed_streams_(unclaimed_streams) {} 183 184 virtual Value* ToValue() const { 185 DictionaryValue* dict = new DictionaryValue(); 186 dict->SetInteger("last_accepted_stream_id", 187 static_cast<int>(last_stream_id_)); 188 dict->SetInteger("active_streams", active_streams_); 189 dict->SetInteger("unclaimed_streams", unclaimed_streams_); 190 return dict; 191 } 192 193 private: 194 ~NetLogSpdyGoAwayParameter() {} 195 const spdy::SpdyStreamId last_stream_id_; 196 const int active_streams_; 197 const int unclaimed_streams_; 198 199 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyGoAwayParameter); 200 }; 201 202 } // namespace 203 204 // static 205 bool SpdySession::use_ssl_ = true; 206 207 // static 208 bool SpdySession::use_flow_control_ = false; 209 210 // static 211 size_t SpdySession::max_concurrent_stream_limit_ = 256; 212 213 SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, 214 SpdySessionPool* spdy_session_pool, 215 SpdySettingsStorage* spdy_settings, 216 NetLog* net_log) 217 : ALLOW_THIS_IN_INITIALIZER_LIST( 218 read_callback_(this, &SpdySession::OnReadComplete)), 219 ALLOW_THIS_IN_INITIALIZER_LIST( 220 write_callback_(this, &SpdySession::OnWriteComplete)), 221 ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), 222 host_port_proxy_pair_(host_port_proxy_pair), 223 spdy_session_pool_(spdy_session_pool), 224 spdy_settings_(spdy_settings), 225 connection_(new ClientSocketHandle), 226 read_buffer_(new IOBuffer(kReadBufferSize)), 227 read_pending_(false), 228 stream_hi_water_mark_(1), // Always start at 1 for the first stream id. 229 write_pending_(false), 230 delayed_write_pending_(false), 231 is_secure_(false), 232 certificate_error_code_(OK), 233 error_(OK), 234 state_(IDLE), 235 max_concurrent_streams_(kDefaultMaxConcurrentStreams), 236 streams_initiated_count_(0), 237 streams_pushed_count_(0), 238 streams_pushed_and_claimed_count_(0), 239 streams_abandoned_count_(0), 240 frames_received_(0), 241 bytes_received_(0), 242 sent_settings_(false), 243 received_settings_(false), 244 stalled_streams_(0), 245 initial_send_window_size_(spdy::kSpdyStreamInitialWindowSize), 246 initial_recv_window_size_(spdy::kSpdyStreamInitialWindowSize), 247 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)) { 248 DCHECK(HttpStreamFactory::spdy_enabled()); 249 net_log_.BeginEvent( 250 NetLog::TYPE_SPDY_SESSION, 251 make_scoped_refptr( 252 new NetLogSpdySessionParameter(host_port_proxy_pair_))); 253 254 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 255 256 spdy_framer_.set_visitor(this); 257 258 SendSettings(); 259 } 260 261 SpdySession::~SpdySession() { 262 if (state_ != CLOSED) { 263 state_ = CLOSED; 264 265 // Cleanup all the streams. 266 CloseAllStreams(net::ERR_ABORTED); 267 } 268 269 if (connection_->is_initialized()) { 270 // With Spdy we can't recycle sockets. 271 connection_->socket()->Disconnect(); 272 } 273 274 // Streams should all be gone now. 275 DCHECK_EQ(0u, num_active_streams()); 276 DCHECK_EQ(0u, num_unclaimed_pushed_streams()); 277 278 DCHECK(pending_callback_map_.empty()); 279 280 RecordHistograms(); 281 282 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION, NULL); 283 } 284 285 net::Error SpdySession::InitializeWithSocket( 286 ClientSocketHandle* connection, 287 bool is_secure, 288 int certificate_error_code) { 289 base::StatsCounter spdy_sessions("spdy.sessions"); 290 spdy_sessions.Increment(); 291 292 state_ = CONNECTED; 293 connection_.reset(connection); 294 is_secure_ = is_secure; 295 certificate_error_code_ = certificate_error_code; 296 297 // Write out any data that we might have to send, such as the settings frame. 298 WriteSocketLater(); 299 net::Error error = ReadSocket(); 300 if (error == ERR_IO_PENDING) 301 return OK; 302 return error; 303 } 304 305 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { 306 if (state_ != CONNECTED) 307 return false; 308 309 SSLInfo ssl_info; 310 bool was_npn_negotiated; 311 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated)) 312 return true; // This is not a secure session, so all domains are okay. 313 314 return ssl_info.cert->VerifyNameMatch(domain); 315 } 316 317 int SpdySession::GetPushStream( 318 const GURL& url, 319 scoped_refptr<SpdyStream>* stream, 320 const BoundNetLog& stream_net_log) { 321 CHECK_NE(state_, CLOSED); 322 323 *stream = NULL; 324 325 // Don't allow access to secure push streams over an unauthenticated, but 326 // encrypted SSL socket. 327 if (is_secure_ && certificate_error_code_ != OK && 328 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 329 LOG(ERROR) << "Tried to get pushed spdy stream for secure content over an " 330 << "unauthenticated session."; 331 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true); 332 return ERR_SPDY_PROTOCOL_ERROR; 333 } 334 335 *stream = GetActivePushStream(url.spec()); 336 if (stream->get()) { 337 DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_); 338 streams_pushed_and_claimed_count_++; 339 return OK; 340 } 341 return 0; 342 } 343 344 int SpdySession::CreateStream( 345 const GURL& url, 346 RequestPriority priority, 347 scoped_refptr<SpdyStream>* spdy_stream, 348 const BoundNetLog& stream_net_log, 349 CompletionCallback* callback) { 350 if (!max_concurrent_streams_ || 351 active_streams_.size() < max_concurrent_streams_) { 352 return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); 353 } 354 355 stalled_streams_++; 356 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS, NULL); 357 create_stream_queues_[priority].push( 358 PendingCreateStream(url, priority, spdy_stream, 359 stream_net_log, callback)); 360 return ERR_IO_PENDING; 361 } 362 363 void SpdySession::ProcessPendingCreateStreams() { 364 while (!max_concurrent_streams_ || 365 active_streams_.size() < max_concurrent_streams_) { 366 bool no_pending_create_streams = true; 367 for (int i = 0;i < NUM_PRIORITIES;++i) { 368 if (!create_stream_queues_[i].empty()) { 369 PendingCreateStream pending_create = create_stream_queues_[i].front(); 370 create_stream_queues_[i].pop(); 371 no_pending_create_streams = false; 372 int error = CreateStreamImpl(*pending_create.url, 373 pending_create.priority, 374 pending_create.spdy_stream, 375 *pending_create.stream_net_log); 376 scoped_refptr<SpdyStream>* stream = pending_create.spdy_stream; 377 DCHECK(!ContainsKey(pending_callback_map_, stream)); 378 pending_callback_map_[stream] = 379 CallbackResultPair(pending_create.callback, error); 380 MessageLoop::current()->PostTask( 381 FROM_HERE, 382 method_factory_.NewRunnableMethod( 383 &SpdySession::InvokeUserStreamCreationCallback, stream)); 384 break; 385 } 386 } 387 if (no_pending_create_streams) 388 return; // there were no streams in any queue 389 } 390 } 391 392 void SpdySession::CancelPendingCreateStreams( 393 const scoped_refptr<SpdyStream>* spdy_stream) { 394 PendingCallbackMap::iterator it = pending_callback_map_.find(spdy_stream); 395 if (it != pending_callback_map_.end()) { 396 pending_callback_map_.erase(it); 397 return; 398 } 399 400 for (int i = 0;i < NUM_PRIORITIES;++i) { 401 PendingCreateStreamQueue tmp; 402 // Make a copy removing this trans 403 while (!create_stream_queues_[i].empty()) { 404 PendingCreateStream pending_create = create_stream_queues_[i].front(); 405 create_stream_queues_[i].pop(); 406 if (pending_create.spdy_stream != spdy_stream) 407 tmp.push(pending_create); 408 } 409 // Now copy it back 410 while (!tmp.empty()) { 411 create_stream_queues_[i].push(tmp.front()); 412 tmp.pop(); 413 } 414 } 415 } 416 417 int SpdySession::CreateStreamImpl( 418 const GURL& url, 419 RequestPriority priority, 420 scoped_refptr<SpdyStream>* spdy_stream, 421 const BoundNetLog& stream_net_log) { 422 // Make sure that we don't try to send https/wss over an unauthenticated, but 423 // encrypted SSL socket. 424 if (is_secure_ && certificate_error_code_ != OK && 425 (url.SchemeIs("https") || url.SchemeIs("wss"))) { 426 LOG(ERROR) << "Tried to create spdy stream for secure content over an " 427 << "unauthenticated session."; 428 CloseSessionOnError(static_cast<net::Error>(certificate_error_code_), true); 429 return ERR_SPDY_PROTOCOL_ERROR; 430 } 431 432 const std::string& path = url.PathForRequest(); 433 434 const spdy::SpdyStreamId stream_id = GetNewStreamId(); 435 436 *spdy_stream = new SpdyStream(this, 437 stream_id, 438 false, 439 stream_net_log); 440 const scoped_refptr<SpdyStream>& stream = *spdy_stream; 441 442 stream->set_priority(priority); 443 stream->set_path(path); 444 stream->set_send_window_size(initial_send_window_size_); 445 stream->set_recv_window_size(initial_recv_window_size_); 446 ActivateStream(stream); 447 448 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", 449 static_cast<int>(priority), 0, 10, 11); 450 451 // TODO(mbelshe): Optimize memory allocations 452 DCHECK(priority >= net::HIGHEST && priority < net::NUM_PRIORITIES); 453 454 DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); 455 return OK; 456 } 457 458 int SpdySession::WriteSynStream( 459 spdy::SpdyStreamId stream_id, 460 RequestPriority priority, 461 spdy::SpdyControlFlags flags, 462 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 463 // Find our stream 464 if (!IsStreamActive(stream_id)) 465 return ERR_INVALID_SPDY_STREAM; 466 const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; 467 CHECK_EQ(stream->stream_id(), stream_id); 468 469 scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame( 470 spdy_framer_.CreateSynStream( 471 stream_id, 0, 472 ConvertRequestPriorityToSpdyPriority(priority), 473 flags, false, headers.get())); 474 QueueFrame(syn_frame.get(), priority, stream); 475 476 base::StatsCounter spdy_requests("spdy.requests"); 477 spdy_requests.Increment(); 478 streams_initiated_count_++; 479 480 if (net_log().IsLoggingAllEvents()) { 481 net_log().AddEvent( 482 NetLog::TYPE_SPDY_SESSION_SYN_STREAM, 483 make_scoped_refptr( 484 new NetLogSpdySynParameter(headers, flags, stream_id, 0))); 485 } 486 487 return ERR_IO_PENDING; 488 } 489 490 int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id, 491 net::IOBuffer* data, int len, 492 spdy::SpdyDataFlags flags) { 493 // Find our stream 494 DCHECK(IsStreamActive(stream_id)); 495 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 496 CHECK_EQ(stream->stream_id(), stream_id); 497 if (!stream) 498 return ERR_INVALID_SPDY_STREAM; 499 500 if (len > kMaxSpdyFrameChunkSize) { 501 len = kMaxSpdyFrameChunkSize; 502 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN); 503 } 504 505 // Obey send window size of the stream if flow control is enabled. 506 if (use_flow_control_) { 507 if (stream->send_window_size() <= 0) { 508 // Because we queue frames onto the session, it is possible that 509 // a stream was not flow controlled at the time it attempted the 510 // write, but when we go to fulfill the write, it is now flow 511 // controlled. This is why we need the session to mark the stream 512 // as stalled - because only the session knows for sure when the 513 // stall occurs. 514 stream->set_stalled_by_flow_control(true); 515 net_log().AddEvent( 516 NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, 517 make_scoped_refptr( 518 new NetLogIntegerParameter("stream_id", stream_id))); 519 return ERR_IO_PENDING; 520 } 521 int new_len = std::min(len, stream->send_window_size()); 522 if (new_len < len) { 523 len = new_len; 524 flags = static_cast<spdy::SpdyDataFlags>(flags & ~spdy::DATA_FLAG_FIN); 525 } 526 stream->DecreaseSendWindowSize(len); 527 } 528 529 if (net_log().IsLoggingAllEvents()) { 530 net_log().AddEvent( 531 NetLog::TYPE_SPDY_SESSION_SEND_DATA, 532 make_scoped_refptr(new NetLogSpdyDataParameter(stream_id, len, flags))); 533 } 534 535 // TODO(mbelshe): reduce memory copies here. 536 scoped_ptr<spdy::SpdyDataFrame> frame( 537 spdy_framer_.CreateDataFrame(stream_id, data->data(), len, flags)); 538 QueueFrame(frame.get(), stream->priority(), stream); 539 return ERR_IO_PENDING; 540 } 541 542 void SpdySession::CloseStream(spdy::SpdyStreamId stream_id, int status) { 543 // TODO(mbelshe): We should send a RST_STREAM control frame here 544 // so that the server can cancel a large send. 545 546 DeleteStream(stream_id, status); 547 } 548 549 void SpdySession::ResetStream( 550 spdy::SpdyStreamId stream_id, spdy::SpdyStatusCodes status) { 551 552 net_log().AddEvent( 553 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM, 554 make_scoped_refptr(new NetLogSpdyRstParameter(stream_id, status))); 555 556 scoped_ptr<spdy::SpdyRstStreamControlFrame> rst_frame( 557 spdy_framer_.CreateRstStream(stream_id, status)); 558 559 // Default to lowest priority unless we know otherwise. 560 int priority = 3; 561 if(IsStreamActive(stream_id)) { 562 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 563 priority = stream->priority(); 564 } 565 QueueFrame(rst_frame.get(), priority, NULL); 566 567 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); 568 } 569 570 bool SpdySession::IsStreamActive(spdy::SpdyStreamId stream_id) const { 571 return ContainsKey(active_streams_, stream_id); 572 } 573 574 LoadState SpdySession::GetLoadState() const { 575 // NOTE: The application only queries the LoadState via the 576 // SpdyNetworkTransaction, and details are only needed when 577 // we're in the process of connecting. 578 579 // If we're connecting, defer to the connection to give us the actual 580 // LoadState. 581 if (state_ == CONNECTING) 582 return connection_->GetLoadState(); 583 584 // Just report that we're idle since the session could be doing 585 // many things concurrently. 586 return LOAD_STATE_IDLE; 587 } 588 589 void SpdySession::OnReadComplete(int bytes_read) { 590 // Parse a frame. For now this code requires that the frame fit into our 591 // buffer (32KB). 592 // TODO(mbelshe): support arbitrarily large frames! 593 594 read_pending_ = false; 595 596 if (bytes_read <= 0) { 597 // Session is tearing down. 598 net::Error error = static_cast<net::Error>(bytes_read); 599 if (bytes_read == 0) 600 error = ERR_CONNECTION_CLOSED; 601 CloseSessionOnError(error, true); 602 return; 603 } 604 605 bytes_received_ += bytes_read; 606 607 // The SpdyFramer will use callbacks onto |this| as it parses frames. 608 // When errors occur, those callbacks can lead to teardown of all references 609 // to |this|, so maintain a reference to self during this call for safe 610 // cleanup. 611 scoped_refptr<SpdySession> self(this); 612 613 char *data = read_buffer_->data(); 614 while (bytes_read && 615 spdy_framer_.error_code() == spdy::SpdyFramer::SPDY_NO_ERROR) { 616 uint32 bytes_processed = spdy_framer_.ProcessInput(data, bytes_read); 617 bytes_read -= bytes_processed; 618 data += bytes_processed; 619 if (spdy_framer_.state() == spdy::SpdyFramer::SPDY_DONE) 620 spdy_framer_.Reset(); 621 } 622 623 if (state_ != CLOSED) 624 ReadSocket(); 625 } 626 627 void SpdySession::OnWriteComplete(int result) { 628 DCHECK(write_pending_); 629 DCHECK(in_flight_write_.size()); 630 631 write_pending_ = false; 632 633 scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); 634 635 if (result >= 0) { 636 // It should not be possible to have written more bytes than our 637 // in_flight_write_. 638 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); 639 640 in_flight_write_.buffer()->DidConsume(result); 641 642 // We only notify the stream when we've fully written the pending frame. 643 if (!in_flight_write_.buffer()->BytesRemaining()) { 644 if (stream) { 645 // Report the number of bytes written to the caller, but exclude the 646 // frame size overhead. NOTE: if this frame was compressed the 647 // reported bytes written is the compressed size, not the original 648 // size. 649 if (result > 0) { 650 result = in_flight_write_.buffer()->size(); 651 DCHECK_GE(result, static_cast<int>(spdy::SpdyFrame::size())); 652 result -= static_cast<int>(spdy::SpdyFrame::size()); 653 } 654 655 // It is possible that the stream was cancelled while we were writing 656 // to the socket. 657 if (!stream->cancelled()) 658 stream->OnWriteComplete(result); 659 } 660 661 // Cleanup the write which just completed. 662 in_flight_write_.release(); 663 } 664 665 // Write more data. We're already in a continuation, so we can 666 // go ahead and write it immediately (without going back to the 667 // message loop). 668 WriteSocketLater(); 669 } else { 670 in_flight_write_.release(); 671 672 // The stream is now errored. Close it down. 673 CloseSessionOnError(static_cast<net::Error>(result), true); 674 } 675 } 676 677 net::Error SpdySession::ReadSocket() { 678 if (read_pending_) 679 return OK; 680 681 if (state_ == CLOSED) { 682 NOTREACHED(); 683 return ERR_UNEXPECTED; 684 } 685 686 CHECK(connection_.get()); 687 CHECK(connection_->socket()); 688 int bytes_read = connection_->socket()->Read(read_buffer_.get(), 689 kReadBufferSize, 690 &read_callback_); 691 switch (bytes_read) { 692 case 0: 693 // Socket is closed! 694 CloseSessionOnError(ERR_CONNECTION_CLOSED, true); 695 return ERR_CONNECTION_CLOSED; 696 case net::ERR_IO_PENDING: 697 // Waiting for data. Nothing to do now. 698 read_pending_ = true; 699 return ERR_IO_PENDING; 700 default: 701 // Data was read, process it. 702 // Schedule the work through the message loop to avoid recursive 703 // callbacks. 704 read_pending_ = true; 705 MessageLoop::current()->PostTask( 706 FROM_HERE, 707 method_factory_.NewRunnableMethod( 708 &SpdySession::OnReadComplete, bytes_read)); 709 break; 710 } 711 return OK; 712 } 713 714 void SpdySession::WriteSocketLater() { 715 if (delayed_write_pending_) 716 return; 717 718 if (state_ < CONNECTED) 719 return; 720 721 delayed_write_pending_ = true; 722 MessageLoop::current()->PostTask( 723 FROM_HERE, 724 method_factory_.NewRunnableMethod(&SpdySession::WriteSocket)); 725 } 726 727 void SpdySession::WriteSocket() { 728 // This function should only be called via WriteSocketLater. 729 DCHECK(delayed_write_pending_); 730 delayed_write_pending_ = false; 731 732 // If the socket isn't connected yet, just wait; we'll get called 733 // again when the socket connection completes. If the socket is 734 // closed, just return. 735 if (state_ < CONNECTED || state_ == CLOSED) 736 return; 737 738 if (write_pending_) // Another write is in progress still. 739 return; 740 741 // Loop sending frames until we've sent everything or until the write 742 // returns error (or ERR_IO_PENDING). 743 while (in_flight_write_.buffer() || !queue_.empty()) { 744 if (!in_flight_write_.buffer()) { 745 // Grab the next SpdyFrame to send. 746 SpdyIOBuffer next_buffer = queue_.top(); 747 queue_.pop(); 748 749 // We've deferred compression until just before we write it to the socket, 750 // which is now. At this time, we don't compress our data frames. 751 spdy::SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); 752 size_t size; 753 if (spdy_framer_.IsCompressible(uncompressed_frame)) { 754 scoped_ptr<spdy::SpdyFrame> compressed_frame( 755 spdy_framer_.CompressFrame(uncompressed_frame)); 756 if (!compressed_frame.get()) { 757 LOG(ERROR) << "SPDY Compression failure"; 758 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true); 759 return; 760 } 761 762 size = compressed_frame->length() + spdy::SpdyFrame::size(); 763 764 DCHECK_GT(size, 0u); 765 766 // TODO(mbelshe): We have too much copying of data here. 767 IOBufferWithSize* buffer = new IOBufferWithSize(size); 768 memcpy(buffer->data(), compressed_frame->data(), size); 769 770 // Attempt to send the frame. 771 in_flight_write_ = SpdyIOBuffer(buffer, size, 0, next_buffer.stream()); 772 } else { 773 size = uncompressed_frame.length() + spdy::SpdyFrame::size(); 774 in_flight_write_ = next_buffer; 775 } 776 } else { 777 DCHECK(in_flight_write_.buffer()->BytesRemaining()); 778 } 779 780 write_pending_ = true; 781 int rv = connection_->socket()->Write(in_flight_write_.buffer(), 782 in_flight_write_.buffer()->BytesRemaining(), &write_callback_); 783 if (rv == net::ERR_IO_PENDING) 784 break; 785 786 // We sent the frame successfully. 787 OnWriteComplete(rv); 788 789 // TODO(mbelshe): Test this error case. Maybe we should mark the socket 790 // as in an error state. 791 if (rv < 0) 792 break; 793 } 794 } 795 796 void SpdySession::CloseAllStreams(net::Error status) { 797 base::StatsCounter abandoned_streams("spdy.abandoned_streams"); 798 base::StatsCounter abandoned_push_streams( 799 "spdy.abandoned_push_streams"); 800 801 if (!active_streams_.empty()) 802 abandoned_streams.Add(active_streams_.size()); 803 if (!unclaimed_pushed_streams_.empty()) { 804 streams_abandoned_count_ += unclaimed_pushed_streams_.size(); 805 abandoned_push_streams.Add(unclaimed_pushed_streams_.size()); 806 unclaimed_pushed_streams_.clear(); 807 } 808 809 for (int i = 0;i < NUM_PRIORITIES;++i) { 810 while (!create_stream_queues_[i].empty()) { 811 PendingCreateStream pending_create = create_stream_queues_[i].front(); 812 create_stream_queues_[i].pop(); 813 pending_create.callback->Run(ERR_ABORTED); 814 } 815 } 816 817 while (!active_streams_.empty()) { 818 ActiveStreamMap::iterator it = active_streams_.begin(); 819 const scoped_refptr<SpdyStream>& stream = it->second; 820 DCHECK(stream); 821 LOG(WARNING) << "ABANDONED (stream_id=" << stream->stream_id() 822 << "): " << stream->path(); 823 DeleteStream(stream->stream_id(), status); 824 } 825 826 // We also need to drain the queue. 827 while (queue_.size()) 828 queue_.pop(); 829 } 830 831 int SpdySession::GetNewStreamId() { 832 int id = stream_hi_water_mark_; 833 stream_hi_water_mark_ += 2; 834 if (stream_hi_water_mark_ > 0x7fff) 835 stream_hi_water_mark_ = 1; 836 return id; 837 } 838 839 void SpdySession::QueueFrame(spdy::SpdyFrame* frame, 840 spdy::SpdyPriority priority, 841 SpdyStream* stream) { 842 int length = spdy::SpdyFrame::size() + frame->length(); 843 IOBuffer* buffer = new IOBuffer(length); 844 memcpy(buffer->data(), frame->data(), length); 845 queue_.push(SpdyIOBuffer(buffer, length, priority, stream)); 846 847 WriteSocketLater(); 848 } 849 850 void SpdySession::CloseSessionOnError(net::Error err, bool remove_from_pool) { 851 // Closing all streams can have a side-effect of dropping the last reference 852 // to |this|. Hold a reference through this function. 853 scoped_refptr<SpdySession> self(this); 854 855 DCHECK_LT(err, OK); 856 net_log_.AddEvent( 857 NetLog::TYPE_SPDY_SESSION_CLOSE, 858 make_scoped_refptr(new NetLogIntegerParameter("status", err))); 859 860 // Don't close twice. This can occur because we can have both 861 // a read and a write outstanding, and each can complete with 862 // an error. 863 if (state_ != CLOSED) { 864 state_ = CLOSED; 865 error_ = err; 866 if (remove_from_pool) 867 RemoveFromPool(); 868 CloseAllStreams(err); 869 } 870 } 871 872 Value* SpdySession::GetInfoAsValue() const { 873 DictionaryValue* dict = new DictionaryValue(); 874 875 dict->SetInteger("source_id", net_log_.source().id); 876 877 dict->SetString("host_port_pair", host_port_proxy_pair_.first.ToString()); 878 dict->SetString("proxy", host_port_proxy_pair_.second.ToURI()); 879 880 dict->SetInteger("active_streams", active_streams_.size()); 881 882 dict->SetInteger("unclaimed_pushed_streams", 883 unclaimed_pushed_streams_.size()); 884 885 dict->SetBoolean("is_secure", is_secure_); 886 887 dict->SetInteger("error", error_); 888 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_); 889 890 dict->SetInteger("streams_initiated_count", streams_initiated_count_); 891 dict->SetInteger("streams_pushed_count", streams_pushed_count_); 892 dict->SetInteger("streams_pushed_and_claimed_count", 893 streams_pushed_and_claimed_count_); 894 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_); 895 dict->SetInteger("frames_received", frames_received_); 896 897 dict->SetBoolean("sent_settings", sent_settings_); 898 dict->SetBoolean("received_settings", received_settings_); 899 return dict; 900 } 901 902 int SpdySession::GetPeerAddress(AddressList* address) const { 903 if (!connection_->socket()) 904 return ERR_SOCKET_NOT_CONNECTED; 905 906 return connection_->socket()->GetPeerAddress(address); 907 } 908 909 int SpdySession::GetLocalAddress(IPEndPoint* address) const { 910 if (!connection_->socket()) 911 return ERR_SOCKET_NOT_CONNECTED; 912 913 return connection_->socket()->GetLocalAddress(address); 914 } 915 916 void SpdySession::ActivateStream(SpdyStream* stream) { 917 const spdy::SpdyStreamId id = stream->stream_id(); 918 DCHECK(!IsStreamActive(id)); 919 920 active_streams_[id] = stream; 921 } 922 923 void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) { 924 // For push streams, if they are being deleted normally, we leave 925 // the stream in the unclaimed_pushed_streams_ list. However, if 926 // the stream is errored out, clean it up entirely. 927 if (status != OK) { 928 PushedStreamMap::iterator it; 929 for (it = unclaimed_pushed_streams_.begin(); 930 it != unclaimed_pushed_streams_.end(); ++it) { 931 scoped_refptr<SpdyStream> curr = it->second; 932 if (id == curr->stream_id()) { 933 unclaimed_pushed_streams_.erase(it); 934 break; 935 } 936 } 937 } 938 939 // The stream might have been deleted. 940 ActiveStreamMap::iterator it2 = active_streams_.find(id); 941 if (it2 == active_streams_.end()) 942 return; 943 944 // If this is an active stream, call the callback. 945 const scoped_refptr<SpdyStream> stream(it2->second); 946 active_streams_.erase(it2); 947 if (stream) 948 stream->OnClose(status); 949 ProcessPendingCreateStreams(); 950 } 951 952 void SpdySession::RemoveFromPool() { 953 if (spdy_session_pool_) { 954 spdy_session_pool_->Remove(make_scoped_refptr(this)); 955 spdy_session_pool_ = NULL; 956 } 957 } 958 959 scoped_refptr<SpdyStream> SpdySession::GetActivePushStream( 960 const std::string& path) { 961 base::StatsCounter used_push_streams("spdy.claimed_push_streams"); 962 963 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(path); 964 if (it != unclaimed_pushed_streams_.end()) { 965 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL); 966 scoped_refptr<SpdyStream> stream = it->second; 967 unclaimed_pushed_streams_.erase(it); 968 used_push_streams.Increment(); 969 return stream; 970 } 971 return NULL; 972 } 973 974 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) { 975 if (is_secure_) { 976 SSLClientSocket* ssl_socket = 977 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 978 ssl_socket->GetSSLInfo(ssl_info); 979 *was_npn_negotiated = ssl_socket->was_npn_negotiated(); 980 return true; 981 } 982 return false; 983 } 984 985 bool SpdySession::GetSSLCertRequestInfo( 986 SSLCertRequestInfo* cert_request_info) { 987 if (is_secure_) { 988 SSLClientSocket* ssl_socket = 989 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 990 ssl_socket->GetSSLCertRequestInfo(cert_request_info); 991 return true; 992 } 993 return false; 994 } 995 996 void SpdySession::OnError(spdy::SpdyFramer* framer) { 997 CloseSessionOnError(net::ERR_SPDY_PROTOCOL_ERROR, true); 998 } 999 1000 void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id, 1001 const char* data, 1002 size_t len) { 1003 if (net_log().IsLoggingAllEvents()) { 1004 net_log().AddEvent( 1005 NetLog::TYPE_SPDY_SESSION_RECV_DATA, 1006 make_scoped_refptr(new NetLogSpdyDataParameter( 1007 stream_id, len, spdy::SpdyDataFlags()))); 1008 } 1009 1010 if (!IsStreamActive(stream_id)) { 1011 // NOTE: it may just be that the stream was cancelled. 1012 LOG(WARNING) << "Received data frame for invalid stream " << stream_id; 1013 return; 1014 } 1015 1016 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1017 stream->OnDataReceived(data, len); 1018 } 1019 1020 bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers, 1021 const scoped_refptr<SpdyStream> stream) { 1022 int rv = OK; 1023 rv = stream->OnResponseReceived(headers); 1024 if (rv < 0) { 1025 DCHECK_NE(rv, ERR_IO_PENDING); 1026 const spdy::SpdyStreamId stream_id = stream->stream_id(); 1027 DeleteStream(stream_id, rv); 1028 return false; 1029 } 1030 return true; 1031 } 1032 1033 void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame, 1034 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 1035 spdy::SpdyStreamId stream_id = frame.stream_id(); 1036 spdy::SpdyStreamId associated_stream_id = frame.associated_stream_id(); 1037 1038 if (net_log_.IsLoggingAllEvents()) { 1039 net_log_.AddEvent( 1040 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM, 1041 make_scoped_refptr(new NetLogSpdySynParameter( 1042 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()), 1043 stream_id, associated_stream_id))); 1044 } 1045 1046 // Server-initiated streams should have even sequence numbers. 1047 if ((stream_id & 0x1) != 0) { 1048 LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id; 1049 return; 1050 } 1051 1052 if (IsStreamActive(stream_id)) { 1053 LOG(WARNING) << "Received OnSyn for active stream " << stream_id; 1054 return; 1055 } 1056 1057 if (associated_stream_id == 0) { 1058 LOG(WARNING) << "Received invalid OnSyn associated stream id " 1059 << associated_stream_id 1060 << " for stream " << stream_id; 1061 ResetStream(stream_id, spdy::INVALID_STREAM); 1062 return; 1063 } 1064 1065 streams_pushed_count_++; 1066 1067 // TODO(mbelshe): DCHECK that this is a GET method? 1068 1069 // Verify that the response had a URL for us. 1070 const std::string& url = ContainsKey(*headers, "url") ? 1071 headers->find("url")->second : ""; 1072 if (url.empty()) { 1073 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1074 LOG(WARNING) << "Pushed stream did not contain a url."; 1075 return; 1076 } 1077 1078 GURL gurl(url); 1079 if (!gurl.is_valid()) { 1080 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1081 LOG(WARNING) << "Pushed stream url was invalid: " << url; 1082 return; 1083 } 1084 1085 // Verify we have a valid stream association. 1086 if (!IsStreamActive(associated_stream_id)) { 1087 LOG(WARNING) << "Received OnSyn with inactive associated stream " 1088 << associated_stream_id; 1089 ResetStream(stream_id, spdy::INVALID_ASSOCIATED_STREAM); 1090 return; 1091 } 1092 1093 scoped_refptr<SpdyStream> associated_stream = 1094 active_streams_[associated_stream_id]; 1095 GURL associated_url(associated_stream->GetUrl()); 1096 if (associated_url.GetOrigin() != gurl.GetOrigin()) { 1097 LOG(WARNING) << "Rejected Cross Origin Push Stream " 1098 << associated_stream_id; 1099 ResetStream(stream_id, spdy::REFUSED_STREAM); 1100 return; 1101 } 1102 1103 // There should not be an existing pushed stream with the same path. 1104 PushedStreamMap::iterator it = unclaimed_pushed_streams_.find(url); 1105 if (it != unclaimed_pushed_streams_.end()) { 1106 LOG(WARNING) << "Received duplicate pushed stream with url: " << url; 1107 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1108 return; 1109 } 1110 1111 scoped_refptr<SpdyStream> stream( 1112 new SpdyStream(this, stream_id, true, net_log_)); 1113 1114 stream->set_path(gurl.PathForRequest()); 1115 1116 unclaimed_pushed_streams_[url] = stream; 1117 1118 ActivateStream(stream); 1119 stream->set_response_received(); 1120 1121 // Parse the headers. 1122 if (!Respond(*headers, stream)) 1123 return; 1124 1125 base::StatsCounter push_requests("spdy.pushed_streams"); 1126 push_requests.Increment(); 1127 } 1128 1129 void SpdySession::OnSynReply(const spdy::SpdySynReplyControlFrame& frame, 1130 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 1131 spdy::SpdyStreamId stream_id = frame.stream_id(); 1132 1133 bool valid_stream = IsStreamActive(stream_id); 1134 if (!valid_stream) { 1135 // NOTE: it may just be that the stream was cancelled. 1136 LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id; 1137 return; 1138 } 1139 1140 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1141 CHECK_EQ(stream->stream_id(), stream_id); 1142 CHECK(!stream->cancelled()); 1143 1144 if (stream->response_received()) { 1145 LOG(WARNING) << "Received duplicate SYN_REPLY for stream " << stream_id; 1146 CloseStream(stream->stream_id(), ERR_SPDY_PROTOCOL_ERROR); 1147 return; 1148 } 1149 stream->set_response_received(); 1150 1151 if (net_log().IsLoggingAllEvents()) { 1152 net_log().AddEvent( 1153 NetLog::TYPE_SPDY_SESSION_SYN_REPLY, 1154 make_scoped_refptr(new NetLogSpdySynParameter( 1155 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()), 1156 stream_id, 0))); 1157 } 1158 1159 Respond(*headers, stream); 1160 } 1161 1162 void SpdySession::OnHeaders(const spdy::SpdyHeadersControlFrame& frame, 1163 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 1164 spdy::SpdyStreamId stream_id = frame.stream_id(); 1165 1166 bool valid_stream = IsStreamActive(stream_id); 1167 if (!valid_stream) { 1168 // NOTE: it may just be that the stream was cancelled. 1169 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id; 1170 return; 1171 } 1172 1173 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1174 CHECK_EQ(stream->stream_id(), stream_id); 1175 CHECK(!stream->cancelled()); 1176 1177 if (net_log().IsLoggingAllEvents()) { 1178 net_log().AddEvent( 1179 NetLog::TYPE_SPDY_SESSION_HEADERS, 1180 make_scoped_refptr(new NetLogSpdySynParameter( 1181 headers, static_cast<spdy::SpdyControlFlags>(frame.flags()), 1182 stream_id, 0))); 1183 } 1184 1185 int rv = stream->OnHeaders(*headers); 1186 if (rv < 0) { 1187 DCHECK_NE(rv, ERR_IO_PENDING); 1188 const spdy::SpdyStreamId stream_id = stream->stream_id(); 1189 DeleteStream(stream_id, rv); 1190 } 1191 } 1192 1193 void SpdySession::OnControl(const spdy::SpdyControlFrame* frame) { 1194 const linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock); 1195 uint32 type = frame->type(); 1196 if (type == spdy::SYN_STREAM || 1197 type == spdy::SYN_REPLY || 1198 type == spdy::HEADERS) { 1199 if (!spdy_framer_.ParseHeaderBlock(frame, headers.get())) { 1200 LOG(WARNING) << "Could not parse Spdy Control Frame Header."; 1201 int stream_id = 0; 1202 if (type == spdy::SYN_STREAM) { 1203 stream_id = (reinterpret_cast<const spdy::SpdySynStreamControlFrame*> 1204 (frame))->stream_id(); 1205 } else if (type == spdy::SYN_REPLY) { 1206 stream_id = (reinterpret_cast<const spdy::SpdySynReplyControlFrame*> 1207 (frame))->stream_id(); 1208 } else if (type == spdy::HEADERS) { 1209 stream_id = (reinterpret_cast<const spdy::SpdyHeadersControlFrame*> 1210 (frame))->stream_id(); 1211 } 1212 if(IsStreamActive(stream_id)) 1213 ResetStream(stream_id, spdy::PROTOCOL_ERROR); 1214 return; 1215 } 1216 } 1217 1218 frames_received_++; 1219 1220 switch (type) { 1221 case spdy::GOAWAY: 1222 OnGoAway(*reinterpret_cast<const spdy::SpdyGoAwayControlFrame*>(frame)); 1223 break; 1224 case spdy::SETTINGS: 1225 OnSettings( 1226 *reinterpret_cast<const spdy::SpdySettingsControlFrame*>(frame)); 1227 break; 1228 case spdy::RST_STREAM: 1229 OnRst(*reinterpret_cast<const spdy::SpdyRstStreamControlFrame*>(frame)); 1230 break; 1231 case spdy::SYN_STREAM: 1232 OnSyn(*reinterpret_cast<const spdy::SpdySynStreamControlFrame*>(frame), 1233 headers); 1234 break; 1235 case spdy::HEADERS: 1236 OnHeaders(*reinterpret_cast<const spdy::SpdyHeadersControlFrame*>(frame), 1237 headers); 1238 break; 1239 case spdy::SYN_REPLY: 1240 OnSynReply( 1241 *reinterpret_cast<const spdy::SpdySynReplyControlFrame*>(frame), 1242 headers); 1243 break; 1244 case spdy::WINDOW_UPDATE: 1245 OnWindowUpdate( 1246 *reinterpret_cast<const spdy::SpdyWindowUpdateControlFrame*>(frame)); 1247 break; 1248 default: 1249 DCHECK(false); // Error! 1250 } 1251 } 1252 1253 void SpdySession::OnRst(const spdy::SpdyRstStreamControlFrame& frame) { 1254 spdy::SpdyStreamId stream_id = frame.stream_id(); 1255 1256 net_log().AddEvent( 1257 NetLog::TYPE_SPDY_SESSION_RST_STREAM, 1258 make_scoped_refptr( 1259 new NetLogSpdyRstParameter(stream_id, frame.status()))); 1260 1261 bool valid_stream = IsStreamActive(stream_id); 1262 if (!valid_stream) { 1263 // NOTE: it may just be that the stream was cancelled. 1264 LOG(WARNING) << "Received RST for invalid stream" << stream_id; 1265 return; 1266 } 1267 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1268 CHECK_EQ(stream->stream_id(), stream_id); 1269 CHECK(!stream->cancelled()); 1270 1271 if (frame.status() == 0) { 1272 stream->OnDataReceived(NULL, 0); 1273 } else { 1274 LOG(ERROR) << "Spdy stream closed: " << frame.status(); 1275 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical. 1276 // For now, it doesn't matter much - it is a protocol error. 1277 DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); 1278 } 1279 } 1280 1281 void SpdySession::OnGoAway(const spdy::SpdyGoAwayControlFrame& frame) { 1282 net_log_.AddEvent( 1283 NetLog::TYPE_SPDY_SESSION_GOAWAY, 1284 make_scoped_refptr( 1285 new NetLogSpdyGoAwayParameter(frame.last_accepted_stream_id(), 1286 active_streams_.size(), 1287 unclaimed_pushed_streams_.size()))); 1288 RemoveFromPool(); 1289 CloseAllStreams(net::ERR_ABORTED); 1290 1291 // TODO(willchan): Cancel any streams that are past the GoAway frame's 1292 // |last_accepted_stream_id|. 1293 1294 // Don't bother killing any streams that are still reading. They'll either 1295 // complete successfully or get an ERR_CONNECTION_CLOSED when the socket is 1296 // closed. 1297 } 1298 1299 void SpdySession::OnSettings(const spdy::SpdySettingsControlFrame& frame) { 1300 spdy::SpdySettings settings; 1301 if (spdy_framer_.ParseSettings(&frame, &settings)) { 1302 HandleSettings(settings); 1303 spdy_settings_->Set(host_port_pair(), settings); 1304 } 1305 1306 received_settings_ = true; 1307 1308 net_log_.AddEvent( 1309 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS, 1310 make_scoped_refptr(new NetLogSpdySettingsParameter(settings))); 1311 } 1312 1313 void SpdySession::OnWindowUpdate( 1314 const spdy::SpdyWindowUpdateControlFrame& frame) { 1315 spdy::SpdyStreamId stream_id = frame.stream_id(); 1316 if (!IsStreamActive(stream_id)) { 1317 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id; 1318 return; 1319 } 1320 1321 int delta_window_size = static_cast<int>(frame.delta_window_size()); 1322 if (delta_window_size < 1) { 1323 LOG(WARNING) << "Received WINDOW_UPDATE with an invalid delta_window_size " 1324 << delta_window_size; 1325 ResetStream(stream_id, spdy::FLOW_CONTROL_ERROR); 1326 return; 1327 } 1328 1329 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1330 CHECK_EQ(stream->stream_id(), stream_id); 1331 CHECK(!stream->cancelled()); 1332 1333 if (use_flow_control_) 1334 stream->IncreaseSendWindowSize(delta_window_size); 1335 1336 net_log_.AddEvent( 1337 NetLog::TYPE_SPDY_SESSION_SEND_WINDOW_UPDATE, 1338 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter( 1339 stream_id, delta_window_size, stream->send_window_size()))); 1340 } 1341 1342 void SpdySession::SendWindowUpdate(spdy::SpdyStreamId stream_id, 1343 int delta_window_size) { 1344 DCHECK(IsStreamActive(stream_id)); 1345 scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; 1346 CHECK_EQ(stream->stream_id(), stream_id); 1347 1348 net_log_.AddEvent( 1349 NetLog::TYPE_SPDY_SESSION_RECV_WINDOW_UPDATE, 1350 make_scoped_refptr(new NetLogSpdyWindowUpdateParameter( 1351 stream_id, delta_window_size, stream->recv_window_size()))); 1352 1353 scoped_ptr<spdy::SpdyWindowUpdateControlFrame> window_update_frame( 1354 spdy_framer_.CreateWindowUpdate(stream_id, delta_window_size)); 1355 QueueFrame(window_update_frame.get(), stream->priority(), stream); 1356 } 1357 1358 // Given a cwnd that we would have sent to the server, modify it based on the 1359 // field trial policy. 1360 uint32 ApplyCwndFieldTrialPolicy(int cwnd) { 1361 base::FieldTrial* trial = base::FieldTrialList::Find("SpdyCwnd"); 1362 if (!trial) { 1363 LOG(WARNING) << "Could not find \"SpdyCwnd\" in FieldTrialList"; 1364 return cwnd; 1365 } 1366 if (trial->group_name() == "cwnd10") 1367 return 10; 1368 else if (trial->group_name() == "cwnd16") 1369 return 16; 1370 else if (trial->group_name() == "cwndMin16") 1371 return std::max(cwnd, 16); 1372 else if (trial->group_name() == "cwndMin10") 1373 return std::max(cwnd, 10); 1374 else if (trial->group_name() == "cwndDynamic") 1375 return cwnd; 1376 NOTREACHED(); 1377 return cwnd; 1378 } 1379 1380 void SpdySession::SendSettings() { 1381 // Note: we're copying the settings here, so that we can potentially modify 1382 // the settings for the field trial. When removing the field trial, make 1383 // this a reference to the const SpdySettings again. 1384 spdy::SpdySettings settings = spdy_settings_->Get(host_port_pair()); 1385 if (settings.empty()) 1386 return; 1387 1388 // Record Histogram Data and Apply the SpdyCwnd FieldTrial if applicable. 1389 for (spdy::SpdySettings::iterator i = settings.begin(), 1390 end = settings.end(); i != end; ++i) { 1391 const uint32 id = i->first.id(); 1392 const uint32 val = i->second; 1393 switch (id) { 1394 case spdy::SETTINGS_CURRENT_CWND: 1395 uint32 cwnd = 0; 1396 cwnd = ApplyCwndFieldTrialPolicy(val); 1397 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", 1398 cwnd, 1399 1, 200, 100); 1400 if (cwnd != val) { 1401 i->second = cwnd; 1402 i->first.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST); 1403 spdy_settings_->Set(host_port_pair(), settings); 1404 } 1405 break; 1406 } 1407 } 1408 1409 HandleSettings(settings); 1410 1411 net_log_.AddEvent( 1412 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS, 1413 make_scoped_refptr(new NetLogSpdySettingsParameter(settings))); 1414 1415 // Create the SETTINGS frame and send it. 1416 scoped_ptr<spdy::SpdySettingsControlFrame> settings_frame( 1417 spdy_framer_.CreateSettings(settings)); 1418 sent_settings_ = true; 1419 QueueFrame(settings_frame.get(), 0, NULL); 1420 } 1421 1422 void SpdySession::HandleSettings(const spdy::SpdySettings& settings) { 1423 for (spdy::SpdySettings::const_iterator i = settings.begin(), 1424 end = settings.end(); i != end; ++i) { 1425 const uint32 id = i->first.id(); 1426 const uint32 val = i->second; 1427 switch (id) { 1428 case spdy::SETTINGS_MAX_CONCURRENT_STREAMS: 1429 max_concurrent_streams_ = std::min(static_cast<size_t>(val), 1430 max_concurrent_stream_limit_); 1431 ProcessPendingCreateStreams(); 1432 break; 1433 } 1434 } 1435 } 1436 1437 void SpdySession::RecordHistograms() { 1438 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 1439 streams_initiated_count_, 1440 0, 300, 50); 1441 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 1442 streams_pushed_count_, 1443 0, 300, 50); 1444 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 1445 streams_pushed_and_claimed_count_, 1446 0, 300, 50); 1447 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 1448 streams_abandoned_count_, 1449 0, 300, 50); 1450 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent", 1451 sent_settings_ ? 1 : 0, 2); 1452 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived", 1453 received_settings_ ? 1 : 0, 2); 1454 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession", 1455 stalled_streams_, 1456 0, 300, 50); 1457 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls", 1458 stalled_streams_ > 0 ? 1 : 0, 2); 1459 1460 if (received_settings_) { 1461 // Enumerate the saved settings, and set histograms for it. 1462 const spdy::SpdySettings& settings = spdy_settings_->Get(host_port_pair()); 1463 1464 spdy::SpdySettings::const_iterator it; 1465 for (it = settings.begin(); it != settings.end(); ++it) { 1466 const spdy::SpdySetting setting = *it; 1467 switch (setting.first.id()) { 1468 case spdy::SETTINGS_CURRENT_CWND: 1469 // Record several different histograms to see if cwnd converges 1470 // for larger volumes of data being sent. 1471 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd", 1472 setting.second, 1473 1, 200, 100); 1474 if (bytes_received_ > 10 * 1024) { 1475 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K", 1476 setting.second, 1477 1, 200, 100); 1478 if (bytes_received_ > 25 * 1024) { 1479 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K", 1480 setting.second, 1481 1, 200, 100); 1482 if (bytes_received_ > 50 * 1024) { 1483 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K", 1484 setting.second, 1485 1, 200, 100); 1486 if (bytes_received_ > 100 * 1024) { 1487 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K", 1488 setting.second, 1489 1, 200, 100); 1490 } 1491 } 1492 } 1493 } 1494 break; 1495 case spdy::SETTINGS_ROUND_TRIP_TIME: 1496 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT", 1497 setting.second, 1498 1, 1200, 100); 1499 break; 1500 case spdy::SETTINGS_DOWNLOAD_RETRANS_RATE: 1501 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate", 1502 setting.second, 1503 1, 100, 50); 1504 break; 1505 } 1506 } 1507 } 1508 } 1509 1510 void SpdySession::InvokeUserStreamCreationCallback( 1511 scoped_refptr<SpdyStream>* stream) { 1512 PendingCallbackMap::iterator it = pending_callback_map_.find(stream); 1513 1514 // Exit if the request has already been cancelled. 1515 if (it == pending_callback_map_.end()) 1516 return; 1517 1518 CompletionCallback* callback = it->second.callback; 1519 int result = it->second.result; 1520 pending_callback_map_.erase(it); 1521 callback->Run(result); 1522 } 1523 1524 } // namespace net 1525