1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "media/cast/net/pacing/paced_sender.h" 6 7 #include "base/big_endian.h" 8 #include "base/bind.h" 9 #include "base/debug/dump_without_crashing.h" 10 #include "base/message_loop/message_loop.h" 11 #include "media/cast/logging/logging_impl.h" 12 13 namespace media { 14 namespace cast { 15 16 namespace { 17 18 static const int64 kPacingIntervalMs = 10; 19 // Each frame will be split into no more than kPacingMaxBurstsPerFrame 20 // bursts of packets. 21 static const size_t kPacingMaxBurstsPerFrame = 3; 22 static const size_t kMaxDedupeWindowMs = 500; 23 24 // "Impossible" upper-bound on the maximum number of packets that should ever be 25 // enqueued in the pacer. This is used to detect bugs, reported as crash dumps. 26 static const size_t kHugeQueueLengthSeconds = 10; 27 static const size_t kRidiculousNumberOfPackets = 28 kHugeQueueLengthSeconds * (kMaxBurstSize * 1000 / kPacingIntervalMs); 29 30 } // namespace 31 32 DedupInfo::DedupInfo() : last_byte_acked_for_audio(0) {} 33 34 // static 35 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks, 36 uint32 ssrc, 37 uint16 packet_id) { 38 return std::make_pair(ticks, std::make_pair(ssrc, packet_id)); 39 } 40 41 PacedSender::PacketSendRecord::PacketSendRecord() 42 : last_byte_sent(0), last_byte_sent_for_audio(0) {} 43 44 PacedSender::PacedSender( 45 size_t target_burst_size, 46 size_t max_burst_size, 47 base::TickClock* clock, 48 LoggingImpl* logging, 49 PacketSender* transport, 50 const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner) 51 : clock_(clock), 52 logging_(logging), 53 transport_(transport), 54 transport_task_runner_(transport_task_runner), 55 audio_ssrc_(0), 56 video_ssrc_(0), 57 target_burst_size_(target_burst_size), 58 max_burst_size_(max_burst_size), 59 current_max_burst_size_(target_burst_size_), 60 next_max_burst_size_(target_burst_size_), 61 next_next_max_burst_size_(target_burst_size_), 62 current_burst_size_(0), 63 state_(State_Unblocked), 64 has_reached_upper_bound_once_(false), 65 weak_factory_(this) { 66 } 67 68 PacedSender::~PacedSender() {} 69 70 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) { 71 audio_ssrc_ = audio_ssrc; 72 } 73 74 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) { 75 video_ssrc_ = video_ssrc; 76 } 77 78 void PacedSender::RegisterPrioritySsrc(uint32 ssrc) { 79 priority_ssrcs_.push_back(ssrc); 80 } 81 82 int64 PacedSender::GetLastByteSentForPacket(const PacketKey& packet_key) { 83 PacketSendHistory::const_iterator it = send_history_.find(packet_key); 84 if (it == send_history_.end()) 85 return 0; 86 return it->second.last_byte_sent; 87 } 88 89 int64 PacedSender::GetLastByteSentForSsrc(uint32 ssrc) { 90 std::map<uint32, int64>::const_iterator it = last_byte_sent_.find(ssrc); 91 if (it == last_byte_sent_.end()) 92 return 0; 93 return it->second; 94 } 95 96 bool PacedSender::SendPackets(const SendPacketVector& packets) { 97 if (packets.empty()) { 98 return true; 99 } 100 const bool high_priority = IsHighPriority(packets.begin()->first); 101 for (size_t i = 0; i < packets.size(); i++) { 102 DCHECK(IsHighPriority(packets[i].first) == high_priority); 103 if (high_priority) { 104 priority_packet_list_[packets[i].first] = 105 make_pair(PacketType_Normal, packets[i].second); 106 } else { 107 packet_list_[packets[i].first] = 108 make_pair(PacketType_Normal, packets[i].second); 109 } 110 } 111 if (state_ == State_Unblocked) { 112 SendStoredPackets(); 113 } 114 return true; 115 } 116 117 bool PacedSender::ShouldResend(const PacketKey& packet_key, 118 const DedupInfo& dedup_info, 119 const base::TimeTicks& now) { 120 PacketSendHistory::const_iterator it = send_history_.find(packet_key); 121 122 // No history of previous transmission. It might be sent too long ago. 123 if (it == send_history_.end()) 124 return true; 125 126 // Suppose there is request to retransmit X and there is an audio 127 // packet Y sent just before X. Reject retransmission of X if ACK for 128 // Y has not been received. 129 // Only do this for video packets. 130 if (packet_key.second.first == video_ssrc_) { 131 if (dedup_info.last_byte_acked_for_audio && 132 it->second.last_byte_sent_for_audio && 133 dedup_info.last_byte_acked_for_audio < 134 it->second.last_byte_sent_for_audio) { 135 return false; 136 } 137 } 138 // Retransmission interval has to be greater than |resend_interval|. 139 if (now - it->second.time < dedup_info.resend_interval) 140 return false; 141 return true; 142 } 143 144 bool PacedSender::ResendPackets(const SendPacketVector& packets, 145 const DedupInfo& dedup_info) { 146 if (packets.empty()) { 147 return true; 148 } 149 const bool high_priority = IsHighPriority(packets.begin()->first); 150 const base::TimeTicks now = clock_->NowTicks(); 151 for (size_t i = 0; i < packets.size(); i++) { 152 if (!ShouldResend(packets[i].first, dedup_info, now)) { 153 LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED); 154 continue; 155 } 156 157 DCHECK(IsHighPriority(packets[i].first) == high_priority); 158 if (high_priority) { 159 priority_packet_list_[packets[i].first] = 160 make_pair(PacketType_Resend, packets[i].second); 161 } else { 162 packet_list_[packets[i].first] = 163 make_pair(PacketType_Resend, packets[i].second); 164 } 165 } 166 if (state_ == State_Unblocked) { 167 SendStoredPackets(); 168 } 169 return true; 170 } 171 172 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) { 173 if (state_ == State_TransportBlocked) { 174 priority_packet_list_[ 175 PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] = 176 make_pair(PacketType_RTCP, packet); 177 } else { 178 // We pass the RTCP packets straight through. 179 if (!transport_->SendPacket( 180 packet, 181 base::Bind(&PacedSender::SendStoredPackets, 182 weak_factory_.GetWeakPtr()))) { 183 state_ = State_TransportBlocked; 184 } 185 } 186 return true; 187 } 188 189 void PacedSender::CancelSendingPacket(const PacketKey& packet_key) { 190 packet_list_.erase(packet_key); 191 priority_packet_list_.erase(packet_key); 192 } 193 194 PacketRef PacedSender::PopNextPacket(PacketType* packet_type, 195 PacketKey* packet_key) { 196 PacketList* list = !priority_packet_list_.empty() ? 197 &priority_packet_list_ : &packet_list_; 198 DCHECK(!list->empty()); 199 PacketList::iterator i = list->begin(); 200 *packet_type = i->second.first; 201 *packet_key = i->first; 202 PacketRef ret = i->second.second; 203 list->erase(i); 204 return ret; 205 } 206 207 bool PacedSender::IsHighPriority(const PacketKey& packet_key) const { 208 return std::find(priority_ssrcs_.begin(), priority_ssrcs_.end(), 209 packet_key.second.first) != priority_ssrcs_.end(); 210 } 211 212 bool PacedSender::empty() const { 213 return packet_list_.empty() && priority_packet_list_.empty(); 214 } 215 216 size_t PacedSender::size() const { 217 return packet_list_.size() + priority_packet_list_.size(); 218 } 219 220 // This function can be called from three places: 221 // 1. User called one of the Send* functions and we were in an unblocked state. 222 // 2. state_ == State_TransportBlocked and the transport is calling us to 223 // let us know that it's ok to send again. 224 // 3. state_ == State_BurstFull and there are still packets to send. In this 225 // case we called PostDelayedTask on this function to start a new burst. 226 void PacedSender::SendStoredPackets() { 227 State previous_state = state_; 228 state_ = State_Unblocked; 229 if (empty()) { 230 return; 231 } 232 233 // If the queue ever becomes impossibly long, send a crash dump without 234 // actually crashing the process. 235 if (size() > kRidiculousNumberOfPackets && !has_reached_upper_bound_once_) { 236 NOTREACHED(); 237 // Please use Cr=Internals-Cast label in bug reports: 238 base::debug::DumpWithoutCrashing(); 239 has_reached_upper_bound_once_ = true; 240 } 241 242 base::TimeTicks now = clock_->NowTicks(); 243 // I don't actually trust that PostDelayTask(x - now) will mean that 244 // now >= x when the call happens, so check if the previous state was 245 // State_BurstFull too. 246 if (now >= burst_end_ || previous_state == State_BurstFull) { 247 // Start a new burst. 248 current_burst_size_ = 0; 249 burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs); 250 251 // The goal here is to try to send out the queued packets over the next 252 // three bursts, while trying to keep the burst size below 10 if possible. 253 // We have some evidence that sending more than 12 packets in a row doesn't 254 // work very well, but we don't actually know why yet. Sending out packets 255 // sooner is better than sending out packets later as that gives us more 256 // time to re-send them if needed. So if we have less than 30 packets, just 257 // send 10 at a time. If we have less than 60 packets, send n / 3 at a time. 258 // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s 259 // which is more bandwidth than the cast library should need, and sending 260 // out more data per second is unlikely to be helpful. 261 size_t max_burst_size = std::min( 262 max_burst_size_, 263 std::max(target_burst_size_, size() / kPacingMaxBurstsPerFrame)); 264 current_max_burst_size_ = std::max(next_max_burst_size_, max_burst_size); 265 next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size); 266 next_next_max_burst_size_ = max_burst_size; 267 } 268 269 base::Closure cb = base::Bind(&PacedSender::SendStoredPackets, 270 weak_factory_.GetWeakPtr()); 271 while (!empty()) { 272 if (current_burst_size_ >= current_max_burst_size_) { 273 transport_task_runner_->PostDelayedTask(FROM_HERE, 274 cb, 275 burst_end_ - now); 276 state_ = State_BurstFull; 277 return; 278 } 279 PacketType packet_type; 280 PacketKey packet_key; 281 PacketRef packet = PopNextPacket(&packet_type, &packet_key); 282 PacketSendRecord send_record; 283 send_record.time = now; 284 285 switch (packet_type) { 286 case PacketType_Resend: 287 LogPacketEvent(packet->data, PACKET_RETRANSMITTED); 288 break; 289 case PacketType_Normal: 290 LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK); 291 break; 292 case PacketType_RTCP: 293 break; 294 } 295 296 const bool socket_blocked = !transport_->SendPacket(packet, cb); 297 298 // Save the send record. 299 send_record.last_byte_sent = transport_->GetBytesSent(); 300 send_record.last_byte_sent_for_audio = GetLastByteSentForSsrc(audio_ssrc_); 301 send_history_[packet_key] = send_record; 302 send_history_buffer_[packet_key] = send_record; 303 last_byte_sent_[packet_key.second.first] = send_record.last_byte_sent; 304 305 if (socket_blocked) { 306 state_ = State_TransportBlocked; 307 return; 308 } 309 current_burst_size_++; 310 } 311 312 // Keep ~0.5 seconds of data (1000 packets). 313 if (send_history_buffer_.size() >= 314 max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs) { 315 send_history_.swap(send_history_buffer_); 316 send_history_buffer_.clear(); 317 } 318 DCHECK_LE(send_history_buffer_.size(), 319 max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs); 320 state_ = State_Unblocked; 321 } 322 323 void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) { 324 // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see 325 // if the packet is audio or video. 326 DCHECK_GE(packet.size(), 12u); 327 base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4); 328 uint32 ssrc; 329 bool success = reader.ReadU32(&ssrc); 330 DCHECK(success); 331 bool is_audio; 332 if (ssrc == audio_ssrc_) { 333 is_audio = true; 334 } else if (ssrc == video_ssrc_) { 335 is_audio = false; 336 } else { 337 DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event"; 338 return; 339 } 340 341 EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT; 342 logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type, 343 packet); 344 } 345 346 } // namespace cast 347 } // namespace media 348