1 /* 2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/modules/rtp_rtcp/source/receive_statistics_impl.h" 12 13 #include <math.h> 14 15 #include "webrtc/base/scoped_ptr.h" 16 #include "webrtc/modules/rtp_rtcp/source/bitrate.h" 17 #include "webrtc/modules/rtp_rtcp/source/time_util.h" 18 #include "webrtc/system_wrappers/include/critical_section_wrapper.h" 19 20 namespace webrtc { 21 22 const int64_t kStatisticsTimeoutMs = 8000; 23 const int64_t kStatisticsProcessIntervalMs = 1000; 24 25 StreamStatistician::~StreamStatistician() {} 26 27 StreamStatisticianImpl::StreamStatisticianImpl( 28 Clock* clock, 29 RtcpStatisticsCallback* rtcp_callback, 30 StreamDataCountersCallback* rtp_callback) 31 : clock_(clock), 32 stream_lock_(CriticalSectionWrapper::CreateCriticalSection()), 33 incoming_bitrate_(clock, NULL), 34 ssrc_(0), 35 max_reordering_threshold_(kDefaultMaxReorderingThreshold), 36 jitter_q4_(0), 37 cumulative_loss_(0), 38 jitter_q4_transmission_time_offset_(0), 39 last_receive_time_ms_(0), 40 last_received_timestamp_(0), 41 last_received_transmission_time_offset_(0), 42 received_seq_first_(0), 43 received_seq_max_(0), 44 received_seq_wraps_(0), 45 received_packet_overhead_(12), 46 last_report_inorder_packets_(0), 47 last_report_old_packets_(0), 48 last_report_seq_max_(0), 49 rtcp_callback_(rtcp_callback), 50 rtp_callback_(rtp_callback) {} 51 52 void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header, 53 size_t packet_length, 54 bool retransmitted) { 55 UpdateCounters(header, packet_length, retransmitted); 56 NotifyRtpCallback(); 57 } 58 59 void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header, 60 size_t packet_length, 61 bool retransmitted) { 62 CriticalSectionScoped cs(stream_lock_.get()); 63 bool in_order = InOrderPacketInternal(header.sequenceNumber); 64 ssrc_ = header.ssrc; 65 incoming_bitrate_.Update(packet_length); 66 receive_counters_.transmitted.AddPacket(packet_length, header); 67 if (!in_order && retransmitted) { 68 receive_counters_.retransmitted.AddPacket(packet_length, header); 69 } 70 71 if (receive_counters_.transmitted.packets == 1) { 72 received_seq_first_ = header.sequenceNumber; 73 receive_counters_.first_packet_time_ms = clock_->TimeInMilliseconds(); 74 } 75 76 // Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6 77 // are received, 4 will be ignored. 78 if (in_order) { 79 // Current time in samples. 80 NtpTime receive_time(*clock_); 81 82 // Wrong if we use RetransmitOfOldPacket. 83 if (receive_counters_.transmitted.packets > 1 && 84 received_seq_max_ > header.sequenceNumber) { 85 // Wrap around detected. 86 received_seq_wraps_++; 87 } 88 // New max. 89 received_seq_max_ = header.sequenceNumber; 90 91 // If new time stamp and more than one in-order packet received, calculate 92 // new jitter statistics. 93 if (header.timestamp != last_received_timestamp_ && 94 (receive_counters_.transmitted.packets - 95 receive_counters_.retransmitted.packets) > 1) { 96 UpdateJitter(header, receive_time); 97 } 98 last_received_timestamp_ = header.timestamp; 99 last_receive_time_ntp_ = receive_time; 100 last_receive_time_ms_ = clock_->TimeInMilliseconds(); 101 } 102 103 size_t packet_oh = header.headerLength + header.paddingLength; 104 105 // Our measured overhead. Filter from RFC 5104 4.2.1.2: 106 // avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH, 107 received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4; 108 } 109 110 void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header, 111 NtpTime receive_time) { 112 uint32_t receive_time_rtp = 113 NtpToRtp(receive_time, header.payload_type_frequency); 114 uint32_t last_receive_time_rtp = 115 NtpToRtp(last_receive_time_ntp_, header.payload_type_frequency); 116 int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) - 117 (header.timestamp - last_received_timestamp_); 118 119 time_diff_samples = abs(time_diff_samples); 120 121 // lib_jingle sometimes deliver crazy jumps in TS for the same stream. 122 // If this happens, don't update jitter value. Use 5 secs video frequency 123 // as the threshold. 124 if (time_diff_samples < 450000) { 125 // Note we calculate in Q4 to avoid using float. 126 int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_; 127 jitter_q4_ += ((jitter_diff_q4 + 8) >> 4); 128 } 129 130 // Extended jitter report, RFC 5450. 131 // Actual network jitter, excluding the source-introduced jitter. 132 int32_t time_diff_samples_ext = 133 (receive_time_rtp - last_receive_time_rtp) - 134 ((header.timestamp + 135 header.extension.transmissionTimeOffset) - 136 (last_received_timestamp_ + 137 last_received_transmission_time_offset_)); 138 139 time_diff_samples_ext = abs(time_diff_samples_ext); 140 141 if (time_diff_samples_ext < 450000) { 142 int32_t jitter_diffQ4TransmissionTimeOffset = 143 (time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_; 144 jitter_q4_transmission_time_offset_ += 145 ((jitter_diffQ4TransmissionTimeOffset + 8) >> 4); 146 } 147 } 148 149 void StreamStatisticianImpl::NotifyRtpCallback() { 150 StreamDataCounters data; 151 uint32_t ssrc; 152 { 153 CriticalSectionScoped cs(stream_lock_.get()); 154 data = receive_counters_; 155 ssrc = ssrc_; 156 } 157 rtp_callback_->DataCountersUpdated(data, ssrc); 158 } 159 160 void StreamStatisticianImpl::NotifyRtcpCallback() { 161 RtcpStatistics data; 162 uint32_t ssrc; 163 { 164 CriticalSectionScoped cs(stream_lock_.get()); 165 data = last_reported_statistics_; 166 ssrc = ssrc_; 167 } 168 rtcp_callback_->StatisticsUpdated(data, ssrc); 169 } 170 171 void StreamStatisticianImpl::FecPacketReceived(const RTPHeader& header, 172 size_t packet_length) { 173 { 174 CriticalSectionScoped cs(stream_lock_.get()); 175 receive_counters_.fec.AddPacket(packet_length, header); 176 } 177 NotifyRtpCallback(); 178 } 179 180 void StreamStatisticianImpl::SetMaxReorderingThreshold( 181 int max_reordering_threshold) { 182 CriticalSectionScoped cs(stream_lock_.get()); 183 max_reordering_threshold_ = max_reordering_threshold; 184 } 185 186 bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics, 187 bool reset) { 188 { 189 CriticalSectionScoped cs(stream_lock_.get()); 190 if (received_seq_first_ == 0 && 191 receive_counters_.transmitted.payload_bytes == 0) { 192 // We have not received anything. 193 return false; 194 } 195 196 if (!reset) { 197 if (last_report_inorder_packets_ == 0) { 198 // No report. 199 return false; 200 } 201 // Just get last report. 202 *statistics = last_reported_statistics_; 203 return true; 204 } 205 206 *statistics = CalculateRtcpStatistics(); 207 } 208 209 NotifyRtcpCallback(); 210 211 return true; 212 } 213 214 RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() { 215 RtcpStatistics stats; 216 217 if (last_report_inorder_packets_ == 0) { 218 // First time we send a report. 219 last_report_seq_max_ = received_seq_first_ - 1; 220 } 221 222 // Calculate fraction lost. 223 uint16_t exp_since_last = (received_seq_max_ - last_report_seq_max_); 224 225 if (last_report_seq_max_ > received_seq_max_) { 226 // Can we assume that the seq_num can't go decrease over a full RTCP period? 227 exp_since_last = 0; 228 } 229 230 // Number of received RTP packets since last report, counts all packets but 231 // not re-transmissions. 232 uint32_t rec_since_last = 233 (receive_counters_.transmitted.packets - 234 receive_counters_.retransmitted.packets) - last_report_inorder_packets_; 235 236 // With NACK we don't know the expected retransmissions during the last 237 // second. We know how many "old" packets we have received. We just count 238 // the number of old received to estimate the loss, but it still does not 239 // guarantee an exact number since we run this based on time triggered by 240 // sending of an RTP packet. This should have a minimum effect. 241 242 // With NACK we don't count old packets as received since they are 243 // re-transmitted. We use RTT to decide if a packet is re-ordered or 244 // re-transmitted. 245 uint32_t retransmitted_packets = 246 receive_counters_.retransmitted.packets - last_report_old_packets_; 247 rec_since_last += retransmitted_packets; 248 249 int32_t missing = 0; 250 if (exp_since_last > rec_since_last) { 251 missing = (exp_since_last - rec_since_last); 252 } 253 uint8_t local_fraction_lost = 0; 254 if (exp_since_last) { 255 // Scale 0 to 255, where 255 is 100% loss. 256 local_fraction_lost = 257 static_cast<uint8_t>(255 * missing / exp_since_last); 258 } 259 stats.fraction_lost = local_fraction_lost; 260 261 // We need a counter for cumulative loss too. 262 // TODO(danilchap): Ensure cumulative loss is below maximum value of 2^24. 263 cumulative_loss_ += missing; 264 stats.cumulative_lost = cumulative_loss_; 265 stats.extended_max_sequence_number = 266 (received_seq_wraps_ << 16) + received_seq_max_; 267 // Note: internal jitter value is in Q4 and needs to be scaled by 1/16. 268 stats.jitter = jitter_q4_ >> 4; 269 270 // Store this report. 271 last_reported_statistics_ = stats; 272 273 // Only for report blocks in RTCP SR and RR. 274 last_report_inorder_packets_ = 275 receive_counters_.transmitted.packets - 276 receive_counters_.retransmitted.packets; 277 last_report_old_packets_ = receive_counters_.retransmitted.packets; 278 last_report_seq_max_ = received_seq_max_; 279 280 return stats; 281 } 282 283 void StreamStatisticianImpl::GetDataCounters( 284 size_t* bytes_received, uint32_t* packets_received) const { 285 CriticalSectionScoped cs(stream_lock_.get()); 286 if (bytes_received) { 287 *bytes_received = receive_counters_.transmitted.payload_bytes + 288 receive_counters_.transmitted.header_bytes + 289 receive_counters_.transmitted.padding_bytes; 290 } 291 if (packets_received) { 292 *packets_received = receive_counters_.transmitted.packets; 293 } 294 } 295 296 void StreamStatisticianImpl::GetReceiveStreamDataCounters( 297 StreamDataCounters* data_counters) const { 298 CriticalSectionScoped cs(stream_lock_.get()); 299 *data_counters = receive_counters_; 300 } 301 302 uint32_t StreamStatisticianImpl::BitrateReceived() const { 303 CriticalSectionScoped cs(stream_lock_.get()); 304 return incoming_bitrate_.BitrateNow(); 305 } 306 307 void StreamStatisticianImpl::ProcessBitrate() { 308 CriticalSectionScoped cs(stream_lock_.get()); 309 incoming_bitrate_.Process(); 310 } 311 312 void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs, 313 uint32_t* frac) const { 314 CriticalSectionScoped cs(stream_lock_.get()); 315 *secs = last_receive_time_ntp_.seconds(); 316 *frac = last_receive_time_ntp_.fractions(); 317 } 318 319 bool StreamStatisticianImpl::IsRetransmitOfOldPacket( 320 const RTPHeader& header, int64_t min_rtt) const { 321 CriticalSectionScoped cs(stream_lock_.get()); 322 if (InOrderPacketInternal(header.sequenceNumber)) { 323 return false; 324 } 325 uint32_t frequency_khz = header.payload_type_frequency / 1000; 326 assert(frequency_khz > 0); 327 328 int64_t time_diff_ms = clock_->TimeInMilliseconds() - 329 last_receive_time_ms_; 330 331 // Diff in time stamp since last received in order. 332 uint32_t timestamp_diff = header.timestamp - last_received_timestamp_; 333 uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz; 334 335 int64_t max_delay_ms = 0; 336 if (min_rtt == 0) { 337 // Jitter standard deviation in samples. 338 float jitter_std = sqrt(static_cast<float>(jitter_q4_ >> 4)); 339 340 // 2 times the standard deviation => 95% confidence. 341 // And transform to milliseconds by dividing by the frequency in kHz. 342 max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz); 343 344 // Min max_delay_ms is 1. 345 if (max_delay_ms == 0) { 346 max_delay_ms = 1; 347 } 348 } else { 349 max_delay_ms = (min_rtt / 3) + 1; 350 } 351 return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms; 352 } 353 354 bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const { 355 CriticalSectionScoped cs(stream_lock_.get()); 356 return InOrderPacketInternal(sequence_number); 357 } 358 359 bool StreamStatisticianImpl::InOrderPacketInternal( 360 uint16_t sequence_number) const { 361 // First packet is always in order. 362 if (last_receive_time_ms_ == 0) 363 return true; 364 365 if (IsNewerSequenceNumber(sequence_number, received_seq_max_)) { 366 return true; 367 } else { 368 // If we have a restart of the remote side this packet is still in order. 369 return !IsNewerSequenceNumber(sequence_number, received_seq_max_ - 370 max_reordering_threshold_); 371 } 372 } 373 374 ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) { 375 return new ReceiveStatisticsImpl(clock); 376 } 377 378 ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock) 379 : clock_(clock), 380 receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()), 381 last_rate_update_ms_(0), 382 rtcp_stats_callback_(NULL), 383 rtp_stats_callback_(NULL) {} 384 385 ReceiveStatisticsImpl::~ReceiveStatisticsImpl() { 386 while (!statisticians_.empty()) { 387 delete statisticians_.begin()->second; 388 statisticians_.erase(statisticians_.begin()); 389 } 390 } 391 392 void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header, 393 size_t packet_length, 394 bool retransmitted) { 395 StreamStatisticianImpl* impl; 396 { 397 CriticalSectionScoped cs(receive_statistics_lock_.get()); 398 StatisticianImplMap::iterator it = statisticians_.find(header.ssrc); 399 if (it != statisticians_.end()) { 400 impl = it->second; 401 } else { 402 impl = new StreamStatisticianImpl(clock_, this, this); 403 statisticians_[header.ssrc] = impl; 404 } 405 } 406 // StreamStatisticianImpl instance is created once and only destroyed when 407 // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has 408 // it's own locking so don't hold receive_statistics_lock_ (potential 409 // deadlock). 410 impl->IncomingPacket(header, packet_length, retransmitted); 411 } 412 413 void ReceiveStatisticsImpl::FecPacketReceived(const RTPHeader& header, 414 size_t packet_length) { 415 CriticalSectionScoped cs(receive_statistics_lock_.get()); 416 StatisticianImplMap::iterator it = statisticians_.find(header.ssrc); 417 // Ignore FEC if it is the first packet. 418 if (it != statisticians_.end()) { 419 it->second->FecPacketReceived(header, packet_length); 420 } 421 } 422 423 StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const { 424 CriticalSectionScoped cs(receive_statistics_lock_.get()); 425 StatisticianMap active_statisticians; 426 for (StatisticianImplMap::const_iterator it = statisticians_.begin(); 427 it != statisticians_.end(); ++it) { 428 uint32_t secs; 429 uint32_t frac; 430 it->second->LastReceiveTimeNtp(&secs, &frac); 431 if (clock_->CurrentNtpInMilliseconds() - 432 Clock::NtpToMs(secs, frac) < kStatisticsTimeoutMs) { 433 active_statisticians[it->first] = it->second; 434 } 435 } 436 return active_statisticians; 437 } 438 439 StreamStatistician* ReceiveStatisticsImpl::GetStatistician( 440 uint32_t ssrc) const { 441 CriticalSectionScoped cs(receive_statistics_lock_.get()); 442 StatisticianImplMap::const_iterator it = statisticians_.find(ssrc); 443 if (it == statisticians_.end()) 444 return NULL; 445 return it->second; 446 } 447 448 void ReceiveStatisticsImpl::SetMaxReorderingThreshold( 449 int max_reordering_threshold) { 450 CriticalSectionScoped cs(receive_statistics_lock_.get()); 451 for (StatisticianImplMap::iterator it = statisticians_.begin(); 452 it != statisticians_.end(); ++it) { 453 it->second->SetMaxReorderingThreshold(max_reordering_threshold); 454 } 455 } 456 457 int32_t ReceiveStatisticsImpl::Process() { 458 CriticalSectionScoped cs(receive_statistics_lock_.get()); 459 for (StatisticianImplMap::iterator it = statisticians_.begin(); 460 it != statisticians_.end(); ++it) { 461 it->second->ProcessBitrate(); 462 } 463 last_rate_update_ms_ = clock_->TimeInMilliseconds(); 464 return 0; 465 } 466 467 int64_t ReceiveStatisticsImpl::TimeUntilNextProcess() { 468 CriticalSectionScoped cs(receive_statistics_lock_.get()); 469 int64_t time_since_last_update = clock_->TimeInMilliseconds() - 470 last_rate_update_ms_; 471 return std::max<int64_t>( 472 kStatisticsProcessIntervalMs - time_since_last_update, 0); 473 } 474 475 void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback( 476 RtcpStatisticsCallback* callback) { 477 CriticalSectionScoped cs(receive_statistics_lock_.get()); 478 if (callback != NULL) 479 assert(rtcp_stats_callback_ == NULL); 480 rtcp_stats_callback_ = callback; 481 } 482 483 void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics, 484 uint32_t ssrc) { 485 CriticalSectionScoped cs(receive_statistics_lock_.get()); 486 if (rtcp_stats_callback_) 487 rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc); 488 } 489 490 void ReceiveStatisticsImpl::CNameChanged(const char* cname, uint32_t ssrc) { 491 CriticalSectionScoped cs(receive_statistics_lock_.get()); 492 if (rtcp_stats_callback_) 493 rtcp_stats_callback_->CNameChanged(cname, ssrc); 494 } 495 496 void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback( 497 StreamDataCountersCallback* callback) { 498 CriticalSectionScoped cs(receive_statistics_lock_.get()); 499 if (callback != NULL) 500 assert(rtp_stats_callback_ == NULL); 501 rtp_stats_callback_ = callback; 502 } 503 504 void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats, 505 uint32_t ssrc) { 506 CriticalSectionScoped cs(receive_statistics_lock_.get()); 507 if (rtp_stats_callback_) { 508 rtp_stats_callback_->DataCountersUpdated(stats, ssrc); 509 } 510 } 511 512 void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header, 513 size_t packet_length, 514 bool retransmitted) {} 515 516 void NullReceiveStatistics::FecPacketReceived(const RTPHeader& header, 517 size_t packet_length) {} 518 519 StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const { 520 return StatisticianMap(); 521 } 522 523 StreamStatistician* NullReceiveStatistics::GetStatistician( 524 uint32_t ssrc) const { 525 return NULL; 526 } 527 528 void NullReceiveStatistics::SetMaxReorderingThreshold( 529 int max_reordering_threshold) {} 530 531 int64_t NullReceiveStatistics::TimeUntilNextProcess() { return 0; } 532 533 int32_t NullReceiveStatistics::Process() { return 0; } 534 535 void NullReceiveStatistics::RegisterRtcpStatisticsCallback( 536 RtcpStatisticsCallback* callback) {} 537 538 void NullReceiveStatistics::RegisterRtpStatisticsCallback( 539 StreamDataCountersCallback* callback) {} 540 541 } // namespace webrtc 542