Home | History | Annotate | Download | only in httplive
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "LiveSession"
     19 #include <utils/Log.h>
     20 
     21 #include "LiveSession.h"
     22 
     23 #include "M3UParser.h"
     24 #include "PlaylistFetcher.h"
     25 
     26 #include "include/HTTPBase.h"
     27 #include "mpeg2ts/AnotherPacketSource.h"
     28 
     29 #include <cutils/properties.h>
     30 #include <media/stagefright/foundation/hexdump.h>
     31 #include <media/stagefright/foundation/ABuffer.h>
     32 #include <media/stagefright/foundation/ADebug.h>
     33 #include <media/stagefright/foundation/AMessage.h>
     34 #include <media/stagefright/DataSource.h>
     35 #include <media/stagefright/FileSource.h>
     36 #include <media/stagefright/MediaErrors.h>
     37 #include <media/stagefright/MetaData.h>
     38 #include <media/stagefright/Utils.h>
     39 
     40 #include <ctype.h>
     41 #include <openssl/aes.h>
     42 #include <openssl/md5.h>
     43 
     44 namespace android {
     45 
     46 LiveSession::LiveSession(
     47         const sp<AMessage> &notify, uint32_t flags, bool uidValid, uid_t uid)
     48     : mNotify(notify),
     49       mFlags(flags),
     50       mUIDValid(uidValid),
     51       mUID(uid),
     52       mInPreparationPhase(true),
     53       mHTTPDataSource(
     54               HTTPBase::Create(
     55                   (mFlags & kFlagIncognito)
     56                     ? HTTPBase::kFlagIncognito
     57                     : 0)),
     58       mPrevBandwidthIndex(-1),
     59       mStreamMask(0),
     60       mCheckBandwidthGeneration(0),
     61       mLastDequeuedTimeUs(0ll),
     62       mRealTimeBaseUs(0ll),
     63       mReconfigurationInProgress(false),
     64       mDisconnectReplyID(0) {
     65     if (mUIDValid) {
     66         mHTTPDataSource->setUID(mUID);
     67     }
     68 
     69     mPacketSources.add(
     70             STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */));
     71 
     72     mPacketSources.add(
     73             STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */));
     74 
     75     mPacketSources.add(
     76             STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */));
     77 }
     78 
     79 LiveSession::~LiveSession() {
     80 }
     81 
     82 status_t LiveSession::dequeueAccessUnit(
     83         StreamType stream, sp<ABuffer> *accessUnit) {
     84     if (!(mStreamMask & stream)) {
     85         return UNKNOWN_ERROR;
     86     }
     87 
     88     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
     89 
     90     status_t finalResult;
     91     if (!packetSource->hasBufferAvailable(&finalResult)) {
     92         return finalResult == OK ? -EAGAIN : finalResult;
     93     }
     94 
     95     status_t err = packetSource->dequeueAccessUnit(accessUnit);
     96 
     97     const char *streamStr;
     98     switch (stream) {
     99         case STREAMTYPE_AUDIO:
    100             streamStr = "audio";
    101             break;
    102         case STREAMTYPE_VIDEO:
    103             streamStr = "video";
    104             break;
    105         case STREAMTYPE_SUBTITLES:
    106             streamStr = "subs";
    107             break;
    108         default:
    109             TRESPASS();
    110     }
    111 
    112     if (err == INFO_DISCONTINUITY) {
    113         int32_t type;
    114         CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
    115 
    116         sp<AMessage> extra;
    117         if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
    118             extra.clear();
    119         }
    120 
    121         ALOGI("[%s] read discontinuity of type %d, extra = %s",
    122               streamStr,
    123               type,
    124               extra == NULL ? "NULL" : extra->debugString().c_str());
    125     } else if (err == OK) {
    126         if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
    127             int64_t timeUs;
    128             CHECK((*accessUnit)->meta()->findInt64("timeUs",  &timeUs));
    129             ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs);
    130 
    131             mLastDequeuedTimeUs = timeUs;
    132             mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    133         } else if (stream == STREAMTYPE_SUBTITLES) {
    134             (*accessUnit)->meta()->setInt32(
    135                     "trackIndex", mPlaylist->getSelectedIndex());
    136             (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
    137         }
    138     } else {
    139         ALOGI("[%s] encountered error %d", streamStr, err);
    140     }
    141 
    142     return err;
    143 }
    144 
    145 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
    146     if (!(mStreamMask & stream)) {
    147         return UNKNOWN_ERROR;
    148     }
    149 
    150     sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
    151 
    152     sp<MetaData> meta = packetSource->getFormat();
    153 
    154     if (meta == NULL) {
    155         return -EAGAIN;
    156     }
    157 
    158     return convertMetaDataToMessage(meta, format);
    159 }
    160 
    161 void LiveSession::connectAsync(
    162         const char *url, const KeyedVector<String8, String8> *headers) {
    163     sp<AMessage> msg = new AMessage(kWhatConnect, id());
    164     msg->setString("url", url);
    165 
    166     if (headers != NULL) {
    167         msg->setPointer(
    168                 "headers",
    169                 new KeyedVector<String8, String8>(*headers));
    170     }
    171 
    172     msg->post();
    173 }
    174 
    175 status_t LiveSession::disconnect() {
    176     sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
    177 
    178     sp<AMessage> response;
    179     status_t err = msg->postAndAwaitResponse(&response);
    180 
    181     return err;
    182 }
    183 
    184 status_t LiveSession::seekTo(int64_t timeUs) {
    185     sp<AMessage> msg = new AMessage(kWhatSeek, id());
    186     msg->setInt64("timeUs", timeUs);
    187 
    188     sp<AMessage> response;
    189     status_t err = msg->postAndAwaitResponse(&response);
    190 
    191     return err;
    192 }
    193 
    194 void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
    195     switch (msg->what()) {
    196         case kWhatConnect:
    197         {
    198             onConnect(msg);
    199             break;
    200         }
    201 
    202         case kWhatDisconnect:
    203         {
    204             CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));
    205 
    206             if (mReconfigurationInProgress) {
    207                 break;
    208             }
    209 
    210             finishDisconnect();
    211             break;
    212         }
    213 
    214         case kWhatSeek:
    215         {
    216             uint32_t replyID;
    217             CHECK(msg->senderAwaitsResponse(&replyID));
    218 
    219             status_t err = onSeek(msg);
    220 
    221             sp<AMessage> response = new AMessage;
    222             response->setInt32("err", err);
    223 
    224             response->postReply(replyID);
    225             break;
    226         }
    227 
    228         case kWhatFetcherNotify:
    229         {
    230             int32_t what;
    231             CHECK(msg->findInt32("what", &what));
    232 
    233             switch (what) {
    234                 case PlaylistFetcher::kWhatStarted:
    235                     break;
    236                 case PlaylistFetcher::kWhatPaused:
    237                 case PlaylistFetcher::kWhatStopped:
    238                 {
    239                     if (what == PlaylistFetcher::kWhatStopped) {
    240                         AString uri;
    241                         CHECK(msg->findString("uri", &uri));
    242                         mFetcherInfos.removeItem(uri);
    243                     }
    244 
    245                     if (mContinuation != NULL) {
    246                         CHECK_GT(mContinuationCounter, 0);
    247                         if (--mContinuationCounter == 0) {
    248                             mContinuation->post();
    249                         }
    250                     }
    251                     break;
    252                 }
    253 
    254                 case PlaylistFetcher::kWhatDurationUpdate:
    255                 {
    256                     AString uri;
    257                     CHECK(msg->findString("uri", &uri));
    258 
    259                     int64_t durationUs;
    260                     CHECK(msg->findInt64("durationUs", &durationUs));
    261 
    262                     FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
    263                     info->mDurationUs = durationUs;
    264                     break;
    265                 }
    266 
    267                 case PlaylistFetcher::kWhatError:
    268                 {
    269                     status_t err;
    270                     CHECK(msg->findInt32("err", &err));
    271 
    272                     ALOGE("XXX Received error %d from PlaylistFetcher.", err);
    273 
    274                     if (mInPreparationPhase) {
    275                         postPrepared(err);
    276                     }
    277 
    278                     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);
    279 
    280                     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
    281 
    282                     mPacketSources.valueFor(
    283                             STREAMTYPE_SUBTITLES)->signalEOS(err);
    284 
    285                     sp<AMessage> notify = mNotify->dup();
    286                     notify->setInt32("what", kWhatError);
    287                     notify->setInt32("err", err);
    288                     notify->post();
    289                     break;
    290                 }
    291 
    292                 case PlaylistFetcher::kWhatTemporarilyDoneFetching:
    293                 {
    294                     AString uri;
    295                     CHECK(msg->findString("uri", &uri));
    296 
    297                     FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
    298                     info->mIsPrepared = true;
    299 
    300                     if (mInPreparationPhase) {
    301                         bool allFetchersPrepared = true;
    302                         for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    303                             if (!mFetcherInfos.valueAt(i).mIsPrepared) {
    304                                 allFetchersPrepared = false;
    305                                 break;
    306                             }
    307                         }
    308 
    309                         if (allFetchersPrepared) {
    310                             postPrepared(OK);
    311                         }
    312                     }
    313                     break;
    314                 }
    315 
    316                 default:
    317                     TRESPASS();
    318             }
    319 
    320             break;
    321         }
    322 
    323         case kWhatCheckBandwidth:
    324         {
    325             int32_t generation;
    326             CHECK(msg->findInt32("generation", &generation));
    327 
    328             if (generation != mCheckBandwidthGeneration) {
    329                 break;
    330             }
    331 
    332             onCheckBandwidth();
    333             break;
    334         }
    335 
    336         case kWhatChangeConfiguration:
    337         {
    338             onChangeConfiguration(msg);
    339             break;
    340         }
    341 
    342         case kWhatChangeConfiguration2:
    343         {
    344             onChangeConfiguration2(msg);
    345             break;
    346         }
    347 
    348         case kWhatChangeConfiguration3:
    349         {
    350             onChangeConfiguration3(msg);
    351             break;
    352         }
    353 
    354         case kWhatFinishDisconnect2:
    355         {
    356             onFinishDisconnect2();
    357             break;
    358         }
    359 
    360         default:
    361             TRESPASS();
    362             break;
    363     }
    364 }
    365 
    366 // static
    367 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
    368     if (a->mBandwidth < b->mBandwidth) {
    369         return -1;
    370     } else if (a->mBandwidth == b->mBandwidth) {
    371         return 0;
    372     }
    373 
    374     return 1;
    375 }
    376 
    377 void LiveSession::onConnect(const sp<AMessage> &msg) {
    378     AString url;
    379     CHECK(msg->findString("url", &url));
    380 
    381     KeyedVector<String8, String8> *headers = NULL;
    382     if (!msg->findPointer("headers", (void **)&headers)) {
    383         mExtraHeaders.clear();
    384     } else {
    385         mExtraHeaders = *headers;
    386 
    387         delete headers;
    388         headers = NULL;
    389     }
    390 
    391 #if 1
    392     ALOGI("onConnect <URL suppressed>");
    393 #else
    394     ALOGI("onConnect %s", url.c_str());
    395 #endif
    396 
    397     mMasterURL = url;
    398 
    399     bool dummy;
    400     mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
    401 
    402     if (mPlaylist == NULL) {
    403         ALOGE("unable to fetch master playlist '%s'.", url.c_str());
    404 
    405         postPrepared(ERROR_IO);
    406         return;
    407     }
    408 
    409     // We trust the content provider to make a reasonable choice of preferred
    410     // initial bandwidth by listing it first in the variant playlist.
    411     // At startup we really don't have a good estimate on the available
    412     // network bandwidth since we haven't tranferred any data yet. Once
    413     // we have we can make a better informed choice.
    414     size_t initialBandwidth = 0;
    415     size_t initialBandwidthIndex = 0;
    416 
    417     if (mPlaylist->isVariantPlaylist()) {
    418         for (size_t i = 0; i < mPlaylist->size(); ++i) {
    419             BandwidthItem item;
    420 
    421             item.mPlaylistIndex = i;
    422 
    423             sp<AMessage> meta;
    424             AString uri;
    425             mPlaylist->itemAt(i, &uri, &meta);
    426 
    427             unsigned long bandwidth;
    428             CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
    429 
    430             if (initialBandwidth == 0) {
    431                 initialBandwidth = item.mBandwidth;
    432             }
    433 
    434             mBandwidthItems.push(item);
    435         }
    436 
    437         CHECK_GT(mBandwidthItems.size(), 0u);
    438 
    439         mBandwidthItems.sort(SortByBandwidth);
    440 
    441         for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
    442             if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
    443                 initialBandwidthIndex = i;
    444                 break;
    445             }
    446         }
    447     } else {
    448         // dummy item.
    449         BandwidthItem item;
    450         item.mPlaylistIndex = 0;
    451         item.mBandwidth = 0;
    452         mBandwidthItems.push(item);
    453     }
    454 
    455     changeConfiguration(
    456             0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */);
    457 }
    458 
    459 void LiveSession::finishDisconnect() {
    460     // No reconfiguration is currently pending, make sure none will trigger
    461     // during disconnection either.
    462     cancelCheckBandwidthEvent();
    463 
    464     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    465         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
    466     }
    467 
    468     sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
    469 
    470     mContinuationCounter = mFetcherInfos.size();
    471     mContinuation = msg;
    472 
    473     if (mContinuationCounter == 0) {
    474         msg->post();
    475     }
    476 }
    477 
    478 void LiveSession::onFinishDisconnect2() {
    479     mContinuation.clear();
    480 
    481     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
    482     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
    483 
    484     mPacketSources.valueFor(
    485             STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
    486 
    487     sp<AMessage> response = new AMessage;
    488     response->setInt32("err", OK);
    489 
    490     response->postReply(mDisconnectReplyID);
    491     mDisconnectReplyID = 0;
    492 }
    493 
    494 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
    495     ssize_t index = mFetcherInfos.indexOfKey(uri);
    496 
    497     if (index >= 0) {
    498         return NULL;
    499     }
    500 
    501     sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
    502     notify->setString("uri", uri);
    503 
    504     FetcherInfo info;
    505     info.mFetcher = new PlaylistFetcher(notify, this, uri);
    506     info.mDurationUs = -1ll;
    507     info.mIsPrepared = false;
    508     looper()->registerHandler(info.mFetcher);
    509 
    510     mFetcherInfos.add(uri, info);
    511 
    512     return info.mFetcher;
    513 }
    514 
    515 status_t LiveSession::fetchFile(
    516         const char *url, sp<ABuffer> *out,
    517         int64_t range_offset, int64_t range_length) {
    518     *out = NULL;
    519 
    520     sp<DataSource> source;
    521 
    522     if (!strncasecmp(url, "file://", 7)) {
    523         source = new FileSource(url + 7);
    524     } else if (strncasecmp(url, "http://", 7)
    525             && strncasecmp(url, "https://", 8)) {
    526         return ERROR_UNSUPPORTED;
    527     } else {
    528         KeyedVector<String8, String8> headers = mExtraHeaders;
    529         if (range_offset > 0 || range_length >= 0) {
    530             headers.add(
    531                     String8("Range"),
    532                     String8(
    533                         StringPrintf(
    534                             "bytes=%lld-%s",
    535                             range_offset,
    536                             range_length < 0
    537                                 ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str()));
    538         }
    539         status_t err = mHTTPDataSource->connect(url, &headers);
    540 
    541         if (err != OK) {
    542             return err;
    543         }
    544 
    545         source = mHTTPDataSource;
    546     }
    547 
    548     off64_t size;
    549     status_t err = source->getSize(&size);
    550 
    551     if (err != OK) {
    552         size = 65536;
    553     }
    554 
    555     sp<ABuffer> buffer = new ABuffer(size);
    556     buffer->setRange(0, 0);
    557 
    558     for (;;) {
    559         size_t bufferRemaining = buffer->capacity() - buffer->size();
    560 
    561         if (bufferRemaining == 0) {
    562             bufferRemaining = 32768;
    563 
    564             ALOGV("increasing download buffer to %d bytes",
    565                  buffer->size() + bufferRemaining);
    566 
    567             sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
    568             memcpy(copy->data(), buffer->data(), buffer->size());
    569             copy->setRange(0, buffer->size());
    570 
    571             buffer = copy;
    572         }
    573 
    574         size_t maxBytesToRead = bufferRemaining;
    575         if (range_length >= 0) {
    576             int64_t bytesLeftInRange = range_length - buffer->size();
    577             if (bytesLeftInRange < maxBytesToRead) {
    578                 maxBytesToRead = bytesLeftInRange;
    579 
    580                 if (bytesLeftInRange == 0) {
    581                     break;
    582                 }
    583             }
    584         }
    585 
    586         ssize_t n = source->readAt(
    587                 buffer->size(), buffer->data() + buffer->size(),
    588                 maxBytesToRead);
    589 
    590         if (n < 0) {
    591             return n;
    592         }
    593 
    594         if (n == 0) {
    595             break;
    596         }
    597 
    598         buffer->setRange(0, buffer->size() + (size_t)n);
    599     }
    600 
    601     *out = buffer;
    602 
    603     return OK;
    604 }
    605 
    606 sp<M3UParser> LiveSession::fetchPlaylist(
    607         const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
    608     ALOGV("fetchPlaylist '%s'", url);
    609 
    610     *unchanged = false;
    611 
    612     sp<ABuffer> buffer;
    613     status_t err = fetchFile(url, &buffer);
    614 
    615     if (err != OK) {
    616         return NULL;
    617     }
    618 
    619     // MD5 functionality is not available on the simulator, treat all
    620     // playlists as changed.
    621 
    622 #if defined(HAVE_ANDROID_OS)
    623     uint8_t hash[16];
    624 
    625     MD5_CTX m;
    626     MD5_Init(&m);
    627     MD5_Update(&m, buffer->data(), buffer->size());
    628 
    629     MD5_Final(hash, &m);
    630 
    631     if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
    632         // playlist unchanged
    633         *unchanged = true;
    634 
    635         ALOGV("Playlist unchanged, refresh state is now %d",
    636              (int)mRefreshState);
    637 
    638         return NULL;
    639     }
    640 
    641     if (curPlaylistHash != NULL) {
    642         memcpy(curPlaylistHash, hash, sizeof(hash));
    643     }
    644 #endif
    645 
    646     sp<M3UParser> playlist =
    647         new M3UParser(url, buffer->data(), buffer->size());
    648 
    649     if (playlist->initCheck() != OK) {
    650         ALOGE("failed to parse .m3u8 playlist");
    651 
    652         return NULL;
    653     }
    654 
    655     return playlist;
    656 }
    657 
    658 static double uniformRand() {
    659     return (double)rand() / RAND_MAX;
    660 }
    661 
    662 size_t LiveSession::getBandwidthIndex() {
    663     if (mBandwidthItems.size() == 0) {
    664         return 0;
    665     }
    666 
    667 #if 1
    668     char value[PROPERTY_VALUE_MAX];
    669     ssize_t index = -1;
    670     if (property_get("media.httplive.bw-index", value, NULL)) {
    671         char *end;
    672         index = strtol(value, &end, 10);
    673         CHECK(end > value && *end == '\0');
    674 
    675         if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
    676             index = mBandwidthItems.size() - 1;
    677         }
    678     }
    679 
    680     if (index < 0) {
    681         int32_t bandwidthBps;
    682         if (mHTTPDataSource != NULL
    683                 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
    684             ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
    685         } else {
    686             ALOGV("no bandwidth estimate.");
    687             return 0;  // Pick the lowest bandwidth stream by default.
    688         }
    689 
    690         char value[PROPERTY_VALUE_MAX];
    691         if (property_get("media.httplive.max-bw", value, NULL)) {
    692             char *end;
    693             long maxBw = strtoul(value, &end, 10);
    694             if (end > value && *end == '\0') {
    695                 if (maxBw > 0 && bandwidthBps > maxBw) {
    696                     ALOGV("bandwidth capped to %ld bps", maxBw);
    697                     bandwidthBps = maxBw;
    698                 }
    699             }
    700         }
    701 
    702         // Consider only 80% of the available bandwidth usable.
    703         bandwidthBps = (bandwidthBps * 8) / 10;
    704 
    705         // Pick the highest bandwidth stream below or equal to estimated bandwidth.
    706 
    707         index = mBandwidthItems.size() - 1;
    708         while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth
    709                                 > (size_t)bandwidthBps) {
    710             --index;
    711         }
    712     }
    713 #elif 0
    714     // Change bandwidth at random()
    715     size_t index = uniformRand() * mBandwidthItems.size();
    716 #elif 0
    717     // There's a 50% chance to stay on the current bandwidth and
    718     // a 50% chance to switch to the next higher bandwidth (wrapping around
    719     // to lowest)
    720     const size_t kMinIndex = 0;
    721 
    722     static ssize_t mPrevBandwidthIndex = -1;
    723 
    724     size_t index;
    725     if (mPrevBandwidthIndex < 0) {
    726         index = kMinIndex;
    727     } else if (uniformRand() < 0.5) {
    728         index = (size_t)mPrevBandwidthIndex;
    729     } else {
    730         index = mPrevBandwidthIndex + 1;
    731         if (index == mBandwidthItems.size()) {
    732             index = kMinIndex;
    733         }
    734     }
    735     mPrevBandwidthIndex = index;
    736 #elif 0
    737     // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
    738 
    739     size_t index = mBandwidthItems.size() - 1;
    740     while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
    741         --index;
    742     }
    743 #elif 1
    744     char value[PROPERTY_VALUE_MAX];
    745     size_t index;
    746     if (property_get("media.httplive.bw-index", value, NULL)) {
    747         char *end;
    748         index = strtoul(value, &end, 10);
    749         CHECK(end > value && *end == '\0');
    750 
    751         if (index >= mBandwidthItems.size()) {
    752             index = mBandwidthItems.size() - 1;
    753         }
    754     } else {
    755         index = 0;
    756     }
    757 #else
    758     size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
    759 #endif
    760 
    761     CHECK_GE(index, 0);
    762 
    763     return index;
    764 }
    765 
    766 status_t LiveSession::onSeek(const sp<AMessage> &msg) {
    767     int64_t timeUs;
    768     CHECK(msg->findInt64("timeUs", &timeUs));
    769 
    770     if (!mReconfigurationInProgress) {
    771         changeConfiguration(timeUs, getBandwidthIndex());
    772     }
    773 
    774     return OK;
    775 }
    776 
    777 status_t LiveSession::getDuration(int64_t *durationUs) const {
    778     int64_t maxDurationUs = 0ll;
    779     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    780         int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;
    781 
    782         if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
    783             maxDurationUs = fetcherDurationUs;
    784         }
    785     }
    786 
    787     *durationUs = maxDurationUs;
    788 
    789     return OK;
    790 }
    791 
    792 bool LiveSession::isSeekable() const {
    793     int64_t durationUs;
    794     return getDuration(&durationUs) == OK && durationUs >= 0;
    795 }
    796 
    797 bool LiveSession::hasDynamicDuration() const {
    798     return false;
    799 }
    800 
    801 status_t LiveSession::getTrackInfo(Parcel *reply) const {
    802     return mPlaylist->getTrackInfo(reply);
    803 }
    804 
    805 status_t LiveSession::selectTrack(size_t index, bool select) {
    806     status_t err = mPlaylist->selectTrack(index, select);
    807     if (err == OK) {
    808         (new AMessage(kWhatChangeConfiguration, id()))->post();
    809     }
    810     return err;
    811 }
    812 
    813 void LiveSession::changeConfiguration(
    814         int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
    815     CHECK(!mReconfigurationInProgress);
    816     mReconfigurationInProgress = true;
    817 
    818     mPrevBandwidthIndex = bandwidthIndex;
    819 
    820     ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d",
    821           timeUs, bandwidthIndex, pickTrack);
    822 
    823     if (pickTrack) {
    824         mPlaylist->pickRandomMediaItems();
    825     }
    826 
    827     CHECK_LT(bandwidthIndex, mBandwidthItems.size());
    828     const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
    829 
    830     uint32_t streamMask = 0;
    831 
    832     AString audioURI;
    833     if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) {
    834         streamMask |= STREAMTYPE_AUDIO;
    835     }
    836 
    837     AString videoURI;
    838     if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) {
    839         streamMask |= STREAMTYPE_VIDEO;
    840     }
    841 
    842     AString subtitleURI;
    843     if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) {
    844         streamMask |= STREAMTYPE_SUBTITLES;
    845     }
    846 
    847     // Step 1, stop and discard fetchers that are no longer needed.
    848     // Pause those that we'll reuse.
    849     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    850         const AString &uri = mFetcherInfos.keyAt(i);
    851 
    852         bool discardFetcher = true;
    853 
    854         // If we're seeking all current fetchers are discarded.
    855         if (timeUs < 0ll) {
    856             if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI)
    857                     || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI)
    858                     || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) {
    859                 discardFetcher = false;
    860             }
    861         }
    862 
    863         if (discardFetcher) {
    864             mFetcherInfos.valueAt(i).mFetcher->stopAsync();
    865         } else {
    866             mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
    867         }
    868     }
    869 
    870     sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
    871     msg->setInt32("streamMask", streamMask);
    872     msg->setInt64("timeUs", timeUs);
    873     if (streamMask & STREAMTYPE_AUDIO) {
    874         msg->setString("audioURI", audioURI.c_str());
    875     }
    876     if (streamMask & STREAMTYPE_VIDEO) {
    877         msg->setString("videoURI", videoURI.c_str());
    878     }
    879     if (streamMask & STREAMTYPE_SUBTITLES) {
    880         msg->setString("subtitleURI", subtitleURI.c_str());
    881     }
    882 
    883     // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    884     // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    885     // fetchers have completed their asynchronous operation, we'll post
    886     // mContinuation, which then is handled below in onChangeConfiguration2.
    887     mContinuationCounter = mFetcherInfos.size();
    888     mContinuation = msg;
    889 
    890     if (mContinuationCounter == 0) {
    891         msg->post();
    892     }
    893 }
    894 
    895 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
    896     if (!mReconfigurationInProgress) {
    897         changeConfiguration(-1ll /* timeUs */, getBandwidthIndex());
    898     } else {
    899         msg->post(1000000ll); // retry in 1 sec
    900     }
    901 }
    902 
    903 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    904     mContinuation.clear();
    905 
    906     // All fetchers are either suspended or have been removed now.
    907 
    908     uint32_t streamMask;
    909     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    910 
    911     AString audioURI, videoURI, subtitleURI;
    912     if (streamMask & STREAMTYPE_AUDIO) {
    913         CHECK(msg->findString("audioURI", &audioURI));
    914         ALOGV("audioURI = '%s'", audioURI.c_str());
    915     }
    916     if (streamMask & STREAMTYPE_VIDEO) {
    917         CHECK(msg->findString("videoURI", &videoURI));
    918         ALOGV("videoURI = '%s'", videoURI.c_str());
    919     }
    920     if (streamMask & STREAMTYPE_SUBTITLES) {
    921         CHECK(msg->findString("subtitleURI", &subtitleURI));
    922         ALOGV("subtitleURI = '%s'", subtitleURI.c_str());
    923     }
    924 
    925     // Determine which decoders to shutdown on the player side,
    926     // a decoder has to be shutdown if either
    927     // 1) its streamtype was active before but now longer isn't.
    928     // or
    929     // 2) its streamtype was already active and still is but the URI
    930     //    has changed.
    931     uint32_t changedMask = 0;
    932     if (((mStreamMask & streamMask & STREAMTYPE_AUDIO)
    933                 && !(audioURI == mAudioURI))
    934         || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) {
    935         changedMask |= STREAMTYPE_AUDIO;
    936     }
    937     if (((mStreamMask & streamMask & STREAMTYPE_VIDEO)
    938                 && !(videoURI == mVideoURI))
    939         || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) {
    940         changedMask |= STREAMTYPE_VIDEO;
    941     }
    942 
    943     if (changedMask == 0) {
    944         // If nothing changed as far as the audio/video decoders
    945         // are concerned we can proceed.
    946         onChangeConfiguration3(msg);
    947         return;
    948     }
    949 
    950     // Something changed, inform the player which will shutdown the
    951     // corresponding decoders and will post the reply once that's done.
    952     // Handling the reply will continue executing below in
    953     // onChangeConfiguration3.
    954     sp<AMessage> notify = mNotify->dup();
    955     notify->setInt32("what", kWhatStreamsChanged);
    956     notify->setInt32("changedMask", changedMask);
    957 
    958     msg->setWhat(kWhatChangeConfiguration3);
    959     msg->setTarget(id());
    960 
    961     notify->setMessage("reply", msg);
    962     notify->post();
    963 }
    964 
    965 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    966     // All remaining fetchers are still suspended, the player has shutdown
    967     // any decoders that needed it.
    968 
    969     uint32_t streamMask;
    970     CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    971 
    972     AString audioURI, videoURI, subtitleURI;
    973     if (streamMask & STREAMTYPE_AUDIO) {
    974         CHECK(msg->findString("audioURI", &audioURI));
    975     }
    976     if (streamMask & STREAMTYPE_VIDEO) {
    977         CHECK(msg->findString("videoURI", &videoURI));
    978     }
    979     if (streamMask & STREAMTYPE_SUBTITLES) {
    980         CHECK(msg->findString("subtitleURI", &subtitleURI));
    981     }
    982 
    983     int64_t timeUs;
    984     CHECK(msg->findInt64("timeUs", &timeUs));
    985 
    986     if (timeUs < 0ll) {
    987         timeUs = mLastDequeuedTimeUs;
    988     }
    989     mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    990 
    991     mStreamMask = streamMask;
    992     mAudioURI = audioURI;
    993     mVideoURI = videoURI;
    994     mSubtitleURI = subtitleURI;
    995 
    996     // Resume all existing fetchers and assign them packet sources.
    997     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
    998         const AString &uri = mFetcherInfos.keyAt(i);
    999 
   1000         uint32_t resumeMask = 0;
   1001 
   1002         sp<AnotherPacketSource> audioSource;
   1003         if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
   1004             audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
   1005             resumeMask |= STREAMTYPE_AUDIO;
   1006         }
   1007 
   1008         sp<AnotherPacketSource> videoSource;
   1009         if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
   1010             videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
   1011             resumeMask |= STREAMTYPE_VIDEO;
   1012         }
   1013 
   1014         sp<AnotherPacketSource> subtitleSource;
   1015         if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
   1016             subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
   1017             resumeMask |= STREAMTYPE_SUBTITLES;
   1018         }
   1019 
   1020         CHECK_NE(resumeMask, 0u);
   1021 
   1022         ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
   1023 
   1024         streamMask &= ~resumeMask;
   1025 
   1026         mFetcherInfos.valueAt(i).mFetcher->startAsync(
   1027                 audioSource, videoSource, subtitleSource);
   1028     }
   1029 
   1030     // streamMask now only contains the types that need a new fetcher created.
   1031 
   1032     if (streamMask != 0) {
   1033         ALOGV("creating new fetchers for mask 0x%08x", streamMask);
   1034     }
   1035 
   1036     while (streamMask != 0) {
   1037         StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1));
   1038 
   1039         AString uri;
   1040         switch (streamType) {
   1041             case STREAMTYPE_AUDIO:
   1042                 uri = audioURI;
   1043                 break;
   1044             case STREAMTYPE_VIDEO:
   1045                 uri = videoURI;
   1046                 break;
   1047             case STREAMTYPE_SUBTITLES:
   1048                 uri = subtitleURI;
   1049                 break;
   1050             default:
   1051                 TRESPASS();
   1052         }
   1053 
   1054         sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
   1055         CHECK(fetcher != NULL);
   1056 
   1057         sp<AnotherPacketSource> audioSource;
   1058         if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) {
   1059             audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
   1060             audioSource->clear();
   1061 
   1062             streamMask &= ~STREAMTYPE_AUDIO;
   1063         }
   1064 
   1065         sp<AnotherPacketSource> videoSource;
   1066         if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) {
   1067             videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
   1068             videoSource->clear();
   1069 
   1070             streamMask &= ~STREAMTYPE_VIDEO;
   1071         }
   1072 
   1073         sp<AnotherPacketSource> subtitleSource;
   1074         if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) {
   1075             subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES);
   1076             subtitleSource->clear();
   1077 
   1078             streamMask &= ~STREAMTYPE_SUBTITLES;
   1079         }
   1080 
   1081         fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs);
   1082     }
   1083 
   1084     // All fetchers have now been started, the configuration change
   1085     // has completed.
   1086 
   1087     scheduleCheckBandwidthEvent();
   1088 
   1089     ALOGV("XXX configuration change completed.");
   1090 
   1091     mReconfigurationInProgress = false;
   1092 
   1093     if (mDisconnectReplyID != 0) {
   1094         finishDisconnect();
   1095     }
   1096 }
   1097 
   1098 void LiveSession::scheduleCheckBandwidthEvent() {
   1099     sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
   1100     msg->setInt32("generation", mCheckBandwidthGeneration);
   1101     msg->post(10000000ll);
   1102 }
   1103 
   1104 void LiveSession::cancelCheckBandwidthEvent() {
   1105     ++mCheckBandwidthGeneration;
   1106 }
   1107 
   1108 void LiveSession::onCheckBandwidth() {
   1109     if (mReconfigurationInProgress) {
   1110         scheduleCheckBandwidthEvent();
   1111         return;
   1112     }
   1113 
   1114     size_t bandwidthIndex = getBandwidthIndex();
   1115     if (mPrevBandwidthIndex < 0
   1116             || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
   1117         changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
   1118     }
   1119 
   1120     // Handling the kWhatCheckBandwidth even here does _not_ automatically
   1121     // schedule another one on return, only an explicit call to
   1122     // scheduleCheckBandwidthEvent will do that.
   1123     // This ensures that only one configuration change is ongoing at any
   1124     // one time, once that completes it'll schedule another check bandwidth
   1125     // event.
   1126 }
   1127 
   1128 void LiveSession::postPrepared(status_t err) {
   1129     CHECK(mInPreparationPhase);
   1130 
   1131     sp<AMessage> notify = mNotify->dup();
   1132     if (err == OK || err == ERROR_END_OF_STREAM) {
   1133         notify->setInt32("what", kWhatPrepared);
   1134     } else {
   1135         notify->setInt32("what", kWhatPreparationFailed);
   1136         notify->setInt32("err", err);
   1137     }
   1138 
   1139     notify->post();
   1140 
   1141     mInPreparationPhase = false;
   1142 }
   1143 
   1144 }  // namespace android
   1145 
   1146