1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "media/cast/receiver/frame_receiver.h" 6 7 #include <algorithm> 8 9 #include "base/big_endian.h" 10 #include "base/bind.h" 11 #include "base/logging.h" 12 #include "base/message_loop/message_loop.h" 13 #include "media/cast/cast_environment.h" 14 15 namespace { 16 const int kMinSchedulingDelayMs = 1; 17 } // namespace 18 19 namespace media { 20 namespace cast { 21 22 FrameReceiver::FrameReceiver( 23 const scoped_refptr<CastEnvironment>& cast_environment, 24 const FrameReceiverConfig& config, 25 EventMediaType event_media_type, 26 transport::PacedPacketSender* const packet_sender) 27 : cast_environment_(cast_environment), 28 packet_parser_(config.incoming_ssrc, config.rtp_payload_type), 29 stats_(cast_environment->Clock()), 30 event_media_type_(event_media_type), 31 event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type), 32 rtp_timebase_(config.frequency), 33 target_playout_delay_( 34 base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)), 35 expected_frame_duration_( 36 base::TimeDelta::FromSeconds(1) / config.max_frame_rate), 37 reports_are_scheduled_(false), 38 framer_(cast_environment->Clock(), 39 this, 40 config.incoming_ssrc, 41 true, 42 config.rtp_max_delay_ms * config.max_frame_rate / 1000), 43 rtcp_(cast_environment_, 44 NULL, 45 NULL, 46 packet_sender, 47 &stats_, 48 config.rtcp_mode, 49 base::TimeDelta::FromMilliseconds(config.rtcp_interval), 50 config.feedback_ssrc, 51 config.incoming_ssrc, 52 config.rtcp_c_name, 53 event_media_type), 54 is_waiting_for_consecutive_frame_(false), 55 lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()), 56 weak_factory_(this) { 57 DCHECK_GT(config.rtp_max_delay_ms, 0); 58 DCHECK_GT(config.max_frame_rate, 0); 59 decryptor_.Initialize(config.aes_key, config.aes_iv_mask); 60 rtcp_.SetTargetDelay(target_playout_delay_); 61 cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber_); 62 memset(frame_id_to_rtp_timestamp_, 0, sizeof(frame_id_to_rtp_timestamp_)); 63 } 64 65 FrameReceiver::~FrameReceiver() { 66 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 67 cast_environment_->Logging()->RemoveRawEventSubscriber(&event_subscriber_); 68 } 69 70 void FrameReceiver::RequestEncodedFrame( 71 const ReceiveEncodedFrameCallback& callback) { 72 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 73 frame_request_queue_.push_back(callback); 74 EmitAvailableEncodedFrames(); 75 } 76 77 bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) { 78 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 79 80 if (Rtcp::IsRtcpPacket(&packet->front(), packet->size())) { 81 rtcp_.IncomingRtcpPacket(&packet->front(), packet->size()); 82 } else { 83 RtpCastHeader rtp_header; 84 const uint8* payload_data; 85 size_t payload_size; 86 if (!packet_parser_.ParsePacket(&packet->front(), 87 packet->size(), 88 &rtp_header, 89 &payload_data, 90 &payload_size)) { 91 return false; 92 } 93 94 ProcessParsedPacket(rtp_header, payload_data, payload_size); 95 stats_.UpdateStatistics(rtp_header); 96 } 97 98 if (!reports_are_scheduled_) { 99 ScheduleNextRtcpReport(); 100 ScheduleNextCastMessage(); 101 reports_are_scheduled_ = true; 102 } 103 104 return true; 105 } 106 107 // static 108 bool FrameReceiver::ParseSenderSsrc(const uint8* packet, 109 size_t length, 110 uint32* ssrc) { 111 base::BigEndianReader big_endian_reader( 112 reinterpret_cast<const char*>(packet), length); 113 return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc); 114 } 115 116 void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header, 117 const uint8* payload_data, 118 size_t payload_size) { 119 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 120 121 const base::TimeTicks now = cast_environment_->Clock()->NowTicks(); 122 123 frame_id_to_rtp_timestamp_[rtp_header.frame_id & 0xff] = 124 rtp_header.rtp_timestamp; 125 cast_environment_->Logging()->InsertPacketEvent( 126 now, PACKET_RECEIVED, event_media_type_, rtp_header.rtp_timestamp, 127 rtp_header.frame_id, rtp_header.packet_id, rtp_header.max_packet_id, 128 payload_size); 129 130 bool duplicate = false; 131 const bool complete = 132 framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate); 133 134 // Duplicate packets are ignored. 135 if (duplicate) 136 return; 137 138 // Update lip-sync values upon receiving the first packet of each frame, or if 139 // they have never been set yet. 140 if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) { 141 RtpTimestamp fresh_sync_rtp; 142 base::TimeTicks fresh_sync_reference; 143 if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) { 144 // HACK: The sender should have provided Sender Reports before the first 145 // frame was sent. However, the spec does not currently require this. 146 // Therefore, when the data is missing, the local clock is used to 147 // generate reference timestamps. 148 VLOG(2) << "Lip sync info missing. Falling-back to local clock."; 149 fresh_sync_rtp = rtp_header.rtp_timestamp; 150 fresh_sync_reference = now; 151 } 152 // |lip_sync_reference_time_| is always incremented according to the time 153 // delta computed from the difference in RTP timestamps. Then, 154 // |lip_sync_drift_| accounts for clock drift and also smoothes-out any 155 // sudden/discontinuous shifts in the series of reference time values. 156 if (lip_sync_reference_time_.is_null()) { 157 lip_sync_reference_time_ = fresh_sync_reference; 158 } else { 159 lip_sync_reference_time_ += RtpDeltaToTimeDelta( 160 static_cast<int32>(fresh_sync_rtp - lip_sync_rtp_timestamp_), 161 rtp_timebase_); 162 } 163 lip_sync_rtp_timestamp_ = fresh_sync_rtp; 164 lip_sync_drift_.Update( 165 now, fresh_sync_reference - lip_sync_reference_time_); 166 } 167 168 // Another frame is complete from a non-duplicate packet. Attempt to emit 169 // more frames to satisfy enqueued requests. 170 if (complete) 171 EmitAvailableEncodedFrames(); 172 } 173 174 void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) { 175 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 176 177 base::TimeTicks now = cast_environment_->Clock()->NowTicks(); 178 RtpTimestamp rtp_timestamp = 179 frame_id_to_rtp_timestamp_[cast_message.ack_frame_id_ & 0xff]; 180 cast_environment_->Logging()->InsertFrameEvent( 181 now, FRAME_ACK_SENT, event_media_type_, 182 rtp_timestamp, cast_message.ack_frame_id_); 183 184 ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; 185 event_subscriber_.GetRtcpEventsAndReset(&rtcp_events); 186 rtcp_.SendRtcpFromRtpReceiver(&cast_message, &rtcp_events); 187 } 188 189 void FrameReceiver::EmitAvailableEncodedFrames() { 190 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 191 192 while (!frame_request_queue_.empty()) { 193 // Attempt to peek at the next completed frame from the |framer_|. 194 // TODO(miu): We should only be peeking at the metadata, and not copying the 195 // payload yet! Or, at least, peek using a StringPiece instead of a copy. 196 scoped_ptr<transport::EncodedFrame> encoded_frame( 197 new transport::EncodedFrame()); 198 bool is_consecutively_next_frame = false; 199 bool have_multiple_complete_frames = false; 200 if (!framer_.GetEncodedFrame(encoded_frame.get(), 201 &is_consecutively_next_frame, 202 &have_multiple_complete_frames)) { 203 VLOG(1) << "Wait for more packets to produce a completed frame."; 204 return; // ProcessParsedPacket() will invoke this method in the future. 205 } 206 207 const base::TimeTicks now = cast_environment_->Clock()->NowTicks(); 208 const base::TimeTicks playout_time = 209 GetPlayoutTime(encoded_frame->rtp_timestamp); 210 211 // If we have multiple decodable frames, and the current frame is 212 // too old, then skip it and decode the next frame instead. 213 if (have_multiple_complete_frames && now > playout_time) { 214 framer_.ReleaseFrame(encoded_frame->frame_id); 215 continue; 216 } 217 218 // If |framer_| has a frame ready that is out of sequence, examine the 219 // playout time to determine whether it's acceptable to continue, thereby 220 // skipping one or more frames. Skip if the missing frame wouldn't complete 221 // playing before the start of playback of the available frame. 222 if (!is_consecutively_next_frame) { 223 // TODO(miu): Also account for expected decode time here? 224 const base::TimeTicks earliest_possible_end_time_of_missing_frame = 225 now + expected_frame_duration_; 226 if (earliest_possible_end_time_of_missing_frame < playout_time) { 227 VLOG(1) << "Wait for next consecutive frame instead of skipping."; 228 if (!is_waiting_for_consecutive_frame_) { 229 is_waiting_for_consecutive_frame_ = true; 230 cast_environment_->PostDelayedTask( 231 CastEnvironment::MAIN, 232 FROM_HERE, 233 base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting, 234 weak_factory_.GetWeakPtr()), 235 playout_time - now); 236 } 237 return; 238 } 239 } 240 241 // Decrypt the payload data in the frame, if crypto is being used. 242 if (decryptor_.initialized()) { 243 std::string decrypted_data; 244 if (!decryptor_.Decrypt(encoded_frame->frame_id, 245 encoded_frame->data, 246 &decrypted_data)) { 247 // Decryption failed. Give up on this frame. 248 framer_.ReleaseFrame(encoded_frame->frame_id); 249 continue; 250 } 251 encoded_frame->data.swap(decrypted_data); 252 } 253 254 // At this point, we have a decrypted EncodedFrame ready to be emitted. 255 encoded_frame->reference_time = playout_time; 256 framer_.ReleaseFrame(encoded_frame->frame_id); 257 cast_environment_->PostTask(CastEnvironment::MAIN, 258 FROM_HERE, 259 base::Bind(frame_request_queue_.front(), 260 base::Passed(&encoded_frame))); 261 frame_request_queue_.pop_front(); 262 } 263 } 264 265 void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() { 266 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 267 DCHECK(is_waiting_for_consecutive_frame_); 268 is_waiting_for_consecutive_frame_ = false; 269 EmitAvailableEncodedFrames(); 270 } 271 272 base::TimeTicks FrameReceiver::GetPlayoutTime(uint32 rtp_timestamp) const { 273 return lip_sync_reference_time_ + 274 lip_sync_drift_.Current() + 275 RtpDeltaToTimeDelta( 276 static_cast<int32>(rtp_timestamp - lip_sync_rtp_timestamp_), 277 rtp_timebase_) + 278 target_playout_delay_; 279 } 280 281 void FrameReceiver::ScheduleNextCastMessage() { 282 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 283 base::TimeTicks send_time; 284 framer_.TimeToSendNextCastMessage(&send_time); 285 base::TimeDelta time_to_send = 286 send_time - cast_environment_->Clock()->NowTicks(); 287 time_to_send = std::max( 288 time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs)); 289 cast_environment_->PostDelayedTask( 290 CastEnvironment::MAIN, 291 FROM_HERE, 292 base::Bind(&FrameReceiver::SendNextCastMessage, 293 weak_factory_.GetWeakPtr()), 294 time_to_send); 295 } 296 297 void FrameReceiver::SendNextCastMessage() { 298 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 299 framer_.SendCastMessage(); // Will only send a message if it is time. 300 ScheduleNextCastMessage(); 301 } 302 303 void FrameReceiver::ScheduleNextRtcpReport() { 304 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 305 base::TimeDelta time_to_next = rtcp_.TimeToSendNextRtcpReport() - 306 cast_environment_->Clock()->NowTicks(); 307 308 time_to_next = std::max( 309 time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs)); 310 311 cast_environment_->PostDelayedTask( 312 CastEnvironment::MAIN, 313 FROM_HERE, 314 base::Bind(&FrameReceiver::SendNextRtcpReport, 315 weak_factory_.GetWeakPtr()), 316 time_to_next); 317 } 318 319 void FrameReceiver::SendNextRtcpReport() { 320 DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); 321 rtcp_.SendRtcpFromRtpReceiver(NULL, NULL); 322 ScheduleNextRtcpReport(); 323 } 324 325 } // namespace cast 326 } // namespace media 327