1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "LiveSession" 19 #include <utils/Log.h> 20 21 #include "LiveSession.h" 22 #include "HTTPDownloader.h" 23 #include "M3UParser.h" 24 #include "PlaylistFetcher.h" 25 26 #include "mpeg2ts/AnotherPacketSource.h" 27 28 #include <cutils/properties.h> 29 #include <media/IMediaHTTPService.h> 30 #include <media/stagefright/foundation/ABuffer.h> 31 #include <media/stagefright/foundation/ADebug.h> 32 #include <media/stagefright/foundation/AMessage.h> 33 #include <media/stagefright/foundation/AUtils.h> 34 #include <media/stagefright/MediaDefs.h> 35 #include <media/stagefright/MetaData.h> 36 #include <media/stagefright/Utils.h> 37 38 #include <utils/Mutex.h> 39 40 #include <ctype.h> 41 #include <inttypes.h> 42 43 namespace android { 44 45 // static 46 // Bandwidth Switch Mark Defaults 47 const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 48 const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 49 const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 50 const int64_t LiveSession::kResumeThresholdUs = 100000ll; 51 52 struct LiveSession::BandwidthEstimator : public RefBase { 53 BandwidthEstimator(); 54 55 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); 56 bool estimateBandwidth( 57 int32_t *bandwidth, 58 bool *isStable = NULL, 59 int32_t *shortTermBps = NULL); 60 61 private: 62 
// Bandwidth estimation parameters 63 static const int32_t kShortTermBandwidthItems = 3; 64 static const int32_t kMinBandwidthHistoryItems = 20; 65 static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec 66 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec 67 static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec 68 69 struct BandwidthEntry { 70 int64_t mTimestampUs; 71 int64_t mDelayUs; 72 size_t mNumBytes; 73 }; 74 75 Mutex mLock; 76 List<BandwidthEntry> mBandwidthHistory; 77 List<int32_t> mPrevEstimates; 78 int32_t mShortTermEstimate; 79 bool mHasNewSample; 80 bool mIsStable; 81 int64_t mTotalTransferTimeUs; 82 size_t mTotalTransferBytes; 83 84 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 85 }; 86 87 LiveSession::BandwidthEstimator::BandwidthEstimator() : 88 mShortTermEstimate(0), 89 mHasNewSample(false), 90 mIsStable(true), 91 mTotalTransferTimeUs(0), 92 mTotalTransferBytes(0) { 93 } 94 95 void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 96 size_t numBytes, int64_t delayUs) { 97 AutoMutex autoLock(mLock); 98 99 int64_t nowUs = ALooper::GetNowUs(); 100 BandwidthEntry entry; 101 entry.mTimestampUs = nowUs; 102 entry.mDelayUs = delayUs; 103 entry.mNumBytes = numBytes; 104 mTotalTransferTimeUs += delayUs; 105 mTotalTransferBytes += numBytes; 106 mBandwidthHistory.push_back(entry); 107 mHasNewSample = true; 108 109 // Remove no more than 10% of total transfer time at a time 110 // to avoid sudden jump on bandwidth estimation. There might 111 // be long blocking reads that takes up signification time, 112 // we have to keep a longer window in that case. 
113 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10; 114 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) { 115 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs; 116 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) { 117 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs; 118 } 119 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 120 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 121 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) { 122 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 123 // remove sample if either absolute age or total transfer time is 124 // over kMaxBandwidthHistoryWindowUs 125 if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs && 126 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) { 127 break; 128 } 129 mTotalTransferTimeUs -= it->mDelayUs; 130 mTotalTransferBytes -= it->mNumBytes; 131 mBandwidthHistory.erase(mBandwidthHistory.begin()); 132 } 133 } 134 135 bool LiveSession::BandwidthEstimator::estimateBandwidth( 136 int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) { 137 AutoMutex autoLock(mLock); 138 139 if (mBandwidthHistory.size() < 2) { 140 return false; 141 } 142 143 if (!mHasNewSample) { 144 *bandwidthBps = *(--mPrevEstimates.end()); 145 if (isStable) { 146 *isStable = mIsStable; 147 } 148 if (shortTermBps) { 149 *shortTermBps = mShortTermEstimate; 150 } 151 return true; 152 } 153 154 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 155 mPrevEstimates.push_back(*bandwidthBps); 156 while (mPrevEstimates.size() > 3) { 157 mPrevEstimates.erase(mPrevEstimates.begin()); 158 } 159 mHasNewSample = false; 160 161 int64_t totalTimeUs = 0; 162 size_t totalBytes = 0; 163 if (mBandwidthHistory.size() >= kShortTermBandwidthItems) { 164 List<BandwidthEntry>::iterator it = --mBandwidthHistory.end(); 165 for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) { 
166 totalTimeUs += it->mDelayUs; 167 totalBytes += it->mNumBytes; 168 } 169 } 170 mShortTermEstimate = totalTimeUs > 0 ? 171 (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps; 172 if (shortTermBps) { 173 *shortTermBps = mShortTermEstimate; 174 } 175 176 int64_t minEstimate = -1, maxEstimate = -1; 177 List<int32_t>::iterator it; 178 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) { 179 int32_t estimate = *it; 180 if (minEstimate < 0 || minEstimate > estimate) { 181 minEstimate = estimate; 182 } 183 if (maxEstimate < 0 || maxEstimate < estimate) { 184 maxEstimate = estimate; 185 } 186 } 187 // consider it stable if long-term average is not jumping a lot 188 // and short-term average is not much lower than long-term average 189 mIsStable = (maxEstimate <= minEstimate * 4 / 3) 190 && mShortTermEstimate > minEstimate * 7 / 10; 191 if (isStable) { 192 *isStable = mIsStable; 193 } 194 195 #if 0 196 { 197 char dumpStr[1024] = {0}; 198 size_t itemIdx = 0; 199 size_t histSize = mBandwidthHistory.size(); 200 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {", 201 *bandwidthBps, mIsStable, histSize); 202 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 203 for (; it != mBandwidthHistory.end(); ++it) { 204 if (itemIdx > 50) { 205 sprintf(dumpStr + strlen(dumpStr), 206 "...(%zd more items)... }", histSize - itemIdx); 207 break; 208 } 209 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s", 210 it->mNumBytes / 1024, 211 (double)it->mDelayUs * 1.0e-6, 212 (it == (--mBandwidthHistory.end())) ? 
"}" : ", "); 213 itemIdx++; 214 } 215 ALOGE(dumpStr); 216 } 217 #endif 218 return true; 219 } 220 221 //static 222 const char *LiveSession::getKeyForStream(StreamType type) { 223 switch (type) { 224 case STREAMTYPE_VIDEO: 225 return "timeUsVideo"; 226 case STREAMTYPE_AUDIO: 227 return "timeUsAudio"; 228 case STREAMTYPE_SUBTITLES: 229 return "timeUsSubtitle"; 230 case STREAMTYPE_METADATA: 231 return "timeUsMetadata"; // unused 232 default: 233 TRESPASS(); 234 } 235 return NULL; 236 } 237 238 //static 239 const char *LiveSession::getNameForStream(StreamType type) { 240 switch (type) { 241 case STREAMTYPE_VIDEO: 242 return "video"; 243 case STREAMTYPE_AUDIO: 244 return "audio"; 245 case STREAMTYPE_SUBTITLES: 246 return "subs"; 247 case STREAMTYPE_METADATA: 248 return "metadata"; 249 default: 250 break; 251 } 252 return "unknown"; 253 } 254 255 //static 256 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) { 257 switch (type) { 258 case STREAMTYPE_VIDEO: 259 return ATSParser::VIDEO; 260 case STREAMTYPE_AUDIO: 261 return ATSParser::AUDIO; 262 case STREAMTYPE_METADATA: 263 return ATSParser::META; 264 case STREAMTYPE_SUBTITLES: 265 default: 266 TRESPASS(); 267 } 268 return ATSParser::NUM_SOURCE_TYPES; // should not reach here 269 } 270 271 LiveSession::LiveSession( 272 const sp<AMessage> ¬ify, uint32_t flags, 273 const sp<IMediaHTTPService> &httpService) 274 : mNotify(notify), 275 mFlags(flags), 276 mHTTPService(httpService), 277 mBuffering(false), 278 mInPreparationPhase(true), 279 mPollBufferingGeneration(0), 280 mPrevBufferPercentage(-1), 281 mCurBandwidthIndex(-1), 282 mOrigBandwidthIndex(-1), 283 mLastBandwidthBps(-1ll), 284 mLastBandwidthStable(false), 285 mBandwidthEstimator(new BandwidthEstimator()), 286 mMaxWidth(720), 287 mMaxHeight(480), 288 mStreamMask(0), 289 mNewStreamMask(0), 290 mSwapMask(0), 291 mSwitchGeneration(0), 292 mSubtitleGeneration(0), 293 mLastDequeuedTimeUs(0ll), 294 mRealTimeBaseUs(0ll), 295 
mReconfigurationInProgress(false), 296 mSwitchInProgress(false), 297 mUpSwitchMark(kUpSwitchMarkUs), 298 mDownSwitchMark(kDownSwitchMarkUs), 299 mUpSwitchMargin(kUpSwitchMarginUs), 300 mFirstTimeUsValid(false), 301 mFirstTimeUs(0), 302 mLastSeekTimeUs(0), 303 mHasMetadata(false) { 304 mStreams[kAudioIndex] = StreamItem("audio"); 305 mStreams[kVideoIndex] = StreamItem("video"); 306 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 307 308 for (size_t i = 0; i < kNumSources; ++i) { 309 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 310 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 311 } 312 } 313 314 LiveSession::~LiveSession() { 315 if (mFetcherLooper != NULL) { 316 mFetcherLooper->stop(); 317 } 318 } 319 320 int64_t LiveSession::calculateMediaTimeUs( 321 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) { 322 if (timeUs >= firstTimeUs) { 323 timeUs -= firstTimeUs; 324 } else { 325 timeUs = 0; 326 } 327 timeUs += mLastSeekTimeUs; 328 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 329 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 330 } 331 return timeUs; 332 } 333 334 status_t LiveSession::dequeueAccessUnit( 335 StreamType stream, sp<ABuffer> *accessUnit) { 336 status_t finalResult = OK; 337 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 338 339 ssize_t streamIdx = typeToIndex(stream); 340 if (streamIdx < 0) { 341 return BAD_VALUE; 342 } 343 const char *streamStr = getNameForStream(stream); 344 // Do not let client pull data if we don't have data packets yet. 345 // We might only have a format discontinuity queued without data. 346 // When NuPlayerDecoder dequeues the format discontinuity, it will 347 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 348 // thinks it can do seamless change, so will not shutdown decoder. 349 // When the actual format arrives, it can't handle it and get stuck. 
350 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 351 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 352 streamStr, finalResult); 353 354 if (finalResult == OK) { 355 return -EAGAIN; 356 } else { 357 return finalResult; 358 } 359 } 360 361 // Let the client dequeue as long as we have buffers available 362 // Do not make pause/resume decisions here. 363 364 status_t err = packetSource->dequeueAccessUnit(accessUnit); 365 366 if (err == INFO_DISCONTINUITY) { 367 // adaptive streaming, discontinuities in the playlist 368 int32_t type; 369 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 370 371 sp<AMessage> extra; 372 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 373 extra.clear(); 374 } 375 376 ALOGI("[%s] read discontinuity of type %d, extra = %s", 377 streamStr, 378 type, 379 extra == NULL ? "NULL" : extra->debugString().c_str()); 380 } else if (err == OK) { 381 382 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 383 int64_t timeUs, originalTimeUs; 384 int32_t discontinuitySeq = 0; 385 StreamItem& strm = mStreams[streamIdx]; 386 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 387 originalTimeUs = timeUs; 388 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 389 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 390 int64_t offsetTimeUs; 391 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 392 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 393 } else { 394 offsetTimeUs = 0; 395 } 396 397 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0 398 && strm.mLastDequeuedTimeUs >= 0) { 399 int64_t firstTimeUs; 400 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 401 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 402 offsetTimeUs += strm.mLastSampleDurationUs; 403 } else { 404 offsetTimeUs += strm.mLastSampleDurationUs; 405 
} 406 407 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 408 strm.mCurDiscontinuitySeq = discontinuitySeq; 409 } 410 411 int32_t discard = 0; 412 int64_t firstTimeUs; 413 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 414 int64_t durUs; // approximate sample duration 415 if (timeUs > strm.mLastDequeuedTimeUs) { 416 durUs = timeUs - strm.mLastDequeuedTimeUs; 417 } else { 418 durUs = strm.mLastDequeuedTimeUs - timeUs; 419 } 420 strm.mLastSampleDurationUs = durUs; 421 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 422 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 423 firstTimeUs = timeUs; 424 } else { 425 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 426 firstTimeUs = timeUs; 427 } 428 429 strm.mLastDequeuedTimeUs = timeUs; 430 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq); 431 432 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 433 streamStr, (long long)timeUs, (long long)originalTimeUs); 434 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 435 mLastDequeuedTimeUs = timeUs; 436 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 437 } else if (stream == STREAMTYPE_SUBTITLES) { 438 int32_t subtitleGeneration; 439 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 440 && subtitleGeneration != mSubtitleGeneration) { 441 return -EAGAIN; 442 }; 443 (*accessUnit)->meta()->setInt32( 444 "trackIndex", mPlaylist->getSelectedIndex()); 445 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 446 } else if (stream == STREAMTYPE_METADATA) { 447 HLSTime mdTime((*accessUnit)->meta()); 448 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) { 449 packetSource->requeueAccessUnit((*accessUnit)); 450 return -EAGAIN; 451 } else { 452 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq); 453 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, 
mdTime.mTimeUs, mdTime.mSeq); 454 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 455 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 456 } 457 } 458 } else { 459 ALOGI("[%s] encountered error %d", streamStr, err); 460 } 461 462 return err; 463 } 464 465 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) { 466 if (!(mStreamMask & stream)) { 467 return UNKNOWN_ERROR; 468 } 469 470 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 471 472 *meta = packetSource->getFormat(); 473 474 if (*meta == NULL) { 475 return -EWOULDBLOCK; 476 } 477 478 if (stream == STREAMTYPE_AUDIO) { 479 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 480 (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024); 481 } else if (stream == STREAMTYPE_VIDEO) { 482 (*meta)->setInt32(kKeyMaxWidth, mMaxWidth); 483 (*meta)->setInt32(kKeyMaxHeight, mMaxHeight); 484 } 485 486 return OK; 487 } 488 489 sp<HTTPDownloader> LiveSession::getHTTPDownloader() { 490 return new HTTPDownloader(mHTTPService, mExtraHeaders); 491 } 492 493 void LiveSession::setBufferingSettings( 494 const BufferingSettings &buffering) { 495 sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this); 496 writeToAMessage(msg, buffering); 497 msg->post(); 498 } 499 500 void LiveSession::connectAsync( 501 const char *url, const KeyedVector<String8, String8> *headers) { 502 sp<AMessage> msg = new AMessage(kWhatConnect, this); 503 msg->setString("url", url); 504 505 if (headers != NULL) { 506 msg->setPointer( 507 "headers", 508 new KeyedVector<String8, String8>(*headers)); 509 } 510 511 msg->post(); 512 } 513 514 status_t LiveSession::disconnect() { 515 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 516 517 sp<AMessage> response; 518 status_t err = msg->postAndAwaitResponse(&response); 519 520 return err; 521 } 522 523 status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) { 524 sp<AMessage> msg = new AMessage(kWhatSeek, this); 525 
msg->setInt64("timeUs", timeUs); 526 msg->setInt32("mode", mode); 527 528 sp<AMessage> response; 529 status_t err = msg->postAndAwaitResponse(&response); 530 531 return err; 532 } 533 534 bool LiveSession::checkSwitchProgress( 535 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 536 AString newUri; 537 CHECK(stopParams->findString("uri", &newUri)); 538 539 *needResumeUntil = false; 540 sp<AMessage> firstNewMeta[kMaxStreams]; 541 for (size_t i = 0; i < kMaxStreams; ++i) { 542 StreamType stream = indexToType(i); 543 if (!(mSwapMask & mNewStreamMask & stream) 544 || (mStreams[i].mNewUri != newUri)) { 545 continue; 546 } 547 if (stream == STREAMTYPE_SUBTITLES) { 548 continue; 549 } 550 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 551 552 // First, get latest dequeued meta, which is where the decoder is at. 553 // (when upswitching, we take the meta after a certain delay, so that 554 // the decoder is left with some cushion) 555 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 556 if (delayUs > 0) { 557 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 558 if (lastDequeueMeta == NULL) { 559 // this means we don't have enough cushion, try again later 560 ALOGV("[%s] up switching failed due to insufficient buffer", 561 getNameForStream(stream)); 562 return false; 563 } 564 } else { 565 // It's okay for lastDequeueMeta to be NULL here, it means the 566 // decoder hasn't even started dequeueing 567 lastDequeueMeta = source->getLatestDequeuedMeta(); 568 } 569 // Then, trim off packets at beginning of mPacketSources2 that's before 570 // the latest dequeued time. These samples are definitely too late. 571 firstNewMeta[i] = mPacketSources2.editValueAt(i) 572 ->trimBuffersBeforeMeta(lastDequeueMeta); 573 574 // Now firstNewMeta[i] is the first sample after the trim. 575 // If it's NULL, we failed because dequeue already past all samples 576 // in mPacketSource2, we have to try again. 
577 if (firstNewMeta[i] == NULL) { 578 HLSTime dequeueTime(lastDequeueMeta); 579 ALOGV("[%s] dequeue time (%d, %lld) past start time", 580 getNameForStream(stream), 581 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 582 return false; 583 } 584 585 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 586 // already fetched, and see if we need to resumeUntil 587 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 588 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 589 // boundary, no need to resume as the content will look different anyways 590 if (lastEnqueueMeta != NULL) { 591 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 592 593 // no need to resume old fetcher if new fetcher started in different 594 // discontinuity sequence, as the content will look different. 595 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 596 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 597 598 // update the stopTime for resumeUntil 599 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 600 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 601 } 602 } 603 604 // if we're here, it means dequeue progress hasn't passed some samples in 605 // mPacketSource2, we can trim off the excess in mPacketSource. 
606 // (old fetcher might still need to resumeUntil the start time of new fetcher) 607 for (size_t i = 0; i < kMaxStreams; ++i) { 608 StreamType stream = indexToType(i); 609 if (!(mSwapMask & mNewStreamMask & stream) 610 || (newUri != mStreams[i].mNewUri) 611 || stream == STREAMTYPE_SUBTITLES) { 612 continue; 613 } 614 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 615 } 616 617 // no resumeUntil if already underflow 618 *needResumeUntil &= !mBuffering; 619 620 return true; 621 } 622 623 void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 624 switch (msg->what()) { 625 case kWhatSetBufferingSettings: 626 { 627 readFromAMessage(msg, &mBufferingSettings); 628 break; 629 } 630 631 case kWhatConnect: 632 { 633 onConnect(msg); 634 break; 635 } 636 637 case kWhatDisconnect: 638 { 639 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 640 641 if (mReconfigurationInProgress) { 642 break; 643 } 644 645 finishDisconnect(); 646 break; 647 } 648 649 case kWhatSeek: 650 { 651 if (mReconfigurationInProgress) { 652 msg->post(50000); 653 break; 654 } 655 656 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 657 mSeekReply = new AMessage; 658 659 onSeek(msg); 660 break; 661 } 662 663 case kWhatFetcherNotify: 664 { 665 int32_t what; 666 CHECK(msg->findInt32("what", &what)); 667 668 switch (what) { 669 case PlaylistFetcher::kWhatStarted: 670 break; 671 case PlaylistFetcher::kWhatPaused: 672 case PlaylistFetcher::kWhatStopped: 673 { 674 AString uri; 675 CHECK(msg->findString("uri", &uri)); 676 ssize_t index = mFetcherInfos.indexOfKey(uri); 677 if (index < 0) { 678 // ignore msgs from fetchers that's already gone 679 break; 680 } 681 682 ALOGV("fetcher-%d %s", 683 mFetcherInfos[index].mFetcher->getFetcherID(), 684 what == PlaylistFetcher::kWhatPaused ? 
685 "paused" : "stopped"); 686 687 if (what == PlaylistFetcher::kWhatStopped) { 688 mFetcherLooper->unregisterHandler( 689 mFetcherInfos[index].mFetcher->id()); 690 mFetcherInfos.removeItemsAt(index); 691 } else if (what == PlaylistFetcher::kWhatPaused) { 692 int32_t seekMode; 693 CHECK(msg->findInt32("seekMode", &seekMode)); 694 for (size_t i = 0; i < kMaxStreams; ++i) { 695 if (mStreams[i].mUri == uri) { 696 mStreams[i].mSeekMode = (SeekMode) seekMode; 697 } 698 } 699 } 700 701 if (mContinuation != NULL) { 702 CHECK_GT(mContinuationCounter, 0u); 703 if (--mContinuationCounter == 0) { 704 mContinuation->post(); 705 } 706 ALOGV("%zu fetcher(s) left", mContinuationCounter); 707 } 708 break; 709 } 710 711 case PlaylistFetcher::kWhatDurationUpdate: 712 { 713 AString uri; 714 CHECK(msg->findString("uri", &uri)); 715 716 int64_t durationUs; 717 CHECK(msg->findInt64("durationUs", &durationUs)); 718 719 ssize_t index = mFetcherInfos.indexOfKey(uri); 720 if (index >= 0) { 721 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 722 info->mDurationUs = durationUs; 723 } 724 break; 725 } 726 727 case PlaylistFetcher::kWhatTargetDurationUpdate: 728 { 729 int64_t targetDurationUs; 730 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 731 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 732 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 733 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 734 break; 735 } 736 737 case PlaylistFetcher::kWhatError: 738 { 739 status_t err; 740 CHECK(msg->findInt32("err", &err)); 741 742 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 743 744 // handle EOS on subtitle tracks independently 745 AString uri; 746 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 747 ssize_t i = mFetcherInfos.indexOfKey(uri); 748 if (i >= 0) { 749 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 750 if (fetcher != NULL) { 751 uint32_t type = 
fetcher->getStreamTypeMask(); 752 if (type == STREAMTYPE_SUBTITLES) { 753 mPacketSources.valueFor( 754 STREAMTYPE_SUBTITLES)->signalEOS(err);; 755 break; 756 } 757 } 758 } 759 } 760 761 // remember the failure index (as mCurBandwidthIndex will be restored 762 // after cancelBandwidthSwitch()), and record last fail time 763 size_t failureIndex = mCurBandwidthIndex; 764 mBandwidthItems.editItemAt( 765 failureIndex).mLastFailureUs = ALooper::GetNowUs(); 766 767 if (mSwitchInProgress) { 768 // if error happened when we switch to a variant, try fallback 769 // to other variant to save the session 770 if (tryBandwidthFallback()) { 771 break; 772 } 773 } 774 775 if (mInPreparationPhase) { 776 postPrepared(err); 777 } 778 779 cancelBandwidthSwitch(); 780 781 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 782 783 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 784 785 mPacketSources.valueFor( 786 STREAMTYPE_SUBTITLES)->signalEOS(err); 787 788 postError(err); 789 break; 790 } 791 792 case PlaylistFetcher::kWhatStopReached: 793 { 794 ALOGV("kWhatStopReached"); 795 796 AString oldUri; 797 CHECK(msg->findString("uri", &oldUri)); 798 799 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 800 if (index < 0) { 801 break; 802 } 803 804 tryToFinishBandwidthSwitch(oldUri); 805 break; 806 } 807 808 case PlaylistFetcher::kWhatStartedAt: 809 { 810 int32_t switchGeneration; 811 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 812 813 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 814 switchGeneration, mSwitchGeneration); 815 816 if (switchGeneration != mSwitchGeneration) { 817 break; 818 } 819 820 AString uri; 821 CHECK(msg->findString("uri", &uri)); 822 823 // mark new fetcher mToBeResumed 824 ssize_t index = mFetcherInfos.indexOfKey(uri); 825 if (index >= 0) { 826 mFetcherInfos.editValueAt(index).mToBeResumed = true; 827 } 828 829 // temporarily disable packet sources to be swapped to prevent 830 // NuPlayerDecoder from dequeuing while we check 
progress 831 for (size_t i = 0; i < mPacketSources.size(); ++i) { 832 if ((mSwapMask & mPacketSources.keyAt(i)) 833 && uri == mStreams[i].mNewUri) { 834 mPacketSources.editValueAt(i)->enable(false); 835 } 836 } 837 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 838 // If switching up, require a cushion bigger than kUnderflowMark 839 // to avoid buffering immediately after the switch. 840 // (If we don't have that cushion we'd rather cancel and try again.) 841 int64_t delayUs = 842 switchUp ? 843 (mBufferingSettings.mRebufferingWatermarkLowMs * 1000ll + 1000000ll) 844 : 0; 845 bool needResumeUntil = false; 846 sp<AMessage> stopParams = msg; 847 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 848 // playback time hasn't passed startAt time 849 if (!needResumeUntil) { 850 ALOGV("finish switch"); 851 for (size_t i = 0; i < kMaxStreams; ++i) { 852 if ((mSwapMask & indexToType(i)) 853 && uri == mStreams[i].mNewUri) { 854 // have to make a copy of mStreams[i].mUri because 855 // tryToFinishBandwidthSwitch is modifying mStreams[] 856 AString oldURI = mStreams[i].mUri; 857 tryToFinishBandwidthSwitch(oldURI); 858 break; 859 } 860 } 861 } else { 862 // startAt time is after last enqueue time 863 // Resume fetcher for the original variant; the resumed fetcher should 864 // continue until the timestamps found in msg, which is stored by the 865 // new fetcher to indicate where the new variant has started buffering. 
866 ALOGV("finish switch with resumeUntilAsync"); 867 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 868 const FetcherInfo &info = mFetcherInfos.valueAt(i); 869 if (info.mToBeRemoved) { 870 info.mFetcher->resumeUntilAsync(stopParams); 871 } 872 } 873 } 874 } else { 875 // playback time passed startAt time 876 if (switchUp) { 877 // if switching up, cancel and retry if condition satisfies again 878 ALOGV("cancel up switch because we're too late"); 879 cancelBandwidthSwitch(true /* resume */); 880 } else { 881 ALOGV("retry down switch at next sample"); 882 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 883 } 884 } 885 // re-enable all packet sources 886 for (size_t i = 0; i < mPacketSources.size(); ++i) { 887 mPacketSources.editValueAt(i)->enable(true); 888 } 889 890 break; 891 } 892 893 case PlaylistFetcher::kWhatPlaylistFetched: 894 { 895 onMasterPlaylistFetched(msg); 896 break; 897 } 898 899 case PlaylistFetcher::kWhatMetadataDetected: 900 { 901 if (!mHasMetadata) { 902 mHasMetadata = true; 903 sp<AMessage> notify = mNotify->dup(); 904 notify->setInt32("what", kWhatMetadataDetected); 905 notify->post(); 906 } 907 break; 908 } 909 910 default: 911 TRESPASS(); 912 } 913 914 break; 915 } 916 917 case kWhatChangeConfiguration: 918 { 919 onChangeConfiguration(msg); 920 break; 921 } 922 923 case kWhatChangeConfiguration2: 924 { 925 onChangeConfiguration2(msg); 926 break; 927 } 928 929 case kWhatChangeConfiguration3: 930 { 931 onChangeConfiguration3(msg); 932 break; 933 } 934 935 case kWhatPollBuffering: 936 { 937 int32_t generation; 938 CHECK(msg->findInt32("generation", &generation)); 939 if (generation == mPollBufferingGeneration) { 940 onPollBuffering(); 941 } 942 break; 943 } 944 945 default: 946 TRESPASS(); 947 break; 948 } 949 } 950 951 // static 952 bool LiveSession::isBandwidthValid(const BandwidthItem &item) { 953 static const int64_t kBlacklistWindowUs = 300 * 1000000ll; 954 return item.mLastFailureUs < 0 955 || ALooper::GetNowUs() - 
item.mLastFailureUs > kBlacklistWindowUs; 956 } 957 958 // static 959 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 960 if (a->mBandwidth < b->mBandwidth) { 961 return -1; 962 } else if (a->mBandwidth == b->mBandwidth) { 963 return 0; 964 } 965 966 return 1; 967 } 968 969 // static 970 LiveSession::StreamType LiveSession::indexToType(int idx) { 971 CHECK(idx >= 0 && idx < kNumSources); 972 return (StreamType)(1 << idx); 973 } 974 975 // static 976 ssize_t LiveSession::typeToIndex(int32_t type) { 977 switch (type) { 978 case STREAMTYPE_AUDIO: 979 return 0; 980 case STREAMTYPE_VIDEO: 981 return 1; 982 case STREAMTYPE_SUBTITLES: 983 return 2; 984 case STREAMTYPE_METADATA: 985 return 3; 986 default: 987 return -1; 988 }; 989 return -1; 990 } 991 992 void LiveSession::onConnect(const sp<AMessage> &msg) { 993 CHECK(msg->findString("url", &mMasterURL)); 994 995 // TODO currently we don't know if we are coming here from incognito mode 996 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str()); 997 998 KeyedVector<String8, String8> *headers = NULL; 999 if (!msg->findPointer("headers", (void **)&headers)) { 1000 mExtraHeaders.clear(); 1001 } else { 1002 mExtraHeaders = *headers; 1003 1004 delete headers; 1005 headers = NULL; 1006 } 1007 1008 // create looper for fetchers 1009 if (mFetcherLooper == NULL) { 1010 mFetcherLooper = new ALooper(); 1011 1012 mFetcherLooper->setName("Fetcher"); 1013 mFetcherLooper->start(false, false); 1014 } 1015 1016 // create fetcher to fetch the master playlist 1017 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync(); 1018 } 1019 1020 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) { 1021 AString uri; 1022 CHECK(msg->findString("uri", &uri)); 1023 ssize_t index = mFetcherInfos.indexOfKey(uri); 1024 if (index < 0) { 1025 ALOGW("fetcher for master playlist is gone."); 1026 return; 1027 } 1028 1029 // no longer useful, remove 1030 
mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id()); 1031 mFetcherInfos.removeItemsAt(index); 1032 1033 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist)); 1034 if (mPlaylist == NULL) { 1035 ALOGE("unable to fetch master playlist %s.", 1036 uriDebugString(mMasterURL).c_str()); 1037 1038 postPrepared(ERROR_IO); 1039 return; 1040 } 1041 // We trust the content provider to make a reasonable choice of preferred 1042 // initial bandwidth by listing it first in the variant playlist. 1043 // At startup we really don't have a good estimate on the available 1044 // network bandwidth since we haven't tranferred any data yet. Once 1045 // we have we can make a better informed choice. 1046 size_t initialBandwidth = 0; 1047 size_t initialBandwidthIndex = 0; 1048 1049 int32_t maxWidth = 0; 1050 int32_t maxHeight = 0; 1051 1052 if (mPlaylist->isVariantPlaylist()) { 1053 Vector<BandwidthItem> itemsWithVideo; 1054 for (size_t i = 0; i < mPlaylist->size(); ++i) { 1055 BandwidthItem item; 1056 1057 item.mPlaylistIndex = i; 1058 item.mLastFailureUs = -1ll; 1059 1060 sp<AMessage> meta; 1061 AString uri; 1062 mPlaylist->itemAt(i, &uri, &meta); 1063 1064 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 1065 1066 int32_t width, height; 1067 if (meta->findInt32("width", &width)) { 1068 maxWidth = max(maxWidth, width); 1069 } 1070 if (meta->findInt32("height", &height)) { 1071 maxHeight = max(maxHeight, height); 1072 } 1073 1074 mBandwidthItems.push(item); 1075 if (mPlaylist->hasType(i, "video")) { 1076 itemsWithVideo.push(item); 1077 } 1078 } 1079 // remove the audio-only variants if we have at least one with video 1080 if (!itemsWithVideo.empty() 1081 && itemsWithVideo.size() < mBandwidthItems.size()) { 1082 mBandwidthItems.clear(); 1083 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 1084 mBandwidthItems.push(itemsWithVideo[i]); 1085 } 1086 } 1087 1088 CHECK_GT(mBandwidthItems.size(), 0u); 1089 initialBandwidth = 
mBandwidthItems[0].mBandwidth; 1090 1091 mBandwidthItems.sort(SortByBandwidth); 1092 1093 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 1094 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 1095 initialBandwidthIndex = i; 1096 break; 1097 } 1098 } 1099 } else { 1100 // dummy item. 1101 BandwidthItem item; 1102 item.mPlaylistIndex = 0; 1103 item.mBandwidth = 0; 1104 mBandwidthItems.push(item); 1105 } 1106 1107 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; 1108 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; 1109 1110 mPlaylist->pickRandomMediaItems(); 1111 changeConfiguration( 1112 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 1113 } 1114 1115 void LiveSession::finishDisconnect() { 1116 ALOGV("finishDisconnect"); 1117 1118 // No reconfiguration is currently pending, make sure none will trigger 1119 // during disconnection either. 1120 cancelBandwidthSwitch(); 1121 1122 // cancel buffer polling 1123 cancelPollBuffering(); 1124 1125 // TRICKY: don't wait for all fetcher to be stopped when disconnecting 1126 // 1127 // Some fetchers might be stuck in connect/getSize at this point. These 1128 // operations will eventually timeout (as we have a timeout set in 1129 // MediaHTTPConnection), but we don't want to block the main UI thread 1130 // until then. Here we just need to make sure we clear all references 1131 // to the fetchers, so that when they finally exit from the blocking 1132 // operation, they can be destructed. 1133 // 1134 // There is one very tricky point though. For this scheme to work, the 1135 // fecther must hold a reference to LiveSession, so that LiveSession is 1136 // destroyed after fetcher. Otherwise LiveSession would get stuck in its 1137 // own destructor when it waits for mFetcherLooper to stop, which still 1138 // blocks main UI thread. 
1139 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1140 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1141 mFetcherLooper->unregisterHandler( 1142 mFetcherInfos.valueAt(i).mFetcher->id()); 1143 } 1144 mFetcherInfos.clear(); 1145 1146 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 1147 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 1148 1149 mPacketSources.valueFor( 1150 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 1151 1152 sp<AMessage> response = new AMessage; 1153 response->setInt32("err", OK); 1154 1155 response->postReply(mDisconnectReplyID); 1156 mDisconnectReplyID.clear(); 1157 } 1158 1159 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 1160 ssize_t index = mFetcherInfos.indexOfKey(uri); 1161 1162 if (index >= 0) { 1163 return NULL; 1164 } 1165 1166 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 1167 notify->setString("uri", uri); 1168 notify->setInt32("switchGeneration", mSwitchGeneration); 1169 1170 FetcherInfo info; 1171 info.mFetcher = new PlaylistFetcher( 1172 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 1173 info.mDurationUs = -1ll; 1174 info.mToBeRemoved = false; 1175 info.mToBeResumed = false; 1176 mFetcherLooper->registerHandler(info.mFetcher); 1177 1178 mFetcherInfos.add(uri, info); 1179 1180 return info.mFetcher; 1181 } 1182 1183 #if 0 1184 static double uniformRand() { 1185 return (double)rand() / RAND_MAX; 1186 } 1187 #endif 1188 1189 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) { 1190 ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i, 1191 newUri ? "true" : "false", 1192 newUri ? 
mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str()); 1193 return i >= 0 1194 && ((!newUri && uri == mStreams[i].mUri) 1195 || (newUri && uri == mStreams[i].mNewUri)); 1196 } 1197 1198 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex( 1199 size_t trackIndex, bool newUri) { 1200 StreamType type = indexToType(trackIndex); 1201 sp<AnotherPacketSource> source = NULL; 1202 if (newUri) { 1203 source = mPacketSources2.valueFor(type); 1204 source->clear(); 1205 } else { 1206 source = mPacketSources.valueFor(type); 1207 }; 1208 return source; 1209 } 1210 1211 sp<AnotherPacketSource> LiveSession::getMetadataSource( 1212 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) { 1213 // todo: One case where the following strategy can fail is when audio and video 1214 // are in separate playlists, both are transport streams, and the metadata 1215 // is actually contained in the audio stream. 1216 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s", 1217 streamMask, newUri ? "true" : "false"); 1218 1219 if ((sources[kVideoIndex] != NULL) // video fetcher; or ... 1220 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) { 1221 // ... 
audio fetcher for audio only variant 1222 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri); 1223 } 1224 1225 return NULL; 1226 } 1227 1228 bool LiveSession::resumeFetcher( 1229 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1230 ssize_t index = mFetcherInfos.indexOfKey(uri); 1231 if (index < 0) { 1232 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1233 return false; 1234 } 1235 1236 bool resume = false; 1237 sp<AnotherPacketSource> sources[kNumSources]; 1238 for (size_t i = 0; i < kMaxStreams; ++i) { 1239 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) { 1240 resume = true; 1241 sources[i] = getPacketSourceForStreamIndex(i, newUri); 1242 } 1243 } 1244 1245 if (resume) { 1246 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1247 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1248 1249 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1250 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1251 1252 fetcher->startAsync( 1253 sources[kAudioIndex], 1254 sources[kVideoIndex], 1255 sources[kSubtitleIndex], 1256 getMetadataSource(sources, streamMask, newUri), 1257 timeUs, -1, -1, seekMode); 1258 } 1259 1260 return resume; 1261 } 1262 1263 float LiveSession::getAbortThreshold( 1264 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1265 float abortThreshold = -1.0f; 1266 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1267 /* 1268 If we're switching down, we need to decide whether to 1269 1270 1) finish last segment of high-bandwidth variant, or 1271 2) abort last segment of high-bandwidth variant, and fetch an 1272 overlapping portion from low-bandwidth variant. 1273 1274 Here we try to maximize the amount of buffer left when the 1275 switch point is met. 
Given the following parameters: 1276 1277 B: our current buffering level in seconds 1278 T: target duration in seconds 1279 X: sample duration in seconds remain to fetch in last segment 1280 bw0: bandwidth of old variant (as specified in playlist) 1281 bw1: bandwidth of new variant (as specified in playlist) 1282 bw: measured bandwidth available 1283 1284 If we choose 1), when switch happens at the end of current 1285 segment, our buffering will be 1286 B + X - X * bw0 / bw 1287 1288 If we choose 2), when switch happens where we aborted current 1289 segment, our buffering will be 1290 B - (T - X) * bw1 / bw 1291 1292 We should only choose 1) if 1293 X/T < bw1 / (bw1 + bw0 - bw) 1294 */ 1295 1296 // abort old bandwidth immediately if bandwidth is fluctuating a lot. 1297 // our estimate could be far off, and fetching old bandwidth could 1298 // take too long. 1299 if (!mLastBandwidthStable) { 1300 return 0.0f; 1301 } 1302 1303 // Taking the measured current bandwidth at 50% face value only, 1304 // as our bandwidth estimation is a lagging indicator. Being 1305 // conservative on this, we prefer switching to lower bandwidth 1306 // unless we're really confident finishing up the last segment 1307 // of higher bandwidth will be fast. 
1308 CHECK(mLastBandwidthBps >= 0); 1309 abortThreshold = 1310 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1311 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1312 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1313 - (float)mLastBandwidthBps * 0.5f); 1314 if (abortThreshold < 0.0f) { 1315 abortThreshold = -1.0f; // do not abort 1316 } 1317 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1318 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1319 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1320 mLastBandwidthBps, 1321 abortThreshold); 1322 } 1323 return abortThreshold; 1324 } 1325 1326 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1327 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1328 } 1329 1330 ssize_t LiveSession::getLowestValidBandwidthIndex() const { 1331 for (size_t index = 0; index < mBandwidthItems.size(); index++) { 1332 if (isBandwidthValid(mBandwidthItems[index])) { 1333 return index; 1334 } 1335 } 1336 // if playlists are all blacklisted, return 0 and hope it's alive 1337 return 0; 1338 } 1339 1340 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1341 if (mBandwidthItems.size() < 2) { 1342 // shouldn't be here if we only have 1 bandwidth, check 1343 // logic to get rid of redundant bandwidth polling 1344 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1345 return 0; 1346 } 1347 1348 #if 1 1349 char value[PROPERTY_VALUE_MAX]; 1350 ssize_t index = -1; 1351 if (property_get("media.httplive.bw-index", value, NULL)) { 1352 char *end; 1353 index = strtol(value, &end, 10); 1354 CHECK(end > value && *end == '\0'); 1355 1356 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1357 index = mBandwidthItems.size() - 1; 1358 } 1359 } 1360 1361 if (index < 0) { 1362 char value[PROPERTY_VALUE_MAX]; 1363 if (property_get("media.httplive.max-bw", value, NULL)) { 1364 char *end; 1365 long maxBw = strtoul(value, 
&end, 10); 1366 if (end > value && *end == '\0') { 1367 if (maxBw > 0 && bandwidthBps > maxBw) { 1368 ALOGV("bandwidth capped to %ld bps", maxBw); 1369 bandwidthBps = maxBw; 1370 } 1371 } 1372 } 1373 1374 // Pick the highest bandwidth stream that's not currently blacklisted 1375 // below or equal to estimated bandwidth. 1376 1377 index = mBandwidthItems.size() - 1; 1378 ssize_t lowestBandwidth = getLowestValidBandwidthIndex(); 1379 while (index > lowestBandwidth) { 1380 // be conservative (70%) to avoid overestimating and immediately 1381 // switching down again. 1382 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1383 const BandwidthItem &item = mBandwidthItems[index]; 1384 if (item.mBandwidth <= adjustedBandwidthBps 1385 && isBandwidthValid(item)) { 1386 break; 1387 } 1388 --index; 1389 } 1390 } 1391 #elif 0 1392 // Change bandwidth at random() 1393 size_t index = uniformRand() * mBandwidthItems.size(); 1394 #elif 0 1395 // There's a 50% chance to stay on the current bandwidth and 1396 // a 50% chance to switch to the next higher bandwidth (wrapping around 1397 // to lowest) 1398 const size_t kMinIndex = 0; 1399 1400 static ssize_t mCurBandwidthIndex = -1; 1401 1402 size_t index; 1403 if (mCurBandwidthIndex < 0) { 1404 index = kMinIndex; 1405 } else if (uniformRand() < 0.5) { 1406 index = (size_t)mCurBandwidthIndex; 1407 } else { 1408 index = mCurBandwidthIndex + 1; 1409 if (index == mBandwidthItems.size()) { 1410 index = kMinIndex; 1411 } 1412 } 1413 mCurBandwidthIndex = index; 1414 #elif 0 1415 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1416 1417 size_t index = mBandwidthItems.size() - 1; 1418 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1419 --index; 1420 } 1421 #elif 1 1422 char value[PROPERTY_VALUE_MAX]; 1423 size_t index; 1424 if (property_get("media.httplive.bw-index", value, NULL)) { 1425 char *end; 1426 index = strtoul(value, &end, 10); 1427 CHECK(end > value && *end == '\0'); 1428 1429 if 
(index >= mBandwidthItems.size()) { 1430 index = mBandwidthItems.size() - 1; 1431 } 1432 } else { 1433 index = 0; 1434 } 1435 #else 1436 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1437 #endif 1438 1439 CHECK_GE(index, 0); 1440 1441 return index; 1442 } 1443 1444 HLSTime LiveSession::latestMediaSegmentStartTime() const { 1445 HLSTime audioTime(mPacketSources.valueFor( 1446 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1447 1448 HLSTime videoTime(mPacketSources.valueFor( 1449 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1450 1451 return audioTime < videoTime ? videoTime : audioTime; 1452 } 1453 1454 void LiveSession::onSeek(const sp<AMessage> &msg) { 1455 int64_t timeUs; 1456 int32_t mode; 1457 CHECK(msg->findInt64("timeUs", &timeUs)); 1458 CHECK(msg->findInt32("mode", &mode)); 1459 // TODO: add "mode" to changeConfiguration. 1460 changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */); 1461 } 1462 1463 status_t LiveSession::getDuration(int64_t *durationUs) const { 1464 int64_t maxDurationUs = -1ll; 1465 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1466 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1467 1468 if (fetcherDurationUs > maxDurationUs) { 1469 maxDurationUs = fetcherDurationUs; 1470 } 1471 } 1472 1473 *durationUs = maxDurationUs; 1474 1475 return OK; 1476 } 1477 1478 bool LiveSession::isSeekable() const { 1479 int64_t durationUs; 1480 return getDuration(&durationUs) == OK && durationUs >= 0; 1481 } 1482 1483 bool LiveSession::hasDynamicDuration() const { 1484 return false; 1485 } 1486 1487 size_t LiveSession::getTrackCount() const { 1488 if (mPlaylist == NULL) { 1489 return 0; 1490 } else { 1491 return mPlaylist->getTrackCount() + (mHasMetadata ? 
1 : 0); 1492 } 1493 } 1494 1495 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1496 if (mPlaylist == NULL) { 1497 return NULL; 1498 } else { 1499 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) { 1500 sp<AMessage> format = new AMessage(); 1501 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA); 1502 format->setString("language", "und"); 1503 format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3); 1504 return format; 1505 } 1506 return mPlaylist->getTrackInfo(trackIndex); 1507 } 1508 } 1509 1510 status_t LiveSession::selectTrack(size_t index, bool select) { 1511 if (mPlaylist == NULL) { 1512 return INVALID_OPERATION; 1513 } 1514 1515 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", 1516 index, select, mSubtitleGeneration); 1517 1518 ++mSubtitleGeneration; 1519 status_t err = mPlaylist->selectTrack(index, select); 1520 if (err == OK) { 1521 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); 1522 msg->setInt32("pickTrack", select); 1523 msg->post(); 1524 } 1525 return err; 1526 } 1527 1528 ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1529 if (mPlaylist == NULL) { 1530 return -1; 1531 } else { 1532 return mPlaylist->getSelectedTrack(type); 1533 } 1534 } 1535 1536 void LiveSession::changeConfiguration( 1537 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { 1538 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", 1539 (long long)timeUs, bandwidthIndex, pickTrack); 1540 1541 cancelBandwidthSwitch(); 1542 1543 CHECK(!mReconfigurationInProgress); 1544 mReconfigurationInProgress = true; 1545 if (bandwidthIndex >= 0) { 1546 mOrigBandwidthIndex = mCurBandwidthIndex; 1547 mCurBandwidthIndex = bandwidthIndex; 1548 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1549 ALOGI("#### Starting Bandwidth Switch: %zd => %zd", 1550 mOrigBandwidthIndex, mCurBandwidthIndex); 1551 } 1552 } 1553 CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size()); 1554 const 
BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); 1555 1556 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1557 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1558 1559 AString URIs[kMaxStreams]; 1560 for (size_t i = 0; i < kMaxStreams; ++i) { 1561 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1562 streamMask |= indexToType(i); 1563 } 1564 } 1565 1566 // Step 1, stop and discard fetchers that are no longer needed. 1567 // Pause those that we'll reuse. 1568 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1569 // skip fetchers that are marked mToBeRemoved, 1570 // these are done and can't be reused 1571 if (mFetcherInfos[i].mToBeRemoved) { 1572 continue; 1573 } 1574 1575 const AString &uri = mFetcherInfos.keyAt(i); 1576 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; 1577 1578 bool discardFetcher = true, delayRemoval = false; 1579 for (size_t j = 0; j < kMaxStreams; ++j) { 1580 StreamType type = indexToType(j); 1581 if ((streamMask & type) && uri == URIs[j]) { 1582 resumeMask |= type; 1583 streamMask &= ~type; 1584 discardFetcher = false; 1585 } 1586 } 1587 // Delay fetcher removal if not picking tracks, AND old fetcher 1588 // has stream mask that overlaps new variant. (Okay to discard 1589 // old fetcher now, if completely no overlap.) 
if (discardFetcher && timeUs < 0ll && !pickTrack
                && (fetcher->getStreamTypeMask() & streamMask)) {
            discardFetcher = false;
            delayRemoval = true;
        }

        if (discardFetcher) {
            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
            fetcher->stopAsync();
        } else {
            float threshold = 0.0f; // default to pause after current block (47Kbytes)
            bool disconnect = false;
            if (timeUs >= 0ll) {
                // seeking, no need to finish fetching
                disconnect = true;
            } else if (delayRemoval) {
                // adapting, abort if remaining of current segment is over threshold
                threshold = getAbortThreshold(
                        mOrigBandwidthIndex, mCurBandwidthIndex);
            }

            ALOGV("pausing fetcher-%d, threshold=%.2f",
                    fetcher->getFetcherID(), threshold);
            fetcher->pauseAsync(threshold, disconnect);
        }
    }

    // Build the continuation message. A seek (timeUs >= 0) goes through
    // onChangeConfiguration2 first; everything else goes straight to
    // onChangeConfiguration3.
    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    // Attach the URI of every stream that is either newly fetched or resumed,
    // keyed by that stream's uriKey().
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    // NOTE(review): the counter is set to the total number of fetchers,
    // including any that the loop above skipped because mToBeRemoved was
    // set - confirm those fetchers still deliver an acknowledgement.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    // No fetchers at all: nothing to wait for, continue immediately.
    if (mContinuationCounter == 0) {
        msg->post();
    }
}

// Handles kWhatChangeConfiguration (posted by selectTrack()): starts a
// reconfiguration that keeps the current bandwidth index and position, or
// re-posts the message to retry later if one is already in progress.
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration");

    if (!mReconfigurationInProgress) {
        // "pickTrack" is optional; it defaults to 0 when absent.
        int32_t pickTrack = 0;
        msg->findInt32("pickTrack", &pickTrack);
        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
    } else {
        msg->post(1000000ll); // retry in 1 sec
    }
}

// Second stage of a reconfiguration, reached only through the continuation
// that changeConfiguration() builds for a seek. Resets playback state for the
// new position, flags streams whose URI changed, and either continues
// directly with onChangeConfiguration3 or first asks the player to shut down
// decoders for streams that are no longer present.
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration2");

    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;
        mLastDequeuedTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
            sp<MetaData> format = packetSource->getFormat();
            packetSource->clear();
            // Set a tentative format here such that HTTPLiveSource will always have
            // a format available when NuPlayer queries. Without an available video
            // format when setting a surface NuPlayer might disable video decoding
            // altogether. The tentative format will be overwritten by the
            // authoritative (and possibly same) format once content from the new
            // position is dequeued.
            packetSource->setFormat(format);
        }

        for (size_t i = 0; i < kMaxStreams; ++i) {
            mStreams[i].reset();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        // Answer the pending seek request, if there is one.
        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID.clear();
            mSeekReply.clear();
        }

        // restart buffer polling after seek because previous
        // buffering position is no longer valid.
        restartPollBuffering();
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // From here on streamMask covers every stream of the new configuration,
    // resumed or newly fetched.
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    uint32_t changedMask = 0;
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            // Only queue the format change if something was dequeued from
            // this source already.
            if (source->getLatestDequeuedMeta() != NULL) {
                source->queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
            }
        }
        // Determine which decoders to shutdown on the player side,
        // a decoder has to be shutdown if its streamtype was active
        // before but no longer is.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Re-target the incoming message so that it can serve as the reply.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(this);

    notify->setMessage("reply", msg);
    notify->post();
}

// Final stage of a reconfiguration: resumes the fetchers that are kept,
// marks the others for removal, creates and starts fetchers for the streams
// in "streamMask", and then either finishes the change immediately or leaves
// a bandwidth switch in progress (mSwitchInProgress) to be completed by
// tryToFinishBandwidthSwitch().
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    mNewStreamMask = streamMask | resumeMask;

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            // mSwapMask contains streams that are in both old and new variant,
            // (in mNewStreamMask & mStreamMask) but with different URIs
            // (not in resumeMask).
            // For example, old variant has video and audio in two separate
            // URIs, and new variant has only audio with unchanged URI. mSwapMask
            // should be 0 as there is nothing to swap. We only need to stop video,
            // and resume audio.
            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
            switching = (mSwapMask != 0);
        }
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
            (long long)timeUs, switching, pickTrack,
            mStreamMask, mNewStreamMask, mSwapMask);

    // Record the URI of every stream that needs a new fetcher: as the pending
    // mNewUri while switching, directly as mUri otherwise.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);
        if (!resumeFetcher(uri, resumeMask, timeUs)) {
            ALOGV("marking fetcher-%d to be removed",
                    mFetcherInfos[i].mFetcher->getFetcherID());

            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        // addFetcher() returns NULL if a fetcher for this URI already exists,
        // which is treated as fatal here.
        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        HLSTime startTime;
        SeekMode seekMode = kSeekModeExactPosition;
        sp<AnotherPacketSource> sources[kNumSources];

        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
            startTime = latestMediaSegmentStartTime();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    startTime.mTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (!switching) {
                        // selecting, or adapting but no swap required
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting and swap required
                        meta = sources[j]->getLatestEnqueuedMeta();
                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                            // switching up
                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                        }
                    }

                    // Only audio/video positions that are not discontinuity
                    // markers may push the start time forward.
                    if ((j == kAudioIndex || j == kVideoIndex)
                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        HLSTime tmpTime(meta);
                        if (startTime < tmpTime) {
                            startTime = tmpTime;
                        }
                    }

                    if (!switching) {
                        // selecting, or adapting but no swap required
                        sources[j]->clear();
                        // NOTE(review): this break leaves the j-loop before
                        // the streamMask bit for j is cleared below; that is
                        // only harmless if kSubtitleIndex is the last stream
                        // index - confirm against the enum.
                        if (j == kSubtitleIndex) {
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);
                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // switching, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers,  if one of the fetcher
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
                "segmentStartTimeUs %lld seekMode %d",
                fetcher->getFetcherID(),
                (long long)startTime.mTimeUs,
                (long long)mLastSeekTimeUs,
                (long long)startTime.getSegmentTimeUs(),
                seekMode);

        // Set the target segment start time to the middle point of the
        // segment where the last sample was.
        // This gives a better guess if segments of the two variants are not
        // perfectly aligned. (If the corresponding segment in new variant
        // starts slightly later than that in the old variant, we still want
        // to pick that segment, not the one before)
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(),
                startTime.mSeq,
                seekMode);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    mReconfigurationInProgress = false;
    if (switching) {
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
            mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    }

    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
            mSwitchInProgress, mStreamMask);

    // A disconnect that was requested in the meantime is completed now.
    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}

// Moves everything buffered for |stream| in mPacketSources2 into
// mPacketSources, preceded by a format-only discontinuity, and then clears
// the mPacketSources2 entry.
void LiveSession::swapPacketSource(StreamType stream) {
    ALOGV("[%s] swapPacketSource", getNameForStream(stream));

    // transfer packets from source2 to source
    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);

    // queue discontinuity in mPacketSource
    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);

    // queue packets in mPacketSource2 to mPacketSource
    status_t finalResult = OK;
    sp<ABuffer> accessUnit;
    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
          OK == aps2->dequeueAccessUnit(&accessUnit)) {
        aps->queueAccessUnit(accessUnit);
    }
    aps2->clear();
}

// Called for the fetcher of |oldUri| while a bandwidth switch is in progress.
// Does nothing unless that fetcher is marked mToBeRemoved. Otherwise it swaps
// the packet sources of the streams that fetcher provided, stops it, and -
// once no stream is left in mSwapMask - swaps any extra streams of the new
// variant, resumes fetchers flagged mToBeResumed and marks the switch as
// finished.
void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) {
    if (!mSwitchInProgress) {
        return;
    }

    ssize_t index = mFetcherInfos.indexOfKey(oldUri);
    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
        return;
    }

    // Swap packet source of streams provided by old variant
    for (size_t idx = 0; idx < kMaxStreams; idx++) {
        StreamType stream = indexToType(idx);
        if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) {
            swapPacketSource(stream);

            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
                ALOGW("swapping stream type %d %s to empty stream",
                        stream, mStreams[idx].mUri.c_str());
            }
            // The pending URI becomes the current one.
            mStreams[idx].mUri = mStreams[idx].mNewUri;
            mStreams[idx].mNewUri.clear();

            mSwapMask &= ~stream;
        }
    }

    mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */);

    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask);
    // Other old fetchers still have streams to hand over; wait for them.
    if (mSwapMask != 0) {
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // Isolate the lowest set bit, i.e. one stream type per iteration.
        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
        extraStreams &= ~stream;

        swapPacketSource(stream);

        ssize_t idx = typeToIndex(stream);
        CHECK(idx >= 0);
        if (mStreams[idx].mNewUri.empty()) {
            ALOGW("swapping extra stream type %d %s to empty stream",
                    stream, mStreams[idx].mUri.c_str());
        }
        mStreams[idx].mUri = mStreams[idx].mNewUri;
        mStreams[idx].mNewUri.clear();
    }

    // Restart new fetcher (it was paused after the first 47k block)
    // and let it fetch into mPacketSources (not mPacketSources2)
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (info.mToBeResumed) {
            resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask);
            info.mToBeResumed = false;
        }
    }

    ALOGI("#### Finished Bandwidth Switch: %zd => %zd",
            mOrigBandwidthIndex, mCurBandwidthIndex);

    mStreamMask = mNewStreamMask;
    mSwitchInProgress = false;
    mOrigBandwidthIndex = mCurBandwidthIndex;

    restartPollBuffering();
}

// Posts a kWhatPollBuffering message, tagged with the current polling
// generation, to be delivered after one second.
void LiveSession::schedulePollBuffering() {
    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
    msg->setInt32("generation", mPollBufferingGeneration);
    msg->post(1000000ll);
}

void LiveSession::cancelPollBuffering() {
    ++mPollBufferingGeneration;
    mPrevBufferPercentage
= -1; 2066 } 2067 2068 void LiveSession::restartPollBuffering() { 2069 cancelPollBuffering(); 2070 onPollBuffering(); 2071 } 2072 2073 void LiveSession::onPollBuffering() { 2074 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 2075 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 2076 mSwitchInProgress, mReconfigurationInProgress, 2077 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 2078 2079 bool underflow, ready, down, up; 2080 if (checkBuffering(underflow, ready, down, up)) { 2081 if (mInPreparationPhase) { 2082 // Allow down switch even if we're still preparing. 2083 // 2084 // Some streams have a high bandwidth index as default, 2085 // when bandwidth is low, it takes a long time to buffer 2086 // to ready mark, then it immediately pauses after start 2087 // as we have to do a down switch. It's better experience 2088 // to restart from a lower index, if we detect low bw. 2089 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { 2090 postPrepared(OK); 2091 } 2092 } 2093 2094 if (!mInPreparationPhase) { 2095 if (ready) { 2096 stopBufferingIfNecessary(); 2097 } else if (underflow) { 2098 startBufferingIfNecessary(); 2099 } 2100 switchBandwidthIfNeeded(up, down); 2101 } 2102 } 2103 2104 schedulePollBuffering(); 2105 } 2106 2107 void LiveSession::cancelBandwidthSwitch(bool resume) { 2108 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 2109 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 2110 if (!mSwitchInProgress) { 2111 return; 2112 } 2113 2114 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2115 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2116 if (info.mToBeRemoved) { 2117 info.mToBeRemoved = false; 2118 if (resume) { 2119 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2120 } 2121 } 2122 } 2123 2124 for (size_t i = 0; i < kMaxStreams; ++i) { 2125 AString newUri = mStreams[i].mNewUri; 2126 if (!newUri.empty()) { 2127 // clear all mNewUri matching this 
newUri 2128 for (size_t j = i; j < kMaxStreams; ++j) { 2129 if (mStreams[j].mNewUri == newUri) { 2130 mStreams[j].mNewUri.clear(); 2131 } 2132 } 2133 ALOGV("stopping newUri = %s", newUri.c_str()); 2134 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2135 if (index < 0) { 2136 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2137 continue; 2138 } 2139 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2140 info.mToBeRemoved = true; 2141 info.mFetcher->stopAsync(); 2142 } 2143 } 2144 2145 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2146 mOrigBandwidthIndex, mCurBandwidthIndex); 2147 2148 mSwitchGeneration++; 2149 mSwitchInProgress = false; 2150 mCurBandwidthIndex = mOrigBandwidthIndex; 2151 mSwapMask = 0; 2152 } 2153 2154 bool LiveSession::checkBuffering( 2155 bool &underflow, bool &ready, bool &down, bool &up) { 2156 underflow = ready = down = up = false; 2157 2158 if (mReconfigurationInProgress) { 2159 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2160 return false; 2161 } 2162 2163 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2164 activeCount = underflowCount = readyCount = downCount = upCount =0; 2165 int32_t minBufferPercent = -1; 2166 int64_t durationUs; 2167 if (getDuration(&durationUs) != OK) { 2168 durationUs = -1; 2169 } 2170 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2171 // we don't check subtitles for buffering level 2172 if (!(mStreamMask & mPacketSources.keyAt(i) 2173 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2174 continue; 2175 } 2176 // ignore streams that never had any packet queued. 
2177 // (it's possible that the variant only has audio or video) 2178 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2179 if (meta == NULL) { 2180 continue; 2181 } 2182 2183 status_t finalResult; 2184 int64_t bufferedDurationUs = 2185 mPacketSources[i]->getBufferedDurationUs(&finalResult); 2186 ALOGV("[%s] buffered %lld us", 2187 getNameForStream(mPacketSources.keyAt(i)), 2188 (long long)bufferedDurationUs); 2189 if (durationUs >= 0) { 2190 int32_t percent; 2191 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2192 percent = 100; 2193 } else { 2194 percent = (int32_t)(100.0 * 2195 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2196 } 2197 if (minBufferPercent < 0 || percent < minBufferPercent) { 2198 minBufferPercent = percent; 2199 } 2200 } 2201 2202 ++activeCount; 2203 int64_t readyMarkUs = 2204 (mInPreparationPhase ? 2205 mBufferingSettings.mInitialWatermarkMs : 2206 mBufferingSettings.mRebufferingWatermarkHighMs) * 1000ll; 2207 if (bufferedDurationUs > readyMarkUs 2208 || mPacketSources[i]->isFinished(0)) { 2209 ++readyCount; 2210 } 2211 if (!mPacketSources[i]->isFinished(0)) { 2212 if (bufferedDurationUs < mBufferingSettings.mRebufferingWatermarkLowMs * 1000ll) { 2213 ++underflowCount; 2214 } 2215 if (bufferedDurationUs > mUpSwitchMark) { 2216 ++upCount; 2217 } 2218 if (bufferedDurationUs < mDownSwitchMark) { 2219 ++downCount; 2220 } 2221 } 2222 } 2223 2224 if (minBufferPercent >= 0) { 2225 notifyBufferingUpdate(minBufferPercent); 2226 } 2227 2228 if (activeCount > 0) { 2229 up = (upCount == activeCount); 2230 down = (downCount > 0); 2231 ready = (readyCount == activeCount); 2232 underflow = (underflowCount > 0); 2233 return true; 2234 } 2235 2236 return false; 2237 } 2238 2239 void LiveSession::startBufferingIfNecessary() { 2240 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2241 mInPreparationPhase, mBuffering); 2242 if (!mBuffering) { 2243 mBuffering = true; 2244 2245 sp<AMessage> notify = 
mNotify->dup(); 2246 notify->setInt32("what", kWhatBufferingStart); 2247 notify->post(); 2248 } 2249 } 2250 2251 void LiveSession::stopBufferingIfNecessary() { 2252 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2253 mInPreparationPhase, mBuffering); 2254 2255 if (mBuffering) { 2256 mBuffering = false; 2257 2258 sp<AMessage> notify = mNotify->dup(); 2259 notify->setInt32("what", kWhatBufferingEnd); 2260 notify->post(); 2261 } 2262 } 2263 2264 void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2265 if (percentage < mPrevBufferPercentage) { 2266 percentage = mPrevBufferPercentage; 2267 } else if (percentage > 100) { 2268 percentage = 100; 2269 } 2270 2271 mPrevBufferPercentage = percentage; 2272 2273 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2274 2275 sp<AMessage> notify = mNotify->dup(); 2276 notify->setInt32("what", kWhatBufferingUpdate); 2277 notify->setInt32("percentage", percentage); 2278 notify->post(); 2279 } 2280 2281 bool LiveSession::tryBandwidthFallback() { 2282 if (mInPreparationPhase || mReconfigurationInProgress) { 2283 // Don't try fallback during prepare or reconfig. 2284 // If error happens there, it's likely unrecoverable. 2285 return false; 2286 } 2287 if (mCurBandwidthIndex > mOrigBandwidthIndex) { 2288 // if we're switching up, simply cancel and resume old variant 2289 cancelBandwidthSwitch(true /* resume */); 2290 return true; 2291 } else { 2292 // if we're switching down, we're likely about to underflow (if 2293 // not already underflowing). try the lowest viable bandwidth if 2294 // not on that variant already. 
2295 ssize_t lowestValid = getLowestValidBandwidthIndex(); 2296 if (mCurBandwidthIndex > lowestValid) { 2297 cancelBandwidthSwitch(); 2298 changeConfiguration(-1ll, lowestValid); 2299 return true; 2300 } 2301 } 2302 // return false if we couldn't find any fallback 2303 return false; 2304 } 2305 2306 /* 2307 * returns true if a bandwidth switch is actually needed (and started), 2308 * returns false otherwise 2309 */ 2310 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2311 // no need to check bandwidth if we only have 1 bandwidth settings 2312 if (mBandwidthItems.size() < 2) { 2313 return false; 2314 } 2315 2316 if (mSwitchInProgress) { 2317 if (mBuffering) { 2318 tryBandwidthFallback(); 2319 } 2320 return false; 2321 } 2322 2323 int32_t bandwidthBps, shortTermBps; 2324 bool isStable; 2325 if (mBandwidthEstimator->estimateBandwidth( 2326 &bandwidthBps, &isStable, &shortTermBps)) { 2327 ALOGV("bandwidth estimated at %.2f kbps, " 2328 "stable %d, shortTermBps %.2f kbps", 2329 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f); 2330 mLastBandwidthBps = bandwidthBps; 2331 mLastBandwidthStable = isStable; 2332 } else { 2333 ALOGV("no bandwidth estimate."); 2334 return false; 2335 } 2336 2337 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2338 // canSwithDown and canSwitchUp can't both be true. 2339 // we only want to switch up when measured bw is 120% higher than current variant, 2340 // and we only want to switch down when measured bw is below current variant. 2341 bool canSwitchDown = bufferLow 2342 && (bandwidthBps < (int32_t)curBandwidth); 2343 bool canSwitchUp = bufferHigh 2344 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2345 2346 if (canSwitchDown || canSwitchUp) { 2347 // bandwidth estimating has some delay, if we have to downswitch when 2348 // it hasn't stabilized, use the short term to guess real bandwidth, 2349 // since it may be dropping too fast. 
2350 // (note this doesn't apply to upswitch, always use longer average there) 2351 if (!isStable && canSwitchDown) { 2352 if (shortTermBps < bandwidthBps) { 2353 bandwidthBps = shortTermBps; 2354 } 2355 } 2356 2357 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2358 2359 // it's possible that we're checking for canSwitchUp case, but the returned 2360 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2361 // of measured bw. In that case we don't want to do anything, since we have 2362 // both enough buffer and enough bw. 2363 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 2364 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) { 2365 // if not yet prepared, just restart again with new bw index. 2366 // this is faster and playback experience is cleaner. 2367 changeConfiguration( 2368 mInPreparationPhase ? 0 : -1ll, bandwidthIndex); 2369 return true; 2370 } 2371 } 2372 return false; 2373 } 2374 2375 void LiveSession::postError(status_t err) { 2376 // if we reached EOS, notify buffering of 100% 2377 if (err == ERROR_END_OF_STREAM) { 2378 notifyBufferingUpdate(100); 2379 } 2380 // we'll stop buffer polling now, before that notify 2381 // stop buffering to stop the spinning icon 2382 stopBufferingIfNecessary(); 2383 cancelPollBuffering(); 2384 2385 sp<AMessage> notify = mNotify->dup(); 2386 notify->setInt32("what", kWhatError); 2387 notify->setInt32("err", err); 2388 notify->post(); 2389 } 2390 2391 void LiveSession::postPrepared(status_t err) { 2392 CHECK(mInPreparationPhase); 2393 2394 sp<AMessage> notify = mNotify->dup(); 2395 if (err == OK || err == ERROR_END_OF_STREAM) { 2396 notify->setInt32("what", kWhatPrepared); 2397 } else { 2398 cancelPollBuffering(); 2399 2400 notify->setInt32("what", kWhatPreparationFailed); 2401 notify->setInt32("err", err); 2402 } 2403 2404 notify->post(); 2405 2406 mInPreparationPhase = false; 2407 } 2408 2409 2410 } // namespace android 2411 2412