1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/tools/flip_server/output_ordering.h" 6 7 #include <utility> 8 9 #include "net/tools/flip_server/flip_config.h" 10 #include "net/tools/flip_server/sm_connection.h" 11 12 namespace net { 13 14 OutputOrdering::PriorityMapPointer::PriorityMapPointer() 15 : ring(NULL), alarm_enabled(false) {} 16 17 OutputOrdering::PriorityMapPointer::~PriorityMapPointer() {} 18 19 // static 20 double OutputOrdering::server_think_time_in_s_ = 0.0; 21 22 OutputOrdering::OutputOrdering(SMConnectionInterface* connection) 23 : first_data_senders_threshold_(kInitialDataSendersThreshold), 24 connection_(connection) { 25 if (connection) 26 epoll_server_ = connection->epoll_server(); 27 } 28 29 OutputOrdering::~OutputOrdering() { Reset(); } 30 31 void OutputOrdering::Reset() { 32 while (!stream_ids_.empty()) { 33 StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin(); 34 PriorityMapPointer& pmp = sitpmi->second; 35 if (pmp.alarm_enabled) { 36 epoll_server_->UnregisterAlarm(pmp.alarm_token); 37 } 38 stream_ids_.erase(sitpmi); 39 } 40 priority_map_.clear(); 41 first_data_senders_.clear(); 42 } 43 44 bool OutputOrdering::ExistsInPriorityMaps(uint32 stream_id) const { 45 StreamIdToPriorityMap::const_iterator sitpmi = stream_ids_.find(stream_id); 46 return sitpmi != stream_ids_.end(); 47 } 48 49 OutputOrdering::BeginOutputtingAlarm::BeginOutputtingAlarm( 50 OutputOrdering* oo, 51 OutputOrdering::PriorityMapPointer* pmp, 52 const MemCacheIter& mci) 53 : output_ordering_(oo), pmp_(pmp), mci_(mci), epoll_server_(NULL) {} 54 55 OutputOrdering::BeginOutputtingAlarm::~BeginOutputtingAlarm() { 56 if (epoll_server_ && pmp_->alarm_enabled) 57 epoll_server_->UnregisterAlarm(pmp_->alarm_token); 58 } 59 60 int64 OutputOrdering::BeginOutputtingAlarm::OnAlarm() { 61 OnUnregistration(); 62 output_ordering_->MoveToActive(pmp_, mci_); 63 VLOG(2) << "ON ALARM! Should now start to output..."; 64 delete this; 65 return 0; 66 } 67 68 void OutputOrdering::BeginOutputtingAlarm::OnRegistration( 69 const EpollServer::AlarmRegToken& tok, 70 EpollServer* eps) { 71 epoll_server_ = eps; 72 pmp_->alarm_token = tok; 73 pmp_->alarm_enabled = true; 74 } 75 76 void OutputOrdering::BeginOutputtingAlarm::OnUnregistration() { 77 pmp_->alarm_enabled = false; 78 delete this; 79 } 80 81 void OutputOrdering::BeginOutputtingAlarm::OnShutdown(EpollServer* eps) { 82 OnUnregistration(); 83 } 84 85 void OutputOrdering::MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) { 86 VLOG(2) << "Moving to active!"; 87 first_data_senders_.push_back(mci); 88 pmp->ring = &first_data_senders_; 89 pmp->it = first_data_senders_.end(); 90 --pmp->it; 91 connection_->ReadyToSend(); 92 } 93 94 void OutputOrdering::AddToOutputOrder(const MemCacheIter& mci) { 95 if (ExistsInPriorityMaps(mci.stream_id)) 96 LOG(ERROR) << "OOps, already was inserted here?!"; 97 98 double think_time_in_s = server_think_time_in_s_; 99 std::string x_server_latency = 100 mci.file_data->headers()->GetHeader("X-Server-Latency").as_string(); 101 if (!x_server_latency.empty()) { 102 char* endp; 103 double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp); 104 if (endp != x_server_latency.c_str() + x_server_latency.size()) { 105 LOG(ERROR) << "Unable to understand X-Server-Latency of: " 106 << x_server_latency 107 << " for resource: " << mci.file_data->filename().c_str(); 108 } else { 109 think_time_in_s = tmp_think_time_in_s; 110 } 111 } 112 StreamIdToPriorityMap::iterator sitpmi; 113 sitpmi = stream_ids_.insert(std::pair<uint32, PriorityMapPointer>( 114 mci.stream_id, PriorityMapPointer())).first; 115 PriorityMapPointer& pmp = sitpmi->second; 116 117 BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci); 118 VLOG(1) << "Server think time: " << think_time_in_s; 119 epoll_server_->RegisterAlarmApproximateDelta(think_time_in_s * 1000000, boa); 120 } 121 122 void OutputOrdering::SpliceToPriorityRing(PriorityRing::iterator pri) { 123 MemCacheIter& mci = *pri; 124 PriorityMap::iterator pmi = priority_map_.find(mci.priority); 125 if (pmi == priority_map_.end()) { 126 pmi = priority_map_.insert(std::pair<uint32, PriorityRing>( 127 mci.priority, PriorityRing())).first; 128 } 129 130 pmi->second.splice(pmi->second.end(), first_data_senders_, pri); 131 StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id); 132 sitpmi->second.ring = &(pmi->second); 133 } 134 135 MemCacheIter* OutputOrdering::GetIter() { 136 while (!first_data_senders_.empty()) { 137 MemCacheIter& mci = first_data_senders_.front(); 138 if (mci.bytes_sent >= first_data_senders_threshold_) { 139 SpliceToPriorityRing(first_data_senders_.begin()); 140 } else { 141 first_data_senders_.splice(first_data_senders_.end(), 142 first_data_senders_, 143 first_data_senders_.begin()); 144 mci.max_segment_size = kInitialDataSendersThreshold; 145 return &mci; 146 } 147 } 148 while (!priority_map_.empty()) { 149 PriorityRing& first_ring = priority_map_.begin()->second; 150 if (first_ring.empty()) { 151 priority_map_.erase(priority_map_.begin()); 152 continue; 153 } 154 MemCacheIter& mci = first_ring.front(); 155 first_ring.splice(first_ring.end(), first_ring, first_ring.begin()); 156 mci.max_segment_size = kSpdySegmentSize; 157 return &mci; 158 } 159 return NULL; 160 } 161 162 void OutputOrdering::RemoveStreamId(uint32 stream_id) { 163 StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); 164 if (sitpmi == stream_ids_.end()) 165 return; 166 167 PriorityMapPointer& pmp = sitpmi->second; 168 if (pmp.alarm_enabled) 169 epoll_server_->UnregisterAlarm(pmp.alarm_token); 170 else 171 pmp.ring->erase(pmp.it); 172 stream_ids_.erase(sitpmi); 173 } 174 175 } // namespace net 176