Home | History | Annotate | Download | only in filters
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "media/filters/decoder_stream.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/callback_helpers.h"
      9 #include "base/debug/trace_event.h"
     10 #include "base/location.h"
     11 #include "base/logging.h"
     12 #include "base/single_thread_task_runner.h"
     13 #include "media/base/audio_decoder.h"
     14 #include "media/base/bind_to_current_loop.h"
     15 #include "media/base/decoder_buffer.h"
     16 #include "media/base/demuxer_stream.h"
     17 #include "media/base/video_decoder.h"
     18 #include "media/filters/decrypting_demuxer_stream.h"
     19 
     20 namespace media {
     21 
     22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
     23 // templated classes such as this.
     24 template <DemuxerStream::Type StreamType>
     25 static const char* GetTraceString();
     26 
     27 #define FUNCTION_DVLOG(level) \
     28   DVLOG(level) << __FUNCTION__ << \
     29   "<" << DecoderStreamTraits<StreamType>::ToString() << ">"
     30 
     31 template <>
     32 const char* GetTraceString<DemuxerStream::VIDEO>() {
     33   return "DecoderStream<VIDEO>::Decode";
     34 }
     35 
     36 template <>
     37 const char* GetTraceString<DemuxerStream::AUDIO>() {
     38   return "DecoderStream<AUDIO>::Decode";
     39 }
     40 
     41 template <DemuxerStream::Type StreamType>
     42 DecoderStream<StreamType>::DecoderStream(
     43     const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
     44     ScopedVector<Decoder> decoders,
     45     const SetDecryptorReadyCB& set_decryptor_ready_cb,
     46     const scoped_refptr<MediaLog>& media_log)
     47     : task_runner_(task_runner),
     48       media_log_(media_log),
     49       state_(STATE_UNINITIALIZED),
     50       stream_(NULL),
     51       low_delay_(false),
     52       decoder_selector_(
     53           new DecoderSelector<StreamType>(task_runner,
     54                                           decoders.Pass(),
     55                                           set_decryptor_ready_cb)),
     56       active_splice_(false),
     57       decoding_eos_(false),
     58       pending_decode_requests_(0),
     59       weak_factory_(this) {}
     60 
     61 template <DemuxerStream::Type StreamType>
     62 DecoderStream<StreamType>::~DecoderStream() {
     63   FUNCTION_DVLOG(2);
     64   DCHECK(task_runner_->BelongsToCurrentThread());
     65 
     66   decoder_selector_.reset();
     67 
     68   if (!init_cb_.is_null()) {
     69     task_runner_->PostTask(FROM_HERE,
     70                            base::Bind(base::ResetAndReturn(&init_cb_), false));
     71   }
     72   if (!read_cb_.is_null()) {
     73     task_runner_->PostTask(FROM_HERE, base::Bind(
     74         base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
     75   }
     76   if (!reset_cb_.is_null())
     77     task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_));
     78 
     79   stream_ = NULL;
     80   decoder_.reset();
     81   decrypting_demuxer_stream_.reset();
     82 }
     83 
     84 template <DemuxerStream::Type StreamType>
     85 void DecoderStream<StreamType>::Initialize(DemuxerStream* stream,
     86                                            bool low_delay,
     87                                            const StatisticsCB& statistics_cb,
     88                                            const InitCB& init_cb) {
     89   FUNCTION_DVLOG(2);
     90   DCHECK(task_runner_->BelongsToCurrentThread());
     91   DCHECK_EQ(state_, STATE_UNINITIALIZED) << state_;
     92   DCHECK(init_cb_.is_null());
     93   DCHECK(!init_cb.is_null());
     94 
     95   statistics_cb_ = statistics_cb;
     96   init_cb_ = init_cb;
     97   stream_ = stream;
     98   low_delay_ = low_delay;
     99 
    100   state_ = STATE_INITIALIZING;
    101   // TODO(xhwang): DecoderSelector only needs a config to select a decoder.
    102   decoder_selector_->SelectDecoder(
    103       stream, low_delay,
    104       base::Bind(&DecoderStream<StreamType>::OnDecoderSelected,
    105                  weak_factory_.GetWeakPtr()),
    106       base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
    107                  weak_factory_.GetWeakPtr()));
    108 }
    109 
    110 template <DemuxerStream::Type StreamType>
    111 void DecoderStream<StreamType>::Read(const ReadCB& read_cb) {
    112   FUNCTION_DVLOG(2);
    113   DCHECK(task_runner_->BelongsToCurrentThread());
    114   DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_INITIALIZING)
    115       << state_;
    116   // No two reads in the flight at any time.
    117   DCHECK(read_cb_.is_null());
    118   // No read during resetting or stopping process.
    119   DCHECK(reset_cb_.is_null());
    120 
    121   if (state_ == STATE_ERROR) {
    122     task_runner_->PostTask(
    123         FROM_HERE, base::Bind(read_cb, DECODE_ERROR, scoped_refptr<Output>()));
    124     return;
    125   }
    126 
    127   if (state_ == STATE_END_OF_STREAM && ready_outputs_.empty()) {
    128     task_runner_->PostTask(
    129         FROM_HERE, base::Bind(read_cb, OK, StreamTraits::CreateEOSOutput()));
    130     return;
    131   }
    132 
    133   if (!ready_outputs_.empty()) {
    134     task_runner_->PostTask(FROM_HERE,
    135                            base::Bind(read_cb, OK, ready_outputs_.front()));
    136     ready_outputs_.pop_front();
    137   } else {
    138     read_cb_ = read_cb;
    139   }
    140 
    141   if (state_ == STATE_NORMAL && CanDecodeMore())
    142     ReadFromDemuxerStream();
    143 }
    144 
    145 template <DemuxerStream::Type StreamType>
    146 void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
    147   FUNCTION_DVLOG(2);
    148   DCHECK(task_runner_->BelongsToCurrentThread());
    149   DCHECK(state_ != STATE_UNINITIALIZED)<< state_;
    150   DCHECK(reset_cb_.is_null());
    151 
    152   reset_cb_ = closure;
    153 
    154   if (!read_cb_.is_null()) {
    155     task_runner_->PostTask(FROM_HERE, base::Bind(
    156         base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
    157   }
    158 
    159   ready_outputs_.clear();
    160 
    161   // During decoder reinitialization, the Decoder does not need to be and
    162   // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
    163   // reinitialization.
    164   if (state_ == STATE_REINITIALIZING_DECODER)
    165     return;
    166 
    167   // During pending demuxer read and when not using DecryptingDemuxerStream,
    168   // the Decoder will be reset after demuxer read is returned
    169   // (in OnBufferReady()).
    170   if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_)
    171     return;
    172 
    173   if (decrypting_demuxer_stream_) {
    174     decrypting_demuxer_stream_->Reset(base::Bind(
    175         &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr()));
    176     return;
    177   }
    178 
    179   ResetDecoder();
    180 }
    181 
    182 template <DemuxerStream::Type StreamType>
    183 bool DecoderStream<StreamType>::CanReadWithoutStalling() const {
    184   DCHECK(task_runner_->BelongsToCurrentThread());
    185   return !ready_outputs_.empty() || decoder_->CanReadWithoutStalling();
    186 }
    187 
    188 template <>
    189 bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const {
    190   DCHECK(task_runner_->BelongsToCurrentThread());
    191   return true;
    192 }
    193 
    194 template <DemuxerStream::Type StreamType>
    195 int DecoderStream<StreamType>::GetMaxDecodeRequests() const {
    196   return decoder_->GetMaxDecodeRequests();
    197 }
    198 
    199 template <>
    200 int DecoderStream<DemuxerStream::AUDIO>::GetMaxDecodeRequests() const {
    201   return 1;
    202 }
    203 
    204 template <DemuxerStream::Type StreamType>
    205 bool DecoderStream<StreamType>::CanDecodeMore() const {
    206   DCHECK(task_runner_->BelongsToCurrentThread());
    207 
    208   // Limit total number of outputs stored in |ready_outputs_| and being decoded.
    209   // It only makes sense to saturate decoder completely when output queue is
    210   // empty.
    211   int num_decodes =
    212       static_cast<int>(ready_outputs_.size()) + pending_decode_requests_;
    213   return !decoding_eos_ && num_decodes < GetMaxDecodeRequests();
    214 }
    215 
    216 template <DemuxerStream::Type StreamType>
    217 void DecoderStream<StreamType>::OnDecoderSelected(
    218     scoped_ptr<Decoder> selected_decoder,
    219     scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) {
    220   FUNCTION_DVLOG(2);
    221   DCHECK(task_runner_->BelongsToCurrentThread());
    222   DCHECK_EQ(state_, STATE_INITIALIZING) << state_;
    223   DCHECK(!init_cb_.is_null());
    224   DCHECK(read_cb_.is_null());
    225   DCHECK(reset_cb_.is_null());
    226 
    227   decoder_selector_.reset();
    228   if (decrypting_demuxer_stream)
    229     stream_ = decrypting_demuxer_stream.get();
    230 
    231   if (!selected_decoder) {
    232     state_ = STATE_UNINITIALIZED;
    233     base::ResetAndReturn(&init_cb_).Run(false);
    234     return;
    235   }
    236 
    237   state_ = STATE_NORMAL;
    238   decoder_ = selected_decoder.Pass();
    239   decrypting_demuxer_stream_ = decrypting_demuxer_stream.Pass();
    240 
    241   const std::string stream_type = DecoderStreamTraits<StreamType>::ToString();
    242   media_log_->SetBooleanProperty((stream_type + "_dds").c_str(),
    243                                  decrypting_demuxer_stream_);
    244   media_log_->SetStringProperty((stream_type + "_decoder").c_str(),
    245                                 decoder_->GetDisplayName());
    246 
    247   if (StreamTraits::NeedsBitstreamConversion(decoder_.get()))
    248     stream_->EnableBitstreamConverter();
    249   base::ResetAndReturn(&init_cb_).Run(true);
    250 }
    251 
    252 template <DemuxerStream::Type StreamType>
    253 void DecoderStream<StreamType>::SatisfyRead(
    254     Status status,
    255     const scoped_refptr<Output>& output) {
    256   DCHECK(!read_cb_.is_null());
    257   base::ResetAndReturn(&read_cb_).Run(status, output);
    258 }
    259 
    260 template <DemuxerStream::Type StreamType>
    261 void DecoderStream<StreamType>::Decode(
    262     const scoped_refptr<DecoderBuffer>& buffer) {
    263   FUNCTION_DVLOG(2);
    264   DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
    265   DCHECK_LT(pending_decode_requests_, GetMaxDecodeRequests());
    266   DCHECK(reset_cb_.is_null());
    267   DCHECK(buffer.get());
    268 
    269   int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size();
    270 
    271   TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this);
    272 
    273   if (buffer->end_of_stream())
    274     decoding_eos_ = true;
    275 
    276   ++pending_decode_requests_;
    277   decoder_->Decode(buffer,
    278                    base::Bind(&DecoderStream<StreamType>::OnDecodeDone,
    279                               weak_factory_.GetWeakPtr(),
    280                               buffer_size,
    281                               buffer->end_of_stream()));
    282 }
    283 
    284 template <DemuxerStream::Type StreamType>
    285 void DecoderStream<StreamType>::FlushDecoder() {
    286   Decode(DecoderBuffer::CreateEOSBuffer());
    287 }
    288 
    289 template <DemuxerStream::Type StreamType>
    290 void DecoderStream<StreamType>::OnDecodeDone(int buffer_size,
    291                                              bool end_of_stream,
    292                                              typename Decoder::Status status) {
    293   FUNCTION_DVLOG(2) << status;
    294   DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
    295          state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
    296       << state_;
    297   DCHECK_GT(pending_decode_requests_, 0);
    298 
    299   --pending_decode_requests_;
    300 
    301   TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this);
    302 
    303   if (end_of_stream)
    304     decoding_eos_ = false;
    305 
    306   if (state_ == STATE_ERROR) {
    307     DCHECK(read_cb_.is_null());
    308     return;
    309   }
    310 
    311   // Drop decoding result if Reset() was called during decoding.
    312   // The resetting process will be handled when the decoder is reset.
    313   if (!reset_cb_.is_null())
    314     return;
    315 
    316   switch (status) {
    317     case Decoder::kDecodeError:
    318     case Decoder::kDecryptError:
    319       state_ = STATE_ERROR;
    320       ready_outputs_.clear();
    321       if (!read_cb_.is_null())
    322         SatisfyRead(DECODE_ERROR, NULL);
    323       return;
    324 
    325     case Decoder::kAborted:
    326       // Decoder can return kAborted only when Reset is pending.
    327       NOTREACHED();
    328       return;
    329 
    330     case Decoder::kOk:
    331       // Any successful decode counts!
    332       if (buffer_size > 0)
    333         StreamTraits::ReportStatistics(statistics_cb_, buffer_size);
    334 
    335       if (state_ == STATE_NORMAL) {
    336         if (end_of_stream) {
    337           state_ = STATE_END_OF_STREAM;
    338           if (ready_outputs_.empty() && !read_cb_.is_null())
    339             SatisfyRead(OK, StreamTraits::CreateEOSOutput());
    340           return;
    341         }
    342 
    343         if (CanDecodeMore())
    344           ReadFromDemuxerStream();
    345         return;
    346       }
    347 
    348       if (state_ == STATE_FLUSHING_DECODER && !pending_decode_requests_)
    349         ReinitializeDecoder();
    350       return;
    351   }
    352 }
    353 
    354 template <DemuxerStream::Type StreamType>
    355 void DecoderStream<StreamType>::OnDecodeOutputReady(
    356     const scoped_refptr<Output>& output) {
    357   FUNCTION_DVLOG(2) << ": " << output->timestamp().InMilliseconds() << " ms";
    358   DCHECK(output.get());
    359   DCHECK(!output->end_of_stream());
    360   DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
    361          state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
    362       << state_;
    363 
    364   if (state_ == STATE_ERROR) {
    365     DCHECK(read_cb_.is_null());
    366     return;
    367   }
    368 
    369   // Drop decoding result if Reset() was called during decoding.
    370   // The resetting process will be handled when the decoder is reset.
    371   if (!reset_cb_.is_null())
    372     return;
    373 
    374   // TODO(xhwang): VideoDecoder doesn't need to return EOS after it's flushed.
    375   // Fix all decoders and remove this block.
    376   // Store decoded output.
    377   ready_outputs_.push_back(output);
    378 
    379   if (read_cb_.is_null())
    380     return;
    381 
    382   // Satisfy outstanding read request, if any.
    383   scoped_refptr<Output> read_result = ready_outputs_.front();
    384   ready_outputs_.pop_front();
    385   SatisfyRead(OK, output);
    386 }
    387 
    388 template <DemuxerStream::Type StreamType>
    389 void DecoderStream<StreamType>::ReadFromDemuxerStream() {
    390   FUNCTION_DVLOG(2);
    391   DCHECK_EQ(state_, STATE_NORMAL) << state_;
    392   DCHECK(CanDecodeMore());
    393   DCHECK(reset_cb_.is_null());
    394 
    395   state_ = STATE_PENDING_DEMUXER_READ;
    396   stream_->Read(base::Bind(&DecoderStream<StreamType>::OnBufferReady,
    397                            weak_factory_.GetWeakPtr()));
    398 }
    399 
    400 template <DemuxerStream::Type StreamType>
    401 void DecoderStream<StreamType>::OnBufferReady(
    402     DemuxerStream::Status status,
    403     const scoped_refptr<DecoderBuffer>& buffer) {
    404   FUNCTION_DVLOG(2) << ": " << status << ", "
    405                     << (buffer.get() ? buffer->AsHumanReadableString()
    406                                      : "NULL");
    407 
    408   DCHECK(task_runner_->BelongsToCurrentThread());
    409   DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
    410       << state_;
    411   DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status;
    412 
    413   // Decoding has been stopped (e.g due to an error).
    414   if (state_ != STATE_PENDING_DEMUXER_READ) {
    415     DCHECK(state_ == STATE_ERROR);
    416     DCHECK(read_cb_.is_null());
    417     return;
    418   }
    419 
    420   state_ = STATE_NORMAL;
    421 
    422   if (status == DemuxerStream::kConfigChanged) {
    423     FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
    424     DCHECK(stream_->SupportsConfigChanges());
    425 
    426     if (!config_change_observer_cb_.is_null())
    427       config_change_observer_cb_.Run();
    428 
    429     state_ = STATE_FLUSHING_DECODER;
    430     if (!reset_cb_.is_null()) {
    431       // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
    432       // which will continue the resetting process in it's callback.
    433       if (!decrypting_demuxer_stream_)
    434         Reset(base::ResetAndReturn(&reset_cb_));
    435       // Reinitialization will continue after Reset() is done.
    436     } else {
    437       FlushDecoder();
    438     }
    439     return;
    440   }
    441 
    442   if (!reset_cb_.is_null()) {
    443     // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
    444     // which will continue the resetting process in it's callback.
    445     if (!decrypting_demuxer_stream_)
    446       Reset(base::ResetAndReturn(&reset_cb_));
    447     return;
    448   }
    449 
    450   if (status == DemuxerStream::kAborted) {
    451     if (!read_cb_.is_null())
    452       SatisfyRead(DEMUXER_READ_ABORTED, NULL);
    453     return;
    454   }
    455 
    456   if (!splice_observer_cb_.is_null() && !buffer->end_of_stream()) {
    457     const bool has_splice_ts = buffer->splice_timestamp() != kNoTimestamp();
    458     if (active_splice_ || has_splice_ts) {
    459       splice_observer_cb_.Run(buffer->splice_timestamp());
    460       active_splice_ = has_splice_ts;
    461     }
    462   }
    463 
    464   DCHECK(status == DemuxerStream::kOk) << status;
    465   Decode(buffer);
    466 
    467   // Read more data if the decoder supports multiple parallel decoding requests.
    468   if (CanDecodeMore())
    469     ReadFromDemuxerStream();
    470 }
    471 
    472 template <DemuxerStream::Type StreamType>
    473 void DecoderStream<StreamType>::ReinitializeDecoder() {
    474   FUNCTION_DVLOG(2);
    475   DCHECK(task_runner_->BelongsToCurrentThread());
    476   DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_;
    477   DCHECK_EQ(pending_decode_requests_, 0);
    478 
    479   DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig());
    480   state_ = STATE_REINITIALIZING_DECODER;
    481   DecoderStreamTraits<StreamType>::Initialize(
    482       decoder_.get(),
    483       StreamTraits::GetDecoderConfig(*stream_),
    484       low_delay_,
    485       base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized,
    486                  weak_factory_.GetWeakPtr()),
    487       base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
    488                  weak_factory_.GetWeakPtr()));
    489 }
    490 
    491 template <DemuxerStream::Type StreamType>
    492 void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) {
    493   FUNCTION_DVLOG(2);
    494   DCHECK(task_runner_->BelongsToCurrentThread());
    495   DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER) << state_;
    496 
    497   // ReinitializeDecoder() can be called in two cases:
    498   // 1, Flushing decoder finished (see OnDecodeOutputReady()).
    499   // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
    500   // Also, Reset() can be called during pending ReinitializeDecoder().
    501   // This function needs to handle them all!
    502 
    503   state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR;
    504 
    505   if (!reset_cb_.is_null()) {
    506     base::ResetAndReturn(&reset_cb_).Run();
    507     return;
    508   }
    509 
    510   if (read_cb_.is_null())
    511     return;
    512 
    513   if (state_ == STATE_ERROR) {
    514     SatisfyRead(DECODE_ERROR, NULL);
    515     return;
    516   }
    517 
    518   ReadFromDemuxerStream();
    519 }
    520 
    521 template <DemuxerStream::Type StreamType>
    522 void DecoderStream<StreamType>::ResetDecoder() {
    523   FUNCTION_DVLOG(2);
    524   DCHECK(task_runner_->BelongsToCurrentThread());
    525   DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
    526          state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_;
    527   DCHECK(!reset_cb_.is_null());
    528 
    529   decoder_->Reset(base::Bind(&DecoderStream<StreamType>::OnDecoderReset,
    530                              weak_factory_.GetWeakPtr()));
    531 }
    532 
    533 template <DemuxerStream::Type StreamType>
    534 void DecoderStream<StreamType>::OnDecoderReset() {
    535   FUNCTION_DVLOG(2);
    536   DCHECK(task_runner_->BelongsToCurrentThread());
    537   DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
    538          state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_;
    539   // If Reset() was called during pending read, read callback should be fired
    540   // before the reset callback is fired.
    541   DCHECK(read_cb_.is_null());
    542   DCHECK(!reset_cb_.is_null());
    543 
    544   if (state_ != STATE_FLUSHING_DECODER) {
    545     state_ = STATE_NORMAL;
    546     active_splice_ = false;
    547     base::ResetAndReturn(&reset_cb_).Run();
    548     return;
    549   }
    550 
    551   // The resetting process will be continued in OnDecoderReinitialized().
    552   ReinitializeDecoder();
    553 }
    554 
    555 template class DecoderStream<DemuxerStream::VIDEO>;
    556 template class DecoderStream<DemuxerStream::AUDIO>;
    557 
    558 }  // namespace media
    559