Home | History | Annotate | Download | only in receiver
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "media/cast/receiver/frame_receiver.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/big_endian.h"
     10 #include "base/bind.h"
     11 #include "base/logging.h"
     12 #include "base/message_loop/message_loop.h"
     13 #include "media/cast/cast_environment.h"
     14 
namespace {
// Lower bound, in milliseconds, on the delay used when FrameReceiver posts its
// periodic "send next Cast message" and "send next RTCP report" tasks.  Both
// schedulers clamp their computed delay to at least this value.
const int kMinSchedulingDelayMs = 1;
}  // namespace
     18 
     19 namespace media {
     20 namespace cast {
     21 
// Constructs a receiver for the single incoming stream described by |config|.
//
// |cast_environment| supplies the clock, the logging facility and the task
// posting used throughout this class.  |event_media_type| tags every logging
// event this receiver emits.  |packet_sender| is handed to |rtcp_|.
//
// The constructor body also initializes |decryptor_| from the AES key and IV
// mask in |config|, registers |event_subscriber_| with the logging facility
// (undone in the destructor), and zeroes |frame_id_to_rtp_timestamp_|.
//
// NOTE(review): member initializers run before the constructor body, so the
// division by |config.max_frame_rate| below is evaluated before the
// DCHECK_GT(config.max_frame_rate, 0) in the body gets a chance to fire.
FrameReceiver::FrameReceiver(
    const scoped_refptr<CastEnvironment>& cast_environment,
    const FrameReceiverConfig& config,
    EventMediaType event_media_type,
    PacedPacketSender* const packet_sender)
    : cast_environment_(cast_environment),
      packet_parser_(config.incoming_ssrc, config.rtp_payload_type),
      stats_(cast_environment->Clock()),
      event_media_type_(event_media_type),
      event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
      rtp_timebase_(config.frequency),
      // Initial playout delay; may later be replaced by a per-frame value in
      // EmitAvailableEncodedFrames().
      target_playout_delay_(
          base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
      // Duration of one frame at the configured maximum frame rate.
      expected_frame_duration_(
          base::TimeDelta::FromSeconds(1) / config.max_frame_rate),
      reports_are_scheduled_(false),
      // The last argument is the number of frames that fit in the maximum
      // delay at the maximum frame rate (presumably the framer's frame
      // capacity -- confirm against the Framer constructor).
      framer_(cast_environment->Clock(),
              this,
              config.incoming_ssrc,
              true,
              config.rtp_max_delay_ms * config.max_frame_rate / 1000),
      // Default-constructed (empty) callbacks are passed for the Cast message,
      // RTT and log message hooks.
      rtcp_(RtcpCastMessageCallback(),
            RtcpRttCallback(),
            RtcpLogMessageCallback(),
            cast_environment_->Clock(),
            packet_sender,
            config.feedback_ssrc,
            config.incoming_ssrc),
      is_waiting_for_consecutive_frame_(false),
      lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()),
      rtcp_interval_(base::TimeDelta::FromMilliseconds(config.rtcp_interval)),
      weak_factory_(this) {
  DCHECK_GT(config.rtp_max_delay_ms, 0);
  DCHECK_GT(config.max_frame_rate, 0);
  decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
  cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber_);
  memset(frame_id_to_rtp_timestamp_, 0, sizeof(frame_id_to_rtp_timestamp_));
}
     60 
// Unregisters |event_subscriber_| from the logging facility, undoing the
// registration made in the constructor.  Expected to run on the MAIN thread
// (checked in debug builds).
FrameReceiver::~FrameReceiver() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  cast_environment_->Logging()->RemoveRawEventSubscriber(&event_subscriber_);
}
     65 
// Queues |callback| as a pending request for one encoded frame and then tries
// to satisfy queued requests right away.  Requests are served in FIFO order:
// they are appended here and consumed from the front of the queue in
// EmitAvailableEncodedFrames().  Must be called on the MAIN thread.
void FrameReceiver::RequestEncodedFrame(
    const ReceiveEncodedFrameCallback& callback) {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  frame_request_queue_.push_back(callback);
  EmitAvailableEncodedFrames();
}
     72 
     73 bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) {
     74   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
     75 
     76   if (Rtcp::IsRtcpPacket(&packet->front(), packet->size())) {
     77     rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
     78   } else {
     79     RtpCastHeader rtp_header;
     80     const uint8* payload_data;
     81     size_t payload_size;
     82     if (!packet_parser_.ParsePacket(&packet->front(),
     83                                     packet->size(),
     84                                     &rtp_header,
     85                                     &payload_data,
     86                                     &payload_size)) {
     87       return false;
     88     }
     89 
     90     ProcessParsedPacket(rtp_header, payload_data, payload_size);
     91     stats_.UpdateStatistics(rtp_header);
     92   }
     93 
     94   if (!reports_are_scheduled_) {
     95     ScheduleNextRtcpReport();
     96     ScheduleNextCastMessage();
     97     reports_are_scheduled_ = true;
     98   }
     99 
    100   return true;
    101 }
    102 
    103 // static
    104 bool FrameReceiver::ParseSenderSsrc(const uint8* packet,
    105                                     size_t length,
    106                                     uint32* ssrc) {
    107   base::BigEndianReader big_endian_reader(
    108       reinterpret_cast<const char*>(packet), length);
    109   return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc);
    110 }
    111 
    112 void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
    113                                         const uint8* payload_data,
    114                                         size_t payload_size) {
    115   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    116 
    117   const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    118 
    119   frame_id_to_rtp_timestamp_[rtp_header.frame_id & 0xff] =
    120       rtp_header.rtp_timestamp;
    121   cast_environment_->Logging()->InsertPacketEvent(
    122       now, PACKET_RECEIVED, event_media_type_, rtp_header.rtp_timestamp,
    123       rtp_header.frame_id, rtp_header.packet_id, rtp_header.max_packet_id,
    124       payload_size);
    125 
    126   bool duplicate = false;
    127   const bool complete =
    128       framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);
    129 
    130   // Duplicate packets are ignored.
    131   if (duplicate)
    132     return;
    133 
    134   // Update lip-sync values upon receiving the first packet of each frame, or if
    135   // they have never been set yet.
    136   if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
    137     RtpTimestamp fresh_sync_rtp;
    138     base::TimeTicks fresh_sync_reference;
    139     if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
    140       // HACK: The sender should have provided Sender Reports before the first
    141       // frame was sent.  However, the spec does not currently require this.
    142       // Therefore, when the data is missing, the local clock is used to
    143       // generate reference timestamps.
    144       VLOG(2) << "Lip sync info missing.  Falling-back to local clock.";
    145       fresh_sync_rtp = rtp_header.rtp_timestamp;
    146       fresh_sync_reference = now;
    147     }
    148     // |lip_sync_reference_time_| is always incremented according to the time
    149     // delta computed from the difference in RTP timestamps.  Then,
    150     // |lip_sync_drift_| accounts for clock drift and also smoothes-out any
    151     // sudden/discontinuous shifts in the series of reference time values.
    152     if (lip_sync_reference_time_.is_null()) {
    153       lip_sync_reference_time_ = fresh_sync_reference;
    154     } else {
    155       lip_sync_reference_time_ += RtpDeltaToTimeDelta(
    156           static_cast<int32>(fresh_sync_rtp - lip_sync_rtp_timestamp_),
    157           rtp_timebase_);
    158     }
    159     lip_sync_rtp_timestamp_ = fresh_sync_rtp;
    160     lip_sync_drift_.Update(
    161         now, fresh_sync_reference - lip_sync_reference_time_);
    162   }
    163 
    164   // Another frame is complete from a non-duplicate packet.  Attempt to emit
    165   // more frames to satisfy enqueued requests.
    166   if (complete)
    167     EmitAvailableEncodedFrames();
    168 }
    169 
    170 void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
    171   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    172 
    173   base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    174   RtpTimestamp rtp_timestamp =
    175       frame_id_to_rtp_timestamp_[cast_message.ack_frame_id & 0xff];
    176   cast_environment_->Logging()->InsertFrameEvent(
    177       now, FRAME_ACK_SENT, event_media_type_,
    178       rtp_timestamp, cast_message.ack_frame_id);
    179 
    180   ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events;
    181   event_subscriber_.GetRtcpEventsAndReset(&rtcp_events);
    182   rtcp_.SendRtcpFromRtpReceiver(&cast_message, target_playout_delay_,
    183                                 &rtcp_events, NULL);
    184 }
    185 
// Serves queued frame requests for as long as |framer_| has a frame that may
// be emitted.  Must be called on the MAIN thread.
//
// Each loop iteration ends in one of these ways:
//   - No complete frame is available: return and wait for more packets.
//   - Several complete frames exist and the current one is already past its
//     playout time: release (drop) it and retry with the next frame.
//   - The frame is not the consecutive next one and there is still time for
//     the missing frame(s) to arrive: schedule a single delayed retry and
//     return.
//   - Decryption fails: release the frame and retry with the next one.
//   - Otherwise: ACK, decrypt (if enabled), release, and post the frame to
//     the request at the front of the queue.
void FrameReceiver::EmitAvailableEncodedFrames() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

  while (!frame_request_queue_.empty()) {
    // Attempt to peek at the next completed frame from the |framer_|.
    // TODO(miu): We should only be peeking at the metadata, and not copying the
    // payload yet!  Or, at least, peek using a StringPiece instead of a copy.
    scoped_ptr<EncodedFrame> encoded_frame(
        new EncodedFrame());
    bool is_consecutively_next_frame = false;
    bool have_multiple_complete_frames = false;
    if (!framer_.GetEncodedFrame(encoded_frame.get(),
                                 &is_consecutively_next_frame,
                                 &have_multiple_complete_frames)) {
      VLOG(1) << "Wait for more packets to produce a completed frame.";
      return;  // ProcessParsedPacket() will invoke this method in the future.
    }

    const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    const base::TimeTicks playout_time = GetPlayoutTime(*encoded_frame);

    // If we have multiple decodable frames, and the current frame is
    // too old, then skip it and decode the next frame instead.
    // Note that the skipped frame is released without being ACKed here.
    if (have_multiple_complete_frames && now > playout_time) {
      framer_.ReleaseFrame(encoded_frame->frame_id);
      continue;
    }

    // If |framer_| has a frame ready that is out of sequence, examine the
    // playout time to determine whether it's acceptable to continue, thereby
    // skipping one or more frames.  Skip if the missing frame wouldn't complete
    // playing before the start of playback of the available frame.
    if (!is_consecutively_next_frame) {
      // This assumes that decoding takes as long as playing, which might
      // not be true.
      const base::TimeTicks earliest_possible_end_time_of_missing_frame =
          now + expected_frame_duration_ * 2;
      if (earliest_possible_end_time_of_missing_frame < playout_time) {
        VLOG(1) << "Wait for next consecutive frame instead of skipping.";
        // |is_waiting_for_consecutive_frame_| keeps at most one delayed retry
        // pending; it is cleared in EmitAvailableEncodedFramesAfterWaiting().
        if (!is_waiting_for_consecutive_frame_) {
          is_waiting_for_consecutive_frame_ = true;
          cast_environment_->PostDelayedTask(
              CastEnvironment::MAIN,
              FROM_HERE,
              base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
                         weak_factory_.GetWeakPtr()),
              playout_time - now);
        }
        return;
      }
    }

    // At this point, we have the complete next frame, or a decodable
    // frame from somewhere later in the stream, AND we have given up
    // on waiting for any frames in between, so now we can ACK the frame.
    framer_.AckFrame(encoded_frame->frame_id);

    // Decrypt the payload data in the frame, if crypto is being used.
    if (decryptor_.is_activated()) {
      std::string decrypted_data;
      if (!decryptor_.Decrypt(encoded_frame->frame_id,
                              encoded_frame->data,
                              &decrypted_data)) {
        // Decryption failed.  Give up on this frame.
        // The frame has already been ACKed above; the pending request stays
        // queued and is retried with the next available frame.
        framer_.ReleaseFrame(encoded_frame->frame_id);
        continue;
      }
      encoded_frame->data.swap(decrypted_data);
    }

    // At this point, we have a decrypted EncodedFrame ready to be emitted.
    encoded_frame->reference_time = playout_time;
    framer_.ReleaseFrame(encoded_frame->frame_id);
    // A non-zero per-frame playout delay replaces the target delay used for
    // subsequent frames and reported in CastFeedback().
    if (encoded_frame->new_playout_delay_ms) {
      target_playout_delay_ = base::TimeDelta::FromMilliseconds(
          encoded_frame->new_playout_delay_ms);
    }
    // The callback is run from a separately posted task, bound through a weak
    // pointer to this object, rather than being invoked directly from here.
    cast_environment_->PostTask(CastEnvironment::MAIN,
                                FROM_HERE,
                                base::Bind(&FrameReceiver::EmitOneFrame,
                                           weak_factory_.GetWeakPtr(),
                                           frame_request_queue_.front(),
                                           base::Passed(&encoded_frame)));
    frame_request_queue_.pop_front();
  }
}
    272 
// Delayed-task entry point posted by EmitAvailableEncodedFrames() when it
// decides to wait for a missing consecutive frame.  Clears the "waiting" flag
// (so a new delayed retry may be scheduled later) and then re-attempts
// emission.  Must run on the MAIN thread.
void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  DCHECK(is_waiting_for_consecutive_frame_);
  is_waiting_for_consecutive_frame_ = false;
  EmitAvailableEncodedFrames();
}
    279 
    280 void FrameReceiver::EmitOneFrame(const ReceiveEncodedFrameCallback& callback,
    281                                  scoped_ptr<EncodedFrame> encoded_frame) const {
    282   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    283   if (!callback.is_null())
    284     callback.Run(encoded_frame.Pass());
    285 }
    286 
    287 base::TimeTicks FrameReceiver::GetPlayoutTime(const EncodedFrame& frame) const {
    288   base::TimeDelta target_playout_delay = target_playout_delay_;
    289   if (frame.new_playout_delay_ms) {
    290     target_playout_delay = base::TimeDelta::FromMilliseconds(
    291         frame.new_playout_delay_ms);
    292   }
    293   return lip_sync_reference_time_ +
    294       lip_sync_drift_.Current() +
    295       RtpDeltaToTimeDelta(
    296           static_cast<int32>(frame.rtp_timestamp - lip_sync_rtp_timestamp_),
    297           rtp_timebase_) +
    298       target_playout_delay;
    299 }
    300 
    301 void FrameReceiver::ScheduleNextCastMessage() {
    302   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    303   base::TimeTicks send_time;
    304   framer_.TimeToSendNextCastMessage(&send_time);
    305   base::TimeDelta time_to_send =
    306       send_time - cast_environment_->Clock()->NowTicks();
    307   time_to_send = std::max(
    308       time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
    309   cast_environment_->PostDelayedTask(
    310       CastEnvironment::MAIN,
    311       FROM_HERE,
    312       base::Bind(&FrameReceiver::SendNextCastMessage,
    313                  weak_factory_.GetWeakPtr()),
    314       time_to_send);
    315 }
    316 
// Periodic task body: asks |framer_| to send a Cast message and then
// re-arms itself via ScheduleNextCastMessage(), forming a self-perpetuating
// loop once started from ProcessPacket().  Must run on the MAIN thread.
void FrameReceiver::SendNextCastMessage() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  framer_.SendCastMessage();  // Will only send a message if it is time.
  ScheduleNextCastMessage();
}
    322 
    323 void FrameReceiver::ScheduleNextRtcpReport() {
    324   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    325   base::TimeDelta time_to_next = rtcp_interval_;
    326   time_to_next = std::max(
    327       time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
    328 
    329   cast_environment_->PostDelayedTask(
    330       CastEnvironment::MAIN,
    331       FROM_HERE,
    332       base::Bind(&FrameReceiver::SendNextRtcpReport,
    333                  weak_factory_.GetWeakPtr()),
    334       time_to_next);
    335 }
    336 
// Periodic task body: sends an RTCP packet that carries only the receiver
// statistics in |stats_| (no Cast message, a zero playout delay, and no
// receiver events are passed), then re-arms itself via
// ScheduleNextRtcpReport().  Must run on the MAIN thread.
void FrameReceiver::SendNextRtcpReport() {
  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
  rtcp_.SendRtcpFromRtpReceiver(NULL, base::TimeDelta(), NULL, &stats_);
  ScheduleNextRtcpReport();
}
    342 
    343 }  // namespace cast
    344 }  // namespace media
    345