Home | History | Annotate | Download | only in pacing
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "media/cast/transport/pacing/paced_sender.h"
      6 
      7 #include "base/big_endian.h"
      8 #include "base/bind.h"
      9 #include "base/message_loop/message_loop.h"
     10 
     11 namespace media {
     12 namespace cast {
     13 namespace transport {
     14 
     15 namespace {
     16 
     17 static const int64 kPacingIntervalMs = 10;
     18 // Each frame will be split into no more than kPacingMaxBurstsPerFrame
     19 // bursts of packets.
     20 static const size_t kPacingMaxBurstsPerFrame = 3;
     21 static const size_t kTargetBurstSize = 10;
     22 static const size_t kMaxBurstSize = 20;
     23 static const size_t kMaxDedupeWindowMs = 500;
     24 
     25 }  // namespace
     26 
     27 // static
     28 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks,
     29                                            uint32 ssrc,
     30                                            uint16 packet_id) {
     31   return std::make_pair(ticks, std::make_pair(ssrc, packet_id));
     32 }
     33 
     34 PacedSender::PacedSender(
     35     base::TickClock* clock,
     36     LoggingImpl* logging,
     37     PacketSender* transport,
     38     const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
     39     : clock_(clock),
     40       logging_(logging),
     41       transport_(transport),
     42       transport_task_runner_(transport_task_runner),
     43       audio_ssrc_(0),
     44       video_ssrc_(0),
     45       max_burst_size_(kTargetBurstSize),
     46       next_max_burst_size_(kTargetBurstSize),
     47       next_next_max_burst_size_(kTargetBurstSize),
     48       current_burst_size_(0),
     49       state_(State_Unblocked),
     50       weak_factory_(this) {
     51 }
     52 
     53 PacedSender::~PacedSender() {}
     54 
     55 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) {
     56   audio_ssrc_ = audio_ssrc;
     57 }
     58 
     59 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) {
     60   video_ssrc_ = video_ssrc;
     61 }
     62 
     63 bool PacedSender::SendPackets(const SendPacketVector& packets) {
     64   if (packets.empty()) {
     65     return true;
     66   }
     67   for (size_t i = 0; i < packets.size(); i++) {
     68     packet_list_[packets[i].first] =
     69         make_pair(PacketType_Normal, packets[i].second);
     70   }
     71   if (state_ == State_Unblocked) {
     72     SendStoredPackets();
     73   }
     74   return true;
     75 }
     76 
     77 bool PacedSender::ResendPackets(const SendPacketVector& packets,
     78                                 base::TimeDelta dedupe_window) {
     79   if (packets.empty()) {
     80     return true;
     81   }
     82   base::TimeTicks now = clock_->NowTicks();
     83   for (size_t i = 0; i < packets.size(); i++) {
     84     std::map<PacketKey, base::TimeTicks>::const_iterator j =
     85         sent_time_.find(packets[i].first);
     86 
     87     if (j != sent_time_.end() && now - j->second < dedupe_window) {
     88       LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED);
     89       continue;
     90     }
     91 
     92     packet_list_[packets[i].first] =
     93         make_pair(PacketType_Resend, packets[i].second);
     94   }
     95   if (state_ == State_Unblocked) {
     96     SendStoredPackets();
     97   }
     98   return true;
     99 }
    100 
    101 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) {
    102   if (state_ == State_TransportBlocked) {
    103     packet_list_[PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] =
    104         make_pair(PacketType_RTCP, packet);
    105   } else {
    106     // We pass the RTCP packets straight through.
    107     if (!transport_->SendPacket(
    108             packet,
    109             base::Bind(&PacedSender::SendStoredPackets,
    110                        weak_factory_.GetWeakPtr()))) {
    111       state_ = State_TransportBlocked;
    112     }
    113 
    114   }
    115   return true;
    116 }
    117 
    118 void PacedSender::CancelSendingPacket(const PacketKey& packet_key) {
    119   packet_list_.erase(packet_key);
    120 }
    121 
    122 PacketRef PacedSender::GetNextPacket(PacketType* packet_type,
    123                                      PacketKey* packet_key) {
    124   std::map<PacketKey, std::pair<PacketType, PacketRef> >::iterator i;
    125   i = packet_list_.begin();
    126   DCHECK(i != packet_list_.end());
    127   *packet_type = i->second.first;
    128   *packet_key = i->first;
    129   PacketRef ret = i->second.second;
    130   packet_list_.erase(i);
    131   return ret;
    132 }
    133 
    134 bool PacedSender::empty() const {
    135   return packet_list_.empty();
    136 }
    137 
    138 size_t PacedSender::size() const {
    139   return packet_list_.size();
    140 }
    141 
    142 // This function can be called from three places:
    143 // 1. User called one of the Send* functions and we were in an unblocked state.
    144 // 2. state_ == State_TransportBlocked and the transport is calling us to
    145 //    let us know that it's ok to send again.
    146 // 3. state_ == State_BurstFull and there are still packets to send. In this
    147 //    case we called PostDelayedTask on this function to start a new burst.
    148 void PacedSender::SendStoredPackets() {
    149   State previous_state = state_;
    150   state_ = State_Unblocked;
    151   if (empty()) {
    152     return;
    153   }
    154 
    155   base::TimeTicks now = clock_->NowTicks();
    156   // I don't actually trust that PostDelayTask(x - now) will mean that
    157   // now >= x when the call happens, so check if the previous state was
    158   // State_BurstFull too.
    159   if (now >= burst_end_ || previous_state == State_BurstFull) {
    160     // Start a new burst.
    161     current_burst_size_ = 0;
    162     burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
    163 
    164     // The goal here is to try to send out the queued packets over the next
    165     // three bursts, while trying to keep the burst size below 10 if possible.
    166     // We have some evidence that sending more than 12 packets in a row doesn't
    167     // work very well, but we don't actually know why yet. Sending out packets
    168     // sooner is better than sending out packets later as that gives us more
    169     // time to re-send them if needed. So if we have less than 30 packets, just
    170     // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
    171     // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
    172     // which is more bandwidth than the cast library should need, and sending
    173     // out more data per second is unlikely to be helpful.
    174     size_t max_burst_size = std::min(
    175         kMaxBurstSize,
    176         std::max(kTargetBurstSize, size() / kPacingMaxBurstsPerFrame));
    177 
    178     // If the queue is long, issue a warning. Try to limit the number of
    179     // warnings issued by only issuing the warning when the burst size
    180     // grows. Otherwise we might get 100 warnings per second.
    181     if (max_burst_size > next_next_max_burst_size_ && size() > 100) {
    182       LOG(WARNING) << "Packet queue is very long:" << size();
    183     }
    184 
    185     max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
    186     next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
    187     next_next_max_burst_size_ = max_burst_size;
    188   }
    189 
    190   base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
    191                                 weak_factory_.GetWeakPtr());
    192   while (!empty()) {
    193     if (current_burst_size_ >= max_burst_size_) {
    194       transport_task_runner_->PostDelayedTask(FROM_HERE,
    195                                               cb,
    196                                               burst_end_ - now);
    197       state_ = State_BurstFull;
    198       return;
    199     }
    200     PacketType packet_type;
    201     PacketKey packet_key;
    202     PacketRef packet = GetNextPacket(&packet_type, &packet_key);
    203     sent_time_[packet_key] = now;
    204     sent_time_buffer_[packet_key] = now;
    205 
    206     switch (packet_type) {
    207       case PacketType_Resend:
    208         LogPacketEvent(packet->data, PACKET_RETRANSMITTED);
    209         break;
    210       case PacketType_Normal:
    211         LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK);
    212         break;
    213       case PacketType_RTCP:
    214         break;
    215     }
    216     if (!transport_->SendPacket(packet, cb)) {
    217       state_ = State_TransportBlocked;
    218       return;
    219     }
    220     current_burst_size_++;
    221   }
    222   // Keep ~0.5 seconds of data (1000 packets)
    223   if (sent_time_buffer_.size() >=
    224       kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs) {
    225     sent_time_.swap(sent_time_buffer_);
    226     sent_time_buffer_.clear();
    227   }
    228   DCHECK_LE(sent_time_buffer_.size(),
    229             kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs);
    230   DCHECK_LE(sent_time_.size(),
    231             2 * kMaxBurstSize * kMaxDedupeWindowMs / kPacingIntervalMs);
    232   state_ = State_Unblocked;
    233 }
    234 
    235 void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) {
    236   // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
    237   // if the packet is audio or video.
    238   DCHECK_GE(packet.size(), 12u);
    239   base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4);
    240   uint32 ssrc;
    241   bool success = reader.ReadU32(&ssrc);
    242   DCHECK(success);
    243   bool is_audio;
    244   if (ssrc == audio_ssrc_) {
    245     is_audio = true;
    246   } else if (ssrc == video_ssrc_) {
    247     is_audio = false;
    248   } else {
    249     DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event";
    250     return;
    251   }
    252 
    253   EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT;
    254   logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type,
    255       packet);
    256 }
    257 
    258 }  // namespace transport
    259 }  // namespace cast
    260 }  // namespace media
    261