1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "LiveSession" 19 #include <utils/Log.h> 20 21 #include "LiveSession.h" 22 #include "HTTPDownloader.h" 23 #include "M3UParser.h" 24 #include "PlaylistFetcher.h" 25 26 #include "mpeg2ts/AnotherPacketSource.h" 27 28 #include <cutils/properties.h> 29 #include <media/IMediaHTTPService.h> 30 #include <media/stagefright/foundation/ABuffer.h> 31 #include <media/stagefright/foundation/ADebug.h> 32 #include <media/stagefright/foundation/AMessage.h> 33 #include <media/stagefright/foundation/AUtils.h> 34 #include <media/stagefright/MediaDefs.h> 35 #include <media/stagefright/MetaData.h> 36 #include <media/stagefright/Utils.h> 37 38 #include <utils/Mutex.h> 39 40 #include <ctype.h> 41 #include <inttypes.h> 42 43 namespace android { 44 45 // static 46 // Bandwidth Switch Mark Defaults 47 const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 48 const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 49 const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 50 const int64_t LiveSession::kResumeThresholdUs = 100000ll; 51 52 // Buffer Prepare/Ready/Underflow Marks 53 const int64_t LiveSession::kReadyMarkUs = 5000000ll; 54 const int64_t LiveSession::kPrepareMarkUs = 1500000ll; 55 const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; 56 57 struct LiveSession::BandwidthEstimator : public RefBase { 58 BandwidthEstimator(); 59 60 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); 61 bool estimateBandwidth( 62 int32_t *bandwidth, 63 bool *isStable = NULL, 64 int32_t *shortTermBps = NULL); 65 66 private: 67 // Bandwidth estimation parameters 68 static const int32_t kShortTermBandwidthItems = 3; 69 static const int32_t kMinBandwidthHistoryItems = 20; 70 static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec 71 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec 72 static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec 73 74 struct BandwidthEntry { 75 int64_t mTimestampUs; 76 int64_t mDelayUs; 77 size_t mNumBytes; 78 }; 79 80 Mutex mLock; 81 List<BandwidthEntry> mBandwidthHistory; 82 List<int32_t> mPrevEstimates; 83 int32_t mShortTermEstimate; 84 bool mHasNewSample; 85 bool mIsStable; 86 int64_t mTotalTransferTimeUs; 87 size_t mTotalTransferBytes; 88 89 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 90 }; 91 92 LiveSession::BandwidthEstimator::BandwidthEstimator() : 93 mShortTermEstimate(0), 94 mHasNewSample(false), 95 mIsStable(true), 96 mTotalTransferTimeUs(0), 97 mTotalTransferBytes(0) { 98 } 99 100 void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 101 size_t numBytes, int64_t delayUs) { 102 AutoMutex autoLock(mLock); 103 104 int64_t nowUs = ALooper::GetNowUs(); 105 BandwidthEntry entry; 106 entry.mTimestampUs = nowUs; 107 entry.mDelayUs = delayUs; 108 entry.mNumBytes = numBytes; 109 mTotalTransferTimeUs += delayUs; 110 mTotalTransferBytes += numBytes; 111 mBandwidthHistory.push_back(entry); 112 mHasNewSample = true; 113 114 // Remove no more than 10% of total transfer time at a time 115 // to avoid sudden jump on bandwidth estimation. There might 116 // be long blocking reads that takes up signification time, 117 // we have to keep a longer window in that case. 118 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10; 119 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) { 120 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs; 121 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) { 122 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs; 123 } 124 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 125 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 126 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) { 127 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 128 // remove sample if either absolute age or total transfer time is 129 // over kMaxBandwidthHistoryWindowUs 130 if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs && 131 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) { 132 break; 133 } 134 mTotalTransferTimeUs -= it->mDelayUs; 135 mTotalTransferBytes -= it->mNumBytes; 136 mBandwidthHistory.erase(mBandwidthHistory.begin()); 137 } 138 } 139 140 bool LiveSession::BandwidthEstimator::estimateBandwidth( 141 int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) { 142 AutoMutex autoLock(mLock); 143 144 if (mBandwidthHistory.size() < 2) { 145 return false; 146 } 147 148 if (!mHasNewSample) { 149 *bandwidthBps = *(--mPrevEstimates.end()); 150 if (isStable) { 151 *isStable = mIsStable; 152 } 153 if (shortTermBps) { 154 *shortTermBps = mShortTermEstimate; 155 } 156 return true; 157 } 158 159 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 160 mPrevEstimates.push_back(*bandwidthBps); 161 while (mPrevEstimates.size() > 3) { 162 mPrevEstimates.erase(mPrevEstimates.begin()); 163 } 164 mHasNewSample = false; 165 166 int64_t totalTimeUs = 0; 167 size_t totalBytes = 0; 168 if (mBandwidthHistory.size() >= kShortTermBandwidthItems) { 169 List<BandwidthEntry>::iterator it = --mBandwidthHistory.end(); 170 for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) { 171 totalTimeUs += it->mDelayUs; 172 totalBytes += it->mNumBytes; 173 } 174 } 175 mShortTermEstimate = totalTimeUs > 0 ? 176 (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps; 177 if (shortTermBps) { 178 *shortTermBps = mShortTermEstimate; 179 } 180 181 int32_t minEstimate = -1, maxEstimate = -1; 182 List<int32_t>::iterator it; 183 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) { 184 int32_t estimate = *it; 185 if (minEstimate < 0 || minEstimate > estimate) { 186 minEstimate = estimate; 187 } 188 if (maxEstimate < 0 || maxEstimate < estimate) { 189 maxEstimate = estimate; 190 } 191 } 192 // consider it stable if long-term average is not jumping a lot 193 // and short-term average is not much lower than long-term average 194 mIsStable = (maxEstimate <= minEstimate * 4 / 3) 195 && mShortTermEstimate > minEstimate * 7 / 10; 196 if (isStable) { 197 *isStable = mIsStable; 198 } 199 200 #if 0 201 { 202 char dumpStr[1024] = {0}; 203 size_t itemIdx = 0; 204 size_t histSize = mBandwidthHistory.size(); 205 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {", 206 *bandwidthBps, mIsStable, histSize); 207 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 208 for (; it != mBandwidthHistory.end(); ++it) { 209 if (itemIdx > 50) { 210 sprintf(dumpStr + strlen(dumpStr), 211 "...(%zd more items)... }", histSize - itemIdx); 212 break; 213 } 214 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s", 215 it->mNumBytes / 1024, 216 (double)it->mDelayUs * 1.0e-6, 217 (it == (--mBandwidthHistory.end())) ? "}" : ", "); 218 itemIdx++; 219 } 220 ALOGE(dumpStr); 221 } 222 #endif 223 return true; 224 } 225 226 //static 227 const char *LiveSession::getKeyForStream(StreamType type) { 228 switch (type) { 229 case STREAMTYPE_VIDEO: 230 return "timeUsVideo"; 231 case STREAMTYPE_AUDIO: 232 return "timeUsAudio"; 233 case STREAMTYPE_SUBTITLES: 234 return "timeUsSubtitle"; 235 case STREAMTYPE_METADATA: 236 return "timeUsMetadata"; // unused 237 default: 238 TRESPASS(); 239 } 240 return NULL; 241 } 242 243 //static 244 const char *LiveSession::getNameForStream(StreamType type) { 245 switch (type) { 246 case STREAMTYPE_VIDEO: 247 return "video"; 248 case STREAMTYPE_AUDIO: 249 return "audio"; 250 case STREAMTYPE_SUBTITLES: 251 return "subs"; 252 case STREAMTYPE_METADATA: 253 return "metadata"; 254 default: 255 break; 256 } 257 return "unknown"; 258 } 259 260 //static 261 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) { 262 switch (type) { 263 case STREAMTYPE_VIDEO: 264 return ATSParser::VIDEO; 265 case STREAMTYPE_AUDIO: 266 return ATSParser::AUDIO; 267 case STREAMTYPE_METADATA: 268 return ATSParser::META; 269 case STREAMTYPE_SUBTITLES: 270 default: 271 TRESPASS(); 272 } 273 return ATSParser::NUM_SOURCE_TYPES; // should not reach here 274 } 275 276 LiveSession::LiveSession( 277 const sp<AMessage> ¬ify, uint32_t flags, 278 const sp<IMediaHTTPService> &httpService) 279 : mNotify(notify), 280 mFlags(flags), 281 mHTTPService(httpService), 282 mBuffering(false), 283 mInPreparationPhase(true), 284 mPollBufferingGeneration(0), 285 mPrevBufferPercentage(-1), 286 mCurBandwidthIndex(-1), 287 mOrigBandwidthIndex(-1), 288 mLastBandwidthBps(-1ll), 289 mLastBandwidthStable(false), 290 mBandwidthEstimator(new BandwidthEstimator()), 291 mMaxWidth(720), 292 mMaxHeight(480), 293 mStreamMask(0), 294 mNewStreamMask(0), 295 mSwapMask(0), 296 mSwitchGeneration(0), 297 mSubtitleGeneration(0), 298 mLastDequeuedTimeUs(0ll), 299 mRealTimeBaseUs(0ll), 300 mReconfigurationInProgress(false), 301 mSwitchInProgress(false), 302 mUpSwitchMark(kUpSwitchMarkUs), 303 mDownSwitchMark(kDownSwitchMarkUs), 304 mUpSwitchMargin(kUpSwitchMarginUs), 305 mFirstTimeUsValid(false), 306 mFirstTimeUs(0), 307 mLastSeekTimeUs(0), 308 mHasMetadata(false) { 309 mStreams[kAudioIndex] = StreamItem("audio"); 310 mStreams[kVideoIndex] = StreamItem("video"); 311 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 312 313 for (size_t i = 0; i < kNumSources; ++i) { 314 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 315 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 316 } 317 } 318 319 LiveSession::~LiveSession() { 320 if (mFetcherLooper != NULL) { 321 mFetcherLooper->stop(); 322 } 323 } 324 325 int64_t LiveSession::calculateMediaTimeUs( 326 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) { 327 if (timeUs >= firstTimeUs) { 328 timeUs -= firstTimeUs; 329 } else { 330 timeUs = 0; 331 } 332 timeUs += mLastSeekTimeUs; 333 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 334 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 335 } 336 return timeUs; 337 } 338 339 status_t LiveSession::dequeueAccessUnit( 340 StreamType stream, sp<ABuffer> *accessUnit) { 341 status_t finalResult = OK; 342 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 343 344 ssize_t streamIdx = typeToIndex(stream); 345 if (streamIdx < 0) { 346 return BAD_VALUE; 347 } 348 const char *streamStr = getNameForStream(stream); 349 // Do not let client pull data if we don't have data packets yet. 350 // We might only have a format discontinuity queued without data. 351 // When NuPlayerDecoder dequeues the format discontinuity, it will 352 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 353 // thinks it can do seamless change, so will not shutdown decoder. 354 // When the actual format arrives, it can't handle it and get stuck. 355 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 356 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 357 streamStr, finalResult); 358 359 if (finalResult == OK) { 360 return -EAGAIN; 361 } else { 362 return finalResult; 363 } 364 } 365 366 // Let the client dequeue as long as we have buffers available 367 // Do not make pause/resume decisions here. 368 369 status_t err = packetSource->dequeueAccessUnit(accessUnit); 370 371 if (err == INFO_DISCONTINUITY) { 372 // adaptive streaming, discontinuities in the playlist 373 int32_t type; 374 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 375 376 sp<AMessage> extra; 377 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 378 extra.clear(); 379 } 380 381 ALOGI("[%s] read discontinuity of type %d, extra = %s", 382 streamStr, 383 type, 384 extra == NULL ? "NULL" : extra->debugString().c_str()); 385 } else if (err == OK) { 386 387 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 388 int64_t timeUs, originalTimeUs; 389 int32_t discontinuitySeq = 0; 390 StreamItem& strm = mStreams[streamIdx]; 391 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 392 originalTimeUs = timeUs; 393 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 394 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 395 int64_t offsetTimeUs; 396 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 397 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 398 } else { 399 offsetTimeUs = 0; 400 } 401 402 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0 403 && strm.mLastDequeuedTimeUs >= 0) { 404 int64_t firstTimeUs; 405 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 406 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 407 offsetTimeUs += strm.mLastSampleDurationUs; 408 } else { 409 offsetTimeUs += strm.mLastSampleDurationUs; 410 } 411 412 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 413 strm.mCurDiscontinuitySeq = discontinuitySeq; 414 } 415 416 int32_t discard = 0; 417 int64_t firstTimeUs; 418 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 419 int64_t durUs; // approximate sample duration 420 if (timeUs > strm.mLastDequeuedTimeUs) { 421 durUs = timeUs - strm.mLastDequeuedTimeUs; 422 } else { 423 durUs = strm.mLastDequeuedTimeUs - timeUs; 424 } 425 strm.mLastSampleDurationUs = durUs; 426 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 427 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 428 firstTimeUs = timeUs; 429 } else { 430 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 431 firstTimeUs = timeUs; 432 } 433 434 strm.mLastDequeuedTimeUs = timeUs; 435 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq); 436 437 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 438 streamStr, (long long)timeUs, (long long)originalTimeUs); 439 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 440 mLastDequeuedTimeUs = timeUs; 441 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 442 } else if (stream == STREAMTYPE_SUBTITLES) { 443 int32_t subtitleGeneration; 444 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 445 && subtitleGeneration != mSubtitleGeneration) { 446 return -EAGAIN; 447 }; 448 (*accessUnit)->meta()->setInt32( 449 "trackIndex", mPlaylist->getSelectedIndex()); 450 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 451 } else if (stream == STREAMTYPE_METADATA) { 452 HLSTime mdTime((*accessUnit)->meta()); 453 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) { 454 packetSource->requeueAccessUnit((*accessUnit)); 455 return -EAGAIN; 456 } else { 457 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq); 458 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq); 459 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 460 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 461 } 462 } 463 } else { 464 ALOGI("[%s] encountered error %d", streamStr, err); 465 } 466 467 return err; 468 } 469 470 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 471 if (!(mStreamMask & stream)) { 472 return UNKNOWN_ERROR; 473 } 474 475 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 476 477 sp<MetaData> meta = packetSource->getFormat(); 478 479 if (meta == NULL) { 480 return -EWOULDBLOCK; 481 } 482 483 if (stream == STREAMTYPE_AUDIO) { 484 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 485 meta->setInt32(kKeyMaxInputSize, 32 * 1024); 486 } else if (stream == STREAMTYPE_VIDEO) { 487 meta->setInt32(kKeyMaxWidth, mMaxWidth); 488 meta->setInt32(kKeyMaxHeight, mMaxHeight); 489 } 490 491 return convertMetaDataToMessage(meta, format); 492 } 493 494 sp<HTTPDownloader> LiveSession::getHTTPDownloader() { 495 return new HTTPDownloader(mHTTPService, mExtraHeaders); 496 } 497 498 void LiveSession::connectAsync( 499 const char *url, const KeyedVector<String8, String8> *headers) { 500 sp<AMessage> msg = new AMessage(kWhatConnect, this); 501 msg->setString("url", url); 502 503 if (headers != NULL) { 504 msg->setPointer( 505 "headers", 506 new KeyedVector<String8, String8>(*headers)); 507 } 508 509 msg->post(); 510 } 511 512 status_t LiveSession::disconnect() { 513 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 514 515 sp<AMessage> response; 516 status_t err = msg->postAndAwaitResponse(&response); 517 518 return err; 519 } 520 521 status_t LiveSession::seekTo(int64_t timeUs) { 522 sp<AMessage> msg = new AMessage(kWhatSeek, this); 523 msg->setInt64("timeUs", timeUs); 524 525 sp<AMessage> response; 526 status_t err = msg->postAndAwaitResponse(&response); 527 528 return err; 529 } 530 531 bool LiveSession::checkSwitchProgress( 532 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 533 AString newUri; 534 CHECK(stopParams->findString("uri", &newUri)); 535 536 *needResumeUntil = false; 537 sp<AMessage> firstNewMeta[kMaxStreams]; 538 for (size_t i = 0; i < kMaxStreams; ++i) { 539 StreamType stream = indexToType(i); 540 if (!(mSwapMask & mNewStreamMask & stream) 541 || (mStreams[i].mNewUri != newUri)) { 542 continue; 543 } 544 if (stream == STREAMTYPE_SUBTITLES) { 545 continue; 546 } 547 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 548 549 // First, get latest dequeued meta, which is where the decoder is at. 550 // (when upswitching, we take the meta after a certain delay, so that 551 // the decoder is left with some cushion) 552 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 553 if (delayUs > 0) { 554 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 555 if (lastDequeueMeta == NULL) { 556 // this means we don't have enough cushion, try again later 557 ALOGV("[%s] up switching failed due to insufficient buffer", 558 getNameForStream(stream)); 559 return false; 560 } 561 } else { 562 // It's okay for lastDequeueMeta to be NULL here, it means the 563 // decoder hasn't even started dequeueing 564 lastDequeueMeta = source->getLatestDequeuedMeta(); 565 } 566 // Then, trim off packets at beginning of mPacketSources2 that's before 567 // the latest dequeued time. These samples are definitely too late. 568 firstNewMeta[i] = mPacketSources2.editValueAt(i) 569 ->trimBuffersBeforeMeta(lastDequeueMeta); 570 571 // Now firstNewMeta[i] is the first sample after the trim. 572 // If it's NULL, we failed because dequeue already past all samples 573 // in mPacketSource2, we have to try again. 574 if (firstNewMeta[i] == NULL) { 575 HLSTime dequeueTime(lastDequeueMeta); 576 ALOGV("[%s] dequeue time (%d, %lld) past start time", 577 getNameForStream(stream), 578 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 579 return false; 580 } 581 582 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 583 // already fetched, and see if we need to resumeUntil 584 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 585 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 586 // boundary, no need to resume as the content will look different anyways 587 if (lastEnqueueMeta != NULL) { 588 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 589 590 // no need to resume old fetcher if new fetcher started in different 591 // discontinuity sequence, as the content will look different. 592 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 593 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 594 595 // update the stopTime for resumeUntil 596 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 597 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 598 } 599 } 600 601 // if we're here, it means dequeue progress hasn't passed some samples in 602 // mPacketSource2, we can trim off the excess in mPacketSource. 603 // (old fetcher might still need to resumeUntil the start time of new fetcher) 604 for (size_t i = 0; i < kMaxStreams; ++i) { 605 StreamType stream = indexToType(i); 606 if (!(mSwapMask & mNewStreamMask & stream) 607 || (newUri != mStreams[i].mNewUri) 608 || stream == STREAMTYPE_SUBTITLES) { 609 continue; 610 } 611 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 612 } 613 614 // no resumeUntil if already underflow 615 *needResumeUntil &= !mBuffering; 616 617 return true; 618 } 619 620 void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 621 switch (msg->what()) { 622 case kWhatConnect: 623 { 624 onConnect(msg); 625 break; 626 } 627 628 case kWhatDisconnect: 629 { 630 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 631 632 if (mReconfigurationInProgress) { 633 break; 634 } 635 636 finishDisconnect(); 637 break; 638 } 639 640 case kWhatSeek: 641 { 642 if (mReconfigurationInProgress) { 643 msg->post(50000); 644 break; 645 } 646 647 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 648 mSeekReply = new AMessage; 649 650 onSeek(msg); 651 break; 652 } 653 654 case kWhatFetcherNotify: 655 { 656 int32_t what; 657 CHECK(msg->findInt32("what", &what)); 658 659 switch (what) { 660 case PlaylistFetcher::kWhatStarted: 661 break; 662 case PlaylistFetcher::kWhatPaused: 663 case PlaylistFetcher::kWhatStopped: 664 { 665 AString uri; 666 CHECK(msg->findString("uri", &uri)); 667 ssize_t index = mFetcherInfos.indexOfKey(uri); 668 if (index < 0) { 669 // ignore msgs from fetchers that's already gone 670 break; 671 } 672 673 ALOGV("fetcher-%d %s", 674 mFetcherInfos[index].mFetcher->getFetcherID(), 675 what == PlaylistFetcher::kWhatPaused ? 676 "paused" : "stopped"); 677 678 if (what == PlaylistFetcher::kWhatStopped) { 679 mFetcherLooper->unregisterHandler( 680 mFetcherInfos[index].mFetcher->id()); 681 mFetcherInfos.removeItemsAt(index); 682 } else if (what == PlaylistFetcher::kWhatPaused) { 683 int32_t seekMode; 684 CHECK(msg->findInt32("seekMode", &seekMode)); 685 for (size_t i = 0; i < kMaxStreams; ++i) { 686 if (mStreams[i].mUri == uri) { 687 mStreams[i].mSeekMode = (SeekMode) seekMode; 688 } 689 } 690 } 691 692 if (mContinuation != NULL) { 693 CHECK_GT(mContinuationCounter, 0); 694 if (--mContinuationCounter == 0) { 695 mContinuation->post(); 696 } 697 ALOGV("%zu fetcher(s) left", mContinuationCounter); 698 } 699 break; 700 } 701 702 case PlaylistFetcher::kWhatDurationUpdate: 703 { 704 AString uri; 705 CHECK(msg->findString("uri", &uri)); 706 707 int64_t durationUs; 708 CHECK(msg->findInt64("durationUs", &durationUs)); 709 710 ssize_t index = mFetcherInfos.indexOfKey(uri); 711 if (index >= 0) { 712 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 713 info->mDurationUs = durationUs; 714 } 715 break; 716 } 717 718 case PlaylistFetcher::kWhatTargetDurationUpdate: 719 { 720 int64_t targetDurationUs; 721 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 722 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 723 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 724 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 725 break; 726 } 727 728 case PlaylistFetcher::kWhatError: 729 { 730 status_t err; 731 CHECK(msg->findInt32("err", &err)); 732 733 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 734 735 // handle EOS on subtitle tracks independently 736 AString uri; 737 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 738 ssize_t i = mFetcherInfos.indexOfKey(uri); 739 if (i >= 0) { 740 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 741 if (fetcher != NULL) { 742 uint32_t type = fetcher->getStreamTypeMask(); 743 if (type == STREAMTYPE_SUBTITLES) { 744 mPacketSources.valueFor( 745 STREAMTYPE_SUBTITLES)->signalEOS(err);; 746 break; 747 } 748 } 749 } 750 } 751 752 // remember the failure index (as mCurBandwidthIndex will be restored 753 // after cancelBandwidthSwitch()), and record last fail time 754 size_t failureIndex = mCurBandwidthIndex; 755 mBandwidthItems.editItemAt( 756 failureIndex).mLastFailureUs = ALooper::GetNowUs(); 757 758 if (mSwitchInProgress) { 759 // if error happened when we switch to a variant, try fallback 760 // to other variant to save the session 761 if (tryBandwidthFallback()) { 762 break; 763 } 764 } 765 766 if (mInPreparationPhase) { 767 postPrepared(err); 768 } 769 770 cancelBandwidthSwitch(); 771 772 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 773 774 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 775 776 mPacketSources.valueFor( 777 STREAMTYPE_SUBTITLES)->signalEOS(err); 778 779 postError(err); 780 break; 781 } 782 783 case PlaylistFetcher::kWhatStopReached: 784 { 785 ALOGV("kWhatStopReached"); 786 787 AString oldUri; 788 CHECK(msg->findString("uri", &oldUri)); 789 790 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 791 if (index < 0) { 792 break; 793 } 794 795 tryToFinishBandwidthSwitch(oldUri); 796 break; 797 } 798 799 case PlaylistFetcher::kWhatStartedAt: 800 { 801 int32_t switchGeneration; 802 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 803 804 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 805 switchGeneration, mSwitchGeneration); 806 807 if (switchGeneration != mSwitchGeneration) { 808 break; 809 } 810 811 AString uri; 812 CHECK(msg->findString("uri", &uri)); 813 814 // mark new fetcher mToBeResumed 815 ssize_t index = mFetcherInfos.indexOfKey(uri); 816 if (index >= 0) { 817 mFetcherInfos.editValueAt(index).mToBeResumed = true; 818 } 819 820 // temporarily disable packet sources to be swapped to prevent 821 // NuPlayerDecoder from dequeuing while we check progress 822 for (size_t i = 0; i < mPacketSources.size(); ++i) { 823 if ((mSwapMask & mPacketSources.keyAt(i)) 824 && uri == mStreams[i].mNewUri) { 825 mPacketSources.editValueAt(i)->enable(false); 826 } 827 } 828 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 829 // If switching up, require a cushion bigger than kUnderflowMark 830 // to avoid buffering immediately after the switch. 831 // (If we don't have that cushion we'd rather cancel and try again.) 832 int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; 833 bool needResumeUntil = false; 834 sp<AMessage> stopParams = msg; 835 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 836 // playback time hasn't passed startAt time 837 if (!needResumeUntil) { 838 ALOGV("finish switch"); 839 for (size_t i = 0; i < kMaxStreams; ++i) { 840 if ((mSwapMask & indexToType(i)) 841 && uri == mStreams[i].mNewUri) { 842 // have to make a copy of mStreams[i].mUri because 843 // tryToFinishBandwidthSwitch is modifying mStreams[] 844 AString oldURI = mStreams[i].mUri; 845 tryToFinishBandwidthSwitch(oldURI); 846 break; 847 } 848 } 849 } else { 850 // startAt time is after last enqueue time 851 // Resume fetcher for the original variant; the resumed fetcher should 852 // continue until the timestamps found in msg, which is stored by the 853 // new fetcher to indicate where the new variant has started buffering. 854 ALOGV("finish switch with resumeUntilAsync"); 855 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 856 const FetcherInfo &info = mFetcherInfos.valueAt(i); 857 if (info.mToBeRemoved) { 858 info.mFetcher->resumeUntilAsync(stopParams); 859 } 860 } 861 } 862 } else { 863 // playback time passed startAt time 864 if (switchUp) { 865 // if switching up, cancel and retry if condition satisfies again 866 ALOGV("cancel up switch because we're too late"); 867 cancelBandwidthSwitch(true /* resume */); 868 } else { 869 ALOGV("retry down switch at next sample"); 870 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 871 } 872 } 873 // re-enable all packet sources 874 for (size_t i = 0; i < mPacketSources.size(); ++i) { 875 mPacketSources.editValueAt(i)->enable(true); 876 } 877 878 break; 879 } 880 881 case PlaylistFetcher::kWhatPlaylistFetched: 882 { 883 onMasterPlaylistFetched(msg); 884 break; 885 } 886 887 case PlaylistFetcher::kWhatMetadataDetected: 888 { 889 if (!mHasMetadata) { 890 mHasMetadata = true; 891 sp<AMessage> notify = mNotify->dup(); 892 notify->setInt32("what", kWhatMetadataDetected); 893 notify->post(); 894 } 895 break; 896 } 897 898 default: 899 TRESPASS(); 900 } 901 902 break; 903 } 904 905 case kWhatChangeConfiguration: 906 { 907 onChangeConfiguration(msg); 908 break; 909 } 910 911 case kWhatChangeConfiguration2: 912 { 913 onChangeConfiguration2(msg); 914 break; 915 } 916 917 case kWhatChangeConfiguration3: 918 { 919 onChangeConfiguration3(msg); 920 break; 921 } 922 923 case kWhatPollBuffering: 924 { 925 int32_t generation; 926 CHECK(msg->findInt32("generation", &generation)); 927 if (generation == mPollBufferingGeneration) { 928 onPollBuffering(); 929 } 930 break; 931 } 932 933 default: 934 TRESPASS(); 935 break; 936 } 937 } 938 939 // static 940 bool LiveSession::isBandwidthValid(const BandwidthItem &item) { 941 static const int64_t kBlacklistWindowUs = 300 * 1000000ll; 942 return item.mLastFailureUs < 0 943 || ALooper::GetNowUs() - item.mLastFailureUs > kBlacklistWindowUs; 944 } 945 946 // static 947 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 948 if (a->mBandwidth < b->mBandwidth) { 949 return -1; 950 } else if (a->mBandwidth == b->mBandwidth) { 951 return 0; 952 } 953 954 return 1; 955 } 956 957 // static 958 LiveSession::StreamType LiveSession::indexToType(int idx) { 959 CHECK(idx >= 0 && idx < kNumSources); 960 return (StreamType)(1 << idx); 961 } 962 963 // static 964 ssize_t LiveSession::typeToIndex(int32_t type) { 965 switch (type) { 966 case STREAMTYPE_AUDIO: 967 return 0; 968 case STREAMTYPE_VIDEO: 969 return 1; 970 case STREAMTYPE_SUBTITLES: 971 return 2; 972 case STREAMTYPE_METADATA: 973 return 3; 974 default: 975 return -1; 976 }; 977 return -1; 978 } 979 980 void LiveSession::onConnect(const sp<AMessage> &msg) { 981 CHECK(msg->findString("url", &mMasterURL)); 982 983 // TODO currently we don't know if we are coming here from incognito mode 984 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str()); 985 986 KeyedVector<String8, String8> *headers = NULL; 987 if (!msg->findPointer("headers", (void **)&headers)) { 988 mExtraHeaders.clear(); 989 } else { 990 mExtraHeaders = *headers; 991 992 delete headers; 993 headers = NULL; 994 } 995 996 // create looper for fetchers 997 if (mFetcherLooper == NULL) { 998 mFetcherLooper = new ALooper(); 999 1000 mFetcherLooper->setName("Fetcher"); 1001 mFetcherLooper->start(false, false); 1002 } 1003 1004 // create fetcher to fetch the master playlist 1005 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync(); 1006 } 1007 1008 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) { 1009 AString uri; 1010 CHECK(msg->findString("uri", &uri)); 1011 ssize_t index = mFetcherInfos.indexOfKey(uri); 1012 if (index < 0) { 1013 ALOGW("fetcher for master playlist is gone."); 1014 return; 1015 } 1016 1017 // no longer useful, remove 1018 mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id()); 1019 mFetcherInfos.removeItemsAt(index); 1020 1021 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist)); 1022 if (mPlaylist == NULL) { 1023 ALOGE("unable to fetch master playlist %s.", 1024 uriDebugString(mMasterURL).c_str()); 1025 1026 postPrepared(ERROR_IO); 1027 return; 1028 } 1029 // We trust the content provider to make a reasonable choice of preferred 1030 // initial bandwidth by listing it first in the variant playlist. 1031 // At startup we really don't have a good estimate on the available 1032 // network bandwidth since we haven't tranferred any data yet. Once 1033 // we have we can make a better informed choice. 1034 size_t initialBandwidth = 0; 1035 size_t initialBandwidthIndex = 0; 1036 1037 int32_t maxWidth = 0; 1038 int32_t maxHeight = 0; 1039 1040 if (mPlaylist->isVariantPlaylist()) { 1041 Vector<BandwidthItem> itemsWithVideo; 1042 for (size_t i = 0; i < mPlaylist->size(); ++i) { 1043 BandwidthItem item; 1044 1045 item.mPlaylistIndex = i; 1046 item.mLastFailureUs = -1ll; 1047 1048 sp<AMessage> meta; 1049 AString uri; 1050 mPlaylist->itemAt(i, &uri, &meta); 1051 1052 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 1053 1054 int32_t width, height; 1055 if (meta->findInt32("width", &width)) { 1056 maxWidth = max(maxWidth, width); 1057 } 1058 if (meta->findInt32("height", &height)) { 1059 maxHeight = max(maxHeight, height); 1060 } 1061 1062 mBandwidthItems.push(item); 1063 if (mPlaylist->hasType(i, "video")) { 1064 itemsWithVideo.push(item); 1065 } 1066 } 1067 // remove the audio-only variants if we have at least one with video 1068 if (!itemsWithVideo.empty() 1069 && itemsWithVideo.size() < mBandwidthItems.size()) { 1070 mBandwidthItems.clear(); 1071 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 1072 mBandwidthItems.push(itemsWithVideo[i]); 1073 } 1074 } 1075 1076 CHECK_GT(mBandwidthItems.size(), 0u); 1077 initialBandwidth = mBandwidthItems[0].mBandwidth; 1078 1079 mBandwidthItems.sort(SortByBandwidth); 1080 1081 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 1082 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 1083 initialBandwidthIndex = i; 1084 break; 1085 } 1086 } 1087 } else { 1088 // dummy item. 1089 BandwidthItem item; 1090 item.mPlaylistIndex = 0; 1091 item.mBandwidth = 0; 1092 mBandwidthItems.push(item); 1093 } 1094 1095 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; 1096 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; 1097 1098 mPlaylist->pickRandomMediaItems(); 1099 changeConfiguration( 1100 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 1101 } 1102 1103 void LiveSession::finishDisconnect() { 1104 ALOGV("finishDisconnect"); 1105 1106 // No reconfiguration is currently pending, make sure none will trigger 1107 // during disconnection either. 1108 cancelBandwidthSwitch(); 1109 1110 // cancel buffer polling 1111 cancelPollBuffering(); 1112 1113 // TRICKY: don't wait for all fetcher to be stopped when disconnecting 1114 // 1115 // Some fetchers might be stuck in connect/getSize at this point. These 1116 // operations will eventually timeout (as we have a timeout set in 1117 // MediaHTTPConnection), but we don't want to block the main UI thread 1118 // until then. Here we just need to make sure we clear all references 1119 // to the fetchers, so that when they finally exit from the blocking 1120 // operation, they can be destructed. 1121 // 1122 // There is one very tricky point though. For this scheme to work, the 1123 // fecther must hold a reference to LiveSession, so that LiveSession is 1124 // destroyed after fetcher. Otherwise LiveSession would get stuck in its 1125 // own destructor when it waits for mFetcherLooper to stop, which still 1126 // blocks main UI thread. 1127 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1128 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1129 mFetcherLooper->unregisterHandler( 1130 mFetcherInfos.valueAt(i).mFetcher->id()); 1131 } 1132 mFetcherInfos.clear(); 1133 1134 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 1135 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 1136 1137 mPacketSources.valueFor( 1138 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 1139 1140 sp<AMessage> response = new AMessage; 1141 response->setInt32("err", OK); 1142 1143 response->postReply(mDisconnectReplyID); 1144 mDisconnectReplyID.clear(); 1145 } 1146 1147 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 1148 ssize_t index = mFetcherInfos.indexOfKey(uri); 1149 1150 if (index >= 0) { 1151 return NULL; 1152 } 1153 1154 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 1155 notify->setString("uri", uri); 1156 notify->setInt32("switchGeneration", mSwitchGeneration); 1157 1158 FetcherInfo info; 1159 info.mFetcher = new PlaylistFetcher( 1160 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 1161 info.mDurationUs = -1ll; 1162 info.mToBeRemoved = false; 1163 info.mToBeResumed = false; 1164 mFetcherLooper->registerHandler(info.mFetcher); 1165 1166 mFetcherInfos.add(uri, info); 1167 1168 return info.mFetcher; 1169 } 1170 1171 #if 0 1172 static double uniformRand() { 1173 return (double)rand() / RAND_MAX; 1174 } 1175 #endif 1176 1177 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) { 1178 ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i, 1179 newUri ? "true" : "false", 1180 newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str()); 1181 return i >= 0 1182 && ((!newUri && uri == mStreams[i].mUri) 1183 || (newUri && uri == mStreams[i].mNewUri)); 1184 } 1185 1186 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex( 1187 size_t trackIndex, bool newUri) { 1188 StreamType type = indexToType(trackIndex); 1189 sp<AnotherPacketSource> source = NULL; 1190 if (newUri) { 1191 source = mPacketSources2.valueFor(type); 1192 source->clear(); 1193 } else { 1194 source = mPacketSources.valueFor(type); 1195 }; 1196 return source; 1197 } 1198 1199 sp<AnotherPacketSource> LiveSession::getMetadataSource( 1200 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) { 1201 // todo: One case where the following strategy can fail is when audio and video 1202 // are in separate playlists, both are transport streams, and the metadata 1203 // is actually contained in the audio stream. 1204 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s", 1205 streamMask, newUri ? "true" : "false"); 1206 1207 if ((sources[kVideoIndex] != NULL) // video fetcher; or ... 1208 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) { 1209 // ... audio fetcher for audio only variant 1210 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri); 1211 } 1212 1213 return NULL; 1214 } 1215 1216 bool LiveSession::resumeFetcher( 1217 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1218 ssize_t index = mFetcherInfos.indexOfKey(uri); 1219 if (index < 0) { 1220 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1221 return false; 1222 } 1223 1224 bool resume = false; 1225 sp<AnotherPacketSource> sources[kNumSources]; 1226 for (size_t i = 0; i < kMaxStreams; ++i) { 1227 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) { 1228 resume = true; 1229 sources[i] = getPacketSourceForStreamIndex(i, newUri); 1230 } 1231 } 1232 1233 if (resume) { 1234 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1235 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1236 1237 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1238 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1239 1240 fetcher->startAsync( 1241 sources[kAudioIndex], 1242 sources[kVideoIndex], 1243 sources[kSubtitleIndex], 1244 getMetadataSource(sources, streamMask, newUri), 1245 timeUs, -1, -1, seekMode); 1246 } 1247 1248 return resume; 1249 } 1250 1251 float LiveSession::getAbortThreshold( 1252 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1253 float abortThreshold = -1.0f; 1254 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1255 /* 1256 If we're switching down, we need to decide whether to 1257 1258 1) finish last segment of high-bandwidth variant, or 1259 2) abort last segment of high-bandwidth variant, and fetch an 1260 overlapping portion from low-bandwidth variant. 1261 1262 Here we try to maximize the amount of buffer left when the 1263 switch point is met. Given the following parameters: 1264 1265 B: our current buffering level in seconds 1266 T: target duration in seconds 1267 X: sample duration in seconds remain to fetch in last segment 1268 bw0: bandwidth of old variant (as specified in playlist) 1269 bw1: bandwidth of new variant (as specified in playlist) 1270 bw: measured bandwidth available 1271 1272 If we choose 1), when switch happens at the end of current 1273 segment, our buffering will be 1274 B + X - X * bw0 / bw 1275 1276 If we choose 2), when switch happens where we aborted current 1277 segment, our buffering will be 1278 B - (T - X) * bw1 / bw 1279 1280 We should only choose 1) if 1281 X/T < bw1 / (bw1 + bw0 - bw) 1282 */ 1283 1284 // abort old bandwidth immediately if bandwidth is fluctuating a lot. 1285 // our estimate could be far off, and fetching old bandwidth could 1286 // take too long. 1287 if (!mLastBandwidthStable) { 1288 return 0.0f; 1289 } 1290 1291 // Taking the measured current bandwidth at 50% face value only, 1292 // as our bandwidth estimation is a lagging indicator. Being 1293 // conservative on this, we prefer switching to lower bandwidth 1294 // unless we're really confident finishing up the last segment 1295 // of higher bandwidth will be fast. 1296 CHECK(mLastBandwidthBps >= 0); 1297 abortThreshold = 1298 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1299 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1300 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1301 - (float)mLastBandwidthBps * 0.5f); 1302 if (abortThreshold < 0.0f) { 1303 abortThreshold = -1.0f; // do not abort 1304 } 1305 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1306 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1307 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1308 mLastBandwidthBps, 1309 abortThreshold); 1310 } 1311 return abortThreshold; 1312 } 1313 1314 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1315 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1316 } 1317 1318 ssize_t LiveSession::getLowestValidBandwidthIndex() const { 1319 for (size_t index = 0; index < mBandwidthItems.size(); index++) { 1320 if (isBandwidthValid(mBandwidthItems[index])) { 1321 return index; 1322 } 1323 } 1324 // if playlists are all blacklisted, return 0 and hope it's alive 1325 return 0; 1326 } 1327 1328 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1329 if (mBandwidthItems.size() < 2) { 1330 // shouldn't be here if we only have 1 bandwidth, check 1331 // logic to get rid of redundant bandwidth polling 1332 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1333 return 0; 1334 } 1335 1336 #if 1 1337 char value[PROPERTY_VALUE_MAX]; 1338 ssize_t index = -1; 1339 if (property_get("media.httplive.bw-index", value, NULL)) { 1340 char *end; 1341 index = strtol(value, &end, 10); 1342 CHECK(end > value && *end == '\0'); 1343 1344 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1345 index = mBandwidthItems.size() - 1; 1346 } 1347 } 1348 1349 if (index < 0) { 1350 char value[PROPERTY_VALUE_MAX]; 1351 if (property_get("media.httplive.max-bw", value, NULL)) { 1352 char *end; 1353 long maxBw = strtoul(value, &end, 10); 1354 if (end > value && *end == '\0') { 1355 if (maxBw > 0 && bandwidthBps > maxBw) { 1356 ALOGV("bandwidth capped to %ld bps", maxBw); 1357 bandwidthBps = maxBw; 1358 } 1359 } 1360 } 1361 1362 // Pick the highest bandwidth stream that's not currently blacklisted 1363 // below or equal to estimated bandwidth. 1364 1365 index = mBandwidthItems.size() - 1; 1366 ssize_t lowestBandwidth = getLowestValidBandwidthIndex(); 1367 while (index > lowestBandwidth) { 1368 // be conservative (70%) to avoid overestimating and immediately 1369 // switching down again. 1370 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1371 const BandwidthItem &item = mBandwidthItems[index]; 1372 if (item.mBandwidth <= adjustedBandwidthBps 1373 && isBandwidthValid(item)) { 1374 break; 1375 } 1376 --index; 1377 } 1378 } 1379 #elif 0 1380 // Change bandwidth at random() 1381 size_t index = uniformRand() * mBandwidthItems.size(); 1382 #elif 0 1383 // There's a 50% chance to stay on the current bandwidth and 1384 // a 50% chance to switch to the next higher bandwidth (wrapping around 1385 // to lowest) 1386 const size_t kMinIndex = 0; 1387 1388 static ssize_t mCurBandwidthIndex = -1; 1389 1390 size_t index; 1391 if (mCurBandwidthIndex < 0) { 1392 index = kMinIndex; 1393 } else if (uniformRand() < 0.5) { 1394 index = (size_t)mCurBandwidthIndex; 1395 } else { 1396 index = mCurBandwidthIndex + 1; 1397 if (index == mBandwidthItems.size()) { 1398 index = kMinIndex; 1399 } 1400 } 1401 mCurBandwidthIndex = index; 1402 #elif 0 1403 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1404 1405 size_t index = mBandwidthItems.size() - 1; 1406 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1407 --index; 1408 } 1409 #elif 1 1410 char value[PROPERTY_VALUE_MAX]; 1411 size_t index; 1412 if (property_get("media.httplive.bw-index", value, NULL)) { 1413 char *end; 1414 index = strtoul(value, &end, 10); 1415 CHECK(end > value && *end == '\0'); 1416 1417 if (index >= mBandwidthItems.size()) { 1418 index = mBandwidthItems.size() - 1; 1419 } 1420 } else { 1421 index = 0; 1422 } 1423 #else 1424 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1425 #endif 1426 1427 CHECK_GE(index, 0); 1428 1429 return index; 1430 } 1431 1432 HLSTime LiveSession::latestMediaSegmentStartTime() const { 1433 HLSTime audioTime(mPacketSources.valueFor( 1434 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1435 1436 HLSTime videoTime(mPacketSources.valueFor( 1437 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1438 1439 return audioTime < videoTime ? videoTime : audioTime; 1440 } 1441 1442 void LiveSession::onSeek(const sp<AMessage> &msg) { 1443 int64_t timeUs; 1444 CHECK(msg->findInt64("timeUs", &timeUs)); 1445 changeConfiguration(timeUs); 1446 } 1447 1448 status_t LiveSession::getDuration(int64_t *durationUs) const { 1449 int64_t maxDurationUs = -1ll; 1450 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1451 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1452 1453 if (fetcherDurationUs > maxDurationUs) { 1454 maxDurationUs = fetcherDurationUs; 1455 } 1456 } 1457 1458 *durationUs = maxDurationUs; 1459 1460 return OK; 1461 } 1462 1463 bool LiveSession::isSeekable() const { 1464 int64_t durationUs; 1465 return getDuration(&durationUs) == OK && durationUs >= 0; 1466 } 1467 1468 bool LiveSession::hasDynamicDuration() const { 1469 return false; 1470 } 1471 1472 size_t LiveSession::getTrackCount() const { 1473 if (mPlaylist == NULL) { 1474 return 0; 1475 } else { 1476 return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0); 1477 } 1478 } 1479 1480 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1481 if (mPlaylist == NULL) { 1482 return NULL; 1483 } else { 1484 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) { 1485 sp<AMessage> format = new AMessage(); 1486 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA); 1487 format->setString("language", "und"); 1488 format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3); 1489 return format; 1490 } 1491 return mPlaylist->getTrackInfo(trackIndex); 1492 } 1493 } 1494 1495 status_t LiveSession::selectTrack(size_t index, bool select) { 1496 if (mPlaylist == NULL) { 1497 return INVALID_OPERATION; 1498 } 1499 1500 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", 1501 index, select, mSubtitleGeneration); 1502 1503 ++mSubtitleGeneration; 1504 status_t err = mPlaylist->selectTrack(index, select); 1505 if (err == OK) { 1506 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); 1507 msg->setInt32("pickTrack", select); 1508 msg->post(); 1509 } 1510 return err; 1511 } 1512 1513 ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1514 if (mPlaylist == NULL) { 1515 return -1; 1516 } else { 1517 return mPlaylist->getSelectedTrack(type); 1518 } 1519 } 1520 1521 void LiveSession::changeConfiguration( 1522 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { 1523 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", 1524 (long long)timeUs, bandwidthIndex, pickTrack); 1525 1526 cancelBandwidthSwitch(); 1527 1528 CHECK(!mReconfigurationInProgress); 1529 mReconfigurationInProgress = true; 1530 if (bandwidthIndex >= 0) { 1531 mOrigBandwidthIndex = mCurBandwidthIndex; 1532 mCurBandwidthIndex = bandwidthIndex; 1533 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1534 ALOGI("#### Starting Bandwidth Switch: %zd => %zd", 1535 mOrigBandwidthIndex, mCurBandwidthIndex); 1536 } 1537 } 1538 CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); 1539 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); 1540 1541 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1542 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1543 1544 AString URIs[kMaxStreams]; 1545 for (size_t i = 0; i < kMaxStreams; ++i) { 1546 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1547 streamMask |= indexToType(i); 1548 } 1549 } 1550 1551 // Step 1, stop and discard fetchers that are no longer needed. 1552 // Pause those that we'll reuse. 1553 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1554 // skip fetchers that are marked mToBeRemoved, 1555 // these are done and can't be reused 1556 if (mFetcherInfos[i].mToBeRemoved) { 1557 continue; 1558 } 1559 1560 const AString &uri = mFetcherInfos.keyAt(i); 1561 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; 1562 1563 bool discardFetcher = true, delayRemoval = false; 1564 for (size_t j = 0; j < kMaxStreams; ++j) { 1565 StreamType type = indexToType(j); 1566 if ((streamMask & type) && uri == URIs[j]) { 1567 resumeMask |= type; 1568 streamMask &= ~type; 1569 discardFetcher = false; 1570 } 1571 } 1572 // Delay fetcher removal if not picking tracks, AND old fetcher 1573 // has stream mask that overlaps new variant. (Okay to discard 1574 // old fetcher now, if completely no overlap.) 1575 if (discardFetcher && timeUs < 0ll && !pickTrack 1576 && (fetcher->getStreamTypeMask() & streamMask)) { 1577 discardFetcher = false; 1578 delayRemoval = true; 1579 } 1580 1581 if (discardFetcher) { 1582 ALOGV("discarding fetcher-%d", fetcher->getFetcherID()); 1583 fetcher->stopAsync(); 1584 } else { 1585 float threshold = 0.0f; // default to pause after current block (47Kbytes) 1586 bool disconnect = false; 1587 if (timeUs >= 0ll) { 1588 // seeking, no need to finish fetching 1589 disconnect = true; 1590 } else if (delayRemoval) { 1591 // adapting, abort if remaining of current segment is over threshold 1592 threshold = getAbortThreshold( 1593 mOrigBandwidthIndex, mCurBandwidthIndex); 1594 } 1595 1596 ALOGV("pausing fetcher-%d, threshold=%.2f", 1597 fetcher->getFetcherID(), threshold); 1598 fetcher->pauseAsync(threshold, disconnect); 1599 } 1600 } 1601 1602 sp<AMessage> msg; 1603 if (timeUs < 0ll) { 1604 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1605 msg = new AMessage(kWhatChangeConfiguration3, this); 1606 } else { 1607 msg = new AMessage(kWhatChangeConfiguration2, this); 1608 } 1609 msg->setInt32("streamMask", streamMask); 1610 msg->setInt32("resumeMask", resumeMask); 1611 msg->setInt32("pickTrack", pickTrack); 1612 msg->setInt64("timeUs", timeUs); 1613 for (size_t i = 0; i < kMaxStreams; ++i) { 1614 if ((streamMask | resumeMask) & indexToType(i)) { 1615 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1616 } 1617 } 1618 1619 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1620 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1621 // fetchers have completed their asynchronous operation, we'll post 1622 // mContinuation, which then is handled below in onChangeConfiguration2. 1623 mContinuationCounter = mFetcherInfos.size(); 1624 mContinuation = msg; 1625 1626 if (mContinuationCounter == 0) { 1627 msg->post(); 1628 } 1629 } 1630 1631 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1632 ALOGV("onChangeConfiguration"); 1633 1634 if (!mReconfigurationInProgress) { 1635 int32_t pickTrack = 0; 1636 msg->findInt32("pickTrack", &pickTrack); 1637 changeConfiguration(-1ll /* timeUs */, -1, pickTrack); 1638 } else { 1639 msg->post(1000000ll); // retry in 1 sec 1640 } 1641 } 1642 1643 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1644 ALOGV("onChangeConfiguration2"); 1645 1646 mContinuation.clear(); 1647 1648 // All fetchers are either suspended or have been removed now. 1649 1650 // If we're seeking, clear all packet sources before we report 1651 // seek complete, to prevent decoder from pulling stale data. 1652 int64_t timeUs; 1653 CHECK(msg->findInt64("timeUs", &timeUs)); 1654 1655 if (timeUs >= 0) { 1656 mLastSeekTimeUs = timeUs; 1657 mLastDequeuedTimeUs = timeUs; 1658 1659 for (size_t i = 0; i < mPacketSources.size(); i++) { 1660 sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i); 1661 sp<MetaData> format = packetSource->getFormat(); 1662 packetSource->clear(); 1663 // Set a tentative format here such that HTTPLiveSource will always have 1664 // a format available when NuPlayer queries. Without an available video 1665 // format when setting a surface NuPlayer might disable video decoding 1666 // altogether. The tentative format will be overwritten by the 1667 // authoritative (and possibly same) format once content from the new 1668 // position is dequeued. 1669 packetSource->setFormat(format); 1670 } 1671 1672 for (size_t i = 0; i < kMaxStreams; ++i) { 1673 mStreams[i].reset(); 1674 } 1675 1676 mDiscontinuityOffsetTimesUs.clear(); 1677 mDiscontinuityAbsStartTimesUs.clear(); 1678 1679 if (mSeekReplyID != NULL) { 1680 CHECK(mSeekReply != NULL); 1681 mSeekReply->setInt32("err", OK); 1682 mSeekReply->postReply(mSeekReplyID); 1683 mSeekReplyID.clear(); 1684 mSeekReply.clear(); 1685 } 1686 1687 // restart buffer polling after seek becauese previous 1688 // buffering position is no longer valid. 1689 restartPollBuffering(); 1690 } 1691 1692 uint32_t streamMask, resumeMask; 1693 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1694 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1695 1696 streamMask |= resumeMask; 1697 1698 AString URIs[kMaxStreams]; 1699 for (size_t i = 0; i < kMaxStreams; ++i) { 1700 if (streamMask & indexToType(i)) { 1701 const AString &uriKey = mStreams[i].uriKey(); 1702 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1703 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1704 } 1705 } 1706 1707 uint32_t changedMask = 0; 1708 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1709 // stream URI could change even if onChangeConfiguration2 is only 1710 // used for seek. Seek could happen during a bw switch, in this 1711 // case bw switch will be cancelled, but the seekTo position will 1712 // fetch from the new URI. 1713 if ((mStreamMask & streamMask & indexToType(i)) 1714 && !mStreams[i].mUri.empty() 1715 && !(URIs[i] == mStreams[i].mUri)) { 1716 ALOGV("stream %zu changed: oldURI %s, newURI %s", i, 1717 mStreams[i].mUri.c_str(), URIs[i].c_str()); 1718 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); 1719 if (source->getLatestDequeuedMeta() != NULL) { 1720 source->queueDiscontinuity( 1721 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1722 } 1723 } 1724 // Determine which decoders to shutdown on the player side, 1725 // a decoder has to be shutdown if its streamtype was active 1726 // before but now longer isn't. 1727 if ((mStreamMask & ~streamMask & indexToType(i))) { 1728 changedMask |= indexToType(i); 1729 } 1730 } 1731 1732 if (changedMask == 0) { 1733 // If nothing changed as far as the audio/video decoders 1734 // are concerned we can proceed. 1735 onChangeConfiguration3(msg); 1736 return; 1737 } 1738 1739 // Something changed, inform the player which will shutdown the 1740 // corresponding decoders and will post the reply once that's done. 1741 // Handling the reply will continue executing below in 1742 // onChangeConfiguration3. 1743 sp<AMessage> notify = mNotify->dup(); 1744 notify->setInt32("what", kWhatStreamsChanged); 1745 notify->setInt32("changedMask", changedMask); 1746 1747 msg->setWhat(kWhatChangeConfiguration3); 1748 msg->setTarget(this); 1749 1750 notify->setMessage("reply", msg); 1751 notify->post(); 1752 } 1753 1754 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1755 mContinuation.clear(); 1756 // All remaining fetchers are still suspended, the player has shutdown 1757 // any decoders that needed it. 1758 1759 uint32_t streamMask, resumeMask; 1760 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1761 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1762 1763 mNewStreamMask = streamMask | resumeMask; 1764 1765 int64_t timeUs; 1766 int32_t pickTrack; 1767 bool switching = false; 1768 CHECK(msg->findInt64("timeUs", &timeUs)); 1769 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1770 1771 if (timeUs < 0ll) { 1772 if (!pickTrack) { 1773 // mSwapMask contains streams that are in both old and new variant, 1774 // (in mNewStreamMask & mStreamMask) but with different URIs 1775 // (not in resumeMask). 1776 // For example, old variant has video and audio in two separate 1777 // URIs, and new variant has only audio with unchanged URI. mSwapMask 1778 // should be 0 as there is nothing to swap. We only need to stop video, 1779 // and resume audio. 1780 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; 1781 switching = (mSwapMask != 0); 1782 } 1783 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1784 } else { 1785 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1786 } 1787 1788 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, " 1789 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x", 1790 (long long)timeUs, switching, pickTrack, 1791 mStreamMask, mNewStreamMask, mSwapMask); 1792 1793 for (size_t i = 0; i < kMaxStreams; ++i) { 1794 if (streamMask & indexToType(i)) { 1795 if (switching) { 1796 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1797 } else { 1798 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1799 } 1800 } 1801 } 1802 1803 // Of all existing fetchers: 1804 // * Resume fetchers that are still needed and assign them original packet sources. 1805 // * Mark otherwise unneeded fetchers for removal. 1806 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1807 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1808 const AString &uri = mFetcherInfos.keyAt(i); 1809 if (!resumeFetcher(uri, resumeMask, timeUs)) { 1810 ALOGV("marking fetcher-%d to be removed", 1811 mFetcherInfos[i].mFetcher->getFetcherID()); 1812 1813 mFetcherInfos.editValueAt(i).mToBeRemoved = true; 1814 } 1815 } 1816 1817 // streamMask now only contains the types that need a new fetcher created. 1818 if (streamMask != 0) { 1819 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1820 } 1821 1822 // Find out when the original fetchers have buffered up to and start the new fetchers 1823 // at a later timestamp. 1824 for (size_t i = 0; i < kMaxStreams; i++) { 1825 if (!(indexToType(i) & streamMask)) { 1826 continue; 1827 } 1828 1829 AString uri; 1830 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1831 1832 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1833 CHECK(fetcher != NULL); 1834 1835 HLSTime startTime; 1836 SeekMode seekMode = kSeekModeExactPosition; 1837 sp<AnotherPacketSource> sources[kNumSources]; 1838 1839 if (i == kSubtitleIndex || (!pickTrack && !switching)) { 1840 startTime = latestMediaSegmentStartTime(); 1841 } 1842 1843 // TRICKY: looping from i as earlier streams are already removed from streamMask 1844 for (size_t j = i; j < kMaxStreams; ++j) { 1845 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1846 if ((streamMask & indexToType(j)) && uri == streamUri) { 1847 sources[j] = mPacketSources.valueFor(indexToType(j)); 1848 1849 if (timeUs >= 0) { 1850 startTime.mTimeUs = timeUs; 1851 } else { 1852 int32_t type; 1853 sp<AMessage> meta; 1854 if (!switching) { 1855 // selecting, or adapting but no swap required 1856 meta = sources[j]->getLatestDequeuedMeta(); 1857 } else { 1858 // adapting and swap required 1859 meta = sources[j]->getLatestEnqueuedMeta(); 1860 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { 1861 // switching up 1862 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); 1863 } 1864 } 1865 1866 if ((j == kAudioIndex || j == kVideoIndex) 1867 && meta != NULL && !meta->findInt32("discontinuity", &type)) { 1868 HLSTime tmpTime(meta); 1869 if (startTime < tmpTime) { 1870 startTime = tmpTime; 1871 } 1872 } 1873 1874 if (!switching) { 1875 // selecting, or adapting but no swap required 1876 sources[j]->clear(); 1877 if (j == kSubtitleIndex) { 1878 break; 1879 } 1880 1881 ALOGV("stream[%zu]: queue format change", j); 1882 sources[j]->queueDiscontinuity( 1883 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); 1884 } else { 1885 // switching, queue discontinuities after resume 1886 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1887 sources[j]->clear(); 1888 // the new fetcher might be providing streams that used to be 1889 // provided by two different fetchers, if one of the fetcher 1890 // paused in the middle while the other somehow paused in next 1891 // seg, we have to start from next seg. 1892 if (seekMode < mStreams[j].mSeekMode) { 1893 seekMode = mStreams[j].mSeekMode; 1894 } 1895 } 1896 } 1897 1898 streamMask &= ~indexToType(j); 1899 } 1900 } 1901 1902 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld " 1903 "segmentStartTimeUs %lld seekMode %d", 1904 fetcher->getFetcherID(), 1905 (long long)startTime.mTimeUs, 1906 (long long)mLastSeekTimeUs, 1907 (long long)startTime.getSegmentTimeUs(), 1908 seekMode); 1909 1910 // Set the target segment start time to the middle point of the 1911 // segment where the last sample was. 1912 // This gives a better guess if segments of the two variants are not 1913 // perfectly aligned. (If the corresponding segment in new variant 1914 // starts slightly later than that in the old variant, we still want 1915 // to pick that segment, not the one before) 1916 fetcher->startAsync( 1917 sources[kAudioIndex], 1918 sources[kVideoIndex], 1919 sources[kSubtitleIndex], 1920 getMetadataSource(sources, mNewStreamMask, switching), 1921 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, 1922 startTime.getSegmentTimeUs(), 1923 startTime.mSeq, 1924 seekMode); 1925 } 1926 1927 // All fetchers have now been started, the configuration change 1928 // has completed. 1929 1930 mReconfigurationInProgress = false; 1931 if (switching) { 1932 mSwitchInProgress = true; 1933 } else { 1934 mStreamMask = mNewStreamMask; 1935 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1936 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd", 1937 mOrigBandwidthIndex, mCurBandwidthIndex); 1938 mOrigBandwidthIndex = mCurBandwidthIndex; 1939 } 1940 } 1941 1942 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", 1943 mSwitchInProgress, mStreamMask); 1944 1945 if (mDisconnectReplyID != NULL) { 1946 finishDisconnect(); 1947 } 1948 } 1949 1950 void LiveSession::swapPacketSource(StreamType stream) { 1951 ALOGV("[%s] swapPacketSource", getNameForStream(stream)); 1952 1953 // transfer packets from source2 to source 1954 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 1955 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 1956 1957 // queue discontinuity in mPacketSource 1958 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); 1959 1960 // queue packets in mPacketSource2 to mPacketSource 1961 status_t finalResult = OK; 1962 sp<ABuffer> accessUnit; 1963 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && 1964 OK == aps2->dequeueAccessUnit(&accessUnit)) { 1965 aps->queueAccessUnit(accessUnit); 1966 } 1967 aps2->clear(); 1968 } 1969 1970 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { 1971 if (!mSwitchInProgress) { 1972 return; 1973 } 1974 1975 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 1976 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { 1977 return; 1978 } 1979 1980 // Swap packet source of streams provided by old variant 1981 for (size_t idx = 0; idx < kMaxStreams; idx++) { 1982 StreamType stream = indexToType(idx); 1983 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { 1984 swapPacketSource(stream); 1985 1986 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1987 ALOGW("swapping stream type %d %s to empty stream", 1988 stream, mStreams[idx].mUri.c_str()); 1989 } 1990 mStreams[idx].mUri = mStreams[idx].mNewUri; 1991 mStreams[idx].mNewUri.clear(); 1992 1993 mSwapMask &= ~stream; 1994 } 1995 } 1996 1997 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); 1998 1999 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); 2000 if (mSwapMask != 0) { 2001 return; 2002 } 2003 2004 // Check if new variant contains extra streams. 2005 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 2006 while (extraStreams) { 2007 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); 2008 extraStreams &= ~stream; 2009 2010 swapPacketSource(stream); 2011 2012 ssize_t idx = typeToIndex(stream); 2013 CHECK(idx >= 0); 2014 if (mStreams[idx].mNewUri.empty()) { 2015 ALOGW("swapping extra stream type %d %s to empty stream", 2016 stream, mStreams[idx].mUri.c_str()); 2017 } 2018 mStreams[idx].mUri = mStreams[idx].mNewUri; 2019 mStreams[idx].mNewUri.clear(); 2020 } 2021 2022 // Restart new fetcher (it was paused after the first 47k block) 2023 // and let it fetch into mPacketSources (not mPacketSources2) 2024 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2025 FetcherInfo &info = mFetcherInfos.editValueAt(i); 2026 if (info.mToBeResumed) { 2027 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); 2028 info.mToBeResumed = false; 2029 } 2030 } 2031 2032 ALOGI("#### Finished Bandwidth Switch: %zd => %zd", 2033 mOrigBandwidthIndex, mCurBandwidthIndex); 2034 2035 mStreamMask = mNewStreamMask; 2036 mSwitchInProgress = false; 2037 mOrigBandwidthIndex = mCurBandwidthIndex; 2038 2039 restartPollBuffering(); 2040 } 2041 2042 void LiveSession::schedulePollBuffering() { 2043 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 2044 msg->setInt32("generation", mPollBufferingGeneration); 2045 msg->post(1000000ll); 2046 } 2047 2048 void LiveSession::cancelPollBuffering() { 2049 ++mPollBufferingGeneration; 2050 mPrevBufferPercentage = -1; 2051 } 2052 2053 void LiveSession::restartPollBuffering() { 2054 cancelPollBuffering(); 2055 onPollBuffering(); 2056 } 2057 2058 void LiveSession::onPollBuffering() { 2059 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 2060 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 2061 mSwitchInProgress, mReconfigurationInProgress, 2062 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 2063 2064 bool underflow, ready, down, up; 2065 if (checkBuffering(underflow, ready, down, up)) { 2066 if (mInPreparationPhase) { 2067 // Allow down switch even if we're still preparing. 2068 // 2069 // Some streams have a high bandwidth index as default, 2070 // when bandwidth is low, it takes a long time to buffer 2071 // to ready mark, then it immediately pauses after start 2072 // as we have to do a down switch. It's better experience 2073 // to restart from a lower index, if we detect low bw. 2074 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { 2075 postPrepared(OK); 2076 } 2077 } 2078 2079 if (!mInPreparationPhase) { 2080 if (ready) { 2081 stopBufferingIfNecessary(); 2082 } else if (underflow) { 2083 startBufferingIfNecessary(); 2084 } 2085 switchBandwidthIfNeeded(up, down); 2086 } 2087 } 2088 2089 schedulePollBuffering(); 2090 } 2091 2092 void LiveSession::cancelBandwidthSwitch(bool resume) { 2093 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 2094 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 2095 if (!mSwitchInProgress) { 2096 return; 2097 } 2098 2099 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2100 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2101 if (info.mToBeRemoved) { 2102 info.mToBeRemoved = false; 2103 if (resume) { 2104 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2105 } 2106 } 2107 } 2108 2109 for (size_t i = 0; i < kMaxStreams; ++i) { 2110 AString newUri = mStreams[i].mNewUri; 2111 if (!newUri.empty()) { 2112 // clear all mNewUri matching this newUri 2113 for (size_t j = i; j < kMaxStreams; ++j) { 2114 if (mStreams[j].mNewUri == newUri) { 2115 mStreams[j].mNewUri.clear(); 2116 } 2117 } 2118 ALOGV("stopping newUri = %s", newUri.c_str()); 2119 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2120 if (index < 0) { 2121 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2122 continue; 2123 } 2124 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2125 info.mToBeRemoved = true; 2126 info.mFetcher->stopAsync(); 2127 } 2128 } 2129 2130 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2131 mOrigBandwidthIndex, mCurBandwidthIndex); 2132 2133 mSwitchGeneration++; 2134 mSwitchInProgress = false; 2135 mCurBandwidthIndex = mOrigBandwidthIndex; 2136 mSwapMask = 0; 2137 } 2138 2139 bool LiveSession::checkBuffering( 2140 bool &underflow, bool &ready, bool &down, bool &up) { 2141 underflow = ready = down = up = false; 2142 2143 if (mReconfigurationInProgress) { 2144 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2145 return false; 2146 } 2147 2148 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2149 activeCount = underflowCount = readyCount = downCount = upCount =0; 2150 int32_t minBufferPercent = -1; 2151 int64_t durationUs; 2152 if (getDuration(&durationUs) != OK) { 2153 durationUs = -1; 2154 } 2155 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2156 // we don't check subtitles for buffering level 2157 if (!(mStreamMask & mPacketSources.keyAt(i) 2158 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2159 continue; 2160 } 2161 // ignore streams that never had any packet queued. 2162 // (it's possible that the variant only has audio or video) 2163 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2164 if (meta == NULL) { 2165 continue; 2166 } 2167 2168 status_t finalResult; 2169 int64_t bufferedDurationUs = 2170 mPacketSources[i]->getBufferedDurationUs(&finalResult); 2171 ALOGV("[%s] buffered %lld us", 2172 getNameForStream(mPacketSources.keyAt(i)), 2173 (long long)bufferedDurationUs); 2174 if (durationUs >= 0) { 2175 int32_t percent; 2176 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2177 percent = 100; 2178 } else { 2179 percent = (int32_t)(100.0 * 2180 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2181 } 2182 if (minBufferPercent < 0 || percent < minBufferPercent) { 2183 minBufferPercent = percent; 2184 } 2185 } 2186 2187 ++activeCount; 2188 int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; 2189 if (bufferedDurationUs > readyMark 2190 || mPacketSources[i]->isFinished(0)) { 2191 ++readyCount; 2192 } 2193 if (!mPacketSources[i]->isFinished(0)) { 2194 if (bufferedDurationUs < kUnderflowMarkUs) { 2195 ++underflowCount; 2196 } 2197 if (bufferedDurationUs > mUpSwitchMark) { 2198 ++upCount; 2199 } 2200 if (bufferedDurationUs < mDownSwitchMark) { 2201 ++downCount; 2202 } 2203 } 2204 } 2205 2206 if (minBufferPercent >= 0) { 2207 notifyBufferingUpdate(minBufferPercent); 2208 } 2209 2210 if (activeCount > 0) { 2211 up = (upCount == activeCount); 2212 down = (downCount > 0); 2213 ready = (readyCount == activeCount); 2214 underflow = (underflowCount > 0); 2215 return true; 2216 } 2217 2218 return false; 2219 } 2220 2221 void LiveSession::startBufferingIfNecessary() { 2222 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2223 mInPreparationPhase, mBuffering); 2224 if (!mBuffering) { 2225 mBuffering = true; 2226 2227 sp<AMessage> notify = mNotify->dup(); 2228 notify->setInt32("what", kWhatBufferingStart); 2229 notify->post(); 2230 } 2231 } 2232 2233 void LiveSession::stopBufferingIfNecessary() { 2234 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2235 mInPreparationPhase, mBuffering); 2236 2237 if (mBuffering) { 2238 mBuffering = false; 2239 2240 sp<AMessage> notify = mNotify->dup(); 2241 notify->setInt32("what", kWhatBufferingEnd); 2242 notify->post(); 2243 } 2244 } 2245 2246 void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2247 if (percentage < mPrevBufferPercentage) { 2248 percentage = mPrevBufferPercentage; 2249 } else if (percentage > 100) { 2250 percentage = 100; 2251 } 2252 2253 mPrevBufferPercentage = percentage; 2254 2255 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2256 2257 sp<AMessage> notify = mNotify->dup(); 2258 notify->setInt32("what", kWhatBufferingUpdate); 2259 notify->setInt32("percentage", percentage); 2260 notify->post(); 2261 } 2262 2263 bool LiveSession::tryBandwidthFallback() { 2264 if (mInPreparationPhase || mReconfigurationInProgress) { 2265 // Don't try fallback during prepare or reconfig. 2266 // If error happens there, it's likely unrecoverable. 2267 return false; 2268 } 2269 if (mCurBandwidthIndex > mOrigBandwidthIndex) { 2270 // if we're switching up, simply cancel and resume old variant 2271 cancelBandwidthSwitch(true /* resume */); 2272 return true; 2273 } else { 2274 // if we're switching down, we're likely about to underflow (if 2275 // not already underflowing). try the lowest viable bandwidth if 2276 // not on that variant already. 2277 ssize_t lowestValid = getLowestValidBandwidthIndex(); 2278 if (mCurBandwidthIndex > lowestValid) { 2279 cancelBandwidthSwitch(); 2280 changeConfiguration(-1ll, lowestValid); 2281 return true; 2282 } 2283 } 2284 // return false if we couldn't find any fallback 2285 return false; 2286 } 2287 2288 /* 2289 * returns true if a bandwidth switch is actually needed (and started), 2290 * returns false otherwise 2291 */ 2292 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2293 // no need to check bandwidth if we only have 1 bandwidth settings 2294 if (mBandwidthItems.size() < 2) { 2295 return false; 2296 } 2297 2298 if (mSwitchInProgress) { 2299 if (mBuffering) { 2300 tryBandwidthFallback(); 2301 } 2302 return false; 2303 } 2304 2305 int32_t bandwidthBps, shortTermBps; 2306 bool isStable; 2307 if (mBandwidthEstimator->estimateBandwidth( 2308 &bandwidthBps, &isStable, &shortTermBps)) { 2309 ALOGV("bandwidth estimated at %.2f kbps, " 2310 "stable %d, shortTermBps %.2f kbps", 2311 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f); 2312 mLastBandwidthBps = bandwidthBps; 2313 mLastBandwidthStable = isStable; 2314 } else { 2315 ALOGV("no bandwidth estimate."); 2316 return false; 2317 } 2318 2319 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2320 // canSwithDown and canSwitchUp can't both be true. 2321 // we only want to switch up when measured bw is 120% higher than current variant, 2322 // and we only want to switch down when measured bw is below current variant. 2323 bool canSwitchDown = bufferLow 2324 && (bandwidthBps < (int32_t)curBandwidth); 2325 bool canSwitchUp = bufferHigh 2326 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2327 2328 if (canSwitchDown || canSwitchUp) { 2329 // bandwidth estimating has some delay, if we have to downswitch when 2330 // it hasn't stabilized, use the short term to guess real bandwidth, 2331 // since it may be dropping too fast. 2332 // (note this doesn't apply to upswitch, always use longer average there) 2333 if (!isStable && canSwitchDown) { 2334 if (shortTermBps < bandwidthBps) { 2335 bandwidthBps = shortTermBps; 2336 } 2337 } 2338 2339 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2340 2341 // it's possible that we're checking for canSwitchUp case, but the returned 2342 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2343 // of measured bw. In that case we don't want to do anything, since we have 2344 // both enough buffer and enough bw. 2345 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 2346 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) { 2347 // if not yet prepared, just restart again with new bw index. 2348 // this is faster and playback experience is cleaner. 2349 changeConfiguration( 2350 mInPreparationPhase ? 0 : -1ll, bandwidthIndex); 2351 return true; 2352 } 2353 } 2354 return false; 2355 } 2356 2357 void LiveSession::postError(status_t err) { 2358 // if we reached EOS, notify buffering of 100% 2359 if (err == ERROR_END_OF_STREAM) { 2360 notifyBufferingUpdate(100); 2361 } 2362 // we'll stop buffer polling now, before that notify 2363 // stop buffering to stop the spinning icon 2364 stopBufferingIfNecessary(); 2365 cancelPollBuffering(); 2366 2367 sp<AMessage> notify = mNotify->dup(); 2368 notify->setInt32("what", kWhatError); 2369 notify->setInt32("err", err); 2370 notify->post(); 2371 } 2372 2373 void LiveSession::postPrepared(status_t err) { 2374 CHECK(mInPreparationPhase); 2375 2376 sp<AMessage> notify = mNotify->dup(); 2377 if (err == OK || err == ERROR_END_OF_STREAM) { 2378 notify->setInt32("what", kWhatPrepared); 2379 } else { 2380 cancelPollBuffering(); 2381 2382 notify->setInt32("what", kWhatPreparationFailed); 2383 notify->setInt32("err", err); 2384 } 2385 2386 notify->post(); 2387 2388 mInPreparationPhase = false; 2389 } 2390 2391 2392 } // namespace android 2393 2394