Home | History | Annotate | Download | only in flip_server
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/tools/flip_server/output_ordering.h"
      6 
      7 #include <utility>
      8 
      9 #include "net/tools/flip_server/flip_config.h"
     10 #include "net/tools/flip_server/sm_connection.h"
     11 
     12 namespace net {
     13 
     14 OutputOrdering::PriorityMapPointer::PriorityMapPointer()
     15     : ring(NULL), alarm_enabled(false) {}
     16 
     17 OutputOrdering::PriorityMapPointer::~PriorityMapPointer() {}
     18 
     19 // static
     20 double OutputOrdering::server_think_time_in_s_ = 0.0;
     21 
     22 OutputOrdering::OutputOrdering(SMConnectionInterface* connection)
     23     : first_data_senders_threshold_(kInitialDataSendersThreshold),
     24       connection_(connection) {
     25   if (connection)
     26     epoll_server_ = connection->epoll_server();
     27 }
     28 
     29 OutputOrdering::~OutputOrdering() { Reset(); }
     30 
     31 void OutputOrdering::Reset() {
     32   while (!stream_ids_.empty()) {
     33     StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin();
     34     PriorityMapPointer& pmp = sitpmi->second;
     35     if (pmp.alarm_enabled) {
     36       epoll_server_->UnregisterAlarm(pmp.alarm_token);
     37     }
     38     stream_ids_.erase(sitpmi);
     39   }
     40   priority_map_.clear();
     41   first_data_senders_.clear();
     42 }
     43 
     44 bool OutputOrdering::ExistsInPriorityMaps(uint32 stream_id) const {
     45   StreamIdToPriorityMap::const_iterator sitpmi = stream_ids_.find(stream_id);
     46   return sitpmi != stream_ids_.end();
     47 }
     48 
     49 OutputOrdering::BeginOutputtingAlarm::BeginOutputtingAlarm(
     50     OutputOrdering* oo,
     51     OutputOrdering::PriorityMapPointer* pmp,
     52     const MemCacheIter& mci)
     53     : output_ordering_(oo), pmp_(pmp), mci_(mci), epoll_server_(NULL) {}
     54 
     55 OutputOrdering::BeginOutputtingAlarm::~BeginOutputtingAlarm() {
     56   if (epoll_server_ && pmp_->alarm_enabled)
     57     epoll_server_->UnregisterAlarm(pmp_->alarm_token);
     58 }
     59 
     60 int64 OutputOrdering::BeginOutputtingAlarm::OnAlarm() {
     61   OnUnregistration();
     62   output_ordering_->MoveToActive(pmp_, mci_);
     63   VLOG(2) << "ON ALARM! Should now start to output...";
     64   delete this;
     65   return 0;
     66 }
     67 
     68 void OutputOrdering::BeginOutputtingAlarm::OnRegistration(
     69     const EpollServer::AlarmRegToken& tok,
     70     EpollServer* eps) {
     71   epoll_server_ = eps;
     72   pmp_->alarm_token = tok;
     73   pmp_->alarm_enabled = true;
     74 }
     75 
     76 void OutputOrdering::BeginOutputtingAlarm::OnUnregistration() {
     77   pmp_->alarm_enabled = false;
     78   delete this;
     79 }
     80 
     81 void OutputOrdering::BeginOutputtingAlarm::OnShutdown(EpollServer* eps) {
     82   OnUnregistration();
     83 }
     84 
     85 void OutputOrdering::MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) {
     86   VLOG(2) << "Moving to active!";
     87   first_data_senders_.push_back(mci);
     88   pmp->ring = &first_data_senders_;
     89   pmp->it = first_data_senders_.end();
     90   --pmp->it;
     91   connection_->ReadyToSend();
     92 }
     93 
     94 void OutputOrdering::AddToOutputOrder(const MemCacheIter& mci) {
     95   if (ExistsInPriorityMaps(mci.stream_id))
     96     LOG(ERROR) << "OOps, already was inserted here?!";
     97 
     98   double think_time_in_s = server_think_time_in_s_;
     99   std::string x_server_latency =
    100       mci.file_data->headers()->GetHeader("X-Server-Latency").as_string();
    101   if (!x_server_latency.empty()) {
    102     char* endp;
    103     double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp);
    104     if (endp != x_server_latency.c_str() + x_server_latency.size()) {
    105       LOG(ERROR) << "Unable to understand X-Server-Latency of: "
    106                  << x_server_latency
    107                  << " for resource: " << mci.file_data->filename().c_str();
    108     } else {
    109       think_time_in_s = tmp_think_time_in_s;
    110     }
    111   }
    112   StreamIdToPriorityMap::iterator sitpmi;
    113   sitpmi = stream_ids_.insert(std::pair<uint32, PriorityMapPointer>(
    114                                   mci.stream_id, PriorityMapPointer())).first;
    115   PriorityMapPointer& pmp = sitpmi->second;
    116 
    117   BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci);
    118   VLOG(1) << "Server think time: " << think_time_in_s;
    119   epoll_server_->RegisterAlarmApproximateDelta(think_time_in_s * 1000000, boa);
    120 }
    121 
    122 void OutputOrdering::SpliceToPriorityRing(PriorityRing::iterator pri) {
    123   MemCacheIter& mci = *pri;
    124   PriorityMap::iterator pmi = priority_map_.find(mci.priority);
    125   if (pmi == priority_map_.end()) {
    126     pmi = priority_map_.insert(std::pair<uint32, PriorityRing>(
    127                                    mci.priority, PriorityRing())).first;
    128   }
    129 
    130   pmi->second.splice(pmi->second.end(), first_data_senders_, pri);
    131   StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id);
    132   sitpmi->second.ring = &(pmi->second);
    133 }
    134 
    135 MemCacheIter* OutputOrdering::GetIter() {
    136   while (!first_data_senders_.empty()) {
    137     MemCacheIter& mci = first_data_senders_.front();
    138     if (mci.bytes_sent >= first_data_senders_threshold_) {
    139       SpliceToPriorityRing(first_data_senders_.begin());
    140     } else {
    141       first_data_senders_.splice(first_data_senders_.end(),
    142                                  first_data_senders_,
    143                                  first_data_senders_.begin());
    144       mci.max_segment_size = kInitialDataSendersThreshold;
    145       return &mci;
    146     }
    147   }
    148   while (!priority_map_.empty()) {
    149     PriorityRing& first_ring = priority_map_.begin()->second;
    150     if (first_ring.empty()) {
    151       priority_map_.erase(priority_map_.begin());
    152       continue;
    153     }
    154     MemCacheIter& mci = first_ring.front();
    155     first_ring.splice(first_ring.end(), first_ring, first_ring.begin());
    156     mci.max_segment_size = kSpdySegmentSize;
    157     return &mci;
    158   }
    159   return NULL;
    160 }
    161 
    162 void OutputOrdering::RemoveStreamId(uint32 stream_id) {
    163   StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id);
    164   if (sitpmi == stream_ids_.end())
    165     return;
    166 
    167   PriorityMapPointer& pmp = sitpmi->second;
    168   if (pmp.alarm_enabled)
    169     epoll_server_->UnregisterAlarm(pmp.alarm_token);
    170   else
    171     pmp.ring->erase(pmp.it);
    172   stream_ids_.erase(sitpmi);
    173 }
    174 
    175 }  // namespace net
    176