1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_client_session.h" 6 7 #include "base/callback_helpers.h" 8 #include "base/message_loop/message_loop.h" 9 #include "base/metrics/histogram.h" 10 #include "base/metrics/sparse_histogram.h" 11 #include "base/stl_util.h" 12 #include "base/strings/string_number_conversions.h" 13 #include "base/values.h" 14 #include "net/base/io_buffer.h" 15 #include "net/base/net_errors.h" 16 #include "net/quic/quic_connection_helper.h" 17 #include "net/quic/quic_crypto_client_stream_factory.h" 18 #include "net/quic/quic_default_packet_writer.h" 19 #include "net/quic/quic_stream_factory.h" 20 #include "net/ssl/ssl_info.h" 21 #include "net/udp/datagram_client_socket.h" 22 23 namespace net { 24 25 namespace { 26 27 // Note: these values must be kept in sync with the corresponding values in: 28 // tools/metrics/histograms/histograms.xml 29 enum HandshakeState { 30 STATE_STARTED = 0, 31 STATE_ENCRYPTION_ESTABLISHED = 1, 32 STATE_HANDSHAKE_CONFIRMED = 2, 33 STATE_FAILED = 3, 34 NUM_HANDSHAKE_STATES = 4 35 }; 36 37 void RecordHandshakeState(HandshakeState state) { 38 UMA_HISTOGRAM_ENUMERATION("Net.QuicHandshakeState", state, 39 NUM_HANDSHAKE_STATES); 40 } 41 42 } // namespace 43 44 QuicClientSession::StreamRequest::StreamRequest() : stream_(NULL) {} 45 46 QuicClientSession::StreamRequest::~StreamRequest() { 47 CancelRequest(); 48 } 49 50 int QuicClientSession::StreamRequest::StartRequest( 51 const base::WeakPtr<QuicClientSession>& session, 52 QuicReliableClientStream** stream, 53 const CompletionCallback& callback) { 54 session_ = session; 55 stream_ = stream; 56 int rv = session_->TryCreateStream(this, stream_); 57 if (rv == ERR_IO_PENDING) { 58 callback_ = callback; 59 } 60 61 return rv; 62 } 63 64 void QuicClientSession::StreamRequest::CancelRequest() { 65 if (session_) 66 session_->CancelRequest(this); 67 session_.reset(); 68 callback_.Reset(); 69 } 70 71 void QuicClientSession::StreamRequest::OnRequestCompleteSuccess( 72 QuicReliableClientStream* stream) { 73 session_.reset(); 74 *stream_ = stream; 75 ResetAndReturn(&callback_).Run(OK); 76 } 77 78 void QuicClientSession::StreamRequest::OnRequestCompleteFailure(int rv) { 79 session_.reset(); 80 ResetAndReturn(&callback_).Run(rv); 81 } 82 83 QuicClientSession::QuicClientSession( 84 QuicConnection* connection, 85 scoped_ptr<DatagramClientSocket> socket, 86 scoped_ptr<QuicDefaultPacketWriter> writer, 87 QuicStreamFactory* stream_factory, 88 QuicCryptoClientStreamFactory* crypto_client_stream_factory, 89 const string& server_hostname, 90 const QuicConfig& config, 91 QuicCryptoClientConfig* crypto_config, 92 NetLog* net_log) 93 : QuicSession(connection, config), 94 require_confirmation_(false), 95 stream_factory_(stream_factory), 96 socket_(socket.Pass()), 97 writer_(writer.Pass()), 98 read_buffer_(new IOBufferWithSize(kMaxPacketSize)), 99 read_pending_(false), 100 num_total_streams_(0), 101 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_QUIC_SESSION)), 102 logger_(net_log_), 103 num_packets_read_(0), 104 weak_factory_(this) { 105 crypto_stream_.reset( 106 crypto_client_stream_factory ? 107 crypto_client_stream_factory->CreateQuicCryptoClientStream( 108 server_hostname, this, crypto_config) : 109 new QuicCryptoClientStream(server_hostname, this, crypto_config)); 110 111 connection->set_debug_visitor(&logger_); 112 // TODO(rch): pass in full host port proxy pair 113 net_log_.BeginEvent( 114 NetLog::TYPE_QUIC_SESSION, 115 NetLog::StringCallback("host", &server_hostname)); 116 } 117 118 QuicClientSession::~QuicClientSession() { 119 // The session must be closed before it is destroyed. 120 DCHECK(streams()->empty()); 121 CloseAllStreams(ERR_UNEXPECTED); 122 DCHECK(observers_.empty()); 123 CloseAllObservers(ERR_UNEXPECTED); 124 125 connection()->set_debug_visitor(NULL); 126 net_log_.EndEvent(NetLog::TYPE_QUIC_SESSION); 127 128 while (!stream_requests_.empty()) { 129 StreamRequest* request = stream_requests_.front(); 130 stream_requests_.pop_front(); 131 request->OnRequestCompleteFailure(ERR_ABORTED); 132 } 133 134 if (IsEncryptionEstablished()) 135 RecordHandshakeState(STATE_ENCRYPTION_ESTABLISHED); 136 if (IsCryptoHandshakeConfirmed()) 137 RecordHandshakeState(STATE_HANDSHAKE_CONFIRMED); 138 else 139 RecordHandshakeState(STATE_FAILED); 140 141 UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellos", 142 crypto_stream_->num_sent_client_hellos()); 143 if (IsCryptoHandshakeConfirmed()) { 144 UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellosCryptoHandshakeConfirmed", 145 crypto_stream_->num_sent_client_hellos()); 146 } 147 148 UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumTotalStreams", num_total_streams_); 149 } 150 151 bool QuicClientSession::OnStreamFrames( 152 const std::vector<QuicStreamFrame>& frames) { 153 // Record total number of stream frames. 154 UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesInPacket", frames.size()); 155 156 // Record number of frames per stream in packet. 157 typedef std::map<QuicStreamId, size_t> FrameCounter; 158 FrameCounter frames_per_stream; 159 for (size_t i = 0; i < frames.size(); ++i) { 160 frames_per_stream[frames[i].stream_id]++; 161 } 162 for (FrameCounter::const_iterator it = frames_per_stream.begin(); 163 it != frames_per_stream.end(); ++it) { 164 UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesPerStreamInPacket", 165 it->second); 166 } 167 168 return QuicSession::OnStreamFrames(frames); 169 } 170 171 void QuicClientSession::AddObserver(Observer* observer) { 172 DCHECK(!ContainsKey(observers_, observer)); 173 observers_.insert(observer); 174 } 175 176 void QuicClientSession::RemoveObserver(Observer* observer) { 177 DCHECK(ContainsKey(observers_, observer)); 178 observers_.erase(observer); 179 } 180 181 int QuicClientSession::TryCreateStream(StreamRequest* request, 182 QuicReliableClientStream** stream) { 183 if (!crypto_stream_->encryption_established()) { 184 DLOG(DFATAL) << "Encryption not established."; 185 return ERR_CONNECTION_CLOSED; 186 } 187 188 if (goaway_received()) { 189 DVLOG(1) << "Going away."; 190 return ERR_CONNECTION_CLOSED; 191 } 192 193 if (!connection()->connected()) { 194 DVLOG(1) << "Already closed."; 195 return ERR_CONNECTION_CLOSED; 196 } 197 198 if (GetNumOpenStreams() < get_max_open_streams()) { 199 *stream = CreateOutgoingReliableStreamImpl(); 200 return OK; 201 } 202 203 stream_requests_.push_back(request); 204 return ERR_IO_PENDING; 205 } 206 207 void QuicClientSession::CancelRequest(StreamRequest* request) { 208 // Remove |request| from the queue while preserving the order of the 209 // other elements. 210 StreamRequestQueue::iterator it = 211 std::find(stream_requests_.begin(), stream_requests_.end(), request); 212 if (it != stream_requests_.end()) { 213 it = stream_requests_.erase(it); 214 } 215 } 216 217 QuicReliableClientStream* QuicClientSession::CreateOutgoingDataStream() { 218 if (!crypto_stream_->encryption_established()) { 219 DVLOG(1) << "Encryption not active so no outgoing stream created."; 220 return NULL; 221 } 222 if (GetNumOpenStreams() >= get_max_open_streams()) { 223 DVLOG(1) << "Failed to create a new outgoing stream. " 224 << "Already " << GetNumOpenStreams() << " open."; 225 return NULL; 226 } 227 if (goaway_received()) { 228 DVLOG(1) << "Failed to create a new outgoing stream. " 229 << "Already received goaway."; 230 return NULL; 231 } 232 233 return CreateOutgoingReliableStreamImpl(); 234 } 235 236 QuicReliableClientStream* 237 QuicClientSession::CreateOutgoingReliableStreamImpl() { 238 DCHECK(connection()->connected()); 239 QuicReliableClientStream* stream = 240 new QuicReliableClientStream(GetNextStreamId(), this, net_log_); 241 ActivateStream(stream); 242 ++num_total_streams_; 243 UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumOpenStreams", GetNumOpenStreams()); 244 return stream; 245 } 246 247 QuicCryptoClientStream* QuicClientSession::GetCryptoStream() { 248 return crypto_stream_.get(); 249 }; 250 251 bool QuicClientSession::GetSSLInfo(SSLInfo* ssl_info) { 252 DCHECK(crypto_stream_.get()); 253 return crypto_stream_->GetSSLInfo(ssl_info); 254 } 255 256 int QuicClientSession::CryptoConnect(bool require_confirmation, 257 const CompletionCallback& callback) { 258 require_confirmation_ = require_confirmation; 259 RecordHandshakeState(STATE_STARTED); 260 if (!crypto_stream_->CryptoConnect()) { 261 // TODO(wtc): change crypto_stream_.CryptoConnect() to return a 262 // QuicErrorCode and map it to a net error code. 263 return ERR_CONNECTION_FAILED; 264 } 265 266 bool can_notify = require_confirmation_ ? 267 IsCryptoHandshakeConfirmed() : IsEncryptionEstablished(); 268 if (can_notify) { 269 return OK; 270 } 271 272 callback_ = callback; 273 return ERR_IO_PENDING; 274 } 275 276 int QuicClientSession::GetNumSentClientHellos() const { 277 return crypto_stream_->num_sent_client_hellos(); 278 } 279 280 QuicDataStream* QuicClientSession::CreateIncomingDataStream( 281 QuicStreamId id) { 282 DLOG(ERROR) << "Server push not supported"; 283 return NULL; 284 } 285 286 void QuicClientSession::CloseStream(QuicStreamId stream_id) { 287 QuicSession::CloseStream(stream_id); 288 OnClosedStream(); 289 } 290 291 void QuicClientSession::SendRstStream(QuicStreamId id, 292 QuicRstStreamErrorCode error) { 293 QuicSession::SendRstStream(id, error); 294 OnClosedStream(); 295 } 296 297 void QuicClientSession::OnClosedStream() { 298 if (GetNumOpenStreams() < get_max_open_streams() && 299 !stream_requests_.empty() && 300 crypto_stream_->encryption_established() && 301 !goaway_received() && 302 connection()->connected()) { 303 StreamRequest* request = stream_requests_.front(); 304 stream_requests_.pop_front(); 305 request->OnRequestCompleteSuccess(CreateOutgoingReliableStreamImpl()); 306 } 307 308 if (GetNumOpenStreams() == 0) { 309 stream_factory_->OnIdleSession(this); 310 } 311 } 312 313 void QuicClientSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { 314 if (!callback_.is_null() && 315 (!require_confirmation_ || event == HANDSHAKE_CONFIRMED)) { 316 // TODO(rtenneti): Currently for all CryptoHandshakeEvent events, callback_ 317 // could be called because there are no error events in CryptoHandshakeEvent 318 // enum. If error events are added to CryptoHandshakeEvent, then the 319 // following code needs to changed. 320 base::ResetAndReturn(&callback_).Run(OK); 321 } 322 if (event == HANDSHAKE_CONFIRMED) { 323 ObserverSet::iterator it = observers_.begin(); 324 while (it != observers_.end()) { 325 Observer* observer = *it; 326 ++it; 327 observer->OnCryptoHandshakeConfirmed(); 328 } 329 } 330 QuicSession::OnCryptoHandshakeEvent(event); 331 } 332 333 void QuicClientSession::OnCryptoHandshakeMessageSent( 334 const CryptoHandshakeMessage& message) { 335 logger_.OnCryptoHandshakeMessageSent(message); 336 } 337 338 void QuicClientSession::OnCryptoHandshakeMessageReceived( 339 const CryptoHandshakeMessage& message) { 340 logger_.OnCryptoHandshakeMessageReceived(message); 341 } 342 343 void QuicClientSession::OnConnectionClosed(QuicErrorCode error, 344 bool from_peer) { 345 DCHECK(!connection()->connected()); 346 logger_.OnConnectionClosed(error, from_peer); 347 if (from_peer) { 348 UMA_HISTOGRAM_SPARSE_SLOWLY( 349 "Net.QuicSession.ConnectionCloseErrorCodeServer", error); 350 } else { 351 UMA_HISTOGRAM_SPARSE_SLOWLY( 352 "Net.QuicSession.ConnectionCloseErrorCodeClient", error); 353 } 354 355 if (error == QUIC_CONNECTION_TIMED_OUT) { 356 UMA_HISTOGRAM_COUNTS( 357 "Net.QuicSession.ConnectionClose.NumOpenStreams.TimedOut", 358 GetNumOpenStreams()); 359 if (!IsCryptoHandshakeConfirmed()) { 360 // If there have been any streams created, they were 0-RTT speculative 361 // requests that have not be serviced. 362 UMA_HISTOGRAM_COUNTS( 363 "Net.QuicSession.ConnectionClose.NumTotalStreams.HandshakeTimedOut", 364 num_total_streams_); 365 } 366 } 367 368 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.QuicVersion", 369 connection()->version()); 370 NotifyFactoryOfSessionGoingAway(); 371 if (!callback_.is_null()) { 372 base::ResetAndReturn(&callback_).Run(ERR_QUIC_PROTOCOL_ERROR); 373 } 374 socket_->Close(); 375 QuicSession::OnConnectionClosed(error, from_peer); 376 DCHECK(streams()->empty()); 377 CloseAllStreams(ERR_UNEXPECTED); 378 CloseAllObservers(ERR_UNEXPECTED); 379 NotifyFactoryOfSessionClosedLater(); 380 } 381 382 void QuicClientSession::OnSuccessfulVersionNegotiation( 383 const QuicVersion& version) { 384 logger_.OnSuccessfulVersionNegotiation(version); 385 QuicSession::OnSuccessfulVersionNegotiation(version); 386 } 387 388 void QuicClientSession::StartReading() { 389 if (read_pending_) { 390 return; 391 } 392 read_pending_ = true; 393 int rv = socket_->Read(read_buffer_.get(), 394 read_buffer_->size(), 395 base::Bind(&QuicClientSession::OnReadComplete, 396 weak_factory_.GetWeakPtr())); 397 if (rv == ERR_IO_PENDING) { 398 num_packets_read_ = 0; 399 return; 400 } 401 402 if (++num_packets_read_ > 32) { 403 num_packets_read_ = 0; 404 // Data was read, process it. 405 // Schedule the work through the message loop to avoid recursive 406 // callbacks. 407 base::MessageLoop::current()->PostTask( 408 FROM_HERE, 409 base::Bind(&QuicClientSession::OnReadComplete, 410 weak_factory_.GetWeakPtr(), rv)); 411 } else { 412 OnReadComplete(rv); 413 } 414 } 415 416 void QuicClientSession::CloseSessionOnError(int error) { 417 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.CloseSessionOnError", -error); 418 CloseSessionOnErrorInner(error, QUIC_INTERNAL_ERROR); 419 NotifyFactoryOfSessionClosed(); 420 } 421 422 void QuicClientSession::CloseSessionOnErrorInner(int net_error, 423 QuicErrorCode quic_error) { 424 if (!callback_.is_null()) { 425 base::ResetAndReturn(&callback_).Run(net_error); 426 } 427 CloseAllStreams(net_error); 428 CloseAllObservers(net_error); 429 net_log_.AddEvent( 430 NetLog::TYPE_QUIC_SESSION_CLOSE_ON_ERROR, 431 NetLog::IntegerCallback("net_error", net_error)); 432 433 connection()->CloseConnection(quic_error, false); 434 DCHECK(!connection()->connected()); 435 } 436 437 void QuicClientSession::CloseAllStreams(int net_error) { 438 while (!streams()->empty()) { 439 ReliableQuicStream* stream = streams()->begin()->second; 440 QuicStreamId id = stream->id(); 441 static_cast<QuicReliableClientStream*>(stream)->OnError(net_error); 442 CloseStream(id); 443 } 444 } 445 446 void QuicClientSession::CloseAllObservers(int net_error) { 447 while (!observers_.empty()) { 448 Observer* observer = *observers_.begin(); 449 observers_.erase(observer); 450 observer->OnSessionClosed(net_error); 451 } 452 } 453 454 base::Value* QuicClientSession::GetInfoAsValue(const HostPortPair& pair) const { 455 base::DictionaryValue* dict = new base::DictionaryValue(); 456 dict->SetString("host_port_pair", pair.ToString()); 457 dict->SetString("version", QuicVersionToString(connection()->version())); 458 dict->SetInteger("open_streams", GetNumOpenStreams()); 459 dict->SetInteger("total_streams", num_total_streams_); 460 dict->SetString("peer_address", peer_address().ToString()); 461 dict->SetString("guid", base::Uint64ToString(guid())); 462 dict->SetBoolean("connected", connection()->connected()); 463 return dict; 464 } 465 466 base::WeakPtr<QuicClientSession> QuicClientSession::GetWeakPtr() { 467 return weak_factory_.GetWeakPtr(); 468 } 469 470 void QuicClientSession::OnReadComplete(int result) { 471 read_pending_ = false; 472 if (result == 0) 473 result = ERR_CONNECTION_CLOSED; 474 475 if (result < 0) { 476 DVLOG(1) << "Closing session on read error: " << result; 477 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.ReadError", -result); 478 NotifyFactoryOfSessionGoingAway(); 479 CloseSessionOnErrorInner(result, QUIC_PACKET_READ_ERROR); 480 NotifyFactoryOfSessionClosedLater(); 481 return; 482 } 483 484 scoped_refptr<IOBufferWithSize> buffer(read_buffer_); 485 read_buffer_ = new IOBufferWithSize(kMaxPacketSize); 486 QuicEncryptedPacket packet(buffer->data(), result); 487 IPEndPoint local_address; 488 IPEndPoint peer_address; 489 socket_->GetLocalAddress(&local_address); 490 socket_->GetPeerAddress(&peer_address); 491 // ProcessUdpPacket might result in |this| being deleted, so we 492 // use a weak pointer to be safe. 493 connection()->ProcessUdpPacket(local_address, peer_address, packet); 494 if (!connection()->connected()) { 495 stream_factory_->OnSessionClosed(this); 496 return; 497 } 498 StartReading(); 499 } 500 501 void QuicClientSession::NotifyFactoryOfSessionGoingAway() { 502 if (stream_factory_) 503 stream_factory_->OnSessionGoingAway(this); 504 } 505 506 void QuicClientSession::NotifyFactoryOfSessionClosedLater() { 507 DCHECK_EQ(0u, GetNumOpenStreams()); 508 DCHECK(!connection()->connected()); 509 base::MessageLoop::current()->PostTask( 510 FROM_HERE, 511 base::Bind(&QuicClientSession::NotifyFactoryOfSessionClosed, 512 weak_factory_.GetWeakPtr())); 513 } 514 515 void QuicClientSession::NotifyFactoryOfSessionClosed() { 516 DCHECK_EQ(0u, GetNumOpenStreams()); 517 // Will delete |this|. 518 if (stream_factory_) 519 stream_factory_->OnSessionClosed(this); 520 } 521 522 } // namespace net 523