1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "NuCachedSource2" 19 #include <utils/Log.h> 20 21 #include "include/NuCachedSource2.h" 22 #include "include/HTTPBase.h" 23 24 #include <cutils/properties.h> 25 #include <media/stagefright/foundation/ADebug.h> 26 #include <media/stagefright/foundation/AMessage.h> 27 #include <media/stagefright/MediaErrors.h> 28 29 namespace android { 30 31 struct PageCache { 32 PageCache(size_t pageSize); 33 ~PageCache(); 34 35 struct Page { 36 void *mData; 37 size_t mSize; 38 }; 39 40 Page *acquirePage(); 41 void releasePage(Page *page); 42 43 void appendPage(Page *page); 44 size_t releaseFromStart(size_t maxBytes); 45 46 size_t totalSize() const { 47 return mTotalSize; 48 } 49 50 void copy(size_t from, void *data, size_t size); 51 52 private: 53 size_t mPageSize; 54 size_t mTotalSize; 55 56 List<Page *> mActivePages; 57 List<Page *> mFreePages; 58 59 void freePages(List<Page *> *list); 60 61 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 62 }; 63 64 PageCache::PageCache(size_t pageSize) 65 : mPageSize(pageSize), 66 mTotalSize(0) { 67 } 68 69 PageCache::~PageCache() { 70 freePages(&mActivePages); 71 freePages(&mFreePages); 72 } 73 74 void PageCache::freePages(List<Page *> *list) { 75 List<Page *>::iterator it = list->begin(); 76 while (it != list->end()) { 77 Page *page = *it; 78 79 free(page->mData); 80 delete page; 81 page = NULL; 82 83 ++it; 84 } 85 } 86 87 PageCache::Page *PageCache::acquirePage() { 88 if (!mFreePages.empty()) { 89 List<Page *>::iterator it = mFreePages.begin(); 90 Page *page = *it; 91 mFreePages.erase(it); 92 93 return page; 94 } 95 96 Page *page = new Page; 97 page->mData = malloc(mPageSize); 98 page->mSize = 0; 99 100 return page; 101 } 102 103 void PageCache::releasePage(Page *page) { 104 page->mSize = 0; 105 mFreePages.push_back(page); 106 } 107 108 void PageCache::appendPage(Page *page) { 109 mTotalSize += page->mSize; 110 mActivePages.push_back(page); 111 } 112 113 size_t PageCache::releaseFromStart(size_t maxBytes) { 114 size_t bytesReleased = 0; 115 116 while (maxBytes > 0 && !mActivePages.empty()) { 117 List<Page *>::iterator it = mActivePages.begin(); 118 119 Page *page = *it; 120 121 if (maxBytes < page->mSize) { 122 break; 123 } 124 125 mActivePages.erase(it); 126 127 maxBytes -= page->mSize; 128 bytesReleased += page->mSize; 129 130 releasePage(page); 131 } 132 133 mTotalSize -= bytesReleased; 134 return bytesReleased; 135 } 136 137 void PageCache::copy(size_t from, void *data, size_t size) { 138 ALOGV("copy from %d size %d", from, size); 139 140 if (size == 0) { 141 return; 142 } 143 144 CHECK_LE(from + size, mTotalSize); 145 146 size_t offset = 0; 147 List<Page *>::iterator it = mActivePages.begin(); 148 while (from >= offset + (*it)->mSize) { 149 offset += (*it)->mSize; 150 ++it; 151 } 152 153 size_t delta = from - offset; 154 size_t avail = (*it)->mSize - delta; 155 156 if (avail >= size) { 157 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 158 return; 159 } 160 161 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 162 ++it; 163 data = (uint8_t *)data + avail; 164 size -= avail; 165 166 while (size > 0) { 167 size_t copy = (*it)->mSize; 168 if (copy > size) { 169 copy = size; 170 } 171 memcpy(data, (*it)->mData, copy); 172 data = (uint8_t *)data + copy; 173 size -= copy; 174 ++it; 175 } 176 } 177 178 //////////////////////////////////////////////////////////////////////////////// 179 180 NuCachedSource2::NuCachedSource2( 181 const sp<DataSource> &source, 182 const char *cacheConfig, 183 bool disconnectAtHighwatermark) 184 : mSource(source), 185 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 186 mLooper(new ALooper), 187 mCache(new PageCache(kPageSize)), 188 mCacheOffset(0), 189 mFinalStatus(OK), 190 mLastAccessPos(0), 191 mFetching(true), 192 mLastFetchTimeUs(-1), 193 mNumRetriesLeft(kMaxNumRetries), 194 mHighwaterThresholdBytes(kDefaultHighWaterThreshold), 195 mLowwaterThresholdBytes(kDefaultLowWaterThreshold), 196 mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs), 197 mDisconnectAtHighwatermark(disconnectAtHighwatermark) { 198 // We are NOT going to support disconnect-at-highwatermark indefinitely 199 // and we are not guaranteeing support for client-specified cache 200 // parameters. Both of these are temporary measures to solve a specific 201 // problem that will be solved in a better way going forward. 202 203 updateCacheParamsFromSystemProperty(); 204 205 if (cacheConfig != NULL) { 206 updateCacheParamsFromString(cacheConfig); 207 } 208 209 if (mDisconnectAtHighwatermark) { 210 // Makes no sense to disconnect and do keep-alives... 211 mKeepAliveIntervalUs = 0; 212 } 213 214 mLooper->setName("NuCachedSource2"); 215 mLooper->registerHandler(mReflector); 216 mLooper->start(); 217 218 Mutex::Autolock autoLock(mLock); 219 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 220 } 221 222 NuCachedSource2::~NuCachedSource2() { 223 mLooper->stop(); 224 mLooper->unregisterHandler(mReflector->id()); 225 226 delete mCache; 227 mCache = NULL; 228 } 229 230 status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) { 231 if (mSource->flags() & kIsHTTPBasedSource) { 232 HTTPBase* source = static_cast<HTTPBase *>(mSource.get()); 233 return source->getEstimatedBandwidthKbps(kbps); 234 } 235 return ERROR_UNSUPPORTED; 236 } 237 238 status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) { 239 if (mSource->flags() & kIsHTTPBasedSource) { 240 HTTPBase *source = static_cast<HTTPBase *>(mSource.get()); 241 return source->setBandwidthStatCollectFreq(freqMs); 242 } 243 return ERROR_UNSUPPORTED; 244 } 245 246 status_t NuCachedSource2::initCheck() const { 247 return mSource->initCheck(); 248 } 249 250 status_t NuCachedSource2::getSize(off64_t *size) { 251 return mSource->getSize(size); 252 } 253 254 uint32_t NuCachedSource2::flags() { 255 // Remove HTTP related flags since NuCachedSource2 is not HTTP-based. 256 uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource); 257 return (flags | kIsCachingDataSource); 258 } 259 260 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 261 switch (msg->what()) { 262 case kWhatFetchMore: 263 { 264 onFetch(); 265 break; 266 } 267 268 case kWhatRead: 269 { 270 onRead(msg); 271 break; 272 } 273 274 default: 275 TRESPASS(); 276 } 277 } 278 279 void NuCachedSource2::fetchInternal() { 280 ALOGV("fetchInternal"); 281 282 bool reconnect = false; 283 284 { 285 Mutex::Autolock autoLock(mLock); 286 CHECK(mFinalStatus == OK || mNumRetriesLeft > 0); 287 288 if (mFinalStatus != OK) { 289 --mNumRetriesLeft; 290 291 reconnect = true; 292 } 293 } 294 295 if (reconnect) { 296 status_t err = 297 mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize()); 298 299 Mutex::Autolock autoLock(mLock); 300 301 if (err == ERROR_UNSUPPORTED || err == -EPIPE) { 302 // These are errors that are not likely to go away even if we 303 // retry, i.e. the server doesn't support range requests or similar. 304 mNumRetriesLeft = 0; 305 return; 306 } else if (err != OK) { 307 ALOGI("The attempt to reconnect failed, %d retries remaining", 308 mNumRetriesLeft); 309 310 return; 311 } 312 } 313 314 PageCache::Page *page = mCache->acquirePage(); 315 316 ssize_t n = mSource->readAt( 317 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 318 319 Mutex::Autolock autoLock(mLock); 320 321 if (n < 0) { 322 mFinalStatus = n; 323 if (n == ERROR_UNSUPPORTED || n == -EPIPE) { 324 // These are errors that are not likely to go away even if we 325 // retry, i.e. the server doesn't support range requests or similar. 326 mNumRetriesLeft = 0; 327 } 328 329 ALOGE("source returned error %ld, %d retries left", n, mNumRetriesLeft); 330 mCache->releasePage(page); 331 } else if (n == 0) { 332 ALOGI("ERROR_END_OF_STREAM"); 333 334 mNumRetriesLeft = 0; 335 mFinalStatus = ERROR_END_OF_STREAM; 336 337 mCache->releasePage(page); 338 } else { 339 if (mFinalStatus != OK) { 340 ALOGI("retrying a previously failed read succeeded."); 341 } 342 mNumRetriesLeft = kMaxNumRetries; 343 mFinalStatus = OK; 344 345 page->mSize = n; 346 mCache->appendPage(page); 347 } 348 } 349 350 void NuCachedSource2::onFetch() { 351 ALOGV("onFetch"); 352 353 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 354 ALOGV("EOS reached, done prefetching for now"); 355 mFetching = false; 356 } 357 358 bool keepAlive = 359 !mFetching 360 && mFinalStatus == OK 361 && mKeepAliveIntervalUs > 0 362 && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs; 363 364 if (mFetching || keepAlive) { 365 if (keepAlive) { 366 ALOGI("Keep alive"); 367 } 368 369 fetchInternal(); 370 371 mLastFetchTimeUs = ALooper::GetNowUs(); 372 373 if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) { 374 ALOGI("Cache full, done prefetching for now"); 375 mFetching = false; 376 377 if (mDisconnectAtHighwatermark 378 && (mSource->flags() & DataSource::kIsHTTPBasedSource)) { 379 ALOGV("Disconnecting at high watermark"); 380 static_cast<HTTPBase *>(mSource.get())->disconnect(); 381 mFinalStatus = -EAGAIN; 382 } 383 } 384 } else { 385 Mutex::Autolock autoLock(mLock); 386 restartPrefetcherIfNecessary_l(); 387 } 388 389 int64_t delayUs; 390 if (mFetching) { 391 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 392 // We failed this time and will try again in 3 seconds. 393 delayUs = 3000000ll; 394 } else { 395 delayUs = 0; 396 } 397 } else { 398 delayUs = 100000ll; 399 } 400 401 (new AMessage(kWhatFetchMore, mReflector->id()))->post(delayUs); 402 } 403 404 void NuCachedSource2::onRead(const sp<AMessage> &msg) { 405 ALOGV("onRead"); 406 407 int64_t offset; 408 CHECK(msg->findInt64("offset", &offset)); 409 410 void *data; 411 CHECK(msg->findPointer("data", &data)); 412 413 size_t size; 414 CHECK(msg->findSize("size", &size)); 415 416 ssize_t result = readInternal(offset, data, size); 417 418 if (result == -EAGAIN) { 419 msg->post(50000); 420 return; 421 } 422 423 Mutex::Autolock autoLock(mLock); 424 425 CHECK(mAsyncResult == NULL); 426 427 mAsyncResult = new AMessage; 428 mAsyncResult->setInt32("result", result); 429 430 mCondition.signal(); 431 } 432 433 void NuCachedSource2::restartPrefetcherIfNecessary_l( 434 bool ignoreLowWaterThreshold, bool force) { 435 static const size_t kGrayArea = 1024 * 1024; 436 437 if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) { 438 return; 439 } 440 441 if (!ignoreLowWaterThreshold && !force 442 && mCacheOffset + mCache->totalSize() - mLastAccessPos 443 >= mLowwaterThresholdBytes) { 444 return; 445 } 446 447 size_t maxBytes = mLastAccessPos - mCacheOffset; 448 449 if (!force) { 450 if (maxBytes < kGrayArea) { 451 return; 452 } 453 454 maxBytes -= kGrayArea; 455 } 456 457 size_t actualBytes = mCache->releaseFromStart(maxBytes); 458 mCacheOffset += actualBytes; 459 460 ALOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 461 mFetching = true; 462 } 463 464 ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 465 Mutex::Autolock autoSerializer(mSerializer); 466 467 ALOGV("readAt offset %lld, size %d", offset, size); 468 469 Mutex::Autolock autoLock(mLock); 470 471 // If the request can be completely satisfied from the cache, do so. 472 473 if (offset >= mCacheOffset 474 && offset + size <= mCacheOffset + mCache->totalSize()) { 475 size_t delta = offset - mCacheOffset; 476 mCache->copy(delta, data, size); 477 478 mLastAccessPos = offset + size; 479 480 return size; 481 } 482 483 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 484 msg->setInt64("offset", offset); 485 msg->setPointer("data", data); 486 msg->setSize("size", size); 487 488 CHECK(mAsyncResult == NULL); 489 msg->post(); 490 491 while (mAsyncResult == NULL) { 492 mCondition.wait(mLock); 493 } 494 495 int32_t result; 496 CHECK(mAsyncResult->findInt32("result", &result)); 497 498 mAsyncResult.clear(); 499 500 if (result > 0) { 501 mLastAccessPos = offset + result; 502 } 503 504 return (ssize_t)result; 505 } 506 507 size_t NuCachedSource2::cachedSize() { 508 Mutex::Autolock autoLock(mLock); 509 return mCacheOffset + mCache->totalSize(); 510 } 511 512 size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const { 513 Mutex::Autolock autoLock(mLock); 514 return approxDataRemaining_l(finalStatus); 515 } 516 517 size_t NuCachedSource2::approxDataRemaining_l(status_t *finalStatus) const { 518 *finalStatus = mFinalStatus; 519 520 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 521 // Pretend that everything is fine until we're out of retries. 522 *finalStatus = OK; 523 } 524 525 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 526 if (mLastAccessPos < lastBytePosCached) { 527 return lastBytePosCached - mLastAccessPos; 528 } 529 return 0; 530 } 531 532 ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 533 CHECK_LE(size, (size_t)mHighwaterThresholdBytes); 534 535 ALOGV("readInternal offset %lld size %d", offset, size); 536 537 Mutex::Autolock autoLock(mLock); 538 539 if (!mFetching) { 540 mLastAccessPos = offset; 541 restartPrefetcherIfNecessary_l( 542 false, // ignoreLowWaterThreshold 543 true); // force 544 } 545 546 if (offset < mCacheOffset 547 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 548 static const off64_t kPadding = 256 * 1024; 549 550 // In the presence of multiple decoded streams, once of them will 551 // trigger this seek request, the other one will request data "nearby" 552 // soon, adjust the seek position so that that subsequent request 553 // does not trigger another seek. 554 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 555 556 seekInternal_l(seekOffset); 557 } 558 559 size_t delta = offset - mCacheOffset; 560 561 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 562 if (delta >= mCache->totalSize()) { 563 return mFinalStatus; 564 } 565 566 size_t avail = mCache->totalSize() - delta; 567 568 if (avail > size) { 569 avail = size; 570 } 571 572 mCache->copy(delta, data, avail); 573 574 return avail; 575 } 576 577 if (offset + size <= mCacheOffset + mCache->totalSize()) { 578 mCache->copy(delta, data, size); 579 580 return size; 581 } 582 583 ALOGV("deferring read"); 584 585 return -EAGAIN; 586 } 587 588 status_t NuCachedSource2::seekInternal_l(off64_t offset) { 589 mLastAccessPos = offset; 590 591 if (offset >= mCacheOffset 592 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 593 return OK; 594 } 595 596 ALOGI("new range: offset= %lld", offset); 597 598 mCacheOffset = offset; 599 600 size_t totalSize = mCache->totalSize(); 601 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 602 603 mNumRetriesLeft = kMaxNumRetries; 604 mFetching = true; 605 606 return OK; 607 } 608 609 void NuCachedSource2::resumeFetchingIfNecessary() { 610 Mutex::Autolock autoLock(mLock); 611 612 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 613 } 614 615 sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) { 616 return mSource->DrmInitialization(mime); 617 } 618 619 void NuCachedSource2::getDrmInfo(sp<DecryptHandle> &handle, DrmManagerClient **client) { 620 mSource->getDrmInfo(handle, client); 621 } 622 623 String8 NuCachedSource2::getUri() { 624 return mSource->getUri(); 625 } 626 627 String8 NuCachedSource2::getMIMEType() const { 628 return mSource->getMIMEType(); 629 } 630 631 void NuCachedSource2::updateCacheParamsFromSystemProperty() { 632 char value[PROPERTY_VALUE_MAX]; 633 if (!property_get("media.stagefright.cache-params", value, NULL)) { 634 return; 635 } 636 637 updateCacheParamsFromString(value); 638 } 639 640 void NuCachedSource2::updateCacheParamsFromString(const char *s) { 641 ssize_t lowwaterMarkKb, highwaterMarkKb; 642 int keepAliveSecs; 643 644 if (sscanf(s, "%ld/%ld/%d", 645 &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) { 646 ALOGE("Failed to parse cache parameters from '%s'.", s); 647 return; 648 } 649 650 if (lowwaterMarkKb >= 0) { 651 mLowwaterThresholdBytes = lowwaterMarkKb * 1024; 652 } else { 653 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 654 } 655 656 if (highwaterMarkKb >= 0) { 657 mHighwaterThresholdBytes = highwaterMarkKb * 1024; 658 } else { 659 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 660 } 661 662 if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) { 663 ALOGE("Illegal low/highwater marks specified, reverting to defaults."); 664 665 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 666 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 667 } 668 669 if (keepAliveSecs >= 0) { 670 mKeepAliveIntervalUs = keepAliveSecs * 1000000ll; 671 } else { 672 mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs; 673 } 674 675 ALOGV("lowwater = %d bytes, highwater = %d bytes, keepalive = %lld us", 676 mLowwaterThresholdBytes, 677 mHighwaterThresholdBytes, 678 mKeepAliveIntervalUs); 679 } 680 681 // static 682 void NuCachedSource2::RemoveCacheSpecificHeaders( 683 KeyedVector<String8, String8> *headers, 684 String8 *cacheConfig, 685 bool *disconnectAtHighwatermark) { 686 *cacheConfig = String8(); 687 *disconnectAtHighwatermark = false; 688 689 if (headers == NULL) { 690 return; 691 } 692 693 ssize_t index; 694 if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) { 695 *cacheConfig = headers->valueAt(index); 696 697 headers->removeItemsAt(index); 698 699 ALOGV("Using special cache config '%s'", cacheConfig->string()); 700 } 701 702 if ((index = headers->indexOfKey( 703 String8("x-disconnect-at-highwatermark"))) >= 0) { 704 *disconnectAtHighwatermark = true; 705 headers->removeItemsAt(index); 706 707 ALOGV("Client requested disconnection at highwater mark"); 708 } 709 } 710 711 } // namespace android 712