Home | History | Annotate | Download | only in remote_bitrate_estimator
      1 /*
      2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
     12 
     13 #include <math.h>
     14 
     15 #include <algorithm>
     16 
     17 #include "webrtc/base/constructormagic.h"
     18 #include "webrtc/base/logging.h"
     19 #include "webrtc/base/scoped_ptr.h"
     20 #include "webrtc/base/thread_annotations.h"
     21 #include "webrtc/modules/pacing/paced_sender.h"
     22 #include "webrtc/modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
     23 #include "webrtc/system_wrappers/include/clock.h"
     24 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
     25 #include "webrtc/typedefs.h"
     26 
     27 namespace webrtc {
     28 
     29 enum {
     30   kTimestampGroupLengthMs = 5,
     31   kAbsSendTimeFraction = 18,
     32   kAbsSendTimeInterArrivalUpshift = 8,
     33   kInterArrivalShift = kAbsSendTimeFraction + kAbsSendTimeInterArrivalUpshift,
     34   kInitialProbingIntervalMs = 2000,
     35   kMinClusterSize = 4,
     36   kMaxProbePackets = 15,
     37   kExpectedNumberOfProbes = 3
     38 };
     39 
     40 static const size_t kPropagationDeltaQueueMaxSize = 1000;
     41 static const int64_t kPropagationDeltaQueueMaxTimeMs = 1000;
     42 static const double kTimestampToMs = 1000.0 /
     43     static_cast<double>(1 << kInterArrivalShift);
     44 
     45 // Removes the entries at |index| of |time| and |value|, if time[index] is
     46 // smaller than or equal to |deadline|. |time| must be sorted ascendingly.
     47 static void RemoveStaleEntries(
     48     std::vector<int64_t>* time, std::vector<int>* value, int64_t deadline) {
     49   assert(time->size() == value->size());
     50   std::vector<int64_t>::iterator end_of_removal = std::upper_bound(
     51       time->begin(), time->end(), deadline);
     52   size_t end_of_removal_index = end_of_removal - time->begin();
     53 
     54   time->erase(time->begin(), end_of_removal);
     55   value->erase(value->begin(), value->begin() + end_of_removal_index);
     56 }
     57 
     58 template<typename K, typename V>
     59 std::vector<K> Keys(const std::map<K, V>& map) {
     60   std::vector<K> keys;
     61   keys.reserve(map.size());
     62   for (typename std::map<K, V>::const_iterator it = map.begin();
     63       it != map.end(); ++it) {
     64     keys.push_back(it->first);
     65   }
     66   return keys;
     67 }
     68 
     69 uint32_t ConvertMsTo24Bits(int64_t time_ms) {
     70   uint32_t time_24_bits =
     71       static_cast<uint32_t>(
     72           ((static_cast<uint64_t>(time_ms) << kAbsSendTimeFraction) + 500) /
     73           1000) &
     74       0x00FFFFFF;
     75   return time_24_bits;
     76 }
     77 
     78 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds(
     79     int send_delta_ms,
     80     const Cluster& cluster_aggregate) {
     81     if (cluster_aggregate.count == 0)
     82       return true;
     83     float cluster_mean = cluster_aggregate.send_mean_ms /
     84                          static_cast<float>(cluster_aggregate.count);
     85     return fabs(static_cast<float>(send_delta_ms) - cluster_mean) < 2.5f;
     86   }
     87 
     88   void RemoteBitrateEstimatorAbsSendTime::AddCluster(
     89       std::list<Cluster>* clusters,
     90       Cluster* cluster) {
     91     cluster->send_mean_ms /= static_cast<float>(cluster->count);
     92     cluster->recv_mean_ms /= static_cast<float>(cluster->count);
     93     cluster->mean_size /= cluster->count;
     94     clusters->push_back(*cluster);
     95   }
     96 
     97   int RemoteBitrateEstimatorAbsSendTime::Id() const {
     98     return static_cast<int>(reinterpret_cast<uint64_t>(this));
     99   }
    100 
    101   RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime(
    102       RemoteBitrateObserver* observer,
    103       Clock* clock)
    104       : crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
    105         observer_(observer),
    106         clock_(clock),
    107         ssrcs_(),
    108         inter_arrival_(),
    109         estimator_(OverUseDetectorOptions()),
    110         detector_(OverUseDetectorOptions()),
    111         incoming_bitrate_(kBitrateWindowMs, 8000),
    112         last_process_time_(-1),
    113         process_interval_ms_(kProcessIntervalMs),
    114         total_propagation_delta_ms_(0),
    115         total_probes_received_(0),
    116         first_packet_time_ms_(-1) {
    117   assert(observer_);
    118   assert(clock_);
    119   LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating.";
    120 }
    121 
    122 void RemoteBitrateEstimatorAbsSendTime::ComputeClusters(
    123     std::list<Cluster>* clusters) const {
    124   Cluster current;
    125   int64_t prev_send_time = -1;
    126   int64_t prev_recv_time = -1;
    127   for (std::list<Probe>::const_iterator it = probes_.begin();
    128        it != probes_.end();
    129        ++it) {
    130     if (prev_send_time >= 0) {
    131       int send_delta_ms = it->send_time_ms - prev_send_time;
    132       int recv_delta_ms = it->recv_time_ms - prev_recv_time;
    133       if (send_delta_ms >= 1 && recv_delta_ms >= 1) {
    134         ++current.num_above_min_delta;
    135       }
    136       if (!IsWithinClusterBounds(send_delta_ms, current)) {
    137         if (current.count >= kMinClusterSize)
    138           AddCluster(clusters, &current);
    139         current = Cluster();
    140       }
    141       current.send_mean_ms += send_delta_ms;
    142       current.recv_mean_ms += recv_delta_ms;
    143       current.mean_size += it->payload_size;
    144       ++current.count;
    145     }
    146     prev_send_time = it->send_time_ms;
    147     prev_recv_time = it->recv_time_ms;
    148   }
    149   if (current.count >= kMinClusterSize)
    150     AddCluster(clusters, &current);
    151 }
    152 
    153 std::list<Cluster>::const_iterator
    154 RemoteBitrateEstimatorAbsSendTime::FindBestProbe(
    155     const std::list<Cluster>& clusters) const {
    156   int highest_probe_bitrate_bps = 0;
    157   std::list<Cluster>::const_iterator best_it = clusters.end();
    158   for (std::list<Cluster>::const_iterator it = clusters.begin();
    159        it != clusters.end();
    160        ++it) {
    161     if (it->send_mean_ms == 0 || it->recv_mean_ms == 0)
    162       continue;
    163     int send_bitrate_bps = it->mean_size * 8 * 1000 / it->send_mean_ms;
    164     int recv_bitrate_bps = it->mean_size * 8 * 1000 / it->recv_mean_ms;
    165     if (it->num_above_min_delta > it->count / 2 &&
    166         (it->recv_mean_ms - it->send_mean_ms <= 2.0f &&
    167          it->send_mean_ms - it->recv_mean_ms <= 5.0f)) {
    168       int probe_bitrate_bps =
    169           std::min(it->GetSendBitrateBps(), it->GetRecvBitrateBps());
    170       if (probe_bitrate_bps > highest_probe_bitrate_bps) {
    171         highest_probe_bitrate_bps = probe_bitrate_bps;
    172         best_it = it;
    173       }
    174     } else {
    175       LOG(LS_INFO) << "Probe failed, sent at " << send_bitrate_bps
    176                    << " bps, received at " << recv_bitrate_bps
    177                    << " bps. Mean send delta: " << it->send_mean_ms
    178                    << " ms, mean recv delta: " << it->recv_mean_ms
    179                    << " ms, num probes: " << it->count;
    180       break;
    181     }
    182   }
    183   return best_it;
    184 }
    185 
    186 void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) {
    187   std::list<Cluster> clusters;
    188   ComputeClusters(&clusters);
    189   if (clusters.empty()) {
    190     // If we reach the max number of probe packets and still have no clusters,
    191     // we will remove the oldest one.
    192     if (probes_.size() >= kMaxProbePackets)
    193       probes_.pop_front();
    194     return;
    195   }
    196 
    197   std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters);
    198   if (best_it != clusters.end()) {
    199     int probe_bitrate_bps =
    200         std::min(best_it->GetSendBitrateBps(), best_it->GetRecvBitrateBps());
    201     // Make sure that a probe sent on a lower bitrate than our estimate can't
    202     // reduce the estimate.
    203     if (IsBitrateImproving(probe_bitrate_bps) &&
    204         probe_bitrate_bps > static_cast<int>(incoming_bitrate_.Rate(now_ms))) {
    205       LOG(LS_INFO) << "Probe successful, sent at "
    206                    << best_it->GetSendBitrateBps() << " bps, received at "
    207                    << best_it->GetRecvBitrateBps()
    208                    << " bps. Mean send delta: " << best_it->send_mean_ms
    209                    << " ms, mean recv delta: " << best_it->recv_mean_ms
    210                    << " ms, num probes: " << best_it->count;
    211       remote_rate_.SetEstimate(probe_bitrate_bps, now_ms);
    212     }
    213   }
    214 
    215   // Not probing and received non-probe packet, or finished with current set
    216   // of probes.
    217   if (clusters.size() >= kExpectedNumberOfProbes)
    218     probes_.clear();
    219 }
    220 
    221 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving(
    222     int new_bitrate_bps) const {
    223   bool initial_probe = !remote_rate_.ValidEstimate() && new_bitrate_bps > 0;
    224   bool bitrate_above_estimate =
    225       remote_rate_.ValidEstimate() &&
    226       new_bitrate_bps > static_cast<int>(remote_rate_.LatestEstimate());
    227   return initial_probe || bitrate_above_estimate;
    228 }
    229 
    230 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketFeedbackVector(
    231     const std::vector<PacketInfo>& packet_feedback_vector) {
    232   for (const auto& packet_info : packet_feedback_vector) {
    233     IncomingPacketInfo(packet_info.arrival_time_ms,
    234                        ConvertMsTo24Bits(packet_info.send_time_ms),
    235                        packet_info.payload_size, 0, packet_info.was_paced);
    236   }
    237 }
    238 
    239 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(int64_t arrival_time_ms,
    240                                                        size_t payload_size,
    241                                                        const RTPHeader& header,
    242                                                        bool was_paced) {
    243   if (!header.extension.hasAbsoluteSendTime) {
    244     LOG(LS_WARNING) << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet "
    245                        "is missing absolute send time extension!";
    246     return;
    247   }
    248   IncomingPacketInfo(arrival_time_ms, header.extension.absoluteSendTime,
    249                      payload_size, header.ssrc, was_paced);
    250 }
    251 
    252 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo(
    253     int64_t arrival_time_ms,
    254     uint32_t send_time_24bits,
    255     size_t payload_size,
    256     uint32_t ssrc,
    257     bool was_paced) {
    258   assert(send_time_24bits < (1ul << 24));
    259   // Shift up send time to use the full 32 bits that inter_arrival works with,
    260   // so wrapping works properly.
    261   uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
    262   int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs;
    263 
    264   CriticalSectionScoped cs(crit_sect_.get());
    265   int64_t now_ms = clock_->TimeInMilliseconds();
    266   // TODO(holmer): SSRCs are only needed for REMB, should be broken out from
    267   // here.
    268   ssrcs_[ssrc] = now_ms;
    269   incoming_bitrate_.Update(payload_size, now_ms);
    270   const BandwidthUsage prior_state = detector_.State();
    271 
    272   if (first_packet_time_ms_ == -1)
    273     first_packet_time_ms_ = clock_->TimeInMilliseconds();
    274 
    275   uint32_t ts_delta = 0;
    276   int64_t t_delta = 0;
    277   int size_delta = 0;
    278   // For now only try to detect probes while we don't have a valid estimate, and
    279   // make sure the packet was paced. We currently assume that only packets
    280   // larger than 200 bytes are paced by the sender.
    281   was_paced = was_paced && payload_size > PacedSender::kMinProbePacketSize;
    282   if (was_paced &&
    283       (!remote_rate_.ValidEstimate() ||
    284        now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) {
    285     // TODO(holmer): Use a map instead to get correct order?
    286     if (total_probes_received_ < kMaxProbePackets) {
    287       int send_delta_ms = -1;
    288       int recv_delta_ms = -1;
    289       if (!probes_.empty()) {
    290         send_delta_ms = send_time_ms - probes_.back().send_time_ms;
    291         recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms;
    292       }
    293       LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms
    294                    << " ms, recv time=" << arrival_time_ms
    295                    << " ms, send delta=" << send_delta_ms
    296                    << " ms, recv delta=" << recv_delta_ms << " ms.";
    297     }
    298     probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size));
    299     ++total_probes_received_;
    300     ProcessClusters(now_ms);
    301   }
    302   if (!inter_arrival_.get()) {
    303     inter_arrival_.reset(
    304         new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000,
    305                          kTimestampToMs, true));
    306   }
    307   if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, payload_size,
    308                                     &ts_delta, &t_delta, &size_delta)) {
    309     double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
    310     estimator_.Update(t_delta, ts_delta_ms, size_delta, detector_.State());
    311     detector_.Detect(estimator_.offset(), ts_delta_ms,
    312                      estimator_.num_of_deltas(), arrival_time_ms);
    313     UpdateStats(static_cast<int>(t_delta - ts_delta_ms), now_ms);
    314   }
    315   if (detector_.State() == kBwOverusing) {
    316     uint32_t incoming_bitrate_bps = incoming_bitrate_.Rate(now_ms);
    317     if (prior_state != kBwOverusing ||
    318         remote_rate_.TimeToReduceFurther(now_ms, incoming_bitrate_bps)) {
    319       // The first overuse should immediately trigger a new estimate.
    320       // We also have to update the estimate immediately if we are overusing
    321       // and the target bitrate is too high compared to what we are receiving.
    322       UpdateEstimate(now_ms);
    323     }
    324   }
    325 }
    326 
    327 int32_t RemoteBitrateEstimatorAbsSendTime::Process() {
    328   if (TimeUntilNextProcess() > 0) {
    329     return 0;
    330   }
    331   {
    332     CriticalSectionScoped cs(crit_sect_.get());
    333     UpdateEstimate(clock_->TimeInMilliseconds());
    334   }
    335   last_process_time_ = clock_->TimeInMilliseconds();
    336   return 0;
    337 }
    338 
    339 int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() {
    340   if (last_process_time_ < 0) {
    341     return 0;
    342   }
    343   {
    344     CriticalSectionScoped cs(crit_sect_.get());
    345     return last_process_time_ + process_interval_ms_ -
    346         clock_->TimeInMilliseconds();
    347   }
    348 }
    349 
    350 void RemoteBitrateEstimatorAbsSendTime::UpdateEstimate(int64_t now_ms) {
    351   if (!inter_arrival_.get()) {
    352     // No packets have been received on the active streams.
    353     return;
    354   }
    355   for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) {
    356     if ((now_ms - it->second) > kStreamTimeOutMs) {
    357       ssrcs_.erase(it++);
    358     } else {
    359       ++it;
    360     }
    361   }
    362   if (ssrcs_.empty()) {
    363     // We can't update the estimate if we don't have any active streams.
    364     inter_arrival_.reset();
    365     // We deliberately don't reset the first_packet_time_ms_ here for now since
    366     // we only probe for bandwidth in the beginning of a call right now.
    367     return;
    368   }
    369 
    370   const RateControlInput input(detector_.State(),
    371                                incoming_bitrate_.Rate(now_ms),
    372                                estimator_.var_noise());
    373   remote_rate_.Update(&input, now_ms);
    374   unsigned int target_bitrate = remote_rate_.UpdateBandwidthEstimate(now_ms);
    375   if (remote_rate_.ValidEstimate()) {
    376     process_interval_ms_ = remote_rate_.GetFeedbackInterval();
    377     observer_->OnReceiveBitrateChanged(Keys(ssrcs_), target_bitrate);
    378   }
    379 }
    380 
    381 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms,
    382                                                     int64_t max_rtt_ms) {
    383   CriticalSectionScoped cs(crit_sect_.get());
    384   remote_rate_.SetRtt(avg_rtt_ms);
    385 }
    386 
    387 void RemoteBitrateEstimatorAbsSendTime::RemoveStream(unsigned int ssrc) {
    388   CriticalSectionScoped cs(crit_sect_.get());
    389   ssrcs_.erase(ssrc);
    390 }
    391 
    392 bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate(
    393     std::vector<unsigned int>* ssrcs,
    394     unsigned int* bitrate_bps) const {
    395   CriticalSectionScoped cs(crit_sect_.get());
    396   assert(ssrcs);
    397   assert(bitrate_bps);
    398   if (!remote_rate_.ValidEstimate()) {
    399     return false;
    400   }
    401   *ssrcs = Keys(ssrcs_);
    402   if (ssrcs_.empty()) {
    403     *bitrate_bps = 0;
    404   } else {
    405     *bitrate_bps = remote_rate_.LatestEstimate();
    406   }
    407   return true;
    408 }
    409 
    410 bool RemoteBitrateEstimatorAbsSendTime::GetStats(
    411     ReceiveBandwidthEstimatorStats* output) const {
    412   {
    413     CriticalSectionScoped cs(crit_sect_.get());
    414     output->recent_propagation_time_delta_ms = recent_propagation_delta_ms_;
    415     output->recent_arrival_time_ms = recent_update_time_ms_;
    416     output->total_propagation_time_delta_ms = total_propagation_delta_ms_;
    417   }
    418   RemoveStaleEntries(
    419       &output->recent_arrival_time_ms,
    420       &output->recent_propagation_time_delta_ms,
    421       clock_->TimeInMilliseconds() - kPropagationDeltaQueueMaxTimeMs);
    422   return true;
    423 }
    424 
    425 void RemoteBitrateEstimatorAbsSendTime::UpdateStats(int propagation_delta_ms,
    426                                                     int64_t now_ms) {
    427   // The caller must enter crit_sect_ before the call.
    428 
    429   // Remove the oldest entry if the size limit is reached.
    430   if (recent_update_time_ms_.size() == kPropagationDeltaQueueMaxSize) {
    431     recent_update_time_ms_.erase(recent_update_time_ms_.begin());
    432     recent_propagation_delta_ms_.erase(recent_propagation_delta_ms_.begin());
    433   }
    434 
    435   recent_propagation_delta_ms_.push_back(propagation_delta_ms);
    436   recent_update_time_ms_.push_back(now_ms);
    437 
    438   RemoveStaleEntries(
    439       &recent_update_time_ms_,
    440       &recent_propagation_delta_ms_,
    441       now_ms - kPropagationDeltaQueueMaxTimeMs);
    442 
    443   total_propagation_delta_ms_ =
    444       std::max(total_propagation_delta_ms_ + propagation_delta_ms, 0);
    445 }
    446 
    447 void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) {
    448   CriticalSectionScoped cs(crit_sect_.get());
    449   remote_rate_.SetMinBitrate(min_bitrate_bps);
    450 }
    451 }  // namespace webrtc
    452