Home | History | Annotate | Download | only in pacing
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "media/cast/net/pacing/paced_sender.h"
      6 
      7 #include "base/big_endian.h"
      8 #include "base/bind.h"
      9 #include "base/debug/dump_without_crashing.h"
     10 #include "base/message_loop/message_loop.h"
     11 #include "media/cast/logging/logging_impl.h"
     12 
     13 namespace media {
     14 namespace cast {
     15 
     16 namespace {
     17 
     18 static const int64 kPacingIntervalMs = 10;
     19 // Each frame will be split into no more than kPacingMaxBurstsPerFrame
     20 // bursts of packets.
     21 static const size_t kPacingMaxBurstsPerFrame = 3;
     22 static const size_t kMaxDedupeWindowMs = 500;
     23 
     24 // "Impossible" upper-bound on the maximum number of packets that should ever be
     25 // enqueued in the pacer.  This is used to detect bugs, reported as crash dumps.
     26 static const size_t kHugeQueueLengthSeconds = 10;
     27 static const size_t kRidiculousNumberOfPackets =
     28     kHugeQueueLengthSeconds * (kMaxBurstSize * 1000 / kPacingIntervalMs);
     29 
     30 }  // namespace
     31 
     32 DedupInfo::DedupInfo() : last_byte_acked_for_audio(0) {}
     33 
     34 // static
     35 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks,
     36                                            uint32 ssrc,
     37                                            uint16 packet_id) {
     38   return std::make_pair(ticks, std::make_pair(ssrc, packet_id));
     39 }
     40 
     41 PacedSender::PacketSendRecord::PacketSendRecord()
     42     : last_byte_sent(0), last_byte_sent_for_audio(0) {}
     43 
     44 PacedSender::PacedSender(
     45     size_t target_burst_size,
     46     size_t max_burst_size,
     47     base::TickClock* clock,
     48     LoggingImpl* logging,
     49     PacketSender* transport,
     50     const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
     51     : clock_(clock),
     52       logging_(logging),
     53       transport_(transport),
     54       transport_task_runner_(transport_task_runner),
     55       audio_ssrc_(0),
     56       video_ssrc_(0),
     57       target_burst_size_(target_burst_size),
     58       max_burst_size_(max_burst_size),
     59       current_max_burst_size_(target_burst_size_),
     60       next_max_burst_size_(target_burst_size_),
     61       next_next_max_burst_size_(target_burst_size_),
     62       current_burst_size_(0),
     63       state_(State_Unblocked),
     64       has_reached_upper_bound_once_(false),
     65       weak_factory_(this) {
     66 }
     67 
     68 PacedSender::~PacedSender() {}
     69 
     70 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) {
     71   audio_ssrc_ = audio_ssrc;
     72 }
     73 
     74 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) {
     75   video_ssrc_ = video_ssrc;
     76 }
     77 
     78 void PacedSender::RegisterPrioritySsrc(uint32 ssrc) {
     79   priority_ssrcs_.push_back(ssrc);
     80 }
     81 
     82 int64 PacedSender::GetLastByteSentForPacket(const PacketKey& packet_key) {
     83   PacketSendHistory::const_iterator it = send_history_.find(packet_key);
     84   if (it == send_history_.end())
     85     return 0;
     86   return it->second.last_byte_sent;
     87 }
     88 
     89 int64 PacedSender::GetLastByteSentForSsrc(uint32 ssrc) {
     90   std::map<uint32, int64>::const_iterator it = last_byte_sent_.find(ssrc);
     91   if (it == last_byte_sent_.end())
     92     return 0;
     93   return it->second;
     94 }
     95 
     96 bool PacedSender::SendPackets(const SendPacketVector& packets) {
     97   if (packets.empty()) {
     98     return true;
     99   }
    100   const bool high_priority = IsHighPriority(packets.begin()->first);
    101   for (size_t i = 0; i < packets.size(); i++) {
    102     DCHECK(IsHighPriority(packets[i].first) == high_priority);
    103     if (high_priority) {
    104       priority_packet_list_[packets[i].first] =
    105           make_pair(PacketType_Normal, packets[i].second);
    106     } else {
    107       packet_list_[packets[i].first] =
    108           make_pair(PacketType_Normal, packets[i].second);
    109     }
    110   }
    111   if (state_ == State_Unblocked) {
    112     SendStoredPackets();
    113   }
    114   return true;
    115 }
    116 
    117 bool PacedSender::ShouldResend(const PacketKey& packet_key,
    118                                const DedupInfo& dedup_info,
    119                                const base::TimeTicks& now) {
    120   PacketSendHistory::const_iterator it = send_history_.find(packet_key);
    121 
    122   // No history of previous transmission. It might be sent too long ago.
    123   if (it == send_history_.end())
    124     return true;
    125 
    126   // Suppose there is request to retransmit X and there is an audio
    127   // packet Y sent just before X. Reject retransmission of X if ACK for
    128   // Y has not been received.
    129   // Only do this for video packets.
    130   if (packet_key.second.first == video_ssrc_) {
    131     if (dedup_info.last_byte_acked_for_audio &&
    132         it->second.last_byte_sent_for_audio &&
    133         dedup_info.last_byte_acked_for_audio <
    134         it->second.last_byte_sent_for_audio) {
    135       return false;
    136     }
    137   }
    138   // Retransmission interval has to be greater than |resend_interval|.
    139   if (now - it->second.time < dedup_info.resend_interval)
    140     return false;
    141   return true;
    142 }
    143 
    144 bool PacedSender::ResendPackets(const SendPacketVector& packets,
    145                                 const DedupInfo& dedup_info) {
    146   if (packets.empty()) {
    147     return true;
    148   }
    149   const bool high_priority = IsHighPriority(packets.begin()->first);
    150   const base::TimeTicks now = clock_->NowTicks();
    151   for (size_t i = 0; i < packets.size(); i++) {
    152     if (!ShouldResend(packets[i].first, dedup_info, now)) {
    153       LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED);
    154       continue;
    155     }
    156 
    157     DCHECK(IsHighPriority(packets[i].first) == high_priority);
    158     if (high_priority) {
    159       priority_packet_list_[packets[i].first] =
    160           make_pair(PacketType_Resend, packets[i].second);
    161     } else {
    162       packet_list_[packets[i].first] =
    163           make_pair(PacketType_Resend, packets[i].second);
    164     }
    165   }
    166   if (state_ == State_Unblocked) {
    167     SendStoredPackets();
    168   }
    169   return true;
    170 }
    171 
    172 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) {
    173   if (state_ == State_TransportBlocked) {
    174     priority_packet_list_[
    175         PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] =
    176         make_pair(PacketType_RTCP, packet);
    177   } else {
    178     // We pass the RTCP packets straight through.
    179     if (!transport_->SendPacket(
    180             packet,
    181             base::Bind(&PacedSender::SendStoredPackets,
    182                        weak_factory_.GetWeakPtr()))) {
    183       state_ = State_TransportBlocked;
    184     }
    185   }
    186   return true;
    187 }
    188 
    189 void PacedSender::CancelSendingPacket(const PacketKey& packet_key) {
    190   packet_list_.erase(packet_key);
    191   priority_packet_list_.erase(packet_key);
    192 }
    193 
    194 PacketRef PacedSender::PopNextPacket(PacketType* packet_type,
    195                                      PacketKey* packet_key) {
    196   PacketList* list = !priority_packet_list_.empty() ?
    197       &priority_packet_list_ : &packet_list_;
    198   DCHECK(!list->empty());
    199   PacketList::iterator i = list->begin();
    200   *packet_type = i->second.first;
    201   *packet_key = i->first;
    202   PacketRef ret = i->second.second;
    203   list->erase(i);
    204   return ret;
    205 }
    206 
    207 bool PacedSender::IsHighPriority(const PacketKey& packet_key) const {
    208   return std::find(priority_ssrcs_.begin(), priority_ssrcs_.end(),
    209                    packet_key.second.first) != priority_ssrcs_.end();
    210 }
    211 
    212 bool PacedSender::empty() const {
    213   return packet_list_.empty() && priority_packet_list_.empty();
    214 }
    215 
    216 size_t PacedSender::size() const {
    217   return packet_list_.size() + priority_packet_list_.size();
    218 }
    219 
    220 // This function can be called from three places:
    221 // 1. User called one of the Send* functions and we were in an unblocked state.
    222 // 2. state_ == State_TransportBlocked and the transport is calling us to
    223 //    let us know that it's ok to send again.
    224 // 3. state_ == State_BurstFull and there are still packets to send. In this
    225 //    case we called PostDelayedTask on this function to start a new burst.
    226 void PacedSender::SendStoredPackets() {
    227   State previous_state = state_;
    228   state_ = State_Unblocked;
    229   if (empty()) {
    230     return;
    231   }
    232 
    233   // If the queue ever becomes impossibly long, send a crash dump without
    234   // actually crashing the process.
    235   if (size() > kRidiculousNumberOfPackets && !has_reached_upper_bound_once_) {
    236     NOTREACHED();
    237     // Please use Cr=Internals-Cast label in bug reports:
    238     base::debug::DumpWithoutCrashing();
    239     has_reached_upper_bound_once_ = true;
    240   }
    241 
    242   base::TimeTicks now = clock_->NowTicks();
    243   // I don't actually trust that PostDelayTask(x - now) will mean that
    244   // now >= x when the call happens, so check if the previous state was
    245   // State_BurstFull too.
    246   if (now >= burst_end_ || previous_state == State_BurstFull) {
    247     // Start a new burst.
    248     current_burst_size_ = 0;
    249     burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
    250 
    251     // The goal here is to try to send out the queued packets over the next
    252     // three bursts, while trying to keep the burst size below 10 if possible.
    253     // We have some evidence that sending more than 12 packets in a row doesn't
    254     // work very well, but we don't actually know why yet. Sending out packets
    255     // sooner is better than sending out packets later as that gives us more
    256     // time to re-send them if needed. So if we have less than 30 packets, just
    257     // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
    258     // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
    259     // which is more bandwidth than the cast library should need, and sending
    260     // out more data per second is unlikely to be helpful.
    261     size_t max_burst_size = std::min(
    262         max_burst_size_,
    263         std::max(target_burst_size_, size() / kPacingMaxBurstsPerFrame));
    264     current_max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
    265     next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
    266     next_next_max_burst_size_ = max_burst_size;
    267   }
    268 
    269   base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
    270                                 weak_factory_.GetWeakPtr());
    271   while (!empty()) {
    272     if (current_burst_size_ >= current_max_burst_size_) {
    273       transport_task_runner_->PostDelayedTask(FROM_HERE,
    274                                               cb,
    275                                               burst_end_ - now);
    276       state_ = State_BurstFull;
    277       return;
    278     }
    279     PacketType packet_type;
    280     PacketKey packet_key;
    281     PacketRef packet = PopNextPacket(&packet_type, &packet_key);
    282     PacketSendRecord send_record;
    283     send_record.time = now;
    284 
    285     switch (packet_type) {
    286       case PacketType_Resend:
    287         LogPacketEvent(packet->data, PACKET_RETRANSMITTED);
    288         break;
    289       case PacketType_Normal:
    290         LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK);
    291         break;
    292       case PacketType_RTCP:
    293         break;
    294     }
    295 
    296     const bool socket_blocked = !transport_->SendPacket(packet, cb);
    297 
    298     // Save the send record.
    299     send_record.last_byte_sent = transport_->GetBytesSent();
    300     send_record.last_byte_sent_for_audio = GetLastByteSentForSsrc(audio_ssrc_);
    301     send_history_[packet_key] = send_record;
    302     send_history_buffer_[packet_key] = send_record;
    303     last_byte_sent_[packet_key.second.first] = send_record.last_byte_sent;
    304 
    305     if (socket_blocked) {
    306       state_ = State_TransportBlocked;
    307       return;
    308     }
    309     current_burst_size_++;
    310   }
    311 
    312   // Keep ~0.5 seconds of data (1000 packets).
    313   if (send_history_buffer_.size() >=
    314       max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs) {
    315     send_history_.swap(send_history_buffer_);
    316     send_history_buffer_.clear();
    317   }
    318   DCHECK_LE(send_history_buffer_.size(),
    319             max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs);
    320   state_ = State_Unblocked;
    321 }
    322 
    323 void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) {
    324   // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
    325   // if the packet is audio or video.
    326   DCHECK_GE(packet.size(), 12u);
    327   base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4);
    328   uint32 ssrc;
    329   bool success = reader.ReadU32(&ssrc);
    330   DCHECK(success);
    331   bool is_audio;
    332   if (ssrc == audio_ssrc_) {
    333     is_audio = true;
    334   } else if (ssrc == video_ssrc_) {
    335     is_audio = false;
    336   } else {
    337     DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event";
    338     return;
    339   }
    340 
    341   EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT;
    342   logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type,
    343       packet);
    344 }
    345 
    346 }  // namespace cast
    347 }  // namespace media
    348