Home | History | Annotate | Download | only in pacing
      1 /*
      2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/pacing/include/paced_sender.h"
     12 
     13 #include <assert.h>
     14 
     15 #include "webrtc/modules/interface/module_common_types.h"
     16 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
     17 #include "webrtc/system_wrappers/interface/trace_event.h"
     18 
     19 namespace {
     20 // Time limit in milliseconds between packet bursts.
     21 const int kMinPacketLimitMs = 5;
     22 
     23 // Upper cap on process interval, in case process has not been called in a long
     24 // time.
     25 const int kMaxIntervalTimeMs = 30;
     26 
     27 // Max time that the first packet in the queue can sit in the queue if no
     28 // packets are sent, regardless of buffer state. In practice only in effect at
     29 // low bitrates (less than 320 kbits/s).
     30 const int kMaxQueueTimeWithoutSendingMs = 30;
     31 
     32 }  // namespace
     33 
     34 namespace webrtc {
     35 
     36 namespace paced_sender {
     37 struct Packet {
     38   Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
     39          int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
     40       : ssrc_(ssrc),
     41         sequence_number_(seq_number),
     42         capture_time_ms_(capture_time_ms),
     43         enqueue_time_ms_(enqueue_time_ms),
     44         bytes_(length_in_bytes),
     45         retransmission_(retransmission) {
     46   }
     47   uint32_t ssrc_;
     48   uint16_t sequence_number_;
     49   int64_t capture_time_ms_;
     50   int64_t enqueue_time_ms_;
     51   int bytes_;
     52   bool retransmission_;
     53 };
     54 
     55 // STL list style class which prevents duplicates in the list.
     56 class PacketList {
     57  public:
     58   PacketList() {};
     59 
     60   bool empty() const {
     61     return packet_list_.empty();
     62   }
     63 
     64   Packet front() const {
     65     return packet_list_.front();
     66   }
     67 
     68   void pop_front() {
     69     Packet& packet = packet_list_.front();
     70     uint16_t sequence_number = packet.sequence_number_;
     71     packet_list_.pop_front();
     72     sequence_number_set_.erase(sequence_number);
     73   }
     74 
     75   void push_back(const Packet& packet) {
     76     if (sequence_number_set_.find(packet.sequence_number_) ==
     77         sequence_number_set_.end()) {
     78       // Don't insert duplicates.
     79       packet_list_.push_back(packet);
     80       sequence_number_set_.insert(packet.sequence_number_);
     81     }
     82   }
     83 
     84  private:
     85   std::list<Packet> packet_list_;
     86   std::set<uint16_t> sequence_number_set_;
     87 };
     88 
     89 class IntervalBudget {
     90  public:
     91   explicit IntervalBudget(int initial_target_rate_kbps)
     92       : target_rate_kbps_(initial_target_rate_kbps),
     93         bytes_remaining_(0) {}
     94 
     95   void set_target_rate_kbps(int target_rate_kbps) {
     96     target_rate_kbps_ = target_rate_kbps;
     97   }
     98 
     99   void IncreaseBudget(int delta_time_ms) {
    100     int bytes = target_rate_kbps_ * delta_time_ms / 8;
    101     if (bytes_remaining_ < 0) {
    102       // We overused last interval, compensate this interval.
    103       bytes_remaining_ = bytes_remaining_ + bytes;
    104     } else {
    105       // If we underused last interval we can't use it this interval.
    106       bytes_remaining_ = bytes;
    107     }
    108   }
    109 
    110   void UseBudget(int bytes) {
    111     bytes_remaining_ = std::max(bytes_remaining_ - bytes,
    112                                 -500 * target_rate_kbps_ / 8);
    113   }
    114 
    115   int bytes_remaining() const { return bytes_remaining_; }
    116 
    117  private:
    118   int target_rate_kbps_;
    119   int bytes_remaining_;
    120 };
    121 }  // namespace paced_sender
    122 
    123 PacedSender::PacedSender(Callback* callback,
    124                          int max_bitrate_kbps,
    125                          int min_bitrate_kbps)
    126     : callback_(callback),
    127       enabled_(true),
    128       paused_(false),
    129       max_queue_length_ms_(kDefaultMaxQueueLengthMs),
    130       critsect_(CriticalSectionWrapper::CreateCriticalSection()),
    131       media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
    132       padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
    133       time_last_update_(TickTime::Now()),
    134       capture_time_ms_last_queued_(0),
    135       capture_time_ms_last_sent_(0),
    136       high_priority_packets_(new paced_sender::PacketList),
    137       normal_priority_packets_(new paced_sender::PacketList),
    138       low_priority_packets_(new paced_sender::PacketList) {
    139   UpdateBytesPerInterval(kMinPacketLimitMs);
    140 }
    141 
    142 PacedSender::~PacedSender() {
    143 }
    144 
    145 void PacedSender::Pause() {
    146   CriticalSectionScoped cs(critsect_.get());
    147   paused_ = true;
    148 }
    149 
    150 void PacedSender::Resume() {
    151   CriticalSectionScoped cs(critsect_.get());
    152   paused_ = false;
    153 }
    154 
    155 void PacedSender::SetStatus(bool enable) {
    156   CriticalSectionScoped cs(critsect_.get());
    157   enabled_ = enable;
    158 }
    159 
    160 bool PacedSender::Enabled() const {
    161   CriticalSectionScoped cs(critsect_.get());
    162   return enabled_;
    163 }
    164 
    165 void PacedSender::UpdateBitrate(int max_bitrate_kbps,
    166                                 int min_bitrate_kbps) {
    167   CriticalSectionScoped cs(critsect_.get());
    168   media_budget_->set_target_rate_kbps(max_bitrate_kbps);
    169   padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
    170 }
    171 
    172 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
    173     uint16_t sequence_number, int64_t capture_time_ms, int bytes,
    174     bool retransmission) {
    175   CriticalSectionScoped cs(critsect_.get());
    176 
    177   if (!enabled_) {
    178     return true;  // We can send now.
    179   }
    180   if (capture_time_ms < 0) {
    181     capture_time_ms = TickTime::MillisecondTimestamp();
    182   }
    183   if (priority != kHighPriority &&
    184       capture_time_ms > capture_time_ms_last_queued_) {
    185     capture_time_ms_last_queued_ = capture_time_ms;
    186     TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
    187                              "capture_time_ms", capture_time_ms);
    188   }
    189   paced_sender::PacketList* packet_list = NULL;
    190   switch (priority) {
    191     case kHighPriority:
    192       packet_list = high_priority_packets_.get();
    193       break;
    194     case kNormalPriority:
    195       packet_list = normal_priority_packets_.get();
    196       break;
    197     case kLowPriority:
    198       packet_list = low_priority_packets_.get();
    199       break;
    200   }
    201   packet_list->push_back(paced_sender::Packet(ssrc,
    202                                               sequence_number,
    203                                               capture_time_ms,
    204                                               TickTime::MillisecondTimestamp(),
    205                                               bytes,
    206                                               retransmission));
    207   return false;
    208 }
    209 
    210 void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
    211   CriticalSectionScoped cs(critsect_.get());
    212   max_queue_length_ms_ = max_queue_length_ms;
    213 }
    214 
    215 int PacedSender::QueueInMs() const {
    216   CriticalSectionScoped cs(critsect_.get());
    217   int64_t now_ms = TickTime::MillisecondTimestamp();
    218   int64_t oldest_packet_enqueue_time = now_ms;
    219   if (!high_priority_packets_->empty()) {
    220     oldest_packet_enqueue_time = std::min(
    221         oldest_packet_enqueue_time,
    222         high_priority_packets_->front().enqueue_time_ms_);
    223   }
    224   if (!normal_priority_packets_->empty()) {
    225     oldest_packet_enqueue_time = std::min(
    226         oldest_packet_enqueue_time,
    227         normal_priority_packets_->front().enqueue_time_ms_);
    228   }
    229   if (!low_priority_packets_->empty()) {
    230     oldest_packet_enqueue_time = std::min(
    231         oldest_packet_enqueue_time,
    232         low_priority_packets_->front().enqueue_time_ms_);
    233   }
    234   return now_ms - oldest_packet_enqueue_time;
    235 }
    236 
    237 int32_t PacedSender::TimeUntilNextProcess() {
    238   CriticalSectionScoped cs(critsect_.get());
    239   int64_t elapsed_time_ms =
    240       (TickTime::Now() - time_last_update_).Milliseconds();
    241   if (elapsed_time_ms <= 0) {
    242     return kMinPacketLimitMs;
    243   }
    244   if (elapsed_time_ms >= kMinPacketLimitMs) {
    245     return 0;
    246   }
    247   return kMinPacketLimitMs - elapsed_time_ms;
    248 }
    249 
    250 int32_t PacedSender::Process() {
    251   TickTime now = TickTime::Now();
    252   CriticalSectionScoped cs(critsect_.get());
    253   int elapsed_time_ms = (now - time_last_update_).Milliseconds();
    254   time_last_update_ = now;
    255   if (!enabled_) {
    256     return 0;
    257   }
    258   if (!paused_) {
    259     if (elapsed_time_ms > 0) {
    260       uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
    261       UpdateBytesPerInterval(delta_time_ms);
    262     }
    263     paced_sender::PacketList* packet_list;
    264     while (ShouldSendNextPacket(&packet_list)) {
    265       if (!SendPacketFromList(packet_list))
    266         return 0;
    267     }
    268     if (high_priority_packets_->empty() &&
    269         normal_priority_packets_->empty() &&
    270         low_priority_packets_->empty() &&
    271         padding_budget_->bytes_remaining() > 0) {
    272       int padding_needed = padding_budget_->bytes_remaining();
    273       critsect_->Leave();
    274       int bytes_sent = callback_->TimeToSendPadding(padding_needed);
    275       critsect_->Enter();
    276       media_budget_->UseBudget(bytes_sent);
    277       padding_budget_->UseBudget(bytes_sent);
    278     }
    279   }
    280   return 0;
    281 }
    282 
    283 // MUST have critsect_ when calling.
    284 bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
    285     EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
    286   paced_sender::Packet packet = GetNextPacketFromList(packet_list);
    287   critsect_->Leave();
    288 
    289   const bool success = callback_->TimeToSendPacket(packet.ssrc_,
    290                                                    packet.sequence_number_,
    291                                                    packet.capture_time_ms_,
    292                                                    packet.retransmission_);
    293   critsect_->Enter();
    294   // If packet cannot be sent then keep it in packet list and exit early.
    295   // There's no need to send more packets.
    296   if (!success) {
    297     return false;
    298   }
    299   packet_list->pop_front();
    300   const bool last_packet = packet_list->empty() ||
    301       packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
    302   if (packet_list != high_priority_packets_.get()) {
    303     if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
    304       capture_time_ms_last_sent_ = packet.capture_time_ms_;
    305     } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
    306                last_packet) {
    307       TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
    308           packet.capture_time_ms_);
    309     }
    310   }
    311   return true;
    312 }
    313 
    314 // MUST have critsect_ when calling.
    315 void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
    316   media_budget_->IncreaseBudget(delta_time_ms);
    317   padding_budget_->IncreaseBudget(delta_time_ms);
    318 }
    319 
    320 // MUST have critsect_ when calling.
    321 bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
    322   *packet_list = NULL;
    323   if (media_budget_->bytes_remaining() <= 0) {
    324     // All bytes consumed for this interval.
    325     // Check if we have not sent in a too long time.
    326     if ((TickTime::Now() - time_last_send_).Milliseconds() >
    327         kMaxQueueTimeWithoutSendingMs) {
    328       if (!high_priority_packets_->empty()) {
    329         *packet_list = high_priority_packets_.get();
    330         return true;
    331       }
    332       if (!normal_priority_packets_->empty()) {
    333         *packet_list = normal_priority_packets_.get();
    334         return true;
    335       }
    336     }
    337     // Send any old packets to avoid queuing for too long.
    338     if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
    339       int64_t high_priority_capture_time = -1;
    340       if (!high_priority_packets_->empty()) {
    341         high_priority_capture_time =
    342             high_priority_packets_->front().capture_time_ms_;
    343         *packet_list = high_priority_packets_.get();
    344       }
    345       if (!normal_priority_packets_->empty() &&
    346           (high_priority_capture_time == -1 || high_priority_capture_time >
    347           normal_priority_packets_->front().capture_time_ms_)) {
    348         *packet_list = normal_priority_packets_.get();
    349       }
    350       if (*packet_list)
    351         return true;
    352     }
    353     return false;
    354   }
    355   if (!high_priority_packets_->empty()) {
    356     *packet_list = high_priority_packets_.get();
    357     return true;
    358   }
    359   if (!normal_priority_packets_->empty()) {
    360     *packet_list = normal_priority_packets_.get();
    361     return true;
    362   }
    363   if (!low_priority_packets_->empty()) {
    364     *packet_list = low_priority_packets_.get();
    365     return true;
    366   }
    367   return false;
    368 }
    369 
    370 paced_sender::Packet PacedSender::GetNextPacketFromList(
    371     paced_sender::PacketList* packets) {
    372   paced_sender::Packet packet = packets->front();
    373   UpdateMediaBytesSent(packet.bytes_);
    374   return packet;
    375 }
    376 
    377 // MUST have critsect_ when calling.
    378 void PacedSender::UpdateMediaBytesSent(int num_bytes) {
    379   time_last_send_ = TickTime::Now();
    380   media_budget_->UseBudget(num_bytes);
    381   padding_budget_->UseBudget(num_bytes);
    382 }
    383 
    384 }  // namespace webrtc
    385