// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/socket/client_socket_pool_base.h"

#include "base/compiler_specific.h"
#include "base/format_macros.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/stats_counters.h"
#include "base/stl_util.h"
#include "base/strings/string_util.h"
#include "base/time/time.h"
#include "base/values.h"
#include "net/base/net_errors.h"
#include "net/base/net_log.h"
#include "net/socket/client_socket_handle.h"

using base::TimeDelta;

namespace net {

namespace {

// Indicate whether we should enable idle socket cleanup timer. When timer is
// disabled, sockets are closed next time a socket request is made.
bool g_cleanup_timer_enabled = true;

// The timeout value, in seconds, used to clean up idle sockets that can't be
// reused.
//
// Note: It's important to close idle sockets that have received data as soon
// as possible because the received data may cause BSOD on Windows XP under
// some conditions. See http://crbug.com/4606.
const int kCleanupInterval = 10;  // DO NOT INCREASE THIS TIMEOUT.

// Indicate whether or not we should establish a new transport layer connection
// after a certain timeout has passed without receiving an ACK.
bool g_connect_backup_jobs_enabled = true;

// Compares the effective priority of two requests, and returns 1 if |request1|
// has greater effective priority than |request2|, 0 if they have the same
// effective priority, and -1 if |request2| has the greater effective priority.
// Requests with |ignore_limits| set have higher effective priority than those
// without. If both requests have |ignore_limits| set/unset, then the request
// with the highest Priority has the highest effective priority. Does not take
// into account the fact that Requests are serviced in FIFO order if they would
// otherwise have the same priority.
int CompareEffectiveRequestPriority(
    const internal::ClientSocketPoolBaseHelper::Request& request1,
    const internal::ClientSocketPoolBaseHelper::Request& request2) {
  if (request1.ignore_limits() && !request2.ignore_limits())
    return 1;
  if (!request1.ignore_limits() && request2.ignore_limits())
    return -1;
  if (request1.priority() > request2.priority())
    return 1;
  if (request1.priority() < request2.priority())
    return -1;
  return 0;
}

}  // namespace

ConnectJob::ConnectJob(const std::string& group_name,
                       base::TimeDelta timeout_duration,
                       Delegate* delegate,
                       const BoundNetLog& net_log)
    : group_name_(group_name),
      timeout_duration_(timeout_duration),
      delegate_(delegate),
      net_log_(net_log),
      idle_(true) {
  DCHECK(!group_name.empty());
  DCHECK(delegate);
  net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL_CONNECT_JOB,
                     NetLog::StringCallback("group_name", &group_name_));
}

ConnectJob::~ConnectJob() {
  net_log().EndEvent(NetLog::TYPE_SOCKET_POOL_CONNECT_JOB);
}

int ConnectJob::Connect() {
  // A zero |timeout_duration_| means "no timeout"; only arm the timer
  // otherwise.
  if (timeout_duration_ != base::TimeDelta())
    timer_.Start(FROM_HERE, timeout_duration_, this, &ConnectJob::OnTimeout);

  idle_ = false;

  LogConnectStart();

  int rv = ConnectInternal();

  if (rv != ERR_IO_PENDING) {
    // Completed synchronously; the delegate is only notified on asynchronous
    // completion, so clear it here.
    LogConnectCompletion(rv);
    delegate_ = NULL;
  }

  return rv;
}

void ConnectJob::set_socket(StreamSocket* socket) {
  if (socket) {
    net_log().AddEvent(NetLog::TYPE_CONNECT_JOB_SET_SOCKET,
                       socket->NetLog().source().ToEventParametersCallback());
  }
  socket_.reset(socket);
}

void ConnectJob::NotifyDelegateOfCompletion(int rv) {
  // The delegate will delete |this|.
  Delegate* delegate = delegate_;
  delegate_ = NULL;

  LogConnectCompletion(rv);
  delegate->OnConnectJobComplete(rv, this);
}

void ConnectJob::ResetTimer(base::TimeDelta remaining_time) {
  timer_.Stop();
  timer_.Start(FROM_HERE, remaining_time, this, &ConnectJob::OnTimeout);
}

void ConnectJob::LogConnectStart() {
  connect_timing_.connect_start = base::TimeTicks::Now();
  net_log().BeginEvent(NetLog::TYPE_SOCKET_POOL_CONNECT_JOB_CONNECT);
}

void ConnectJob::LogConnectCompletion(int net_error) {
  connect_timing_.connect_end = base::TimeTicks::Now();
  net_log().EndEventWithNetErrorCode(
      NetLog::TYPE_SOCKET_POOL_CONNECT_JOB_CONNECT, net_error);
}

void ConnectJob::OnTimeout() {
  // Make sure the socket is NULL before calling into |delegate|.
  set_socket(NULL);

  net_log_.AddEvent(NetLog::TYPE_SOCKET_POOL_CONNECT_JOB_TIMED_OUT);

  NotifyDelegateOfCompletion(ERR_TIMED_OUT);
}

namespace internal {

ClientSocketPoolBaseHelper::Request::Request(
    ClientSocketHandle* handle,
    const CompletionCallback& callback,
    RequestPriority priority,
    bool ignore_limits,
    Flags flags,
    const BoundNetLog& net_log)
    : handle_(handle),
      callback_(callback),
      priority_(priority),
      ignore_limits_(ignore_limits),
      flags_(flags),
      net_log_(net_log) {}

ClientSocketPoolBaseHelper::Request::~Request() {}

ClientSocketPoolBaseHelper::ClientSocketPoolBaseHelper(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    ConnectJobFactory* connect_job_factory)
    : idle_socket_count_(0),
      connecting_socket_count_(0),
      handed_out_socket_count_(0),
      max_sockets_(max_sockets),
      max_sockets_per_group_(max_sockets_per_group),
      use_cleanup_timer_(g_cleanup_timer_enabled),
      unused_idle_socket_timeout_(unused_idle_socket_timeout),
      used_idle_socket_timeout_(used_idle_socket_timeout),
      connect_job_factory_(connect_job_factory),
      connect_backup_jobs_enabled_(false),
      pool_generation_number_(0),
      weak_factory_(this) {
  DCHECK_LE(0, max_sockets_per_group);
  DCHECK_LE(max_sockets_per_group, max_sockets);

  // Flush the pool (see OnIPAddressChanged) whenever the IP address changes.
  NetworkChangeNotifier::AddIPAddressObserver(this);
}

ClientSocketPoolBaseHelper::~ClientSocketPoolBaseHelper() {
  // Clean up any idle sockets and pending connect jobs.  Assert that we have
  // no remaining active sockets or pending requests.  They should have all
  // been cleaned up prior to |this| being destroyed.
  FlushWithError(ERR_ABORTED);
  DCHECK(group_map_.empty());
  DCHECK(pending_callback_map_.empty());
  DCHECK_EQ(0, connecting_socket_count_);
  CHECK(higher_layer_pools_.empty());

  NetworkChangeNotifier::RemoveIPAddressObserver(this);
}

ClientSocketPoolBaseHelper::CallbackResultPair::CallbackResultPair()
    : result(OK) {
}

ClientSocketPoolBaseHelper::CallbackResultPair::CallbackResultPair(
    const CompletionCallback& callback_in, int result_in)
    : callback(callback_in),
      result(result_in) {
}

ClientSocketPoolBaseHelper::CallbackResultPair::~CallbackResultPair() {}

// static
void ClientSocketPoolBaseHelper::InsertRequestIntoQueue(
    const Request* r, RequestQueue* pending_requests) {
  RequestQueue::iterator it = pending_requests->begin();
  // TODO(mmenke):  Should the network stack require requests with
  //                |ignore_limits| have the highest priority?
  //
  // Skip past all requests with effective priority >= |r|'s, so requests of
  // equal effective priority are serviced in FIFO order.
  while (it != pending_requests->end() &&
         CompareEffectiveRequestPriority(*r, *(*it)) <= 0) {
    ++it;
  }
  pending_requests->insert(it, r);
}

// static
const ClientSocketPoolBaseHelper::Request*
ClientSocketPoolBaseHelper::RemoveRequestFromQueue(
    const RequestQueue::iterator& it, Group* group) {
  const Request* req = *it;
  group->mutable_pending_requests()->erase(it);
  // If there are no more requests, we kill the backup timer.
  if (group->pending_requests().empty())
    group->CleanupBackupJob();
  return req;
}

void ClientSocketPoolBaseHelper::AddLayeredPool(LayeredPool* pool) {
  CHECK(pool);
  CHECK(!ContainsKey(higher_layer_pools_, pool));
  higher_layer_pools_.insert(pool);
}

void ClientSocketPoolBaseHelper::RemoveLayeredPool(LayeredPool* pool) {
  CHECK(pool);
  CHECK(ContainsKey(higher_layer_pools_, pool));
  higher_layer_pools_.erase(pool);
}

int ClientSocketPoolBaseHelper::RequestSocket(
    const std::string& group_name,
    const Request* request) {
  CHECK(!request->callback().is_null());
  CHECK(request->handle());

  // Cleanup any timed-out idle sockets if no timer is used.
  if (!use_cleanup_timer_)
    CleanupIdleSockets(false);

  request->net_log().BeginEvent(NetLog::TYPE_SOCKET_POOL);
  Group* group = GetOrCreateGroup(group_name);

  int rv = RequestSocketInternal(group_name, request);
  if (rv != ERR_IO_PENDING) {
    // Completed synchronously (success or failure); the pool takes ownership
    // of |request| and is done with it.
    request->net_log().EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
    CHECK(!request->handle()->is_initialized());
    delete request;
  } else {
    InsertRequestIntoQueue(request, group->mutable_pending_requests());
    // Have to do this asynchronously, as closing sockets in higher level pools
    // call back in to |this|, which will cause all sorts of fun and exciting
    // re-entrancy issues if the socket pool is doing something else at the
    // time.
    if (group->IsStalledOnPoolMaxSockets(max_sockets_per_group_)) {
      base::MessageLoop::current()->PostTask(
          FROM_HERE,
          base::Bind(
              &ClientSocketPoolBaseHelper::TryToCloseSocketsInLayeredPools,
              weak_factory_.GetWeakPtr()));
    }
  }
  return rv;
}

void ClientSocketPoolBaseHelper::RequestSockets(
    const std::string& group_name,
    const Request& request,
    int num_sockets) {
  // Preconnect requests have no handle or completion callback.
  DCHECK(request.callback().is_null());
  DCHECK(!request.handle());

  // Cleanup any timed out idle sockets if no timer is used.
  if (!use_cleanup_timer_)
    CleanupIdleSockets(false);

  // Never preconnect more sockets than the per-group limit allows.
  if (num_sockets > max_sockets_per_group_) {
    num_sockets = max_sockets_per_group_;
  }

  request.net_log().BeginEvent(
      NetLog::TYPE_SOCKET_POOL_CONNECTING_N_SOCKETS,
      NetLog::IntegerCallback("num_sockets", num_sockets));

  Group* group = GetOrCreateGroup(group_name);

  // RequestSocketsInternal() may delete the group.
  bool deleted_group = false;

  int rv = OK;
  for (int num_iterations_left = num_sockets;
       group->NumActiveSocketSlots() < num_sockets &&
       num_iterations_left > 0 ; num_iterations_left--) {
    rv = RequestSocketInternal(group_name, &request);
    if (rv < 0 && rv != ERR_IO_PENDING) {
      // We're encountering a synchronous error.  Give up.
      if (!ContainsKey(group_map_, group_name))
        deleted_group = true;
      break;
    }
    if (!ContainsKey(group_map_, group_name)) {
      // Unexpected.  The group should only be getting deleted on synchronous
      // error.
      NOTREACHED();
      deleted_group = true;
      break;
    }
  }

  if (!deleted_group && group->IsEmpty())
    RemoveGroup(group_name);

  if (rv == ERR_IO_PENDING)
    rv = OK;
  request.net_log().EndEventWithNetErrorCode(
      NetLog::TYPE_SOCKET_POOL_CONNECTING_N_SOCKETS, rv);
}

int ClientSocketPoolBaseHelper::RequestSocketInternal(
    const std::string& group_name,
    const Request* request) {
  ClientSocketHandle* const handle = request->handle();
  // A NULL handle signals a preconnect request (see RequestSockets()).
  const bool preconnecting = !handle;
  Group* group = GetOrCreateGroup(group_name);

  if (!(request->flags() & NO_IDLE_SOCKETS)) {
    // Try to reuse a socket.
    if (AssignIdleSocketToRequest(request, group))
      return OK;
  }

  // If there are more ConnectJobs than pending requests, don't need to do
  // anything.  Can just wait for the extra job to connect, and then assign it
  // to the request.
  if (!preconnecting && group->TryToUseUnassignedConnectJob())
    return ERR_IO_PENDING;

  // Can we make another active socket now?
  if (!group->HasAvailableSocketSlot(max_sockets_per_group_) &&
      !request->ignore_limits()) {
    // TODO(willchan): Consider whether or not we need to close a socket in a
    // higher layered group. I don't think this makes sense since we would just
    // reuse that socket then if we needed one and wouldn't make it down to this
    // layer.
    request->net_log().AddEvent(
        NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
    return ERR_IO_PENDING;
  }

  if (ReachedMaxSocketsLimit() && !request->ignore_limits()) {
    // NOTE(mmenke):  Wonder if we really need different code for each case
    // here.  Only reason for them now seems to be preconnects.
    if (idle_socket_count() > 0) {
      // There's an idle socket in this pool.  Either that's because there's
      // still one in this group, but we got here due to preconnecting bypassing
      // idle sockets, or because there's an idle socket in another group.
      bool closed = CloseOneIdleSocketExceptInGroup(group);
      if (preconnecting && !closed)
        return ERR_PRECONNECT_MAX_SOCKET_LIMIT;
    } else {
      // We could check if we really have a stalled group here, but it requires
      // a scan of all groups, so just flip a flag here, and do the check later.
      request->net_log().AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
      return ERR_IO_PENDING;
    }
  }

  // We couldn't find a socket to reuse, and there's space to allocate one,
  // so allocate and connect a new one.
  scoped_ptr<ConnectJob> connect_job(
      connect_job_factory_->NewConnectJob(group_name, *request, this));

  int rv = connect_job->Connect();
  if (rv == OK) {
    LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
    if (!preconnecting) {
      HandOutSocket(connect_job->ReleaseSocket(), false /* not reused */,
                    connect_job->connect_timing(), handle, base::TimeDelta(),
                    group, request->net_log());
    } else {
      AddIdleSocket(connect_job->ReleaseSocket(), group);
    }
  } else if (rv == ERR_IO_PENDING) {
    // If we don't have any sockets in this group, set a timer for potentially
    // creating a new one.  If the SYN is lost, this backup socket may complete
    // before the slow socket, improving end user latency.
    if (connect_backup_jobs_enabled_ &&
        group->IsEmpty() && !group->HasBackupJob()) {
      group->StartBackupSocketTimer(group_name, this);
    }

    connecting_socket_count_++;

    group->AddJob(connect_job.release(), preconnecting);
  } else {
    // Synchronous failure.
    LogBoundConnectJobToRequest(connect_job->net_log().source(), request);
    StreamSocket* error_socket = NULL;
    if (!preconnecting) {
      DCHECK(handle);
      connect_job->GetAdditionalErrorState(handle);
      error_socket = connect_job->ReleaseSocket();
    }
    if (error_socket) {
      HandOutSocket(error_socket, false /* not reused */,
                    connect_job->connect_timing(), handle, base::TimeDelta(),
                    group, request->net_log());
    } else if (group->IsEmpty()) {
      RemoveGroup(group_name);
    }
  }

  return rv;
}

bool ClientSocketPoolBaseHelper::AssignIdleSocketToRequest(
    const Request* request, Group* group) {
  std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets();
  std::list<IdleSocket>::iterator idle_socket_it = idle_sockets->end();

  // Iterate through the idle sockets forwards (oldest to newest)
  //   * Delete any disconnected ones.
  //   * If we find a used idle socket, assign to |idle_socket|.  At the end,
  //   the |idle_socket_it| will be set to the newest used idle socket.
  for (std::list<IdleSocket>::iterator it = idle_sockets->begin();
       it != idle_sockets->end();) {
    if (!it->socket->IsConnectedAndIdle()) {
      DecrementIdleCount();
      delete it->socket;
      it = idle_sockets->erase(it);
      continue;
    }

    if (it->socket->WasEverUsed()) {
      // We found one we can reuse!
      idle_socket_it = it;
    }

    ++it;
  }

  // If we haven't found an idle socket, that means there are no used idle
  // sockets.  Pick the oldest (first) idle socket (FIFO).

  if (idle_socket_it == idle_sockets->end() && !idle_sockets->empty())
    idle_socket_it = idle_sockets->begin();

  if (idle_socket_it != idle_sockets->end()) {
    DecrementIdleCount();
    base::TimeDelta idle_time =
        base::TimeTicks::Now() - idle_socket_it->start_time;
    IdleSocket idle_socket = *idle_socket_it;
    idle_sockets->erase(idle_socket_it);
    HandOutSocket(
        idle_socket.socket,
        idle_socket.socket->WasEverUsed(),
        LoadTimingInfo::ConnectTiming(),
        request->handle(),
        idle_time,
        group,
        request->net_log());
    return true;
  }

  return false;
}

// static
void ClientSocketPoolBaseHelper::LogBoundConnectJobToRequest(
    const NetLog::Source& connect_job_source, const Request* request) {
  request->net_log().AddEvent(NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
                              connect_job_source.ToEventParametersCallback());
}

void ClientSocketPoolBaseHelper::CancelRequest(
    const std::string& group_name, ClientSocketHandle* handle) {
  // First check if the request already completed and only its user callback
  // is still pending; if so, undo the hand-out instead of searching the queue.
  PendingCallbackMap::iterator callback_it = pending_callback_map_.find(handle);
  if (callback_it != pending_callback_map_.end()) {
    int result = callback_it->second.result;
    pending_callback_map_.erase(callback_it);
    StreamSocket* socket = handle->release_socket();
    if (socket) {
      if (result != OK)
        socket->Disconnect();
      ReleaseSocket(handle->group_name(), socket, handle->id());
    }
    return;
  }

  CHECK(ContainsKey(group_map_, group_name));

  Group* group = GetOrCreateGroup(group_name);

  // Search pending_requests for matching handle.
  RequestQueue::iterator it = group->mutable_pending_requests()->begin();
  for (; it != group->pending_requests().end(); ++it) {
    if ((*it)->handle() == handle) {
      scoped_ptr<const Request> req(RemoveRequestFromQueue(it, group));
      req->net_log().AddEvent(NetLog::TYPE_CANCELLED);
      req->net_log().EndEvent(NetLog::TYPE_SOCKET_POOL);

      // We let the job run, unless we're at the socket limit and there is
      // not another request waiting on the job.
      if (group->jobs().size() > group->pending_requests().size() &&
          ReachedMaxSocketsLimit()) {
        RemoveConnectJob(*group->jobs().begin(), group);
        CheckForStalledSocketGroups();
      }
      break;
    }
  }
}

bool ClientSocketPoolBaseHelper::HasGroup(const std::string& group_name) const {
  return ContainsKey(group_map_, group_name);
}

void ClientSocketPoolBaseHelper::CloseIdleSockets() {
  CleanupIdleSockets(true);
  DCHECK_EQ(0, idle_socket_count_);
}

int ClientSocketPoolBaseHelper::IdleSocketCountInGroup(
    const std::string& group_name) const {
  GroupMap::const_iterator i = group_map_.find(group_name);
  CHECK(i != group_map_.end());

  return i->second->idle_sockets().size();
}

LoadState ClientSocketPoolBaseHelper::GetLoadState(
    const std::string& group_name,
    const ClientSocketHandle* handle) const {
  if (ContainsKey(pending_callback_map_, handle))
    return LOAD_STATE_CONNECTING;

  if (!ContainsKey(group_map_, group_name)) {
    NOTREACHED() << "ClientSocketPool does not contain group: " << group_name
                 << " for handle: " << handle;
    return LOAD_STATE_IDLE;
  }

  // Can't use operator[] since it is non-const.
  const Group& group = *group_map_.find(group_name)->second;

  // Search the first group.jobs().size() |pending_requests| for |handle|.
  // If it's farther back in the deque than that, it doesn't have a
  // corresponding ConnectJob.
  size_t connect_jobs = group.jobs().size();
  RequestQueue::const_iterator it = group.pending_requests().begin();
  for (size_t i = 0; it != group.pending_requests().end() && i < connect_jobs;
       ++it, ++i) {
    if ((*it)->handle() != handle)
      continue;

    // Just return the state of the farthest along ConnectJob for the first
    // group.jobs().size() pending requests.
    LoadState max_state = LOAD_STATE_IDLE;
    for (ConnectJobSet::const_iterator job_it = group.jobs().begin();
         job_it != group.jobs().end(); ++job_it) {
      max_state = std::max(max_state, (*job_it)->GetLoadState());
    }
    return max_state;
  }

  if (group.IsStalledOnPoolMaxSockets(max_sockets_per_group_))
    return LOAD_STATE_WAITING_FOR_STALLED_SOCKET_POOL;
  return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
}

base::DictionaryValue* ClientSocketPoolBaseHelper::GetInfoAsValue(
    const std::string& name, const std::string& type) const {
  base::DictionaryValue* dict = new base::DictionaryValue();
  dict->SetString("name", name);
  dict->SetString("type", type);
  dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
  dict->SetInteger("connecting_socket_count", connecting_socket_count_);
  dict->SetInteger("idle_socket_count", idle_socket_count_);
  dict->SetInteger("max_socket_count", max_sockets_);
  dict->SetInteger("max_sockets_per_group", max_sockets_per_group_);
  dict->SetInteger("pool_generation_number", pool_generation_number_);

  if (group_map_.empty())
    return dict;

  base::DictionaryValue* all_groups_dict = new base::DictionaryValue();
  for (GroupMap::const_iterator it = group_map_.begin();
       it != group_map_.end(); it++) {
    const Group* group = it->second;
    base::DictionaryValue* group_dict = new base::DictionaryValue();

    group_dict->SetInteger("pending_request_count",
                           group->pending_requests().size());
    if (!group->pending_requests().empty()) {
      group_dict->SetInteger("top_pending_priority",
                             group->TopPendingPriority());
    }

    group_dict->SetInteger("active_socket_count", group->active_socket_count());

    base::ListValue* idle_socket_list = new base::ListValue();
    std::list<IdleSocket>::const_iterator idle_socket;
    for (idle_socket = group->idle_sockets().begin();
         idle_socket != group->idle_sockets().end();
         idle_socket++) {
      int source_id = idle_socket->socket->NetLog().source().id;
      idle_socket_list->Append(new base::FundamentalValue(source_id));
    }
    group_dict->Set("idle_sockets", idle_socket_list);

    base::ListValue* connect_jobs_list = new base::ListValue();
    std::set<ConnectJob*>::const_iterator job = group->jobs().begin();
    for (job = group->jobs().begin(); job != group->jobs().end(); job++) {
      int source_id = (*job)->net_log().source().id;
      connect_jobs_list->Append(new base::FundamentalValue(source_id));
    }
    group_dict->Set("connect_jobs", connect_jobs_list);

    group_dict->SetBoolean("is_stalled",
                           group->IsStalledOnPoolMaxSockets(
                               max_sockets_per_group_));
    group_dict->SetBoolean("has_backup_job", group->HasBackupJob());

    all_groups_dict->SetWithoutPathExpansion(it->first, group_dict);
  }
  dict->Set("groups", all_groups_dict);
  return dict;
}

bool ClientSocketPoolBaseHelper::IdleSocket::ShouldCleanup(
    base::TimeTicks now,
    base::TimeDelta timeout) const {
  bool timed_out = (now - start_time) >= timeout;
  if (timed_out)
    return true;
  // A used socket must also be idle (no buffered data) to be kept; an unused
  // one only needs to still be connected.
  if (socket->WasEverUsed())
    return !socket->IsConnectedAndIdle();
  return !socket->IsConnected();
}

void ClientSocketPoolBaseHelper::CleanupIdleSockets(bool force) {
  if (idle_socket_count_ == 0)
    return;

  // Current time value.  Retrieving it once at the function start rather than
  // inside the inner loop, since it shouldn't change by any meaningful amount.
  base::TimeTicks now = base::TimeTicks::Now();

  GroupMap::iterator i = group_map_.begin();
  while (i != group_map_.end()) {
    Group* group = i->second;

    std::list<IdleSocket>::iterator j = group->mutable_idle_sockets()->begin();
    while (j != group->idle_sockets().end()) {
      base::TimeDelta timeout =
          j->socket->WasEverUsed() ?
          used_idle_socket_timeout_ : unused_idle_socket_timeout_;
      if (force || j->ShouldCleanup(now, timeout)) {
        delete j->socket;
        j = group->mutable_idle_sockets()->erase(j);
        DecrementIdleCount();
      } else {
        ++j;
      }
    }

    // Delete group if no longer needed.
    if (group->IsEmpty()) {
      RemoveGroup(i++);
    } else {
      ++i;
    }
  }
}

ClientSocketPoolBaseHelper::Group* ClientSocketPoolBaseHelper::GetOrCreateGroup(
    const std::string& group_name) {
  GroupMap::iterator it = group_map_.find(group_name);
  if (it != group_map_.end())
    return it->second;
  Group* group = new Group;
  group_map_[group_name] = group;
  return group;
}

void ClientSocketPoolBaseHelper::RemoveGroup(const std::string& group_name) {
  GroupMap::iterator it = group_map_.find(group_name);
  CHECK(it != group_map_.end());

  RemoveGroup(it);
}

void ClientSocketPoolBaseHelper::RemoveGroup(GroupMap::iterator it) {
  delete it->second;
  group_map_.erase(it);
}

// static
bool ClientSocketPoolBaseHelper::connect_backup_jobs_enabled() {
  return g_connect_backup_jobs_enabled;
}

// static
bool ClientSocketPoolBaseHelper::set_connect_backup_jobs_enabled(bool enabled) {
  bool old_value = g_connect_backup_jobs_enabled;
  g_connect_backup_jobs_enabled = enabled;
  return old_value;
}

void ClientSocketPoolBaseHelper::EnableConnectBackupJobs() {
  connect_backup_jobs_enabled_ = g_connect_backup_jobs_enabled;
}

void ClientSocketPoolBaseHelper::IncrementIdleCount() {
  // Lazily start the cleanup timer on the 0 -> 1 transition.
  if (++idle_socket_count_ == 1 && use_cleanup_timer_)
    StartIdleSocketTimer();
}

void ClientSocketPoolBaseHelper::DecrementIdleCount() {
  if (--idle_socket_count_ == 0)
    timer_.Stop();
}

// static
bool ClientSocketPoolBaseHelper::cleanup_timer_enabled() {
  return g_cleanup_timer_enabled;
}

// static
bool ClientSocketPoolBaseHelper::set_cleanup_timer_enabled(bool enabled) {
  bool old_value = g_cleanup_timer_enabled;
  g_cleanup_timer_enabled = enabled;
  return old_value;
}

void ClientSocketPoolBaseHelper::StartIdleSocketTimer() {
  timer_.Start(FROM_HERE, TimeDelta::FromSeconds(kCleanupInterval), this,
               &ClientSocketPoolBaseHelper::OnCleanupTimerFired);
}

void ClientSocketPoolBaseHelper::ReleaseSocket(const std::string& group_name,
                                               StreamSocket* socket,
                                               int id) {
  GroupMap::iterator i = group_map_.find(group_name);
  CHECK(i != group_map_.end());

  Group* group = i->second;

  CHECK_GT(handed_out_socket_count_, 0);
  handed_out_socket_count_--;

  CHECK_GT(group->active_socket_count(), 0);
  group->DecrementActiveSocketCount();

  // Only reuse the socket if it is still usable and belongs to the current
  // pool generation (FlushWithError() bumps the generation to retire old
  // sockets).
  const bool can_reuse = socket->IsConnectedAndIdle() &&
      id == pool_generation_number_;
  if (can_reuse) {
    // Add it to the idle list.
    AddIdleSocket(socket, group);
    OnAvailableSocketSlot(group_name, group);
  } else {
    delete socket;
  }

  CheckForStalledSocketGroups();
}

void ClientSocketPoolBaseHelper::CheckForStalledSocketGroups() {
  // If we have idle sockets, see if we can give one to the top-stalled group.
  std::string top_group_name;
  Group* top_group = NULL;
  if (!FindTopStalledGroup(&top_group, &top_group_name))
    return;

  if (ReachedMaxSocketsLimit()) {
    if (idle_socket_count() > 0) {
      CloseOneIdleSocket();
    } else {
      // We can't activate more sockets since we're already at our global
      // limit.
      return;
    }
  }

  // Note:  we don't loop on waking stalled groups.  If the stalled group is at
  // its limit, we may be left with other stalled groups that could be
  // woken.  This isn't optimal, but there is no starvation, so to avoid
  // the looping we leave it at this.
  OnAvailableSocketSlot(top_group_name, top_group);
}

// Search for the highest priority pending request, amongst the groups that
// are not at the |max_sockets_per_group_| limit.  Note: for requests with
// the same priority, the winner is based on group hash ordering (and not
// insertion order).
bool ClientSocketPoolBaseHelper::FindTopStalledGroup(
    Group** group,
    std::string* group_name) const {
  // Both out-params must be provided, or neither (the latter is a pure
  // "is there any stalled group?" query that can return early).
  CHECK((group && group_name) || (!group && !group_name));
  Group* top_group = NULL;
  const std::string* top_group_name = NULL;
  bool has_stalled_group = false;
  for (GroupMap::const_iterator i = group_map_.begin();
       i != group_map_.end(); ++i) {
    Group* curr_group = i->second;
    const RequestQueue& queue = curr_group->pending_requests();
    if (queue.empty())
      continue;
    if (curr_group->IsStalledOnPoolMaxSockets(max_sockets_per_group_)) {
      if (!group)
        return true;
      has_stalled_group = true;
      bool has_higher_priority = !top_group ||
          curr_group->TopPendingPriority() > top_group->TopPendingPriority();
      if (has_higher_priority) {
        top_group = curr_group;
        top_group_name = &i->first;
      }
    }
  }

  if (top_group) {
    CHECK(group);
    *group = top_group;
    *group_name = *top_group_name;
  } else {
    CHECK(!has_stalled_group);
  }
  return has_stalled_group;
}

void ClientSocketPoolBaseHelper::OnConnectJobComplete(
    int result, ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  const std::string group_name = job->group_name();
  GroupMap::iterator group_it = group_map_.find(group_name);
  CHECK(group_it != group_map_.end());
  Group* group = group_it->second;

  scoped_ptr<StreamSocket> socket(job->ReleaseSocket());

  // Copies of these are needed because |job| may be deleted before they are
  // accessed.
  BoundNetLog job_log = job->net_log();
  LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();

  if (result == OK) {
    DCHECK(socket.get());
    RemoveConnectJob(job, group);
    if (!group->pending_requests().empty()) {
      // Hand the new socket to the highest-priority pending request.
      scoped_ptr<const Request> r(RemoveRequestFromQueue(
          group->mutable_pending_requests()->begin(), group));
      LogBoundConnectJobToRequest(job_log.source(), r.get());
      HandOutSocket(
          socket.release(), false /* unused socket */, connect_timing,
          r->handle(), base::TimeDelta(), group, r->net_log());
      r->net_log().EndEvent(NetLog::TYPE_SOCKET_POOL);
      InvokeUserCallbackLater(r->handle(), r->callback(), result);
    } else {
      // No waiter (e.g. a preconnect); park the socket in the idle list.
      AddIdleSocket(socket.release(), group);
      OnAvailableSocketSlot(group_name, group);
      CheckForStalledSocketGroups();
    }
  } else {
    // If we got a socket, it must contain error information so pass that
    // up so that the caller can retrieve it.
    bool handed_out_socket = false;
    if (!group->pending_requests().empty()) {
      scoped_ptr<const Request> r(RemoveRequestFromQueue(
          group->mutable_pending_requests()->begin(), group));
      LogBoundConnectJobToRequest(job_log.source(), r.get());
      job->GetAdditionalErrorState(r->handle());
      RemoveConnectJob(job, group);
      if (socket.get()) {
        handed_out_socket = true;
        HandOutSocket(socket.release(), false /* unused socket */,
                      connect_timing, r->handle(), base::TimeDelta(), group,
                      r->net_log());
      }
      r->net_log().EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
      InvokeUserCallbackLater(r->handle(), r->callback(), result);
    } else {
      RemoveConnectJob(job, group);
    }
    if (!handed_out_socket) {
      OnAvailableSocketSlot(group_name, group);
      CheckForStalledSocketGroups();
    }
  }
}

void ClientSocketPoolBaseHelper::OnIPAddressChanged() {
  FlushWithError(ERR_NETWORK_CHANGED);
}

void ClientSocketPoolBaseHelper::FlushWithError(int error) {
  pool_generation_number_++;
  CancelAllConnectJobs();
  CloseIdleSockets();
  CancelAllRequestsWithError(error);
}

bool ClientSocketPoolBaseHelper::IsStalled() const {
  // If we are not using |max_sockets_|, then clearly we are not stalled
  if ((handed_out_socket_count_ + connecting_socket_count_) < max_sockets_)
    return false;
  // So in order to be stalled we need to be using |max_sockets_| AND
  // we need to have a request that is actually stalled on the global
  // socket limit.  To find such a request, we look for a group that
  // has more requests than jobs AND where the number of jobs is less
  // than |max_sockets_per_group_|.  (If the number of jobs is equal to
  // |max_sockets_per_group_|, then the request is stalled on the group,
  // which does not count.)
  for (GroupMap::const_iterator it = group_map_.begin();
       it != group_map_.end(); ++it) {
    if (it->second->IsStalledOnPoolMaxSockets(max_sockets_per_group_))
      return true;
  }
  return false;
}

void ClientSocketPoolBaseHelper::RemoveConnectJob(ConnectJob* job,
                                                  Group* group) {
  CHECK_GT(connecting_socket_count_, 0);
  connecting_socket_count_--;

  DCHECK(group);
  DCHECK(ContainsKey(group->jobs(), job));
  group->RemoveJob(job);

  // If we've got no more jobs for this group, then we no longer need a
  // backup job either.
  if (group->jobs().empty())
    group->CleanupBackupJob();

  DCHECK(job);
  delete job;
}

void ClientSocketPoolBaseHelper::OnAvailableSocketSlot(
    const std::string& group_name, Group* group) {
  DCHECK(ContainsKey(group_map_, group_name));
  if (group->IsEmpty())
    RemoveGroup(group_name);
  else if (!group->pending_requests().empty())
    ProcessPendingRequest(group_name, group);
}

void ClientSocketPoolBaseHelper::ProcessPendingRequest(
    const std::string& group_name, Group* group) {
  int rv = RequestSocketInternal(group_name,
                                 *group->pending_requests().begin());
  if (rv != ERR_IO_PENDING) {
    // The request completed synchronously, so dequeue it and notify its
    // owner asynchronously via InvokeUserCallbackLater().
    scoped_ptr<const Request> request(RemoveRequestFromQueue(
        group->mutable_pending_requests()->begin(), group));
    if (group->IsEmpty())
      RemoveGroup(group_name);

    request->net_log().EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
    InvokeUserCallbackLater(request->handle(), request->callback(), rv);
  }
}

void ClientSocketPoolBaseHelper::HandOutSocket(
    StreamSocket* socket,
    bool reused,
    const LoadTimingInfo::ConnectTiming& connect_timing,
    ClientSocketHandle* handle,
    base::TimeDelta idle_time,
    Group* group,
    const BoundNetLog& net_log) {
  DCHECK(socket);
  handle->set_socket(socket);
  handle->set_is_reused(reused);
  handle->set_idle_time(idle_time);
handle->set_pool_id(pool_generation_number_); 994 handle->set_connect_timing(connect_timing); 995 996 if (reused) { 997 net_log.AddEvent( 998 NetLog::TYPE_SOCKET_POOL_REUSED_AN_EXISTING_SOCKET, 999 NetLog::IntegerCallback( 1000 "idle_ms", static_cast<int>(idle_time.InMilliseconds()))); 1001 } 1002 1003 net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET, 1004 socket->NetLog().source().ToEventParametersCallback()); 1005 1006 handed_out_socket_count_++; 1007 group->IncrementActiveSocketCount(); 1008 } 1009 1010 void ClientSocketPoolBaseHelper::AddIdleSocket( 1011 StreamSocket* socket, Group* group) { 1012 DCHECK(socket); 1013 IdleSocket idle_socket; 1014 idle_socket.socket = socket; 1015 idle_socket.start_time = base::TimeTicks::Now(); 1016 1017 group->mutable_idle_sockets()->push_back(idle_socket); 1018 IncrementIdleCount(); 1019 } 1020 1021 void ClientSocketPoolBaseHelper::CancelAllConnectJobs() { 1022 for (GroupMap::iterator i = group_map_.begin(); i != group_map_.end();) { 1023 Group* group = i->second; 1024 connecting_socket_count_ -= group->jobs().size(); 1025 group->RemoveAllJobs(); 1026 1027 // Delete group if no longer needed. 1028 if (group->IsEmpty()) { 1029 // RemoveGroup() will call .erase() which will invalidate the iterator, 1030 // but i will already have been incremented to a valid iterator before 1031 // RemoveGroup() is called. 
1032 RemoveGroup(i++); 1033 } else { 1034 ++i; 1035 } 1036 } 1037 DCHECK_EQ(0, connecting_socket_count_); 1038 } 1039 1040 void ClientSocketPoolBaseHelper::CancelAllRequestsWithError(int error) { 1041 for (GroupMap::iterator i = group_map_.begin(); i != group_map_.end();) { 1042 Group* group = i->second; 1043 1044 RequestQueue pending_requests; 1045 pending_requests.swap(*group->mutable_pending_requests()); 1046 for (RequestQueue::iterator it2 = pending_requests.begin(); 1047 it2 != pending_requests.end(); ++it2) { 1048 scoped_ptr<const Request> request(*it2); 1049 InvokeUserCallbackLater( 1050 request->handle(), request->callback(), error); 1051 } 1052 1053 // Delete group if no longer needed. 1054 if (group->IsEmpty()) { 1055 // RemoveGroup() will call .erase() which will invalidate the iterator, 1056 // but i will already have been incremented to a valid iterator before 1057 // RemoveGroup() is called. 1058 RemoveGroup(i++); 1059 } else { 1060 ++i; 1061 } 1062 } 1063 } 1064 1065 bool ClientSocketPoolBaseHelper::ReachedMaxSocketsLimit() const { 1066 // Each connecting socket will eventually connect and be handed out. 
1067 int total = handed_out_socket_count_ + connecting_socket_count_ + 1068 idle_socket_count(); 1069 // There can be more sockets than the limit since some requests can ignore 1070 // the limit 1071 if (total < max_sockets_) 1072 return false; 1073 return true; 1074 } 1075 1076 bool ClientSocketPoolBaseHelper::CloseOneIdleSocket() { 1077 if (idle_socket_count() == 0) 1078 return false; 1079 return CloseOneIdleSocketExceptInGroup(NULL); 1080 } 1081 1082 bool ClientSocketPoolBaseHelper::CloseOneIdleSocketExceptInGroup( 1083 const Group* exception_group) { 1084 CHECK_GT(idle_socket_count(), 0); 1085 1086 for (GroupMap::iterator i = group_map_.begin(); i != group_map_.end(); ++i) { 1087 Group* group = i->second; 1088 if (exception_group == group) 1089 continue; 1090 std::list<IdleSocket>* idle_sockets = group->mutable_idle_sockets(); 1091 1092 if (!idle_sockets->empty()) { 1093 delete idle_sockets->front().socket; 1094 idle_sockets->pop_front(); 1095 DecrementIdleCount(); 1096 if (group->IsEmpty()) 1097 RemoveGroup(i); 1098 1099 return true; 1100 } 1101 } 1102 1103 return false; 1104 } 1105 1106 bool ClientSocketPoolBaseHelper::CloseOneIdleConnectionInLayeredPool() { 1107 // This pool doesn't have any idle sockets. It's possible that a pool at a 1108 // higher layer is holding one of this sockets active, but it's actually idle. 1109 // Query the higher layers. 
1110 for (std::set<LayeredPool*>::const_iterator it = higher_layer_pools_.begin(); 1111 it != higher_layer_pools_.end(); ++it) { 1112 if ((*it)->CloseOneIdleConnection()) 1113 return true; 1114 } 1115 return false; 1116 } 1117 1118 void ClientSocketPoolBaseHelper::InvokeUserCallbackLater( 1119 ClientSocketHandle* handle, const CompletionCallback& callback, int rv) { 1120 CHECK(!ContainsKey(pending_callback_map_, handle)); 1121 pending_callback_map_[handle] = CallbackResultPair(callback, rv); 1122 base::MessageLoop::current()->PostTask( 1123 FROM_HERE, 1124 base::Bind(&ClientSocketPoolBaseHelper::InvokeUserCallback, 1125 weak_factory_.GetWeakPtr(), handle)); 1126 } 1127 1128 void ClientSocketPoolBaseHelper::InvokeUserCallback( 1129 ClientSocketHandle* handle) { 1130 PendingCallbackMap::iterator it = pending_callback_map_.find(handle); 1131 1132 // Exit if the request has already been cancelled. 1133 if (it == pending_callback_map_.end()) 1134 return; 1135 1136 CHECK(!handle->is_initialized()); 1137 CompletionCallback callback = it->second.callback; 1138 int result = it->second.result; 1139 pending_callback_map_.erase(it); 1140 callback.Run(result); 1141 } 1142 1143 void ClientSocketPoolBaseHelper::TryToCloseSocketsInLayeredPools() { 1144 while (IsStalled()) { 1145 // Closing a socket will result in calling back into |this| to use the freed 1146 // socket slot, so nothing else is needed. 1147 if (!CloseOneIdleConnectionInLayeredPool()) 1148 return; 1149 } 1150 } 1151 1152 ClientSocketPoolBaseHelper::Group::Group() 1153 : unassigned_job_count_(0), 1154 active_socket_count_(0), 1155 weak_factory_(this) {} 1156 1157 ClientSocketPoolBaseHelper::Group::~Group() { 1158 CleanupBackupJob(); 1159 DCHECK_EQ(0u, unassigned_job_count_); 1160 } 1161 1162 void ClientSocketPoolBaseHelper::Group::StartBackupSocketTimer( 1163 const std::string& group_name, 1164 ClientSocketPoolBaseHelper* pool) { 1165 // Only allow one timer pending to create a backup socket. 
1166 if (weak_factory_.HasWeakPtrs()) 1167 return; 1168 1169 base::MessageLoop::current()->PostDelayedTask( 1170 FROM_HERE, 1171 base::Bind(&Group::OnBackupSocketTimerFired, weak_factory_.GetWeakPtr(), 1172 group_name, pool), 1173 pool->ConnectRetryInterval()); 1174 } 1175 1176 bool ClientSocketPoolBaseHelper::Group::TryToUseUnassignedConnectJob() { 1177 SanityCheck(); 1178 1179 if (unassigned_job_count_ == 0) 1180 return false; 1181 --unassigned_job_count_; 1182 return true; 1183 } 1184 1185 void ClientSocketPoolBaseHelper::Group::AddJob(ConnectJob* job, 1186 bool is_preconnect) { 1187 SanityCheck(); 1188 1189 if (is_preconnect) 1190 ++unassigned_job_count_; 1191 jobs_.insert(job); 1192 } 1193 1194 void ClientSocketPoolBaseHelper::Group::RemoveJob(ConnectJob* job) { 1195 SanityCheck(); 1196 1197 jobs_.erase(job); 1198 size_t job_count = jobs_.size(); 1199 if (job_count < unassigned_job_count_) 1200 unassigned_job_count_ = job_count; 1201 } 1202 1203 void ClientSocketPoolBaseHelper::Group::OnBackupSocketTimerFired( 1204 std::string group_name, 1205 ClientSocketPoolBaseHelper* pool) { 1206 // If there are no more jobs pending, there is no work to do. 1207 // If we've done our cleanups correctly, this should not happen. 1208 if (jobs_.empty()) { 1209 NOTREACHED(); 1210 return; 1211 } 1212 1213 // If our old job is waiting on DNS, or if we can't create any sockets 1214 // right now due to limits, just reset the timer. 
1215 if (pool->ReachedMaxSocketsLimit() || 1216 !HasAvailableSocketSlot(pool->max_sockets_per_group_) || 1217 (*jobs_.begin())->GetLoadState() == LOAD_STATE_RESOLVING_HOST) { 1218 StartBackupSocketTimer(group_name, pool); 1219 return; 1220 } 1221 1222 if (pending_requests_.empty()) 1223 return; 1224 1225 ConnectJob* backup_job = pool->connect_job_factory_->NewConnectJob( 1226 group_name, **pending_requests_.begin(), pool); 1227 backup_job->net_log().AddEvent(NetLog::TYPE_SOCKET_BACKUP_CREATED); 1228 SIMPLE_STATS_COUNTER("socket.backup_created"); 1229 int rv = backup_job->Connect(); 1230 pool->connecting_socket_count_++; 1231 AddJob(backup_job, false); 1232 if (rv != ERR_IO_PENDING) 1233 pool->OnConnectJobComplete(rv, backup_job); 1234 } 1235 1236 void ClientSocketPoolBaseHelper::Group::SanityCheck() { 1237 DCHECK_LE(unassigned_job_count_, jobs_.size()); 1238 } 1239 1240 void ClientSocketPoolBaseHelper::Group::RemoveAllJobs() { 1241 SanityCheck(); 1242 1243 // Delete active jobs. 1244 STLDeleteElements(&jobs_); 1245 unassigned_job_count_ = 0; 1246 1247 // Cancel pending backup job. 1248 weak_factory_.InvalidateWeakPtrs(); 1249 } 1250 1251 } // namespace internal 1252 1253 } // namespace net 1254