Home | History | Annotate | Download | only in flip_server
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/tools/flip_server/output_ordering.h"
      6 
      7 #include "net/tools/flip_server/flip_config.h"
      8 #include "net/tools/flip_server/sm_connection.h"
      9 
     10 
     11 namespace net {
     12 
     13 // static
     14 double OutputOrdering::server_think_time_in_s_ = 0.0;
     15 
     16 OutputOrdering::OutputOrdering(SMConnectionInterface* connection)
     17     : first_data_senders_threshold_(kInitialDataSendersThreshold),
     18       connection_(connection) {
     19   if (connection)
     20     epoll_server_ = connection->epoll_server();
     21 }
     22 
     23 OutputOrdering::~OutputOrdering() {}
     24 
     25 void OutputOrdering::Reset() {
     26   while (!stream_ids_.empty()) {
     27     StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin();
     28     PriorityMapPointer& pmp = sitpmi->second;
     29     if (pmp.alarm_enabled) {
     30       epoll_server_->UnregisterAlarm(pmp.alarm_token);
     31     }
     32     stream_ids_.erase(sitpmi);
     33   }
     34   priority_map_.clear();
     35   first_data_senders_.clear();
     36 }
     37 
     38 bool OutputOrdering::ExistsInPriorityMaps(uint32 stream_id) {
     39   StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id);
     40   return sitpmi != stream_ids_.end();
     41 }
     42 
     43 OutputOrdering::BeginOutputtingAlarm::BeginOutputtingAlarm(
     44     OutputOrdering* oo,
     45     OutputOrdering::PriorityMapPointer* pmp,
     46     const MemCacheIter& mci)
     47     : output_ordering_(oo),
     48       pmp_(pmp),
     49       mci_(mci),
     50       epoll_server_(NULL) {
     51 }
     52 
     53 OutputOrdering::BeginOutputtingAlarm::~BeginOutputtingAlarm() {
     54   if (epoll_server_ && pmp_->alarm_enabled)
     55     epoll_server_->UnregisterAlarm(pmp_->alarm_token);
     56 }
     57 
     58 int64 OutputOrdering::BeginOutputtingAlarm::OnAlarm() {
     59   OnUnregistration();
     60   output_ordering_->MoveToActive(pmp_, mci_);
     61   VLOG(2) << "ON ALARM! Should now start to output...";
     62   delete this;
     63   return 0;
     64 }
     65 
     66 void OutputOrdering::BeginOutputtingAlarm::OnRegistration(
     67     const EpollServer::AlarmRegToken& tok,
     68     EpollServer* eps) {
     69   epoll_server_ = eps;
     70   pmp_->alarm_token = tok;
     71   pmp_->alarm_enabled = true;
     72 }
     73 
     74 void OutputOrdering::BeginOutputtingAlarm::OnUnregistration() {
     75   pmp_->alarm_enabled = false;
     76 }
     77 
     78 void OutputOrdering::BeginOutputtingAlarm::OnShutdown(EpollServer* eps) {
     79   OnUnregistration();
     80 }
     81 
     82 void OutputOrdering::MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) {
     83   VLOG(2) << "Moving to active!";
     84   first_data_senders_.push_back(mci);
     85   pmp->ring = &first_data_senders_;
     86   pmp->it = first_data_senders_.end();
     87   --pmp->it;
     88   connection_->ReadyToSend();
     89 }
     90 
     91 void OutputOrdering::AddToOutputOrder(const MemCacheIter& mci) {
     92   if (ExistsInPriorityMaps(mci.stream_id))
     93     LOG(ERROR) << "OOps, already was inserted here?!";
     94 
     95   double think_time_in_s = server_think_time_in_s_;
     96   std::string x_server_latency =
     97     mci.file_data->headers->GetHeader("X-Server-Latency").as_string();
     98   if (!x_server_latency.empty()) {
     99     char* endp;
    100     double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp);
    101     if (endp != x_server_latency.c_str() + x_server_latency.size()) {
    102       LOG(ERROR) << "Unable to understand X-Server-Latency of: "
    103                  << x_server_latency << " for resource: "
    104                  <<  mci.file_data->filename.c_str();
    105     } else {
    106       think_time_in_s = tmp_think_time_in_s;
    107     }
    108   }
    109   StreamIdToPriorityMap::iterator sitpmi;
    110   sitpmi = stream_ids_.insert(
    111       std::pair<uint32, PriorityMapPointer>(mci.stream_id,
    112                                             PriorityMapPointer())).first;
    113   PriorityMapPointer& pmp = sitpmi->second;
    114 
    115   BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci);
    116   VLOG(1) << "Server think time: " << think_time_in_s;
    117   epoll_server_->RegisterAlarmApproximateDelta(
    118       think_time_in_s * 1000000, boa);
    119 }
    120 
    121 void OutputOrdering::SpliceToPriorityRing(PriorityRing::iterator pri) {
    122   MemCacheIter& mci = *pri;
    123   PriorityMap::iterator pmi = priority_map_.find(mci.priority);
    124   if (pmi == priority_map_.end()) {
    125     pmi = priority_map_.insert(
    126         std::pair<uint32, PriorityRing>(mci.priority, PriorityRing())).first;
    127   }
    128 
    129   pmi->second.splice(pmi->second.end(),
    130                      first_data_senders_,
    131                      pri);
    132   StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id);
    133   sitpmi->second.ring = &(pmi->second);
    134 }
    135 
    136 MemCacheIter* OutputOrdering::GetIter() {
    137   while (!first_data_senders_.empty()) {
    138     MemCacheIter& mci = first_data_senders_.front();
    139     if (mci.bytes_sent >= first_data_senders_threshold_) {
    140       SpliceToPriorityRing(first_data_senders_.begin());
    141     } else {
    142       first_data_senders_.splice(first_data_senders_.end(),
    143                                 first_data_senders_,
    144                                 first_data_senders_.begin());
    145       mci.max_segment_size = kInitialDataSendersThreshold;
    146       return &mci;
    147     }
    148   }
    149   while (!priority_map_.empty()) {
    150     PriorityRing& first_ring = priority_map_.begin()->second;
    151     if (first_ring.empty()) {
    152       priority_map_.erase(priority_map_.begin());
    153       continue;
    154     }
    155     MemCacheIter& mci = first_ring.front();
    156     first_ring.splice(first_ring.end(),
    157                       first_ring,
    158                       first_ring.begin());
    159     mci.max_segment_size = kSpdySegmentSize;
    160     return &mci;
    161   }
    162   return NULL;
    163 }
    164 
    165 void OutputOrdering::RemoveStreamId(uint32 stream_id) {
    166   StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id);
    167   if (sitpmi == stream_ids_.end())
    168     return;
    169 
    170   PriorityMapPointer& pmp = sitpmi->second;
    171   if (pmp.alarm_enabled)
    172     epoll_server_->UnregisterAlarm(pmp.alarm_token);
    173   else
    174     pmp.ring->erase(pmp.it);
    175   stream_ids_.erase(sitpmi);
    176 }
    177 
    178 }  // namespace net
    179 
    180