Home | History | Annotate | Download | only in mpeg2ts
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "AnotherPacketSource"
     19 
     20 #include "AnotherPacketSource.h"
     21 
     22 #include "include/avc_utils.h"
     23 
     24 #include <media/stagefright/foundation/ABuffer.h>
     25 #include <media/stagefright/foundation/ADebug.h>
     26 #include <media/stagefright/foundation/AMessage.h>
     27 #include <media/stagefright/foundation/AString.h>
     28 #include <media/stagefright/foundation/hexdump.h>
     29 #include <media/stagefright/MediaBuffer.h>
     30 #include <media/stagefright/MediaDefs.h>
     31 #include <media/stagefright/MetaData.h>
     32 #include <media/stagefright/Utils.h>
     33 #include <utils/Vector.h>
     34 
     35 #include <inttypes.h>
     36 
     37 namespace android {
     38 
// Half-width, in microseconds, of the window around the stream duration in
// which isFinished() treats the last queued timestamp as "near the end".
const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs
     40 
// Builds an enabled, empty packet source.
//
// meta: initial track format, forwarded to setFormat(); may be NULL, in which
//       case the format stays unset and the source is neither audio nor video.
AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
    : mIsAudio(false),
      mIsVideo(false),
      mEnabled(true),
      mFormat(NULL),
      mLastQueuedTimeUs(0),
      mEOSResult(OK),
      mLatestEnqueuedMeta(NULL),
      mLatestDequeuedMeta(NULL) {
    setFormat(meta);

    // Start with one (empty) discontinuity segment; the rest of the class
    // reads the front/back of this list when tracking timestamps.
    mDiscontinuitySegments.push_back(DiscontinuitySegment());
}
     54 
     55 void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
     56     if (mFormat != NULL) {
     57         // Only allowed to be set once. Requires explicit clear to reset.
     58         return;
     59     }
     60 
     61     mIsAudio = false;
     62     mIsVideo = false;
     63 
     64     if (meta == NULL) {
     65         return;
     66     }
     67 
     68     mFormat = meta;
     69     const char *mime;
     70     CHECK(meta->findCString(kKeyMIMEType, &mime));
     71 
     72     if (!strncasecmp("audio/", mime, 6)) {
     73         mIsAudio = true;
     74     } else  if (!strncasecmp("video/", mime, 6)) {
     75         mIsVideo = true;
     76     } else {
     77         CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12));
     78     }
     79 }
     80 
// Nothing to release explicitly; the body is intentionally empty.
AnotherPacketSource::~AnotherPacketSource() {
}
     83 
// No-op: the parameters are ignored and OK is always returned.
status_t AnotherPacketSource::start(MetaData * /* params */) {
    return OK;
}
     87 
// No-op: always returns OK.
status_t AnotherPacketSource::stop() {
    return OK;
}
     91 
     92 sp<MetaData> AnotherPacketSource::getFormat() {
     93     Mutex::Autolock autoLock(mLock);
     94     if (mFormat != NULL) {
     95         return mFormat;
     96     }
     97 
     98     List<sp<ABuffer> >::iterator it = mBuffers.begin();
     99     while (it != mBuffers.end()) {
    100         sp<ABuffer> buffer = *it;
    101         int32_t discontinuity;
    102         if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    103             sp<RefBase> object;
    104             if (buffer->meta()->findObject("format", &object)) {
    105                 setFormat(static_cast<MetaData*>(object.get()));
    106                 return mFormat;
    107             }
    108         }
    109 
    110         ++it;
    111     }
    112     return NULL;
    113 }
    114 
    115 status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
    116     buffer->clear();
    117 
    118     Mutex::Autolock autoLock(mLock);
    119     while (mEOSResult == OK && mBuffers.empty()) {
    120         mCondition.wait(mLock);
    121     }
    122 
    123     if (!mBuffers.empty()) {
    124         *buffer = *mBuffers.begin();
    125         mBuffers.erase(mBuffers.begin());
    126 
    127         int32_t discontinuity;
    128         if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) {
    129             if (wasFormatChange(discontinuity)) {
    130                 mFormat.clear();
    131             }
    132 
    133             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
    134             // CHECK(!mDiscontinuitySegments.empty());
    135             return INFO_DISCONTINUITY;
    136         }
    137 
    138         // CHECK(!mDiscontinuitySegments.empty());
    139         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
    140 
    141         int64_t timeUs;
    142         mLatestDequeuedMeta = (*buffer)->meta()->dup();
    143         CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
    144         if (timeUs > seg.mMaxDequeTimeUs) {
    145             seg.mMaxDequeTimeUs = timeUs;
    146         }
    147 
    148         sp<RefBase> object;
    149         if ((*buffer)->meta()->findObject("format", &object)) {
    150             setFormat(static_cast<MetaData*>(object.get()));
    151         }
    152 
    153         return OK;
    154     }
    155 
    156     return mEOSResult;
    157 }
    158 
    159 void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) {
    160     // TODO: update corresponding book keeping info.
    161     Mutex::Autolock autoLock(mLock);
    162     mBuffers.push_front(buffer);
    163 }
    164 
    165 status_t AnotherPacketSource::read(
    166         MediaBuffer **out, const ReadOptions *) {
    167     *out = NULL;
    168 
    169     Mutex::Autolock autoLock(mLock);
    170     while (mEOSResult == OK && mBuffers.empty()) {
    171         mCondition.wait(mLock);
    172     }
    173 
    174     if (!mBuffers.empty()) {
    175 
    176         const sp<ABuffer> buffer = *mBuffers.begin();
    177         mBuffers.erase(mBuffers.begin());
    178 
    179         int32_t discontinuity;
    180         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    181             if (wasFormatChange(discontinuity)) {
    182                 mFormat.clear();
    183             }
    184 
    185             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
    186             // CHECK(!mDiscontinuitySegments.empty());
    187             return INFO_DISCONTINUITY;
    188         }
    189 
    190         mLatestDequeuedMeta = buffer->meta()->dup();
    191 
    192         sp<RefBase> object;
    193         if (buffer->meta()->findObject("format", &object)) {
    194             setFormat(static_cast<MetaData*>(object.get()));
    195         }
    196 
    197         int64_t timeUs;
    198         CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
    199         // CHECK(!mDiscontinuitySegments.empty());
    200         DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
    201         if (timeUs > seg.mMaxDequeTimeUs) {
    202             seg.mMaxDequeTimeUs = timeUs;
    203         }
    204 
    205         MediaBuffer *mediaBuffer = new MediaBuffer(buffer);
    206 
    207         mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs);
    208 
    209         int32_t isSync;
    210         if (buffer->meta()->findInt32("isSync", &isSync)) {
    211             mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync);
    212         }
    213 
    214         sp<ABuffer> sei;
    215         if (buffer->meta()->findBuffer("sei", &sei) && sei != NULL) {
    216             mediaBuffer->meta_data()->setData(kKeySEI, 0, sei->data(), sei->size());
    217         }
    218 
    219         *out = mediaBuffer;
    220         return OK;
    221     }
    222 
    223     return mEOSResult;
    224 }
    225 
    226 bool AnotherPacketSource::wasFormatChange(
    227         int32_t discontinuityType) const {
    228     if (mIsAudio) {
    229         return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
    230     }
    231 
    232     if (mIsVideo) {
    233         return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0;
    234     }
    235 
    236     return false;
    237 }
    238 
    239 void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
    240     int32_t damaged;
    241     if (buffer->meta()->findInt32("damaged", &damaged) && damaged) {
    242         // LOG(VERBOSE) << "discarding damaged AU";
    243         return;
    244     }
    245 
    246     Mutex::Autolock autoLock(mLock);
    247     mBuffers.push_back(buffer);
    248     mCondition.signal();
    249 
    250     int32_t discontinuity;
    251     if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
    252         ALOGV("queueing a discontinuity with queueAccessUnit");
    253 
    254         mLastQueuedTimeUs = 0ll;
    255         mEOSResult = OK;
    256         mLatestEnqueuedMeta = NULL;
    257 
    258         mDiscontinuitySegments.push_back(DiscontinuitySegment());
    259         return;
    260     }
    261 
    262     int64_t lastQueuedTimeUs;
    263     CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs));
    264     mLastQueuedTimeUs = lastQueuedTimeUs;
    265     ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
    266             mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);
    267 
    268     // CHECK(!mDiscontinuitySegments.empty());
    269     DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
    270     if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
    271         tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
    272     }
    273     if (tailSeg.mMaxDequeTimeUs == -1) {
    274         tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
    275     }
    276 
    277     if (mLatestEnqueuedMeta == NULL) {
    278         mLatestEnqueuedMeta = buffer->meta()->dup();
    279     } else {
    280         int64_t latestTimeUs = 0;
    281         int64_t frameDeltaUs = 0;
    282         CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs));
    283         if (lastQueuedTimeUs > latestTimeUs) {
    284             mLatestEnqueuedMeta = buffer->meta()->dup();
    285             frameDeltaUs = lastQueuedTimeUs - latestTimeUs;
    286             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
    287         } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) {
    288             // For B frames
    289             frameDeltaUs = latestTimeUs - lastQueuedTimeUs;
    290             mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs);
    291         }
    292     }
    293 }
    294 
    295 void AnotherPacketSource::clear() {
    296     Mutex::Autolock autoLock(mLock);
    297 
    298     mBuffers.clear();
    299     mEOSResult = OK;
    300 
    301     mDiscontinuitySegments.clear();
    302     mDiscontinuitySegments.push_back(DiscontinuitySegment());
    303 
    304     mFormat = NULL;
    305     mLatestEnqueuedMeta = NULL;
    306 }
    307 
    308 void AnotherPacketSource::queueDiscontinuity(
    309         ATSParser::DiscontinuityType type,
    310         const sp<AMessage> &extra,
    311         bool discard) {
    312     Mutex::Autolock autoLock(mLock);
    313 
    314     if (discard) {
    315         // Leave only discontinuities in the queue.
    316         List<sp<ABuffer> >::iterator it = mBuffers.begin();
    317         while (it != mBuffers.end()) {
    318             sp<ABuffer> oldBuffer = *it;
    319 
    320             int32_t oldDiscontinuityType;
    321             if (!oldBuffer->meta()->findInt32(
    322                         "discontinuity", &oldDiscontinuityType)) {
    323                 it = mBuffers.erase(it);
    324                 continue;
    325             }
    326 
    327             ++it;
    328         }
    329 
    330         for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
    331                 it2 != mDiscontinuitySegments.end();
    332                 ++it2) {
    333             DiscontinuitySegment &seg = *it2;
    334             seg.clear();
    335         }
    336 
    337     }
    338 
    339     mEOSResult = OK;
    340     mLastQueuedTimeUs = 0;
    341     mLatestEnqueuedMeta = NULL;
    342 
    343     if (type == ATSParser::DISCONTINUITY_NONE) {
    344         return;
    345     }
    346 
    347     mDiscontinuitySegments.push_back(DiscontinuitySegment());
    348 
    349     sp<ABuffer> buffer = new ABuffer(0);
    350     buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
    351     buffer->meta()->setMessage("extra", extra);
    352 
    353     mBuffers.push_back(buffer);
    354     mCondition.signal();
    355 }
    356 
// Records the end-of-stream status and wakes a reader blocked on an empty
// queue.
//
// result: must not be OK (CHECKed); it is what readers receive once the
//         queue has drained.
void AnotherPacketSource::signalEOS(status_t result) {
    CHECK(result != OK);

    Mutex::Autolock autoLock(mLock);
    mEOSResult = result;
    mCondition.signal();
}
    364 
    365 bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) {
    366     Mutex::Autolock autoLock(mLock);
    367     *finalResult = OK;
    368     if (!mEnabled) {
    369         return false;
    370     }
    371     if (!mBuffers.empty()) {
    372         return true;
    373     }
    374 
    375     *finalResult = mEOSResult;
    376     return false;
    377 }
    378 
    379 bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {
    380     Mutex::Autolock autoLock(mLock);
    381     *finalResult = OK;
    382     if (!mEnabled) {
    383         return false;
    384     }
    385     List<sp<ABuffer> >::iterator it;
    386     for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
    387         int32_t discontinuity;
    388         if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) {
    389             return true;
    390         }
    391     }
    392 
    393     *finalResult = mEOSResult;
    394     return false;
    395 }
    396 
    397 size_t AnotherPacketSource::getAvailableBufferCount(status_t *finalResult) {
    398     Mutex::Autolock autoLock(mLock);
    399 
    400     *finalResult = OK;
    401     if (!mEnabled) {
    402         return 0;
    403     }
    404     if (!mBuffers.empty()) {
    405         return mBuffers.size();
    406     }
    407     *finalResult = mEOSResult;
    408     return 0;
    409 }
    410 
    411 int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
    412     Mutex::Autolock autoLock(mLock);
    413     *finalResult = mEOSResult;
    414 
    415     int64_t durationUs = 0;
    416     for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
    417             it != mDiscontinuitySegments.end();
    418             ++it) {
    419         const DiscontinuitySegment &seg = *it;
    420         // dequeued access units should be a subset of enqueued access units
    421         // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
    422         durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
    423     }
    424 
    425     return durationUs;
    426 }
    427 
    428 status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
    429     *timeUs = 0;
    430 
    431     Mutex::Autolock autoLock(mLock);
    432 
    433     if (mBuffers.empty()) {
    434         return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK;
    435     }
    436 
    437     sp<ABuffer> buffer = *mBuffers.begin();
    438     CHECK(buffer->meta()->findInt64("timeUs", timeUs));
    439 
    440     return OK;
    441 }
    442 
    443 bool AnotherPacketSource::isFinished(int64_t duration) const {
    444     if (duration > 0) {
    445         int64_t diff = duration - mLastQueuedTimeUs;
    446         if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) {
    447             ALOGV("Detecting EOS due to near end");
    448             return true;
    449         }
    450     }
    451     return (mEOSResult != OK);
    452 }
    453 
// Returns the latest enqueued meta under the lock; NULL when there is none
// (e.g. right after clear() or after a discontinuity was queued).
sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() {
    Mutex::Autolock autoLock(mLock);
    return mLatestEnqueuedMeta;
}
    458 
// Returns the meta recorded for the most recently dequeued access unit under
// the lock; NULL when there is none.
sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() {
    Mutex::Autolock autoLock(mLock);
    return mLatestDequeuedMeta;
}
    463 
// Enables or disables the source. While disabled, the availability queries
// (hasBufferAvailable, hasDataBufferAvailable, getAvailableBufferCount)
// report nothing available.
void AnotherPacketSource::enable(bool enable) {
    Mutex::Autolock autoLock(mLock);
    mEnabled = enable;
}
    468 
    469 /*
    470  * returns the sample meta that's delayUs after queue head
    471  * (NULL if such sample is unavailable)
    472  */
    473 sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) {
    474     Mutex::Autolock autoLock(mLock);
    475     int64_t firstUs = -1;
    476     int64_t lastUs = -1;
    477     int64_t durationUs = 0;
    478 
    479     List<sp<ABuffer> >::iterator it;
    480     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
    481         const sp<ABuffer> &buffer = *it;
    482         int32_t discontinuity;
    483         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    484             durationUs += lastUs - firstUs;
    485             firstUs = -1;
    486             lastUs = -1;
    487             continue;
    488         }
    489         int64_t timeUs;
    490         if (buffer->meta()->findInt64("timeUs", &timeUs)) {
    491             if (firstUs < 0) {
    492                 firstUs = timeUs;
    493             }
    494             if (lastUs < 0 || timeUs > lastUs) {
    495                 lastUs = timeUs;
    496             }
    497             if (durationUs + (lastUs - firstUs) >= delayUs) {
    498                 return buffer->meta();
    499             }
    500         }
    501     }
    502     return NULL;
    503 }
    504 
    505 /*
    506  * removes samples with time equal or after meta
    507  */
    508 void AnotherPacketSource::trimBuffersAfterMeta(
    509         const sp<AMessage> &meta) {
    510     if (meta == NULL) {
    511         ALOGW("trimming with NULL meta, ignoring");
    512         return;
    513     }
    514 
    515     Mutex::Autolock autoLock(mLock);
    516     if (mBuffers.empty()) {
    517         return;
    518     }
    519 
    520     HLSTime stopTime(meta);
    521     ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld",
    522             stopTime.mSeq, (long long)stopTime.mTimeUs);
    523 
    524     List<sp<ABuffer> >::iterator it;
    525     List<DiscontinuitySegment >::iterator it2;
    526     sp<AMessage> newLatestEnqueuedMeta = NULL;
    527     int64_t newLastQueuedTimeUs = 0;
    528     for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
    529         const sp<ABuffer> &buffer = *it;
    530         int32_t discontinuity;
    531         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    532             // CHECK(it2 != mDiscontinuitySegments.end());
    533             ++it2;
    534             continue;
    535         }
    536 
    537         HLSTime curTime(buffer->meta());
    538         if (!(curTime < stopTime)) {
    539             ALOGV("trimming from %lld (inclusive) to end",
    540                     (long long)curTime.mTimeUs);
    541             break;
    542         }
    543         newLatestEnqueuedMeta = buffer->meta();
    544         newLastQueuedTimeUs = curTime.mTimeUs;
    545     }
    546 
    547     mBuffers.erase(it, mBuffers.end());
    548     mLatestEnqueuedMeta = newLatestEnqueuedMeta;
    549     mLastQueuedTimeUs = newLastQueuedTimeUs;
    550 
    551     DiscontinuitySegment &seg = *it2;
    552     if (newLatestEnqueuedMeta != NULL) {
    553         seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
    554     } else {
    555         seg.clear();
    556     }
    557     mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
    558 }
    559 
    560 /*
    561  * removes samples with time equal or before meta;
    562  * returns first sample left in the queue.
    563  *
    564  * (for AVC, if trim happens, the samples left will always start
    565  * at next IDR.)
    566  */
    567 sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
    568         const sp<AMessage> &meta) {
    569     HLSTime startTime(meta);
    570     ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld",
    571             startTime.mSeq, (long long)startTime.mTimeUs);
    572 
    573     sp<AMessage> firstMeta;
    574     int64_t firstTimeUs = -1;
    575     Mutex::Autolock autoLock(mLock);
    576     if (mBuffers.empty()) {
    577         return NULL;
    578     }
    579 
    580     sp<MetaData> format;
    581     bool isAvc = false;
    582 
    583     List<sp<ABuffer> >::iterator it;
    584     for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
    585         const sp<ABuffer> &buffer = *it;
    586         int32_t discontinuity;
    587         if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
    588             mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
    589             // CHECK(!mDiscontinuitySegments.empty());
    590             format = NULL;
    591             isAvc = false;
    592             continue;
    593         }
    594         if (format == NULL) {
    595             sp<RefBase> object;
    596             if (buffer->meta()->findObject("format", &object)) {
    597                 const char* mime;
    598                 format = static_cast<MetaData*>(object.get());
    599                 isAvc = format != NULL
    600                         && format->findCString(kKeyMIMEType, &mime)
    601                         && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
    602             }
    603         }
    604         if (isAvc && !IsIDR(buffer)) {
    605             continue;
    606         }
    607 
    608         HLSTime curTime(buffer->meta());
    609         if (startTime < curTime) {
    610             ALOGV("trimming from beginning to %lld (not inclusive)",
    611                     (long long)curTime.mTimeUs);
    612             firstMeta = buffer->meta();
    613             firstTimeUs = curTime.mTimeUs;
    614             break;
    615         }
    616     }
    617     mBuffers.erase(mBuffers.begin(), it);
    618     mLatestDequeuedMeta = NULL;
    619 
    620     // CHECK(!mDiscontinuitySegments.empty());
    621     DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
    622     if (firstTimeUs >= 0) {
    623         seg.mMaxDequeTimeUs = firstTimeUs;
    624     } else {
    625         seg.clear();
    626     }
    627 
    628     return firstMeta;
    629 }
    630 
    631 }  // namespace android
    632