1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/spdy/spdy_stream.h" 6 7 #include "base/logging.h" 8 #include "base/message_loop.h" 9 #include "base/values.h" 10 #include "net/spdy/spdy_session.h" 11 12 namespace net { 13 14 namespace { 15 16 class NetLogSpdyStreamWindowUpdateParameter : public NetLog::EventParameters { 17 public: 18 NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id, 19 int delta, 20 int window_size) 21 : stream_id_(stream_id), delta_(delta), window_size_(window_size) {} 22 virtual Value* ToValue() const { 23 DictionaryValue* dict = new DictionaryValue(); 24 dict->SetInteger("id", static_cast<int>(stream_id_)); 25 dict->SetInteger("delta", delta_); 26 dict->SetInteger("window_size", window_size_); 27 return dict; 28 } 29 private: 30 const spdy::SpdyStreamId stream_id_; 31 const int delta_; 32 const int window_size_; 33 DISALLOW_COPY_AND_ASSIGN(NetLogSpdyStreamWindowUpdateParameter); 34 }; 35 36 } 37 38 SpdyStream::SpdyStream(SpdySession* session, 39 spdy::SpdyStreamId stream_id, 40 bool pushed, 41 const BoundNetLog& net_log) 42 : continue_buffering_data_(true), 43 stream_id_(stream_id), 44 priority_(0), 45 stalled_by_flow_control_(false), 46 send_window_size_(spdy::kSpdyStreamInitialWindowSize), 47 recv_window_size_(spdy::kSpdyStreamInitialWindowSize), 48 pushed_(pushed), 49 response_received_(false), 50 session_(session), 51 delegate_(NULL), 52 request_time_(base::Time::Now()), 53 response_(new spdy::SpdyHeaderBlock), 54 io_state_(STATE_NONE), 55 response_status_(OK), 56 cancelled_(false), 57 has_upload_data_(false), 58 net_log_(net_log), 59 send_bytes_(0), 60 recv_bytes_(0) { 61 } 62 63 SpdyStream::~SpdyStream() { 64 UpdateHistograms(); 65 } 66 67 void SpdyStream::SetDelegate(Delegate* delegate) { 68 CHECK(delegate); 69 delegate_ = delegate; 70 71 if (pushed_) { 72 CHECK(response_received()); 73 MessageLoop::current()->PostTask( 74 FROM_HERE, NewRunnableMethod(this, 75 &SpdyStream::PushedStreamReplayData)); 76 } else { 77 continue_buffering_data_ = false; 78 } 79 } 80 81 void SpdyStream::PushedStreamReplayData() { 82 if (cancelled_ || !delegate_) 83 return; 84 85 continue_buffering_data_ = false; 86 87 int rv = delegate_->OnResponseReceived(*response_, response_time_, OK); 88 if (rv == ERR_INCOMPLETE_SPDY_HEADERS) { 89 // We don't have complete headers. Assume we're waiting for another 90 // HEADERS frame. Since we don't have headers, we had better not have 91 // any pending data frames. 92 DCHECK_EQ(0U, pending_buffers_.size()); 93 return; 94 } 95 96 std::vector<scoped_refptr<IOBufferWithSize> > buffers; 97 buffers.swap(pending_buffers_); 98 for (size_t i = 0; i < buffers.size(); ++i) { 99 // It is always possible that a callback to the delegate results in 100 // the delegate no longer being available. 101 if (!delegate_) 102 break; 103 if (buffers[i]) { 104 delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size()); 105 } else { 106 delegate_->OnDataReceived(NULL, 0); 107 session_->CloseStream(stream_id_, net::OK); 108 // Note: |this| may be deleted after calling CloseStream. 109 DCHECK_EQ(buffers.size() - 1, i); 110 } 111 } 112 } 113 114 void SpdyStream::DetachDelegate() { 115 if (delegate_) 116 delegate_->set_chunk_callback(NULL); 117 delegate_ = NULL; 118 if (!closed()) 119 Cancel(); 120 } 121 122 const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const { 123 return request_; 124 } 125 126 void SpdyStream::set_spdy_headers( 127 const linked_ptr<spdy::SpdyHeaderBlock>& headers) { 128 request_ = headers; 129 } 130 131 void SpdyStream::IncreaseSendWindowSize(int delta_window_size) { 132 DCHECK_GE(delta_window_size, 1); 133 int new_window_size = send_window_size_ + delta_window_size; 134 135 // We should ignore WINDOW_UPDATEs received before or after this state, 136 // since before means we've not written SYN_STREAM yet (i.e. it's too 137 // early) and after means we've written a DATA frame with FIN bit. 138 if (io_state_ != STATE_SEND_BODY_COMPLETE) 139 return; 140 141 // it's valid for send_window_size_ to become negative (via an incoming 142 // SETTINGS), in which case incoming WINDOW_UPDATEs will eventually make 143 // it positive; however, if send_window_size_ is positive and incoming 144 // WINDOW_UPDATE makes it negative, we have an overflow. 145 if (send_window_size_ > 0 && new_window_size < 0) { 146 LOG(WARNING) << "Received WINDOW_UPDATE [delta:" << delta_window_size 147 << "] for stream " << stream_id_ 148 << " overflows send_window_size_ [current:" 149 << send_window_size_ << "]"; 150 session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR); 151 return; 152 } 153 154 send_window_size_ = new_window_size; 155 156 net_log_.AddEvent( 157 NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE, 158 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( 159 stream_id_, delta_window_size, send_window_size_))); 160 if (stalled_by_flow_control_) { 161 stalled_by_flow_control_ = false; 162 io_state_ = STATE_SEND_BODY; 163 DoLoop(OK); 164 } 165 } 166 167 void SpdyStream::DecreaseSendWindowSize(int delta_window_size) { 168 // we only call this method when sending a frame, therefore 169 // |delta_window_size| should be within the valid frame size range. 170 DCHECK_GE(delta_window_size, 1); 171 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize); 172 173 // |send_window_size_| should have been at least |delta_window_size| for 174 // this call to happen. 175 DCHECK_GE(send_window_size_, delta_window_size); 176 177 send_window_size_ -= delta_window_size; 178 179 net_log_.AddEvent( 180 NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE, 181 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( 182 stream_id_, -delta_window_size, send_window_size_))); 183 } 184 185 void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) { 186 DCHECK_GE(delta_window_size, 1); 187 // By the time a read is isued, stream may become inactive. 188 if (!session_->IsStreamActive(stream_id_)) 189 return; 190 int new_window_size = recv_window_size_ + delta_window_size; 191 if (recv_window_size_ > 0) 192 DCHECK(new_window_size > 0); 193 194 recv_window_size_ = new_window_size; 195 net_log_.AddEvent( 196 NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE, 197 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( 198 stream_id_, delta_window_size, recv_window_size_))); 199 session_->SendWindowUpdate(stream_id_, delta_window_size); 200 } 201 202 void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) { 203 DCHECK_GE(delta_window_size, 1); 204 205 recv_window_size_ -= delta_window_size; 206 net_log_.AddEvent( 207 NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE, 208 make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter( 209 stream_id_, -delta_window_size, recv_window_size_))); 210 211 // Since we never decrease the initial window size, we should never hit 212 // a negative |recv_window_size_|, if we do, it's a flow-control violation. 213 if (recv_window_size_ < 0) 214 session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR); 215 } 216 217 int SpdyStream::GetPeerAddress(AddressList* address) const { 218 return session_->GetPeerAddress(address); 219 } 220 221 int SpdyStream::GetLocalAddress(IPEndPoint* address) const { 222 return session_->GetLocalAddress(address); 223 } 224 225 bool SpdyStream::WasEverUsed() const { 226 return session_->WasEverUsed(); 227 } 228 229 base::Time SpdyStream::GetRequestTime() const { 230 return request_time_; 231 } 232 233 void SpdyStream::SetRequestTime(base::Time t) { 234 request_time_ = t; 235 } 236 237 int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) { 238 int rv = OK; 239 240 metrics_.StartStream(); 241 242 DCHECK(response_->empty()); 243 *response_ = response; // TODO(ukai): avoid copy. 244 245 recv_first_byte_time_ = base::TimeTicks::Now(); 246 response_time_ = base::Time::Now(); 247 248 // If we receive a response before we are in STATE_WAITING_FOR_RESPONSE, then 249 // the server has sent the SYN_REPLY too early. 250 if (!pushed_ && io_state_ != STATE_WAITING_FOR_RESPONSE) 251 return ERR_SPDY_PROTOCOL_ERROR; 252 if (pushed_) 253 CHECK(io_state_ == STATE_NONE); 254 io_state_ = STATE_OPEN; 255 256 if (delegate_) 257 rv = delegate_->OnResponseReceived(*response_, response_time_, rv); 258 // If delegate_ is not yet attached, we'll call OnResponseReceived after the 259 // delegate gets attached to the stream. 260 261 return rv; 262 } 263 264 int SpdyStream::OnHeaders(const spdy::SpdyHeaderBlock& headers) { 265 DCHECK(!response_->empty()); 266 267 // Append all the headers into the response header block. 268 for (spdy::SpdyHeaderBlock::const_iterator it = headers.begin(); 269 it != headers.end(); ++it) { 270 // Disallow duplicate headers. This is just to be conservative. 271 if ((*response_).find(it->first) != (*response_).end()) { 272 LOG(WARNING) << "HEADERS duplicate header"; 273 response_status_ = ERR_SPDY_PROTOCOL_ERROR; 274 return ERR_SPDY_PROTOCOL_ERROR; 275 } 276 277 (*response_)[it->first] = it->second; 278 } 279 280 int rv = OK; 281 if (delegate_) { 282 rv = delegate_->OnResponseReceived(*response_, response_time_, rv); 283 // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more 284 // headers before the response header block is complete. 285 if (rv == ERR_INCOMPLETE_SPDY_HEADERS) 286 rv = OK; 287 } 288 return rv; 289 } 290 291 void SpdyStream::OnDataReceived(const char* data, int length) { 292 DCHECK_GE(length, 0); 293 294 // If we don't have a response, then the SYN_REPLY did not come through. 295 // We cannot pass data up to the caller unless the reply headers have been 296 // received. 297 if (!response_received()) { 298 session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); 299 return; 300 } 301 302 if (!delegate_ || continue_buffering_data_) { 303 // It should be valid for this to happen in the server push case. 304 // We'll return received data when delegate gets attached to the stream. 305 if (length > 0) { 306 IOBufferWithSize* buf = new IOBufferWithSize(length); 307 memcpy(buf->data(), data, length); 308 pending_buffers_.push_back(make_scoped_refptr(buf)); 309 } else { 310 pending_buffers_.push_back(NULL); 311 metrics_.StopStream(); 312 // Note: we leave the stream open in the session until the stream 313 // is claimed. 314 } 315 return; 316 } 317 318 CHECK(!closed()); 319 320 // A zero-length read means that the stream is being closed. 321 if (!length) { 322 metrics_.StopStream(); 323 session_->CloseStream(stream_id_, net::OK); 324 // Note: |this| may be deleted after calling CloseStream. 325 return; 326 } 327 328 if (session_->flow_control()) 329 DecreaseRecvWindowSize(length); 330 331 // Track our bandwidth. 332 metrics_.RecordBytes(length); 333 recv_bytes_ += length; 334 recv_last_byte_time_ = base::TimeTicks::Now(); 335 336 if (!delegate_) { 337 // It should be valid for this to happen in the server push case. 338 // We'll return received data when delegate gets attached to the stream. 339 IOBufferWithSize* buf = new IOBufferWithSize(length); 340 memcpy(buf->data(), data, length); 341 pending_buffers_.push_back(make_scoped_refptr(buf)); 342 return; 343 } 344 345 delegate_->OnDataReceived(data, length); 346 } 347 348 // This function is only called when an entire frame is written. 349 void SpdyStream::OnWriteComplete(int bytes) { 350 DCHECK_LE(0, bytes); 351 send_bytes_ += bytes; 352 if (cancelled() || closed()) 353 return; 354 DoLoop(bytes); 355 } 356 357 void SpdyStream::OnChunkAvailable() { 358 DCHECK(io_state_ == STATE_SEND_HEADERS || io_state_ == STATE_SEND_BODY || 359 io_state_ == STATE_SEND_BODY_COMPLETE); 360 if (io_state_ == STATE_SEND_BODY) 361 OnWriteComplete(0); 362 } 363 364 void SpdyStream::OnClose(int status) { 365 io_state_ = STATE_DONE; 366 response_status_ = status; 367 Delegate* delegate = delegate_; 368 delegate_ = NULL; 369 if (delegate) { 370 delegate->set_chunk_callback(NULL); 371 delegate->OnClose(status); 372 } 373 } 374 375 void SpdyStream::Cancel() { 376 if (cancelled()) 377 return; 378 379 cancelled_ = true; 380 if (session_->IsStreamActive(stream_id_)) 381 session_->ResetStream(stream_id_, spdy::CANCEL); 382 } 383 384 int SpdyStream::SendRequest(bool has_upload_data) { 385 if (delegate_) 386 delegate_->set_chunk_callback(this); 387 388 // Pushed streams do not send any data, and should always be in STATE_OPEN or 389 // STATE_DONE. However, we still want to return IO_PENDING to mimic non-push 390 // behavior. 391 has_upload_data_ = has_upload_data; 392 if (pushed_) { 393 send_time_ = base::TimeTicks::Now(); 394 DCHECK(!has_upload_data_); 395 DCHECK(response_received()); 396 return ERR_IO_PENDING; 397 } 398 CHECK_EQ(STATE_NONE, io_state_); 399 io_state_ = STATE_SEND_HEADERS; 400 return DoLoop(OK); 401 } 402 403 int SpdyStream::WriteStreamData(IOBuffer* data, int length, 404 spdy::SpdyDataFlags flags) { 405 return session_->WriteStreamData(stream_id_, data, length, flags); 406 } 407 408 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) { 409 return session_->GetSSLInfo(ssl_info, was_npn_negotiated); 410 } 411 412 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) { 413 return session_->GetSSLCertRequestInfo(cert_request_info); 414 } 415 416 bool SpdyStream::HasUrl() const { 417 if (pushed_) 418 return response_received(); 419 return request_.get() != NULL; 420 } 421 422 GURL SpdyStream::GetUrl() const { 423 DCHECK(HasUrl()); 424 425 if (pushed_) { 426 // assemble from the response 427 std::string url; 428 spdy::SpdyHeaderBlock::const_iterator it; 429 it = response_->find("url"); 430 if (it != (*response_).end()) 431 url = it->second; 432 return GURL(url); 433 } 434 435 // assemble from the request 436 std::string scheme; 437 std::string host_port; 438 std::string path; 439 spdy::SpdyHeaderBlock::const_iterator it; 440 it = request_->find("scheme"); 441 if (it != (*request_).end()) 442 scheme = it->second; 443 it = request_->find("host"); 444 if (it != (*request_).end()) 445 host_port = it->second; 446 it = request_->find("path"); 447 if (it != (*request_).end()) 448 path = it->second; 449 std::string url = scheme + "://" + host_port + path; 450 return GURL(url); 451 } 452 453 int SpdyStream::DoLoop(int result) { 454 do { 455 State state = io_state_; 456 io_state_ = STATE_NONE; 457 switch (state) { 458 // State machine 1: Send headers and body. 459 case STATE_SEND_HEADERS: 460 CHECK_EQ(OK, result); 461 result = DoSendHeaders(); 462 break; 463 case STATE_SEND_HEADERS_COMPLETE: 464 result = DoSendHeadersComplete(result); 465 break; 466 case STATE_SEND_BODY: 467 CHECK_EQ(OK, result); 468 result = DoSendBody(); 469 break; 470 case STATE_SEND_BODY_COMPLETE: 471 result = DoSendBodyComplete(result); 472 break; 473 // This is an intermediary waiting state. This state is reached when all 474 // data has been sent, but no data has been received. 475 case STATE_WAITING_FOR_RESPONSE: 476 io_state_ = STATE_WAITING_FOR_RESPONSE; 477 result = ERR_IO_PENDING; 478 break; 479 // State machine 2: connection is established. 480 // In STATE_OPEN, OnResponseReceived has already been called. 481 // OnDataReceived, OnClose and OnWriteCompelte can be called. 482 // Only OnWriteCompletee calls DoLoop((). 483 // 484 // For HTTP streams, no data is sent from the client while in the OPEN 485 // state, so OnWriteComplete is never called here. The HTTP body is 486 // handled in the OnDataReceived callback, which does not call into 487 // DoLoop. 488 // 489 // For WebSocket streams, which are bi-directional, we'll send and 490 // receive data once the connection is established. Received data is 491 // handled in OnDataReceived. Sent data is handled in OnWriteComplete, 492 // which calls DoOpen(). 493 case STATE_OPEN: 494 result = DoOpen(result); 495 break; 496 497 case STATE_DONE: 498 DCHECK(result != ERR_IO_PENDING); 499 break; 500 default: 501 NOTREACHED() << io_state_; 502 break; 503 } 504 } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE && 505 io_state_ != STATE_OPEN); 506 507 return result; 508 } 509 510 int SpdyStream::DoSendHeaders() { 511 CHECK(!cancelled_); 512 513 spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE; 514 if (!has_upload_data_) 515 flags = spdy::CONTROL_FLAG_FIN; 516 517 CHECK(request_.get()); 518 int result = session_->WriteSynStream( 519 stream_id_, static_cast<RequestPriority>(priority_), flags, 520 request_); 521 if (result != ERR_IO_PENDING) 522 return result; 523 524 send_time_ = base::TimeTicks::Now(); 525 io_state_ = STATE_SEND_HEADERS_COMPLETE; 526 return ERR_IO_PENDING; 527 } 528 529 int SpdyStream::DoSendHeadersComplete(int result) { 530 if (result < 0) 531 return result; 532 533 CHECK_GT(result, 0); 534 535 if (!delegate_) 536 return ERR_UNEXPECTED; 537 538 // There is no body, skip that state. 539 if (delegate_->OnSendHeadersComplete(result)) { 540 io_state_ = STATE_WAITING_FOR_RESPONSE; 541 return OK; 542 } 543 544 io_state_ = STATE_SEND_BODY; 545 return OK; 546 } 547 548 // DoSendBody is called to send the optional body for the request. This call 549 // will also be called as each write of a chunk of the body completes. 550 int SpdyStream::DoSendBody() { 551 // If we're already in the STATE_SENDING_BODY state, then we've already 552 // sent a portion of the body. In that case, we need to first consume 553 // the bytes written in the body stream. Note that the bytes written is 554 // the number of bytes in the frame that were written, only consume the 555 // data portion, of course. 556 io_state_ = STATE_SEND_BODY_COMPLETE; 557 if (!delegate_) 558 return ERR_UNEXPECTED; 559 return delegate_->OnSendBody(); 560 } 561 562 int SpdyStream::DoSendBodyComplete(int result) { 563 if (result < 0) 564 return result; 565 566 if (!delegate_) 567 return ERR_UNEXPECTED; 568 569 bool eof = false; 570 result = delegate_->OnSendBodyComplete(result, &eof); 571 if (!eof) 572 io_state_ = STATE_SEND_BODY; 573 else 574 io_state_ = STATE_WAITING_FOR_RESPONSE; 575 576 return result; 577 } 578 579 int SpdyStream::DoOpen(int result) { 580 if (delegate_) 581 delegate_->OnDataSent(result); 582 io_state_ = STATE_OPEN; 583 return result; 584 } 585 586 void SpdyStream::UpdateHistograms() { 587 // We need all timers to be filled in, otherwise metrics can be bogus. 588 if (send_time_.is_null() || recv_first_byte_time_.is_null() || 589 recv_last_byte_time_.is_null()) 590 return; 591 592 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte", 593 recv_first_byte_time_ - send_time_); 594 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime", 595 recv_last_byte_time_ - recv_first_byte_time_); 596 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime", 597 recv_last_byte_time_ - send_time_); 598 599 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_); 600 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); 601 } 602 603 } // namespace net 604