1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "media/filters/decoder_stream.h" 6 7 #include "base/bind.h" 8 #include "base/callback_helpers.h" 9 #include "base/debug/trace_event.h" 10 #include "base/location.h" 11 #include "base/logging.h" 12 #include "base/single_thread_task_runner.h" 13 #include "media/base/audio_decoder.h" 14 #include "media/base/bind_to_current_loop.h" 15 #include "media/base/decoder_buffer.h" 16 #include "media/base/demuxer_stream.h" 17 #include "media/base/video_decoder.h" 18 #include "media/filters/decrypting_demuxer_stream.h" 19 20 namespace media { 21 22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for 23 // templated classes such as this. 24 template <DemuxerStream::Type StreamType> 25 static const char* GetTraceString(); 26 27 #define FUNCTION_DVLOG(level) \ 28 DVLOG(level) << __FUNCTION__ << \ 29 "<" << DecoderStreamTraits<StreamType>::ToString() << ">" 30 31 template <> 32 const char* GetTraceString<DemuxerStream::VIDEO>() { 33 return "DecoderStream<VIDEO>::Decode"; 34 } 35 36 template <> 37 const char* GetTraceString<DemuxerStream::AUDIO>() { 38 return "DecoderStream<AUDIO>::Decode"; 39 } 40 41 template <DemuxerStream::Type StreamType> 42 DecoderStream<StreamType>::DecoderStream( 43 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 44 ScopedVector<Decoder> decoders, 45 const SetDecryptorReadyCB& set_decryptor_ready_cb) 46 : task_runner_(task_runner), 47 state_(STATE_UNINITIALIZED), 48 stream_(NULL), 49 decoder_selector_( 50 new DecoderSelector<StreamType>(task_runner, 51 decoders.Pass(), 52 set_decryptor_ready_cb)), 53 active_splice_(false), 54 pending_decode_requests_(0), 55 weak_factory_(this) {} 56 57 template <DemuxerStream::Type StreamType> 58 DecoderStream<StreamType>::~DecoderStream() { 59 DCHECK(state_ == STATE_UNINITIALIZED || state_ == STATE_STOPPED) << state_; 60 } 61 62 template <DemuxerStream::Type StreamType> 63 void DecoderStream<StreamType>::Initialize(DemuxerStream* stream, 64 bool low_delay, 65 const StatisticsCB& statistics_cb, 66 const InitCB& init_cb) { 67 FUNCTION_DVLOG(2); 68 DCHECK(task_runner_->BelongsToCurrentThread()); 69 DCHECK_EQ(state_, STATE_UNINITIALIZED) << state_; 70 DCHECK(init_cb_.is_null()); 71 DCHECK(!init_cb.is_null()); 72 73 statistics_cb_ = statistics_cb; 74 init_cb_ = init_cb; 75 stream_ = stream; 76 low_delay_ = low_delay; 77 78 state_ = STATE_INITIALIZING; 79 // TODO(xhwang): DecoderSelector only needs a config to select a decoder. 80 decoder_selector_->SelectDecoder( 81 stream, low_delay, 82 base::Bind(&DecoderStream<StreamType>::OnDecoderSelected, 83 weak_factory_.GetWeakPtr()), 84 base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady, 85 weak_factory_.GetWeakPtr())); 86 } 87 88 template <DemuxerStream::Type StreamType> 89 void DecoderStream<StreamType>::Read(const ReadCB& read_cb) { 90 FUNCTION_DVLOG(2); 91 DCHECK(task_runner_->BelongsToCurrentThread()); 92 DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_INITIALIZING && 93 state_ != STATE_STOPPED) << state_; 94 // No two reads in the flight at any time. 95 DCHECK(read_cb_.is_null()); 96 // No read during resetting or stopping process. 97 DCHECK(reset_cb_.is_null()); 98 DCHECK(stop_cb_.is_null()); 99 100 if (state_ == STATE_ERROR) { 101 task_runner_->PostTask( 102 FROM_HERE, base::Bind(read_cb, DECODE_ERROR, scoped_refptr<Output>())); 103 return; 104 } 105 106 if (state_ == STATE_END_OF_STREAM && ready_outputs_.empty()) { 107 task_runner_->PostTask( 108 FROM_HERE, base::Bind(read_cb, OK, StreamTraits::CreateEOSOutput())); 109 return; 110 } 111 112 if (!ready_outputs_.empty()) { 113 task_runner_->PostTask(FROM_HERE, 114 base::Bind(read_cb, OK, ready_outputs_.front())); 115 ready_outputs_.pop_front(); 116 } else { 117 read_cb_ = read_cb; 118 } 119 120 if (state_ == STATE_NORMAL && CanDecodeMore()) 121 ReadFromDemuxerStream(); 122 } 123 124 template <DemuxerStream::Type StreamType> 125 void DecoderStream<StreamType>::Reset(const base::Closure& closure) { 126 FUNCTION_DVLOG(2); 127 DCHECK(task_runner_->BelongsToCurrentThread()); 128 DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_; 129 DCHECK(reset_cb_.is_null()); 130 DCHECK(stop_cb_.is_null()); 131 132 reset_cb_ = closure; 133 134 if (!read_cb_.is_null()) { 135 task_runner_->PostTask(FROM_HERE, base::Bind( 136 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); 137 } 138 139 ready_outputs_.clear(); 140 141 // During decoder reinitialization, the Decoder does not need to be and 142 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder 143 // reinitialization. 144 if (state_ == STATE_REINITIALIZING_DECODER) 145 return; 146 147 // During pending demuxer read and when not using DecryptingDemuxerStream, 148 // the Decoder will be reset after demuxer read is returned 149 // (in OnBufferReady()). 150 if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_) 151 return; 152 153 if (decrypting_demuxer_stream_) { 154 decrypting_demuxer_stream_->Reset(base::Bind( 155 &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr())); 156 return; 157 } 158 159 ResetDecoder(); 160 } 161 162 template <DemuxerStream::Type StreamType> 163 void DecoderStream<StreamType>::Stop(const base::Closure& closure) { 164 FUNCTION_DVLOG(2); 165 DCHECK(task_runner_->BelongsToCurrentThread()); 166 DCHECK_NE(state_, STATE_STOPPED) << state_; 167 DCHECK(stop_cb_.is_null()); 168 169 stop_cb_ = closure; 170 171 if (state_ == STATE_INITIALIZING) { 172 decoder_selector_->Abort(); 173 return; 174 } 175 176 DCHECK(init_cb_.is_null()); 177 178 // All pending callbacks will be dropped. 179 weak_factory_.InvalidateWeakPtrs(); 180 181 // Post callbacks to prevent reentrance into this object. 182 if (!read_cb_.is_null()) { 183 task_runner_->PostTask(FROM_HERE, base::Bind( 184 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>())); 185 } 186 if (!reset_cb_.is_null()) 187 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_)); 188 189 if (decrypting_demuxer_stream_) { 190 decrypting_demuxer_stream_->Stop(base::Bind( 191 &DecoderStream<StreamType>::StopDecoder, weak_factory_.GetWeakPtr())); 192 return; 193 } 194 195 // We may not have a |decoder_| if Stop() was called during initialization. 196 if (decoder_) { 197 StopDecoder(); 198 return; 199 } 200 201 state_ = STATE_STOPPED; 202 stream_ = NULL; 203 decoder_.reset(); 204 decrypting_demuxer_stream_.reset(); 205 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_)); 206 } 207 208 template <DemuxerStream::Type StreamType> 209 bool DecoderStream<StreamType>::CanReadWithoutStalling() const { 210 DCHECK(task_runner_->BelongsToCurrentThread()); 211 return !ready_outputs_.empty() || decoder_->CanReadWithoutStalling(); 212 } 213 214 template <> 215 bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const { 216 DCHECK(task_runner_->BelongsToCurrentThread()); 217 return true; 218 } 219 220 template <DemuxerStream::Type StreamType> 221 int DecoderStream<StreamType>::GetMaxDecodeRequests() const { 222 return decoder_->GetMaxDecodeRequests(); 223 } 224 225 template <> 226 int DecoderStream<DemuxerStream::AUDIO>::GetMaxDecodeRequests() const { 227 return 1; 228 } 229 230 template <DemuxerStream::Type StreamType> 231 bool DecoderStream<StreamType>::CanDecodeMore() const { 232 DCHECK(task_runner_->BelongsToCurrentThread()); 233 234 // Limit total number of outputs stored in |ready_outputs_| and being decoded. 235 // It only makes sense to saturate decoder completely when output queue is 236 // empty. 237 int num_decodes = 238 static_cast<int>(ready_outputs_.size()) + pending_decode_requests_; 239 return num_decodes < GetMaxDecodeRequests(); 240 } 241 242 template <DemuxerStream::Type StreamType> 243 void DecoderStream<StreamType>::OnDecoderSelected( 244 scoped_ptr<Decoder> selected_decoder, 245 scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) { 246 FUNCTION_DVLOG(2); 247 DCHECK(task_runner_->BelongsToCurrentThread()); 248 DCHECK_EQ(state_, STATE_INITIALIZING) << state_; 249 DCHECK(!init_cb_.is_null()); 250 DCHECK(read_cb_.is_null()); 251 DCHECK(reset_cb_.is_null()); 252 253 decoder_selector_.reset(); 254 if (decrypting_demuxer_stream) 255 stream_ = decrypting_demuxer_stream.get(); 256 257 if (!selected_decoder) { 258 state_ = STATE_UNINITIALIZED; 259 StreamTraits::FinishInitialization( 260 base::ResetAndReturn(&init_cb_), selected_decoder.get(), stream_); 261 } else { 262 state_ = STATE_NORMAL; 263 decoder_ = selected_decoder.Pass(); 264 decrypting_demuxer_stream_ = decrypting_demuxer_stream.Pass(); 265 StreamTraits::FinishInitialization( 266 base::ResetAndReturn(&init_cb_), decoder_.get(), stream_); 267 } 268 269 // Stop() called during initialization. 270 if (!stop_cb_.is_null()) { 271 Stop(base::ResetAndReturn(&stop_cb_)); 272 return; 273 } 274 } 275 276 template <DemuxerStream::Type StreamType> 277 void DecoderStream<StreamType>::SatisfyRead( 278 Status status, 279 const scoped_refptr<Output>& output) { 280 DCHECK(!read_cb_.is_null()); 281 base::ResetAndReturn(&read_cb_).Run(status, output); 282 } 283 284 template <DemuxerStream::Type StreamType> 285 void DecoderStream<StreamType>::Decode( 286 const scoped_refptr<DecoderBuffer>& buffer) { 287 FUNCTION_DVLOG(2); 288 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_; 289 DCHECK_LT(pending_decode_requests_, GetMaxDecodeRequests()); 290 DCHECK(reset_cb_.is_null()); 291 DCHECK(stop_cb_.is_null()); 292 DCHECK(buffer); 293 294 int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size(); 295 296 TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this); 297 ++pending_decode_requests_; 298 decoder_->Decode(buffer, 299 base::Bind(&DecoderStream<StreamType>::OnDecodeDone, 300 weak_factory_.GetWeakPtr(), 301 buffer_size, 302 buffer->end_of_stream())); 303 } 304 305 template <DemuxerStream::Type StreamType> 306 void DecoderStream<StreamType>::FlushDecoder() { 307 Decode(DecoderBuffer::CreateEOSBuffer()); 308 } 309 310 template <DemuxerStream::Type StreamType> 311 void DecoderStream<StreamType>::OnDecodeDone(int buffer_size, 312 bool end_of_stream, 313 typename Decoder::Status status) { 314 FUNCTION_DVLOG(2) << status; 315 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || 316 state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR) 317 << state_; 318 DCHECK(stop_cb_.is_null()); 319 DCHECK_GT(pending_decode_requests_, 0); 320 321 --pending_decode_requests_; 322 323 TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this); 324 325 if (state_ == STATE_ERROR) { 326 DCHECK(read_cb_.is_null()); 327 return; 328 } 329 330 // Drop decoding result if Reset() was called during decoding. 331 // The resetting process will be handled when the decoder is reset. 332 if (!reset_cb_.is_null()) 333 return; 334 335 switch (status) { 336 case Decoder::kDecodeError: 337 case Decoder::kDecryptError: 338 state_ = STATE_ERROR; 339 ready_outputs_.clear(); 340 if (!read_cb_.is_null()) 341 SatisfyRead(DECODE_ERROR, NULL); 342 return; 343 344 case Decoder::kAborted: 345 // Decoder can return kAborted only when Reset is pending. 346 NOTREACHED(); 347 return; 348 349 case Decoder::kOk: 350 // Any successful decode counts! 351 if (buffer_size > 0) 352 StreamTraits::ReportStatistics(statistics_cb_, buffer_size); 353 354 if (state_ == STATE_NORMAL) { 355 if (end_of_stream) { 356 state_ = STATE_END_OF_STREAM; 357 if (ready_outputs_.empty() && !read_cb_.is_null()) 358 SatisfyRead(OK, StreamTraits::CreateEOSOutput()); 359 return; 360 } 361 362 if (CanDecodeMore()) 363 ReadFromDemuxerStream(); 364 return; 365 } 366 367 if (state_ == STATE_FLUSHING_DECODER && !pending_decode_requests_) 368 ReinitializeDecoder(); 369 return; 370 } 371 } 372 373 template <DemuxerStream::Type StreamType> 374 void DecoderStream<StreamType>::OnDecodeOutputReady( 375 const scoped_refptr<Output>& output) { 376 FUNCTION_DVLOG(2) << ": " << output->timestamp().InMilliseconds() << " ms"; 377 DCHECK(output); 378 DCHECK(!output->end_of_stream()); 379 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || 380 state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR) 381 << state_; 382 383 if (state_ == STATE_ERROR) { 384 DCHECK(read_cb_.is_null()); 385 return; 386 } 387 388 // Drop decoding result if Reset() was called during decoding. 389 // The resetting process will be handled when the decoder is reset. 390 if (!reset_cb_.is_null()) 391 return; 392 393 // TODO(xhwang): VideoDecoder doesn't need to return EOS after it's flushed. 394 // Fix all decoders and remove this block. 395 // Store decoded output. 396 ready_outputs_.push_back(output); 397 398 if (read_cb_.is_null()) 399 return; 400 401 // Satisfy outstanding read request, if any. 402 scoped_refptr<Output> read_result = ready_outputs_.front(); 403 ready_outputs_.pop_front(); 404 SatisfyRead(OK, output); 405 } 406 407 template <DemuxerStream::Type StreamType> 408 void DecoderStream<StreamType>::ReadFromDemuxerStream() { 409 FUNCTION_DVLOG(2); 410 DCHECK_EQ(state_, STATE_NORMAL) << state_; 411 DCHECK(CanDecodeMore()); 412 DCHECK(reset_cb_.is_null()); 413 DCHECK(stop_cb_.is_null()); 414 415 state_ = STATE_PENDING_DEMUXER_READ; 416 stream_->Read(base::Bind(&DecoderStream<StreamType>::OnBufferReady, 417 weak_factory_.GetWeakPtr())); 418 } 419 420 template <DemuxerStream::Type StreamType> 421 void DecoderStream<StreamType>::OnBufferReady( 422 DemuxerStream::Status status, 423 const scoped_refptr<DecoderBuffer>& buffer) { 424 FUNCTION_DVLOG(2) << ": " << status << ", " 425 << buffer->AsHumanReadableString(); 426 427 DCHECK(task_runner_->BelongsToCurrentThread()); 428 DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR || 429 state_ == STATE_STOPPED) 430 << state_; 431 DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status; 432 DCHECK(stop_cb_.is_null()); 433 434 // Decoding has been stopped (e.g due to an error). 435 if (state_ != STATE_PENDING_DEMUXER_READ) { 436 DCHECK(state_ == STATE_ERROR || state_ == STATE_STOPPED); 437 DCHECK(read_cb_.is_null()); 438 return; 439 } 440 441 state_ = STATE_NORMAL; 442 443 if (status == DemuxerStream::kConfigChanged) { 444 FUNCTION_DVLOG(2) << ": " << "ConfigChanged"; 445 DCHECK(stream_->SupportsConfigChanges()); 446 447 if (!config_change_observer_cb_.is_null()) 448 config_change_observer_cb_.Run(); 449 450 state_ = STATE_FLUSHING_DECODER; 451 if (!reset_cb_.is_null()) { 452 // If we are using DecryptingDemuxerStream, we already called DDS::Reset() 453 // which will continue the resetting process in it's callback. 454 if (!decrypting_demuxer_stream_) 455 Reset(base::ResetAndReturn(&reset_cb_)); 456 // Reinitialization will continue after Reset() is done. 457 } else { 458 FlushDecoder(); 459 } 460 return; 461 } 462 463 if (!reset_cb_.is_null()) { 464 // If we are using DecryptingDemuxerStream, we already called DDS::Reset() 465 // which will continue the resetting process in it's callback. 466 if (!decrypting_demuxer_stream_) 467 Reset(base::ResetAndReturn(&reset_cb_)); 468 return; 469 } 470 471 if (status == DemuxerStream::kAborted) { 472 if (!read_cb_.is_null()) 473 SatisfyRead(DEMUXER_READ_ABORTED, NULL); 474 return; 475 } 476 477 if (!splice_observer_cb_.is_null() && !buffer->end_of_stream()) { 478 const bool has_splice_ts = buffer->splice_timestamp() != kNoTimestamp(); 479 if (active_splice_ || has_splice_ts) { 480 splice_observer_cb_.Run(buffer->splice_timestamp()); 481 active_splice_ = has_splice_ts; 482 } 483 } 484 485 DCHECK(status == DemuxerStream::kOk) << status; 486 Decode(buffer); 487 488 // Read more data if the decoder supports multiple parallel decoding requests. 489 if (CanDecodeMore() && !buffer->end_of_stream()) 490 ReadFromDemuxerStream(); 491 } 492 493 template <DemuxerStream::Type StreamType> 494 void DecoderStream<StreamType>::ReinitializeDecoder() { 495 FUNCTION_DVLOG(2); 496 DCHECK(task_runner_->BelongsToCurrentThread()); 497 DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_; 498 DCHECK_EQ(pending_decode_requests_, 0); 499 500 DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig()); 501 state_ = STATE_REINITIALIZING_DECODER; 502 DecoderStreamTraits<StreamType>::Initialize( 503 decoder_.get(), 504 StreamTraits::GetDecoderConfig(*stream_), 505 low_delay_, 506 base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized, 507 weak_factory_.GetWeakPtr()), 508 base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady, 509 weak_factory_.GetWeakPtr())); 510 } 511 512 template <DemuxerStream::Type StreamType> 513 void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) { 514 FUNCTION_DVLOG(2); 515 DCHECK(task_runner_->BelongsToCurrentThread()); 516 DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER) << state_; 517 DCHECK(stop_cb_.is_null()); 518 519 // ReinitializeDecoder() can be called in two cases: 520 // 1, Flushing decoder finished (see OnDecodeOutputReady()). 521 // 2, Reset() was called during flushing decoder (see OnDecoderReset()). 522 // Also, Reset() can be called during pending ReinitializeDecoder(). 523 // This function needs to handle them all! 524 525 state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR; 526 527 if (!reset_cb_.is_null()) { 528 base::ResetAndReturn(&reset_cb_).Run(); 529 return; 530 } 531 532 if (read_cb_.is_null()) 533 return; 534 535 if (state_ == STATE_ERROR) { 536 SatisfyRead(DECODE_ERROR, NULL); 537 return; 538 } 539 540 ReadFromDemuxerStream(); 541 } 542 543 template <DemuxerStream::Type StreamType> 544 void DecoderStream<StreamType>::ResetDecoder() { 545 FUNCTION_DVLOG(2); 546 DCHECK(task_runner_->BelongsToCurrentThread()); 547 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || 548 state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_; 549 DCHECK(!reset_cb_.is_null()); 550 551 decoder_->Reset(base::Bind(&DecoderStream<StreamType>::OnDecoderReset, 552 weak_factory_.GetWeakPtr())); 553 } 554 555 template <DemuxerStream::Type StreamType> 556 void DecoderStream<StreamType>::OnDecoderReset() { 557 FUNCTION_DVLOG(2); 558 DCHECK(task_runner_->BelongsToCurrentThread()); 559 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER || 560 state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_; 561 // If Reset() was called during pending read, read callback should be fired 562 // before the reset callback is fired. 563 DCHECK(read_cb_.is_null()); 564 DCHECK(!reset_cb_.is_null()); 565 DCHECK(stop_cb_.is_null()); 566 567 if (state_ != STATE_FLUSHING_DECODER) { 568 state_ = STATE_NORMAL; 569 active_splice_ = false; 570 base::ResetAndReturn(&reset_cb_).Run(); 571 return; 572 } 573 574 // The resetting process will be continued in OnDecoderReinitialized(). 575 ReinitializeDecoder(); 576 } 577 578 template <DemuxerStream::Type StreamType> 579 void DecoderStream<StreamType>::StopDecoder() { 580 FUNCTION_DVLOG(2); 581 DCHECK(task_runner_->BelongsToCurrentThread()); 582 DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_; 583 DCHECK(!stop_cb_.is_null()); 584 585 state_ = STATE_STOPPED; 586 decoder_->Stop(); 587 stream_ = NULL; 588 decoder_.reset(); 589 decrypting_demuxer_stream_.reset(); 590 // Post |stop_cb_| because pending |read_cb_| and/or |reset_cb_| are also 591 // posted in Stop(). 592 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_)); 593 } 594 595 template class DecoderStream<DemuxerStream::VIDEO>; 596 template class DecoderStream<DemuxerStream::AUDIO>; 597 598 } // namespace media 599