Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_http_stream.h"
      6 
      7 #include <algorithm>
      8 #include <list>
      9 
     10 #include "base/bind.h"
     11 #include "base/logging.h"
     12 #include "base/message_loop/message_loop.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "net/base/host_port_pair.h"
     15 #include "net/base/net_log.h"
     16 #include "net/base/net_util.h"
     17 #include "net/base/upload_data_stream.h"
     18 #include "net/http/http_request_headers.h"
     19 #include "net/http/http_request_info.h"
     20 #include "net/http/http_response_info.h"
     21 #include "net/spdy/spdy_header_block.h"
     22 #include "net/spdy/spdy_http_utils.h"
     23 #include "net/spdy/spdy_protocol.h"
     24 #include "net/spdy/spdy_session.h"
     25 
     26 namespace net {
     27 
     28 SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
     29                                bool direct)
     30     : weak_factory_(this),
     31       spdy_session_(spdy_session),
     32       is_reused_(spdy_session_->IsReused()),
     33       stream_closed_(false),
     34       closed_stream_status_(ERR_FAILED),
     35       closed_stream_id_(0),
     36       request_info_(NULL),
     37       response_info_(NULL),
     38       response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
     39       user_buffer_len_(0),
     40       request_body_buf_size_(0),
     41       buffered_read_callback_pending_(false),
     42       more_read_data_pending_(false),
     43       direct_(direct) {
     44   DCHECK(spdy_session_.get());
     45 }
     46 
     47 SpdyHttpStream::~SpdyHttpStream() {
     48   if (stream_.get()) {
     49     stream_->DetachDelegate();
     50     DCHECK(!stream_.get());
     51   }
     52 }
     53 
     54 int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
     55                                      RequestPriority priority,
     56                                      const BoundNetLog& stream_net_log,
     57                                      const CompletionCallback& callback) {
     58   DCHECK(!stream_);
     59   if (!spdy_session_)
     60     return ERR_CONNECTION_CLOSED;
     61 
     62   request_info_ = request_info;
     63   if (request_info_->method == "GET") {
     64     int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
     65                                              stream_net_log);
     66     if (error != OK)
     67       return error;
     68 
     69     // |stream_| may be NULL even if OK was returned.
     70     if (stream_.get()) {
     71       DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
     72       stream_->SetDelegate(this);
     73       return OK;
     74     }
     75   }
     76 
     77   int rv = stream_request_.StartRequest(
     78       SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
     79       priority, stream_net_log,
     80       base::Bind(&SpdyHttpStream::OnStreamCreated,
     81                  weak_factory_.GetWeakPtr(), callback));
     82 
     83   if (rv == OK) {
     84     stream_ = stream_request_.ReleaseStream();
     85     stream_->SetDelegate(this);
     86   }
     87 
     88   return rv;
     89 }
     90 
     91 const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const {
     92   return response_info_;
     93 }
     94 
     95 UploadProgress SpdyHttpStream::GetUploadProgress() const {
     96   if (!request_info_ || !HasUploadData())
     97     return UploadProgress();
     98 
     99   return UploadProgress(request_info_->upload_data_stream->position(),
    100                         request_info_->upload_data_stream->size());
    101 }
    102 
    103 int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
    104   CHECK(!callback.is_null());
    105   if (stream_closed_)
    106     return closed_stream_status_;
    107 
    108   CHECK(stream_.get());
    109 
    110   // Check if we already have the response headers. If so, return synchronously.
    111   if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
    112     CHECK(stream_->IsIdle());
    113     return OK;
    114   }
    115 
    116   // Still waiting for the response, return IO_PENDING.
    117   CHECK(callback_.is_null());
    118   callback_ = callback;
    119   return ERR_IO_PENDING;
    120 }
    121 
    122 int SpdyHttpStream::ReadResponseBody(
    123     IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
    124   if (stream_.get())
    125     CHECK(stream_->IsIdle());
    126 
    127   CHECK(buf);
    128   CHECK(buf_len);
    129   CHECK(!callback.is_null());
    130 
    131   // If we have data buffered, complete the IO immediately.
    132   if (!response_body_queue_.IsEmpty()) {
    133     return response_body_queue_.Dequeue(buf->data(), buf_len);
    134   } else if (stream_closed_) {
    135     return closed_stream_status_;
    136   }
    137 
    138   CHECK(callback_.is_null());
    139   CHECK(!user_buffer_.get());
    140   CHECK_EQ(0, user_buffer_len_);
    141 
    142   callback_ = callback;
    143   user_buffer_ = buf;
    144   user_buffer_len_ = buf_len;
    145   return ERR_IO_PENDING;
    146 }
    147 
    148 void SpdyHttpStream::Close(bool not_reusable) {
    149   // Note: the not_reusable flag has no meaning for SPDY streams.
    150 
    151   Cancel();
    152   DCHECK(!stream_.get());
    153 }
    154 
    155 HttpStream* SpdyHttpStream::RenewStreamForAuth() {
    156   return NULL;
    157 }
    158 
    159 bool SpdyHttpStream::IsResponseBodyComplete() const {
    160   return stream_closed_;
    161 }
    162 
    163 bool SpdyHttpStream::CanFindEndOfResponse() const {
    164   return true;
    165 }
    166 
    167 bool SpdyHttpStream::IsConnectionReused() const {
    168   return is_reused_;
    169 }
    170 
    171 void SpdyHttpStream::SetConnectionReused() {
    172   // SPDY doesn't need an indicator here.
    173 }
    174 
    175 bool SpdyHttpStream::IsConnectionReusable() const {
    176   // SPDY streams aren't considered reusable.
    177   return false;
    178 }
    179 
    180 bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
    181   if (stream_closed_) {
    182     if (!closed_stream_has_load_timing_info_)
    183       return false;
    184     *load_timing_info = closed_stream_load_timing_info_;
    185     return true;
    186   }
    187 
    188   // If |stream_| has yet to be created, or does not yet have an ID, fail.
    189   // The reused flag can only be correctly set once a stream has an ID.  Streams
    190   // get their IDs once the request has been successfully sent, so this does not
    191   // behave that differently from other stream types.
    192   if (!stream_ || stream_->stream_id() == 0)
    193     return false;
    194 
    195   return stream_->GetLoadTimingInfo(load_timing_info);
    196 }
    197 
    198 int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
    199                                 HttpResponseInfo* response,
    200                                 const CompletionCallback& callback) {
    201   if (stream_closed_) {
    202     if (stream_->type() == SPDY_PUSH_STREAM)
    203       return closed_stream_status_;
    204 
    205     return (closed_stream_status_ == OK) ? ERR_FAILED : closed_stream_status_;
    206   }
    207 
    208   base::Time request_time = base::Time::Now();
    209   CHECK(stream_.get());
    210 
    211   stream_->SetRequestTime(request_time);
    212   // This should only get called in the case of a request occurring
    213   // during server push that has already begun but hasn't finished,
    214   // so we set the response's request time to be the actual one
    215   if (response_info_)
    216     response_info_->request_time = request_time;
    217 
    218   CHECK(!request_body_buf_.get());
    219   if (HasUploadData()) {
    220     // Use kMaxSpdyFrameChunkSize as the buffer size, since the request
    221     // body data is written with this size at a time.
    222     request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
    223     // The request body buffer is empty at first.
    224     request_body_buf_size_ = 0;
    225   }
    226 
    227   CHECK(!callback.is_null());
    228   CHECK(response);
    229 
    230   // SendRequest can be called in two cases.
    231   //
    232   // a) A client initiated request. In this case, |response_info_| should be
    233   //    NULL to start with.
    234   // b) A client request which matches a response that the server has already
    235   //    pushed.
    236   if (push_response_info_.get()) {
    237     *response = *(push_response_info_.get());
    238     push_response_info_.reset();
    239   } else {
    240     DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
    241   }
    242 
    243   response_info_ = response;
    244 
    245   // Put the peer's IP address and port into the response.
    246   IPEndPoint address;
    247   int result = stream_->GetPeerAddress(&address);
    248   if (result != OK)
    249     return result;
    250   response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
    251 
    252   if (stream_->type() == SPDY_PUSH_STREAM) {
    253     // Pushed streams do not send any data, and should always be
    254     // idle. However, we still want to return ERR_IO_PENDING to mimic
    255     // non-push behavior. The callback will be called when the
    256     // response is received.
    257     result = ERR_IO_PENDING;
    258   } else {
    259     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
    260     CreateSpdyHeadersFromHttpRequest(
    261         *request_info_, request_headers,
    262         headers.get(), stream_->GetProtocolVersion(),
    263         direct_);
    264     stream_->net_log().AddEvent(
    265         NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
    266         base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
    267     result =
    268         stream_->SendRequestHeaders(
    269             headers.Pass(),
    270             HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
    271   }
    272 
    273   if (result == ERR_IO_PENDING) {
    274     CHECK(callback_.is_null());
    275     callback_ = callback;
    276   }
    277   return result;
    278 }
    279 
    280 void SpdyHttpStream::Cancel() {
    281   callback_.Reset();
    282   if (stream_.get()) {
    283     stream_->Cancel();
    284     DCHECK(!stream_.get());
    285   }
    286 }
    287 
    288 void SpdyHttpStream::OnRequestHeadersSent() {
    289   if (!callback_.is_null())
    290     DoCallback(OK);
    291 
    292   // TODO(akalin): Do this immediately after sending the request
    293   // headers.
    294   if (HasUploadData())
    295     ReadAndSendRequestBodyData();
    296 }
    297 
    298 SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
    299     const SpdyHeaderBlock& response_headers) {
    300   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
    301 
    302   if (!response_info_) {
    303     DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
    304     push_response_info_.reset(new HttpResponseInfo);
    305     response_info_ = push_response_info_.get();
    306   }
    307 
    308   if (!SpdyHeadersToHttpResponse(
    309           response_headers, stream_->GetProtocolVersion(), response_info_)) {
    310     // We do not have complete headers yet.
    311     return RESPONSE_HEADERS_ARE_INCOMPLETE;
    312   }
    313 
    314   response_info_->response_time = stream_->response_time();
    315   response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
    316   // Don't store the SSLInfo in the response here, HttpNetworkTransaction
    317   // will take care of that part.
    318   SSLInfo ssl_info;
    319   NextProto protocol_negotiated = kProtoUnknown;
    320   stream_->GetSSLInfo(&ssl_info,
    321                       &response_info_->was_npn_negotiated,
    322                       &protocol_negotiated);
    323   response_info_->npn_negotiated_protocol =
    324       SSLClientSocket::NextProtoToString(protocol_negotiated);
    325   response_info_->request_time = stream_->GetRequestTime();
    326   response_info_->connection_info =
    327       HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
    328   response_info_->vary_data
    329       .Init(*request_info_, *response_info_->headers.get());
    330 
    331   if (!callback_.is_null())
    332     DoCallback(OK);
    333 
    334   return RESPONSE_HEADERS_ARE_COMPLETE;
    335 }
    336 
    337 void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
    338   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
    339 
    340   // Note that data may be received for a SpdyStream prior to the user calling
    341   // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
    342   // happen for server initiated streams.
    343   DCHECK(stream_.get());
    344   DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
    345   if (buffer) {
    346     response_body_queue_.Enqueue(buffer.Pass());
    347 
    348     if (user_buffer_.get()) {
    349       // Handing small chunks of data to the caller creates measurable overhead.
    350       // We buffer data in short time-spans and send a single read notification.
    351       ScheduleBufferedReadCallback();
    352     }
    353   }
    354 }
    355 
    356 void SpdyHttpStream::OnDataSent() {
    357   request_body_buf_size_ = 0;
    358   ReadAndSendRequestBodyData();
    359 }
    360 
    361 void SpdyHttpStream::OnClose(int status) {
    362   if (stream_.get()) {
    363     stream_closed_ = true;
    364     closed_stream_status_ = status;
    365     closed_stream_id_ = stream_->stream_id();
    366     closed_stream_has_load_timing_info_ =
    367         stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
    368   }
    369   stream_.reset();
    370   bool invoked_callback = false;
    371   if (status == net::OK) {
    372     // We need to complete any pending buffered read now.
    373     invoked_callback = DoBufferedReadCallback();
    374   }
    375   if (!invoked_callback && !callback_.is_null())
    376     DoCallback(status);
    377 }
    378 
    379 bool SpdyHttpStream::HasUploadData() const {
    380   CHECK(request_info_);
    381   return
    382       request_info_->upload_data_stream &&
    383       ((request_info_->upload_data_stream->size() > 0) ||
    384        request_info_->upload_data_stream->is_chunked());
    385 }
    386 
    387 void SpdyHttpStream::OnStreamCreated(
    388     const CompletionCallback& callback,
    389     int rv) {
    390   if (rv == OK) {
    391     stream_ = stream_request_.ReleaseStream();
    392     stream_->SetDelegate(this);
    393   }
    394   callback.Run(rv);
    395 }
    396 
    397 void SpdyHttpStream::ReadAndSendRequestBodyData() {
    398   CHECK(HasUploadData());
    399   CHECK_EQ(request_body_buf_size_, 0);
    400 
    401   if (request_info_->upload_data_stream->IsEOF())
    402     return;
    403 
    404   // Read the data from the request body stream.
    405   const int rv = request_info_->upload_data_stream
    406       ->Read(request_body_buf_.get(),
    407              request_body_buf_->size(),
    408              base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
    409                         weak_factory_.GetWeakPtr()));
    410 
    411   if (rv != ERR_IO_PENDING) {
    412     // ERR_IO_PENDING is the only possible error.
    413     CHECK_GE(rv, 0);
    414     OnRequestBodyReadCompleted(rv);
    415   }
    416 }
    417 
    418 void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
    419   CHECK_GE(status, 0);
    420   request_body_buf_size_ = status;
    421   const bool eof = request_info_->upload_data_stream->IsEOF();
    422   if (eof) {
    423     CHECK_GE(request_body_buf_size_, 0);
    424   } else {
    425     CHECK_GT(request_body_buf_size_, 0);
    426   }
    427   stream_->SendData(request_body_buf_.get(),
    428                     request_body_buf_size_,
    429                     eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
    430 }
    431 
    432 void SpdyHttpStream::ScheduleBufferedReadCallback() {
    433   // If there is already a scheduled DoBufferedReadCallback, don't issue
    434   // another one.  Mark that we have received more data and return.
    435   if (buffered_read_callback_pending_) {
    436     more_read_data_pending_ = true;
    437     return;
    438   }
    439 
    440   more_read_data_pending_ = false;
    441   buffered_read_callback_pending_ = true;
    442   const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
    443   base::MessageLoop::current()->PostDelayedTask(
    444       FROM_HERE,
    445       base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
    446                  weak_factory_.GetWeakPtr()),
    447       kBufferTime);
    448 }
    449 
    450 // Checks to see if we should wait for more buffered data before notifying
    451 // the caller.  Returns true if we should wait, false otherwise.
    452 bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
    453   // If the response is complete, there is no point in waiting.
    454   if (stream_closed_)
    455     return false;
    456 
    457   DCHECK_GT(user_buffer_len_, 0);
    458   return response_body_queue_.GetTotalSize() <
    459       static_cast<size_t>(user_buffer_len_);
    460 }
    461 
    462 bool SpdyHttpStream::DoBufferedReadCallback() {
    463   buffered_read_callback_pending_ = false;
    464 
    465   // If the transaction is cancelled or errored out, we don't need to complete
    466   // the read.
    467   if (!stream_.get() && !stream_closed_)
    468     return false;
    469 
    470   int stream_status =
    471       stream_closed_ ? closed_stream_status_ : stream_->response_status();
    472   if (stream_status != OK)
    473     return false;
    474 
    475   // When more_read_data_pending_ is true, it means that more data has
    476   // arrived since we started waiting.  Wait a little longer and continue
    477   // to buffer.
    478   if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
    479     ScheduleBufferedReadCallback();
    480     return false;
    481   }
    482 
    483   int rv = 0;
    484   if (user_buffer_.get()) {
    485     rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
    486     CHECK_NE(rv, ERR_IO_PENDING);
    487     user_buffer_ = NULL;
    488     user_buffer_len_ = 0;
    489     DoCallback(rv);
    490     return true;
    491   }
    492   return false;
    493 }
    494 
    495 void SpdyHttpStream::DoCallback(int rv) {
    496   CHECK_NE(rv, ERR_IO_PENDING);
    497   CHECK(!callback_.is_null());
    498 
    499   // Since Run may result in being called back, clear user_callback_ in advance.
    500   CompletionCallback c = callback_;
    501   callback_.Reset();
    502   c.Run(rv);
    503 }
    504 
    505 void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
    506   DCHECK(stream_.get());
    507   bool using_npn;
    508   NextProto protocol_negotiated = kProtoUnknown;
    509   stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
    510 }
    511 
    512 void SpdyHttpStream::GetSSLCertRequestInfo(
    513     SSLCertRequestInfo* cert_request_info) {
    514   DCHECK(stream_.get());
    515   stream_->GetSSLCertRequestInfo(cert_request_info);
    516 }
    517 
    518 bool SpdyHttpStream::IsSpdyHttpStream() const {
    519   return true;
    520 }
    521 
    522 void SpdyHttpStream::Drain(HttpNetworkSession* session) {
    523   Close(false);
    524   delete this;
    525 }
    526 
    527 }  // namespace net
    528