Home | History | Annotate | Download | only in nuplayer
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "StreamingSource"
     19 #include <utils/Log.h>
     20 
     21 #include "StreamingSource.h"
     22 
     23 #include "ATSParser.h"
     24 #include "AnotherPacketSource.h"
     25 #include "NuPlayerStreamListener.h"
     26 
     27 #include <media/stagefright/foundation/ABuffer.h>
     28 #include <media/stagefright/foundation/ADebug.h>
     29 #include <media/stagefright/foundation/AMessage.h>
     30 #include <media/stagefright/MediaSource.h>
     31 #include <media/stagefright/MetaData.h>
     32 #include <media/stagefright/Utils.h>
     33 
     34 namespace android {
     35 
     36 const int32_t kNumListenerQueuePackets = 80;
     37 
     38 NuPlayer::StreamingSource::StreamingSource(
     39         const sp<AMessage> &notify,
     40         const sp<IStreamSource> &source)
     41     : Source(notify),
     42       mSource(source),
     43       mFinalResult(OK),
     44       mBuffering(false) {
     45 }
     46 
     47 NuPlayer::StreamingSource::~StreamingSource() {
     48     if (mLooper != NULL) {
     49         mLooper->unregisterHandler(id());
     50         mLooper->stop();
     51     }
     52 }
     53 
     54 void NuPlayer::StreamingSource::prepareAsync() {
     55     if (mLooper == NULL) {
     56         mLooper = new ALooper;
     57         mLooper->setName("streaming");
     58         mLooper->start();
     59 
     60         mLooper->registerHandler(this);
     61     }
     62 
     63     notifyVideoSizeChanged();
     64     notifyFlagsChanged(0);
     65     notifyPrepared();
     66 }
     67 
     68 void NuPlayer::StreamingSource::start() {
     69     mStreamListener = new NuPlayerStreamListener(mSource, NULL);
     70 
     71     uint32_t sourceFlags = mSource->flags();
     72 
     73     uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
     74     if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
     75         parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
     76     }
     77 
     78     mTSParser = new ATSParser(parserFlags);
     79 
     80     mStreamListener->start();
     81 
     82     postReadBuffer();
     83 }
     84 
     85 status_t NuPlayer::StreamingSource::feedMoreTSData() {
     86     return postReadBuffer();
     87 }
     88 
     89 void NuPlayer::StreamingSource::onReadBuffer() {
     90     for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) {
     91         char buffer[188];
     92         sp<AMessage> extra;
     93         ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra);
     94 
     95         if (n == 0) {
     96             ALOGI("input data EOS reached.");
     97             mTSParser->signalEOS(ERROR_END_OF_STREAM);
     98             setError(ERROR_END_OF_STREAM);
     99             break;
    100         } else if (n == INFO_DISCONTINUITY) {
    101             int32_t type = ATSParser::DISCONTINUITY_TIME;
    102 
    103             int32_t mask;
    104             if (extra != NULL
    105                     && extra->findInt32(
    106                         IStreamListener::kKeyDiscontinuityMask, &mask)) {
    107                 if (mask == 0) {
    108                     ALOGE("Client specified an illegal discontinuity type.");
    109                     setError(ERROR_UNSUPPORTED);
    110                     break;
    111                 }
    112 
    113                 type = mask;
    114             }
    115 
    116             mTSParser->signalDiscontinuity(
    117                     (ATSParser::DiscontinuityType)type, extra);
    118         } else if (n < 0) {
    119             break;
    120         } else {
    121             if (buffer[0] == 0x00) {
    122                 // XXX legacy
    123 
    124                 if (extra == NULL) {
    125                     extra = new AMessage;
    126                 }
    127 
    128                 uint8_t type = buffer[1];
    129 
    130                 if (type & 2) {
    131                     int64_t mediaTimeUs;
    132                     memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs));
    133 
    134                     extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs);
    135                 }
    136 
    137                 mTSParser->signalDiscontinuity(
    138                         ((type & 1) == 0)
    139                             ? ATSParser::DISCONTINUITY_TIME
    140                             : ATSParser::DISCONTINUITY_FORMATCHANGE,
    141                         extra);
    142             } else {
    143                 status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer));
    144 
    145                 if (err != OK) {
    146                     ALOGE("TS Parser returned error %d", err);
    147 
    148                     mTSParser->signalEOS(err);
    149                     setError(err);
    150                     break;
    151                 }
    152             }
    153         }
    154     }
    155 }
    156 
    157 status_t NuPlayer::StreamingSource::postReadBuffer() {
    158     {
    159         Mutex::Autolock _l(mBufferingLock);
    160         if (mFinalResult != OK) {
    161             return mFinalResult;
    162         }
    163         if (mBuffering) {
    164             return OK;
    165         }
    166         mBuffering = true;
    167     }
    168 
    169     (new AMessage(kWhatReadBuffer, this))->post();
    170     return OK;
    171 }
    172 
    173 bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
    174     // We're going to buffer at least 2 secs worth data on all tracks before
    175     // starting playback (both at startup and after a seek).
    176 
    177     static const int64_t kMinDurationUs = 2000000ll;
    178 
    179     sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
    180     sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
    181 
    182     status_t err;
    183     int64_t durationUs;
    184     if (audioTrack != NULL
    185             && (durationUs = audioTrack->getBufferedDurationUs(&err))
    186                     < kMinDurationUs
    187             && err == OK) {
    188         ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
    189               durationUs / 1E6);
    190         return false;
    191     }
    192 
    193     if (videoTrack != NULL
    194             && (durationUs = videoTrack->getBufferedDurationUs(&err))
    195                     < kMinDurationUs
    196             && err == OK) {
    197         ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
    198               durationUs / 1E6);
    199         return false;
    200     }
    201 
    202     return true;
    203 }
    204 
    205 void NuPlayer::StreamingSource::setError(status_t err) {
    206     Mutex::Autolock _l(mBufferingLock);
    207     mFinalResult = err;
    208 }
    209 
    210 sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
    211     if (mTSParser == NULL) {
    212         return NULL;
    213     }
    214 
    215     sp<MediaSource> source = mTSParser->getSource(
    216             audio ? ATSParser::AUDIO : ATSParser::VIDEO);
    217 
    218     return static_cast<AnotherPacketSource *>(source.get());
    219 }
    220 
    221 sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) {
    222     sp<AnotherPacketSource> source = getSource(audio);
    223 
    224     sp<AMessage> format = new AMessage;
    225     if (source == NULL) {
    226         format->setInt32("err", -EWOULDBLOCK);
    227         return format;
    228     }
    229 
    230     sp<MetaData> meta = source->getFormat();
    231     status_t err = convertMetaDataToMessage(meta, &format);
    232     if (err != OK) { // format may have been cleared on error
    233         format = new AMessage;
    234         format->setInt32("err", err);
    235     }
    236     return format;
    237 }
    238 
    239 status_t NuPlayer::StreamingSource::dequeueAccessUnit(
    240         bool audio, sp<ABuffer> *accessUnit) {
    241     sp<AnotherPacketSource> source = getSource(audio);
    242 
    243     if (source == NULL) {
    244         return -EWOULDBLOCK;
    245     }
    246 
    247     if (!haveSufficientDataOnAllTracks()) {
    248         postReadBuffer();
    249     }
    250 
    251     status_t finalResult;
    252     if (!source->hasBufferAvailable(&finalResult)) {
    253         return finalResult == OK ? -EWOULDBLOCK : finalResult;
    254     }
    255 
    256     status_t err = source->dequeueAccessUnit(accessUnit);
    257 
    258 #if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
    259     if (err == OK) {
    260         int64_t timeUs;
    261         CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
    262         ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
    263     }
    264 #endif
    265 
    266     return err;
    267 }
    268 
    269 bool NuPlayer::StreamingSource::isRealTime() const {
    270     return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
    271 }
    272 
    273 void NuPlayer::StreamingSource::onMessageReceived(
    274         const sp<AMessage> &msg) {
    275     switch (msg->what()) {
    276         case kWhatReadBuffer:
    277         {
    278             onReadBuffer();
    279 
    280             {
    281                 Mutex::Autolock _l(mBufferingLock);
    282                 mBuffering = false;
    283             }
    284             break;
    285         }
    286         default:
    287         {
    288             TRESPASS();
    289         }
    290     }
    291 }
    292 
    293 
    294 }  // namespace android
    295 
    296