1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/http/http_pipelined_connection_impl.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/message_loop/message_loop.h" 10 #include "base/stl_util.h" 11 #include "base/values.h" 12 #include "net/base/io_buffer.h" 13 #include "net/http/http_pipelined_stream.h" 14 #include "net/http/http_request_info.h" 15 #include "net/http/http_response_body_drainer.h" 16 #include "net/http/http_response_headers.h" 17 #include "net/http/http_stream_parser.h" 18 #include "net/http/http_version.h" 19 #include "net/socket/client_socket_handle.h" 20 21 namespace net { 22 23 namespace { 24 25 base::Value* NetLogReceivedHeadersCallback(const NetLog::Source& source, 26 const std::string* feedback, 27 NetLog::LogLevel /* log_level */) { 28 base::DictionaryValue* dict = new base::DictionaryValue; 29 source.AddToEventParameters(dict); 30 dict->SetString("feedback", *feedback); 31 return dict; 32 } 33 34 base::Value* NetLogStreamClosedCallback(const NetLog::Source& source, 35 bool not_reusable, 36 NetLog::LogLevel /* log_level */) { 37 base::DictionaryValue* dict = new base::DictionaryValue; 38 source.AddToEventParameters(dict); 39 dict->SetBoolean("not_reusable", not_reusable); 40 return dict; 41 } 42 43 base::Value* NetLogHostPortPairCallback(const HostPortPair* host_port_pair, 44 NetLog::LogLevel /* log_level */) { 45 base::DictionaryValue* dict = new base::DictionaryValue; 46 dict->SetString("host_and_port", host_port_pair->ToString()); 47 return dict; 48 } 49 50 } // anonymous namespace 51 52 HttpPipelinedConnection* 53 HttpPipelinedConnectionImpl::Factory::CreateNewPipeline( 54 ClientSocketHandle* connection, 55 HttpPipelinedConnection::Delegate* delegate, 56 const HostPortPair& origin, 57 const SSLConfig& used_ssl_config, 58 const ProxyInfo& used_proxy_info, 59 const BoundNetLog& net_log, 60 bool was_npn_negotiated, 61 NextProto protocol_negotiated) { 62 return new HttpPipelinedConnectionImpl(connection, delegate, origin, 63 used_ssl_config, used_proxy_info, 64 net_log, was_npn_negotiated, 65 protocol_negotiated); 66 } 67 68 HttpPipelinedConnectionImpl::HttpPipelinedConnectionImpl( 69 ClientSocketHandle* connection, 70 HttpPipelinedConnection::Delegate* delegate, 71 const HostPortPair& origin, 72 const SSLConfig& used_ssl_config, 73 const ProxyInfo& used_proxy_info, 74 const BoundNetLog& net_log, 75 bool was_npn_negotiated, 76 NextProto protocol_negotiated) 77 : delegate_(delegate), 78 connection_(connection), 79 used_ssl_config_(used_ssl_config), 80 used_proxy_info_(used_proxy_info), 81 net_log_(BoundNetLog::Make(net_log.net_log(), 82 NetLog::SOURCE_HTTP_PIPELINED_CONNECTION)), 83 was_npn_negotiated_(was_npn_negotiated), 84 protocol_negotiated_(protocol_negotiated), 85 read_buf_(new GrowableIOBuffer()), 86 next_pipeline_id_(1), 87 active_(false), 88 usable_(true), 89 completed_one_request_(false), 90 weak_factory_(this), 91 send_next_state_(SEND_STATE_NONE), 92 send_still_on_call_stack_(false), 93 read_next_state_(READ_STATE_NONE), 94 active_read_id_(0), 95 read_still_on_call_stack_(false) { 96 CHECK(connection_.get()); 97 net_log_.BeginEvent( 98 NetLog::TYPE_HTTP_PIPELINED_CONNECTION, 99 base::Bind(&NetLogHostPortPairCallback, &origin)); 100 } 101 102 HttpPipelinedConnectionImpl::~HttpPipelinedConnectionImpl() { 103 CHECK_EQ(depth(), 0); 104 CHECK(stream_info_map_.empty()); 105 CHECK(pending_send_request_queue_.empty()); 106 CHECK(request_order_.empty()); 107 CHECK_EQ(send_next_state_, SEND_STATE_NONE); 108 CHECK_EQ(read_next_state_, READ_STATE_NONE); 109 CHECK(!active_send_request_.get()); 110 CHECK(!active_read_id_); 111 if (!usable_) { 112 connection_->socket()->Disconnect(); 113 } 114 connection_->Reset(); 115 net_log_.EndEvent(NetLog::TYPE_HTTP_PIPELINED_CONNECTION); 116 } 117 118 HttpPipelinedStream* HttpPipelinedConnectionImpl::CreateNewStream() { 119 int pipeline_id = next_pipeline_id_++; 120 CHECK(pipeline_id); 121 HttpPipelinedStream* stream = new HttpPipelinedStream(this, pipeline_id); 122 stream_info_map_.insert(std::make_pair(pipeline_id, StreamInfo())); 123 return stream; 124 } 125 126 void HttpPipelinedConnectionImpl::InitializeParser( 127 int pipeline_id, 128 const HttpRequestInfo* request, 129 const BoundNetLog& net_log) { 130 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 131 CHECK(!stream_info_map_[pipeline_id].parser.get()); 132 stream_info_map_[pipeline_id].state = STREAM_BOUND; 133 stream_info_map_[pipeline_id].parser.reset(new HttpStreamParser( 134 connection_.get(), request, read_buf_.get(), net_log)); 135 stream_info_map_[pipeline_id].source = net_log.source(); 136 137 // In case our first stream doesn't SendRequest() immediately, we should still 138 // allow others to use this pipeline. 139 if (pipeline_id == 1) { 140 base::MessageLoop::current()->PostTask( 141 FROM_HERE, 142 base::Bind(&HttpPipelinedConnectionImpl::ActivatePipeline, 143 weak_factory_.GetWeakPtr())); 144 } 145 } 146 147 void HttpPipelinedConnectionImpl::ActivatePipeline() { 148 if (!active_) { 149 active_ = true; 150 delegate_->OnPipelineHasCapacity(this); 151 } 152 } 153 154 void HttpPipelinedConnectionImpl::OnStreamDeleted(int pipeline_id) { 155 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 156 Close(pipeline_id, false); 157 158 if (stream_info_map_[pipeline_id].state != STREAM_CREATED && 159 stream_info_map_[pipeline_id].state != STREAM_UNUSED) { 160 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_CLOSED); 161 CHECK(stream_info_map_[pipeline_id].parser.get()); 162 stream_info_map_[pipeline_id].parser.reset(); 163 } 164 CHECK(!stream_info_map_[pipeline_id].parser.get()); 165 stream_info_map_.erase(pipeline_id); 166 167 delegate_->OnPipelineHasCapacity(this); 168 } 169 170 int HttpPipelinedConnectionImpl::SendRequest( 171 int pipeline_id, 172 const std::string& request_line, 173 const HttpRequestHeaders& headers, 174 HttpResponseInfo* response, 175 const CompletionCallback& callback) { 176 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 177 CHECK_EQ(stream_info_map_[pipeline_id].state, STREAM_BOUND); 178 if (!usable_) { 179 return ERR_PIPELINE_EVICTION; 180 } 181 182 PendingSendRequest* send_request = new PendingSendRequest; 183 send_request->pipeline_id = pipeline_id; 184 send_request->request_line = request_line; 185 send_request->headers = headers; 186 send_request->response = response; 187 send_request->callback = callback; 188 pending_send_request_queue_.push(send_request); 189 190 int rv; 191 if (send_next_state_ == SEND_STATE_NONE) { 192 send_next_state_ = SEND_STATE_START_IMMEDIATELY; 193 rv = DoSendRequestLoop(OK); 194 } else { 195 rv = ERR_IO_PENDING; 196 } 197 ActivatePipeline(); 198 return rv; 199 } 200 201 int HttpPipelinedConnectionImpl::DoSendRequestLoop(int result) { 202 int rv = result; 203 do { 204 SendRequestState state = send_next_state_; 205 send_next_state_ = SEND_STATE_NONE; 206 switch (state) { 207 case SEND_STATE_START_IMMEDIATELY: 208 rv = DoStartRequestImmediately(rv); 209 break; 210 case SEND_STATE_START_NEXT_DEFERRED_REQUEST: 211 rv = DoStartNextDeferredRequest(rv); 212 break; 213 case SEND_STATE_SEND_ACTIVE_REQUEST: 214 rv = DoSendActiveRequest(rv); 215 break; 216 case SEND_STATE_COMPLETE: 217 rv = DoSendComplete(rv); 218 break; 219 case SEND_STATE_EVICT_PENDING_REQUESTS: 220 rv = DoEvictPendingSendRequests(rv); 221 break; 222 default: 223 CHECK(false) << "bad send state: " << state; 224 rv = ERR_FAILED; 225 break; 226 } 227 } while (rv != ERR_IO_PENDING && send_next_state_ != SEND_STATE_NONE); 228 send_still_on_call_stack_ = false; 229 return rv; 230 } 231 232 void HttpPipelinedConnectionImpl::OnSendIOCallback(int result) { 233 CHECK(active_send_request_.get()); 234 DoSendRequestLoop(result); 235 } 236 237 int HttpPipelinedConnectionImpl::DoStartRequestImmediately(int result) { 238 CHECK(!active_send_request_.get()); 239 CHECK_EQ(static_cast<size_t>(1), pending_send_request_queue_.size()); 240 // If SendRequest() completes synchronously, then we need to return the value 241 // directly to the caller. |send_still_on_call_stack_| will track this. 242 // Otherwise, asynchronous completions will notify the caller via callback. 243 send_still_on_call_stack_ = true; 244 active_send_request_.reset(pending_send_request_queue_.front()); 245 pending_send_request_queue_.pop(); 246 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST; 247 return OK; 248 } 249 250 int HttpPipelinedConnectionImpl::DoStartNextDeferredRequest(int result) { 251 CHECK(!send_still_on_call_stack_); 252 CHECK(!active_send_request_.get()); 253 254 while (!pending_send_request_queue_.empty()) { 255 scoped_ptr<PendingSendRequest> next_request( 256 pending_send_request_queue_.front()); 257 pending_send_request_queue_.pop(); 258 CHECK(ContainsKey(stream_info_map_, next_request->pipeline_id)); 259 if (stream_info_map_[next_request->pipeline_id].state != STREAM_CLOSED) { 260 active_send_request_.reset(next_request.release()); 261 send_next_state_ = SEND_STATE_SEND_ACTIVE_REQUEST; 262 return OK; 263 } 264 } 265 266 send_next_state_ = SEND_STATE_NONE; 267 return OK; 268 } 269 270 int HttpPipelinedConnectionImpl::DoSendActiveRequest(int result) { 271 CHECK(stream_info_map_[active_send_request_->pipeline_id].parser.get()); 272 int rv = stream_info_map_[active_send_request_->pipeline_id].parser-> 273 SendRequest(active_send_request_->request_line, 274 active_send_request_->headers, 275 active_send_request_->response, 276 base::Bind(&HttpPipelinedConnectionImpl::OnSendIOCallback, 277 base::Unretained(this))); 278 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENDING; 279 send_next_state_ = SEND_STATE_COMPLETE; 280 return rv; 281 } 282 283 int HttpPipelinedConnectionImpl::DoSendComplete(int result) { 284 CHECK(active_send_request_.get()); 285 CHECK_EQ(STREAM_SENDING, 286 stream_info_map_[active_send_request_->pipeline_id].state); 287 288 request_order_.push(active_send_request_->pipeline_id); 289 stream_info_map_[active_send_request_->pipeline_id].state = STREAM_SENT; 290 net_log_.AddEvent( 291 NetLog::TYPE_HTTP_PIPELINED_CONNECTION_SENT_REQUEST, 292 stream_info_map_[active_send_request_->pipeline_id].source. 293 ToEventParametersCallback()); 294 295 if (result == ERR_SOCKET_NOT_CONNECTED && completed_one_request_) { 296 result = ERR_PIPELINE_EVICTION; 297 } 298 if (result < OK) { 299 usable_ = false; 300 } 301 302 if (!send_still_on_call_stack_) { 303 QueueUserCallback(active_send_request_->pipeline_id, 304 active_send_request_->callback, result, FROM_HERE); 305 } 306 307 active_send_request_.reset(); 308 309 if (send_still_on_call_stack_) { 310 // It should be impossible for another request to appear on the queue while 311 // this send was on the call stack. 312 CHECK(pending_send_request_queue_.empty()); 313 send_next_state_ = SEND_STATE_NONE; 314 } else if (!usable_) { 315 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS; 316 } else { 317 send_next_state_ = SEND_STATE_START_NEXT_DEFERRED_REQUEST; 318 } 319 320 return result; 321 } 322 323 int HttpPipelinedConnectionImpl::DoEvictPendingSendRequests(int result) { 324 while (!pending_send_request_queue_.empty()) { 325 scoped_ptr<PendingSendRequest> evicted_send( 326 pending_send_request_queue_.front()); 327 pending_send_request_queue_.pop(); 328 if (ContainsKey(stream_info_map_, evicted_send->pipeline_id) && 329 stream_info_map_[evicted_send->pipeline_id].state != STREAM_CLOSED) { 330 evicted_send->callback.Run(ERR_PIPELINE_EVICTION); 331 } 332 } 333 send_next_state_ = SEND_STATE_NONE; 334 return result; 335 } 336 337 int HttpPipelinedConnectionImpl::ReadResponseHeaders( 338 int pipeline_id, const CompletionCallback& callback) { 339 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 340 CHECK_EQ(STREAM_SENT, stream_info_map_[pipeline_id].state); 341 CHECK(stream_info_map_[pipeline_id].read_headers_callback.is_null()); 342 343 if (!usable_) 344 return ERR_PIPELINE_EVICTION; 345 346 stream_info_map_[pipeline_id].state = STREAM_READ_PENDING; 347 stream_info_map_[pipeline_id].read_headers_callback = callback; 348 if (read_next_state_ == READ_STATE_NONE && 349 pipeline_id == request_order_.front()) { 350 read_next_state_ = READ_STATE_START_IMMEDIATELY; 351 return DoReadHeadersLoop(OK); 352 } 353 return ERR_IO_PENDING; 354 } 355 356 void HttpPipelinedConnectionImpl::StartNextDeferredRead() { 357 if (read_next_state_ == READ_STATE_NONE) { 358 read_next_state_ = READ_STATE_START_NEXT_DEFERRED_READ; 359 DoReadHeadersLoop(OK); 360 } 361 } 362 363 int HttpPipelinedConnectionImpl::DoReadHeadersLoop(int result) { 364 int rv = result; 365 do { 366 ReadHeadersState state = read_next_state_; 367 read_next_state_ = READ_STATE_NONE; 368 switch (state) { 369 case READ_STATE_START_IMMEDIATELY: 370 rv = DoStartReadImmediately(rv); 371 break; 372 case READ_STATE_START_NEXT_DEFERRED_READ: 373 rv = DoStartNextDeferredRead(rv); 374 break; 375 case READ_STATE_READ_HEADERS: 376 rv = DoReadHeaders(rv); 377 break; 378 case READ_STATE_READ_HEADERS_COMPLETE: 379 rv = DoReadHeadersComplete(rv); 380 break; 381 case READ_STATE_WAITING_FOR_CLOSE: 382 // This is a holding state. We return instead of continuing to run hte 383 // loop. The state will advance when the stream calls Close(). 384 rv = DoReadWaitForClose(rv); 385 read_still_on_call_stack_ = false; 386 return rv; 387 case READ_STATE_STREAM_CLOSED: 388 rv = DoReadStreamClosed(); 389 break; 390 case READ_STATE_EVICT_PENDING_READS: 391 rv = DoEvictPendingReadHeaders(rv); 392 break; 393 case READ_STATE_NONE: 394 break; 395 default: 396 CHECK(false) << "bad read state"; 397 rv = ERR_FAILED; 398 break; 399 } 400 } while (rv != ERR_IO_PENDING && read_next_state_ != READ_STATE_NONE); 401 read_still_on_call_stack_ = false; 402 return rv; 403 } 404 405 void HttpPipelinedConnectionImpl::OnReadIOCallback(int result) { 406 DoReadHeadersLoop(result); 407 } 408 409 int HttpPipelinedConnectionImpl::DoStartReadImmediately(int result) { 410 CHECK(!active_read_id_); 411 CHECK(!read_still_on_call_stack_); 412 CHECK(!request_order_.empty()); 413 // If ReadResponseHeaders() completes synchronously, then we need to return 414 // the value directly to the caller. |read_still_on_call_stack_| will track 415 // this. Otherwise, asynchronous completions will notify the caller via 416 // callback. 417 read_still_on_call_stack_ = true; 418 read_next_state_ = READ_STATE_READ_HEADERS; 419 active_read_id_ = request_order_.front(); 420 request_order_.pop(); 421 return OK; 422 } 423 424 int HttpPipelinedConnectionImpl::DoStartNextDeferredRead(int result) { 425 CHECK(!active_read_id_); 426 CHECK(!read_still_on_call_stack_); 427 428 if (request_order_.empty()) { 429 read_next_state_ = READ_STATE_NONE; 430 return OK; 431 } 432 433 int next_id = request_order_.front(); 434 CHECK(ContainsKey(stream_info_map_, next_id)); 435 switch (stream_info_map_[next_id].state) { 436 case STREAM_READ_PENDING: 437 read_next_state_ = READ_STATE_READ_HEADERS; 438 active_read_id_ = next_id; 439 request_order_.pop(); 440 break; 441 442 case STREAM_CLOSED: 443 // Since nobody will read whatever data is on the pipeline associated with 444 // this closed request, we must shut down the rest of the pipeline. 445 read_next_state_ = READ_STATE_EVICT_PENDING_READS; 446 break; 447 448 case STREAM_SENT: 449 read_next_state_ = READ_STATE_NONE; 450 break; 451 452 default: 453 CHECK(false) << "Unexpected read state: " 454 << stream_info_map_[next_id].state; 455 } 456 457 return OK; 458 } 459 460 int HttpPipelinedConnectionImpl::DoReadHeaders(int result) { 461 CHECK(active_read_id_); 462 CHECK(ContainsKey(stream_info_map_, active_read_id_)); 463 CHECK_EQ(STREAM_READ_PENDING, stream_info_map_[active_read_id_].state); 464 stream_info_map_[active_read_id_].state = STREAM_ACTIVE; 465 int rv = stream_info_map_[active_read_id_].parser->ReadResponseHeaders( 466 base::Bind(&HttpPipelinedConnectionImpl::OnReadIOCallback, 467 base::Unretained(this))); 468 read_next_state_ = READ_STATE_READ_HEADERS_COMPLETE; 469 return rv; 470 } 471 472 int HttpPipelinedConnectionImpl::DoReadHeadersComplete(int result) { 473 CHECK(active_read_id_); 474 CHECK(ContainsKey(stream_info_map_, active_read_id_)); 475 CHECK_EQ(STREAM_ACTIVE, stream_info_map_[active_read_id_].state); 476 477 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 478 if (result < OK) { 479 if (completed_one_request_ && 480 (result == ERR_CONNECTION_CLOSED || 481 result == ERR_EMPTY_RESPONSE || 482 result == ERR_SOCKET_NOT_CONNECTED)) { 483 // These usually indicate that pipelining failed on the server side. In 484 // that case, we should retry without pipelining. 485 result = ERR_PIPELINE_EVICTION; 486 } 487 usable_ = false; 488 } 489 490 CheckHeadersForPipelineCompatibility(active_read_id_, result); 491 492 if (!read_still_on_call_stack_) { 493 QueueUserCallback(active_read_id_, 494 stream_info_map_[active_read_id_].read_headers_callback, 495 result, FROM_HERE); 496 } 497 498 return result; 499 } 500 501 int HttpPipelinedConnectionImpl::DoReadWaitForClose(int result) { 502 read_next_state_ = READ_STATE_WAITING_FOR_CLOSE; 503 return result; 504 } 505 506 int HttpPipelinedConnectionImpl::DoReadStreamClosed() { 507 CHECK(active_read_id_); 508 CHECK(ContainsKey(stream_info_map_, active_read_id_)); 509 CHECK_EQ(stream_info_map_[active_read_id_].state, STREAM_CLOSED); 510 active_read_id_ = 0; 511 if (!usable_) { 512 // TODO(simonjam): Don't wait this long to evict. 513 read_next_state_ = READ_STATE_EVICT_PENDING_READS; 514 return OK; 515 } 516 completed_one_request_ = true; 517 base::MessageLoop::current()->PostTask( 518 FROM_HERE, 519 base::Bind(&HttpPipelinedConnectionImpl::StartNextDeferredRead, 520 weak_factory_.GetWeakPtr())); 521 read_next_state_ = READ_STATE_NONE; 522 return OK; 523 } 524 525 int HttpPipelinedConnectionImpl::DoEvictPendingReadHeaders(int result) { 526 while (!request_order_.empty()) { 527 int evicted_id = request_order_.front(); 528 request_order_.pop(); 529 if (!ContainsKey(stream_info_map_, evicted_id)) { 530 continue; 531 } 532 if (stream_info_map_[evicted_id].state == STREAM_READ_PENDING) { 533 stream_info_map_[evicted_id].state = STREAM_READ_EVICTED; 534 stream_info_map_[evicted_id].read_headers_callback.Run( 535 ERR_PIPELINE_EVICTION); 536 } 537 } 538 read_next_state_ = READ_STATE_NONE; 539 return result; 540 } 541 542 void HttpPipelinedConnectionImpl::Close(int pipeline_id, 543 bool not_reusable) { 544 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 545 net_log_.AddEvent( 546 NetLog::TYPE_HTTP_PIPELINED_CONNECTION_STREAM_CLOSED, 547 base::Bind(&NetLogStreamClosedCallback, 548 stream_info_map_[pipeline_id].source, not_reusable)); 549 switch (stream_info_map_[pipeline_id].state) { 550 case STREAM_CREATED: 551 stream_info_map_[pipeline_id].state = STREAM_UNUSED; 552 break; 553 554 case STREAM_BOUND: 555 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 556 break; 557 558 case STREAM_SENDING: 559 usable_ = false; 560 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 561 active_send_request_.reset(); 562 send_next_state_ = SEND_STATE_EVICT_PENDING_REQUESTS; 563 DoSendRequestLoop(OK); 564 break; 565 566 case STREAM_SENT: 567 case STREAM_READ_PENDING: 568 usable_ = false; 569 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 570 if (!request_order_.empty() && 571 pipeline_id == request_order_.front() && 572 read_next_state_ == READ_STATE_NONE) { 573 read_next_state_ = READ_STATE_EVICT_PENDING_READS; 574 DoReadHeadersLoop(OK); 575 } 576 break; 577 578 case STREAM_ACTIVE: 579 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 580 if (not_reusable) { 581 usable_ = false; 582 } 583 read_next_state_ = READ_STATE_STREAM_CLOSED; 584 DoReadHeadersLoop(OK); 585 break; 586 587 case STREAM_READ_EVICTED: 588 stream_info_map_[pipeline_id].state = STREAM_CLOSED; 589 break; 590 591 case STREAM_CLOSED: 592 case STREAM_UNUSED: 593 // TODO(simonjam): Why is Close() sometimes called twice? 594 break; 595 596 default: 597 CHECK(false); 598 break; 599 } 600 } 601 602 int HttpPipelinedConnectionImpl::ReadResponseBody( 603 int pipeline_id, IOBuffer* buf, int buf_len, 604 const CompletionCallback& callback) { 605 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 606 CHECK_EQ(active_read_id_, pipeline_id); 607 CHECK(stream_info_map_[pipeline_id].parser.get()); 608 return stream_info_map_[pipeline_id].parser->ReadResponseBody( 609 buf, buf_len, callback); 610 } 611 612 UploadProgress HttpPipelinedConnectionImpl::GetUploadProgress( 613 int pipeline_id) const { 614 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 615 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 616 return stream_info_map_.find(pipeline_id)->second.parser->GetUploadProgress(); 617 } 618 619 HttpResponseInfo* HttpPipelinedConnectionImpl::GetResponseInfo( 620 int pipeline_id) { 621 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 622 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 623 return stream_info_map_.find(pipeline_id)->second.parser->GetResponseInfo(); 624 } 625 626 bool HttpPipelinedConnectionImpl::IsResponseBodyComplete( 627 int pipeline_id) const { 628 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 629 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 630 return stream_info_map_.find(pipeline_id)->second.parser-> 631 IsResponseBodyComplete(); 632 } 633 634 bool HttpPipelinedConnectionImpl::CanFindEndOfResponse(int pipeline_id) const { 635 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 636 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 637 return stream_info_map_.find(pipeline_id)->second.parser-> 638 CanFindEndOfResponse(); 639 } 640 641 bool HttpPipelinedConnectionImpl::IsConnectionReused(int pipeline_id) const { 642 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 643 if (pipeline_id > 1) { 644 return true; 645 } 646 ClientSocketHandle::SocketReuseType reuse_type = connection_->reuse_type(); 647 return connection_->is_reused() || 648 reuse_type == ClientSocketHandle::UNUSED_IDLE; 649 } 650 651 void HttpPipelinedConnectionImpl::SetConnectionReused(int pipeline_id) { 652 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 653 connection_->set_is_reused(true); 654 } 655 656 int64 HttpPipelinedConnectionImpl::GetTotalReceivedBytes( 657 int pipeline_id) const { 658 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 659 CHECK(stream_info_map_.find(pipeline_id)->second.parser.get()); 660 return stream_info_map_.find(pipeline_id)->second.parser->received_bytes(); 661 } 662 663 bool HttpPipelinedConnectionImpl::GetLoadTimingInfo( 664 int pipeline_id, LoadTimingInfo* load_timing_info) const { 665 return connection_->GetLoadTimingInfo(IsConnectionReused(pipeline_id), 666 load_timing_info); 667 } 668 669 void HttpPipelinedConnectionImpl::GetSSLInfo(int pipeline_id, 670 SSLInfo* ssl_info) { 671 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 672 CHECK(stream_info_map_[pipeline_id].parser.get()); 673 stream_info_map_[pipeline_id].parser->GetSSLInfo(ssl_info); 674 } 675 676 void HttpPipelinedConnectionImpl::GetSSLCertRequestInfo( 677 int pipeline_id, 678 SSLCertRequestInfo* cert_request_info) { 679 CHECK(ContainsKey(stream_info_map_, pipeline_id)); 680 CHECK(stream_info_map_[pipeline_id].parser.get()); 681 stream_info_map_[pipeline_id].parser->GetSSLCertRequestInfo( 682 cert_request_info); 683 } 684 685 void HttpPipelinedConnectionImpl::Drain(HttpPipelinedStream* stream, 686 HttpNetworkSession* session) { 687 HttpResponseHeaders* headers = stream->GetResponseInfo()->headers.get(); 688 if (!stream->CanFindEndOfResponse() || headers->IsChunkEncoded() || 689 !usable_) { 690 // TODO(simonjam): Drain chunk-encoded responses if they're relatively 691 // common. 692 stream->Close(true); 693 delete stream; 694 return; 695 } 696 HttpResponseBodyDrainer* drainer = new HttpResponseBodyDrainer(stream); 697 drainer->StartWithSize(session, headers->GetContentLength()); 698 // |drainer| will delete itself when done. 699 } 700 701 void HttpPipelinedConnectionImpl::CheckHeadersForPipelineCompatibility( 702 int pipeline_id, 703 int result) { 704 if (result < OK) { 705 switch (result) { 706 // TODO(simonjam): Ignoring specific errors like this may not work. 707 // Collect metrics to see if this code is useful. 708 case ERR_ABORTED: 709 case ERR_INTERNET_DISCONNECTED: 710 case ERR_NETWORK_CHANGED: 711 // These errors are no fault of the server. 712 break; 713 714 default: 715 ReportPipelineFeedback(pipeline_id, PIPELINE_SOCKET_ERROR); 716 break; 717 } 718 return; 719 } 720 HttpResponseInfo* info = GetResponseInfo(pipeline_id); 721 const HttpVersion required_version(1, 1); 722 if (info->headers->GetParsedHttpVersion() < required_version) { 723 ReportPipelineFeedback(pipeline_id, OLD_HTTP_VERSION); 724 return; 725 } 726 if (!info->headers->IsKeepAlive() || !CanFindEndOfResponse(pipeline_id)) { 727 usable_ = false; 728 ReportPipelineFeedback(pipeline_id, MUST_CLOSE_CONNECTION); 729 return; 730 } 731 if (info->headers->HasHeader( 732 HttpAuth::GetChallengeHeaderName(HttpAuth::AUTH_SERVER))) { 733 ReportPipelineFeedback(pipeline_id, AUTHENTICATION_REQUIRED); 734 return; 735 } 736 ReportPipelineFeedback(pipeline_id, OK); 737 } 738 739 void HttpPipelinedConnectionImpl::ReportPipelineFeedback(int pipeline_id, 740 Feedback feedback) { 741 std::string feedback_str; 742 switch (feedback) { 743 case OK: 744 feedback_str = "OK"; 745 break; 746 747 case PIPELINE_SOCKET_ERROR: 748 feedback_str = "PIPELINE_SOCKET_ERROR"; 749 break; 750 751 case OLD_HTTP_VERSION: 752 feedback_str = "OLD_HTTP_VERSION"; 753 break; 754 755 case MUST_CLOSE_CONNECTION: 756 feedback_str = "MUST_CLOSE_CONNECTION"; 757 break; 758 759 case AUTHENTICATION_REQUIRED: 760 feedback_str = "AUTHENTICATION_REQUIRED"; 761 break; 762 763 default: 764 NOTREACHED(); 765 feedback_str = "UNKNOWN"; 766 break; 767 } 768 net_log_.AddEvent( 769 NetLog::TYPE_HTTP_PIPELINED_CONNECTION_RECEIVED_HEADERS, 770 base::Bind(&NetLogReceivedHeadersCallback, 771 stream_info_map_[pipeline_id].source, &feedback_str)); 772 delegate_->OnPipelineFeedback(this, feedback); 773 } 774 775 void HttpPipelinedConnectionImpl::QueueUserCallback( 776 int pipeline_id, const CompletionCallback& callback, int rv, 777 const tracked_objects::Location& from_here) { 778 CHECK(stream_info_map_[pipeline_id].pending_user_callback.is_null()); 779 stream_info_map_[pipeline_id].pending_user_callback = callback; 780 base::MessageLoop::current()->PostTask( 781 from_here, 782 base::Bind(&HttpPipelinedConnectionImpl::FireUserCallback, 783 weak_factory_.GetWeakPtr(), pipeline_id, rv)); 784 } 785 786 void HttpPipelinedConnectionImpl::FireUserCallback(int pipeline_id, 787 int result) { 788 if (ContainsKey(stream_info_map_, pipeline_id)) { 789 CHECK(!stream_info_map_[pipeline_id].pending_user_callback.is_null()); 790 CompletionCallback callback = 791 stream_info_map_[pipeline_id].pending_user_callback; 792 stream_info_map_[pipeline_id].pending_user_callback.Reset(); 793 callback.Run(result); 794 } 795 } 796 797 int HttpPipelinedConnectionImpl::depth() const { 798 return stream_info_map_.size(); 799 } 800 801 bool HttpPipelinedConnectionImpl::usable() const { 802 return usable_; 803 } 804 805 bool HttpPipelinedConnectionImpl::active() const { 806 return active_; 807 } 808 809 const SSLConfig& HttpPipelinedConnectionImpl::used_ssl_config() const { 810 return used_ssl_config_; 811 } 812 813 const ProxyInfo& HttpPipelinedConnectionImpl::used_proxy_info() const { 814 return used_proxy_info_; 815 } 816 817 const BoundNetLog& HttpPipelinedConnectionImpl::net_log() const { 818 return net_log_; 819 } 820 821 bool HttpPipelinedConnectionImpl::was_npn_negotiated() const { 822 return was_npn_negotiated_; 823 } 824 825 NextProto HttpPipelinedConnectionImpl::protocol_negotiated() 826 const { 827 return protocol_negotiated_; 828 } 829 830 HttpPipelinedConnectionImpl::PendingSendRequest::PendingSendRequest() 831 : pipeline_id(0), 832 response(NULL) { 833 } 834 835 HttpPipelinedConnectionImpl::PendingSendRequest::~PendingSendRequest() { 836 } 837 838 HttpPipelinedConnectionImpl::StreamInfo::StreamInfo() 839 : state(STREAM_CREATED) { 840 } 841 842 HttpPipelinedConnectionImpl::StreamInfo::~StreamInfo() { 843 } 844 845 } // namespace net 846