Home | History | Annotate | Download | only in base
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/base/prioritized_dispatcher.h"
      6 
      7 #include "base/logging.h"
      8 
      9 namespace net {
     10 
     11 PrioritizedDispatcher::Limits::Limits(Priority num_priorities,
     12                                       size_t total_jobs)
     13     : total_jobs(total_jobs), reserved_slots(num_priorities) {}
     14 
     15 PrioritizedDispatcher::Limits::~Limits() {}
     16 
     17 PrioritizedDispatcher::PrioritizedDispatcher(const Limits& limits)
     18     : queue_(limits.reserved_slots.size()),
     19       max_running_jobs_(limits.reserved_slots.size()),
     20       num_running_jobs_(0) {
     21   SetLimits(limits);
     22 }
     23 
     24 PrioritizedDispatcher::~PrioritizedDispatcher() {}
     25 
     26 PrioritizedDispatcher::Handle PrioritizedDispatcher::Add(
     27     Job* job, Priority priority) {
     28   DCHECK(job);
     29   DCHECK_LT(priority, num_priorities());
     30   if (num_running_jobs_ < max_running_jobs_[priority]) {
     31     ++num_running_jobs_;
     32     job->Start();
     33     return Handle();
     34   }
     35   return queue_.Insert(job, priority);
     36 }
     37 
     38 PrioritizedDispatcher::Handle PrioritizedDispatcher::AddAtHead(
     39     Job* job, Priority priority) {
     40   DCHECK(job);
     41   DCHECK_LT(priority, num_priorities());
     42   if (num_running_jobs_ < max_running_jobs_[priority]) {
     43     ++num_running_jobs_;
     44     job->Start();
     45     return Handle();
     46   }
     47   return queue_.InsertAtFront(job, priority);
     48 }
     49 
     50 void PrioritizedDispatcher::Cancel(const Handle& handle) {
     51   queue_.Erase(handle);
     52 }
     53 
     54 PrioritizedDispatcher::Job* PrioritizedDispatcher::EvictOldestLowest() {
     55   Handle handle = queue_.FirstMin();
     56   if (handle.is_null())
     57     return NULL;
     58   Job* job = handle.value();
     59   Cancel(handle);
     60   return job;
     61 }
     62 
     63 PrioritizedDispatcher::Handle PrioritizedDispatcher::ChangePriority(
     64     const Handle& handle, Priority priority) {
     65   DCHECK(!handle.is_null());
     66   DCHECK_LT(priority, num_priorities());
     67   DCHECK_GE(num_running_jobs_, max_running_jobs_[handle.priority()]) <<
     68       "Job should not be in queue when limits permit it to start.";
     69 
     70   if (handle.priority() == priority)
     71     return handle;
     72 
     73   if (MaybeDispatchJob(handle, priority))
     74     return Handle();
     75   Job* job = handle.value();
     76   queue_.Erase(handle);
     77   return queue_.Insert(job, priority);
     78 }
     79 
     80 void PrioritizedDispatcher::OnJobFinished() {
     81   DCHECK_GT(num_running_jobs_, 0u);
     82   --num_running_jobs_;
     83   MaybeDispatchNextJob();
     84 }
     85 
     86 PrioritizedDispatcher::Limits PrioritizedDispatcher::GetLimits() const {
     87   size_t num_priorities = max_running_jobs_.size();
     88   Limits limits(num_priorities, max_running_jobs_.back());
     89 
     90   // Calculate the number of jobs reserved for each priority and higher.  Leave
     91   // the number of jobs reserved for the lowest priority or higher as 0.
     92   for (size_t i = 1; i < num_priorities; ++i) {
     93     limits.reserved_slots[i] = max_running_jobs_[i] - max_running_jobs_[i - 1];
     94   }
     95 
     96   return limits;
     97 }
     98 
     99 void PrioritizedDispatcher::SetLimits(const Limits& limits) {
    100   DCHECK_EQ(queue_.num_priorities(), limits.reserved_slots.size());
    101   size_t total = 0;
    102   for (size_t i = 0; i < limits.reserved_slots.size(); ++i) {
    103     total += limits.reserved_slots[i];
    104     max_running_jobs_[i] = total;
    105   }
    106   // Unreserved slots are available for all priorities.
    107   DCHECK_LE(total, limits.total_jobs) << "sum(reserved_slots) <= total_jobs";
    108   size_t spare = limits.total_jobs - total;
    109   for (size_t i = limits.reserved_slots.size(); i > 0; --i) {
    110     max_running_jobs_[i - 1] += spare;
    111   }
    112 
    113   // Start pending jobs, if limits permit.
    114   while (true) {
    115     if (!MaybeDispatchNextJob())
    116       break;
    117   }
    118 }
    119 
    120 void PrioritizedDispatcher::SetLimitsToZero() {
    121   SetLimits(Limits(queue_.num_priorities(), 0));
    122 }
    123 
    124 bool PrioritizedDispatcher::MaybeDispatchJob(const Handle& handle,
    125                                              Priority job_priority) {
    126   DCHECK_LT(job_priority, num_priorities());
    127   if (num_running_jobs_ >= max_running_jobs_[job_priority])
    128     return false;
    129   Job* job = handle.value();
    130   queue_.Erase(handle);
    131   ++num_running_jobs_;
    132   job->Start();
    133   return true;
    134 }
    135 
    136 bool PrioritizedDispatcher::MaybeDispatchNextJob() {
    137   Handle handle = queue_.FirstMax();
    138   if (handle.is_null()) {
    139     DCHECK_EQ(0u, queue_.size());
    140     return false;
    141   }
    142   return MaybeDispatchJob(handle, handle.priority());
    143 }
    144 
    145 }  // namespace net
    146