Home | History | Annotate | Download | only in libstagefright
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #define LOG_TAG "NuCachedSource2"
     18 #include <utils/Log.h>
     19 
     20 #include "include/NuCachedSource2.h"
     21 
     22 #include <media/stagefright/foundation/ADebug.h>
     23 #include <media/stagefright/foundation/AMessage.h>
     24 #include <media/stagefright/MediaErrors.h>
     25 
     26 namespace android {
     27 
     28 struct PageCache {
     29     PageCache(size_t pageSize);
     30     ~PageCache();
     31 
     32     struct Page {
     33         void *mData;
     34         size_t mSize;
     35     };
     36 
     37     Page *acquirePage();
     38     void releasePage(Page *page);
     39 
     40     void appendPage(Page *page);
     41     size_t releaseFromStart(size_t maxBytes);
     42 
     43     size_t totalSize() const {
     44         return mTotalSize;
     45     }
     46 
     47     void copy(size_t from, void *data, size_t size);
     48 
     49 private:
     50     size_t mPageSize;
     51     size_t mTotalSize;
     52 
     53     List<Page *> mActivePages;
     54     List<Page *> mFreePages;
     55 
     56     void freePages(List<Page *> *list);
     57 
     58     DISALLOW_EVIL_CONSTRUCTORS(PageCache);
     59 };
     60 
     61 PageCache::PageCache(size_t pageSize)
     62     : mPageSize(pageSize),
     63       mTotalSize(0) {
     64 }
     65 
     66 PageCache::~PageCache() {
     67     freePages(&mActivePages);
     68     freePages(&mFreePages);
     69 }
     70 
     71 void PageCache::freePages(List<Page *> *list) {
     72     List<Page *>::iterator it = list->begin();
     73     while (it != list->end()) {
     74         Page *page = *it;
     75 
     76         free(page->mData);
     77         delete page;
     78         page = NULL;
     79 
     80         ++it;
     81     }
     82 }
     83 
     84 PageCache::Page *PageCache::acquirePage() {
     85     if (!mFreePages.empty()) {
     86         List<Page *>::iterator it = mFreePages.begin();
     87         Page *page = *it;
     88         mFreePages.erase(it);
     89 
     90         return page;
     91     }
     92 
     93     Page *page = new Page;
     94     page->mData = malloc(mPageSize);
     95     page->mSize = 0;
     96 
     97     return page;
     98 }
     99 
    100 void PageCache::releasePage(Page *page) {
    101     page->mSize = 0;
    102     mFreePages.push_back(page);
    103 }
    104 
    105 void PageCache::appendPage(Page *page) {
    106     mTotalSize += page->mSize;
    107     mActivePages.push_back(page);
    108 }
    109 
    110 size_t PageCache::releaseFromStart(size_t maxBytes) {
    111     size_t bytesReleased = 0;
    112 
    113     while (maxBytes > 0 && !mActivePages.empty()) {
    114         List<Page *>::iterator it = mActivePages.begin();
    115 
    116         Page *page = *it;
    117 
    118         if (maxBytes < page->mSize) {
    119             break;
    120         }
    121 
    122         mActivePages.erase(it);
    123 
    124         maxBytes -= page->mSize;
    125         bytesReleased += page->mSize;
    126 
    127         releasePage(page);
    128     }
    129 
    130     mTotalSize -= bytesReleased;
    131     return bytesReleased;
    132 }
    133 
    134 void PageCache::copy(size_t from, void *data, size_t size) {
    135     LOGV("copy from %d size %d", from, size);
    136 
    137     CHECK_LE(from + size, mTotalSize);
    138 
    139     size_t offset = 0;
    140     List<Page *>::iterator it = mActivePages.begin();
    141     while (from >= offset + (*it)->mSize) {
    142         offset += (*it)->mSize;
    143         ++it;
    144     }
    145 
    146     size_t delta = from - offset;
    147     size_t avail = (*it)->mSize - delta;
    148 
    149     if (avail >= size) {
    150         memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
    151         return;
    152     }
    153 
    154     memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
    155     ++it;
    156     data = (uint8_t *)data + avail;
    157     size -= avail;
    158 
    159     while (size > 0) {
    160         size_t copy = (*it)->mSize;
    161         if (copy > size) {
    162             copy = size;
    163         }
    164         memcpy(data, (*it)->mData, copy);
    165         data = (uint8_t *)data + copy;
    166         size -= copy;
    167         ++it;
    168     }
    169 }
    170 
    171 ////////////////////////////////////////////////////////////////////////////////
    172 
    173 NuCachedSource2::NuCachedSource2(const sp<DataSource> &source)
    174     : mSource(source),
    175       mReflector(new AHandlerReflector<NuCachedSource2>(this)),
    176       mLooper(new ALooper),
    177       mCache(new PageCache(kPageSize)),
    178       mCacheOffset(0),
    179       mFinalStatus(OK),
    180       mLastAccessPos(0),
    181       mFetching(true),
    182       mLastFetchTimeUs(-1),
    183       mSuspended(false) {
    184     mLooper->setName("NuCachedSource2");
    185     mLooper->registerHandler(mReflector);
    186     mLooper->start();
    187 
    188     Mutex::Autolock autoLock(mLock);
    189     (new AMessage(kWhatFetchMore, mReflector->id()))->post();
    190 }
    191 
    192 NuCachedSource2::~NuCachedSource2() {
    193     mLooper->stop();
    194     mLooper->unregisterHandler(mReflector->id());
    195 
    196     delete mCache;
    197     mCache = NULL;
    198 }
    199 
    200 status_t NuCachedSource2::initCheck() const {
    201     return mSource->initCheck();
    202 }
    203 
    204 status_t NuCachedSource2::getSize(off_t *size) {
    205     return mSource->getSize(size);
    206 }
    207 
    208 uint32_t NuCachedSource2::flags() {
    209     return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource;
    210 }
    211 
    212 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
    213     switch (msg->what()) {
    214         case kWhatFetchMore:
    215         {
    216             onFetch();
    217             break;
    218         }
    219 
    220         case kWhatRead:
    221         {
    222             onRead(msg);
    223             break;
    224         }
    225 
    226         case kWhatSuspend:
    227         {
    228             onSuspend();
    229             break;
    230         }
    231 
    232         default:
    233             TRESPASS();
    234     }
    235 }
    236 
    237 void NuCachedSource2::fetchInternal() {
    238     LOGV("fetchInternal");
    239 
    240     CHECK_EQ(mFinalStatus, (status_t)OK);
    241 
    242     PageCache::Page *page = mCache->acquirePage();
    243 
    244     ssize_t n = mSource->readAt(
    245             mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
    246 
    247     Mutex::Autolock autoLock(mLock);
    248 
    249     if (n < 0) {
    250         LOGE("source returned error %ld", n);
    251         mFinalStatus = n;
    252         mCache->releasePage(page);
    253     } else if (n == 0) {
    254         LOGI("ERROR_END_OF_STREAM");
    255         mFinalStatus = ERROR_END_OF_STREAM;
    256         mCache->releasePage(page);
    257     } else {
    258         page->mSize = n;
    259         mCache->appendPage(page);
    260     }
    261 }
    262 
    263 void NuCachedSource2::onFetch() {
    264     LOGV("onFetch");
    265 
    266     if (mFinalStatus != OK) {
    267         LOGV("EOS reached, done prefetching for now");
    268         mFetching = false;
    269     }
    270 
    271     bool keepAlive =
    272         !mFetching
    273             && !mSuspended
    274             && mFinalStatus == OK
    275             && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs;
    276 
    277     if (mFetching || keepAlive) {
    278         if (keepAlive) {
    279             LOGI("Keep alive");
    280         }
    281 
    282         fetchInternal();
    283 
    284         mLastFetchTimeUs = ALooper::GetNowUs();
    285 
    286         if (mFetching && mCache->totalSize() >= kHighWaterThreshold) {
    287             LOGI("Cache full, done prefetching for now");
    288             mFetching = false;
    289         }
    290     } else if (!mSuspended) {
    291         Mutex::Autolock autoLock(mLock);
    292         restartPrefetcherIfNecessary_l();
    293     }
    294 
    295     (new AMessage(kWhatFetchMore, mReflector->id()))->post(
    296             mFetching ? 0 : 100000ll);
    297 }
    298 
    299 void NuCachedSource2::onRead(const sp<AMessage> &msg) {
    300     LOGV("onRead");
    301 
    302     int64_t offset;
    303     CHECK(msg->findInt64("offset", &offset));
    304 
    305     void *data;
    306     CHECK(msg->findPointer("data", &data));
    307 
    308     size_t size;
    309     CHECK(msg->findSize("size", &size));
    310 
    311     ssize_t result = readInternal(offset, data, size);
    312 
    313     if (result == -EAGAIN) {
    314         msg->post(50000);
    315         return;
    316     }
    317 
    318     Mutex::Autolock autoLock(mLock);
    319 
    320     CHECK(mAsyncResult == NULL);
    321 
    322     mAsyncResult = new AMessage;
    323     mAsyncResult->setInt32("result", result);
    324 
    325     mCondition.signal();
    326 }
    327 
    328 void NuCachedSource2::restartPrefetcherIfNecessary_l() {
    329     static const size_t kGrayArea = 256 * 1024;
    330 
    331     if (mFetching || mFinalStatus != OK) {
    332         return;
    333     }
    334 
    335     if (mCacheOffset + mCache->totalSize() - mLastAccessPos
    336             >= kLowWaterThreshold) {
    337         return;
    338     }
    339 
    340     size_t maxBytes = mLastAccessPos - mCacheOffset;
    341     if (maxBytes < kGrayArea) {
    342         return;
    343     }
    344 
    345     maxBytes -= kGrayArea;
    346 
    347     size_t actualBytes = mCache->releaseFromStart(maxBytes);
    348     mCacheOffset += actualBytes;
    349 
    350     LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize());
    351     mFetching = true;
    352 }
    353 
    354 ssize_t NuCachedSource2::readAt(off_t offset, void *data, size_t size) {
    355     Mutex::Autolock autoSerializer(mSerializer);
    356 
    357     LOGV("readAt offset %ld, size %d", offset, size);
    358 
    359     Mutex::Autolock autoLock(mLock);
    360 
    361     // If the request can be completely satisfied from the cache, do so.
    362 
    363     if (offset >= mCacheOffset
    364             && offset + size <= mCacheOffset + mCache->totalSize()) {
    365         size_t delta = offset - mCacheOffset;
    366         mCache->copy(delta, data, size);
    367 
    368         mLastAccessPos = offset + size;
    369 
    370         return size;
    371     }
    372 
    373     sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id());
    374     msg->setInt64("offset", offset);
    375     msg->setPointer("data", data);
    376     msg->setSize("size", size);
    377 
    378     CHECK(mAsyncResult == NULL);
    379     msg->post();
    380 
    381     while (mAsyncResult == NULL) {
    382         mCondition.wait(mLock);
    383     }
    384 
    385     int32_t result;
    386     CHECK(mAsyncResult->findInt32("result", &result));
    387 
    388     mAsyncResult.clear();
    389 
    390     if (result > 0) {
    391         mLastAccessPos = offset + result;
    392     }
    393 
    394     return (ssize_t)result;
    395 }
    396 
    397 size_t NuCachedSource2::cachedSize() {
    398     Mutex::Autolock autoLock(mLock);
    399     return mCacheOffset + mCache->totalSize();
    400 }
    401 
    402 size_t NuCachedSource2::approxDataRemaining(bool *eos) {
    403     Mutex::Autolock autoLock(mLock);
    404     return approxDataRemaining_l(eos);
    405 }
    406 
    407 size_t NuCachedSource2::approxDataRemaining_l(bool *eos) {
    408     *eos = (mFinalStatus != OK);
    409     off_t lastBytePosCached = mCacheOffset + mCache->totalSize();
    410     if (mLastAccessPos < lastBytePosCached) {
    411         return lastBytePosCached - mLastAccessPos;
    412     }
    413     return 0;
    414 }
    415 
    416 ssize_t NuCachedSource2::readInternal(off_t offset, void *data, size_t size) {
    417     LOGV("readInternal offset %ld size %d", offset, size);
    418 
    419     Mutex::Autolock autoLock(mLock);
    420 
    421     if (offset < mCacheOffset
    422             || offset >= (off_t)(mCacheOffset + mCache->totalSize())) {
    423         static const off_t kPadding = 32768;
    424 
    425         // In the presence of multiple decoded streams, once of them will
    426         // trigger this seek request, the other one will request data "nearby"
    427         // soon, adjust the seek position so that that subsequent request
    428         // does not trigger another seek.
    429         off_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
    430 
    431         seekInternal_l(seekOffset);
    432     }
    433 
    434     size_t delta = offset - mCacheOffset;
    435 
    436     if (mFinalStatus != OK) {
    437         if (delta >= mCache->totalSize()) {
    438             return mFinalStatus;
    439         }
    440 
    441         size_t avail = mCache->totalSize() - delta;
    442         mCache->copy(delta, data, avail);
    443 
    444         return avail;
    445     }
    446 
    447     if (offset + size <= mCacheOffset + mCache->totalSize()) {
    448         mCache->copy(delta, data, size);
    449 
    450         return size;
    451     }
    452 
    453     LOGV("deferring read");
    454 
    455     return -EAGAIN;
    456 }
    457 
    458 status_t NuCachedSource2::seekInternal_l(off_t offset) {
    459     mLastAccessPos = offset;
    460 
    461     if (offset >= mCacheOffset
    462             && offset <= (off_t)(mCacheOffset + mCache->totalSize())) {
    463         return OK;
    464     }
    465 
    466     LOGI("new range: offset= %ld", offset);
    467 
    468     mCacheOffset = offset;
    469 
    470     size_t totalSize = mCache->totalSize();
    471     CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
    472 
    473     mFinalStatus = OK;
    474     mFetching = true;
    475 
    476     return OK;
    477 }
    478 
    479 void NuCachedSource2::clearCacheAndResume() {
    480     LOGV("clearCacheAndResume");
    481 
    482     Mutex::Autolock autoLock(mLock);
    483 
    484     CHECK(mSuspended);
    485 
    486     mCacheOffset = 0;
    487     mFinalStatus = OK;
    488     mLastAccessPos = 0;
    489     mLastFetchTimeUs = -1;
    490 
    491     size_t totalSize = mCache->totalSize();
    492     CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
    493 
    494     mFetching = true;
    495     mSuspended = false;
    496 }
    497 
    498 void NuCachedSource2::suspend() {
    499     (new AMessage(kWhatSuspend, mReflector->id()))->post();
    500 
    501     while (!mSuspended) {
    502         usleep(10000);
    503     }
    504 }
    505 
    506 void NuCachedSource2::onSuspend() {
    507     Mutex::Autolock autoLock(mLock);
    508 
    509     mFetching = false;
    510     mSuspended = true;
    511 }
    512 
    513 }  // namespace android
    514 
    515