1 /* 2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/modules/remote_bitrate_estimator/test/bwe_test_framework.h" 12 13 #include <stdio.h> 14 15 #include <sstream> 16 17 namespace webrtc { 18 namespace testing { 19 namespace bwe { 20 class DelayCapHelper { 21 public: 22 DelayCapHelper() : max_delay_us_(0), delay_stats_() {} 23 24 void SetMaxDelay(int max_delay_ms) { 25 BWE_TEST_LOGGING_ENABLE(false); 26 BWE_TEST_LOGGING_LOG1("Max Delay", "%d ms", static_cast<int>(max_delay_ms)); 27 assert(max_delay_ms >= 0); 28 max_delay_us_ = max_delay_ms * 1000; 29 } 30 31 bool ShouldSendPacket(int64_t send_time_us, int64_t arrival_time_us) { 32 int64_t packet_delay_us = send_time_us - arrival_time_us; 33 delay_stats_.Push(std::min(packet_delay_us, max_delay_us_) / 1000); 34 return (max_delay_us_ == 0 || max_delay_us_ >= packet_delay_us); 35 } 36 37 const Stats<double>& delay_stats() const { 38 return delay_stats_; 39 } 40 41 private: 42 int64_t max_delay_us_; 43 Stats<double> delay_stats_; 44 45 DISALLOW_COPY_AND_ASSIGN(DelayCapHelper); 46 }; 47 48 const FlowIds CreateFlowIds(const int *flow_ids_array, size_t num_flow_ids) { 49 FlowIds flow_ids(&flow_ids_array[0], flow_ids_array + num_flow_ids); 50 return flow_ids; 51 } 52 53 class RateCounter { 54 public: 55 RateCounter() 56 : kWindowSizeUs(1000000), 57 packets_per_second_(0), 58 bytes_per_second_(0), 59 last_accumulated_us_(0), 60 window_() {} 61 62 void UpdateRates(int64_t send_time_us, uint32_t payload_size) { 63 packets_per_second_++; 64 bytes_per_second_ += payload_size; 65 last_accumulated_us_ = send_time_us; 66 window_.push_back(std::make_pair(send_time_us, payload_size)); 67 while (!window_.empty()) { 68 const TimeSizePair& packet = window_.front(); 69 if (packet.first > (last_accumulated_us_ - kWindowSizeUs)) { 70 break; 71 } 72 assert(packets_per_second_ >= 1); 73 assert(bytes_per_second_ >= packet.second); 74 packets_per_second_--; 75 bytes_per_second_ -= packet.second; 76 window_.pop_front(); 77 } 78 } 79 80 uint32_t bits_per_second() const { 81 return bytes_per_second_ * 8; 82 } 83 uint32_t packets_per_second() const { return packets_per_second_; } 84 85 private: 86 typedef std::pair<int64_t, uint32_t> TimeSizePair; 87 88 const int64_t kWindowSizeUs; 89 uint32_t packets_per_second_; 90 uint32_t bytes_per_second_; 91 int64_t last_accumulated_us_; 92 std::list<TimeSizePair> window_; 93 }; 94 95 Random::Random(uint32_t seed) 96 : a_(0x531FDB97 ^ seed), 97 b_(0x6420ECA8 + seed) { 98 } 99 100 float Random::Rand() { 101 const float kScale = 1.0f / 0xffffffff; 102 float result = kScale * b_; 103 a_ ^= b_; 104 b_ += a_; 105 return result; 106 } 107 108 int Random::Gaussian(int mean, int standard_deviation) { 109 // Creating a Normal distribution variable from two independent uniform 110 // variables based on the Box-Muller transform, which is defined on the 111 // interval (0, 1], hence the mask+add below. 112 const double kPi = 3.14159265358979323846; 113 const double kScale = 1.0 / 0x80000000ul; 114 double u1 = kScale * ((a_ & 0x7ffffffful) + 1); 115 double u2 = kScale * ((b_ & 0x7ffffffful) + 1); 116 a_ ^= b_; 117 b_ += a_; 118 return static_cast<int>(mean + standard_deviation * 119 sqrt(-2 * log(u1)) * cos(2 * kPi * u2)); 120 } 121 122 Packet::Packet() 123 : flow_id_(0), 124 creation_time_us_(-1), 125 send_time_us_(-1), 126 payload_size_(0) { 127 memset(&header_, 0, sizeof(header_)); 128 } 129 130 Packet::Packet(int flow_id, int64_t send_time_us, uint32_t payload_size, 131 const RTPHeader& header) 132 : flow_id_(flow_id), 133 creation_time_us_(send_time_us), 134 send_time_us_(send_time_us), 135 payload_size_(payload_size), 136 header_(header) { 137 } 138 139 Packet::Packet(int64_t send_time_us, uint32_t sequence_number) 140 : flow_id_(0), 141 creation_time_us_(send_time_us), 142 send_time_us_(send_time_us), 143 payload_size_(0) { 144 memset(&header_, 0, sizeof(header_)); 145 header_.sequenceNumber = sequence_number; 146 } 147 148 bool Packet::operator<(const Packet& rhs) const { 149 return send_time_us_ < rhs.send_time_us_; 150 } 151 152 void Packet::set_send_time_us(int64_t send_time_us) { 153 assert(send_time_us >= 0); 154 send_time_us_ = send_time_us; 155 } 156 157 void Packet::SetAbsSendTimeMs(int64_t abs_send_time_ms) { 158 header_.extension.absoluteSendTime = ((static_cast<int64_t>(abs_send_time_ms * 159 (1 << 18)) + 500) / 1000) & 0x00fffffful; 160 } 161 162 bool IsTimeSorted(const Packets& packets) { 163 PacketsConstIt last_it = packets.begin(); 164 for (PacketsConstIt it = last_it; it != packets.end(); ++it) { 165 if (it != last_it && *it < *last_it) { 166 return false; 167 } 168 last_it = it; 169 } 170 return true; 171 } 172 173 PacketProcessor::PacketProcessor(PacketProcessorListener* listener, 174 bool is_sender) 175 : listener_(listener), flow_ids_(1, 0) { 176 if (listener_) { 177 listener_->AddPacketProcessor(this, is_sender); 178 } 179 } 180 181 PacketProcessor::PacketProcessor(PacketProcessorListener* listener, 182 const FlowIds& flow_ids, 183 bool is_sender) 184 : listener_(listener), flow_ids_(flow_ids) { 185 if (listener_) { 186 listener_->AddPacketProcessor(this, is_sender); 187 } 188 } 189 190 PacketProcessor::~PacketProcessor() { 191 if (listener_) { 192 listener_->RemovePacketProcessor(this); 193 } 194 } 195 196 RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener) 197 : PacketProcessor(listener, false), 198 rate_counter_(new RateCounter()), 199 packets_per_second_stats_(), 200 kbps_stats_(), 201 name_("") {} 202 203 RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener, 204 const std::string& name) 205 : PacketProcessor(listener, false), 206 rate_counter_(new RateCounter()), 207 packets_per_second_stats_(), 208 kbps_stats_(), 209 name_(name) {} 210 211 RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener, 212 const FlowIds& flow_ids, 213 const std::string& name) 214 : PacketProcessor(listener, flow_ids, false), 215 rate_counter_(new RateCounter()), 216 packets_per_second_stats_(), 217 kbps_stats_(), 218 name_(name) { 219 std::stringstream ss; 220 ss << name_ << "_"; 221 for (size_t i = 0; i < flow_ids.size(); ++i) { 222 ss << flow_ids[i] << ","; 223 } 224 name_ = ss.str(); 225 } 226 227 RateCounterFilter::~RateCounterFilter() { 228 LogStats(); 229 } 230 231 uint32_t RateCounterFilter::packets_per_second() const { 232 return rate_counter_->packets_per_second(); 233 } 234 235 uint32_t RateCounterFilter::bits_per_second() const { 236 return rate_counter_->bits_per_second(); 237 } 238 239 void RateCounterFilter::LogStats() { 240 BWE_TEST_LOGGING_CONTEXT("RateCounterFilter"); 241 packets_per_second_stats_.Log("pps"); 242 kbps_stats_.Log("kbps"); 243 } 244 245 Stats<double> RateCounterFilter::GetBitrateStats() const { 246 return kbps_stats_; 247 } 248 249 void RateCounterFilter::Plot(int64_t timestamp_ms) { 250 BWE_TEST_LOGGING_CONTEXT(name_.c_str()); 251 BWE_TEST_LOGGING_PLOT("Throughput_#1", timestamp_ms, 252 rate_counter_->bits_per_second() / 1000.0); 253 } 254 255 void RateCounterFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) { 256 assert(in_out); 257 for (PacketsConstIt it = in_out->begin(); it != in_out->end(); ++it) { 258 rate_counter_->UpdateRates(it->send_time_us(), it->payload_size()); 259 } 260 packets_per_second_stats_.Push(rate_counter_->packets_per_second()); 261 kbps_stats_.Push(rate_counter_->bits_per_second() / 1000.0); 262 } 263 264 LossFilter::LossFilter(PacketProcessorListener* listener) 265 : PacketProcessor(listener, false), 266 random_(0x12345678), 267 loss_fraction_(0.0f) { 268 } 269 270 void LossFilter::SetLoss(float loss_percent) { 271 BWE_TEST_LOGGING_ENABLE(false); 272 BWE_TEST_LOGGING_LOG1("Loss", "%f%%", loss_percent); 273 assert(loss_percent >= 0.0f); 274 assert(loss_percent <= 100.0f); 275 loss_fraction_ = loss_percent * 0.01f; 276 } 277 278 void LossFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) { 279 assert(in_out); 280 for (PacketsIt it = in_out->begin(); it != in_out->end(); ) { 281 if (random_.Rand() < loss_fraction_) { 282 it = in_out->erase(it); 283 } else { 284 ++it; 285 } 286 } 287 } 288 289 DelayFilter::DelayFilter(PacketProcessorListener* listener) 290 : PacketProcessor(listener, false), 291 delay_us_(0), 292 last_send_time_us_(0) { 293 } 294 295 void DelayFilter::SetDelay(int64_t delay_ms) { 296 BWE_TEST_LOGGING_ENABLE(false); 297 BWE_TEST_LOGGING_LOG1("Delay", "%d ms", static_cast<int>(delay_ms)); 298 assert(delay_ms >= 0); 299 delay_us_ = delay_ms * 1000; 300 } 301 302 void DelayFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) { 303 assert(in_out); 304 for (PacketsIt it = in_out->begin(); it != in_out->end(); ++it) { 305 int64_t new_send_time_us = it->send_time_us() + delay_us_; 306 last_send_time_us_ = std::max(last_send_time_us_, new_send_time_us); 307 it->set_send_time_us(last_send_time_us_); 308 } 309 } 310 311 JitterFilter::JitterFilter(PacketProcessorListener* listener) 312 : PacketProcessor(listener, false), 313 random_(0x89674523), 314 stddev_jitter_us_(0), 315 last_send_time_us_(0) { 316 } 317 318 void JitterFilter::SetJitter(int64_t stddev_jitter_ms) { 319 BWE_TEST_LOGGING_ENABLE(false); 320 BWE_TEST_LOGGING_LOG1("Jitter", "%d ms", 321 static_cast<int>(stddev_jitter_ms)); 322 assert(stddev_jitter_ms >= 0); 323 stddev_jitter_us_ = stddev_jitter_ms * 1000; 324 } 325 326 void JitterFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) { 327 assert(in_out); 328 for (PacketsIt it = in_out->begin(); it != in_out->end(); ++it) { 329 int64_t new_send_time_us = it->send_time_us(); 330 new_send_time_us += random_.Gaussian(0, stddev_jitter_us_); 331 last_send_time_us_ = std::max(last_send_time_us_, new_send_time_us); 332 it->set_send_time_us(last_send_time_us_); 333 } 334 } 335 336 ReorderFilter::ReorderFilter(PacketProcessorListener* listener) 337 : PacketProcessor(listener, false), 338 random_(0x27452389), 339 reorder_fraction_(0.0f) { 340 } 341 342 void ReorderFilter::SetReorder(float reorder_percent) { 343 BWE_TEST_LOGGING_ENABLE(false); 344 BWE_TEST_LOGGING_LOG1("Reordering", "%f%%", reorder_percent); 345 assert(reorder_percent >= 0.0f); 346 assert(reorder_percent <= 100.0f); 347 reorder_fraction_ = reorder_percent * 0.01f; 348 } 349 350 void ReorderFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) { 351 assert(in_out); 352 if (in_out->size() >= 2) { 353 PacketsIt last_it = in_out->begin(); 354 PacketsIt it = last_it; 355 while (++it != in_out->end()) { 356 if (random_.Rand() < reorder_fraction_) { 357 int64_t t1 = last_it->send_time_us(); 358 int64_t t2 = it->send_time_us(); 359 std::swap(*last_it, *it); 360 last_it->set_send_time_us(t1); 361 it->set_send_time_us(t2); 362 } 363 last_it = it; 364 } 365 } 366 } 367 368 ChokeFilter::ChokeFilter(PacketProcessorListener* listener) 369 : PacketProcessor(listener, false), 370 kbps_(1200), 371 last_send_time_us_(0), 372 delay_cap_helper_(new DelayCapHelper()) { 373 } 374 375 ChokeFilter::ChokeFilter(PacketProcessorListener* listener, 376 const FlowIds& flow_ids) 377 : PacketProcessor(listener, flow_ids, false), 378 kbps_(1200), 379 last_send_time_us_(0), 380 delay_cap_helper_(new DelayCapHelper()) { 381 } 382 383 ChokeFilter::~ChokeFilter() {} 384 385 void ChokeFilter::SetCapacity(uint32_t kbps) { 386 BWE_TEST_LOGGING_ENABLE(false); 387 BWE_TEST_LOGGING_LOG1("BitrateChoke", "%d kbps", kbps); 388 kbps_ = kbps; 389 } 390 391 void ChokeFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) { 392 assert(in_out); 393 for (PacketsIt it = in_out->begin(); it != in_out->end(); ) { 394 int64_t earliest_send_time_us = last_send_time_us_ + 395 (it->payload_size() * 8 * 1000 + kbps_ / 2) / kbps_; 396 int64_t new_send_time_us = std::max(it->send_time_us(), 397 earliest_send_time_us); 398 if (delay_cap_helper_->ShouldSendPacket(new_send_time_us, 399 it->send_time_us())) { 400 it->set_send_time_us(new_send_time_us); 401 last_send_time_us_ = new_send_time_us; 402 ++it; 403 } else { 404 it = in_out->erase(it); 405 } 406 } 407 } 408 409 void ChokeFilter::SetMaxDelay(int max_delay_ms) { 410 delay_cap_helper_->SetMaxDelay(max_delay_ms); 411 } 412 413 Stats<double> ChokeFilter::GetDelayStats() const { 414 return delay_cap_helper_->delay_stats(); 415 } 416 417 TraceBasedDeliveryFilter::TraceBasedDeliveryFilter( 418 PacketProcessorListener* listener) 419 : PacketProcessor(listener, false), 420 current_offset_us_(0), 421 delivery_times_us_(), 422 next_delivery_it_(), 423 local_time_us_(-1), 424 rate_counter_(new RateCounter), 425 name_(""), 426 delay_cap_helper_(new DelayCapHelper()), 427 packets_per_second_stats_(), 428 kbps_stats_() {} 429 430 TraceBasedDeliveryFilter::TraceBasedDeliveryFilter( 431 PacketProcessorListener* listener, 432 const std::string& name) 433 : PacketProcessor(listener, false), 434 current_offset_us_(0), 435 delivery_times_us_(), 436 next_delivery_it_(), 437 local_time_us_(-1), 438 rate_counter_(new RateCounter), 439 name_(name), 440 delay_cap_helper_(new DelayCapHelper()), 441 packets_per_second_stats_(), 442 kbps_stats_() {} 443 444 TraceBasedDeliveryFilter::~TraceBasedDeliveryFilter() { 445 } 446 447 bool TraceBasedDeliveryFilter::Init(const std::string& filename) { 448 FILE* trace_file = fopen(filename.c_str(), "r"); 449 if (!trace_file) { 450 return false; 451 } 452 int64_t first_timestamp = -1; 453 while(!feof(trace_file)) { 454 const size_t kMaxLineLength = 100; 455 char line[kMaxLineLength]; 456 if (fgets(line, kMaxLineLength, trace_file)) { 457 std::string line_string(line); 458 std::istringstream buffer(line_string); 459 int64_t timestamp; 460 buffer >> timestamp; 461 timestamp /= 1000; // Convert to microseconds. 462 if (first_timestamp == -1) 463 first_timestamp = timestamp; 464 assert(delivery_times_us_.empty() || 465 timestamp - first_timestamp - delivery_times_us_.back() >= 0); 466 delivery_times_us_.push_back(timestamp - first_timestamp); 467 } 468 } 469 assert(!delivery_times_us_.empty()); 470 next_delivery_it_ = delivery_times_us_.begin(); 471 fclose(trace_file); 472 return true; 473 } 474 475 void TraceBasedDeliveryFilter::Plot(int64_t timestamp_ms) { 476 BWE_TEST_LOGGING_CONTEXT(name_.c_str()); 477 // This plots the max possible throughput of the trace-based delivery filter, 478 // which will be reached if a packet sent on every packet slot of the trace. 479 BWE_TEST_LOGGING_PLOT("MaxThroughput_#1", timestamp_ms, 480 rate_counter_->bits_per_second() / 1000.0); 481 } 482 483 void TraceBasedDeliveryFilter::RunFor(int64_t time_ms, Packets* in_out) { 484 assert(in_out); 485 for (PacketsIt it = in_out->begin(); it != in_out->end();) { 486 while (local_time_us_ < it->send_time_us()) { 487 ProceedToNextSlot(); 488 } 489 // Drop any packets that have been queued for too long. 490 while (!delay_cap_helper_->ShouldSendPacket(local_time_us_, 491 it->send_time_us())) { 492 it = in_out->erase(it); 493 if (it == in_out->end()) { 494 return; 495 } 496 } 497 if (local_time_us_ >= it->send_time_us()) { 498 it->set_send_time_us(local_time_us_); 499 ProceedToNextSlot(); 500 } 501 ++it; 502 } 503 packets_per_second_stats_.Push(rate_counter_->packets_per_second()); 504 kbps_stats_.Push(rate_counter_->bits_per_second() / 1000.0); 505 } 506 507 void TraceBasedDeliveryFilter::SetMaxDelay(int max_delay_ms) { 508 delay_cap_helper_->SetMaxDelay(max_delay_ms); 509 } 510 511 Stats<double> TraceBasedDeliveryFilter::GetDelayStats() const { 512 return delay_cap_helper_->delay_stats(); 513 } 514 515 Stats<double> TraceBasedDeliveryFilter::GetBitrateStats() const { 516 return kbps_stats_; 517 } 518 519 void TraceBasedDeliveryFilter::ProceedToNextSlot() { 520 if (*next_delivery_it_ <= local_time_us_) { 521 ++next_delivery_it_; 522 if (next_delivery_it_ == delivery_times_us_.end()) { 523 // When the trace wraps we allow two packets to be sent back-to-back. 524 for (TimeList::iterator it = delivery_times_us_.begin(); 525 it != delivery_times_us_.end(); ++it) { 526 *it += local_time_us_ - current_offset_us_; 527 } 528 current_offset_us_ += local_time_us_ - current_offset_us_; 529 next_delivery_it_ = delivery_times_us_.begin(); 530 } 531 } 532 local_time_us_ = *next_delivery_it_; 533 const int kPayloadSize = 1200; 534 rate_counter_->UpdateRates(local_time_us_, kPayloadSize); 535 } 536 537 PacketSender::PacketSender(PacketProcessorListener* listener) 538 : PacketProcessor(listener, true) {} 539 540 PacketSender::PacketSender(PacketProcessorListener* listener, 541 const FlowIds& flow_ids) 542 : PacketProcessor(listener, flow_ids, true) { 543 544 } 545 546 VideoSender::VideoSender(int flow_id, PacketProcessorListener* listener, 547 float fps, uint32_t kbps, uint32_t ssrc, 548 float first_frame_offset) 549 : PacketSender(listener, FlowIds(1, flow_id)), 550 kMaxPayloadSizeBytes(1200), 551 kTimestampBase(0xff80ff00ul), 552 frame_period_ms_(1000.0 / fps), 553 bytes_per_second_((1000 * kbps) / 8), 554 frame_size_bytes_(bytes_per_second_ / fps), 555 next_frame_ms_(frame_period_ms_ * first_frame_offset), 556 now_ms_(0.0), 557 prototype_header_() { 558 assert(first_frame_offset >= 0.0f); 559 assert(first_frame_offset < 1.0f); 560 memset(&prototype_header_, 0, sizeof(prototype_header_)); 561 prototype_header_.ssrc = ssrc; 562 prototype_header_.sequenceNumber = 0xf000u; 563 } 564 565 uint32_t VideoSender::GetCapacityKbps() const { 566 return (bytes_per_second_ * 8) / 1000; 567 } 568 569 void VideoSender::RunFor(int64_t time_ms, Packets* in_out) { 570 assert(in_out); 571 now_ms_ += time_ms; 572 Packets new_packets; 573 while (now_ms_ >= next_frame_ms_) { 574 prototype_header_.timestamp = kTimestampBase + 575 static_cast<uint32_t>(next_frame_ms_ * 90.0); 576 prototype_header_.extension.transmissionTimeOffset = 0; 577 578 // Generate new packets for this frame, all with the same timestamp, 579 // but the payload size is capped, so if the whole frame doesn't fit in 580 // one packet, we will see a number of equally sized packets followed by 581 // one smaller at the tail. 582 int64_t send_time_us = next_frame_ms_ * 1000.0; 583 uint32_t payload_size = frame_size_bytes_; 584 while (payload_size > 0) { 585 ++prototype_header_.sequenceNumber; 586 uint32_t size = std::min(kMaxPayloadSizeBytes, payload_size); 587 new_packets.push_back(Packet(flow_ids()[0], send_time_us, size, 588 prototype_header_)); 589 new_packets.back().SetAbsSendTimeMs(next_frame_ms_); 590 payload_size -= size; 591 } 592 593 next_frame_ms_ += frame_period_ms_; 594 } 595 in_out->merge(new_packets); 596 } 597 598 AdaptiveVideoSender::AdaptiveVideoSender(int flow_id, 599 PacketProcessorListener* listener, 600 float fps, 601 uint32_t kbps, 602 uint32_t ssrc, 603 float first_frame_offset) 604 : VideoSender(flow_id, listener, fps, kbps, ssrc, first_frame_offset) {} 605 606 void AdaptiveVideoSender::GiveFeedback(const PacketSender::Feedback& feedback) { 607 bytes_per_second_ = feedback.estimated_bps / 8; 608 frame_size_bytes_ = (bytes_per_second_ * frame_period_ms_ + 500) / 1000; 609 } 610 611 PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener, 612 uint32_t kbps, 613 AdaptiveVideoSender* source) 614 // It is important that the first_frame_offset and the initial time of 615 // clock_ are both zero, otherwise we can't have absolute time in this 616 // class. 617 : PacketSender(listener, source->flow_ids()), 618 clock_(0), 619 start_of_run_ms_(0), 620 pacer_(&clock_, this, PacedSender::kDefaultPaceMultiplier * kbps, 0), 621 source_(source) {} 622 623 void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) { 624 start_of_run_ms_ = clock_.TimeInMilliseconds(); 625 Packets generated_packets; 626 source_->RunFor(time_ms, &generated_packets); 627 Packets::iterator it = generated_packets.begin(); 628 // Run process periodically to allow the packets to be paced out. 629 const int kProcessIntervalMs = 10; 630 for (int64_t current_time = 0; current_time < time_ms; 631 current_time += kProcessIntervalMs) { 632 int64_t end_of_interval_us = 633 1000 * (clock_.TimeInMilliseconds() + kProcessIntervalMs); 634 while (it != generated_packets.end() && 635 end_of_interval_us >= it->send_time_us()) { 636 // Time to send next packet to pacer. 637 pacer_.SendPacket(PacedSender::kNormalPriority, 638 it->header().ssrc, 639 it->header().sequenceNumber, 640 (it->send_time_us() + 500) / 1000, 641 it->payload_size(), 642 false); 643 pacer_queue_.push_back(*it); 644 const size_t kMaxPacerQueueSize = 1000; 645 if (pacer_queue_.size() > kMaxPacerQueueSize) { 646 pacer_queue_.pop_front(); 647 } 648 ++it; 649 } 650 clock_.AdvanceTimeMilliseconds(kProcessIntervalMs); 651 pacer_.Process(); 652 } 653 QueuePackets(in_out, (start_of_run_ms_ + time_ms) * 1000); 654 } 655 656 void PacedVideoSender::QueuePackets(Packets* batch, 657 int64_t end_of_batch_time_us) { 658 queue_.merge(*batch); 659 if (queue_.empty()) { 660 return; 661 } 662 Packets::iterator it = queue_.begin(); 663 for (; it != queue_.end(); ++it) { 664 if (it->send_time_us() > end_of_batch_time_us) { 665 break; 666 } 667 } 668 Packets to_transfer; 669 to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it); 670 batch->merge(to_transfer); 671 } 672 673 void PacedVideoSender::GiveFeedback(const PacketSender::Feedback& feedback) { 674 source_->GiveFeedback(feedback); 675 pacer_.UpdateBitrate( 676 PacedSender::kDefaultPaceMultiplier * feedback.estimated_bps / 1000, 0); 677 } 678 679 bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc, 680 uint16_t sequence_number, 681 int64_t capture_time_ms, 682 bool retransmission) { 683 for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end(); 684 ++it) { 685 if (it->header().sequenceNumber == sequence_number) { 686 int64_t pace_out_time_ms = clock_.TimeInMilliseconds(); 687 // Make sure a packet is never paced out earlier than when it was put into 688 // the pacer. 689 assert(1000 * pace_out_time_ms >= it->send_time_us()); 690 it->SetAbsSendTimeMs(pace_out_time_ms); 691 it->set_send_time_us(1000 * pace_out_time_ms); 692 queue_.push_back(*it); 693 return true; 694 } 695 } 696 return false; 697 } 698 699 int PacedVideoSender::TimeToSendPadding(int bytes) { 700 return 0; 701 } 702 } // namespace bwe 703 } // namespace testing 704 } // namespace webrtc 705