Home | History | Annotate | Download | only in flip
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/flip/flip_stream.h"
      6 
      7 #include "base/logging.h"
      8 #include "net/flip/flip_session.h"
      9 #include "net/http/http_request_info.h"
     10 #include "net/http/http_response_info.h"
     11 
     12 namespace net {
     13 
// Constructs a stream identified by |stream_id| on |session|.
//
// |pushed| is stored in pushed_; SendRequest() uses it to decide whether the
// state machine starts at STATE_SEND_HEADERS or goes straight to
// STATE_READ_HEADERS.  |log| is stored in load_log_ and passed to the LoadLog
// begin/end events emitted by DoLoop().
//
// Everything else starts out empty: no response object, no request body
// stream, no pending user callback or buffer, zeroed byte counters,
// io_state_ == STATE_NONE and response_status_ == OK.  metrics_ is
// initialized from Singleton<BandwidthMetrics>::get().
FlipStream::FlipStream(FlipSession* session, flip::FlipStreamId stream_id,
                       bool pushed, LoadLog* log)
    : stream_id_(stream_id),
      priority_(0),
      pushed_(pushed),
      download_finished_(false),
      metrics_(Singleton<BandwidthMetrics>::get()),
      session_(session),
      response_(NULL),
      request_body_stream_(NULL),
      response_complete_(false),
      io_state_(STATE_NONE),
      response_status_(OK),
      user_callback_(NULL),
      user_buffer_(NULL),
      user_buffer_len_(0),
      cancelled_(false),
      load_log_(log),
      send_bytes_(0),
      recv_bytes_(0),
      histograms_recorded_(false) {}
     35 
     36 FlipStream::~FlipStream() {
     37   DLOG(INFO) << "Deleting FlipStream for stream " << stream_id_;
     38 
     39   // TODO(willchan): We're still calling CancelStream() too many times, because
     40   // inactive pending/pushed streams will still have stream_id_ set.
     41   if (stream_id_) {
     42     session_->CancelStream(stream_id_);
     43   } else if (!response_complete_) {
     44     NOTREACHED();
     45   }
     46 }
     47 
     48 uint64 FlipStream::GetUploadProgress() const {
     49   if (!request_body_stream_.get())
     50     return 0;
     51 
     52   return request_body_stream_->position();
     53 }
     54 
// Returns the response object handed to SendRequest().  The pointer is NULL
// until SendRequest() has stored one (the constructor initializes it to NULL).
const HttpResponseInfo* FlipStream::GetResponseInfo() const {
  return response_;
}
     58 
     59 int FlipStream::ReadResponseHeaders(CompletionCallback* callback) {
     60   // Note: The FlipStream may have already received the response headers, so
     61   //       this call may complete synchronously.
     62   CHECK(callback);
     63   CHECK(io_state_ == STATE_NONE);
     64   CHECK(!cancelled_);
     65 
     66   // The SYN_REPLY has already been received.
     67   if (response_->headers)
     68     return OK;
     69 
     70   io_state_ = STATE_READ_HEADERS;
     71   CHECK(!user_callback_);
     72   user_callback_ = callback;
     73   return ERR_IO_PENDING;
     74 }
     75 
     76 int FlipStream::ReadResponseBody(
     77     IOBuffer* buf, int buf_len, CompletionCallback* callback) {
     78   DCHECK_EQ(io_state_, STATE_NONE);
     79   CHECK(buf);
     80   CHECK(buf_len);
     81   CHECK(callback);
     82   CHECK(!cancelled_);
     83 
     84   // If we have data buffered, complete the IO immediately.
     85   if (response_body_.size()) {
     86     int bytes_read = 0;
     87     while (response_body_.size() && buf_len > 0) {
     88       scoped_refptr<IOBufferWithSize> data = response_body_.front();
     89       const int bytes_to_copy = std::min(buf_len, data->size());
     90       memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
     91       buf_len -= bytes_to_copy;
     92       if (bytes_to_copy == data->size()) {
     93         response_body_.pop_front();
     94       } else {
     95         const int bytes_remaining = data->size() - bytes_to_copy;
     96         IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
     97         memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
     98                bytes_remaining);
     99         response_body_.pop_front();
    100         response_body_.push_front(new_buffer);
    101       }
    102       bytes_read += bytes_to_copy;
    103     }
    104     if (bytes_read > 0)
    105       recv_bytes_ += bytes_read;
    106     return bytes_read;
    107   } else if (response_complete_) {
    108     return response_status_;
    109   }
    110 
    111   CHECK(!user_callback_);
    112   CHECK(!user_buffer_);
    113   CHECK(user_buffer_len_ == 0);
    114 
    115   user_callback_ = callback;
    116   user_buffer_ = buf;
    117   user_buffer_len_ = buf_len;
    118   return ERR_IO_PENDING;
    119 }
    120 
    121 int FlipStream::SendRequest(UploadDataStream* upload_data,
    122                             HttpResponseInfo* response,
    123                             CompletionCallback* callback) {
    124   CHECK(callback);
    125   CHECK(!cancelled_);
    126   CHECK(response);
    127 
    128   response_ = response;
    129 
    130   if (upload_data) {
    131     if (upload_data->size())
    132       request_body_stream_.reset(upload_data);
    133     else
    134       delete upload_data;
    135   }
    136 
    137   send_time_ = base::TimeTicks::Now();
    138 
    139   DCHECK_EQ(io_state_, STATE_NONE);
    140   if (!pushed_)
    141     io_state_ = STATE_SEND_HEADERS;
    142   else
    143     io_state_ = STATE_READ_HEADERS;
    144   int result = DoLoop(OK);
    145   if (result == ERR_IO_PENDING) {
    146     CHECK(!user_callback_);
    147     user_callback_ = callback;
    148   }
    149   return result;
    150 }
    151 
    152 void FlipStream::Cancel() {
    153   cancelled_ = true;
    154   user_callback_ = NULL;
    155 
    156   session_->CancelStream(stream_id_);
    157 }
    158 
    159 void FlipStream::OnResponseReceived(const HttpResponseInfo& response) {
    160   metrics_.StartStream();
    161 
    162   CHECK(!response_->headers);
    163 
    164   *response_ = response;  // TODO(mbelshe): avoid copy.
    165   DCHECK(response_->headers);
    166 
    167   recv_first_byte_time_ = base::TimeTicks::Now();
    168 
    169   if (io_state_ == STATE_NONE) {
    170     CHECK(pushed_);
    171   } else if (io_state_ == STATE_READ_HEADERS_COMPLETE) {
    172     CHECK(!pushed_);
    173   } else {
    174     NOTREACHED();
    175   }
    176 
    177   int rv = DoLoop(OK);
    178 
    179   if (user_callback_)
    180     DoCallback(rv);
    181 }
    182 
    183 bool FlipStream::OnDataReceived(const char* data, int length) {
    184   DCHECK_GE(length, 0);
    185   LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
    186             << stream_id_;
    187 
    188   // If we don't have a response, then the SYN_REPLY did not come through.
    189   // We cannot pass data up to the caller unless the reply headers have been
    190   // received.
    191   if (!response_->headers) {
    192     OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
    193     return false;
    194   }
    195 
    196   if (length > 0)
    197     recv_bytes_ += length;
    198   recv_last_byte_time_ = base::TimeTicks::Now();
    199 
    200   // A zero-length read means that the stream is being closed.
    201   if (!length) {
    202     metrics_.StopStream();
    203     download_finished_ = true;
    204     OnClose(net::OK);
    205     return true;
    206   }
    207 
    208   // Track our bandwidth.
    209   metrics_.RecordBytes(length);
    210 
    211   if (length > 0) {
    212     // TODO(mbelshe): If read is pending, we should copy the data straight into
    213     // the read buffer here.  For now, we'll queue it always.
    214     // TODO(mbelshe): We need to have some throttling on this.  We shouldn't
    215     //                buffer an infinite amount of data.
    216 
    217     IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
    218     memcpy(io_buffer->data(), data, length);
    219 
    220     response_body_.push_back(io_buffer);
    221   }
    222 
    223   // Note that data may be received for a FlipStream prior to the user calling
    224   // ReadResponseBody(), therefore user_callback_ may be NULL.  This may often
    225   // happen for server initiated streams.
    226   if (user_callback_) {
    227     int rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
    228     CHECK(rv != ERR_IO_PENDING);
    229     user_buffer_ = NULL;
    230     user_buffer_len_ = 0;
    231     DoCallback(rv);
    232   }
    233 
    234   return true;
    235 }
    236 
    237 void FlipStream::OnWriteComplete(int status) {
    238   // TODO(mbelshe): Check for cancellation here.  If we're cancelled, we
    239   // should discontinue the DoLoop.
    240 
    241   if (status > 0)
    242     send_bytes_ += status;
    243 
    244   DoLoop(status);
    245 }
    246 
    247 void FlipStream::OnClose(int status) {
    248   response_complete_ = true;
    249   response_status_ = status;
    250   stream_id_ = 0;
    251 
    252   if (user_callback_)
    253     DoCallback(status);
    254 
    255   UpdateHistograms();
    256 }
    257 
// Drives the stream's state machine.
//
// Starting from the current io_state_, dispatches to the handler for that
// state, passing |result| (the outcome of the previous step) where the
// handler takes one.  io_state_ is reset to STATE_NONE before each dispatch,
// so a handler must set the next state itself for the loop to continue.
//
// The loop stops as soon as a handler returns ERR_IO_PENDING or leaves
// io_state_ at STATE_NONE.  Returns the result of the last handler run.
//
// The "begin" states CHECK that they are entered with OK (STATE_READ_BODY is
// the exception), and each begin/complete pair brackets a LoadLog event.
int FlipStream::DoLoop(int result) {
  do {
    // Capture the state to run and clear io_state_, so that "no next state"
    // is the default unless the handler below picks one.
    State state = io_state_;
    io_state_ = STATE_NONE;
    switch (state) {
      // State machine 1: Send headers and wait for response headers.
      case STATE_SEND_HEADERS:
        CHECK(result == OK);
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
        result = DoSendHeaders();
        break;
      case STATE_SEND_HEADERS_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_SEND_HEADERS);
        result = DoSendHeadersComplete(result);
        break;
      case STATE_SEND_BODY:
        CHECK(result == OK);
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
        result = DoSendBody();
        break;
      case STATE_SEND_BODY_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_SEND_BODY);
        result = DoSendBodyComplete(result);
        break;
      case STATE_READ_HEADERS:
        CHECK(result == OK);
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
        result = DoReadHeaders();
        break;
      case STATE_READ_HEADERS_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_READ_HEADERS);
        result = DoReadHeadersComplete(result);
        break;

      // State machine 2: Read body.
      // NOTE(willchan): Currently unused.  Currently we handle this stuff in
      // the OnDataReceived()/OnClose()/ReadResponseHeaders()/etc.  Only reason
      // to do this is for consistency with the Http code.
      case STATE_READ_BODY:
        LoadLog::BeginEvent(load_log_,
                            LoadLog::TYPE_FLIP_STREAM_READ_BODY);
        result = DoReadBody();
        break;
      case STATE_READ_BODY_COMPLETE:
        LoadLog::EndEvent(load_log_,
                          LoadLog::TYPE_FLIP_STREAM_READ_BODY);
        result = DoReadBodyComplete(result);
        break;
      case STATE_DONE:
        DCHECK(result != ERR_IO_PENDING);
        break;
      default:
        // Includes STATE_NONE: the loop must not be entered without a state
        // to run.
        NOTREACHED();
        break;
    }
  } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE);

  return result;
}
    323 
    324 void FlipStream::DoCallback(int rv) {
    325   CHECK(rv != ERR_IO_PENDING);
    326   CHECK(user_callback_);
    327 
    328   // Since Run may result in being called back, clear user_callback_ in advance.
    329   CompletionCallback* c = user_callback_;
    330   user_callback_ = NULL;
    331   c->Run(rv);
    332 }
    333 
// STATE_SEND_HEADERS handler.  Does not write anything itself: it only
// schedules STATE_SEND_HEADERS_COMPLETE as the next state and returns
// ERR_IO_PENDING, which suspends DoLoop() until it is resumed (see
// OnWriteComplete()).
int FlipStream::DoSendHeaders() {
  // The FlipSession will always call us back when the send is complete.
  // TODO(willchan): This code makes the assumption that for the non-push stream
  // case, the client code calls SendRequest() after creating the stream and
  // before yielding back to the MessageLoop.  This is true in the current code,
  // but is not obvious from the headers.  We should make the code handle
  // SendRequest() being called after the SYN_REPLY has been received.
  io_state_ = STATE_SEND_HEADERS_COMPLETE;
  return ERR_IO_PENDING;
}
    344 
    345 int FlipStream::DoSendHeadersComplete(int result) {
    346   if (result < 0)
    347     return result;
    348 
    349   CHECK(result > 0);
    350 
    351   // There is no body, skip that state.
    352   if (!request_body_stream_.get()) {
    353     io_state_ = STATE_READ_HEADERS;
    354     return OK;
    355   }
    356 
    357   io_state_ = STATE_SEND_BODY;
    358   return OK;
    359 }
    360 
    361 // DoSendBody is called to send the optional body for the request.  This call
    362 // will also be called as each write of a chunk of the body completes.
    363 int FlipStream::DoSendBody() {
    364   // If we're already in the STATE_SENDING_BODY state, then we've already
    365   // sent a portion of the body.  In that case, we need to first consume
    366   // the bytes written in the body stream.  Note that the bytes written is
    367   // the number of bytes in the frame that were written, only consume the
    368   // data portion, of course.
    369   io_state_ = STATE_SEND_BODY_COMPLETE;
    370   int buf_len = static_cast<int>(request_body_stream_->buf_len());
    371   return session_->WriteStreamData(stream_id_,
    372                                    request_body_stream_->buf(),
    373                                    buf_len);
    374 }
    375 
    376 int FlipStream::DoSendBodyComplete(int result) {
    377   if (result < 0)
    378     return result;
    379 
    380   CHECK(result != 0);
    381 
    382   request_body_stream_->DidConsume(result);
    383 
    384   if (request_body_stream_->position() < request_body_stream_->size())
    385     io_state_ = STATE_SEND_BODY;
    386   else
    387     io_state_ = STATE_READ_HEADERS;
    388 
    389   return OK;
    390 }
    391 
    392 int FlipStream::DoReadHeaders() {
    393   io_state_ = STATE_READ_HEADERS_COMPLETE;
    394   return response_->headers ? OK : ERR_IO_PENDING;
    395 }
    396 
// STATE_READ_HEADERS_COMPLETE handler.  Passes |result| through unchanged and
// schedules no further state, so DoLoop() stops after this step.
int FlipStream::DoReadHeadersComplete(int result) {
  return result;
}
    400 
// STATE_READ_BODY handler.  Placeholder: it performs no work and always
// returns ERR_IO_PENDING.
int FlipStream::DoReadBody() {
  // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
  // makes sense.
  return ERR_IO_PENDING;
}
    406 
// STATE_READ_BODY_COMPLETE handler.  Placeholder: |result| is ignored and
// ERR_IO_PENDING is always returned.
int FlipStream::DoReadBodyComplete(int result) {
  // TODO(mbelshe): merge FlipStreamParser with FlipStream and then this
  // makes sense.
  return ERR_IO_PENDING;
}
    412 
    413 void FlipStream::UpdateHistograms() {
    414   if (histograms_recorded_)
    415     return;
    416 
    417   histograms_recorded_ = true;
    418 
    419   // We need all timers to be filled in, otherwise metrics can be bogus.
    420   if (send_time_.is_null() || recv_first_byte_time_.is_null() ||
    421       recv_last_byte_time_.is_null())
    422     return;
    423 
    424   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
    425       recv_first_byte_time_ - send_time_);
    426   UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
    427       recv_last_byte_time_ - recv_first_byte_time_);
    428   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
    429       recv_last_byte_time_ - send_time_);
    430 
    431   UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
    432   UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
    433 }
    434 
    435 }  // namespace net
    436