1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_http_stream.h" 6 7 #include "base/callback_helpers.h" 8 #include "base/strings/stringprintf.h" 9 #include "net/base/io_buffer.h" 10 #include "net/base/net_errors.h" 11 #include "net/http/http_response_headers.h" 12 #include "net/http/http_util.h" 13 #include "net/quic/quic_client_session.h" 14 #include "net/quic/quic_http_utils.h" 15 #include "net/quic/quic_reliable_client_stream.h" 16 #include "net/quic/quic_utils.h" 17 #include "net/socket/next_proto.h" 18 #include "net/spdy/spdy_frame_builder.h" 19 #include "net/spdy/spdy_framer.h" 20 #include "net/spdy/spdy_http_utils.h" 21 #include "net/ssl/ssl_info.h" 22 23 namespace net { 24 25 static const size_t kHeaderBufInitialSize = 4096; 26 27 QuicHttpStream::QuicHttpStream(const base::WeakPtr<QuicClientSession>& session) 28 : next_state_(STATE_NONE), 29 session_(session), 30 session_error_(OK), 31 was_handshake_confirmed_(session->IsCryptoHandshakeConfirmed()), 32 stream_(NULL), 33 request_info_(NULL), 34 request_body_stream_(NULL), 35 priority_(MINIMUM_PRIORITY), 36 response_info_(NULL), 37 response_status_(OK), 38 response_headers_received_(false), 39 read_buf_(new GrowableIOBuffer()), 40 user_buffer_len_(0), 41 weak_factory_(this) { 42 DCHECK(session_); 43 session_->AddObserver(this); 44 } 45 46 QuicHttpStream::~QuicHttpStream() { 47 Close(false); 48 if (session_) 49 session_->RemoveObserver(this); 50 } 51 52 int QuicHttpStream::InitializeStream(const HttpRequestInfo* request_info, 53 RequestPriority priority, 54 const BoundNetLog& stream_net_log, 55 const CompletionCallback& callback) { 56 DCHECK(!stream_); 57 if (!session_) 58 return was_handshake_confirmed_ ? ERR_CONNECTION_CLOSED : 59 ERR_QUIC_HANDSHAKE_FAILED; 60 61 if (request_info->url.SchemeIsSecure()) { 62 SSLInfo ssl_info; 63 if (!session_->GetSSLInfo(&ssl_info) || !ssl_info.cert) { 64 return ERR_REQUEST_FOR_SECURE_RESOURCE_OVER_INSECURE_QUIC; 65 } 66 } 67 68 stream_net_log_ = stream_net_log; 69 request_info_ = request_info; 70 priority_ = priority; 71 72 int rv = stream_request_.StartRequest( 73 session_, &stream_, base::Bind(&QuicHttpStream::OnStreamReady, 74 weak_factory_.GetWeakPtr())); 75 if (rv == ERR_IO_PENDING) { 76 callback_ = callback; 77 } else if (rv == OK) { 78 stream_->SetDelegate(this); 79 } else if (!was_handshake_confirmed_) { 80 rv = ERR_QUIC_HANDSHAKE_FAILED; 81 } 82 83 return rv; 84 } 85 86 void QuicHttpStream::OnStreamReady(int rv) { 87 DCHECK(rv == OK || !stream_); 88 if (rv == OK) { 89 stream_->SetDelegate(this); 90 } else if (!was_handshake_confirmed_) { 91 rv = ERR_QUIC_HANDSHAKE_FAILED; 92 } 93 94 ResetAndReturn(&callback_).Run(rv); 95 } 96 97 int QuicHttpStream::SendRequest(const HttpRequestHeaders& request_headers, 98 HttpResponseInfo* response, 99 const CompletionCallback& callback) { 100 CHECK(stream_); 101 CHECK(!request_body_stream_); 102 CHECK(!response_info_); 103 CHECK(!callback.is_null()); 104 CHECK(response); 105 106 QuicPriority priority = ConvertRequestPriorityToQuicPriority(priority_); 107 stream_->set_priority(priority); 108 // Store the serialized request headers. 109 CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers, 110 &request_headers_, SPDY3, /*direct=*/true); 111 112 // Store the request body. 113 request_body_stream_ = request_info_->upload_data_stream; 114 if (request_body_stream_) { 115 // TODO(rch): Can we be more precise about when to allocate 116 // raw_request_body_buf_. Removed the following check. DoReadRequestBody() 117 // was being called even if we didn't yet allocate raw_request_body_buf_. 118 // && (request_body_stream_->size() || 119 // request_body_stream_->is_chunked())) 120 // 121 // Use kMaxPacketSize as the buffer size, since the request 122 // body data is written with this size at a time. 123 // TODO(rch): use a smarter value since we can't write an entire 124 // packet due to overhead. 125 raw_request_body_buf_ = new IOBufferWithSize(kMaxPacketSize); 126 // The request body buffer is empty at first. 127 request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_.get(), 0); 128 } 129 130 // Store the response info. 131 response_info_ = response; 132 133 next_state_ = STATE_SEND_HEADERS; 134 int rv = DoLoop(OK); 135 if (rv == ERR_IO_PENDING) 136 callback_ = callback; 137 138 return rv > 0 ? OK : rv; 139 } 140 141 UploadProgress QuicHttpStream::GetUploadProgress() const { 142 if (!request_body_stream_) 143 return UploadProgress(); 144 145 return UploadProgress(request_body_stream_->position(), 146 request_body_stream_->size()); 147 } 148 149 int QuicHttpStream::ReadResponseHeaders(const CompletionCallback& callback) { 150 CHECK(!callback.is_null()); 151 152 if (stream_ == NULL) 153 return response_status_; 154 155 // Check if we already have the response headers. If so, return synchronously. 156 if (response_headers_received_) 157 return OK; 158 159 // Still waiting for the response, return IO_PENDING. 160 CHECK(callback_.is_null()); 161 callback_ = callback; 162 return ERR_IO_PENDING; 163 } 164 165 const HttpResponseInfo* QuicHttpStream::GetResponseInfo() const { 166 return response_info_; 167 } 168 169 int QuicHttpStream::ReadResponseBody( 170 IOBuffer* buf, int buf_len, const CompletionCallback& callback) { 171 CHECK(buf); 172 CHECK(buf_len); 173 CHECK(!callback.is_null()); 174 175 // If we have data buffered, complete the IO immediately. 176 if (!response_body_.empty()) { 177 int bytes_read = 0; 178 while (!response_body_.empty() && buf_len > 0) { 179 scoped_refptr<IOBufferWithSize> data = response_body_.front(); 180 const int bytes_to_copy = std::min(buf_len, data->size()); 181 memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); 182 buf_len -= bytes_to_copy; 183 if (bytes_to_copy == data->size()) { 184 response_body_.pop_front(); 185 } else { 186 const int bytes_remaining = data->size() - bytes_to_copy; 187 IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); 188 memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), 189 bytes_remaining); 190 response_body_.pop_front(); 191 response_body_.push_front(make_scoped_refptr(new_buffer)); 192 } 193 bytes_read += bytes_to_copy; 194 } 195 return bytes_read; 196 } 197 198 if (!stream_) { 199 // If the stream is already closed, there is no body to read. 200 return response_status_; 201 } 202 203 CHECK(callback_.is_null()); 204 CHECK(!user_buffer_.get()); 205 CHECK_EQ(0, user_buffer_len_); 206 207 callback_ = callback; 208 user_buffer_ = buf; 209 user_buffer_len_ = buf_len; 210 return ERR_IO_PENDING; 211 } 212 213 void QuicHttpStream::Close(bool not_reusable) { 214 // Note: the not_reusable flag has no meaning for SPDY streams. 215 if (stream_) { 216 stream_->SetDelegate(NULL); 217 stream_->Reset(QUIC_STREAM_CANCELLED); 218 stream_ = NULL; 219 } 220 } 221 222 HttpStream* QuicHttpStream::RenewStreamForAuth() { 223 return NULL; 224 } 225 226 bool QuicHttpStream::IsResponseBodyComplete() const { 227 return next_state_ == STATE_OPEN && !stream_; 228 } 229 230 bool QuicHttpStream::CanFindEndOfResponse() const { 231 return true; 232 } 233 234 bool QuicHttpStream::IsConnectionReused() const { 235 // TODO(rch): do something smarter here. 236 return stream_ && stream_->id() > 1; 237 } 238 239 void QuicHttpStream::SetConnectionReused() { 240 // QUIC doesn't need an indicator here. 241 } 242 243 bool QuicHttpStream::IsConnectionReusable() const { 244 // QUIC streams aren't considered reusable. 245 return false; 246 } 247 248 int64 QuicHttpStream::GetTotalReceivedBytes() const { 249 // TODO(eustas): Implement. 250 return 0; 251 } 252 253 bool QuicHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const { 254 // TODO(mmenke): Figure out what to do here. 255 return true; 256 } 257 258 void QuicHttpStream::GetSSLInfo(SSLInfo* ssl_info) { 259 DCHECK(stream_); 260 stream_->GetSSLInfo(ssl_info); 261 } 262 263 void QuicHttpStream::GetSSLCertRequestInfo( 264 SSLCertRequestInfo* cert_request_info) { 265 DCHECK(stream_); 266 NOTIMPLEMENTED(); 267 } 268 269 bool QuicHttpStream::IsSpdyHttpStream() const { 270 return false; 271 } 272 273 void QuicHttpStream::Drain(HttpNetworkSession* session) { 274 Close(false); 275 delete this; 276 } 277 278 void QuicHttpStream::SetPriority(RequestPriority priority) { 279 priority_ = priority; 280 } 281 282 int QuicHttpStream::OnDataReceived(const char* data, int length) { 283 DCHECK_NE(0, length); 284 // Are we still reading the response headers. 285 if (!response_headers_received_) { 286 // Grow the read buffer if necessary. 287 if (read_buf_->RemainingCapacity() < length) { 288 size_t additional_capacity = length - read_buf_->RemainingCapacity(); 289 if (additional_capacity < kHeaderBufInitialSize) 290 additional_capacity = kHeaderBufInitialSize; 291 read_buf_->SetCapacity(read_buf_->capacity() + additional_capacity); 292 } 293 memcpy(read_buf_->data(), data, length); 294 read_buf_->set_offset(read_buf_->offset() + length); 295 int rv = ParseResponseHeaders(); 296 if (rv != ERR_IO_PENDING && !callback_.is_null()) { 297 DoCallback(rv); 298 } 299 return OK; 300 } 301 302 if (callback_.is_null()) { 303 BufferResponseBody(data, length); 304 return OK; 305 } 306 307 if (length <= user_buffer_len_) { 308 memcpy(user_buffer_->data(), data, length); 309 } else { 310 memcpy(user_buffer_->data(), data, user_buffer_len_); 311 int delta = length - user_buffer_len_; 312 BufferResponseBody(data + user_buffer_len_, delta); 313 length = user_buffer_len_; 314 } 315 316 user_buffer_ = NULL; 317 user_buffer_len_ = 0; 318 DoCallback(length); 319 return OK; 320 } 321 322 void QuicHttpStream::OnClose(QuicErrorCode error) { 323 if (error != QUIC_NO_ERROR) { 324 response_status_ = was_handshake_confirmed_ ? 325 ERR_QUIC_PROTOCOL_ERROR : ERR_QUIC_HANDSHAKE_FAILED; 326 } else if (!response_headers_received_) { 327 response_status_ = ERR_ABORTED; 328 } 329 330 stream_ = NULL; 331 if (!callback_.is_null()) 332 DoCallback(response_status_); 333 } 334 335 void QuicHttpStream::OnError(int error) { 336 stream_ = NULL; 337 response_status_ = was_handshake_confirmed_ ? 338 error : ERR_QUIC_HANDSHAKE_FAILED; 339 if (!callback_.is_null()) 340 DoCallback(response_status_); 341 } 342 343 bool QuicHttpStream::HasSendHeadersComplete() { 344 return next_state_ > STATE_SEND_HEADERS_COMPLETE; 345 } 346 347 void QuicHttpStream::OnCryptoHandshakeConfirmed() { 348 was_handshake_confirmed_ = true; 349 } 350 351 void QuicHttpStream::OnSessionClosed(int error) { 352 session_error_ = error; 353 session_.reset(); 354 } 355 356 void QuicHttpStream::OnIOComplete(int rv) { 357 rv = DoLoop(rv); 358 359 if (rv != ERR_IO_PENDING && !callback_.is_null()) { 360 DoCallback(rv); 361 } 362 } 363 364 void QuicHttpStream::DoCallback(int rv) { 365 CHECK_NE(rv, ERR_IO_PENDING); 366 CHECK(!callback_.is_null()); 367 368 // The client callback can do anything, including destroying this class, 369 // so any pending callback must be issued after everything else is done. 370 base::ResetAndReturn(&callback_).Run(rv); 371 } 372 373 int QuicHttpStream::DoLoop(int rv) { 374 do { 375 State state = next_state_; 376 next_state_ = STATE_NONE; 377 switch (state) { 378 case STATE_SEND_HEADERS: 379 CHECK_EQ(OK, rv); 380 rv = DoSendHeaders(); 381 break; 382 case STATE_SEND_HEADERS_COMPLETE: 383 rv = DoSendHeadersComplete(rv); 384 break; 385 case STATE_READ_REQUEST_BODY: 386 CHECK_EQ(OK, rv); 387 rv = DoReadRequestBody(); 388 break; 389 case STATE_READ_REQUEST_BODY_COMPLETE: 390 rv = DoReadRequestBodyComplete(rv); 391 break; 392 case STATE_SEND_BODY: 393 CHECK_EQ(OK, rv); 394 rv = DoSendBody(); 395 break; 396 case STATE_SEND_BODY_COMPLETE: 397 rv = DoSendBodyComplete(rv); 398 break; 399 case STATE_OPEN: 400 CHECK_EQ(OK, rv); 401 break; 402 default: 403 NOTREACHED() << "next_state_: " << next_state_; 404 break; 405 } 406 } while (next_state_ != STATE_NONE && next_state_ != STATE_OPEN && 407 rv != ERR_IO_PENDING); 408 409 return rv; 410 } 411 412 int QuicHttpStream::DoSendHeaders() { 413 if (!stream_) 414 return ERR_UNEXPECTED; 415 416 if (request_.empty() && !stream_->CanWrite( 417 base::Bind(&QuicHttpStream::OnIOComplete, 418 weak_factory_.GetWeakPtr()))) { 419 // Do not compress headers unless it is likely that they can be sent. 420 next_state_ = STATE_SEND_HEADERS; 421 return ERR_IO_PENDING; 422 } 423 request_ = stream_->compressor()->CompressHeadersWithPriority( 424 ConvertRequestPriorityToQuicPriority(priority_), request_headers_); 425 426 // Log the actual request with the URL Request's net log. 427 stream_net_log_.AddEvent( 428 NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS, 429 base::Bind(&SpdyHeaderBlockNetLogCallback, &request_headers_)); 430 // Also log to the QuicSession's net log. 431 stream_->net_log().AddEvent( 432 NetLog::TYPE_QUIC_HTTP_STREAM_SEND_REQUEST_HEADERS, 433 base::Bind(&SpdyHeaderBlockNetLogCallback, &request_headers_)); 434 request_headers_.clear(); 435 436 bool has_upload_data = request_body_stream_ != NULL; 437 438 next_state_ = STATE_SEND_HEADERS_COMPLETE; 439 return stream_->WriteStreamData( 440 request_, !has_upload_data, 441 base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr())); 442 } 443 444 int QuicHttpStream::DoSendHeadersComplete(int rv) { 445 if (rv < 0) 446 return rv; 447 448 next_state_ = request_body_stream_ ? 449 STATE_READ_REQUEST_BODY : STATE_OPEN; 450 451 return OK; 452 } 453 454 int QuicHttpStream::DoReadRequestBody() { 455 next_state_ = STATE_READ_REQUEST_BODY_COMPLETE; 456 return request_body_stream_->Read( 457 raw_request_body_buf_.get(), 458 raw_request_body_buf_->size(), 459 base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr())); 460 } 461 462 int QuicHttpStream::DoReadRequestBodyComplete(int rv) { 463 // |rv| is the result of read from the request body from the last call to 464 // DoSendBody(). 465 if (rv < 0) 466 return rv; 467 468 request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_.get(), rv); 469 if (rv == 0) { // Reached the end. 470 DCHECK(request_body_stream_->IsEOF()); 471 } 472 473 next_state_ = STATE_SEND_BODY; 474 return OK; 475 } 476 477 int QuicHttpStream::DoSendBody() { 478 if (!stream_) 479 return ERR_UNEXPECTED; 480 481 CHECK(request_body_stream_); 482 CHECK(request_body_buf_.get()); 483 const bool eof = request_body_stream_->IsEOF(); 484 int len = request_body_buf_->BytesRemaining(); 485 if (len > 0 || eof) { 486 next_state_ = STATE_SEND_BODY_COMPLETE; 487 base::StringPiece data(request_body_buf_->data(), len); 488 return stream_->WriteStreamData( 489 data, eof, 490 base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr())); 491 } 492 493 next_state_ = STATE_OPEN; 494 return OK; 495 } 496 497 int QuicHttpStream::DoSendBodyComplete(int rv) { 498 if (rv < 0) 499 return rv; 500 501 request_body_buf_->DidConsume(request_body_buf_->BytesRemaining()); 502 503 if (!request_body_stream_->IsEOF()) { 504 next_state_ = STATE_READ_REQUEST_BODY; 505 return OK; 506 } 507 508 next_state_ = STATE_OPEN; 509 return OK; 510 } 511 512 int QuicHttpStream::ParseResponseHeaders() { 513 size_t read_buf_len = static_cast<size_t>(read_buf_->offset()); 514 SpdyFramer framer(SPDY3); 515 SpdyHeaderBlock headers; 516 char* data = read_buf_->StartOfBuffer(); 517 size_t len = framer.ParseHeaderBlockInBuffer(data, read_buf_->offset(), 518 &headers); 519 520 if (len == 0) { 521 return ERR_IO_PENDING; 522 } 523 524 // Save the remaining received data. 525 size_t delta = read_buf_len - len; 526 if (delta > 0) { 527 BufferResponseBody(data + len, delta); 528 } 529 530 // The URLRequest logs these headers, so only log to the QuicSession's 531 // net log. 532 stream_->net_log().AddEvent( 533 NetLog::TYPE_QUIC_HTTP_STREAM_READ_RESPONSE_HEADERS, 534 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); 535 536 if (!SpdyHeadersToHttpResponse(headers, SPDY3, response_info_)) { 537 DLOG(WARNING) << "Invalid headers"; 538 return ERR_QUIC_PROTOCOL_ERROR; 539 } 540 // Put the peer's IP address and port into the response. 541 IPEndPoint address = stream_->GetPeerAddress(); 542 response_info_->socket_address = HostPortPair::FromIPEndPoint(address); 543 response_info_->connection_info = 544 HttpResponseInfo::CONNECTION_INFO_QUIC1_SPDY3; 545 response_info_->vary_data 546 .Init(*request_info_, *response_info_->headers.get()); 547 response_info_->was_npn_negotiated = true; 548 response_info_->npn_negotiated_protocol = "quic/1+spdy/3"; 549 response_headers_received_ = true; 550 551 return OK; 552 } 553 554 void QuicHttpStream::BufferResponseBody(const char* data, int length) { 555 if (length == 0) 556 return; 557 IOBufferWithSize* io_buffer = new IOBufferWithSize(length); 558 memcpy(io_buffer->data(), data, length); 559 response_body_.push_back(make_scoped_refptr(io_buffer)); 560 } 561 562 } // namespace net 563