// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/socket/client_socket_pool_base.h"

#include "base/compiler_specific.h"
#include "base/message_loop.h"
#include "base/stl_util-inl.h"
#include "base/time.h"
#include "net/base/load_log.h"
#include "net/base/net_errors.h"
#include "net/socket/client_socket_handle.h"

using base::TimeDelta;

namespace {

// The timeout value, in seconds, used to clean up idle sockets that can't be
// reused.
//
// Note: It's important to close idle sockets that have received data as soon
// as possible because the received data may cause BSOD on Windows XP under
// some conditions. See http://crbug.com/4606.
const int kCleanupInterval = 10;  // DO NOT INCREASE THIS TIMEOUT.

// The maximum size of the ConnectJob's LoadLog.
const int kMaxNumLoadLogEntries = 50;

}  // namespace

namespace net {

ConnectJob::ConnectJob(const std::string& group_name,
                       base::TimeDelta timeout_duration,
                       Delegate* delegate,
                       LoadLog* load_log)
    : group_name_(group_name),
      timeout_duration_(timeout_duration),
      delegate_(delegate),
      load_log_(load_log) {
  DCHECK(!group_name.empty());
  DCHECK(delegate);
}

ConnectJob::~ConnectJob() {
  if (delegate_) {
    // If the delegate was not NULLed, then NotifyDelegateOfCompletion has
    // not been called yet (hence we are cancelling).
    LoadLog::AddEvent(load_log_, LoadLog::TYPE_CANCELLED);
    LoadLog::EndEvent(load_log_, LoadLog::TYPE_SOCKET_POOL_CONNECT_JOB);
  }
}

// Starts the connect attempt.  If a non-zero |timeout_duration_| was given,
// arms |timer_| so OnTimeout() fires if the attempt runs too long.  On a
// synchronous result (anything but ERR_IO_PENDING) the delegate pointer is
// cleared so the destructor knows completion was already reported and does
// not log a cancellation.
int ConnectJob::Connect() {
  // A default-constructed TimeDelta means "no timeout requested".
  if (timeout_duration_ != base::TimeDelta())
    timer_.Start(timeout_duration_, this, &ConnectJob::OnTimeout);

  LoadLog::BeginEvent(load_log_, LoadLog::TYPE_SOCKET_POOL_CONNECT_JOB);

  int rv = ConnectInternal();

  if (rv != ERR_IO_PENDING) {
    delegate_ = NULL;
    LoadLog::EndEvent(load_log_, LoadLog::TYPE_SOCKET_POOL_CONNECT_JOB);
  }

  return rv;
}

// Reports an asynchronous completion (or timeout) to the delegate exactly
// once.  |delegate_| is nulled first so ~ConnectJob() can distinguish
// "completed" from "cancelled".
void ConnectJob::NotifyDelegateOfCompletion(int rv) {
  // The delegate will delete |this|.
  Delegate *delegate = delegate_;
  delegate_ = NULL;

  LoadLog::EndEvent(load_log_, LoadLog::TYPE_SOCKET_POOL_CONNECT_JOB);

  delegate->OnConnectJobComplete(rv, this);
}

// Timer callback: the connect attempt exceeded |timeout_duration_|.
void ConnectJob::OnTimeout() {
  // Make sure the socket is NULL before calling into |delegate|.
  set_socket(NULL);

  LoadLog::AddEvent(load_log_,
                    LoadLog::TYPE_SOCKET_POOL_CONNECT_JOB_TIMED_OUT);

  NotifyDelegateOfCompletion(ERR_TIMED_OUT);
}

namespace internal {

ClientSocketPoolBaseHelper::ClientSocketPoolBaseHelper(
    int max_sockets,
    int max_sockets_per_group,
    base::TimeDelta unused_idle_socket_timeout,
    base::TimeDelta used_idle_socket_timeout,
    ConnectJobFactory* connect_job_factory,
    NetworkChangeNotifier* network_change_notifier)
    : idle_socket_count_(0),
      connecting_socket_count_(0),
      handed_out_socket_count_(0),
      max_sockets_(max_sockets),
      max_sockets_per_group_(max_sockets_per_group),
      unused_idle_socket_timeout_(unused_idle_socket_timeout),
      used_idle_socket_timeout_(used_idle_socket_timeout),
      may_have_stalled_group_(false),
      connect_job_factory_(connect_job_factory),
      network_change_notifier_(network_change_notifier) {
  DCHECK_LE(0, max_sockets_per_group);
  DCHECK_LE(max_sockets_per_group, max_sockets);

  // Observe IP address changes so we can flush idle sockets (see
  // OnIPAddressChanged below).  The notifier may legitimately be NULL.
  if (network_change_notifier_)
    network_change_notifier_->AddObserver(this);
}

ClientSocketPoolBaseHelper::~ClientSocketPoolBaseHelper() {
  CancelAllConnectJobs();

  // Clean up any idle sockets.  Assert that we have no remaining active
  // sockets or pending requests.  They should have all been cleaned up prior
  // to the manager being destroyed.
  CloseIdleSockets();
  DCHECK(group_map_.empty());
  DCHECK_EQ(0, connecting_socket_count_);

  if (network_change_notifier_)
    network_change_notifier_->RemoveObserver(this);
}

// InsertRequestIntoQueue inserts the request into the queue based on
// priority.  Highest priorities are closest to the front.  Older requests are
// prioritized over requests of equal priority.
//
// (Note: the `>=` comparison below means a numerically *smaller* priority
// value lands closer to the front, i.e. smaller value == higher priority.)
//
// static
void ClientSocketPoolBaseHelper::InsertRequestIntoQueue(
    const Request* r, RequestQueue* pending_requests) {
  LoadLog::BeginEvent(r->load_log(),
                      LoadLog::TYPE_SOCKET_POOL_WAITING_IN_QUEUE);

  RequestQueue::iterator it = pending_requests->begin();
  while (it != pending_requests->end() && r->priority() >= (*it)->priority())
    ++it;
  pending_requests->insert(it, r);
}

// Removes the request at |it| from the queue, closing its
// "waiting in queue" log event.  Ownership of the Request passes to the
// caller.
//
// static
const ClientSocketPoolBaseHelper::Request*
ClientSocketPoolBaseHelper::RemoveRequestFromQueue(
    RequestQueue::iterator it, RequestQueue* pending_requests) {
  const Request* req = *it;

  LoadLog::EndEvent(req->load_log(),
                    LoadLog::TYPE_SOCKET_POOL_WAITING_IN_QUEUE);

  pending_requests->erase(it);
  return req;
}

// Satisfies |request| for |group_name|, in order of preference:
//   1. queue it if we are at a global or per-group socket limit,
//   2. hand out an existing idle socket that is still usable,
//   3. queue it if sockets for this group are about to be released,
//   4. otherwise start a fresh ConnectJob.
// Returns OK, ERR_IO_PENDING (request queued / job started), or the
// ConnectJob's synchronous error.
int ClientSocketPoolBaseHelper::RequestSocket(
    const std::string& group_name,
    const Request* request) {
  DCHECK_GE(request->priority(), 0);
  CompletionCallback* const callback = request->callback();
  CHECK(callback);
  ClientSocketHandle* const handle = request->handle();
  CHECK(handle);
  // operator[] creates the group on first use for this name.
  Group& group = group_map_[group_name];

  // Can we make another active socket now?
  if (ReachedMaxSocketsLimit() ||
      !group.HasAvailableSocketSlot(max_sockets_per_group_)) {
    if (ReachedMaxSocketsLimit()) {
      // We could check if we really have a stalled group here, but it requires
      // a scan of all groups, so just flip a flag here, and do the check later.
      may_have_stalled_group_ = true;

      LoadLog::AddEvent(request->load_log(),
                        LoadLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
    } else {
      LoadLog::AddEvent(request->load_log(),
                        LoadLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS_PER_GROUP);
    }
    InsertRequestIntoQueue(request, &group.pending_requests);
    return ERR_IO_PENDING;
  }

  // Try to reuse a socket.  Sockets that went stale while idle are deleted
  // here rather than handed out.
  while (!group.idle_sockets.empty()) {
    IdleSocket idle_socket = group.idle_sockets.back();
    group.idle_sockets.pop_back();
    DecrementIdleCount();
    if (idle_socket.socket->IsConnectedAndIdle()) {
      // We found one we can reuse!
      base::TimeDelta idle_time =
          base::TimeTicks::Now() - idle_socket.start_time;
      HandOutSocket(
          idle_socket.socket, idle_socket.used, handle, idle_time, &group);
      return OK;
    }
    delete idle_socket.socket;
  }

  // See if we already have enough connect jobs or sockets that will be released
  // soon.
  if (group.HasReleasingSockets()) {
    InsertRequestIntoQueue(request, &group.pending_requests);
    return ERR_IO_PENDING;
  }

  // We couldn't find a socket to reuse, so allocate and connect a new one.
  // The job gets its own LoadLog, appended onto the request's log when the
  // job completes synchronously here (or later in OnConnectJobComplete).
  scoped_refptr<LoadLog> job_load_log = new LoadLog(kMaxNumLoadLogEntries);

  scoped_ptr<ConnectJob> connect_job(
      connect_job_factory_->NewConnectJob(group_name, *request, this,
                                          job_load_log));

  int rv = connect_job->Connect();

  if (rv != ERR_IO_PENDING && request->load_log())
    request->load_log()->Append(job_load_log);

  if (rv == OK) {
    HandOutSocket(connect_job->ReleaseSocket(), false /* not reused */,
                  handle, base::TimeDelta(), &group);
  } else if (rv == ERR_IO_PENDING) {
    // Job is now in flight; it owns itself via group.jobs until
    // OnConnectJobComplete or cancellation.
    connecting_socket_count_++;

    ConnectJob* job = connect_job.release();
    InsertRequestIntoQueue(request, &group.pending_requests);
    group.jobs.insert(job);
  } else if (group.IsEmpty()) {
    // Synchronous failure and the group holds nothing else: drop it.
    group_map_.erase(group_name);
  }

  return rv;
}

// Cancels the queued request whose handle is |handle|.  If the group now has
// more connect jobs than remaining requests need, one job is torn down too.
void ClientSocketPoolBaseHelper::CancelRequest(
    const std::string& group_name, const ClientSocketHandle* handle) {
  CHECK(ContainsKey(group_map_, group_name));

  Group& group = group_map_[group_name];

  // Search pending_requests for matching handle.
  RequestQueue::iterator it = group.pending_requests.begin();
  for (; it != group.pending_requests.end(); ++it) {
    if ((*it)->handle() == handle) {
      const Request* req = RemoveRequestFromQueue(it, &group.pending_requests);
      LoadLog::AddEvent(req->load_log(), LoadLog::TYPE_CANCELLED);
      LoadLog::EndEvent(req->load_log(), LoadLog::TYPE_SOCKET_POOL);
      delete req;
      // NOTE(review): the "+ 1" means a job is only cancelled when there are
      // at least two more jobs than pending requests, i.e. one surplus job is
      // deliberately(?) kept alive after a cancellation — confirm intent.
      if (group.jobs.size() > group.pending_requests.size() + 1) {
        // TODO(willchan): Cancel the job in the earliest LoadState.
        RemoveConnectJob(*group.jobs.begin(), &group);
        OnAvailableSocketSlot(group_name, &group);
      }
      return;
    }
  }
}

// Called by the owning pool when a client is done with |socket|.  The actual
// bookkeeping happens later in DoReleaseSocket; here we only record that a
// release is in flight (num_releasing_sockets) and post the task.
void ClientSocketPoolBaseHelper::ReleaseSocket(const std::string& group_name,
                                               ClientSocket* socket) {
  Group& group = group_map_[group_name];
  group.num_releasing_sockets++;
  DCHECK_LE(group.num_releasing_sockets, group.active_socket_count);
  // Run this asynchronously to allow the caller to finish before we let
  // another to begin doing work.  This also avoids nasty recursion issues.
  // NOTE: We cannot refer to the handle argument after this method returns.
  MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
      this, &ClientSocketPoolBaseHelper::DoReleaseSocket, group_name, socket));
}

void ClientSocketPoolBaseHelper::CloseIdleSockets() {
  CleanupIdleSockets(true);
}

int ClientSocketPoolBaseHelper::IdleSocketCountInGroup(
    const std::string& group_name) const {
  GroupMap::const_iterator i = group_map_.find(group_name);
  CHECK(i != group_map_.end());

  return i->second.idle_sockets.size();
}

// Returns the load state of the pending request owned by |handle|.  If the
// request's queue position is covered by an in-flight connect job, reports
// the most-advanced state over all of the group's jobs; otherwise the
// request is simply waiting for a slot and we report LOAD_STATE_IDLE.
LoadState ClientSocketPoolBaseHelper::GetLoadState(
    const std::string& group_name,
    const ClientSocketHandle* handle) const {
  if (!ContainsKey(group_map_, group_name)) {
    NOTREACHED() << "ClientSocketPool does not contain group: " << group_name
                 << " for handle: " << handle;
    return LOAD_STATE_IDLE;
  }

  // Can't use operator[] since it is non-const.
  const Group& group = group_map_.find(group_name)->second;

  // Search pending_requests for matching handle.
  RequestQueue::const_iterator it = group.pending_requests.begin();
  for (size_t i = 0; it != group.pending_requests.end(); ++it, ++i) {
    if ((*it)->handle() == handle) {
      // |i| is the request's position in the queue; the first
      // group.jobs.size() positions will be satisfied by running jobs.
      if (i < group.jobs.size()) {
        LoadState max_state = LOAD_STATE_IDLE;
        for (ConnectJobSet::const_iterator job_it = group.jobs.begin();
             job_it != group.jobs.end(); ++job_it) {
          max_state = std::max(max_state, (*job_it)->GetLoadState());
        }
        return max_state;
      } else {
        // TODO(wtc): Add a state for being on the wait list.
        // See http://www.crbug.com/5077.
        return LOAD_STATE_IDLE;
      }
    }
  }

  NOTREACHED();
  return LOAD_STATE_IDLE;
}

// An idle socket should be closed if it has been idle longer than |timeout|,
// or if the connection has gone bad.  Used sockets additionally require
// IsConnectedAndIdle() (no unread data) — see the kCleanupInterval comment
// at the top of this file for why unread data matters.
bool ClientSocketPoolBaseHelper::IdleSocket::ShouldCleanup(
    base::TimeTicks now,
    base::TimeDelta timeout) const {
  bool timed_out = (now - start_time) >= timeout;
  return timed_out ||
      !(used ? socket->IsConnectedAndIdle() : socket->IsConnected());
}

// Walks every group deleting idle sockets that ShouldCleanup(), or all of
// them when |force| is true.  Groups left empty are erased.
void ClientSocketPoolBaseHelper::CleanupIdleSockets(bool force) {
  if (idle_socket_count_ == 0)
    return;

  // Current time value.  Retrieving it once at the function start rather than
  // inside the inner loop, since it shouldn't change by any meaningful amount.
  base::TimeTicks now = base::TimeTicks::Now();

  GroupMap::iterator i = group_map_.begin();
  while (i != group_map_.end()) {
    Group& group = i->second;

    std::deque<IdleSocket>::iterator j = group.idle_sockets.begin();
    while (j != group.idle_sockets.end()) {
      base::TimeDelta timeout =
          j->used ? used_idle_socket_timeout_ : unused_idle_socket_timeout_;
      if (force || j->ShouldCleanup(now, timeout)) {
        delete j->socket;
        j = group.idle_sockets.erase(j);
        DecrementIdleCount();
      } else {
        ++j;
      }
    }

    // Delete group if no longer needed.  Post-increment keeps |i| valid
    // across the erase.
    if (group.IsEmpty()) {
      group_map_.erase(i++);
    } else {
      ++i;
    }
  }
}

// Lazily runs the cleanup timer only while there is at least one idle socket.
void ClientSocketPoolBaseHelper::IncrementIdleCount() {
  if (++idle_socket_count_ == 1)
    timer_.Start(TimeDelta::FromSeconds(kCleanupInterval), this,
                 &ClientSocketPoolBaseHelper::OnCleanupTimerFired);
}

void ClientSocketPoolBaseHelper::DecrementIdleCount() {
  if (--idle_socket_count_ == 0)
    timer_.Stop();
}

// Posted from ReleaseSocket.  Returns |socket| to the idle list (if still
// reusable) or deletes it, then hands freed slots to pending requests.  Slot
// processing across groups is deferred until the last in-flight release for
// this group has landed (num_releasing_sockets == 0).
void ClientSocketPoolBaseHelper::DoReleaseSocket(const std::string& group_name,
                                                 ClientSocket* socket) {
  GroupMap::iterator i = group_map_.find(group_name);
  CHECK(i != group_map_.end());

  Group& group = i->second;

  group.num_releasing_sockets--;
  DCHECK_GE(group.num_releasing_sockets, 0);

  CHECK(handed_out_socket_count_ > 0);
  handed_out_socket_count_--;

  CHECK(group.active_socket_count > 0);
  group.active_socket_count--;

  const bool can_reuse = socket->IsConnectedAndIdle();
  if (can_reuse) {
    AddIdleSocket(socket, true /* used socket */, &group);
  } else {
    delete socket;
  }

  const bool more_releasing_sockets = group.num_releasing_sockets > 0;

  OnAvailableSocketSlot(group_name, &group);

  // If there are no more releasing sockets, then we might have to process
  // multiple available socket slots, since we stalled their processing until
  // all sockets have been released.
  if (more_releasing_sockets)
    return;

  while (true) {
    // We can't activate more sockets since we're already at our global limit.
    if (ReachedMaxSocketsLimit())
      return;

    // |group| might now be deleted.
    i = group_map_.find(group_name);
    if (i == group_map_.end())
      return;

    // NOTE(review): |group| is a *reference* bound above, so this line
    // copy-assigns the found Group into the object the reference points at.
    // If the earlier OnAvailableSocketSlot() erased this group and a callback
    // re-created it, |group| is dangling and this write is UB — confirm that
    // this was meant to re-bind (e.g. `Group& group = i->second;` in a scope)
    // rather than assign.
    group = i->second;

    // If we already have enough ConnectJobs to satisfy the pending requests,
    // don't bother starting up more.
    if (group.pending_requests.size() <= group.jobs.size())
      return;

    if (!group.HasAvailableSocketSlot(max_sockets_per_group_))
      return;

    OnAvailableSocketSlot(group_name, &group);
  }
}

// Search for the highest priority pending request, amongst the groups that
// are not at the |max_sockets_per_group_| limit.  Note: for requests with
// the same priority, the winner is based on group hash ordering (and not
// insertion order).  Returns the number of stalled groups found; outputs are
// only written when at least one was found.
int ClientSocketPoolBaseHelper::FindTopStalledGroup(Group** group,
                                                    std::string* group_name) {
  Group* top_group = NULL;
  const std::string* top_group_name = NULL;
  int stalled_group_count = 0;
  for (GroupMap::iterator i = group_map_.begin();
       i != group_map_.end(); ++i) {
    Group& group = i->second;
    const RequestQueue& queue = group.pending_requests;
    if (queue.empty())
      continue;
    bool has_slot = group.HasAvailableSocketSlot(max_sockets_per_group_);
    if (has_slot)
      stalled_group_count++;
    // Smaller priority value == higher priority (matches the ordering used
    // by InsertRequestIntoQueue).
    bool has_higher_priority = !top_group ||
        group.TopPendingPriority() < top_group->TopPendingPriority();
    if (has_slot && has_higher_priority) {
      top_group = &group;
      top_group_name = &i->first;
    }
  }
  if (top_group) {
    *group = top_group;
    *group_name = *top_group_name;
  }
  return stalled_group_count;
}

// ConnectJob::Delegate implementation.  Hands the job's socket (on success)
// to the oldest pending request, or parks it as an idle socket if no request
// is waiting.  On failure the error is delivered to the oldest pending
// request.  The job is destroyed either way.
void ClientSocketPoolBaseHelper::OnConnectJobComplete(
    int result, ConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);
  const std::string group_name = job->group_name();
  GroupMap::iterator group_it = group_map_.find(group_name);
  CHECK(group_it != group_map_.end());
  Group& group = group_it->second;

  scoped_ptr<ClientSocket> socket(job->ReleaseSocket());

  // Keep the job's log alive past RemoveConnectJob (which deletes the job)
  // so it can be appended to the request's log below.
  scoped_refptr<LoadLog> job_load_log(job->load_log());
  RemoveConnectJob(job, &group);

  LoadLog::EndEvent(job_load_log, LoadLog::TYPE_SOCKET_POOL);

  if (result == OK) {
    DCHECK(socket.get());
    if (!group.pending_requests.empty()) {
      scoped_ptr<const Request> r(RemoveRequestFromQueue(
          group.pending_requests.begin(), &group.pending_requests));
      if (r->load_log())
        r->load_log()->Append(job_load_log);
      HandOutSocket(
          socket.release(), false /* unused socket */, r->handle(),
          base::TimeDelta(), &group);
      r->callback()->Run(result);
    } else {
      AddIdleSocket(socket.release(), false /* unused socket */, &group);
      OnAvailableSocketSlot(group_name, &group);
    }
  } else {
    DCHECK(!socket.get());
    if (!group.pending_requests.empty()) {
      scoped_ptr<const Request> r(RemoveRequestFromQueue(
          group.pending_requests.begin(), &group.pending_requests));
      if (r->load_log())
        r->load_log()->Append(job_load_log);
      r->callback()->Run(result);
    }
    MaybeOnAvailableSocketSlot(group_name);
  }
}

// Flush idle sockets on IP address change: existing connections were made
// from the old address and can't be trusted for reuse.
void ClientSocketPoolBaseHelper::OnIPAddressChanged() {
  CloseIdleSockets();
}

// Deletes |job| and drops it from the accounting.  |group| may be NULL when
// the caller has already detached the job from its group.
void ClientSocketPoolBaseHelper::RemoveConnectJob(const ConnectJob *job,
                                                  Group* group) {
  CHECK(connecting_socket_count_ > 0);
  connecting_socket_count_--;

  DCHECK(job);
  delete job;

  if (group) {
    // |job| is already deleted; the set lookup/erase below only compare the
    // pointer *value*, never dereference it.
    DCHECK(ContainsKey(group->jobs, job));
    group->jobs.erase(job);
  }
}

// Like OnAvailableSocketSlot, but tolerates the group having been erased,
// and only fires when the group actually has a free per-group slot.
void ClientSocketPoolBaseHelper::MaybeOnAvailableSocketSlot(
    const std::string& group_name) {
  GroupMap::iterator it = group_map_.find(group_name);
  if (it != group_map_.end()) {
    Group& group = it->second;
    if (group.HasAvailableSocketSlot(max_sockets_per_group_))
      OnAvailableSocketSlot(group_name, &group);
  }
}

// A socket slot has opened up.  If we previously hit the global socket limit
// (may_have_stalled_group_), the slot goes to the highest-priority stalled
// group anywhere in the pool; otherwise it goes to this group's own queue.
void ClientSocketPoolBaseHelper::OnAvailableSocketSlot(
    const std::string& group_name, Group* group) {
  if (may_have_stalled_group_) {
    std::string top_group_name;
    Group* top_group = NULL;
    int stalled_group_count = FindTopStalledGroup(&top_group, &top_group_name);
    // With <= 1 stalled group, serving one request clears the backlog, so
    // the flag can be dropped.
    if (stalled_group_count <= 1)
      may_have_stalled_group_ = false;
    if (stalled_group_count >= 1)
      ProcessPendingRequest(top_group_name, top_group);
  } else if (!group->pending_requests.empty()) {
    ProcessPendingRequest(group_name, group);
    // |group| may no longer be valid after this point.  Be careful not to
    // access it again.
  } else if (group->IsEmpty()) {
    // Delete |group| if no longer needed.  |group| will no longer be valid.
    group_map_.erase(group_name);
  }
}

// Pops the front (highest-priority) pending request and re-runs it through
// RequestSocket.  If it completes synchronously the callback is invoked and
// the request deleted; if it goes async, RequestSocket has re-queued it, so
// ownership is released back to the queue.
void ClientSocketPoolBaseHelper::ProcessPendingRequest(
    const std::string& group_name, Group* group) {
  scoped_ptr<const Request> r(RemoveRequestFromQueue(
      group->pending_requests.begin(), &group->pending_requests));

  int rv = RequestSocket(group_name, r.get());

  if (rv != ERR_IO_PENDING) {
    LoadLog::EndEvent(r->load_log(), LoadLog::TYPE_SOCKET_POOL);
    r->callback()->Run(rv);
    if (rv != OK) {
      // |group| may be invalid after the callback, we need to search
      // |group_map_| again.
      MaybeOnAvailableSocketSlot(group_name);
    }
  } else {
    r.release();
  }
}

// Binds |socket| to |handle| and updates the active/handed-out counters.
void ClientSocketPoolBaseHelper::HandOutSocket(
    ClientSocket* socket,
    bool reused,
    ClientSocketHandle* handle,
    base::TimeDelta idle_time,
    Group* group) {
  DCHECK(socket);
  handle->set_socket(socket);
  handle->set_is_reused(reused);
  handle->set_idle_time(idle_time);

  handed_out_socket_count_++;
  group->active_socket_count++;
}

// Parks |socket| on |group|'s idle list, timestamped now so the cleanup
// timer can expire it later.
void ClientSocketPoolBaseHelper::AddIdleSocket(
    ClientSocket* socket, bool used, Group* group) {
  DCHECK(socket);
  IdleSocket idle_socket;
  idle_socket.socket = socket;
  idle_socket.start_time = base::TimeTicks::Now();
  idle_socket.used = used;

  group->idle_sockets.push_back(idle_socket);
  IncrementIdleCount();
}

// Destroys every in-flight ConnectJob (destructor path: see ~ClientSocketPoolBaseHelper).
void ClientSocketPoolBaseHelper::CancelAllConnectJobs() {
  for (GroupMap::iterator i = group_map_.begin(); i != group_map_.end();) {
    Group& group = i->second;
    connecting_socket_count_ -= group.jobs.size();
    STLDeleteElements(&group.jobs);

    // Delete group if no longer needed.  Post-increment keeps |i| valid
    // across the erase.
    if (group.IsEmpty()) {
      group_map_.erase(i++);
    } else {
      ++i;
    }
  }
}

// True when every available socket slot (global limit) is spoken for, either
// by a handed-out socket or by a connect job that will produce one.
bool ClientSocketPoolBaseHelper::ReachedMaxSocketsLimit() const {
  // Each connecting socket will eventually connect and be handed out.
  int total = handed_out_socket_count_ + connecting_socket_count_;
  DCHECK_LE(total, max_sockets_);
  return total == max_sockets_;
}

}  // namespace internal

}  // namespace net