Home | History | Annotate | Download | only in source
      1 /*
      2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/rtp_rtcp/source/receive_statistics_impl.h"
     12 
     13 #include <math.h>
     14 
     15 #include "webrtc/modules/rtp_rtcp/source/bitrate.h"
     16 #include "webrtc/modules/rtp_rtcp/source/rtp_utility.h"
     17 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
     18 #include "webrtc/system_wrappers/interface/scoped_ptr.h"
     19 
     20 namespace webrtc {
     21 
// A stream whose last in-order packet is at least this old (in milliseconds)
// is left out of ReceiveStatisticsImpl::GetActiveStatisticians().
const int64_t kStatisticsTimeoutMs = 8000;
// Interval, in milliseconds, used by TimeUntilNextProcess() to schedule the
// next Process() call.
const int kStatisticsProcessIntervalMs = 1000;
     24 
// Out-of-line definition of the (empty) StreamStatistician destructor.
StreamStatistician::~StreamStatistician() {}
     26 
// Creates the statistics tracker for a single stream.
//
// |clock| is used for all timestamps and for the bitrate estimator.
// |rtcp_callback| is invoked from NotifyRtcpCallback() and |rtp_callback|
// from NotifyRtpCallback(); both are dereferenced without a NULL check there.
// All counters start at zero.
StreamStatisticianImpl::StreamStatisticianImpl(
    Clock* clock,
    RtcpStatisticsCallback* rtcp_callback,
    StreamDataCountersCallback* rtp_callback)
    : clock_(clock),
      stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
      incoming_bitrate_(clock, NULL),
      ssrc_(0),
      max_reordering_threshold_(kDefaultMaxReorderingThreshold),
      jitter_q4_(0),
      cumulative_loss_(0),
      jitter_q4_transmission_time_offset_(0),
      // A receive time of 0 doubles as the "no packet seen yet" marker in
      // InOrderPacketInternal().
      last_receive_time_ms_(0),
      last_receive_time_secs_(0),
      last_receive_time_frac_(0),
      last_received_timestamp_(0),
      last_received_transmission_time_offset_(0),
      received_seq_first_(0),
      received_seq_max_(0),
      received_seq_wraps_(0),
      // Initial value of the filtered per-packet overhead; presumably the
      // 12-byte fixed RTP header -- TODO confirm.
      received_packet_overhead_(12),
      last_report_inorder_packets_(0),
      last_report_old_packets_(0),
      last_report_seq_max_(0),
      rtcp_callback_(rtcp_callback),
      rtp_callback_(rtp_callback) {}
     53 
     54 void StreamStatisticianImpl::ResetStatistics() {
     55   CriticalSectionScoped cs(stream_lock_.get());
     56   last_report_inorder_packets_ = 0;
     57   last_report_old_packets_ = 0;
     58   last_report_seq_max_ = 0;
     59   last_reported_statistics_ = RtcpStatistics();
     60   jitter_q4_ = 0;
     61   cumulative_loss_ = 0;
     62   jitter_q4_transmission_time_offset_ = 0;
     63   received_seq_wraps_ = 0;
     64   received_seq_max_ = 0;
     65   received_seq_first_ = 0;
     66   receive_counters_ = StreamDataCounters();
     67 }
     68 
// Accounts for one received RTP packet of |bytes| total size and then reports
// the updated data counters to the RTP callback. The callback is invoked
// after UpdateCounters() has returned, i.e. outside the scoped lock it takes.
void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
                                            size_t bytes,
                                            bool retransmitted) {
  UpdateCounters(header, bytes, retransmitted);
  NotifyRtpCallback();
}
     75 
     76 void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
     77                                             size_t bytes,
     78                                             bool retransmitted) {
     79   CriticalSectionScoped cs(stream_lock_.get());
     80   bool in_order = InOrderPacketInternal(header.sequenceNumber);
     81   ssrc_ = header.ssrc;
     82   incoming_bitrate_.Update(bytes);
     83   receive_counters_.bytes +=
     84       bytes - (header.paddingLength + header.headerLength);
     85   receive_counters_.header_bytes += header.headerLength;
     86   receive_counters_.padding_bytes += header.paddingLength;
     87   ++receive_counters_.packets;
     88   if (!in_order && retransmitted) {
     89     ++receive_counters_.retransmitted_packets;
     90   }
     91 
     92   if (receive_counters_.packets == 1) {
     93     received_seq_first_ = header.sequenceNumber;
     94   }
     95 
     96   // Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6
     97   // are received, 4 will be ignored.
     98   if (in_order) {
     99     // Current time in samples.
    100     uint32_t receive_time_secs;
    101     uint32_t receive_time_frac;
    102     clock_->CurrentNtp(receive_time_secs, receive_time_frac);
    103 
    104     // Wrong if we use RetransmitOfOldPacket.
    105     if (receive_counters_.packets > 1 &&
    106         received_seq_max_ > header.sequenceNumber) {
    107       // Wrap around detected.
    108       received_seq_wraps_++;
    109     }
    110     // New max.
    111     received_seq_max_ = header.sequenceNumber;
    112 
    113     // If new time stamp and more than one in-order packet received, calculate
    114     // new jitter statistics.
    115     if (header.timestamp != last_received_timestamp_ &&
    116         (receive_counters_.packets - receive_counters_.retransmitted_packets) >
    117             1) {
    118       UpdateJitter(header, receive_time_secs, receive_time_frac);
    119     }
    120     last_received_timestamp_ = header.timestamp;
    121     last_receive_time_secs_ = receive_time_secs;
    122     last_receive_time_frac_ = receive_time_frac;
    123     last_receive_time_ms_ = clock_->TimeInMilliseconds();
    124   }
    125 
    126   uint16_t packet_oh = header.headerLength + header.paddingLength;
    127 
    128   // Our measured overhead. Filter from RFC 5104 4.2.1.2:
    129   // avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
    130   received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
    131 }
    132 
    133 void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
    134                                           uint32_t receive_time_secs,
    135                                           uint32_t receive_time_frac) {
    136   uint32_t receive_time_rtp = ModuleRTPUtility::ConvertNTPTimeToRTP(
    137           receive_time_secs, receive_time_frac, header.payload_type_frequency);
    138   uint32_t last_receive_time_rtp = ModuleRTPUtility::ConvertNTPTimeToRTP(
    139       last_receive_time_secs_, last_receive_time_frac_,
    140       header.payload_type_frequency);
    141   int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
    142       (header.timestamp - last_received_timestamp_);
    143 
    144   time_diff_samples = abs(time_diff_samples);
    145 
    146   // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
    147   // If this happens, don't update jitter value. Use 5 secs video frequency
    148   // as the threshold.
    149   if (time_diff_samples < 450000) {
    150     // Note we calculate in Q4 to avoid using float.
    151     int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
    152     jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
    153   }
    154 
    155   // Extended jitter report, RFC 5450.
    156   // Actual network jitter, excluding the source-introduced jitter.
    157   int32_t time_diff_samples_ext =
    158     (receive_time_rtp - last_receive_time_rtp) -
    159     ((header.timestamp +
    160       header.extension.transmissionTimeOffset) -
    161      (last_received_timestamp_ +
    162       last_received_transmission_time_offset_));
    163 
    164   time_diff_samples_ext = abs(time_diff_samples_ext);
    165 
    166   if (time_diff_samples_ext < 450000) {
    167     int32_t jitter_diffQ4TransmissionTimeOffset =
    168       (time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
    169     jitter_q4_transmission_time_offset_ +=
    170       ((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
    171   }
    172 }
    173 
    174 void StreamStatisticianImpl::NotifyRtpCallback() {
    175   StreamDataCounters data;
    176   uint32_t ssrc;
    177   {
    178     CriticalSectionScoped cs(stream_lock_.get());
    179     data = receive_counters_;
    180     ssrc = ssrc_;
    181   }
    182   rtp_callback_->DataCountersUpdated(data, ssrc);
    183 }
    184 
    185 void StreamStatisticianImpl::NotifyRtcpCallback() {
    186   RtcpStatistics data;
    187   uint32_t ssrc;
    188   {
    189     CriticalSectionScoped cs(stream_lock_.get());
    190     data = last_reported_statistics_;
    191     ssrc = ssrc_;
    192   }
    193   rtcp_callback_->StatisticsUpdated(data, ssrc);
    194 }
    195 
// Counts one received FEC packet and reports the updated counters to the RTP
// callback.
void StreamStatisticianImpl::FecPacketReceived() {
  {
    // Hold the lock only for the increment; NotifyRtpCallback() takes the
    // lock itself.
    CriticalSectionScoped cs(stream_lock_.get());
    ++receive_counters_.fec_packets;
  }
  NotifyRtpCallback();
}
    203 
// Sets the reordering window, in sequence numbers, that
// InOrderPacketInternal() uses to tell reordered packets from a restarted
// stream.
void StreamStatisticianImpl::SetMaxReorderingThreshold(
    int max_reordering_threshold) {
  CriticalSectionScoped cs(stream_lock_.get());
  max_reordering_threshold_ = max_reordering_threshold;
}
    209 
    210 bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
    211                                            bool reset) {
    212   {
    213     CriticalSectionScoped cs(stream_lock_.get());
    214     if (received_seq_first_ == 0 && receive_counters_.bytes == 0) {
    215       // We have not received anything.
    216       return false;
    217     }
    218 
    219     if (!reset) {
    220       if (last_report_inorder_packets_ == 0) {
    221         // No report.
    222         return false;
    223       }
    224       // Just get last report.
    225       *statistics = last_reported_statistics_;
    226       return true;
    227     }
    228 
    229     *statistics = CalculateRtcpStatistics();
    230   }
    231 
    232   NotifyRtcpCallback();
    233 
    234   return true;
    235 }
    236 
// Builds a new RTCP report (fraction lost, cumulative loss, extended highest
// sequence number, jitter) for the interval since the previous report, stores
// it as the last reported statistics and starts a new interval.
//
// Does not take the stream lock itself; the visible caller, GetStatistics(),
// calls it while holding the lock.
RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
  RtcpStatistics stats;

  if (last_report_inorder_packets_ == 0) {
    // First time we send a report: pretend the previous report ended just
    // before the first received sequence number.
    last_report_seq_max_ = received_seq_first_ - 1;
  }

  // Calculate fraction lost.
  // Number of packets expected since the last report.
  uint16_t exp_since_last = (received_seq_max_ - last_report_seq_max_);

  if (last_report_seq_max_ > received_seq_max_) {
    // Can we assume that the sequence number can't decrease over a full RTCP
    // period? If it did, report no expected packets for this interval.
    exp_since_last = 0;
  }

  // Number of received RTP packets since last report, counts all packets but
  // not re-transmissions.
  uint32_t rec_since_last =
      (receive_counters_.packets - receive_counters_.retransmitted_packets) -
      last_report_inorder_packets_;

  // With NACK we don't know the expected retransmissions during the last
  // second. We know how many "old" packets we have received. We just count
  // the number of old received to estimate the loss, but it still does not
  // guarantee an exact number since we run this based on time triggered by
  // sending of an RTP packet. This should have a minimum effect.

  // With NACK we don't count old packets as received since they are
  // re-transmitted. We use RTT to decide if a packet is re-ordered or
  // re-transmitted.
  uint32_t retransmitted_packets =
      receive_counters_.retransmitted_packets - last_report_old_packets_;
  rec_since_last += retransmitted_packets;

  // Packets expected but not received in this interval; never negative.
  int32_t missing = 0;
  if (exp_since_last > rec_since_last) {
    missing = (exp_since_last - rec_since_last);
  }
  uint8_t local_fraction_lost = 0;
  if (exp_since_last) {
    // Scale 0 to 255, where 255 is 100% loss.
    local_fraction_lost =
        static_cast<uint8_t>(255 * missing / exp_since_last);
  }
  stats.fraction_lost = local_fraction_lost;

  // We need a counter for cumulative loss too.
  cumulative_loss_ += missing;
  stats.cumulative_lost = cumulative_loss_;
  // The wrap counter goes above the low 16 bits, which hold the highest
  // sequence number.
  stats.extended_max_sequence_number =
      (received_seq_wraps_ << 16) + received_seq_max_;
  // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
  stats.jitter = jitter_q4_ >> 4;

  // Store this report.
  last_reported_statistics_ = stats;

  // Only for report blocks in RTCP SR and RR.
  // Snapshot the counters so that the next report covers only what arrives
  // from now on.
  last_report_inorder_packets_ =
      receive_counters_.packets - receive_counters_.retransmitted_packets;
  last_report_old_packets_ = receive_counters_.retransmitted_packets;
  last_report_seq_max_ = received_seq_max_;

  return stats;
}
    303 
    304 void StreamStatisticianImpl::GetDataCounters(
    305     uint32_t* bytes_received, uint32_t* packets_received) const {
    306   CriticalSectionScoped cs(stream_lock_.get());
    307   if (bytes_received) {
    308     *bytes_received = receive_counters_.bytes + receive_counters_.header_bytes +
    309                       receive_counters_.padding_bytes;
    310   }
    311   if (packets_received) {
    312     *packets_received = receive_counters_.packets;
    313   }
    314 }
    315 
// Returns the current value of the incoming bitrate estimator, read under the
// stream lock.
uint32_t StreamStatisticianImpl::BitrateReceived() const {
  CriticalSectionScoped cs(stream_lock_.get());
  return incoming_bitrate_.BitrateNow();
}
    320 
// Runs the periodic processing step of the incoming bitrate estimator under
// the stream lock. Called from ReceiveStatisticsImpl::Process().
void StreamStatisticianImpl::ProcessBitrate() {
  CriticalSectionScoped cs(stream_lock_.get());
  incoming_bitrate_.Process();
}
    325 
// Returns the NTP time (seconds and fraction) at which the last in-order
// packet was received. Both outputs are 0 until such a packet has arrived.
// |secs| and |frac| must not be NULL; they are written unconditionally.
void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs,
                                                uint32_t* frac) const {
  CriticalSectionScoped cs(stream_lock_.get());
  *secs = last_receive_time_secs_;
  *frac = last_receive_time_frac_;
}
    332 
// Decides whether an out-of-order packet should be regarded as a
// retransmission rather than a merely reordered packet.
//
// The wall-clock time elapsed since the last in-order packet is compared with
// the RTP timestamp distance to that packet plus an allowed delay. The delay
// is derived from |min_rtt| when it is non-zero and from the current jitter
// estimate otherwise. In-order packets always yield false.
bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
    const RTPHeader& header, int min_rtt) const {
  CriticalSectionScoped cs(stream_lock_.get());
  if (InOrderPacketInternal(header.sequenceNumber)) {
    return false;
  }
  uint32_t frequency_khz = header.payload_type_frequency / 1000;
  // NOTE(review): this is the only guard against a zero divisor below and it
  // is compiled out when asserts are disabled.
  assert(frequency_khz > 0);

  // Wall-clock time since the last in-order packet.
  int64_t time_diff_ms = clock_->TimeInMilliseconds() -
      last_receive_time_ms_;

  // Diff in time stamp since last received in order.
  uint32_t timestamp_diff = header.timestamp - last_received_timestamp_;
  // NOTE(review): the divisor is uint32_t, so under the usual arithmetic
  // conversions this division appears to be carried out in unsigned
  // arithmetic despite the cast to int32_t. Confirm that this is the intended
  // result when the packet's timestamp is older than the last in-order one.
  int32_t rtp_time_stamp_diff_ms = static_cast<int32_t>(timestamp_diff) /
      frequency_khz;

  int32_t max_delay_ms = 0;
  if (min_rtt == 0) {
    // Jitter standard deviation in samples.
    float jitter_std = sqrt(static_cast<float>(jitter_q4_ >> 4));

    // 2 times the standard deviation => 95% confidence.
    // And transform to milliseconds by dividing by the frequency in kHz.
    max_delay_ms = static_cast<int32_t>((2 * jitter_std) / frequency_khz);

    // Min max_delay_ms is 1.
    if (max_delay_ms == 0) {
      max_delay_ms = 1;
    }
  } else {
    // With a known RTT, allow a third of it plus one millisecond.
    max_delay_ms = (min_rtt / 3) + 1;
  }
  return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
}
    368 
// Locked wrapper around InOrderPacketInternal() for external callers.
bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const {
  CriticalSectionScoped cs(stream_lock_.get());
  return InOrderPacketInternal(sequence_number);
}
    373 
    374 bool StreamStatisticianImpl::InOrderPacketInternal(
    375     uint16_t sequence_number) const {
    376   // First packet is always in order.
    377   if (last_receive_time_ms_ == 0)
    378     return true;
    379 
    380   if (IsNewerSequenceNumber(sequence_number, received_seq_max_)) {
    381     return true;
    382   } else {
    383     // If we have a restart of the remote side this packet is still in order.
    384     return !IsNewerSequenceNumber(sequence_number, received_seq_max_ -
    385                                   max_reordering_threshold_);
    386   }
    387 }
    388 
// Factory for the default implementation. The returned object is allocated
// with new; releasing it is left to the caller.
ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) {
  return new ReceiveStatisticsImpl(clock);
}
    392 
// Creates an empty set of per-SSRC statisticians with no callbacks
// registered. |clock| is passed on to every statistician created later.
ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
    : clock_(clock),
      receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
      last_rate_update_ms_(0),
      rtcp_stats_callback_(NULL),
      rtp_stats_callback_(NULL) {}
    399 
    400 ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
    401   while (!statisticians_.empty()) {
    402     delete statisticians_.begin()->second;
    403     statisticians_.erase(statisticians_.begin());
    404   }
    405 }
    406 
    407 void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
    408                                            size_t bytes,
    409                                            bool retransmitted) {
    410   StatisticianImplMap::iterator it;
    411   {
    412     CriticalSectionScoped cs(receive_statistics_lock_.get());
    413     it = statisticians_.find(header.ssrc);
    414     if (it == statisticians_.end()) {
    415       std::pair<StatisticianImplMap::iterator, uint32_t> insert_result =
    416           statisticians_.insert(std::make_pair(
    417               header.ssrc, new StreamStatisticianImpl(clock_, this, this)));
    418       it = insert_result.first;
    419     }
    420   }
    421   it->second->IncomingPacket(header, bytes, retransmitted);
    422 }
    423 
    424 void ReceiveStatisticsImpl::FecPacketReceived(uint32_t ssrc) {
    425   CriticalSectionScoped cs(receive_statistics_lock_.get());
    426   StatisticianImplMap::iterator it = statisticians_.find(ssrc);
    427   assert(it != statisticians_.end());
    428   it->second->FecPacketReceived();
    429 }
    430 
    431 void ReceiveStatisticsImpl::ChangeSsrc(uint32_t from_ssrc, uint32_t to_ssrc) {
    432   CriticalSectionScoped cs(receive_statistics_lock_.get());
    433   StatisticianImplMap::iterator from_it = statisticians_.find(from_ssrc);
    434   if (from_it == statisticians_.end())
    435     return;
    436   if (statisticians_.find(to_ssrc) != statisticians_.end())
    437     return;
    438   statisticians_[to_ssrc] = from_it->second;
    439   statisticians_.erase(from_it);
    440 }
    441 
    442 StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
    443   CriticalSectionScoped cs(receive_statistics_lock_.get());
    444   StatisticianMap active_statisticians;
    445   for (StatisticianImplMap::const_iterator it = statisticians_.begin();
    446        it != statisticians_.end(); ++it) {
    447     uint32_t secs;
    448     uint32_t frac;
    449     it->second->LastReceiveTimeNtp(&secs, &frac);
    450     if (clock_->CurrentNtpInMilliseconds() -
    451         Clock::NtpToMs(secs, frac) < kStatisticsTimeoutMs) {
    452       active_statisticians[it->first] = it->second;
    453     }
    454   }
    455   return active_statisticians;
    456 }
    457 
    458 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
    459     uint32_t ssrc) const {
    460   CriticalSectionScoped cs(receive_statistics_lock_.get());
    461   StatisticianImplMap::const_iterator it = statisticians_.find(ssrc);
    462   if (it == statisticians_.end())
    463     return NULL;
    464   return it->second;
    465 }
    466 
    467 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
    468     int max_reordering_threshold) {
    469   CriticalSectionScoped cs(receive_statistics_lock_.get());
    470   for (StatisticianImplMap::iterator it = statisticians_.begin();
    471        it != statisticians_.end(); ++it) {
    472     it->second->SetMaxReorderingThreshold(max_reordering_threshold);
    473   }
    474 }
    475 
    476 int32_t ReceiveStatisticsImpl::Process() {
    477   CriticalSectionScoped cs(receive_statistics_lock_.get());
    478   for (StatisticianImplMap::iterator it = statisticians_.begin();
    479        it != statisticians_.end(); ++it) {
    480     it->second->ProcessBitrate();
    481   }
    482   last_rate_update_ms_ = clock_->TimeInMilliseconds();
    483   return 0;
    484 }
    485 
    486 int32_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
    487   CriticalSectionScoped cs(receive_statistics_lock_.get());
    488   int time_since_last_update = clock_->TimeInMilliseconds() -
    489       last_rate_update_ms_;
    490   return std::max(kStatisticsProcessIntervalMs - time_since_last_update, 0);
    491 }
    492 
    493 void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
    494     RtcpStatisticsCallback* callback) {
    495   CriticalSectionScoped cs(receive_statistics_lock_.get());
    496   if (callback != NULL)
    497     assert(rtcp_stats_callback_ == NULL);
    498   rtcp_stats_callback_ = callback;
    499 }
    500 
    501 void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
    502                                               uint32_t ssrc) {
    503   CriticalSectionScoped cs(receive_statistics_lock_.get());
    504   if (rtcp_stats_callback_) {
    505     rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc);
    506   }
    507 }
    508 
    509 void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback(
    510     StreamDataCountersCallback* callback) {
    511   CriticalSectionScoped cs(receive_statistics_lock_.get());
    512   if (callback != NULL)
    513     assert(rtp_stats_callback_ == NULL);
    514   rtp_stats_callback_ = callback;
    515 }
    516 
    517 void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats,
    518                                                 uint32_t ssrc) {
    519   CriticalSectionScoped cs(receive_statistics_lock_.get());
    520   if (rtp_stats_callback_) {
    521     rtp_stats_callback_->DataCountersUpdated(stats, ssrc);
    522   }
    523 }
    524 
// NullReceiveStatistics: a do-nothing implementation. Every method below
// ignores its arguments and returns an empty or zero value.

// Ignores the packet.
void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header,
                                           size_t bytes,
                                           bool retransmitted) {}

// Ignores the FEC packet.
void NullReceiveStatistics::FecPacketReceived(uint32_t ssrc) {}

// There are never any statisticians; returns an empty map.
StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const {
  return StatisticianMap();
}

// Always returns NULL.
StreamStatistician* NullReceiveStatistics::GetStatistician(
    uint32_t ssrc) const {
  return NULL;
}

// Ignores the threshold.
void NullReceiveStatistics::SetMaxReorderingThreshold(
    int max_reordering_threshold) {}

// Always returns 0.
int32_t NullReceiveStatistics::TimeUntilNextProcess() { return 0; }

// Does nothing and returns 0.
int32_t NullReceiveStatistics::Process() { return 0; }

// Ignores the callback; it is never invoked.
void NullReceiveStatistics::RegisterRtcpStatisticsCallback(
    RtcpStatisticsCallback* callback) {}

// Ignores the callback; it is never invoked.
void NullReceiveStatistics::RegisterRtpStatisticsCallback(
    StreamDataCountersCallback* callback) {}
    552 
    553 }  // namespace webrtc
    554