Home | History | Annotate | Download | only in quic
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/quic/quic_stream_factory.h"
      6 
      7 #include <set>
      8 
      9 #include "base/logging.h"
     10 #include "base/message_loop/message_loop.h"
     11 #include "base/message_loop/message_loop_proxy.h"
     12 #include "base/rand_util.h"
     13 #include "base/stl_util.h"
     14 #include "base/values.h"
     15 #include "net/base/net_errors.h"
     16 #include "net/cert/cert_verifier.h"
     17 #include "net/dns/host_resolver.h"
     18 #include "net/dns/single_request_host_resolver.h"
     19 #include "net/quic/crypto/proof_verifier_chromium.h"
     20 #include "net/quic/crypto/quic_random.h"
     21 #include "net/quic/quic_client_session.h"
     22 #include "net/quic/quic_clock.h"
     23 #include "net/quic/quic_connection.h"
     24 #include "net/quic/quic_connection_helper.h"
     25 #include "net/quic/quic_crypto_client_stream_factory.h"
     26 #include "net/quic/quic_http_stream.h"
     27 #include "net/quic/quic_protocol.h"
     28 #include "net/socket/client_socket_factory.h"
     29 
     30 namespace net {
     31 
     32 // Responsible for creating a new QUIC session to the specified server, and
     33 // for notifying any associated requests when complete.
     34 class QuicStreamFactory::Job {
     35  public:
     36   Job(QuicStreamFactory* factory,
     37       HostResolver* host_resolver,
     38       const HostPortProxyPair& host_port_proxy_pair,
     39       bool is_https,
     40       CertVerifier* cert_verifier,
     41       const BoundNetLog& net_log);
     42 
     43   ~Job();
     44 
     45   int Run(const CompletionCallback& callback);
     46 
     47   int DoLoop(int rv);
     48   int DoResolveHost();
     49   int DoResolveHostComplete(int rv);
     50   int DoConnect();
     51   int DoConnectComplete(int rv);
     52 
     53   void OnIOComplete(int rv);
     54 
     55   CompletionCallback callback() {
     56     return callback_;
     57   }
     58 
     59   const HostPortProxyPair& host_port_proxy_pair() const {
     60     return host_port_proxy_pair_;
     61   }
     62 
     63  private:
     64   enum IoState {
     65     STATE_NONE,
     66     STATE_RESOLVE_HOST,
     67     STATE_RESOLVE_HOST_COMPLETE,
     68     STATE_CONNECT,
     69     STATE_CONNECT_COMPLETE,
     70   };
     71   IoState io_state_;
     72 
     73   QuicStreamFactory* factory_;
     74   SingleRequestHostResolver host_resolver_;
     75   const HostPortProxyPair host_port_proxy_pair_;
     76   bool is_https_;
     77   CertVerifier* cert_verifier_;
     78   const BoundNetLog net_log_;
     79   QuicClientSession* session_;
     80   CompletionCallback callback_;
     81   AddressList address_list_;
     82   DISALLOW_COPY_AND_ASSIGN(Job);
     83 };
     84 
     85 QuicStreamFactory::Job::Job(
     86     QuicStreamFactory* factory,
     87     HostResolver* host_resolver,
     88     const HostPortProxyPair& host_port_proxy_pair,
     89     bool is_https,
     90     CertVerifier* cert_verifier,
     91     const BoundNetLog& net_log)
     92     : factory_(factory),
     93       host_resolver_(host_resolver),
     94       host_port_proxy_pair_(host_port_proxy_pair),
     95       is_https_(is_https),
     96       cert_verifier_(cert_verifier),
     97       net_log_(net_log) {
     98 }
     99 
    100 QuicStreamFactory::Job::~Job() {
    101 }
    102 
    103 int QuicStreamFactory::Job::Run(const CompletionCallback& callback) {
    104   io_state_ = STATE_RESOLVE_HOST;
    105   int rv = DoLoop(OK);
    106   if (rv == ERR_IO_PENDING)
    107     callback_ = callback;
    108 
    109   return rv > 0 ? OK : rv;
    110 }
    111 
    112 int QuicStreamFactory::Job::DoLoop(int rv) {
    113   do {
    114     IoState state = io_state_;
    115     io_state_ = STATE_NONE;
    116     switch (state) {
    117       case STATE_RESOLVE_HOST:
    118         CHECK_EQ(OK, rv);
    119         rv = DoResolveHost();
    120         break;
    121       case STATE_RESOLVE_HOST_COMPLETE:
    122         rv = DoResolveHostComplete(rv);
    123         break;
    124       case STATE_CONNECT:
    125         CHECK_EQ(OK, rv);
    126         rv = DoConnect();
    127         break;
    128       case STATE_CONNECT_COMPLETE:
    129         rv = DoConnectComplete(rv);
    130         break;
    131       default:
    132         NOTREACHED() << "io_state_: " << io_state_;
    133         break;
    134     }
    135   } while (io_state_ != STATE_NONE && rv != ERR_IO_PENDING);
    136   return rv;
    137 }
    138 
    139 void QuicStreamFactory::Job::OnIOComplete(int rv) {
    140   rv = DoLoop(rv);
    141 
    142   if (rv != ERR_IO_PENDING && !callback_.is_null()) {
    143     callback_.Run(rv);
    144   }
    145 }
    146 
    147 int QuicStreamFactory::Job::DoResolveHost() {
    148   io_state_ = STATE_RESOLVE_HOST_COMPLETE;
    149   return host_resolver_.Resolve(
    150       HostResolver::RequestInfo(host_port_proxy_pair_.first), &address_list_,
    151       base::Bind(&QuicStreamFactory::Job::OnIOComplete,
    152                  base::Unretained(this)),
    153       net_log_);
    154 }
    155 
    156 int QuicStreamFactory::Job::DoResolveHostComplete(int rv) {
    157   if (rv != OK)
    158     return rv;
    159 
    160   // TODO(rch): remove this code!
    161   AddressList::iterator it = address_list_.begin();
    162   while (it != address_list_.end()) {
    163     if (it->GetFamily() == ADDRESS_FAMILY_IPV6) {
    164       it = address_list_.erase(it);
    165     } else {
    166       it++;
    167     }
    168   }
    169 
    170   DCHECK(!factory_->HasActiveSession(host_port_proxy_pair_));
    171   io_state_ = STATE_CONNECT;
    172   return OK;
    173 }
    174 
    175 QuicStreamRequest::QuicStreamRequest(QuicStreamFactory* factory)
    176     : factory_(factory) {}
    177 
    178 QuicStreamRequest::~QuicStreamRequest() {
    179   if (factory_ && !callback_.is_null())
    180     factory_->CancelRequest(this);
    181 }
    182 
    183 int QuicStreamRequest::Request(
    184     const HostPortProxyPair& host_port_proxy_pair,
    185     bool is_https,
    186     CertVerifier* cert_verifier,
    187     const BoundNetLog& net_log,
    188     const CompletionCallback& callback) {
    189   DCHECK(!stream_);
    190   DCHECK(callback_.is_null());
    191   int rv = factory_->Create(host_port_proxy_pair, is_https, cert_verifier,
    192                             net_log, this);
    193   if (rv == ERR_IO_PENDING) {
    194     host_port_proxy_pair_ = host_port_proxy_pair;
    195     is_https_ = is_https;
    196     cert_verifier_ = cert_verifier;
    197     net_log_ = net_log;
    198     callback_ = callback;
    199   } else {
    200     factory_ = NULL;
    201   }
    202   if (rv == OK)
    203     DCHECK(stream_);
    204   return rv;
    205 }
    206 
    207 void QuicStreamRequest::set_stream(scoped_ptr<QuicHttpStream> stream) {
    208   DCHECK(stream);
    209   stream_ = stream.Pass();
    210 }
    211 
    212 void QuicStreamRequest::OnRequestComplete(int rv) {
    213   factory_ = NULL;
    214   callback_.Run(rv);
    215 }
    216 
    217 scoped_ptr<QuicHttpStream> QuicStreamRequest::ReleaseStream() {
    218   DCHECK(stream_);
    219   return stream_.Pass();
    220 }
    221 
    222 int QuicStreamFactory::Job::DoConnect() {
    223   io_state_ = STATE_CONNECT_COMPLETE;
    224 
    225   session_ = factory_->CreateSession(host_port_proxy_pair_, is_https_,
    226                                      cert_verifier_, address_list_, net_log_);
    227   session_->StartReading();
    228   int rv = session_->CryptoConnect(
    229       base::Bind(&QuicStreamFactory::Job::OnIOComplete,
    230                  base::Unretained(this)));
    231   return rv;
    232 }
    233 
    234 int QuicStreamFactory::Job::DoConnectComplete(int rv) {
    235   if (rv != OK)
    236     return rv;
    237 
    238   DCHECK(!factory_->HasActiveSession(host_port_proxy_pair_));
    239   factory_->ActivateSession(host_port_proxy_pair_, session_);
    240 
    241   return OK;
    242 }
    243 
    244 QuicStreamFactory::QuicStreamFactory(
    245     HostResolver* host_resolver,
    246     ClientSocketFactory* client_socket_factory,
    247     QuicCryptoClientStreamFactory* quic_crypto_client_stream_factory,
    248     QuicRandom* random_generator,
    249     QuicClock* clock)
    250     : host_resolver_(host_resolver),
    251       client_socket_factory_(client_socket_factory),
    252       quic_crypto_client_stream_factory_(quic_crypto_client_stream_factory),
    253       random_generator_(random_generator),
    254       clock_(clock),
    255       weak_factory_(this) {
    256   config_.SetDefaults();
    257   config_.set_idle_connection_state_lifetime(
    258       QuicTime::Delta::FromSeconds(30),
    259       QuicTime::Delta::FromSeconds(30));
    260 }
    261 
    262 QuicStreamFactory::~QuicStreamFactory() {
    263   STLDeleteElements(&all_sessions_);
    264   STLDeleteValues(&active_jobs_);
    265   STLDeleteValues(&all_crypto_configs_);
    266 }
    267 
    268 int QuicStreamFactory::Create(const HostPortProxyPair& host_port_proxy_pair,
    269                               bool is_https,
    270                               CertVerifier* cert_verifier,
    271                               const BoundNetLog& net_log,
    272                               QuicStreamRequest* request) {
    273   if (HasActiveSession(host_port_proxy_pair)) {
    274     request->set_stream(CreateIfSessionExists(host_port_proxy_pair, net_log));
    275     return OK;
    276   }
    277 
    278   if (HasActiveJob(host_port_proxy_pair)) {
    279     Job* job = active_jobs_[host_port_proxy_pair];
    280     active_requests_[request] = job;
    281     job_requests_map_[job].insert(request);
    282     return ERR_IO_PENDING;
    283   }
    284 
    285   scoped_ptr<Job> job(new Job(this, host_resolver_, host_port_proxy_pair,
    286                               is_https, cert_verifier, net_log));
    287   int rv = job->Run(base::Bind(&QuicStreamFactory::OnJobComplete,
    288                                base::Unretained(this), job.get()));
    289 
    290   if (rv == ERR_IO_PENDING) {
    291     active_requests_[request] = job.get();
    292     job_requests_map_[job.get()].insert(request);
    293     active_jobs_[host_port_proxy_pair] = job.release();
    294   }
    295   if (rv == OK) {
    296     DCHECK(HasActiveSession(host_port_proxy_pair));
    297     request->set_stream(CreateIfSessionExists(host_port_proxy_pair, net_log));
    298   }
    299   return rv;
    300 }
    301 
    302 void QuicStreamFactory::OnJobComplete(Job* job, int rv) {
    303   if (rv == OK) {
    304     // Create all the streams, but do not notify them yet.
    305     for (RequestSet::iterator it = job_requests_map_[job].begin();
    306          it != job_requests_map_[job].end() ; ++it) {
    307       DCHECK(HasActiveSession(job->host_port_proxy_pair()));
    308       (*it)->set_stream(CreateIfSessionExists(job->host_port_proxy_pair(),
    309                                               (*it)->net_log()));
    310     }
    311   }
    312   while (!job_requests_map_[job].empty()) {
    313     RequestSet::iterator it = job_requests_map_[job].begin();
    314     QuicStreamRequest* request = *it;
    315     job_requests_map_[job].erase(it);
    316     active_requests_.erase(request);
    317     // Even though we're invoking callbacks here, we don't need to worry
    318     // about |this| being deleted, because the factory is owned by the
    319     // profile which can not be deleted via callbacks.
    320     request->OnRequestComplete(rv);
    321   }
    322   active_jobs_.erase(job->host_port_proxy_pair());
    323   job_requests_map_.erase(job);
    324   delete job;
    325   return;
    326 }
    327 
    328 // Returns a newly created QuicHttpStream owned by the caller, if a
    329 // matching session already exists.  Returns NULL otherwise.
    330 scoped_ptr<QuicHttpStream> QuicStreamFactory::CreateIfSessionExists(
    331     const HostPortProxyPair& host_port_proxy_pair,
    332     const BoundNetLog& net_log) {
    333   if (!HasActiveSession(host_port_proxy_pair)) {
    334     DLOG(INFO) << "No active session";
    335     return scoped_ptr<QuicHttpStream>();
    336   }
    337 
    338   QuicClientSession* session = active_sessions_[host_port_proxy_pair];
    339   DCHECK(session);
    340   return scoped_ptr<QuicHttpStream>(new QuicHttpStream(session->GetWeakPtr()));
    341 }
    342 
    343 void QuicStreamFactory::OnIdleSession(QuicClientSession* session) {
    344 }
    345 
    346 void QuicStreamFactory::OnSessionClose(QuicClientSession* session) {
    347   DCHECK_EQ(0u, session->GetNumOpenStreams());
    348   const AliasSet& aliases = session_aliases_[session];
    349   for (AliasSet::const_iterator it = aliases.begin(); it != aliases.end();
    350        ++it) {
    351     DCHECK(active_sessions_.count(*it));
    352     DCHECK_EQ(session, active_sessions_[*it]);
    353     active_sessions_.erase(*it);
    354   }
    355   all_sessions_.erase(session);
    356   session_aliases_.erase(session);
    357   delete session;
    358 }
    359 
    360 void QuicStreamFactory::CancelRequest(QuicStreamRequest* request) {
    361   DCHECK(ContainsKey(active_requests_, request));
    362   Job* job = active_requests_[request];
    363   job_requests_map_[job].erase(request);
    364   active_requests_.erase(request);
    365 }
    366 
    367 void QuicStreamFactory::CloseAllSessions(int error) {
    368   while (!active_sessions_.empty()) {
    369     size_t initial_size = active_sessions_.size();
    370     active_sessions_.begin()->second->CloseSessionOnError(error);
    371     DCHECK_NE(initial_size, active_sessions_.size());
    372   }
    373   while (!all_sessions_.empty()) {
    374     size_t initial_size = all_sessions_.size();
    375     (*all_sessions_.begin())->CloseSessionOnError(error);
    376     DCHECK_NE(initial_size, all_sessions_.size());
    377   }
    378   DCHECK(all_sessions_.empty());
    379 }
    380 
    381 base::Value* QuicStreamFactory::QuicStreamFactoryInfoToValue() const {
    382   base::ListValue* list = new base::ListValue();
    383 
    384   for (SessionMap::const_iterator it = active_sessions_.begin();
    385        it != active_sessions_.end(); ++it) {
    386     const HostPortProxyPair& pair = it->first;
    387     const QuicClientSession* session = it->second;
    388 
    389     list->Append(session->GetInfoAsValue(pair.first));
    390   }
    391   return list;
    392 }
    393 
    394 void QuicStreamFactory::OnIPAddressChanged() {
    395   CloseAllSessions(ERR_NETWORK_CHANGED);
    396 }
    397 
    398 bool QuicStreamFactory::HasActiveSession(
    399     const HostPortProxyPair& host_port_proxy_pair) {
    400   return ContainsKey(active_sessions_, host_port_proxy_pair);
    401 }
    402 
    403 QuicClientSession* QuicStreamFactory::CreateSession(
    404     const HostPortProxyPair& host_port_proxy_pair,
    405     bool is_https,
    406     CertVerifier* cert_verifier,
    407     const AddressList& address_list,
    408     const BoundNetLog& net_log) {
    409   QuicGuid guid = random_generator_->RandUint64();
    410   IPEndPoint addr = *address_list.begin();
    411   DatagramClientSocket* socket =
    412       client_socket_factory_->CreateDatagramClientSocket(
    413           DatagramSocket::DEFAULT_BIND, base::Bind(&base::RandInt),
    414           net_log.net_log(), net_log.source());
    415   socket->Connect(addr);
    416 
    417   // We should adaptively set this buffer size, but for now, we'll use a size
    418   // that is more than large enough for a 100 packet congestion window, and yet
    419   // does not consume "too much" memory.  If we see bursty packet loss, we may
    420   // revisit this setting and test for its impact.
    421   const int32 kSocketBufferSize(kMaxPacketSize * 100);  // Support 100 packets.
    422   socket->SetReceiveBufferSize(kSocketBufferSize);
    423   // TODO(jar): What should the UDP send buffer be set to?  If the send buffer
    424   // is too large, then we might(?) wastefully queue packets in the OS, when
    425   // we'd rather construct packets just in time. We do however expect that the
    426   // calculated send rate (paced, or ack clocked), will be well below the egress
    427   // rate of the local machine, so that *shouldn't* be a problem.
    428   // If the buffer setting is too small, then we will starve our outgoing link
    429   // on a fast connection, because we won't respond fast enough to the many
    430   // async callbacks to get data from us.  On the other hand, until we have real
    431   // pacing support (beyond ack-clocked pacing), we get a bit of adhoc-pacing by
    432   // requiring the application to refill this OS buffer (ensuring that we don't
    433   // blast a pile of packets at the kernel's max egress rate).
    434   // socket->SetSendBufferSize(????);
    435 
    436   QuicConnectionHelper* helper = new QuicConnectionHelper(
    437       base::MessageLoop::current()->message_loop_proxy().get(),
    438       clock_.get(),
    439       random_generator_,
    440       socket);
    441 
    442   QuicConnection* connection = new QuicConnection(guid, addr, helper, false,
    443                                                   QuicVersionMax());
    444 
    445   QuicCryptoClientConfig* crypto_config =
    446       GetOrCreateCryptoConfig(host_port_proxy_pair);
    447   DCHECK(crypto_config);
    448 
    449   QuicClientSession* session =
    450       new QuicClientSession(connection, socket, this,
    451                             quic_crypto_client_stream_factory_,
    452                             host_port_proxy_pair.first.host(), config_,
    453                             crypto_config, net_log.net_log());
    454   all_sessions_.insert(session);  // owning pointer
    455   if (is_https) {
    456     crypto_config->SetProofVerifier(
    457         new ProofVerifierChromium(cert_verifier, net_log));
    458   }
    459   return session;
    460 }
    461 
    462 bool QuicStreamFactory::HasActiveJob(
    463     const HostPortProxyPair& host_port_proxy_pair) {
    464   return ContainsKey(active_jobs_, host_port_proxy_pair);
    465 }
    466 
    467 void QuicStreamFactory::ActivateSession(
    468     const HostPortProxyPair& host_port_proxy_pair,
    469     QuicClientSession* session) {
    470   DCHECK(!HasActiveSession(host_port_proxy_pair));
    471   active_sessions_[host_port_proxy_pair] = session;
    472   session_aliases_[session].insert(host_port_proxy_pair);
    473 }
    474 
    475 QuicCryptoClientConfig* QuicStreamFactory::GetOrCreateCryptoConfig(
    476     const HostPortProxyPair& host_port_proxy_pair) {
    477   QuicCryptoClientConfig* crypto_config;
    478   if (ContainsKey(all_crypto_configs_, host_port_proxy_pair)) {
    479     crypto_config = all_crypto_configs_[host_port_proxy_pair];
    480     DCHECK(crypto_config);
    481   } else {
    482     crypto_config = new QuicCryptoClientConfig();
    483     crypto_config->SetDefaults();
    484     all_crypto_configs_[host_port_proxy_pair] = crypto_config;
    485   }
    486   return crypto_config;
    487 }
    488 
    489 }  // namespace net
    490