1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/tools/flip_server/output_ordering.h" 6 7 #include "net/tools/flip_server/flip_config.h" 8 #include "net/tools/flip_server/sm_connection.h" 9 10 11 namespace net { 12 13 // static 14 double OutputOrdering::server_think_time_in_s_ = 0.0; 15 16 OutputOrdering::OutputOrdering(SMConnectionInterface* connection) 17 : first_data_senders_threshold_(kInitialDataSendersThreshold), 18 connection_(connection) { 19 if (connection) 20 epoll_server_ = connection->epoll_server(); 21 } 22 23 OutputOrdering::~OutputOrdering() {} 24 25 void OutputOrdering::Reset() { 26 while (!stream_ids_.empty()) { 27 StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin(); 28 PriorityMapPointer& pmp = sitpmi->second; 29 if (pmp.alarm_enabled) { 30 epoll_server_->UnregisterAlarm(pmp.alarm_token); 31 } 32 stream_ids_.erase(sitpmi); 33 } 34 priority_map_.clear(); 35 first_data_senders_.clear(); 36 } 37 38 bool OutputOrdering::ExistsInPriorityMaps(uint32 stream_id) { 39 StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); 40 return sitpmi != stream_ids_.end(); 41 } 42 43 OutputOrdering::BeginOutputtingAlarm::BeginOutputtingAlarm( 44 OutputOrdering* oo, 45 OutputOrdering::PriorityMapPointer* pmp, 46 const MemCacheIter& mci) 47 : output_ordering_(oo), 48 pmp_(pmp), 49 mci_(mci), 50 epoll_server_(NULL) { 51 } 52 53 OutputOrdering::BeginOutputtingAlarm::~BeginOutputtingAlarm() { 54 if (epoll_server_ && pmp_->alarm_enabled) 55 epoll_server_->UnregisterAlarm(pmp_->alarm_token); 56 } 57 58 int64 OutputOrdering::BeginOutputtingAlarm::OnAlarm() { 59 OnUnregistration(); 60 output_ordering_->MoveToActive(pmp_, mci_); 61 VLOG(2) << "ON ALARM! Should now start to output..."; 62 delete this; 63 return 0; 64 } 65 66 void OutputOrdering::BeginOutputtingAlarm::OnRegistration( 67 const EpollServer::AlarmRegToken& tok, 68 EpollServer* eps) { 69 epoll_server_ = eps; 70 pmp_->alarm_token = tok; 71 pmp_->alarm_enabled = true; 72 } 73 74 void OutputOrdering::BeginOutputtingAlarm::OnUnregistration() { 75 pmp_->alarm_enabled = false; 76 } 77 78 void OutputOrdering::BeginOutputtingAlarm::OnShutdown(EpollServer* eps) { 79 OnUnregistration(); 80 } 81 82 void OutputOrdering::MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) { 83 VLOG(2) << "Moving to active!"; 84 first_data_senders_.push_back(mci); 85 pmp->ring = &first_data_senders_; 86 pmp->it = first_data_senders_.end(); 87 --pmp->it; 88 connection_->ReadyToSend(); 89 } 90 91 void OutputOrdering::AddToOutputOrder(const MemCacheIter& mci) { 92 if (ExistsInPriorityMaps(mci.stream_id)) 93 LOG(ERROR) << "OOps, already was inserted here?!"; 94 95 double think_time_in_s = server_think_time_in_s_; 96 std::string x_server_latency = 97 mci.file_data->headers->GetHeader("X-Server-Latency").as_string(); 98 if (!x_server_latency.empty()) { 99 char* endp; 100 double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp); 101 if (endp != x_server_latency.c_str() + x_server_latency.size()) { 102 LOG(ERROR) << "Unable to understand X-Server-Latency of: " 103 << x_server_latency << " for resource: " 104 << mci.file_data->filename.c_str(); 105 } else { 106 think_time_in_s = tmp_think_time_in_s; 107 } 108 } 109 StreamIdToPriorityMap::iterator sitpmi; 110 sitpmi = stream_ids_.insert( 111 std::pair<uint32, PriorityMapPointer>(mci.stream_id, 112 PriorityMapPointer())).first; 113 PriorityMapPointer& pmp = sitpmi->second; 114 115 BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci); 116 VLOG(1) << "Server think time: " << think_time_in_s; 117 epoll_server_->RegisterAlarmApproximateDelta( 118 think_time_in_s * 1000000, boa); 119 } 120 121 void OutputOrdering::SpliceToPriorityRing(PriorityRing::iterator pri) { 122 MemCacheIter& mci = *pri; 123 PriorityMap::iterator pmi = priority_map_.find(mci.priority); 124 if (pmi == priority_map_.end()) { 125 pmi = priority_map_.insert( 126 std::pair<uint32, PriorityRing>(mci.priority, PriorityRing())).first; 127 } 128 129 pmi->second.splice(pmi->second.end(), 130 first_data_senders_, 131 pri); 132 StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id); 133 sitpmi->second.ring = &(pmi->second); 134 } 135 136 MemCacheIter* OutputOrdering::GetIter() { 137 while (!first_data_senders_.empty()) { 138 MemCacheIter& mci = first_data_senders_.front(); 139 if (mci.bytes_sent >= first_data_senders_threshold_) { 140 SpliceToPriorityRing(first_data_senders_.begin()); 141 } else { 142 first_data_senders_.splice(first_data_senders_.end(), 143 first_data_senders_, 144 first_data_senders_.begin()); 145 mci.max_segment_size = kInitialDataSendersThreshold; 146 return &mci; 147 } 148 } 149 while (!priority_map_.empty()) { 150 PriorityRing& first_ring = priority_map_.begin()->second; 151 if (first_ring.empty()) { 152 priority_map_.erase(priority_map_.begin()); 153 continue; 154 } 155 MemCacheIter& mci = first_ring.front(); 156 first_ring.splice(first_ring.end(), 157 first_ring, 158 first_ring.begin()); 159 mci.max_segment_size = kSpdySegmentSize; 160 return &mci; 161 } 162 return NULL; 163 } 164 165 void OutputOrdering::RemoveStreamId(uint32 stream_id) { 166 StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id); 167 if (sitpmi == stream_ids_.end()) 168 return; 169 170 PriorityMapPointer& pmp = sitpmi->second; 171 if (pmp.alarm_enabled) 172 epoll_server_->UnregisterAlarm(pmp.alarm_token); 173 else 174 pmp.ring->erase(pmp.it); 175 stream_ids_.erase(sitpmi); 176 } 177 178 } // namespace net 179 180