Home | History | Annotate | Download | only in httplive
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "LiveSession"
     19 #include <utils/Log.h>
     20 
     21 #include "LiveSession.h"
     22 
     23 #include "M3UParser.h"
     24 #include "PlaylistFetcher.h"
     25 
     26 #include "include/HTTPBase.h"
     27 #include "mpeg2ts/AnotherPacketSource.h"
     28 
     29 #include <cutils/properties.h>
     30 #include <media/IMediaHTTPConnection.h>
     31 #include <media/IMediaHTTPService.h>
     32 #include <media/stagefright/foundation/hexdump.h>
     33 #include <media/stagefright/foundation/ABuffer.h>
     34 #include <media/stagefright/foundation/ADebug.h>
     35 #include <media/stagefright/foundation/AMessage.h>
     36 #include <media/stagefright/DataSource.h>
     37 #include <media/stagefright/FileSource.h>
     38 #include <media/stagefright/MediaErrors.h>
     39 #include <media/stagefright/MediaHTTP.h>
     40 #include <media/stagefright/MetaData.h>
     41 #include <media/stagefright/Utils.h>
     42 
     43 #include <utils/Mutex.h>
     44 
     45 #include <ctype.h>
     46 #include <inttypes.h>
     47 #include <openssl/aes.h>
     48 #include <openssl/md5.h>
     49 
     50 namespace android {
     51 
     52 // Number of recently-read bytes to use for bandwidth estimation
     53 const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024;
     54 
     55 LiveSession::LiveSession(
     56         const sp<AMessage> &notify, uint32_t flags,
     57         const sp<IMediaHTTPService> &httpService)
     58     : mNotify(notify),
     59       mFlags(flags),
     60       mHTTPService(httpService),
     61       mInPreparationPhase(true),
     62       mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
     63       mCurBandwidthIndex(-1),
     64       mStreamMask(0),
     65       mNewStreamMask(0),
     66       mSwapMask(0),
     67       mCheckBandwidthGeneration(0),
     68       mSwitchGeneration(0),
     69       mSubtitleGeneration(0),
     70       mLastDequeuedTimeUs(0ll),
     71       mRealTimeBaseUs(0ll),
     72       mReconfigurationInProgress(false),
     73       mSwitchInProgress(false),
     74       mDisconnectReplyID(0),
     75       mSeekReplyID(0),
     76       mFirstTimeUsValid(false),
     77       mFirstTimeUs(0),
     78       mLastSeekTimeUs(0) {
     79 
     80     mStreams[kAudioIndex] = StreamItem("audio");
     81     mStreams[kVideoIndex] = StreamItem("video");
     82     mStreams[kSubtitleIndex] = StreamItem("subtitles");
     83 
     84     for (size_t i = 0; i < kMaxStreams; ++i) {
     85         mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
     86         mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
     87         mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
     88         mBuffering[i] = false;
     89     }
     90 
     91     size_t numHistoryItems = kBandwidthHistoryBytes /
     92             PlaylistFetcher::kDownloadBlockSize + 1;
     93     if (numHistoryItems < 5) {
     94         numHistoryItems = 5;
     95     }
     96     mHTTPDataSource->setBandwidthHistorySize(numHistoryItems);
     97 }
     98 
     99 LiveSession::~LiveSession() {
    100 }
    101 
    102 sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
    103     ABuffer *discontinuity = new ABuffer(0);
    104     discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
    105     discontinuity->meta()->setInt32("swapPacketSource", swap);
    106     discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
    107     discontinuity->meta()->setInt64("timeUs", -1);
    108     return discontinuity;
    109 }
    110 
    111 void LiveSession::swapPacketSource(StreamType stream) {
    112     sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
    113     sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
    114     sp<AnotherPacketSource> tmp = aps;
    115     aps = aps2;
    116     aps2 = tmp;
    117     aps2->clear();
    118 }
    119 
    120 status_t LiveSession::dequeueAccessUnit(
    121         StreamType stream, sp<ABuffer> *accessUnit) {
    122     if (!(mStreamMask & stream)) {
    123         // return -EWOULDBLOCK to avoid halting the decoder
    124         // when switching between audio/video and audio only.
    125         return -EWOULDBLOCK;
    126     }
    127 
    128     status_t finalResult;
    129     sp<AnotherPacketSource> discontinuityQueue  = mDiscontinuities.valueFor(stream);
    130     if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
    131         discontinuityQueue->dequeueAccessUnit(accessUnit);
    132         // seeking, track switching
    133         sp<AMessage> extra;
    134         int64_t timeUs;
    135         if ((*accessUnit)->meta()->findMessage("extra", &extra)
    136                 && extra != NULL
    137                 && extra->findInt64("timeUs", &timeUs)) {
    138             // seeking only
    139             mLastSeekTimeUs = timeUs;
    140             mDiscontinuityOffsetTimesUs.clear();
    141             mDiscontinuityAbsStartTimesUs.clear();
    142         }
    143         return INFO_DISCONTINUITY;
    144     }
    145 
    146     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
    147 
    148     ssize_t idx = typeToIndex(stream);
    149     if (!packetSource->hasBufferAvailable(&finalResult)) {
    150         if (finalResult == OK) {
    151             mBuffering[idx] = true;
    152             return -EAGAIN;
    153         } else {
    154             return finalResult;
    155         }
    156     }
    157 
    158     int32_t targetDuration = 0;
    159     sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
    160     if (meta != NULL) {
    161         meta->findInt32("targetDuration", &targetDuration);
    162     }
    163 
    164     int64_t targetDurationUs = targetDuration * 1000000ll;
    165     if (targetDurationUs == 0 ||
    166             targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
    167         // Fetchers limit buffering to
    168         // min(3 * targetDuration, kMinBufferedDurationUs)
    169         targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
    170     }
    171 
    172     if (mBuffering[idx]) {
    173         if (mSwitchInProgress
    174                 || packetSource->isFinished(0)
    175                 || packetSource->getEstimatedDurationUs() > targetDurationUs) {
    176             mBuffering[idx] = false;
    177         }
    178     }
    179 
    180     if (mBuffering[idx]) {
    181         return -EAGAIN;
    182     }
    183 
    184     // wait for counterpart
    185     sp<AnotherPacketSource> otherSource;
    186     uint32_t mask = mNewStreamMask & mStreamMask;
    187     uint32_t fetchersMask  = 0;
    188     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    189         uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
    190         fetchersMask |= fetcherMask;
    191     }
    192     mask &= fetchersMask;
    193     if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
    194         otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
    195     } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
    196         otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
    197     }
    198     if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
    199         return finalResult == OK ? -EAGAIN : finalResult;
    200     }
    201 
    202     status_t err = packetSource->dequeueAccessUnit(accessUnit);
    203 
    204     size_t streamIdx;
    205     const char *streamStr;
    206     switch (stream) {
    207         case STREAMTYPE_AUDIO:
    208             streamIdx = kAudioIndex;
    209             streamStr = "audio";
    210             break;
    211         case STREAMTYPE_VIDEO:
    212             streamIdx = kVideoIndex;
    213             streamStr = "video";
    214             break;
    215         case STREAMTYPE_SUBTITLES:
    216             streamIdx = kSubtitleIndex;
    217             streamStr = "subs";
    218             break;
    219         default:
    220             TRESPASS();
    221     }
    222 
    223     StreamItem& strm = mStreams[streamIdx];
    224     if (err == INFO_DISCONTINUITY) {
    225         // adaptive streaming, discontinuities in the playlist
    226         int32_t type;
    227         CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
    228 
    229         sp<AMessage> extra;
    230         if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
    231             extra.clear();
    232         }
    233 
    234         ALOGI("[%s] read discontinuity of type %d, extra = %s",
    235               streamStr,
    236               type,
    237               extra == NULL ? "NULL" : extra->debugString().c_str());
    238 
    239         int32_t swap;
    240         if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
    241             int32_t switchGeneration;
    242             CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
    243             {
    244                 Mutex::Autolock lock(mSwapMutex);
    245                 if (switchGeneration == mSwitchGeneration) {
    246                     swapPacketSource(stream);
    247                     sp<AMessage> msg = new AMessage(kWhatSwapped, id());
    248                     msg->setInt32("stream", stream);
    249                     msg->setInt32("switchGeneration", switchGeneration);
    250                     msg->post();
    251                 }
    252             }
    253         } else {
    254             size_t seq = strm.mCurDiscontinuitySeq;
    255             int64_t offsetTimeUs;
    256             if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
    257                 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
    258             } else {
    259                 offsetTimeUs = 0;
    260             }
    261 
    262             seq += 1;
    263             if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
    264                 int64_t firstTimeUs;
    265                 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
    266                 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
    267                 offsetTimeUs += strm.mLastSampleDurationUs;
    268             } else {
    269                 offsetTimeUs += strm.mLastSampleDurationUs;
    270             }
    271 
    272             mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
    273         }
    274     } else if (err == OK) {
    275 
    276         if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
    277             int64_t timeUs;
    278             int32_t discontinuitySeq = 0;
    279             CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
    280             (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
    281             strm.mCurDiscontinuitySeq = discontinuitySeq;
    282 
    283             int32_t discard = 0;
    284             int64_t firstTimeUs;
    285             if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
    286                 int64_t durUs; // approximate sample duration
    287                 if (timeUs > strm.mLastDequeuedTimeUs) {
    288                     durUs = timeUs - strm.mLastDequeuedTimeUs;
    289                 } else {
    290                     durUs = strm.mLastDequeuedTimeUs - timeUs;
    291                 }
    292                 strm.mLastSampleDurationUs = durUs;
    293                 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
    294             } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
    295                 firstTimeUs = timeUs;
    296             } else {
    297                 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
    298                 firstTimeUs = timeUs;
    299             }
    300 
    301             strm.mLastDequeuedTimeUs = timeUs;
    302             if (timeUs >= firstTimeUs) {
    303                 timeUs -= firstTimeUs;
    304             } else {
    305                 timeUs = 0;
    306             }
    307             timeUs += mLastSeekTimeUs;
    308             if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
    309                 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
    310             }
    311 
    312             ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
    313             (*accessUnit)->meta()->setInt64("timeUs",  timeUs);
    314             mLastDequeuedTimeUs = timeUs;
    315             mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    316         } else if (stream == STREAMTYPE_SUBTITLES) {
    317             int32_t subtitleGeneration;
    318             if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
    319                     && subtitleGeneration != mSubtitleGeneration) {
    320                return -EAGAIN;
    321             };
    322             (*accessUnit)->meta()->setInt32(
    323                     "trackIndex", mPlaylist->getSelectedIndex());
    324             (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
    325         }
    326     } else {
    327         ALOGI("[%s] encountered error %d", streamStr, err);
    328     }
    329 
    330     return err;
    331 }
    332 
    333 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
    334     // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
    335     if (!(mStreamMask & stream)) {
    336         return UNKNOWN_ERROR;
    337     }
    338 
    339     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
    340 
    341     sp<MetaData> meta = packetSource->getFormat();
    342 
    343     if (meta == NULL) {
    344         return -EAGAIN;
    345     }
    346 
    347     return convertMetaDataToMessage(meta, format);
    348 }
    349 
    350 void LiveSession::connectAsync(
    351         const char *url, const KeyedVector<String8, String8> *headers) {
    352     sp<AMessage> msg = new AMessage(kWhatConnect, id());
    353     msg->setString("url", url);
    354 
    355     if (headers != NULL) {
    356         msg->setPointer(
    357                 "headers",
    358                 new KeyedVector<String8, String8>(*headers));
    359     }
    360 
    361     msg->post();
    362 }
    363 
    364 status_t LiveSession::disconnect() {
    365     sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
    366 
    367     sp<AMessage> response;
    368     status_t err = msg->postAndAwaitResponse(&response);
    369 
    370     return err;
    371 }
    372 
    373 status_t LiveSession::seekTo(int64_t timeUs) {
    374     sp<AMessage> msg = new AMessage(kWhatSeek, id());
    375     msg->setInt64("timeUs", timeUs);
    376 
    377     sp<AMessage> response;
    378     status_t err = msg->postAndAwaitResponse(&response);
    379 
    380     return err;
    381 }
    382 
    383 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
    384     switch (msg->what()) {
    385         case kWhatConnect:
    386         {
    387             onConnect(msg);
    388             break;
    389         }
    390 
    391         case kWhatDisconnect:
    392         {
    393             CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
    394 
    395             if (mReconfigurationInProgress) {
    396                 break;
    397             }
    398 
    399             finishDisconnect();
    400             break;
    401         }
    402 
    403         case kWhatSeek:
    404         {
    405             uint32_t seekReplyID;
    406             CHECK(msg->senderAwaitsResponse(&seekReplyID));
    407             mSeekReplyID = seekReplyID;
    408             mSeekReply = new AMessage;
    409 
    410             status_t err = onSeek(msg);
    411 
    412             if (err != OK) {
    413                 msg->post(50000);
    414             }
    415             break;
    416         }
    417 
    418         case kWhatFetcherNotify:
    419         {
    420             int32_t what;
    421             CHECK(msg->findInt32("what", &what));
    422 
    423             switch (what) {
    424                 case PlaylistFetcher::kWhatStarted:
    425                     break;
    426                 case PlaylistFetcher::kWhatPaused:
    427                 case PlaylistFetcher::kWhatStopped:
    428                 {
    429                     if (what == PlaylistFetcher::kWhatStopped) {
    430                         AString uri;
    431                         CHECK(msg->findString("uri", &uri));
    432                         if (mFetcherInfos.removeItem(uri) < 0) {
    433                             // ignore duplicated kWhatStopped messages.
    434                             break;
    435                         }
    436 
    437                         if (mSwitchInProgress) {
    438                             tryToFinishBandwidthSwitch();
    439                         }
    440                     }
    441 
    442                     if (mContinuation != NULL) {
    443                         CHECK_GT(mContinuationCounter, 0);
    444                         if (--mContinuationCounter == 0) {
    445                             mContinuation->post();
    446 
    447                             if (mSeekReplyID != 0) {
    448                                 CHECK(mSeekReply != NULL);
    449                                 mSeekReply->setInt32("err", OK);
    450                                 mSeekReply->postReply(mSeekReplyID);
    451                                 mSeekReplyID = 0;
    452                                 mSeekReply.clear();
    453                             }
    454                         }
    455                     }
    456                     break;
    457                 }
    458 
    459                 case PlaylistFetcher::kWhatDurationUpdate:
    460                 {
    461                     AString uri;
    462                     CHECK(msg->findString("uri", &uri));
    463 
    464                     int64_t durationUs;
    465                     CHECK(msg->findInt64("durationUs", &durationUs));
    466 
    467                     FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
    468                     info->mDurationUs = durationUs;
    469                     break;
    470                 }
    471 
    472                 case PlaylistFetcher::kWhatError:
    473                 {
    474                     status_t err;
    475                     CHECK(msg->findInt32("err", &err));
    476 
    477                     ALOGE("XXX Received error %d from PlaylistFetcher.", err);
    478 
    479                     // handle EOS on subtitle tracks independently
    480                     AString uri;
    481                     if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
    482                         ssize_t i = mFetcherInfos.indexOfKey(uri);
    483                         if (i >= 0) {
    484                             const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher;
    485                             if (fetcher != NULL) {
    486                                 uint32_t type = fetcher->getStreamTypeMask();
    487                                 if (type == STREAMTYPE_SUBTITLES) {
    488                                     mPacketSources.valueFor(
    489                                             STREAMTYPE_SUBTITLES)->signalEOS(err);;
    490                                     break;
    491                                 }
    492                             }
    493                         }
    494                     }
    495 
    496                     if (mInPreparationPhase) {
    497                         postPrepared(err);
    498                     }
    499 
    500                     cancelBandwidthSwitch();
    501 
    502                     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
    503 
    504                     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
    505 
    506                     mPacketSources.valueFor(
    507                             STREAMTYPE_SUBTITLES)->signalEOS(err);
    508 
    509                     sp<AMessage> notify = mNotify->dup();
    510                     notify->setInt32("what", kWhatError);
    511                     notify->setInt32("err", err);
    512                     notify->post();
    513                     break;
    514                 }
    515 
    516                 case PlaylistFetcher::kWhatTemporarilyDoneFetching:
    517                 {
    518                     AString uri;
    519                     CHECK(msg->findString("uri", &uri));
    520 
    521                     if (mFetcherInfos.indexOfKey(uri) < 0) {
    522                         ALOGE("couldn't find uri");
    523                         break;
    524                     }
    525                     FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
    526                     info->mIsPrepared = true;
    527 
    528                     if (mInPreparationPhase) {
    529                         bool allFetchersPrepared = true;
    530                         for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    531                             if (!mFetcherInfos.valueAt(i).mIsPrepared) {
    532                                 allFetchersPrepared = false;
    533                                 break;
    534                             }
    535                         }
    536 
    537                         if (allFetchersPrepared) {
    538                             postPrepared(OK);
    539                         }
    540                     }
    541                     break;
    542                 }
    543 
    544                 case PlaylistFetcher::kWhatStartedAt:
    545                 {
    546                     int32_t switchGeneration;
    547                     CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    548 
    549                     if (switchGeneration != mSwitchGeneration) {
    550                         break;
    551                     }
    552 
    553                     // Resume fetcher for the original variant; the resumed fetcher should
    554                     // continue until the timestamps found in msg, which is stored by the
    555                     // new fetcher to indicate where the new variant has started buffering.
    556                     for (size_t i = 0; i < mFetcherInfos.size(); i++) {
    557                         const FetcherInfo info = mFetcherInfos.valueAt(i);
    558                         if (info.mToBeRemoved) {
    559                             info.mFetcher->resumeUntilAsync(msg);
    560                         }
    561                     }
    562                     break;
    563                 }
    564 
    565                 default:
    566                     TRESPASS();
    567             }
    568 
    569             break;
    570         }
    571 
    572         case kWhatCheckBandwidth:
    573         {
    574             int32_t generation;
    575             CHECK(msg->findInt32("generation", &generation));
    576 
    577             if (generation != mCheckBandwidthGeneration) {
    578                 break;
    579             }
    580 
    581             onCheckBandwidth(msg);
    582             break;
    583         }
    584 
    585         case kWhatChangeConfiguration:
    586         {
    587             onChangeConfiguration(msg);
    588             break;
    589         }
    590 
    591         case kWhatChangeConfiguration2:
    592         {
    593             onChangeConfiguration2(msg);
    594             break;
    595         }
    596 
    597         case kWhatChangeConfiguration3:
    598         {
    599             onChangeConfiguration3(msg);
    600             break;
    601         }
    602 
    603         case kWhatFinishDisconnect2:
    604         {
    605             onFinishDisconnect2();
    606             break;
    607         }
    608 
    609         case kWhatSwapped:
    610         {
    611             onSwapped(msg);
    612             break;
    613         }
    614 
    615         case kWhatCheckSwitchDown:
    616         {
    617             onCheckSwitchDown();
    618             break;
    619         }
    620 
    621         case kWhatSwitchDown:
    622         {
    623             onSwitchDown();
    624             break;
    625         }
    626 
    627         default:
    628             TRESPASS();
    629             break;
    630     }
    631 }
    632 
    633 // static
    634 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
    635     if (a->mBandwidth < b->mBandwidth) {
    636         return -1;
    637     } else if (a->mBandwidth == b->mBandwidth) {
    638         return 0;
    639     }
    640 
    641     return 1;
    642 }
    643 
    644 // static
    645 LiveSession::StreamType LiveSession::indexToType(int idx) {
    646     CHECK(idx >= 0 && idx < kMaxStreams);
    647     return (StreamType)(1 << idx);
    648 }
    649 
    650 // static
    651 ssize_t LiveSession::typeToIndex(int32_t type) {
    652     switch (type) {
    653         case STREAMTYPE_AUDIO:
    654             return 0;
    655         case STREAMTYPE_VIDEO:
    656             return 1;
    657         case STREAMTYPE_SUBTITLES:
    658             return 2;
    659         default:
    660             return -1;
    661     };
    662     return -1;
    663 }
    664 
    665 void LiveSession::onConnect(const sp<AMessage> &msg) {
    666     AString url;
    667     CHECK(msg->findString("url", &url));
    668 
    669     KeyedVector<String8, String8> *headers = NULL;
    670     if (!msg->findPointer("headers", (void **)&headers)) {
    671         mExtraHeaders.clear();
    672     } else {
    673         mExtraHeaders = *headers;
    674 
    675         delete headers;
    676         headers = NULL;
    677     }
    678 
    679     // TODO currently we don't know if we are coming here from incognito mode
    680     ALOGI("onConnect %s", uriDebugString(url).c_str());
    681 
    682     mMasterURL = url;
    683 
    684     bool dummy;
    685     mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
    686 
    687     if (mPlaylist == NULL) {
    688         ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
    689 
    690         postPrepared(ERROR_IO);
    691         return;
    692     }
    693 
    694     // We trust the content provider to make a reasonable choice of preferred
    695     // initial bandwidth by listing it first in the variant playlist.
    696     // At startup we really don't have a good estimate on the available
    697     // network bandwidth since we haven't tranferred any data yet. Once
    698     // we have we can make a better informed choice.
    699     size_t initialBandwidth = 0;
    700     size_t initialBandwidthIndex = 0;
    701 
    702     if (mPlaylist->isVariantPlaylist()) {
    703         for (size_t i = 0; i < mPlaylist->size(); ++i) {
    704             BandwidthItem item;
    705 
    706             item.mPlaylistIndex = i;
    707 
    708             sp<AMessage> meta;
    709             AString uri;
    710             mPlaylist->itemAt(i, &uri, &meta);
    711 
    712             unsigned long bandwidth;
    713             CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
    714 
    715             if (initialBandwidth == 0) {
    716                 initialBandwidth = item.mBandwidth;
    717             }
    718 
    719             mBandwidthItems.push(item);
    720         }
    721 
    722         CHECK_GT(mBandwidthItems.size(), 0u);
    723 
    724         mBandwidthItems.sort(SortByBandwidth);
    725 
    726         for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
    727             if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
    728                 initialBandwidthIndex = i;
    729                 break;
    730             }
    731         }
    732     } else {
    733         // dummy item.
    734         BandwidthItem item;
    735         item.mPlaylistIndex = 0;
    736         item.mBandwidth = 0;
    737         mBandwidthItems.push(item);
    738     }
    739 
    740     mPlaylist->pickRandomMediaItems();
    741     changeConfiguration(
    742             0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
    743 }
    744 
    745 void LiveSession::finishDisconnect() {
    746     // No reconfiguration is currently pending, make sure none will trigger
    747     // during disconnection either.
    748     cancelCheckBandwidthEvent();
    749 
    750     // Protect mPacketSources from a swapPacketSource race condition through disconnect.
    751     // (finishDisconnect, onFinishDisconnect2)
    752     cancelBandwidthSwitch();
    753 
    754     // cancel switch down monitor
    755     mSwitchDownMonitor.clear();
    756 
    757     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    758         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
    759     }
    760 
    761     sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
    762 
    763     mContinuationCounter = mFetcherInfos.size();
    764     mContinuation = msg;
    765 
    766     if (mContinuationCounter == 0) {
    767         msg->post();
    768     }
    769 }
    770 
    771 void LiveSession::onFinishDisconnect2() {
    772     mContinuation.clear();
    773 
    774     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
    775     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
    776 
    777     mPacketSources.valueFor(
    778             STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
    779 
    780     sp<AMessage> response = new AMessage;
    781     response->setInt32("err", OK);
    782 
    783     response->postReply(mDisconnectReplyID);
    784     mDisconnectReplyID = 0;
    785 }
    786 
    787 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
    788     ssize_t index = mFetcherInfos.indexOfKey(uri);
    789 
    790     if (index >= 0) {
    791         return NULL;
    792     }
    793 
    794     sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
    795     notify->setString("uri", uri);
    796     notify->setInt32("switchGeneration", mSwitchGeneration);
    797 
    798     FetcherInfo info;
    799     info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
    800     info.mDurationUs = -1ll;
    801     info.mIsPrepared = false;
    802     info.mToBeRemoved = false;
    803     looper()->registerHandler(info.mFetcher);
    804 
    805     mFetcherInfos.add(uri, info);
    806 
    807     return info.mFetcher;
    808 }
    809 
    810 /*
    811  * Illustration of parameters:
    812  *
    813  * 0      `range_offset`
    814  * +------------+-------------------------------------------------------+--+--+
    815  * |            |                                 | next block to fetch |  |  |
    816  * |            | `source` handle => `out` buffer |                     |  |  |
    817  * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
    818  * |            |<----------- `range_length` / buffer capacity ----------->|  |
    819  * |<------------------------------ file_size ------------------------------->|
    820  *
    821  * Special parameter values:
    822  * - range_length == -1 means entire file
    823  * - block_size == 0 means entire range
    824  *
    825  */
    826 ssize_t LiveSession::fetchFile(
    827         const char *url, sp<ABuffer> *out,
    828         int64_t range_offset, int64_t range_length,
    829         uint32_t block_size, /* download block size */
    830         sp<DataSource> *source, /* to return and reuse source */
    831         String8 *actualUrl) {
    832     off64_t size;
    833     sp<DataSource> temp_source;
    834     if (source == NULL) {
    835         source = &temp_source;
    836     }
    837 
    838     if (*source == NULL) {
    839         if (!strncasecmp(url, "file://", 7)) {
    840             *source = new FileSource(url + 7);
    841         } else if (strncasecmp(url, "http://", 7)
    842                 && strncasecmp(url, "https://", 8)) {
    843             return ERROR_UNSUPPORTED;
    844         } else {
    845             KeyedVector<String8, String8> headers = mExtraHeaders;
    846             if (range_offset > 0 || range_length >= 0) {
    847                 headers.add(
    848                         String8("Range"),
    849                         String8(
    850                             StringPrintf(
    851                                 "bytes=%lld-%s",
    852                                 range_offset,
    853                                 range_length < 0
    854                                     ? "" : StringPrintf("%lld",
    855                                             range_offset + range_length - 1).c_str()).c_str()));
    856             }
    857             status_t err = mHTTPDataSource->connect(url, &headers);
    858 
    859             if (err != OK) {
    860                 return err;
    861             }
    862 
    863             *source = mHTTPDataSource;
    864         }
    865     }
    866 
    867     status_t getSizeErr = (*source)->getSize(&size);
    868     if (getSizeErr != OK) {
    869         size = 65536;
    870     }
    871 
    872     sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
    873     if (*out == NULL) {
    874         buffer->setRange(0, 0);
    875     }
    876 
    877     ssize_t bytesRead = 0;
    878     // adjust range_length if only reading partial block
    879     if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
    880         range_length = buffer->size() + block_size;
    881     }
    882     for (;;) {
    883         // Only resize when we don't know the size.
    884         size_t bufferRemaining = buffer->capacity() - buffer->size();
    885         if (bufferRemaining == 0 && getSizeErr != OK) {
    886             size_t bufferIncrement = buffer->size() / 2;
    887             if (bufferIncrement < 32768) {
    888                 bufferIncrement = 32768;
    889             }
    890             bufferRemaining = bufferIncrement;
    891 
    892             ALOGV("increasing download buffer to %zu bytes",
    893                  buffer->size() + bufferRemaining);
    894 
    895             sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
    896             memcpy(copy->data(), buffer->data(), buffer->size());
    897             copy->setRange(0, buffer->size());
    898 
    899             buffer = copy;
    900         }
    901 
    902         size_t maxBytesToRead = bufferRemaining;
    903         if (range_length >= 0) {
    904             int64_t bytesLeftInRange = range_length - buffer->size();
    905             if (bytesLeftInRange < (int64_t)maxBytesToRead) {
    906                 maxBytesToRead = bytesLeftInRange;
    907 
    908                 if (bytesLeftInRange == 0) {
    909                     break;
    910                 }
    911             }
    912         }
    913 
    914         // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
    915         // to help us break out of the loop.
    916         ssize_t n = (*source)->readAt(
    917                 buffer->size(), buffer->data() + buffer->size(),
    918                 maxBytesToRead);
    919 
    920         if (n < 0) {
    921             return n;
    922         }
    923 
    924         if (n == 0) {
    925             break;
    926         }
    927 
    928         buffer->setRange(0, buffer->size() + (size_t)n);
    929         bytesRead += n;
    930     }
    931 
    932     *out = buffer;
    933     if (actualUrl != NULL) {
    934         *actualUrl = (*source)->getUri();
    935         if (actualUrl->isEmpty()) {
    936             *actualUrl = url;
    937         }
    938     }
    939 
    940     return bytesRead;
    941 }
    942 
    943 sp<M3UParser> LiveSession::fetchPlaylist(
    944         const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
    945     ALOGV("fetchPlaylist '%s'", url);
    946 
    947     *unchanged = false;
    948 
    949     sp<ABuffer> buffer;
    950     String8 actualUrl;
    951     ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
    952 
    953     if (err <= 0) {
    954         return NULL;
    955     }
    956 
    957     // MD5 functionality is not available on the simulator, treat all
    958     // playlists as changed.
    959 
    960 #if defined(HAVE_ANDROID_OS)
    961     uint8_t hash[16];
    962 
    963     MD5_CTX m;
    964     MD5_Init(&m);
    965     MD5_Update(&m, buffer->data(), buffer->size());
    966 
    967     MD5_Final(hash, &m);
    968 
    969     if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
    970         // playlist unchanged
    971         *unchanged = true;
    972 
    973         return NULL;
    974     }
    975 
    976     if (curPlaylistHash != NULL) {
    977         memcpy(curPlaylistHash, hash, sizeof(hash));
    978     }
    979 #endif
    980 
    981     sp<M3UParser> playlist =
    982         new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
    983 
    984     if (playlist->initCheck() != OK) {
    985         ALOGE("failed to parse .m3u8 playlist");
    986 
    987         return NULL;
    988     }
    989 
    990     return playlist;
    991 }
    992 
    993 static double uniformRand() {
    994     return (double)rand() / RAND_MAX;
    995 }
    996 
    997 size_t LiveSession::getBandwidthIndex() {
    998     if (mBandwidthItems.size() == 0) {
    999         return 0;
   1000     }
   1001 
   1002 #if 1
   1003     char value[PROPERTY_VALUE_MAX];
   1004     ssize_t index = -1;
   1005     if (property_get("media.httplive.bw-index", value, NULL)) {
   1006         char *end;
   1007         index = strtol(value, &end, 10);
   1008         CHECK(end > value && *end == '\0');
   1009 
   1010         if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
   1011             index = mBandwidthItems.size() - 1;
   1012         }
   1013     }
   1014 
   1015     if (index < 0) {
   1016         int32_t bandwidthBps;
   1017         if (mHTTPDataSource != NULL
   1018                 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
   1019             ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
   1020         } else {
   1021             ALOGV("no bandwidth estimate.");
   1022             return 0;  // Pick the lowest bandwidth stream by default.
   1023         }
   1024 
   1025         char value[PROPERTY_VALUE_MAX];
   1026         if (property_get("media.httplive.max-bw", value, NULL)) {
   1027             char *end;
   1028             long maxBw = strtoul(value, &end, 10);
   1029             if (end > value && *end == '\0') {
   1030                 if (maxBw > 0 && bandwidthBps > maxBw) {
   1031                     ALOGV("bandwidth capped to %ld bps", maxBw);
   1032                     bandwidthBps = maxBw;
   1033                 }
   1034             }
   1035         }
   1036 
   1037         // Pick the highest bandwidth stream below or equal to estimated bandwidth.
   1038 
   1039         index = mBandwidthItems.size() - 1;
   1040         while (index > 0) {
   1041             // consider only 80% of the available bandwidth, but if we are switching up,
   1042             // be even more conservative (70%) to avoid overestimating and immediately
   1043             // switching back.
   1044             size_t adjustedBandwidthBps = bandwidthBps;
   1045             if (index > mCurBandwidthIndex) {
   1046                 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
   1047             } else {
   1048                 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
   1049             }
   1050             if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
   1051                 break;
   1052             }
   1053             --index;
   1054         }
   1055     }
   1056 #elif 0
   1057     // Change bandwidth at random()
   1058     size_t index = uniformRand() * mBandwidthItems.size();
   1059 #elif 0
   1060     // There's a 50% chance to stay on the current bandwidth and
   1061     // a 50% chance to switch to the next higher bandwidth (wrapping around
   1062     // to lowest)
   1063     const size_t kMinIndex = 0;
   1064 
   1065     static ssize_t mCurBandwidthIndex = -1;
   1066 
   1067     size_t index;
   1068     if (mCurBandwidthIndex < 0) {
   1069         index = kMinIndex;
   1070     } else if (uniformRand() < 0.5) {
   1071         index = (size_t)mCurBandwidthIndex;
   1072     } else {
   1073         index = mCurBandwidthIndex + 1;
   1074         if (index == mBandwidthItems.size()) {
   1075             index = kMinIndex;
   1076         }
   1077     }
   1078     mCurBandwidthIndex = index;
   1079 #elif 0
   1080     // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
   1081 
   1082     size_t index = mBandwidthItems.size() - 1;
   1083     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
   1084         --index;
   1085     }
   1086 #elif 1
   1087     char value[PROPERTY_VALUE_MAX];
   1088     size_t index;
   1089     if (property_get("media.httplive.bw-index", value, NULL)) {
   1090         char *end;
   1091         index = strtoul(value, &end, 10);
   1092         CHECK(end > value && *end == '\0');
   1093 
   1094         if (index >= mBandwidthItems.size()) {
   1095             index = mBandwidthItems.size() - 1;
   1096         }
   1097     } else {
   1098         index = 0;
   1099     }
   1100 #else
   1101     size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
   1102 #endif
   1103 
   1104     CHECK_GE(index, 0);
   1105 
   1106     return index;
   1107 }
   1108 
   1109 int64_t LiveSession::latestMediaSegmentStartTimeUs() {
   1110     sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
   1111     int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
   1112     if (audioMeta != NULL) {
   1113         audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
   1114     }
   1115 
   1116     sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
   1117     if (videoMeta != NULL
   1118             && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
   1119         if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
   1120             minSegmentStartTimeUs = videoSegmentStartTimeUs;
   1121         }
   1122 
   1123     }
   1124     return minSegmentStartTimeUs;
   1125 }
   1126 
   1127 status_t LiveSession::onSeek(const sp<AMessage> &msg) {
   1128     int64_t timeUs;
   1129     CHECK(msg->findInt64("timeUs", &timeUs));
   1130 
   1131     if (!mReconfigurationInProgress) {
   1132         changeConfiguration(timeUs, mCurBandwidthIndex);
   1133         return OK;
   1134     } else {
   1135         return -EWOULDBLOCK;
   1136     }
   1137 }
   1138 
   1139 status_t LiveSession::getDuration(int64_t *durationUs) const {
   1140     int64_t maxDurationUs = -1ll;
   1141     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
   1142         int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
   1143 
   1144         if (fetcherDurationUs > maxDurationUs) {
   1145             maxDurationUs = fetcherDurationUs;
   1146         }
   1147     }
   1148 
   1149     *durationUs = maxDurationUs;
   1150 
   1151     return OK;
   1152 }
   1153 
   1154 bool LiveSession::isSeekable() const {
   1155     int64_t durationUs;
   1156     return getDuration(&durationUs) == OK && durationUs >= 0;
   1157 }
   1158 
   1159 bool LiveSession::hasDynamicDuration() const {
   1160     return false;
   1161 }
   1162 
   1163 size_t LiveSession::getTrackCount() const {
   1164     if (mPlaylist == NULL) {
   1165         return 0;
   1166     } else {
   1167         return mPlaylist->getTrackCount();
   1168     }
   1169 }
   1170 
   1171 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
   1172     if (mPlaylist == NULL) {
   1173         return NULL;
   1174     } else {
   1175         return mPlaylist->getTrackInfo(trackIndex);
   1176     }
   1177 }
   1178 
   1179 status_t LiveSession::selectTrack(size_t index, bool select) {
   1180     if (mPlaylist == NULL) {
   1181         return INVALID_OPERATION;
   1182     }
   1183 
   1184     ++mSubtitleGeneration;
   1185     status_t err = mPlaylist->selectTrack(index, select);
   1186     if (err == OK) {
   1187         sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
   1188         msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
   1189         msg->setInt32("pickTrack", select);
   1190         msg->post();
   1191     }
   1192     return err;
   1193 }
   1194 
   1195 ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
   1196     if (mPlaylist == NULL) {
   1197         return -1;
   1198     } else {
   1199         return mPlaylist->getSelectedTrack(type);
   1200     }
   1201 }
   1202 
   1203 bool LiveSession::canSwitchUp() {
   1204     // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
   1205     status_t err = OK;
   1206     for (size_t i = 0; i < mPacketSources.size(); ++i) {
   1207         sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
   1208         int64_t dur = source->getBufferedDurationUs(&err);
   1209         if (err == OK && dur > 10000000) {
   1210             return true;
   1211         }
   1212     }
   1213     return false;
   1214 }
   1215 
   1216 void LiveSession::changeConfiguration(
   1217         int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
   1218     // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
   1219     // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
   1220     cancelBandwidthSwitch();
   1221 
   1222     CHECK(!mReconfigurationInProgress);
   1223     mReconfigurationInProgress = true;
   1224 
   1225     mCurBandwidthIndex = bandwidthIndex;
   1226 
   1227     ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
   1228           timeUs, bandwidthIndex, pickTrack);
   1229 
   1230     CHECK_LT(bandwidthIndex, mBandwidthItems.size());
   1231     const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
   1232 
   1233     uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
   1234     uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
   1235 
   1236     AString URIs[kMaxStreams];
   1237     for (size_t i = 0; i < kMaxStreams; ++i) {
   1238         if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
   1239             streamMask |= indexToType(i);
   1240         }
   1241     }
   1242 
   1243     // Step 1, stop and discard fetchers that are no longer needed.
   1244     // Pause those that we'll reuse.
   1245     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
   1246         const AString &uri = mFetcherInfos.keyAt(i);
   1247 
   1248         bool discardFetcher = true;
   1249 
   1250         // If we're seeking all current fetchers are discarded.
   1251         if (timeUs < 0ll) {
   1252             // delay fetcher removal if not picking tracks
   1253             discardFetcher = pickTrack;
   1254 
   1255             for (size_t j = 0; j < kMaxStreams; ++j) {
   1256                 StreamType type = indexToType(j);
   1257                 if ((streamMask & type) && uri == URIs[j]) {
   1258                     resumeMask |= type;
   1259                     streamMask &= ~type;
   1260                     discardFetcher = false;
   1261                 }
   1262             }
   1263         }
   1264 
   1265         if (discardFetcher) {
   1266             mFetcherInfos.valueAt(i).mFetcher->stopAsync();
   1267         } else {
   1268             mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
   1269         }
   1270     }
   1271 
   1272     sp<AMessage> msg;
   1273     if (timeUs < 0ll) {
   1274         // skip onChangeConfiguration2 (decoder destruction) if not seeking.
   1275         msg = new AMessage(kWhatChangeConfiguration3, id());
   1276     } else {
   1277         msg = new AMessage(kWhatChangeConfiguration2, id());
   1278     }
   1279     msg->setInt32("streamMask", streamMask);
   1280     msg->setInt32("resumeMask", resumeMask);
   1281     msg->setInt32("pickTrack", pickTrack);
   1282     msg->setInt64("timeUs", timeUs);
   1283     for (size_t i = 0; i < kMaxStreams; ++i) {
   1284         if ((streamMask | resumeMask) & indexToType(i)) {
   1285             msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
   1286         }
   1287     }
   1288 
   1289     // Every time a fetcher acknowledges the stopAsync or pauseAsync request
   1290     // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
   1291     // fetchers have completed their asynchronous operation, we'll post
   1292     // mContinuation, which then is handled below in onChangeConfiguration2.
   1293     mContinuationCounter = mFetcherInfos.size();
   1294     mContinuation = msg;
   1295 
   1296     if (mContinuationCounter == 0) {
   1297         msg->post();
   1298 
   1299         if (mSeekReplyID != 0) {
   1300             CHECK(mSeekReply != NULL);
   1301             mSeekReply->setInt32("err", OK);
   1302             mSeekReply->postReply(mSeekReplyID);
   1303             mSeekReplyID = 0;
   1304             mSeekReply.clear();
   1305         }
   1306     }
   1307 }
   1308 
   1309 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
   1310     if (!mReconfigurationInProgress) {
   1311         int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
   1312         msg->findInt32("pickTrack", &pickTrack);
   1313         msg->findInt32("bandwidthIndex", &bandwidthIndex);
   1314         changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
   1315     } else {
   1316         msg->post(1000000ll); // retry in 1 sec
   1317     }
   1318 }
   1319 
   1320 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
   1321     mContinuation.clear();
   1322 
   1323     // All fetchers are either suspended or have been removed now.
   1324 
   1325     uint32_t streamMask, resumeMask;
   1326     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
   1327     CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
   1328 
   1329     // currently onChangeConfiguration2 is only called for seeking;
   1330     // remove the following CHECK if using it else where.
   1331     CHECK_EQ(resumeMask, 0);
   1332     streamMask |= resumeMask;
   1333 
   1334     AString URIs[kMaxStreams];
   1335     for (size_t i = 0; i < kMaxStreams; ++i) {
   1336         if (streamMask & indexToType(i)) {
   1337             const AString &uriKey = mStreams[i].uriKey();
   1338             CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
   1339             ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
   1340         }
   1341     }
   1342 
   1343     // Determine which decoders to shutdown on the player side,
   1344     // a decoder has to be shutdown if either
   1345     // 1) its streamtype was active before but now longer isn't.
   1346     // or
   1347     // 2) its streamtype was already active and still is but the URI
   1348     //    has changed.
   1349     uint32_t changedMask = 0;
   1350     for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
   1351         if (((mStreamMask & streamMask & indexToType(i))
   1352                 && !(URIs[i] == mStreams[i].mUri))
   1353                 || (mStreamMask & ~streamMask & indexToType(i))) {
   1354             changedMask |= indexToType(i);
   1355         }
   1356     }
   1357 
   1358     if (changedMask == 0) {
   1359         // If nothing changed as far as the audio/video decoders
   1360         // are concerned we can proceed.
   1361         onChangeConfiguration3(msg);
   1362         return;
   1363     }
   1364 
   1365     // Something changed, inform the player which will shutdown the
   1366     // corresponding decoders and will post the reply once that's done.
   1367     // Handling the reply will continue executing below in
   1368     // onChangeConfiguration3.
   1369     sp<AMessage> notify = mNotify->dup();
   1370     notify->setInt32("what", kWhatStreamsChanged);
   1371     notify->setInt32("changedMask", changedMask);
   1372 
   1373     msg->setWhat(kWhatChangeConfiguration3);
   1374     msg->setTarget(id());
   1375 
   1376     notify->setMessage("reply", msg);
   1377     notify->post();
   1378 }
   1379 
   1380 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
   1381     mContinuation.clear();
   1382     // All remaining fetchers are still suspended, the player has shutdown
   1383     // any decoders that needed it.
   1384 
   1385     uint32_t streamMask, resumeMask;
   1386     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
   1387     CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
   1388 
   1389     int64_t timeUs;
   1390     int32_t pickTrack;
   1391     bool switching = false;
   1392     CHECK(msg->findInt64("timeUs", &timeUs));
   1393     CHECK(msg->findInt32("pickTrack", &pickTrack));
   1394 
   1395     if (timeUs < 0ll) {
   1396         if (!pickTrack) {
   1397             switching = true;
   1398         }
   1399         mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
   1400     } else {
   1401         mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
   1402     }
   1403 
   1404     for (size_t i = 0; i < kMaxStreams; ++i) {
   1405         if (streamMask & indexToType(i)) {
   1406             if (switching) {
   1407                 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
   1408             } else {
   1409                 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
   1410             }
   1411         }
   1412     }
   1413 
   1414     mNewStreamMask = streamMask | resumeMask;
   1415     if (switching) {
   1416         mSwapMask = mStreamMask & ~resumeMask;
   1417     }
   1418 
   1419     // Of all existing fetchers:
   1420     // * Resume fetchers that are still needed and assign them original packet sources.
   1421     // * Mark otherwise unneeded fetchers for removal.
   1422     ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
   1423     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
   1424         const AString &uri = mFetcherInfos.keyAt(i);
   1425 
   1426         sp<AnotherPacketSource> sources[kMaxStreams];
   1427         for (size_t j = 0; j < kMaxStreams; ++j) {
   1428             if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
   1429                 sources[j] = mPacketSources.valueFor(indexToType(j));
   1430 
   1431                 if (j != kSubtitleIndex) {
   1432                     ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
   1433                     sp<AnotherPacketSource> discontinuityQueue;
   1434                     discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
   1435                     discontinuityQueue->queueDiscontinuity(
   1436                             ATSParser::DISCONTINUITY_NONE,
   1437                             NULL,
   1438                             true);
   1439                 }
   1440             }
   1441         }
   1442 
   1443         FetcherInfo &info = mFetcherInfos.editValueAt(i);
   1444         if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
   1445                 || sources[kSubtitleIndex] != NULL) {
   1446             info.mFetcher->startAsync(
   1447                     sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
   1448         } else {
   1449             info.mToBeRemoved = true;
   1450         }
   1451     }
   1452 
   1453     // streamMask now only contains the types that need a new fetcher created.
   1454 
   1455     if (streamMask != 0) {
   1456         ALOGV("creating new fetchers for mask 0x%08x", streamMask);
   1457     }
   1458 
   1459     // Find out when the original fetchers have buffered up to and start the new fetchers
   1460     // at a later timestamp.
   1461     for (size_t i = 0; i < kMaxStreams; i++) {
   1462         if (!(indexToType(i) & streamMask)) {
   1463             continue;
   1464         }
   1465 
   1466         AString uri;
   1467         uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
   1468 
   1469         sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
   1470         CHECK(fetcher != NULL);
   1471 
   1472         int32_t latestSeq = -1;
   1473         int64_t startTimeUs = -1;
   1474         int64_t segmentStartTimeUs = -1ll;
   1475         int32_t discontinuitySeq = -1;
   1476         sp<AnotherPacketSource> sources[kMaxStreams];
   1477 
   1478         if (i == kSubtitleIndex) {
   1479             segmentStartTimeUs = latestMediaSegmentStartTimeUs();
   1480         }
   1481 
   1482         // TRICKY: looping from i as earlier streams are already removed from streamMask
   1483         for (size_t j = i; j < kMaxStreams; ++j) {
   1484             const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
   1485             if ((streamMask & indexToType(j)) && uri == streamUri) {
   1486                 sources[j] = mPacketSources.valueFor(indexToType(j));
   1487 
   1488                 if (timeUs >= 0) {
   1489                     sources[j]->clear();
   1490                     startTimeUs = timeUs;
   1491 
   1492                     sp<AnotherPacketSource> discontinuityQueue;
   1493                     sp<AMessage> extra = new AMessage;
   1494                     extra->setInt64("timeUs", timeUs);
   1495                     discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
   1496                     discontinuityQueue->queueDiscontinuity(
   1497                             ATSParser::DISCONTINUITY_TIME, extra, true);
   1498                 } else {
   1499                     int32_t type;
   1500                     int64_t srcSegmentStartTimeUs;
   1501                     sp<AMessage> meta;
   1502                     if (pickTrack) {
   1503                         // selecting
   1504                         meta = sources[j]->getLatestDequeuedMeta();
   1505                     } else {
   1506                         // adapting
   1507                         meta = sources[j]->getLatestEnqueuedMeta();
   1508                     }
   1509 
   1510                     if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
   1511                         int64_t tmpUs;
   1512                         int64_t tmpSegmentUs;
   1513 
   1514                         CHECK(meta->findInt64("timeUs", &tmpUs));
   1515                         CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
   1516                         if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
   1517                             startTimeUs = tmpUs;
   1518                             segmentStartTimeUs = tmpSegmentUs;
   1519                         } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
   1520                             startTimeUs = tmpUs;
   1521                         }
   1522 
   1523                         int32_t seq;
   1524                         CHECK(meta->findInt32("discontinuitySeq", &seq));
   1525                         if (discontinuitySeq < 0 || seq < discontinuitySeq) {
   1526                             discontinuitySeq = seq;
   1527                         }
   1528                     }
   1529 
   1530                     if (pickTrack) {
   1531                         // selecting track, queue discontinuities before content
   1532                         sources[j]->clear();
   1533                         if (j == kSubtitleIndex) {
   1534                             break;
   1535                         }
   1536                         sp<AnotherPacketSource> discontinuityQueue;
   1537                         discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
   1538                         discontinuityQueue->queueDiscontinuity(
   1539                                 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
   1540                     } else {
   1541                         // adapting, queue discontinuities after resume
   1542                         sources[j] = mPacketSources2.valueFor(indexToType(j));
   1543                         sources[j]->clear();
   1544                         uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
   1545                         if (extraStreams & indexToType(j)) {
   1546                             sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
   1547                         }
   1548                     }
   1549                 }
   1550 
   1551                 streamMask &= ~indexToType(j);
   1552             }
   1553         }
   1554 
   1555         fetcher->startAsync(
   1556                 sources[kAudioIndex],
   1557                 sources[kVideoIndex],
   1558                 sources[kSubtitleIndex],
   1559                 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
   1560                 segmentStartTimeUs,
   1561                 discontinuitySeq,
   1562                 switching);
   1563     }
   1564 
   1565     // All fetchers have now been started, the configuration change
   1566     // has completed.
   1567 
   1568     cancelCheckBandwidthEvent();
   1569     scheduleCheckBandwidthEvent();
   1570 
   1571     ALOGV("XXX configuration change completed.");
   1572     mReconfigurationInProgress = false;
   1573     if (switching) {
   1574         mSwitchInProgress = true;
   1575     } else {
   1576         mStreamMask = mNewStreamMask;
   1577     }
   1578 
   1579     if (mDisconnectReplyID != 0) {
   1580         finishDisconnect();
   1581     }
   1582 }
   1583 
   1584 void LiveSession::onSwapped(const sp<AMessage> &msg) {
   1585     int32_t switchGeneration;
   1586     CHECK(msg->findInt32("switchGeneration", &switchGeneration));
   1587     if (switchGeneration != mSwitchGeneration) {
   1588         return;
   1589     }
   1590 
   1591     int32_t stream;
   1592     CHECK(msg->findInt32("stream", &stream));
   1593 
   1594     ssize_t idx = typeToIndex(stream);
   1595     CHECK(idx >= 0);
   1596     if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
   1597         ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
   1598     }
   1599     mStreams[idx].mUri = mStreams[idx].mNewUri;
   1600     mStreams[idx].mNewUri.clear();
   1601 
   1602     mSwapMask &= ~stream;
   1603     if (mSwapMask != 0) {
   1604         return;
   1605     }
   1606 
   1607     // Check if new variant contains extra streams.
   1608     uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
   1609     while (extraStreams) {
   1610         StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
   1611         swapPacketSource(extraStream);
   1612         extraStreams &= ~extraStream;
   1613 
   1614         idx = typeToIndex(extraStream);
   1615         CHECK(idx >= 0);
   1616         if (mStreams[idx].mNewUri.empty()) {
   1617             ALOGW("swapping extra stream type %d %s to empty stream",
   1618                     extraStream, mStreams[idx].mUri.c_str());
   1619         }
   1620         mStreams[idx].mUri = mStreams[idx].mNewUri;
   1621         mStreams[idx].mNewUri.clear();
   1622     }
   1623 
   1624     tryToFinishBandwidthSwitch();
   1625 }
   1626 
   1627 void LiveSession::onCheckSwitchDown() {
   1628     if (mSwitchDownMonitor == NULL) {
   1629         return;
   1630     }
   1631 
   1632     if (mSwitchInProgress || mReconfigurationInProgress) {
   1633         ALOGV("Switch/Reconfig in progress, defer switch down");
   1634         mSwitchDownMonitor->post(1000000ll);
   1635         return;
   1636     }
   1637 
   1638     for (size_t i = 0; i < kMaxStreams; ++i) {
   1639         int32_t targetDuration;
   1640         sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
   1641         sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
   1642 
   1643         if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
   1644             int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
   1645             int64_t targetDurationUs = targetDuration * 1000000ll;
   1646 
   1647             if (bufferedDurationUs < targetDurationUs / 3) {
   1648                 (new AMessage(kWhatSwitchDown, id()))->post();
   1649                 break;
   1650             }
   1651         }
   1652     }
   1653 
   1654     mSwitchDownMonitor->post(1000000ll);
   1655 }
   1656 
   1657 void LiveSession::onSwitchDown() {
   1658     if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
   1659         return;
   1660     }
   1661 
   1662     ssize_t bandwidthIndex = getBandwidthIndex();
   1663     if (bandwidthIndex < mCurBandwidthIndex) {
   1664         changeConfiguration(-1, bandwidthIndex, false);
   1665         return;
   1666     }
   1667 
   1668 }
   1669 
   1670 // Mark switch done when:
   1671 //   1. all old buffers are swapped out
   1672 void LiveSession::tryToFinishBandwidthSwitch() {
   1673     if (!mSwitchInProgress) {
   1674         return;
   1675     }
   1676 
   1677     bool needToRemoveFetchers = false;
   1678     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
   1679         if (mFetcherInfos.valueAt(i).mToBeRemoved) {
   1680             needToRemoveFetchers = true;
   1681             break;
   1682         }
   1683     }
   1684 
   1685     if (!needToRemoveFetchers && mSwapMask == 0) {
   1686         ALOGI("mSwitchInProgress = false");
   1687         mStreamMask = mNewStreamMask;
   1688         mSwitchInProgress = false;
   1689     }
   1690 }
   1691 
   1692 void LiveSession::scheduleCheckBandwidthEvent() {
   1693     sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
   1694     msg->setInt32("generation", mCheckBandwidthGeneration);
   1695     msg->post(10000000ll);
   1696 }
   1697 
   1698 void LiveSession::cancelCheckBandwidthEvent() {
   1699     ++mCheckBandwidthGeneration;
   1700 }
   1701 
   1702 void LiveSession::cancelBandwidthSwitch() {
   1703     Mutex::Autolock lock(mSwapMutex);
   1704     mSwitchGeneration++;
   1705     mSwitchInProgress = false;
   1706     mSwapMask = 0;
   1707 
   1708     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
   1709         FetcherInfo& info = mFetcherInfos.editValueAt(i);
   1710         if (info.mToBeRemoved) {
   1711             info.mToBeRemoved = false;
   1712         }
   1713     }
   1714 
   1715     for (size_t i = 0; i < kMaxStreams; ++i) {
   1716         if (!mStreams[i].mNewUri.empty()) {
   1717             ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
   1718             if (j < 0) {
   1719                 mStreams[i].mNewUri.clear();
   1720                 continue;
   1721             }
   1722 
   1723             const FetcherInfo &info = mFetcherInfos.valueAt(j);
   1724             info.mFetcher->stopAsync();
   1725             mFetcherInfos.removeItemsAt(j);
   1726             mStreams[i].mNewUri.clear();
   1727         }
   1728     }
   1729 }
   1730 
   1731 bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
   1732     if (mReconfigurationInProgress || mSwitchInProgress) {
   1733         return false;
   1734     }
   1735 
   1736     if (mCurBandwidthIndex < 0) {
   1737         return true;
   1738     }
   1739 
   1740     if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
   1741         return false;
   1742     } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
   1743         return canSwitchUp();
   1744     } else {
   1745         return true;
   1746     }
   1747 }
   1748 
   1749 void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
   1750     size_t bandwidthIndex = getBandwidthIndex();
   1751     if (canSwitchBandwidthTo(bandwidthIndex)) {
   1752         changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
   1753     } else {
   1754         // Come back and check again 10 seconds later in case there is nothing to do now.
   1755         // If we DO change configuration, once that completes it'll schedule a new
   1756         // check bandwidth event with an incremented mCheckBandwidthGeneration.
   1757         msg->post(10000000ll);
   1758     }
   1759 }
   1760 
   1761 void LiveSession::postPrepared(status_t err) {
   1762     CHECK(mInPreparationPhase);
   1763 
   1764     sp<AMessage> notify = mNotify->dup();
   1765     if (err == OK || err == ERROR_END_OF_STREAM) {
   1766         notify->setInt32("what", kWhatPrepared);
   1767     } else {
   1768         notify->setInt32("what", kWhatPreparationFailed);
   1769         notify->setInt32("err", err);
   1770     }
   1771 
   1772     notify->post();
   1773 
   1774     mInPreparationPhase = false;
   1775 
   1776     mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
   1777     mSwitchDownMonitor->post();
   1778 }
   1779 
   1780 }  // namespace android
   1781 
   1782