Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_http_stream.h"
      6 
      7 #include <algorithm>
      8 #include <list>
      9 
     10 #include "base/bind.h"
     11 #include "base/logging.h"
     12 #include "base/message_loop/message_loop.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "net/base/host_port_pair.h"
     15 #include "net/base/net_log.h"
     16 #include "net/base/net_util.h"
     17 #include "net/base/upload_data_stream.h"
     18 #include "net/http/http_request_headers.h"
     19 #include "net/http/http_request_info.h"
     20 #include "net/http/http_response_info.h"
     21 #include "net/spdy/spdy_header_block.h"
     22 #include "net/spdy/spdy_http_utils.h"
     23 #include "net/spdy/spdy_protocol.h"
     24 #include "net/spdy/spdy_session.h"
     25 
     26 namespace net {
     27 
     28 SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
     29                                bool direct)
     30     : weak_factory_(this),
     31       spdy_session_(spdy_session),
     32       is_reused_(spdy_session_->IsReused()),
     33       stream_closed_(false),
     34       closed_stream_status_(ERR_FAILED),
     35       closed_stream_id_(0),
     36       closed_stream_received_bytes_(0),
     37       request_info_(NULL),
     38       response_info_(NULL),
     39       response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
     40       user_buffer_len_(0),
     41       request_body_buf_size_(0),
     42       buffered_read_callback_pending_(false),
     43       more_read_data_pending_(false),
     44       direct_(direct) {
     45   DCHECK(spdy_session_.get());
     46 }
     47 
     48 SpdyHttpStream::~SpdyHttpStream() {
     49   if (stream_.get()) {
     50     stream_->DetachDelegate();
     51     DCHECK(!stream_.get());
     52   }
     53 }
     54 
     55 int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
     56                                      RequestPriority priority,
     57                                      const BoundNetLog& stream_net_log,
     58                                      const CompletionCallback& callback) {
     59   DCHECK(!stream_);
     60   if (!spdy_session_)
     61     return ERR_CONNECTION_CLOSED;
     62 
     63   request_info_ = request_info;
     64   if (request_info_->method == "GET") {
     65     int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
     66                                              stream_net_log);
     67     if (error != OK)
     68       return error;
     69 
     70     // |stream_| may be NULL even if OK was returned.
     71     if (stream_.get()) {
     72       DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
     73       stream_->SetDelegate(this);
     74       return OK;
     75     }
     76   }
     77 
     78   int rv = stream_request_.StartRequest(
     79       SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
     80       priority, stream_net_log,
     81       base::Bind(&SpdyHttpStream::OnStreamCreated,
     82                  weak_factory_.GetWeakPtr(), callback));
     83 
     84   if (rv == OK) {
     85     stream_ = stream_request_.ReleaseStream();
     86     stream_->SetDelegate(this);
     87   }
     88 
     89   return rv;
     90 }
     91 
     92 const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const {
     93   return response_info_;
     94 }
     95 
     96 UploadProgress SpdyHttpStream::GetUploadProgress() const {
     97   if (!request_info_ || !HasUploadData())
     98     return UploadProgress();
     99 
    100   return UploadProgress(request_info_->upload_data_stream->position(),
    101                         request_info_->upload_data_stream->size());
    102 }
    103 
    104 int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
    105   CHECK(!callback.is_null());
    106   if (stream_closed_)
    107     return closed_stream_status_;
    108 
    109   CHECK(stream_.get());
    110 
    111   // Check if we already have the response headers. If so, return synchronously.
    112   if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
    113     CHECK(stream_->IsIdle());
    114     return OK;
    115   }
    116 
    117   // Still waiting for the response, return IO_PENDING.
    118   CHECK(callback_.is_null());
    119   callback_ = callback;
    120   return ERR_IO_PENDING;
    121 }
    122 
    123 int SpdyHttpStream::ReadResponseBody(
    124     IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
    125   if (stream_.get())
    126     CHECK(stream_->IsIdle());
    127 
    128   CHECK(buf);
    129   CHECK(buf_len);
    130   CHECK(!callback.is_null());
    131 
    132   // If we have data buffered, complete the IO immediately.
    133   if (!response_body_queue_.IsEmpty()) {
    134     return response_body_queue_.Dequeue(buf->data(), buf_len);
    135   } else if (stream_closed_) {
    136     return closed_stream_status_;
    137   }
    138 
    139   CHECK(callback_.is_null());
    140   CHECK(!user_buffer_.get());
    141   CHECK_EQ(0, user_buffer_len_);
    142 
    143   callback_ = callback;
    144   user_buffer_ = buf;
    145   user_buffer_len_ = buf_len;
    146   return ERR_IO_PENDING;
    147 }
    148 
    149 void SpdyHttpStream::Close(bool not_reusable) {
    150   // Note: the not_reusable flag has no meaning for SPDY streams.
    151 
    152   Cancel();
    153   DCHECK(!stream_.get());
    154 }
    155 
    156 HttpStream* SpdyHttpStream::RenewStreamForAuth() {
    157   return NULL;
    158 }
    159 
    160 bool SpdyHttpStream::IsResponseBodyComplete() const {
    161   return stream_closed_;
    162 }
    163 
    164 bool SpdyHttpStream::CanFindEndOfResponse() const {
    165   return true;
    166 }
    167 
    168 bool SpdyHttpStream::IsConnectionReused() const {
    169   return is_reused_;
    170 }
    171 
    172 void SpdyHttpStream::SetConnectionReused() {
    173   // SPDY doesn't need an indicator here.
    174 }
    175 
    176 bool SpdyHttpStream::IsConnectionReusable() const {
    177   // SPDY streams aren't considered reusable.
    178   return false;
    179 }
    180 
    181 int64 SpdyHttpStream::GetTotalReceivedBytes() const {
    182   if (stream_closed_)
    183     return closed_stream_received_bytes_;
    184 
    185   if (!stream_)
    186     return 0;
    187 
    188   return stream_->raw_received_bytes();
    189 }
    190 
    191 bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
    192   if (stream_closed_) {
    193     if (!closed_stream_has_load_timing_info_)
    194       return false;
    195     *load_timing_info = closed_stream_load_timing_info_;
    196     return true;
    197   }
    198 
    199   // If |stream_| has yet to be created, or does not yet have an ID, fail.
    200   // The reused flag can only be correctly set once a stream has an ID.  Streams
    201   // get their IDs once the request has been successfully sent, so this does not
    202   // behave that differently from other stream types.
    203   if (!stream_ || stream_->stream_id() == 0)
    204     return false;
    205 
    206   return stream_->GetLoadTimingInfo(load_timing_info);
    207 }
    208 
    209 int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
    210                                 HttpResponseInfo* response,
    211                                 const CompletionCallback& callback) {
    212   if (stream_closed_) {
    213     if (stream_->type() == SPDY_PUSH_STREAM)
    214       return closed_stream_status_;
    215 
    216     return (closed_stream_status_ == OK) ? ERR_FAILED : closed_stream_status_;
    217   }
    218 
    219   base::Time request_time = base::Time::Now();
    220   CHECK(stream_.get());
    221 
    222   stream_->SetRequestTime(request_time);
    223   // This should only get called in the case of a request occurring
    224   // during server push that has already begun but hasn't finished,
    225   // so we set the response's request time to be the actual one
    226   if (response_info_)
    227     response_info_->request_time = request_time;
    228 
    229   CHECK(!request_body_buf_.get());
    230   if (HasUploadData()) {
    231     // Use kMaxSpdyFrameChunkSize as the buffer size, since the request
    232     // body data is written with this size at a time.
    233     request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
    234     // The request body buffer is empty at first.
    235     request_body_buf_size_ = 0;
    236   }
    237 
    238   CHECK(!callback.is_null());
    239   CHECK(response);
    240 
    241   // SendRequest can be called in two cases.
    242   //
    243   // a) A client initiated request. In this case, |response_info_| should be
    244   //    NULL to start with.
    245   // b) A client request which matches a response that the server has already
    246   //    pushed.
    247   if (push_response_info_.get()) {
    248     *response = *(push_response_info_.get());
    249     push_response_info_.reset();
    250   } else {
    251     DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
    252   }
    253 
    254   response_info_ = response;
    255 
    256   // Put the peer's IP address and port into the response.
    257   IPEndPoint address;
    258   int result = stream_->GetPeerAddress(&address);
    259   if (result != OK)
    260     return result;
    261   response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
    262 
    263   if (stream_->type() == SPDY_PUSH_STREAM) {
    264     // Pushed streams do not send any data, and should always be
    265     // idle. However, we still want to return ERR_IO_PENDING to mimic
    266     // non-push behavior. The callback will be called when the
    267     // response is received.
    268     result = ERR_IO_PENDING;
    269   } else {
    270     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
    271     CreateSpdyHeadersFromHttpRequest(
    272         *request_info_, request_headers,
    273         headers.get(), stream_->GetProtocolVersion(),
    274         direct_);
    275     stream_->net_log().AddEvent(
    276         NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
    277         base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
    278     result =
    279         stream_->SendRequestHeaders(
    280             headers.Pass(),
    281             HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
    282   }
    283 
    284   if (result == ERR_IO_PENDING) {
    285     CHECK(callback_.is_null());
    286     callback_ = callback;
    287   }
    288   return result;
    289 }
    290 
    291 void SpdyHttpStream::Cancel() {
    292   callback_.Reset();
    293   if (stream_.get()) {
    294     stream_->Cancel();
    295     DCHECK(!stream_.get());
    296   }
    297 }
    298 
    299 void SpdyHttpStream::OnRequestHeadersSent() {
    300   if (!callback_.is_null())
    301     DoCallback(OK);
    302 
    303   // TODO(akalin): Do this immediately after sending the request
    304   // headers.
    305   if (HasUploadData())
    306     ReadAndSendRequestBodyData();
    307 }
    308 
    309 SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
    310     const SpdyHeaderBlock& response_headers) {
    311   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
    312 
    313   if (!response_info_) {
    314     DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
    315     push_response_info_.reset(new HttpResponseInfo);
    316     response_info_ = push_response_info_.get();
    317   }
    318 
    319   if (!SpdyHeadersToHttpResponse(
    320           response_headers, stream_->GetProtocolVersion(), response_info_)) {
    321     // We do not have complete headers yet.
    322     return RESPONSE_HEADERS_ARE_INCOMPLETE;
    323   }
    324 
    325   response_info_->response_time = stream_->response_time();
    326   response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
    327   // Don't store the SSLInfo in the response here, HttpNetworkTransaction
    328   // will take care of that part.
    329   SSLInfo ssl_info;
    330   NextProto protocol_negotiated = kProtoUnknown;
    331   stream_->GetSSLInfo(&ssl_info,
    332                       &response_info_->was_npn_negotiated,
    333                       &protocol_negotiated);
    334   response_info_->npn_negotiated_protocol =
    335       SSLClientSocket::NextProtoToString(protocol_negotiated);
    336   response_info_->request_time = stream_->GetRequestTime();
    337   response_info_->connection_info =
    338       HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
    339   response_info_->vary_data
    340       .Init(*request_info_, *response_info_->headers.get());
    341 
    342   if (!callback_.is_null())
    343     DoCallback(OK);
    344 
    345   return RESPONSE_HEADERS_ARE_COMPLETE;
    346 }
    347 
    348 void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
    349   CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
    350 
    351   // Note that data may be received for a SpdyStream prior to the user calling
    352   // ReadResponseBody(), therefore user_buffer_ may be NULL.  This may often
    353   // happen for server initiated streams.
    354   DCHECK(stream_.get());
    355   DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
    356   if (buffer) {
    357     response_body_queue_.Enqueue(buffer.Pass());
    358 
    359     if (user_buffer_.get()) {
    360       // Handing small chunks of data to the caller creates measurable overhead.
    361       // We buffer data in short time-spans and send a single read notification.
    362       ScheduleBufferedReadCallback();
    363     }
    364   }
    365 }
    366 
    367 void SpdyHttpStream::OnDataSent() {
    368   request_body_buf_size_ = 0;
    369   ReadAndSendRequestBodyData();
    370 }
    371 
    372 void SpdyHttpStream::OnClose(int status) {
    373   if (stream_.get()) {
    374     stream_closed_ = true;
    375     closed_stream_status_ = status;
    376     closed_stream_id_ = stream_->stream_id();
    377     closed_stream_has_load_timing_info_ =
    378         stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
    379     closed_stream_received_bytes_ = stream_->raw_received_bytes();
    380   }
    381   stream_.reset();
    382   bool invoked_callback = false;
    383   if (status == net::OK) {
    384     // We need to complete any pending buffered read now.
    385     invoked_callback = DoBufferedReadCallback();
    386   }
    387   if (!invoked_callback && !callback_.is_null())
    388     DoCallback(status);
    389 }
    390 
    391 bool SpdyHttpStream::HasUploadData() const {
    392   CHECK(request_info_);
    393   return
    394       request_info_->upload_data_stream &&
    395       ((request_info_->upload_data_stream->size() > 0) ||
    396        request_info_->upload_data_stream->is_chunked());
    397 }
    398 
    399 void SpdyHttpStream::OnStreamCreated(
    400     const CompletionCallback& callback,
    401     int rv) {
    402   if (rv == OK) {
    403     stream_ = stream_request_.ReleaseStream();
    404     stream_->SetDelegate(this);
    405   }
    406   callback.Run(rv);
    407 }
    408 
    409 void SpdyHttpStream::ReadAndSendRequestBodyData() {
    410   CHECK(HasUploadData());
    411   CHECK_EQ(request_body_buf_size_, 0);
    412 
    413   if (request_info_->upload_data_stream->IsEOF())
    414     return;
    415 
    416   // Read the data from the request body stream.
    417   const int rv = request_info_->upload_data_stream
    418       ->Read(request_body_buf_.get(),
    419              request_body_buf_->size(),
    420              base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
    421                         weak_factory_.GetWeakPtr()));
    422 
    423   if (rv != ERR_IO_PENDING) {
    424     // ERR_IO_PENDING is the only possible error.
    425     CHECK_GE(rv, 0);
    426     OnRequestBodyReadCompleted(rv);
    427   }
    428 }
    429 
    430 void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
    431   CHECK_GE(status, 0);
    432   request_body_buf_size_ = status;
    433   const bool eof = request_info_->upload_data_stream->IsEOF();
    434   if (eof) {
    435     CHECK_GE(request_body_buf_size_, 0);
    436   } else {
    437     CHECK_GT(request_body_buf_size_, 0);
    438   }
    439   stream_->SendData(request_body_buf_.get(),
    440                     request_body_buf_size_,
    441                     eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
    442 }
    443 
    444 void SpdyHttpStream::ScheduleBufferedReadCallback() {
    445   // If there is already a scheduled DoBufferedReadCallback, don't issue
    446   // another one.  Mark that we have received more data and return.
    447   if (buffered_read_callback_pending_) {
    448     more_read_data_pending_ = true;
    449     return;
    450   }
    451 
    452   more_read_data_pending_ = false;
    453   buffered_read_callback_pending_ = true;
    454   const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
    455   base::MessageLoop::current()->PostDelayedTask(
    456       FROM_HERE,
    457       base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
    458                  weak_factory_.GetWeakPtr()),
    459       kBufferTime);
    460 }
    461 
    462 // Checks to see if we should wait for more buffered data before notifying
    463 // the caller.  Returns true if we should wait, false otherwise.
    464 bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
    465   // If the response is complete, there is no point in waiting.
    466   if (stream_closed_)
    467     return false;
    468 
    469   DCHECK_GT(user_buffer_len_, 0);
    470   return response_body_queue_.GetTotalSize() <
    471       static_cast<size_t>(user_buffer_len_);
    472 }
    473 
    474 bool SpdyHttpStream::DoBufferedReadCallback() {
    475   buffered_read_callback_pending_ = false;
    476 
    477   // If the transaction is cancelled or errored out, we don't need to complete
    478   // the read.
    479   if (!stream_.get() && !stream_closed_)
    480     return false;
    481 
    482   int stream_status =
    483       stream_closed_ ? closed_stream_status_ : stream_->response_status();
    484   if (stream_status != OK)
    485     return false;
    486 
    487   // When more_read_data_pending_ is true, it means that more data has
    488   // arrived since we started waiting.  Wait a little longer and continue
    489   // to buffer.
    490   if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
    491     ScheduleBufferedReadCallback();
    492     return false;
    493   }
    494 
    495   int rv = 0;
    496   if (user_buffer_.get()) {
    497     rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
    498     CHECK_NE(rv, ERR_IO_PENDING);
    499     user_buffer_ = NULL;
    500     user_buffer_len_ = 0;
    501     DoCallback(rv);
    502     return true;
    503   }
    504   return false;
    505 }
    506 
    507 void SpdyHttpStream::DoCallback(int rv) {
    508   CHECK_NE(rv, ERR_IO_PENDING);
    509   CHECK(!callback_.is_null());
    510 
    511   // Since Run may result in being called back, clear user_callback_ in advance.
    512   CompletionCallback c = callback_;
    513   callback_.Reset();
    514   c.Run(rv);
    515 }
    516 
    517 void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
    518   DCHECK(stream_.get());
    519   bool using_npn;
    520   NextProto protocol_negotiated = kProtoUnknown;
    521   stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
    522 }
    523 
    524 void SpdyHttpStream::GetSSLCertRequestInfo(
    525     SSLCertRequestInfo* cert_request_info) {
    526   DCHECK(stream_.get());
    527   stream_->GetSSLCertRequestInfo(cert_request_info);
    528 }
    529 
    530 bool SpdyHttpStream::IsSpdyHttpStream() const {
    531   return true;
    532 }
    533 
    534 void SpdyHttpStream::Drain(HttpNetworkSession* session) {
    535   Close(false);
    536   delete this;
    537 }
    538 
    539 void SpdyHttpStream::SetPriority(RequestPriority priority) {
    540   // TODO(akalin): Plumb this through to |stream_request_| and
    541   // |stream_|.
    542 }
    543 
    544 }  // namespace net
    545