Home | History | Annotate | Download | only in source
      1 /*
      2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/rtp_rtcp/source/receive_statistics_impl.h"
     12 
     13 #include <math.h>
     14 
     15 #include "webrtc/base/scoped_ptr.h"
     16 #include "webrtc/modules/rtp_rtcp/source/bitrate.h"
     17 #include "webrtc/modules/rtp_rtcp/source/time_util.h"
     18 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
     19 
     20 namespace webrtc {
     21 
     22 const int64_t kStatisticsTimeoutMs = 8000;
     23 const int64_t kStatisticsProcessIntervalMs = 1000;
     24 
     25 StreamStatistician::~StreamStatistician() {}
     26 
     27 StreamStatisticianImpl::StreamStatisticianImpl(
     28     Clock* clock,
     29     RtcpStatisticsCallback* rtcp_callback,
     30     StreamDataCountersCallback* rtp_callback)
     31     : clock_(clock),
     32       stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
     33       incoming_bitrate_(clock, NULL),
     34       ssrc_(0),
     35       max_reordering_threshold_(kDefaultMaxReorderingThreshold),
     36       jitter_q4_(0),
     37       cumulative_loss_(0),
     38       jitter_q4_transmission_time_offset_(0),
     39       last_receive_time_ms_(0),
     40       last_received_timestamp_(0),
     41       last_received_transmission_time_offset_(0),
     42       received_seq_first_(0),
     43       received_seq_max_(0),
     44       received_seq_wraps_(0),
     45       received_packet_overhead_(12),
     46       last_report_inorder_packets_(0),
     47       last_report_old_packets_(0),
     48       last_report_seq_max_(0),
     49       rtcp_callback_(rtcp_callback),
     50       rtp_callback_(rtp_callback) {}
     51 
     52 void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
     53                                             size_t packet_length,
     54                                             bool retransmitted) {
     55   UpdateCounters(header, packet_length, retransmitted);
     56   NotifyRtpCallback();
     57 }
     58 
     59 void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
     60                                             size_t packet_length,
     61                                             bool retransmitted) {
     62   CriticalSectionScoped cs(stream_lock_.get());
     63   bool in_order = InOrderPacketInternal(header.sequenceNumber);
     64   ssrc_ = header.ssrc;
     65   incoming_bitrate_.Update(packet_length);
     66   receive_counters_.transmitted.AddPacket(packet_length, header);
     67   if (!in_order && retransmitted) {
     68     receive_counters_.retransmitted.AddPacket(packet_length, header);
     69   }
     70 
     71   if (receive_counters_.transmitted.packets == 1) {
     72     received_seq_first_ = header.sequenceNumber;
     73     receive_counters_.first_packet_time_ms = clock_->TimeInMilliseconds();
     74   }
     75 
     76   // Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6
     77   // are received, 4 will be ignored.
     78   if (in_order) {
     79     // Current time in samples.
     80     NtpTime receive_time(*clock_);
     81 
     82     // Wrong if we use RetransmitOfOldPacket.
     83     if (receive_counters_.transmitted.packets > 1 &&
     84         received_seq_max_ > header.sequenceNumber) {
     85       // Wrap around detected.
     86       received_seq_wraps_++;
     87     }
     88     // New max.
     89     received_seq_max_ = header.sequenceNumber;
     90 
     91     // If new time stamp and more than one in-order packet received, calculate
     92     // new jitter statistics.
     93     if (header.timestamp != last_received_timestamp_ &&
     94         (receive_counters_.transmitted.packets -
     95          receive_counters_.retransmitted.packets) > 1) {
     96       UpdateJitter(header, receive_time);
     97     }
     98     last_received_timestamp_ = header.timestamp;
     99     last_receive_time_ntp_ = receive_time;
    100     last_receive_time_ms_ = clock_->TimeInMilliseconds();
    101   }
    102 
    103   size_t packet_oh = header.headerLength + header.paddingLength;
    104 
    105   // Our measured overhead. Filter from RFC 5104 4.2.1.2:
    106   // avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
    107   received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
    108 }
    109 
    110 void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
    111                                           NtpTime receive_time) {
    112   uint32_t receive_time_rtp =
    113       NtpToRtp(receive_time, header.payload_type_frequency);
    114   uint32_t last_receive_time_rtp =
    115       NtpToRtp(last_receive_time_ntp_, header.payload_type_frequency);
    116   int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
    117       (header.timestamp - last_received_timestamp_);
    118 
    119   time_diff_samples = abs(time_diff_samples);
    120 
    121   // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
    122   // If this happens, don't update jitter value. Use 5 secs video frequency
    123   // as the threshold.
    124   if (time_diff_samples < 450000) {
    125     // Note we calculate in Q4 to avoid using float.
    126     int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
    127     jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
    128   }
    129 
    130   // Extended jitter report, RFC 5450.
    131   // Actual network jitter, excluding the source-introduced jitter.
    132   int32_t time_diff_samples_ext =
    133     (receive_time_rtp - last_receive_time_rtp) -
    134     ((header.timestamp +
    135       header.extension.transmissionTimeOffset) -
    136      (last_received_timestamp_ +
    137       last_received_transmission_time_offset_));
    138 
    139   time_diff_samples_ext = abs(time_diff_samples_ext);
    140 
    141   if (time_diff_samples_ext < 450000) {
    142     int32_t jitter_diffQ4TransmissionTimeOffset =
    143       (time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
    144     jitter_q4_transmission_time_offset_ +=
    145       ((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
    146   }
    147 }
    148 
    149 void StreamStatisticianImpl::NotifyRtpCallback() {
    150   StreamDataCounters data;
    151   uint32_t ssrc;
    152   {
    153     CriticalSectionScoped cs(stream_lock_.get());
    154     data = receive_counters_;
    155     ssrc = ssrc_;
    156   }
    157   rtp_callback_->DataCountersUpdated(data, ssrc);
    158 }
    159 
    160 void StreamStatisticianImpl::NotifyRtcpCallback() {
    161   RtcpStatistics data;
    162   uint32_t ssrc;
    163   {
    164     CriticalSectionScoped cs(stream_lock_.get());
    165     data = last_reported_statistics_;
    166     ssrc = ssrc_;
    167   }
    168   rtcp_callback_->StatisticsUpdated(data, ssrc);
    169 }
    170 
    171 void StreamStatisticianImpl::FecPacketReceived(const RTPHeader& header,
    172                                                size_t packet_length) {
    173   {
    174     CriticalSectionScoped cs(stream_lock_.get());
    175     receive_counters_.fec.AddPacket(packet_length, header);
    176   }
    177   NotifyRtpCallback();
    178 }
    179 
    180 void StreamStatisticianImpl::SetMaxReorderingThreshold(
    181     int max_reordering_threshold) {
    182   CriticalSectionScoped cs(stream_lock_.get());
    183   max_reordering_threshold_ = max_reordering_threshold;
    184 }
    185 
    186 bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
    187                                            bool reset) {
    188   {
    189     CriticalSectionScoped cs(stream_lock_.get());
    190     if (received_seq_first_ == 0 &&
    191         receive_counters_.transmitted.payload_bytes == 0) {
    192       // We have not received anything.
    193       return false;
    194     }
    195 
    196     if (!reset) {
    197       if (last_report_inorder_packets_ == 0) {
    198         // No report.
    199         return false;
    200       }
    201       // Just get last report.
    202       *statistics = last_reported_statistics_;
    203       return true;
    204     }
    205 
    206     *statistics = CalculateRtcpStatistics();
    207   }
    208 
    209   NotifyRtcpCallback();
    210 
    211   return true;
    212 }
    213 
    214 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
    215   RtcpStatistics stats;
    216 
    217   if (last_report_inorder_packets_ == 0) {
    218     // First time we send a report.
    219     last_report_seq_max_ = received_seq_first_ - 1;
    220   }
    221 
    222   // Calculate fraction lost.
    223   uint16_t exp_since_last = (received_seq_max_ - last_report_seq_max_);
    224 
    225   if (last_report_seq_max_ > received_seq_max_) {
    226     // Can we assume that the seq_num can't go decrease over a full RTCP period?
    227     exp_since_last = 0;
    228   }
    229 
    230   // Number of received RTP packets since last report, counts all packets but
    231   // not re-transmissions.
    232   uint32_t rec_since_last =
    233       (receive_counters_.transmitted.packets -
    234        receive_counters_.retransmitted.packets) - last_report_inorder_packets_;
    235 
    236   // With NACK we don't know the expected retransmissions during the last
    237   // second. We know how many "old" packets we have received. We just count
    238   // the number of old received to estimate the loss, but it still does not
    239   // guarantee an exact number since we run this based on time triggered by
    240   // sending of an RTP packet. This should have a minimum effect.
    241 
    242   // With NACK we don't count old packets as received since they are
    243   // re-transmitted. We use RTT to decide if a packet is re-ordered or
    244   // re-transmitted.
    245   uint32_t retransmitted_packets =
    246       receive_counters_.retransmitted.packets - last_report_old_packets_;
    247   rec_since_last += retransmitted_packets;
    248 
    249   int32_t missing = 0;
    250   if (exp_since_last > rec_since_last) {
    251     missing = (exp_since_last - rec_since_last);
    252   }
    253   uint8_t local_fraction_lost = 0;
    254   if (exp_since_last) {
    255     // Scale 0 to 255, where 255 is 100% loss.
    256     local_fraction_lost =
    257         static_cast<uint8_t>(255 * missing / exp_since_last);
    258   }
    259   stats.fraction_lost = local_fraction_lost;
    260 
    261   // We need a counter for cumulative loss too.
    262   // TODO(danilchap): Ensure cumulative loss is below maximum value of 2^24.
    263   cumulative_loss_ += missing;
    264   stats.cumulative_lost = cumulative_loss_;
    265   stats.extended_max_sequence_number =
    266       (received_seq_wraps_ << 16) + received_seq_max_;
    267   // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
    268   stats.jitter = jitter_q4_ >> 4;
    269 
    270   // Store this report.
    271   last_reported_statistics_ = stats;
    272 
    273   // Only for report blocks in RTCP SR and RR.
    274   last_report_inorder_packets_ =
    275       receive_counters_.transmitted.packets -
    276       receive_counters_.retransmitted.packets;
    277   last_report_old_packets_ = receive_counters_.retransmitted.packets;
    278   last_report_seq_max_ = received_seq_max_;
    279 
    280   return stats;
    281 }
    282 
    283 void StreamStatisticianImpl::GetDataCounters(
    284     size_t* bytes_received, uint32_t* packets_received) const {
    285   CriticalSectionScoped cs(stream_lock_.get());
    286   if (bytes_received) {
    287     *bytes_received = receive_counters_.transmitted.payload_bytes +
    288                       receive_counters_.transmitted.header_bytes +
    289                       receive_counters_.transmitted.padding_bytes;
    290   }
    291   if (packets_received) {
    292     *packets_received = receive_counters_.transmitted.packets;
    293   }
    294 }
    295 
    296 void StreamStatisticianImpl::GetReceiveStreamDataCounters(
    297     StreamDataCounters* data_counters) const {
    298   CriticalSectionScoped cs(stream_lock_.get());
    299   *data_counters = receive_counters_;
    300 }
    301 
    302 uint32_t StreamStatisticianImpl::BitrateReceived() const {
    303   CriticalSectionScoped cs(stream_lock_.get());
    304   return incoming_bitrate_.BitrateNow();
    305 }
    306 
    307 void StreamStatisticianImpl::ProcessBitrate() {
    308   CriticalSectionScoped cs(stream_lock_.get());
    309   incoming_bitrate_.Process();
    310 }
    311 
    312 void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs,
    313                                                 uint32_t* frac) const {
    314   CriticalSectionScoped cs(stream_lock_.get());
    315   *secs = last_receive_time_ntp_.seconds();
    316   *frac = last_receive_time_ntp_.fractions();
    317 }
    318 
    319 bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
    320     const RTPHeader& header, int64_t min_rtt) const {
    321   CriticalSectionScoped cs(stream_lock_.get());
    322   if (InOrderPacketInternal(header.sequenceNumber)) {
    323     return false;
    324   }
    325   uint32_t frequency_khz = header.payload_type_frequency / 1000;
    326   assert(frequency_khz > 0);
    327 
    328   int64_t time_diff_ms = clock_->TimeInMilliseconds() -
    329       last_receive_time_ms_;
    330 
    331   // Diff in time stamp since last received in order.
    332   uint32_t timestamp_diff = header.timestamp - last_received_timestamp_;
    333   uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
    334 
    335   int64_t max_delay_ms = 0;
    336   if (min_rtt == 0) {
    337     // Jitter standard deviation in samples.
    338     float jitter_std = sqrt(static_cast<float>(jitter_q4_ >> 4));
    339 
    340     // 2 times the standard deviation => 95% confidence.
    341     // And transform to milliseconds by dividing by the frequency in kHz.
    342     max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
    343 
    344     // Min max_delay_ms is 1.
    345     if (max_delay_ms == 0) {
    346       max_delay_ms = 1;
    347     }
    348   } else {
    349     max_delay_ms = (min_rtt / 3) + 1;
    350   }
    351   return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
    352 }
    353 
    354 bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const {
    355   CriticalSectionScoped cs(stream_lock_.get());
    356   return InOrderPacketInternal(sequence_number);
    357 }
    358 
    359 bool StreamStatisticianImpl::InOrderPacketInternal(
    360     uint16_t sequence_number) const {
    361   // First packet is always in order.
    362   if (last_receive_time_ms_ == 0)
    363     return true;
    364 
    365   if (IsNewerSequenceNumber(sequence_number, received_seq_max_)) {
    366     return true;
    367   } else {
    368     // If we have a restart of the remote side this packet is still in order.
    369     return !IsNewerSequenceNumber(sequence_number, received_seq_max_ -
    370                                   max_reordering_threshold_);
    371   }
    372 }
    373 
    374 ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) {
    375   return new ReceiveStatisticsImpl(clock);
    376 }
    377 
    378 ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
    379     : clock_(clock),
    380       receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
    381       last_rate_update_ms_(0),
    382       rtcp_stats_callback_(NULL),
    383       rtp_stats_callback_(NULL) {}
    384 
    385 ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
    386   while (!statisticians_.empty()) {
    387     delete statisticians_.begin()->second;
    388     statisticians_.erase(statisticians_.begin());
    389   }
    390 }
    391 
    392 void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
    393                                            size_t packet_length,
    394                                            bool retransmitted) {
    395   StreamStatisticianImpl* impl;
    396   {
    397     CriticalSectionScoped cs(receive_statistics_lock_.get());
    398     StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
    399     if (it != statisticians_.end()) {
    400       impl = it->second;
    401     } else {
    402       impl = new StreamStatisticianImpl(clock_, this, this);
    403       statisticians_[header.ssrc] = impl;
    404     }
    405   }
    406   // StreamStatisticianImpl instance is created once and only destroyed when
    407   // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
    408   // it's own locking so don't hold receive_statistics_lock_ (potential
    409   // deadlock).
    410   impl->IncomingPacket(header, packet_length, retransmitted);
    411 }
    412 
    413 void ReceiveStatisticsImpl::FecPacketReceived(const RTPHeader& header,
    414                                               size_t packet_length) {
    415   CriticalSectionScoped cs(receive_statistics_lock_.get());
    416   StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
    417   // Ignore FEC if it is the first packet.
    418   if (it != statisticians_.end()) {
    419     it->second->FecPacketReceived(header, packet_length);
    420   }
    421 }
    422 
    423 StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
    424   CriticalSectionScoped cs(receive_statistics_lock_.get());
    425   StatisticianMap active_statisticians;
    426   for (StatisticianImplMap::const_iterator it = statisticians_.begin();
    427        it != statisticians_.end(); ++it) {
    428     uint32_t secs;
    429     uint32_t frac;
    430     it->second->LastReceiveTimeNtp(&secs, &frac);
    431     if (clock_->CurrentNtpInMilliseconds() -
    432         Clock::NtpToMs(secs, frac) < kStatisticsTimeoutMs) {
    433       active_statisticians[it->first] = it->second;
    434     }
    435   }
    436   return active_statisticians;
    437 }
    438 
    439 StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
    440     uint32_t ssrc) const {
    441   CriticalSectionScoped cs(receive_statistics_lock_.get());
    442   StatisticianImplMap::const_iterator it = statisticians_.find(ssrc);
    443   if (it == statisticians_.end())
    444     return NULL;
    445   return it->second;
    446 }
    447 
    448 void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
    449     int max_reordering_threshold) {
    450   CriticalSectionScoped cs(receive_statistics_lock_.get());
    451   for (StatisticianImplMap::iterator it = statisticians_.begin();
    452        it != statisticians_.end(); ++it) {
    453     it->second->SetMaxReorderingThreshold(max_reordering_threshold);
    454   }
    455 }
    456 
    457 int32_t ReceiveStatisticsImpl::Process() {
    458   CriticalSectionScoped cs(receive_statistics_lock_.get());
    459   for (StatisticianImplMap::iterator it = statisticians_.begin();
    460        it != statisticians_.end(); ++it) {
    461     it->second->ProcessBitrate();
    462   }
    463   last_rate_update_ms_ = clock_->TimeInMilliseconds();
    464   return 0;
    465 }
    466 
    467 int64_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
    468   CriticalSectionScoped cs(receive_statistics_lock_.get());
    469   int64_t time_since_last_update = clock_->TimeInMilliseconds() -
    470       last_rate_update_ms_;
    471   return std::max<int64_t>(
    472       kStatisticsProcessIntervalMs - time_since_last_update, 0);
    473 }
    474 
    475 void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
    476     RtcpStatisticsCallback* callback) {
    477   CriticalSectionScoped cs(receive_statistics_lock_.get());
    478   if (callback != NULL)
    479     assert(rtcp_stats_callback_ == NULL);
    480   rtcp_stats_callback_ = callback;
    481 }
    482 
    483 void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
    484                                               uint32_t ssrc) {
    485   CriticalSectionScoped cs(receive_statistics_lock_.get());
    486   if (rtcp_stats_callback_)
    487     rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc);
    488 }
    489 
    490 void ReceiveStatisticsImpl::CNameChanged(const char* cname, uint32_t ssrc) {
    491   CriticalSectionScoped cs(receive_statistics_lock_.get());
    492   if (rtcp_stats_callback_)
    493     rtcp_stats_callback_->CNameChanged(cname, ssrc);
    494 }
    495 
    496 void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback(
    497     StreamDataCountersCallback* callback) {
    498   CriticalSectionScoped cs(receive_statistics_lock_.get());
    499   if (callback != NULL)
    500     assert(rtp_stats_callback_ == NULL);
    501   rtp_stats_callback_ = callback;
    502 }
    503 
    504 void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats,
    505                                                 uint32_t ssrc) {
    506   CriticalSectionScoped cs(receive_statistics_lock_.get());
    507   if (rtp_stats_callback_) {
    508     rtp_stats_callback_->DataCountersUpdated(stats, ssrc);
    509   }
    510 }
    511 
    512 void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header,
    513                                            size_t packet_length,
    514                                            bool retransmitted) {}
    515 
    516 void NullReceiveStatistics::FecPacketReceived(const RTPHeader& header,
    517                                               size_t packet_length) {}
    518 
    519 StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const {
    520   return StatisticianMap();
    521 }
    522 
    523 StreamStatistician* NullReceiveStatistics::GetStatistician(
    524     uint32_t ssrc) const {
    525   return NULL;
    526 }
    527 
    528 void NullReceiveStatistics::SetMaxReorderingThreshold(
    529     int max_reordering_threshold) {}
    530 
    531 int64_t NullReceiveStatistics::TimeUntilNextProcess() { return 0; }
    532 
    533 int32_t NullReceiveStatistics::Process() { return 0; }
    534 
    535 void NullReceiveStatistics::RegisterRtcpStatisticsCallback(
    536     RtcpStatisticsCallback* callback) {}
    537 
    538 void NullReceiveStatistics::RegisterRtpStatisticsCallback(
    539     StreamDataCountersCallback* callback) {}
    540 
    541 }  // namespace webrtc
    542