Home | History | Annotate | Download | only in mpeg2ts
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "AnotherPacketSource"
     19 
     20 #include "AnotherPacketSource.h"
     21 
     22 #include "include/avc_utils.h"
     23 
     24 #include <media/stagefright/foundation/ABuffer.h>
     25 #include <media/stagefright/foundation/ADebug.h>
     26 #include <media/stagefright/foundation/AMessage.h>
     27 #include <media/stagefright/foundation/AString.h>
     28 #include <media/stagefright/foundation/hexdump.h>
     29 #include <media/stagefright/MediaBuffer.h>
     30 #include <media/stagefright/MediaDefs.h>
     31 #include <media/stagefright/MetaData.h>
     32 #include <media/stagefright/Utils.h>
     33 #include <utils/Vector.h>
     34 
     35 #include <inttypes.h>
     36 
     37 namespace android {
     38 
     39 const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs
     40 
     41 AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
     42     : mIsAudio(false),
     43       mIsVideo(false),
     44       mEnabled(true),
     45       mFormat(NULL),
     46       mLastQueuedTimeUs(0),
     47       mEOSResult(OK),
     48       mLatestEnqueuedMeta(NULL),
     49       mLatestDequeuedMeta(NULL) {
     50     setFormat(meta);
     51 
     52     mDiscontinuitySegments.push_back(DiscontinuitySegment());
     53 }
     54 
     55 void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
     56     if (mFormat != NULL) {
     57         // Only allowed to be set once. Requires explicit clear to reset.
     58         return;
     59     }
     60 
     61     mIsAudio = false;
     62     mIsVideo = false;
     63 
     64     if (meta == NULL) {
     65         return;
     66     }
     67 
     68     mFormat = meta;
     69     const char *mime;
     70     CHECK(meta->findCString(kKeyMIMEType, &mime));
     71 
     72     if (!strncasecmp("audio/", mime, 6)) {
     73         mIsAudio = true;
     74     } else  if (!strncasecmp("video/", mime, 6)) {
     75         mIsVideo = true;
     76     } else {
     77         CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
     78     }
     79 }
     80 
     81 AnotherPacketSource::~AnotherPacketSource() {
     82 }
     83 
     84 status_t AnotherPacketSource::start(MetaData * /* params */) {
     85     return OK;
     86 }
     87 
     88 status_t AnotherPacketSource::stop() {
     89     return OK;
     90 }
     91 
     92 sp<MetaData> AnotherPacketSource::getFormat() {
     93     Mutex::Autolock autoLock(mLock);
     94     if (mFormat != NULL) {
     95         return mFormat;
     96     }
     97 
     98     List<sp<ABuffer> >::iterator it = mBuffers.begin();
     99     while (it != mBuffers.end()) {
    100         sp<ABuffer> buffer = *it;
    101         int32_t discontinuity;
    102         if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    103             sp<RefBase> object;
    104             if (buffer->meta()->findObject("format", &object)) {
    105                 setFormat(static_cast<MetaData*>(object.get()));
    106                 return mFormat;
    107             }
    108         }
    109 
    110         ++it;
    111     }
    112     return NULL;
    113 }
    114 
    115 status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
    116     buffer->clear();
    117 
    118     Mutex::Autolock autoLock(mLock);
    119     while (mEOSResult == OK && mBuffers.empty()) {
    120         mCondition.wait(mLock);
    121     }
    122 
    123     if (!mBuffers.empty()) {
    124         *buffer = *mBuffers.begin();
    125         mBuffers.erase(mBuffers.begin());
    126 
    127         int32_t discontinuity;
    128         if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
    129             if (wasFormatChange(discontinuity)) {
    130                 mFormat.clear();
    131             }
    132 
    133             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
    134             // CHECK(!mDiscontinuitySegments.empty());
    135             return INFO_DISCONTINUITY;
    136         }
    137 
    138         // CHECK(!mDiscontinuitySegments.empty());
    139         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
    140 
    141         int64_t timeUs;
    142         mLatestDequeuedMeta = (*buffer)->meta()->dup();
    143         CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
    144         if (timeUs > seg.mMaxDequeTimeUs) {
    145             seg.mMaxDequeTimeUs = timeUs;
    146         }
    147 
    148         sp<RefBase> object;
    149         if ((*buffer)->meta()->findObject("format", &object)) {
    150             setFormat(static_cast<MetaData*>(object.get()));
    151         }
    152 
    153         return OK;
    154     }
    155 
    156     return mEOSResult;
    157 }
    158 
    159 void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
    160     // TODO: update corresponding book keeping info.
    161     Mutex::Autolock autoLock(mLock);
    162     mBuffers.push_front(buffer);
    163 }
    164 
    165 status_t AnotherPacketSource::read(
    166         MediaBuffer **out, const ReadOptions *) {
    167     *out = NULL;
    168 
    169     Mutex::Autolock autoLock(mLock);
    170     while (mEOSResult == OK && mBuffers.empty()) {
    171         mCondition.wait(mLock);
    172     }
    173 
    174     if (!mBuffers.empty()) {
    175 
    176         const sp<ABuffer> buffer = *mBuffers.begin();
    177         mBuffers.erase(mBuffers.begin());
    178 
    179         int32_t discontinuity;
    180         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    181             if (wasFormatChange(discontinuity)) {
    182                 mFormat.clear();
    183             }
    184 
    185             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
    186             // CHECK(!mDiscontinuitySegments.empty());
    187             return INFO_DISCONTINUITY;
    188         }
    189 
    190         mLatestDequeuedMeta = buffer->meta()->dup();
    191 
    192         sp<RefBase> object;
    193         if (buffer->meta()->findObject("format", &object)) {
    194             setFormat(static_cast<MetaData*>(object.get()));
    195         }
    196 
    197         int64_t timeUs;
    198         CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
    199         // CHECK(!mDiscontinuitySegments.empty());
    200         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
    201         if (timeUs > seg.mMaxDequeTimeUs) {
    202             seg.mMaxDequeTimeUs = timeUs;
    203         }
    204 
    205         MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
    206 
    207         mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
    208 
    209         int32_t isSync;
    210         if (buffer->meta()->findInt32("isSync", &isSync)) {
    211             mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
    212         }
    213 
    214         sp<ABuffer> sei;
    215         if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
    216             mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
    217         }
    218 
    219         sp<ABuffer> mpegUserData;
    220         if (buffer->meta()->findBuffer("mpegUserData", &mpegUserData) && mpegUserData != NULL) {
    221             mediaBuffer->meta_data()->setData(
    222                     kKeyMpegUserData, 0, mpegUserData->data(), mpegUserData->size());
    223         }
    224 
    225         *out = mediaBuffer;
    226         return OK;
    227     }
    228 
    229     return mEOSResult;
    230 }
    231 
    232 bool AnotherPacketSource::wasFormatChange(
    233         int32_t discontinuityType) const {
    234     if (mIsAudio) {
    235         return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
    236     }
    237 
    238     if (mIsVideo) {
    239         return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
    240     }
    241 
    242     return false;
    243 }
    244 
    245 void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
    246     int32_t damaged;
    247     if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
    248         // LOG(VERBOSE) << "discarding damaged AU";
    249         return;
    250     }
    251 
    252     Mutex::Autolock autoLock(mLock);
    253     mBuffers.push_back(buffer);
    254     mCondition.signal();
    255 
    256     int32_t discontinuity;
    257     if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
    258         ALOGV("queueing a discontinuity with queueAccessUnit");
    259 
    260         mLastQueuedTimeUs = 0ll;
    261         mEOSResult = OK;
    262         mLatestEnqueuedMeta = NULL;
    263 
    264         mDiscontinuitySegments.push_back(DiscontinuitySegment());
    265         return;
    266     }
    267 
    268     int64_t lastQueuedTimeUs;
    269     CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
    270     mLastQueuedTimeUs = lastQueuedTimeUs;
    271     ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
    272             mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
    273 
    274     // CHECK(!mDiscontinuitySegments.empty());
    275     DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
    276     if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
    277         tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
    278     }
    279     if (tailSeg.mMaxDequeTimeUs == -1) {
    280         tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
    281     }
    282 
    283     if (mLatestEnqueuedMeta == NULL) {
    284         mLatestEnqueuedMeta = buffer->meta()->dup();
    285     } else {
    286         int64_t latestTimeUs = 0;
    287         int64_t frameDeltaUs = 0;
    288         CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
    289         if (lastQueuedTimeUs > latestTimeUs) {
    290             mLatestEnqueuedMeta = buffer->meta()->dup();
    291             frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
    292             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
    293         } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
    294             // For B frames
    295             frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
    296             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
    297         }
    298     }
    299 }
    300 
    301 void AnotherPacketSource::clear() {
    302     Mutex::Autolock autoLock(mLock);
    303 
    304     mBuffers.clear();
    305     mEOSResult = OK;
    306 
    307     mDiscontinuitySegments.clear();
    308     mDiscontinuitySegments.push_back(DiscontinuitySegment());
    309 
    310     mFormat = NULL;
    311     mLatestEnqueuedMeta = NULL;
    312 }
    313 
    314 void AnotherPacketSource::queueDiscontinuity(
    315         ATSParser::DiscontinuityType type,
    316         const sp<AMessage> &extra,
    317         bool discard) {
    318     Mutex::Autolock autoLock(mLock);
    319 
    320     if (discard) {
    321         // Leave only discontinuities in the queue.
    322         List<sp<ABuffer> >::iterator it = mBuffers.begin();
    323         while (it != mBuffers.end()) {
    324             sp<ABuffer> oldBuffer = *it;
    325 
    326             int32_t oldDiscontinuityType;
    327             if (!oldBuffer->meta()->findInt32(
    328                         "discontinuity", &oldDiscontinuityType)) {
    329                 it = mBuffers.erase(it);
    330                 continue;
    331             }
    332 
    333             ++it;
    334         }
    335 
    336         for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
    337                 it2 != mDiscontinuitySegments.end();
    338                 ++it2) {
    339             DiscontinuitySegment &seg = *it2;
    340             seg.clear();
    341         }
    342 
    343     }
    344 
    345     mEOSResult = OK;
    346     mLastQueuedTimeUs = 0;
    347     mLatestEnqueuedMeta = NULL;
    348 
    349     if (type == ATSParser::DISCONTINUITY_NONE) {
    350         return;
    351     }
    352 
    353     mDiscontinuitySegments.push_back(DiscontinuitySegment());
    354 
    355     sp<ABuffer> buffer = new ABuffer(0);
    356     buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
    357     buffer->meta()->setMessage("extra", extra);
    358 
    359     mBuffers.push_back(buffer);
    360     mCondition.signal();
    361 }
    362 
    363 void AnotherPacketSource::signalEOS(status_t result) {
    364     CHECK(result != OK);
    365 
    366     Mutex::Autolock autoLock(mLock);
    367     mEOSResult = result;
    368     mCondition.signal();
    369 }
    370 
    371 bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
    372     Mutex::Autolock autoLock(mLock);
    373     *finalResult = OK;
    374     if (!mEnabled) {
    375         return false;
    376     }
    377     if (!mBuffers.empty()) {
    378         return true;
    379     }
    380 
    381     *finalResult = mEOSResult;
    382     return false;
    383 }
    384 
    385 bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
    386     Mutex::Autolock autoLock(mLock);
    387     *finalResult = OK;
    388     if (!mEnabled) {
    389         return false;
    390     }
    391     List<sp<ABuffer> >::iterator it;
    392     for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
    393         int32_t discontinuity;
    394         if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
    395             return true;
    396         }
    397     }
    398 
    399     *finalResult = mEOSResult;
    400     return false;
    401 }
    402 
    403 size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
    404     Mutex::Autolock autoLock(mLock);
    405 
    406     *finalResult = OK;
    407     if (!mEnabled) {
    408         return 0;
    409     }
    410     if (!mBuffers.empty()) {
    411         return mBuffers.size();
    412     }
    413     *finalResult = mEOSResult;
    414     return 0;
    415 }
    416 
    417 int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
    418     Mutex::Autolock autoLock(mLock);
    419     *finalResult = mEOSResult;
    420 
    421     int64_t durationUs = 0;
    422     for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
    423             it != mDiscontinuitySegments.end();
    424             ++it) {
    425         const DiscontinuitySegment &seg = *it;
    426         // dequeued access units should be a subset of enqueued access units
    427         // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
    428         durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
    429     }
    430 
    431     return durationUs;
    432 }
    433 
    434 status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
    435     *timeUs = 0;
    436 
    437     Mutex::Autolock autoLock(mLock);
    438 
    439     if (mBuffers.empty()) {
    440         return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
    441     }
    442 
    443     sp<ABuffer> buffer = *mBuffers.begin();
    444     CHECK(buffer->meta()->findInt64("timeUs", timeUs));
    445 
    446     return OK;
    447 }
    448 
    449 bool AnotherPacketSource::isFinished(int64_t duration) const {
    450     if (duration > 0) {
    451         int64_t diff = duration - mLastQueuedTimeUs;
    452         if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
    453             ALOGV("Detecting EOS due to near end");
    454             return true;
    455         }
    456     }
    457     return (mEOSResult != OK);
    458 }
    459 
    460 sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
    461     Mutex::Autolock autoLock(mLock);
    462     return mLatestEnqueuedMeta;
    463 }
    464 
    465 sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
    466     Mutex::Autolock autoLock(mLock);
    467     return mLatestDequeuedMeta;
    468 }
    469 
    470 void AnotherPacketSource::enable(bool enable) {
    471     Mutex::Autolock autoLock(mLock);
    472     mEnabled = enable;
    473 }
    474 
    475 /*
    476  * returns the sample meta that's delayUs after queue head
    477  * (NULL if such sample is unavailable)
    478  */
    479 sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
    480     Mutex::Autolock autoLock(mLock);
    481     int64_t firstUs = -1;
    482     int64_t lastUs = -1;
    483     int64_t durationUs = 0;
    484 
    485     List<sp<ABuffer> >::iterator it;
    486     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
    487         const sp<ABuffer> &buffer = *it;
    488         int32_t discontinuity;
    489         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    490             durationUs += lastUs - firstUs;
    491             firstUs = -1;
    492             lastUs = -1;
    493             continue;
    494         }
    495         int64_t timeUs;
    496         if (buffer->meta()->findInt64("timeUs", &timeUs)) {
    497             if (firstUs < 0) {
    498                 firstUs = timeUs;
    499             }
    500             if (lastUs < 0 || timeUs > lastUs) {
    501                 lastUs = timeUs;
    502             }
    503             if (durationUs + (lastUs - firstUs) >= delayUs) {
    504                 return buffer->meta();
    505             }
    506         }
    507     }
    508     return NULL;
    509 }
    510 
    511 /*
    512  * removes samples with time equal or after meta
    513  */
    514 void AnotherPacketSource::trimBuffersAfterMeta(
    515         const sp<AMessage> &meta) {
    516     if (meta == NULL) {
    517         ALOGW("trimming with NULL meta, ignoring");
    518         return;
    519     }
    520 
    521     Mutex::Autolock autoLock(mLock);
    522     if (mBuffers.empty()) {
    523         return;
    524     }
    525 
    526     HLSTime stopTime(meta);
    527     ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
    528             stopTime.mSeq, (long long)stopTime.mTimeUs);
    529 
    530     List<sp<ABuffer> >::iterator it;
    531     List<DiscontinuitySegment >::iterator it2;
    532     sp<AMessage> newLatestEnqueuedMeta = NULL;
    533     int64_t newLastQueuedTimeUs = 0;
    534     for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
    535         const sp<ABuffer> &buffer = *it;
    536         int32_t discontinuity;
    537         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    538             // CHECK(it2 != mDiscontinuitySegments.end());
    539             ++it2;
    540             continue;
    541         }
    542 
    543         HLSTime curTime(buffer->meta());
    544         if (!(curTime < stopTime)) {
    545             ALOGV("trimming from %lld (inclusive) to end",
    546                     (long long)curTime.mTimeUs);
    547             break;
    548         }
    549         newLatestEnqueuedMeta = buffer->meta();
    550         newLastQueuedTimeUs = curTime.mTimeUs;
    551     }
    552 
    553     mBuffers.erase(it, mBuffers.end());
    554     mLatestEnqueuedMeta = newLatestEnqueuedMeta;
    555     mLastQueuedTimeUs = newLastQueuedTimeUs;
    556 
    557     DiscontinuitySegment &seg = *it2;
    558     if (newLatestEnqueuedMeta != NULL) {
    559         seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
    560     } else {
    561         seg.clear();
    562     }
    563     mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
    564 }
    565 
    566 /*
    567  * removes samples with time equal or before meta;
    568  * returns first sample left in the queue.
    569  *
    570  * (for AVC, if trim happens, the samples left will always start
    571  * at next IDR.)
    572  */
    573 sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
    574         const sp<AMessage> &meta) {
    575     HLSTime startTime(meta);
    576     ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
    577             startTime.mSeq, (long long)startTime.mTimeUs);
    578 
    579     sp<AMessage> firstMeta;
    580     int64_t firstTimeUs = -1;
    581     Mutex::Autolock autoLock(mLock);
    582     if (mBuffers.empty()) {
    583         return NULL;
    584     }
    585 
    586     sp<MetaData> format;
    587     bool isAvc = false;
    588 
    589     List<sp<ABuffer> >::iterator it;
    590     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
    591         const sp<ABuffer> &buffer = *it;
    592         int32_t discontinuity;
    593         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    594             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
    595             // CHECK(!mDiscontinuitySegments.empty());
    596             format = NULL;
    597             isAvc = false;
    598             continue;
    599         }
    600         if (format == NULL) {
    601             sp<RefBase> object;
    602             if (buffer->meta()->findObject("format", &object)) {
    603                 const char* mime;
    604                 format = static_cast<MetaData*>(object.get());
    605                 isAvc = format != NULL
    606                         && format->findCString(kKeyMIMEType, &mime)
    607                         && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
    608             }
    609         }
    610         if (isAvc && !IsIDR(buffer)) {
    611             continue;
    612         }
    613 
    614         HLSTime curTime(buffer->meta());
    615         if (startTime < curTime) {
    616             ALOGV("trimming from beginning to %lld (not inclusive)",
    617                     (long long)curTime.mTimeUs);
    618             firstMeta = buffer->meta();
    619             firstTimeUs = curTime.mTimeUs;
    620             break;
    621         }
    622     }
    623     mBuffers.erase(mBuffers.begin(), it);
    624     mLatestDequeuedMeta = NULL;
    625 
    626     // CHECK(!mDiscontinuitySegments.empty());
    627     DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
    628     if (firstTimeUs >= 0) {
    629         seg.mMaxDequeTimeUs = firstTimeUs;
    630     } else {
    631         seg.clear();
    632     }
    633 
    634     return firstMeta;
    635 }
    636 
    637 }  // namespace android
    638