1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "LiveSession" 19 #include <utils/Log.h> 20 21 #include "LiveSession.h" 22 23 #include "M3UParser.h" 24 #include "PlaylistFetcher.h" 25 26 #include "include/HTTPBase.h" 27 #include "mpeg2ts/AnotherPacketSource.h" 28 29 #include <cutils/properties.h> 30 #include <media/IMediaHTTPConnection.h> 31 #include <media/IMediaHTTPService.h> 32 #include <media/stagefright/foundation/hexdump.h> 33 #include <media/stagefright/foundation/ABuffer.h> 34 #include <media/stagefright/foundation/ADebug.h> 35 #include <media/stagefright/foundation/AMessage.h> 36 #include <media/stagefright/DataSource.h> 37 #include <media/stagefright/FileSource.h> 38 #include <media/stagefright/MediaErrors.h> 39 #include <media/stagefright/MediaHTTP.h> 40 #include <media/stagefright/MetaData.h> 41 #include <media/stagefright/Utils.h> 42 43 #include <utils/Mutex.h> 44 45 #include <ctype.h> 46 #include <inttypes.h> 47 #include <openssl/aes.h> 48 #include <openssl/md5.h> 49 50 namespace android { 51 52 // Number of recently-read bytes to use for bandwidth estimation 53 const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; 54 55 LiveSession::LiveSession( 56 const sp<AMessage> ¬ify, uint32_t flags, 57 const sp<IMediaHTTPService> &httpService) 58 : mNotify(notify), 59 mFlags(flags), 60 mHTTPService(httpService), 61 mInPreparationPhase(true), 62 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 63 mCurBandwidthIndex(-1), 64 mStreamMask(0), 65 mNewStreamMask(0), 66 mSwapMask(0), 67 mCheckBandwidthGeneration(0), 68 mSwitchGeneration(0), 69 mSubtitleGeneration(0), 70 mLastDequeuedTimeUs(0ll), 71 mRealTimeBaseUs(0ll), 72 mReconfigurationInProgress(false), 73 mSwitchInProgress(false), 74 mDisconnectReplyID(0), 75 mSeekReplyID(0), 76 mFirstTimeUsValid(false), 77 mFirstTimeUs(0), 78 mLastSeekTimeUs(0) { 79 80 mStreams[kAudioIndex] = StreamItem("audio"); 81 mStreams[kVideoIndex] = StreamItem("video"); 82 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 83 84 for (size_t i = 0; i < kMaxStreams; ++i) { 85 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 86 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 87 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 88 mBuffering[i] = false; 89 } 90 91 size_t numHistoryItems = kBandwidthHistoryBytes / 92 PlaylistFetcher::kDownloadBlockSize + 1; 93 if (numHistoryItems < 5) { 94 numHistoryItems = 5; 95 } 96 mHTTPDataSource->setBandwidthHistorySize(numHistoryItems); 97 } 98 99 LiveSession::~LiveSession() { 100 } 101 102 sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 103 ABuffer *discontinuity = new ABuffer(0); 104 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 105 discontinuity->meta()->setInt32("swapPacketSource", swap); 106 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 107 discontinuity->meta()->setInt64("timeUs", -1); 108 return discontinuity; 109 } 110 111 void LiveSession::swapPacketSource(StreamType stream) { 112 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 113 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 114 sp<AnotherPacketSource> tmp = aps; 115 aps = aps2; 116 aps2 = tmp; 117 aps2->clear(); 118 } 119 120 status_t LiveSession::dequeueAccessUnit( 121 StreamType stream, sp<ABuffer> *accessUnit) { 122 if (!(mStreamMask & stream)) { 123 // return -EWOULDBLOCK to avoid halting the decoder 124 // when switching between audio/video and audio only. 125 return -EWOULDBLOCK; 126 } 127 128 status_t finalResult; 129 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 130 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 131 discontinuityQueue->dequeueAccessUnit(accessUnit); 132 // seeking, track switching 133 sp<AMessage> extra; 134 int64_t timeUs; 135 if ((*accessUnit)->meta()->findMessage("extra", &extra) 136 && extra != NULL 137 && extra->findInt64("timeUs", &timeUs)) { 138 // seeking only 139 mLastSeekTimeUs = timeUs; 140 mDiscontinuityOffsetTimesUs.clear(); 141 mDiscontinuityAbsStartTimesUs.clear(); 142 } 143 return INFO_DISCONTINUITY; 144 } 145 146 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 147 148 ssize_t idx = typeToIndex(stream); 149 if (!packetSource->hasBufferAvailable(&finalResult)) { 150 if (finalResult == OK) { 151 mBuffering[idx] = true; 152 return -EAGAIN; 153 } else { 154 return finalResult; 155 } 156 } 157 158 int32_t targetDuration = 0; 159 sp<AMessage> meta = packetSource->getLatestEnqueuedMeta(); 160 if (meta != NULL) { 161 meta->findInt32("targetDuration", &targetDuration); 162 } 163 164 int64_t targetDurationUs = targetDuration * 1000000ll; 165 if (targetDurationUs == 0 || 166 targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) { 167 // Fetchers limit buffering to 168 // min(3 * targetDuration, kMinBufferedDurationUs) 169 targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs; 170 } 171 172 if (mBuffering[idx]) { 173 if (mSwitchInProgress 174 || packetSource->isFinished(0) 175 || packetSource->getEstimatedDurationUs() > targetDurationUs) { 176 mBuffering[idx] = false; 177 } 178 } 179 180 if (mBuffering[idx]) { 181 return -EAGAIN; 182 } 183 184 // wait for counterpart 185 sp<AnotherPacketSource> otherSource; 186 uint32_t mask = mNewStreamMask & mStreamMask; 187 uint32_t fetchersMask = 0; 188 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 189 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 190 fetchersMask |= fetcherMask; 191 } 192 mask &= fetchersMask; 193 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 194 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 195 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 196 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 197 } 198 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 199 return finalResult == OK ? -EAGAIN : finalResult; 200 } 201 202 status_t err = packetSource->dequeueAccessUnit(accessUnit); 203 204 size_t streamIdx; 205 const char *streamStr; 206 switch (stream) { 207 case STREAMTYPE_AUDIO: 208 streamIdx = kAudioIndex; 209 streamStr = "audio"; 210 break; 211 case STREAMTYPE_VIDEO: 212 streamIdx = kVideoIndex; 213 streamStr = "video"; 214 break; 215 case STREAMTYPE_SUBTITLES: 216 streamIdx = kSubtitleIndex; 217 streamStr = "subs"; 218 break; 219 default: 220 TRESPASS(); 221 } 222 223 StreamItem& strm = mStreams[streamIdx]; 224 if (err == INFO_DISCONTINUITY) { 225 // adaptive streaming, discontinuities in the playlist 226 int32_t type; 227 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 228 229 sp<AMessage> extra; 230 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 231 extra.clear(); 232 } 233 234 ALOGI("[%s] read discontinuity of type %d, extra = %s", 235 streamStr, 236 type, 237 extra == NULL ? "NULL" : extra->debugString().c_str()); 238 239 int32_t swap; 240 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 241 int32_t switchGeneration; 242 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 243 { 244 Mutex::Autolock lock(mSwapMutex); 245 if (switchGeneration == mSwitchGeneration) { 246 swapPacketSource(stream); 247 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 248 msg->setInt32("stream", stream); 249 msg->setInt32("switchGeneration", switchGeneration); 250 msg->post(); 251 } 252 } 253 } else { 254 size_t seq = strm.mCurDiscontinuitySeq; 255 int64_t offsetTimeUs; 256 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 257 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 258 } else { 259 offsetTimeUs = 0; 260 } 261 262 seq += 1; 263 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 264 int64_t firstTimeUs; 265 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 266 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 267 offsetTimeUs += strm.mLastSampleDurationUs; 268 } else { 269 offsetTimeUs += strm.mLastSampleDurationUs; 270 } 271 272 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 273 } 274 } else if (err == OK) { 275 276 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 277 int64_t timeUs; 278 int32_t discontinuitySeq = 0; 279 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 280 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 281 strm.mCurDiscontinuitySeq = discontinuitySeq; 282 283 int32_t discard = 0; 284 int64_t firstTimeUs; 285 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 286 int64_t durUs; // approximate sample duration 287 if (timeUs > strm.mLastDequeuedTimeUs) { 288 durUs = timeUs - strm.mLastDequeuedTimeUs; 289 } else { 290 durUs = strm.mLastDequeuedTimeUs - timeUs; 291 } 292 strm.mLastSampleDurationUs = durUs; 293 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 294 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 295 firstTimeUs = timeUs; 296 } else { 297 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 298 firstTimeUs = timeUs; 299 } 300 301 strm.mLastDequeuedTimeUs = timeUs; 302 if (timeUs >= firstTimeUs) { 303 timeUs -= firstTimeUs; 304 } else { 305 timeUs = 0; 306 } 307 timeUs += mLastSeekTimeUs; 308 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 309 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 310 } 311 312 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 313 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 314 mLastDequeuedTimeUs = timeUs; 315 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 316 } else if (stream == STREAMTYPE_SUBTITLES) { 317 int32_t subtitleGeneration; 318 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 319 && subtitleGeneration != mSubtitleGeneration) { 320 return -EAGAIN; 321 }; 322 (*accessUnit)->meta()->setInt32( 323 "trackIndex", mPlaylist->getSelectedIndex()); 324 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 325 } 326 } else { 327 ALOGI("[%s] encountered error %d", streamStr, err); 328 } 329 330 return err; 331 } 332 333 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 334 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 335 if (!(mStreamMask & stream)) { 336 return UNKNOWN_ERROR; 337 } 338 339 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 340 341 sp<MetaData> meta = packetSource->getFormat(); 342 343 if (meta == NULL) { 344 return -EAGAIN; 345 } 346 347 return convertMetaDataToMessage(meta, format); 348 } 349 350 void LiveSession::connectAsync( 351 const char *url, const KeyedVector<String8, String8> *headers) { 352 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 353 msg->setString("url", url); 354 355 if (headers != NULL) { 356 msg->setPointer( 357 "headers", 358 new KeyedVector<String8, String8>(*headers)); 359 } 360 361 msg->post(); 362 } 363 364 status_t LiveSession::disconnect() { 365 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 366 367 sp<AMessage> response; 368 status_t err = msg->postAndAwaitResponse(&response); 369 370 return err; 371 } 372 373 status_t LiveSession::seekTo(int64_t timeUs) { 374 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 375 msg->setInt64("timeUs", timeUs); 376 377 sp<AMessage> response; 378 status_t err = msg->postAndAwaitResponse(&response); 379 380 return err; 381 } 382 383 void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 384 switch (msg->what()) { 385 case kWhatConnect: 386 { 387 onConnect(msg); 388 break; 389 } 390 391 case kWhatDisconnect: 392 { 393 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 394 395 if (mReconfigurationInProgress) { 396 break; 397 } 398 399 finishDisconnect(); 400 break; 401 } 402 403 case kWhatSeek: 404 { 405 uint32_t seekReplyID; 406 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 407 mSeekReplyID = seekReplyID; 408 mSeekReply = new AMessage; 409 410 status_t err = onSeek(msg); 411 412 if (err != OK) { 413 msg->post(50000); 414 } 415 break; 416 } 417 418 case kWhatFetcherNotify: 419 { 420 int32_t what; 421 CHECK(msg->findInt32("what", &what)); 422 423 switch (what) { 424 case PlaylistFetcher::kWhatStarted: 425 break; 426 case PlaylistFetcher::kWhatPaused: 427 case PlaylistFetcher::kWhatStopped: 428 { 429 if (what == PlaylistFetcher::kWhatStopped) { 430 AString uri; 431 CHECK(msg->findString("uri", &uri)); 432 if (mFetcherInfos.removeItem(uri) < 0) { 433 // ignore duplicated kWhatStopped messages. 434 break; 435 } 436 437 if (mSwitchInProgress) { 438 tryToFinishBandwidthSwitch(); 439 } 440 } 441 442 if (mContinuation != NULL) { 443 CHECK_GT(mContinuationCounter, 0); 444 if (--mContinuationCounter == 0) { 445 mContinuation->post(); 446 447 if (mSeekReplyID != 0) { 448 CHECK(mSeekReply != NULL); 449 mSeekReply->setInt32("err", OK); 450 mSeekReply->postReply(mSeekReplyID); 451 mSeekReplyID = 0; 452 mSeekReply.clear(); 453 } 454 } 455 } 456 break; 457 } 458 459 case PlaylistFetcher::kWhatDurationUpdate: 460 { 461 AString uri; 462 CHECK(msg->findString("uri", &uri)); 463 464 int64_t durationUs; 465 CHECK(msg->findInt64("durationUs", &durationUs)); 466 467 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 468 info->mDurationUs = durationUs; 469 break; 470 } 471 472 case PlaylistFetcher::kWhatError: 473 { 474 status_t err; 475 CHECK(msg->findInt32("err", &err)); 476 477 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 478 479 // handle EOS on subtitle tracks independently 480 AString uri; 481 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 482 ssize_t i = mFetcherInfos.indexOfKey(uri); 483 if (i >= 0) { 484 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 485 if (fetcher != NULL) { 486 uint32_t type = fetcher->getStreamTypeMask(); 487 if (type == STREAMTYPE_SUBTITLES) { 488 mPacketSources.valueFor( 489 STREAMTYPE_SUBTITLES)->signalEOS(err);; 490 break; 491 } 492 } 493 } 494 } 495 496 if (mInPreparationPhase) { 497 postPrepared(err); 498 } 499 500 cancelBandwidthSwitch(); 501 502 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 503 504 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 505 506 mPacketSources.valueFor( 507 STREAMTYPE_SUBTITLES)->signalEOS(err); 508 509 sp<AMessage> notify = mNotify->dup(); 510 notify->setInt32("what", kWhatError); 511 notify->setInt32("err", err); 512 notify->post(); 513 break; 514 } 515 516 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 517 { 518 AString uri; 519 CHECK(msg->findString("uri", &uri)); 520 521 if (mFetcherInfos.indexOfKey(uri) < 0) { 522 ALOGE("couldn't find uri"); 523 break; 524 } 525 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 526 info->mIsPrepared = true; 527 528 if (mInPreparationPhase) { 529 bool allFetchersPrepared = true; 530 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 531 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 532 allFetchersPrepared = false; 533 break; 534 } 535 } 536 537 if (allFetchersPrepared) { 538 postPrepared(OK); 539 } 540 } 541 break; 542 } 543 544 case PlaylistFetcher::kWhatStartedAt: 545 { 546 int32_t switchGeneration; 547 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 548 549 if (switchGeneration != mSwitchGeneration) { 550 break; 551 } 552 553 // Resume fetcher for the original variant; the resumed fetcher should 554 // continue until the timestamps found in msg, which is stored by the 555 // new fetcher to indicate where the new variant has started buffering. 556 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 557 const FetcherInfo info = mFetcherInfos.valueAt(i); 558 if (info.mToBeRemoved) { 559 info.mFetcher->resumeUntilAsync(msg); 560 } 561 } 562 break; 563 } 564 565 default: 566 TRESPASS(); 567 } 568 569 break; 570 } 571 572 case kWhatCheckBandwidth: 573 { 574 int32_t generation; 575 CHECK(msg->findInt32("generation", &generation)); 576 577 if (generation != mCheckBandwidthGeneration) { 578 break; 579 } 580 581 onCheckBandwidth(msg); 582 break; 583 } 584 585 case kWhatChangeConfiguration: 586 { 587 onChangeConfiguration(msg); 588 break; 589 } 590 591 case kWhatChangeConfiguration2: 592 { 593 onChangeConfiguration2(msg); 594 break; 595 } 596 597 case kWhatChangeConfiguration3: 598 { 599 onChangeConfiguration3(msg); 600 break; 601 } 602 603 case kWhatFinishDisconnect2: 604 { 605 onFinishDisconnect2(); 606 break; 607 } 608 609 case kWhatSwapped: 610 { 611 onSwapped(msg); 612 break; 613 } 614 615 case kWhatCheckSwitchDown: 616 { 617 onCheckSwitchDown(); 618 break; 619 } 620 621 case kWhatSwitchDown: 622 { 623 onSwitchDown(); 624 break; 625 } 626 627 default: 628 TRESPASS(); 629 break; 630 } 631 } 632 633 // static 634 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 635 if (a->mBandwidth < b->mBandwidth) { 636 return -1; 637 } else if (a->mBandwidth == b->mBandwidth) { 638 return 0; 639 } 640 641 return 1; 642 } 643 644 // static 645 LiveSession::StreamType LiveSession::indexToType(int idx) { 646 CHECK(idx >= 0 && idx < kMaxStreams); 647 return (StreamType)(1 << idx); 648 } 649 650 // static 651 ssize_t LiveSession::typeToIndex(int32_t type) { 652 switch (type) { 653 case STREAMTYPE_AUDIO: 654 return 0; 655 case STREAMTYPE_VIDEO: 656 return 1; 657 case STREAMTYPE_SUBTITLES: 658 return 2; 659 default: 660 return -1; 661 }; 662 return -1; 663 } 664 665 void LiveSession::onConnect(const sp<AMessage> &msg) { 666 AString url; 667 CHECK(msg->findString("url", &url)); 668 669 KeyedVector<String8, String8> *headers = NULL; 670 if (!msg->findPointer("headers", (void **)&headers)) { 671 mExtraHeaders.clear(); 672 } else { 673 mExtraHeaders = *headers; 674 675 delete headers; 676 headers = NULL; 677 } 678 679 // TODO currently we don't know if we are coming here from incognito mode 680 ALOGI("onConnect %s", uriDebugString(url).c_str()); 681 682 mMasterURL = url; 683 684 bool dummy; 685 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 686 687 if (mPlaylist == NULL) { 688 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 689 690 postPrepared(ERROR_IO); 691 return; 692 } 693 694 // We trust the content provider to make a reasonable choice of preferred 695 // initial bandwidth by listing it first in the variant playlist. 696 // At startup we really don't have a good estimate on the available 697 // network bandwidth since we haven't tranferred any data yet. Once 698 // we have we can make a better informed choice. 699 size_t initialBandwidth = 0; 700 size_t initialBandwidthIndex = 0; 701 702 if (mPlaylist->isVariantPlaylist()) { 703 for (size_t i = 0; i < mPlaylist->size(); ++i) { 704 BandwidthItem item; 705 706 item.mPlaylistIndex = i; 707 708 sp<AMessage> meta; 709 AString uri; 710 mPlaylist->itemAt(i, &uri, &meta); 711 712 unsigned long bandwidth; 713 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 714 715 if (initialBandwidth == 0) { 716 initialBandwidth = item.mBandwidth; 717 } 718 719 mBandwidthItems.push(item); 720 } 721 722 CHECK_GT(mBandwidthItems.size(), 0u); 723 724 mBandwidthItems.sort(SortByBandwidth); 725 726 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 727 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 728 initialBandwidthIndex = i; 729 break; 730 } 731 } 732 } else { 733 // dummy item. 734 BandwidthItem item; 735 item.mPlaylistIndex = 0; 736 item.mBandwidth = 0; 737 mBandwidthItems.push(item); 738 } 739 740 mPlaylist->pickRandomMediaItems(); 741 changeConfiguration( 742 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 743 } 744 745 void LiveSession::finishDisconnect() { 746 // No reconfiguration is currently pending, make sure none will trigger 747 // during disconnection either. 748 cancelCheckBandwidthEvent(); 749 750 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 751 // (finishDisconnect, onFinishDisconnect2) 752 cancelBandwidthSwitch(); 753 754 // cancel switch down monitor 755 mSwitchDownMonitor.clear(); 756 757 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 758 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 759 } 760 761 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 762 763 mContinuationCounter = mFetcherInfos.size(); 764 mContinuation = msg; 765 766 if (mContinuationCounter == 0) { 767 msg->post(); 768 } 769 } 770 771 void LiveSession::onFinishDisconnect2() { 772 mContinuation.clear(); 773 774 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 775 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 776 777 mPacketSources.valueFor( 778 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 779 780 sp<AMessage> response = new AMessage; 781 response->setInt32("err", OK); 782 783 response->postReply(mDisconnectReplyID); 784 mDisconnectReplyID = 0; 785 } 786 787 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 788 ssize_t index = mFetcherInfos.indexOfKey(uri); 789 790 if (index >= 0) { 791 return NULL; 792 } 793 794 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 795 notify->setString("uri", uri); 796 notify->setInt32("switchGeneration", mSwitchGeneration); 797 798 FetcherInfo info; 799 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 800 info.mDurationUs = -1ll; 801 info.mIsPrepared = false; 802 info.mToBeRemoved = false; 803 looper()->registerHandler(info.mFetcher); 804 805 mFetcherInfos.add(uri, info); 806 807 return info.mFetcher; 808 } 809 810 /* 811 * Illustration of parameters: 812 * 813 * 0 `range_offset` 814 * +------------+-------------------------------------------------------+--+--+ 815 * | | | next block to fetch | | | 816 * | | `source` handle => `out` buffer | | | | 817 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 818 * | |<----------- `range_length` / buffer capacity ----------->| | 819 * |<------------------------------ file_size ------------------------------->| 820 * 821 * Special parameter values: 822 * - range_length == -1 means entire file 823 * - block_size == 0 means entire range 824 * 825 */ 826 ssize_t LiveSession::fetchFile( 827 const char *url, sp<ABuffer> *out, 828 int64_t range_offset, int64_t range_length, 829 uint32_t block_size, /* download block size */ 830 sp<DataSource> *source, /* to return and reuse source */ 831 String8 *actualUrl) { 832 off64_t size; 833 sp<DataSource> temp_source; 834 if (source == NULL) { 835 source = &temp_source; 836 } 837 838 if (*source == NULL) { 839 if (!strncasecmp(url, "file://", 7)) { 840 *source = new FileSource(url + 7); 841 } else if (strncasecmp(url, "http://", 7) 842 && strncasecmp(url, "https://", 8)) { 843 return ERROR_UNSUPPORTED; 844 } else { 845 KeyedVector<String8, String8> headers = mExtraHeaders; 846 if (range_offset > 0 || range_length >= 0) { 847 headers.add( 848 String8("Range"), 849 String8( 850 StringPrintf( 851 "bytes=%lld-%s", 852 range_offset, 853 range_length < 0 854 ? "" : StringPrintf("%lld", 855 range_offset + range_length - 1).c_str()).c_str())); 856 } 857 status_t err = mHTTPDataSource->connect(url, &headers); 858 859 if (err != OK) { 860 return err; 861 } 862 863 *source = mHTTPDataSource; 864 } 865 } 866 867 status_t getSizeErr = (*source)->getSize(&size); 868 if (getSizeErr != OK) { 869 size = 65536; 870 } 871 872 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 873 if (*out == NULL) { 874 buffer->setRange(0, 0); 875 } 876 877 ssize_t bytesRead = 0; 878 // adjust range_length if only reading partial block 879 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 880 range_length = buffer->size() + block_size; 881 } 882 for (;;) { 883 // Only resize when we don't know the size. 884 size_t bufferRemaining = buffer->capacity() - buffer->size(); 885 if (bufferRemaining == 0 && getSizeErr != OK) { 886 size_t bufferIncrement = buffer->size() / 2; 887 if (bufferIncrement < 32768) { 888 bufferIncrement = 32768; 889 } 890 bufferRemaining = bufferIncrement; 891 892 ALOGV("increasing download buffer to %zu bytes", 893 buffer->size() + bufferRemaining); 894 895 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 896 memcpy(copy->data(), buffer->data(), buffer->size()); 897 copy->setRange(0, buffer->size()); 898 899 buffer = copy; 900 } 901 902 size_t maxBytesToRead = bufferRemaining; 903 if (range_length >= 0) { 904 int64_t bytesLeftInRange = range_length - buffer->size(); 905 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 906 maxBytesToRead = bytesLeftInRange; 907 908 if (bytesLeftInRange == 0) { 909 break; 910 } 911 } 912 } 913 914 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 915 // to help us break out of the loop. 916 ssize_t n = (*source)->readAt( 917 buffer->size(), buffer->data() + buffer->size(), 918 maxBytesToRead); 919 920 if (n < 0) { 921 return n; 922 } 923 924 if (n == 0) { 925 break; 926 } 927 928 buffer->setRange(0, buffer->size() + (size_t)n); 929 bytesRead += n; 930 } 931 932 *out = buffer; 933 if (actualUrl != NULL) { 934 *actualUrl = (*source)->getUri(); 935 if (actualUrl->isEmpty()) { 936 *actualUrl = url; 937 } 938 } 939 940 return bytesRead; 941 } 942 943 sp<M3UParser> LiveSession::fetchPlaylist( 944 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 945 ALOGV("fetchPlaylist '%s'", url); 946 947 *unchanged = false; 948 949 sp<ABuffer> buffer; 950 String8 actualUrl; 951 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 952 953 if (err <= 0) { 954 return NULL; 955 } 956 957 // MD5 functionality is not available on the simulator, treat all 958 // playlists as changed. 959 960 #if defined(HAVE_ANDROID_OS) 961 uint8_t hash[16]; 962 963 MD5_CTX m; 964 MD5_Init(&m); 965 MD5_Update(&m, buffer->data(), buffer->size()); 966 967 MD5_Final(hash, &m); 968 969 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 970 // playlist unchanged 971 *unchanged = true; 972 973 return NULL; 974 } 975 976 if (curPlaylistHash != NULL) { 977 memcpy(curPlaylistHash, hash, sizeof(hash)); 978 } 979 #endif 980 981 sp<M3UParser> playlist = 982 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 983 984 if (playlist->initCheck() != OK) { 985 ALOGE("failed to parse .m3u8 playlist"); 986 987 return NULL; 988 } 989 990 return playlist; 991 } 992 993 static double uniformRand() { 994 return (double)rand() / RAND_MAX; 995 } 996 997 size_t LiveSession::getBandwidthIndex() { 998 if (mBandwidthItems.size() == 0) { 999 return 0; 1000 } 1001 1002 #if 1 1003 char value[PROPERTY_VALUE_MAX]; 1004 ssize_t index = -1; 1005 if (property_get("media.httplive.bw-index", value, NULL)) { 1006 char *end; 1007 index = strtol(value, &end, 10); 1008 CHECK(end > value && *end == '\0'); 1009 1010 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1011 index = mBandwidthItems.size() - 1; 1012 } 1013 } 1014 1015 if (index < 0) { 1016 int32_t bandwidthBps; 1017 if (mHTTPDataSource != NULL 1018 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 1019 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 1020 } else { 1021 ALOGV("no bandwidth estimate."); 1022 return 0; // Pick the lowest bandwidth stream by default. 1023 } 1024 1025 char value[PROPERTY_VALUE_MAX]; 1026 if (property_get("media.httplive.max-bw", value, NULL)) { 1027 char *end; 1028 long maxBw = strtoul(value, &end, 10); 1029 if (end > value && *end == '\0') { 1030 if (maxBw > 0 && bandwidthBps > maxBw) { 1031 ALOGV("bandwidth capped to %ld bps", maxBw); 1032 bandwidthBps = maxBw; 1033 } 1034 } 1035 } 1036 1037 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1038 1039 index = mBandwidthItems.size() - 1; 1040 while (index > 0) { 1041 // consider only 80% of the available bandwidth, but if we are switching up, 1042 // be even more conservative (70%) to avoid overestimating and immediately 1043 // switching back. 1044 size_t adjustedBandwidthBps = bandwidthBps; 1045 if (index > mCurBandwidthIndex) { 1046 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1047 } else { 1048 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1049 } 1050 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1051 break; 1052 } 1053 --index; 1054 } 1055 } 1056 #elif 0 1057 // Change bandwidth at random() 1058 size_t index = uniformRand() * mBandwidthItems.size(); 1059 #elif 0 1060 // There's a 50% chance to stay on the current bandwidth and 1061 // a 50% chance to switch to the next higher bandwidth (wrapping around 1062 // to lowest) 1063 const size_t kMinIndex = 0; 1064 1065 static ssize_t mCurBandwidthIndex = -1; 1066 1067 size_t index; 1068 if (mCurBandwidthIndex < 0) { 1069 index = kMinIndex; 1070 } else if (uniformRand() < 0.5) { 1071 index = (size_t)mCurBandwidthIndex; 1072 } else { 1073 index = mCurBandwidthIndex + 1; 1074 if (index == mBandwidthItems.size()) { 1075 index = kMinIndex; 1076 } 1077 } 1078 mCurBandwidthIndex = index; 1079 #elif 0 1080 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1081 1082 size_t index = mBandwidthItems.size() - 1; 1083 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1084 --index; 1085 } 1086 #elif 1 1087 char value[PROPERTY_VALUE_MAX]; 1088 size_t index; 1089 if (property_get("media.httplive.bw-index", value, NULL)) { 1090 char *end; 1091 index = strtoul(value, &end, 10); 1092 CHECK(end > value && *end == '\0'); 1093 1094 if (index >= mBandwidthItems.size()) { 1095 index = mBandwidthItems.size() - 1; 1096 } 1097 } else { 1098 index = 0; 1099 } 1100 #else 1101 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1102 #endif 1103 1104 CHECK_GE(index, 0); 1105 1106 return index; 1107 } 1108 1109 int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1110 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1111 int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1; 1112 if (audioMeta != NULL) { 1113 audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs); 1114 } 1115 1116 sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta(); 1117 if (videoMeta != NULL 1118 && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) { 1119 if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) { 1120 minSegmentStartTimeUs = videoSegmentStartTimeUs; 1121 } 1122 1123 } 1124 return minSegmentStartTimeUs; 1125 } 1126 1127 status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1128 int64_t timeUs; 1129 CHECK(msg->findInt64("timeUs", &timeUs)); 1130 1131 if (!mReconfigurationInProgress) { 1132 changeConfiguration(timeUs, mCurBandwidthIndex); 1133 return OK; 1134 } else { 1135 return -EWOULDBLOCK; 1136 } 1137 } 1138 1139 status_t LiveSession::getDuration(int64_t *durationUs) const { 1140 int64_t maxDurationUs = -1ll; 1141 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1142 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1143 1144 if (fetcherDurationUs > maxDurationUs) { 1145 maxDurationUs = fetcherDurationUs; 1146 } 1147 } 1148 1149 *durationUs = maxDurationUs; 1150 1151 return OK; 1152 } 1153 1154 bool LiveSession::isSeekable() const { 1155 int64_t durationUs; 1156 return getDuration(&durationUs) == OK && durationUs >= 0; 1157 } 1158 1159 bool LiveSession::hasDynamicDuration() const { 1160 return false; 1161 } 1162 1163 size_t LiveSession::getTrackCount() const { 1164 if (mPlaylist == NULL) { 1165 return 0; 1166 } else { 1167 return mPlaylist->getTrackCount(); 1168 } 1169 } 1170 1171 sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1172 if (mPlaylist == NULL) { 1173 return NULL; 1174 } else { 1175 return mPlaylist->getTrackInfo(trackIndex); 1176 } 1177 } 1178 1179 status_t LiveSession::selectTrack(size_t index, bool select) { 1180 if (mPlaylist == NULL) { 1181 return INVALID_OPERATION; 1182 } 1183 1184 ++mSubtitleGeneration; 1185 status_t err = mPlaylist->selectTrack(index, select); 1186 if (err == OK) { 1187 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1188 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1189 msg->setInt32("pickTrack", select); 1190 msg->post(); 1191 } 1192 return err; 1193 } 1194 1195 ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1196 if (mPlaylist == NULL) { 1197 return -1; 1198 } else { 1199 return mPlaylist->getSelectedTrack(type); 1200 } 1201 } 1202 1203 bool LiveSession::canSwitchUp() { 1204 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1205 status_t err = OK; 1206 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1207 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1208 int64_t dur = source->getBufferedDurationUs(&err); 1209 if (err == OK && dur > 10000000) { 1210 return true; 1211 } 1212 } 1213 return false; 1214 } 1215 1216 void LiveSession::changeConfiguration( 1217 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1218 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1219 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1220 cancelBandwidthSwitch(); 1221 1222 CHECK(!mReconfigurationInProgress); 1223 mReconfigurationInProgress = true; 1224 1225 mCurBandwidthIndex = bandwidthIndex; 1226 1227 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1228 timeUs, bandwidthIndex, pickTrack); 1229 1230 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1231 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1232 1233 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1234 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1235 1236 AString URIs[kMaxStreams]; 1237 for (size_t i = 0; i < kMaxStreams; ++i) { 1238 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1239 streamMask |= indexToType(i); 1240 } 1241 } 1242 1243 // Step 1, stop and discard fetchers that are no longer needed. 1244 // Pause those that we'll reuse. 1245 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1246 const AString &uri = mFetcherInfos.keyAt(i); 1247 1248 bool discardFetcher = true; 1249 1250 // If we're seeking all current fetchers are discarded. 1251 if (timeUs < 0ll) { 1252 // delay fetcher removal if not picking tracks 1253 discardFetcher = pickTrack; 1254 1255 for (size_t j = 0; j < kMaxStreams; ++j) { 1256 StreamType type = indexToType(j); 1257 if ((streamMask & type) && uri == URIs[j]) { 1258 resumeMask |= type; 1259 streamMask &= ~type; 1260 discardFetcher = false; 1261 } 1262 } 1263 } 1264 1265 if (discardFetcher) { 1266 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1267 } else { 1268 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1269 } 1270 } 1271 1272 sp<AMessage> msg; 1273 if (timeUs < 0ll) { 1274 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1275 msg = new AMessage(kWhatChangeConfiguration3, id()); 1276 } else { 1277 msg = new AMessage(kWhatChangeConfiguration2, id()); 1278 } 1279 msg->setInt32("streamMask", streamMask); 1280 msg->setInt32("resumeMask", resumeMask); 1281 msg->setInt32("pickTrack", pickTrack); 1282 msg->setInt64("timeUs", timeUs); 1283 for (size_t i = 0; i < kMaxStreams; ++i) { 1284 if ((streamMask | resumeMask) & indexToType(i)) { 1285 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1286 } 1287 } 1288 1289 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1290 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1291 // fetchers have completed their asynchronous operation, we'll post 1292 // mContinuation, which then is handled below in onChangeConfiguration2. 1293 mContinuationCounter = mFetcherInfos.size(); 1294 mContinuation = msg; 1295 1296 if (mContinuationCounter == 0) { 1297 msg->post(); 1298 1299 if (mSeekReplyID != 0) { 1300 CHECK(mSeekReply != NULL); 1301 mSeekReply->setInt32("err", OK); 1302 mSeekReply->postReply(mSeekReplyID); 1303 mSeekReplyID = 0; 1304 mSeekReply.clear(); 1305 } 1306 } 1307 } 1308 1309 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1310 if (!mReconfigurationInProgress) { 1311 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1312 msg->findInt32("pickTrack", &pickTrack); 1313 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1314 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1315 } else { 1316 msg->post(1000000ll); // retry in 1 sec 1317 } 1318 } 1319 1320 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1321 mContinuation.clear(); 1322 1323 // All fetchers are either suspended or have been removed now. 1324 1325 uint32_t streamMask, resumeMask; 1326 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1327 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1328 1329 // currently onChangeConfiguration2 is only called for seeking; 1330 // remove the following CHECK if using it else where. 1331 CHECK_EQ(resumeMask, 0); 1332 streamMask |= resumeMask; 1333 1334 AString URIs[kMaxStreams]; 1335 for (size_t i = 0; i < kMaxStreams; ++i) { 1336 if (streamMask & indexToType(i)) { 1337 const AString &uriKey = mStreams[i].uriKey(); 1338 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1339 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1340 } 1341 } 1342 1343 // Determine which decoders to shutdown on the player side, 1344 // a decoder has to be shutdown if either 1345 // 1) its streamtype was active before but now longer isn't. 1346 // or 1347 // 2) its streamtype was already active and still is but the URI 1348 // has changed. 1349 uint32_t changedMask = 0; 1350 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1351 if (((mStreamMask & streamMask & indexToType(i)) 1352 && !(URIs[i] == mStreams[i].mUri)) 1353 || (mStreamMask & ~streamMask & indexToType(i))) { 1354 changedMask |= indexToType(i); 1355 } 1356 } 1357 1358 if (changedMask == 0) { 1359 // If nothing changed as far as the audio/video decoders 1360 // are concerned we can proceed. 1361 onChangeConfiguration3(msg); 1362 return; 1363 } 1364 1365 // Something changed, inform the player which will shutdown the 1366 // corresponding decoders and will post the reply once that's done. 1367 // Handling the reply will continue executing below in 1368 // onChangeConfiguration3. 1369 sp<AMessage> notify = mNotify->dup(); 1370 notify->setInt32("what", kWhatStreamsChanged); 1371 notify->setInt32("changedMask", changedMask); 1372 1373 msg->setWhat(kWhatChangeConfiguration3); 1374 msg->setTarget(id()); 1375 1376 notify->setMessage("reply", msg); 1377 notify->post(); 1378 } 1379 1380 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1381 mContinuation.clear(); 1382 // All remaining fetchers are still suspended, the player has shutdown 1383 // any decoders that needed it. 1384 1385 uint32_t streamMask, resumeMask; 1386 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1387 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1388 1389 int64_t timeUs; 1390 int32_t pickTrack; 1391 bool switching = false; 1392 CHECK(msg->findInt64("timeUs", &timeUs)); 1393 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1394 1395 if (timeUs < 0ll) { 1396 if (!pickTrack) { 1397 switching = true; 1398 } 1399 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1400 } else { 1401 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1402 } 1403 1404 for (size_t i = 0; i < kMaxStreams; ++i) { 1405 if (streamMask & indexToType(i)) { 1406 if (switching) { 1407 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1408 } else { 1409 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1410 } 1411 } 1412 } 1413 1414 mNewStreamMask = streamMask | resumeMask; 1415 if (switching) { 1416 mSwapMask = mStreamMask & ~resumeMask; 1417 } 1418 1419 // Of all existing fetchers: 1420 // * Resume fetchers that are still needed and assign them original packet sources. 1421 // * Mark otherwise unneeded fetchers for removal. 1422 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1423 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1424 const AString &uri = mFetcherInfos.keyAt(i); 1425 1426 sp<AnotherPacketSource> sources[kMaxStreams]; 1427 for (size_t j = 0; j < kMaxStreams; ++j) { 1428 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1429 sources[j] = mPacketSources.valueFor(indexToType(j)); 1430 1431 if (j != kSubtitleIndex) { 1432 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1433 sp<AnotherPacketSource> discontinuityQueue; 1434 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1435 discontinuityQueue->queueDiscontinuity( 1436 ATSParser::DISCONTINUITY_NONE, 1437 NULL, 1438 true); 1439 } 1440 } 1441 } 1442 1443 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1444 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1445 || sources[kSubtitleIndex] != NULL) { 1446 info.mFetcher->startAsync( 1447 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1448 } else { 1449 info.mToBeRemoved = true; 1450 } 1451 } 1452 1453 // streamMask now only contains the types that need a new fetcher created. 1454 1455 if (streamMask != 0) { 1456 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1457 } 1458 1459 // Find out when the original fetchers have buffered up to and start the new fetchers 1460 // at a later timestamp. 1461 for (size_t i = 0; i < kMaxStreams; i++) { 1462 if (!(indexToType(i) & streamMask)) { 1463 continue; 1464 } 1465 1466 AString uri; 1467 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1468 1469 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1470 CHECK(fetcher != NULL); 1471 1472 int32_t latestSeq = -1; 1473 int64_t startTimeUs = -1; 1474 int64_t segmentStartTimeUs = -1ll; 1475 int32_t discontinuitySeq = -1; 1476 sp<AnotherPacketSource> sources[kMaxStreams]; 1477 1478 if (i == kSubtitleIndex) { 1479 segmentStartTimeUs = latestMediaSegmentStartTimeUs(); 1480 } 1481 1482 // TRICKY: looping from i as earlier streams are already removed from streamMask 1483 for (size_t j = i; j < kMaxStreams; ++j) { 1484 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1485 if ((streamMask & indexToType(j)) && uri == streamUri) { 1486 sources[j] = mPacketSources.valueFor(indexToType(j)); 1487 1488 if (timeUs >= 0) { 1489 sources[j]->clear(); 1490 startTimeUs = timeUs; 1491 1492 sp<AnotherPacketSource> discontinuityQueue; 1493 sp<AMessage> extra = new AMessage; 1494 extra->setInt64("timeUs", timeUs); 1495 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1496 discontinuityQueue->queueDiscontinuity( 1497 ATSParser::DISCONTINUITY_TIME, extra, true); 1498 } else { 1499 int32_t type; 1500 int64_t srcSegmentStartTimeUs; 1501 sp<AMessage> meta; 1502 if (pickTrack) { 1503 // selecting 1504 meta = sources[j]->getLatestDequeuedMeta(); 1505 } else { 1506 // adapting 1507 meta = sources[j]->getLatestEnqueuedMeta(); 1508 } 1509 1510 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1511 int64_t tmpUs; 1512 int64_t tmpSegmentUs; 1513 1514 CHECK(meta->findInt64("timeUs", &tmpUs)); 1515 CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs)); 1516 if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) { 1517 startTimeUs = tmpUs; 1518 segmentStartTimeUs = tmpSegmentUs; 1519 } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) { 1520 startTimeUs = tmpUs; 1521 } 1522 1523 int32_t seq; 1524 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1525 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1526 discontinuitySeq = seq; 1527 } 1528 } 1529 1530 if (pickTrack) { 1531 // selecting track, queue discontinuities before content 1532 sources[j]->clear(); 1533 if (j == kSubtitleIndex) { 1534 break; 1535 } 1536 sp<AnotherPacketSource> discontinuityQueue; 1537 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1538 discontinuityQueue->queueDiscontinuity( 1539 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1540 } else { 1541 // adapting, queue discontinuities after resume 1542 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1543 sources[j]->clear(); 1544 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1545 if (extraStreams & indexToType(j)) { 1546 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1547 } 1548 } 1549 } 1550 1551 streamMask &= ~indexToType(j); 1552 } 1553 } 1554 1555 fetcher->startAsync( 1556 sources[kAudioIndex], 1557 sources[kVideoIndex], 1558 sources[kSubtitleIndex], 1559 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1560 segmentStartTimeUs, 1561 discontinuitySeq, 1562 switching); 1563 } 1564 1565 // All fetchers have now been started, the configuration change 1566 // has completed. 1567 1568 cancelCheckBandwidthEvent(); 1569 scheduleCheckBandwidthEvent(); 1570 1571 ALOGV("XXX configuration change completed."); 1572 mReconfigurationInProgress = false; 1573 if (switching) { 1574 mSwitchInProgress = true; 1575 } else { 1576 mStreamMask = mNewStreamMask; 1577 } 1578 1579 if (mDisconnectReplyID != 0) { 1580 finishDisconnect(); 1581 } 1582 } 1583 1584 void LiveSession::onSwapped(const sp<AMessage> &msg) { 1585 int32_t switchGeneration; 1586 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1587 if (switchGeneration != mSwitchGeneration) { 1588 return; 1589 } 1590 1591 int32_t stream; 1592 CHECK(msg->findInt32("stream", &stream)); 1593 1594 ssize_t idx = typeToIndex(stream); 1595 CHECK(idx >= 0); 1596 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1597 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1598 } 1599 mStreams[idx].mUri = mStreams[idx].mNewUri; 1600 mStreams[idx].mNewUri.clear(); 1601 1602 mSwapMask &= ~stream; 1603 if (mSwapMask != 0) { 1604 return; 1605 } 1606 1607 // Check if new variant contains extra streams. 1608 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1609 while (extraStreams) { 1610 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1611 swapPacketSource(extraStream); 1612 extraStreams &= ~extraStream; 1613 1614 idx = typeToIndex(extraStream); 1615 CHECK(idx >= 0); 1616 if (mStreams[idx].mNewUri.empty()) { 1617 ALOGW("swapping extra stream type %d %s to empty stream", 1618 extraStream, mStreams[idx].mUri.c_str()); 1619 } 1620 mStreams[idx].mUri = mStreams[idx].mNewUri; 1621 mStreams[idx].mNewUri.clear(); 1622 } 1623 1624 tryToFinishBandwidthSwitch(); 1625 } 1626 1627 void LiveSession::onCheckSwitchDown() { 1628 if (mSwitchDownMonitor == NULL) { 1629 return; 1630 } 1631 1632 if (mSwitchInProgress || mReconfigurationInProgress) { 1633 ALOGV("Switch/Reconfig in progress, defer switch down"); 1634 mSwitchDownMonitor->post(1000000ll); 1635 return; 1636 } 1637 1638 for (size_t i = 0; i < kMaxStreams; ++i) { 1639 int32_t targetDuration; 1640 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1641 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1642 1643 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1644 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1645 int64_t targetDurationUs = targetDuration * 1000000ll; 1646 1647 if (bufferedDurationUs < targetDurationUs / 3) { 1648 (new AMessage(kWhatSwitchDown, id()))->post(); 1649 break; 1650 } 1651 } 1652 } 1653 1654 mSwitchDownMonitor->post(1000000ll); 1655 } 1656 1657 void LiveSession::onSwitchDown() { 1658 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1659 return; 1660 } 1661 1662 ssize_t bandwidthIndex = getBandwidthIndex(); 1663 if (bandwidthIndex < mCurBandwidthIndex) { 1664 changeConfiguration(-1, bandwidthIndex, false); 1665 return; 1666 } 1667 1668 } 1669 1670 // Mark switch done when: 1671 // 1. all old buffers are swapped out 1672 void LiveSession::tryToFinishBandwidthSwitch() { 1673 if (!mSwitchInProgress) { 1674 return; 1675 } 1676 1677 bool needToRemoveFetchers = false; 1678 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1679 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1680 needToRemoveFetchers = true; 1681 break; 1682 } 1683 } 1684 1685 if (!needToRemoveFetchers && mSwapMask == 0) { 1686 ALOGI("mSwitchInProgress = false"); 1687 mStreamMask = mNewStreamMask; 1688 mSwitchInProgress = false; 1689 } 1690 } 1691 1692 void LiveSession::scheduleCheckBandwidthEvent() { 1693 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1694 msg->setInt32("generation", mCheckBandwidthGeneration); 1695 msg->post(10000000ll); 1696 } 1697 1698 void LiveSession::cancelCheckBandwidthEvent() { 1699 ++mCheckBandwidthGeneration; 1700 } 1701 1702 void LiveSession::cancelBandwidthSwitch() { 1703 Mutex::Autolock lock(mSwapMutex); 1704 mSwitchGeneration++; 1705 mSwitchInProgress = false; 1706 mSwapMask = 0; 1707 1708 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1709 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1710 if (info.mToBeRemoved) { 1711 info.mToBeRemoved = false; 1712 } 1713 } 1714 1715 for (size_t i = 0; i < kMaxStreams; ++i) { 1716 if (!mStreams[i].mNewUri.empty()) { 1717 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1718 if (j < 0) { 1719 mStreams[i].mNewUri.clear(); 1720 continue; 1721 } 1722 1723 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1724 info.mFetcher->stopAsync(); 1725 mFetcherInfos.removeItemsAt(j); 1726 mStreams[i].mNewUri.clear(); 1727 } 1728 } 1729 } 1730 1731 bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1732 if (mReconfigurationInProgress || mSwitchInProgress) { 1733 return false; 1734 } 1735 1736 if (mCurBandwidthIndex < 0) { 1737 return true; 1738 } 1739 1740 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1741 return false; 1742 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1743 return canSwitchUp(); 1744 } else { 1745 return true; 1746 } 1747 } 1748 1749 void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1750 size_t bandwidthIndex = getBandwidthIndex(); 1751 if (canSwitchBandwidthTo(bandwidthIndex)) { 1752 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1753 } else { 1754 // Come back and check again 10 seconds later in case there is nothing to do now. 1755 // If we DO change configuration, once that completes it'll schedule a new 1756 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1757 msg->post(10000000ll); 1758 } 1759 } 1760 1761 void LiveSession::postPrepared(status_t err) { 1762 CHECK(mInPreparationPhase); 1763 1764 sp<AMessage> notify = mNotify->dup(); 1765 if (err == OK || err == ERROR_END_OF_STREAM) { 1766 notify->setInt32("what", kWhatPrepared); 1767 } else { 1768 notify->setInt32("what", kWhatPreparationFailed); 1769 notify->setInt32("err", err); 1770 } 1771 1772 notify->post(); 1773 1774 mInPreparationPhase = false; 1775 1776 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1777 mSwitchDownMonitor->post(); 1778 } 1779 1780 } // namespace android 1781 1782