1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "media/cast/receiver/frame_receiver.h" 6 7 #include <algorithm> 8 9 #include "base/big_endian.h" 10 #include "base/bind.h" 11 #include "base/logging.h" 12 #include "base/message_loop/message_loop.h" 13 #include "media/cast/cast_environment.h" 14 15 namespace { 16 const int kMinSchedulingDelayMs = 1; 17 } // namespace 18 19 namespace media { 20 namespace cast { 21 22 FrameReceiver::FrameReceiver( 23 const scoped_refptr<CastEnvironment>& cast_environment, 24 const FrameReceiverConfig& config, 25 EventMediaType event_media_type, 26 PacedPacketSender* const packet_sender) 27 : cast_environment_(cast_environment), 28 packet_parser_(config.incoming_ssrc, config.rtp_payload_type), 29 stats_(cast_environment->Clock()), 30 event_media_type_(event_media_type), 31 event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type), 32 rtp_timebase_(config.frequency), 33 target_playout_delay_( 34 base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)), 35 expected_frame_duration_( 36 base::TimeDelta::FromSeconds(1) / config.max_frame_rate), 37 reports_are_scheduled_(false), 38 framer_(cast_environment->Clock(), 39 this, 40 config.incoming_ssrc, 41 true, 42 config.rtp_max_delay_ms * config.max_frame_rate / 1000), 43 rtcp_(RtcpCastMessageCallback(), 44 RtcpRttCallback(), 45 RtcpLogMessageCallback(), 46 cast_environment_->Clock(), 47 packet_sender, 48 config.feedback_ssrc, 49 config.incoming_ssrc), 50 is_waiting_for_consecutive_frame_(false), 51 lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()), 52 rtcp_interval_(base::TimeDelta::FromMilliseconds(config.rtcp_interval)), 53 weak_factory_(this) { 54 DCHECK_GT(config.rtp_max_delay_ms, 0); 55 DCHECK_GT(config.max_frame_rate, 0); 56 decryptor_.Initialize(config.aes_key, config.aes_iv_mask); 57 cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber_); 58 memset(frame_id_to_rtp_timestamp_, 0, sizeof(frame_id_to_rtp_timestamp_)); 59 } 60 61 FrameReceiver::~FrameReceiver() { 62 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 63 cast_environment_->Logging()->RemoveRawEventSubscriber(&event_subscriber_); 64 } 65 66 void FrameReceiver::RequestEncodedFrame( 67 const ReceiveEncodedFrameCallback& callback) { 68 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 69 frame_request_queue_.push_back(callback); 70 EmitAvailableEncodedFrames(); 71 } 72 73 bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) { 74 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 75 76 if (Rtcp::IsRtcpPacket(&packet->front(), packet->size())) { 77 rtcp_.IncomingRtcpPacket(&packet->front(), packet->size()); 78 } else { 79 RtpCastHeader rtp_header; 80 const uint8* payload_data; 81 size_t payload_size; 82 if (!packet_parser_.ParsePacket(&packet->front(), 83 packet->size(), 84 &rtp_header, 85 &payload_data, 86 &payload_size)) { 87 return false; 88 } 89 90 ProcessParsedPacket(rtp_header, payload_data, payload_size); 91 stats_.UpdateStatistics(rtp_header); 92 } 93 94 if (!reports_are_scheduled_) { 95 ScheduleNextRtcpReport(); 96 ScheduleNextCastMessage(); 97 reports_are_scheduled_ = true; 98 } 99 100 return true; 101 } 102 103 // static 104 bool FrameReceiver::ParseSenderSsrc(const uint8* packet, 105 size_t length, 106 uint32* ssrc) { 107 base::BigEndianReader big_endian_reader( 108 reinterpret_cast<const char*>(packet), length); 109 return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc); 110 } 111 112 void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header, 113 const uint8* payload_data, 114 size_t payload_size) { 115 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 116 117 const base::TimeTicks now = cast_environment_->Clock()->NowTicks(); 118 119 frame_id_to_rtp_timestamp_[rtp_header.frame_id & 0xff] = 120 rtp_header.rtp_timestamp; 121 cast_environment_->Logging()->InsertPacketEvent( 122 now, PACKET_RECEIVED, event_media_type_, rtp_header.rtp_timestamp, 123 rtp_header.frame_id, rtp_header.packet_id, rtp_header.max_packet_id, 124 payload_size); 125 126 bool duplicate = false; 127 const bool complete = 128 framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate); 129 130 // Duplicate packets are ignored. 131 if (duplicate) 132 return; 133 134 // Update lip-sync values upon receiving the first packet of each frame, or if 135 // they have never been set yet. 136 if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) { 137 RtpTimestamp fresh_sync_rtp; 138 base::TimeTicks fresh_sync_reference; 139 if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) { 140 // HACK: The sender should have provided Sender Reports before the first 141 // frame was sent. However, the spec does not currently require this. 142 // Therefore, when the data is missing, the local clock is used to 143 // generate reference timestamps. 144 VLOG(2) << "Lip sync info missing. Falling-back to local clock."; 145 fresh_sync_rtp = rtp_header.rtp_timestamp; 146 fresh_sync_reference = now; 147 } 148 // |lip_sync_reference_time_| is always incremented according to the time 149 // delta computed from the difference in RTP timestamps. Then, 150 // |lip_sync_drift_| accounts for clock drift and also smoothes-out any 151 // sudden/discontinuous shifts in the series of reference time values. 152 if (lip_sync_reference_time_.is_null()) { 153 lip_sync_reference_time_ = fresh_sync_reference; 154 } else { 155 lip_sync_reference_time_ += RtpDeltaToTimeDelta( 156 static_cast<int32>(fresh_sync_rtp - lip_sync_rtp_timestamp_), 157 rtp_timebase_); 158 } 159 lip_sync_rtp_timestamp_ = fresh_sync_rtp; 160 lip_sync_drift_.Update( 161 now, fresh_sync_reference - lip_sync_reference_time_); 162 } 163 164 // Another frame is complete from a non-duplicate packet. Attempt to emit 165 // more frames to satisfy enqueued requests. 166 if (complete) 167 EmitAvailableEncodedFrames(); 168 } 169 170 void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) { 171 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 172 173 base::TimeTicks now = cast_environment_->Clock()->NowTicks(); 174 RtpTimestamp rtp_timestamp = 175 frame_id_to_rtp_timestamp_[cast_message.ack_frame_id & 0xff]; 176 cast_environment_->Logging()->InsertFrameEvent( 177 now, FRAME_ACK_SENT, event_media_type_, 178 rtp_timestamp, cast_message.ack_frame_id); 179 180 ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; 181 event_subscriber_.GetRtcpEventsAndReset(&rtcp_events); 182 rtcp_.SendRtcpFromRtpReceiver(&cast_message, target_playout_delay_, 183 &rtcp_events, NULL); 184 } 185 186 void FrameReceiver::EmitAvailableEncodedFrames() { 187 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 188 189 while (!frame_request_queue_.empty()) { 190 // Attempt to peek at the next completed frame from the |framer_|. 191 // TODO(miu): We should only be peeking at the metadata, and not copying the 192 // payload yet! Or, at least, peek using a StringPiece instead of a copy. 193 scoped_ptr<EncodedFrame> encoded_frame( 194 new EncodedFrame()); 195 bool is_consecutively_next_frame = false; 196 bool have_multiple_complete_frames = false; 197 if (!framer_.GetEncodedFrame(encoded_frame.get(), 198 &is_consecutively_next_frame, 199 &have_multiple_complete_frames)) { 200 VLOG(1) << "Wait for more packets to produce a completed frame."; 201 return; // ProcessParsedPacket() will invoke this method in the future. 202 } 203 204 const base::TimeTicks now = cast_environment_->Clock()->NowTicks(); 205 const base::TimeTicks playout_time = GetPlayoutTime(*encoded_frame); 206 207 // If we have multiple decodable frames, and the current frame is 208 // too old, then skip it and decode the next frame instead. 209 if (have_multiple_complete_frames && now > playout_time) { 210 framer_.ReleaseFrame(encoded_frame->frame_id); 211 continue; 212 } 213 214 // If |framer_| has a frame ready that is out of sequence, examine the 215 // playout time to determine whether it's acceptable to continue, thereby 216 // skipping one or more frames. Skip if the missing frame wouldn't complete 217 // playing before the start of playback of the available frame. 218 if (!is_consecutively_next_frame) { 219 // This assumes that decoding takes as long as playing, which might 220 // not be true. 221 const base::TimeTicks earliest_possible_end_time_of_missing_frame = 222 now + expected_frame_duration_ * 2; 223 if (earliest_possible_end_time_of_missing_frame < playout_time) { 224 VLOG(1) << "Wait for next consecutive frame instead of skipping."; 225 if (!is_waiting_for_consecutive_frame_) { 226 is_waiting_for_consecutive_frame_ = true; 227 cast_environment_->PostDelayedTask( 228 CastEnvironment::MAIN, 229 FROM_HERE, 230 base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting, 231 weak_factory_.GetWeakPtr()), 232 playout_time - now); 233 } 234 return; 235 } 236 } 237 238 // At this point, we have the complete next frame, or a decodable 239 // frame from somewhere later in the stream, AND we have given up 240 // on waiting for any frames in between, so now we can ACK the frame. 241 framer_.AckFrame(encoded_frame->frame_id); 242 243 // Decrypt the payload data in the frame, if crypto is being used. 244 if (decryptor_.is_activated()) { 245 std::string decrypted_data; 246 if (!decryptor_.Decrypt(encoded_frame->frame_id, 247 encoded_frame->data, 248 &decrypted_data)) { 249 // Decryption failed. Give up on this frame. 250 framer_.ReleaseFrame(encoded_frame->frame_id); 251 continue; 252 } 253 encoded_frame->data.swap(decrypted_data); 254 } 255 256 // At this point, we have a decrypted EncodedFrame ready to be emitted. 257 encoded_frame->reference_time = playout_time; 258 framer_.ReleaseFrame(encoded_frame->frame_id); 259 if (encoded_frame->new_playout_delay_ms) { 260 target_playout_delay_ = base::TimeDelta::FromMilliseconds( 261 encoded_frame->new_playout_delay_ms); 262 } 263 cast_environment_->PostTask(CastEnvironment::MAIN, 264 FROM_HERE, 265 base::Bind(&FrameReceiver::EmitOneFrame, 266 weak_factory_.GetWeakPtr(), 267 frame_request_queue_.front(), 268 base::Passed(&encoded_frame))); 269 frame_request_queue_.pop_front(); 270 } 271 } 272 273 void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() { 274 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 275 DCHECK(is_waiting_for_consecutive_frame_); 276 is_waiting_for_consecutive_frame_ = false; 277 EmitAvailableEncodedFrames(); 278 } 279 280 void FrameReceiver::EmitOneFrame(const ReceiveEncodedFrameCallback& callback, 281 scoped_ptr<EncodedFrame> encoded_frame) const { 282 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 283 if (!callback.is_null()) 284 callback.Run(encoded_frame.Pass()); 285 } 286 287 base::TimeTicks FrameReceiver::GetPlayoutTime(const EncodedFrame& frame) const { 288 base::TimeDelta target_playout_delay = target_playout_delay_; 289 if (frame.new_playout_delay_ms) { 290 target_playout_delay = base::TimeDelta::FromMilliseconds( 291 frame.new_playout_delay_ms); 292 } 293 return lip_sync_reference_time_ + 294 lip_sync_drift_.Current() + 295 RtpDeltaToTimeDelta( 296 static_cast<int32>(frame.rtp_timestamp - lip_sync_rtp_timestamp_), 297 rtp_timebase_) + 298 target_playout_delay; 299 } 300 301 void FrameReceiver::ScheduleNextCastMessage() { 302 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 303 base::TimeTicks send_time; 304 framer_.TimeToSendNextCastMessage(&send_time); 305 base::TimeDelta time_to_send = 306 send_time - cast_environment_->Clock()->NowTicks(); 307 time_to_send = std::max( 308 time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs)); 309 cast_environment_->PostDelayedTask( 310 CastEnvironment::MAIN, 311 FROM_HERE, 312 base::Bind(&FrameReceiver::SendNextCastMessage, 313 weak_factory_.GetWeakPtr()), 314 time_to_send); 315 } 316 317 void FrameReceiver::SendNextCastMessage() { 318 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 319 framer_.SendCastMessage(); // Will only send a message if it is time. 320 ScheduleNextCastMessage(); 321 } 322 323 void FrameReceiver::ScheduleNextRtcpReport() { 324 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 325 base::TimeDelta time_to_next = rtcp_interval_; 326 time_to_next = std::max( 327 time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs)); 328 329 cast_environment_->PostDelayedTask( 330 CastEnvironment::MAIN, 331 FROM_HERE, 332 base::Bind(&FrameReceiver::SendNextRtcpReport, 333 weak_factory_.GetWeakPtr()), 334 time_to_next); 335 } 336 337 void FrameReceiver::SendNextRtcpReport() { 338 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 339 rtcp_.SendRtcpFromRtpReceiver(NULL, base::TimeDelta(), NULL, &stats_); 340 ScheduleNextRtcpReport(); 341 } 342 343 } // namespace cast 344 } // namespace media 345