Home | History | Annotate | Download | only in test
      1 /*
      2  *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/remote_bitrate_estimator/test/packet_sender.h"
     12 
     13 #include <algorithm>
     14 #include <list>
     15 #include <sstream>
     16 
     17 #include "webrtc/base/checks.h"
     18 #include "webrtc/modules/include/module_common_types.h"
     19 #include "webrtc/modules/remote_bitrate_estimator/test/bwe.h"
     20 #include "webrtc/modules/remote_bitrate_estimator/test/metric_recorder.h"
     21 
     22 namespace webrtc {
     23 namespace testing {
     24 namespace bwe {
     25 
     26 void PacketSender::Pause() {
     27   running_ = false;
     28   if (metric_recorder_ != nullptr) {
     29     metric_recorder_->PauseFlow();
     30   }
     31 }
     32 
     33 void PacketSender::Resume(int64_t paused_time_ms) {
     34   running_ = true;
     35   if (metric_recorder_ != nullptr) {
     36     metric_recorder_->ResumeFlow(paused_time_ms);
     37   }
     38 }
     39 
     40 void PacketSender::set_metric_recorder(MetricRecorder* metric_recorder) {
     41   metric_recorder_ = metric_recorder;
     42 }
     43 
     44 void PacketSender::RecordBitrate() {
     45   if (metric_recorder_ != nullptr) {
     46     BWE_TEST_LOGGING_CONTEXT("Sender");
     47     BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin());
     48     metric_recorder_->UpdateTimeMs(clock_.TimeInMilliseconds());
     49     metric_recorder_->UpdateSendingEstimateKbps(TargetBitrateKbps());
     50   }
     51 }
     52 
     53 std::list<FeedbackPacket*> GetFeedbackPackets(Packets* in_out,
     54                                               int64_t end_time_ms,
     55                                               int flow_id) {
     56   std::list<FeedbackPacket*> fb_packets;
     57   for (auto it = in_out->begin(); it != in_out->end();) {
     58     if ((*it)->send_time_us() > 1000 * end_time_ms)
     59       break;
     60     if ((*it)->GetPacketType() == Packet::kFeedback &&
     61         flow_id == (*it)->flow_id()) {
     62       fb_packets.push_back(static_cast<FeedbackPacket*>(*it));
     63       it = in_out->erase(it);
     64     } else {
     65       ++it;
     66     }
     67   }
     68   return fb_packets;
     69 }
     70 
     71 VideoSender::VideoSender(PacketProcessorListener* listener,
     72                          VideoSource* source,
     73                          BandwidthEstimatorType estimator_type)
     74     : PacketSender(listener, source->flow_id()),
     75       source_(source),
     76       bwe_(CreateBweSender(estimator_type,
     77                            source_->bits_per_second() / 1000,
     78                            this,
     79                            &clock_)),
     80       previous_sending_bitrate_(0) {
     81   modules_.push_back(bwe_.get());
     82 }
     83 
     84 VideoSender::~VideoSender() {
     85 }
     86 
     87 void VideoSender::Pause() {
     88   previous_sending_bitrate_ = TargetBitrateKbps();
     89   PacketSender::Pause();
     90 }
     91 
     92 void VideoSender::Resume(int64_t paused_time_ms) {
     93   source_->SetBitrateBps(previous_sending_bitrate_);
     94   PacketSender::Resume(paused_time_ms);
     95 }
     96 
     97 void VideoSender::RunFor(int64_t time_ms, Packets* in_out) {
     98   std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
     99       in_out, clock_.TimeInMilliseconds() + time_ms, source_->flow_id());
    100   ProcessFeedbackAndGeneratePackets(time_ms, &feedbacks, in_out);
    101 }
    102 
    103 void VideoSender::ProcessFeedbackAndGeneratePackets(
    104     int64_t time_ms,
    105     std::list<FeedbackPacket*>* feedbacks,
    106     Packets* packets) {
    107   do {
    108     // Make sure to at least run Process() below every 100 ms.
    109     int64_t time_to_run_ms = std::min<int64_t>(time_ms, 100);
    110     if (!feedbacks->empty()) {
    111       int64_t time_until_feedback_ms =
    112           feedbacks->front()->send_time_ms() - clock_.TimeInMilliseconds();
    113       time_to_run_ms =
    114           std::max<int64_t>(std::min(time_ms, time_until_feedback_ms), 0);
    115     }
    116 
    117     if (!running_) {
    118       source_->SetBitrateBps(0);
    119     }
    120 
    121     Packets generated;
    122     source_->RunFor(time_to_run_ms, &generated);
    123     bwe_->OnPacketsSent(generated);
    124     packets->merge(generated, DereferencingComparator<Packet>);
    125 
    126     clock_.AdvanceTimeMilliseconds(time_to_run_ms);
    127 
    128     if (!feedbacks->empty()) {
    129       bwe_->GiveFeedback(*feedbacks->front());
    130       delete feedbacks->front();
    131       feedbacks->pop_front();
    132     }
    133 
    134     bwe_->Process();
    135 
    136     time_ms -= time_to_run_ms;
    137   } while (time_ms > 0);
    138   assert(feedbacks->empty());
    139 }
    140 
    141 int VideoSender::GetFeedbackIntervalMs() const {
    142   return bwe_->GetFeedbackIntervalMs();
    143 }
    144 
    145 void VideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
    146                                    uint8_t fraction_lost,
    147                                    int64_t rtt) {
    148   source_->SetBitrateBps(target_bitrate_bps);
    149   RecordBitrate();
    150 }
    151 
    152 uint32_t VideoSender::TargetBitrateKbps() {
    153   return (source_->bits_per_second() + 500) / 1000;
    154 }
    155 
    156 PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener,
    157                                    VideoSource* source,
    158                                    BandwidthEstimatorType estimator)
    159     : VideoSender(listener, source, estimator),
    160       pacer_(&clock_,
    161              this,
    162              source->bits_per_second() / 1000,
    163              PacedSender::kDefaultPaceMultiplier * source->bits_per_second() /
    164                  1000,
    165              0) {
    166   modules_.push_back(&pacer_);
    167 }
    168 
    169 PacedVideoSender::~PacedVideoSender() {
    170   for (Packet* packet : pacer_queue_)
    171     delete packet;
    172   for (Packet* packet : queue_)
    173     delete packet;
    174 }
    175 
    176 void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) {
    177   int64_t end_time_ms = clock_.TimeInMilliseconds() + time_ms;
    178   // Run process periodically to allow the packets to be paced out.
    179   std::list<FeedbackPacket*> feedbacks =
    180       GetFeedbackPackets(in_out, end_time_ms, source_->flow_id());
    181   int64_t last_run_time_ms = -1;
    182   BWE_TEST_LOGGING_CONTEXT("Sender");
    183   BWE_TEST_LOGGING_CONTEXT(source_->flow_id());
    184   do {
    185     int64_t time_until_process_ms = TimeUntilNextProcess(modules_);
    186     int64_t time_until_feedback_ms = time_ms;
    187     if (!feedbacks.empty())
    188       time_until_feedback_ms = std::max<int64_t>(
    189           feedbacks.front()->send_time_ms() - clock_.TimeInMilliseconds(), 0);
    190 
    191     int64_t time_until_next_event_ms =
    192         std::min(time_until_feedback_ms, time_until_process_ms);
    193 
    194     time_until_next_event_ms =
    195         std::min(source_->GetTimeUntilNextFrameMs(), time_until_next_event_ms);
    196 
    197     // Never run for longer than we have been asked for.
    198     if (clock_.TimeInMilliseconds() + time_until_next_event_ms > end_time_ms)
    199       time_until_next_event_ms = end_time_ms - clock_.TimeInMilliseconds();
    200 
    201     // Make sure we don't get stuck if an event doesn't trigger. This typically
    202     // happens if the prober wants to probe, but there's no packet to send.
    203     if (time_until_next_event_ms == 0 && last_run_time_ms == 0)
    204       time_until_next_event_ms = 1;
    205     last_run_time_ms = time_until_next_event_ms;
    206 
    207     Packets generated_packets;
    208     source_->RunFor(time_until_next_event_ms, &generated_packets);
    209     if (!generated_packets.empty()) {
    210       for (Packet* packet : generated_packets) {
    211         MediaPacket* media_packet = static_cast<MediaPacket*>(packet);
    212         pacer_.InsertPacket(
    213             PacedSender::kNormalPriority, media_packet->header().ssrc,
    214             media_packet->header().sequenceNumber, media_packet->send_time_ms(),
    215             media_packet->payload_size(), false);
    216         pacer_queue_.push_back(packet);
    217         assert(pacer_queue_.size() < 10000);
    218       }
    219     }
    220 
    221     clock_.AdvanceTimeMilliseconds(time_until_next_event_ms);
    222 
    223     if (time_until_next_event_ms == time_until_feedback_ms) {
    224       if (!feedbacks.empty()) {
    225         bwe_->GiveFeedback(*feedbacks.front());
    226         delete feedbacks.front();
    227         feedbacks.pop_front();
    228       }
    229       bwe_->Process();
    230     }
    231 
    232     if (time_until_next_event_ms == time_until_process_ms) {
    233       CallProcess(modules_);
    234     }
    235   } while (clock_.TimeInMilliseconds() < end_time_ms);
    236   QueuePackets(in_out, end_time_ms * 1000);
    237 }
    238 
    239 int64_t PacedVideoSender::TimeUntilNextProcess(
    240     const std::list<Module*>& modules) {
    241   int64_t time_until_next_process_ms = 10;
    242   for (Module* module : modules) {
    243     int64_t next_process_ms = module->TimeUntilNextProcess();
    244     if (next_process_ms < time_until_next_process_ms)
    245       time_until_next_process_ms = next_process_ms;
    246   }
    247   if (time_until_next_process_ms < 0)
    248     time_until_next_process_ms = 0;
    249   return time_until_next_process_ms;
    250 }
    251 
    252 void PacedVideoSender::CallProcess(const std::list<Module*>& modules) {
    253   for (Module* module : modules) {
    254     if (module->TimeUntilNextProcess() <= 0) {
    255       module->Process();
    256     }
    257   }
    258 }
    259 
    260 void PacedVideoSender::QueuePackets(Packets* batch,
    261                                     int64_t end_of_batch_time_us) {
    262   queue_.merge(*batch, DereferencingComparator<Packet>);
    263   if (queue_.empty()) {
    264     return;
    265   }
    266   Packets::iterator it = queue_.begin();
    267   for (; it != queue_.end(); ++it) {
    268     if ((*it)->send_time_us() > end_of_batch_time_us) {
    269       break;
    270     }
    271   }
    272   Packets to_transfer;
    273   to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it);
    274   for (Packet* packet : to_transfer)
    275     packet->set_paced(true);
    276   bwe_->OnPacketsSent(to_transfer);
    277   batch->merge(to_transfer, DereferencingComparator<Packet>);
    278 }
    279 
    280 bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc,
    281                                         uint16_t sequence_number,
    282                                         int64_t capture_time_ms,
    283                                         bool retransmission) {
    284   for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end();
    285        ++it) {
    286     MediaPacket* media_packet = static_cast<MediaPacket*>(*it);
    287     if (media_packet->header().sequenceNumber == sequence_number) {
    288       int64_t pace_out_time_ms = clock_.TimeInMilliseconds();
    289 
    290       // Make sure a packet is never paced out earlier than when it was put into
    291       // the pacer.
    292       assert(pace_out_time_ms >= media_packet->send_time_ms());
    293 
    294       media_packet->SetAbsSendTimeMs(pace_out_time_ms);
    295       media_packet->set_send_time_us(1000 * pace_out_time_ms);
    296       media_packet->set_sender_timestamp_us(1000 * pace_out_time_ms);
    297       queue_.push_back(media_packet);
    298       pacer_queue_.erase(it);
    299       return true;
    300     }
    301   }
    302   return false;
    303 }
    304 
    305 size_t PacedVideoSender::TimeToSendPadding(size_t bytes) {
    306   return 0;
    307 }
    308 
    309 void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
    310                                         uint8_t fraction_lost,
    311                                         int64_t rtt) {
    312   VideoSender::OnNetworkChanged(target_bitrate_bps, fraction_lost, rtt);
    313   pacer_.UpdateBitrate(
    314       target_bitrate_bps / 1000,
    315       PacedSender::kDefaultPaceMultiplier * target_bitrate_bps / 1000, 0);
    316 }
    317 
    318 const int kNoLimit = std::numeric_limits<int>::max();
    319 const int kPacketSizeBytes = 1200;
    320 
    321 TcpSender::TcpSender(PacketProcessorListener* listener,
    322                      int flow_id,
    323                      int64_t offset_ms)
    324     : TcpSender(listener, flow_id, offset_ms, kNoLimit) {
    325 }
    326 
    327 TcpSender::TcpSender(PacketProcessorListener* listener,
    328                      int flow_id,
    329                      int64_t offset_ms,
    330                      int send_limit_bytes)
    331     : PacketSender(listener, flow_id),
    332       cwnd_(10),
    333       ssthresh_(kNoLimit),
    334       ack_received_(false),
    335       last_acked_seq_num_(0),
    336       next_sequence_number_(0),
    337       offset_ms_(offset_ms),
    338       last_reduction_time_ms_(-1),
    339       last_rtt_ms_(0),
    340       total_sent_bytes_(0),
    341       send_limit_bytes_(send_limit_bytes),
    342       last_generated_packets_ms_(0),
    343       num_recent_sent_packets_(0),
    344       bitrate_kbps_(0) {
    345 }
    346 
    347 void TcpSender::RunFor(int64_t time_ms, Packets* in_out) {
    348   if (clock_.TimeInMilliseconds() + time_ms < offset_ms_) {
    349     clock_.AdvanceTimeMilliseconds(time_ms);
    350     if (running_) {
    351       Pause();
    352     }
    353     return;
    354   }
    355 
    356   if (!running_ && total_sent_bytes_ == 0) {
    357     Resume(offset_ms_);
    358   }
    359 
    360   int64_t start_time_ms = clock_.TimeInMilliseconds();
    361 
    362   std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
    363       in_out, clock_.TimeInMilliseconds() + time_ms, *flow_ids().begin());
    364   // The number of packets which are sent in during time_ms depends on the
    365   // number of packets in_flight_ and the max number of packets in flight
    366   // (cwnd_). Therefore SendPackets() isn't directly dependent on time_ms.
    367   for (FeedbackPacket* fb : feedbacks) {
    368     clock_.AdvanceTimeMilliseconds(fb->send_time_ms() -
    369                                    clock_.TimeInMilliseconds());
    370     last_rtt_ms_ = fb->send_time_ms() - fb->latest_send_time_ms();
    371     UpdateCongestionControl(fb);
    372     SendPackets(in_out);
    373   }
    374 
    375   for (auto it = in_flight_.begin(); it != in_flight_.end();) {
    376     if (it->time_ms < clock_.TimeInMilliseconds() - 1000)
    377       in_flight_.erase(it++);
    378     else
    379       ++it;
    380   }
    381 
    382   clock_.AdvanceTimeMilliseconds(time_ms -
    383                                  (clock_.TimeInMilliseconds() - start_time_ms));
    384   SendPackets(in_out);
    385 }
    386 
    387 void TcpSender::SendPackets(Packets* in_out) {
    388   int cwnd = ceil(cwnd_);
    389   int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
    390   int timed_out = TriggerTimeouts();
    391   if (timed_out > 0) {
    392     HandleLoss();
    393   }
    394   if (packets_to_send > 0) {
    395     Packets generated = GeneratePackets(packets_to_send);
    396     for (Packet* packet : generated)
    397       in_flight_.insert(InFlight(*static_cast<MediaPacket*>(packet)));
    398 
    399     in_out->merge(generated, DereferencingComparator<Packet>);
    400   }
    401 }
    402 
    403 void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) {
    404   const TcpFeedback* tcp_fb = static_cast<const TcpFeedback*>(fb);
    405   RTC_DCHECK(!tcp_fb->acked_packets().empty());
    406   ack_received_ = true;
    407 
    408   uint16_t expected = tcp_fb->acked_packets().back() - last_acked_seq_num_;
    409   uint16_t missing =
    410       expected - static_cast<uint16_t>(tcp_fb->acked_packets().size());
    411 
    412   for (uint16_t ack_seq_num : tcp_fb->acked_packets())
    413     in_flight_.erase(InFlight(ack_seq_num, clock_.TimeInMilliseconds()));
    414 
    415   if (missing > 0) {
    416     HandleLoss();
    417   } else if (cwnd_ <= ssthresh_) {
    418     cwnd_ += tcp_fb->acked_packets().size();
    419   } else {
    420     cwnd_ += 1.0f / cwnd_;
    421   }
    422 
    423   last_acked_seq_num_ =
    424       LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_);
    425 }
    426 
    427 int TcpSender::TriggerTimeouts() {
    428   int timed_out = 0;
    429   for (auto it = in_flight_.begin(); it != in_flight_.end();) {
    430     if (it->time_ms < clock_.TimeInMilliseconds() - 1000) {
    431       in_flight_.erase(it++);
    432       ++timed_out;
    433     } else {
    434       ++it;
    435     }
    436   }
    437   return timed_out;
    438 }
    439 
    440 void TcpSender::HandleLoss() {
    441   if (clock_.TimeInMilliseconds() - last_reduction_time_ms_ < last_rtt_ms_)
    442     return;
    443   last_reduction_time_ms_ = clock_.TimeInMilliseconds();
    444   ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
    445   cwnd_ = ssthresh_;
    446 }
    447 
    448 Packets TcpSender::GeneratePackets(size_t num_packets) {
    449   Packets generated;
    450 
    451   UpdateSendBitrateEstimate(num_packets);
    452 
    453   for (size_t i = 0; i < num_packets; ++i) {
    454     if ((total_sent_bytes_ + kPacketSizeBytes) > send_limit_bytes_) {
    455       if (running_) {
    456         Pause();
    457       }
    458       break;
    459     }
    460     generated.push_back(
    461         new MediaPacket(*flow_ids().begin(), 1000 * clock_.TimeInMilliseconds(),
    462                         kPacketSizeBytes, next_sequence_number_++));
    463     generated.back()->set_sender_timestamp_us(
    464         1000 * clock_.TimeInMilliseconds());
    465 
    466     total_sent_bytes_ += kPacketSizeBytes;
    467   }
    468 
    469   return generated;
    470 }
    471 
    472 void TcpSender::UpdateSendBitrateEstimate(size_t num_packets) {
    473   const int kTimeWindowMs = 500;
    474   num_recent_sent_packets_ += num_packets;
    475 
    476   int64_t delta_ms = clock_.TimeInMilliseconds() - last_generated_packets_ms_;
    477   if (delta_ms >= kTimeWindowMs) {
    478     bitrate_kbps_ =
    479         static_cast<uint32_t>(8 * num_recent_sent_packets_ * kPacketSizeBytes) /
    480         delta_ms;
    481     last_generated_packets_ms_ = clock_.TimeInMilliseconds();
    482     num_recent_sent_packets_ = 0;
    483   }
    484 
    485   RecordBitrate();
    486 }
    487 
    488 uint32_t TcpSender::TargetBitrateKbps() {
    489   return bitrate_kbps_;
    490 }
    491 
    492 }  // namespace bwe
    493 }  // namespace testing
    494 }  // namespace webrtc
    495