1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "LiveSession" 19 #include <utils/Log.h> 20 21 #include "LiveSession.h" 22 #include "HTTPDownloader.h" 23 #include "M3UParser.h" 24 #include "PlaylistFetcher.h" 25 26 #include "mpeg2ts/AnotherPacketSource.h" 27 28 #include <cutils/properties.h> 29 #include <media/MediaHTTPService.h> 30 #include <media/stagefright/foundation/ABuffer.h> 31 #include <media/stagefright/foundation/ADebug.h> 32 #include <media/stagefright/foundation/AMessage.h> 33 #include <media/stagefright/foundation/AUtils.h> 34 #include <media/stagefright/MediaDefs.h> 35 #include <media/stagefright/MetaData.h> 36 #include <media/stagefright/Utils.h> 37 38 #include <utils/Mutex.h> 39 40 #include <ctype.h> 41 #include <inttypes.h> 42 43 namespace android { 44 45 // static 46 // Bandwidth Switch Mark Defaults 47 const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 48 const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 49 const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 50 const int64_t LiveSession::kResumeThresholdUs = 100000ll; 51 52 //TODO: redefine this mark to a fair value 53 // default buffer underflow mark 54 static const int kUnderflowMarkMs = 1000; // 1 second 55 56 struct LiveSession::BandwidthEstimator : public RefBase { 57 BandwidthEstimator(); 58 59 void addBandwidthMeasurement(size_t numBytes, 
int64_t delayUs); 60 bool estimateBandwidth( 61 int32_t *bandwidth, 62 bool *isStable = NULL, 63 int32_t *shortTermBps = NULL); 64 65 private: 66 // Bandwidth estimation parameters 67 static const int32_t kShortTermBandwidthItems = 3; 68 static const int32_t kMinBandwidthHistoryItems = 20; 69 static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec 70 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec 71 static const int64_t kMaxBandwidthHistoryAgeUs = 60000000ll; // 60 sec 72 73 struct BandwidthEntry { 74 int64_t mTimestampUs; 75 int64_t mDelayUs; 76 size_t mNumBytes; 77 }; 78 79 Mutex mLock; 80 List<BandwidthEntry> mBandwidthHistory; 81 List<int32_t> mPrevEstimates; 82 int32_t mShortTermEstimate; 83 bool mHasNewSample; 84 bool mIsStable; 85 int64_t mTotalTransferTimeUs; 86 size_t mTotalTransferBytes; 87 88 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 89 }; 90 91 LiveSession::BandwidthEstimator::BandwidthEstimator() : 92 mShortTermEstimate(0), 93 mHasNewSample(false), 94 mIsStable(true), 95 mTotalTransferTimeUs(0), 96 mTotalTransferBytes(0) { 97 } 98 99 void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 100 size_t numBytes, int64_t delayUs) { 101 AutoMutex autoLock(mLock); 102 103 int64_t nowUs = ALooper::GetNowUs(); 104 BandwidthEntry entry; 105 entry.mTimestampUs = nowUs; 106 entry.mDelayUs = delayUs; 107 entry.mNumBytes = numBytes; 108 mTotalTransferTimeUs += delayUs; 109 mTotalTransferBytes += numBytes; 110 mBandwidthHistory.push_back(entry); 111 mHasNewSample = true; 112 113 // Remove no more than 10% of total transfer time at a time 114 // to avoid sudden jump on bandwidth estimation. There might 115 // be long blocking reads that takes up signification time, 116 // we have to keep a longer window in that case. 
117 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10; 118 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) { 119 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs; 120 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) { 121 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs; 122 } 123 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 124 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 125 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) { 126 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 127 // remove sample if either absolute age or total transfer time is 128 // over kMaxBandwidthHistoryWindowUs 129 if (nowUs - it->mTimestampUs < kMaxBandwidthHistoryAgeUs && 130 mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) { 131 break; 132 } 133 mTotalTransferTimeUs -= it->mDelayUs; 134 mTotalTransferBytes -= it->mNumBytes; 135 mBandwidthHistory.erase(mBandwidthHistory.begin()); 136 } 137 } 138 139 bool LiveSession::BandwidthEstimator::estimateBandwidth( 140 int32_t *bandwidthBps, bool *isStable, int32_t *shortTermBps) { 141 AutoMutex autoLock(mLock); 142 143 if (mBandwidthHistory.size() < 2) { 144 return false; 145 } 146 147 if (!mHasNewSample) { 148 *bandwidthBps = *(--mPrevEstimates.end()); 149 if (isStable) { 150 *isStable = mIsStable; 151 } 152 if (shortTermBps) { 153 *shortTermBps = mShortTermEstimate; 154 } 155 return true; 156 } 157 158 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 159 mPrevEstimates.push_back(*bandwidthBps); 160 while (mPrevEstimates.size() > 3) { 161 mPrevEstimates.erase(mPrevEstimates.begin()); 162 } 163 mHasNewSample = false; 164 165 int64_t totalTimeUs = 0; 166 size_t totalBytes = 0; 167 if (mBandwidthHistory.size() >= kShortTermBandwidthItems) { 168 List<BandwidthEntry>::iterator it = --mBandwidthHistory.end(); 169 for (size_t i = 0; i < kShortTermBandwidthItems; i++, it--) { 
170 totalTimeUs += it->mDelayUs; 171 totalBytes += it->mNumBytes; 172 } 173 } 174 mShortTermEstimate = totalTimeUs > 0 ? 175 (totalBytes * 8E6 / totalTimeUs) : *bandwidthBps; 176 if (shortTermBps) { 177 *shortTermBps = mShortTermEstimate; 178 } 179 180 int64_t minEstimate = -1, maxEstimate = -1; 181 List<int32_t>::iterator it; 182 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) { 183 int32_t estimate = *it; 184 if (minEstimate < 0 || minEstimate > estimate) { 185 minEstimate = estimate; 186 } 187 if (maxEstimate < 0 || maxEstimate < estimate) { 188 maxEstimate = estimate; 189 } 190 } 191 // consider it stable if long-term average is not jumping a lot 192 // and short-term average is not much lower than long-term average 193 mIsStable = (maxEstimate <= minEstimate * 4 / 3) 194 && mShortTermEstimate > minEstimate * 7 / 10; 195 if (isStable) { 196 *isStable = mIsStable; 197 } 198 199 #if 0 200 { 201 char dumpStr[1024] = {0}; 202 size_t itemIdx = 0; 203 size_t histSize = mBandwidthHistory.size(); 204 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {", 205 *bandwidthBps, mIsStable, histSize); 206 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 207 for (; it != mBandwidthHistory.end(); ++it) { 208 if (itemIdx > 50) { 209 sprintf(dumpStr + strlen(dumpStr), 210 "...(%zd more items)... }", histSize - itemIdx); 211 break; 212 } 213 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s", 214 it->mNumBytes / 1024, 215 (double)it->mDelayUs * 1.0e-6, 216 (it == (--mBandwidthHistory.end())) ? 
"}" : ", "); 217 itemIdx++; 218 } 219 ALOGE(dumpStr); 220 } 221 #endif 222 return true; 223 } 224 225 //static 226 const char *LiveSession::getKeyForStream(StreamType type) { 227 switch (type) { 228 case STREAMTYPE_VIDEO: 229 return "timeUsVideo"; 230 case STREAMTYPE_AUDIO: 231 return "timeUsAudio"; 232 case STREAMTYPE_SUBTITLES: 233 return "timeUsSubtitle"; 234 case STREAMTYPE_METADATA: 235 return "timeUsMetadata"; // unused 236 default: 237 TRESPASS(); 238 } 239 return NULL; 240 } 241 242 //static 243 const char *LiveSession::getNameForStream(StreamType type) { 244 switch (type) { 245 case STREAMTYPE_VIDEO: 246 return "video"; 247 case STREAMTYPE_AUDIO: 248 return "audio"; 249 case STREAMTYPE_SUBTITLES: 250 return "subs"; 251 case STREAMTYPE_METADATA: 252 return "metadata"; 253 default: 254 break; 255 } 256 return "unknown"; 257 } 258 259 //static 260 ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) { 261 switch (type) { 262 case STREAMTYPE_VIDEO: 263 return ATSParser::VIDEO; 264 case STREAMTYPE_AUDIO: 265 return ATSParser::AUDIO; 266 case STREAMTYPE_METADATA: 267 return ATSParser::META; 268 case STREAMTYPE_SUBTITLES: 269 default: 270 TRESPASS(); 271 } 272 return ATSParser::NUM_SOURCE_TYPES; // should not reach here 273 } 274 275 LiveSession::LiveSession( 276 const sp<AMessage> ¬ify, uint32_t flags, 277 const sp<MediaHTTPService> &httpService) 278 : mNotify(notify), 279 mFlags(flags), 280 mHTTPService(httpService), 281 mBuffering(false), 282 mInPreparationPhase(true), 283 mPollBufferingGeneration(0), 284 mPrevBufferPercentage(-1), 285 mCurBandwidthIndex(-1), 286 mOrigBandwidthIndex(-1), 287 mLastBandwidthBps(-1ll), 288 mLastBandwidthStable(false), 289 mBandwidthEstimator(new BandwidthEstimator()), 290 mMaxWidth(720), 291 mMaxHeight(480), 292 mStreamMask(0), 293 mNewStreamMask(0), 294 mSwapMask(0), 295 mSwitchGeneration(0), 296 mSubtitleGeneration(0), 297 mLastDequeuedTimeUs(0ll), 298 mRealTimeBaseUs(0ll), 299 
mReconfigurationInProgress(false), 300 mSwitchInProgress(false), 301 mUpSwitchMark(kUpSwitchMarkUs), 302 mDownSwitchMark(kDownSwitchMarkUs), 303 mUpSwitchMargin(kUpSwitchMarginUs), 304 mFirstTimeUsValid(false), 305 mFirstTimeUs(0), 306 mLastSeekTimeUs(0), 307 mHasMetadata(false) { 308 mStreams[kAudioIndex] = StreamItem("audio"); 309 mStreams[kVideoIndex] = StreamItem("video"); 310 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 311 312 for (size_t i = 0; i < kNumSources; ++i) { 313 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 314 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 315 } 316 } 317 318 LiveSession::~LiveSession() { 319 if (mFetcherLooper != NULL) { 320 mFetcherLooper->stop(); 321 } 322 } 323 324 int64_t LiveSession::calculateMediaTimeUs( 325 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) { 326 if (timeUs >= firstTimeUs) { 327 timeUs -= firstTimeUs; 328 } else { 329 timeUs = 0; 330 } 331 timeUs += mLastSeekTimeUs; 332 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 333 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 334 } 335 return timeUs; 336 } 337 338 status_t LiveSession::dequeueAccessUnit( 339 StreamType stream, sp<ABuffer> *accessUnit) { 340 status_t finalResult = OK; 341 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 342 343 ssize_t streamIdx = typeToIndex(stream); 344 if (streamIdx < 0) { 345 return BAD_VALUE; 346 } 347 const char *streamStr = getNameForStream(stream); 348 // Do not let client pull data if we don't have data packets yet. 349 // We might only have a format discontinuity queued without data. 350 // When NuPlayerDecoder dequeues the format discontinuity, it will 351 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 352 // thinks it can do seamless change, so will not shutdown decoder. 353 // When the actual format arrives, it can't handle it and get stuck. 
354 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 355 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 356 streamStr, finalResult); 357 358 if (finalResult == OK) { 359 return -EAGAIN; 360 } else { 361 return finalResult; 362 } 363 } 364 365 // Let the client dequeue as long as we have buffers available 366 // Do not make pause/resume decisions here. 367 368 status_t err = packetSource->dequeueAccessUnit(accessUnit); 369 370 if (err == INFO_DISCONTINUITY) { 371 // adaptive streaming, discontinuities in the playlist 372 int32_t type; 373 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 374 375 sp<AMessage> extra; 376 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 377 extra.clear(); 378 } 379 380 ALOGI("[%s] read discontinuity of type %d, extra = %s", 381 streamStr, 382 type, 383 extra == NULL ? "NULL" : extra->debugString().c_str()); 384 } else if (err == OK) { 385 386 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 387 int64_t timeUs, originalTimeUs; 388 int32_t discontinuitySeq = 0; 389 StreamItem& strm = mStreams[streamIdx]; 390 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 391 originalTimeUs = timeUs; 392 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 393 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 394 int64_t offsetTimeUs; 395 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 396 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 397 } else { 398 offsetTimeUs = 0; 399 } 400 401 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0 402 && strm.mLastDequeuedTimeUs >= 0) { 403 int64_t firstTimeUs; 404 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 405 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 406 offsetTimeUs += strm.mLastSampleDurationUs; 407 } else { 408 offsetTimeUs += strm.mLastSampleDurationUs; 409 
} 410 411 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 412 strm.mCurDiscontinuitySeq = discontinuitySeq; 413 } 414 415 int32_t discard = 0; 416 int64_t firstTimeUs; 417 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 418 int64_t durUs; // approximate sample duration 419 if (timeUs > strm.mLastDequeuedTimeUs) { 420 durUs = timeUs - strm.mLastDequeuedTimeUs; 421 } else { 422 durUs = strm.mLastDequeuedTimeUs - timeUs; 423 } 424 strm.mLastSampleDurationUs = durUs; 425 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 426 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 427 firstTimeUs = timeUs; 428 } else { 429 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 430 firstTimeUs = timeUs; 431 } 432 433 strm.mLastDequeuedTimeUs = timeUs; 434 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq); 435 436 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 437 streamStr, (long long)timeUs, (long long)originalTimeUs); 438 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 439 mLastDequeuedTimeUs = timeUs; 440 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 441 } else if (stream == STREAMTYPE_SUBTITLES) { 442 int32_t subtitleGeneration; 443 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 444 && subtitleGeneration != mSubtitleGeneration) { 445 return -EAGAIN; 446 }; 447 (*accessUnit)->meta()->setInt32( 448 "trackIndex", mPlaylist->getSelectedIndex()); 449 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 450 } else if (stream == STREAMTYPE_METADATA) { 451 HLSTime mdTime((*accessUnit)->meta()); 452 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) { 453 packetSource->requeueAccessUnit((*accessUnit)); 454 return -EAGAIN; 455 } else { 456 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq); 457 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, 
mdTime.mTimeUs, mdTime.mSeq); 458 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 459 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 460 } 461 } 462 } else { 463 ALOGI("[%s] encountered error %d", streamStr, err); 464 } 465 466 return err; 467 } 468 469 status_t LiveSession::getStreamFormatMeta(StreamType stream, sp<MetaData> *meta) { 470 if (!(mStreamMask & stream)) { 471 return UNKNOWN_ERROR; 472 } 473 474 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 475 476 *meta = packetSource->getFormat(); 477 478 if (*meta == NULL) { 479 return -EWOULDBLOCK; 480 } 481 482 if (stream == STREAMTYPE_AUDIO) { 483 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 484 (*meta)->setInt32(kKeyMaxInputSize, 32 * 1024); 485 } else if (stream == STREAMTYPE_VIDEO) { 486 (*meta)->setInt32(kKeyMaxWidth, mMaxWidth); 487 (*meta)->setInt32(kKeyMaxHeight, mMaxHeight); 488 } 489 490 return OK; 491 } 492 493 sp<HTTPDownloader> LiveSession::getHTTPDownloader() { 494 return new HTTPDownloader(mHTTPService, mExtraHeaders); 495 } 496 497 void LiveSession::setBufferingSettings( 498 const BufferingSettings &buffering) { 499 sp<AMessage> msg = new AMessage(kWhatSetBufferingSettings, this); 500 writeToAMessage(msg, buffering); 501 msg->post(); 502 } 503 504 void LiveSession::connectAsync( 505 const char *url, const KeyedVector<String8, String8> *headers) { 506 sp<AMessage> msg = new AMessage(kWhatConnect, this); 507 msg->setString("url", url); 508 509 if (headers != NULL) { 510 msg->setPointer( 511 "headers", 512 new KeyedVector<String8, String8>(*headers)); 513 } 514 515 msg->post(); 516 } 517 518 status_t LiveSession::disconnect() { 519 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 520 521 sp<AMessage> response; 522 status_t err = msg->postAndAwaitResponse(&response); 523 524 return err; 525 } 526 527 status_t LiveSession::seekTo(int64_t timeUs, MediaPlayerSeekMode mode) { 528 sp<AMessage> msg = new AMessage(kWhatSeek, this); 529 
msg->setInt64("timeUs", timeUs); 530 msg->setInt32("mode", mode); 531 532 sp<AMessage> response; 533 status_t err = msg->postAndAwaitResponse(&response); 534 535 return err; 536 } 537 538 bool LiveSession::checkSwitchProgress( 539 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 540 AString newUri; 541 CHECK(stopParams->findString("uri", &newUri)); 542 543 *needResumeUntil = false; 544 sp<AMessage> firstNewMeta[kMaxStreams]; 545 for (size_t i = 0; i < kMaxStreams; ++i) { 546 StreamType stream = indexToType(i); 547 if (!(mSwapMask & mNewStreamMask & stream) 548 || (mStreams[i].mNewUri != newUri)) { 549 continue; 550 } 551 if (stream == STREAMTYPE_SUBTITLES) { 552 continue; 553 } 554 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 555 556 // First, get latest dequeued meta, which is where the decoder is at. 557 // (when upswitching, we take the meta after a certain delay, so that 558 // the decoder is left with some cushion) 559 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 560 if (delayUs > 0) { 561 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 562 if (lastDequeueMeta == NULL) { 563 // this means we don't have enough cushion, try again later 564 ALOGV("[%s] up switching failed due to insufficient buffer", 565 getNameForStream(stream)); 566 return false; 567 } 568 } else { 569 // It's okay for lastDequeueMeta to be NULL here, it means the 570 // decoder hasn't even started dequeueing 571 lastDequeueMeta = source->getLatestDequeuedMeta(); 572 } 573 // Then, trim off packets at beginning of mPacketSources2 that's before 574 // the latest dequeued time. These samples are definitely too late. 575 firstNewMeta[i] = mPacketSources2.editValueAt(i) 576 ->trimBuffersBeforeMeta(lastDequeueMeta); 577 578 // Now firstNewMeta[i] is the first sample after the trim. 579 // If it's NULL, we failed because dequeue already past all samples 580 // in mPacketSource2, we have to try again. 
581 if (firstNewMeta[i] == NULL) { 582 HLSTime dequeueTime(lastDequeueMeta); 583 ALOGV("[%s] dequeue time (%d, %lld) past start time", 584 getNameForStream(stream), 585 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 586 return false; 587 } 588 589 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 590 // already fetched, and see if we need to resumeUntil 591 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 592 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 593 // boundary, no need to resume as the content will look different anyways 594 if (lastEnqueueMeta != NULL) { 595 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 596 597 // no need to resume old fetcher if new fetcher started in different 598 // discontinuity sequence, as the content will look different. 599 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 600 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 601 602 // update the stopTime for resumeUntil 603 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 604 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 605 } 606 } 607 608 // if we're here, it means dequeue progress hasn't passed some samples in 609 // mPacketSource2, we can trim off the excess in mPacketSource. 
610 // (old fetcher might still need to resumeUntil the start time of new fetcher) 611 for (size_t i = 0; i < kMaxStreams; ++i) { 612 StreamType stream = indexToType(i); 613 if (!(mSwapMask & mNewStreamMask & stream) 614 || (newUri != mStreams[i].mNewUri) 615 || stream == STREAMTYPE_SUBTITLES) { 616 continue; 617 } 618 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 619 } 620 621 // no resumeUntil if already underflow 622 *needResumeUntil &= !mBuffering; 623 624 return true; 625 } 626 627 void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 628 switch (msg->what()) { 629 case kWhatSetBufferingSettings: 630 { 631 readFromAMessage(msg, &mBufferingSettings); 632 break; 633 } 634 635 case kWhatConnect: 636 { 637 onConnect(msg); 638 break; 639 } 640 641 case kWhatDisconnect: 642 { 643 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 644 645 if (mReconfigurationInProgress) { 646 break; 647 } 648 649 finishDisconnect(); 650 break; 651 } 652 653 case kWhatSeek: 654 { 655 if (mReconfigurationInProgress) { 656 msg->post(50000); 657 break; 658 } 659 660 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 661 mSeekReply = new AMessage; 662 663 onSeek(msg); 664 break; 665 } 666 667 case kWhatFetcherNotify: 668 { 669 int32_t what; 670 CHECK(msg->findInt32("what", &what)); 671 672 switch (what) { 673 case PlaylistFetcher::kWhatStarted: 674 break; 675 case PlaylistFetcher::kWhatPaused: 676 case PlaylistFetcher::kWhatStopped: 677 { 678 AString uri; 679 CHECK(msg->findString("uri", &uri)); 680 ssize_t index = mFetcherInfos.indexOfKey(uri); 681 if (index < 0) { 682 // ignore msgs from fetchers that's already gone 683 break; 684 } 685 686 ALOGV("fetcher-%d %s", 687 mFetcherInfos[index].mFetcher->getFetcherID(), 688 what == PlaylistFetcher::kWhatPaused ? 
689 "paused" : "stopped"); 690 691 if (what == PlaylistFetcher::kWhatStopped) { 692 mFetcherLooper->unregisterHandler( 693 mFetcherInfos[index].mFetcher->id()); 694 mFetcherInfos.removeItemsAt(index); 695 } else if (what == PlaylistFetcher::kWhatPaused) { 696 int32_t seekMode; 697 CHECK(msg->findInt32("seekMode", &seekMode)); 698 for (size_t i = 0; i < kMaxStreams; ++i) { 699 if (mStreams[i].mUri == uri) { 700 mStreams[i].mSeekMode = (SeekMode) seekMode; 701 } 702 } 703 } 704 705 if (mContinuation != NULL) { 706 CHECK_GT(mContinuationCounter, 0u); 707 if (--mContinuationCounter == 0) { 708 mContinuation->post(); 709 } 710 ALOGV("%zu fetcher(s) left", mContinuationCounter); 711 } 712 break; 713 } 714 715 case PlaylistFetcher::kWhatDurationUpdate: 716 { 717 AString uri; 718 CHECK(msg->findString("uri", &uri)); 719 720 int64_t durationUs; 721 CHECK(msg->findInt64("durationUs", &durationUs)); 722 723 ssize_t index = mFetcherInfos.indexOfKey(uri); 724 if (index >= 0) { 725 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 726 info->mDurationUs = durationUs; 727 } 728 break; 729 } 730 731 case PlaylistFetcher::kWhatTargetDurationUpdate: 732 { 733 int64_t targetDurationUs; 734 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 735 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 736 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 737 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 738 break; 739 } 740 741 case PlaylistFetcher::kWhatError: 742 { 743 status_t err; 744 CHECK(msg->findInt32("err", &err)); 745 746 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 747 748 // handle EOS on subtitle tracks independently 749 AString uri; 750 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 751 ssize_t i = mFetcherInfos.indexOfKey(uri); 752 if (i >= 0) { 753 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 754 if (fetcher != NULL) { 755 uint32_t type = 
fetcher->getStreamTypeMask(); 756 if (type == STREAMTYPE_SUBTITLES) { 757 mPacketSources.valueFor( 758 STREAMTYPE_SUBTITLES)->signalEOS(err);; 759 break; 760 } 761 } 762 } 763 } 764 765 // remember the failure index (as mCurBandwidthIndex will be restored 766 // after cancelBandwidthSwitch()), and record last fail time 767 size_t failureIndex = mCurBandwidthIndex; 768 mBandwidthItems.editItemAt( 769 failureIndex).mLastFailureUs = ALooper::GetNowUs(); 770 771 if (mSwitchInProgress) { 772 // if error happened when we switch to a variant, try fallback 773 // to other variant to save the session 774 if (tryBandwidthFallback()) { 775 break; 776 } 777 } 778 779 if (mInPreparationPhase) { 780 postPrepared(err); 781 } 782 783 cancelBandwidthSwitch(); 784 785 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 786 787 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 788 789 mPacketSources.valueFor( 790 STREAMTYPE_SUBTITLES)->signalEOS(err); 791 792 postError(err); 793 break; 794 } 795 796 case PlaylistFetcher::kWhatStopReached: 797 { 798 ALOGV("kWhatStopReached"); 799 800 AString oldUri; 801 CHECK(msg->findString("uri", &oldUri)); 802 803 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 804 if (index < 0) { 805 break; 806 } 807 808 tryToFinishBandwidthSwitch(oldUri); 809 break; 810 } 811 812 case PlaylistFetcher::kWhatStartedAt: 813 { 814 int32_t switchGeneration; 815 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 816 817 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 818 switchGeneration, mSwitchGeneration); 819 820 if (switchGeneration != mSwitchGeneration) { 821 break; 822 } 823 824 AString uri; 825 CHECK(msg->findString("uri", &uri)); 826 827 // mark new fetcher mToBeResumed 828 ssize_t index = mFetcherInfos.indexOfKey(uri); 829 if (index >= 0) { 830 mFetcherInfos.editValueAt(index).mToBeResumed = true; 831 } 832 833 // temporarily disable packet sources to be swapped to prevent 834 // NuPlayerDecoder from dequeuing while we check 
progress 835 for (size_t i = 0; i < mPacketSources.size(); ++i) { 836 if ((mSwapMask & mPacketSources.keyAt(i)) 837 && uri == mStreams[i].mNewUri) { 838 mPacketSources.editValueAt(i)->enable(false); 839 } 840 } 841 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 842 // If switching up, require a cushion bigger than kUnderflowMark 843 // to avoid buffering immediately after the switch. 844 // (If we don't have that cushion we'd rather cancel and try again.) 845 int64_t delayUs = 846 switchUp ? 847 (kUnderflowMarkMs * 1000ll + 1000000ll) 848 : 0; 849 bool needResumeUntil = false; 850 sp<AMessage> stopParams = msg; 851 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 852 // playback time hasn't passed startAt time 853 if (!needResumeUntil) { 854 ALOGV("finish switch"); 855 for (size_t i = 0; i < kMaxStreams; ++i) { 856 if ((mSwapMask & indexToType(i)) 857 && uri == mStreams[i].mNewUri) { 858 // have to make a copy of mStreams[i].mUri because 859 // tryToFinishBandwidthSwitch is modifying mStreams[] 860 AString oldURI = mStreams[i].mUri; 861 tryToFinishBandwidthSwitch(oldURI); 862 break; 863 } 864 } 865 } else { 866 // startAt time is after last enqueue time 867 // Resume fetcher for the original variant; the resumed fetcher should 868 // continue until the timestamps found in msg, which is stored by the 869 // new fetcher to indicate where the new variant has started buffering. 
870 ALOGV("finish switch with resumeUntilAsync"); 871 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 872 const FetcherInfo &info = mFetcherInfos.valueAt(i); 873 if (info.mToBeRemoved) { 874 info.mFetcher->resumeUntilAsync(stopParams); 875 } 876 } 877 } 878 } else { 879 // playback time passed startAt time 880 if (switchUp) { 881 // if switching up, cancel and retry if condition satisfies again 882 ALOGV("cancel up switch because we're too late"); 883 cancelBandwidthSwitch(true /* resume */); 884 } else { 885 ALOGV("retry down switch at next sample"); 886 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 887 } 888 } 889 // re-enable all packet sources 890 for (size_t i = 0; i < mPacketSources.size(); ++i) { 891 mPacketSources.editValueAt(i)->enable(true); 892 } 893 894 break; 895 } 896 897 case PlaylistFetcher::kWhatPlaylistFetched: 898 { 899 onMasterPlaylistFetched(msg); 900 break; 901 } 902 903 case PlaylistFetcher::kWhatMetadataDetected: 904 { 905 if (!mHasMetadata) { 906 mHasMetadata = true; 907 sp<AMessage> notify = mNotify->dup(); 908 notify->setInt32("what", kWhatMetadataDetected); 909 notify->post(); 910 } 911 break; 912 } 913 914 default: 915 TRESPASS(); 916 } 917 918 break; 919 } 920 921 case kWhatChangeConfiguration: 922 { 923 onChangeConfiguration(msg); 924 break; 925 } 926 927 case kWhatChangeConfiguration2: 928 { 929 onChangeConfiguration2(msg); 930 break; 931 } 932 933 case kWhatChangeConfiguration3: 934 { 935 onChangeConfiguration3(msg); 936 break; 937 } 938 939 case kWhatPollBuffering: 940 { 941 int32_t generation; 942 CHECK(msg->findInt32("generation", &generation)); 943 if (generation == mPollBufferingGeneration) { 944 onPollBuffering(); 945 } 946 break; 947 } 948 949 default: 950 TRESPASS(); 951 break; 952 } 953 } 954 955 // static 956 bool LiveSession::isBandwidthValid(const BandwidthItem &item) { 957 static const int64_t kBlacklistWindowUs = 300 * 1000000ll; 958 return item.mLastFailureUs < 0 959 || ALooper::GetNowUs() - 
item.mLastFailureUs > kBlacklistWindowUs; 960 } 961 962 // static 963 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 964 if (a->mBandwidth < b->mBandwidth) { 965 return -1; 966 } else if (a->mBandwidth == b->mBandwidth) { 967 return 0; 968 } 969 970 return 1; 971 } 972 973 // static 974 LiveSession::StreamType LiveSession::indexToType(int idx) { 975 CHECK(idx >= 0 && idx < kNumSources); 976 return (StreamType)(1 << idx); 977 } 978 979 // static 980 ssize_t LiveSession::typeToIndex(int32_t type) { 981 switch (type) { 982 case STREAMTYPE_AUDIO: 983 return 0; 984 case STREAMTYPE_VIDEO: 985 return 1; 986 case STREAMTYPE_SUBTITLES: 987 return 2; 988 case STREAMTYPE_METADATA: 989 return 3; 990 default: 991 return -1; 992 }; 993 return -1; 994 } 995 996 void LiveSession::onConnect(const sp<AMessage> &msg) { 997 CHECK(msg->findString("url", &mMasterURL)); 998 999 // TODO currently we don't know if we are coming here from incognito mode 1000 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str()); 1001 1002 KeyedVector<String8, String8> *headers = NULL; 1003 if (!msg->findPointer("headers", (void **)&headers)) { 1004 mExtraHeaders.clear(); 1005 } else { 1006 mExtraHeaders = *headers; 1007 1008 delete headers; 1009 headers = NULL; 1010 } 1011 1012 // create looper for fetchers 1013 if (mFetcherLooper == NULL) { 1014 mFetcherLooper = new ALooper(); 1015 1016 mFetcherLooper->setName("Fetcher"); 1017 mFetcherLooper->start(false, /* runOnCallingThread */ 1018 true /* canCallJava */); 1019 } 1020 1021 // create fetcher to fetch the master playlist 1022 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync(); 1023 } 1024 1025 void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) { 1026 AString uri; 1027 CHECK(msg->findString("uri", &uri)); 1028 ssize_t index = mFetcherInfos.indexOfKey(uri); 1029 if (index < 0) { 1030 ALOGW("fetcher for master playlist is gone."); 1031 return; 1032 } 1033 1034 // no longer useful, remove 1035 
mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id()); 1036 mFetcherInfos.removeItemsAt(index); 1037 1038 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist)); 1039 if (mPlaylist == NULL) { 1040 ALOGE("unable to fetch master playlist %s.", 1041 uriDebugString(mMasterURL).c_str()); 1042 1043 postPrepared(ERROR_IO); 1044 return; 1045 } 1046 // We trust the content provider to make a reasonable choice of preferred 1047 // initial bandwidth by listing it first in the variant playlist. 1048 // At startup we really don't have a good estimate on the available 1049 // network bandwidth since we haven't tranferred any data yet. Once 1050 // we have we can make a better informed choice. 1051 size_t initialBandwidth = 0; 1052 size_t initialBandwidthIndex = 0; 1053 1054 int32_t maxWidth = 0; 1055 int32_t maxHeight = 0; 1056 1057 if (mPlaylist->isVariantPlaylist()) { 1058 Vector<BandwidthItem> itemsWithVideo; 1059 for (size_t i = 0; i < mPlaylist->size(); ++i) { 1060 BandwidthItem item; 1061 1062 item.mPlaylistIndex = i; 1063 item.mLastFailureUs = -1ll; 1064 1065 sp<AMessage> meta; 1066 AString uri; 1067 mPlaylist->itemAt(i, &uri, &meta); 1068 1069 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 1070 1071 int32_t width, height; 1072 if (meta->findInt32("width", &width)) { 1073 maxWidth = max(maxWidth, width); 1074 } 1075 if (meta->findInt32("height", &height)) { 1076 maxHeight = max(maxHeight, height); 1077 } 1078 1079 mBandwidthItems.push(item); 1080 if (mPlaylist->hasType(i, "video")) { 1081 itemsWithVideo.push(item); 1082 } 1083 } 1084 // remove the audio-only variants if we have at least one with video 1085 if (!itemsWithVideo.empty() 1086 && itemsWithVideo.size() < mBandwidthItems.size()) { 1087 mBandwidthItems.clear(); 1088 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 1089 mBandwidthItems.push(itemsWithVideo[i]); 1090 } 1091 } 1092 1093 CHECK_GT(mBandwidthItems.size(), 0u); 1094 initialBandwidth = 
mBandwidthItems[0].mBandwidth; 1095 1096 mBandwidthItems.sort(SortByBandwidth); 1097 1098 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 1099 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 1100 initialBandwidthIndex = i; 1101 break; 1102 } 1103 } 1104 } else { 1105 // dummy item. 1106 BandwidthItem item; 1107 item.mPlaylistIndex = 0; 1108 item.mBandwidth = 0; 1109 mBandwidthItems.push(item); 1110 } 1111 1112 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; 1113 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; 1114 1115 mPlaylist->pickRandomMediaItems(); 1116 changeConfiguration( 1117 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 1118 } 1119 1120 void LiveSession::finishDisconnect() { 1121 ALOGV("finishDisconnect"); 1122 1123 // No reconfiguration is currently pending, make sure none will trigger 1124 // during disconnection either. 1125 cancelBandwidthSwitch(); 1126 1127 // cancel buffer polling 1128 cancelPollBuffering(); 1129 1130 // TRICKY: don't wait for all fetcher to be stopped when disconnecting 1131 // 1132 // Some fetchers might be stuck in connect/getSize at this point. These 1133 // operations will eventually timeout (as we have a timeout set in 1134 // MediaHTTPConnection), but we don't want to block the main UI thread 1135 // until then. Here we just need to make sure we clear all references 1136 // to the fetchers, so that when they finally exit from the blocking 1137 // operation, they can be destructed. 1138 // 1139 // There is one very tricky point though. For this scheme to work, the 1140 // fecther must hold a reference to LiveSession, so that LiveSession is 1141 // destroyed after fetcher. Otherwise LiveSession would get stuck in its 1142 // own destructor when it waits for mFetcherLooper to stop, which still 1143 // blocks main UI thread. 
1144 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1145 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1146 mFetcherLooper->unregisterHandler( 1147 mFetcherInfos.valueAt(i).mFetcher->id()); 1148 } 1149 mFetcherInfos.clear(); 1150 1151 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 1152 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 1153 1154 mPacketSources.valueFor( 1155 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 1156 1157 sp<AMessage> response = new AMessage; 1158 response->setInt32("err", OK); 1159 1160 response->postReply(mDisconnectReplyID); 1161 mDisconnectReplyID.clear(); 1162 } 1163 1164 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 1165 ssize_t index = mFetcherInfos.indexOfKey(uri); 1166 1167 if (index >= 0) { 1168 return NULL; 1169 } 1170 1171 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 1172 notify->setString("uri", uri); 1173 notify->setInt32("switchGeneration", mSwitchGeneration); 1174 1175 FetcherInfo info; 1176 info.mFetcher = new PlaylistFetcher( 1177 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 1178 info.mDurationUs = -1ll; 1179 info.mToBeRemoved = false; 1180 info.mToBeResumed = false; 1181 mFetcherLooper->registerHandler(info.mFetcher); 1182 1183 mFetcherInfos.add(uri, info); 1184 1185 return info.mFetcher; 1186 } 1187 1188 #if 0 1189 static double uniformRand() { 1190 return (double)rand() / RAND_MAX; 1191 } 1192 #endif 1193 1194 bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) { 1195 ALOGV("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i, 1196 newUri ? "true" : "false", 1197 newUri ? 
mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str()); 1198 return i >= 0 1199 && ((!newUri && uri == mStreams[i].mUri) 1200 || (newUri && uri == mStreams[i].mNewUri)); 1201 } 1202 1203 sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex( 1204 size_t trackIndex, bool newUri) { 1205 StreamType type = indexToType(trackIndex); 1206 sp<AnotherPacketSource> source = NULL; 1207 if (newUri) { 1208 source = mPacketSources2.valueFor(type); 1209 source->clear(); 1210 } else { 1211 source = mPacketSources.valueFor(type); 1212 }; 1213 return source; 1214 } 1215 1216 sp<AnotherPacketSource> LiveSession::getMetadataSource( 1217 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) { 1218 // todo: One case where the following strategy can fail is when audio and video 1219 // are in separate playlists, both are transport streams, and the metadata 1220 // is actually contained in the audio stream. 1221 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s", 1222 streamMask, newUri ? "true" : "false"); 1223 1224 if ((sources[kVideoIndex] != NULL) // video fetcher; or ... 1225 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) { 1226 // ... 
audio fetcher for audio only variant 1227 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri); 1228 } 1229 1230 return NULL; 1231 } 1232 1233 bool LiveSession::resumeFetcher( 1234 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1235 ssize_t index = mFetcherInfos.indexOfKey(uri); 1236 if (index < 0) { 1237 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1238 return false; 1239 } 1240 1241 bool resume = false; 1242 sp<AnotherPacketSource> sources[kNumSources]; 1243 for (size_t i = 0; i < kMaxStreams; ++i) { 1244 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) { 1245 resume = true; 1246 sources[i] = getPacketSourceForStreamIndex(i, newUri); 1247 } 1248 } 1249 1250 if (resume) { 1251 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1252 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1253 1254 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1255 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1256 1257 fetcher->startAsync( 1258 sources[kAudioIndex], 1259 sources[kVideoIndex], 1260 sources[kSubtitleIndex], 1261 getMetadataSource(sources, streamMask, newUri), 1262 timeUs, -1, -1, seekMode); 1263 } 1264 1265 return resume; 1266 } 1267 1268 float LiveSession::getAbortThreshold( 1269 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1270 float abortThreshold = -1.0f; 1271 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1272 /* 1273 If we're switching down, we need to decide whether to 1274 1275 1) finish last segment of high-bandwidth variant, or 1276 2) abort last segment of high-bandwidth variant, and fetch an 1277 overlapping portion from low-bandwidth variant. 1278 1279 Here we try to maximize the amount of buffer left when the 1280 switch point is met. 
Given the following parameters: 1281 1282 B: our current buffering level in seconds 1283 T: target duration in seconds 1284 X: sample duration in seconds remain to fetch in last segment 1285 bw0: bandwidth of old variant (as specified in playlist) 1286 bw1: bandwidth of new variant (as specified in playlist) 1287 bw: measured bandwidth available 1288 1289 If we choose 1), when switch happens at the end of current 1290 segment, our buffering will be 1291 B + X - X * bw0 / bw 1292 1293 If we choose 2), when switch happens where we aborted current 1294 segment, our buffering will be 1295 B - (T - X) * bw1 / bw 1296 1297 We should only choose 1) if 1298 X/T < bw1 / (bw1 + bw0 - bw) 1299 */ 1300 1301 // abort old bandwidth immediately if bandwidth is fluctuating a lot. 1302 // our estimate could be far off, and fetching old bandwidth could 1303 // take too long. 1304 if (!mLastBandwidthStable) { 1305 return 0.0f; 1306 } 1307 1308 // Taking the measured current bandwidth at 50% face value only, 1309 // as our bandwidth estimation is a lagging indicator. Being 1310 // conservative on this, we prefer switching to lower bandwidth 1311 // unless we're really confident finishing up the last segment 1312 // of higher bandwidth will be fast. 
1313 CHECK(mLastBandwidthBps >= 0); 1314 abortThreshold = 1315 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1316 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1317 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1318 - (float)mLastBandwidthBps * 0.5f); 1319 if (abortThreshold < 0.0f) { 1320 abortThreshold = -1.0f; // do not abort 1321 } 1322 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1323 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1324 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1325 mLastBandwidthBps, 1326 abortThreshold); 1327 } 1328 return abortThreshold; 1329 } 1330 1331 void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1332 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1333 } 1334 1335 ssize_t LiveSession::getLowestValidBandwidthIndex() const { 1336 for (size_t index = 0; index < mBandwidthItems.size(); index++) { 1337 if (isBandwidthValid(mBandwidthItems[index])) { 1338 return index; 1339 } 1340 } 1341 // if playlists are all blacklisted, return 0 and hope it's alive 1342 return 0; 1343 } 1344 1345 size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1346 if (mBandwidthItems.size() < 2) { 1347 // shouldn't be here if we only have 1 bandwidth, check 1348 // logic to get rid of redundant bandwidth polling 1349 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1350 return 0; 1351 } 1352 1353 #if 1 1354 char value[PROPERTY_VALUE_MAX]; 1355 ssize_t index = -1; 1356 if (property_get("media.httplive.bw-index", value, NULL)) { 1357 char *end; 1358 index = strtol(value, &end, 10); 1359 CHECK(end > value && *end == '\0'); 1360 1361 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1362 index = mBandwidthItems.size() - 1; 1363 } 1364 } 1365 1366 if (index < 0) { 1367 char value[PROPERTY_VALUE_MAX]; 1368 if (property_get("media.httplive.max-bw", value, NULL)) { 1369 char *end; 1370 long maxBw = strtoul(value, 
&end, 10); 1371 if (end > value && *end == '\0') { 1372 if (maxBw > 0 && bandwidthBps > maxBw) { 1373 ALOGV("bandwidth capped to %ld bps", maxBw); 1374 bandwidthBps = maxBw; 1375 } 1376 } 1377 } 1378 1379 // Pick the highest bandwidth stream that's not currently blacklisted 1380 // below or equal to estimated bandwidth. 1381 1382 index = mBandwidthItems.size() - 1; 1383 ssize_t lowestBandwidth = getLowestValidBandwidthIndex(); 1384 while (index > lowestBandwidth) { 1385 // be conservative (70%) to avoid overestimating and immediately 1386 // switching down again. 1387 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1388 const BandwidthItem &item = mBandwidthItems[index]; 1389 if (item.mBandwidth <= adjustedBandwidthBps 1390 && isBandwidthValid(item)) { 1391 break; 1392 } 1393 --index; 1394 } 1395 } 1396 #elif 0 1397 // Change bandwidth at random() 1398 size_t index = uniformRand() * mBandwidthItems.size(); 1399 #elif 0 1400 // There's a 50% chance to stay on the current bandwidth and 1401 // a 50% chance to switch to the next higher bandwidth (wrapping around 1402 // to lowest) 1403 const size_t kMinIndex = 0; 1404 1405 static ssize_t mCurBandwidthIndex = -1; 1406 1407 size_t index; 1408 if (mCurBandwidthIndex < 0) { 1409 index = kMinIndex; 1410 } else if (uniformRand() < 0.5) { 1411 index = (size_t)mCurBandwidthIndex; 1412 } else { 1413 index = mCurBandwidthIndex + 1; 1414 if (index == mBandwidthItems.size()) { 1415 index = kMinIndex; 1416 } 1417 } 1418 mCurBandwidthIndex = index; 1419 #elif 0 1420 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1421 1422 size_t index = mBandwidthItems.size() - 1; 1423 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1424 --index; 1425 } 1426 #elif 1 1427 char value[PROPERTY_VALUE_MAX]; 1428 size_t index; 1429 if (property_get("media.httplive.bw-index", value, NULL)) { 1430 char *end; 1431 index = strtoul(value, &end, 10); 1432 CHECK(end > value && *end == '\0'); 1433 1434 if 
(index >= mBandwidthItems.size()) { 1435 index = mBandwidthItems.size() - 1; 1436 } 1437 } else { 1438 index = 0; 1439 } 1440 #else 1441 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1442 #endif 1443 1444 CHECK_GE(index, 0); 1445 1446 return index; 1447 } 1448 1449 HLSTime LiveSession::latestMediaSegmentStartTime() const { 1450 HLSTime audioTime(mPacketSources.valueFor( 1451 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1452 1453 HLSTime videoTime(mPacketSources.valueFor( 1454 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1455 1456 return audioTime < videoTime ? videoTime : audioTime; 1457 } 1458 1459 void LiveSession::onSeek(const sp<AMessage> &msg) { 1460 int64_t timeUs; 1461 int32_t mode; 1462 CHECK(msg->findInt64("timeUs", &timeUs)); 1463 CHECK(msg->findInt32("mode", &mode)); 1464 // TODO: add "mode" to changeConfiguration. 1465 changeConfiguration(timeUs/* , (MediaPlayerSeekMode)mode */); 1466 } 1467 1468 status_t LiveSession::getDuration(int64_t *durationUs) const { 1469 int64_t maxDurationUs = -1ll; 1470 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1471 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1472 1473 if (fetcherDurationUs > maxDurationUs) { 1474 maxDurationUs = fetcherDurationUs; 1475 } 1476 } 1477 1478 *durationUs = maxDurationUs; 1479 1480 return OK; 1481 } 1482 1483 bool LiveSession::isSeekable() const { 1484 int64_t durationUs; 1485 return getDuration(&durationUs) == OK && durationUs >= 0; 1486 } 1487 1488 bool LiveSession::hasDynamicDuration() const { 1489 return false; 1490 } 1491 1492 size_t LiveSession::getTrackCount() const { 1493 if (mPlaylist == NULL) { 1494 return 0; 1495 } else { 1496 return mPlaylist->getTrackCount() + (mHasMetadata ? 
// Returns the format description of track |trackIndex|, or NULL while no
// playlist is available. The index equal to the playlist's own track count
// addresses the synthetic timed-ID3 metadata track when mHasMetadata is set;
// every other index is answered by the playlist.
sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
    if (mPlaylist == NULL) {
        return NULL;
    } else {
        if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) {
            sp<AMessage> format = new AMessage();
            format->setInt32("type", MEDIA_TRACK_TYPE_METADATA);
            format->setString("language", "und");
            format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3);
            return format;
        }
        return mPlaylist->getTrackInfo(trackIndex);
    }
}

// Selects or deselects track |index| in the playlist. Returns
// INVALID_OPERATION while no playlist is available, otherwise the playlist's
// result; on OK a kWhatChangeConfiguration message is posted so the change
// takes effect asynchronously. mSubtitleGeneration is bumped on every call
// that reaches the playlist, whether or not the selection succeeds.
status_t LiveSession::selectTrack(size_t index, bool select) {
    if (mPlaylist == NULL) {
        return INVALID_OPERATION;
    }

    ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++",
            index, select, mSubtitleGeneration);

    ++mSubtitleGeneration;
    status_t err = mPlaylist->selectTrack(index, select);
    if (err == OK) {
        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
        msg->setInt32("pickTrack", select);
        msg->post();
    }
    return err;
}

// Returns the playlist's selected track of |type|, or -1 while no playlist
// is available.
ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
    if (mPlaylist == NULL) {
        return -1;
    } else {
        return mPlaylist->getSelectedTrack(type);
    }
}

// First stage of a (re)configuration.
//
// timeUs:         >= 0 for a seek to that position, < 0 otherwise.
// bandwidthIndex: >= 0 to switch to that entry of mBandwidthItems, < 0 to
//                 keep mCurBandwidthIndex.
// pickTrack:      forwarded to the later stages in the "pickTrack" field.
//
// Stops or pauses the existing fetchers as needed and prepares the
// continuation message (kWhatChangeConfiguration2 when seeking,
// kWhatChangeConfiguration3 otherwise). The message is posted immediately
// only when there is no fetcher at all; otherwise it is stored in
// mContinuation until mContinuationCounter reaches zero. Aborts (CHECK) if a
// reconfiguration is already in progress.
void LiveSession::changeConfiguration(
        int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) {
    ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d",
          (long long)timeUs, bandwidthIndex, pickTrack);

    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;
    if (bandwidthIndex >= 0) {
        mOrigBandwidthIndex = mCurBandwidthIndex;
        mCurBandwidthIndex = bandwidthIndex;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
        }
    }
    // The size_t cast makes a negative mCurBandwidthIndex fail this check too.
    CHECK_LT((size_t)mCurBandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Collect the URI of each stream type the selected variant provides.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        // skip fetchers that are marked mToBeRemoved,
        // these are done and can't be reused
        if (mFetcherInfos[i].mToBeRemoved) {
            continue;
        }

        const AString &uri = mFetcherInfos.keyAt(i);
        sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher;

        bool discardFetcher = true, delayRemoval = false;
        for (size_t j = 0; j < kMaxStreams; ++j) {
            StreamType type = indexToType(j);
            // Same URI as before: move the stream from "needs a new fetcher"
            // to "resume the existing fetcher".
            if ((streamMask & type) && uri == URIs[j]) {
                resumeMask |= type;
                streamMask &= ~type;
                discardFetcher = false;
            }
        }
        // Delay fetcher removal if not picking tracks, AND old fetcher
        // has stream mask that overlaps new variant. (Okay to discard
        // old fetcher now, if completely no overlap.)
        if (discardFetcher && timeUs < 0ll && !pickTrack
                && (fetcher->getStreamTypeMask() & streamMask)) {
            discardFetcher = false;
            delayRemoval = true;
        }

        if (discardFetcher) {
            ALOGV("discarding fetcher-%d", fetcher->getFetcherID());
            fetcher->stopAsync();
        } else {
            float threshold = 0.0f; // default to pause after current block (47Kbytes)
            bool disconnect = false;
            if (timeUs >= 0ll) {
                // seeking, no need to finish fetching
                disconnect = true;
            } else if (delayRemoval) {
                // adapting, abort if remaining of current segment is over threshold
                threshold = getAbortThreshold(
                        mOrigBandwidthIndex, mCurBandwidthIndex);
            }

            ALOGV("pausing fetcher-%d, threshold=%.2f",
                    fetcher->getFetcherID(), threshold);
            fetcher->pauseAsync(threshold, disconnect);
        }
    }

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    // NOTE(review): the counter is set to the total fetcher count, which
    // includes fetchers skipped above as mToBeRemoved; presumably those
    // still acknowledge their earlier stop request, confirm against the
    // fetcher notification handler.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        msg->post();
    }
}
// Handles kWhatChangeConfiguration (posted by selectTrack()). Starts a
// reconfiguration that keeps the current position and bandwidth index, or,
// if one is already running, re-posts |msg| to retry one second later.
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration");

    if (!mReconfigurationInProgress) {
        // "pickTrack" is optional; it stays 0 when the field is absent.
        int32_t pickTrack = 0;
        msg->findInt32("pickTrack", &pickTrack);
        changeConfiguration(-1ll /* timeUs */, -1, pickTrack);
    } else {
        msg->post(1000000ll); // retry in 1 sec
    }
}

// Second stage of a reconfiguration; changeConfiguration() only routes seeks
// (timeUs >= 0) through here.
//
// Resets the packet sources and per-stream state for the seek and replies to
// a pending seek request, queues a format-change discontinuity on audio/video
// streams whose URI changed, and works out which previously active stream
// types are no longer present. If none disappeared it continues straight into
// onChangeConfiguration3(); otherwise it notifies the player
// (kWhatStreamsChanged) and lets the player's reply trigger stage 3.
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    ALOGV("onChangeConfiguration2");

    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;
        mLastDequeuedTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            sp<AnotherPacketSource> packetSource = mPacketSources.editValueAt(i);
            // Grab the format before clear() so it can be restored below.
            sp<MetaData> format = packetSource->getFormat();
            packetSource->clear();
            // Set a tentative format here such that HTTPLiveSource will always have
            // a format available when NuPlayer queries. Without an available video
            // format when setting a surface NuPlayer might disable video decoding
            // altogether. The tentative format will be overwritten by the
            // authoritative (and possibly same) format once content from the new
            // position is dequeued.
            packetSource->setFormat(format);
        }

        for (size_t i = 0; i < kMaxStreams; ++i) {
            mStreams[i].reset();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        // Answer the pending seek request, if there is one.
        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID.clear();
            mSeekReply.clear();
        }

        // restart buffer polling after seek because the previous
        // buffering position is no longer valid.
        restartPollBuffering();
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // From here on streamMask means "every stream of the new configuration".
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    uint32_t changedMask = 0;
    // The loop stops when it reaches the subtitle index, so subtitles (and
    // any index after it) are not examined here.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
        if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            if (source->getLatestDequeuedMeta() != NULL) {
                source->queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
            }
        }
        // Determine which decoders to shut down on the player side:
        // a decoder has to be shut down if its stream type was active
        // before but no longer is.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Re-target the incoming message so that it doubles as the reply.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(this);

    notify->setMessage("reply", msg);
    notify->post();
}
// Final stage of a reconfiguration.
//
// Reads "streamMask" (streams needing a new fetcher), "resumeMask" (streams
// served by an existing fetcher), "timeUs" and "pickTrack" from |msg|, then:
//   * decides whether this is a bandwidth switch that needs packet sources
//     swapped later (|switching|, i.e. mSwapMask != 0),
//   * records the stream URIs (in mNewUri while switching, mUri otherwise),
//   * resumes the fetchers that are still needed and marks the others
//     mToBeRemoved,
//   * creates and starts one new fetcher per distinct URI in streamMask,
//   * clears mReconfigurationInProgress and either sets mSwitchInProgress or
//     commits mStreamMask, and finishes a pending disconnect.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    mNewStreamMask = streamMask | resumeMask;

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            // mSwapMask contains streams that are in both old and new variant,
            // (in mNewStreamMask & mStreamMask) but with different URIs
            // (not in resumeMask).
            // For example, old variant has video and audio in two separate
            // URIs, and new variant has only audio with unchanged URI. mSwapMask
            // should be 0 as there is nothing to swap. We only need to stop video,
            // and resume audio.
            mSwapMask =  mNewStreamMask & mStreamMask & ~resumeMask;
            switching = (mSwapMask != 0);
        }
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, "
            "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x",
            (long long)timeUs, switching, pickTrack,
            mStreamMask, mNewStreamMask, mSwapMask);

    // Only streams that get a new fetcher have their URI (re)recorded here.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);
        if (!resumeFetcher(uri, resumeMask, timeUs)) {
            ALOGV("marking fetcher-%d to be removed",
                    mFetcherInfos[i].mFetcher->getFetcherID());

            mFetcherInfos.editValueAt(i).mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        // addFetcher() returns NULL if a fetcher for this URI already exists.
        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        HLSTime startTime;
        SeekMode seekMode = kSeekModeExactPosition;
        sp<AnotherPacketSource> sources[kNumSources];

        if (i == kSubtitleIndex || (!pickTrack && !switching)) {
            startTime = latestMediaSegmentStartTime();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    startTime.mTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (!switching) {
                        // selecting, or adapting but no swap required
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting and swap required
                        meta = sources[j]->getLatestEnqueuedMeta();
                        if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) {
                            // switching up
                            meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin);
                        }
                    }

                    // Keep the latest audio/video time seen so far, ignoring
                    // metadata that carries a "discontinuity" field.
                    if ((j == kAudioIndex || j == kVideoIndex)
                            && meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        HLSTime tmpTime(meta);
                        if (startTime < tmpTime) {
                            startTime = tmpTime;
                        }
                    }

                    if (!switching) {
                        // selecting, or adapting but no swap required
                        sources[j]->clear();
                        // NOTE(review): this break also skips the
                        // "streamMask &= ~indexToType(j)" below; that looks
                        // harmless only if the subtitle index is the last
                        // stream index and i == j here; confirm.
                        if (j == kSubtitleIndex) {
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);
                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // switching, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers,  if one of the fetcher
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld "
                "segmentStartTimeUs %lld seekMode %d",
                fetcher->getFetcherID(),
                (long long)startTime.mTimeUs,
                (long long)mLastSeekTimeUs,
                (long long)startTime.getSegmentTimeUs(),
                seekMode);

        // Set the target segment start time to the middle point of the
        // segment where the last sample was.
        // This gives a better guess if segments of the two variants are not
        // perfectly aligned. (If the corresponding segment in new variant
        // starts slightly later than that in the old variant, we still want
        // to pick that segment, not the one before)
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                getMetadataSource(sources, mNewStreamMask, switching),
                // Fall back to the last seek position when no start time is known.
                startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs,
                startTime.getSegmentTimeUs(),
                startTime.mSeq,
                seekMode);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    mReconfigurationInProgress = false;
    if (switching) {
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
        if (mOrigBandwidthIndex != mCurBandwidthIndex) {
            ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd",
                    mOrigBandwidthIndex, mCurBandwidthIndex);
            mOrigBandwidthIndex = mCurBandwidthIndex;
        }
    }

    ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x",
            mSwitchInProgress, mStreamMask);

    // A disconnect that arrived during the reconfiguration is completed now.
    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}
1949 1950 mReconfigurationInProgress = false; 1951 if (switching) { 1952 mSwitchInProgress = true; 1953 } else { 1954 mStreamMask = mNewStreamMask; 1955 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1956 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd", 1957 mOrigBandwidthIndex, mCurBandwidthIndex); 1958 mOrigBandwidthIndex = mCurBandwidthIndex; 1959 } 1960 } 1961 1962 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", 1963 mSwitchInProgress, mStreamMask); 1964 1965 if (mDisconnectReplyID != NULL) { 1966 finishDisconnect(); 1967 } 1968 } 1969 1970 void LiveSession::swapPacketSource(StreamType stream) { 1971 ALOGV("[%s] swapPacketSource", getNameForStream(stream)); 1972 1973 // transfer packets from source2 to source 1974 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 1975 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 1976 1977 // queue discontinuity in mPacketSource 1978 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); 1979 1980 // queue packets in mPacketSource2 to mPacketSource 1981 status_t finalResult = OK; 1982 sp<ABuffer> accessUnit; 1983 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && 1984 OK == aps2->dequeueAccessUnit(&accessUnit)) { 1985 aps->queueAccessUnit(accessUnit); 1986 } 1987 aps2->clear(); 1988 } 1989 1990 void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { 1991 if (!mSwitchInProgress) { 1992 return; 1993 } 1994 1995 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 1996 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { 1997 return; 1998 } 1999 2000 // Swap packet source of streams provided by old variant 2001 for (size_t idx = 0; idx < kMaxStreams; idx++) { 2002 StreamType stream = indexToType(idx); 2003 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { 2004 swapPacketSource(stream); 2005 2006 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 2007 ALOGW("swapping stream 
type %d %s to empty stream", 2008 stream, mStreams[idx].mUri.c_str()); 2009 } 2010 mStreams[idx].mUri = mStreams[idx].mNewUri; 2011 mStreams[idx].mNewUri.clear(); 2012 2013 mSwapMask &= ~stream; 2014 } 2015 } 2016 2017 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); 2018 2019 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); 2020 if (mSwapMask != 0) { 2021 return; 2022 } 2023 2024 // Check if new variant contains extra streams. 2025 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 2026 while (extraStreams) { 2027 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); 2028 extraStreams &= ~stream; 2029 2030 swapPacketSource(stream); 2031 2032 ssize_t idx = typeToIndex(stream); 2033 CHECK(idx >= 0); 2034 if (mStreams[idx].mNewUri.empty()) { 2035 ALOGW("swapping extra stream type %d %s to empty stream", 2036 stream, mStreams[idx].mUri.c_str()); 2037 } 2038 mStreams[idx].mUri = mStreams[idx].mNewUri; 2039 mStreams[idx].mNewUri.clear(); 2040 } 2041 2042 // Restart new fetcher (it was paused after the first 47k block) 2043 // and let it fetch into mPacketSources (not mPacketSources2) 2044 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2045 FetcherInfo &info = mFetcherInfos.editValueAt(i); 2046 if (info.mToBeResumed) { 2047 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); 2048 info.mToBeResumed = false; 2049 } 2050 } 2051 2052 ALOGI("#### Finished Bandwidth Switch: %zd => %zd", 2053 mOrigBandwidthIndex, mCurBandwidthIndex); 2054 2055 mStreamMask = mNewStreamMask; 2056 mSwitchInProgress = false; 2057 mOrigBandwidthIndex = mCurBandwidthIndex; 2058 2059 restartPollBuffering(); 2060 } 2061 2062 void LiveSession::schedulePollBuffering() { 2063 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 2064 msg->setInt32("generation", mPollBufferingGeneration); 2065 msg->post(1000000ll); 2066 } 2067 2068 void LiveSession::cancelPollBuffering() { 2069 ++mPollBufferingGeneration; 2070 mPrevBufferPercentage 
= -1; 2071 } 2072 2073 void LiveSession::restartPollBuffering() { 2074 cancelPollBuffering(); 2075 onPollBuffering(); 2076 } 2077 2078 void LiveSession::onPollBuffering() { 2079 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 2080 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 2081 mSwitchInProgress, mReconfigurationInProgress, 2082 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 2083 2084 bool underflow, ready, down, up; 2085 if (checkBuffering(underflow, ready, down, up)) { 2086 if (mInPreparationPhase) { 2087 // Allow down switch even if we're still preparing. 2088 // 2089 // Some streams have a high bandwidth index as default, 2090 // when bandwidth is low, it takes a long time to buffer 2091 // to ready mark, then it immediately pauses after start 2092 // as we have to do a down switch. It's better experience 2093 // to restart from a lower index, if we detect low bw. 2094 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { 2095 postPrepared(OK); 2096 } 2097 } 2098 2099 if (!mInPreparationPhase) { 2100 if (ready) { 2101 stopBufferingIfNecessary(); 2102 } else if (underflow) { 2103 startBufferingIfNecessary(); 2104 } 2105 switchBandwidthIfNeeded(up, down); 2106 } 2107 } 2108 2109 schedulePollBuffering(); 2110 } 2111 2112 void LiveSession::cancelBandwidthSwitch(bool resume) { 2113 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 2114 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 2115 if (!mSwitchInProgress) { 2116 return; 2117 } 2118 2119 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2120 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2121 if (info.mToBeRemoved) { 2122 info.mToBeRemoved = false; 2123 if (resume) { 2124 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2125 } 2126 } 2127 } 2128 2129 for (size_t i = 0; i < kMaxStreams; ++i) { 2130 AString newUri = mStreams[i].mNewUri; 2131 if (!newUri.empty()) { 2132 // clear all mNewUri matching this 
newUri 2133 for (size_t j = i; j < kMaxStreams; ++j) { 2134 if (mStreams[j].mNewUri == newUri) { 2135 mStreams[j].mNewUri.clear(); 2136 } 2137 } 2138 ALOGV("stopping newUri = %s", newUri.c_str()); 2139 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2140 if (index < 0) { 2141 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2142 continue; 2143 } 2144 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2145 info.mToBeRemoved = true; 2146 info.mFetcher->stopAsync(); 2147 } 2148 } 2149 2150 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2151 mOrigBandwidthIndex, mCurBandwidthIndex); 2152 2153 mSwitchGeneration++; 2154 mSwitchInProgress = false; 2155 mCurBandwidthIndex = mOrigBandwidthIndex; 2156 mSwapMask = 0; 2157 } 2158 2159 bool LiveSession::checkBuffering( 2160 bool &underflow, bool &ready, bool &down, bool &up) { 2161 underflow = ready = down = up = false; 2162 2163 if (mReconfigurationInProgress) { 2164 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2165 return false; 2166 } 2167 2168 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2169 activeCount = underflowCount = readyCount = downCount = upCount =0; 2170 int32_t minBufferPercent = -1; 2171 int64_t durationUs; 2172 if (getDuration(&durationUs) != OK) { 2173 durationUs = -1; 2174 } 2175 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2176 // we don't check subtitles for buffering level 2177 if (!(mStreamMask & mPacketSources.keyAt(i) 2178 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2179 continue; 2180 } 2181 // ignore streams that never had any packet queued. 
2182 // (it's possible that the variant only has audio or video) 2183 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2184 if (meta == NULL) { 2185 continue; 2186 } 2187 2188 status_t finalResult; 2189 int64_t bufferedDurationUs = 2190 mPacketSources[i]->getBufferedDurationUs(&finalResult); 2191 ALOGV("[%s] buffered %lld us", 2192 getNameForStream(mPacketSources.keyAt(i)), 2193 (long long)bufferedDurationUs); 2194 if (durationUs >= 0) { 2195 int32_t percent; 2196 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2197 percent = 100; 2198 } else { 2199 percent = (int32_t)(100.0 * 2200 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2201 } 2202 if (minBufferPercent < 0 || percent < minBufferPercent) { 2203 minBufferPercent = percent; 2204 } 2205 } 2206 2207 ++activeCount; 2208 int64_t readyMarkUs = 2209 (mInPreparationPhase ? 2210 mBufferingSettings.mInitialMarkMs : 2211 mBufferingSettings.mResumePlaybackMarkMs) * 1000ll; 2212 if (bufferedDurationUs > readyMarkUs 2213 || mPacketSources[i]->isFinished(0)) { 2214 ++readyCount; 2215 } 2216 if (!mPacketSources[i]->isFinished(0)) { 2217 if (bufferedDurationUs < kUnderflowMarkMs * 1000ll) { 2218 ++underflowCount; 2219 } 2220 if (bufferedDurationUs > mUpSwitchMark) { 2221 ++upCount; 2222 } 2223 if (bufferedDurationUs < mDownSwitchMark) { 2224 ++downCount; 2225 } 2226 } 2227 } 2228 2229 if (minBufferPercent >= 0) { 2230 notifyBufferingUpdate(minBufferPercent); 2231 } 2232 2233 if (activeCount > 0) { 2234 up = (upCount == activeCount); 2235 down = (downCount > 0); 2236 ready = (readyCount == activeCount); 2237 underflow = (underflowCount > 0); 2238 return true; 2239 } 2240 2241 return false; 2242 } 2243 2244 void LiveSession::startBufferingIfNecessary() { 2245 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2246 mInPreparationPhase, mBuffering); 2247 if (!mBuffering) { 2248 mBuffering = true; 2249 2250 sp<AMessage> notify = mNotify->dup(); 2251 notify->setInt32("what", 
kWhatBufferingStart); 2252 notify->post(); 2253 } 2254 } 2255 2256 void LiveSession::stopBufferingIfNecessary() { 2257 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2258 mInPreparationPhase, mBuffering); 2259 2260 if (mBuffering) { 2261 mBuffering = false; 2262 2263 sp<AMessage> notify = mNotify->dup(); 2264 notify->setInt32("what", kWhatBufferingEnd); 2265 notify->post(); 2266 } 2267 } 2268 2269 void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2270 if (percentage < mPrevBufferPercentage) { 2271 percentage = mPrevBufferPercentage; 2272 } else if (percentage > 100) { 2273 percentage = 100; 2274 } 2275 2276 mPrevBufferPercentage = percentage; 2277 2278 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2279 2280 sp<AMessage> notify = mNotify->dup(); 2281 notify->setInt32("what", kWhatBufferingUpdate); 2282 notify->setInt32("percentage", percentage); 2283 notify->post(); 2284 } 2285 2286 bool LiveSession::tryBandwidthFallback() { 2287 if (mInPreparationPhase || mReconfigurationInProgress) { 2288 // Don't try fallback during prepare or reconfig. 2289 // If error happens there, it's likely unrecoverable. 2290 return false; 2291 } 2292 if (mCurBandwidthIndex > mOrigBandwidthIndex) { 2293 // if we're switching up, simply cancel and resume old variant 2294 cancelBandwidthSwitch(true /* resume */); 2295 return true; 2296 } else { 2297 // if we're switching down, we're likely about to underflow (if 2298 // not already underflowing). try the lowest viable bandwidth if 2299 // not on that variant already. 
2300 ssize_t lowestValid = getLowestValidBandwidthIndex(); 2301 if (mCurBandwidthIndex > lowestValid) { 2302 cancelBandwidthSwitch(); 2303 changeConfiguration(-1ll, lowestValid); 2304 return true; 2305 } 2306 } 2307 // return false if we couldn't find any fallback 2308 return false; 2309 } 2310 2311 /* 2312 * returns true if a bandwidth switch is actually needed (and started), 2313 * returns false otherwise 2314 */ 2315 bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2316 // no need to check bandwidth if we only have 1 bandwidth settings 2317 if (mBandwidthItems.size() < 2) { 2318 return false; 2319 } 2320 2321 if (mSwitchInProgress) { 2322 if (mBuffering) { 2323 tryBandwidthFallback(); 2324 } 2325 return false; 2326 } 2327 2328 int32_t bandwidthBps, shortTermBps; 2329 bool isStable; 2330 if (mBandwidthEstimator->estimateBandwidth( 2331 &bandwidthBps, &isStable, &shortTermBps)) { 2332 ALOGV("bandwidth estimated at %.2f kbps, " 2333 "stable %d, shortTermBps %.2f kbps", 2334 bandwidthBps / 1024.0f, isStable, shortTermBps / 1024.0f); 2335 mLastBandwidthBps = bandwidthBps; 2336 mLastBandwidthStable = isStable; 2337 } else { 2338 ALOGV("no bandwidth estimate."); 2339 return false; 2340 } 2341 2342 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2343 // canSwithDown and canSwitchUp can't both be true. 2344 // we only want to switch up when measured bw is 120% higher than current variant, 2345 // and we only want to switch down when measured bw is below current variant. 2346 bool canSwitchDown = bufferLow 2347 && (bandwidthBps < (int32_t)curBandwidth); 2348 bool canSwitchUp = bufferHigh 2349 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2350 2351 if (canSwitchDown || canSwitchUp) { 2352 // bandwidth estimating has some delay, if we have to downswitch when 2353 // it hasn't stabilized, use the short term to guess real bandwidth, 2354 // since it may be dropping too fast. 
2355 // (note this doesn't apply to upswitch, always use longer average there) 2356 if (!isStable && canSwitchDown) { 2357 if (shortTermBps < bandwidthBps) { 2358 bandwidthBps = shortTermBps; 2359 } 2360 } 2361 2362 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2363 2364 // it's possible that we're checking for canSwitchUp case, but the returned 2365 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2366 // of measured bw. In that case we don't want to do anything, since we have 2367 // both enough buffer and enough bw. 2368 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 2369 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) { 2370 // if not yet prepared, just restart again with new bw index. 2371 // this is faster and playback experience is cleaner. 2372 changeConfiguration( 2373 mInPreparationPhase ? 0 : -1ll, bandwidthIndex); 2374 return true; 2375 } 2376 } 2377 return false; 2378 } 2379 2380 void LiveSession::postError(status_t err) { 2381 // if we reached EOS, notify buffering of 100% 2382 if (err == ERROR_END_OF_STREAM) { 2383 notifyBufferingUpdate(100); 2384 } 2385 // we'll stop buffer polling now, before that notify 2386 // stop buffering to stop the spinning icon 2387 stopBufferingIfNecessary(); 2388 cancelPollBuffering(); 2389 2390 sp<AMessage> notify = mNotify->dup(); 2391 notify->setInt32("what", kWhatError); 2392 notify->setInt32("err", err); 2393 notify->post(); 2394 } 2395 2396 void LiveSession::postPrepared(status_t err) { 2397 CHECK(mInPreparationPhase); 2398 2399 sp<AMessage> notify = mNotify->dup(); 2400 if (err == OK || err == ERROR_END_OF_STREAM) { 2401 notify->setInt32("what", kWhatPrepared); 2402 } else { 2403 cancelPollBuffering(); 2404 2405 notify->setInt32("what", kWhatPreparationFailed); 2406 notify->setInt32("err", err); 2407 } 2408 2409 notify->post(); 2410 2411 mInPreparationPhase = false; 2412 } 2413 2414 2415 } // namespace android 2416 2417