1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include <inttypes.h> 18 19 //#define LOG_NDEBUG 0 20 #define LOG_TAG "NuCachedSource2" 21 #include <utils/Log.h> 22 23 #include "include/NuCachedSource2.h" 24 #include "include/HTTPBase.h" 25 26 #include <cutils/properties.h> 27 #include <media/stagefright/foundation/ADebug.h> 28 #include <media/stagefright/foundation/AMessage.h> 29 #include <media/stagefright/MediaErrors.h> 30 31 namespace android { 32 33 struct PageCache { 34 explicit PageCache(size_t pageSize); 35 ~PageCache(); 36 37 struct Page { 38 void *mData; 39 size_t mSize; 40 }; 41 42 Page *acquirePage(); 43 void releasePage(Page *page); 44 45 void appendPage(Page *page); 46 size_t releaseFromStart(size_t maxBytes); 47 48 size_t totalSize() const { 49 return mTotalSize; 50 } 51 52 void copy(size_t from, void *data, size_t size); 53 54 private: 55 size_t mPageSize; 56 size_t mTotalSize; 57 58 List<Page *> mActivePages; 59 List<Page *> mFreePages; 60 61 void freePages(List<Page *> *list); 62 63 DISALLOW_EVIL_CONSTRUCTORS(PageCache); 64 }; 65 66 PageCache::PageCache(size_t pageSize) 67 : mPageSize(pageSize), 68 mTotalSize(0) { 69 } 70 71 PageCache::~PageCache() { 72 freePages(&mActivePages); 73 freePages(&mFreePages); 74 } 75 76 void PageCache::freePages(List<Page *> *list) { 77 List<Page *>::iterator it = list->begin(); 78 while (it != list->end()) { 79 Page *page = *it; 80 81 free(page->mData); 82 delete page; 83 page = NULL; 84 85 ++it; 86 } 87 } 88 89 PageCache::Page *PageCache::acquirePage() { 90 if (!mFreePages.empty()) { 91 List<Page *>::iterator it = mFreePages.begin(); 92 Page *page = *it; 93 mFreePages.erase(it); 94 95 return page; 96 } 97 98 Page *page = new Page; 99 page->mData = malloc(mPageSize); 100 page->mSize = 0; 101 102 return page; 103 } 104 105 void PageCache::releasePage(Page *page) { 106 page->mSize = 0; 107 mFreePages.push_back(page); 108 } 109 110 void PageCache::appendPage(Page *page) { 111 mTotalSize += page->mSize; 112 mActivePages.push_back(page); 113 } 114 115 size_t PageCache::releaseFromStart(size_t maxBytes) { 116 size_t bytesReleased = 0; 117 118 while (maxBytes > 0 && !mActivePages.empty()) { 119 List<Page *>::iterator it = mActivePages.begin(); 120 121 Page *page = *it; 122 123 if (maxBytes < page->mSize) { 124 break; 125 } 126 127 mActivePages.erase(it); 128 129 maxBytes -= page->mSize; 130 bytesReleased += page->mSize; 131 132 releasePage(page); 133 } 134 135 mTotalSize -= bytesReleased; 136 return bytesReleased; 137 } 138 139 void PageCache::copy(size_t from, void *data, size_t size) { 140 ALOGV("copy from %zu size %zu", from, size); 141 142 if (size == 0) { 143 return; 144 } 145 146 CHECK_LE(from + size, mTotalSize); 147 148 size_t offset = 0; 149 List<Page *>::iterator it = mActivePages.begin(); 150 while (from >= offset + (*it)->mSize) { 151 offset += (*it)->mSize; 152 ++it; 153 } 154 155 size_t delta = from - offset; 156 size_t avail = (*it)->mSize - delta; 157 158 if (avail >= size) { 159 memcpy(data, (const uint8_t *)(*it)->mData + delta, size); 160 return; 161 } 162 163 memcpy(data, (const uint8_t *)(*it)->mData + delta, avail); 164 ++it; 165 data = (uint8_t *)data + avail; 166 size -= avail; 167 168 while (size > 0) { 169 size_t copy = (*it)->mSize; 170 if (copy > size) { 171 copy = size; 172 } 173 memcpy(data, (*it)->mData, copy); 174 data = (uint8_t *)data + copy; 175 size -= copy; 176 ++it; 177 } 178 } 179 180 //////////////////////////////////////////////////////////////////////////////// 181 182 NuCachedSource2::NuCachedSource2( 183 const sp<DataSource> &source, 184 const char *cacheConfig, 185 bool disconnectAtHighwatermark) 186 : mSource(source), 187 mReflector(new AHandlerReflector<NuCachedSource2>(this)), 188 mLooper(new ALooper), 189 mCache(new PageCache(kPageSize)), 190 mCacheOffset(0), 191 mFinalStatus(OK), 192 mLastAccessPos(0), 193 mFetching(true), 194 mDisconnecting(false), 195 mLastFetchTimeUs(-1), 196 mNumRetriesLeft(kMaxNumRetries), 197 mHighwaterThresholdBytes(kDefaultHighWaterThreshold), 198 mLowwaterThresholdBytes(kDefaultLowWaterThreshold), 199 mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs), 200 mDisconnectAtHighwatermark(disconnectAtHighwatermark) { 201 // We are NOT going to support disconnect-at-highwatermark indefinitely 202 // and we are not guaranteeing support for client-specified cache 203 // parameters. Both of these are temporary measures to solve a specific 204 // problem that will be solved in a better way going forward. 205 206 updateCacheParamsFromSystemProperty(); 207 208 if (cacheConfig != NULL) { 209 updateCacheParamsFromString(cacheConfig); 210 } 211 212 if (mDisconnectAtHighwatermark) { 213 // Makes no sense to disconnect and do keep-alives... 214 mKeepAliveIntervalUs = 0; 215 } 216 217 mLooper->setName("NuCachedSource2"); 218 mLooper->registerHandler(mReflector); 219 220 // Since it may not be obvious why our looper thread needs to be 221 // able to call into java since it doesn't appear to do so at all... 222 // IMediaHTTPConnection may be (and most likely is) implemented in JAVA 223 // and a local JAVA IBinder will call directly into JNI methods. 224 // So whenever we call DataSource::readAt it may end up in a call to 225 // IMediaHTTPConnection::readAt and therefore call back into JAVA. 226 mLooper->start(false /* runOnCallingThread */, true /* canCallJava */); 227 228 mName = String8::format("NuCachedSource2(%s)", mSource->toString().string()); 229 } 230 231 NuCachedSource2::~NuCachedSource2() { 232 mLooper->stop(); 233 mLooper->unregisterHandler(mReflector->id()); 234 235 delete mCache; 236 mCache = NULL; 237 } 238 239 // static 240 sp<NuCachedSource2> NuCachedSource2::Create( 241 const sp<DataSource> &source, 242 const char *cacheConfig, 243 bool disconnectAtHighwatermark) { 244 sp<NuCachedSource2> instance = new NuCachedSource2( 245 source, cacheConfig, disconnectAtHighwatermark); 246 Mutex::Autolock autoLock(instance->mLock); 247 (new AMessage(kWhatFetchMore, instance->mReflector))->post(); 248 return instance; 249 } 250 251 status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) { 252 if (mSource->flags() & kIsHTTPBasedSource) { 253 HTTPBase* source = static_cast<HTTPBase *>(mSource.get()); 254 return source->getEstimatedBandwidthKbps(kbps); 255 } 256 return ERROR_UNSUPPORTED; 257 } 258 259 void NuCachedSource2::close() { 260 disconnect(); 261 } 262 263 void NuCachedSource2::disconnect() { 264 if (mSource->flags() & kIsHTTPBasedSource) { 265 ALOGV("disconnecting HTTPBasedSource"); 266 267 { 268 Mutex::Autolock autoLock(mLock); 269 // set mDisconnecting to true, if a fetch returns after 270 // this, the source will be marked as EOS. 271 mDisconnecting = true; 272 273 // explicitly signal mCondition so that the pending readAt() 274 // will immediately return 275 mCondition.signal(); 276 } 277 278 // explicitly disconnect from the source, to allow any 279 // pending reads to return more promptly 280 static_cast<HTTPBase *>(mSource.get())->disconnect(); 281 } 282 } 283 284 status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) { 285 if (mSource->flags() & kIsHTTPBasedSource) { 286 HTTPBase *source = static_cast<HTTPBase *>(mSource.get()); 287 return source->setBandwidthStatCollectFreq(freqMs); 288 } 289 return ERROR_UNSUPPORTED; 290 } 291 292 status_t NuCachedSource2::initCheck() const { 293 return mSource->initCheck(); 294 } 295 296 status_t NuCachedSource2::getSize(off64_t *size) { 297 return mSource->getSize(size); 298 } 299 300 uint32_t NuCachedSource2::flags() { 301 // Remove HTTP related flags since NuCachedSource2 is not HTTP-based. 302 uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource); 303 return (flags | kIsCachingDataSource); 304 } 305 306 void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) { 307 switch (msg->what()) { 308 case kWhatFetchMore: 309 { 310 onFetch(); 311 break; 312 } 313 314 case kWhatRead: 315 { 316 onRead(msg); 317 break; 318 } 319 320 default: 321 TRESPASS(); 322 } 323 } 324 325 void NuCachedSource2::fetchInternal() { 326 ALOGV("fetchInternal"); 327 328 bool reconnect = false; 329 330 { 331 Mutex::Autolock autoLock(mLock); 332 CHECK(mFinalStatus == OK || mNumRetriesLeft > 0); 333 334 if (mFinalStatus != OK) { 335 --mNumRetriesLeft; 336 337 reconnect = true; 338 } 339 } 340 341 if (reconnect) { 342 status_t err = 343 mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize()); 344 345 Mutex::Autolock autoLock(mLock); 346 347 if (mDisconnecting) { 348 mNumRetriesLeft = 0; 349 mFinalStatus = ERROR_END_OF_STREAM; 350 return; 351 } else if (err == ERROR_UNSUPPORTED || err == -EPIPE) { 352 // These are errors that are not likely to go away even if we 353 // retry, i.e. the server doesn't support range requests or similar. 354 mNumRetriesLeft = 0; 355 return; 356 } else if (err != OK) { 357 ALOGI("The attempt to reconnect failed, %d retries remaining", 358 mNumRetriesLeft); 359 360 return; 361 } 362 } 363 364 PageCache::Page *page = mCache->acquirePage(); 365 366 ssize_t n = mSource->readAt( 367 mCacheOffset + mCache->totalSize(), page->mData, kPageSize); 368 369 Mutex::Autolock autoLock(mLock); 370 371 if (n == 0 || mDisconnecting) { 372 ALOGI("caching reached eos."); 373 374 mNumRetriesLeft = 0; 375 mFinalStatus = ERROR_END_OF_STREAM; 376 377 mCache->releasePage(page); 378 } else if (n < 0) { 379 mFinalStatus = n; 380 if (n == ERROR_UNSUPPORTED || n == -EPIPE) { 381 // These are errors that are not likely to go away even if we 382 // retry, i.e. the server doesn't support range requests or similar. 383 mNumRetriesLeft = 0; 384 } 385 386 ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft); 387 mCache->releasePage(page); 388 } else { 389 if (mFinalStatus != OK) { 390 ALOGI("retrying a previously failed read succeeded."); 391 } 392 mNumRetriesLeft = kMaxNumRetries; 393 mFinalStatus = OK; 394 395 page->mSize = n; 396 mCache->appendPage(page); 397 } 398 } 399 400 void NuCachedSource2::onFetch() { 401 ALOGV("onFetch"); 402 403 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 404 ALOGV("EOS reached, done prefetching for now"); 405 mFetching = false; 406 } 407 408 bool keepAlive = 409 !mFetching 410 && mFinalStatus == OK 411 && mKeepAliveIntervalUs > 0 412 && ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs; 413 414 if (mFetching || keepAlive) { 415 if (keepAlive) { 416 ALOGI("Keep alive"); 417 } 418 419 fetchInternal(); 420 421 mLastFetchTimeUs = ALooper::GetNowUs(); 422 423 if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) { 424 ALOGI("Cache full, done prefetching for now"); 425 mFetching = false; 426 427 if (mDisconnectAtHighwatermark 428 && (mSource->flags() & DataSource::kIsHTTPBasedSource)) { 429 ALOGV("Disconnecting at high watermark"); 430 static_cast<HTTPBase *>(mSource.get())->disconnect(); 431 mFinalStatus = -EAGAIN; 432 } 433 } 434 } else { 435 Mutex::Autolock autoLock(mLock); 436 restartPrefetcherIfNecessary_l(); 437 } 438 439 int64_t delayUs; 440 if (mFetching) { 441 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 442 // We failed this time and will try again in 3 seconds. 443 delayUs = 3000000LL; 444 } else { 445 delayUs = 0; 446 } 447 } else { 448 delayUs = 100000LL; 449 } 450 451 (new AMessage(kWhatFetchMore, mReflector))->post(delayUs); 452 } 453 454 void NuCachedSource2::onRead(const sp<AMessage> &msg) { 455 ALOGV("onRead"); 456 457 int64_t offset; 458 CHECK(msg->findInt64("offset", &offset)); 459 460 void *data; 461 CHECK(msg->findPointer("data", &data)); 462 463 size_t size; 464 CHECK(msg->findSize("size", &size)); 465 466 ssize_t result = readInternal(offset, data, size); 467 468 if (result == -EAGAIN) { 469 msg->post(50000); 470 return; 471 } 472 473 Mutex::Autolock autoLock(mLock); 474 if (mDisconnecting) { 475 mCondition.signal(); 476 return; 477 } 478 479 CHECK(mAsyncResult == NULL); 480 481 mAsyncResult = new AMessage; 482 mAsyncResult->setInt32("result", result); 483 484 mCondition.signal(); 485 } 486 487 void NuCachedSource2::restartPrefetcherIfNecessary_l( 488 bool ignoreLowWaterThreshold, bool force) { 489 static const size_t kGrayArea = 1024 * 1024; 490 491 if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) { 492 return; 493 } 494 495 if (!ignoreLowWaterThreshold && !force 496 && mCacheOffset + mCache->totalSize() - mLastAccessPos 497 >= mLowwaterThresholdBytes) { 498 return; 499 } 500 501 size_t maxBytes = mLastAccessPos - mCacheOffset; 502 503 if (!force) { 504 if (maxBytes < kGrayArea) { 505 return; 506 } 507 508 maxBytes -= kGrayArea; 509 } 510 511 size_t actualBytes = mCache->releaseFromStart(maxBytes); 512 mCacheOffset += actualBytes; 513 514 ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize()); 515 mFetching = true; 516 } 517 518 ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) { 519 Mutex::Autolock autoSerializer(mSerializer); 520 521 ALOGV("readAt offset %lld, size %zu", (long long)offset, size); 522 523 Mutex::Autolock autoLock(mLock); 524 if (mDisconnecting) { 525 return ERROR_END_OF_STREAM; 526 } 527 528 // If the request can be completely satisfied from the cache, do so. 529 530 if (offset >= mCacheOffset 531 && offset + size <= mCacheOffset + mCache->totalSize()) { 532 size_t delta = offset - mCacheOffset; 533 mCache->copy(delta, data, size); 534 535 mLastAccessPos = offset + size; 536 537 return size; 538 } 539 540 sp<AMessage> msg = new AMessage(kWhatRead, mReflector); 541 msg->setInt64("offset", offset); 542 msg->setPointer("data", data); 543 msg->setSize("size", size); 544 545 CHECK(mAsyncResult == NULL); 546 msg->post(); 547 548 while (mAsyncResult == NULL && !mDisconnecting) { 549 mCondition.wait(mLock); 550 } 551 552 if (mDisconnecting) { 553 mAsyncResult.clear(); 554 return ERROR_END_OF_STREAM; 555 } 556 557 int32_t result; 558 CHECK(mAsyncResult->findInt32("result", &result)); 559 560 mAsyncResult.clear(); 561 562 if (result > 0) { 563 mLastAccessPos = offset + result; 564 } 565 566 return (ssize_t)result; 567 } 568 569 size_t NuCachedSource2::cachedSize() { 570 Mutex::Autolock autoLock(mLock); 571 return mCacheOffset + mCache->totalSize(); 572 } 573 574 status_t NuCachedSource2::getAvailableSize(off64_t offset, off64_t *size) { 575 Mutex::Autolock autoLock(mLock); 576 status_t finalStatus = UNKNOWN_ERROR; 577 *size = approxDataRemaining_l(offset, &finalStatus); 578 return finalStatus; 579 } 580 581 size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const { 582 Mutex::Autolock autoLock(mLock); 583 return approxDataRemaining_l(mLastAccessPos, finalStatus); 584 } 585 586 size_t NuCachedSource2::approxDataRemaining_l(off64_t offset, status_t *finalStatus) const { 587 *finalStatus = mFinalStatus; 588 589 if (mFinalStatus != OK && mNumRetriesLeft > 0) { 590 // Pretend that everything is fine until we're out of retries. 591 *finalStatus = OK; 592 } 593 594 offset = offset >= 0 ? offset : mLastAccessPos; 595 off64_t lastBytePosCached = mCacheOffset + mCache->totalSize(); 596 if (offset < lastBytePosCached) { 597 return lastBytePosCached - offset; 598 } 599 return 0; 600 } 601 602 ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) { 603 CHECK_LE(size, (size_t)mHighwaterThresholdBytes); 604 605 ALOGV("readInternal offset %lld size %zu", (long long)offset, size); 606 607 Mutex::Autolock autoLock(mLock); 608 609 // If we're disconnecting, return EOS and don't access *data pointer. 610 // data could be on the stack of the caller to NuCachedSource2::readAt(), 611 // which may have exited already. 612 if (mDisconnecting) { 613 return ERROR_END_OF_STREAM; 614 } 615 616 if (!mFetching) { 617 mLastAccessPos = offset; 618 restartPrefetcherIfNecessary_l( 619 false, // ignoreLowWaterThreshold 620 true); // force 621 } 622 623 if (offset < mCacheOffset 624 || offset >= (off64_t)(mCacheOffset + mCache->totalSize())) { 625 static const off64_t kPadding = 256 * 1024; 626 627 // In the presence of multiple decoded streams, once of them will 628 // trigger this seek request, the other one will request data "nearby" 629 // soon, adjust the seek position so that that subsequent request 630 // does not trigger another seek. 631 off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0; 632 633 seekInternal_l(seekOffset); 634 } 635 636 size_t delta = offset - mCacheOffset; 637 638 if (mFinalStatus != OK && mNumRetriesLeft == 0) { 639 if (delta >= mCache->totalSize()) { 640 return mFinalStatus; 641 } 642 643 size_t avail = mCache->totalSize() - delta; 644 645 if (avail > size) { 646 avail = size; 647 } 648 649 mCache->copy(delta, data, avail); 650 651 return avail; 652 } 653 654 if (offset + size <= mCacheOffset + mCache->totalSize()) { 655 mCache->copy(delta, data, size); 656 657 return size; 658 } 659 660 ALOGV("deferring read"); 661 662 return -EAGAIN; 663 } 664 665 status_t NuCachedSource2::seekInternal_l(off64_t offset) { 666 mLastAccessPos = offset; 667 668 if (offset >= mCacheOffset 669 && offset <= (off64_t)(mCacheOffset + mCache->totalSize())) { 670 return OK; 671 } 672 673 ALOGI("new range: offset= %lld", (long long)offset); 674 675 mCacheOffset = offset; 676 677 size_t totalSize = mCache->totalSize(); 678 CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize); 679 680 mNumRetriesLeft = kMaxNumRetries; 681 mFetching = true; 682 683 return OK; 684 } 685 686 void NuCachedSource2::resumeFetchingIfNecessary() { 687 Mutex::Autolock autoLock(mLock); 688 689 restartPrefetcherIfNecessary_l(true /* ignore low water threshold */); 690 } 691 692 sp<DecryptHandle> NuCachedSource2::DrmInitialization(const char* mime) { 693 return mSource->DrmInitialization(mime); 694 } 695 696 String8 NuCachedSource2::getUri() { 697 return mSource->getUri(); 698 } 699 700 String8 NuCachedSource2::getMIMEType() const { 701 return mSource->getMIMEType(); 702 } 703 704 void NuCachedSource2::updateCacheParamsFromSystemProperty() { 705 char value[PROPERTY_VALUE_MAX]; 706 if (!property_get("media.stagefright.cache-params", value, NULL)) { 707 return; 708 } 709 710 updateCacheParamsFromString(value); 711 } 712 713 void NuCachedSource2::updateCacheParamsFromString(const char *s) { 714 ssize_t lowwaterMarkKb, highwaterMarkKb; 715 int keepAliveSecs; 716 717 if (sscanf(s, "%zd/%zd/%d", 718 &lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) { 719 ALOGE("Failed to parse cache parameters from '%s'.", s); 720 return; 721 } 722 723 if (lowwaterMarkKb >= 0) { 724 mLowwaterThresholdBytes = lowwaterMarkKb * 1024; 725 } else { 726 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 727 } 728 729 if (highwaterMarkKb >= 0) { 730 mHighwaterThresholdBytes = highwaterMarkKb * 1024; 731 } else { 732 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 733 } 734 735 if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) { 736 ALOGE("Illegal low/highwater marks specified, reverting to defaults."); 737 738 mLowwaterThresholdBytes = kDefaultLowWaterThreshold; 739 mHighwaterThresholdBytes = kDefaultHighWaterThreshold; 740 } 741 742 if (keepAliveSecs >= 0) { 743 mKeepAliveIntervalUs = keepAliveSecs * 1000000LL; 744 } else { 745 mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs; 746 } 747 748 ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %lld us", 749 mLowwaterThresholdBytes, 750 mHighwaterThresholdBytes, 751 (long long)mKeepAliveIntervalUs); 752 } 753 754 // static 755 void NuCachedSource2::RemoveCacheSpecificHeaders( 756 KeyedVector<String8, String8> *headers, 757 String8 *cacheConfig, 758 bool *disconnectAtHighwatermark) { 759 *cacheConfig = String8(); 760 *disconnectAtHighwatermark = false; 761 762 if (headers == NULL) { 763 return; 764 } 765 766 ssize_t index; 767 if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) { 768 *cacheConfig = headers->valueAt(index); 769 770 headers->removeItemsAt(index); 771 772 ALOGV("Using special cache config '%s'", cacheConfig->string()); 773 } 774 775 if ((index = headers->indexOfKey( 776 String8("x-disconnect-at-highwatermark"))) >= 0) { 777 *disconnectAtHighwatermark = true; 778 headers->removeItemsAt(index); 779 780 ALOGV("Client requested disconnection at highwater mark"); 781 } 782 } 783 784 } // namespace android 785