Home | History | Annotate | Download | only in pacing
      1 /*
      2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
#include "webrtc/modules/pacing/paced_sender.h"

#include <algorithm>
#include <list>
#include <map>
#include <queue>
#include <set>
#include <vector>

#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
#include "webrtc/modules/include/module_common_types.h"
#include "webrtc/modules/pacing/bitrate_prober.h"
#include "webrtc/system_wrappers/include/clock.h"
#include "webrtc/system_wrappers/include/critical_section_wrapper.h"
#include "webrtc/system_wrappers/include/field_trial.h"
     24 
     25 namespace {
     26 // Time limit in milliseconds between packet bursts.
     27 const int64_t kMinPacketLimitMs = 5;
     28 
     29 // Upper cap on process interval, in case process has not been called in a long
     30 // time.
     31 const int64_t kMaxIntervalTimeMs = 30;
     32 
     33 }  // namespace
     34 
     35 // TODO(sprang): Move at least PacketQueue and MediaBudget out to separate
     36 // files, so that we can more easily test them.
     37 
     38 namespace webrtc {
     39 namespace paced_sender {
     40 struct Packet {
     41   Packet(RtpPacketSender::Priority priority,
     42          uint32_t ssrc,
     43          uint16_t seq_number,
     44          int64_t capture_time_ms,
     45          int64_t enqueue_time_ms,
     46          size_t length_in_bytes,
     47          bool retransmission,
     48          uint64_t enqueue_order)
     49       : priority(priority),
     50         ssrc(ssrc),
     51         sequence_number(seq_number),
     52         capture_time_ms(capture_time_ms),
     53         enqueue_time_ms(enqueue_time_ms),
     54         bytes(length_in_bytes),
     55         retransmission(retransmission),
     56         enqueue_order(enqueue_order) {}
     57 
     58   RtpPacketSender::Priority priority;
     59   uint32_t ssrc;
     60   uint16_t sequence_number;
     61   int64_t capture_time_ms;
     62   int64_t enqueue_time_ms;
     63   size_t bytes;
     64   bool retransmission;
     65   uint64_t enqueue_order;
     66   std::list<Packet>::iterator this_it;
     67 };
     68 
     69 // Used by priority queue to sort packets.
     70 struct Comparator {
     71   bool operator()(const Packet* first, const Packet* second) {
     72     // Highest prio = 0.
     73     if (first->priority != second->priority)
     74       return first->priority > second->priority;
     75 
     76     // Retransmissions go first.
     77     if (second->retransmission && !first->retransmission)
     78       return true;
     79 
     80     // Older frames have higher prio.
     81     if (first->capture_time_ms != second->capture_time_ms)
     82       return first->capture_time_ms > second->capture_time_ms;
     83 
     84     return first->enqueue_order > second->enqueue_order;
     85   }
     86 };
     87 
     88 // Class encapsulating a priority queue with some extensions.
     89 class PacketQueue {
     90  public:
     91   explicit PacketQueue(Clock* clock)
     92       : bytes_(0),
     93         clock_(clock),
     94         queue_time_sum_(0),
     95         time_last_updated_(clock_->TimeInMilliseconds()) {}
     96   virtual ~PacketQueue() {}
     97 
     98   void Push(const Packet& packet) {
     99     if (!AddToDupeSet(packet))
    100       return;
    101 
    102     UpdateQueueTime(packet.enqueue_time_ms);
    103 
    104     // Store packet in list, use pointers in priority queue for cheaper moves.
    105     // Packets have a handle to its own iterator in the list, for easy removal
    106     // when popping from queue.
    107     packet_list_.push_front(packet);
    108     std::list<Packet>::iterator it = packet_list_.begin();
    109     it->this_it = it;          // Handle for direct removal from list.
    110     prio_queue_.push(&(*it));  // Pointer into list.
    111     bytes_ += packet.bytes;
    112   }
    113 
    114   const Packet& BeginPop() {
    115     const Packet& packet = *prio_queue_.top();
    116     prio_queue_.pop();
    117     return packet;
    118   }
    119 
    120   void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
    121 
    122   void FinalizePop(const Packet& packet) {
    123     RemoveFromDupeSet(packet);
    124     bytes_ -= packet.bytes;
    125     queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
    126     packet_list_.erase(packet.this_it);
    127     RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
    128     if (packet_list_.empty())
    129       RTC_DCHECK_EQ(0u, queue_time_sum_);
    130   }
    131 
    132   bool Empty() const { return prio_queue_.empty(); }
    133 
    134   size_t SizeInPackets() const { return prio_queue_.size(); }
    135 
    136   uint64_t SizeInBytes() const { return bytes_; }
    137 
    138   int64_t OldestEnqueueTimeMs() const {
    139     auto it = packet_list_.rbegin();
    140     if (it == packet_list_.rend())
    141       return 0;
    142     return it->enqueue_time_ms;
    143   }
    144 
    145   void UpdateQueueTime(int64_t timestamp_ms) {
    146     RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
    147     int64_t delta = timestamp_ms - time_last_updated_;
    148     // Use packet packet_list_.size() not prio_queue_.size() here, as there
    149     // might be an outstanding element popped from prio_queue_ currently in the
    150     // SendPacket() call, while packet_list_ will always be correct.
    151     queue_time_sum_ += delta * packet_list_.size();
    152     time_last_updated_ = timestamp_ms;
    153   }
    154 
    155   int64_t AverageQueueTimeMs() const {
    156     if (prio_queue_.empty())
    157       return 0;
    158     return queue_time_sum_ / packet_list_.size();
    159   }
    160 
    161  private:
    162   // Try to add a packet to the set of ssrc/seqno identifiers currently in the
    163   // queue. Return true if inserted, false if this is a duplicate.
    164   bool AddToDupeSet(const Packet& packet) {
    165     SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
    166     if (it == dupe_map_.end()) {
    167       // First for this ssrc, just insert.
    168       dupe_map_[packet.ssrc].insert(packet.sequence_number);
    169       return true;
    170     }
    171 
    172     // Insert returns a pair, where second is a bool set to true if new element.
    173     return it->second.insert(packet.sequence_number).second;
    174   }
    175 
    176   void RemoveFromDupeSet(const Packet& packet) {
    177     SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
    178     RTC_DCHECK(it != dupe_map_.end());
    179     it->second.erase(packet.sequence_number);
    180     if (it->second.empty()) {
    181       dupe_map_.erase(it);
    182     }
    183   }
    184 
    185   // List of packets, in the order the were enqueued. Since dequeueing may
    186   // occur out of order, use list instead of vector.
    187   std::list<Packet> packet_list_;
    188   // Priority queue of the packets, sorted according to Comparator.
    189   // Use pointers into list, to avoid moving whole struct within heap.
    190   std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
    191   // Total number of bytes in the queue.
    192   uint64_t bytes_;
    193   // Map<ssrc, set<seq_no> >, for checking duplicates.
    194   typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
    195   SsrcSeqNoMap dupe_map_;
    196   Clock* const clock_;
    197   int64_t queue_time_sum_;
    198   int64_t time_last_updated_;
    199 };
    200 
    201 class IntervalBudget {
    202  public:
    203   explicit IntervalBudget(int initial_target_rate_kbps)
    204       : target_rate_kbps_(initial_target_rate_kbps),
    205         bytes_remaining_(0) {}
    206 
    207   void set_target_rate_kbps(int target_rate_kbps) {
    208     target_rate_kbps_ = target_rate_kbps;
    209     bytes_remaining_ =
    210         std::max(-kWindowMs * target_rate_kbps_ / 8, bytes_remaining_);
    211   }
    212 
    213   void IncreaseBudget(int64_t delta_time_ms) {
    214     int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
    215     if (bytes_remaining_ < 0) {
    216       // We overused last interval, compensate this interval.
    217       bytes_remaining_ = bytes_remaining_ + bytes;
    218     } else {
    219       // If we underused last interval we can't use it this interval.
    220       bytes_remaining_ = bytes;
    221     }
    222   }
    223 
    224   void UseBudget(size_t bytes) {
    225     bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes),
    226                                 -kWindowMs * target_rate_kbps_ / 8);
    227   }
    228 
    229   size_t bytes_remaining() const {
    230     return static_cast<size_t>(std::max(0, bytes_remaining_));
    231   }
    232 
    233   int target_rate_kbps() const { return target_rate_kbps_; }
    234 
    235  private:
    236   static const int kWindowMs = 500;
    237 
    238   int target_rate_kbps_;
    239   int bytes_remaining_;
    240 };
    241 }  // namespace paced_sender
    242 
// Queue time limit in milliseconds. Process() raises the media target bitrate
// so that the queued bytes can drain within what is left of this limit, given
// the current average queue time.
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
// Default pace multiplier. Not referenced in this file; presumably applied by
// callers when deriving the pacer's max bitrate -- confirm against callers.
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
    245 
// Creates a pacer that is not paused and has probing enabled.
// |clock| is used for all time reads and |callback| is invoked to send
// packets and padding. The media budget starts at |max_bitrate_kbps|, the
// padding budget at |min_bitrate_kbps|, and |bitrate_kbps| is stored in bits
// per second.
PacedSender::PacedSender(Clock* clock,
                         Callback* callback,
                         int bitrate_kbps,
                         int max_bitrate_kbps,
                         int min_bitrate_kbps)
    : clock_(clock),
      callback_(callback),
      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
      paused_(false),
      probing_enabled_(true),
      media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
      padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
      prober_(new BitrateProber()),
      bitrate_bps_(1000 * bitrate_kbps),
      max_bitrate_kbps_(max_bitrate_kbps),
      time_last_update_us_(clock->TimeInMicroseconds()),
      packets_(new paced_sender::PacketQueue(clock)),
      packet_counter_(0) {
  // Prime both budgets with one minimum process interval worth of bytes.
  UpdateBytesPerInterval(kMinPacketLimitMs);
}
    266 
    267 PacedSender::~PacedSender() {}
    268 
    269 void PacedSender::Pause() {
    270   CriticalSectionScoped cs(critsect_.get());
    271   paused_ = true;
    272 }
    273 
    274 void PacedSender::Resume() {
    275   CriticalSectionScoped cs(critsect_.get());
    276   paused_ = false;
    277 }
    278 
    279 void PacedSender::SetProbingEnabled(bool enabled) {
    280   RTC_CHECK_EQ(0u, packet_counter_);
    281   probing_enabled_ = enabled;
    282 }
    283 
    284 void PacedSender::UpdateBitrate(int bitrate_kbps,
    285                                 int max_bitrate_kbps,
    286                                 int min_bitrate_kbps) {
    287   CriticalSectionScoped cs(critsect_.get());
    288   // Don't set media bitrate here as it may be boosted in order to meet max
    289   // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_
    290   // be updated in Process().
    291   padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
    292   bitrate_bps_ = 1000 * bitrate_kbps;
    293   max_bitrate_kbps_ = max_bitrate_kbps;
    294 }
    295 
    296 void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
    297                                uint32_t ssrc,
    298                                uint16_t sequence_number,
    299                                int64_t capture_time_ms,
    300                                size_t bytes,
    301                                bool retransmission) {
    302   CriticalSectionScoped cs(critsect_.get());
    303 
    304   if (probing_enabled_ && !prober_->IsProbing())
    305     prober_->SetEnabled(true);
    306   prober_->MaybeInitializeProbe(bitrate_bps_);
    307 
    308   int64_t now_ms = clock_->TimeInMilliseconds();
    309   if (capture_time_ms < 0)
    310     capture_time_ms = now_ms;
    311 
    312   packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
    313                                       capture_time_ms, now_ms, bytes,
    314                                       retransmission, packet_counter_++));
    315 }
    316 
    317 int64_t PacedSender::ExpectedQueueTimeMs() const {
    318   CriticalSectionScoped cs(critsect_.get());
    319   RTC_DCHECK_GT(max_bitrate_kbps_, 0);
    320   return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_);
    321 }
    322 
    323 size_t PacedSender::QueueSizePackets() const {
    324   CriticalSectionScoped cs(critsect_.get());
    325   return packets_->SizeInPackets();
    326 }
    327 
    328 int64_t PacedSender::QueueInMs() const {
    329   CriticalSectionScoped cs(critsect_.get());
    330 
    331   int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
    332   if (oldest_packet == 0)
    333     return 0;
    334 
    335   return clock_->TimeInMilliseconds() - oldest_packet;
    336 }
    337 
    338 int64_t PacedSender::AverageQueueTimeMs() {
    339   CriticalSectionScoped cs(critsect_.get());
    340   packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
    341   return packets_->AverageQueueTimeMs();
    342 }
    343 
    344 int64_t PacedSender::TimeUntilNextProcess() {
    345   CriticalSectionScoped cs(critsect_.get());
    346   if (prober_->IsProbing()) {
    347     int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
    348     if (ret >= 0)
    349       return ret;
    350   }
    351   int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
    352   int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
    353   return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
    354 }
    355 
    356 int32_t PacedSender::Process() {
    357   int64_t now_us = clock_->TimeInMicroseconds();
    358   CriticalSectionScoped cs(critsect_.get());
    359   int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
    360   time_last_update_us_ = now_us;
    361   int target_bitrate_kbps = max_bitrate_kbps_;
    362   // TODO(holmer): Remove the !paused_ check when issue 5307 has been fixed.
    363   if (!paused_ && elapsed_time_ms > 0) {
    364     size_t queue_size_bytes = packets_->SizeInBytes();
    365     if (queue_size_bytes > 0) {
    366       // Assuming equal size packets and input/output rate, the average packet
    367       // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
    368       // time constraint shall be met. Determine bitrate needed for that.
    369       packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
    370       int64_t avg_time_left_ms = std::max<int64_t>(
    371           1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs());
    372       int min_bitrate_needed_kbps =
    373           static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
    374       if (min_bitrate_needed_kbps > target_bitrate_kbps)
    375         target_bitrate_kbps = min_bitrate_needed_kbps;
    376     }
    377 
    378     media_budget_->set_target_rate_kbps(target_bitrate_kbps);
    379 
    380     int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
    381     UpdateBytesPerInterval(delta_time_ms);
    382   }
    383   while (!packets_->Empty()) {
    384     if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing())
    385       return 0;
    386 
    387     // Since we need to release the lock in order to send, we first pop the
    388     // element from the priority queue but keep it in storage, so that we can
    389     // reinsert it if send fails.
    390     const paced_sender::Packet& packet = packets_->BeginPop();
    391 
    392     if (SendPacket(packet)) {
    393       // Send succeeded, remove it from the queue.
    394       packets_->FinalizePop(packet);
    395       if (prober_->IsProbing())
    396         return 0;
    397     } else {
    398       // Send failed, put it back into the queue.
    399       packets_->CancelPop(packet);
    400       return 0;
    401     }
    402   }
    403 
    404   // TODO(holmer): Remove the paused_ check when issue 5307 has been fixed.
    405   if (paused_ || !packets_->Empty())
    406     return 0;
    407 
    408   size_t padding_needed;
    409   if (prober_->IsProbing()) {
    410     padding_needed = prober_->RecommendedPacketSize();
    411   } else {
    412     padding_needed = padding_budget_->bytes_remaining();
    413   }
    414 
    415   if (padding_needed > 0)
    416     SendPadding(static_cast<size_t>(padding_needed));
    417   return 0;
    418 }
    419 
    420 bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
    421   // TODO(holmer): Because of this bug issue 5307 we have to send audio
    422   // packets even when the pacer is paused. Here we assume audio packets are
    423   // always high priority and that they are the only high priority packets.
    424   if (paused_ && packet.priority != kHighPriority)
    425     return false;
    426   critsect_->Leave();
    427   const bool success = callback_->TimeToSendPacket(packet.ssrc,
    428                                                    packet.sequence_number,
    429                                                    packet.capture_time_ms,
    430                                                    packet.retransmission);
    431   critsect_->Enter();
    432 
    433   // TODO(holmer): High priority packets should only be accounted for if we are
    434   // allocating bandwidth for audio.
    435   if (success && packet.priority != kHighPriority) {
    436     // Update media bytes sent.
    437     prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
    438     media_budget_->UseBudget(packet.bytes);
    439     padding_budget_->UseBudget(packet.bytes);
    440   }
    441 
    442   return success;
    443 }
    444 
    445 void PacedSender::SendPadding(size_t padding_needed) {
    446   critsect_->Leave();
    447   size_t bytes_sent = callback_->TimeToSendPadding(padding_needed);
    448   critsect_->Enter();
    449 
    450   if (bytes_sent > 0) {
    451     prober_->PacketSent(clock_->TimeInMilliseconds(), bytes_sent);
    452     media_budget_->UseBudget(bytes_sent);
    453     padding_budget_->UseBudget(bytes_sent);
    454   }
    455 }
    456 
    457 void PacedSender::UpdateBytesPerInterval(int64_t delta_time_ms) {
    458   media_budget_->IncreaseBudget(delta_time_ms);
    459   padding_budget_->IncreaseBudget(delta_time_ms);
    460 }
    461 }  // namespace webrtc
    462