Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/quic_client_session.h"
      6 
      7 #include "base/callback_helpers.h"
      8 #include "base/message_loop/message_loop.h"
      9 #include "base/metrics/histogram.h"
     10 #include "base/metrics/sparse_histogram.h"
     11 #include "base/stl_util.h"
     12 #include "base/strings/string_number_conversions.h"
     13 #include "base/values.h"
     14 #include "net/base/io_buffer.h"
     15 #include "net/base/net_errors.h"
     16 #include "net/quic/quic_connection_helper.h"
     17 #include "net/quic/quic_crypto_client_stream_factory.h"
     18 #include "net/quic/quic_default_packet_writer.h"
     19 #include "net/quic/quic_stream_factory.h"
     20 #include "net/ssl/ssl_info.h"
     21 #include "net/udp/datagram_client_socket.h"
     22 
     23 namespace net {
     24 
     25 namespace {
     26 
     27 // Note: these values must be kept in sync with the corresponding values in:
     28 // tools/metrics/histograms/histograms.xml
     29 enum HandshakeState {
     30   STATE_STARTED = 0,
     31   STATE_ENCRYPTION_ESTABLISHED = 1,
     32   STATE_HANDSHAKE_CONFIRMED = 2,
     33   STATE_FAILED = 3,
     34   NUM_HANDSHAKE_STATES = 4
     35 };
     36 
     37 void RecordHandshakeState(HandshakeState state) {
     38   UMA_HISTOGRAM_ENUMERATION("Net.QuicHandshakeState", state,
     39                             NUM_HANDSHAKE_STATES);
     40 }
     41 
     42 }  // namespace
     43 
     44 QuicClientSession::StreamRequest::StreamRequest() : stream_(NULL) {}
     45 
     46 QuicClientSession::StreamRequest::~StreamRequest() {
     47   CancelRequest();
     48 }
     49 
     50 int QuicClientSession::StreamRequest::StartRequest(
     51     const base::WeakPtr<QuicClientSession>& session,
     52     QuicReliableClientStream** stream,
     53     const CompletionCallback& callback) {
     54   session_ = session;
     55   stream_ = stream;
     56   int rv = session_->TryCreateStream(this, stream_);
     57   if (rv == ERR_IO_PENDING) {
     58     callback_ = callback;
     59   }
     60 
     61   return rv;
     62 }
     63 
     64 void QuicClientSession::StreamRequest::CancelRequest() {
     65   if (session_)
     66     session_->CancelRequest(this);
     67   session_.reset();
     68   callback_.Reset();
     69 }
     70 
     71 void QuicClientSession::StreamRequest::OnRequestCompleteSuccess(
     72     QuicReliableClientStream* stream) {
     73   session_.reset();
     74   *stream_ = stream;
     75   ResetAndReturn(&callback_).Run(OK);
     76 }
     77 
     78 void QuicClientSession::StreamRequest::OnRequestCompleteFailure(int rv) {
     79   session_.reset();
     80   ResetAndReturn(&callback_).Run(rv);
     81 }
     82 
     83 QuicClientSession::QuicClientSession(
     84     QuicConnection* connection,
     85     scoped_ptr<DatagramClientSocket> socket,
     86     scoped_ptr<QuicDefaultPacketWriter> writer,
     87     QuicStreamFactory* stream_factory,
     88     QuicCryptoClientStreamFactory* crypto_client_stream_factory,
     89     const string& server_hostname,
     90     const QuicConfig& config,
     91     QuicCryptoClientConfig* crypto_config,
     92     NetLog* net_log)
     93     : QuicSession(connection, config),
     94       require_confirmation_(false),
     95       stream_factory_(stream_factory),
     96       socket_(socket.Pass()),
     97       writer_(writer.Pass()),
     98       read_buffer_(new IOBufferWithSize(kMaxPacketSize)),
     99       read_pending_(false),
    100       num_total_streams_(0),
    101       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_QUIC_SESSION)),
    102       logger_(net_log_),
    103       num_packets_read_(0),
    104       weak_factory_(this) {
    105   crypto_stream_.reset(
    106       crypto_client_stream_factory ?
    107           crypto_client_stream_factory->CreateQuicCryptoClientStream(
    108               server_hostname, this, crypto_config) :
    109           new QuicCryptoClientStream(server_hostname, this, crypto_config));
    110 
    111   connection->set_debug_visitor(&logger_);
    112   // TODO(rch): pass in full host port proxy pair
    113   net_log_.BeginEvent(
    114       NetLog::TYPE_QUIC_SESSION,
    115       NetLog::StringCallback("host", &server_hostname));
    116 }
    117 
    118 QuicClientSession::~QuicClientSession() {
    119   // The session must be closed before it is destroyed.
    120   DCHECK(streams()->empty());
    121   CloseAllStreams(ERR_UNEXPECTED);
    122   DCHECK(observers_.empty());
    123   CloseAllObservers(ERR_UNEXPECTED);
    124 
    125   connection()->set_debug_visitor(NULL);
    126   net_log_.EndEvent(NetLog::TYPE_QUIC_SESSION);
    127 
    128   while (!stream_requests_.empty()) {
    129     StreamRequest* request = stream_requests_.front();
    130     stream_requests_.pop_front();
    131     request->OnRequestCompleteFailure(ERR_ABORTED);
    132   }
    133 
    134   if (IsEncryptionEstablished())
    135     RecordHandshakeState(STATE_ENCRYPTION_ESTABLISHED);
    136   if (IsCryptoHandshakeConfirmed())
    137     RecordHandshakeState(STATE_HANDSHAKE_CONFIRMED);
    138   else
    139     RecordHandshakeState(STATE_FAILED);
    140 
    141   UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellos",
    142                        crypto_stream_->num_sent_client_hellos());
    143   if (IsCryptoHandshakeConfirmed()) {
    144     UMA_HISTOGRAM_COUNTS("Net.QuicNumSentClientHellosCryptoHandshakeConfirmed",
    145                          crypto_stream_->num_sent_client_hellos());
    146   }
    147 
    148   UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumTotalStreams", num_total_streams_);
    149 }
    150 
    151 bool QuicClientSession::OnStreamFrames(
    152     const std::vector<QuicStreamFrame>& frames) {
    153   // Record total number of stream frames.
    154   UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesInPacket", frames.size());
    155 
    156   // Record number of frames per stream in packet.
    157   typedef std::map<QuicStreamId, size_t> FrameCounter;
    158   FrameCounter frames_per_stream;
    159   for (size_t i = 0; i < frames.size(); ++i) {
    160     frames_per_stream[frames[i].stream_id]++;
    161   }
    162   for (FrameCounter::const_iterator it = frames_per_stream.begin();
    163        it != frames_per_stream.end(); ++it) {
    164     UMA_HISTOGRAM_COUNTS("Net.QuicNumStreamFramesPerStreamInPacket",
    165                          it->second);
    166   }
    167 
    168   return QuicSession::OnStreamFrames(frames);
    169 }
    170 
    171 void QuicClientSession::AddObserver(Observer* observer) {
    172   DCHECK(!ContainsKey(observers_, observer));
    173   observers_.insert(observer);
    174 }
    175 
    176 void QuicClientSession::RemoveObserver(Observer* observer) {
    177   DCHECK(ContainsKey(observers_, observer));
    178   observers_.erase(observer);
    179 }
    180 
    181 int QuicClientSession::TryCreateStream(StreamRequest* request,
    182                                        QuicReliableClientStream** stream) {
    183   if (!crypto_stream_->encryption_established()) {
    184     DLOG(DFATAL) << "Encryption not established.";
    185     return ERR_CONNECTION_CLOSED;
    186   }
    187 
    188   if (goaway_received()) {
    189     DVLOG(1) << "Going away.";
    190     return ERR_CONNECTION_CLOSED;
    191   }
    192 
    193   if (!connection()->connected()) {
    194     DVLOG(1) << "Already closed.";
    195     return ERR_CONNECTION_CLOSED;
    196   }
    197 
    198   if (GetNumOpenStreams() < get_max_open_streams()) {
    199     *stream = CreateOutgoingReliableStreamImpl();
    200     return OK;
    201   }
    202 
    203   stream_requests_.push_back(request);
    204   return ERR_IO_PENDING;
    205 }
    206 
    207 void QuicClientSession::CancelRequest(StreamRequest* request) {
    208   // Remove |request| from the queue while preserving the order of the
    209   // other elements.
    210   StreamRequestQueue::iterator it =
    211       std::find(stream_requests_.begin(), stream_requests_.end(), request);
    212   if (it != stream_requests_.end()) {
    213     it = stream_requests_.erase(it);
    214   }
    215 }
    216 
    217 QuicReliableClientStream* QuicClientSession::CreateOutgoingDataStream() {
    218   if (!crypto_stream_->encryption_established()) {
    219     DVLOG(1) << "Encryption not active so no outgoing stream created.";
    220     return NULL;
    221   }
    222   if (GetNumOpenStreams() >= get_max_open_streams()) {
    223     DVLOG(1) << "Failed to create a new outgoing stream. "
    224                << "Already " << GetNumOpenStreams() << " open.";
    225     return NULL;
    226   }
    227   if (goaway_received()) {
    228     DVLOG(1) << "Failed to create a new outgoing stream. "
    229                << "Already received goaway.";
    230     return NULL;
    231   }
    232 
    233   return CreateOutgoingReliableStreamImpl();
    234 }
    235 
    236 QuicReliableClientStream*
    237 QuicClientSession::CreateOutgoingReliableStreamImpl() {
    238   DCHECK(connection()->connected());
    239   QuicReliableClientStream* stream =
    240       new QuicReliableClientStream(GetNextStreamId(), this, net_log_);
    241   ActivateStream(stream);
    242   ++num_total_streams_;
    243   UMA_HISTOGRAM_COUNTS("Net.QuicSession.NumOpenStreams", GetNumOpenStreams());
    244   return stream;
    245 }
    246 
    247 QuicCryptoClientStream* QuicClientSession::GetCryptoStream() {
    248   return crypto_stream_.get();
    249 };
    250 
    251 bool QuicClientSession::GetSSLInfo(SSLInfo* ssl_info) {
    252   DCHECK(crypto_stream_.get());
    253   return crypto_stream_->GetSSLInfo(ssl_info);
    254 }
    255 
    256 int QuicClientSession::CryptoConnect(bool require_confirmation,
    257                                      const CompletionCallback& callback) {
    258   require_confirmation_ = require_confirmation;
    259   RecordHandshakeState(STATE_STARTED);
    260   if (!crypto_stream_->CryptoConnect()) {
    261     // TODO(wtc): change crypto_stream_.CryptoConnect() to return a
    262     // QuicErrorCode and map it to a net error code.
    263     return ERR_CONNECTION_FAILED;
    264   }
    265 
    266   bool can_notify = require_confirmation_ ?
    267       IsCryptoHandshakeConfirmed() : IsEncryptionEstablished();
    268   if (can_notify) {
    269     return OK;
    270   }
    271 
    272   callback_ = callback;
    273   return ERR_IO_PENDING;
    274 }
    275 
    276 int QuicClientSession::GetNumSentClientHellos() const {
    277   return crypto_stream_->num_sent_client_hellos();
    278 }
    279 
    280 QuicDataStream* QuicClientSession::CreateIncomingDataStream(
    281     QuicStreamId id) {
    282   DLOG(ERROR) << "Server push not supported";
    283   return NULL;
    284 }
    285 
    286 void QuicClientSession::CloseStream(QuicStreamId stream_id) {
    287   QuicSession::CloseStream(stream_id);
    288   OnClosedStream();
    289 }
    290 
    291 void QuicClientSession::SendRstStream(QuicStreamId id,
    292                                       QuicRstStreamErrorCode error) {
    293   QuicSession::SendRstStream(id, error);
    294   OnClosedStream();
    295 }
    296 
    297 void QuicClientSession::OnClosedStream() {
    298   if (GetNumOpenStreams() < get_max_open_streams() &&
    299       !stream_requests_.empty() &&
    300       crypto_stream_->encryption_established() &&
    301       !goaway_received() &&
    302       connection()->connected()) {
    303     StreamRequest* request = stream_requests_.front();
    304     stream_requests_.pop_front();
    305     request->OnRequestCompleteSuccess(CreateOutgoingReliableStreamImpl());
    306   }
    307 
    308   if (GetNumOpenStreams() == 0) {
    309     stream_factory_->OnIdleSession(this);
    310   }
    311 }
    312 
    313 void QuicClientSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
    314   if (!callback_.is_null() &&
    315       (!require_confirmation_ || event == HANDSHAKE_CONFIRMED)) {
    316     // TODO(rtenneti): Currently for all CryptoHandshakeEvent events, callback_
    317     // could be called because there are no error events in CryptoHandshakeEvent
    318     // enum. If error events are added to CryptoHandshakeEvent, then the
    319     // following code needs to changed.
    320     base::ResetAndReturn(&callback_).Run(OK);
    321   }
    322   if (event == HANDSHAKE_CONFIRMED) {
    323     ObserverSet::iterator it = observers_.begin();
    324     while (it != observers_.end()) {
    325       Observer* observer = *it;
    326       ++it;
    327       observer->OnCryptoHandshakeConfirmed();
    328     }
    329   }
    330   QuicSession::OnCryptoHandshakeEvent(event);
    331 }
    332 
    333 void QuicClientSession::OnCryptoHandshakeMessageSent(
    334     const CryptoHandshakeMessage& message) {
    335   logger_.OnCryptoHandshakeMessageSent(message);
    336 }
    337 
    338 void QuicClientSession::OnCryptoHandshakeMessageReceived(
    339     const CryptoHandshakeMessage& message) {
    340   logger_.OnCryptoHandshakeMessageReceived(message);
    341 }
    342 
    343 void QuicClientSession::OnConnectionClosed(QuicErrorCode error,
    344                                            bool from_peer) {
    345   DCHECK(!connection()->connected());
    346   logger_.OnConnectionClosed(error, from_peer);
    347   if (from_peer) {
    348     UMA_HISTOGRAM_SPARSE_SLOWLY(
    349         "Net.QuicSession.ConnectionCloseErrorCodeServer", error);
    350   } else {
    351     UMA_HISTOGRAM_SPARSE_SLOWLY(
    352         "Net.QuicSession.ConnectionCloseErrorCodeClient", error);
    353   }
    354 
    355   if (error == QUIC_CONNECTION_TIMED_OUT) {
    356     UMA_HISTOGRAM_COUNTS(
    357         "Net.QuicSession.ConnectionClose.NumOpenStreams.TimedOut",
    358         GetNumOpenStreams());
    359     if (!IsCryptoHandshakeConfirmed()) {
    360       // If there have been any streams created, they were 0-RTT speculative
    361       // requests that have not be serviced.
    362       UMA_HISTOGRAM_COUNTS(
    363           "Net.QuicSession.ConnectionClose.NumTotalStreams.HandshakeTimedOut",
    364           num_total_streams_);
    365     }
    366   }
    367 
    368   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.QuicVersion",
    369                               connection()->version());
    370   NotifyFactoryOfSessionGoingAway();
    371   if (!callback_.is_null()) {
    372     base::ResetAndReturn(&callback_).Run(ERR_QUIC_PROTOCOL_ERROR);
    373   }
    374   socket_->Close();
    375   QuicSession::OnConnectionClosed(error, from_peer);
    376   DCHECK(streams()->empty());
    377   CloseAllStreams(ERR_UNEXPECTED);
    378   CloseAllObservers(ERR_UNEXPECTED);
    379   NotifyFactoryOfSessionClosedLater();
    380 }
    381 
    382 void QuicClientSession::OnSuccessfulVersionNegotiation(
    383     const QuicVersion& version) {
    384   logger_.OnSuccessfulVersionNegotiation(version);
    385   QuicSession::OnSuccessfulVersionNegotiation(version);
    386 }
    387 
    388 void QuicClientSession::StartReading() {
    389   if (read_pending_) {
    390     return;
    391   }
    392   read_pending_ = true;
    393   int rv = socket_->Read(read_buffer_.get(),
    394                          read_buffer_->size(),
    395                          base::Bind(&QuicClientSession::OnReadComplete,
    396                                     weak_factory_.GetWeakPtr()));
    397   if (rv == ERR_IO_PENDING) {
    398     num_packets_read_ = 0;
    399     return;
    400   }
    401 
    402   if (++num_packets_read_ > 32) {
    403     num_packets_read_ = 0;
    404     // Data was read, process it.
    405     // Schedule the work through the message loop to avoid recursive
    406     // callbacks.
    407     base::MessageLoop::current()->PostTask(
    408         FROM_HERE,
    409         base::Bind(&QuicClientSession::OnReadComplete,
    410                    weak_factory_.GetWeakPtr(), rv));
    411   } else {
    412     OnReadComplete(rv);
    413   }
    414 }
    415 
    416 void QuicClientSession::CloseSessionOnError(int error) {
    417   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.CloseSessionOnError", -error);
    418   CloseSessionOnErrorInner(error, QUIC_INTERNAL_ERROR);
    419   NotifyFactoryOfSessionClosed();
    420 }
    421 
    422 void QuicClientSession::CloseSessionOnErrorInner(int net_error,
    423                                                  QuicErrorCode quic_error) {
    424   if (!callback_.is_null()) {
    425     base::ResetAndReturn(&callback_).Run(net_error);
    426   }
    427   CloseAllStreams(net_error);
    428   CloseAllObservers(net_error);
    429   net_log_.AddEvent(
    430       NetLog::TYPE_QUIC_SESSION_CLOSE_ON_ERROR,
    431       NetLog::IntegerCallback("net_error", net_error));
    432 
    433   connection()->CloseConnection(quic_error, false);
    434   DCHECK(!connection()->connected());
    435 }
    436 
    437 void QuicClientSession::CloseAllStreams(int net_error) {
    438   while (!streams()->empty()) {
    439     ReliableQuicStream* stream = streams()->begin()->second;
    440     QuicStreamId id = stream->id();
    441     static_cast<QuicReliableClientStream*>(stream)->OnError(net_error);
    442     CloseStream(id);
    443   }
    444 }
    445 
    446 void QuicClientSession::CloseAllObservers(int net_error) {
    447   while (!observers_.empty()) {
    448     Observer* observer = *observers_.begin();
    449     observers_.erase(observer);
    450     observer->OnSessionClosed(net_error);
    451   }
    452 }
    453 
    454 base::Value* QuicClientSession::GetInfoAsValue(const HostPortPair& pair) const {
    455   base::DictionaryValue* dict = new base::DictionaryValue();
    456   dict->SetString("host_port_pair", pair.ToString());
    457   dict->SetString("version", QuicVersionToString(connection()->version()));
    458   dict->SetInteger("open_streams", GetNumOpenStreams());
    459   dict->SetInteger("total_streams", num_total_streams_);
    460   dict->SetString("peer_address", peer_address().ToString());
    461   dict->SetString("guid", base::Uint64ToString(guid()));
    462   dict->SetBoolean("connected", connection()->connected());
    463   return dict;
    464 }
    465 
    466 base::WeakPtr<QuicClientSession> QuicClientSession::GetWeakPtr() {
    467   return weak_factory_.GetWeakPtr();
    468 }
    469 
    470 void QuicClientSession::OnReadComplete(int result) {
    471   read_pending_ = false;
    472   if (result == 0)
    473     result = ERR_CONNECTION_CLOSED;
    474 
    475   if (result < 0) {
    476     DVLOG(1) << "Closing session on read error: " << result;
    477     UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.ReadError", -result);
    478     NotifyFactoryOfSessionGoingAway();
    479     CloseSessionOnErrorInner(result, QUIC_PACKET_READ_ERROR);
    480     NotifyFactoryOfSessionClosedLater();
    481     return;
    482   }
    483 
    484   scoped_refptr<IOBufferWithSize> buffer(read_buffer_);
    485   read_buffer_ = new IOBufferWithSize(kMaxPacketSize);
    486   QuicEncryptedPacket packet(buffer->data(), result);
    487   IPEndPoint local_address;
    488   IPEndPoint peer_address;
    489   socket_->GetLocalAddress(&local_address);
    490   socket_->GetPeerAddress(&peer_address);
    491   // ProcessUdpPacket might result in |this| being deleted, so we
    492   // use a weak pointer to be safe.
    493   connection()->ProcessUdpPacket(local_address, peer_address, packet);
    494   if (!connection()->connected()) {
    495     stream_factory_->OnSessionClosed(this);
    496     return;
    497   }
    498   StartReading();
    499 }
    500 
    501 void QuicClientSession::NotifyFactoryOfSessionGoingAway() {
    502   if (stream_factory_)
    503     stream_factory_->OnSessionGoingAway(this);
    504 }
    505 
    506 void QuicClientSession::NotifyFactoryOfSessionClosedLater() {
    507   DCHECK_EQ(0u, GetNumOpenStreams());
    508   DCHECK(!connection()->connected());
    509   base::MessageLoop::current()->PostTask(
    510       FROM_HERE,
    511       base::Bind(&QuicClientSession::NotifyFactoryOfSessionClosed,
    512                  weak_factory_.GetWeakPtr()));
    513 }
    514 
    515 void QuicClientSession::NotifyFactoryOfSessionClosed() {
    516   DCHECK_EQ(0u, GetNumOpenStreams());
    517   // Will delete |this|.
    518   if (stream_factory_)
    519     stream_factory_->OnSessionClosed(this);
    520 }
    521 
    522 }  // namespace net
    523