1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_stream.h" 6 7 #include "base/bind.h" 8 #include "base/compiler_specific.h" 9 #include "base/logging.h" 10 #include "base/message_loop/message_loop.h" 11 #include "base/strings/string_number_conversions.h" 12 #include "base/strings/stringprintf.h" 13 #include "base/values.h" 14 #include "net/spdy/spdy_buffer_producer.h" 15 #include "net/spdy/spdy_http_utils.h" 16 #include "net/spdy/spdy_session.h" 17 18 namespace net { 19 20 namespace { 21 22 base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id, 23 int status, 24 const std::string* description, 25 NetLog::LogLevel /* log_level */) { 26 base::DictionaryValue* dict = new base::DictionaryValue(); 27 dict->SetInteger("stream_id", static_cast<int>(stream_id)); 28 dict->SetInteger("status", status); 29 dict->SetString("description", *description); 30 return dict; 31 } 32 33 base::Value* NetLogSpdyStreamWindowUpdateCallback( 34 SpdyStreamId stream_id, 35 int32 delta, 36 int32 window_size, 37 NetLog::LogLevel /* log_level */) { 38 base::DictionaryValue* dict = new base::DictionaryValue(); 39 dict->SetInteger("stream_id", stream_id); 40 dict->SetInteger("delta", delta); 41 dict->SetInteger("window_size", window_size); 42 return dict; 43 } 44 45 bool ContainsUppercaseAscii(const std::string& str) { 46 for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) { 47 if (*i >= 'A' && *i <= 'Z') { 48 return true; 49 } 50 } 51 return false; 52 } 53 54 } // namespace 55 56 // A wrapper around a stream that calls into ProduceSynStreamFrame(). 57 class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer { 58 public: 59 SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream) 60 : stream_(stream) { 61 DCHECK(stream_.get()); 62 } 63 64 virtual ~SynStreamBufferProducer() {} 65 66 virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { 67 if (!stream_.get()) { 68 NOTREACHED(); 69 return scoped_ptr<SpdyBuffer>(); 70 } 71 DCHECK_GT(stream_->stream_id(), 0u); 72 return scoped_ptr<SpdyBuffer>( 73 new SpdyBuffer(stream_->ProduceSynStreamFrame())); 74 } 75 76 private: 77 const base::WeakPtr<SpdyStream> stream_; 78 }; 79 80 SpdyStream::SpdyStream(SpdyStreamType type, 81 const base::WeakPtr<SpdySession>& session, 82 const GURL& url, 83 RequestPriority priority, 84 int32 initial_send_window_size, 85 int32 initial_recv_window_size, 86 const BoundNetLog& net_log) 87 : type_(type), 88 weak_ptr_factory_(this), 89 in_do_loop_(false), 90 continue_buffering_data_(type_ == SPDY_PUSH_STREAM), 91 stream_id_(0), 92 url_(url), 93 priority_(priority), 94 slot_(0), 95 send_stalled_by_flow_control_(false), 96 send_window_size_(initial_send_window_size), 97 recv_window_size_(initial_recv_window_size), 98 unacked_recv_window_bytes_(0), 99 session_(session), 100 delegate_(NULL), 101 send_status_( 102 (type_ == SPDY_PUSH_STREAM) ? 103 NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND), 104 request_time_(base::Time::Now()), 105 response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE), 106 io_state_((type_ == SPDY_PUSH_STREAM) ? STATE_IDLE : STATE_NONE), 107 response_status_(OK), 108 net_log_(net_log), 109 raw_received_bytes_(0), 110 send_bytes_(0), 111 recv_bytes_(0), 112 just_completed_frame_type_(DATA), 113 just_completed_frame_size_(0) { 114 CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM || 115 type_ == SPDY_REQUEST_RESPONSE_STREAM || 116 type_ == SPDY_PUSH_STREAM); 117 CHECK_GE(priority_, MINIMUM_PRIORITY); 118 CHECK_LE(priority_, MAXIMUM_PRIORITY); 119 } 120 121 SpdyStream::~SpdyStream() { 122 CHECK(!in_do_loop_); 123 UpdateHistograms(); 124 } 125 126 void SpdyStream::SetDelegate(Delegate* delegate) { 127 CHECK(!delegate_); 128 CHECK(delegate); 129 delegate_ = delegate; 130 131 if (type_ == SPDY_PUSH_STREAM) { 132 DCHECK(continue_buffering_data_); 133 base::MessageLoop::current()->PostTask( 134 FROM_HERE, 135 base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr())); 136 } 137 } 138 139 void SpdyStream::PushedStreamReplayData() { 140 DCHECK_EQ(type_, SPDY_PUSH_STREAM); 141 DCHECK_NE(stream_id_, 0u); 142 DCHECK(continue_buffering_data_); 143 144 continue_buffering_data_ = false; 145 146 // The delegate methods called below may delete |this|, so use 147 // |weak_this| to detect that. 148 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); 149 150 CHECK(delegate_); 151 SpdyResponseHeadersStatus status = 152 delegate_->OnResponseHeadersUpdated(response_headers_); 153 if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) { 154 // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not 155 // have been closed. Since we don't have complete headers, assume 156 // we're waiting for another HEADERS frame, and we had better not 157 // have any pending data frames. 158 CHECK(weak_this); 159 if (!pending_buffers_.empty()) { 160 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 161 "Data received with incomplete headers."); 162 session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 163 } 164 return; 165 } 166 167 // OnResponseHeadersUpdated() may have closed |this|. 168 if (!weak_this) 169 return; 170 171 response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE; 172 173 while (!pending_buffers_.empty()) { 174 // Take ownership of the first element of |pending_buffers_|. 175 scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front()); 176 pending_buffers_.weak_erase(pending_buffers_.begin()); 177 178 bool eof = (buffer == NULL); 179 180 CHECK(delegate_); 181 delegate_->OnDataReceived(buffer.Pass()); 182 183 // OnDataReceived() may have closed |this|. 184 if (!weak_this) 185 return; 186 187 if (eof) { 188 DCHECK(pending_buffers_.empty()); 189 session_->CloseActiveStream(stream_id_, OK); 190 DCHECK(!weak_this); 191 // |pending_buffers_| is invalid at this point. 192 break; 193 } 194 } 195 } 196 197 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { 198 CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); 199 CHECK(request_headers_); 200 CHECK_GT(stream_id_, 0u); 201 202 SpdyControlFlags flags = 203 (send_status_ == NO_MORE_DATA_TO_SEND) ? 204 CONTROL_FLAG_FIN : CONTROL_FLAG_NONE; 205 scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( 206 stream_id_, priority_, slot_, flags, *request_headers_)); 207 send_time_ = base::TimeTicks::Now(); 208 return frame.Pass(); 209 } 210 211 void SpdyStream::DetachDelegate() { 212 CHECK(!in_do_loop_); 213 DCHECK(!IsClosed()); 214 delegate_ = NULL; 215 Cancel(); 216 } 217 218 void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { 219 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 220 221 if (IsClosed()) 222 return; 223 224 // Check for wraparound. 225 if (send_window_size_ > 0) { 226 DCHECK_LE(delta_window_size, kint32max - send_window_size_); 227 } 228 if (send_window_size_ < 0) { 229 DCHECK_GE(delta_window_size, kint32min - send_window_size_); 230 } 231 send_window_size_ += delta_window_size; 232 PossiblyResumeIfSendStalled(); 233 } 234 235 void SpdyStream::OnWriteBufferConsumed( 236 size_t frame_payload_size, 237 size_t consume_size, 238 SpdyBuffer::ConsumeSource consume_source) { 239 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 240 if (consume_source == SpdyBuffer::DISCARD) { 241 // If we're discarding a frame or part of it, increase the send 242 // window by the number of discarded bytes. (Although if we're 243 // discarding part of a frame, it's probably because of a write 244 // error and we'll be tearing down the stream soon.) 245 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size); 246 DCHECK_GT(remaining_payload_bytes, 0u); 247 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes)); 248 } 249 // For consumed bytes, the send window is increased when we receive 250 // a WINDOW_UPDATE frame. 251 } 252 253 void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) { 254 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 255 DCHECK_GE(delta_window_size, 1); 256 257 // Ignore late WINDOW_UPDATEs. 258 if (IsClosed()) 259 return; 260 261 if (send_window_size_ > 0) { 262 // Check for overflow. 263 int32 max_delta_window_size = kint32max - send_window_size_; 264 if (delta_window_size > max_delta_window_size) { 265 std::string desc = base::StringPrintf( 266 "Received WINDOW_UPDATE [delta: %d] for stream %d overflows " 267 "send_window_size_ [current: %d]", delta_window_size, stream_id_, 268 send_window_size_); 269 session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc); 270 return; 271 } 272 } 273 274 send_window_size_ += delta_window_size; 275 276 net_log_.AddEvent( 277 NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW, 278 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 279 stream_id_, delta_window_size, send_window_size_)); 280 281 PossiblyResumeIfSendStalled(); 282 } 283 284 void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) { 285 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 286 287 if (IsClosed()) 288 return; 289 290 // We only call this method when sending a frame. Therefore, 291 // |delta_window_size| should be within the valid frame size range. 292 DCHECK_GE(delta_window_size, 1); 293 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 294 295 // |send_window_size_| should have been at least |delta_window_size| for 296 // this call to happen. 297 DCHECK_GE(send_window_size_, delta_window_size); 298 299 send_window_size_ -= delta_window_size; 300 301 net_log_.AddEvent( 302 NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW, 303 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 304 stream_id_, -delta_window_size, send_window_size_)); 305 } 306 307 void SpdyStream::OnReadBufferConsumed( 308 size_t consume_size, 309 SpdyBuffer::ConsumeSource consume_source) { 310 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 311 DCHECK_GE(consume_size, 1u); 312 DCHECK_LE(consume_size, static_cast<size_t>(kint32max)); 313 IncreaseRecvWindowSize(static_cast<int32>(consume_size)); 314 } 315 316 void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) { 317 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 318 319 // By the time a read is processed by the delegate, this stream may 320 // already be inactive. 321 if (!session_->IsStreamActive(stream_id_)) 322 return; 323 324 DCHECK_GE(unacked_recv_window_bytes_, 0); 325 DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_); 326 DCHECK_GE(delta_window_size, 1); 327 // Check for overflow. 328 DCHECK_LE(delta_window_size, kint32max - recv_window_size_); 329 330 recv_window_size_ += delta_window_size; 331 net_log_.AddEvent( 332 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 333 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 334 stream_id_, delta_window_size, recv_window_size_)); 335 336 unacked_recv_window_bytes_ += delta_window_size; 337 if (unacked_recv_window_bytes_ > 338 session_->stream_initial_recv_window_size() / 2) { 339 session_->SendStreamWindowUpdate( 340 stream_id_, static_cast<uint32>(unacked_recv_window_bytes_)); 341 unacked_recv_window_bytes_ = 0; 342 } 343 } 344 345 void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) { 346 DCHECK(session_->IsStreamActive(stream_id_)); 347 DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); 348 DCHECK_GE(delta_window_size, 1); 349 350 // Since we never decrease the initial receive window size, 351 // |delta_window_size| should never cause |recv_window_size_| to go 352 // negative. If we do, the receive window isn't being respected. 353 if (delta_window_size > recv_window_size_) { 354 session_->ResetStream( 355 stream_id_, RST_STREAM_PROTOCOL_ERROR, 356 "delta_window_size is " + base::IntToString(delta_window_size) + 357 " in DecreaseRecvWindowSize, which is larger than the receive " + 358 "window size of " + base::IntToString(recv_window_size_)); 359 return; 360 } 361 362 recv_window_size_ -= delta_window_size; 363 net_log_.AddEvent( 364 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW, 365 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, 366 stream_id_, -delta_window_size, recv_window_size_)); 367 } 368 369 int SpdyStream::GetPeerAddress(IPEndPoint* address) const { 370 return session_->GetPeerAddress(address); 371 } 372 373 int SpdyStream::GetLocalAddress(IPEndPoint* address) const { 374 return session_->GetLocalAddress(address); 375 } 376 377 bool SpdyStream::WasEverUsed() const { 378 return session_->WasEverUsed(); 379 } 380 381 base::Time SpdyStream::GetRequestTime() const { 382 return request_time_; 383 } 384 385 void SpdyStream::SetRequestTime(base::Time t) { 386 request_time_ = t; 387 } 388 389 int SpdyStream::OnInitialResponseHeadersReceived( 390 const SpdyHeaderBlock& initial_response_headers, 391 base::Time response_time, 392 base::TimeTicks recv_first_byte_time) { 393 // SpdySession guarantees that this is called at most once. 394 CHECK(response_headers_.empty()); 395 396 // Check to make sure that we don't receive the response headers 397 // before we're ready for it. 398 switch (type_) { 399 case SPDY_BIDIRECTIONAL_STREAM: 400 // For a bidirectional stream, we're ready for the response 401 // headers once we've finished sending the request headers. 402 if (io_state_ < STATE_IDLE) { 403 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 404 "Response received before request sent"); 405 return ERR_SPDY_PROTOCOL_ERROR; 406 } 407 break; 408 409 case SPDY_REQUEST_RESPONSE_STREAM: 410 // For a request/response stream, we're ready for the response 411 // headers once we've finished sending the request headers and 412 // the request body (if we have one). 413 if ((io_state_ < STATE_IDLE) || (send_status_ == MORE_DATA_TO_SEND) || 414 pending_send_data_.get()) { 415 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 416 "Response received before request sent"); 417 return ERR_SPDY_PROTOCOL_ERROR; 418 } 419 break; 420 421 case SPDY_PUSH_STREAM: 422 // For a push stream, we're ready immediately. 423 DCHECK_EQ(send_status_, NO_MORE_DATA_TO_SEND); 424 DCHECK_EQ(io_state_, STATE_IDLE); 425 break; 426 } 427 428 metrics_.StartStream(); 429 430 DCHECK_EQ(io_state_, STATE_IDLE); 431 432 response_time_ = response_time; 433 recv_first_byte_time_ = recv_first_byte_time; 434 return MergeWithResponseHeaders(initial_response_headers); 435 } 436 437 int SpdyStream::OnAdditionalResponseHeadersReceived( 438 const SpdyHeaderBlock& additional_response_headers) { 439 if (type_ == SPDY_REQUEST_RESPONSE_STREAM) { 440 session_->ResetStream( 441 stream_id_, RST_STREAM_PROTOCOL_ERROR, 442 "Additional headers received for request/response stream"); 443 return ERR_SPDY_PROTOCOL_ERROR; 444 } else if (type_ == SPDY_PUSH_STREAM && 445 response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) { 446 session_->ResetStream( 447 stream_id_, RST_STREAM_PROTOCOL_ERROR, 448 "Additional headers received for push stream"); 449 return ERR_SPDY_PROTOCOL_ERROR; 450 } 451 return MergeWithResponseHeaders(additional_response_headers); 452 } 453 454 void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { 455 DCHECK(session_->IsStreamActive(stream_id_)); 456 457 // If we're still buffering data for a push stream, we will do the 458 // check for data received with incomplete headers in 459 // PushedStreamReplayData(). 460 if (!delegate_ || continue_buffering_data_) { 461 DCHECK_EQ(type_, SPDY_PUSH_STREAM); 462 // It should be valid for this to happen in the server push case. 463 // We'll return received data when delegate gets attached to the stream. 464 if (buffer) { 465 pending_buffers_.push_back(buffer.release()); 466 } else { 467 pending_buffers_.push_back(NULL); 468 metrics_.StopStream(); 469 // Note: we leave the stream open in the session until the stream 470 // is claimed. 471 } 472 return; 473 } 474 475 // If we have response headers but the delegate has indicated that 476 // it's still incomplete, then that's a protocol error. 477 if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) { 478 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, 479 "Data received with incomplete headers."); 480 session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); 481 return; 482 } 483 484 CHECK(!IsClosed()); 485 486 if (!buffer) { 487 metrics_.StopStream(); 488 // Deletes |this|. 489 session_->CloseActiveStream(stream_id_, OK); 490 return; 491 } 492 493 size_t length = buffer->GetRemainingSize(); 494 DCHECK_LE(length, session_->GetDataFrameMaximumPayload()); 495 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { 496 DecreaseRecvWindowSize(static_cast<int32>(length)); 497 buffer->AddConsumeCallback( 498 base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr())); 499 } 500 501 // Track our bandwidth. 502 metrics_.RecordBytes(length); 503 recv_bytes_ += length; 504 recv_last_byte_time_ = base::TimeTicks::Now(); 505 506 // May close |this|. 507 delegate_->OnDataReceived(buffer.Pass()); 508 } 509 510 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type, 511 size_t frame_size) { 512 if (frame_size < session_->GetFrameMinimumSize() || 513 frame_size > session_->GetFrameMaximumSize()) { 514 NOTREACHED(); 515 return; 516 } 517 if (IsClosed()) 518 return; 519 just_completed_frame_type_ = frame_type; 520 just_completed_frame_size_ = frame_size; 521 DoLoop(OK); 522 } 523 524 SpdyMajorVersion SpdyStream::GetProtocolVersion() const { 525 return session_->GetProtocolVersion(); 526 } 527 528 void SpdyStream::LogStreamError(int status, const std::string& description) { 529 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR, 530 base::Bind(&NetLogSpdyStreamErrorCallback, 531 stream_id_, status, &description)); 532 } 533 534 void SpdyStream::OnClose(int status) { 535 CHECK(!in_do_loop_); 536 io_state_ = STATE_CLOSED; 537 response_status_ = status; 538 Delegate* delegate = delegate_; 539 delegate_ = NULL; 540 if (delegate) 541 delegate->OnClose(status); 542 // Unset |stream_id_| last so that the delegate can look it up. 543 stream_id_ = 0; 544 } 545 546 void SpdyStream::Cancel() { 547 CHECK(!in_do_loop_); 548 // We may be called again from a delegate's OnClose(). 549 if (io_state_ == STATE_CLOSED) 550 return; 551 552 if (stream_id_ != 0) { 553 session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string()); 554 } else { 555 session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL); 556 } 557 // |this| is invalid at this point. 558 } 559 560 void SpdyStream::Close() { 561 CHECK(!in_do_loop_); 562 // We may be called again from a delegate's OnClose(). 563 if (io_state_ == STATE_CLOSED) 564 return; 565 566 if (stream_id_ != 0) { 567 session_->CloseActiveStream(stream_id_, OK); 568 } else { 569 session_->CloseCreatedStream(GetWeakPtr(), OK); 570 } 571 // |this| is invalid at this point. 572 } 573 574 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() { 575 return weak_ptr_factory_.GetWeakPtr(); 576 } 577 578 int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers, 579 SpdySendStatus send_status) { 580 CHECK_NE(type_, SPDY_PUSH_STREAM); 581 CHECK_EQ(send_status_, MORE_DATA_TO_SEND); 582 CHECK(!request_headers_); 583 CHECK(!pending_send_data_.get()); 584 CHECK_EQ(io_state_, STATE_NONE); 585 request_headers_ = request_headers.Pass(); 586 send_status_ = send_status; 587 io_state_ = STATE_SEND_REQUEST_HEADERS; 588 return DoLoop(OK); 589 } 590 591 void SpdyStream::SendData(IOBuffer* data, 592 int length, 593 SpdySendStatus send_status) { 594 CHECK_NE(type_, SPDY_PUSH_STREAM); 595 CHECK_EQ(send_status_, MORE_DATA_TO_SEND); 596 CHECK_GE(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); 597 CHECK(!pending_send_data_.get()); 598 pending_send_data_ = new DrainableIOBuffer(data, length); 599 send_status_ = send_status; 600 QueueNextDataFrame(); 601 } 602 603 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, 604 bool* was_npn_negotiated, 605 NextProto* protocol_negotiated) { 606 return session_->GetSSLInfo( 607 ssl_info, was_npn_negotiated, protocol_negotiated); 608 } 609 610 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { 611 return session_->GetSSLCertRequestInfo(cert_request_info); 612 } 613 614 void SpdyStream::PossiblyResumeIfSendStalled() { 615 DCHECK(!IsClosed()); 616 617 if (send_stalled_by_flow_control_ && !session_->IsSendStalled() && 618 send_window_size_ > 0) { 619 net_log_.AddEvent( 620 NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED, 621 NetLog::IntegerCallback("stream_id", stream_id_)); 622 send_stalled_by_flow_control_ = false; 623 QueueNextDataFrame(); 624 } 625 } 626 627 bool SpdyStream::IsClosed() const { 628 return io_state_ == STATE_CLOSED; 629 } 630 631 bool SpdyStream::IsIdle() const { 632 return io_state_ == STATE_IDLE; 633 } 634 635 NextProto SpdyStream::GetProtocol() const { 636 return session_->protocol(); 637 } 638 639 bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const { 640 if (stream_id_ == 0) 641 return false; 642 643 return session_->GetLoadTimingInfo(stream_id_, load_timing_info); 644 } 645 646 GURL SpdyStream::GetUrlFromHeaders() const { 647 if (type_ != SPDY_PUSH_STREAM && !request_headers_) 648 return GURL(); 649 650 const SpdyHeaderBlock& headers = 651 (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_; 652 return GetUrlFromHeaderBlock(headers, GetProtocolVersion(), 653 type_ == SPDY_PUSH_STREAM); 654 } 655 656 bool SpdyStream::HasUrlFromHeaders() const { 657 return !GetUrlFromHeaders().is_empty(); 658 } 659 660 int SpdyStream::DoLoop(int result) { 661 CHECK(!in_do_loop_); 662 in_do_loop_ = true; 663 664 do { 665 State state = io_state_; 666 io_state_ = STATE_NONE; 667 switch (state) { 668 case STATE_SEND_REQUEST_HEADERS: 669 CHECK_EQ(result, OK); 670 result = DoSendRequestHeaders(); 671 break; 672 case STATE_SEND_REQUEST_HEADERS_COMPLETE: 673 CHECK_EQ(result, OK); 674 result = DoSendRequestHeadersComplete(); 675 break; 676 677 // For request/response streams, no data is sent from the client 678 // while in the OPEN state, so OnFrameWriteComplete is never 679 // called here. The HTTP body is handled in the OnDataReceived 680 // callback, which does not call into DoLoop. 681 // 682 // For bidirectional streams, we'll send and receive data once 683 // the connection is established. Received data is handled in 684 // OnDataReceived. Sent data is handled in 685 // OnFrameWriteComplete, which calls DoOpen(). 686 case STATE_IDLE: 687 CHECK_EQ(result, OK); 688 result = DoOpen(); 689 break; 690 691 case STATE_CLOSED: 692 DCHECK_NE(result, ERR_IO_PENDING); 693 break; 694 default: 695 NOTREACHED() << io_state_; 696 break; 697 } 698 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE && 699 io_state_ != STATE_IDLE); 700 701 CHECK(in_do_loop_); 702 in_do_loop_ = false; 703 704 return result; 705 } 706 707 int SpdyStream::DoSendRequestHeaders() { 708 DCHECK_NE(type_, SPDY_PUSH_STREAM); 709 io_state_ = STATE_SEND_REQUEST_HEADERS_COMPLETE; 710 711 session_->EnqueueStreamWrite( 712 GetWeakPtr(), SYN_STREAM, 713 scoped_ptr<SpdyBufferProducer>( 714 new SynStreamBufferProducer(GetWeakPtr()))); 715 return ERR_IO_PENDING; 716 } 717 718 namespace { 719 720 // Assuming we're in STATE_IDLE, maps the given type (which must not 721 // be SPDY_PUSH_STREAM) and send status to a result to return from 722 // DoSendRequestHeadersComplete() or DoOpen(). 723 int GetOpenStateResult(SpdyStreamType type, SpdySendStatus send_status) { 724 switch (type) { 725 case SPDY_BIDIRECTIONAL_STREAM: 726 // For bidirectional streams, there's nothing else to do. 727 DCHECK_EQ(send_status, MORE_DATA_TO_SEND); 728 return OK; 729 730 case SPDY_REQUEST_RESPONSE_STREAM: 731 // For request/response streams, wait for the delegate to send 732 // data if there's request data to send; we'll get called back 733 // when the send finishes. 734 if (send_status == MORE_DATA_TO_SEND) 735 return ERR_IO_PENDING; 736 737 return OK; 738 739 case SPDY_PUSH_STREAM: 740 // This should never be called for push streams. 741 break; 742 } 743 744 CHECK(false); 745 return ERR_UNEXPECTED; 746 } 747 748 } // namespace 749 750 int SpdyStream::DoSendRequestHeadersComplete() { 751 DCHECK_NE(type_, SPDY_PUSH_STREAM); 752 DCHECK_EQ(just_completed_frame_type_, SYN_STREAM); 753 DCHECK_NE(stream_id_, 0u); 754 755 io_state_ = STATE_IDLE; 756 757 CHECK(delegate_); 758 // Must not close |this|; if it does, it will trigger the |in_do_loop_| 759 // check in the destructor. 760 delegate_->OnRequestHeadersSent(); 761 762 return GetOpenStateResult(type_, send_status_); 763 } 764 765 int SpdyStream::DoOpen() { 766 DCHECK_NE(type_, SPDY_PUSH_STREAM); 767 768 if (just_completed_frame_type_ != DATA) { 769 NOTREACHED(); 770 return ERR_UNEXPECTED; 771 } 772 773 if (just_completed_frame_size_ < session_->GetDataFrameMinimumSize()) { 774 NOTREACHED(); 775 return ERR_UNEXPECTED; 776 } 777 778 size_t frame_payload_size = 779 just_completed_frame_size_ - session_->GetDataFrameMinimumSize(); 780 if (frame_payload_size > session_->GetDataFrameMaximumPayload()) { 781 NOTREACHED(); 782 return ERR_UNEXPECTED; 783 } 784 785 // Set |io_state_| first as |delegate_| may check it. 786 io_state_ = STATE_IDLE; 787 788 send_bytes_ += frame_payload_size; 789 790 pending_send_data_->DidConsume(frame_payload_size); 791 if (pending_send_data_->BytesRemaining() > 0) { 792 QueueNextDataFrame(); 793 return ERR_IO_PENDING; 794 } 795 796 pending_send_data_ = NULL; 797 798 CHECK(delegate_); 799 // Must not close |this|; if it does, it will trigger the 800 // |in_do_loop_| check in the destructor. 801 delegate_->OnDataSent(); 802 803 return GetOpenStateResult(type_, send_status_); 804 } 805 806 void SpdyStream::UpdateHistograms() { 807 // We need at least the receive timers to be filled in, as otherwise 808 // metrics can be bogus. 809 if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null()) 810 return; 811 812 base::TimeTicks effective_send_time; 813 if (type_ == SPDY_PUSH_STREAM) { 814 // Push streams shouldn't have |send_time_| filled in. 815 DCHECK(send_time_.is_null()); 816 effective_send_time = recv_first_byte_time_; 817 } else { 818 // For non-push streams, we also need |send_time_| to be filled 819 // in. 820 if (send_time_.is_null()) 821 return; 822 effective_send_time = send_time_; 823 } 824 825 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte", 826 recv_first_byte_time_ - effective_send_time); 827 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", 828 recv_last_byte_time_ - recv_first_byte_time_); 829 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", 830 recv_last_byte_time_ - effective_send_time); 831 832 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); 833 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); 834 } 835 836 void SpdyStream::QueueNextDataFrame() { 837 // Until the request has been completely sent, we cannot be sure 838 // that our stream_id is correct. 839 DCHECK_GT(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); 840 CHECK_GT(stream_id_, 0u); 841 CHECK(pending_send_data_.get()); 842 CHECK_GT(pending_send_data_->BytesRemaining(), 0); 843 844 SpdyDataFlags flags = 845 (send_status_ == NO_MORE_DATA_TO_SEND) ? 846 DATA_FLAG_FIN : DATA_FLAG_NONE; 847 scoped_ptr<SpdyBuffer> data_buffer( 848 session_->CreateDataBuffer(stream_id_, 849 pending_send_data_.get(), 850 pending_send_data_->BytesRemaining(), 851 flags)); 852 // We'll get called again by PossiblyResumeIfSendStalled(). 853 if (!data_buffer) 854 return; 855 856 if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { 857 DCHECK_GE(data_buffer->GetRemainingSize(), 858 session_->GetDataFrameMinimumSize()); 859 size_t payload_size = 860 data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize(); 861 DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload()); 862 DecreaseSendWindowSize(static_cast<int32>(payload_size)); 863 // This currently isn't strictly needed, since write frames are 864 // discarded only if the stream is about to be closed. But have it 865 // here anyway just in case this changes. 866 data_buffer->AddConsumeCallback( 867 base::Bind(&SpdyStream::OnWriteBufferConsumed, 868 GetWeakPtr(), payload_size)); 869 } 870 871 session_->EnqueueStreamWrite( 872 GetWeakPtr(), DATA, 873 scoped_ptr<SpdyBufferProducer>( 874 new SimpleBufferProducer(data_buffer.Pass()))); 875 } 876 877 int SpdyStream::MergeWithResponseHeaders( 878 const SpdyHeaderBlock& new_response_headers) { 879 if (new_response_headers.find("transfer-encoding") != 880 new_response_headers.end()) { 881 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 882 "Received transfer-encoding header"); 883 return ERR_SPDY_PROTOCOL_ERROR; 884 } 885 886 for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin(); 887 it != new_response_headers.end(); ++it) { 888 // Disallow uppercase headers. 889 if (ContainsUppercaseAscii(it->first)) { 890 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 891 "Upper case characters in header: " + it->first); 892 return ERR_SPDY_PROTOCOL_ERROR; 893 } 894 895 SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first); 896 // Disallow duplicate headers. This is just to be conservative. 897 if (it2 != response_headers_.end() && it2->first == it->first) { 898 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 899 "Duplicate header: " + it->first); 900 return ERR_SPDY_PROTOCOL_ERROR; 901 } 902 903 response_headers_.insert(it2, *it); 904 } 905 906 // If delegate_ is not yet attached, we'll call 907 // OnResponseHeadersUpdated() after the delegate gets attached to 908 // the stream. 909 if (delegate_) { 910 // The call to OnResponseHeadersUpdated() below may delete |this|, 911 // so use |weak_this| to detect that. 912 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); 913 914 SpdyResponseHeadersStatus status = 915 delegate_->OnResponseHeadersUpdated(response_headers_); 916 if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) { 917 // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not 918 // have been closed. 919 CHECK(weak_this); 920 // Incomplete headers are OK only for push streams. 921 if (type_ != SPDY_PUSH_STREAM) { 922 session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR, 923 "Incomplete headers"); 924 return ERR_INCOMPLETE_SPDY_HEADERS; 925 } 926 } else if (weak_this) { 927 response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE; 928 } 929 } 930 931 return OK; 932 } 933 934 } // namespace net 935