Home | History | Annotate | Download | only in httplive
      1 /*
      2  * Copyright (C) 2012 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "PlaylistFetcher"
     19 #include <utils/Log.h>
     20 #include <utils/misc.h>
     21 
     22 #include "PlaylistFetcher.h"
     23 #include "HTTPDownloader.h"
     24 #include "LiveSession.h"
     25 #include "M3UParser.h"
     26 #include "include/avc_utils.h"
     27 #include "include/ID3.h"
     28 #include "mpeg2ts/AnotherPacketSource.h"
     29 
     30 #include <media/stagefright/foundation/ABitReader.h>
     31 #include <media/stagefright/foundation/ABuffer.h>
     32 #include <media/stagefright/foundation/ADebug.h>
     33 #include <media/stagefright/MediaDefs.h>
     34 #include <media/stagefright/MetaData.h>
     35 #include <media/stagefright/Utils.h>
     36 
     37 #include <ctype.h>
     38 #include <inttypes.h>
     39 #include <openssl/aes.h>
     40 
     41 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
     42 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
     43          LiveSession::getNameForStream(stream), ##__VA_ARGS__)
     44 
     45 namespace android {
     46 
     47 // static
     48 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll;
     49 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
     50 // LCM of 188 (size of a TS packet) & 1k works well
     51 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024;
     52 
     53 struct PlaylistFetcher::DownloadState : public RefBase {
     54     DownloadState();
     55     void resetState();
     56     bool hasSavedState() const;
     57     void restoreState(
     58             AString &uri,
     59             sp<AMessage> &itemMeta,
     60             sp<ABuffer> &buffer,
     61             sp<ABuffer> &tsBuffer,
     62             int32_t &firstSeqNumberInPlaylist,
     63             int32_t &lastSeqNumberInPlaylist);
     64     void saveState(
     65             AString &uri,
     66             sp<AMessage> &itemMeta,
     67             sp<ABuffer> &buffer,
     68             sp<ABuffer> &tsBuffer,
     69             int32_t &firstSeqNumberInPlaylist,
     70             int32_t &lastSeqNumberInPlaylist);
     71 
     72 private:
     73     bool mHasSavedState;
     74     AString mUri;
     75     sp<AMessage> mItemMeta;
     76     sp<ABuffer> mBuffer;
     77     sp<ABuffer> mTsBuffer;
     78     int32_t mFirstSeqNumberInPlaylist;
     79     int32_t mLastSeqNumberInPlaylist;
     80 };
     81 
     82 PlaylistFetcher::DownloadState::DownloadState() {
     83     resetState();
     84 }
     85 
     86 bool PlaylistFetcher::DownloadState::hasSavedState() const {
     87     return mHasSavedState;
     88 }
     89 
     90 void PlaylistFetcher::DownloadState::resetState() {
     91     mHasSavedState = false;
     92 
     93     mUri.clear();
     94     mItemMeta = NULL;
     95     mBuffer = NULL;
     96     mTsBuffer = NULL;
     97     mFirstSeqNumberInPlaylist = 0;
     98     mLastSeqNumberInPlaylist = 0;
     99 }
    100 
    101 void PlaylistFetcher::DownloadState::restoreState(
    102         AString &uri,
    103         sp<AMessage> &itemMeta,
    104         sp<ABuffer> &buffer,
    105         sp<ABuffer> &tsBuffer,
    106         int32_t &firstSeqNumberInPlaylist,
    107         int32_t &lastSeqNumberInPlaylist) {
    108     if (!mHasSavedState) {
    109         return;
    110     }
    111 
    112     uri = mUri;
    113     itemMeta = mItemMeta;
    114     buffer = mBuffer;
    115     tsBuffer = mTsBuffer;
    116     firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist;
    117     lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist;
    118 
    119     resetState();
    120 }
    121 
    122 void PlaylistFetcher::DownloadState::saveState(
    123         AString &uri,
    124         sp<AMessage> &itemMeta,
    125         sp<ABuffer> &buffer,
    126         sp<ABuffer> &tsBuffer,
    127         int32_t &firstSeqNumberInPlaylist,
    128         int32_t &lastSeqNumberInPlaylist) {
    129     mHasSavedState = true;
    130 
    131     mUri = uri;
    132     mItemMeta = itemMeta;
    133     mBuffer = buffer;
    134     mTsBuffer = tsBuffer;
    135     mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist;
    136     mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist;
    137 }
    138 
    139 PlaylistFetcher::PlaylistFetcher(
    140         const sp<AMessage> &notify,
    141         const sp<LiveSession> &session,
    142         const char *uri,
    143         int32_t id,
    144         int32_t subtitleGeneration)
    145     : mNotify(notify),
    146       mSession(session),
    147       mURI(uri),
    148       mFetcherID(id),
    149       mStreamTypeMask(0),
    150       mStartTimeUs(-1ll),
    151       mSegmentStartTimeUs(-1ll),
    152       mDiscontinuitySeq(-1ll),
    153       mStartTimeUsRelative(false),
    154       mLastPlaylistFetchTimeUs(-1ll),
    155       mPlaylistTimeUs(-1ll),
    156       mSeqNumber(-1),
    157       mNumRetries(0),
    158       mStartup(true),
    159       mIDRFound(false),
    160       mSeekMode(LiveSession::kSeekModeExactPosition),
    161       mTimeChangeSignaled(false),
    162       mNextPTSTimeUs(-1ll),
    163       mMonitorQueueGeneration(0),
    164       mSubtitleGeneration(subtitleGeneration),
    165       mLastDiscontinuitySeq(-1ll),
    166       mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
    167       mFirstPTSValid(false),
    168       mFirstTimeUs(-1ll),
    169       mVideoBuffer(new AnotherPacketSource(NULL)),
    170       mThresholdRatio(-1.0f),
    171       mDownloadState(new DownloadState()),
    172       mHasMetadata(false) {
    173     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
    174     mHTTPDownloader = mSession->getHTTPDownloader();
    175 }
    176 
    177 PlaylistFetcher::~PlaylistFetcher() {
    178 }
    179 
    180 int32_t PlaylistFetcher::getFetcherID() const {
    181     return mFetcherID;
    182 }
    183 
    184 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
    185     CHECK(mPlaylist != NULL);
    186 
    187     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
    188     mPlaylist->getSeqNumberRange(
    189             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
    190 
    191     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
    192     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
    193 
    194     int64_t segmentStartUs = 0ll;
    195     for (int32_t index = 0;
    196             index < seqNumber - firstSeqNumberInPlaylist; ++index) {
    197         sp<AMessage> itemMeta;
    198         CHECK(mPlaylist->itemAt(
    199                     index, NULL /* uri */, &itemMeta));
    200 
    201         int64_t itemDurationUs;
    202         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
    203 
    204         segmentStartUs += itemDurationUs;
    205     }
    206 
    207     return segmentStartUs;
    208 }
    209 
    210 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const {
    211     CHECK(mPlaylist != NULL);
    212 
    213     int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
    214     mPlaylist->getSeqNumberRange(
    215             &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
    216 
    217     CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
    218     CHECK_LE(seqNumber, lastSeqNumberInPlaylist);
    219 
    220     int32_t index = seqNumber - firstSeqNumberInPlaylist;
    221     sp<AMessage> itemMeta;
    222     CHECK(mPlaylist->itemAt(
    223                 index, NULL /* uri */, &itemMeta));
    224 
    225     int64_t itemDurationUs;
    226     CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
    227 
    228     return itemDurationUs;
    229 }
    230 
    231 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
    232     int64_t nowUs = ALooper::GetNowUs();
    233 
    234     if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
    235         CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
    236         return 0ll;
    237     }
    238 
    239     if (mPlaylist->isComplete()) {
    240         return (~0llu >> 1);
    241     }
    242 
    243     int64_t targetDurationUs = mPlaylist->getTargetDuration();
    244 
    245     int64_t minPlaylistAgeUs;
    246 
    247     switch (mRefreshState) {
    248         case INITIAL_MINIMUM_RELOAD_DELAY:
    249         {
    250             size_t n = mPlaylist->size();
    251             if (n > 0) {
    252                 sp<AMessage> itemMeta;
    253                 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));
    254 
    255                 int64_t itemDurationUs;
    256                 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
    257 
    258                 minPlaylistAgeUs = itemDurationUs;
    259                 break;
    260             }
    261 
    262             // fall through
    263         }
    264 
    265         case FIRST_UNCHANGED_RELOAD_ATTEMPT:
    266         {
    267             minPlaylistAgeUs = targetDurationUs / 2;
    268             break;
    269         }
    270 
    271         case SECOND_UNCHANGED_RELOAD_ATTEMPT:
    272         {
    273             minPlaylistAgeUs = (targetDurationUs * 3) / 2;
    274             break;
    275         }
    276 
    277         case THIRD_UNCHANGED_RELOAD_ATTEMPT:
    278         {
    279             minPlaylistAgeUs = targetDurationUs * 3;
    280             break;
    281         }
    282 
    283         default:
    284             TRESPASS();
    285             break;
    286     }
    287 
    288     int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
    289     return delayUs > 0ll ? delayUs : 0ll;
    290 }
    291 
    292 status_t PlaylistFetcher::decryptBuffer(
    293         size_t playlistIndex, const sp<ABuffer> &buffer,
    294         bool first) {
    295     sp<AMessage> itemMeta;
    296     bool found = false;
    297     AString method;
    298 
    299     for (ssize_t i = playlistIndex; i >= 0; --i) {
    300         AString uri;
    301         CHECK(mPlaylist->itemAt(i, &uri, &itemMeta));
    302 
    303         if (itemMeta->findString("cipher-method", &method)) {
    304             found = true;
    305             break;
    306         }
    307     }
    308 
    309     if (!found) {
    310         method = "NONE";
    311     }
    312     buffer->meta()->setString("cipher-method", method.c_str());
    313 
    314     if (method == "NONE") {
    315         return OK;
    316     } else if (!(method == "AES-128")) {
    317         ALOGE("Unsupported cipher method '%s'", method.c_str());
    318         return ERROR_UNSUPPORTED;
    319     }
    320 
    321     AString keyURI;
    322     if (!itemMeta->findString("cipher-uri", &keyURI)) {
    323         ALOGE("Missing key uri");
    324         return ERROR_MALFORMED;
    325     }
    326 
    327     ssize_t index = mAESKeyForURI.indexOfKey(keyURI);
    328 
    329     sp<ABuffer> key;
    330     if (index >= 0) {
    331         key = mAESKeyForURI.valueAt(index);
    332     } else {
    333         ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
    334 
    335         if (err == ERROR_NOT_CONNECTED) {
    336             return ERROR_NOT_CONNECTED;
    337         } else if (err < 0) {
    338             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
    339             return ERROR_IO;
    340         } else if (key->size() != 16) {
    341             ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str());
    342             return ERROR_MALFORMED;
    343         }
    344 
    345         mAESKeyForURI.add(keyURI, key);
    346     }
    347 
    348     AES_KEY aes_key;
    349     if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) {
    350         ALOGE("failed to set AES decryption key.");
    351         return UNKNOWN_ERROR;
    352     }
    353 
    354     size_t n = buffer->size();
    355     if (!n) {
    356         return OK;
    357     }
    358 
    359     if (n < 16 || n % 16) {
    360         ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n);
    361         return ERROR_MALFORMED;
    362     }
    363 
    364     if (first) {
    365         // If decrypting the first block in a file, read the iv from the manifest
    366         // or derive the iv from the file's sequence number.
    367 
    368         AString iv;
    369         if (itemMeta->findString("cipher-iv", &iv)) {
    370             if ((!iv.startsWith("0x") && !iv.startsWith("0X"))
    371                     || iv.size() > 16 * 2 + 2) {
    372                 ALOGE("malformed cipher IV '%s'.", iv.c_str());
    373                 return ERROR_MALFORMED;
    374             }
    375 
    376             while (iv.size() < 16 * 2 + 2) {
    377                 iv.insert("0", 1, 2);
    378             }
    379 
    380             memset(mAESInitVec, 0, sizeof(mAESInitVec));
    381             for (size_t i = 0; i < 16; ++i) {
    382                 char c1 = tolower(iv.c_str()[2 + 2 * i]);
    383                 char c2 = tolower(iv.c_str()[3 + 2 * i]);
    384                 if (!isxdigit(c1) || !isxdigit(c2)) {
    385                     ALOGE("malformed cipher IV '%s'.", iv.c_str());
    386                     return ERROR_MALFORMED;
    387                 }
    388                 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10;
    389                 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10;
    390 
    391                 mAESInitVec[i] = nibble1 << 4 | nibble2;
    392             }
    393         } else {
    394             memset(mAESInitVec, 0, sizeof(mAESInitVec));
    395             mAESInitVec[15] = mSeqNumber & 0xff;
    396             mAESInitVec[14] = (mSeqNumber >> 8) & 0xff;
    397             mAESInitVec[13] = (mSeqNumber >> 16) & 0xff;
    398             mAESInitVec[12] = (mSeqNumber >> 24) & 0xff;
    399         }
    400     }
    401 
    402     AES_cbc_encrypt(
    403             buffer->data(), buffer->data(), buffer->size(),
    404             &aes_key, mAESInitVec, AES_DECRYPT);
    405 
    406     return OK;
    407 }
    408 
    409 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) {
    410     AString method;
    411     CHECK(buffer->meta()->findString("cipher-method", &method));
    412     if (method == "NONE") {
    413         return OK;
    414     }
    415 
    416     uint8_t padding = 0;
    417     if (buffer->size() > 0) {
    418         padding = buffer->data()[buffer->size() - 1];
    419     }
    420 
    421     if (padding > 16) {
    422         return ERROR_MALFORMED;
    423     }
    424 
    425     for (size_t i = buffer->size() - padding; i < padding; i++) {
    426         if (buffer->data()[i] != padding) {
    427             return ERROR_MALFORMED;
    428         }
    429     }
    430 
    431     buffer->setRange(buffer->offset(), buffer->size() - padding);
    432     return OK;
    433 }
    434 
    435 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) {
    436     int64_t maxDelayUs = delayUsToRefreshPlaylist();
    437     if (maxDelayUs < minDelayUs) {
    438         maxDelayUs = minDelayUs;
    439     }
    440     if (delayUs > maxDelayUs) {
    441         FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs);
    442         delayUs = maxDelayUs;
    443     }
    444     sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this);
    445     msg->setInt32("generation", mMonitorQueueGeneration);
    446     msg->post(delayUs);
    447 }
    448 
    449 void PlaylistFetcher::cancelMonitorQueue() {
    450     ++mMonitorQueueGeneration;
    451 }
    452 
    453 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
    454     {
    455         AutoMutex _l(mThresholdLock);
    456         mThresholdRatio = thresholdRatio;
    457     }
    458     if (disconnect) {
    459         mHTTPDownloader->disconnect();
    460     }
    461 }
    462 
    463 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
    464     {
    465         AutoMutex _l(mThresholdLock);
    466         mThresholdRatio = -1.0f;
    467     }
    468     if (disconnect) {
    469         mHTTPDownloader->disconnect();
    470     } else {
    471         // allow reconnect
    472         mHTTPDownloader->reconnect();
    473     }
    474 }
    475 
    476 float PlaylistFetcher::getStoppingThreshold() {
    477     AutoMutex _l(mThresholdLock);
    478     return mThresholdRatio;
    479 }
    480 
    481 void PlaylistFetcher::startAsync(
    482         const sp<AnotherPacketSource> &audioSource,
    483         const sp<AnotherPacketSource> &videoSource,
    484         const sp<AnotherPacketSource> &subtitleSource,
    485         const sp<AnotherPacketSource> &metadataSource,
    486         int64_t startTimeUs,
    487         int64_t segmentStartTimeUs,
    488         int32_t startDiscontinuitySeq,
    489         LiveSession::SeekMode seekMode) {
    490     sp<AMessage> msg = new AMessage(kWhatStart, this);
    491 
    492     uint32_t streamTypeMask = 0ul;
    493 
    494     if (audioSource != NULL) {
    495         msg->setPointer("audioSource", audioSource.get());
    496         streamTypeMask |= LiveSession::STREAMTYPE_AUDIO;
    497     }
    498 
    499     if (videoSource != NULL) {
    500         msg->setPointer("videoSource", videoSource.get());
    501         streamTypeMask |= LiveSession::STREAMTYPE_VIDEO;
    502     }
    503 
    504     if (subtitleSource != NULL) {
    505         msg->setPointer("subtitleSource", subtitleSource.get());
    506         streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES;
    507     }
    508 
    509     if (metadataSource != NULL) {
    510         msg->setPointer("metadataSource", metadataSource.get());
    511         // metadataSource does not affect streamTypeMask.
    512     }
    513 
    514     msg->setInt32("streamTypeMask", streamTypeMask);
    515     msg->setInt64("startTimeUs", startTimeUs);
    516     msg->setInt64("segmentStartTimeUs", segmentStartTimeUs);
    517     msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq);
    518     msg->setInt32("seekMode", seekMode);
    519     msg->post();
    520 }
    521 
    522 /*
    523  * pauseAsync
    524  *
    525  * threshold: 0.0f - pause after current fetch block (default 47Kbytes)
    526  *           -1.0f - pause after finishing current segment
    527  *        0.0~1.0f - pause if remaining of current segment exceeds threshold
    528  */
    529 void PlaylistFetcher::pauseAsync(
    530         float thresholdRatio, bool disconnect) {
    531     setStoppingThreshold(thresholdRatio, disconnect);
    532 
    533     (new AMessage(kWhatPause, this))->post();
    534 }
    535 
    536 void PlaylistFetcher::stopAsync(bool clear) {
    537     setStoppingThreshold(0.0f, true /* disconncect */);
    538 
    539     sp<AMessage> msg = new AMessage(kWhatStop, this);
    540     msg->setInt32("clear", clear);
    541     msg->post();
    542 }
    543 
    544 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> &params) {
    545     FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str());
    546 
    547     AMessage* msg = new AMessage(kWhatResumeUntil, this);
    548     msg->setMessage("params", params);
    549     msg->post();
    550 }
    551 
    552 void PlaylistFetcher::fetchPlaylistAsync() {
    553     (new AMessage(kWhatFetchPlaylist, this))->post();
    554 }
    555 
    556 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
    557     switch (msg->what()) {
    558         case kWhatStart:
    559         {
    560             status_t err = onStart(msg);
    561 
    562             sp<AMessage> notify = mNotify->dup();
    563             notify->setInt32("what", kWhatStarted);
    564             notify->setInt32("err", err);
    565             notify->post();
    566             break;
    567         }
    568 
    569         case kWhatPause:
    570         {
    571             onPause();
    572 
    573             sp<AMessage> notify = mNotify->dup();
    574             notify->setInt32("what", kWhatPaused);
    575             notify->setInt32("seekMode",
    576                     mDownloadState->hasSavedState()
    577                     ? LiveSession::kSeekModeNextSample
    578                     : LiveSession::kSeekModeNextSegment);
    579             notify->post();
    580             break;
    581         }
    582 
    583         case kWhatStop:
    584         {
    585             onStop(msg);
    586 
    587             sp<AMessage> notify = mNotify->dup();
    588             notify->setInt32("what", kWhatStopped);
    589             notify->post();
    590             break;
    591         }
    592 
    593         case kWhatFetchPlaylist:
    594         {
    595             bool unchanged;
    596             sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
    597                     mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
    598 
    599             sp<AMessage> notify = mNotify->dup();
    600             notify->setInt32("what", kWhatPlaylistFetched);
    601             notify->setObject("playlist", playlist);
    602             notify->post();
    603             break;
    604         }
    605 
    606         case kWhatMonitorQueue:
    607         case kWhatDownloadNext:
    608         {
    609             int32_t generation;
    610             CHECK(msg->findInt32("generation", &generation));
    611 
    612             if (generation != mMonitorQueueGeneration) {
    613                 // Stale event
    614                 break;
    615             }
    616 
    617             if (msg->what() == kWhatMonitorQueue) {
    618                 onMonitorQueue();
    619             } else {
    620                 onDownloadNext();
    621             }
    622             break;
    623         }
    624 
    625         case kWhatResumeUntil:
    626         {
    627             onResumeUntil(msg);
    628             break;
    629         }
    630 
    631         default:
    632             TRESPASS();
    633     }
    634 }
    635 
    636 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
    637     mPacketSources.clear();
    638     mStopParams.clear();
    639     mStartTimeUsNotify = mNotify->dup();
    640     mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
    641     mStartTimeUsNotify->setString("uri", mURI);
    642 
    643     uint32_t streamTypeMask;
    644     CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
    645 
    646     int64_t startTimeUs;
    647     int64_t segmentStartTimeUs;
    648     int32_t startDiscontinuitySeq;
    649     int32_t seekMode;
    650     CHECK(msg->findInt64("startTimeUs", &startTimeUs));
    651     CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
    652     CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
    653     CHECK(msg->findInt32("seekMode", &seekMode));
    654 
    655     if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
    656         void *ptr;
    657         CHECK(msg->findPointer("audioSource", &ptr));
    658 
    659         mPacketSources.add(
    660                 LiveSession::STREAMTYPE_AUDIO,
    661                 static_cast<AnotherPacketSource *>(ptr));
    662     }
    663 
    664     if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
    665         void *ptr;
    666         CHECK(msg->findPointer("videoSource", &ptr));
    667 
    668         mPacketSources.add(
    669                 LiveSession::STREAMTYPE_VIDEO,
    670                 static_cast<AnotherPacketSource *>(ptr));
    671     }
    672 
    673     if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
    674         void *ptr;
    675         CHECK(msg->findPointer("subtitleSource", &ptr));
    676 
    677         mPacketSources.add(
    678                 LiveSession::STREAMTYPE_SUBTITLES,
    679                 static_cast<AnotherPacketSource *>(ptr));
    680     }
    681 
    682     void *ptr;
    683     // metadataSource is not part of streamTypeMask
    684     if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO))
    685             && msg->findPointer("metadataSource", &ptr)) {
    686         mPacketSources.add(
    687                 LiveSession::STREAMTYPE_METADATA,
    688                 static_cast<AnotherPacketSource *>(ptr));
    689     }
    690 
    691     mStreamTypeMask = streamTypeMask;
    692 
    693     mSegmentStartTimeUs = segmentStartTimeUs;
    694 
    695     if (startDiscontinuitySeq >= 0) {
    696         mDiscontinuitySeq = startDiscontinuitySeq;
    697     }
    698 
    699     mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
    700     mSeekMode = (LiveSession::SeekMode) seekMode;
    701 
    702     if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) {
    703         mStartup = true;
    704         mIDRFound = false;
    705         mVideoBuffer->clear();
    706     }
    707 
    708     if (startTimeUs >= 0) {
    709         mStartTimeUs = startTimeUs;
    710         mFirstPTSValid = false;
    711         mSeqNumber = -1;
    712         mTimeChangeSignaled = false;
    713         mDownloadState->resetState();
    714     }
    715 
    716     postMonitorQueue();
    717 
    718     return OK;
    719 }
    720 
    721 void PlaylistFetcher::onPause() {
    722     cancelMonitorQueue();
    723     mLastDiscontinuitySeq = mDiscontinuitySeq;
    724 
    725     resetStoppingThreshold(false /* disconnect */);
    726 }
    727 
    728 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
    729     cancelMonitorQueue();
    730 
    731     int32_t clear;
    732     CHECK(msg->findInt32("clear", &clear));
    733     if (clear) {
    734         for (size_t i = 0; i < mPacketSources.size(); i++) {
    735             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
    736             packetSource->clear();
    737         }
    738     }
    739 
    740     mDownloadState->resetState();
    741     mPacketSources.clear();
    742     mStreamTypeMask = 0;
    743 
    744     resetStoppingThreshold(true /* disconnect */);
    745 }
    746 
    747 // Resume until we have reached the boundary timestamps listed in `msg`; when
    748 // the remaining time is too short (within a resume threshold) stop immediately
    749 // instead.
    750 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
    751     sp<AMessage> params;
    752     CHECK(msg->findMessage("params", &params));
    753 
    754     mStopParams = params;
    755     onDownloadNext();
    756 
    757     return OK;
    758 }
    759 
    760 void PlaylistFetcher::notifyStopReached() {
    761     sp<AMessage> notify = mNotify->dup();
    762     notify->setInt32("what", kWhatStopReached);
    763     notify->post();
    764 }
    765 
    766 void PlaylistFetcher::notifyError(status_t err) {
    767     sp<AMessage> notify = mNotify->dup();
    768     notify->setInt32("what", kWhatError);
    769     notify->setInt32("err", err);
    770     notify->post();
    771 }
    772 
    773 void PlaylistFetcher::queueDiscontinuity(
    774         ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
    775     for (size_t i = 0; i < mPacketSources.size(); ++i) {
    776         // do not discard buffer upon #EXT-X-DISCONTINUITY tag
    777         // (seek will discard buffer by abandoning old fetchers)
    778         mPacketSources.valueAt(i)->queueDiscontinuity(
    779                 type, extra, false /* discard */);
    780     }
    781 }
    782 
    783 void PlaylistFetcher::onMonitorQueue() {
    784     // in the middle of an unfinished download, delay
    785     // playlist refresh as it'll change seq numbers
    786     if (!mDownloadState->hasSavedState()) {
    787         refreshPlaylist();
    788     }
    789 
    790     int64_t targetDurationUs = kMinBufferedDurationUs;
    791     if (mPlaylist != NULL) {
    792         targetDurationUs = mPlaylist->getTargetDuration();
    793     }
    794 
    795     int64_t bufferedDurationUs = 0ll;
    796     status_t finalResult = OK;
    797     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
    798         sp<AnotherPacketSource> packetSource =
    799             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
    800 
    801         bufferedDurationUs =
    802                 packetSource->getBufferedDurationUs(&finalResult);
    803     } else {
    804         // Use min stream duration, but ignore streams that never have any packet
    805         // enqueued to prevent us from waiting on a non-existent stream;
    806         // when we cannot make out from the manifest what streams are included in
    807         // a playlist we might assume extra streams.
    808         bufferedDurationUs = -1ll;
    809         for (size_t i = 0; i < mPacketSources.size(); ++i) {
    810             if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
    811                     || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
    812                 continue;
    813             }
    814 
    815             int64_t bufferedStreamDurationUs =
    816                 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
    817 
    818             FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs);
    819 
    820             if (bufferedDurationUs == -1ll
    821                  || bufferedStreamDurationUs < bufferedDurationUs) {
    822                 bufferedDurationUs = bufferedStreamDurationUs;
    823             }
    824         }
    825         if (bufferedDurationUs == -1ll) {
    826             bufferedDurationUs = 0ll;
    827         }
    828     }
    829 
    830     if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
    831         FLOGV("monitoring, buffered=%lld < %lld",
    832                 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs);
    833 
    834         // delay the next download slightly; hopefully this gives other concurrent fetchers
    835         // a better chance to run.
    836         // onDownloadNext();
    837         sp<AMessage> msg = new AMessage(kWhatDownloadNext, this);
    838         msg->setInt32("generation", mMonitorQueueGeneration);
    839         msg->post(1000l);
    840     } else {
    841         // We'd like to maintain buffering above durationToBufferUs, so try
    842         // again when buffer just about to go below durationToBufferUs
    843         // (or after targetDurationUs / 2, whichever is smaller).
    844         int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
    845         if (delayUs > targetDurationUs / 2) {
    846             delayUs = targetDurationUs / 2;
    847         }
    848 
    849         FLOGV("pausing for %lld, buffered=%lld > %lld",
    850                 (long long)delayUs,
    851                 (long long)bufferedDurationUs,
    852                 (long long)kMinBufferedDurationUs);
    853 
    854         postMonitorQueue(delayUs);
    855     }
    856 }
    857 
    858 status_t PlaylistFetcher::refreshPlaylist() {
    859     if (delayUsToRefreshPlaylist() <= 0) {
    860         bool unchanged;
    861         sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
    862                 mURI.c_str(), mPlaylistHash, &unchanged);
    863 
    864         if (playlist == NULL) {
    865             if (unchanged) {
    866                 // We succeeded in fetching the playlist, but it was
    867                 // unchanged from the last time we tried.
    868 
    869                 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
    870                     mRefreshState = (RefreshState)(mRefreshState + 1);
    871                 }
    872             } else {
    873                 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str());
    874                 return ERROR_IO;
    875             }
    876         } else {
    877             mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
    878             mPlaylist = playlist;
    879 
    880             if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
    881                 updateDuration();
    882             }
    883             // Notify LiveSession to use target-duration based buffering level
    884             // for up/down switch. Default LiveSession::kUpSwitchMark may not
    885             // be reachable for live streams, as our max buffering amount is
    886             // limited to 3 segments.
    887             if (!mPlaylist->isComplete()) {
    888                 updateTargetDuration();
    889             }
    890             mPlaylistTimeUs = ALooper::GetNowUs();
    891         }
    892 
    893         mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
    894     }
    895     return OK;
    896 }
    897 
    898 // static
    899 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
    900     return buffer->size() > 0 && buffer->data()[0] == 0x47;
    901 }
    902 
    903 bool PlaylistFetcher::shouldPauseDownload() {
    904     if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
    905         // doesn't apply to subtitles
    906         return false;
    907     }
    908 
    909     // Calculate threshold to abort current download
    910     float thresholdRatio = getStoppingThreshold();
    911 
    912     if (thresholdRatio < 0.0f) {
    913         // never abort
    914         return false;
    915     } else if (thresholdRatio == 0.0f) {
    916         // immediately abort
    917         return true;
    918     }
    919 
    920     // now we have a positive thresholdUs, abort if remaining
    921     // portion to download is over that threshold.
    922     if (mSegmentFirstPTS < 0) {
    923         // this means we haven't even find the first access unit,
    924         // abort now as we must be very far away from the end.
    925         return true;
    926     }
    927     int64_t lastEnqueueUs = mSegmentFirstPTS;
    928     for (size_t i = 0; i < mPacketSources.size(); ++i) {
    929         if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
    930             continue;
    931         }
    932         sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta();
    933         int32_t type;
    934         if (meta == NULL || meta->findInt32("discontinuity", &type)) {
    935             continue;
    936         }
    937         int64_t tmpUs;
    938         CHECK(meta->findInt64("timeUs", &tmpUs));
    939         if (tmpUs > lastEnqueueUs) {
    940             lastEnqueueUs = tmpUs;
    941         }
    942     }
    943     lastEnqueueUs -= mSegmentFirstPTS;
    944 
    945     int64_t targetDurationUs = mPlaylist->getTargetDuration();
    946     int64_t thresholdUs = thresholdRatio * targetDurationUs;
    947 
    948     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
    949             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
    950             (long long)thresholdUs,
    951             (long long)(targetDurationUs - lastEnqueueUs));
    952 
    953     if (targetDurationUs - lastEnqueueUs > thresholdUs) {
    954         return true;
    955     }
    956     return false;
    957 }
    958 
    959 bool PlaylistFetcher::initDownloadState(
    960         AString &uri,
    961         sp<AMessage> &itemMeta,
    962         int32_t &firstSeqNumberInPlaylist,
    963         int32_t &lastSeqNumberInPlaylist) {
    964     status_t err = refreshPlaylist();
    965     firstSeqNumberInPlaylist = 0;
    966     lastSeqNumberInPlaylist = 0;
    967     bool discontinuity = false;
    968 
    969     if (mPlaylist != NULL) {
    970         mPlaylist->getSeqNumberRange(
    971                 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist);
    972 
    973         if (mDiscontinuitySeq < 0) {
    974             mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
    975         }
    976     }
    977 
    978     mSegmentFirstPTS = -1ll;
    979 
    980     if (mPlaylist != NULL && mSeqNumber < 0) {
    981         CHECK_GE(mStartTimeUs, 0ll);
    982 
    983         if (mSegmentStartTimeUs < 0) {
    984             if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
    985                 // If this is a live session, start 3 segments from the end on connect
    986                 mSeqNumber = lastSeqNumberInPlaylist - 3;
    987                 if (mSeqNumber < firstSeqNumberInPlaylist) {
    988                     mSeqNumber = firstSeqNumberInPlaylist;
    989                 }
    990             } else {
    991                 // When seeking mSegmentStartTimeUs is unavailable (< 0), we
    992                 // use mStartTimeUs (client supplied timestamp) to determine both start segment
    993                 // and relative position inside a segment
    994                 mSeqNumber = getSeqNumberForTime(mStartTimeUs);
    995                 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
    996             }
    997             mStartTimeUsRelative = true;
    998             FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)",
    999                     (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
   1000                     lastSeqNumberInPlaylist);
   1001         } else {
   1002             // When adapting or track switching, mSegmentStartTimeUs (relative
   1003             // to media time 0) is used to determine the start segment; mStartTimeUs (absolute
   1004             // timestamps coming from the media container) is used to determine the position
   1005             // inside a segments.
   1006             if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES
   1007                     && mSeekMode != LiveSession::kSeekModeNextSample) {
   1008                 // avoid double fetch/decode
   1009                 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search
   1010                 // for the starting segment in new variant.
   1011                 // If the two variants' segments are aligned, this gives the
   1012                 // next segment. If they're not aligned, this gives the segment
   1013                 // that overlaps no more than 1/2 * targetDurationUs.
   1014                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs
   1015                         + mPlaylist->getTargetDuration() / 2);
   1016             } else {
   1017                 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
   1018             }
   1019             ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
   1020             if (mSeqNumber < minSeq) {
   1021                 mSeqNumber = minSeq;
   1022             }
   1023 
   1024             if (mSeqNumber < firstSeqNumberInPlaylist) {
   1025                 mSeqNumber = firstSeqNumberInPlaylist;
   1026             }
   1027 
   1028             if (mSeqNumber > lastSeqNumberInPlaylist) {
   1029                 mSeqNumber = lastSeqNumberInPlaylist;
   1030             }
   1031             FLOGV("Initial sequence number is %d from (%d .. %d)",
   1032                     mSeqNumber, firstSeqNumberInPlaylist,
   1033                     lastSeqNumberInPlaylist);
   1034         }
   1035     }
   1036 
   1037     // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true
   1038     if (mSeqNumber < firstSeqNumberInPlaylist
   1039             || mSeqNumber > lastSeqNumberInPlaylist
   1040             || err != OK) {
   1041         if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) {
   1042             ++mNumRetries;
   1043 
   1044             if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) {
   1045                 // make sure we reach this retry logic on refresh failures
   1046                 // by adding an err != OK clause to all enclosing if's.
   1047 
   1048                 // refresh in increasing fraction (1/2, 1/3, ...) of the
   1049                 // playlist's target duration or 3 seconds, whichever is less
   1050                 int64_t delayUs = kMaxMonitorDelayUs;
   1051                 if (mPlaylist != NULL) {
   1052                     delayUs = mPlaylist->size() * mPlaylist->getTargetDuration()
   1053                             / (1 + mNumRetries);
   1054                 }
   1055                 if (delayUs > kMaxMonitorDelayUs) {
   1056                     delayUs = kMaxMonitorDelayUs;
   1057                 }
   1058                 FLOGV("sequence number high: %d from (%d .. %d), "
   1059                       "monitor in %lld (retry=%d)",
   1060                         mSeqNumber, firstSeqNumberInPlaylist,
   1061                         lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries);
   1062                 postMonitorQueue(delayUs);
   1063                 return false;
   1064             }
   1065 
   1066             if (err != OK) {
   1067                 notifyError(err);
   1068                 return false;
   1069             }
   1070 
   1071             // we've missed the boat, let's start 3 segments prior to the latest sequence
   1072             // number available and signal a discontinuity.
   1073 
   1074             ALOGI("We've missed the boat, restarting playback."
   1075                   "  mStartup=%d, was  looking for %d in %d-%d",
   1076                     mStartup, mSeqNumber, firstSeqNumberInPlaylist,
   1077                     lastSeqNumberInPlaylist);
   1078             if (mStopParams != NULL) {
   1079                 // we should have kept on fetching until we hit the boundaries in mStopParams,
   1080                 // but since the segments we are supposed to fetch have already rolled off
   1081                 // the playlist, i.e. we have already missed the boat, we inevitably have to
   1082                 // skip.
   1083                 notifyStopReached();
   1084                 return false;
   1085             }
   1086             mSeqNumber = lastSeqNumberInPlaylist - 3;
   1087             if (mSeqNumber < firstSeqNumberInPlaylist) {
   1088                 mSeqNumber = firstSeqNumberInPlaylist;
   1089             }
   1090             discontinuity = true;
   1091 
   1092             // fall through
   1093         } else {
   1094             if (mPlaylist != NULL) {
   1095                 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size()
   1096                         && !mPlaylist->isComplete()) {
   1097                     // Live playlists
   1098                     ALOGW("sequence number %d not yet available", mSeqNumber);
   1099                     postMonitorQueue(delayUsToRefreshPlaylist());
   1100                     return false;
   1101                 }
   1102                 ALOGE("Cannot find sequence number %d in playlist "
   1103                      "(contains %d - %d)",
   1104                      mSeqNumber, firstSeqNumberInPlaylist,
   1105                       firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);
   1106 
   1107                 if (mTSParser != NULL) {
   1108                     mTSParser->signalEOS(ERROR_END_OF_STREAM);
   1109                     // Use an empty buffer; we don't have any new data, just want to extract
   1110                     // potential new access units after flush.  Reset mSeqNumber to
   1111                     // lastSeqNumberInPlaylist such that we set the correct access unit
   1112                     // properties in extractAndQueueAccessUnitsFromTs.
   1113                     sp<ABuffer> buffer = new ABuffer(0);
   1114                     mSeqNumber = lastSeqNumberInPlaylist;
   1115                     extractAndQueueAccessUnitsFromTs(buffer);
   1116                 }
   1117                 notifyError(ERROR_END_OF_STREAM);
   1118             } else {
   1119                 // It's possible that we were never able to download the playlist.
   1120                 // In this case we should notify error, instead of EOS, as EOS during
   1121                 // prepare means we succeeded in downloading everything.
   1122                 ALOGE("Failed to download playlist!");
   1123                 notifyError(ERROR_IO);
   1124             }
   1125 
   1126             return false;
   1127         }
   1128     }
   1129 
   1130     mNumRetries = 0;
   1131 
   1132     CHECK(mPlaylist->itemAt(
   1133                 mSeqNumber - firstSeqNumberInPlaylist,
   1134                 &uri,
   1135                 &itemMeta));
   1136 
   1137     CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));
   1138 
   1139     int32_t val;
   1140     if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
   1141         discontinuity = true;
   1142     } else if (mLastDiscontinuitySeq >= 0
   1143             && mDiscontinuitySeq != mLastDiscontinuitySeq) {
   1144         // Seek jumped to a new discontinuity sequence. We need to signal
   1145         // a format change to decoder. Decoder needs to shutdown and be
   1146         // created again if seamless format change is unsupported.
   1147         FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
   1148                 "mDiscontinuitySeq %d, mStartTimeUs %lld",
   1149                 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
   1150         discontinuity = true;
   1151     }
   1152     mLastDiscontinuitySeq = -1;
   1153 
   1154     // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
   1155     // this avoids interleaved connections to the key and segment file.
   1156     {
   1157         sp<ABuffer> junk = new ABuffer(16);
   1158         junk->setRange(0, 16);
   1159         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
   1160                 true /* first */);
   1161         if (err == ERROR_NOT_CONNECTED) {
   1162             return false;
   1163         } else if (err != OK) {
   1164             notifyError(err);
   1165             return false;
   1166         }
   1167     }
   1168 
   1169     if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
   1170         // We need to signal a time discontinuity to ATSParser on the
   1171         // first segment after start, or on a discontinuity segment.
   1172         // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
   1173         // to send the time discontinuity.
   1174         if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
   1175             // If this was a live event this made no sense since
   1176             // we don't have access to all the segment before the current
   1177             // one.
   1178             mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
   1179         }
   1180 
   1181         // Setting mTimeChangeSignaled to true, so that if start time
   1182         // searching goes into 2nd segment (without a discontinuity),
   1183         // we don't reset time again. It causes corruption when pending
   1184         // data in ATSParser is cleared.
   1185         mTimeChangeSignaled = true;
   1186     }
   1187 
   1188     if (discontinuity) {
   1189         ALOGI("queueing discontinuity (explicit=%d)", discontinuity);
   1190 
   1191         // Signal a format discontinuity to ATSParser to clear partial data
   1192         // from previous streams. Not doing this causes bitstream corruption.
   1193         if (mTSParser != NULL) {
   1194             mTSParser.clear();
   1195         }
   1196 
   1197         queueDiscontinuity(
   1198                 ATSParser::DISCONTINUITY_FORMAT_ONLY,
   1199                 NULL /* extra */);
   1200 
   1201         if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
   1202             // This means we guessed mStartTimeUs to be in the previous
   1203             // segment (likely very close to the end), but either video or
   1204             // audio has not found start by the end of that segment.
   1205             //
   1206             // If this new segment is not a discontinuity, keep searching.
   1207             //
   1208             // If this new segment even got a discontinuity marker, just
   1209             // set mStartTimeUs=0, and take all samples from now on.
   1210             mStartTimeUs = 0;
   1211             mFirstPTSValid = false;
   1212             mIDRFound = false;
   1213             mVideoBuffer->clear();
   1214         }
   1215     }
   1216 
   1217     FLOGV("fetching segment %d from (%d .. %d)",
   1218             mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);
   1219     return true;
   1220 }
   1221 
   1222 void PlaylistFetcher::onDownloadNext() {
   1223     AString uri;
   1224     sp<AMessage> itemMeta;
   1225     sp<ABuffer> buffer;
   1226     sp<ABuffer> tsBuffer;
   1227     int32_t firstSeqNumberInPlaylist = 0;
   1228     int32_t lastSeqNumberInPlaylist = 0;
   1229     bool connectHTTP = true;
   1230 
   1231     if (mDownloadState->hasSavedState()) {
   1232         mDownloadState->restoreState(
   1233                 uri,
   1234                 itemMeta,
   1235                 buffer,
   1236                 tsBuffer,
   1237                 firstSeqNumberInPlaylist,
   1238                 lastSeqNumberInPlaylist);
   1239         connectHTTP = false;
   1240         FLOGV("resuming: '%s'", uri.c_str());
   1241     } else {
   1242         if (!initDownloadState(
   1243                 uri,
   1244                 itemMeta,
   1245                 firstSeqNumberInPlaylist,
   1246                 lastSeqNumberInPlaylist)) {
   1247             return;
   1248         }
   1249         FLOGV("fetching: '%s'", uri.c_str());
   1250     }
   1251 
   1252     int64_t range_offset, range_length;
   1253     if (!itemMeta->findInt64("range-offset", &range_offset)
   1254             || !itemMeta->findInt64("range-length", &range_length)) {
   1255         range_offset = 0;
   1256         range_length = -1;
   1257     }
   1258 
   1259     // block-wise download
   1260     bool shouldPause = false;
   1261     ssize_t bytesRead;
   1262     do {
   1263         int64_t startUs = ALooper::GetNowUs();
   1264         bytesRead = mHTTPDownloader->fetchBlock(
   1265                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
   1266                 NULL /* actualURL */, connectHTTP);
   1267         int64_t delayUs = ALooper::GetNowUs() - startUs;
   1268 
   1269         if (bytesRead == ERROR_NOT_CONNECTED) {
   1270             return;
   1271         }
   1272         if (bytesRead < 0) {
   1273             status_t err = bytesRead;
   1274             ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
   1275             notifyError(err);
   1276             return;
   1277         }
   1278 
   1279         // add sample for bandwidth estimation, excluding samples from subtitles (as
   1280         // its too small), or during startup/resumeUntil (when we could have more than
   1281         // one connection open which affects bandwidth)
   1282         if (!mStartup && mStopParams == NULL && bytesRead > 0
   1283                 && (mStreamTypeMask
   1284                         & (LiveSession::STREAMTYPE_AUDIO
   1285                         | LiveSession::STREAMTYPE_VIDEO))) {
   1286             mSession->addBandwidthMeasurement(bytesRead, delayUs);
   1287             if (delayUs > 2000000ll) {
   1288                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
   1289                         bytesRead, (double)delayUs / 1.0e6);
   1290             }
   1291         }
   1292 
   1293         connectHTTP = false;
   1294 
   1295         CHECK(buffer != NULL);
   1296 
   1297         size_t size = buffer->size();
   1298         // Set decryption range.
   1299         buffer->setRange(size - bytesRead, bytesRead);
   1300         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
   1301                 buffer->offset() == 0 /* first */);
   1302         // Unset decryption range.
   1303         buffer->setRange(0, size);
   1304 
   1305         if (err != OK) {
   1306             ALOGE("decryptBuffer failed w/ error %d", err);
   1307 
   1308             notifyError(err);
   1309             return;
   1310         }
   1311 
   1312         bool startUp = mStartup; // save current start up state
   1313 
   1314         err = OK;
   1315         if (bufferStartsWithTsSyncByte(buffer)) {
   1316             // Incremental extraction is only supported for MPEG2 transport streams.
   1317             if (tsBuffer == NULL) {
   1318                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
   1319                 tsBuffer->setRange(0, 0);
   1320             } else if (tsBuffer->capacity() != buffer->capacity()) {
   1321                 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
   1322                 tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
   1323                 tsBuffer->setRange(tsOff, tsSize);
   1324             }
   1325             tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);
   1326             err = extractAndQueueAccessUnitsFromTs(tsBuffer);
   1327         }
   1328 
   1329         if (err == -EAGAIN) {
   1330             // starting sequence number too low/high
   1331             mTSParser.clear();
   1332             for (size_t i = 0; i < mPacketSources.size(); i++) {
   1333                 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
   1334                 packetSource->clear();
   1335             }
   1336             postMonitorQueue();
   1337             return;
   1338         } else if (err == ERROR_OUT_OF_RANGE) {
   1339             // reached stopping point
   1340             notifyStopReached();
   1341             return;
   1342         } else if (err != OK) {
   1343             notifyError(err);
   1344             return;
   1345         }
   1346         // If we're switching, post start notification
   1347         // this should only be posted when the last chunk is full processed by TSParser
   1348         if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
   1349             CHECK(mStartTimeUsNotify != NULL);
   1350             mStartTimeUsNotify->post();
   1351             mStartTimeUsNotify.clear();
   1352             shouldPause = true;
   1353         }
   1354         if (shouldPause || shouldPauseDownload()) {
   1355             // save state and return if this is not the last chunk,
   1356             // leaving the fetcher in paused state.
   1357             if (bytesRead != 0) {
   1358                 mDownloadState->saveState(
   1359                         uri,
   1360                         itemMeta,
   1361                         buffer,
   1362                         tsBuffer,
   1363                         firstSeqNumberInPlaylist,
   1364                         lastSeqNumberInPlaylist);
   1365                 return;
   1366             }
   1367             shouldPause = true;
   1368         }
   1369     } while (bytesRead != 0);
   1370 
   1371     if (bufferStartsWithTsSyncByte(buffer)) {
   1372         // If we don't see a stream in the program table after fetching a full ts segment
   1373         // mark it as nonexistent.
   1374         ATSParser::SourceType srcTypes[] =
   1375                 { ATSParser::VIDEO, ATSParser::AUDIO };
   1376         LiveSession::StreamType streamTypes[] =
   1377                 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };
   1378         const size_t kNumTypes = NELEM(srcTypes);
   1379 
   1380         for (size_t i = 0; i < kNumTypes; i++) {
   1381             ATSParser::SourceType srcType = srcTypes[i];
   1382             LiveSession::StreamType streamType = streamTypes[i];
   1383 
   1384             sp<AnotherPacketSource> source =
   1385                 static_cast<AnotherPacketSource *>(
   1386                     mTSParser->getSource(srcType).get());
   1387 
   1388             if (!mTSParser->hasSource(srcType)) {
   1389                 ALOGW("MPEG2 Transport stream does not contain %s data.",
   1390                       srcType == ATSParser::VIDEO ? "video" : "audio");
   1391 
   1392                 mStreamTypeMask &= ~streamType;
   1393                 mPacketSources.removeItem(streamType);
   1394             }
   1395         }
   1396 
   1397     }
   1398 
   1399     if (checkDecryptPadding(buffer) != OK) {
   1400         ALOGE("Incorrect padding bytes after decryption.");
   1401         notifyError(ERROR_MALFORMED);
   1402         return;
   1403     }
   1404 
   1405     if (tsBuffer != NULL) {
   1406         AString method;
   1407         CHECK(buffer->meta()->findString("cipher-method", &method));
   1408         if ((tsBuffer->size() > 0 && method == "NONE")
   1409                 || tsBuffer->size() > 16) {
   1410             ALOGE("MPEG2 transport stream is not an even multiple of 188 "
   1411                     "bytes in length.");
   1412             notifyError(ERROR_MALFORMED);
   1413             return;
   1414         }
   1415     }
   1416 
   1417     // bulk extract non-ts files
   1418     bool startUp = mStartup;
   1419     if (tsBuffer == NULL) {
   1420         status_t err = extractAndQueueAccessUnits(buffer, itemMeta);
   1421         if (err == -EAGAIN) {
   1422             // starting sequence number too low/high
   1423             postMonitorQueue();
   1424             return;
   1425         } else if (err == ERROR_OUT_OF_RANGE) {
   1426             // reached stopping point
   1427             notifyStopReached();
   1428             return;
   1429         } else if (err != OK) {
   1430             notifyError(err);
   1431             return;
   1432         }
   1433     }
   1434 
   1435     ++mSeqNumber;
   1436 
   1437     // if adapting, pause after found the next starting point
   1438     if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) {
   1439         CHECK(mStartTimeUsNotify != NULL);
   1440         mStartTimeUsNotify->post();
   1441         mStartTimeUsNotify.clear();
   1442         shouldPause = true;
   1443     }
   1444 
   1445     if (!shouldPause) {
   1446         postMonitorQueue();
   1447     }
   1448 }
   1449 
   1450 /*
   1451  * returns true if we need to adjust mSeqNumber
   1452  */
   1453 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) {
   1454     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
   1455 
   1456     int64_t minDiffUs, maxDiffUs;
   1457     if (mSeekMode == LiveSession::kSeekModeNextSample) {
   1458         // if the previous fetcher paused in the middle of a segment, we
   1459         // want to start at a segment that overlaps the last sample
   1460         minDiffUs = -mPlaylist->getTargetDuration();
   1461         maxDiffUs = 0ll;
   1462     } else {
   1463         // if the previous fetcher paused at the end of a segment, ideally
   1464         // we want to start at the segment that's roughly aligned with its
   1465         // next segment, but if the two variants are not well aligned we
   1466         // adjust the diff to within (-T/2, T/2)
   1467         minDiffUs = -mPlaylist->getTargetDuration() / 2;
   1468         maxDiffUs = mPlaylist->getTargetDuration() / 2;
   1469     }
   1470 
   1471     int32_t oldSeqNumber = mSeqNumber;
   1472     ssize_t index = mSeqNumber - firstSeqNumberInPlaylist;
   1473 
   1474     // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs
   1475     int64_t diffUs = anchorTimeUs - mStartTimeUs;
   1476     if (diffUs > maxDiffUs) {
   1477         while (index > 0 && diffUs > maxDiffUs) {
   1478             --index;
   1479 
   1480             sp<AMessage> itemMeta;
   1481             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
   1482 
   1483             int64_t itemDurationUs;
   1484             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
   1485 
   1486             diffUs -= itemDurationUs;
   1487         }
   1488     } else if (diffUs < minDiffUs) {
   1489         while (index + 1 < (ssize_t) mPlaylist->size()
   1490                 && diffUs < minDiffUs) {
   1491             ++index;
   1492 
   1493             sp<AMessage> itemMeta;
   1494             CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));
   1495 
   1496             int64_t itemDurationUs;
   1497             CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
   1498 
   1499             diffUs += itemDurationUs;
   1500         }
   1501     }
   1502 
   1503     mSeqNumber = firstSeqNumberInPlaylist + index;
   1504 
   1505     if (mSeqNumber != oldSeqNumber) {
   1506         FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]",
   1507                 (long long) anchorTimeUs - mStartTimeUs,
   1508                 (long long) minDiffUs,
   1509                 (long long) maxDiffUs);
   1510         return true;
   1511     }
   1512     return false;
   1513 }
   1514 
   1515 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const {
   1516     int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber();
   1517 
   1518     size_t index = 0;
   1519     while (index < mPlaylist->size()) {
   1520         sp<AMessage> itemMeta;
   1521         CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta));
   1522         size_t curDiscontinuitySeq;
   1523         CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq));
   1524         int32_t seqNumber = firstSeqNumberInPlaylist + index;
   1525         if (curDiscontinuitySeq == discontinuitySeq) {
   1526             return seqNumber;
   1527         } else if (curDiscontinuitySeq > discontinuitySeq) {
   1528             return seqNumber <= 0 ? 0 : seqNumber - 1;
   1529         }
   1530 
   1531         ++index;
   1532     }
   1533 
   1534     return firstSeqNumberInPlaylist + mPlaylist->size();
   1535 }
   1536 
   1537 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
   1538     size_t index = 0;
   1539     int64_t segmentStartUs = 0;
   1540     while (index < mPlaylist->size()) {
   1541         sp<AMessage> itemMeta;
   1542         CHECK(mPlaylist->itemAt(
   1543                     index, NULL /* uri */, &itemMeta));
   1544 
   1545         int64_t itemDurationUs;
   1546         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
   1547 
   1548         if (timeUs < segmentStartUs + itemDurationUs) {
   1549             break;
   1550         }
   1551 
   1552         segmentStartUs += itemDurationUs;
   1553         ++index;
   1554     }
   1555 
   1556     if (index >= mPlaylist->size()) {
   1557         index = mPlaylist->size() - 1;
   1558     }
   1559 
   1560     return mPlaylist->getFirstSeqNumber() + index;
   1561 }
   1562 
   1563 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties(
   1564         const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) {
   1565     sp<MetaData> format = source->getFormat();
   1566     if (format != NULL) {
   1567         // for simplicity, store a reference to the format in each unit
   1568         accessUnit->meta()->setObject("format", format);
   1569     }
   1570 
   1571     if (discard) {
   1572         accessUnit->meta()->setInt32("discard", discard);
   1573     }
   1574 
   1575     accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
   1576     accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
   1577     accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS);
   1578     accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber));
   1579     if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
   1580         accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs);
   1581     }
   1582     return accessUnit;
   1583 }
   1584 
   1585 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) {
   1586     if (!mFirstPTSValid) {
   1587         mFirstTimeUs = timeUs;
   1588         mFirstPTSValid = true;
   1589     }
   1590     bool startTimeReached = true;
   1591     if (mStartTimeUsRelative) {
   1592         FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld",
   1593                 (long long)timeUs,
   1594                 (long long)mFirstTimeUs,
   1595                 (long long)(timeUs - mFirstTimeUs));
   1596         timeUs -= mFirstTimeUs;
   1597         if (timeUs < 0) {
   1598             FLOGV("clamp negative timeUs to 0");
   1599             timeUs = 0;
   1600         }
   1601         startTimeReached = (timeUs >= mStartTimeUs);
   1602     }
   1603     return startTimeReached;
   1604 }
   1605 
   1606 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
   1607     if (mTSParser == NULL) {
   1608         // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
   1609         mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
   1610     }
   1611 
   1612     if (mNextPTSTimeUs >= 0ll) {
   1613         sp<AMessage> extra = new AMessage;
   1614         // Since we are using absolute timestamps, signal an offset of 0 to prevent
   1615         // ATSParser from skewing the timestamps of access units.
   1616         extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0);
   1617 
   1618         // When adapting, signal a recent media time to the parser,
   1619         // so that PTS wrap around is handled for the new variant.
   1620         if (mStartTimeUs >= 0 && !mStartTimeUsRelative) {
   1621             extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs);
   1622         }
   1623 
   1624         mTSParser->signalDiscontinuity(
   1625                 ATSParser::DISCONTINUITY_TIME, extra);
   1626 
   1627         mNextPTSTimeUs = -1ll;
   1628     }
   1629 
   1630     size_t offset = 0;
   1631     while (offset + 188 <= buffer->size()) {
   1632         status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);
   1633 
   1634         if (err != OK) {
   1635             return err;
   1636         }
   1637 
   1638         offset += 188;
   1639     }
   1640     // setRange to indicate consumed bytes.
   1641     buffer->setRange(buffer->offset() + offset, buffer->size() - offset);
   1642 
   1643     if (mSegmentFirstPTS < 0ll) {
   1644         // get the smallest first PTS from all streams present in this parser
   1645         for (size_t i = mPacketSources.size(); i > 0;) {
   1646             i--;
   1647             const LiveSession::StreamType stream = mPacketSources.keyAt(i);
   1648             if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
   1649                 ALOGE("MPEG2 Transport streams do not contain subtitles.");
   1650                 return ERROR_MALFORMED;
   1651             }
   1652             if (stream == LiveSession::STREAMTYPE_METADATA) {
   1653                 continue;
   1654             }
   1655             ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
   1656             sp<AnotherPacketSource> source =
   1657                 static_cast<AnotherPacketSource *>(
   1658                         mTSParser->getSource(type).get());
   1659 
   1660             if (source == NULL) {
   1661                 continue;
   1662             }
   1663             sp<AMessage> meta = source->getMetaAfterLastDequeued(0);
   1664             if (meta != NULL) {
   1665                 int64_t timeUs;
   1666                 CHECK(meta->findInt64("timeUs", &timeUs));
   1667                 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) {
   1668                     mSegmentFirstPTS = timeUs;
   1669                 }
   1670             }
   1671         }
   1672         if (mSegmentFirstPTS < 0ll) {
   1673             // didn't find any TS packet, can return early
   1674             return OK;
   1675         }
   1676         if (!mStartTimeUsRelative) {
   1677             // mStartup
   1678             //   mStartup is true until we have queued a packet for all the streams
   1679             //   we are fetching. We queue packets whose timestamps are greater than
   1680             //   mStartTimeUs.
   1681             // mSegmentStartTimeUs >= 0
   1682             //   mSegmentStartTimeUs is non-negative when adapting or switching tracks
   1683             // adjustSeqNumberWithAnchorTime(timeUs) == true
   1684             //   we guessed a seq number that's either too large or too small.
   1685             // If this happens, we'll adjust mSeqNumber and restart fetching from new
   1686             // location. Note that we only want to adjust once, so set mSegmentStartTimeUs
   1687             // to -1 so that we don't enter this chunk next time.
   1688             if (mStartup && mSegmentStartTimeUs >= 0
   1689                     && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) {
   1690                 mStartTimeUsNotify = mNotify->dup();
   1691                 mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
   1692                 mStartTimeUsNotify->setString("uri", mURI);
   1693                 mIDRFound = false;
   1694                 mSegmentStartTimeUs = -1;
   1695                 return -EAGAIN;
   1696             }
   1697         }
   1698     }
   1699 
   1700     status_t err = OK;
   1701     for (size_t i = mPacketSources.size(); i > 0;) {
   1702         i--;
   1703         sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
   1704 
   1705         const LiveSession::StreamType stream = mPacketSources.keyAt(i);
   1706         if (stream == LiveSession::STREAMTYPE_SUBTITLES) {
   1707             ALOGE("MPEG2 Transport streams do not contain subtitles.");
   1708             return ERROR_MALFORMED;
   1709         }
   1710 
   1711         const char *key = LiveSession::getKeyForStream(stream);
   1712         ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream);
   1713 
   1714         sp<AnotherPacketSource> source =
   1715             static_cast<AnotherPacketSource *>(
   1716                     mTSParser->getSource(type).get());
   1717 
   1718         if (source == NULL) {
   1719             continue;
   1720         }
   1721 
   1722         const char *mime;
   1723         sp<MetaData> format  = source->getFormat();
   1724         bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
   1725                 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);
   1726 
   1727         sp<ABuffer> accessUnit;
   1728         status_t finalResult;
   1729         while (source->hasBufferAvailable(&finalResult)
   1730                 && source->dequeueAccessUnit(&accessUnit) == OK) {
   1731 
   1732             int64_t timeUs;
   1733             CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
   1734 
   1735             if (mStartup) {
   1736                 bool startTimeReached = isStartTimeReached(timeUs);
   1737 
   1738                 if (!startTimeReached || (isAvc && !mIDRFound)) {
   1739                     // buffer up to the closest preceding IDR frame in the next segement,
   1740                     // or the closest succeeding IDR frame after the exact position
   1741                     FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d",
   1742                             (long long)timeUs,
   1743                             (long long)mStartTimeUs,
   1744                             (long long)timeUs - mStartTimeUs,
   1745                             mIDRFound);
   1746                     if (isAvc) {
   1747                         if (IsIDR(accessUnit)) {
   1748                             mVideoBuffer->clear();
   1749                             FSLOGV(stream, "found IDR, clear mVideoBuffer");
   1750                             mIDRFound = true;
   1751                         }
   1752                         if (mIDRFound && mStartTimeUsRelative && !startTimeReached) {
   1753                             mVideoBuffer->queueAccessUnit(accessUnit);
   1754                             FSLOGV(stream, "saving AVC video AccessUnit");
   1755                         }
   1756                     }
   1757                     if (!startTimeReached || (isAvc && !mIDRFound)) {
   1758                         continue;
   1759                     }
   1760                 }
   1761             }
   1762 
   1763             if (mStartTimeUsNotify != NULL) {
   1764                 uint32_t streamMask = 0;
   1765                 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask);
   1766                 if ((mStreamTypeMask & mPacketSources.keyAt(i))
   1767                         && !(streamMask & mPacketSources.keyAt(i))) {
   1768                     streamMask |= mPacketSources.keyAt(i);
   1769                     mStartTimeUsNotify->setInt32("streamMask", streamMask);
   1770                     FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x",
   1771                             (long long)timeUs, streamMask);
   1772 
   1773                     if (streamMask == mStreamTypeMask) {
   1774                         FLOGV("found start point for all streams");
   1775                         mStartup = false;
   1776                     }
   1777                 }
   1778             }
   1779 
   1780             if (mStopParams != NULL) {
   1781                 int32_t discontinuitySeq;
   1782                 int64_t stopTimeUs;
   1783                 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
   1784                         || discontinuitySeq > mDiscontinuitySeq
   1785                         || !mStopParams->findInt64(key, &stopTimeUs)
   1786                         || (discontinuitySeq == mDiscontinuitySeq
   1787                                 && timeUs >= stopTimeUs)) {
   1788                     FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs);
   1789                     mStreamTypeMask &= ~stream;
   1790                     mPacketSources.removeItemsAt(i);
   1791                     break;
   1792                 }
   1793             }
   1794 
   1795             if (stream == LiveSession::STREAMTYPE_VIDEO) {
   1796                 const bool discard = true;
   1797                 status_t status;
   1798                 while (mVideoBuffer->hasBufferAvailable(&status)) {
   1799                     sp<ABuffer> videoBuffer;
   1800                     mVideoBuffer->dequeueAccessUnit(&videoBuffer);
   1801                     setAccessUnitProperties(videoBuffer, source, discard);
   1802                     packetSource->queueAccessUnit(videoBuffer);
   1803                     int64_t bufferTimeUs;
   1804                     CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs));
   1805                     FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld",
   1806                             (long long)bufferTimeUs);
   1807                 }
   1808             } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) {
   1809                 mHasMetadata = true;
   1810                 sp<AMessage> notify = mNotify->dup();
   1811                 notify->setInt32("what", kWhatMetadataDetected);
   1812                 notify->post();
   1813             }
   1814 
   1815             setAccessUnitProperties(accessUnit, source);
   1816             packetSource->queueAccessUnit(accessUnit);
   1817             FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs);
   1818         }
   1819 
   1820         if (err != OK) {
   1821             break;
   1822         }
   1823     }
   1824 
   1825     if (err != OK) {
   1826         for (size_t i = mPacketSources.size(); i > 0;) {
   1827             i--;
   1828             sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
   1829             packetSource->clear();
   1830         }
   1831         return err;
   1832     }
   1833 
   1834     if (!mStreamTypeMask) {
   1835         // Signal gap is filled between original and new stream.
   1836         FLOGV("reached stop point for all streams");
   1837         return ERROR_OUT_OF_RANGE;
   1838     }
   1839 
   1840     return OK;
   1841 }
   1842 
   1843 /* static */
   1844 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence(
   1845         const sp<ABuffer> &buffer) {
   1846     size_t pos = 0;
   1847 
   1848     // skip possible BOM
   1849     if (buffer->size() >= pos + 3 &&
   1850             !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) {
   1851         pos += 3;
   1852     }
   1853 
   1854     // accept WEBVTT followed by SPACE, TAB or (CR) LF
   1855     if (buffer->size() < pos + 6 ||
   1856             memcmp("WEBVTT", buffer->data() + pos, 6)) {
   1857         return false;
   1858     }
   1859     pos += 6;
   1860 
   1861     if (buffer->size() == pos) {
   1862         return true;
   1863     }
   1864 
   1865     uint8_t sep = buffer->data()[pos];
   1866     return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r';
   1867 }
   1868 
   1869 status_t PlaylistFetcher::extractAndQueueAccessUnits(
   1870         const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
   1871     if (bufferStartsWithWebVTTMagicSequence(buffer)) {
   1872         if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
   1873             ALOGE("This stream only contains subtitles.");
   1874             return ERROR_MALFORMED;
   1875         }
   1876 
   1877         const sp<AnotherPacketSource> packetSource =
   1878             mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
   1879 
   1880         int64_t durationUs;
   1881         CHECK(itemMeta->findInt64("durationUs", &durationUs));
   1882         buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber));
   1883         buffer->meta()->setInt64("durationUs", durationUs);
   1884         buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber));
   1885         buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq);
   1886         buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration);
   1887         packetSource->queueAccessUnit(buffer);
   1888         return OK;
   1889     }
   1890 
   1891     if (mNextPTSTimeUs >= 0ll) {
   1892         mNextPTSTimeUs = -1ll;
   1893     }
   1894 
   1895     // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio
   1896     // stream prefixed by an ID3 tag.
   1897 
   1898     bool firstID3Tag = true;
   1899     uint64_t PTS = 0;
   1900 
   1901     for (;;) {
   1902         // Make sure to skip all ID3 tags preceding the audio data.
   1903         // At least one must be present to provide the PTS timestamp.
   1904 
   1905         ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */);
   1906         if (!id3.isValid()) {
   1907             if (firstID3Tag) {
   1908                 ALOGE("Unable to parse ID3 tag.");
   1909                 return ERROR_MALFORMED;
   1910             } else {
   1911                 break;
   1912             }
   1913         }
   1914 
   1915         if (firstID3Tag) {
   1916             bool found = false;
   1917 
   1918             ID3::Iterator it(id3, "PRIV");
   1919             while (!it.done()) {
   1920                 size_t length;
   1921                 const uint8_t *data = it.getData(&length);
   1922                 if (!data) {
   1923                     return ERROR_MALFORMED;
   1924                 }
   1925 
   1926                 static const char *kMatchName =
   1927                     "com.apple.streaming.transportStreamTimestamp";
   1928                 static const size_t kMatchNameLen = strlen(kMatchName);
   1929 
   1930                 if (length == kMatchNameLen + 1 + 8
   1931                         && !strncmp((const char *)data, kMatchName, kMatchNameLen)) {
   1932                     found = true;
   1933                     PTS = U64_AT(&data[kMatchNameLen + 1]);
   1934                 }
   1935 
   1936                 it.next();
   1937             }
   1938 
   1939             if (!found) {
   1940                 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag.");
   1941                 return ERROR_MALFORMED;
   1942             }
   1943         }
   1944 
   1945         // skip the ID3 tag
   1946         buffer->setRange(
   1947                 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize());
   1948 
   1949         firstID3Tag = false;
   1950     }
   1951 
   1952     if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) {
   1953         ALOGW("This stream only contains audio data!");
   1954 
   1955         mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO;
   1956 
   1957         if (mStreamTypeMask == 0) {
   1958             return OK;
   1959         }
   1960     }
   1961 
   1962     sp<AnotherPacketSource> packetSource =
   1963         mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO);
   1964 
   1965     if (packetSource->getFormat() == NULL && buffer->size() >= 7) {
   1966         ABitReader bits(buffer->data(), buffer->size());
   1967 
   1968         // adts_fixed_header
   1969 
   1970         CHECK_EQ(bits.getBits(12), 0xfffu);
   1971         bits.skipBits(3);  // ID, layer
   1972         bool protection_absent __unused = bits.getBits(1) != 0;
   1973 
   1974         unsigned profile = bits.getBits(2);
   1975         CHECK_NE(profile, 3u);
   1976         unsigned sampling_freq_index = bits.getBits(4);
   1977         bits.getBits(1);  // private_bit
   1978         unsigned channel_configuration = bits.getBits(3);
   1979         CHECK_NE(channel_configuration, 0u);
   1980         bits.skipBits(2);  // original_copy, home
   1981 
   1982         sp<MetaData> meta = MakeAACCodecSpecificData(
   1983                 profile, sampling_freq_index, channel_configuration);
   1984 
   1985         meta->setInt32(kKeyIsADTS, true);
   1986 
   1987         packetSource->setFormat(meta);
   1988     }
   1989 
   1990     int64_t numSamples = 0ll;
   1991     int32_t sampleRate;
   1992     CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));
   1993 
   1994     int64_t timeUs = (PTS * 100ll) / 9ll;
   1995     if (mStartup && !mFirstPTSValid) {
   1996         mFirstPTSValid = true;
   1997         mFirstTimeUs = timeUs;
   1998     }
   1999 
   2000     if (mSegmentFirstPTS < 0ll) {
   2001         mSegmentFirstPTS = timeUs;
   2002         if (!mStartTimeUsRelative) {
   2003             // Duplicated logic from how we handle .ts playlists.
   2004             if (mStartup && mSegmentStartTimeUs >= 0
   2005                     && adjustSeqNumberWithAnchorTime(timeUs)) {
   2006                 mSegmentStartTimeUs = -1;
   2007                 return -EAGAIN;
   2008             }
   2009         }
   2010     }
   2011 
   2012     size_t offset = 0;
   2013     while (offset < buffer->size()) {
   2014         const uint8_t *adtsHeader = buffer->data() + offset;
   2015         CHECK_LT(offset + 5, buffer->size());
   2016 
   2017         unsigned aac_frame_length =
   2018             ((adtsHeader[3] & 3) << 11)
   2019             | (adtsHeader[4] << 3)
   2020             | (adtsHeader[5] >> 5);
   2021 
   2022         if (aac_frame_length == 0) {
   2023             const uint8_t *id3Header = adtsHeader;
   2024             if (!memcmp(id3Header, "ID3", 3)) {
   2025                 ID3 id3(id3Header, buffer->size() - offset, true);
   2026                 if (id3.isValid()) {
   2027                     offset += id3.rawSize();
   2028                     continue;
   2029                 };
   2030             }
   2031             return ERROR_MALFORMED;
   2032         }
   2033 
   2034         CHECK_LE(offset + aac_frame_length, buffer->size());
   2035 
   2036         int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate;
   2037         offset += aac_frame_length;
   2038 
   2039         // Each AAC frame encodes 1024 samples.
   2040         numSamples += 1024;
   2041 
   2042         if (mStartup) {
   2043             int64_t startTimeUs = unitTimeUs;
   2044             if (mStartTimeUsRelative) {
   2045                 startTimeUs -= mFirstTimeUs;
   2046                 if (startTimeUs  < 0) {
   2047                     startTimeUs = 0;
   2048                 }
   2049             }
   2050             if (startTimeUs < mStartTimeUs) {
   2051                 continue;
   2052             }
   2053 
   2054             if (mStartTimeUsNotify != NULL) {
   2055                 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO);
   2056                 mStartup = false;
   2057             }
   2058         }
   2059 
   2060         if (mStopParams != NULL) {
   2061             int32_t discontinuitySeq;
   2062             int64_t stopTimeUs;
   2063             if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq)
   2064                     || discontinuitySeq > mDiscontinuitySeq
   2065                     || !mStopParams->findInt64("timeUsAudio", &stopTimeUs)
   2066                     || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) {
   2067                 mStreamTypeMask = 0;
   2068                 mPacketSources.clear();
   2069                 return ERROR_OUT_OF_RANGE;
   2070             }
   2071         }
   2072 
   2073         sp<ABuffer> unit = new ABuffer(aac_frame_length);
   2074         memcpy(unit->data(), adtsHeader, aac_frame_length);
   2075 
   2076         unit->meta()->setInt64("timeUs", unitTimeUs);
   2077         setAccessUnitProperties(unit, packetSource);
   2078         packetSource->queueAccessUnit(unit);
   2079     }
   2080 
   2081     return OK;
   2082 }
   2083 
   2084 void PlaylistFetcher::updateDuration() {
   2085     int64_t durationUs = 0ll;
   2086     for (size_t index = 0; index < mPlaylist->size(); ++index) {
   2087         sp<AMessage> itemMeta;
   2088         CHECK(mPlaylist->itemAt(
   2089                     index, NULL /* uri */, &itemMeta));
   2090 
   2091         int64_t itemDurationUs;
   2092         CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));
   2093 
   2094         durationUs += itemDurationUs;
   2095     }
   2096 
   2097     sp<AMessage> msg = mNotify->dup();
   2098     msg->setInt32("what", kWhatDurationUpdate);
   2099     msg->setInt64("durationUs", durationUs);
   2100     msg->post();
   2101 }
   2102 
   2103 void PlaylistFetcher::updateTargetDuration() {
   2104     sp<AMessage> msg = mNotify->dup();
   2105     msg->setInt32("what", kWhatTargetDurationUpdate);
   2106     msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration());
   2107     msg->post();
   2108 }
   2109 
   2110 }  // namespace android
   2111