1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "LiveSession" 19 #include <utils/Log.h> 20 21 #include "LiveSession.h" 22 23 #include "M3UParser.h" 24 #include "PlaylistFetcher.h" 25 26 #include "include/HTTPBase.h" 27 #include "mpeg2ts/AnotherPacketSource.h" 28 29 #include <cutils/properties.h> 30 #include <media/stagefright/foundation/hexdump.h> 31 #include <media/stagefright/foundation/ABuffer.h> 32 #include <media/stagefright/foundation/ADebug.h> 33 #include <media/stagefright/foundation/AMessage.h> 34 #include <media/stagefright/DataSource.h> 35 #include <media/stagefright/FileSource.h> 36 #include <media/stagefright/MediaErrors.h> 37 #include <media/stagefright/MetaData.h> 38 #include <media/stagefright/Utils.h> 39 40 #include <utils/Mutex.h> 41 42 #include <ctype.h> 43 #include <openssl/aes.h> 44 #include <openssl/md5.h> 45 46 namespace android { 47 48 LiveSession::LiveSession( 49 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 50 : mNotify(notify), 51 mFlags(flags), 52 mUIDValid(uidValid), 53 mUID(uid), 54 mInPreparationPhase(true), 55 mHTTPDataSource( 56 HTTPBase::Create( 57 (mFlags & kFlagIncognito) 58 ? 
HTTPBase::kFlagIncognito 59 : 0)), 60 mPrevBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0) { 72 if (mUIDValid) { 73 mHTTPDataSource->setUID(mUID); 74 } 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 } 84 } 85 86 LiveSession::~LiveSession() { 87 } 88 89 sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 90 ABuffer *discontinuity = new ABuffer(0); 91 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 92 discontinuity->meta()->setInt32("swapPacketSource", swap); 93 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 94 discontinuity->meta()->setInt64("timeUs", -1); 95 return discontinuity; 96 } 97 98 void LiveSession::swapPacketSource(StreamType stream) { 99 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 100 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 101 sp<AnotherPacketSource> tmp = aps; 102 aps = aps2; 103 aps2 = tmp; 104 aps2->clear(); 105 } 106 107 status_t LiveSession::dequeueAccessUnit( 108 StreamType stream, sp<ABuffer> *accessUnit) { 109 if (!(mStreamMask & stream)) { 110 // return -EWOULDBLOCK to avoid halting the decoder 111 // when switching between audio/video and audio only. 
112 return -EWOULDBLOCK; 113 } 114 115 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 116 117 status_t finalResult; 118 if (!packetSource->hasBufferAvailable(&finalResult)) { 119 return finalResult == OK ? -EAGAIN : finalResult; 120 } 121 122 status_t err = packetSource->dequeueAccessUnit(accessUnit); 123 124 const char *streamStr; 125 switch (stream) { 126 case STREAMTYPE_AUDIO: 127 streamStr = "audio"; 128 break; 129 case STREAMTYPE_VIDEO: 130 streamStr = "video"; 131 break; 132 case STREAMTYPE_SUBTITLES: 133 streamStr = "subs"; 134 break; 135 default: 136 TRESPASS(); 137 } 138 139 if (err == INFO_DISCONTINUITY) { 140 int32_t type; 141 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 142 143 sp<AMessage> extra; 144 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 145 extra.clear(); 146 } 147 148 ALOGI("[%s] read discontinuity of type %d, extra = %s", 149 streamStr, 150 type, 151 extra == NULL ? "NULL" : extra->debugString().c_str()); 152 153 int32_t swap; 154 if (type == ATSParser::DISCONTINUITY_FORMATCHANGE 155 && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) 156 && swap) { 157 158 int32_t switchGeneration; 159 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 160 { 161 Mutex::Autolock lock(mSwapMutex); 162 if (switchGeneration == mSwitchGeneration) { 163 swapPacketSource(stream); 164 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 165 msg->setInt32("stream", stream); 166 msg->setInt32("switchGeneration", switchGeneration); 167 msg->post(); 168 } 169 } 170 } 171 } else if (err == OK) { 172 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 173 int64_t timeUs; 174 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 175 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 176 177 mLastDequeuedTimeUs = timeUs; 178 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 179 } else if (stream == STREAMTYPE_SUBTITLES) { 180 
(*accessUnit)->meta()->setInt32( 181 "trackIndex", mPlaylist->getSelectedIndex()); 182 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 183 } 184 } else { 185 ALOGI("[%s] encountered error %d", streamStr, err); 186 } 187 188 return err; 189 } 190 191 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 192 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 193 if (!(mStreamMask & stream)) { 194 return UNKNOWN_ERROR; 195 } 196 197 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 198 199 sp<MetaData> meta = packetSource->getFormat(); 200 201 if (meta == NULL) { 202 return -EAGAIN; 203 } 204 205 return convertMetaDataToMessage(meta, format); 206 } 207 208 void LiveSession::connectAsync( 209 const char *url, const KeyedVector<String8, String8> *headers) { 210 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 211 msg->setString("url", url); 212 213 if (headers != NULL) { 214 msg->setPointer( 215 "headers", 216 new KeyedVector<String8, String8>(*headers)); 217 } 218 219 msg->post(); 220 } 221 222 status_t LiveSession::disconnect() { 223 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 224 225 sp<AMessage> response; 226 status_t err = msg->postAndAwaitResponse(&response); 227 228 return err; 229 } 230 231 status_t LiveSession::seekTo(int64_t timeUs) { 232 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 233 msg->setInt64("timeUs", timeUs); 234 235 sp<AMessage> response; 236 status_t err = msg->postAndAwaitResponse(&response); 237 238 uint32_t replyID; 239 CHECK(response == mSeekReply && 0 != mSeekReplyID); 240 mSeekReply.clear(); 241 mSeekReplyID = 0; 242 return err; 243 } 244 245 void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 246 switch (msg->what()) { 247 case kWhatConnect: 248 { 249 onConnect(msg); 250 break; 251 } 252 253 case kWhatDisconnect: 254 { 255 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 256 257 if 
(mReconfigurationInProgress) { 258 break; 259 } 260 261 finishDisconnect(); 262 break; 263 } 264 265 case kWhatSeek: 266 { 267 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 268 269 status_t err = onSeek(msg); 270 271 mSeekReply = new AMessage; 272 mSeekReply->setInt32("err", err); 273 break; 274 } 275 276 case kWhatFetcherNotify: 277 { 278 int32_t what; 279 CHECK(msg->findInt32("what", &what)); 280 281 switch (what) { 282 case PlaylistFetcher::kWhatStarted: 283 break; 284 case PlaylistFetcher::kWhatPaused: 285 case PlaylistFetcher::kWhatStopped: 286 { 287 if (what == PlaylistFetcher::kWhatStopped) { 288 AString uri; 289 CHECK(msg->findString("uri", &uri)); 290 if (mFetcherInfos.removeItem(uri) < 0) { 291 // ignore duplicated kWhatStopped messages. 292 break; 293 } 294 295 tryToFinishBandwidthSwitch(); 296 } 297 298 if (mContinuation != NULL) { 299 CHECK_GT(mContinuationCounter, 0); 300 if (--mContinuationCounter == 0) { 301 mContinuation->post(); 302 303 if (mSeekReplyID != 0) { 304 CHECK(mSeekReply != NULL); 305 mSeekReply->postReply(mSeekReplyID); 306 } 307 } 308 } 309 break; 310 } 311 312 case PlaylistFetcher::kWhatDurationUpdate: 313 { 314 AString uri; 315 CHECK(msg->findString("uri", &uri)); 316 317 int64_t durationUs; 318 CHECK(msg->findInt64("durationUs", &durationUs)); 319 320 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 321 info->mDurationUs = durationUs; 322 break; 323 } 324 325 case PlaylistFetcher::kWhatError: 326 { 327 status_t err; 328 CHECK(msg->findInt32("err", &err)); 329 330 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 331 332 if (mInPreparationPhase) { 333 postPrepared(err); 334 } 335 336 cancelBandwidthSwitch(); 337 338 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 339 340 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 341 342 mPacketSources.valueFor( 343 STREAMTYPE_SUBTITLES)->signalEOS(err); 344 345 sp<AMessage> notify = mNotify->dup(); 346 notify->setInt32("what", kWhatError); 347 
notify->setInt32("err", err); 348 notify->post(); 349 break; 350 } 351 352 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 353 { 354 AString uri; 355 CHECK(msg->findString("uri", &uri)); 356 357 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 358 info->mIsPrepared = true; 359 360 if (mInPreparationPhase) { 361 bool allFetchersPrepared = true; 362 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 363 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 364 allFetchersPrepared = false; 365 break; 366 } 367 } 368 369 if (allFetchersPrepared) { 370 postPrepared(OK); 371 } 372 } 373 break; 374 } 375 376 case PlaylistFetcher::kWhatStartedAt: 377 { 378 int32_t switchGeneration; 379 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 380 381 if (switchGeneration != mSwitchGeneration) { 382 break; 383 } 384 385 // Resume fetcher for the original variant; the resumed fetcher should 386 // continue until the timestamps found in msg, which is stored by the 387 // new fetcher to indicate where the new variant has started buffering. 
388 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 389 const FetcherInfo info = mFetcherInfos.valueAt(i); 390 if (info.mToBeRemoved) { 391 info.mFetcher->resumeUntilAsync(msg); 392 } 393 } 394 break; 395 } 396 397 default: 398 TRESPASS(); 399 } 400 401 break; 402 } 403 404 case kWhatCheckBandwidth: 405 { 406 int32_t generation; 407 CHECK(msg->findInt32("generation", &generation)); 408 409 if (generation != mCheckBandwidthGeneration) { 410 break; 411 } 412 413 onCheckBandwidth(); 414 break; 415 } 416 417 case kWhatChangeConfiguration: 418 { 419 onChangeConfiguration(msg); 420 break; 421 } 422 423 case kWhatChangeConfiguration2: 424 { 425 onChangeConfiguration2(msg); 426 break; 427 } 428 429 case kWhatChangeConfiguration3: 430 { 431 onChangeConfiguration3(msg); 432 break; 433 } 434 435 case kWhatFinishDisconnect2: 436 { 437 onFinishDisconnect2(); 438 break; 439 } 440 441 case kWhatSwapped: 442 { 443 onSwapped(msg); 444 break; 445 } 446 default: 447 TRESPASS(); 448 break; 449 } 450 } 451 452 // static 453 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 454 if (a->mBandwidth < b->mBandwidth) { 455 return -1; 456 } else if (a->mBandwidth == b->mBandwidth) { 457 return 0; 458 } 459 460 return 1; 461 } 462 463 // static 464 LiveSession::StreamType LiveSession::indexToType(int idx) { 465 CHECK(idx >= 0 && idx < kMaxStreams); 466 return (StreamType)(1 << idx); 467 } 468 469 void LiveSession::onConnect(const sp<AMessage> &msg) { 470 AString url; 471 CHECK(msg->findString("url", &url)); 472 473 KeyedVector<String8, String8> *headers = NULL; 474 if (!msg->findPointer("headers", (void **)&headers)) { 475 mExtraHeaders.clear(); 476 } else { 477 mExtraHeaders = *headers; 478 479 delete headers; 480 headers = NULL; 481 } 482 483 #if 1 484 ALOGI("onConnect <URL suppressed>"); 485 #else 486 ALOGI("onConnect %s", url.c_str()); 487 #endif 488 489 mMasterURL = url; 490 491 bool dummy; 492 mPlaylist = fetchPlaylist(url.c_str(), NULL /* 
curPlaylistHash */, &dummy); 493 494 if (mPlaylist == NULL) { 495 ALOGE("unable to fetch master playlist '%s'.", url.c_str()); 496 497 postPrepared(ERROR_IO); 498 return; 499 } 500 501 // We trust the content provider to make a reasonable choice of preferred 502 // initial bandwidth by listing it first in the variant playlist. 503 // At startup we really don't have a good estimate on the available 504 // network bandwidth since we haven't tranferred any data yet. Once 505 // we have we can make a better informed choice. 506 size_t initialBandwidth = 0; 507 size_t initialBandwidthIndex = 0; 508 509 if (mPlaylist->isVariantPlaylist()) { 510 for (size_t i = 0; i < mPlaylist->size(); ++i) { 511 BandwidthItem item; 512 513 item.mPlaylistIndex = i; 514 515 sp<AMessage> meta; 516 AString uri; 517 mPlaylist->itemAt(i, &uri, &meta); 518 519 unsigned long bandwidth; 520 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 521 522 if (initialBandwidth == 0) { 523 initialBandwidth = item.mBandwidth; 524 } 525 526 mBandwidthItems.push(item); 527 } 528 529 CHECK_GT(mBandwidthItems.size(), 0u); 530 531 mBandwidthItems.sort(SortByBandwidth); 532 533 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 534 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 535 initialBandwidthIndex = i; 536 break; 537 } 538 } 539 } else { 540 // dummy item. 541 BandwidthItem item; 542 item.mPlaylistIndex = 0; 543 item.mBandwidth = 0; 544 mBandwidthItems.push(item); 545 } 546 547 changeConfiguration( 548 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 549 } 550 551 void LiveSession::finishDisconnect() { 552 // No reconfiguration is currently pending, make sure none will trigger 553 // during disconnection either. 554 cancelCheckBandwidthEvent(); 555 556 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 
557 // (finishDisconnect, onFinishDisconnect2) 558 cancelBandwidthSwitch(); 559 560 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 561 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 562 } 563 564 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 565 566 mContinuationCounter = mFetcherInfos.size(); 567 mContinuation = msg; 568 569 if (mContinuationCounter == 0) { 570 msg->post(); 571 } 572 } 573 574 void LiveSession::onFinishDisconnect2() { 575 mContinuation.clear(); 576 577 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 578 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 579 580 mPacketSources.valueFor( 581 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 582 583 sp<AMessage> response = new AMessage; 584 response->setInt32("err", OK); 585 586 response->postReply(mDisconnectReplyID); 587 mDisconnectReplyID = 0; 588 } 589 590 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 591 ssize_t index = mFetcherInfos.indexOfKey(uri); 592 593 if (index >= 0) { 594 return NULL; 595 } 596 597 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 598 notify->setString("uri", uri); 599 notify->setInt32("switchGeneration", mSwitchGeneration); 600 601 FetcherInfo info; 602 info.mFetcher = new PlaylistFetcher(notify, this, uri); 603 info.mDurationUs = -1ll; 604 info.mIsPrepared = false; 605 info.mToBeRemoved = false; 606 looper()->registerHandler(info.mFetcher); 607 608 mFetcherInfos.add(uri, info); 609 610 return info.mFetcher; 611 } 612 613 /* 614 * Illustration of parameters: 615 * 616 * 0 `range_offset` 617 * +------------+-------------------------------------------------------+--+--+ 618 * | | | next block to fetch | | | 619 * | | `source` handle => `out` buffer | | | | 620 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 621 * | |<----------- `range_length` / buffer capacity ----------->| | 622 * |<------------------------------ file_size 
------------------------------->| 623 * 624 * Special parameter values: 625 * - range_length == -1 means entire file 626 * - block_size == 0 means entire range 627 * 628 */ 629 ssize_t LiveSession::fetchFile( 630 const char *url, sp<ABuffer> *out, 631 int64_t range_offset, int64_t range_length, 632 uint32_t block_size, /* download block size */ 633 sp<DataSource> *source, /* to return and reuse source */ 634 String8 *actualUrl) { 635 off64_t size; 636 sp<DataSource> temp_source; 637 if (source == NULL) { 638 source = &temp_source; 639 } 640 641 if (*source == NULL) { 642 if (!strncasecmp(url, "file://", 7)) { 643 *source = new FileSource(url + 7); 644 } else if (strncasecmp(url, "http://", 7) 645 && strncasecmp(url, "https://", 8)) { 646 return ERROR_UNSUPPORTED; 647 } else { 648 KeyedVector<String8, String8> headers = mExtraHeaders; 649 if (range_offset > 0 || range_length >= 0) { 650 headers.add( 651 String8("Range"), 652 String8( 653 StringPrintf( 654 "bytes=%lld-%s", 655 range_offset, 656 range_length < 0 657 ? "" : StringPrintf("%lld", 658 range_offset + range_length - 1).c_str()).c_str())); 659 } 660 status_t err = mHTTPDataSource->connect(url, &headers); 661 662 if (err != OK) { 663 return err; 664 } 665 666 *source = mHTTPDataSource; 667 } 668 } 669 670 status_t getSizeErr = (*source)->getSize(&size); 671 if (getSizeErr != OK) { 672 size = 65536; 673 } 674 675 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 676 if (*out == NULL) { 677 buffer->setRange(0, 0); 678 } 679 680 ssize_t bytesRead = 0; 681 // adjust range_length if only reading partial block 682 if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) { 683 range_length = buffer->size() + block_size; 684 } 685 for (;;) { 686 // Only resize when we don't know the size. 
687 size_t bufferRemaining = buffer->capacity() - buffer->size(); 688 if (bufferRemaining == 0 && getSizeErr != OK) { 689 bufferRemaining = 32768; 690 691 ALOGV("increasing download buffer to %d bytes", 692 buffer->size() + bufferRemaining); 693 694 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 695 memcpy(copy->data(), buffer->data(), buffer->size()); 696 copy->setRange(0, buffer->size()); 697 698 buffer = copy; 699 } 700 701 size_t maxBytesToRead = bufferRemaining; 702 if (range_length >= 0) { 703 int64_t bytesLeftInRange = range_length - buffer->size(); 704 if (bytesLeftInRange < maxBytesToRead) { 705 maxBytesToRead = bytesLeftInRange; 706 707 if (bytesLeftInRange == 0) { 708 break; 709 } 710 } 711 } 712 713 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 714 // to help us break out of the loop. 715 ssize_t n = (*source)->readAt( 716 buffer->size(), buffer->data() + buffer->size(), 717 maxBytesToRead); 718 719 if (n < 0) { 720 return n; 721 } 722 723 if (n == 0) { 724 break; 725 } 726 727 buffer->setRange(0, buffer->size() + (size_t)n); 728 bytesRead += n; 729 } 730 731 *out = buffer; 732 if (actualUrl != NULL) { 733 *actualUrl = (*source)->getUri(); 734 if (actualUrl->isEmpty()) { 735 *actualUrl = url; 736 } 737 } 738 739 return bytesRead; 740 } 741 742 sp<M3UParser> LiveSession::fetchPlaylist( 743 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 744 ALOGV("fetchPlaylist '%s'", url); 745 746 *unchanged = false; 747 748 sp<ABuffer> buffer; 749 String8 actualUrl; 750 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 751 752 if (err <= 0) { 753 return NULL; 754 } 755 756 // MD5 functionality is not available on the simulator, treat all 757 // playlists as changed. 
758 759 #if defined(HAVE_ANDROID_OS) 760 uint8_t hash[16]; 761 762 MD5_CTX m; 763 MD5_Init(&m); 764 MD5_Update(&m, buffer->data(), buffer->size()); 765 766 MD5_Final(hash, &m); 767 768 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 769 // playlist unchanged 770 *unchanged = true; 771 772 ALOGV("Playlist unchanged, refresh state is now %d", 773 (int)mRefreshState); 774 775 return NULL; 776 } 777 778 if (curPlaylistHash != NULL) { 779 memcpy(curPlaylistHash, hash, sizeof(hash)); 780 } 781 #endif 782 783 sp<M3UParser> playlist = 784 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 785 786 if (playlist->initCheck() != OK) { 787 ALOGE("failed to parse .m3u8 playlist"); 788 789 return NULL; 790 } 791 792 return playlist; 793 } 794 795 static double uniformRand() { 796 return (double)rand() / RAND_MAX; 797 } 798 799 size_t LiveSession::getBandwidthIndex() { 800 if (mBandwidthItems.size() == 0) { 801 return 0; 802 } 803 804 #if 1 805 char value[PROPERTY_VALUE_MAX]; 806 ssize_t index = -1; 807 if (property_get("media.httplive.bw-index", value, NULL)) { 808 char *end; 809 index = strtol(value, &end, 10); 810 CHECK(end > value && *end == '\0'); 811 812 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 813 index = mBandwidthItems.size() - 1; 814 } 815 } 816 817 if (index < 0) { 818 int32_t bandwidthBps; 819 if (mHTTPDataSource != NULL 820 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 821 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 822 } else { 823 ALOGV("no bandwidth estimate."); 824 return 0; // Pick the lowest bandwidth stream by default. 
825 } 826 827 char value[PROPERTY_VALUE_MAX]; 828 if (property_get("media.httplive.max-bw", value, NULL)) { 829 char *end; 830 long maxBw = strtoul(value, &end, 10); 831 if (end > value && *end == '\0') { 832 if (maxBw > 0 && bandwidthBps > maxBw) { 833 ALOGV("bandwidth capped to %ld bps", maxBw); 834 bandwidthBps = maxBw; 835 } 836 } 837 } 838 839 // Consider only 80% of the available bandwidth usable. 840 bandwidthBps = (bandwidthBps * 8) / 10; 841 842 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 843 844 index = mBandwidthItems.size() - 1; 845 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 846 > (size_t)bandwidthBps) { 847 --index; 848 } 849 } 850 #elif 0 851 // Change bandwidth at random() 852 size_t index = uniformRand() * mBandwidthItems.size(); 853 #elif 0 854 // There's a 50% chance to stay on the current bandwidth and 855 // a 50% chance to switch to the next higher bandwidth (wrapping around 856 // to lowest) 857 const size_t kMinIndex = 0; 858 859 static ssize_t mPrevBandwidthIndex = -1; 860 861 size_t index; 862 if (mPrevBandwidthIndex < 0) { 863 index = kMinIndex; 864 } else if (uniformRand() < 0.5) { 865 index = (size_t)mPrevBandwidthIndex; 866 } else { 867 index = mPrevBandwidthIndex + 1; 868 if (index == mBandwidthItems.size()) { 869 index = kMinIndex; 870 } 871 } 872 mPrevBandwidthIndex = index; 873 #elif 0 874 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 875 876 size_t index = mBandwidthItems.size() - 1; 877 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 878 --index; 879 } 880 #elif 1 881 char value[PROPERTY_VALUE_MAX]; 882 size_t index; 883 if (property_get("media.httplive.bw-index", value, NULL)) { 884 char *end; 885 index = strtoul(value, &end, 10); 886 CHECK(end > value && *end == '\0'); 887 888 if (index >= mBandwidthItems.size()) { 889 index = mBandwidthItems.size() - 1; 890 } 891 } else { 892 index = 0; 893 } 894 #else 895 size_t index = 
mBandwidthItems.size() - 1; // Highest bandwidth stream 896 #endif 897 898 CHECK_GE(index, 0); 899 900 return index; 901 } 902 903 status_t LiveSession::onSeek(const sp<AMessage> &msg) { 904 int64_t timeUs; 905 CHECK(msg->findInt64("timeUs", &timeUs)); 906 907 if (!mReconfigurationInProgress) { 908 changeConfiguration(timeUs, getBandwidthIndex()); 909 } 910 911 return OK; 912 } 913 914 status_t LiveSession::getDuration(int64_t *durationUs) const { 915 int64_t maxDurationUs = 0ll; 916 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 917 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 918 919 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 920 maxDurationUs = fetcherDurationUs; 921 } 922 } 923 924 *durationUs = maxDurationUs; 925 926 return OK; 927 } 928 929 bool LiveSession::isSeekable() const { 930 int64_t durationUs; 931 return getDuration(&durationUs) == OK && durationUs >= 0; 932 } 933 934 bool LiveSession::hasDynamicDuration() const { 935 return false; 936 } 937 938 status_t LiveSession::getTrackInfo(Parcel *reply) const { 939 return mPlaylist->getTrackInfo(reply); 940 } 941 942 status_t LiveSession::selectTrack(size_t index, bool select) { 943 status_t err = mPlaylist->selectTrack(index, select); 944 if (err == OK) { 945 (new AMessage(kWhatChangeConfiguration, id()))->post(); 946 } 947 return err; 948 } 949 950 bool LiveSession::canSwitchUp() { 951 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 952 status_t err = OK; 953 for (size_t i = 0; i < mPacketSources.size(); ++i) { 954 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 955 int64_t dur = source->getBufferedDurationUs(&err); 956 if (err == OK && dur > 10000000) { 957 return true; 958 } 959 } 960 return false; 961 } 962 963 void LiveSession::changeConfiguration( 964 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 965 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 
966 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 967 cancelBandwidthSwitch(); 968 969 CHECK(!mReconfigurationInProgress); 970 mReconfigurationInProgress = true; 971 972 mPrevBandwidthIndex = bandwidthIndex; 973 974 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 975 timeUs, bandwidthIndex, pickTrack); 976 977 if (pickTrack) { 978 mPlaylist->pickRandomMediaItems(); 979 } 980 981 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 982 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 983 984 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 985 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 986 987 AString URIs[kMaxStreams]; 988 for (size_t i = 0; i < kMaxStreams; ++i) { 989 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 990 streamMask |= indexToType(i); 991 } 992 } 993 994 // Step 1, stop and discard fetchers that are no longer needed. 995 // Pause those that we'll reuse. 996 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 997 const AString &uri = mFetcherInfos.keyAt(i); 998 999 bool discardFetcher = true; 1000 1001 // If we're seeking all current fetchers are discarded. 1002 if (timeUs < 0ll) { 1003 // delay fetcher removal 1004 discardFetcher = false; 1005 1006 for (size_t j = 0; j < kMaxStreams; ++j) { 1007 StreamType type = indexToType(j); 1008 if ((streamMask & type) && uri == URIs[j]) { 1009 resumeMask |= type; 1010 streamMask &= ~type; 1011 } 1012 } 1013 } 1014 1015 if (discardFetcher) { 1016 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1017 } else { 1018 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1019 } 1020 } 1021 1022 sp<AMessage> msg; 1023 if (timeUs < 0ll) { 1024 // skip onChangeConfiguration2 (decoder destruction) if switching. 
1025 msg = new AMessage(kWhatChangeConfiguration3, id()); 1026 } else { 1027 msg = new AMessage(kWhatChangeConfiguration2, id()); 1028 } 1029 msg->setInt32("streamMask", streamMask); 1030 msg->setInt32("resumeMask", resumeMask); 1031 msg->setInt64("timeUs", timeUs); 1032 for (size_t i = 0; i < kMaxStreams; ++i) { 1033 if (streamMask & indexToType(i)) { 1034 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1035 } 1036 } 1037 1038 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1039 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1040 // fetchers have completed their asynchronous operation, we'll post 1041 // mContinuation, which then is handled below in onChangeConfiguration2. 1042 mContinuationCounter = mFetcherInfos.size(); 1043 mContinuation = msg; 1044 1045 if (mContinuationCounter == 0) { 1046 msg->post(); 1047 1048 if (mSeekReplyID != 0) { 1049 CHECK(mSeekReply != NULL); 1050 mSeekReply->postReply(mSeekReplyID); 1051 } 1052 } 1053 } 1054 1055 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1056 if (!mReconfigurationInProgress) { 1057 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 1058 } else { 1059 msg->post(1000000ll); // retry in 1 sec 1060 } 1061 } 1062 1063 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1064 mContinuation.clear(); 1065 1066 // All fetchers are either suspended or have been removed now. 
    uint32_t streamMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));

    // Collect the requested URI for every stream type present in streamMask.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    // Determine which decoders to shut down on the player side.
    // A decoder has to be shut down if either
    // 1) its stream type was active before but no longer is,
    // or
    // 2) its stream type was already active and still is, but the URI
    //    has changed.
    // The loop stops as soon as it reaches kSubtitleIndex, so the subtitle
    // stream never contributes to changedMask.
    uint32_t changedMask = 0;
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        if (((mStreamMask & streamMask & indexToType(i))
                    && !(URIs[i] == mStreams[i].mUri))
                || (mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shut down the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Re-address the incoming message to this handler as
    // kWhatChangeConfiguration3 and attach it to the notification as "reply".
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(id());

    notify->setMessage("reply", msg);
    notify->post();
}

// Final step of a configuration change.
//
// Reads "streamMask", "resumeMask", "timeUs" and the per-stream URI entries
// from |msg|, then:
//   * restarts existing fetchers whose URI still matches a stream listed in
//     resumeMask and flags every other existing fetcher for removal,
//   * creates one new fetcher per distinct URI among the stream types left in
//     streamMask,
//   * schedules the next bandwidth check and updates the reconfiguration /
//     switch state flags.
//
// A negative "timeUs" is treated as a switch: playback time is taken from
// mLastDequeuedTimeUs and new fetchers are given the packet sources held in
// mPacketSources2 instead of the ones in mPacketSources.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // Store the new URI of every requested stream type.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
        }
    }

    int64_t timeUs;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs < 0ll) {
        // No explicit position was supplied: continue from the last dequeued
        // timestamp and handle this change as a switch.
        timeUs = mLastDequeuedTimeUs;
        switching = true;
    }
    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;

    mNewStreamMask = streamMask;

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        // A slot stays NULL unless this fetcher's URI serves that stream type
        // and the type is listed in resumeMask.
        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));
            }
        }

        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out how far the original packet sources have buffered and start
    // the new fetchers at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        // Highest sequence number / timestamp found in the latest metadata of
        // the packet sources this fetcher will feed; only updated when
        // switching.
        int32_t latestSeq = -1;
        int64_t latestTimeUs = 0ll;
        sp<AnotherPacketSource> sources[kMaxStreams];

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (!switching) {
                    // Not a switch: the existing packet source is emptied and
                    // handed to the new fetcher.
                    sources[j]->clear();
                } else {
                    int32_t type, seq;
                    int64_t srcTimeUs;
                    sp<AMessage> meta = sources[j]->getLatestMeta();

                    // Only metadata that carries no "discontinuity" entry is
                    // consulted for "seq" and "timeUs".
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        CHECK(meta->findInt32("seq", &seq));
                        if (seq > latestSeq) {
                            latestSeq = seq;
                        }
                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
                        if (srcTimeUs > latestTimeUs) {
                            latestTimeUs = srcTimeUs;
                        }
                    }

                    // The new fetcher writes into the entry from
                    // mPacketSources2, which is emptied first; the entry in
                    // mPacketSources is left untouched here.
                    sources[j] = mPacketSources2.valueFor(indexToType(j));
                    sources[j]->clear();
                    // A stream type present in the new mask but not in the
                    // current one gets a format-change buffer queued up front.
                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                    if (extraStreams & indexToType(j)) {
                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
                    }
                }

                // This stream type is now covered by the new fetcher.
                streamMask &= ~indexToType(j);
            }
        }

        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                timeUs,
                latestTimeUs /* min start time(us) */,
                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is left unchanged in this case; it is assigned from
        // mNewStreamMask in tryToFinishBandwidthSwitch().
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    // A non-zero mDisconnectReplyID means a disconnect is still waiting to be
    // finished.
    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}

// Handles a message reporting that one stream type ("stream") has been
// swapped for the switch identified by "switchGeneration".
//
// Messages whose generation differs from mSwitchGeneration are ignored. Once
// mSwapMask equals mStreamMask, the packet sources of stream types that exist
// only in mNewStreamMask are swapped as well, and
// tryToFinishBandwidthSwitch() is called.
void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));
    mSwapMask |= stream;
    if (mSwapMask != mStreamMask) {
        // Other stream types have not reported their swap yet.
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // x & ~(x - 1) isolates the lowest set bit, i.e. one stream type per
        // iteration.
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;
    }

    tryToFinishBandwidthSwitch();
}

// Mark switch done when:
//   1. all old buffers are swapped out, AND
//   2. all old fetchers are removed.
1281 void LiveSession::tryToFinishBandwidthSwitch() { 1282 bool needToRemoveFetchers = false; 1283 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1284 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1285 needToRemoveFetchers = true; 1286 break; 1287 } 1288 } 1289 if (!needToRemoveFetchers && mSwapMask == mStreamMask) { 1290 mStreamMask = mNewStreamMask; 1291 mSwitchInProgress = false; 1292 mSwapMask = 0; 1293 } 1294 } 1295 1296 void LiveSession::scheduleCheckBandwidthEvent() { 1297 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1298 msg->setInt32("generation", mCheckBandwidthGeneration); 1299 msg->post(10000000ll); 1300 } 1301 1302 void LiveSession::cancelCheckBandwidthEvent() { 1303 ++mCheckBandwidthGeneration; 1304 } 1305 1306 void LiveSession::cancelBandwidthSwitch() { 1307 Mutex::Autolock lock(mSwapMutex); 1308 mSwitchGeneration++; 1309 mSwitchInProgress = false; 1310 mSwapMask = 0; 1311 } 1312 1313 bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1314 if (mReconfigurationInProgress || mSwitchInProgress) { 1315 return false; 1316 } 1317 1318 if (mPrevBandwidthIndex < 0) { 1319 return true; 1320 } 1321 1322 if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { 1323 return false; 1324 } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { 1325 return canSwitchUp(); 1326 } else { 1327 return true; 1328 } 1329 } 1330 1331 void LiveSession::onCheckBandwidth() { 1332 size_t bandwidthIndex = getBandwidthIndex(); 1333 if (canSwitchBandwidthTo(bandwidthIndex)) { 1334 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1335 } else { 1336 scheduleCheckBandwidthEvent(); 1337 } 1338 1339 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1340 // schedule another one on return, only an explicit call to 1341 // scheduleCheckBandwidthEvent will do that. 1342 // This ensures that only one configuration change is ongoing at any 1343 // one time, once that completes it'll schedule another check bandwidth 1344 // event. 
1345 } 1346 1347 void LiveSession::postPrepared(status_t err) { 1348 CHECK(mInPreparationPhase); 1349 1350 sp<AMessage> notify = mNotify->dup(); 1351 if (err == OK || err == ERROR_END_OF_STREAM) { 1352 notify->setInt32("what", kWhatPrepared); 1353 } else { 1354 notify->setInt32("what", kWhatPreparationFailed); 1355 notify->setInt32("err", err); 1356 } 1357 1358 notify->post(); 1359 1360 mInPreparationPhase = false; 1361 } 1362 1363 } // namespace android 1364 1365