Home | History | Annotate | Download | only in receiver
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "media/cast/receiver/frame_receiver.h"
      6 
      7 #include <algorithm>
      8 
      9 #include "base/big_endian.h"
     10 #include "base/bind.h"
     11 #include "base/logging.h"
     12 #include "base/message_loop/message_loop.h"
     13 #include "media/cast/cast_environment.h"
     14 
     15 namespace {
     16 const int kMinSchedulingDelayMs = 1;
     17 }  // namespace
     18 
     19 namespace media {
     20 namespace cast {
     21 
     22 FrameReceiver::FrameReceiver(
     23     const scoped_refptr<CastEnvironment>& cast_environment,
     24     const FrameReceiverConfig& config,
     25     EventMediaType event_media_type,
     26     transport::PacedPacketSender* const packet_sender)
     27     : cast_environment_(cast_environment),
     28       packet_parser_(config.incoming_ssrc, config.rtp_payload_type),
     29       stats_(cast_environment->Clock()),
     30       event_media_type_(event_media_type),
     31       event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
     32       rtp_timebase_(config.frequency),
     33       target_playout_delay_(
     34           base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
     35       expected_frame_duration_(
     36           base::TimeDelta::FromSeconds(1) / config.max_frame_rate),
     37       reports_are_scheduled_(false),
     38       framer_(cast_environment->Clock(),
     39               this,
     40               config.incoming_ssrc,
     41               true,
     42               config.rtp_max_delay_ms * config.max_frame_rate / 1000),
     43       rtcp_(cast_environment_,
     44             NULL,
     45             NULL,
     46             packet_sender,
     47             &stats_,
     48             config.rtcp_mode,
     49             base::TimeDelta::FromMilliseconds(config.rtcp_interval),
     50             config.feedback_ssrc,
     51             config.incoming_ssrc,
     52             config.rtcp_c_name,
     53             event_media_type),
     54       is_waiting_for_consecutive_frame_(false),
     55       lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()),
     56       weak_factory_(this) {
     57   DCHECK_GT(config.rtp_max_delay_ms, 0);
     58   DCHECK_GT(config.max_frame_rate, 0);
     59   decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
     60   rtcp_.SetTargetDelay(target_playout_delay_);
     61   cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber_);
     62   memset(frame_id_to_rtp_timestamp_, 0, sizeof(frame_id_to_rtp_timestamp_));
     63 }
     64 
     65 FrameReceiver::~FrameReceiver() {
     66   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
     67   cast_environment_->Logging()->RemoveRawEventSubscriber(&event_subscriber_);
     68 }
     69 
     70 void FrameReceiver::RequestEncodedFrame(
     71     const ReceiveEncodedFrameCallback& callback) {
     72   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
     73   frame_request_queue_.push_back(callback);
     74   EmitAvailableEncodedFrames();
     75 }
     76 
     77 bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) {
     78   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
     79 
     80   if (Rtcp::IsRtcpPacket(&packet->front(), packet->size())) {
     81     rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
     82   } else {
     83     RtpCastHeader rtp_header;
     84     const uint8* payload_data;
     85     size_t payload_size;
     86     if (!packet_parser_.ParsePacket(&packet->front(),
     87                                     packet->size(),
     88                                     &rtp_header,
     89                                     &payload_data,
     90                                     &payload_size)) {
     91       return false;
     92     }
     93 
     94     ProcessParsedPacket(rtp_header, payload_data, payload_size);
     95     stats_.UpdateStatistics(rtp_header);
     96   }
     97 
     98   if (!reports_are_scheduled_) {
     99     ScheduleNextRtcpReport();
    100     ScheduleNextCastMessage();
    101     reports_are_scheduled_ = true;
    102   }
    103 
    104   return true;
    105 }
    106 
    107 // static
    108 bool FrameReceiver::ParseSenderSsrc(const uint8* packet,
    109                                     size_t length,
    110                                     uint32* ssrc) {
    111   base::BigEndianReader big_endian_reader(
    112       reinterpret_cast<const char*>(packet), length);
    113   return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc);
    114 }
    115 
    116 void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
    117                                         const uint8* payload_data,
    118                                         size_t payload_size) {
    119   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    120 
    121   const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    122 
    123   frame_id_to_rtp_timestamp_[rtp_header.frame_id & 0xff] =
    124       rtp_header.rtp_timestamp;
    125   cast_environment_->Logging()->InsertPacketEvent(
    126       now, PACKET_RECEIVED, event_media_type_, rtp_header.rtp_timestamp,
    127       rtp_header.frame_id, rtp_header.packet_id, rtp_header.max_packet_id,
    128       payload_size);
    129 
    130   bool duplicate = false;
    131   const bool complete =
    132       framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);
    133 
    134   // Duplicate packets are ignored.
    135   if (duplicate)
    136     return;
    137 
    138   // Update lip-sync values upon receiving the first packet of each frame, or if
    139   // they have never been set yet.
    140   if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
    141     RtpTimestamp fresh_sync_rtp;
    142     base::TimeTicks fresh_sync_reference;
    143     if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
    144       // HACK: The sender should have provided Sender Reports before the first
    145       // frame was sent.  However, the spec does not currently require this.
    146       // Therefore, when the data is missing, the local clock is used to
    147       // generate reference timestamps.
    148       VLOG(2) << "Lip sync info missing.  Falling-back to local clock.";
    149       fresh_sync_rtp = rtp_header.rtp_timestamp;
    150       fresh_sync_reference = now;
    151     }
    152     // |lip_sync_reference_time_| is always incremented according to the time
    153     // delta computed from the difference in RTP timestamps.  Then,
    154     // |lip_sync_drift_| accounts for clock drift and also smoothes-out any
    155     // sudden/discontinuous shifts in the series of reference time values.
    156     if (lip_sync_reference_time_.is_null()) {
    157       lip_sync_reference_time_ = fresh_sync_reference;
    158     } else {
    159       lip_sync_reference_time_ += RtpDeltaToTimeDelta(
    160           static_cast<int32>(fresh_sync_rtp - lip_sync_rtp_timestamp_),
    161           rtp_timebase_);
    162     }
    163     lip_sync_rtp_timestamp_ = fresh_sync_rtp;
    164     lip_sync_drift_.Update(
    165         now, fresh_sync_reference - lip_sync_reference_time_);
    166   }
    167 
    168   // Another frame is complete from a non-duplicate packet.  Attempt to emit
    169   // more frames to satisfy enqueued requests.
    170   if (complete)
    171     EmitAvailableEncodedFrames();
    172 }
    173 
    174 void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
    175   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    176 
    177   base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    178   RtpTimestamp rtp_timestamp =
    179       frame_id_to_rtp_timestamp_[cast_message.ack_frame_id_ & 0xff];
    180   cast_environment_->Logging()->InsertFrameEvent(
    181       now, FRAME_ACK_SENT, event_media_type_,
    182       rtp_timestamp, cast_message.ack_frame_id_);
    183 
    184   ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events;
    185   event_subscriber_.GetRtcpEventsAndReset(&rtcp_events);
    186   rtcp_.SendRtcpFromRtpReceiver(&cast_message, &rtcp_events);
    187 }
    188 
    189 void FrameReceiver::EmitAvailableEncodedFrames() {
    190   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    191 
    192   while (!frame_request_queue_.empty()) {
    193     // Attempt to peek at the next completed frame from the |framer_|.
    194     // TODO(miu): We should only be peeking at the metadata, and not copying the
    195     // payload yet!  Or, at least, peek using a StringPiece instead of a copy.
    196     scoped_ptr<transport::EncodedFrame> encoded_frame(
    197         new transport::EncodedFrame());
    198     bool is_consecutively_next_frame = false;
    199     bool have_multiple_complete_frames = false;
    200     if (!framer_.GetEncodedFrame(encoded_frame.get(),
    201                                  &is_consecutively_next_frame,
    202                                  &have_multiple_complete_frames)) {
    203       VLOG(1) << "Wait for more packets to produce a completed frame.";
    204       return;  // ProcessParsedPacket() will invoke this method in the future.
    205     }
    206 
    207     const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
    208     const base::TimeTicks playout_time =
    209         GetPlayoutTime(encoded_frame->rtp_timestamp);
    210 
    211     // If we have multiple decodable frames, and the current frame is
    212     // too old, then skip it and decode the next frame instead.
    213     if (have_multiple_complete_frames && now > playout_time) {
    214       framer_.ReleaseFrame(encoded_frame->frame_id);
    215       continue;
    216     }
    217 
    218     // If |framer_| has a frame ready that is out of sequence, examine the
    219     // playout time to determine whether it's acceptable to continue, thereby
    220     // skipping one or more frames.  Skip if the missing frame wouldn't complete
    221     // playing before the start of playback of the available frame.
    222     if (!is_consecutively_next_frame) {
    223       // TODO(miu): Also account for expected decode time here?
    224       const base::TimeTicks earliest_possible_end_time_of_missing_frame =
    225           now + expected_frame_duration_;
    226       if (earliest_possible_end_time_of_missing_frame < playout_time) {
    227         VLOG(1) << "Wait for next consecutive frame instead of skipping.";
    228         if (!is_waiting_for_consecutive_frame_) {
    229           is_waiting_for_consecutive_frame_ = true;
    230           cast_environment_->PostDelayedTask(
    231               CastEnvironment::MAIN,
    232               FROM_HERE,
    233               base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
    234                          weak_factory_.GetWeakPtr()),
    235               playout_time - now);
    236         }
    237         return;
    238       }
    239     }
    240 
    241     // Decrypt the payload data in the frame, if crypto is being used.
    242     if (decryptor_.initialized()) {
    243       std::string decrypted_data;
    244       if (!decryptor_.Decrypt(encoded_frame->frame_id,
    245                               encoded_frame->data,
    246                               &decrypted_data)) {
    247         // Decryption failed.  Give up on this frame.
    248         framer_.ReleaseFrame(encoded_frame->frame_id);
    249         continue;
    250       }
    251       encoded_frame->data.swap(decrypted_data);
    252     }
    253 
    254     // At this point, we have a decrypted EncodedFrame ready to be emitted.
    255     encoded_frame->reference_time = playout_time;
    256     framer_.ReleaseFrame(encoded_frame->frame_id);
    257     cast_environment_->PostTask(CastEnvironment::MAIN,
    258                                 FROM_HERE,
    259                                 base::Bind(frame_request_queue_.front(),
    260                                            base::Passed(&encoded_frame)));
    261     frame_request_queue_.pop_front();
    262   }
    263 }
    264 
    265 void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
    266   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    267   DCHECK(is_waiting_for_consecutive_frame_);
    268   is_waiting_for_consecutive_frame_ = false;
    269   EmitAvailableEncodedFrames();
    270 }
    271 
    272 base::TimeTicks FrameReceiver::GetPlayoutTime(uint32 rtp_timestamp) const {
    273   return lip_sync_reference_time_ +
    274       lip_sync_drift_.Current() +
    275       RtpDeltaToTimeDelta(
    276           static_cast<int32>(rtp_timestamp - lip_sync_rtp_timestamp_),
    277           rtp_timebase_) +
    278       target_playout_delay_;
    279 }
    280 
    281 void FrameReceiver::ScheduleNextCastMessage() {
    282   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    283   base::TimeTicks send_time;
    284   framer_.TimeToSendNextCastMessage(&send_time);
    285   base::TimeDelta time_to_send =
    286       send_time - cast_environment_->Clock()->NowTicks();
    287   time_to_send = std::max(
    288       time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
    289   cast_environment_->PostDelayedTask(
    290       CastEnvironment::MAIN,
    291       FROM_HERE,
    292       base::Bind(&FrameReceiver::SendNextCastMessage,
    293                  weak_factory_.GetWeakPtr()),
    294       time_to_send);
    295 }
    296 
    297 void FrameReceiver::SendNextCastMessage() {
    298   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    299   framer_.SendCastMessage();  // Will only send a message if it is time.
    300   ScheduleNextCastMessage();
    301 }
    302 
    303 void FrameReceiver::ScheduleNextRtcpReport() {
    304   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    305   base::TimeDelta time_to_next = rtcp_.TimeToSendNextRtcpReport() -
    306                                  cast_environment_->Clock()->NowTicks();
    307 
    308   time_to_next = std::max(
    309       time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
    310 
    311   cast_environment_->PostDelayedTask(
    312       CastEnvironment::MAIN,
    313       FROM_HERE,
    314       base::Bind(&FrameReceiver::SendNextRtcpReport,
    315                  weak_factory_.GetWeakPtr()),
    316       time_to_next);
    317 }
    318 
    319 void FrameReceiver::SendNextRtcpReport() {
    320   DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
    321   rtcp_.SendRtcpFromRtpReceiver(NULL, NULL);
    322   ScheduleNextRtcpReport();
    323 }
    324 
    325 }  // namespace cast
    326 }  // namespace media
    327