Home | History | Annotate | Download | only in http
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/http/http_pipelined_connection_impl.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/bind_helpers.h"
      9 #include "base/message_loop/message_loop.h"
     10 #include "base/stl_util.h"
     11 #include "base/values.h"
     12 #include "net/base/io_buffer.h"
     13 #include "net/http/http_pipelined_stream.h"
     14 #include "net/http/http_request_info.h"
     15 #include "net/http/http_response_body_drainer.h"
     16 #include "net/http/http_response_headers.h"
     17 #include "net/http/http_stream_parser.h"
     18 #include "net/http/http_version.h"
     19 #include "net/socket/client_socket_handle.h"
     20 
     21 namespace net {
     22 
     23 namespace {
     24 
     25 base::Value* NetLogReceivedHeadersCallback(const NetLog::Source& source,
     26                                            const std::string* feedback,
     27                                            NetLog::LogLevel /* log_level */) {
     28   base::DictionaryValue* dict = new base::DictionaryValue;
     29   source.AddToEventParameters(dict);
     30   dict->SetString("feedback", *feedback);
     31   return dict;
     32 }
     33 
     34 base::Value* NetLogStreamClosedCallback(const NetLog::Source& source,
     35                                         bool not_reusable,
     36                                         NetLog::LogLevel /* log_level */) {
     37   base::DictionaryValue* dict = new base::DictionaryValue;
     38   source.AddToEventParameters(dict);
     39   dict->SetBoolean("not_reusable", not_reusable);
     40   return dict;
     41 }
     42 
     43 base::Value* NetLogHostPortPairCallback(const HostPortPair* host_port_pair,
     44                                         NetLog::LogLevel /* log_level */) {
     45   base::DictionaryValue* dict = new base::DictionaryValue;
     46   dict->SetString("host_and_port", host_port_pair->ToString());
     47   return dict;
     48 }
     49 
     50 }  // anonymous namespace
     51 
     52 HttpPipelinedConnection*
     53 HttpPipelinedConnectionImpl::Factory::CreateNewPipeline(
     54     ClientSocketHandle* connection,
     55     HttpPipelinedConnection::Delegate* delegate,
     56     const HostPortPair& origin,
     57     const SSLConfig& used_ssl_config,
     58     const ProxyInfo& used_proxy_info,
     59     const BoundNetLog& net_log,
     60     bool was_npn_negotiated,
     61     NextProto protocol_negotiated) {
     62   return new HttpPipelinedConnectionImpl(connection, delegate, origin,
     63                                          used_ssl_config, used_proxy_info,
     64                                          net_log, was_npn_negotiated,
     65                                          protocol_negotiated);
     66 }
     67 
     68 HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl(
     69     ClientSocketHandle* connection,
     70     HttpPipelinedConnection::Delegate* delegate,
     71     const HostPortPair& origin,
     72     const SSLConfig& used_ssl_config,
     73     const ProxyInfo& used_proxy_info,
     74     const BoundNetLog& net_log,
     75     bool was_npn_negotiated,
     76     NextProto protocol_negotiated)
     77     : delegate_(delegate),
     78       connection_(connection),
     79       used_ssl_config_(used_ssl_config),
     80       used_proxy_info_(used_proxy_info),
     81       net_log_(BoundNetLog::Make(net_log.net_log(),
     82                                  NetLog::SOURCE_HTTP_PIPELINED_CONNECTION)),
     83       was_npn_negotiated_(was_npn_negotiated),
     84       protocol_negotiated_(protocol_negotiated),
     85       read_buf_(new GrowableIOBuffer()),
     86       next_pipeline_id_(1),
     87       active_(false),
     88       usable_(true),
     89       completed_one_request_(false),
     90       weak_factory_(this),
     91       send_next_state_(SEND_STATE_NONE),
     92       send_still_on_call_stack_(false),
     93       read_next_state_(READ_STATE_NONE),
     94       active_read_id_(0),
     95       read_still_on_call_stack_(false) {
     96   CHECK(connection_.get());
     97   net_log_.BeginEvent(
     98       NetLog::TYPE_HTTP_PIPELINED_CONNECTION,
     99       base::Bind(&NetLogHostPortPairCallback, &origin));
    100 }
    101 
    102 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() {
    103   CHECK_EQ(depth(), 0);
    104   CHECK(stream_info_map_.empty());
    105   CHECK(pending_send_request_queue_.empty());
    106   CHECK(request_order_.empty());
    107   CHECK_EQ(send_next_state_, SEND_STATE_NONE);
    108   CHECK_EQ(read_next_state_, READ_STATE_NONE);
    109   CHECK(!active_send_request_.get());
    110   CHECK(!active_read_id_);
    111   if (!usable_) {
    112     connection_->socket()->Disconnect();
    113   }
    114   connection_->Reset();
    115   net_log_.EndEvent(NetLog::TYPE_HTTP_PIPELINED_CONNECTION);
    116 }
    117 
    118 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() {
    119   int pipeline_id = next_pipeline_id_++;
    120   CHECK(pipeline_id);
    121   HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id);
    122   stream_info_map_.insert(std::make_pair(pipeline_id, StreamInfo()));
    123   return stream;
    124 }
    125 
    126 void HttpPipelinedConnectionImpl::InitializeParser(
    127     int pipeline_id,
    128     const HttpRequestInfo* request,
    129     const BoundNetLog& net_log) {
    130   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    131   CHECK(!stream_info_map_[pipeline_id].parser.get());
    132   stream_info_map_[pipeline_id].state = STREAM_BOUND;
    133   stream_info_map_[pipeline_id].parser.reset(new HttpStreamParser(
    134       connection_.get(), request, read_buf_.get(), net_log));
    135   stream_info_map_[pipeline_id].source = net_log.source();
    136 
    137   // In case our first stream doesn't SendRequest() immediately, we should still
    138   // allow others to use this pipeline.
    139   if (pipeline_id == 1) {
    140     base::MessageLoop::current()->PostTask(
    141         FROM_HERE,
    142         base::Bind(&HttpPipelinedConnectionImpl::ActivatePipeline,
    143                    weak_factory_.GetWeakPtr()));
    144   }
    145 }
    146 
    147 void HttpPipelinedConnectionImpl::ActivatePipeline() {
    148   if (!active_) {
    149     active_ = true;
    150     delegate_->OnPipelineHasCapacity(this);
    151   }
    152 }
    153 
    154 void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) {
    155   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    156   Close(pipeline_id, false);
    157 
    158   if (stream_info_map_[pipeline_id].state != STREAM_CREATED &&
    159       stream_info_map_[pipeline_id].state != STREAM_UNUSED) {
    160     CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED);
    161     CHECK(stream_info_map_[pipeline_id].parser.get());
    162     stream_info_map_[pipeline_id].parser.reset();
    163   }
    164   CHECK(!stream_info_map_[pipeline_id].parser.get());
    165   stream_info_map_.erase(pipeline_id);
    166 
    167   delegate_->OnPipelineHasCapacity(this);
    168 }
    169 
    170 int HttpPipelinedConnectionImpl::SendRequest(
    171     int pipeline_id,
    172     const std::string& request_line,
    173     const HttpRequestHeaders& headers,
    174     HttpResponseInfo* response,
    175     const CompletionCallback& callback) {
    176   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    177   CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND);
    178   if (!usable_) {
    179     return ERR_PIPELINE_EVICTION;
    180   }
    181 
    182   PendingSendRequest* send_request = new PendingSendRequest;
    183   send_request->pipeline_id = pipeline_id;
    184   send_request->request_line = request_line;
    185   send_request->headers = headers;
    186   send_request->response = response;
    187   send_request->callback = callback;
    188   pending_send_request_queue_.push(send_request);
    189 
    190   int rv;
    191   if (send_next_state_ == SEND_STATE_NONE) {
    192     send_next_state_ = SEND_STATE_START_IMMEDIATELY;
    193     rv = DoSendRequestLoop(OK);
    194   } else {
    195     rv = ERR_IO_PENDING;
    196   }
    197   ActivatePipeline();
    198   return rv;
    199 }
    200 
    201 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) {
    202   int rv = result;
    203   do {
    204     SendRequestState state = send_next_state_;
    205     send_next_state_ = SEND_STATE_NONE;
    206     switch (state) {
    207       case SEND_STATE_START_IMMEDIATELY:
    208         rv = DoStartRequestImmediately(rv);
    209         break;
    210       case SEND_STATE_START_NEXT_DEFERRED_REQUEST:
    211         rv = DoStartNextDeferredRequest(rv);
    212         break;
    213       case SEND_STATE_SEND_ACTIVE_REQUEST:
    214         rv = DoSendActiveRequest(rv);
    215         break;
    216       case SEND_STATE_COMPLETE:
    217         rv = DoSendComplete(rv);
    218         break;
    219       case SEND_STATE_EVICT_PENDING_REQUESTS:
    220         rv = DoEvictPendingSendRequests(rv);
    221         break;
    222       default:
    223         CHECK(false) << "bad send state: " << state;
    224         rv = ERR_FAILED;
    225         break;
    226     }
    227   } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE);
    228   send_still_on_call_stack_ = false;
    229   return rv;
    230 }
    231 
    232 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) {
    233   CHECK(active_send_request_.get());
    234   DoSendRequestLoop(result);
    235 }
    236 
    237 int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) {
    238   CHECK(!active_send_request_.get());
    239   CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size());
    240   // If SendRequest() completes synchronously, then we need to return the value
    241   // directly to the caller. |send_still_on_call_stack_| will track this.
    242   // Otherwise, asynchronous completions will notify the caller via callback.
    243   send_still_on_call_stack_ = true;
    244   active_send_request_.reset(pending_send_request_queue_.front());
    245   pending_send_request_queue_.pop();
    246   send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
    247   return OK;
    248 }
    249 
    250 int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) {
    251   CHECK(!send_still_on_call_stack_);
    252   CHECK(!active_send_request_.get());
    253 
    254   while (!pending_send_request_queue_.empty()) {
    255     scoped_ptr<PendingSendRequest> next_request(
    256         pending_send_request_queue_.front());
    257     pending_send_request_queue_.pop();
    258     CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id));
    259     if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) {
    260       active_send_request_.reset(next_request.release());
    261       send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST;
    262       return OK;
    263     }
    264   }
    265 
    266   send_next_state_ = SEND_STATE_NONE;
    267   return OK;
    268 }
    269 
    270 int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) {
    271   CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get());
    272   int rv = stream_info_map_[active_send_request_->pipeline_id].parser->
    273       SendRequest(active_send_request_->request_line,
    274                   active_send_request_->headers,
    275                   active_send_request_->response,
    276                   base::Bind(&HttpPipelinedConnectionImpl::OnSendIOCallback,
    277                              base::Unretained(this)));
    278   stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING;
    279   send_next_state_ = SEND_STATE_COMPLETE;
    280   return rv;
    281 }
    282 
    283 int HttpPipelinedConnectionImpl::DoSendComplete(int result) {
    284   CHECK(active_send_request_.get());
    285   CHECK_EQ(STREAM_SENDING,
    286            stream_info_map_[active_send_request_->pipeline_id].state);
    287 
    288   request_order_.push(active_send_request_->pipeline_id);
    289   stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT;
    290   net_log_.AddEvent(
    291       NetLog::TYPE_HTTP_PIPELINED_CONNECTION_SENT_REQUEST,
    292       stream_info_map_[active_send_request_->pipeline_id].source.
    293           ToEventParametersCallback());
    294 
    295   if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) {
    296     result = ERR_PIPELINE_EVICTION;
    297   }
    298   if (result < OK) {
    299     usable_ = false;
    300   }
    301 
    302   if (!send_still_on_call_stack_) {
    303     QueueUserCallback(active_send_request_->pipeline_id,
    304                       active_send_request_->callback, result, FROM_HERE);
    305   }
    306 
    307   active_send_request_.reset();
    308 
    309   if (send_still_on_call_stack_) {
    310     // It should be impossible for another request to appear on the queue while
    311     // this send was on the call stack.
    312     CHECK(pending_send_request_queue_.empty());
    313     send_next_state_ = SEND_STATE_NONE;
    314   } else if (!usable_) {
    315     send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
    316   } else {
    317     send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST;
    318   }
    319 
    320   return result;
    321 }
    322 
    323 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) {
    324   while (!pending_send_request_queue_.empty()) {
    325     scoped_ptr<PendingSendRequest> evicted_send(
    326         pending_send_request_queue_.front());
    327     pending_send_request_queue_.pop();
    328     if (ContainsKey(stream_info_map_, evicted_send->pipeline_id) &&
    329         stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) {
    330       evicted_send->callback.Run(ERR_PIPELINE_EVICTION);
    331     }
    332   }
    333   send_next_state_ = SEND_STATE_NONE;
    334   return result;
    335 }
    336 
    337 int HttpPipelinedConnectionImpl::ReadResponseHeaders(
    338     int pipeline_id, const CompletionCallback& callback) {
    339   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    340   CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state);
    341   CHECK(stream_info_map_[pipeline_id].read_headers_callback.is_null());
    342 
    343   if (!usable_)
    344     return ERR_PIPELINE_EVICTION;
    345 
    346   stream_info_map_[pipeline_id].state = STREAM_READ_PENDING;
    347   stream_info_map_[pipeline_id].read_headers_callback = callback;
    348   if (read_next_state_ == READ_STATE_NONE &&
    349       pipeline_id == request_order_.front()) {
    350     read_next_state_ = READ_STATE_START_IMMEDIATELY;
    351     return DoReadHeadersLoop(OK);
    352   }
    353   return ERR_IO_PENDING;
    354 }
    355 
    356 void HttpPipelinedConnectionImpl::StartNextDeferredRead() {
    357   if (read_next_state_ == READ_STATE_NONE) {
    358     read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ;
    359     DoReadHeadersLoop(OK);
    360   }
    361 }
    362 
    363 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) {
    364   int rv = result;
    365   do {
    366     ReadHeadersState state = read_next_state_;
    367     read_next_state_ = READ_STATE_NONE;
    368     switch (state) {
    369       case READ_STATE_START_IMMEDIATELY:
    370         rv = DoStartReadImmediately(rv);
    371         break;
    372       case READ_STATE_START_NEXT_DEFERRED_READ:
    373         rv = DoStartNextDeferredRead(rv);
    374         break;
    375       case READ_STATE_READ_HEADERS:
    376         rv = DoReadHeaders(rv);
    377         break;
    378       case READ_STATE_READ_HEADERS_COMPLETE:
    379         rv = DoReadHeadersComplete(rv);
    380         break;
    381       case READ_STATE_WAITING_FOR_CLOSE:
    382         // This is a holding state. We return instead of continuing to run hte
    383         // loop. The state will advance when the stream calls Close().
    384         rv = DoReadWaitForClose(rv);
    385         read_still_on_call_stack_ = false;
    386         return rv;
    387       case READ_STATE_STREAM_CLOSED:
    388         rv = DoReadStreamClosed();
    389         break;
    390       case READ_STATE_EVICT_PENDING_READS:
    391         rv = DoEvictPendingReadHeaders(rv);
    392         break;
    393       case READ_STATE_NONE:
    394         break;
    395       default:
    396         CHECK(false) << "bad read state";
    397         rv = ERR_FAILED;
    398         break;
    399     }
    400   } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE);
    401   read_still_on_call_stack_ = false;
    402   return rv;
    403 }
    404 
    405 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) {
    406   DoReadHeadersLoop(result);
    407 }
    408 
    409 int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) {
    410   CHECK(!active_read_id_);
    411   CHECK(!read_still_on_call_stack_);
    412   CHECK(!request_order_.empty());
    413   // If ReadResponseHeaders() completes synchronously, then we need to return
    414   // the value directly to the caller. |read_still_on_call_stack_| will track
    415   // this. Otherwise, asynchronous completions will notify the caller via
    416   // callback.
    417   read_still_on_call_stack_ = true;
    418   read_next_state_ = READ_STATE_READ_HEADERS;
    419   active_read_id_ = request_order_.front();
    420   request_order_.pop();
    421   return OK;
    422 }
    423 
    424 int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) {
    425   CHECK(!active_read_id_);
    426   CHECK(!read_still_on_call_stack_);
    427 
    428   if (request_order_.empty()) {
    429     read_next_state_ = READ_STATE_NONE;
    430     return OK;
    431   }
    432 
    433   int next_id = request_order_.front();
    434   CHECK(ContainsKey(stream_info_map_, next_id));
    435   switch (stream_info_map_[next_id].state) {
    436     case STREAM_READ_PENDING:
    437       read_next_state_ = READ_STATE_READ_HEADERS;
    438       active_read_id_ = next_id;
    439       request_order_.pop();
    440       break;
    441 
    442     case STREAM_CLOSED:
    443       // Since nobody will read whatever data is on the pipeline associated with
    444       // this closed request, we must shut down the rest of the pipeline.
    445       read_next_state_ = READ_STATE_EVICT_PENDING_READS;
    446       break;
    447 
    448     case STREAM_SENT:
    449       read_next_state_ = READ_STATE_NONE;
    450       break;
    451 
    452     default:
    453       CHECK(false) << "Unexpected read state: "
    454                    << stream_info_map_[next_id].state;
    455   }
    456 
    457   return OK;
    458 }
    459 
    460 int HttpPipelinedConnectionImpl::DoReadHeaders(int result) {
    461   CHECK(active_read_id_);
    462   CHECK(ContainsKey(stream_info_map_, active_read_id_));
    463   CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state);
    464   stream_info_map_[active_read_id_].state = STREAM_ACTIVE;
    465   int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders(
    466       base::Bind(&HttpPipelinedConnectionImpl::OnReadIOCallback,
    467                  base::Unretained(this)));
    468   read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE;
    469   return rv;
    470 }
    471 
    472 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) {
    473   CHECK(active_read_id_);
    474   CHECK(ContainsKey(stream_info_map_, active_read_id_));
    475   CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state);
    476 
    477   read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
    478   if (result < OK) {
    479     if (completed_one_request_ &&
    480         (result == ERR_CONNECTION_CLOSED ||
    481          result == ERR_EMPTY_RESPONSE ||
    482          result == ERR_SOCKET_NOT_CONNECTED)) {
    483       // These usually indicate that pipelining failed on the server side. In
    484       // that case, we should retry without pipelining.
    485       result = ERR_PIPELINE_EVICTION;
    486     }
    487     usable_ = false;
    488   }
    489 
    490   CheckHeadersForPipelineCompatibility(active_read_id_, result);
    491 
    492   if (!read_still_on_call_stack_) {
    493     QueueUserCallback(active_read_id_,
    494                       stream_info_map_[active_read_id_].read_headers_callback,
    495                       result, FROM_HERE);
    496   }
    497 
    498   return result;
    499 }
    500 
    501 int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) {
    502   read_next_state_ = READ_STATE_WAITING_FOR_CLOSE;
    503   return result;
    504 }
    505 
    506 int HttpPipelinedConnectionImpl::DoReadStreamClosed() {
    507   CHECK(active_read_id_);
    508   CHECK(ContainsKey(stream_info_map_, active_read_id_));
    509   CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED);
    510   active_read_id_ = 0;
    511   if (!usable_) {
    512     // TODO(simonjam): Don't wait this long to evict.
    513     read_next_state_ = READ_STATE_EVICT_PENDING_READS;
    514     return OK;
    515   }
    516   completed_one_request_ = true;
    517   base::MessageLoop::current()->PostTask(
    518       FROM_HERE,
    519       base::Bind(&HttpPipelinedConnectionImpl::StartNextDeferredRead,
    520                  weak_factory_.GetWeakPtr()));
    521   read_next_state_ = READ_STATE_NONE;
    522   return OK;
    523 }
    524 
    525 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) {
    526   while (!request_order_.empty()) {
    527     int evicted_id = request_order_.front();
    528     request_order_.pop();
    529     if (!ContainsKey(stream_info_map_, evicted_id)) {
    530       continue;
    531     }
    532     if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) {
    533       stream_info_map_[evicted_id].state = STREAM_READ_EVICTED;
    534       stream_info_map_[evicted_id].read_headers_callback.Run(
    535           ERR_PIPELINE_EVICTION);
    536     }
    537   }
    538   read_next_state_ = READ_STATE_NONE;
    539   return result;
    540 }
    541 
    542 void HttpPipelinedConnectionImpl::Close(int pipeline_id,
    543                                         bool not_reusable) {
    544   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    545   net_log_.AddEvent(
    546       NetLog::TYPE_HTTP_PIPELINED_CONNECTION_STREAM_CLOSED,
    547       base::Bind(&NetLogStreamClosedCallback,
    548                  stream_info_map_[pipeline_id].source, not_reusable));
    549   switch (stream_info_map_[pipeline_id].state) {
    550     case STREAM_CREATED:
    551       stream_info_map_[pipeline_id].state = STREAM_UNUSED;
    552       break;
    553 
    554     case STREAM_BOUND:
    555       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
    556       break;
    557 
    558     case STREAM_SENDING:
    559       usable_ = false;
    560       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
    561       active_send_request_.reset();
    562       send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS;
    563       DoSendRequestLoop(OK);
    564       break;
    565 
    566     case STREAM_SENT:
    567     case STREAM_READ_PENDING:
    568       usable_ = false;
    569       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
    570       if (!request_order_.empty() &&
    571           pipeline_id == request_order_.front() &&
    572           read_next_state_ == READ_STATE_NONE) {
    573         read_next_state_ = READ_STATE_EVICT_PENDING_READS;
    574         DoReadHeadersLoop(OK);
    575       }
    576       break;
    577 
    578     case STREAM_ACTIVE:
    579       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
    580       if (not_reusable) {
    581         usable_ = false;
    582       }
    583       read_next_state_ = READ_STATE_STREAM_CLOSED;
    584       DoReadHeadersLoop(OK);
    585       break;
    586 
    587     case STREAM_READ_EVICTED:
    588       stream_info_map_[pipeline_id].state = STREAM_CLOSED;
    589       break;
    590 
    591     case STREAM_CLOSED:
    592     case STREAM_UNUSED:
    593       // TODO(simonjam): Why is Close() sometimes called twice?
    594       break;
    595 
    596     default:
    597       CHECK(false);
    598       break;
    599   }
    600 }
    601 
    602 int HttpPipelinedConnectionImpl::ReadResponseBody(
    603     int pipeline_id, IOBuffer* buf, int buf_len,
    604     const CompletionCallback& callback) {
    605   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    606   CHECK_EQ(active_read_id_, pipeline_id);
    607   CHECK(stream_info_map_[pipeline_id].parser.get());
    608   return stream_info_map_[pipeline_id].parser->ReadResponseBody(
    609       buf, buf_len, callback);
    610 }
    611 
    612 UploadProgress HttpPipelinedConnectionImpl::GetUploadProgress(
    613     int pipeline_id) const {
    614   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    615   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
    616   return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress();
    617 }
    618 
    619 HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo(
    620     int pipeline_id) {
    621   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    622   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
    623   return stream_info_map_.find(pipeline_id)->second.parser->GetResponseInfo();
    624 }
    625 
    626 bool HttpPipelinedConnectionImpl::IsResponseBodyComplete(
    627     int pipeline_id) const {
    628   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    629   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
    630   return stream_info_map_.find(pipeline_id)->second.parser->
    631       IsResponseBodyComplete();
    632 }
    633 
    634 bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const {
    635   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    636   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
    637   return stream_info_map_.find(pipeline_id)->second.parser->
    638       CanFindEndOfResponse();
    639 }
    640 
    641 bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const {
    642   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    643   if (pipeline_id > 1) {
    644     return true;
    645   }
    646   ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type();
    647   return connection_->is_reused() ||
    648          reuse_type == ClientSocketHandle::UNUSED_IDLE;
    649 }
    650 
    651 void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) {
    652   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    653   connection_->set_is_reused(true);
    654 }
    655 
    656 int64 HttpPipelinedConnectionImpl::GetTotalReceivedBytes(
    657     int pipeline_id) const {
    658   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    659   CHECK(stream_info_map_.find(pipeline_id)->second.parser.get());
    660   return stream_info_map_.find(pipeline_id)->second.parser->received_bytes();
    661 }
    662 
    663 bool HttpPipelinedConnectionImpl::GetLoadTimingInfo(
    664     int pipeline_id, LoadTimingInfo* load_timing_info) const {
    665   return connection_->GetLoadTimingInfo(IsConnectionReused(pipeline_id),
    666                                         load_timing_info);
    667 }
    668 
    669 void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id,
    670                                              SSLInfo* ssl_info) {
    671   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    672   CHECK(stream_info_map_[pipeline_id].parser.get());
    673   stream_info_map_[pipeline_id].parser->GetSSLInfo(ssl_info);
    674 }
    675 
    676 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo(
    677     int pipeline_id,
    678     SSLCertRequestInfo* cert_request_info) {
    679   CHECK(ContainsKey(stream_info_map_, pipeline_id));
    680   CHECK(stream_info_map_[pipeline_id].parser.get());
    681   stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo(
    682       cert_request_info);
    683 }
    684 
    685 void HttpPipelinedConnectionImpl::Drain(HttpPipelinedStream* stream,
    686                                         HttpNetworkSession* session) {
    687   HttpResponseHeaders* headers = stream->GetResponseInfo()->headers.get();
    688   if (!stream->CanFindEndOfResponse() || headers->IsChunkEncoded() ||
    689       !usable_) {
    690     // TODO(simonjam): Drain chunk-encoded responses if they're relatively
    691     // common.
    692     stream->Close(true);
    693     delete stream;
    694     return;
    695   }
    696   HttpResponseBodyDrainer* drainer = new HttpResponseBodyDrainer(stream);
    697   drainer->StartWithSize(session, headers->GetContentLength());
    698   // |drainer| will delete itself when done.
    699 }
    700 
    701 void HttpPipelinedConnectionImpl::CheckHeadersForPipelineCompatibility(
    702     int pipeline_id,
    703     int result) {
    704   if (result < OK) {
    705     switch (result) {
    706       // TODO(simonjam): Ignoring specific errors like this may not work.
    707       // Collect metrics to see if this code is useful.
    708       case ERR_ABORTED:
    709       case ERR_INTERNET_DISCONNECTED:
    710       case ERR_NETWORK_CHANGED:
    711         // These errors are no fault of the server.
    712         break;
    713 
    714       default:
    715         ReportPipelineFeedback(pipeline_id, PIPELINE_SOCKET_ERROR);
    716         break;
    717     }
    718     return;
    719   }
    720   HttpResponseInfo* info = GetResponseInfo(pipeline_id);
    721   const HttpVersion required_version(1, 1);
    722   if (info->headers->GetParsedHttpVersion() < required_version) {
    723     ReportPipelineFeedback(pipeline_id, OLD_HTTP_VERSION);
    724     return;
    725   }
    726   if (!info->headers->IsKeepAlive() || !CanFindEndOfResponse(pipeline_id)) {
    727     usable_ = false;
    728     ReportPipelineFeedback(pipeline_id, MUST_CLOSE_CONNECTION);
    729     return;
    730   }
    731   if (info->headers->HasHeader(
    732       HttpAuth::GetChallengeHeaderName(HttpAuth::AUTH_SERVER))) {
    733     ReportPipelineFeedback(pipeline_id, AUTHENTICATION_REQUIRED);
    734     return;
    735   }
    736   ReportPipelineFeedback(pipeline_id, OK);
    737 }
    738 
    739 void HttpPipelinedConnectionImpl::ReportPipelineFeedback(int pipeline_id,
    740                                                          Feedback feedback) {
    741   std::string feedback_str;
    742   switch (feedback) {
    743     case OK:
    744       feedback_str = "OK";
    745       break;
    746 
    747     case PIPELINE_SOCKET_ERROR:
    748       feedback_str = "PIPELINE_SOCKET_ERROR";
    749       break;
    750 
    751     case OLD_HTTP_VERSION:
    752       feedback_str = "OLD_HTTP_VERSION";
    753       break;
    754 
    755     case MUST_CLOSE_CONNECTION:
    756       feedback_str = "MUST_CLOSE_CONNECTION";
    757       break;
    758 
    759     case AUTHENTICATION_REQUIRED:
    760       feedback_str = "AUTHENTICATION_REQUIRED";
    761       break;
    762 
    763     default:
    764       NOTREACHED();
    765       feedback_str = "UNKNOWN";
    766       break;
    767   }
    768   net_log_.AddEvent(
    769       NetLog::TYPE_HTTP_PIPELINED_CONNECTION_RECEIVED_HEADERS,
    770       base::Bind(&NetLogReceivedHeadersCallback,
    771                  stream_info_map_[pipeline_id].source, &feedback_str));
    772   delegate_->OnPipelineFeedback(this, feedback);
    773 }
    774 
    775 void HttpPipelinedConnectionImpl::QueueUserCallback(
    776     int pipeline_id, const CompletionCallback& callback, int rv,
    777     const tracked_objects::Location& from_here) {
    778   CHECK(stream_info_map_[pipeline_id].pending_user_callback.is_null());
    779   stream_info_map_[pipeline_id].pending_user_callback = callback;
    780   base::MessageLoop::current()->PostTask(
    781       from_here,
    782       base::Bind(&HttpPipelinedConnectionImpl::FireUserCallback,
    783                  weak_factory_.GetWeakPtr(), pipeline_id, rv));
    784 }
    785 
    786 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id,
    787                                                    int result) {
    788   if (ContainsKey(stream_info_map_, pipeline_id)) {
    789     CHECK(!stream_info_map_[pipeline_id].pending_user_callback.is_null());
    790     CompletionCallback callback =
    791         stream_info_map_[pipeline_id].pending_user_callback;
    792     stream_info_map_[pipeline_id].pending_user_callback.Reset();
    793     callback.Run(result);
    794   }
    795 }
    796 
    797 int HttpPipelinedConnectionImpl::depth() const {
    798   return stream_info_map_.size();
    799 }
    800 
    801 bool HttpPipelinedConnectionImpl::usable() const {
    802   return usable_;
    803 }
    804 
    805 bool HttpPipelinedConnectionImpl::active() const {
    806   return active_;
    807 }
    808 
    809 const SSLConfig& HttpPipelinedConnectionImpl::used_ssl_config() const {
    810   return used_ssl_config_;
    811 }
    812 
    813 const ProxyInfo& HttpPipelinedConnectionImpl::used_proxy_info() const {
    814   return used_proxy_info_;
    815 }
    816 
    817 const BoundNetLog& HttpPipelinedConnectionImpl::net_log() const {
    818   return net_log_;
    819 }
    820 
    821 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const {
    822   return was_npn_negotiated_;
    823 }
    824 
    825 NextProto HttpPipelinedConnectionImpl::protocol_negotiated()
    826     const {
    827   return protocol_negotiated_;
    828 }
    829 
    830 HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest()
    831     : pipeline_id(0),
    832       response(NULL) {
    833 }
    834 
    835 HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() {
    836 }
    837 
    838 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo()
    839     : state(STREAM_CREATED) {
    840 }
    841 
    842 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() {
    843 }
    844 
    845 }  // namespace net
    846