Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_stream.h"
      6 
      7 #include "base/logging.h"
      8 #include "base/message_loop.h"
      9 #include "base/values.h"
     10 #include "net/spdy/spdy_session.h"
     11 
     12 namespace net {
     13 
     14 namespace {
     15 
     16 class NetLogSpdyStreamWindowUpdateParameter : public NetLog::EventParameters {
     17  public:
     18   NetLogSpdyStreamWindowUpdateParameter(spdy::SpdyStreamId stream_id,
     19                                         int delta,
     20                                         int window_size)
     21       : stream_id_(stream_id), delta_(delta), window_size_(window_size) {}
     22   virtual Value* ToValue() const {
     23     DictionaryValue* dict = new DictionaryValue();
     24     dict->SetInteger("id", static_cast<int>(stream_id_));
     25     dict->SetInteger("delta", delta_);
     26     dict->SetInteger("window_size", window_size_);
     27     return dict;
     28   }
     29  private:
     30   const spdy::SpdyStreamId stream_id_;
     31   const int delta_;
     32   const int window_size_;
     33   DISALLOW_COPY_AND_ASSIGN(NetLogSpdyStreamWindowUpdateParameter);
     34 };
     35 
     36 }
     37 
     38 SpdyStream::SpdyStream(SpdySession* session,
     39                        spdy::SpdyStreamId stream_id,
     40                        bool pushed,
     41                        const BoundNetLog& net_log)
     42     : continue_buffering_data_(true),
     43       stream_id_(stream_id),
     44       priority_(0),
     45       stalled_by_flow_control_(false),
     46       send_window_size_(spdy::kSpdyStreamInitialWindowSize),
     47       recv_window_size_(spdy::kSpdyStreamInitialWindowSize),
     48       pushed_(pushed),
     49       response_received_(false),
     50       session_(session),
     51       delegate_(NULL),
     52       request_time_(base::Time::Now()),
     53       response_(new spdy::SpdyHeaderBlock),
     54       io_state_(STATE_NONE),
     55       response_status_(OK),
     56       cancelled_(false),
     57       has_upload_data_(false),
     58       net_log_(net_log),
     59       send_bytes_(0),
     60       recv_bytes_(0) {
     61 }
     62 
     63 SpdyStream::~SpdyStream() {
     64   UpdateHistograms();
     65 }
     66 
     67 void SpdyStream::SetDelegate(Delegate* delegate) {
     68   CHECK(delegate);
     69   delegate_ = delegate;
     70 
     71   if (pushed_) {
     72     CHECK(response_received());
     73     MessageLoop::current()->PostTask(
     74         FROM_HERE, NewRunnableMethod(this,
     75                                      &SpdyStream::PushedStreamReplayData));
     76   } else {
     77     continue_buffering_data_ = false;
     78   }
     79 }
     80 
     81 void SpdyStream::PushedStreamReplayData() {
     82   if (cancelled_ || !delegate_)
     83     return;
     84 
     85   continue_buffering_data_ = false;
     86 
     87   int rv = delegate_->OnResponseReceived(*response_, response_time_, OK);
     88   if (rv == ERR_INCOMPLETE_SPDY_HEADERS) {
     89     // We don't have complete headers.  Assume we're waiting for another
     90     // HEADERS frame.  Since we don't have headers, we had better not have
     91     // any pending data frames.
     92     DCHECK_EQ(0U, pending_buffers_.size());
     93     return;
     94   }
     95 
     96   std::vector<scoped_refptr<IOBufferWithSize> > buffers;
     97   buffers.swap(pending_buffers_);
     98   for (size_t i = 0; i < buffers.size(); ++i) {
     99     // It is always possible that a callback to the delegate results in
    100     // the delegate no longer being available.
    101     if (!delegate_)
    102       break;
    103     if (buffers[i]) {
    104       delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
    105     } else {
    106       delegate_->OnDataReceived(NULL, 0);
    107       session_->CloseStream(stream_id_, net::OK);
    108       // Note: |this| may be deleted after calling CloseStream.
    109       DCHECK_EQ(buffers.size() - 1, i);
    110     }
    111   }
    112 }
    113 
    114 void SpdyStream::DetachDelegate() {
    115   if (delegate_)
    116     delegate_->set_chunk_callback(NULL);
    117   delegate_ = NULL;
    118   if (!closed())
    119     Cancel();
    120 }
    121 
    122 const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const {
    123   return request_;
    124 }
    125 
    126 void SpdyStream::set_spdy_headers(
    127     const linked_ptr<spdy::SpdyHeaderBlock>& headers) {
    128   request_ = headers;
    129 }
    130 
    131 void SpdyStream::IncreaseSendWindowSize(int delta_window_size) {
    132   DCHECK_GE(delta_window_size, 1);
    133   int new_window_size = send_window_size_ + delta_window_size;
    134 
    135   // We should ignore WINDOW_UPDATEs received before or after this state,
    136   // since before means we've not written SYN_STREAM yet (i.e. it's too
    137   // early) and after means we've written a DATA frame with FIN bit.
    138   if (io_state_ != STATE_SEND_BODY_COMPLETE)
    139     return;
    140 
    141   // it's valid for send_window_size_ to become negative (via an incoming
    142   // SETTINGS), in which case incoming WINDOW_UPDATEs will eventually make
    143   // it positive; however, if send_window_size_ is positive and incoming
    144   // WINDOW_UPDATE makes it negative, we have an overflow.
    145   if (send_window_size_ > 0 && new_window_size < 0) {
    146     LOG(WARNING) << "Received WINDOW_UPDATE [delta:" << delta_window_size
    147                  << "] for stream " << stream_id_
    148                  << " overflows send_window_size_ [current:"
    149                  << send_window_size_ << "]";
    150     session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
    151     return;
    152   }
    153 
    154   send_window_size_ = new_window_size;
    155 
    156   net_log_.AddEvent(
    157       NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
    158       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
    159           stream_id_, delta_window_size, send_window_size_)));
    160   if (stalled_by_flow_control_) {
    161     stalled_by_flow_control_ = false;
    162     io_state_ = STATE_SEND_BODY;
    163     DoLoop(OK);
    164   }
    165 }
    166 
    167 void SpdyStream::DecreaseSendWindowSize(int delta_window_size) {
    168   // we only call this method when sending a frame, therefore
    169   // |delta_window_size| should be within the valid frame size range.
    170   DCHECK_GE(delta_window_size, 1);
    171   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
    172 
    173   // |send_window_size_| should have been at least |delta_window_size| for
    174   // this call to happen.
    175   DCHECK_GE(send_window_size_, delta_window_size);
    176 
    177   send_window_size_ -= delta_window_size;
    178 
    179   net_log_.AddEvent(
    180       NetLog::TYPE_SPDY_STREAM_SEND_WINDOW_UPDATE,
    181       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
    182           stream_id_, -delta_window_size, send_window_size_)));
    183 }
    184 
    185 void SpdyStream::IncreaseRecvWindowSize(int delta_window_size) {
    186   DCHECK_GE(delta_window_size, 1);
    187   // By the time a read is isued, stream may become inactive.
    188   if (!session_->IsStreamActive(stream_id_))
    189     return;
    190   int new_window_size = recv_window_size_ + delta_window_size;
    191   if (recv_window_size_ > 0)
    192     DCHECK(new_window_size > 0);
    193 
    194   recv_window_size_ = new_window_size;
    195   net_log_.AddEvent(
    196       NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
    197       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
    198           stream_id_, delta_window_size, recv_window_size_)));
    199   session_->SendWindowUpdate(stream_id_, delta_window_size);
    200 }
    201 
    202 void SpdyStream::DecreaseRecvWindowSize(int delta_window_size) {
    203   DCHECK_GE(delta_window_size, 1);
    204 
    205   recv_window_size_ -= delta_window_size;
    206   net_log_.AddEvent(
    207       NetLog::TYPE_SPDY_STREAM_RECV_WINDOW_UPDATE,
    208       make_scoped_refptr(new NetLogSpdyStreamWindowUpdateParameter(
    209           stream_id_, -delta_window_size, recv_window_size_)));
    210 
    211   // Since we never decrease the initial window size, we should never hit
    212   // a negative |recv_window_size_|, if we do, it's a flow-control violation.
    213   if (recv_window_size_ < 0)
    214     session_->ResetStream(stream_id_, spdy::FLOW_CONTROL_ERROR);
    215 }
    216 
    217 int SpdyStream::GetPeerAddress(AddressList* address) const {
    218   return session_->GetPeerAddress(address);
    219 }
    220 
    221 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
    222   return session_->GetLocalAddress(address);
    223 }
    224 
    225 bool SpdyStream::WasEverUsed() const {
    226   return session_->WasEverUsed();
    227 }
    228 
    229 base::Time SpdyStream::GetRequestTime() const {
    230   return request_time_;
    231 }
    232 
    233 void SpdyStream::SetRequestTime(base::Time t) {
    234   request_time_ = t;
    235 }
    236 
    237 int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) {
    238   int rv = OK;
    239 
    240   metrics_.StartStream();
    241 
    242   DCHECK(response_->empty());
    243   *response_ = response;  // TODO(ukai): avoid copy.
    244 
    245   recv_first_byte_time_ = base::TimeTicks::Now();
    246   response_time_ = base::Time::Now();
    247 
    248   // If we receive a response before we are in STATE_WAITING_FOR_RESPONSE, then
    249   // the server has sent the SYN_REPLY too early.
    250   if (!pushed_ && io_state_ != STATE_WAITING_FOR_RESPONSE)
    251     return ERR_SPDY_PROTOCOL_ERROR;
    252   if (pushed_)
    253     CHECK(io_state_ == STATE_NONE);
    254   io_state_ = STATE_OPEN;
    255 
    256   if (delegate_)
    257     rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
    258   // If delegate_ is not yet attached, we'll call OnResponseReceived after the
    259   // delegate gets attached to the stream.
    260 
    261   return rv;
    262 }
    263 
    264 int SpdyStream::OnHeaders(const spdy::SpdyHeaderBlock& headers) {
    265   DCHECK(!response_->empty());
    266 
    267   // Append all the headers into the response header block.
    268   for (spdy::SpdyHeaderBlock::const_iterator it = headers.begin();
    269       it != headers.end(); ++it) {
    270     // Disallow duplicate headers.  This is just to be conservative.
    271     if ((*response_).find(it->first) != (*response_).end()) {
    272       LOG(WARNING) << "HEADERS duplicate header";
    273       response_status_ = ERR_SPDY_PROTOCOL_ERROR;
    274       return ERR_SPDY_PROTOCOL_ERROR;
    275     }
    276 
    277     (*response_)[it->first] = it->second;
    278   }
    279 
    280   int rv = OK;
    281   if (delegate_) {
    282     rv = delegate_->OnResponseReceived(*response_, response_time_, rv);
    283     // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more
    284     // headers before the response header block is complete.
    285     if (rv == ERR_INCOMPLETE_SPDY_HEADERS)
    286       rv = OK;
    287   }
    288   return rv;
    289 }
    290 
    291 void SpdyStream::OnDataReceived(const char* data, int length) {
    292   DCHECK_GE(length, 0);
    293 
    294   // If we don't have a response, then the SYN_REPLY did not come through.
    295   // We cannot pass data up to the caller unless the reply headers have been
    296   // received.
    297   if (!response_received()) {
    298     session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED);
    299     return;
    300   }
    301 
    302   if (!delegate_ || continue_buffering_data_) {
    303     // It should be valid for this to happen in the server push case.
    304     // We'll return received data when delegate gets attached to the stream.
    305     if (length > 0) {
    306       IOBufferWithSize* buf = new IOBufferWithSize(length);
    307       memcpy(buf->data(), data, length);
    308       pending_buffers_.push_back(make_scoped_refptr(buf));
    309     } else {
    310       pending_buffers_.push_back(NULL);
    311       metrics_.StopStream();
    312       // Note: we leave the stream open in the session until the stream
    313       //       is claimed.
    314     }
    315     return;
    316   }
    317 
    318   CHECK(!closed());
    319 
    320   // A zero-length read means that the stream is being closed.
    321   if (!length) {
    322     metrics_.StopStream();
    323     session_->CloseStream(stream_id_, net::OK);
    324     // Note: |this| may be deleted after calling CloseStream.
    325     return;
    326   }
    327 
    328   if (session_->flow_control())
    329     DecreaseRecvWindowSize(length);
    330 
    331   // Track our bandwidth.
    332   metrics_.RecordBytes(length);
    333   recv_bytes_ += length;
    334   recv_last_byte_time_ = base::TimeTicks::Now();
    335 
    336   if (!delegate_) {
    337     // It should be valid for this to happen in the server push case.
    338     // We'll return received data when delegate gets attached to the stream.
    339     IOBufferWithSize* buf = new IOBufferWithSize(length);
    340     memcpy(buf->data(), data, length);
    341     pending_buffers_.push_back(make_scoped_refptr(buf));
    342     return;
    343   }
    344 
    345   delegate_->OnDataReceived(data, length);
    346 }
    347 
    348 // This function is only called when an entire frame is written.
    349 void SpdyStream::OnWriteComplete(int bytes) {
    350   DCHECK_LE(0, bytes);
    351   send_bytes_ += bytes;
    352   if (cancelled() || closed())
    353     return;
    354   DoLoop(bytes);
    355 }
    356 
    357 void SpdyStream::OnChunkAvailable() {
    358   DCHECK(io_state_ == STATE_SEND_HEADERS || io_state_ == STATE_SEND_BODY ||
    359          io_state_ == STATE_SEND_BODY_COMPLETE);
    360   if (io_state_ == STATE_SEND_BODY)
    361     OnWriteComplete(0);
    362 }
    363 
    364 void SpdyStream::OnClose(int status) {
    365   io_state_ = STATE_DONE;
    366   response_status_ = status;
    367   Delegate* delegate = delegate_;
    368   delegate_ = NULL;
    369   if (delegate) {
    370     delegate->set_chunk_callback(NULL);
    371     delegate->OnClose(status);
    372   }
    373 }
    374 
    375 void SpdyStream::Cancel() {
    376   if (cancelled())
    377     return;
    378 
    379   cancelled_ = true;
    380   if (session_->IsStreamActive(stream_id_))
    381     session_->ResetStream(stream_id_, spdy::CANCEL);
    382 }
    383 
    384 int SpdyStream::SendRequest(bool has_upload_data) {
    385   if (delegate_)
    386     delegate_->set_chunk_callback(this);
    387 
    388   // Pushed streams do not send any data, and should always be in STATE_OPEN or
    389   // STATE_DONE. However, we still want to return IO_PENDING to mimic non-push
    390   // behavior.
    391   has_upload_data_ = has_upload_data;
    392   if (pushed_) {
    393     send_time_ = base::TimeTicks::Now();
    394     DCHECK(!has_upload_data_);
    395     DCHECK(response_received());
    396     return ERR_IO_PENDING;
    397   }
    398   CHECK_EQ(STATE_NONE, io_state_);
    399   io_state_ = STATE_SEND_HEADERS;
    400   return DoLoop(OK);
    401 }
    402 
    403 int SpdyStream::WriteStreamData(IOBuffer* data, int length,
    404                                 spdy::SpdyDataFlags flags) {
    405   return session_->WriteStreamData(stream_id_, data, length, flags);
    406 }
    407 
    408 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) {
    409   return session_->GetSSLInfo(ssl_info, was_npn_negotiated);
    410 }
    411 
    412 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
    413   return session_->GetSSLCertRequestInfo(cert_request_info);
    414 }
    415 
    416 bool SpdyStream::HasUrl() const {
    417   if (pushed_)
    418     return response_received();
    419   return request_.get() != NULL;
    420 }
    421 
    422 GURL SpdyStream::GetUrl() const {
    423   DCHECK(HasUrl());
    424 
    425   if (pushed_) {
    426     // assemble from the response
    427     std::string url;
    428     spdy::SpdyHeaderBlock::const_iterator it;
    429     it = response_->find("url");
    430     if (it != (*response_).end())
    431       url = it->second;
    432     return GURL(url);
    433   }
    434 
    435   // assemble from the request
    436   std::string scheme;
    437   std::string host_port;
    438   std::string path;
    439   spdy::SpdyHeaderBlock::const_iterator it;
    440   it = request_->find("scheme");
    441   if (it != (*request_).end())
    442     scheme = it->second;
    443   it = request_->find("host");
    444   if (it != (*request_).end())
    445     host_port = it->second;
    446   it = request_->find("path");
    447   if (it != (*request_).end())
    448     path = it->second;
    449   std::string url = scheme + "://" + host_port + path;
    450   return GURL(url);
    451 }
    452 
    453 int SpdyStream::DoLoop(int result) {
    454   do {
    455     State state = io_state_;
    456     io_state_ = STATE_NONE;
    457     switch (state) {
    458       // State machine 1: Send headers and body.
    459       case STATE_SEND_HEADERS:
    460         CHECK_EQ(OK, result);
    461         result = DoSendHeaders();
    462         break;
    463       case STATE_SEND_HEADERS_COMPLETE:
    464         result = DoSendHeadersComplete(result);
    465         break;
    466       case STATE_SEND_BODY:
    467         CHECK_EQ(OK, result);
    468         result = DoSendBody();
    469         break;
    470       case STATE_SEND_BODY_COMPLETE:
    471         result = DoSendBodyComplete(result);
    472         break;
    473       // This is an intermediary waiting state. This state is reached when all
    474       // data has been sent, but no data has been received.
    475       case STATE_WAITING_FOR_RESPONSE:
    476         io_state_ = STATE_WAITING_FOR_RESPONSE;
    477         result = ERR_IO_PENDING;
    478         break;
    479       // State machine 2: connection is established.
    480       // In STATE_OPEN, OnResponseReceived has already been called.
    481       // OnDataReceived, OnClose and OnWriteCompelte can be called.
    482       // Only OnWriteCompletee calls DoLoop(().
    483       //
    484       // For HTTP streams, no data is sent from the client while in the OPEN
    485       // state, so OnWriteComplete is never called here.  The HTTP body is
    486       // handled in the OnDataReceived callback, which does not call into
    487       // DoLoop.
    488       //
    489       // For WebSocket streams, which are bi-directional, we'll send and
    490       // receive data once the connection is established.  Received data is
    491       // handled in OnDataReceived.  Sent data is handled in OnWriteComplete,
    492       // which calls DoOpen().
    493       case STATE_OPEN:
    494         result = DoOpen(result);
    495         break;
    496 
    497       case STATE_DONE:
    498         DCHECK(result != ERR_IO_PENDING);
    499         break;
    500       default:
    501         NOTREACHED() << io_state_;
    502         break;
    503     }
    504   } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
    505            io_state_ != STATE_OPEN);
    506 
    507   return result;
    508 }
    509 
    510 int SpdyStream::DoSendHeaders() {
    511   CHECK(!cancelled_);
    512 
    513   spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE;
    514   if (!has_upload_data_)
    515     flags = spdy::CONTROL_FLAG_FIN;
    516 
    517   CHECK(request_.get());
    518   int result = session_->WriteSynStream(
    519       stream_id_, static_cast<RequestPriority>(priority_), flags,
    520       request_);
    521   if (result != ERR_IO_PENDING)
    522     return result;
    523 
    524   send_time_ = base::TimeTicks::Now();
    525   io_state_ = STATE_SEND_HEADERS_COMPLETE;
    526   return ERR_IO_PENDING;
    527 }
    528 
    529 int SpdyStream::DoSendHeadersComplete(int result) {
    530   if (result < 0)
    531     return result;
    532 
    533   CHECK_GT(result, 0);
    534 
    535   if (!delegate_)
    536     return ERR_UNEXPECTED;
    537 
    538   // There is no body, skip that state.
    539   if (delegate_->OnSendHeadersComplete(result)) {
    540     io_state_ = STATE_WAITING_FOR_RESPONSE;
    541     return OK;
    542   }
    543 
    544   io_state_ = STATE_SEND_BODY;
    545   return OK;
    546 }
    547 
    548 // DoSendBody is called to send the optional body for the request.  This call
    549 // will also be called as each write of a chunk of the body completes.
    550 int SpdyStream::DoSendBody() {
    551   // If we're already in the STATE_SENDING_BODY state, then we've already
    552   // sent a portion of the body.  In that case, we need to first consume
    553   // the bytes written in the body stream.  Note that the bytes written is
    554   // the number of bytes in the frame that were written, only consume the
    555   // data portion, of course.
    556   io_state_ = STATE_SEND_BODY_COMPLETE;
    557   if (!delegate_)
    558     return ERR_UNEXPECTED;
    559   return delegate_->OnSendBody();
    560 }
    561 
    562 int SpdyStream::DoSendBodyComplete(int result) {
    563   if (result < 0)
    564     return result;
    565 
    566   if (!delegate_)
    567     return ERR_UNEXPECTED;
    568 
    569   bool eof = false;
    570   result = delegate_->OnSendBodyComplete(result, &eof);
    571   if (!eof)
    572     io_state_ = STATE_SEND_BODY;
    573   else
    574     io_state_ = STATE_WAITING_FOR_RESPONSE;
    575 
    576   return result;
    577 }
    578 
    579 int SpdyStream::DoOpen(int result) {
    580   if (delegate_)
    581     delegate_->OnDataSent(result);
    582   io_state_ = STATE_OPEN;
    583   return result;
    584 }
    585 
    586 void SpdyStream::UpdateHistograms() {
    587   // We need all timers to be filled in, otherwise metrics can be bogus.
    588   if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
    589       recv_last_byte_time_.is_null())
    590     return;
    591 
    592   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
    593       recv_first_byte_time_ - send_time_);
    594   UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
    595       recv_last_byte_time_ - recv_first_byte_time_);
    596   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
    597       recv_last_byte_time_ - send_time_);
    598 
    599   UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
    600   UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
    601 }
    602 
    603 }  // namespace net
    604