Home | History | Annotate | Download | only in websockets
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/websockets/websocket_throttle.h"
      6 
      7 #include <algorithm>
      8 #include <set>
      9 #include <string>
     10 #include <utility>
     11 
     12 #include "base/memory/singleton.h"
     13 #include "base/message_loop/message_loop.h"
     14 #include "base/strings/string_number_conversions.h"
     15 #include "base/strings/string_util.h"
     16 #include "base/strings/stringprintf.h"
     17 #include "net/base/io_buffer.h"
     18 #include "net/socket_stream/socket_stream.h"
     19 #include "net/websockets/websocket_job.h"
     20 
     21 namespace net {
     22 
namespace {

// Upper bound on the number of jobs the throttle keeps in its queue at once.
// PutInQueue() refuses a new job (returns false) when queue_ already holds
// this many entries.
const size_t kMaxWebSocketJobsThrottled = 1024;

}  // namespace
     28 
// Nothing is set up explicitly here; all members are default-constructed.
WebSocketThrottle::WebSocketThrottle() {
}
     31 
// Debug-asserts that no job is still registered when the throttle goes away:
// both the overall job queue and the per-address map must already be empty.
WebSocketThrottle::~WebSocketThrottle() {
  DCHECK(queue_.empty());
  DCHECK(addr_map_.empty());
}
     36 
// static
// Returns the WebSocketThrottle instance handed out by
// Singleton<WebSocketThrottle>.
WebSocketThrottle* WebSocketThrottle::GetInstance() {
  return Singleton<WebSocketThrottle>::get();
}
     41 
     42 bool WebSocketThrottle::PutInQueue(WebSocketJob* job) {
     43   if (queue_.size() >= kMaxWebSocketJobsThrottled)
     44     return false;
     45 
     46   queue_.push_back(job);
     47   const AddressList& address_list = job->address_list();
     48   std::set<IPEndPoint> address_set;
     49   for (AddressList::const_iterator addr_iter = address_list.begin();
     50        addr_iter != address_list.end();
     51        ++addr_iter) {
     52     const IPEndPoint& address = *addr_iter;
     53     // If |address| is already processed, don't do it again.
     54     if (!address_set.insert(address).second)
     55       continue;
     56 
     57     ConnectingAddressMap::iterator iter = addr_map_.find(address);
     58     if (iter == addr_map_.end()) {
     59       ConnectingAddressMap::iterator new_queue =
     60           addr_map_.insert(make_pair(address, ConnectingQueue())).first;
     61       new_queue->second.push_back(job);
     62     } else {
     63       DCHECK(!iter->second.empty());
     64       iter->second.push_back(job);
     65       job->SetWaiting();
     66       DVLOG(1) << "Waiting on " << address.ToString();
     67     }
     68   }
     69 
     70   return true;
     71 }
     72 
     73 void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
     74   ConnectingQueue::iterator queue_iter =
     75       std::find(queue_.begin(), queue_.end(), job);
     76   if (queue_iter == queue_.end())
     77     return;
     78   queue_.erase(queue_iter);
     79 
     80   std::set<WebSocketJob*> wakeup_candidates;
     81 
     82   const AddressList& resolved_address_list = job->address_list();
     83   std::set<IPEndPoint> address_set;
     84   for (AddressList::const_iterator addr_iter = resolved_address_list.begin();
     85        addr_iter != resolved_address_list.end();
     86        ++addr_iter) {
     87     const IPEndPoint& address = *addr_iter;
     88     // If |address| is already processed, don't do it again.
     89     if (!address_set.insert(address).second)
     90       continue;
     91 
     92     ConnectingAddressMap::iterator map_iter = addr_map_.find(address);
     93     DCHECK(map_iter != addr_map_.end());
     94 
     95     ConnectingQueue& per_address_queue = map_iter->second;
     96     DCHECK(!per_address_queue.empty());
     97     // Job may not be front of the queue if the socket is closed while waiting.
     98     ConnectingQueue::iterator per_address_queue_iter =
     99         std::find(per_address_queue.begin(), per_address_queue.end(), job);
    100     bool was_front = false;
    101     if (per_address_queue_iter != per_address_queue.end()) {
    102       was_front = (per_address_queue_iter == per_address_queue.begin());
    103       per_address_queue.erase(per_address_queue_iter);
    104     }
    105     if (per_address_queue.empty()) {
    106       addr_map_.erase(map_iter);
    107     } else if (was_front) {
    108       // The new front is a wake-up candidate.
    109       wakeup_candidates.insert(per_address_queue.front());
    110     }
    111   }
    112 
    113   WakeupSocketIfNecessary(wakeup_candidates);
    114 }
    115 
    116 void WebSocketThrottle::WakeupSocketIfNecessary(
    117     const std::set<WebSocketJob*>& wakeup_candidates) {
    118   for (std::set<WebSocketJob*>::const_iterator iter = wakeup_candidates.begin();
    119        iter != wakeup_candidates.end();
    120        ++iter) {
    121     WebSocketJob* job = *iter;
    122     if (!job->IsWaiting())
    123       continue;
    124 
    125     bool should_wakeup = true;
    126     const AddressList& resolved_address_list = job->address_list();
    127     for (AddressList::const_iterator addr_iter = resolved_address_list.begin();
    128          addr_iter != resolved_address_list.end();
    129          ++addr_iter) {
    130       const IPEndPoint& address = *addr_iter;
    131       ConnectingAddressMap::iterator map_iter = addr_map_.find(address);
    132       DCHECK(map_iter != addr_map_.end());
    133       const ConnectingQueue& per_address_queue = map_iter->second;
    134       if (job != per_address_queue.front()) {
    135         should_wakeup = false;
    136         break;
    137       }
    138     }
    139     if (should_wakeup)
    140       job->Wakeup();
    141   }
    142 }
    143 
    144 }  // namespace net
    145