Home | History | Annotate | Download | only in libstagefright
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "NuCachedSource2"
     19 #include <utils/Log.h>
     20 
     21 #include "include/NuCachedSource2.h"
     22 #include "include/HTTPBase.h"
     23 
     24 #include <cutils/properties.h>
     25 #include <media/stagefright/foundation/ADebug.h>
     26 #include <media/stagefright/foundation/AMessage.h>
     27 #include <media/stagefright/MediaErrors.h>
     28 
     29 namespace android {
     30 
     31 struct PageCache {
     32     PageCache(size_t pageSize);
     33     ~PageCache();
     34 
     35     struct Page {
     36         void *mData;
     37         size_t mSize;
     38     };
     39 
     40     Page *acquirePage();
     41     void releasePage(Page *page);
     42 
     43     void appendPage(Page *page);
     44     size_t releaseFromStart(size_t maxBytes);
     45 
     46     size_t totalSize() const {
     47         return mTotalSize;
     48     }
     49 
     50     void copy(size_t from, void *data, size_t size);
     51 
     52 private:
     53     size_t mPageSize;
     54     size_t mTotalSize;
     55 
     56     List<Page *> mActivePages;
     57     List<Page *> mFreePages;
     58 
     59     void freePages(List<Page *> *list);
     60 
     61     DISALLOW_EVIL_CONSTRUCTORS(PageCache);
     62 };
     63 
     64 PageCache::PageCache(size_t pageSize)
     65     : mPageSize(pageSize),
     66       mTotalSize(0) {
     67 }
     68 
     69 PageCache::~PageCache() {
     70     freePages(&mActivePages);
     71     freePages(&mFreePages);
     72 }
     73 
     74 void PageCache::freePages(List<Page *> *list) {
     75     List<Page *>::iterator it = list->begin();
     76     while (it != list->end()) {
     77         Page *page = *it;
     78 
     79         free(page->mData);
     80         delete page;
     81         page = NULL;
     82 
     83         ++it;
     84     }
     85 }
     86 
     87 PageCache::Page *PageCache::acquirePage() {
     88     if (!mFreePages.empty()) {
     89         List<Page *>::iterator it = mFreePages.begin();
     90         Page *page = *it;
     91         mFreePages.erase(it);
     92 
     93         return page;
     94     }
     95 
     96     Page *page = new Page;
     97     page->mData = malloc(mPageSize);
     98     page->mSize = 0;
     99 
    100     return page;
    101 }
    102 
    103 void PageCache::releasePage(Page *page) {
    104     page->mSize = 0;
    105     mFreePages.push_back(page);
    106 }
    107 
    108 void PageCache::appendPage(Page *page) {
    109     mTotalSize += page->mSize;
    110     mActivePages.push_back(page);
    111 }
    112 
    113 size_t PageCache::releaseFromStart(size_t maxBytes) {
    114     size_t bytesReleased = 0;
    115 
    116     while (maxBytes > 0 && !mActivePages.empty()) {
    117         List<Page *>::iterator it = mActivePages.begin();
    118 
    119         Page *page = *it;
    120 
    121         if (maxBytes < page->mSize) {
    122             break;
    123         }
    124 
    125         mActivePages.erase(it);
    126 
    127         maxBytes -= page->mSize;
    128         bytesReleased += page->mSize;
    129 
    130         releasePage(page);
    131     }
    132 
    133     mTotalSize -= bytesReleased;
    134     return bytesReleased;
    135 }
    136 
    137 void PageCache::copy(size_t from, void *data, size_t size) {
    138     ALOGV("copy from %d size %d", from, size);
    139 
    140     if (size == 0) {
    141         return;
    142     }
    143 
    144     CHECK_LE(from + size, mTotalSize);
    145 
    146     size_t offset = 0;
    147     List<Page *>::iterator it = mActivePages.begin();
    148     while (from >= offset + (*it)->mSize) {
    149         offset += (*it)->mSize;
    150         ++it;
    151     }
    152 
    153     size_t delta = from - offset;
    154     size_t avail = (*it)->mSize - delta;
    155 
    156     if (avail >= size) {
    157         memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
    158         return;
    159     }
    160 
    161     memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
    162     ++it;
    163     data = (uint8_t *)data + avail;
    164     size -= avail;
    165 
    166     while (size > 0) {
    167         size_t copy = (*it)->mSize;
    168         if (copy > size) {
    169             copy = size;
    170         }
    171         memcpy(data, (*it)->mData, copy);
    172         data = (uint8_t *)data + copy;
    173         size -= copy;
    174         ++it;
    175     }
    176 }
    177 
    178 ////////////////////////////////////////////////////////////////////////////////
    179 
    180 NuCachedSource2::NuCachedSource2(
    181         const sp<DataSource> &source,
    182         const char *cacheConfig,
    183         bool disconnectAtHighwatermark)
    184     : mSource(source),
    185       mReflector(new AHandlerReflector<NuCachedSource2>(this)),
    186       mLooper(new ALooper),
    187       mCache(new PageCache(kPageSize)),
    188       mCacheOffset(0),
    189       mFinalStatus(OK),
    190       mLastAccessPos(0),
    191       mFetching(true),
    192       mLastFetchTimeUs(-1),
    193       mNumRetriesLeft(kMaxNumRetries),
    194       mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
    195       mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
    196       mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
    197       mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
    198     // We are NOT going to support disconnect-at-highwatermark indefinitely
    199     // and we are not guaranteeing support for client-specified cache
    200     // parameters. Both of these are temporary measures to solve a specific
    201     // problem that will be solved in a better way going forward.
    202 
    203     updateCacheParamsFromSystemProperty();
    204 
    205     if (cacheConfig != NULL) {
    206         updateCacheParamsFromString(cacheConfig);
    207     }
    208 
    209     if (mDisconnectAtHighwatermark) {
    210         // Makes no sense to disconnect and do keep-alives...
    211         mKeepAliveIntervalUs = 0;
    212     }
    213 
    214     mLooper->setName("NuCachedSource2");
    215     mLooper->registerHandler(mReflector);
    216     mLooper->start();
    217 
    218     Mutex::Autolock autoLock(mLock);
    219     (new AMessage(kWhatFetchMore, mReflector->id()))->post();
    220 }
    221 
    222 NuCachedSource2::~NuCachedSource2() {
    223     mLooper->stop();
    224     mLooper->unregisterHandler(mReflector->id());
    225 
    226     delete mCache;
    227     mCache = NULL;
    228 }
    229 
    230 status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
    231     if (mSource->flags() & kIsHTTPBasedSource) {
    232         HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
    233         return source->getEstimatedBandwidthKbps(kbps);
    234     }
    235     return ERROR_UNSUPPORTED;
    236 }
    237 
    238 status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
    239     if (mSource->flags() & kIsHTTPBasedSource) {
    240         HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
    241         return source->setBandwidthStatCollectFreq(freqMs);
    242     }
    243     return ERROR_UNSUPPORTED;
    244 }
    245 
    246 status_t NuCachedSource2::initCheck() const {
    247     return mSource->initCheck();
    248 }
    249 
    250 status_t NuCachedSource2::getSize(off64_t *size) {
    251     return mSource->getSize(size);
    252 }
    253 
    254 uint32_t NuCachedSource2::flags() {
    255     // Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
    256     uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
    257     return (flags | kIsCachingDataSource);
    258 }
    259 
    260 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
    261     switch (msg->what()) {
    262         case kWhatFetchMore:
    263         {
    264             onFetch();
    265             break;
    266         }
    267 
    268         case kWhatRead:
    269         {
    270             onRead(msg);
    271             break;
    272         }
    273 
    274         default:
    275             TRESPASS();
    276     }
    277 }
    278 
    279 void NuCachedSource2::fetchInternal() {
    280     ALOGV("fetchInternal");
    281 
    282     bool reconnect = false;
    283 
    284     {
    285         Mutex::Autolock autoLock(mLock);
    286         CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
    287 
    288         if (mFinalStatus != OK) {
    289             --mNumRetriesLeft;
    290 
    291             reconnect = true;
    292         }
    293     }
    294 
    295     if (reconnect) {
    296         status_t err =
    297             mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
    298 
    299         Mutex::Autolock autoLock(mLock);
    300 
    301         if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
    302             // These are errors that are not likely to go away even if we
    303             // retry, i.e. the server doesn't support range requests or similar.
    304             mNumRetriesLeft = 0;
    305             return;
    306         } else if (err != OK) {
    307             ALOGI("The attempt to reconnect failed, %d retries remaining",
    308                  mNumRetriesLeft);
    309 
    310             return;
    311         }
    312     }
    313 
    314     PageCache::Page *page = mCache->acquirePage();
    315 
    316     ssize_t n = mSource->readAt(
    317             mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
    318 
    319     Mutex::Autolock autoLock(mLock);
    320 
    321     if (n < 0) {
    322         mFinalStatus = n;
    323         if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
    324             // These are errors that are not likely to go away even if we
    325             // retry, i.e. the server doesn't support range requests or similar.
    326             mNumRetriesLeft = 0;
    327         }
    328 
    329         ALOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft);
    330         mCache->releasePage(page);
    331     } else if (n == 0) {
    332         ALOGI("ERROR_END_OF_STREAM");
    333 
    334         mNumRetriesLeft = 0;
    335         mFinalStatus = ERROR_END_OF_STREAM;
    336 
    337         mCache->releasePage(page);
    338     } else {
    339         if (mFinalStatus != OK) {
    340             ALOGI("retrying a previously failed read succeeded.");
    341         }
    342         mNumRetriesLeft = kMaxNumRetries;
    343         mFinalStatus = OK;
    344 
    345         page->mSize = n;
    346         mCache->appendPage(page);
    347     }
    348 }
    349 
    350 void NuCachedSource2::onFetch() {
    351     ALOGV("onFetch");
    352 
    353     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
    354         ALOGV("EOS reached, done prefetching for now");
    355         mFetching = false;
    356     }
    357 
    358     bool keepAlive =
    359         !mFetching
    360             && mFinalStatus == OK
    361             && mKeepAliveIntervalUs > 0
    362             && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
    363 
    364     if (mFetching || keepAlive) {
    365         if (keepAlive) {
    366             ALOGI("Keep alive");
    367         }
    368 
    369         fetchInternal();
    370 
    371         mLastFetchTimeUs = ALooper::GetNowUs();
    372 
    373         if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
    374             ALOGI("Cache full, done prefetching for now");
    375             mFetching = false;
    376 
    377             if (mDisconnectAtHighwatermark
    378                     && (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
    379                 ALOGV("Disconnecting at high watermark");
    380                 static_cast<HTTPBase *>(mSource.get())->disconnect();
    381                 mFinalStatus = -EAGAIN;
    382             }
    383         }
    384     } else {
    385         Mutex::Autolock autoLock(mLock);
    386         restartPrefetcherIfNecessary_l();
    387     }
    388 
    389     int64_t delayUs;
    390     if (mFetching) {
    391         if (mFinalStatus != OK && mNumRetriesLeft > 0) {
    392             // We failed this time and will try again in 3 seconds.
    393             delayUs = 3000000ll;
    394         } else {
    395             delayUs = 0;
    396         }
    397     } else {
    398         delayUs = 100000ll;
    399     }
    400 
    401     (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs);
    402 }
    403 
    404 void NuCachedSource2::onRead(const sp<AMessage> &msg) {
    405     ALOGV("onRead");
    406 
    407     int64_t offset;
    408     CHECK(msg->findInt64("offset", &offset));
    409 
    410     void *data;
    411     CHECK(msg->findPointer("data", &data));
    412 
    413     size_t size;
    414     CHECK(msg->findSize("size", &size));
    415 
    416     ssize_t result = readInternal(offset, data, size);
    417 
    418     if (result == -EAGAIN) {
    419         msg->post(50000);
    420         return;
    421     }
    422 
    423     Mutex::Autolock autoLock(mLock);
    424 
    425     CHECK(mAsyncResult == NULL);
    426 
    427     mAsyncResult = new AMessage;
    428     mAsyncResult->setInt32("result", result);
    429 
    430     mCondition.signal();
    431 }
    432 
    433 void NuCachedSource2::restartPrefetcherIfNecessary_l(
    434         bool ignoreLowWaterThreshold, bool force) {
    435     static const size_t kGrayArea = 1024 * 1024;
    436 
    437     if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
    438         return;
    439     }
    440 
    441     if (!ignoreLowWaterThreshold && !force
    442             && mCacheOffset + mCache->totalSize() - mLastAccessPos
    443                 >= mLowwaterThresholdBytes) {
    444         return;
    445     }
    446 
    447     size_t maxBytes = mLastAccessPos - mCacheOffset;
    448 
    449     if (!force) {
    450         if (maxBytes < kGrayArea) {
    451             return;
    452         }
    453 
    454         maxBytes -= kGrayArea;
    455     }
    456 
    457     size_t actualBytes = mCache->releaseFromStart(maxBytes);
    458     mCacheOffset += actualBytes;
    459 
    460     ALOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
    461     mFetching = true;
    462 }
    463 
    464 ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
    465     Mutex::Autolock autoSerializer(mSerializer);
    466 
    467     ALOGV("readAt offset %lld, size %d", offset, size);
    468 
    469     Mutex::Autolock autoLock(mLock);
    470 
    471     // If the request can be completely satisfied from the cache, do so.
    472 
    473     if (offset >= mCacheOffset
    474             && offset + size <= mCacheOffset + mCache->totalSize()) {
    475         size_t delta = offset - mCacheOffset;
    476         mCache->copy(delta, data, size);
    477 
    478         mLastAccessPos = offset + size;
    479 
    480         return size;
    481     }
    482 
    483     sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
    484     msg->setInt64("offset", offset);
    485     msg->setPointer("data", data);
    486     msg->setSize("size", size);
    487 
    488     CHECK(mAsyncResult == NULL);
    489     msg->post();
    490 
    491     while (mAsyncResult == NULL) {
    492         mCondition.wait(mLock);
    493     }
    494 
    495     int32_t result;
    496     CHECK(mAsyncResult->findInt32("result", &result));
    497 
    498     mAsyncResult.clear();
    499 
    500     if (result > 0) {
    501         mLastAccessPos = offset + result;
    502     }
    503 
    504     return (ssize_t)result;
    505 }
    506 
    507 size_t NuCachedSource2::cachedSize() {
    508     Mutex::Autolock autoLock(mLock);
    509     return mCacheOffset + mCache->totalSize();
    510 }
    511 
    512 size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
    513     Mutex::Autolock autoLock(mLock);
    514     return approxDataRemaining_l(finalStatus);
    515 }
    516 
    517 size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const {
    518     *finalStatus = mFinalStatus;
    519 
    520     if (mFinalStatus != OK && mNumRetriesLeft > 0) {
    521         // Pretend that everything is fine until we're out of retries.
    522         *finalStatus = OK;
    523     }
    524 
    525     off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
    526     if (mLastAccessPos < lastBytePosCached) {
    527         return lastBytePosCached - mLastAccessPos;
    528     }
    529     return 0;
    530 }
    531 
    532 ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
    533     CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
    534 
    535     ALOGV("readInternal offset %lld size %d", offset, size);
    536 
    537     Mutex::Autolock autoLock(mLock);
    538 
    539     if (!mFetching) {
    540         mLastAccessPos = offset;
    541         restartPrefetcherIfNecessary_l(
    542                 false, // ignoreLowWaterThreshold
    543                 true); // force
    544     }
    545 
    546     if (offset < mCacheOffset
    547             || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
    548         static const off64_t kPadding = 256 * 1024;
    549 
    550         // In the presence of multiple decoded streams, once of them will
    551         // trigger this seek request, the other one will request data "nearby"
    552         // soon, adjust the seek position so that that subsequent request
    553         // does not trigger another seek.
    554         off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
    555 
    556         seekInternal_l(seekOffset);
    557     }
    558 
    559     size_t delta = offset - mCacheOffset;
    560 
    561     if (mFinalStatus != OK && mNumRetriesLeft == 0) {
    562         if (delta >= mCache->totalSize()) {
    563             return mFinalStatus;
    564         }
    565 
    566         size_t avail = mCache->totalSize() - delta;
    567 
    568         if (avail > size) {
    569             avail = size;
    570         }
    571 
    572         mCache->copy(delta, data, avail);
    573 
    574         return avail;
    575     }
    576 
    577     if (offset + size <= mCacheOffset + mCache->totalSize()) {
    578         mCache->copy(delta, data, size);
    579 
    580         return size;
    581     }
    582 
    583     ALOGV("deferring read");
    584 
    585     return -EAGAIN;
    586 }
    587 
    588 status_t NuCachedSource2::seekInternal_l(off64_t offset) {
    589     mLastAccessPos = offset;
    590 
    591     if (offset >= mCacheOffset
    592             && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
    593         return OK;
    594     }
    595 
    596     ALOGI("new range: offset= %lld", offset);
    597 
    598     mCacheOffset = offset;
    599 
    600     size_t totalSize = mCache->totalSize();
    601     CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
    602 
    603     mNumRetriesLeft = kMaxNumRetries;
    604     mFetching = true;
    605 
    606     return OK;
    607 }
    608 
    609 void NuCachedSource2::resumeFetchingIfNecessary() {
    610     Mutex::Autolock autoLock(mLock);
    611 
    612     restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
    613 }
    614 
    615 sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) {
    616     return mSource->DrmInitialization(mime);
    617 }
    618 
    619 void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) {
    620     mSource->getDrmInfo(handle, client);
    621 }
    622 
    623 String8 NuCachedSource2::getUri() {
    624     return mSource->getUri();
    625 }
    626 
    627 String8 NuCachedSource2::getMIMEType() const {
    628     return mSource->getMIMEType();
    629 }
    630 
    631 void NuCachedSource2::updateCacheParamsFromSystemProperty() {
    632     char value[PROPERTY_VALUE_MAX];
    633     if (!property_get("media.stagefright.cache-params", value, NULL)) {
    634         return;
    635     }
    636 
    637     updateCacheParamsFromString(value);
    638 }
    639 
    640 void NuCachedSource2::updateCacheParamsFromString(const char *s) {
    641     ssize_t lowwaterMarkKb, highwaterMarkKb;
    642     int keepAliveSecs;
    643 
    644     if (sscanf(s, "%ld/%ld/%d",
    645                &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
    646         ALOGE("Failed to parse cache parameters from '%s'.", s);
    647         return;
    648     }
    649 
    650     if (lowwaterMarkKb >= 0) {
    651         mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
    652     } else {
    653         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
    654     }
    655 
    656     if (highwaterMarkKb >= 0) {
    657         mHighwaterThresholdBytes = highwaterMarkKb * 1024;
    658     } else {
    659         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
    660     }
    661 
    662     if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
    663         ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
    664 
    665         mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
    666         mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
    667     }
    668 
    669     if (keepAliveSecs >= 0) {
    670         mKeepAliveIntervalUs = keepAliveSecs * 1000000ll;
    671     } else {
    672         mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
    673     }
    674 
    675     ALOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us",
    676          mLowwaterThresholdBytes,
    677          mHighwaterThresholdBytes,
    678          mKeepAliveIntervalUs);
    679 }
    680 
    681 // static
    682 void NuCachedSource2::RemoveCacheSpecificHeaders(
    683         KeyedVector<String8, String8> *headers,
    684         String8 *cacheConfig,
    685         bool *disconnectAtHighwatermark) {
    686     *cacheConfig = String8();
    687     *disconnectAtHighwatermark = false;
    688 
    689     if (headers == NULL) {
    690         return;
    691     }
    692 
    693     ssize_t index;
    694     if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
    695         *cacheConfig = headers->valueAt(index);
    696 
    697         headers->removeItemsAt(index);
    698 
    699         ALOGV("Using special cache config '%s'", cacheConfig->string());
    700     }
    701 
    702     if ((index = headers->indexOfKey(
    703                     String8("x-disconnect-at-highwatermark"))) >= 0) {
    704         *disconnectAtHighwatermark = true;
    705         headers->removeItemsAt(index);
    706 
    707         ALOGV("Client requested disconnection at highwater mark");
    708     }
    709 }
    710 
    711 }  // namespace android
    712