1 /* 2 * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/modules/remote_bitrate_estimator/test/packet_sender.h" 12 13 #include <algorithm> 14 #include <list> 15 #include <sstream> 16 17 #include "webrtc/base/checks.h" 18 #include "webrtc/modules/include/module_common_types.h" 19 #include "webrtc/modules/remote_bitrate_estimator/test/bwe.h" 20 #include "webrtc/modules/remote_bitrate_estimator/test/metric_recorder.h" 21 22 namespace webrtc { 23 namespace testing { 24 namespace bwe { 25 26 void PacketSender::Pause() { 27 running_ = false; 28 if (metric_recorder_ != nullptr) { 29 metric_recorder_->PauseFlow(); 30 } 31 } 32 33 void PacketSender::Resume(int64_t paused_time_ms) { 34 running_ = true; 35 if (metric_recorder_ != nullptr) { 36 metric_recorder_->ResumeFlow(paused_time_ms); 37 } 38 } 39 40 void PacketSender::set_metric_recorder(MetricRecorder* metric_recorder) { 41 metric_recorder_ = metric_recorder; 42 } 43 44 void PacketSender::RecordBitrate() { 45 if (metric_recorder_ != nullptr) { 46 BWE_TEST_LOGGING_CONTEXT("Sender"); 47 BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin()); 48 metric_recorder_->UpdateTimeMs(clock_.TimeInMilliseconds()); 49 metric_recorder_->UpdateSendingEstimateKbps(TargetBitrateKbps()); 50 } 51 } 52 53 std::list<FeedbackPacket*> GetFeedbackPackets(Packets* in_out, 54 int64_t end_time_ms, 55 int flow_id) { 56 std::list<FeedbackPacket*> fb_packets; 57 for (auto it = in_out->begin(); it != in_out->end();) { 58 if ((*it)->send_time_us() > 1000 * end_time_ms) 59 break; 60 if ((*it)->GetPacketType() == Packet::kFeedback && 61 flow_id == (*it)->flow_id()) { 62 fb_packets.push_back(static_cast<FeedbackPacket*>(*it)); 63 it = in_out->erase(it); 64 } else { 65 ++it; 66 } 67 } 68 return fb_packets; 69 } 70 71 VideoSender::VideoSender(PacketProcessorListener* listener, 72 VideoSource* source, 73 BandwidthEstimatorType estimator_type) 74 : PacketSender(listener, source->flow_id()), 75 source_(source), 76 bwe_(CreateBweSender(estimator_type, 77 source_->bits_per_second() / 1000, 78 this, 79 &clock_)), 80 previous_sending_bitrate_(0) { 81 modules_.push_back(bwe_.get()); 82 } 83 84 VideoSender::~VideoSender() { 85 } 86 87 void VideoSender::Pause() { 88 previous_sending_bitrate_ = TargetBitrateKbps(); 89 PacketSender::Pause(); 90 } 91 92 void VideoSender::Resume(int64_t paused_time_ms) { 93 source_->SetBitrateBps(previous_sending_bitrate_); 94 PacketSender::Resume(paused_time_ms); 95 } 96 97 void VideoSender::RunFor(int64_t time_ms, Packets* in_out) { 98 std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets( 99 in_out, clock_.TimeInMilliseconds() + time_ms, source_->flow_id()); 100 ProcessFeedbackAndGeneratePackets(time_ms, &feedbacks, in_out); 101 } 102 103 void VideoSender::ProcessFeedbackAndGeneratePackets( 104 int64_t time_ms, 105 std::list<FeedbackPacket*>* feedbacks, 106 Packets* packets) { 107 do { 108 // Make sure to at least run Process() below every 100 ms. 109 int64_t time_to_run_ms = std::min<int64_t>(time_ms, 100); 110 if (!feedbacks->empty()) { 111 int64_t time_until_feedback_ms = 112 feedbacks->front()->send_time_ms() - clock_.TimeInMilliseconds(); 113 time_to_run_ms = 114 std::max<int64_t>(std::min(time_ms, time_until_feedback_ms), 0); 115 } 116 117 if (!running_) { 118 source_->SetBitrateBps(0); 119 } 120 121 Packets generated; 122 source_->RunFor(time_to_run_ms, &generated); 123 bwe_->OnPacketsSent(generated); 124 packets->merge(generated, DereferencingComparator<Packet>); 125 126 clock_.AdvanceTimeMilliseconds(time_to_run_ms); 127 128 if (!feedbacks->empty()) { 129 bwe_->GiveFeedback(*feedbacks->front()); 130 delete feedbacks->front(); 131 feedbacks->pop_front(); 132 } 133 134 bwe_->Process(); 135 136 time_ms -= time_to_run_ms; 137 } while (time_ms > 0); 138 assert(feedbacks->empty()); 139 } 140 141 int VideoSender::GetFeedbackIntervalMs() const { 142 return bwe_->GetFeedbackIntervalMs(); 143 } 144 145 void VideoSender::OnNetworkChanged(uint32_t target_bitrate_bps, 146 uint8_t fraction_lost, 147 int64_t rtt) { 148 source_->SetBitrateBps(target_bitrate_bps); 149 RecordBitrate(); 150 } 151 152 uint32_t VideoSender::TargetBitrateKbps() { 153 return (source_->bits_per_second() + 500) / 1000; 154 } 155 156 PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener, 157 VideoSource* source, 158 BandwidthEstimatorType estimator) 159 : VideoSender(listener, source, estimator), 160 pacer_(&clock_, 161 this, 162 source->bits_per_second() / 1000, 163 PacedSender::kDefaultPaceMultiplier * source->bits_per_second() / 164 1000, 165 0) { 166 modules_.push_back(&pacer_); 167 } 168 169 PacedVideoSender::~PacedVideoSender() { 170 for (Packet* packet : pacer_queue_) 171 delete packet; 172 for (Packet* packet : queue_) 173 delete packet; 174 } 175 176 void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) { 177 int64_t end_time_ms = clock_.TimeInMilliseconds() + time_ms; 178 // Run process periodically to allow the packets to be paced out. 179 std::list<FeedbackPacket*> feedbacks = 180 GetFeedbackPackets(in_out, end_time_ms, source_->flow_id()); 181 int64_t last_run_time_ms = -1; 182 BWE_TEST_LOGGING_CONTEXT("Sender"); 183 BWE_TEST_LOGGING_CONTEXT(source_->flow_id()); 184 do { 185 int64_t time_until_process_ms = TimeUntilNextProcess(modules_); 186 int64_t time_until_feedback_ms = time_ms; 187 if (!feedbacks.empty()) 188 time_until_feedback_ms = std::max<int64_t>( 189 feedbacks.front()->send_time_ms() - clock_.TimeInMilliseconds(), 0); 190 191 int64_t time_until_next_event_ms = 192 std::min(time_until_feedback_ms, time_until_process_ms); 193 194 time_until_next_event_ms = 195 std::min(source_->GetTimeUntilNextFrameMs(), time_until_next_event_ms); 196 197 // Never run for longer than we have been asked for. 198 if (clock_.TimeInMilliseconds() + time_until_next_event_ms > end_time_ms) 199 time_until_next_event_ms = end_time_ms - clock_.TimeInMilliseconds(); 200 201 // Make sure we don't get stuck if an event doesn't trigger. This typically 202 // happens if the prober wants to probe, but there's no packet to send. 203 if (time_until_next_event_ms == 0 && last_run_time_ms == 0) 204 time_until_next_event_ms = 1; 205 last_run_time_ms = time_until_next_event_ms; 206 207 Packets generated_packets; 208 source_->RunFor(time_until_next_event_ms, &generated_packets); 209 if (!generated_packets.empty()) { 210 for (Packet* packet : generated_packets) { 211 MediaPacket* media_packet = static_cast<MediaPacket*>(packet); 212 pacer_.InsertPacket( 213 PacedSender::kNormalPriority, media_packet->header().ssrc, 214 media_packet->header().sequenceNumber, media_packet->send_time_ms(), 215 media_packet->payload_size(), false); 216 pacer_queue_.push_back(packet); 217 assert(pacer_queue_.size() < 10000); 218 } 219 } 220 221 clock_.AdvanceTimeMilliseconds(time_until_next_event_ms); 222 223 if (time_until_next_event_ms == time_until_feedback_ms) { 224 if (!feedbacks.empty()) { 225 bwe_->GiveFeedback(*feedbacks.front()); 226 delete feedbacks.front(); 227 feedbacks.pop_front(); 228 } 229 bwe_->Process(); 230 } 231 232 if (time_until_next_event_ms == time_until_process_ms) { 233 CallProcess(modules_); 234 } 235 } while (clock_.TimeInMilliseconds() < end_time_ms); 236 QueuePackets(in_out, end_time_ms * 1000); 237 } 238 239 int64_t PacedVideoSender::TimeUntilNextProcess( 240 const std::list<Module*>& modules) { 241 int64_t time_until_next_process_ms = 10; 242 for (Module* module : modules) { 243 int64_t next_process_ms = module->TimeUntilNextProcess(); 244 if (next_process_ms < time_until_next_process_ms) 245 time_until_next_process_ms = next_process_ms; 246 } 247 if (time_until_next_process_ms < 0) 248 time_until_next_process_ms = 0; 249 return time_until_next_process_ms; 250 } 251 252 void PacedVideoSender::CallProcess(const std::list<Module*>& modules) { 253 for (Module* module : modules) { 254 if (module->TimeUntilNextProcess() <= 0) { 255 module->Process(); 256 } 257 } 258 } 259 260 void PacedVideoSender::QueuePackets(Packets* batch, 261 int64_t end_of_batch_time_us) { 262 queue_.merge(*batch, DereferencingComparator<Packet>); 263 if (queue_.empty()) { 264 return; 265 } 266 Packets::iterator it = queue_.begin(); 267 for (; it != queue_.end(); ++it) { 268 if ((*it)->send_time_us() > end_of_batch_time_us) { 269 break; 270 } 271 } 272 Packets to_transfer; 273 to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it); 274 for (Packet* packet : to_transfer) 275 packet->set_paced(true); 276 bwe_->OnPacketsSent(to_transfer); 277 batch->merge(to_transfer, DereferencingComparator<Packet>); 278 } 279 280 bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc, 281 uint16_t sequence_number, 282 int64_t capture_time_ms, 283 bool retransmission) { 284 for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end(); 285 ++it) { 286 MediaPacket* media_packet = static_cast<MediaPacket*>(*it); 287 if (media_packet->header().sequenceNumber == sequence_number) { 288 int64_t pace_out_time_ms = clock_.TimeInMilliseconds(); 289 290 // Make sure a packet is never paced out earlier than when it was put into 291 // the pacer. 292 assert(pace_out_time_ms >= media_packet->send_time_ms()); 293 294 media_packet->SetAbsSendTimeMs(pace_out_time_ms); 295 media_packet->set_send_time_us(1000 * pace_out_time_ms); 296 media_packet->set_sender_timestamp_us(1000 * pace_out_time_ms); 297 queue_.push_back(media_packet); 298 pacer_queue_.erase(it); 299 return true; 300 } 301 } 302 return false; 303 } 304 305 size_t PacedVideoSender::TimeToSendPadding(size_t bytes) { 306 return 0; 307 } 308 309 void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps, 310 uint8_t fraction_lost, 311 int64_t rtt) { 312 VideoSender::OnNetworkChanged(target_bitrate_bps, fraction_lost, rtt); 313 pacer_.UpdateBitrate( 314 target_bitrate_bps / 1000, 315 PacedSender::kDefaultPaceMultiplier * target_bitrate_bps / 1000, 0); 316 } 317 318 const int kNoLimit = std::numeric_limits<int>::max(); 319 const int kPacketSizeBytes = 1200; 320 321 TcpSender::TcpSender(PacketProcessorListener* listener, 322 int flow_id, 323 int64_t offset_ms) 324 : TcpSender(listener, flow_id, offset_ms, kNoLimit) { 325 } 326 327 TcpSender::TcpSender(PacketProcessorListener* listener, 328 int flow_id, 329 int64_t offset_ms, 330 int send_limit_bytes) 331 : PacketSender(listener, flow_id), 332 cwnd_(10), 333 ssthresh_(kNoLimit), 334 ack_received_(false), 335 last_acked_seq_num_(0), 336 next_sequence_number_(0), 337 offset_ms_(offset_ms), 338 last_reduction_time_ms_(-1), 339 last_rtt_ms_(0), 340 total_sent_bytes_(0), 341 send_limit_bytes_(send_limit_bytes), 342 last_generated_packets_ms_(0), 343 num_recent_sent_packets_(0), 344 bitrate_kbps_(0) { 345 } 346 347 void TcpSender::RunFor(int64_t time_ms, Packets* in_out) { 348 if (clock_.TimeInMilliseconds() + time_ms < offset_ms_) { 349 clock_.AdvanceTimeMilliseconds(time_ms); 350 if (running_) { 351 Pause(); 352 } 353 return; 354 } 355 356 if (!running_ && total_sent_bytes_ == 0) { 357 Resume(offset_ms_); 358 } 359 360 int64_t start_time_ms = clock_.TimeInMilliseconds(); 361 362 std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets( 363 in_out, clock_.TimeInMilliseconds() + time_ms, *flow_ids().begin()); 364 // The number of packets which are sent in during time_ms depends on the 365 // number of packets in_flight_ and the max number of packets in flight 366 // (cwnd_). Therefore SendPackets() isn't directly dependent on time_ms. 367 for (FeedbackPacket* fb : feedbacks) { 368 clock_.AdvanceTimeMilliseconds(fb->send_time_ms() - 369 clock_.TimeInMilliseconds()); 370 last_rtt_ms_ = fb->send_time_ms() - fb->latest_send_time_ms(); 371 UpdateCongestionControl(fb); 372 SendPackets(in_out); 373 } 374 375 for (auto it = in_flight_.begin(); it != in_flight_.end();) { 376 if (it->time_ms < clock_.TimeInMilliseconds() - 1000) 377 in_flight_.erase(it++); 378 else 379 ++it; 380 } 381 382 clock_.AdvanceTimeMilliseconds(time_ms - 383 (clock_.TimeInMilliseconds() - start_time_ms)); 384 SendPackets(in_out); 385 } 386 387 void TcpSender::SendPackets(Packets* in_out) { 388 int cwnd = ceil(cwnd_); 389 int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0); 390 int timed_out = TriggerTimeouts(); 391 if (timed_out > 0) { 392 HandleLoss(); 393 } 394 if (packets_to_send > 0) { 395 Packets generated = GeneratePackets(packets_to_send); 396 for (Packet* packet : generated) 397 in_flight_.insert(InFlight(*static_cast<MediaPacket*>(packet))); 398 399 in_out->merge(generated, DereferencingComparator<Packet>); 400 } 401 } 402 403 void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) { 404 const TcpFeedback* tcp_fb = static_cast<const TcpFeedback*>(fb); 405 RTC_DCHECK(!tcp_fb->acked_packets().empty()); 406 ack_received_ = true; 407 408 uint16_t expected = tcp_fb->acked_packets().back() - last_acked_seq_num_; 409 uint16_t missing = 410 expected - static_cast<uint16_t>(tcp_fb->acked_packets().size()); 411 412 for (uint16_t ack_seq_num : tcp_fb->acked_packets()) 413 in_flight_.erase(InFlight(ack_seq_num, clock_.TimeInMilliseconds())); 414 415 if (missing > 0) { 416 HandleLoss(); 417 } else if (cwnd_ <= ssthresh_) { 418 cwnd_ += tcp_fb->acked_packets().size(); 419 } else { 420 cwnd_ += 1.0f / cwnd_; 421 } 422 423 last_acked_seq_num_ = 424 LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_); 425 } 426 427 int TcpSender::TriggerTimeouts() { 428 int timed_out = 0; 429 for (auto it = in_flight_.begin(); it != in_flight_.end();) { 430 if (it->time_ms < clock_.TimeInMilliseconds() - 1000) { 431 in_flight_.erase(it++); 432 ++timed_out; 433 } else { 434 ++it; 435 } 436 } 437 return timed_out; 438 } 439 440 void TcpSender::HandleLoss() { 441 if (clock_.TimeInMilliseconds() - last_reduction_time_ms_ < last_rtt_ms_) 442 return; 443 last_reduction_time_ms_ = clock_.TimeInMilliseconds(); 444 ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2); 445 cwnd_ = ssthresh_; 446 } 447 448 Packets TcpSender::GeneratePackets(size_t num_packets) { 449 Packets generated; 450 451 UpdateSendBitrateEstimate(num_packets); 452 453 for (size_t i = 0; i < num_packets; ++i) { 454 if ((total_sent_bytes_ + kPacketSizeBytes) > send_limit_bytes_) { 455 if (running_) { 456 Pause(); 457 } 458 break; 459 } 460 generated.push_back( 461 new MediaPacket(*flow_ids().begin(), 1000 * clock_.TimeInMilliseconds(), 462 kPacketSizeBytes, next_sequence_number_++)); 463 generated.back()->set_sender_timestamp_us( 464 1000 * clock_.TimeInMilliseconds()); 465 466 total_sent_bytes_ += kPacketSizeBytes; 467 } 468 469 return generated; 470 } 471 472 void TcpSender::UpdateSendBitrateEstimate(size_t num_packets) { 473 const int kTimeWindowMs = 500; 474 num_recent_sent_packets_ += num_packets; 475 476 int64_t delta_ms = clock_.TimeInMilliseconds() - last_generated_packets_ms_; 477 if (delta_ms >= kTimeWindowMs) { 478 bitrate_kbps_ = 479 static_cast<uint32_t>(8 * num_recent_sent_packets_ * kPacketSizeBytes) / 480 delta_ms; 481 last_generated_packets_ms_ = clock_.TimeInMilliseconds(); 482 num_recent_sent_packets_ = 0; 483 } 484 485 RecordBitrate(); 486 } 487 488 uint32_t TcpSender::TargetBitrateKbps() { 489 return bitrate_kbps_; 490 } 491 492 } // namespace bwe 493 } // namespace testing 494 } // namespace webrtc 495