Home | History | Annotate | Download | only in spdy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/spdy/spdy_stream.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/compiler_specific.h"
      9 #include "base/logging.h"
     10 #include "base/message_loop/message_loop.h"
     11 #include "base/strings/string_number_conversions.h"
     12 #include "base/strings/stringprintf.h"
     13 #include "base/values.h"
     14 #include "net/spdy/spdy_buffer_producer.h"
     15 #include "net/spdy/spdy_http_utils.h"
     16 #include "net/spdy/spdy_session.h"
     17 
     18 namespace net {
     19 
     20 namespace {
     21 
     22 base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
     23                                            int status,
     24                                            const std::string* description,
     25                                            NetLog::LogLevel /* log_level */) {
     26   base::DictionaryValue* dict = new base::DictionaryValue();
     27   dict->SetInteger("stream_id", static_cast<int>(stream_id));
     28   dict->SetInteger("status", status);
     29   dict->SetString("description", *description);
     30   return dict;
     31 }
     32 
     33 base::Value* NetLogSpdyStreamWindowUpdateCallback(
     34     SpdyStreamId stream_id,
     35     int32 delta,
     36     int32 window_size,
     37     NetLog::LogLevel /* log_level */) {
     38   base::DictionaryValue* dict = new base::DictionaryValue();
     39   dict->SetInteger("stream_id", stream_id);
     40   dict->SetInteger("delta", delta);
     41   dict->SetInteger("window_size", window_size);
     42   return dict;
     43 }
     44 
     45 bool ContainsUppercaseAscii(const std::string& str) {
     46   for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) {
     47     if (*i >= 'A' && *i <= 'Z') {
     48       return true;
     49     }
     50   }
     51   return false;
     52 }
     53 
     54 }  // namespace
     55 
     56 // A wrapper around a stream that calls into ProduceSynStreamFrame().
     57 class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
     58  public:
     59   SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
     60       : stream_(stream) {
     61     DCHECK(stream_.get());
     62   }
     63 
     64   virtual ~SynStreamBufferProducer() {}
     65 
     66   virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
     67     if (!stream_.get()) {
     68       NOTREACHED();
     69       return scoped_ptr<SpdyBuffer>();
     70     }
     71     DCHECK_GT(stream_->stream_id(), 0u);
     72     return scoped_ptr<SpdyBuffer>(
     73         new SpdyBuffer(stream_->ProduceSynStreamFrame()));
     74   }
     75 
     76  private:
     77   const base::WeakPtr<SpdyStream> stream_;
     78 };
     79 
     80 SpdyStream::SpdyStream(SpdyStreamType type,
     81                        const base::WeakPtr<SpdySession>& session,
     82                        const GURL& url,
     83                        RequestPriority priority,
     84                        int32 initial_send_window_size,
     85                        int32 initial_recv_window_size,
     86                        const BoundNetLog& net_log)
     87     : type_(type),
     88       weak_ptr_factory_(this),
     89       in_do_loop_(false),
     90       continue_buffering_data_(type_ == SPDY_PUSH_STREAM),
     91       stream_id_(0),
     92       url_(url),
     93       priority_(priority),
     94       slot_(0),
     95       send_stalled_by_flow_control_(false),
     96       send_window_size_(initial_send_window_size),
     97       recv_window_size_(initial_recv_window_size),
     98       unacked_recv_window_bytes_(0),
     99       session_(session),
    100       delegate_(NULL),
    101       send_status_(
    102           (type_ == SPDY_PUSH_STREAM) ?
    103           NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND),
    104       request_time_(base::Time::Now()),
    105       response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
    106       io_state_((type_ == SPDY_PUSH_STREAM) ? STATE_IDLE : STATE_NONE),
    107       response_status_(OK),
    108       net_log_(net_log),
    109       send_bytes_(0),
    110       recv_bytes_(0),
    111       just_completed_frame_type_(DATA),
    112       just_completed_frame_size_(0) {
    113   CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
    114         type_ == SPDY_REQUEST_RESPONSE_STREAM ||
    115         type_ == SPDY_PUSH_STREAM);
    116 }
    117 
    118 SpdyStream::~SpdyStream() {
    119   CHECK(!in_do_loop_);
    120   UpdateHistograms();
    121 }
    122 
    123 void SpdyStream::SetDelegate(Delegate* delegate) {
    124   CHECK(!delegate_);
    125   CHECK(delegate);
    126   delegate_ = delegate;
    127 
    128   if (type_ == SPDY_PUSH_STREAM) {
    129     DCHECK(continue_buffering_data_);
    130     base::MessageLoop::current()->PostTask(
    131         FROM_HERE,
    132         base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr()));
    133   }
    134 }
    135 
    136 void SpdyStream::PushedStreamReplayData() {
    137   DCHECK_EQ(type_, SPDY_PUSH_STREAM);
    138   DCHECK_NE(stream_id_, 0u);
    139   DCHECK(continue_buffering_data_);
    140 
    141   continue_buffering_data_ = false;
    142 
    143   // The delegate methods called below may delete |this|, so use
    144   // |weak_this| to detect that.
    145   base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
    146 
    147   CHECK(delegate_);
    148   SpdyResponseHeadersStatus status =
    149       delegate_->OnResponseHeadersUpdated(response_headers_);
    150   if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
    151     // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
    152     // have been closed. Since we don't have complete headers, assume
    153     // we're waiting for another HEADERS frame, and we had better not
    154     // have any pending data frames.
    155     CHECK(weak_this);
    156     if (!pending_buffers_.empty()) {
    157       LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
    158                      "Data received with incomplete headers.");
    159       session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
    160     }
    161     return;
    162   }
    163 
    164   // OnResponseHeadersUpdated() may have closed |this|.
    165   if (!weak_this)
    166     return;
    167 
    168   response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
    169 
    170   while (!pending_buffers_.empty()) {
    171     // Take ownership of the first element of |pending_buffers_|.
    172     scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front());
    173     pending_buffers_.weak_erase(pending_buffers_.begin());
    174 
    175     bool eof = (buffer == NULL);
    176 
    177     CHECK(delegate_);
    178     delegate_->OnDataReceived(buffer.Pass());
    179 
    180     // OnDataReceived() may have closed |this|.
    181     if (!weak_this)
    182       return;
    183 
    184     if (eof) {
    185       DCHECK(pending_buffers_.empty());
    186       session_->CloseActiveStream(stream_id_, OK);
    187       DCHECK(!weak_this);
    188       // |pending_buffers_| is invalid at this point.
    189       break;
    190     }
    191   }
    192 }
    193 
    194 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
    195   CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
    196   CHECK(request_headers_);
    197   CHECK_GT(stream_id_, 0u);
    198 
    199   SpdyControlFlags flags =
    200       (send_status_ == NO_MORE_DATA_TO_SEND) ?
    201       CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
    202   scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
    203       stream_id_, priority_, slot_, flags, *request_headers_));
    204   send_time_ = base::TimeTicks::Now();
    205   return frame.Pass();
    206 }
    207 
    208 void SpdyStream::DetachDelegate() {
    209   CHECK(!in_do_loop_);
    210   DCHECK(!IsClosed());
    211   delegate_ = NULL;
    212   Cancel();
    213 }
    214 
    215 void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) {
    216   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
    217 
    218   if (IsClosed())
    219     return;
    220 
    221   // Check for wraparound.
    222   if (send_window_size_ > 0) {
    223     DCHECK_LE(delta_window_size, kint32max - send_window_size_);
    224   }
    225   if (send_window_size_ < 0) {
    226     DCHECK_GE(delta_window_size, kint32min - send_window_size_);
    227   }
    228   send_window_size_ += delta_window_size;
    229   PossiblyResumeIfSendStalled();
    230 }
    231 
    232 void SpdyStream::OnWriteBufferConsumed(
    233     size_t frame_payload_size,
    234     size_t consume_size,
    235     SpdyBuffer::ConsumeSource consume_source) {
    236   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
    237   if (consume_source == SpdyBuffer::DISCARD) {
    238     // If we're discarding a frame or part of it, increase the send
    239     // window by the number of discarded bytes. (Although if we're
    240     // discarding part of a frame, it's probably because of a write
    241     // error and we'll be tearing down the stream soon.)
    242     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
    243     DCHECK_GT(remaining_payload_bytes, 0u);
    244     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
    245   }
    246   // For consumed bytes, the send window is increased when we receive
    247   // a WINDOW_UPDATE frame.
    248 }
    249 
    250 void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) {
    251   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
    252   DCHECK_GE(delta_window_size, 1);
    253 
    254   // Ignore late WINDOW_UPDATEs.
    255   if (IsClosed())
    256     return;
    257 
    258   if (send_window_size_ > 0) {
    259     // Check for overflow.
    260     int32 max_delta_window_size = kint32max - send_window_size_;
    261     if (delta_window_size > max_delta_window_size) {
    262       std::string desc = base::StringPrintf(
    263           "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
    264           "send_window_size_ [current: %d]", delta_window_size, stream_id_,
    265           send_window_size_);
    266       session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc);
    267       return;
    268     }
    269   }
    270 
    271   send_window_size_ += delta_window_size;
    272 
    273   net_log_.AddEvent(
    274       NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
    275       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
    276                  stream_id_, delta_window_size, send_window_size_));
    277 
    278   PossiblyResumeIfSendStalled();
    279 }
    280 
    281 void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
    282   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
    283 
    284   if (IsClosed())
    285     return;
    286 
    287   // We only call this method when sending a frame. Therefore,
    288   // |delta_window_size| should be within the valid frame size range.
    289   DCHECK_GE(delta_window_size, 1);
    290   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
    291 
    292   // |send_window_size_| should have been at least |delta_window_size| for
    293   // this call to happen.
    294   DCHECK_GE(send_window_size_, delta_window_size);
    295 
    296   send_window_size_ -= delta_window_size;
    297 
    298   net_log_.AddEvent(
    299       NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
    300       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
    301                  stream_id_, -delta_window_size, send_window_size_));
    302 }
    303 
    304 void SpdyStream::OnReadBufferConsumed(
    305     size_t consume_size,
    306     SpdyBuffer::ConsumeSource consume_source) {
    307   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
    308   DCHECK_GE(consume_size, 1u);
    309   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
    310   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
    311 }
    312 
    313 void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) {
    314   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
    315 
    316   // By the time a read is processed by the delegate, this stream may
    317   // already be inactive.
    318   if (!session_->IsStreamActive(stream_id_))
    319     return;
    320 
    321   DCHECK_GE(unacked_recv_window_bytes_, 0);
    322   DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
    323   DCHECK_GE(delta_window_size, 1);
    324   // Check for overflow.
    325   DCHECK_LE(delta_window_size, kint32max - recv_window_size_);
    326 
    327   recv_window_size_ += delta_window_size;
    328   net_log_.AddEvent(
    329       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
    330       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
    331                  stream_id_, delta_window_size, recv_window_size_));
    332 
    333   unacked_recv_window_bytes_ += delta_window_size;
    334   if (unacked_recv_window_bytes_ >
    335       session_->stream_initial_recv_window_size() / 2) {
    336     session_->SendStreamWindowUpdate(
    337         stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
    338     unacked_recv_window_bytes_ = 0;
    339   }
    340 }
    341 
    342 void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
    343   DCHECK(session_->IsStreamActive(stream_id_));
    344   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
    345   DCHECK_GE(delta_window_size, 1);
    346 
    347   // Since we never decrease the initial receive window size,
    348   // |delta_window_size| should never cause |recv_window_size_| to go
    349   // negative. If we do, the receive window isn't being respected.
    350   if (delta_window_size > recv_window_size_) {
    351     session_->ResetStream(
    352         stream_id_, RST_STREAM_PROTOCOL_ERROR,
    353         "delta_window_size is " + base::IntToString(delta_window_size) +
    354             " in DecreaseRecvWindowSize, which is larger than the receive " +
    355             "window size of " + base::IntToString(recv_window_size_));
    356     return;
    357   }
    358 
    359   recv_window_size_ -= delta_window_size;
    360   net_log_.AddEvent(
    361       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
    362       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
    363                  stream_id_, -delta_window_size, recv_window_size_));
    364 }
    365 
    366 int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
    367   return session_->GetPeerAddress(address);
    368 }
    369 
    370 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
    371   return session_->GetLocalAddress(address);
    372 }
    373 
    374 bool SpdyStream::WasEverUsed() const {
    375   return session_->WasEverUsed();
    376 }
    377 
    378 base::Time SpdyStream::GetRequestTime() const {
    379   return request_time_;
    380 }
    381 
    382 void SpdyStream::SetRequestTime(base::Time t) {
    383   request_time_ = t;
    384 }
    385 
    386 int SpdyStream::OnInitialResponseHeadersReceived(
    387     const SpdyHeaderBlock& initial_response_headers,
    388     base::Time response_time,
    389     base::TimeTicks recv_first_byte_time) {
    390   // SpdySession guarantees that this is called at most once.
    391   CHECK(response_headers_.empty());
    392 
    393   // Check to make sure that we don't receive the response headers
    394   // before we're ready for it.
    395   switch (type_) {
    396     case SPDY_BIDIRECTIONAL_STREAM:
    397       // For a bidirectional stream, we're ready for the response
    398       // headers once we've finished sending the request headers.
    399       if (io_state_ < STATE_IDLE) {
    400         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
    401                               "Response received before request sent");
    402         return ERR_SPDY_PROTOCOL_ERROR;
    403       }
    404       break;
    405 
    406     case SPDY_REQUEST_RESPONSE_STREAM:
    407       // For a request/response stream, we're ready for the response
    408       // headers once we've finished sending the request headers and
    409       // the request body (if we have one).
    410       if ((io_state_ < STATE_IDLE) || (send_status_ == MORE_DATA_TO_SEND) ||
    411           pending_send_data_.get()) {
    412         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
    413                               "Response received before request sent");
    414         return ERR_SPDY_PROTOCOL_ERROR;
    415       }
    416       break;
    417 
    418     case SPDY_PUSH_STREAM:
    419       // For a push stream, we're ready immediately.
    420       DCHECK_EQ(send_status_, NO_MORE_DATA_TO_SEND);
    421       DCHECK_EQ(io_state_, STATE_IDLE);
    422       break;
    423   }
    424 
    425   metrics_.StartStream();
    426 
    427   DCHECK_EQ(io_state_, STATE_IDLE);
    428 
    429   response_time_ = response_time;
    430   recv_first_byte_time_ = recv_first_byte_time;
    431   return MergeWithResponseHeaders(initial_response_headers);
    432 }
    433 
    434 int SpdyStream::OnAdditionalResponseHeadersReceived(
    435     const SpdyHeaderBlock& additional_response_headers) {
    436   if (type_ == SPDY_REQUEST_RESPONSE_STREAM) {
    437     session_->ResetStream(
    438         stream_id_, RST_STREAM_PROTOCOL_ERROR,
    439         "Additional headers received for request/response stream");
    440     return ERR_SPDY_PROTOCOL_ERROR;
    441   } else if (type_ == SPDY_PUSH_STREAM &&
    442              response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
    443     session_->ResetStream(
    444         stream_id_, RST_STREAM_PROTOCOL_ERROR,
    445         "Additional headers received for push stream");
    446     return ERR_SPDY_PROTOCOL_ERROR;
    447   }
    448   return MergeWithResponseHeaders(additional_response_headers);
    449 }
    450 
    451 void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
    452   DCHECK(session_->IsStreamActive(stream_id_));
    453 
    454   // If we're still buffering data for a push stream, we will do the
    455   // check for data received with incomplete headers in
    456   // PushedStreamReplayData().
    457   if (!delegate_ || continue_buffering_data_) {
    458     DCHECK_EQ(type_, SPDY_PUSH_STREAM);
    459     // It should be valid for this to happen in the server push case.
    460     // We'll return received data when delegate gets attached to the stream.
    461     if (buffer) {
    462       pending_buffers_.push_back(buffer.release());
    463     } else {
    464       pending_buffers_.push_back(NULL);
    465       metrics_.StopStream();
    466       // Note: we leave the stream open in the session until the stream
    467       //       is claimed.
    468     }
    469     return;
    470   }
    471 
    472   // If we have response headers but the delegate has indicated that
    473   // it's still incomplete, then that's a protocol error.
    474   if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) {
    475     LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
    476                    "Data received with incomplete headers.");
    477     session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
    478     return;
    479   }
    480 
    481   CHECK(!IsClosed());
    482 
    483   if (!buffer) {
    484     metrics_.StopStream();
    485     // Deletes |this|.
    486     session_->CloseActiveStream(stream_id_, OK);
    487     return;
    488   }
    489 
    490   size_t length = buffer->GetRemainingSize();
    491   DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
    492   if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
    493     DecreaseRecvWindowSize(static_cast<int32>(length));
    494     buffer->AddConsumeCallback(
    495         base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
    496   }
    497 
    498   // Track our bandwidth.
    499   metrics_.RecordBytes(length);
    500   recv_bytes_ += length;
    501   recv_last_byte_time_ = base::TimeTicks::Now();
    502 
    503   // May close |this|.
    504   delegate_->OnDataReceived(buffer.Pass());
    505 }
    506 
    507 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
    508                                       size_t frame_size) {
    509   if (frame_size < session_->GetFrameMinimumSize() ||
    510       frame_size > session_->GetFrameMaximumSize()) {
    511     NOTREACHED();
    512     return;
    513   }
    514   if (IsClosed())
    515     return;
    516   just_completed_frame_type_ = frame_type;
    517   just_completed_frame_size_ = frame_size;
    518   DoLoop(OK);
    519 }
    520 
    521 int SpdyStream::GetProtocolVersion() const {
    522   return session_->GetProtocolVersion();
    523 }
    524 
    525 void SpdyStream::LogStreamError(int status, const std::string& description) {
    526   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR,
    527                     base::Bind(&NetLogSpdyStreamErrorCallback,
    528                                stream_id_, status, &description));
    529 }
    530 
    531 void SpdyStream::OnClose(int status) {
    532   CHECK(!in_do_loop_);
    533   io_state_ = STATE_CLOSED;
    534   response_status_ = status;
    535   Delegate* delegate = delegate_;
    536   delegate_ = NULL;
    537   if (delegate)
    538     delegate->OnClose(status);
    539   // Unset |stream_id_| last so that the delegate can look it up.
    540   stream_id_ = 0;
    541 }
    542 
    543 void SpdyStream::Cancel() {
    544   CHECK(!in_do_loop_);
    545   // We may be called again from a delegate's OnClose().
    546   if (io_state_ == STATE_CLOSED)
    547     return;
    548 
    549   if (stream_id_ != 0) {
    550     session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string());
    551   } else {
    552     session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL);
    553   }
    554   // |this| is invalid at this point.
    555 }
    556 
    557 void SpdyStream::Close() {
    558   CHECK(!in_do_loop_);
    559   // We may be called again from a delegate's OnClose().
    560   if (io_state_ == STATE_CLOSED)
    561     return;
    562 
    563   if (stream_id_ != 0) {
    564     session_->CloseActiveStream(stream_id_, OK);
    565   } else {
    566     session_->CloseCreatedStream(GetWeakPtr(), OK);
    567   }
    568   // |this| is invalid at this point.
    569 }
    570 
    571 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
    572   return weak_ptr_factory_.GetWeakPtr();
    573 }
    574 
    575 int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers,
    576                                    SpdySendStatus send_status) {
    577   CHECK_NE(type_, SPDY_PUSH_STREAM);
    578   CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
    579   CHECK(!request_headers_);
    580   CHECK(!pending_send_data_.get());
    581   CHECK_EQ(io_state_, STATE_NONE);
    582   request_headers_ = request_headers.Pass();
    583   send_status_ = send_status;
    584   io_state_ = STATE_GET_DOMAIN_BOUND_CERT;
    585   return DoLoop(OK);
    586 }
    587 
    588 void SpdyStream::SendData(IOBuffer* data,
    589                           int length,
    590                           SpdySendStatus send_status) {
    591   CHECK_NE(type_, SPDY_PUSH_STREAM);
    592   CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
    593   CHECK_GE(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
    594   CHECK(!pending_send_data_.get());
    595   pending_send_data_ = new DrainableIOBuffer(data, length);
    596   send_status_ = send_status;
    597   QueueNextDataFrame();
    598 }
    599 
    600 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
    601                             bool* was_npn_negotiated,
    602                             NextProto* protocol_negotiated) {
    603   return session_->GetSSLInfo(
    604       ssl_info, was_npn_negotiated, protocol_negotiated);
    605 }
    606 
    607 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
    608   return session_->GetSSLCertRequestInfo(cert_request_info);
    609 }
    610 
    611 void SpdyStream::PossiblyResumeIfSendStalled() {
    612   DCHECK(!IsClosed());
    613 
    614   if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
    615       send_window_size_ > 0) {
    616     net_log_.AddEvent(
    617         NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED,
    618         NetLog::IntegerCallback("stream_id", stream_id_));
    619     send_stalled_by_flow_control_ = false;
    620     QueueNextDataFrame();
    621   }
    622 }
    623 
    624 bool SpdyStream::IsClosed() const {
    625   return io_state_ == STATE_CLOSED;
    626 }
    627 
    628 bool SpdyStream::IsIdle() const {
    629   return io_state_ == STATE_IDLE;
    630 }
    631 
    632 NextProto SpdyStream::GetProtocol() const {
    633   return session_->protocol();
    634 }
    635 
    636 bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
    637   if (stream_id_ == 0)
    638     return false;
    639 
    640   return session_->GetLoadTimingInfo(stream_id_, load_timing_info);
    641 }
    642 
    643 GURL SpdyStream::GetUrlFromHeaders() const {
    644   if (type_ != SPDY_PUSH_STREAM && !request_headers_)
    645     return GURL();
    646 
    647   const SpdyHeaderBlock& headers =
    648       (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_;
    649   return GetUrlFromHeaderBlock(headers, GetProtocolVersion(),
    650                                type_ == SPDY_PUSH_STREAM);
    651 }
    652 
    653 bool SpdyStream::HasUrlFromHeaders() const {
    654   return !GetUrlFromHeaders().is_empty();
    655 }
    656 
    657 void SpdyStream::OnGetDomainBoundCertComplete(int result) {
    658   DCHECK_EQ(io_state_, STATE_GET_DOMAIN_BOUND_CERT_COMPLETE);
    659   DoLoop(result);
    660 }
    661 
    662 int SpdyStream::DoLoop(int result) {
    663   CHECK(!in_do_loop_);
    664   in_do_loop_ = true;
    665 
    666   do {
    667     State state = io_state_;
    668     io_state_ = STATE_NONE;
    669     switch (state) {
    670       case STATE_GET_DOMAIN_BOUND_CERT:
    671         CHECK_EQ(result, OK);
    672         result = DoGetDomainBoundCert();
    673         break;
    674       case STATE_GET_DOMAIN_BOUND_CERT_COMPLETE:
    675         result = DoGetDomainBoundCertComplete(result);
    676         break;
    677       case STATE_SEND_DOMAIN_BOUND_CERT:
    678         CHECK_EQ(result, OK);
    679         result = DoSendDomainBoundCert();
    680         break;
    681       case STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE:
    682         result = DoSendDomainBoundCertComplete(result);
    683         break;
    684       case STATE_SEND_REQUEST_HEADERS:
    685         CHECK_EQ(result, OK);
    686         result = DoSendRequestHeaders();
    687         break;
    688       case STATE_SEND_REQUEST_HEADERS_COMPLETE:
    689         CHECK_EQ(result, OK);
    690         result = DoSendRequestHeadersComplete();
    691         break;
    692 
    693       // For request/response streams, no data is sent from the client
    694       // while in the OPEN state, so OnFrameWriteComplete is never
    695       // called here.  The HTTP body is handled in the OnDataReceived
    696       // callback, which does not call into DoLoop.
    697       //
    698       // For bidirectional streams, we'll send and receive data once
    699       // the connection is established.  Received data is handled in
    700       // OnDataReceived.  Sent data is handled in
    701       // OnFrameWriteComplete, which calls DoOpen().
    702       case STATE_IDLE:
    703         CHECK_EQ(result, OK);
    704         result = DoOpen();
    705         break;
    706 
    707       case STATE_CLOSED:
    708         DCHECK_NE(result, ERR_IO_PENDING);
    709         break;
    710       default:
    711         NOTREACHED() << io_state_;
    712         break;
    713     }
    714   } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
    715            io_state_ != STATE_IDLE);
    716 
    717   CHECK(in_do_loop_);
    718   in_do_loop_ = false;
    719 
    720   return result;
    721 }
    722 
    723 int SpdyStream::DoGetDomainBoundCert() {
    724   CHECK(request_headers_);
    725   DCHECK_NE(type_, SPDY_PUSH_STREAM);
    726   GURL url = GetUrlFromHeaders();
    727   if (!session_->NeedsCredentials() || !url.SchemeIs("https")) {
    728     // Proceed directly to sending the request headers
    729     io_state_ = STATE_SEND_REQUEST_HEADERS;
    730     return OK;
    731   }
    732 
    733   slot_ = session_->credential_state()->FindCredentialSlot(GetUrlFromHeaders());
    734   if (slot_ != SpdyCredentialState::kNoEntry) {
    735     // Proceed directly to sending the request headers
    736     io_state_ = STATE_SEND_REQUEST_HEADERS;
    737     return OK;
    738   }
    739 
    740   io_state_ = STATE_GET_DOMAIN_BOUND_CERT_COMPLETE;
    741   ServerBoundCertService* sbc_service = session_->GetServerBoundCertService();
    742   DCHECK(sbc_service != NULL);
    743   int rv = sbc_service->GetDomainBoundCert(
    744       url.GetOrigin().host(),
    745       &domain_bound_private_key_,
    746       &domain_bound_cert_,
    747       base::Bind(&SpdyStream::OnGetDomainBoundCertComplete, GetWeakPtr()),
    748       &domain_bound_cert_request_handle_);
    749   return rv;
    750 }
    751 
    752 int SpdyStream::DoGetDomainBoundCertComplete(int result) {
    753   DCHECK_NE(type_, SPDY_PUSH_STREAM);
    754   if (result != OK)
    755     return result;
    756 
    757   io_state_ = STATE_SEND_DOMAIN_BOUND_CERT;
    758   slot_ =  session_->credential_state()->SetHasCredential(GetUrlFromHeaders());
    759   return OK;
    760 }
    761 
    762 int SpdyStream::DoSendDomainBoundCert() {
    763   CHECK(request_headers_);
    764   DCHECK_NE(type_, SPDY_PUSH_STREAM);
    765   io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE;
    766 
    767   std::string origin = GetUrlFromHeaders().GetOrigin().spec();
    768   DCHECK(origin[origin.length() - 1] == '/');
    769   origin.erase(origin.length() - 1);  // Trim trailing slash.
    770   scoped_ptr<SpdyFrame> frame;
    771   int rv = session_->CreateCredentialFrame(
    772       origin,
    773       domain_bound_private_key_,
    774       domain_bound_cert_,
    775       priority_,
    776       &frame);
    777   if (rv != OK) {
    778     DCHECK_NE(rv, ERR_IO_PENDING);
    779     return rv;
    780   }
    781 
    782   DCHECK(frame);
    783   // TODO(akalin): Fix the following race condition:
    784   //
    785   // Since this is decoupled from sending the SYN_STREAM frame, it is
    786   // possible that other domain-bound cert frames will clobber ours
    787   // before our SYN_STREAM frame gets sent. This can be solved by
    788   // immediately enqueueing the SYN_STREAM frame here and adjusting
    789   // the state machine appropriately.
    790   session_->EnqueueStreamWrite(
    791       GetWeakPtr(), CREDENTIAL,
    792       scoped_ptr<SpdyBufferProducer>(
    793           new SimpleBufferProducer(
    794               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))));
    795   return ERR_IO_PENDING;
    796 }
    797 
    798 int SpdyStream::DoSendDomainBoundCertComplete(int result) {
    799   DCHECK_NE(type_, SPDY_PUSH_STREAM);
    800   if (result != OK)
    801     return result;
    802 
    803   DCHECK_EQ(just_completed_frame_type_, CREDENTIAL);
    804   io_state_ = STATE_SEND_REQUEST_HEADERS;
    805   return OK;
    806 }
    807 
    808 int SpdyStream::DoSendRequestHeaders() {
    809   DCHECK_NE(type_, SPDY_PUSH_STREAM);
    810   io_state_ = STATE_SEND_REQUEST_HEADERS_COMPLETE;
    811 
    812   session_->EnqueueStreamWrite(
    813       GetWeakPtr(), SYN_STREAM,
    814       scoped_ptr<SpdyBufferProducer>(
    815           new SynStreamBufferProducer(GetWeakPtr())));
    816   return ERR_IO_PENDING;
    817 }
    818 
    819 namespace {
    820 
    821 // Assuming we're in STATE_IDLE, maps the given type (which must not
    822 // be SPDY_PUSH_STREAM) and send status to a result to return from
    823 // DoSendRequestHeadersComplete() or DoOpen().
    824 int GetOpenStateResult(SpdyStreamType type, SpdySendStatus send_status) {
    825   switch (type) {
    826     case SPDY_BIDIRECTIONAL_STREAM:
    827       // For bidirectional streams, there's nothing else to do.
    828       DCHECK_EQ(send_status, MORE_DATA_TO_SEND);
    829       return OK;
    830 
    831     case SPDY_REQUEST_RESPONSE_STREAM:
    832       // For request/response streams, wait for the delegate to send
    833       // data if there's request data to send; we'll get called back
    834       // when the send finishes.
    835       if (send_status == MORE_DATA_TO_SEND)
    836         return ERR_IO_PENDING;
    837 
    838       return OK;
    839 
    840     case SPDY_PUSH_STREAM:
    841       // This should never be called for push streams.
    842       break;
    843   }
    844 
    845   CHECK(false);
    846   return ERR_UNEXPECTED;
    847 }
    848 
    849 }  // namespace
    850 
    851 int SpdyStream::DoSendRequestHeadersComplete() {
    852   DCHECK_NE(type_, SPDY_PUSH_STREAM);
    853   DCHECK_EQ(just_completed_frame_type_, SYN_STREAM);
    854   DCHECK_NE(stream_id_, 0u);
    855 
    856   io_state_ = STATE_IDLE;
    857 
    858   CHECK(delegate_);
    859   // Must not close |this|; if it does, it will trigger the |in_do_loop_|
    860   // check in the destructor.
    861   delegate_->OnRequestHeadersSent();
    862 
    863   return GetOpenStateResult(type_, send_status_);
    864 }
    865 
    866 int SpdyStream::DoOpen() {
    867   DCHECK_NE(type_, SPDY_PUSH_STREAM);
    868 
    869   if (just_completed_frame_type_ != DATA) {
    870     NOTREACHED();
    871     return ERR_UNEXPECTED;
    872   }
    873 
    874   if (just_completed_frame_size_ < session_->GetDataFrameMinimumSize()) {
    875     NOTREACHED();
    876     return ERR_UNEXPECTED;
    877   }
    878 
    879   size_t frame_payload_size =
    880       just_completed_frame_size_ - session_->GetDataFrameMinimumSize();
    881   if (frame_payload_size > session_->GetDataFrameMaximumPayload()) {
    882     NOTREACHED();
    883     return ERR_UNEXPECTED;
    884   }
    885 
    886   // Set |io_state_| first as |delegate_| may check it.
    887   io_state_ = STATE_IDLE;
    888 
    889   send_bytes_ += frame_payload_size;
    890 
    891   pending_send_data_->DidConsume(frame_payload_size);
    892   if (pending_send_data_->BytesRemaining() > 0) {
    893     QueueNextDataFrame();
    894     return ERR_IO_PENDING;
    895   }
    896 
    897   pending_send_data_ = NULL;
    898 
    899   CHECK(delegate_);
    900   // Must not close |this|; if it does, it will trigger the
    901   // |in_do_loop_| check in the destructor.
    902   delegate_->OnDataSent();
    903 
    904   return GetOpenStateResult(type_, send_status_);
    905 }
    906 
    907 void SpdyStream::UpdateHistograms() {
    908   // We need at least the receive timers to be filled in, as otherwise
    909   // metrics can be bogus.
    910   if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
    911     return;
    912 
    913   base::TimeTicks effective_send_time;
    914   if (type_ == SPDY_PUSH_STREAM) {
    915     // Push streams shouldn't have |send_time_| filled in.
    916     DCHECK(send_time_.is_null());
    917     effective_send_time = recv_first_byte_time_;
    918   } else {
    919     // For non-push streams, we also need |send_time_| to be filled
    920     // in.
    921     if (send_time_.is_null())
    922       return;
    923     effective_send_time = send_time_;
    924   }
    925 
    926   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
    927                       recv_first_byte_time_ - effective_send_time);
    928   UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
    929                       recv_last_byte_time_ - recv_first_byte_time_);
    930   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
    931                       recv_last_byte_time_ - effective_send_time);
    932 
    933   UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
    934   UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
    935 }
    936 
    937 void SpdyStream::QueueNextDataFrame() {
    938   // Until the request has been completely sent, we cannot be sure
    939   // that our stream_id is correct.
    940   DCHECK_GT(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
    941   CHECK_GT(stream_id_, 0u);
    942   CHECK(pending_send_data_.get());
    943   CHECK_GT(pending_send_data_->BytesRemaining(), 0);
    944 
    945   SpdyDataFlags flags =
    946       (send_status_ == NO_MORE_DATA_TO_SEND) ?
    947       DATA_FLAG_FIN : DATA_FLAG_NONE;
    948   scoped_ptr<SpdyBuffer> data_buffer(
    949       session_->CreateDataBuffer(stream_id_,
    950                                  pending_send_data_.get(),
    951                                  pending_send_data_->BytesRemaining(),
    952                                  flags));
    953   // We'll get called again by PossiblyResumeIfSendStalled().
    954   if (!data_buffer)
    955     return;
    956 
    957   if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
    958     DCHECK_GE(data_buffer->GetRemainingSize(),
    959               session_->GetDataFrameMinimumSize());
    960     size_t payload_size =
    961         data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
    962     DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
    963     DecreaseSendWindowSize(static_cast<int32>(payload_size));
    964     // This currently isn't strictly needed, since write frames are
    965     // discarded only if the stream is about to be closed. But have it
    966     // here anyway just in case this changes.
    967     data_buffer->AddConsumeCallback(
    968         base::Bind(&SpdyStream::OnWriteBufferConsumed,
    969                    GetWeakPtr(), payload_size));
    970   }
    971 
    972   session_->EnqueueStreamWrite(
    973       GetWeakPtr(), DATA,
    974       scoped_ptr<SpdyBufferProducer>(
    975           new SimpleBufferProducer(data_buffer.Pass())));
    976 }
    977 
    978 int SpdyStream::MergeWithResponseHeaders(
    979     const SpdyHeaderBlock& new_response_headers) {
    980   if (new_response_headers.find("transfer-encoding") !=
    981       new_response_headers.end()) {
    982     session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
    983                          "Received transfer-encoding header");
    984     return ERR_SPDY_PROTOCOL_ERROR;
    985   }
    986 
    987   for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin();
    988       it != new_response_headers.end(); ++it) {
    989     // Disallow uppercase headers.
    990     if (ContainsUppercaseAscii(it->first)) {
    991       session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
    992                             "Upper case characters in header: " + it->first);
    993       return ERR_SPDY_PROTOCOL_ERROR;
    994     }
    995 
    996     SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first);
    997     // Disallow duplicate headers.  This is just to be conservative.
    998     if (it2 != response_headers_.end() && it2->first == it->first) {
    999       session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
   1000                             "Duplicate header: " + it->first);
   1001       return ERR_SPDY_PROTOCOL_ERROR;
   1002     }
   1003 
   1004     response_headers_.insert(it2, *it);
   1005   }
   1006 
   1007   // If delegate_ is not yet attached, we'll call
   1008   // OnResponseHeadersUpdated() after the delegate gets attached to
   1009   // the stream.
   1010   if (delegate_) {
   1011     // The call to OnResponseHeadersUpdated() below may delete |this|,
   1012     // so use |weak_this| to detect that.
   1013     base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
   1014 
   1015     SpdyResponseHeadersStatus status =
   1016         delegate_->OnResponseHeadersUpdated(response_headers_);
   1017     if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
   1018       // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
   1019       // have been closed.
   1020       CHECK(weak_this);
   1021       // Incomplete headers are OK only for push streams.
   1022       if (type_ != SPDY_PUSH_STREAM) {
   1023         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
   1024                               "Incomplete headers");
   1025         return ERR_INCOMPLETE_SPDY_HEADERS;
   1026       }
   1027     } else if (weak_this) {
   1028       response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
   1029     }
   1030   }
   1031 
   1032   return OK;
   1033 }
   1034 
   1035 }  // namespace net
   1036