Home | History | Annotate | Download | only in httplive
      1 /*
      2  * Copyright (C) 2012 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "PlaylistFetcher"
     19 #include <utils/Log.h>
     20 
     21 #include "PlaylistFetcher.h"
     22 
     23 #include "LiveDataSource.h"
     24 #include "LiveSession.h"
     25 #include "M3UParser.h"
     26 
     27 #include "include/avc_utils.h"
     28 #include "include/HTTPBase.h"
     29 #include "include/ID3.h"
     30 #include "mpeg2ts/AnotherPacketSource.h"
     31 
     32 #include <media/IStreamSource.h>
     33 #include <media/stagefright/foundation/ABitReader.h>
     34 #include <media/stagefright/foundation/ABuffer.h>
     35 #include <media/stagefright/foundation/ADebug.h>
     36 #include <media/stagefright/foundation/hexdump.h>
     37 #include <media/stagefright/FileSource.h>
     38 #include <media/stagefright/MediaDefs.h>
     39 #include <media/stagefright/MetaData.h>
     40 #include <media/stagefright/Utils.h>
     41 
     42 #include <ctype.h>
     43 #include <inttypes.h>
     44 #include <openssl/aes.h>
     45 #include <openssl/md5.h>
     46 
     47 namespace android {
     48 
     49 // static
     50 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
     51 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
     52 // LCM of 188 (size of a TS packet) & 1k works well
     53 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
     54 const int32_t PlaylistFetcher::kNumSkipFrames = 5;
     55 
     56 PlaylistFetcher::PlaylistFetcher(
     57         const sp<AMessage> &notify,
     58         const sp<LiveSession> &session,
     59         const char *uri,
     60         int32_t subtitleGeneration)
     61     : mNotify(notify),
     62       mStartTimeUsNotify(notify->dup()),
     63       mSession(session),
     64       mURI(uri),
     65       mStreamTypeMask(0),
     66       mStartTimeUs(-1ll),
     67       mSegmentStartTimeUs(-1ll),
     68       mDiscontinuitySeq(-1ll),
     69       mStartTimeUsRelative(false),
     70       mLastPlaylistFetchTimeUs(-1ll),
     71       mSeqNumber(-1),
     72       mNumRetries(0),
     73       mStartup(true),
     74       mAdaptive(false),
     75       mPrepared(false),
     76       mNextPTSTimeUs(-1ll),
     77       mMonitorQueueGeneration(0),
     78       mSubtitleGeneration(subtitleGeneration),
     79       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
     80       mFirstPTSValid(false),
     81       mAbsoluteTimeAnchorUs(0ll),
     82       mVideoBuffer(new AnotherPacketSource(NULL)) {
     83     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
     84     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
     85     mStartTimeUsNotify->setInt32("streamMask", 0);
     86 }
     87 
     88 PlaylistFetcher::~PlaylistFetcher() {
     89 }
     90 
     91 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
     92     CHECK(mPlaylist != NULL);
     93 
     94     int32_t firstSeqNumberInPlaylist;
     95     if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
     96                 "media-sequence", &firstSeqNumberInPlaylist)) {
     97         firstSeqNumberInPlaylist = 0;
     98     }
     99 
    100     int32_t lastSeqNumberInPlaylist =
    101         firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
    102 
    103     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
    104     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
    105 
    106     int64_t segmentStartUs = 0ll;
    107     for (int32_t index = 0;
    108             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
    109         sp<AMessage> itemMeta;
    110         CHECK(mPlaylist->itemAt(
    111                     index, NULL /* uri */, &itemMeta));
    112 
    113         int64_t itemDurationUs;
    114         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
    115 
    116         segmentStartUs += itemDurationUs;
    117     }
    118 
    119     return segmentStartUs;
    120 }
    121 
    122 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
    123     int64_t nowUs = ALooper::GetNowUs();
    124 
    125     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
    126         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
    127         return 0ll;
    128     }
    129 
    130     if (mPlaylist->isComplete()) {
    131         return (~0llu >> 1);
    132     }
    133 
    134     int32_t targetDurationSecs;
    135     CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
    136 
    137     int64_t targetDurationUs = targetDurationSecs * 1000000ll;
    138 
    139     int64_t minPlaylistAgeUs;
    140 
    141     switch (mRefreshState) {
    142         case INITIAL_MINIMUM_RELOAD_DELAY:
    143         {
    144             size_t n = mPlaylist->size();
    145             if (n > 0) {
    146                 sp<AMessage> itemMeta;
    147                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
    148 
    149                 int64_t itemDurationUs;
    150                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
    151 
    152                 minPlaylistAgeUs = itemDurationUs;
    153                 break;
    154             }
    155 
    156             // fall through
    157         }
    158 
    159         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
    160         {
    161             minPlaylistAgeUs = targetDurationUs / 2;
    162             break;
    163         }
    164 
    165         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
    166         {
    167             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
    168             break;
    169         }
    170 
    171         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
    172         {
    173             minPlaylistAgeUs = targetDurationUs * 3;
    174             break;
    175         }
    176 
    177         default:
    178             TRESPASS();
    179             break;
    180     }
    181 
    182     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
    183     return delayUs > 0ll ? delayUs : 0ll;
    184 }
    185 
    186 status_t PlaylistFetcher::decryptBuffer(
    187         size_t playlistIndex, const sp<ABuffer> &buffer,
    188         bool first) {
    189     sp<AMessage> itemMeta;
    190     bool found = false;
    191     AString method;
    192 
    193     for (ssize_t i = playlistIndex; i >= 0; --i) {
    194         AString uri;
    195         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
    196 
    197         if (itemMeta->findString("cipher-method", &method)) {
    198             found = true;
    199             break;
    200         }
    201     }
    202 
    203     if (!found) {
    204         method = "NONE";
    205     }
    206     buffer->meta()->setString("cipher-method", method.c_str());
    207 
    208     if (method == "NONE") {
    209         return OK;
    210     } else if (!(method == "AES-128")) {
    211         ALOGE("Unsupported cipher method '%s'", method.c_str());
    212         return ERROR_UNSUPPORTED;
    213     }
    214 
    215     AString keyURI;
    216     if (!itemMeta->findString("cipher-uri", &keyURI)) {
    217         ALOGE("Missing key uri");
    218         return ERROR_MALFORMED;
    219     }
    220 
    221     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
    222 
    223     sp<ABuffer> key;
    224     if (index >= 0) {
    225         key = mAESKeyForURI.valueAt(index);
    226     } else {
    227         ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
    228 
    229         if (err < 0) {
    230             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
    231             return ERROR_IO;
    232         } else if (key->size() != 16) {
    233             ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
    234             return ERROR_MALFORMED;
    235         }
    236 
    237         mAESKeyForURI.add(keyURI, key);
    238     }
    239 
    240     AES_KEY aes_key;
    241     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
    242         ALOGE("failed to set AES decryption key.");
    243         return UNKNOWN_ERROR;
    244     }
    245 
    246     size_t n = buffer->size();
    247     if (!n) {
    248         return OK;
    249     }
    250     CHECK(n % 16 == 0);
    251 
    252     if (first) {
    253         // If decrypting the first block in a file, read the iv from the manifest
    254         // or derive the iv from the file's sequence number.
    255 
    256         AString iv;
    257         if (itemMeta->findString("cipher-iv", &iv)) {
    258             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
    259                     || iv.size() != 16 * 2 + 2) {
    260                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
    261                 return ERROR_MALFORMED;
    262             }
    263 
    264             memset(mAESInitVec, 0, sizeof(mAESInitVec));
    265             for (size_t i = 0; i < 16; ++i) {
    266                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
    267                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
    268                 if (!isxdigit(c1) || !isxdigit(c2)) {
    269                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
    270                     return ERROR_MALFORMED;
    271                 }
    272                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
    273                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
    274 
    275                 mAESInitVec[i] = nibble1 << 4 | nibble2;
    276             }
    277         } else {
    278             memset(mAESInitVec, 0, sizeof(mAESInitVec));
    279             mAESInitVec[15] = mSeqNumber & 0xff;
    280             mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
    281             mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
    282             mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
    283         }
    284     }
    285 
    286     AES_cbc_encrypt(
    287             buffer->data(), buffer->data(), buffer->size(),
    288             &aes_key, mAESInitVec, AES_DECRYPT);
    289 
    290     return OK;
    291 }
    292 
    293 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
    294     status_t err;
    295     AString method;
    296     CHECK(buffer->meta()->findString("cipher-method", &method));
    297     if (method == "NONE") {
    298         return OK;
    299     }
    300 
    301     uint8_t padding = 0;
    302     if (buffer->size() > 0) {
    303         padding = buffer->data()[buffer->size() - 1];
    304     }
    305 
    306     if (padding > 16) {
    307         return ERROR_MALFORMED;
    308     }
    309 
    310     for (size_t i = buffer->size() - padding; i < padding; i++) {
    311         if (buffer->data()[i] != padding) {
    312             return ERROR_MALFORMED;
    313         }
    314     }
    315 
    316     buffer->setRange(buffer->offset(), buffer->size() - padding);
    317     return OK;
    318 }
    319 
    320 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
    321     int64_t maxDelayUs = delayUsToRefreshPlaylist();
    322     if (maxDelayUs < minDelayUs) {
    323         maxDelayUs = minDelayUs;
    324     }
    325     if (delayUs > maxDelayUs) {
    326         ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs);
    327         delayUs = maxDelayUs;
    328     }
    329     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id());
    330     msg->setInt32("generation", mMonitorQueueGeneration);
    331     msg->post(delayUs);
    332 }
    333 
    334 void PlaylistFetcher::cancelMonitorQueue() {
    335     ++mMonitorQueueGeneration;
    336 }
    337 
    338 void PlaylistFetcher::startAsync(
    339         const sp<AnotherPacketSource> &audioSource,
    340         const sp<AnotherPacketSource> &videoSource,
    341         const sp<AnotherPacketSource> &subtitleSource,
    342         int64_t startTimeUs,
    343         int64_t segmentStartTimeUs,
    344         int32_t startDiscontinuitySeq,
    345         bool adaptive) {
    346     sp<AMessage> msg = new AMessage(kWhatStart, id());
    347 
    348     uint32_t streamTypeMask = 0ul;
    349 
    350     if (audioSource != NULL) {
    351         msg->setPointer("audioSource", audioSource.get());
    352         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
    353     }
    354 
    355     if (videoSource != NULL) {
    356         msg->setPointer("videoSource", videoSource.get());
    357         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
    358     }
    359 
    360     if (subtitleSource != NULL) {
    361         msg->setPointer("subtitleSource", subtitleSource.get());
    362         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
    363     }
    364 
    365     msg->setInt32("streamTypeMask", streamTypeMask);
    366     msg->setInt64("startTimeUs", startTimeUs);
    367     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
    368     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
    369     msg->setInt32("adaptive", adaptive);
    370     msg->post();
    371 }
    372 
    373 void PlaylistFetcher::pauseAsync() {
    374     (new AMessage(kWhatPause, id()))->post();
    375 }
    376 
    377 void PlaylistFetcher::stopAsync(bool clear) {
    378     sp<AMessage> msg = new AMessage(kWhatStop, id());
    379     msg->setInt32("clear", clear);
    380     msg->post();
    381 }
    382 
    383 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
    384     AMessage* msg = new AMessage(kWhatResumeUntil, id());
    385     msg->setMessage("params", params);
    386     msg->post();
    387 }
    388 
    389 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
    390     switch (msg->what()) {
    391         case kWhatStart:
    392         {
    393             status_t err = onStart(msg);
    394 
    395             sp<AMessage> notify = mNotify->dup();
    396             notify->setInt32("what", kWhatStarted);
    397             notify->setInt32("err", err);
    398             notify->post();
    399             break;
    400         }
    401 
    402         case kWhatPause:
    403         {
    404             onPause();
    405 
    406             sp<AMessage> notify = mNotify->dup();
    407             notify->setInt32("what", kWhatPaused);
    408             notify->post();
    409             break;
    410         }
    411 
    412         case kWhatStop:
    413         {
    414             onStop(msg);
    415 
    416             sp<AMessage> notify = mNotify->dup();
    417             notify->setInt32("what", kWhatStopped);
    418             notify->post();
    419             break;
    420         }
    421 
    422         case kWhatMonitorQueue:
    423         case kWhatDownloadNext:
    424         {
    425             int32_t generation;
    426             CHECK(msg->findInt32("generation", &generation));
    427 
    428             if (generation != mMonitorQueueGeneration) {
    429                 // Stale event
    430                 break;
    431             }
    432 
    433             if (msg->what() == kWhatMonitorQueue) {
    434                 onMonitorQueue();
    435             } else {
    436                 onDownloadNext();
    437             }
    438             break;
    439         }
    440 
    441         case kWhatResumeUntil:
    442         {
    443             onResumeUntil(msg);
    444             break;
    445         }
    446 
    447         default:
    448             TRESPASS();
    449     }
    450 }
    451 
    452 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
    453     mPacketSources.clear();
    454 
    455     uint32_t streamTypeMask;
    456     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
    457 
    458     int64_t startTimeUs;
    459     int64_t segmentStartTimeUs;
    460     int32_t startDiscontinuitySeq;
    461     int32_t adaptive;
    462     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
    463     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
    464     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
    465     CHECK(msg->findInt32("adaptive", &adaptive));
    466 
    467     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
    468         void *ptr;
    469         CHECK(msg->findPointer("audioSource", &ptr));
    470 
    471         mPacketSources.add(
    472                 LiveSession::STREAMTYPE_AUDIO,
    473                 static_cast<AnotherPacketSource *>(ptr));
    474     }
    475 
    476     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
    477         void *ptr;
    478         CHECK(msg->findPointer("videoSource", &ptr));
    479 
    480         mPacketSources.add(
    481                 LiveSession::STREAMTYPE_VIDEO,
    482                 static_cast<AnotherPacketSource *>(ptr));
    483     }
    484 
    485     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
    486         void *ptr;
    487         CHECK(msg->findPointer("subtitleSource", &ptr));
    488 
    489         mPacketSources.add(
    490                 LiveSession::STREAMTYPE_SUBTITLES,
    491                 static_cast<AnotherPacketSource *>(ptr));
    492     }
    493 
    494     mStreamTypeMask = streamTypeMask;
    495 
    496     mSegmentStartTimeUs = segmentStartTimeUs;
    497     mDiscontinuitySeq = startDiscontinuitySeq;
    498 
    499     if (startTimeUs >= 0) {
    500         mStartTimeUs = startTimeUs;
    501         mSeqNumber = -1;
    502         mStartup = true;
    503         mPrepared = false;
    504         mAdaptive = adaptive;
    505     }
    506 
    507     postMonitorQueue();
    508 
    509     return OK;
    510 }
    511 
    512 void PlaylistFetcher::onPause() {
    513     cancelMonitorQueue();
    514 }
    515 
    516 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
    517     cancelMonitorQueue();
    518 
    519     int32_t clear;
    520     CHECK(msg->findInt32("clear", &clear));
    521     if (clear) {
    522         for (size_t i = 0; i < mPacketSources.size(); i++) {
    523             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
    524             packetSource->clear();
    525         }
    526     }
    527 
    528     mPacketSources.clear();
    529     mStreamTypeMask = 0;
    530 }
    531 
    532 // Resume until we have reached the boundary timestamps listed in `msg`; when
    533 // the remaining time is too short (within a resume threshold) stop immediately
    534 // instead.
    535 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
    536     sp<AMessage> params;
    537     CHECK(msg->findMessage("params", &params));
    538 
    539     bool stop = false;
    540     for (size_t i = 0; i < mPacketSources.size(); i++) {
    541         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
    542 
    543         const char *stopKey;
    544         int streamType = mPacketSources.keyAt(i);
    545         switch (streamType) {
    546         case LiveSession::STREAMTYPE_VIDEO:
    547             stopKey = "timeUsVideo";
    548             break;
    549 
    550         case LiveSession::STREAMTYPE_AUDIO:
    551             stopKey = "timeUsAudio";
    552             break;
    553 
    554         case LiveSession::STREAMTYPE_SUBTITLES:
    555             stopKey = "timeUsSubtitle";
    556             break;
    557 
    558         default:
    559             TRESPASS();
    560         }
    561 
    562         // Don't resume if we would stop within a resume threshold.
    563         int32_t discontinuitySeq;
    564         int64_t latestTimeUs = 0, stopTimeUs = 0;
    565         sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta();
    566         if (latestMeta != NULL
    567                 && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
    568                 && discontinuitySeq == mDiscontinuitySeq
    569                 && latestMeta->findInt64("timeUs", &latestTimeUs)
    570                 && params->findInt64(stopKey, &stopTimeUs)
    571                 && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
    572             stop = true;
    573         }
    574     }
    575 
    576     if (stop) {
    577         for (size_t i = 0; i < mPacketSources.size(); i++) {
    578             mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
    579         }
    580         stopAsync(/* clear = */ false);
    581         return OK;
    582     }
    583 
    584     mStopParams = params;
    585     postMonitorQueue();
    586 
    587     return OK;
    588 }
    589 
    590 void PlaylistFetcher::notifyError(status_t err) {
    591     sp<AMessage> notify = mNotify->dup();
    592     notify->setInt32("what", kWhatError);
    593     notify->setInt32("err", err);
    594     notify->post();
    595 }
    596 
    597 void PlaylistFetcher::queueDiscontinuity(
    598         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
    599     for (size_t i = 0; i < mPacketSources.size(); ++i) {
    600         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
    601         // (seek will discard buffer by abandoning old fetchers)
    602         mPacketSources.valueAt(i)->queueDiscontinuity(
    603                 type, extra, false /* discard */);
    604     }
    605 }
    606 
    607 void PlaylistFetcher::onMonitorQueue() {
    608     bool downloadMore = false;
    609     refreshPlaylist();
    610 
    611     int32_t targetDurationSecs;
    612     int64_t targetDurationUs = kMinBufferedDurationUs;
    613     if (mPlaylist != NULL) {
    614         if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
    615                 "target-duration", &targetDurationSecs)) {
    616             ALOGE("Playlist is missing required EXT-X-TARGETDURATION tag");
    617             notifyError(ERROR_MALFORMED);
    618             return;
    619         }
    620         targetDurationUs = targetDurationSecs * 1000000ll;
    621     }
    622 
    623     // buffer at least 3 times the target duration, or up to 10 seconds
    624     int64_t durationToBufferUs = targetDurationUs * 3;
    625     if (durationToBufferUs > kMinBufferedDurationUs)  {
    626         durationToBufferUs = kMinBufferedDurationUs;
    627     }
    628 
    629     int64_t bufferedDurationUs = 0ll;
    630     status_t finalResult = NOT_ENOUGH_DATA;
    631     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
    632         sp<AnotherPacketSource> packetSource =
    633             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
    634 
    635         bufferedDurationUs =
    636                 packetSource->getBufferedDurationUs(&finalResult);
    637         finalResult = OK;
    638     } else {
    639         // Use max stream duration to prevent us from waiting on a non-existent stream;
    640         // when we cannot make out from the manifest what streams are included in a playlist
    641         // we might assume extra streams.
    642         for (size_t i = 0; i < mPacketSources.size(); ++i) {
    643             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
    644                 continue;
    645             }
    646 
    647             int64_t bufferedStreamDurationUs =
    648                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
    649             ALOGV("buffered %" PRId64 " for stream %d",
    650                     bufferedStreamDurationUs, mPacketSources.keyAt(i));
    651             if (bufferedStreamDurationUs > bufferedDurationUs) {
    652                 bufferedDurationUs = bufferedStreamDurationUs;
    653             }
    654         }
    655     }
    656     downloadMore = (bufferedDurationUs < durationToBufferUs);
    657 
    658     // signal start if buffered up at least the target size
    659     if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
    660         mPrepared = true;
    661 
    662         ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
    663                 bufferedDurationUs, targetDurationUs);
    664         sp<AMessage> msg = mNotify->dup();
    665         msg->setInt32("what", kWhatTemporarilyDoneFetching);
    666         msg->post();
    667     }
    668 
    669     if (finalResult == OK && downloadMore) {
    670         ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
    671                 bufferedDurationUs, durationToBufferUs);
    672         // delay the next download slightly; hopefully this gives other concurrent fetchers
    673         // a better chance to run.
    674         // onDownloadNext();
    675         sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
    676         msg->setInt32("generation", mMonitorQueueGeneration);
    677         msg->post(1000l);
    678     } else {
    679         // Nothing to do yet, try again in a second.
    680 
    681         sp<AMessage> msg = mNotify->dup();
    682         msg->setInt32("what", kWhatTemporarilyDoneFetching);
    683         msg->post();
    684 
    685         int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
    686         ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
    687                 delayUs, bufferedDurationUs, durationToBufferUs);
    688         // :TRICKY: need to enforce minimum delay because the delay to
    689         // refresh the playlist will become 0
    690         postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
    691     }
    692 }
    693 
    694 status_t PlaylistFetcher::refreshPlaylist() {
    695     if (delayUsToRefreshPlaylist() <= 0) {
    696         bool unchanged;
    697         sp<M3UParser> playlist = mSession->fetchPlaylist(
    698                 mURI.c_str(), mPlaylistHash, &unchanged);
    699 
    700         if (playlist == NULL) {
    701             if (unchanged) {
    702                 // We succeeded in fetching the playlist, but it was
    703                 // unchanged from the last time we tried.
    704 
    705                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
    706                     mRefreshState = (RefreshState)(mRefreshState + 1);
    707                 }
    708             } else {
    709                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
    710                 return ERROR_IO;
    711             }
    712         } else {
    713             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
    714             mPlaylist = playlist;
    715 
    716             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
    717                 updateDuration();
    718             }
    719         }
    720 
    721         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
    722     }
    723     return OK;
    724 }
    725 
    726 // static
    727 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
    728     return buffer->size() > 0 && buffer->data()[0] == 0x47;
    729 }
    730 
    731 void PlaylistFetcher::onDownloadNext() {
    732     status_t err = refreshPlaylist();
    733     int32_t firstSeqNumberInPlaylist = 0;
    734     int32_t lastSeqNumberInPlaylist = 0;
    735     bool discontinuity = false;
    736 
    737     if (mPlaylist != NULL) {
    738         if (mPlaylist->meta() != NULL) {
    739             mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist);
    740         }
    741 
    742         lastSeqNumberInPlaylist =
    743                 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;
    744 
    745         if (mDiscontinuitySeq < 0) {
    746             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
    747         }
    748     }
    749 
    750     if (mPlaylist != NULL && mSeqNumber < 0) {
    751         CHECK_GE(mStartTimeUs, 0ll);
    752 
    753         if (mSegmentStartTimeUs < 0) {
    754             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
    755                 // If this is a live session, start 3 segments from the end on connect
    756                 mSeqNumber = lastSeqNumberInPlaylist - 3;
    757                 if (mSeqNumber < firstSeqNumberInPlaylist) {
    758                     mSeqNumber = firstSeqNumberInPlaylist;
    759                 }
    760             } else {
    761                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
    762                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
    763                 // and relative position inside a segment
    764                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
    765                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
    766             }
    767             mStartTimeUsRelative = true;
    768             ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
    769                     mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
    770                     lastSeqNumberInPlaylist);
    771         } else {
    772             // When adapting or track switching, mSegmentStartTimeUs (relative
    773             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
    774             // timestamps coming from the media container) is used to determine the position
    775             // inside a segments.
    776             mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
    777             if (mAdaptive) {
    778                 // avoid double fetch/decode
    779                 mSeqNumber += 1;
    780             }
    781             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
    782             if (mSeqNumber < minSeq) {
    783                 mSeqNumber = minSeq;
    784             }
    785 
    786             if (mSeqNumber < firstSeqNumberInPlaylist) {
    787                 mSeqNumber = firstSeqNumberInPlaylist;
    788             }
    789 
    790             if (mSeqNumber > lastSeqNumberInPlaylist) {
    791                 mSeqNumber = lastSeqNumberInPlaylist;
    792             }
    793             ALOGV("Initial sequence number for live event %d from (%d .. %d)",
    794                     mSeqNumber, firstSeqNumberInPlaylist,
    795                     lastSeqNumberInPlaylist);
    796         }
    797     }
    798 
    799     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
    800     if (mSeqNumber < firstSeqNumberInPlaylist
    801             || mSeqNumber > lastSeqNumberInPlaylist
    802             || err != OK) {
    803         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
    804             ++mNumRetries;
    805 
    806             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
    807                 // make sure we reach this retry logic on refresh failures
    808                 // by adding an err != OK clause to all enclosing if's.
    809 
    810                 // refresh in increasing fraction (1/2, 1/3, ...) of the
    811                 // playlist's target duration or 3 seconds, whichever is less
    812                 int64_t delayUs = kMaxMonitorDelayUs;
    813                 if (mPlaylist != NULL && mPlaylist->meta() != NULL) {
    814                     int32_t targetDurationSecs;
    815                     CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
    816                     delayUs = mPlaylist->size() * targetDurationSecs *
    817                             1000000ll / (1 + mNumRetries);
    818                 }
    819                 if (delayUs > kMaxMonitorDelayUs) {
    820                     delayUs = kMaxMonitorDelayUs;
    821                 }
    822                 ALOGV("sequence number high: %d from (%d .. %d), "
    823                       "monitor in %" PRId64 " (retry=%d)",
    824                         mSeqNumber, firstSeqNumberInPlaylist,
    825                         lastSeqNumberInPlaylist, delayUs, mNumRetries);
    826                 postMonitorQueue(delayUs);
    827                 return;
    828             }
    829 
    830             if (err != OK) {
    831                 notifyError(err);
    832                 return;
    833             }
    834 
    835             // we've missed the boat, let's start 3 segments prior to the latest sequence
    836             // number available and signal a discontinuity.
    837 
    838             ALOGI("We've missed the boat, restarting playback."
    839                   "  mStartup=%d, was  looking for %d in %d-%d",
    840                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
    841                     lastSeqNumberInPlaylist);
    842             if (mStopParams != NULL) {
    843                 // we should have kept on fetching until we hit the boundaries in mStopParams,
    844                 // but since the segments we are supposed to fetch have already rolled off
    845                 // the playlist, i.e. we have already missed the boat, we inevitably have to
    846                 // skip.
    847                 for (size_t i = 0; i < mPacketSources.size(); i++) {
    848                     sp<ABuffer> formatChange = mSession->createFormatChangeBuffer();
    849                     mPacketSources.valueAt(i)->queueAccessUnit(formatChange);
    850                 }
    851                 stopAsync(/* clear = */ false);
    852                 return;
    853             }
    854             mSeqNumber = lastSeqNumberInPlaylist - 3;
    855             if (mSeqNumber < firstSeqNumberInPlaylist) {
    856                 mSeqNumber = firstSeqNumberInPlaylist;
    857             }
    858             discontinuity = true;
    859 
    860             // fall through
    861         } else {
    862             ALOGE("Cannot find sequence number %d in playlist "
    863                  "(contains %d - %d)",
    864                  mSeqNumber, firstSeqNumberInPlaylist,
    865                   firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
    866 
    867             notifyError(ERROR_END_OF_STREAM);
    868             return;
    869         }
    870     }
    871 
    872     mNumRetries = 0;
    873 
    874     AString uri;
    875     sp<AMessage> itemMeta;
    876     CHECK(mPlaylist->itemAt(
    877                 mSeqNumber - firstSeqNumberInPlaylist,
    878                 &uri,
    879                 &itemMeta));
    880 
    881     int32_t val;
    882     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
    883         mDiscontinuitySeq++;
    884         discontinuity = true;
    885     }
    886 
    887     int64_t range_offset, range_length;
    888     if (!itemMeta->findInt64("range-offset", &range_offset)
    889             || !itemMeta->findInt64("range-length", &range_length)) {
    890         range_offset = 0;
    891         range_length = -1;
    892     }
    893 
    894     ALOGV("fetching segment %d from (%d .. %d)",
    895           mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
    896 
    897     ALOGV("fetching '%s'", uri.c_str());
    898 
    899     sp<DataSource> source;
    900     sp<ABuffer> buffer, tsBuffer;
    901     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
    902     // this avoids interleaved connections to the key and segment file.
    903     {
    904         sp<ABuffer> junk = new ABuffer(16);
    905         junk->setRange(0, 16);
    906         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
    907                 true /* first */);
    908         if (err != OK) {
    909             notifyError(err);
    910             return;
    911         }
    912     }
    913 
    914     // block-wise download
    915     bool startup = mStartup;
    916     ssize_t bytesRead;
    917     do {
    918         bytesRead = mSession->fetchFile(
    919                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
    920 
    921         if (bytesRead < 0) {
    922             status_t err = bytesRead;
    923             ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
    924             notifyError(err);
    925             return;
    926         }
    927 
    928         CHECK(buffer != NULL);
    929 
    930         size_t size = buffer->size();
    931         // Set decryption range.
    932         buffer->setRange(size - bytesRead, bytesRead);
    933         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
    934                 buffer->offset() == 0 /* first */);
    935         // Unset decryption range.
    936         buffer->setRange(0, size);
    937 
    938         if (err != OK) {
    939             ALOGE("decryptBuffer failed w/ error %d", err);
    940 
    941             notifyError(err);
    942             return;
    943         }
    944 
    945         if (startup || discontinuity) {
    946             // Signal discontinuity.
    947 
    948             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
    949                 // If this was a live event this made no sense since
    950                 // we don't have access to all the segment before the current
    951                 // one.
    952                 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
    953             }
    954 
    955             if (discontinuity) {
    956                 ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
    957 
    958                 queueDiscontinuity(
    959                         ATSParser::DISCONTINUITY_FORMATCHANGE,
    960                         NULL /* extra */);
    961 
    962                 discontinuity = false;
    963             }
    964 
    965             startup = false;
    966         }
    967 
    968         err = OK;
    969         if (bufferStartsWithTsSyncByte(buffer)) {
    970             // Incremental extraction is only supported for MPEG2 transport streams.
    971             if (tsBuffer == NULL) {
    972                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
    973                 tsBuffer->setRange(0, 0);
    974             } else if (tsBuffer->capacity() != buffer->capacity()) {
    975                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
    976                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
    977                 tsBuffer->setRange(tsOff, tsSize);
    978             }
    979             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
    980 
    981             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
    982         }
    983 
    984         if (err == -EAGAIN) {
    985             // starting sequence number too low/high
    986             mTSParser.clear();
    987             for (size_t i = 0; i < mPacketSources.size(); i++) {
    988                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
    989                 packetSource->clear();
    990             }
    991             postMonitorQueue();
    992             return;
    993         } else if (err == ERROR_OUT_OF_RANGE) {
    994             // reached stopping point
    995             stopAsync(/* clear = */ false);
    996             return;
    997         } else if (err != OK) {
    998             notifyError(err);
    999             return;
   1000         }
   1001 
   1002     } while (bytesRead != 0);
   1003 
   1004     if (bufferStartsWithTsSyncByte(buffer)) {
   1005         // If we don't see a stream in the program table after fetching a full ts segment
   1006         // mark it as nonexistent.
   1007         const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
   1008         ATSParser::SourceType srcTypes[kNumTypes] =
   1009                 { ATSParser::VIDEO, ATSParser::AUDIO };
   1010         LiveSession::StreamType streamTypes[kNumTypes] =
   1011                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
   1012 
   1013         for (size_t i = 0; i < kNumTypes; i++) {
   1014             ATSParser::SourceType srcType = srcTypes[i];
   1015             LiveSession::StreamType streamType = streamTypes[i];
   1016 
   1017             sp<AnotherPacketSource> source =
   1018                 static_cast<AnotherPacketSource *>(
   1019                     mTSParser->getSource(srcType).get());
   1020 
   1021             if (!mTSParser->hasSource(srcType)) {
   1022                 ALOGW("MPEG2 Transport stream does not contain %s data.",
   1023                       srcType == ATSParser::VIDEO ? "video" : "audio");
   1024 
   1025                 mStreamTypeMask &= ~streamType;
   1026                 mPacketSources.removeItem(streamType);
   1027             }
   1028         }
   1029 
   1030     }
   1031 
   1032     if (checkDecryptPadding(buffer) != OK) {
   1033         ALOGE("Incorrect padding bytes after decryption.");
   1034         notifyError(ERROR_MALFORMED);
   1035         return;
   1036     }
   1037 
   1038     err = OK;
   1039     if (tsBuffer != NULL) {
   1040         AString method;
   1041         CHECK(buffer->meta()->findString("cipher-method", &method));
   1042         if ((tsBuffer->size() > 0 && method == "NONE")
   1043                 || tsBuffer->size() > 16) {
   1044             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
   1045                     "bytes in length.");
   1046             notifyError(ERROR_MALFORMED);
   1047             return;
   1048         }
   1049     }
   1050 
   1051     // bulk extract non-ts files
   1052     if (tsBuffer == NULL) {
   1053         err = extractAndQueueAccessUnits(buffer, itemMeta);
   1054         if (err == -EAGAIN) {
   1055             // starting sequence number too low/high
   1056             postMonitorQueue();
   1057             return;
   1058         } else if (err == ERROR_OUT_OF_RANGE) {
   1059             // reached stopping point
   1060             stopAsync(/* clear = */false);
   1061             return;
   1062         }
   1063     }
   1064 
   1065     if (err != OK) {
   1066         notifyError(err);
   1067         return;
   1068     }
   1069 
   1070     ++mSeqNumber;
   1071 
   1072     postMonitorQueue();
   1073 }
   1074 
   1075 int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
   1076     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
   1077     if (mPlaylist->meta() == NULL
   1078             || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
   1079         firstSeqNumberInPlaylist = 0;
   1080     }
   1081     lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;
   1082 
   1083     int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
   1084     while (index >= 0 && anchorTimeUs > mStartTimeUs) {
   1085         sp<AMessage> itemMeta;
   1086         CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
   1087 
   1088         int64_t itemDurationUs;
   1089         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
   1090 
   1091         anchorTimeUs -= itemDurationUs;
   1092         --index;
   1093     }
   1094 
   1095     int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1;
   1096     if (newSeqNumber <= lastSeqNumberInPlaylist) {
   1097         return newSeqNumber;
   1098     } else {
   1099         return lastSeqNumberInPlaylist;
   1100     }
   1101 }
   1102 
   1103 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
   1104     int32_t firstSeqNumberInPlaylist;
   1105     if (mPlaylist->meta() == NULL
   1106             || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
   1107         firstSeqNumberInPlaylist = 0;
   1108     }
   1109 
   1110     size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
   1111     if (discontinuitySeq < curDiscontinuitySeq) {
   1112         return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1);
   1113     }
   1114 
   1115     size_t index = 0;
   1116     while (index < mPlaylist->size()) {
   1117         sp<AMessage> itemMeta;
   1118         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
   1119 
   1120         int64_t discontinuity;
   1121         if (itemMeta->findInt64("discontinuity", &discontinuity)) {
   1122             curDiscontinuitySeq++;
   1123         }
   1124 
   1125         if (curDiscontinuitySeq == discontinuitySeq) {
   1126             return firstSeqNumberInPlaylist + index;
   1127         }
   1128 
   1129         ++index;
   1130     }
   1131 
   1132     return firstSeqNumberInPlaylist + mPlaylist->size();
   1133 }
   1134 
   1135 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
   1136     int32_t firstSeqNumberInPlaylist;
   1137     if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
   1138                 "media-sequence", &firstSeqNumberInPlaylist)) {
   1139         firstSeqNumberInPlaylist = 0;
   1140     }
   1141 
   1142     size_t index = 0;
   1143     int64_t segmentStartUs = 0;
   1144     while (index < mPlaylist->size()) {
   1145         sp<AMessage> itemMeta;
   1146         CHECK(mPlaylist->itemAt(
   1147                     index, NULL /* uri */, &itemMeta));
   1148 
   1149         int64_t itemDurationUs;
   1150         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
   1151 
   1152         if (timeUs < segmentStartUs + itemDurationUs) {
   1153             break;
   1154         }
   1155 
   1156         segmentStartUs += itemDurationUs;
   1157         ++index;
   1158     }
   1159 
   1160     if (index >= mPlaylist->size()) {
   1161         index = mPlaylist->size() - 1;
   1162     }
   1163 
   1164     return firstSeqNumberInPlaylist + index;
   1165 }
   1166 
   1167 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
   1168         const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
   1169     sp<MetaData> format = source->getFormat();
   1170     if (format != NULL) {
   1171         // for simplicity, store a reference to the format in each unit
   1172         accessUnit->meta()->setObject("format", format);
   1173     }
   1174 
   1175     if (discard) {
   1176         accessUnit->meta()->setInt32("discard", discard);
   1177     }
   1178 
   1179     int32_t targetDurationSecs;
   1180     if (mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)) {
   1181         accessUnit->meta()->setInt32("targetDuration", targetDurationSecs);
   1182     }
   1183 
   1184     accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
   1185     accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
   1186     return accessUnit;
   1187 }
   1188 
   1189 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
   1190     if (mTSParser == NULL) {
   1191         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
   1192         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
   1193     }
   1194 
   1195     if (mNextPTSTimeUs >= 0ll) {
   1196         sp<AMessage> extra = new AMessage;
   1197         // Since we are using absolute timestamps, signal an offset of 0 to prevent
   1198         // ATSParser from skewing the timestamps of access units.
   1199         extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
   1200 
   1201         mTSParser->signalDiscontinuity(
   1202                 ATSParser::DISCONTINUITY_TIME, extra);
   1203 
   1204         mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
   1205         mNextPTSTimeUs = -1ll;
   1206         mFirstPTSValid = false;
   1207     }
   1208 
   1209     size_t offset = 0;
   1210     while (offset + 188 <= buffer->size()) {
   1211         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
   1212 
   1213         if (err != OK) {
   1214             return err;
   1215         }
   1216 
   1217         offset += 188;
   1218     }
   1219     // setRange to indicate consumed bytes.
   1220     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
   1221 
   1222     status_t err = OK;
   1223     for (size_t i = mPacketSources.size(); i-- > 0;) {
   1224         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
   1225 
   1226         const char *key;
   1227         ATSParser::SourceType type;
   1228         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
   1229         switch (stream) {
   1230             case LiveSession::STREAMTYPE_VIDEO:
   1231                 type = ATSParser::VIDEO;
   1232                 key = "timeUsVideo";
   1233                 break;
   1234 
   1235             case LiveSession::STREAMTYPE_AUDIO:
   1236                 type = ATSParser::AUDIO;
   1237                 key = "timeUsAudio";
   1238                 break;
   1239 
   1240             case LiveSession::STREAMTYPE_SUBTITLES:
   1241             {
   1242                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
   1243                 return ERROR_MALFORMED;
   1244                 break;
   1245             }
   1246 
   1247             default:
   1248                 TRESPASS();
   1249         }
   1250 
   1251         sp<AnotherPacketSource> source =
   1252             static_cast<AnotherPacketSource *>(
   1253                     mTSParser->getSource(type).get());
   1254 
   1255         if (source == NULL) {
   1256             continue;
   1257         }
   1258 
   1259         int64_t timeUs;
   1260         sp<ABuffer> accessUnit;
   1261         status_t finalResult;
   1262         while (source->hasBufferAvailable(&finalResult)
   1263                 && source->dequeueAccessUnit(&accessUnit) == OK) {
   1264 
   1265             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
   1266 
   1267             if (mStartup) {
   1268                 if (!mFirstPTSValid) {
   1269                     mFirstTimeUs = timeUs;
   1270                     mFirstPTSValid = true;
   1271                 }
   1272                 if (mStartTimeUsRelative) {
   1273                     timeUs -= mFirstTimeUs;
   1274                     if (timeUs < 0) {
   1275                         timeUs = 0;
   1276                     }
   1277                 }
   1278 
   1279                 if (timeUs < mStartTimeUs) {
   1280                     // buffer up to the closest preceding IDR frame
   1281                     ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
   1282                             timeUs, mStartTimeUs);
   1283                     const char *mime;
   1284                     sp<MetaData> format  = source->getFormat();
   1285                     bool isAvc = false;
   1286                     if (format != NULL && format->findCString(kKeyMIMEType, &mime)
   1287                             && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
   1288                         isAvc = true;
   1289                     }
   1290                     if (isAvc && IsIDR(accessUnit)) {
   1291                         mVideoBuffer->clear();
   1292                     }
   1293                     if (isAvc) {
   1294                         mVideoBuffer->queueAccessUnit(accessUnit);
   1295                     }
   1296 
   1297                     continue;
   1298                 }
   1299             }
   1300 
   1301             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
   1302             if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
   1303                 int32_t firstSeqNumberInPlaylist;
   1304                 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
   1305                             "media-sequence", &firstSeqNumberInPlaylist)) {
   1306                     firstSeqNumberInPlaylist = 0;
   1307                 }
   1308 
   1309                 int32_t targetDurationSecs;
   1310                 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
   1311                 int64_t targetDurationUs = targetDurationSecs * 1000000ll;
   1312                 // mStartup
   1313                 //   mStartup is true until we have queued a packet for all the streams
   1314                 //   we are fetching. We queue packets whose timestamps are greater than
   1315                 //   mStartTimeUs.
   1316                 // mSegmentStartTimeUs >= 0
   1317                 //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
   1318                 // mSeqNumber > firstSeqNumberInPlaylist
   1319                 //   don't decrement mSeqNumber if it already points to the 1st segment
   1320                 // timeUs - mStartTimeUs > targetDurationUs:
   1321                 //   This and the 2 above conditions should only happen when adapting in a live
   1322                 //   stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher
   1323                 //   would start fetching after timeUs, which should be greater than mStartTimeUs;
   1324                 //   the old fetcher would then continue fetching data until timeUs. We don't want
   1325                 //   timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to
   1326                 //   stop as early as possible. The definition of being "too far ahead" is
   1327                 //   arbitrary; here we use targetDurationUs as threshold.
   1328                 if (mStartup && mSegmentStartTimeUs >= 0
   1329                         && mSeqNumber > firstSeqNumberInPlaylist
   1330                         && timeUs - mStartTimeUs > targetDurationUs) {
   1331                     // we just guessed a starting timestamp that is too high when adapting in a
   1332                     // live stream; re-adjust based on the actual timestamp extracted from the
   1333                     // media segment; if we didn't move backward after the re-adjustment
   1334                     // (newSeqNumber), start at least 1 segment prior.
   1335                     int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
   1336                     if (newSeqNumber >= mSeqNumber) {
   1337                         --mSeqNumber;
   1338                     } else {
   1339                         mSeqNumber = newSeqNumber;
   1340                     }
   1341                     mStartTimeUsNotify = mNotify->dup();
   1342                     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
   1343                     return -EAGAIN;
   1344                 }
   1345 
   1346                 int32_t seq;
   1347                 if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) {
   1348                     mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
   1349                 }
   1350                 int64_t startTimeUs;
   1351                 if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) {
   1352                     mStartTimeUsNotify->setInt64(key, timeUs);
   1353 
   1354                     uint32_t streamMask = 0;
   1355                     mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
   1356                     streamMask |= mPacketSources.keyAt(i);
   1357                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
   1358 
   1359                     if (streamMask == mStreamTypeMask) {
   1360                         mStartup = false;
   1361                         mStartTimeUsNotify->post();
   1362                         mStartTimeUsNotify.clear();
   1363                     }
   1364                 }
   1365             }
   1366 
   1367             if (mStopParams != NULL) {
   1368                 // Queue discontinuity in original stream.
   1369                 int32_t discontinuitySeq;
   1370                 int64_t stopTimeUs;
   1371                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
   1372                         || discontinuitySeq > mDiscontinuitySeq
   1373                         || !mStopParams->findInt64(key, &stopTimeUs)
   1374                         || (discontinuitySeq == mDiscontinuitySeq
   1375                                 && timeUs >= stopTimeUs)) {
   1376                     packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
   1377                     mStreamTypeMask &= ~stream;
   1378                     mPacketSources.removeItemsAt(i);
   1379                     break;
   1380                 }
   1381             }
   1382 
   1383             // Note that we do NOT dequeue any discontinuities except for format change.
   1384             if (stream == LiveSession::STREAMTYPE_VIDEO) {
   1385                 const bool discard = true;
   1386                 status_t status;
   1387                 while (mVideoBuffer->hasBufferAvailable(&status)) {
   1388                     sp<ABuffer> videoBuffer;
   1389                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
   1390                     setAccessUnitProperties(videoBuffer, source, discard);
   1391                     packetSource->queueAccessUnit(videoBuffer);
   1392                 }
   1393             }
   1394 
   1395             setAccessUnitProperties(accessUnit, source);
   1396             packetSource->queueAccessUnit(accessUnit);
   1397         }
   1398 
   1399         if (err != OK) {
   1400             break;
   1401         }
   1402     }
   1403 
   1404     if (err != OK) {
   1405         for (size_t i = mPacketSources.size(); i-- > 0;) {
   1406             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
   1407             packetSource->clear();
   1408         }
   1409         return err;
   1410     }
   1411 
   1412     if (!mStreamTypeMask) {
   1413         // Signal gap is filled between original and new stream.
   1414         ALOGV("ERROR OUT OF RANGE");
   1415         return ERROR_OUT_OF_RANGE;
   1416     }
   1417 
   1418     return OK;
   1419 }
   1420 
   1421 /* static */
   1422 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
   1423         const sp<ABuffer> &buffer) {
   1424     size_t pos = 0;
   1425 
   1426     // skip possible BOM
   1427     if (buffer->size() >= pos + 3 &&
   1428             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
   1429         pos += 3;
   1430     }
   1431 
   1432     // accept WEBVTT followed by SPACE, TAB or (CR) LF
   1433     if (buffer->size() < pos + 6 ||
   1434             memcmp("WEBVTT", buffer->data() + pos, 6)) {
   1435         return false;
   1436     }
   1437     pos += 6;
   1438 
   1439     if (buffer->size() == pos) {
   1440         return true;
   1441     }
   1442 
   1443     uint8_t sep = buffer->data()[pos];
   1444     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
   1445 }
   1446 
   1447 status_t PlaylistFetcher::extractAndQueueAccessUnits(
   1448         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
   1449     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
   1450         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
   1451             ALOGE("This stream only contains subtitles.");
   1452             return ERROR_MALFORMED;
   1453         }
   1454 
   1455         const sp<AnotherPacketSource> packetSource =
   1456             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
   1457 
   1458         int64_t durationUs;
   1459         CHECK(itemMeta->findInt64("durationUs", &durationUs));
   1460         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
   1461         buffer->meta()->setInt64("durationUs", durationUs);
   1462         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
   1463         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
   1464         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
   1465 
   1466         packetSource->queueAccessUnit(buffer);
   1467         return OK;
   1468     }
   1469 
   1470     if (mNextPTSTimeUs >= 0ll) {
   1471         mFirstPTSValid = false;
   1472         mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
   1473         mNextPTSTimeUs = -1ll;
   1474     }
   1475 
   1476     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
   1477     // stream prefixed by an ID3 tag.
   1478 
   1479     bool firstID3Tag = true;
   1480     uint64_t PTS = 0;
   1481 
   1482     for (;;) {
   1483         // Make sure to skip all ID3 tags preceding the audio data.
   1484         // At least one must be present to provide the PTS timestamp.
   1485 
   1486         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
   1487         if (!id3.isValid()) {
   1488             if (firstID3Tag) {
   1489                 ALOGE("Unable to parse ID3 tag.");
   1490                 return ERROR_MALFORMED;
   1491             } else {
   1492                 break;
   1493             }
   1494         }
   1495 
   1496         if (firstID3Tag) {
   1497             bool found = false;
   1498 
   1499             ID3::Iterator it(id3, "PRIV");
   1500             while (!it.done()) {
   1501                 size_t length;
   1502                 const uint8_t *data = it.getData(&length);
   1503 
   1504                 static const char *kMatchName =
   1505                     "com.apple.streaming.transportStreamTimestamp";
   1506                 static const size_t kMatchNameLen = strlen(kMatchName);
   1507 
   1508                 if (length == kMatchNameLen + 1 + 8
   1509                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
   1510                     found = true;
   1511                     PTS = U64_AT(&data[kMatchNameLen + 1]);
   1512                 }
   1513 
   1514                 it.next();
   1515             }
   1516 
   1517             if (!found) {
   1518                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
   1519                 return ERROR_MALFORMED;
   1520             }
   1521         }
   1522 
   1523         // skip the ID3 tag
   1524         buffer->setRange(
   1525                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
   1526 
   1527         firstID3Tag = false;
   1528     }
   1529 
   1530     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
   1531         ALOGW("This stream only contains audio data!");
   1532 
   1533         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
   1534 
   1535         if (mStreamTypeMask == 0) {
   1536             return OK;
   1537         }
   1538     }
   1539 
   1540     sp<AnotherPacketSource> packetSource =
   1541         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
   1542 
   1543     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
   1544         ABitReader bits(buffer->data(), buffer->size());
   1545 
   1546         // adts_fixed_header
   1547 
   1548         CHECK_EQ(bits.getBits(12), 0xfffu);
   1549         bits.skipBits(3);  // ID, layer
   1550         bool protection_absent = bits.getBits(1) != 0;
   1551 
   1552         unsigned profile = bits.getBits(2);
   1553         CHECK_NE(profile, 3u);
   1554         unsigned sampling_freq_index = bits.getBits(4);
   1555         bits.getBits(1);  // private_bit
   1556         unsigned channel_configuration = bits.getBits(3);
   1557         CHECK_NE(channel_configuration, 0u);
   1558         bits.skipBits(2);  // original_copy, home
   1559 
   1560         sp<MetaData> meta = MakeAACCodecSpecificData(
   1561                 profile, sampling_freq_index, channel_configuration);
   1562 
   1563         meta->setInt32(kKeyIsADTS, true);
   1564 
   1565         packetSource->setFormat(meta);
   1566     }
   1567 
   1568     int64_t numSamples = 0ll;
   1569     int32_t sampleRate;
   1570     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
   1571 
   1572     int64_t timeUs = (PTS * 100ll) / 9ll;
   1573     if (!mFirstPTSValid) {
   1574         mFirstPTSValid = true;
   1575         mFirstTimeUs = timeUs;
   1576     }
   1577 
   1578     size_t offset = 0;
   1579     while (offset < buffer->size()) {
   1580         const uint8_t *adtsHeader = buffer->data() + offset;
   1581         CHECK_LT(offset + 5, buffer->size());
   1582 
   1583         unsigned aac_frame_length =
   1584             ((adtsHeader[3] & 3) << 11)
   1585             | (adtsHeader[4] << 3)
   1586             | (adtsHeader[5] >> 5);
   1587 
   1588         if (aac_frame_length == 0) {
   1589             const uint8_t *id3Header = adtsHeader;
   1590             if (!memcmp(id3Header, "ID3", 3)) {
   1591                 ID3 id3(id3Header, buffer->size() - offset, true);
   1592                 if (id3.isValid()) {
   1593                     offset += id3.rawSize();
   1594                     continue;
   1595                 };
   1596             }
   1597             return ERROR_MALFORMED;
   1598         }
   1599 
   1600         CHECK_LE(offset + aac_frame_length, buffer->size());
   1601 
   1602         int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
   1603         offset += aac_frame_length;
   1604 
   1605         // Each AAC frame encodes 1024 samples.
   1606         numSamples += 1024;
   1607 
   1608         if (mStartup) {
   1609             int64_t startTimeUs = unitTimeUs;
   1610             if (mStartTimeUsRelative) {
   1611                 startTimeUs -= mFirstTimeUs;
   1612                 if (startTimeUs  < 0) {
   1613                     startTimeUs = 0;
   1614                 }
   1615             }
   1616             if (startTimeUs < mStartTimeUs) {
   1617                 continue;
   1618             }
   1619 
   1620             if (mStartTimeUsNotify != NULL) {
   1621                 int32_t targetDurationSecs;
   1622                 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
   1623                 int64_t targetDurationUs = targetDurationSecs * 1000000ll;
   1624 
   1625                 // Duplicated logic from how we handle .ts playlists.
   1626                 if (mStartup && mSegmentStartTimeUs >= 0
   1627                         && timeUs - mStartTimeUs > targetDurationUs) {
   1628                     int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs);
   1629                     if (newSeqNumber >= mSeqNumber) {
   1630                         --mSeqNumber;
   1631                     } else {
   1632                         mSeqNumber = newSeqNumber;
   1633                     }
   1634                     return -EAGAIN;
   1635                 }
   1636 
   1637                 mStartTimeUsNotify->setInt64("timeUsAudio", timeUs);
   1638                 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq);
   1639                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
   1640                 mStartTimeUsNotify->post();
   1641                 mStartTimeUsNotify.clear();
   1642                 mStartup = false;
   1643             }
   1644         }
   1645 
   1646         if (mStopParams != NULL) {
   1647             // Queue discontinuity in original stream.
   1648             int32_t discontinuitySeq;
   1649             int64_t stopTimeUs;
   1650             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
   1651                     || discontinuitySeq > mDiscontinuitySeq
   1652                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
   1653                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
   1654                 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer());
   1655                 mStreamTypeMask = 0;
   1656                 mPacketSources.clear();
   1657                 return ERROR_OUT_OF_RANGE;
   1658             }
   1659         }
   1660 
   1661         sp<ABuffer> unit = new ABuffer(aac_frame_length);
   1662         memcpy(unit->data(), adtsHeader, aac_frame_length);
   1663 
   1664         unit->meta()->setInt64("timeUs", unitTimeUs);
   1665         setAccessUnitProperties(unit, packetSource);
   1666         packetSource->queueAccessUnit(unit);
   1667     }
   1668 
   1669     return OK;
   1670 }
   1671 
   1672 void PlaylistFetcher::updateDuration() {
   1673     int64_t durationUs = 0ll;
   1674     for (size_t index = 0; index < mPlaylist->size(); ++index) {
   1675         sp<AMessage> itemMeta;
   1676         CHECK(mPlaylist->itemAt(
   1677                     index, NULL /* uri */, &itemMeta));
   1678 
   1679         int64_t itemDurationUs;
   1680         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
   1681 
   1682         durationUs += itemDurationUs;
   1683     }
   1684 
   1685     sp<AMessage> msg = mNotify->dup();
   1686     msg->setInt32("what", kWhatDurationUpdate);
   1687     msg->setInt64("durationUs", durationUs);
   1688     msg->post();
   1689 }
   1690 
   1691 int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) {
   1692     int64_t durationUs, threshold;
   1693     if (msg->findInt64("durationUs", &durationUs) && durationUs > 0) {
   1694         return kNumSkipFrames * durationUs;
   1695     }
   1696 
   1697     sp<RefBase> obj;
   1698     msg->findObject("format", &obj);
   1699     MetaData *format = static_cast<MetaData *>(obj.get());
   1700 
   1701     const char *mime;
   1702     CHECK(format->findCString(kKeyMIMEType, &mime));
   1703     bool audio = !strncasecmp(mime, "audio/", 6);
   1704     if (audio) {
   1705         // Assumes 1000 samples per frame.
   1706         int32_t sampleRate;
   1707         CHECK(format->findInt32(kKeySampleRate, &sampleRate));
   1708         return kNumSkipFrames  /* frames */ * 1000 /* samples */
   1709                 * (1000000 / sampleRate) /* sample duration (us) */;
   1710     } else {
   1711         int32_t frameRate;
   1712         if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) {
   1713             return kNumSkipFrames * (1000000 / frameRate);
   1714         }
   1715     }
   1716 
   1717     return 500000ll;
   1718 }
   1719 
   1720 }  // namespace android
   1721