Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_http_stream.h"
      6 
      7 #include <algorithm>
      8 #include <list>
      9 
     10 #include "base/bind.h"
     11 #include "base/logging.h"
     12 #include "base/message_loop/message_loop.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "net/base/host_port_pair.h"
     15 #include "net/base/net_log.h"
     16 #include "net/base/net_util.h"
     17 #include "net/base/upload_data_stream.h"
     18 #include "net/http/http_request_headers.h"
     19 #include "net/http/http_request_info.h"
     20 #include "net/http/http_response_info.h"
     21 #include "net/spdy/spdy_header_block.h"
     22 #include "net/spdy/spdy_http_utils.h"
     23 #include "net/spdy/spdy_protocol.h"
     24 #include "net/spdy/spdy_session.h"
     25 
     26 namespace net {
     27 
     28 SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
     29                                bool direct)
     30     : spdy_session_(spdy_session),
     31       is_reused_(spdy_session_->IsReused()),
     32       stream_closed_(false),
     33       closed_stream_status_(ERR_FAILED),
     34       closed_stream_id_(0),
     35       closed_stream_received_bytes_(0),
     36       request_info_(NULL),
     37       response_info_(NULL),
     38       response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
     39       user_buffer_len_(0),
     40       request_body_buf_size_(0),
     41       buffered_read_callback_pending_(false),
     42       more_read_data_pending_(false),
     43       direct_(direct),
     44       weak_factory_(this) {
     45   DCHECK(spdy_session_.get());
     46 }
     47 
     48 SpdyHttpStream::~SpdyHttpStream() {
     49   if (stream_.get()) {
     50     stream_->DetachDelegate();
     51     DCHECK(!stream_.get());
     52   }
     53 }
     54 
     55 int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
     56                                      RequestPriority priority,
     57                                      const BoundNetLog& stream_net_log,
     58                                      const CompletionCallback& callback) {
     59   DCHECK(!stream_);
     60   if (!spdy_session_)
     61     return ERR_CONNECTION_CLOSED;
     62 
     63   request_info_ = request_info;
     64   if (request_info_->method == "GET") {
     65     int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
     66                                              stream_net_log);
     67     if (error != OK)
     68       return error;
     69 
     70     // |stream_| may be NULL even if OK was returned.
     71     if (stream_.get()) {
     72       DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
     73       stream_->SetDelegate(this);
     74       return OK;
     75     }
     76   }
     77 
     78   int rv = stream_request_.StartRequest(
     79       SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
     80       priority, stream_net_log,
     81       base::Bind(&SpdyHttpStream::OnStreamCreated,
     82                  weak_factory_.GetWeakPtr(), callback));
     83 
     84   if (rv == OK) {
     85     stream_ = stream_request_.ReleaseStream();
     86     stream_->SetDelegate(this);
     87   }
     88 
     89   return rv;
     90 }
     91 
     92 UploadProgress SpdyHttpStream::GetUploadProgress() const {
     93   if (!request_info_ || !HasUploadData())
     94     return UploadProgress();
     95 
     96   return UploadProgress(request_info_->upload_data_stream->position(),
     97                         request_info_->upload_data_stream->size());
     98 }
     99 
    100 int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
    101   CHECK(!callback.is_null());
    102   if (stream_closed_)
    103     return closed_stream_status_;
    104 
    105   CHECK(stream_.get());
    106 
    107   // Check if we already have the response headers. If so, return synchronously.
    108   if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
    109     CHECK(!stream_->IsIdle());
    110     return OK;
    111   }
    112 
    113   // Still waiting for the response, return IO_PENDING.
    114   CHECK(callback_.is_null());
    115   callback_ = callback;
    116   return ERR_IO_PENDING;
    117 }
    118 
    119 int SpdyHttpStream::ReadResponseBody(
    120     IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
    121   if (stream_.get())
    122     CHECK(!stream_->IsIdle());
    123 
    124   CHECK(buf);
    125   CHECK(buf_len);
    126   CHECK(!callback.is_null());
    127 
    128   // If we have data buffered, complete the IO immediately.
    129   if (!response_body_queue_.IsEmpty()) {
    130     return response_body_queue_.Dequeue(buf->data(), buf_len);
    131   } else if (stream_closed_) {
    132     return closed_stream_status_;
    133   }
    134 
    135   CHECK(callback_.is_null());
    136   CHECK(!user_buffer_.get());
    137   CHECK_EQ(0, user_buffer_len_);
    138 
    139   callback_ = callback;
    140   user_buffer_ = buf;
    141   user_buffer_len_ = buf_len;
    142   return ERR_IO_PENDING;
    143 }
    144 
    145 void SpdyHttpStream::Close(bool not_reusable) {
    146   // Note: the not_reusable flag has no meaning for SPDY streams.
    147 
    148   Cancel();
    149   DCHECK(!stream_.get());
    150 }
    151 
    152 HttpStream* SpdyHttpStream::RenewStreamForAuth() {
    153   return NULL;
    154 }
    155 
    156 bool SpdyHttpStream::IsResponseBodyComplete() const {
    157   return stream_closed_;
    158 }
    159 
    160 bool SpdyHttpStream::CanFindEndOfResponse() const {
    161   return true;
    162 }
    163 
    164 bool SpdyHttpStream::IsConnectionReused() const {
    165   return is_reused_;
    166 }
    167 
    168 void SpdyHttpStream::SetConnectionReused() {
    169   // SPDY doesn't need an indicator here.
    170 }
    171 
    172 bool SpdyHttpStream::IsConnectionReusable() const {
    173   // SPDY streams aren't considered reusable.
    174   return false;
    175 }
    176 
    177 int64 SpdyHttpStream::GetTotalReceivedBytes() const {
    178   if (stream_closed_)
    179     return closed_stream_received_bytes_;
    180 
    181   if (!stream_)
    182     return 0;
    183 
    184   return stream_->raw_received_bytes();
    185 }
    186 
    187 bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
    188   if (stream_closed_) {
    189     if (!closed_stream_has_load_timing_info_)
    190       return false;
    191     *load_timing_info = closed_stream_load_timing_info_;
    192     return true;
    193   }
    194 
    195   // If |stream_| has yet to be created, or does not yet have an ID, fail.
    196   // The reused flag can only be correctly set once a stream has an ID.  Streams
    197   // get their IDs once the request has been successfully sent, so this does not
    198   // behave that differently from other stream types.
    199   if (!stream_ || stream_->stream_id() == 0)
    200     return false;
    201 
    202   return stream_->GetLoadTimingInfo(load_timing_info);
    203 }
    204 
    205 int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
    206                                 HttpResponseInfo* response,
    207                                 const CompletionCallback& callback) {
    208   if (stream_closed_) {
    209     return closed_stream_status_;
    210   }
    211 
    212   base::Time request_time = base::Time::Now();
    213   CHECK(stream_.get());
    214 
    215   stream_->SetRequestTime(request_time);
    216   // This should only get called in the case of a request occurring
    217   // during server push that has already begun but hasn't finished,
    218   // so we set the response's request time to be the actual one
    219   if (response_info_)
    220     response_info_->request_time = request_time;
    221 
    222   CHECK(!request_body_buf_.get());
    223   if (HasUploadData()) {
    224     // Use kMaxSpdyFrameChunkSize as the buffer size, since the request
    225     // body data is written with this size at a time.
    226     request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
    227     // The request body buffer is empty at first.
    228     request_body_buf_size_ = 0;
    229   }
    230 
    231   CHECK(!callback.is_null());
    232   CHECK(response);
    233 
    234   // SendRequest can be called in two cases.
    235   //
    236   // a) A client initiated request. In this case, |response_info_| should be
    237   //    NULL to start with.
    238   // b) A client request which matches a response that the server has already
    239   //    pushed.
    240   if (push_response_info_.get()) {
    241     *response = *(push_response_info_.get());
    242     push_response_info_.reset();
    243   } else {
    244     DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
    245   }
    246 
    247   response_info_ = response;
    248 
    249   // Put the peer's IP address and port into the response.
    250   IPEndPoint address;
    251   int result = stream_->GetPeerAddress(&address);
    252   if (result != OK)
    253     return result;
    254   response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
    255 
    256   if (stream_->type() == SPDY_PUSH_STREAM) {
    257     // Pushed streams do not send any data, and should always be
    258     // idle. However, we still want to return ERR_IO_PENDING to mimic
    259     // non-push behavior. The callback will be called when the
    260     // response is received.
    261     result = ERR_IO_PENDING;
    262   } else {
    263     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
    264     CreateSpdyHeadersFromHttpRequest(
    265         *request_info_, request_headers,
    266         headers.get(), stream_->GetProtocolVersion(),
    267         direct_);
    268     stream_->net_log().AddEvent(
    269         NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
    270         base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
    271     result =
    272         stream_->SendRequestHeaders(
    273             headers.Pass(),
    274             HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
    275   }
    276 
    277   if (result == ERR_IO_PENDING) {
    278     CHECK(callback_.is_null());
    279     callback_ = callback;
    280   }
    281   return result;
    282 }
    283 
    284 void SpdyHttpStream::Cancel() {
    285   callback_.Reset();
    286   if (stream_.get()) {
    287     stream_->Cancel();
    288     DCHECK(!stream_.get());
    289   }
    290 }
    291 
    292 void SpdyHttpStream::OnRequestHeadersSent() {
    293   if (!callback_.is_null())
    294     DoCallback(OK);
    295 
    296   // TODO(akalin): Do this immediately after sending the request
    297   // headers.
    298   if (HasUploadData())
    299     ReadAndSendRequestBodyData();
    300 }
    301 
    302 SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
    303     const SpdyHeaderBlock& response_headers) {
    304   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
    305 
    306   if (!response_info_) {
    307     DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
    308     push_response_info_.reset(new HttpResponseInfo);
    309     response_info_ = push_response_info_.get();
    310   }
    311 
    312   if (!SpdyHeadersToHttpResponse(
    313           response_headers, stream_->GetProtocolVersion(), response_info_)) {
    314     // We do not have complete headers yet.
    315     return RESPONSE_HEADERS_ARE_INCOMPLETE;
    316   }
    317 
    318   response_info_->response_time = stream_->response_time();
    319   response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
    320   // Don't store the SSLInfo in the response here, HttpNetworkTransaction
    321   // will take care of that part.
    322   SSLInfo ssl_info;
    323   NextProto protocol_negotiated = kProtoUnknown;
    324   stream_->GetSSLInfo(&ssl_info,
    325                       &response_info_->was_npn_negotiated,
    326                       &protocol_negotiated);
    327   response_info_->npn_negotiated_protocol =
    328       SSLClientSocket::NextProtoToString(protocol_negotiated);
    329   response_info_->request_time = stream_->GetRequestTime();
    330   response_info_->connection_info =
    331       HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
    332   response_info_->vary_data
    333       .Init(*request_info_, *response_info_->headers.get());
    334 
    335   if (!callback_.is_null())
    336     DoCallback(OK);
    337 
    338   return RESPONSE_HEADERS_ARE_COMPLETE;
    339 }
    340 
    341 void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
    342   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
    343 
    344   // Note that data may be received for a SpdyStream prior to the user calling
    345   // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
    346   // happen for server initiated streams.
    347   DCHECK(stream_.get());
    348   DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
    349   if (buffer) {
    350     response_body_queue_.Enqueue(buffer.Pass());
    351 
    352     if (user_buffer_.get()) {
    353       // Handing small chunks of data to the caller creates measurable overhead.
    354       // We buffer data in short time-spans and send a single read notification.
    355       ScheduleBufferedReadCallback();
    356     }
    357   }
    358 }
    359 
    360 void SpdyHttpStream::OnDataSent() {
    361   request_body_buf_size_ = 0;
    362   ReadAndSendRequestBodyData();
    363 }
    364 
    365 void SpdyHttpStream::OnClose(int status) {
    366   if (stream_.get()) {
    367     stream_closed_ = true;
    368     closed_stream_status_ = status;
    369     closed_stream_id_ = stream_->stream_id();
    370     closed_stream_has_load_timing_info_ =
    371         stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
    372     closed_stream_received_bytes_ = stream_->raw_received_bytes();
    373   }
    374   stream_.reset();
    375   bool invoked_callback = false;
    376   if (status == net::OK) {
    377     // We need to complete any pending buffered read now.
    378     invoked_callback = DoBufferedReadCallback();
    379   }
    380   if (!invoked_callback && !callback_.is_null())
    381     DoCallback(status);
    382 }
    383 
    384 bool SpdyHttpStream::HasUploadData() const {
    385   CHECK(request_info_);
    386   return
    387       request_info_->upload_data_stream &&
    388       ((request_info_->upload_data_stream->size() > 0) ||
    389        request_info_->upload_data_stream->is_chunked());
    390 }
    391 
    392 void SpdyHttpStream::OnStreamCreated(
    393     const CompletionCallback& callback,
    394     int rv) {
    395   if (rv == OK) {
    396     stream_ = stream_request_.ReleaseStream();
    397     stream_->SetDelegate(this);
    398   }
    399   callback.Run(rv);
    400 }
    401 
    402 void SpdyHttpStream::ReadAndSendRequestBodyData() {
    403   CHECK(HasUploadData());
    404   CHECK_EQ(request_body_buf_size_, 0);
    405 
    406   if (request_info_->upload_data_stream->IsEOF())
    407     return;
    408 
    409   // Read the data from the request body stream.
    410   const int rv = request_info_->upload_data_stream
    411       ->Read(request_body_buf_.get(),
    412              request_body_buf_->size(),
    413              base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
    414                         weak_factory_.GetWeakPtr()));
    415 
    416   if (rv != ERR_IO_PENDING) {
    417     // ERR_IO_PENDING is the only possible error.
    418     CHECK_GE(rv, 0);
    419     OnRequestBodyReadCompleted(rv);
    420   }
    421 }
    422 
    423 void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
    424   CHECK_GE(status, 0);
    425   request_body_buf_size_ = status;
    426   const bool eof = request_info_->upload_data_stream->IsEOF();
    427   if (eof) {
    428     CHECK_GE(request_body_buf_size_, 0);
    429   } else {
    430     CHECK_GT(request_body_buf_size_, 0);
    431   }
    432   stream_->SendData(request_body_buf_.get(),
    433                     request_body_buf_size_,
    434                     eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
    435 }
    436 
    437 void SpdyHttpStream::ScheduleBufferedReadCallback() {
    438   // If there is already a scheduled DoBufferedReadCallback, don't issue
    439   // another one.  Mark that we have received more data and return.
    440   if (buffered_read_callback_pending_) {
    441     more_read_data_pending_ = true;
    442     return;
    443   }
    444 
    445   more_read_data_pending_ = false;
    446   buffered_read_callback_pending_ = true;
    447   const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
    448   base::MessageLoop::current()->PostDelayedTask(
    449       FROM_HERE,
    450       base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
    451                  weak_factory_.GetWeakPtr()),
    452       kBufferTime);
    453 }
    454 
    455 // Checks to see if we should wait for more buffered data before notifying
    456 // the caller.  Returns true if we should wait, false otherwise.
    457 bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
    458   // If the response is complete, there is no point in waiting.
    459   if (stream_closed_)
    460     return false;
    461 
    462   DCHECK_GT(user_buffer_len_, 0);
    463   return response_body_queue_.GetTotalSize() <
    464       static_cast<size_t>(user_buffer_len_);
    465 }
    466 
    467 bool SpdyHttpStream::DoBufferedReadCallback() {
    468   buffered_read_callback_pending_ = false;
    469 
    470   // If the transaction is cancelled or errored out, we don't need to complete
    471   // the read.
    472   if (!stream_.get() && !stream_closed_)
    473     return false;
    474 
    475   int stream_status =
    476       stream_closed_ ? closed_stream_status_ : stream_->response_status();
    477   if (stream_status != OK)
    478     return false;
    479 
    480   // When more_read_data_pending_ is true, it means that more data has
    481   // arrived since we started waiting.  Wait a little longer and continue
    482   // to buffer.
    483   if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
    484     ScheduleBufferedReadCallback();
    485     return false;
    486   }
    487 
    488   int rv = 0;
    489   if (user_buffer_.get()) {
    490     rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
    491     CHECK_NE(rv, ERR_IO_PENDING);
    492     user_buffer_ = NULL;
    493     user_buffer_len_ = 0;
    494     DoCallback(rv);
    495     return true;
    496   }
    497   return false;
    498 }
    499 
    500 void SpdyHttpStream::DoCallback(int rv) {
    501   CHECK_NE(rv, ERR_IO_PENDING);
    502   CHECK(!callback_.is_null());
    503 
    504   // Since Run may result in being called back, clear user_callback_ in advance.
    505   CompletionCallback c = callback_;
    506   callback_.Reset();
    507   c.Run(rv);
    508 }
    509 
    510 void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
    511   DCHECK(stream_.get());
    512   bool using_npn;
    513   NextProto protocol_negotiated = kProtoUnknown;
    514   stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
    515 }
    516 
    517 void SpdyHttpStream::GetSSLCertRequestInfo(
    518     SSLCertRequestInfo* cert_request_info) {
    519   DCHECK(stream_.get());
    520   stream_->GetSSLCertRequestInfo(cert_request_info);
    521 }
    522 
    523 bool SpdyHttpStream::IsSpdyHttpStream() const {
    524   return true;
    525 }
    526 
    527 void SpdyHttpStream::Drain(HttpNetworkSession* session) {
    528   Close(false);
    529   delete this;
    530 }
    531 
    532 void SpdyHttpStream::SetPriority(RequestPriority priority) {
    533   // TODO(akalin): Plumb this through to |stream_request_| and
    534   // |stream_|.
    535 }
    536 
    537 }  // namespace net
    538