1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define LOG_TAG "NuCachedSource2" 18 #include <utils/Log.h> 19 20 #include "include/NuCachedSource2.h" 21 22 #include <media/stagefright/foundation/ADebug.h> 23 #include <media/stagefright/foundation/AMessage.h> 24 #include <media/stagefright/MediaErrors.h> 25 26 namespace android { 27 28 struct PageCache { 29 PageCache(size_t pageSize); 30 ~PageCache(); 31 32 struct Page { 33 void *mData; 34 size_t mSize; 35 }; 36 37 Page *acquirePage(); 38 void releasePage(Page *page); 39 40 void appendPage(Page *page); 41 size_t releaseFromStart(size_t maxBytes); 42 43 size_t totalSize() const { 44 return mTotalSize; 45 } 46 47 void copy(size_t from, void *data, size_t size); 48 49 private: 50 size_t mPageSize; 51 size_t mTotalSize; 52 53 List<Page *> mActivePages; 54 List<Page *> mFreePages; 55 56 void freePages(List<Page *> *list); 57 58 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 59 }; 60 61 PageCache::PageCache(size_t pageSize) 62 : mPageSize(pageSize), 63 mTotalSize(0) { 64 } 65 66 PageCache::~PageCache() { 67 freePages(&mActivePages); 68 freePages(&mFreePages); 69 } 70 71 void PageCache::freePages(List<Page *> *list) { 72 List<Page *>::iterator it = list->begin(); 73 while (it != list->end()) { 74 Page *page = *it; 75 76 free(page->mData); 77 delete page; 78 page = NULL; 79 80 ++it; 81 } 82 } 83 84 PageCache::Page *PageCache::acquirePage() { 85 if (!mFreePages.empty()) { 86 List<Page *>::iterator it = mFreePages.begin(); 87 Page *page = *it; 88 mFreePages.erase(it); 89 90 return page; 91 } 92 93 Page *page = new Page; 94 page->mData = malloc(mPageSize); 95 page->mSize = 0; 96 97 return page; 98 } 99 100 void PageCache::releasePage(Page *page) { 101 page->mSize = 0; 102 mFreePages.push_back(page); 103 } 104 105 void PageCache::appendPage(Page *page) { 106 mTotalSize += page->mSize; 107 mActivePages.push_back(page); 108 } 109 110 size_t PageCache::releaseFromStart(size_t maxBytes) { 111 size_t bytesReleased = 0; 112 113 while (maxBytes > 0 && !mActivePages.empty()) { 114 List<Page *>::iterator it = mActivePages.begin(); 115 116 Page *page = *it; 117 118 if (maxBytes < page->mSize) { 119 break; 120 } 121 122 mActivePages.erase(it); 123 124 maxBytes -= page->mSize; 125 bytesReleased += page->mSize; 126 127 releasePage(page); 128 } 129 130 mTotalSize -= bytesReleased; 131 return bytesReleased; 132 } 133 134 void PageCache::copy(size_t from, void *data, size_t size) { 135 LOGV("copy from %d size %d", from, size); 136 137 CHECK_LE(from + size, mTotalSize); 138 139 size_t offset = 0; 140 List<Page *>::iterator it = mActivePages.begin(); 141 while (from >= offset + (*it)->mSize) { 142 offset += (*it)->mSize; 143 ++it; 144 } 145 146 size_t delta = from - offset; 147 size_t avail = (*it)->mSize - delta; 148 149 if (avail >= size) { 150 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 151 return; 152 } 153 154 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 155 ++it; 156 data = (uint8_t *)data + avail; 157 size -= avail; 158 159 while (size > 0) { 160 size_t copy = (*it)->mSize; 161 if (copy > size) { 162 copy = size; 163 } 164 memcpy(data, (*it)->mData, copy); 165 data = (uint8_t *)data + copy; 166 size -= copy; 167 ++it; 168 } 169 } 170 171 //////////////////////////////////////////////////////////////////////////////// 172 173 NuCachedSource2::NuCachedSource2(const sp<DataSource> &source) 174 : mSource(source), 175 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 176 mLooper(new ALooper), 177 mCache(new PageCache(kPageSize)), 178 mCacheOffset(0), 179 mFinalStatus(OK), 180 mLastAccessPos(0), 181 mFetching(true), 182 mLastFetchTimeUs(-1), 183 mSuspended(false) { 184 mLooper->setName("NuCachedSource2"); 185 mLooper->registerHandler(mReflector); 186 mLooper->start(); 187 188 Mutex::Autolock autoLock(mLock); 189 (new AMessage(kWhatFetchMore, mReflector->id()))->post(); 190 } 191 192 NuCachedSource2::~NuCachedSource2() { 193 mLooper->stop(); 194 mLooper->unregisterHandler(mReflector->id()); 195 196 delete mCache; 197 mCache = NULL; 198 } 199 200 status_t NuCachedSource2::initCheck() const { 201 return mSource->initCheck(); 202 } 203 204 status_t NuCachedSource2::getSize(off_t *size) { 205 return mSource->getSize(size); 206 } 207 208 uint32_t NuCachedSource2::flags() { 209 return (mSource->flags() & ~kWantsPrefetching) | kIsCachingDataSource; 210 } 211 212 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 213 switch (msg->what()) { 214 case kWhatFetchMore: 215 { 216 onFetch(); 217 break; 218 } 219 220 case kWhatRead: 221 { 222 onRead(msg); 223 break; 224 } 225 226 case kWhatSuspend: 227 { 228 onSuspend(); 229 break; 230 } 231 232 default: 233 TRESPASS(); 234 } 235 } 236 237 void NuCachedSource2::fetchInternal() { 238 LOGV("fetchInternal"); 239 240 CHECK_EQ(mFinalStatus, (status_t)OK); 241 242 PageCache::Page *page = mCache->acquirePage(); 243 244 ssize_t n = mSource->readAt( 245 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 246 247 Mutex::Autolock autoLock(mLock); 248 249 if (n < 0) { 250 LOGE("source returned error %ld", n); 251 mFinalStatus = n; 252 mCache->releasePage(page); 253 } else if (n == 0) { 254 LOGI("ERROR_END_OF_STREAM"); 255 mFinalStatus = ERROR_END_OF_STREAM; 256 mCache->releasePage(page); 257 } else { 258 page->mSize = n; 259 mCache->appendPage(page); 260 } 261 } 262 263 void NuCachedSource2::onFetch() { 264 LOGV("onFetch"); 265 266 if (mFinalStatus != OK) { 267 LOGV("EOS reached, done prefetching for now"); 268 mFetching = false; 269 } 270 271 bool keepAlive = 272 !mFetching 273 && !mSuspended 274 && mFinalStatus == OK 275 && ALooper::GetNowUs() >= mLastFetchTimeUs + kKeepAliveIntervalUs; 276 277 if (mFetching || keepAlive) { 278 if (keepAlive) { 279 LOGI("Keep alive"); 280 } 281 282 fetchInternal(); 283 284 mLastFetchTimeUs = ALooper::GetNowUs(); 285 286 if (mFetching && mCache->totalSize() >= kHighWaterThreshold) { 287 LOGI("Cache full, done prefetching for now"); 288 mFetching = false; 289 } 290 } else if (!mSuspended) { 291 Mutex::Autolock autoLock(mLock); 292 restartPrefetcherIfNecessary_l(); 293 } 294 295 (new AMessage(kWhatFetchMore, mReflector->id()))->post( 296 mFetching ? 0 : 100000ll); 297 } 298 299 void NuCachedSource2::onRead(const sp<AMessage> &msg) { 300 LOGV("onRead"); 301 302 int64_t offset; 303 CHECK(msg->findInt64("offset", &offset)); 304 305 void *data; 306 CHECK(msg->findPointer("data", &data)); 307 308 size_t size; 309 CHECK(msg->findSize("size", &size)); 310 311 ssize_t result = readInternal(offset, data, size); 312 313 if (result == -EAGAIN) { 314 msg->post(50000); 315 return; 316 } 317 318 Mutex::Autolock autoLock(mLock); 319 320 CHECK(mAsyncResult == NULL); 321 322 mAsyncResult = new AMessage; 323 mAsyncResult->setInt32("result", result); 324 325 mCondition.signal(); 326 } 327 328 void NuCachedSource2::restartPrefetcherIfNecessary_l() { 329 static const size_t kGrayArea = 256 * 1024; 330 331 if (mFetching || mFinalStatus != OK) { 332 return; 333 } 334 335 if (mCacheOffset + mCache->totalSize() - mLastAccessPos 336 >= kLowWaterThreshold) { 337 return; 338 } 339 340 size_t maxBytes = mLastAccessPos - mCacheOffset; 341 if (maxBytes < kGrayArea) { 342 return; 343 } 344 345 maxBytes -= kGrayArea; 346 347 size_t actualBytes = mCache->releaseFromStart(maxBytes); 348 mCacheOffset += actualBytes; 349 350 LOGI("restarting prefetcher, totalSize = %d", mCache->totalSize()); 351 mFetching = true; 352 } 353 354 ssize_t NuCachedSource2::readAt(off_t offset, void *data, size_t size) { 355 Mutex::Autolock autoSerializer(mSerializer); 356 357 LOGV("readAt offset %ld, size %d", offset, size); 358 359 Mutex::Autolock autoLock(mLock); 360 361 // If the request can be completely satisfied from the cache, do so. 362 363 if (offset >= mCacheOffset 364 && offset + size <= mCacheOffset + mCache->totalSize()) { 365 size_t delta = offset - mCacheOffset; 366 mCache->copy(delta, data, size); 367 368 mLastAccessPos = offset + size; 369 370 return size; 371 } 372 373 sp<AMessage> msg = new AMessage(kWhatRead, mReflector->id()); 374 msg->setInt64("offset", offset); 375 msg->setPointer("data", data); 376 msg->setSize("size", size); 377 378 CHECK(mAsyncResult == NULL); 379 msg->post(); 380 381 while (mAsyncResult == NULL) { 382 mCondition.wait(mLock); 383 } 384 385 int32_t result; 386 CHECK(mAsyncResult->findInt32("result", &result)); 387 388 mAsyncResult.clear(); 389 390 if (result > 0) { 391 mLastAccessPos = offset + result; 392 } 393 394 return (ssize_t)result; 395 } 396 397 size_t NuCachedSource2::cachedSize() { 398 Mutex::Autolock autoLock(mLock); 399 return mCacheOffset + mCache->totalSize(); 400 } 401 402 size_t NuCachedSource2::approxDataRemaining(bool *eos) { 403 Mutex::Autolock autoLock(mLock); 404 return approxDataRemaining_l(eos); 405 } 406 407 size_t NuCachedSource2::approxDataRemaining_l(bool *eos) { 408 *eos = (mFinalStatus != OK); 409 off_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 410 if (mLastAccessPos < lastBytePosCached) { 411 return lastBytePosCached - mLastAccessPos; 412 } 413 return 0; 414 } 415 416 ssize_t NuCachedSource2::readInternal(off_t offset, void *data, size_t size) { 417 LOGV("readInternal offset %ld size %d", offset, size); 418 419 Mutex::Autolock autoLock(mLock); 420 421 if (offset < mCacheOffset 422 || offset >= (off_t)(mCacheOffset + mCache->totalSize())) { 423 static const off_t kPadding = 32768; 424 425 // In the presence of multiple decoded streams, once of them will 426 // trigger this seek request, the other one will request data "nearby" 427 // soon, adjust the seek position so that that subsequent request 428 // does not trigger another seek. 429 off_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 430 431 seekInternal_l(seekOffset); 432 } 433 434 size_t delta = offset - mCacheOffset; 435 436 if (mFinalStatus != OK) { 437 if (delta >= mCache->totalSize()) { 438 return mFinalStatus; 439 } 440 441 size_t avail = mCache->totalSize() - delta; 442 mCache->copy(delta, data, avail); 443 444 return avail; 445 } 446 447 if (offset + size <= mCacheOffset + mCache->totalSize()) { 448 mCache->copy(delta, data, size); 449 450 return size; 451 } 452 453 LOGV("deferring read"); 454 455 return -EAGAIN; 456 } 457 458 status_t NuCachedSource2::seekInternal_l(off_t offset) { 459 mLastAccessPos = offset; 460 461 if (offset >= mCacheOffset 462 && offset <= (off_t)(mCacheOffset + mCache->totalSize())) { 463 return OK; 464 } 465 466 LOGI("new range: offset= %ld", offset); 467 468 mCacheOffset = offset; 469 470 size_t totalSize = mCache->totalSize(); 471 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 472 473 mFinalStatus = OK; 474 mFetching = true; 475 476 return OK; 477 } 478 479 void NuCachedSource2::clearCacheAndResume() { 480 LOGV("clearCacheAndResume"); 481 482 Mutex::Autolock autoLock(mLock); 483 484 CHECK(mSuspended); 485 486 mCacheOffset = 0; 487 mFinalStatus = OK; 488 mLastAccessPos = 0; 489 mLastFetchTimeUs = -1; 490 491 size_t totalSize = mCache->totalSize(); 492 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 493 494 mFetching = true; 495 mSuspended = false; 496 } 497 498 void NuCachedSource2::suspend() { 499 (new AMessage(kWhatSuspend, mReflector->id()))->post(); 500 501 while (!mSuspended) { 502 usleep(10000); 503 } 504 } 505 506 void NuCachedSource2::onSuspend() { 507 Mutex::Autolock autoLock(mLock); 508 509 mFetching = false; 510 mSuspended = true; 511 } 512 513 } // namespace android 514 515