Home | History | Annotate | Download | only in source
      1 /*
      2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/rtp_rtcp/source/receive_statistics_impl.h"
     12 
     13 #include <math.h>
     14 
     15 #include "webrtc/modules/rtp_rtcp/source/bitrate.h"
     16 #include "webrtc/modules/rtp_rtcp/source/rtp_utility.h"
     17 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
     18 #include "webrtc/system_wrappers/interface/scoped_ptr.h"
     19 
     20 namespace webrtc {
     21 
// Maximum age, in milliseconds, of a stream's last recorded receive time for
// the stream to be returned by ReceiveStatisticsImpl::GetActiveStatisticians().
const int64_t kStatisticsTimeoutMs = 8000;
// Interval, in milliseconds, that ReceiveStatisticsImpl::TimeUntilNextProcess()
// counts down from after each Process() call.
const int kStatisticsProcessIntervalMs = 1000;
     24 
     25 StreamStatistician::~StreamStatistician() {}
     26 
     27 StreamStatisticianImpl::StreamStatisticianImpl(
     28     Clock* clock,
     29     RtcpStatisticsCallback* rtcp_callback,
     30     StreamDataCountersCallback* rtp_callback)
     31     : clock_(clock),
     32       stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
     33       incoming_bitrate_(clock, NULL),
     34       ssrc_(0),
     35       max_reordering_threshold_(kDefaultMaxReorderingThreshold),
     36       jitter_q4_(0),
     37       cumulative_loss_(0),
     38       jitter_q4_transmission_time_offset_(0),
     39       last_receive_time_ms_(0),
     40       last_receive_time_secs_(0),
     41       last_receive_time_frac_(0),
     42       last_received_timestamp_(0),
     43       last_received_transmission_time_offset_(0),
     44       received_seq_first_(0),
     45       received_seq_max_(0),
     46       received_seq_wraps_(0),
     47       received_packet_overhead_(12),
     48       last_report_inorder_packets_(0),
     49       last_report_old_packets_(0),
     50       last_report_seq_max_(0),
     51       rtcp_callback_(rtcp_callback),
     52       rtp_callback_(rtp_callback) {}
     53 
     54 void StreamStatisticianImpl::ResetStatistics() {
     55   CriticalSectionScoped cs(stream_lock_.get());
     56   last_report_inorder_packets_ = 0;
     57   last_report_old_packets_ = 0;
     58   last_report_seq_max_ = 0;
     59   last_reported_statistics_ = RtcpStatistics();
     60   jitter_q4_ = 0;
     61   cumulative_loss_ = 0;
     62   jitter_q4_transmission_time_offset_ = 0;
     63   received_seq_wraps_ = 0;
     64   received_seq_max_ = 0;
     65   received_seq_first_ = 0;
     66   receive_counters_ = StreamDataCounters();
     67 }
     68 
     69 void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
     70                                             size_t bytes,
     71                                             bool retransmitted) {
     72   UpdateCounters(header, bytes, retransmitted);
     73   NotifyRtpCallback();
     74 }
     75 
     76 void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
     77                                             size_t bytes,
     78                                             bool retransmitted) {
     79   CriticalSectionScoped cs(stream_lock_.get());
     80   bool in_order = InOrderPacketInternal(header.sequenceNumber);
     81   ssrc_ = header.ssrc;
     82   incoming_bitrate_.Update(bytes);
     83   receive_counters_.bytes +=
     84       bytes - (header.paddingLength + header.headerLength);
     85   receive_counters_.header_bytes += header.headerLength;
     86   receive_counters_.padding_bytes += header.paddingLength;
     87   ++receive_counters_.packets;
     88   if (!in_order && retransmitted) {
     89     ++receive_counters_.retransmitted_packets;
     90   }
     91 
     92   if (receive_counters_.packets == 1) {
     93     received_seq_first_ = header.sequenceNumber;
     94   }
     95 
     96   // Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6
     97   // are received, 4 will be ignored.
     98   if (in_order) {
     99     // Current time in samples.
    100     uint32_t receive_time_secs;
    101     uint32_t receive_time_frac;
    102     clock_->CurrentNtp(receive_time_secs, receive_time_frac);
    103 
    104     // Wrong if we use RetransmitOfOldPacket.
    105     if (receive_counters_.packets > 1 &&
    106         received_seq_max_ > header.sequenceNumber) {
    107       // Wrap around detected.
    108       received_seq_wraps_++;
    109     }
    110     // New max.
    111     received_seq_max_ = header.sequenceNumber;
    112 
    113     // If new time stamp and more than one in-order packet received, calculate
    114     // new jitter statistics.
    115     if (header.timestamp != last_received_timestamp_ &&
    116         (receive_counters_.packets - receive_counters_.retransmitted_packets) >
    117             1) {
    118       UpdateJitter(header, receive_time_secs, receive_time_frac);
    119     }
    120     last_received_timestamp_ = header.timestamp;
    121     last_receive_time_secs_ = receive_time_secs;
    122     last_receive_time_frac_ = receive_time_frac;
    123     last_receive_time_ms_ = clock_->TimeInMilliseconds();
    124   }
    125 
    126   uint16_t packet_oh = header.headerLength + header.paddingLength;
    127 
    128   // Our measured overhead. Filter from RFC 5104 4.2.1.2:
    129   // avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
    130   received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
    131 }
    132 
    133 void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
    134                                           uint32_t receive_time_secs,
    135                                           uint32_t receive_time_frac) {
    136   uint32_t receive_time_rtp = RtpUtility::ConvertNTPTimeToRTP(
    137       receive_time_secs, receive_time_frac, header.payload_type_frequency);
    138   uint32_t last_receive_time_rtp =
    139       RtpUtility::ConvertNTPTimeToRTP(last_receive_time_secs_,
    140                                       last_receive_time_frac_,
    141                                       header.payload_type_frequency);
    142   int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
    143       (header.timestamp - last_received_timestamp_);
    144 
    145   time_diff_samples = abs(time_diff_samples);
    146 
    147   // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
    148   // If this happens, don't update jitter value. Use 5 secs video frequency
    149   // as the threshold.
    150   if (time_diff_samples < 450000) {
    151     // Note we calculate in Q4 to avoid using float.
    152     int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
    153     jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
    154   }
    155 
    156   // Extended jitter report, RFC 5450.
    157   // Actual network jitter, excluding the source-introduced jitter.
    158   int32_t time_diff_samples_ext =
    159     (receive_time_rtp - last_receive_time_rtp) -
    160     ((header.timestamp +
    161       header.extension.transmissionTimeOffset) -
    162      (last_received_timestamp_ +
    163       last_received_transmission_time_offset_));
    164 
    165   time_diff_samples_ext = abs(time_diff_samples_ext);
    166 
    167   if (time_diff_samples_ext < 450000) {
    168     int32_t jitter_diffQ4TransmissionTimeOffset =
    169       (time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
    170     jitter_q4_transmission_time_offset_ +=
    171       ((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
    172   }
    173 }
    174 
    175 void StreamStatisticianImpl::NotifyRtpCallback() {
    176   StreamDataCounters data;
    177   uint32_t ssrc;
    178   {
    179     CriticalSectionScoped cs(stream_lock_.get());
    180     data = receive_counters_;
    181     ssrc = ssrc_;
    182   }
    183   rtp_callback_->DataCountersUpdated(data, ssrc);
    184 }
    185 
    186 void StreamStatisticianImpl::NotifyRtcpCallback() {
    187   RtcpStatistics data;
    188   uint32_t ssrc;
    189   {
    190     CriticalSectionScoped cs(stream_lock_.get());
    191     data = last_reported_statistics_;
    192     ssrc = ssrc_;
    193   }
    194   rtcp_callback_->StatisticsUpdated(data, ssrc);
    195 }
    196 
    197 void StreamStatisticianImpl::FecPacketReceived() {
    198   {
    199     CriticalSectionScoped cs(stream_lock_.get());
    200     ++receive_counters_.fec_packets;
    201   }
    202   NotifyRtpCallback();
    203 }
    204 
    205 void StreamStatisticianImpl::SetMaxReorderingThreshold(
    206     int max_reordering_threshold) {
    207   CriticalSectionScoped cs(stream_lock_.get());
    208   max_reordering_threshold_ = max_reordering_threshold;
    209 }
    210 
    211 bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
    212                                            bool reset) {
    213   {
    214     CriticalSectionScoped cs(stream_lock_.get());
    215     if (received_seq_first_ == 0 && receive_counters_.bytes == 0) {
    216       // We have not received anything.
    217       return false;
    218     }
    219 
    220     if (!reset) {
    221       if (last_report_inorder_packets_ == 0) {
    222         // No report.
    223         return false;
    224       }
    225       // Just get last report.
    226       *statistics = last_reported_statistics_;
    227       return true;
    228     }
    229 
    230     *statistics = CalculateRtcpStatistics();
    231   }
    232 
    233   NotifyRtcpCallback();
    234 
    235   return true;
    236 }
    237 
    238 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
    239   RtcpStatistics stats;
    240 
    241   if (last_report_inorder_packets_ == 0) {
    242     // First time we send a report.
    243     last_report_seq_max_ = received_seq_first_ - 1;
    244   }
    245 
    246   // Calculate fraction lost.
    247   uint16_t exp_since_last = (received_seq_max_ - last_report_seq_max_);
    248 
    249   if (last_report_seq_max_ > received_seq_max_) {
    250     // Can we assume that the seq_num can't go decrease over a full RTCP period?
    251     exp_since_last = 0;
    252   }
    253 
    254   // Number of received RTP packets since last report, counts all packets but
    255   // not re-transmissions.
    256   uint32_t rec_since_last =
    257       (receive_counters_.packets - receive_counters_.retransmitted_packets) -
    258       last_report_inorder_packets_;
    259 
    260   // With NACK we don't know the expected retransmissions during the last
    261   // second. We know how many "old" packets we have received. We just count
    262   // the number of old received to estimate the loss, but it still does not
    263   // guarantee an exact number since we run this based on time triggered by
    264   // sending of an RTP packet. This should have a minimum effect.
    265 
    266   // With NACK we don't count old packets as received since they are
    267   // re-transmitted. We use RTT to decide if a packet is re-ordered or
    268   // re-transmitted.
    269   uint32_t retransmitted_packets =
    270       receive_counters_.retransmitted_packets - last_report_old_packets_;
    271   rec_since_last += retransmitted_packets;
    272 
    273   int32_t missing = 0;
    274   if (exp_since_last > rec_since_last) {
    275     missing = (exp_since_last - rec_since_last);
    276   }
    277   uint8_t local_fraction_lost = 0;
    278   if (exp_since_last) {
    279     // Scale 0 to 255, where 255 is 100% loss.
    280     local_fraction_lost =
    281         static_cast<uint8_t>(255 * missing / exp_since_last);
    282   }
    283   stats.fraction_lost = local_fraction_lost;
    284 
    285   // We need a counter for cumulative loss too.
    286   cumulative_loss_ += missing;
    287   stats.cumulative_lost = cumulative_loss_;
    288   stats.extended_max_sequence_number =
    289       (received_seq_wraps_ << 16) + received_seq_max_;
    290   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
    291   stats.jitter = jitter_q4_ >> 4;
    292 
    293   // Store this report.
    294   last_reported_statistics_ = stats;
    295 
    296   // Only for report blocks in RTCP SR and RR.
    297   last_report_inorder_packets_ =
    298       receive_counters_.packets - receive_counters_.retransmitted_packets;
    299   last_report_old_packets_ = receive_counters_.retransmitted_packets;
    300   last_report_seq_max_ = received_seq_max_;
    301 
    302   return stats;
    303 }
    304 
    305 void StreamStatisticianImpl::GetDataCounters(
    306     uint32_t* bytes_received, uint32_t* packets_received) const {
    307   CriticalSectionScoped cs(stream_lock_.get());
    308   if (bytes_received) {
    309     *bytes_received = receive_counters_.bytes + receive_counters_.header_bytes +
    310                       receive_counters_.padding_bytes;
    311   }
    312   if (packets_received) {
    313     *packets_received = receive_counters_.packets;
    314   }
    315 }
    316 
    317 uint32_t StreamStatisticianImpl::BitrateReceived() const {
    318   CriticalSectionScoped cs(stream_lock_.get());
    319   return incoming_bitrate_.BitrateNow();
    320 }
    321 
    322 void StreamStatisticianImpl::ProcessBitrate() {
    323   CriticalSectionScoped cs(stream_lock_.get());
    324   incoming_bitrate_.Process();
    325 }
    326 
    327 void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs,
    328                                                 uint32_t* frac) const {
    329   CriticalSectionScoped cs(stream_lock_.get());
    330   *secs = last_receive_time_secs_;
    331   *frac = last_receive_time_frac_;
    332 }
    333 
    334 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
    335     const RTPHeader& header, int min_rtt) const {
    336   CriticalSectionScoped cs(stream_lock_.get());
    337   if (InOrderPacketInternal(header.sequenceNumber)) {
    338     return false;
    339   }
    340   uint32_t frequency_khz = header.payload_type_frequency / 1000;
    341   assert(frequency_khz > 0);
    342 
    343   int64_t time_diff_ms = clock_->TimeInMilliseconds() -
    344       last_receive_time_ms_;
    345 
    346   // Diff in time stamp since last received in order.
    347   uint32_t timestamp_diff = header.timestamp - last_received_timestamp_;
    348   int32_t rtp_time_stamp_diff_ms = static_cast<int32_t>(timestamp_diff) /
    349       frequency_khz;
    350 
    351   int32_t max_delay_ms = 0;
    352   if (min_rtt == 0) {
    353     // Jitter standard deviation in samples.
    354     float jitter_std = sqrt(static_cast<float>(jitter_q4_ >> 4));
    355 
    356     // 2 times the standard deviation => 95% confidence.
    357     // And transform to milliseconds by dividing by the frequency in kHz.
    358     max_delay_ms = static_cast<int32_t>((2 * jitter_std) / frequency_khz);
    359 
    360     // Min max_delay_ms is 1.
    361     if (max_delay_ms == 0) {
    362       max_delay_ms = 1;
    363     }
    364   } else {
    365     max_delay_ms = (min_rtt / 3) + 1;
    366   }
    367   return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
    368 }
    369 
    370 bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const {
    371   CriticalSectionScoped cs(stream_lock_.get());
    372   return InOrderPacketInternal(sequence_number);
    373 }
    374 
    375 bool StreamStatisticianImpl::InOrderPacketInternal(
    376     uint16_t sequence_number) const {
    377   // First packet is always in order.
    378   if (last_receive_time_ms_ == 0)
    379     return true;
    380 
    381   if (IsNewerSequenceNumber(sequence_number, received_seq_max_)) {
    382     return true;
    383   } else {
    384     // If we have a restart of the remote side this packet is still in order.
    385     return !IsNewerSequenceNumber(sequence_number, received_seq_max_ -
    386                                   max_reordering_threshold_);
    387   }
    388 }
    389 
    390 ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) {
    391   return new ReceiveStatisticsImpl(clock);
    392 }
    393 
    394 ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
    395     : clock_(clock),
    396       receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
    397       last_rate_update_ms_(0),
    398       rtcp_stats_callback_(NULL),
    399       rtp_stats_callback_(NULL) {}
    400 
    401 ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
    402   while (!statisticians_.empty()) {
    403     delete statisticians_.begin()->second;
    404     statisticians_.erase(statisticians_.begin());
    405   }
    406 }
    407 
    408 void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
    409                                            size_t bytes,
    410                                            bool retransmitted) {
    411   StreamStatisticianImpl* impl;
    412   {
    413     CriticalSectionScoped cs(receive_statistics_lock_.get());
    414     StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
    415     if (it != statisticians_.end()) {
    416       impl = it->second;
    417     } else {
    418       impl = new StreamStatisticianImpl(clock_, this, this);
    419       statisticians_[header.ssrc] = impl;
    420     }
    421   }
    422   // StreamStatisticianImpl instance is created once and only destroyed when
    423   // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
    424   // it's own locking so don't hold receive_statistics_lock_ (potential
    425   // deadlock).
    426   impl->IncomingPacket(header, bytes, retransmitted);
    427 }
    428 
    429 void ReceiveStatisticsImpl::FecPacketReceived(uint32_t ssrc) {
    430   CriticalSectionScoped cs(receive_statistics_lock_.get());
    431   StatisticianImplMap::iterator it = statisticians_.find(ssrc);
    432   // Ignore FEC if it is the first packet.
    433   if (it != statisticians_.end()) {
    434     it->second->FecPacketReceived();
    435   }
    436 }
    437 
    438 StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
    439   CriticalSectionScoped cs(receive_statistics_lock_.get());
    440   StatisticianMap active_statisticians;
    441   for (StatisticianImplMap::const_iterator it = statisticians_.begin();
    442        it != statisticians_.end(); ++it) {
    443     uint32_t secs;
    444     uint32_t frac;
    445     it->second->LastReceiveTimeNtp(&secs, &frac);
    446     if (clock_->CurrentNtpInMilliseconds() -
    447         Clock::NtpToMs(secs, frac) < kStatisticsTimeoutMs) {
    448       active_statisticians[it->first] = it->second;
    449     }
    450   }
    451   return active_statisticians;
    452 }
    453 
    454 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
    455     uint32_t ssrc) const {
    456   CriticalSectionScoped cs(receive_statistics_lock_.get());
    457   StatisticianImplMap::const_iterator it = statisticians_.find(ssrc);
    458   if (it == statisticians_.end())
    459     return NULL;
    460   return it->second;
    461 }
    462 
    463 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
    464     int max_reordering_threshold) {
    465   CriticalSectionScoped cs(receive_statistics_lock_.get());
    466   for (StatisticianImplMap::iterator it = statisticians_.begin();
    467        it != statisticians_.end(); ++it) {
    468     it->second->SetMaxReorderingThreshold(max_reordering_threshold);
    469   }
    470 }
    471 
    472 int32_t ReceiveStatisticsImpl::Process() {
    473   CriticalSectionScoped cs(receive_statistics_lock_.get());
    474   for (StatisticianImplMap::iterator it = statisticians_.begin();
    475        it != statisticians_.end(); ++it) {
    476     it->second->ProcessBitrate();
    477   }
    478   last_rate_update_ms_ = clock_->TimeInMilliseconds();
    479   return 0;
    480 }
    481 
    482 int32_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
    483   CriticalSectionScoped cs(receive_statistics_lock_.get());
    484   int time_since_last_update = clock_->TimeInMilliseconds() -
    485       last_rate_update_ms_;
    486   return std::max(kStatisticsProcessIntervalMs - time_since_last_update, 0);
    487 }
    488 
    489 void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
    490     RtcpStatisticsCallback* callback) {
    491   CriticalSectionScoped cs(receive_statistics_lock_.get());
    492   if (callback != NULL)
    493     assert(rtcp_stats_callback_ == NULL);
    494   rtcp_stats_callback_ = callback;
    495 }
    496 
    497 void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
    498                                               uint32_t ssrc) {
    499   CriticalSectionScoped cs(receive_statistics_lock_.get());
    500   if (rtcp_stats_callback_) {
    501     rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc);
    502   }
    503 }
    504 
    505 void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback(
    506     StreamDataCountersCallback* callback) {
    507   CriticalSectionScoped cs(receive_statistics_lock_.get());
    508   if (callback != NULL)
    509     assert(rtp_stats_callback_ == NULL);
    510   rtp_stats_callback_ = callback;
    511 }
    512 
    513 void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats,
    514                                                 uint32_t ssrc) {
    515   CriticalSectionScoped cs(receive_statistics_lock_.get());
    516   if (rtp_stats_callback_) {
    517     rtp_stats_callback_->DataCountersUpdated(stats, ssrc);
    518   }
    519 }
    520 
    521 void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header,
    522                                            size_t bytes,
    523                                            bool retransmitted) {}
    524 
    525 void NullReceiveStatistics::FecPacketReceived(uint32_t ssrc) {}
    526 
    527 StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const {
    528   return StatisticianMap();
    529 }
    530 
    531 StreamStatistician* NullReceiveStatistics::GetStatistician(
    532     uint32_t ssrc) const {
    533   return NULL;
    534 }
    535 
    536 void NullReceiveStatistics::SetMaxReorderingThreshold(
    537     int max_reordering_threshold) {}
    538 
    539 int32_t NullReceiveStatistics::TimeUntilNextProcess() { return 0; }
    540 
    541 int32_t NullReceiveStatistics::Process() { return 0; }
    542 
    543 void NullReceiveStatistics::RegisterRtcpStatisticsCallback(
    544     RtcpStatisticsCallback* callback) {}
    545 
    546 void NullReceiveStatistics::RegisterRtpStatisticsCallback(
    547     StreamDataCountersCallback* callback) {}
    548 
    549 }  // namespace webrtc
    550