1 /* 2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h" 12 13 #include <math.h> 14 15 #include <algorithm> 16 17 #include "webrtc/base/constructormagic.h" 18 #include "webrtc/base/logging.h" 19 #include "webrtc/base/scoped_ptr.h" 20 #include "webrtc/base/thread_annotations.h" 21 #include "webrtc/modules/pacing/paced_sender.h" 22 #include "webrtc/modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" 23 #include "webrtc/system_wrappers/include/clock.h" 24 #include "webrtc/system_wrappers/include/critical_section_wrapper.h" 25 #include "webrtc/typedefs.h" 26 27 namespace webrtc { 28 29 enum { 30 kTimestampGroupLengthMs = 5, 31 kAbsSendTimeFraction = 18, 32 kAbsSendTimeInterArrivalUpshift = 8, 33 kInterArrivalShift = kAbsSendTimeFraction + kAbsSendTimeInterArrivalUpshift, 34 kInitialProbingIntervalMs = 2000, 35 kMinClusterSize = 4, 36 kMaxProbePackets = 15, 37 kExpectedNumberOfProbes = 3 38 }; 39 40 static const size_t kPropagationDeltaQueueMaxSize = 1000; 41 static const int64_t kPropagationDeltaQueueMaxTimeMs = 1000; 42 static const double kTimestampToMs = 1000.0 / 43 static_cast<double>(1 << kInterArrivalShift); 44 45 // Removes the entries at |index| of |time| and |value|, if time[index] is 46 // smaller than or equal to |deadline|. |time| must be sorted ascendingly. 47 static void RemoveStaleEntries( 48 std::vector<int64_t>* time, std::vector<int>* value, int64_t deadline) { 49 assert(time->size() == value->size()); 50 std::vector<int64_t>::iterator end_of_removal = std::upper_bound( 51 time->begin(), time->end(), deadline); 52 size_t end_of_removal_index = end_of_removal - time->begin(); 53 54 time->erase(time->begin(), end_of_removal); 55 value->erase(value->begin(), value->begin() + end_of_removal_index); 56 } 57 58 template<typename K, typename V> 59 std::vector<K> Keys(const std::map<K, V>& map) { 60 std::vector<K> keys; 61 keys.reserve(map.size()); 62 for (typename std::map<K, V>::const_iterator it = map.begin(); 63 it != map.end(); ++it) { 64 keys.push_back(it->first); 65 } 66 return keys; 67 } 68 69 uint32_t ConvertMsTo24Bits(int64_t time_ms) { 70 uint32_t time_24_bits = 71 static_cast<uint32_t>( 72 ((static_cast<uint64_t>(time_ms) << kAbsSendTimeFraction) + 500) / 73 1000) & 74 0x00FFFFFF; 75 return time_24_bits; 76 } 77 78 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds( 79 int send_delta_ms, 80 const Cluster& cluster_aggregate) { 81 if (cluster_aggregate.count == 0) 82 return true; 83 float cluster_mean = cluster_aggregate.send_mean_ms / 84 static_cast<float>(cluster_aggregate.count); 85 return fabs(static_cast<float>(send_delta_ms) - cluster_mean) < 2.5f; 86 } 87 88 void RemoteBitrateEstimatorAbsSendTime::AddCluster( 89 std::list<Cluster>* clusters, 90 Cluster* cluster) { 91 cluster->send_mean_ms /= static_cast<float>(cluster->count); 92 cluster->recv_mean_ms /= static_cast<float>(cluster->count); 93 cluster->mean_size /= cluster->count; 94 clusters->push_back(*cluster); 95 } 96 97 int RemoteBitrateEstimatorAbsSendTime::Id() const { 98 return static_cast<int>(reinterpret_cast<uint64_t>(this)); 99 } 100 101 RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime( 102 RemoteBitrateObserver* observer, 103 Clock* clock) 104 : crit_sect_(CriticalSectionWrapper::CreateCriticalSection()), 105 observer_(observer), 106 clock_(clock), 107 ssrcs_(), 108 inter_arrival_(), 109 estimator_(OverUseDetectorOptions()), 110 detector_(OverUseDetectorOptions()), 111 incoming_bitrate_(kBitrateWindowMs, 8000), 112 last_process_time_(-1), 113 process_interval_ms_(kProcessIntervalMs), 114 total_propagation_delta_ms_(0), 115 total_probes_received_(0), 116 first_packet_time_ms_(-1) { 117 assert(observer_); 118 assert(clock_); 119 LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating."; 120 } 121 122 void RemoteBitrateEstimatorAbsSendTime::ComputeClusters( 123 std::list<Cluster>* clusters) const { 124 Cluster current; 125 int64_t prev_send_time = -1; 126 int64_t prev_recv_time = -1; 127 for (std::list<Probe>::const_iterator it = probes_.begin(); 128 it != probes_.end(); 129 ++it) { 130 if (prev_send_time >= 0) { 131 int send_delta_ms = it->send_time_ms - prev_send_time; 132 int recv_delta_ms = it->recv_time_ms - prev_recv_time; 133 if (send_delta_ms >= 1 && recv_delta_ms >= 1) { 134 ++current.num_above_min_delta; 135 } 136 if (!IsWithinClusterBounds(send_delta_ms, current)) { 137 if (current.count >= kMinClusterSize) 138 AddCluster(clusters, ¤t); 139 current = Cluster(); 140 } 141 current.send_mean_ms += send_delta_ms; 142 current.recv_mean_ms += recv_delta_ms; 143 current.mean_size += it->payload_size; 144 ++current.count; 145 } 146 prev_send_time = it->send_time_ms; 147 prev_recv_time = it->recv_time_ms; 148 } 149 if (current.count >= kMinClusterSize) 150 AddCluster(clusters, ¤t); 151 } 152 153 std::list<Cluster>::const_iterator 154 RemoteBitrateEstimatorAbsSendTime::FindBestProbe( 155 const std::list<Cluster>& clusters) const { 156 int highest_probe_bitrate_bps = 0; 157 std::list<Cluster>::const_iterator best_it = clusters.end(); 158 for (std::list<Cluster>::const_iterator it = clusters.begin(); 159 it != clusters.end(); 160 ++it) { 161 if (it->send_mean_ms == 0 || it->recv_mean_ms == 0) 162 continue; 163 int send_bitrate_bps = it->mean_size * 8 * 1000 / it->send_mean_ms; 164 int recv_bitrate_bps = it->mean_size * 8 * 1000 / it->recv_mean_ms; 165 if (it->num_above_min_delta > it->count / 2 && 166 (it->recv_mean_ms - it->send_mean_ms <= 2.0f && 167 it->send_mean_ms - it->recv_mean_ms <= 5.0f)) { 168 int probe_bitrate_bps = 169 std::min(it->GetSendBitrateBps(), it->GetRecvBitrateBps()); 170 if (probe_bitrate_bps > highest_probe_bitrate_bps) { 171 highest_probe_bitrate_bps = probe_bitrate_bps; 172 best_it = it; 173 } 174 } else { 175 LOG(LS_INFO) << "Probe failed, sent at " << send_bitrate_bps 176 << " bps, received at " << recv_bitrate_bps 177 << " bps. Mean send delta: " << it->send_mean_ms 178 << " ms, mean recv delta: " << it->recv_mean_ms 179 << " ms, num probes: " << it->count; 180 break; 181 } 182 } 183 return best_it; 184 } 185 186 void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) { 187 std::list<Cluster> clusters; 188 ComputeClusters(&clusters); 189 if (clusters.empty()) { 190 // If we reach the max number of probe packets and still have no clusters, 191 // we will remove the oldest one. 192 if (probes_.size() >= kMaxProbePackets) 193 probes_.pop_front(); 194 return; 195 } 196 197 std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters); 198 if (best_it != clusters.end()) { 199 int probe_bitrate_bps = 200 std::min(best_it->GetSendBitrateBps(), best_it->GetRecvBitrateBps()); 201 // Make sure that a probe sent on a lower bitrate than our estimate can't 202 // reduce the estimate. 203 if (IsBitrateImproving(probe_bitrate_bps) && 204 probe_bitrate_bps > static_cast<int>(incoming_bitrate_.Rate(now_ms))) { 205 LOG(LS_INFO) << "Probe successful, sent at " 206 << best_it->GetSendBitrateBps() << " bps, received at " 207 << best_it->GetRecvBitrateBps() 208 << " bps. Mean send delta: " << best_it->send_mean_ms 209 << " ms, mean recv delta: " << best_it->recv_mean_ms 210 << " ms, num probes: " << best_it->count; 211 remote_rate_.SetEstimate(probe_bitrate_bps, now_ms); 212 } 213 } 214 215 // Not probing and received non-probe packet, or finished with current set 216 // of probes. 217 if (clusters.size() >= kExpectedNumberOfProbes) 218 probes_.clear(); 219 } 220 221 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving( 222 int new_bitrate_bps) const { 223 bool initial_probe = !remote_rate_.ValidEstimate() && new_bitrate_bps > 0; 224 bool bitrate_above_estimate = 225 remote_rate_.ValidEstimate() && 226 new_bitrate_bps > static_cast<int>(remote_rate_.LatestEstimate()); 227 return initial_probe || bitrate_above_estimate; 228 } 229 230 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketFeedbackVector( 231 const std::vector<PacketInfo>& packet_feedback_vector) { 232 for (const auto& packet_info : packet_feedback_vector) { 233 IncomingPacketInfo(packet_info.arrival_time_ms, 234 ConvertMsTo24Bits(packet_info.send_time_ms), 235 packet_info.payload_size, 0, packet_info.was_paced); 236 } 237 } 238 239 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(int64_t arrival_time_ms, 240 size_t payload_size, 241 const RTPHeader& header, 242 bool was_paced) { 243 if (!header.extension.hasAbsoluteSendTime) { 244 LOG(LS_WARNING) << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet " 245 "is missing absolute send time extension!"; 246 return; 247 } 248 IncomingPacketInfo(arrival_time_ms, header.extension.absoluteSendTime, 249 payload_size, header.ssrc, was_paced); 250 } 251 252 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo( 253 int64_t arrival_time_ms, 254 uint32_t send_time_24bits, 255 size_t payload_size, 256 uint32_t ssrc, 257 bool was_paced) { 258 assert(send_time_24bits < (1ul << 24)); 259 // Shift up send time to use the full 32 bits that inter_arrival works with, 260 // so wrapping works properly. 261 uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift; 262 int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs; 263 264 CriticalSectionScoped cs(crit_sect_.get()); 265 int64_t now_ms = clock_->TimeInMilliseconds(); 266 // TODO(holmer): SSRCs are only needed for REMB, should be broken out from 267 // here. 268 ssrcs_[ssrc] = now_ms; 269 incoming_bitrate_.Update(payload_size, now_ms); 270 const BandwidthUsage prior_state = detector_.State(); 271 272 if (first_packet_time_ms_ == -1) 273 first_packet_time_ms_ = clock_->TimeInMilliseconds(); 274 275 uint32_t ts_delta = 0; 276 int64_t t_delta = 0; 277 int size_delta = 0; 278 // For now only try to detect probes while we don't have a valid estimate, and 279 // make sure the packet was paced. We currently assume that only packets 280 // larger than 200 bytes are paced by the sender. 281 was_paced = was_paced && payload_size > PacedSender::kMinProbePacketSize; 282 if (was_paced && 283 (!remote_rate_.ValidEstimate() || 284 now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) { 285 // TODO(holmer): Use a map instead to get correct order? 286 if (total_probes_received_ < kMaxProbePackets) { 287 int send_delta_ms = -1; 288 int recv_delta_ms = -1; 289 if (!probes_.empty()) { 290 send_delta_ms = send_time_ms - probes_.back().send_time_ms; 291 recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms; 292 } 293 LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms 294 << " ms, recv time=" << arrival_time_ms 295 << " ms, send delta=" << send_delta_ms 296 << " ms, recv delta=" << recv_delta_ms << " ms."; 297 } 298 probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size)); 299 ++total_probes_received_; 300 ProcessClusters(now_ms); 301 } 302 if (!inter_arrival_.get()) { 303 inter_arrival_.reset( 304 new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000, 305 kTimestampToMs, true)); 306 } 307 if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, payload_size, 308 &ts_delta, &t_delta, &size_delta)) { 309 double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift); 310 estimator_.Update(t_delta, ts_delta_ms, size_delta, detector_.State()); 311 detector_.Detect(estimator_.offset(), ts_delta_ms, 312 estimator_.num_of_deltas(), arrival_time_ms); 313 UpdateStats(static_cast<int>(t_delta - ts_delta_ms), now_ms); 314 } 315 if (detector_.State() == kBwOverusing) { 316 uint32_t incoming_bitrate_bps = incoming_bitrate_.Rate(now_ms); 317 if (prior_state != kBwOverusing || 318 remote_rate_.TimeToReduceFurther(now_ms, incoming_bitrate_bps)) { 319 // The first overuse should immediately trigger a new estimate. 320 // We also have to update the estimate immediately if we are overusing 321 // and the target bitrate is too high compared to what we are receiving. 322 UpdateEstimate(now_ms); 323 } 324 } 325 } 326 327 int32_t RemoteBitrateEstimatorAbsSendTime::Process() { 328 if (TimeUntilNextProcess() > 0) { 329 return 0; 330 } 331 { 332 CriticalSectionScoped cs(crit_sect_.get()); 333 UpdateEstimate(clock_->TimeInMilliseconds()); 334 } 335 last_process_time_ = clock_->TimeInMilliseconds(); 336 return 0; 337 } 338 339 int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() { 340 if (last_process_time_ < 0) { 341 return 0; 342 } 343 { 344 CriticalSectionScoped cs(crit_sect_.get()); 345 return last_process_time_ + process_interval_ms_ - 346 clock_->TimeInMilliseconds(); 347 } 348 } 349 350 void RemoteBitrateEstimatorAbsSendTime::UpdateEstimate(int64_t now_ms) { 351 if (!inter_arrival_.get()) { 352 // No packets have been received on the active streams. 353 return; 354 } 355 for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) { 356 if ((now_ms - it->second) > kStreamTimeOutMs) { 357 ssrcs_.erase(it++); 358 } else { 359 ++it; 360 } 361 } 362 if (ssrcs_.empty()) { 363 // We can't update the estimate if we don't have any active streams. 364 inter_arrival_.reset(); 365 // We deliberately don't reset the first_packet_time_ms_ here for now since 366 // we only probe for bandwidth in the beginning of a call right now. 367 return; 368 } 369 370 const RateControlInput input(detector_.State(), 371 incoming_bitrate_.Rate(now_ms), 372 estimator_.var_noise()); 373 remote_rate_.Update(&input, now_ms); 374 unsigned int target_bitrate = remote_rate_.UpdateBandwidthEstimate(now_ms); 375 if (remote_rate_.ValidEstimate()) { 376 process_interval_ms_ = remote_rate_.GetFeedbackInterval(); 377 observer_->OnReceiveBitrateChanged(Keys(ssrcs_), target_bitrate); 378 } 379 } 380 381 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms, 382 int64_t max_rtt_ms) { 383 CriticalSectionScoped cs(crit_sect_.get()); 384 remote_rate_.SetRtt(avg_rtt_ms); 385 } 386 387 void RemoteBitrateEstimatorAbsSendTime::RemoveStream(unsigned int ssrc) { 388 CriticalSectionScoped cs(crit_sect_.get()); 389 ssrcs_.erase(ssrc); 390 } 391 392 bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate( 393 std::vector<unsigned int>* ssrcs, 394 unsigned int* bitrate_bps) const { 395 CriticalSectionScoped cs(crit_sect_.get()); 396 assert(ssrcs); 397 assert(bitrate_bps); 398 if (!remote_rate_.ValidEstimate()) { 399 return false; 400 } 401 *ssrcs = Keys(ssrcs_); 402 if (ssrcs_.empty()) { 403 *bitrate_bps = 0; 404 } else { 405 *bitrate_bps = remote_rate_.LatestEstimate(); 406 } 407 return true; 408 } 409 410 bool RemoteBitrateEstimatorAbsSendTime::GetStats( 411 ReceiveBandwidthEstimatorStats* output) const { 412 { 413 CriticalSectionScoped cs(crit_sect_.get()); 414 output->recent_propagation_time_delta_ms = recent_propagation_delta_ms_; 415 output->recent_arrival_time_ms = recent_update_time_ms_; 416 output->total_propagation_time_delta_ms = total_propagation_delta_ms_; 417 } 418 RemoveStaleEntries( 419 &output->recent_arrival_time_ms, 420 &output->recent_propagation_time_delta_ms, 421 clock_->TimeInMilliseconds() - kPropagationDeltaQueueMaxTimeMs); 422 return true; 423 } 424 425 void RemoteBitrateEstimatorAbsSendTime::UpdateStats(int propagation_delta_ms, 426 int64_t now_ms) { 427 // The caller must enter crit_sect_ before the call. 428 429 // Remove the oldest entry if the size limit is reached. 430 if (recent_update_time_ms_.size() == kPropagationDeltaQueueMaxSize) { 431 recent_update_time_ms_.erase(recent_update_time_ms_.begin()); 432 recent_propagation_delta_ms_.erase(recent_propagation_delta_ms_.begin()); 433 } 434 435 recent_propagation_delta_ms_.push_back(propagation_delta_ms); 436 recent_update_time_ms_.push_back(now_ms); 437 438 RemoveStaleEntries( 439 &recent_update_time_ms_, 440 &recent_propagation_delta_ms_, 441 now_ms - kPropagationDeltaQueueMaxTimeMs); 442 443 total_propagation_delta_ms_ = 444 std::max(total_propagation_delta_ms_ + propagation_delta_ms, 0); 445 } 446 447 void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) { 448 CriticalSectionScoped cs(crit_sect_.get()); 449 remote_rate_.SetMinBitrate(min_bitrate_bps); 450 } 451 } // namespace webrtc 452