1 /* 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/modules/pacing/include/paced_sender.h" 12 13 #include <assert.h> 14 15 #include <map> 16 #include <set> 17 18 #include "webrtc/modules/interface/module_common_types.h" 19 #include "webrtc/system_wrappers/interface/clock.h" 20 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h" 21 #include "webrtc/system_wrappers/interface/trace_event.h" 22 23 namespace { 24 // Time limit in milliseconds between packet bursts. 25 const int kMinPacketLimitMs = 5; 26 27 // Upper cap on process interval, in case process has not been called in a long 28 // time. 29 const int kMaxIntervalTimeMs = 30; 30 31 // Max time that the first packet in the queue can sit in the queue if no 32 // packets are sent, regardless of buffer state. In practice only in effect at 33 // low bitrates (less than 320 kbits/s). 34 const int kMaxQueueTimeWithoutSendingUs = 30000; 35 36 } // namespace 37 38 namespace webrtc { 39 namespace paced_sender { 40 struct Packet { 41 Packet(uint32_t ssrc, 42 uint16_t seq_number, 43 int64_t capture_time_ms, 44 int64_t enqueue_time_ms, 45 int length_in_bytes, 46 bool retransmission) 47 : ssrc(ssrc), 48 sequence_number(seq_number), 49 capture_time_ms(capture_time_ms), 50 enqueue_time_ms(enqueue_time_ms), 51 bytes(length_in_bytes), 52 retransmission(retransmission) {} 53 uint32_t ssrc; 54 uint16_t sequence_number; 55 int64_t capture_time_ms; 56 int64_t enqueue_time_ms; 57 int bytes; 58 bool retransmission; 59 }; 60 61 // STL list style class which prevents duplicates in the list. 62 class PacketList { 63 public: 64 PacketList() {}; 65 66 bool empty() const { 67 return packet_list_.empty(); 68 } 69 70 Packet front() const { 71 return packet_list_.front(); 72 } 73 74 void pop_front() { 75 Packet& packet = packet_list_.front(); 76 uint16_t sequence_number = packet.sequence_number; 77 uint32_t ssrc = packet.ssrc; 78 packet_list_.pop_front(); 79 sequence_number_set_[ssrc].erase(sequence_number); 80 } 81 82 void push_back(const Packet& packet) { 83 if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) == 84 sequence_number_set_[packet.ssrc].end()) { 85 // Don't insert duplicates. 86 packet_list_.push_back(packet); 87 sequence_number_set_[packet.ssrc].insert(packet.sequence_number); 88 } 89 } 90 91 private: 92 std::list<Packet> packet_list_; 93 std::map<uint32_t, std::set<uint16_t> > sequence_number_set_; 94 }; 95 96 class IntervalBudget { 97 public: 98 explicit IntervalBudget(int initial_target_rate_kbps) 99 : target_rate_kbps_(initial_target_rate_kbps), 100 bytes_remaining_(0) {} 101 102 void set_target_rate_kbps(int target_rate_kbps) { 103 target_rate_kbps_ = target_rate_kbps; 104 } 105 106 void IncreaseBudget(int delta_time_ms) { 107 int bytes = target_rate_kbps_ * delta_time_ms / 8; 108 if (bytes_remaining_ < 0) { 109 // We overused last interval, compensate this interval. 110 bytes_remaining_ = bytes_remaining_ + bytes; 111 } else { 112 // If we underused last interval we can't use it this interval. 113 bytes_remaining_ = bytes; 114 } 115 } 116 117 void UseBudget(int bytes) { 118 bytes_remaining_ = std::max(bytes_remaining_ - bytes, 119 -500 * target_rate_kbps_ / 8); 120 } 121 122 int bytes_remaining() const { return bytes_remaining_; } 123 124 private: 125 int target_rate_kbps_; 126 int bytes_remaining_; 127 }; 128 } // namespace paced_sender 129 130 const float PacedSender::kDefaultPaceMultiplier = 2.5f; 131 132 PacedSender::PacedSender(Clock* clock, 133 Callback* callback, 134 int max_bitrate_kbps, 135 int min_bitrate_kbps) 136 : clock_(clock), 137 callback_(callback), 138 critsect_(CriticalSectionWrapper::CreateCriticalSection()), 139 enabled_(true), 140 paused_(false), 141 max_queue_length_ms_(kDefaultMaxQueueLengthMs), 142 media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)), 143 padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), 144 time_last_update_us_(clock->TimeInMicroseconds()), 145 capture_time_ms_last_queued_(0), 146 capture_time_ms_last_sent_(0), 147 high_priority_packets_(new paced_sender::PacketList), 148 normal_priority_packets_(new paced_sender::PacketList), 149 low_priority_packets_(new paced_sender::PacketList) { 150 UpdateBytesPerInterval(kMinPacketLimitMs); 151 } 152 153 PacedSender::~PacedSender() {} 154 155 void PacedSender::Pause() { 156 CriticalSectionScoped cs(critsect_.get()); 157 paused_ = true; 158 } 159 160 void PacedSender::Resume() { 161 CriticalSectionScoped cs(critsect_.get()); 162 paused_ = false; 163 } 164 165 void PacedSender::SetStatus(bool enable) { 166 CriticalSectionScoped cs(critsect_.get()); 167 enabled_ = enable; 168 } 169 170 bool PacedSender::Enabled() const { 171 CriticalSectionScoped cs(critsect_.get()); 172 return enabled_; 173 } 174 175 void PacedSender::UpdateBitrate(int max_bitrate_kbps, 176 int min_bitrate_kbps) { 177 CriticalSectionScoped cs(critsect_.get()); 178 media_budget_->set_target_rate_kbps(max_bitrate_kbps); 179 padding_budget_->set_target_rate_kbps(min_bitrate_kbps); 180 } 181 182 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, 183 uint16_t sequence_number, int64_t capture_time_ms, int bytes, 184 bool retransmission) { 185 CriticalSectionScoped cs(critsect_.get()); 186 187 if (!enabled_) { 188 return true; // We can send now. 189 } 190 if (capture_time_ms < 0) { 191 capture_time_ms = clock_->TimeInMilliseconds(); 192 } 193 if (priority != kHighPriority && 194 capture_time_ms > capture_time_ms_last_queued_) { 195 capture_time_ms_last_queued_ = capture_time_ms; 196 TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms, 197 "capture_time_ms", capture_time_ms); 198 } 199 paced_sender::PacketList* packet_list = NULL; 200 switch (priority) { 201 case kHighPriority: 202 packet_list = high_priority_packets_.get(); 203 break; 204 case kNormalPriority: 205 packet_list = normal_priority_packets_.get(); 206 break; 207 case kLowPriority: 208 packet_list = low_priority_packets_.get(); 209 break; 210 } 211 packet_list->push_back(paced_sender::Packet(ssrc, 212 sequence_number, 213 capture_time_ms, 214 clock_->TimeInMilliseconds(), 215 bytes, 216 retransmission)); 217 return false; 218 } 219 220 void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) { 221 CriticalSectionScoped cs(critsect_.get()); 222 max_queue_length_ms_ = max_queue_length_ms; 223 } 224 225 int PacedSender::QueueInMs() const { 226 CriticalSectionScoped cs(critsect_.get()); 227 int64_t now_ms = clock_->TimeInMilliseconds(); 228 int64_t oldest_packet_enqueue_time = now_ms; 229 if (!high_priority_packets_->empty()) { 230 oldest_packet_enqueue_time = 231 std::min(oldest_packet_enqueue_time, 232 high_priority_packets_->front().enqueue_time_ms); 233 } 234 if (!normal_priority_packets_->empty()) { 235 oldest_packet_enqueue_time = 236 std::min(oldest_packet_enqueue_time, 237 normal_priority_packets_->front().enqueue_time_ms); 238 } 239 if (!low_priority_packets_->empty()) { 240 oldest_packet_enqueue_time = 241 std::min(oldest_packet_enqueue_time, 242 low_priority_packets_->front().enqueue_time_ms); 243 } 244 return now_ms - oldest_packet_enqueue_time; 245 } 246 247 int32_t PacedSender::TimeUntilNextProcess() { 248 CriticalSectionScoped cs(critsect_.get()); 249 int64_t elapsed_time_ms = (clock_->TimeInMicroseconds() - 250 time_last_update_us_ + 500) / 1000; 251 if (elapsed_time_ms <= 0) { 252 return kMinPacketLimitMs; 253 } 254 if (elapsed_time_ms >= kMinPacketLimitMs) { 255 return 0; 256 } 257 return kMinPacketLimitMs - elapsed_time_ms; 258 } 259 260 int32_t PacedSender::Process() { 261 int64_t now_us = clock_->TimeInMicroseconds(); 262 CriticalSectionScoped cs(critsect_.get()); 263 int elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; 264 time_last_update_us_ = now_us; 265 if (!enabled_) { 266 return 0; 267 } 268 if (!paused_) { 269 if (elapsed_time_ms > 0) { 270 uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); 271 UpdateBytesPerInterval(delta_time_ms); 272 } 273 paced_sender::PacketList* packet_list; 274 while (ShouldSendNextPacket(&packet_list)) { 275 if (!SendPacketFromList(packet_list)) 276 return 0; 277 } 278 if (high_priority_packets_->empty() && 279 normal_priority_packets_->empty() && 280 low_priority_packets_->empty() && 281 padding_budget_->bytes_remaining() > 0) { 282 int padding_needed = padding_budget_->bytes_remaining(); 283 critsect_->Leave(); 284 int bytes_sent = callback_->TimeToSendPadding(padding_needed); 285 critsect_->Enter(); 286 media_budget_->UseBudget(bytes_sent); 287 padding_budget_->UseBudget(bytes_sent); 288 } 289 } 290 return 0; 291 } 292 293 bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) 294 EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) { 295 paced_sender::Packet packet = GetNextPacketFromList(packet_list); 296 critsect_->Leave(); 297 298 const bool success = callback_->TimeToSendPacket(packet.ssrc, 299 packet.sequence_number, 300 packet.capture_time_ms, 301 packet.retransmission); 302 critsect_->Enter(); 303 // If packet cannot be sent then keep it in packet list and exit early. 304 // There's no need to send more packets. 305 if (!success) { 306 return false; 307 } 308 packet_list->pop_front(); 309 const bool last_packet = 310 packet_list->empty() || 311 packet_list->front().capture_time_ms > packet.capture_time_ms; 312 if (packet_list != high_priority_packets_.get()) { 313 if (packet.capture_time_ms > capture_time_ms_last_sent_) { 314 capture_time_ms_last_sent_ = packet.capture_time_ms; 315 } else if (packet.capture_time_ms == capture_time_ms_last_sent_ && 316 last_packet) { 317 TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms); 318 } 319 } 320 return true; 321 } 322 323 void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { 324 media_budget_->IncreaseBudget(delta_time_ms); 325 padding_budget_->IncreaseBudget(delta_time_ms); 326 } 327 328 bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) { 329 *packet_list = NULL; 330 if (media_budget_->bytes_remaining() <= 0) { 331 // All bytes consumed for this interval. 332 // Check if we have not sent in a too long time. 333 if (clock_->TimeInMicroseconds() - time_last_send_us_ > 334 kMaxQueueTimeWithoutSendingUs) { 335 if (!high_priority_packets_->empty()) { 336 *packet_list = high_priority_packets_.get(); 337 return true; 338 } 339 if (!normal_priority_packets_->empty()) { 340 *packet_list = normal_priority_packets_.get(); 341 return true; 342 } 343 } 344 // Send any old packets to avoid queuing for too long. 345 if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) { 346 int64_t high_priority_capture_time = -1; 347 if (!high_priority_packets_->empty()) { 348 high_priority_capture_time = 349 high_priority_packets_->front().capture_time_ms; 350 *packet_list = high_priority_packets_.get(); 351 } 352 if (!normal_priority_packets_->empty() && 353 (high_priority_capture_time == -1 || 354 high_priority_capture_time > 355 normal_priority_packets_->front().capture_time_ms)) { 356 *packet_list = normal_priority_packets_.get(); 357 } 358 if (*packet_list) 359 return true; 360 } 361 return false; 362 } 363 if (!high_priority_packets_->empty()) { 364 *packet_list = high_priority_packets_.get(); 365 return true; 366 } 367 if (!normal_priority_packets_->empty()) { 368 *packet_list = normal_priority_packets_.get(); 369 return true; 370 } 371 if (!low_priority_packets_->empty()) { 372 *packet_list = low_priority_packets_.get(); 373 return true; 374 } 375 return false; 376 } 377 378 paced_sender::Packet PacedSender::GetNextPacketFromList( 379 paced_sender::PacketList* packets) { 380 paced_sender::Packet packet = packets->front(); 381 UpdateMediaBytesSent(packet.bytes); 382 return packet; 383 } 384 385 void PacedSender::UpdateMediaBytesSent(int num_bytes) { 386 time_last_send_us_ = clock_->TimeInMicroseconds(); 387 media_budget_->UseBudget(num_bytes); 388 padding_budget_->UseBudget(num_bytes); 389 } 390 391 } // namespace webrtc 392