1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/quic/quic_stream_factory.h" 6 7 #include <set> 8 9 #include "base/logging.h" 10 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop_proxy.h" 12 #include "base/rand_util.h" 13 #include "base/stl_util.h" 14 #include "base/values.h" 15 #include "net/base/net_errors.h" 16 #include "net/cert/cert_verifier.h" 17 #include "net/dns/host_resolver.h" 18 #include "net/dns/single_request_host_resolver.h" 19 #include "net/quic/crypto/proof_verifier_chromium.h" 20 #include "net/quic/crypto/quic_random.h" 21 #include "net/quic/quic_client_session.h" 22 #include "net/quic/quic_clock.h" 23 #include "net/quic/quic_connection.h" 24 #include "net/quic/quic_connection_helper.h" 25 #include "net/quic/quic_crypto_client_stream_factory.h" 26 #include "net/quic/quic_http_stream.h" 27 #include "net/quic/quic_protocol.h" 28 #include "net/socket/client_socket_factory.h" 29 30 namespace net { 31 32 // Responsible for creating a new QUIC session to the specified server, and 33 // for notifying any associated requests when complete. 34 class QuicStreamFactory::Job { 35 public: 36 Job(QuicStreamFactory* factory, 37 HostResolver* host_resolver, 38 const HostPortProxyPair& host_port_proxy_pair, 39 bool is_https, 40 CertVerifier* cert_verifier, 41 const BoundNetLog& net_log); 42 43 ~Job(); 44 45 int Run(const CompletionCallback& callback); 46 47 int DoLoop(int rv); 48 int DoResolveHost(); 49 int DoResolveHostComplete(int rv); 50 int DoConnect(); 51 int DoConnectComplete(int rv); 52 53 void OnIOComplete(int rv); 54 55 CompletionCallback callback() { 56 return callback_; 57 } 58 59 const HostPortProxyPair& host_port_proxy_pair() const { 60 return host_port_proxy_pair_; 61 } 62 63 private: 64 enum IoState { 65 STATE_NONE, 66 STATE_RESOLVE_HOST, 67 STATE_RESOLVE_HOST_COMPLETE, 68 STATE_CONNECT, 69 STATE_CONNECT_COMPLETE, 70 }; 71 IoState io_state_; 72 73 QuicStreamFactory* factory_; 74 SingleRequestHostResolver host_resolver_; 75 const HostPortProxyPair host_port_proxy_pair_; 76 bool is_https_; 77 CertVerifier* cert_verifier_; 78 const BoundNetLog net_log_; 79 QuicClientSession* session_; 80 CompletionCallback callback_; 81 AddressList address_list_; 82 DISALLOW_COPY_AND_ASSIGN(Job); 83 }; 84 85 QuicStreamFactory::Job::Job( 86 QuicStreamFactory* factory, 87 HostResolver* host_resolver, 88 const HostPortProxyPair& host_port_proxy_pair, 89 bool is_https, 90 CertVerifier* cert_verifier, 91 const BoundNetLog& net_log) 92 : factory_(factory), 93 host_resolver_(host_resolver), 94 host_port_proxy_pair_(host_port_proxy_pair), 95 is_https_(is_https), 96 cert_verifier_(cert_verifier), 97 net_log_(net_log) { 98 } 99 100 QuicStreamFactory::Job::~Job() { 101 } 102 103 int QuicStreamFactory::Job::Run(const CompletionCallback& callback) { 104 io_state_ = STATE_RESOLVE_HOST; 105 int rv = DoLoop(OK); 106 if (rv == ERR_IO_PENDING) 107 callback_ = callback; 108 109 return rv > 0 ? OK : rv; 110 } 111 112 int QuicStreamFactory::Job::DoLoop(int rv) { 113 do { 114 IoState state = io_state_; 115 io_state_ = STATE_NONE; 116 switch (state) { 117 case STATE_RESOLVE_HOST: 118 CHECK_EQ(OK, rv); 119 rv = DoResolveHost(); 120 break; 121 case STATE_RESOLVE_HOST_COMPLETE: 122 rv = DoResolveHostComplete(rv); 123 break; 124 case STATE_CONNECT: 125 CHECK_EQ(OK, rv); 126 rv = DoConnect(); 127 break; 128 case STATE_CONNECT_COMPLETE: 129 rv = DoConnectComplete(rv); 130 break; 131 default: 132 NOTREACHED() << "io_state_: " << io_state_; 133 break; 134 } 135 } while (io_state_ != STATE_NONE && rv != ERR_IO_PENDING); 136 return rv; 137 } 138 139 void QuicStreamFactory::Job::OnIOComplete(int rv) { 140 rv = DoLoop(rv); 141 142 if (rv != ERR_IO_PENDING && !callback_.is_null()) { 143 callback_.Run(rv); 144 } 145 } 146 147 int QuicStreamFactory::Job::DoResolveHost() { 148 io_state_ = STATE_RESOLVE_HOST_COMPLETE; 149 return host_resolver_.Resolve( 150 HostResolver::RequestInfo(host_port_proxy_pair_.first), &address_list_, 151 base::Bind(&QuicStreamFactory::Job::OnIOComplete, 152 base::Unretained(this)), 153 net_log_); 154 } 155 156 int QuicStreamFactory::Job::DoResolveHostComplete(int rv) { 157 if (rv != OK) 158 return rv; 159 160 // TODO(rch): remove this code! 161 AddressList::iterator it = address_list_.begin(); 162 while (it != address_list_.end()) { 163 if (it->GetFamily() == ADDRESS_FAMILY_IPV6) { 164 it = address_list_.erase(it); 165 } else { 166 it++; 167 } 168 } 169 170 DCHECK(!factory_->HasActiveSession(host_port_proxy_pair_)); 171 io_state_ = STATE_CONNECT; 172 return OK; 173 } 174 175 QuicStreamRequest::QuicStreamRequest(QuicStreamFactory* factory) 176 : factory_(factory) {} 177 178 QuicStreamRequest::~QuicStreamRequest() { 179 if (factory_ && !callback_.is_null()) 180 factory_->CancelRequest(this); 181 } 182 183 int QuicStreamRequest::Request( 184 const HostPortProxyPair& host_port_proxy_pair, 185 bool is_https, 186 CertVerifier* cert_verifier, 187 const BoundNetLog& net_log, 188 const CompletionCallback& callback) { 189 DCHECK(!stream_); 190 DCHECK(callback_.is_null()); 191 int rv = factory_->Create(host_port_proxy_pair, is_https, cert_verifier, 192 net_log, this); 193 if (rv == ERR_IO_PENDING) { 194 host_port_proxy_pair_ = host_port_proxy_pair; 195 is_https_ = is_https; 196 cert_verifier_ = cert_verifier; 197 net_log_ = net_log; 198 callback_ = callback; 199 } else { 200 factory_ = NULL; 201 } 202 if (rv == OK) 203 DCHECK(stream_); 204 return rv; 205 } 206 207 void QuicStreamRequest::set_stream(scoped_ptr<QuicHttpStream> stream) { 208 DCHECK(stream); 209 stream_ = stream.Pass(); 210 } 211 212 void QuicStreamRequest::OnRequestComplete(int rv) { 213 factory_ = NULL; 214 callback_.Run(rv); 215 } 216 217 scoped_ptr<QuicHttpStream> QuicStreamRequest::ReleaseStream() { 218 DCHECK(stream_); 219 return stream_.Pass(); 220 } 221 222 int QuicStreamFactory::Job::DoConnect() { 223 io_state_ = STATE_CONNECT_COMPLETE; 224 225 session_ = factory_->CreateSession(host_port_proxy_pair_, is_https_, 226 cert_verifier_, address_list_, net_log_); 227 session_->StartReading(); 228 int rv = session_->CryptoConnect( 229 base::Bind(&QuicStreamFactory::Job::OnIOComplete, 230 base::Unretained(this))); 231 return rv; 232 } 233 234 int QuicStreamFactory::Job::DoConnectComplete(int rv) { 235 if (rv != OK) 236 return rv; 237 238 DCHECK(!factory_->HasActiveSession(host_port_proxy_pair_)); 239 factory_->ActivateSession(host_port_proxy_pair_, session_); 240 241 return OK; 242 } 243 244 QuicStreamFactory::QuicStreamFactory( 245 HostResolver* host_resolver, 246 ClientSocketFactory* client_socket_factory, 247 QuicCryptoClientStreamFactory* quic_crypto_client_stream_factory, 248 QuicRandom* random_generator, 249 QuicClock* clock) 250 : host_resolver_(host_resolver), 251 client_socket_factory_(client_socket_factory), 252 quic_crypto_client_stream_factory_(quic_crypto_client_stream_factory), 253 random_generator_(random_generator), 254 clock_(clock), 255 weak_factory_(this) { 256 config_.SetDefaults(); 257 config_.set_idle_connection_state_lifetime( 258 QuicTime::Delta::FromSeconds(30), 259 QuicTime::Delta::FromSeconds(30)); 260 } 261 262 QuicStreamFactory::~QuicStreamFactory() { 263 STLDeleteElements(&all_sessions_); 264 STLDeleteValues(&active_jobs_); 265 STLDeleteValues(&all_crypto_configs_); 266 } 267 268 int QuicStreamFactory::Create(const HostPortProxyPair& host_port_proxy_pair, 269 bool is_https, 270 CertVerifier* cert_verifier, 271 const BoundNetLog& net_log, 272 QuicStreamRequest* request) { 273 if (HasActiveSession(host_port_proxy_pair)) { 274 request->set_stream(CreateIfSessionExists(host_port_proxy_pair, net_log)); 275 return OK; 276 } 277 278 if (HasActiveJob(host_port_proxy_pair)) { 279 Job* job = active_jobs_[host_port_proxy_pair]; 280 active_requests_[request] = job; 281 job_requests_map_[job].insert(request); 282 return ERR_IO_PENDING; 283 } 284 285 scoped_ptr<Job> job(new Job(this, host_resolver_, host_port_proxy_pair, 286 is_https, cert_verifier, net_log)); 287 int rv = job->Run(base::Bind(&QuicStreamFactory::OnJobComplete, 288 base::Unretained(this), job.get())); 289 290 if (rv == ERR_IO_PENDING) { 291 active_requests_[request] = job.get(); 292 job_requests_map_[job.get()].insert(request); 293 active_jobs_[host_port_proxy_pair] = job.release(); 294 } 295 if (rv == OK) { 296 DCHECK(HasActiveSession(host_port_proxy_pair)); 297 request->set_stream(CreateIfSessionExists(host_port_proxy_pair, net_log)); 298 } 299 return rv; 300 } 301 302 void QuicStreamFactory::OnJobComplete(Job* job, int rv) { 303 if (rv == OK) { 304 // Create all the streams, but do not notify them yet. 305 for (RequestSet::iterator it = job_requests_map_[job].begin(); 306 it != job_requests_map_[job].end() ; ++it) { 307 DCHECK(HasActiveSession(job->host_port_proxy_pair())); 308 (*it)->set_stream(CreateIfSessionExists(job->host_port_proxy_pair(), 309 (*it)->net_log())); 310 } 311 } 312 while (!job_requests_map_[job].empty()) { 313 RequestSet::iterator it = job_requests_map_[job].begin(); 314 QuicStreamRequest* request = *it; 315 job_requests_map_[job].erase(it); 316 active_requests_.erase(request); 317 // Even though we're invoking callbacks here, we don't need to worry 318 // about |this| being deleted, because the factory is owned by the 319 // profile which can not be deleted via callbacks. 320 request->OnRequestComplete(rv); 321 } 322 active_jobs_.erase(job->host_port_proxy_pair()); 323 job_requests_map_.erase(job); 324 delete job; 325 return; 326 } 327 328 // Returns a newly created QuicHttpStream owned by the caller, if a 329 // matching session already exists. Returns NULL otherwise. 330 scoped_ptr<QuicHttpStream> QuicStreamFactory::CreateIfSessionExists( 331 const HostPortProxyPair& host_port_proxy_pair, 332 const BoundNetLog& net_log) { 333 if (!HasActiveSession(host_port_proxy_pair)) { 334 DLOG(INFO) << "No active session"; 335 return scoped_ptr<QuicHttpStream>(); 336 } 337 338 QuicClientSession* session = active_sessions_[host_port_proxy_pair]; 339 DCHECK(session); 340 return scoped_ptr<QuicHttpStream>(new QuicHttpStream(session->GetWeakPtr())); 341 } 342 343 void QuicStreamFactory::OnIdleSession(QuicClientSession* session) { 344 } 345 346 void QuicStreamFactory::OnSessionClose(QuicClientSession* session) { 347 DCHECK_EQ(0u, session->GetNumOpenStreams()); 348 const AliasSet& aliases = session_aliases_[session]; 349 for (AliasSet::const_iterator it = aliases.begin(); it != aliases.end(); 350 ++it) { 351 DCHECK(active_sessions_.count(*it)); 352 DCHECK_EQ(session, active_sessions_[*it]); 353 active_sessions_.erase(*it); 354 } 355 all_sessions_.erase(session); 356 session_aliases_.erase(session); 357 delete session; 358 } 359 360 void QuicStreamFactory::CancelRequest(QuicStreamRequest* request) { 361 DCHECK(ContainsKey(active_requests_, request)); 362 Job* job = active_requests_[request]; 363 job_requests_map_[job].erase(request); 364 active_requests_.erase(request); 365 } 366 367 void QuicStreamFactory::CloseAllSessions(int error) { 368 while (!active_sessions_.empty()) { 369 size_t initial_size = active_sessions_.size(); 370 active_sessions_.begin()->second->CloseSessionOnError(error); 371 DCHECK_NE(initial_size, active_sessions_.size()); 372 } 373 while (!all_sessions_.empty()) { 374 size_t initial_size = all_sessions_.size(); 375 (*all_sessions_.begin())->CloseSessionOnError(error); 376 DCHECK_NE(initial_size, all_sessions_.size()); 377 } 378 DCHECK(all_sessions_.empty()); 379 } 380 381 base::Value* QuicStreamFactory::QuicStreamFactoryInfoToValue() const { 382 base::ListValue* list = new base::ListValue(); 383 384 for (SessionMap::const_iterator it = active_sessions_.begin(); 385 it != active_sessions_.end(); ++it) { 386 const HostPortProxyPair& pair = it->first; 387 const QuicClientSession* session = it->second; 388 389 list->Append(session->GetInfoAsValue(pair.first)); 390 } 391 return list; 392 } 393 394 void QuicStreamFactory::OnIPAddressChanged() { 395 CloseAllSessions(ERR_NETWORK_CHANGED); 396 } 397 398 bool QuicStreamFactory::HasActiveSession( 399 const HostPortProxyPair& host_port_proxy_pair) { 400 return ContainsKey(active_sessions_, host_port_proxy_pair); 401 } 402 403 QuicClientSession* QuicStreamFactory::CreateSession( 404 const HostPortProxyPair& host_port_proxy_pair, 405 bool is_https, 406 CertVerifier* cert_verifier, 407 const AddressList& address_list, 408 const BoundNetLog& net_log) { 409 QuicGuid guid = random_generator_->RandUint64(); 410 IPEndPoint addr = *address_list.begin(); 411 DatagramClientSocket* socket = 412 client_socket_factory_->CreateDatagramClientSocket( 413 DatagramSocket::DEFAULT_BIND, base::Bind(&base::RandInt), 414 net_log.net_log(), net_log.source()); 415 socket->Connect(addr); 416 417 // We should adaptively set this buffer size, but for now, we'll use a size 418 // that is more than large enough for a 100 packet congestion window, and yet 419 // does not consume "too much" memory. If we see bursty packet loss, we may 420 // revisit this setting and test for its impact. 421 const int32 kSocketBufferSize(kMaxPacketSize * 100); // Support 100 packets. 422 socket->SetReceiveBufferSize(kSocketBufferSize); 423 // TODO(jar): What should the UDP send buffer be set to? If the send buffer 424 // is too large, then we might(?) wastefully queue packets in the OS, when 425 // we'd rather construct packets just in time. We do however expect that the 426 // calculated send rate (paced, or ack clocked), will be well below the egress 427 // rate of the local machine, so that *shouldn't* be a problem. 428 // If the buffer setting is too small, then we will starve our outgoing link 429 // on a fast connection, because we won't respond fast enough to the many 430 // async callbacks to get data from us. On the other hand, until we have real 431 // pacing support (beyond ack-clocked pacing), we get a bit of adhoc-pacing by 432 // requiring the application to refill this OS buffer (ensuring that we don't 433 // blast a pile of packets at the kernel's max egress rate). 434 // socket->SetSendBufferSize(????); 435 436 QuicConnectionHelper* helper = new QuicConnectionHelper( 437 base::MessageLoop::current()->message_loop_proxy().get(), 438 clock_.get(), 439 random_generator_, 440 socket); 441 442 QuicConnection* connection = new QuicConnection(guid, addr, helper, false, 443 QuicVersionMax()); 444 445 QuicCryptoClientConfig* crypto_config = 446 GetOrCreateCryptoConfig(host_port_proxy_pair); 447 DCHECK(crypto_config); 448 449 QuicClientSession* session = 450 new QuicClientSession(connection, socket, this, 451 quic_crypto_client_stream_factory_, 452 host_port_proxy_pair.first.host(), config_, 453 crypto_config, net_log.net_log()); 454 all_sessions_.insert(session); // owning pointer 455 if (is_https) { 456 crypto_config->SetProofVerifier( 457 new ProofVerifierChromium(cert_verifier, net_log)); 458 } 459 return session; 460 } 461 462 bool QuicStreamFactory::HasActiveJob( 463 const HostPortProxyPair& host_port_proxy_pair) { 464 return ContainsKey(active_jobs_, host_port_proxy_pair); 465 } 466 467 void QuicStreamFactory::ActivateSession( 468 const HostPortProxyPair& host_port_proxy_pair, 469 QuicClientSession* session) { 470 DCHECK(!HasActiveSession(host_port_proxy_pair)); 471 active_sessions_[host_port_proxy_pair] = session; 472 session_aliases_[session].insert(host_port_proxy_pair); 473 } 474 475 QuicCryptoClientConfig* QuicStreamFactory::GetOrCreateCryptoConfig( 476 const HostPortProxyPair& host_port_proxy_pair) { 477 QuicCryptoClientConfig* crypto_config; 478 if (ContainsKey(all_crypto_configs_, host_port_proxy_pair)) { 479 crypto_config = all_crypto_configs_[host_port_proxy_pair]; 480 DCHECK(crypto_config); 481 } else { 482 crypto_config = new QuicCryptoClientConfig(); 483 crypto_config->SetDefaults(); 484 all_crypto_configs_[host_port_proxy_pair] = crypto_config; 485 } 486 return crypto_config; 487 } 488 489 } // namespace net 490