Home | History | Annotate | Download | only in pacing
      1 /*
      2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/pacing/include/paced_sender.h"
     12 
     13 #include <assert.h>
     14 
     15 #include <map>
     16 #include <set>
     17 
     18 #include "webrtc/modules/interface/module_common_types.h"
     19 #include "webrtc/system_wrappers/interface/clock.h"
     20 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
     21 #include "webrtc/system_wrappers/interface/trace_event.h"
     22 
     23 namespace {
     24 // Time limit in milliseconds between packet bursts.
     25 const int kMinPacketLimitMs = 5;
     26 
     27 // Upper cap on process interval, in case process has not been called in a long
     28 // time.
     29 const int kMaxIntervalTimeMs = 30;
     30 
     31 // Max time that the first packet in the queue can sit in the queue if no
     32 // packets are sent, regardless of buffer state. In practice only in effect at
     33 // low bitrates (less than 320 kbits/s).
     34 const int kMaxQueueTimeWithoutSendingUs = 30000;
     35 
     36 }  // namespace
     37 
     38 namespace webrtc {
     39 namespace paced_sender {
     40 struct Packet {
     41   Packet(uint32_t ssrc,
     42          uint16_t seq_number,
     43          int64_t capture_time_ms,
     44          int64_t enqueue_time_ms,
     45          int length_in_bytes,
     46          bool retransmission)
     47       : ssrc(ssrc),
     48         sequence_number(seq_number),
     49         capture_time_ms(capture_time_ms),
     50         enqueue_time_ms(enqueue_time_ms),
     51         bytes(length_in_bytes),
     52         retransmission(retransmission) {}
     53   uint32_t ssrc;
     54   uint16_t sequence_number;
     55   int64_t capture_time_ms;
     56   int64_t enqueue_time_ms;
     57   int bytes;
     58   bool retransmission;
     59 };
     60 
     61 // STL list style class which prevents duplicates in the list.
     62 class PacketList {
     63  public:
     64   PacketList() {};
     65 
     66   bool empty() const {
     67     return packet_list_.empty();
     68   }
     69 
     70   Packet front() const {
     71     return packet_list_.front();
     72   }
     73 
     74   void pop_front() {
     75     Packet& packet = packet_list_.front();
     76     uint16_t sequence_number = packet.sequence_number;
     77     uint32_t ssrc = packet.ssrc;
     78     packet_list_.pop_front();
     79     sequence_number_set_[ssrc].erase(sequence_number);
     80   }
     81 
     82   void push_back(const Packet& packet) {
     83     if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
     84         sequence_number_set_[packet.ssrc].end()) {
     85       // Don't insert duplicates.
     86       packet_list_.push_back(packet);
     87       sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
     88     }
     89   }
     90 
     91  private:
     92   std::list<Packet> packet_list_;
     93   std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
     94 };
     95 
     96 class IntervalBudget {
     97  public:
     98   explicit IntervalBudget(int initial_target_rate_kbps)
     99       : target_rate_kbps_(initial_target_rate_kbps),
    100         bytes_remaining_(0) {}
    101 
    102   void set_target_rate_kbps(int target_rate_kbps) {
    103     target_rate_kbps_ = target_rate_kbps;
    104   }
    105 
    106   void IncreaseBudget(int delta_time_ms) {
    107     int bytes = target_rate_kbps_ * delta_time_ms / 8;
    108     if (bytes_remaining_ < 0) {
    109       // We overused last interval, compensate this interval.
    110       bytes_remaining_ = bytes_remaining_ + bytes;
    111     } else {
    112       // If we underused last interval we can't use it this interval.
    113       bytes_remaining_ = bytes;
    114     }
    115   }
    116 
    117   void UseBudget(int bytes) {
    118     bytes_remaining_ = std::max(bytes_remaining_ - bytes,
    119                                 -500 * target_rate_kbps_ / 8);
    120   }
    121 
    122   int bytes_remaining() const { return bytes_remaining_; }
    123 
    124  private:
    125   int target_rate_kbps_;
    126   int bytes_remaining_;
    127 };
    128 }  // namespace paced_sender
    129 
    130 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
    131 
    132 PacedSender::PacedSender(Clock* clock,
    133                          Callback* callback,
    134                          int max_bitrate_kbps,
    135                          int min_bitrate_kbps)
    136     : clock_(clock),
    137       callback_(callback),
    138       critsect_(CriticalSectionWrapper::CreateCriticalSection()),
    139       enabled_(true),
    140       paused_(false),
    141       max_queue_length_ms_(kDefaultMaxQueueLengthMs),
    142       media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
    143       padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
    144       time_last_update_us_(clock->TimeInMicroseconds()),
    145       capture_time_ms_last_queued_(0),
    146       capture_time_ms_last_sent_(0),
    147       high_priority_packets_(new paced_sender::PacketList),
    148       normal_priority_packets_(new paced_sender::PacketList),
    149       low_priority_packets_(new paced_sender::PacketList) {
    150   UpdateBytesPerInterval(kMinPacketLimitMs);
    151 }
    152 
    153 PacedSender::~PacedSender() {}
    154 
    155 void PacedSender::Pause() {
    156   CriticalSectionScoped cs(critsect_.get());
    157   paused_ = true;
    158 }
    159 
    160 void PacedSender::Resume() {
    161   CriticalSectionScoped cs(critsect_.get());
    162   paused_ = false;
    163 }
    164 
    165 void PacedSender::SetStatus(bool enable) {
    166   CriticalSectionScoped cs(critsect_.get());
    167   enabled_ = enable;
    168 }
    169 
    170 bool PacedSender::Enabled() const {
    171   CriticalSectionScoped cs(critsect_.get());
    172   return enabled_;
    173 }
    174 
    175 void PacedSender::UpdateBitrate(int max_bitrate_kbps,
    176                                 int min_bitrate_kbps) {
    177   CriticalSectionScoped cs(critsect_.get());
    178   media_budget_->set_target_rate_kbps(max_bitrate_kbps);
    179   padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
    180 }
    181 
    182 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
    183     uint16_t sequence_number, int64_t capture_time_ms, int bytes,
    184     bool retransmission) {
    185   CriticalSectionScoped cs(critsect_.get());
    186 
    187   if (!enabled_) {
    188     return true;  // We can send now.
    189   }
    190   if (capture_time_ms < 0) {
    191     capture_time_ms = clock_->TimeInMilliseconds();
    192   }
    193   if (priority != kHighPriority &&
    194       capture_time_ms > capture_time_ms_last_queued_) {
    195     capture_time_ms_last_queued_ = capture_time_ms;
    196     TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
    197                              "capture_time_ms", capture_time_ms);
    198   }
    199   paced_sender::PacketList* packet_list = NULL;
    200   switch (priority) {
    201     case kHighPriority:
    202       packet_list = high_priority_packets_.get();
    203       break;
    204     case kNormalPriority:
    205       packet_list = normal_priority_packets_.get();
    206       break;
    207     case kLowPriority:
    208       packet_list = low_priority_packets_.get();
    209       break;
    210   }
    211   packet_list->push_back(paced_sender::Packet(ssrc,
    212                                               sequence_number,
    213                                               capture_time_ms,
    214                                               clock_->TimeInMilliseconds(),
    215                                               bytes,
    216                                               retransmission));
    217   return false;
    218 }
    219 
    220 void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
    221   CriticalSectionScoped cs(critsect_.get());
    222   max_queue_length_ms_ = max_queue_length_ms;
    223 }
    224 
    225 int PacedSender::QueueInMs() const {
    226   CriticalSectionScoped cs(critsect_.get());
    227   int64_t now_ms = clock_->TimeInMilliseconds();
    228   int64_t oldest_packet_enqueue_time = now_ms;
    229   if (!high_priority_packets_->empty()) {
    230     oldest_packet_enqueue_time =
    231         std::min(oldest_packet_enqueue_time,
    232                  high_priority_packets_->front().enqueue_time_ms);
    233   }
    234   if (!normal_priority_packets_->empty()) {
    235     oldest_packet_enqueue_time =
    236         std::min(oldest_packet_enqueue_time,
    237                  normal_priority_packets_->front().enqueue_time_ms);
    238   }
    239   if (!low_priority_packets_->empty()) {
    240     oldest_packet_enqueue_time =
    241         std::min(oldest_packet_enqueue_time,
    242                  low_priority_packets_->front().enqueue_time_ms);
    243   }
    244   return now_ms - oldest_packet_enqueue_time;
    245 }
    246 
    247 int32_t PacedSender::TimeUntilNextProcess() {
    248   CriticalSectionScoped cs(critsect_.get());
    249   int64_t elapsed_time_ms = (clock_->TimeInMicroseconds() -
    250       time_last_update_us_ + 500) / 1000;
    251   if (elapsed_time_ms <= 0) {
    252     return kMinPacketLimitMs;
    253   }
    254   if (elapsed_time_ms >= kMinPacketLimitMs) {
    255     return 0;
    256   }
    257   return kMinPacketLimitMs - elapsed_time_ms;
    258 }
    259 
    260 int32_t PacedSender::Process() {
    261   int64_t now_us = clock_->TimeInMicroseconds();
    262   CriticalSectionScoped cs(critsect_.get());
    263   int elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
    264   time_last_update_us_ = now_us;
    265   if (!enabled_) {
    266     return 0;
    267   }
    268   if (!paused_) {
    269     if (elapsed_time_ms > 0) {
    270       uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
    271       UpdateBytesPerInterval(delta_time_ms);
    272     }
    273     paced_sender::PacketList* packet_list;
    274     while (ShouldSendNextPacket(&packet_list)) {
    275       if (!SendPacketFromList(packet_list))
    276         return 0;
    277     }
    278     if (high_priority_packets_->empty() &&
    279         normal_priority_packets_->empty() &&
    280         low_priority_packets_->empty() &&
    281         padding_budget_->bytes_remaining() > 0) {
    282       int padding_needed = padding_budget_->bytes_remaining();
    283       critsect_->Leave();
    284       int bytes_sent = callback_->TimeToSendPadding(padding_needed);
    285       critsect_->Enter();
    286       media_budget_->UseBudget(bytes_sent);
    287       padding_budget_->UseBudget(bytes_sent);
    288     }
    289   }
    290   return 0;
    291 }
    292 
    293 bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
    294     EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
    295   paced_sender::Packet packet = GetNextPacketFromList(packet_list);
    296   critsect_->Leave();
    297 
    298   const bool success = callback_->TimeToSendPacket(packet.ssrc,
    299                                                    packet.sequence_number,
    300                                                    packet.capture_time_ms,
    301                                                    packet.retransmission);
    302   critsect_->Enter();
    303   // If packet cannot be sent then keep it in packet list and exit early.
    304   // There's no need to send more packets.
    305   if (!success) {
    306     return false;
    307   }
    308   packet_list->pop_front();
    309   const bool last_packet =
    310       packet_list->empty() ||
    311       packet_list->front().capture_time_ms > packet.capture_time_ms;
    312   if (packet_list != high_priority_packets_.get()) {
    313     if (packet.capture_time_ms > capture_time_ms_last_sent_) {
    314       capture_time_ms_last_sent_ = packet.capture_time_ms;
    315     } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
    316                last_packet) {
    317       TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
    318     }
    319   }
    320   return true;
    321 }
    322 
    323 void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
    324   media_budget_->IncreaseBudget(delta_time_ms);
    325   padding_budget_->IncreaseBudget(delta_time_ms);
    326 }
    327 
    328 bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
    329   *packet_list = NULL;
    330   if (media_budget_->bytes_remaining() <= 0) {
    331     // All bytes consumed for this interval.
    332     // Check if we have not sent in a too long time.
    333     if (clock_->TimeInMicroseconds() - time_last_send_us_ >
    334         kMaxQueueTimeWithoutSendingUs) {
    335       if (!high_priority_packets_->empty()) {
    336         *packet_list = high_priority_packets_.get();
    337         return true;
    338       }
    339       if (!normal_priority_packets_->empty()) {
    340         *packet_list = normal_priority_packets_.get();
    341         return true;
    342       }
    343     }
    344     // Send any old packets to avoid queuing for too long.
    345     if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
    346       int64_t high_priority_capture_time = -1;
    347       if (!high_priority_packets_->empty()) {
    348         high_priority_capture_time =
    349             high_priority_packets_->front().capture_time_ms;
    350         *packet_list = high_priority_packets_.get();
    351       }
    352       if (!normal_priority_packets_->empty() &&
    353           (high_priority_capture_time == -1 ||
    354            high_priority_capture_time >
    355                normal_priority_packets_->front().capture_time_ms)) {
    356         *packet_list = normal_priority_packets_.get();
    357       }
    358       if (*packet_list)
    359         return true;
    360     }
    361     return false;
    362   }
    363   if (!high_priority_packets_->empty()) {
    364     *packet_list = high_priority_packets_.get();
    365     return true;
    366   }
    367   if (!normal_priority_packets_->empty()) {
    368     *packet_list = normal_priority_packets_.get();
    369     return true;
    370   }
    371   if (!low_priority_packets_->empty()) {
    372     *packet_list = low_priority_packets_.get();
    373     return true;
    374   }
    375   return false;
    376 }
    377 
    378 paced_sender::Packet PacedSender::GetNextPacketFromList(
    379     paced_sender::PacketList* packets) {
    380   paced_sender::Packet packet = packets->front();
    381   UpdateMediaBytesSent(packet.bytes);
    382   return packet;
    383 }
    384 
    385 void PacedSender::UpdateMediaBytesSent(int num_bytes) {
    386   time_last_send_us_ = clock_->TimeInMicroseconds();
    387   media_budget_->UseBudget(num_bytes);
    388   padding_budget_->UseBudget(num_bytes);
    389 }
    390 
    391 }  // namespace webrtc
    392