1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "LiveSession" 19 #include <utils/Log.h> 20 21 #include "LiveSession.h" 22 23 #include "M3UParser.h" 24 #include "PlaylistFetcher.h" 25 26 #include "include/HTTPBase.h" 27 #include "mpeg2ts/AnotherPacketSource.h" 28 29 #include <cutils/properties.h> 30 #include <media/stagefright/foundation/hexdump.h> 31 #include <media/stagefright/foundation/ABuffer.h> 32 #include <media/stagefright/foundation/ADebug.h> 33 #include <media/stagefright/foundation/AMessage.h> 34 #include <media/stagefright/DataSource.h> 35 #include <media/stagefright/FileSource.h> 36 #include <media/stagefright/MediaErrors.h> 37 #include <media/stagefright/MetaData.h> 38 #include <media/stagefright/Utils.h> 39 40 #include <ctype.h> 41 #include <openssl/aes.h> 42 #include <openssl/md5.h> 43 44 namespace android { 45 46 LiveSession::LiveSession( 47 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 48 : mNotify(notify), 49 mFlags(flags), 50 mUIDValid(uidValid), 51 mUID(uid), 52 mInPreparationPhase(true), 53 mHTTPDataSource( 54 HTTPBase::Create( 55 (mFlags & kFlagIncognito) 56 ? HTTPBase::kFlagIncognito 57 : 0)), 58 mPrevBandwidthIndex(-1), 59 mStreamMask(0), 60 mCheckBandwidthGeneration(0), 61 mLastDequeuedTimeUs(0ll), 62 mRealTimeBaseUs(0ll), 63 mReconfigurationInProgress(false), 64 mDisconnectReplyID(0) { 65 if (mUIDValid) { 66 mHTTPDataSource->setUID(mUID); 67 } 68 69 mPacketSources.add( 70 STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */)); 71 72 mPacketSources.add( 73 STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */)); 74 75 mPacketSources.add( 76 STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */)); 77 } 78 79 LiveSession::~LiveSession() { 80 } 81 82 status_t LiveSession::dequeueAccessUnit( 83 StreamType stream, sp<ABuffer> *accessUnit) { 84 if (!(mStreamMask & stream)) { 85 return UNKNOWN_ERROR; 86 } 87 88 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 89 90 status_t finalResult; 91 if (!packetSource->hasBufferAvailable(&finalResult)) { 92 return finalResult == OK ? -EAGAIN : finalResult; 93 } 94 95 status_t err = packetSource->dequeueAccessUnit(accessUnit); 96 97 const char *streamStr; 98 switch (stream) { 99 case STREAMTYPE_AUDIO: 100 streamStr = "audio"; 101 break; 102 case STREAMTYPE_VIDEO: 103 streamStr = "video"; 104 break; 105 case STREAMTYPE_SUBTITLES: 106 streamStr = "subs"; 107 break; 108 default: 109 TRESPASS(); 110 } 111 112 if (err == INFO_DISCONTINUITY) { 113 int32_t type; 114 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 115 116 sp<AMessage> extra; 117 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 118 extra.clear(); 119 } 120 121 ALOGI("[%s] read discontinuity of type %d, extra = %s", 122 streamStr, 123 type, 124 extra == NULL ? "NULL" : extra->debugString().c_str()); 125 } else if (err == OK) { 126 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 127 int64_t timeUs; 128 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 129 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 130 131 mLastDequeuedTimeUs = timeUs; 132 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 133 } else if (stream == STREAMTYPE_SUBTITLES) { 134 (*accessUnit)->meta()->setInt32( 135 "trackIndex", mPlaylist->getSelectedIndex()); 136 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 137 } 138 } else { 139 ALOGI("[%s] encountered error %d", streamStr, err); 140 } 141 142 return err; 143 } 144 145 status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 146 if (!(mStreamMask & stream)) { 147 return UNKNOWN_ERROR; 148 } 149 150 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 151 152 sp<MetaData> meta = packetSource->getFormat(); 153 154 if (meta == NULL) { 155 return -EAGAIN; 156 } 157 158 return convertMetaDataToMessage(meta, format); 159 } 160 161 void LiveSession::connectAsync( 162 const char *url, const KeyedVector<String8, String8> *headers) { 163 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 164 msg->setString("url", url); 165 166 if (headers != NULL) { 167 msg->setPointer( 168 "headers", 169 new KeyedVector<String8, String8>(*headers)); 170 } 171 172 msg->post(); 173 } 174 175 status_t LiveSession::disconnect() { 176 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 177 178 sp<AMessage> response; 179 status_t err = msg->postAndAwaitResponse(&response); 180 181 return err; 182 } 183 184 status_t LiveSession::seekTo(int64_t timeUs) { 185 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 186 msg->setInt64("timeUs", timeUs); 187 188 sp<AMessage> response; 189 status_t err = msg->postAndAwaitResponse(&response); 190 191 return err; 192 } 193 194 void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 195 switch (msg->what()) { 196 case kWhatConnect: 197 { 198 onConnect(msg); 199 break; 200 } 201 202 case kWhatDisconnect: 203 { 204 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 205 206 if (mReconfigurationInProgress) { 207 break; 208 } 209 210 finishDisconnect(); 211 break; 212 } 213 214 case kWhatSeek: 215 { 216 uint32_t replyID; 217 CHECK(msg->senderAwaitsResponse(&replyID)); 218 219 status_t err = onSeek(msg); 220 221 sp<AMessage> response = new AMessage; 222 response->setInt32("err", err); 223 224 response->postReply(replyID); 225 break; 226 } 227 228 case kWhatFetcherNotify: 229 { 230 int32_t what; 231 CHECK(msg->findInt32("what", &what)); 232 233 switch (what) { 234 case PlaylistFetcher::kWhatStarted: 235 break; 236 case PlaylistFetcher::kWhatPaused: 237 case PlaylistFetcher::kWhatStopped: 238 { 239 if (what == PlaylistFetcher::kWhatStopped) { 240 AString uri; 241 CHECK(msg->findString("uri", &uri)); 242 mFetcherInfos.removeItem(uri); 243 } 244 245 if (mContinuation != NULL) { 246 CHECK_GT(mContinuationCounter, 0); 247 if (--mContinuationCounter == 0) { 248 mContinuation->post(); 249 } 250 } 251 break; 252 } 253 254 case PlaylistFetcher::kWhatDurationUpdate: 255 { 256 AString uri; 257 CHECK(msg->findString("uri", &uri)); 258 259 int64_t durationUs; 260 CHECK(msg->findInt64("durationUs", &durationUs)); 261 262 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 263 info->mDurationUs = durationUs; 264 break; 265 } 266 267 case PlaylistFetcher::kWhatError: 268 { 269 status_t err; 270 CHECK(msg->findInt32("err", &err)); 271 272 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 273 274 if (mInPreparationPhase) { 275 postPrepared(err); 276 } 277 278 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 279 280 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 281 282 mPacketSources.valueFor( 283 STREAMTYPE_SUBTITLES)->signalEOS(err); 284 285 sp<AMessage> notify = mNotify->dup(); 286 notify->setInt32("what", kWhatError); 287 notify->setInt32("err", err); 288 notify->post(); 289 break; 290 } 291 292 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 293 { 294 AString uri; 295 CHECK(msg->findString("uri", &uri)); 296 297 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 298 info->mIsPrepared = true; 299 300 if (mInPreparationPhase) { 301 bool allFetchersPrepared = true; 302 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 303 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 304 allFetchersPrepared = false; 305 break; 306 } 307 } 308 309 if (allFetchersPrepared) { 310 postPrepared(OK); 311 } 312 } 313 break; 314 } 315 316 default: 317 TRESPASS(); 318 } 319 320 break; 321 } 322 323 case kWhatCheckBandwidth: 324 { 325 int32_t generation; 326 CHECK(msg->findInt32("generation", &generation)); 327 328 if (generation != mCheckBandwidthGeneration) { 329 break; 330 } 331 332 onCheckBandwidth(); 333 break; 334 } 335 336 case kWhatChangeConfiguration: 337 { 338 onChangeConfiguration(msg); 339 break; 340 } 341 342 case kWhatChangeConfiguration2: 343 { 344 onChangeConfiguration2(msg); 345 break; 346 } 347 348 case kWhatChangeConfiguration3: 349 { 350 onChangeConfiguration3(msg); 351 break; 352 } 353 354 case kWhatFinishDisconnect2: 355 { 356 onFinishDisconnect2(); 357 break; 358 } 359 360 default: 361 TRESPASS(); 362 break; 363 } 364 } 365 366 // static 367 int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 368 if (a->mBandwidth < b->mBandwidth) { 369 return -1; 370 } else if (a->mBandwidth == b->mBandwidth) { 371 return 0; 372 } 373 374 return 1; 375 } 376 377 void LiveSession::onConnect(const sp<AMessage> &msg) { 378 AString url; 379 CHECK(msg->findString("url", &url)); 380 381 KeyedVector<String8, String8> *headers = NULL; 382 if (!msg->findPointer("headers", (void **)&headers)) { 383 mExtraHeaders.clear(); 384 } else { 385 mExtraHeaders = *headers; 386 387 delete headers; 388 headers = NULL; 389 } 390 391 #if 1 392 ALOGI("onConnect <URL suppressed>"); 393 #else 394 ALOGI("onConnect %s", url.c_str()); 395 #endif 396 397 mMasterURL = url; 398 399 bool dummy; 400 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 401 402 if (mPlaylist == NULL) { 403 ALOGE("unable to fetch master playlist '%s'.", url.c_str()); 404 405 postPrepared(ERROR_IO); 406 return; 407 } 408 409 // We trust the content provider to make a reasonable choice of preferred 410 // initial bandwidth by listing it first in the variant playlist. 411 // At startup we really don't have a good estimate on the available 412 // network bandwidth since we haven't tranferred any data yet. Once 413 // we have we can make a better informed choice. 414 size_t initialBandwidth = 0; 415 size_t initialBandwidthIndex = 0; 416 417 if (mPlaylist->isVariantPlaylist()) { 418 for (size_t i = 0; i < mPlaylist->size(); ++i) { 419 BandwidthItem item; 420 421 item.mPlaylistIndex = i; 422 423 sp<AMessage> meta; 424 AString uri; 425 mPlaylist->itemAt(i, &uri, &meta); 426 427 unsigned long bandwidth; 428 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 429 430 if (initialBandwidth == 0) { 431 initialBandwidth = item.mBandwidth; 432 } 433 434 mBandwidthItems.push(item); 435 } 436 437 CHECK_GT(mBandwidthItems.size(), 0u); 438 439 mBandwidthItems.sort(SortByBandwidth); 440 441 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 442 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 443 initialBandwidthIndex = i; 444 break; 445 } 446 } 447 } else { 448 // dummy item. 449 BandwidthItem item; 450 item.mPlaylistIndex = 0; 451 item.mBandwidth = 0; 452 mBandwidthItems.push(item); 453 } 454 455 changeConfiguration( 456 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 457 } 458 459 void LiveSession::finishDisconnect() { 460 // No reconfiguration is currently pending, make sure none will trigger 461 // during disconnection either. 462 cancelCheckBandwidthEvent(); 463 464 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 465 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 466 } 467 468 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 469 470 mContinuationCounter = mFetcherInfos.size(); 471 mContinuation = msg; 472 473 if (mContinuationCounter == 0) { 474 msg->post(); 475 } 476 } 477 478 void LiveSession::onFinishDisconnect2() { 479 mContinuation.clear(); 480 481 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 482 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 483 484 mPacketSources.valueFor( 485 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 486 487 sp<AMessage> response = new AMessage; 488 response->setInt32("err", OK); 489 490 response->postReply(mDisconnectReplyID); 491 mDisconnectReplyID = 0; 492 } 493 494 sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 495 ssize_t index = mFetcherInfos.indexOfKey(uri); 496 497 if (index >= 0) { 498 return NULL; 499 } 500 501 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 502 notify->setString("uri", uri); 503 504 FetcherInfo info; 505 info.mFetcher = new PlaylistFetcher(notify, this, uri); 506 info.mDurationUs = -1ll; 507 info.mIsPrepared = false; 508 looper()->registerHandler(info.mFetcher); 509 510 mFetcherInfos.add(uri, info); 511 512 return info.mFetcher; 513 } 514 515 status_t LiveSession::fetchFile( 516 const char *url, sp<ABuffer> *out, 517 int64_t range_offset, int64_t range_length) { 518 *out = NULL; 519 520 sp<DataSource> source; 521 522 if (!strncasecmp(url, "file://", 7)) { 523 source = new FileSource(url + 7); 524 } else if (strncasecmp(url, "http://", 7) 525 && strncasecmp(url, "https://", 8)) { 526 return ERROR_UNSUPPORTED; 527 } else { 528 KeyedVector<String8, String8> headers = mExtraHeaders; 529 if (range_offset > 0 || range_length >= 0) { 530 headers.add( 531 String8("Range"), 532 String8( 533 StringPrintf( 534 "bytes=%lld-%s", 535 range_offset, 536 range_length < 0 537 ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str())); 538 } 539 status_t err = mHTTPDataSource->connect(url, &headers); 540 541 if (err != OK) { 542 return err; 543 } 544 545 source = mHTTPDataSource; 546 } 547 548 off64_t size; 549 status_t err = source->getSize(&size); 550 551 if (err != OK) { 552 size = 65536; 553 } 554 555 sp<ABuffer> buffer = new ABuffer(size); 556 buffer->setRange(0, 0); 557 558 for (;;) { 559 size_t bufferRemaining = buffer->capacity() - buffer->size(); 560 561 if (bufferRemaining == 0) { 562 bufferRemaining = 32768; 563 564 ALOGV("increasing download buffer to %d bytes", 565 buffer->size() + bufferRemaining); 566 567 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 568 memcpy(copy->data(), buffer->data(), buffer->size()); 569 copy->setRange(0, buffer->size()); 570 571 buffer = copy; 572 } 573 574 size_t maxBytesToRead = bufferRemaining; 575 if (range_length >= 0) { 576 int64_t bytesLeftInRange = range_length - buffer->size(); 577 if (bytesLeftInRange < maxBytesToRead) { 578 maxBytesToRead = bytesLeftInRange; 579 580 if (bytesLeftInRange == 0) { 581 break; 582 } 583 } 584 } 585 586 ssize_t n = source->readAt( 587 buffer->size(), buffer->data() + buffer->size(), 588 maxBytesToRead); 589 590 if (n < 0) { 591 return n; 592 } 593 594 if (n == 0) { 595 break; 596 } 597 598 buffer->setRange(0, buffer->size() + (size_t)n); 599 } 600 601 *out = buffer; 602 603 return OK; 604 } 605 606 sp<M3UParser> LiveSession::fetchPlaylist( 607 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 608 ALOGV("fetchPlaylist '%s'", url); 609 610 *unchanged = false; 611 612 sp<ABuffer> buffer; 613 status_t err = fetchFile(url, &buffer); 614 615 if (err != OK) { 616 return NULL; 617 } 618 619 // MD5 functionality is not available on the simulator, treat all 620 // playlists as changed. 621 622 #if defined(HAVE_ANDROID_OS) 623 uint8_t hash[16]; 624 625 MD5_CTX m; 626 MD5_Init(&m); 627 MD5_Update(&m, buffer->data(), buffer->size()); 628 629 MD5_Final(hash, &m); 630 631 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 632 // playlist unchanged 633 *unchanged = true; 634 635 ALOGV("Playlist unchanged, refresh state is now %d", 636 (int)mRefreshState); 637 638 return NULL; 639 } 640 641 if (curPlaylistHash != NULL) { 642 memcpy(curPlaylistHash, hash, sizeof(hash)); 643 } 644 #endif 645 646 sp<M3UParser> playlist = 647 new M3UParser(url, buffer->data(), buffer->size()); 648 649 if (playlist->initCheck() != OK) { 650 ALOGE("failed to parse .m3u8 playlist"); 651 652 return NULL; 653 } 654 655 return playlist; 656 } 657 658 static double uniformRand() { 659 return (double)rand() / RAND_MAX; 660 } 661 662 size_t LiveSession::getBandwidthIndex() { 663 if (mBandwidthItems.size() == 0) { 664 return 0; 665 } 666 667 #if 1 668 char value[PROPERTY_VALUE_MAX]; 669 ssize_t index = -1; 670 if (property_get("media.httplive.bw-index", value, NULL)) { 671 char *end; 672 index = strtol(value, &end, 10); 673 CHECK(end > value && *end == '\0'); 674 675 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 676 index = mBandwidthItems.size() - 1; 677 } 678 } 679 680 if (index < 0) { 681 int32_t bandwidthBps; 682 if (mHTTPDataSource != NULL 683 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 684 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 685 } else { 686 ALOGV("no bandwidth estimate."); 687 return 0; // Pick the lowest bandwidth stream by default. 688 } 689 690 char value[PROPERTY_VALUE_MAX]; 691 if (property_get("media.httplive.max-bw", value, NULL)) { 692 char *end; 693 long maxBw = strtoul(value, &end, 10); 694 if (end > value && *end == '\0') { 695 if (maxBw > 0 && bandwidthBps > maxBw) { 696 ALOGV("bandwidth capped to %ld bps", maxBw); 697 bandwidthBps = maxBw; 698 } 699 } 700 } 701 702 // Consider only 80% of the available bandwidth usable. 703 bandwidthBps = (bandwidthBps * 8) / 10; 704 705 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 706 707 index = mBandwidthItems.size() - 1; 708 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 709 > (size_t)bandwidthBps) { 710 --index; 711 } 712 } 713 #elif 0 714 // Change bandwidth at random() 715 size_t index = uniformRand() * mBandwidthItems.size(); 716 #elif 0 717 // There's a 50% chance to stay on the current bandwidth and 718 // a 50% chance to switch to the next higher bandwidth (wrapping around 719 // to lowest) 720 const size_t kMinIndex = 0; 721 722 static ssize_t mPrevBandwidthIndex = -1; 723 724 size_t index; 725 if (mPrevBandwidthIndex < 0) { 726 index = kMinIndex; 727 } else if (uniformRand() < 0.5) { 728 index = (size_t)mPrevBandwidthIndex; 729 } else { 730 index = mPrevBandwidthIndex + 1; 731 if (index == mBandwidthItems.size()) { 732 index = kMinIndex; 733 } 734 } 735 mPrevBandwidthIndex = index; 736 #elif 0 737 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 738 739 size_t index = mBandwidthItems.size() - 1; 740 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 741 --index; 742 } 743 #elif 1 744 char value[PROPERTY_VALUE_MAX]; 745 size_t index; 746 if (property_get("media.httplive.bw-index", value, NULL)) { 747 char *end; 748 index = strtoul(value, &end, 10); 749 CHECK(end > value && *end == '\0'); 750 751 if (index >= mBandwidthItems.size()) { 752 index = mBandwidthItems.size() - 1; 753 } 754 } else { 755 index = 0; 756 } 757 #else 758 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 759 #endif 760 761 CHECK_GE(index, 0); 762 763 return index; 764 } 765 766 status_t LiveSession::onSeek(const sp<AMessage> &msg) { 767 int64_t timeUs; 768 CHECK(msg->findInt64("timeUs", &timeUs)); 769 770 if (!mReconfigurationInProgress) { 771 changeConfiguration(timeUs, getBandwidthIndex()); 772 } 773 774 return OK; 775 } 776 777 status_t LiveSession::getDuration(int64_t *durationUs) const { 778 int64_t maxDurationUs = 0ll; 779 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 780 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 781 782 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 783 maxDurationUs = fetcherDurationUs; 784 } 785 } 786 787 *durationUs = maxDurationUs; 788 789 return OK; 790 } 791 792 bool LiveSession::isSeekable() const { 793 int64_t durationUs; 794 return getDuration(&durationUs) == OK && durationUs >= 0; 795 } 796 797 bool LiveSession::hasDynamicDuration() const { 798 return false; 799 } 800 801 status_t LiveSession::getTrackInfo(Parcel *reply) const { 802 return mPlaylist->getTrackInfo(reply); 803 } 804 805 status_t LiveSession::selectTrack(size_t index, bool select) { 806 status_t err = mPlaylist->selectTrack(index, select); 807 if (err == OK) { 808 (new AMessage(kWhatChangeConfiguration, id()))->post(); 809 } 810 return err; 811 } 812 813 void LiveSession::changeConfiguration( 814 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 815 CHECK(!mReconfigurationInProgress); 816 mReconfigurationInProgress = true; 817 818 mPrevBandwidthIndex = bandwidthIndex; 819 820 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 821 timeUs, bandwidthIndex, pickTrack); 822 823 if (pickTrack) { 824 mPlaylist->pickRandomMediaItems(); 825 } 826 827 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 828 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 829 830 uint32_t streamMask = 0; 831 832 AString audioURI; 833 if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) { 834 streamMask |= STREAMTYPE_AUDIO; 835 } 836 837 AString videoURI; 838 if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) { 839 streamMask |= STREAMTYPE_VIDEO; 840 } 841 842 AString subtitleURI; 843 if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) { 844 streamMask |= STREAMTYPE_SUBTITLES; 845 } 846 847 // Step 1, stop and discard fetchers that are no longer needed. 848 // Pause those that we'll reuse. 849 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 850 const AString &uri = mFetcherInfos.keyAt(i); 851 852 bool discardFetcher = true; 853 854 // If we're seeking all current fetchers are discarded. 855 if (timeUs < 0ll) { 856 if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) 857 || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) 858 || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) { 859 discardFetcher = false; 860 } 861 } 862 863 if (discardFetcher) { 864 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 865 } else { 866 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 867 } 868 } 869 870 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); 871 msg->setInt32("streamMask", streamMask); 872 msg->setInt64("timeUs", timeUs); 873 if (streamMask & STREAMTYPE_AUDIO) { 874 msg->setString("audioURI", audioURI.c_str()); 875 } 876 if (streamMask & STREAMTYPE_VIDEO) { 877 msg->setString("videoURI", videoURI.c_str()); 878 } 879 if (streamMask & STREAMTYPE_SUBTITLES) { 880 msg->setString("subtitleURI", subtitleURI.c_str()); 881 } 882 883 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 884 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 885 // fetchers have completed their asynchronous operation, we'll post 886 // mContinuation, which then is handled below in onChangeConfiguration2. 887 mContinuationCounter = mFetcherInfos.size(); 888 mContinuation = msg; 889 890 if (mContinuationCounter == 0) { 891 msg->post(); 892 } 893 } 894 895 void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 896 if (!mReconfigurationInProgress) { 897 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 898 } else { 899 msg->post(1000000ll); // retry in 1 sec 900 } 901 } 902 903 void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 904 mContinuation.clear(); 905 906 // All fetchers are either suspended or have been removed now. 907 908 uint32_t streamMask; 909 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 910 911 AString audioURI, videoURI, subtitleURI; 912 if (streamMask & STREAMTYPE_AUDIO) { 913 CHECK(msg->findString("audioURI", &audioURI)); 914 ALOGV("audioURI = '%s'", audioURI.c_str()); 915 } 916 if (streamMask & STREAMTYPE_VIDEO) { 917 CHECK(msg->findString("videoURI", &videoURI)); 918 ALOGV("videoURI = '%s'", videoURI.c_str()); 919 } 920 if (streamMask & STREAMTYPE_SUBTITLES) { 921 CHECK(msg->findString("subtitleURI", &subtitleURI)); 922 ALOGV("subtitleURI = '%s'", subtitleURI.c_str()); 923 } 924 925 // Determine which decoders to shutdown on the player side, 926 // a decoder has to be shutdown if either 927 // 1) its streamtype was active before but now longer isn't. 928 // or 929 // 2) its streamtype was already active and still is but the URI 930 // has changed. 931 uint32_t changedMask = 0; 932 if (((mStreamMask & streamMask & STREAMTYPE_AUDIO) 933 && !(audioURI == mAudioURI)) 934 || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) { 935 changedMask |= STREAMTYPE_AUDIO; 936 } 937 if (((mStreamMask & streamMask & STREAMTYPE_VIDEO) 938 && !(videoURI == mVideoURI)) 939 || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) { 940 changedMask |= STREAMTYPE_VIDEO; 941 } 942 943 if (changedMask == 0) { 944 // If nothing changed as far as the audio/video decoders 945 // are concerned we can proceed. 946 onChangeConfiguration3(msg); 947 return; 948 } 949 950 // Something changed, inform the player which will shutdown the 951 // corresponding decoders and will post the reply once that's done. 952 // Handling the reply will continue executing below in 953 // onChangeConfiguration3. 954 sp<AMessage> notify = mNotify->dup(); 955 notify->setInt32("what", kWhatStreamsChanged); 956 notify->setInt32("changedMask", changedMask); 957 958 msg->setWhat(kWhatChangeConfiguration3); 959 msg->setTarget(id()); 960 961 notify->setMessage("reply", msg); 962 notify->post(); 963 } 964 965 void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 966 // All remaining fetchers are still suspended, the player has shutdown 967 // any decoders that needed it. 968 969 uint32_t streamMask; 970 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 971 972 AString audioURI, videoURI, subtitleURI; 973 if (streamMask & STREAMTYPE_AUDIO) { 974 CHECK(msg->findString("audioURI", &audioURI)); 975 } 976 if (streamMask & STREAMTYPE_VIDEO) { 977 CHECK(msg->findString("videoURI", &videoURI)); 978 } 979 if (streamMask & STREAMTYPE_SUBTITLES) { 980 CHECK(msg->findString("subtitleURI", &subtitleURI)); 981 } 982 983 int64_t timeUs; 984 CHECK(msg->findInt64("timeUs", &timeUs)); 985 986 if (timeUs < 0ll) { 987 timeUs = mLastDequeuedTimeUs; 988 } 989 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 990 991 mStreamMask = streamMask; 992 mAudioURI = audioURI; 993 mVideoURI = videoURI; 994 mSubtitleURI = subtitleURI; 995 996 // Resume all existing fetchers and assign them packet sources. 997 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 998 const AString &uri = mFetcherInfos.keyAt(i); 999 1000 uint32_t resumeMask = 0; 1001 1002 sp<AnotherPacketSource> audioSource; 1003 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1004 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1005 resumeMask |= STREAMTYPE_AUDIO; 1006 } 1007 1008 sp<AnotherPacketSource> videoSource; 1009 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1010 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1011 resumeMask |= STREAMTYPE_VIDEO; 1012 } 1013 1014 sp<AnotherPacketSource> subtitleSource; 1015 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1016 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1017 resumeMask |= STREAMTYPE_SUBTITLES; 1018 } 1019 1020 CHECK_NE(resumeMask, 0u); 1021 1022 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1023 1024 streamMask &= ~resumeMask; 1025 1026 mFetcherInfos.valueAt(i).mFetcher->startAsync( 1027 audioSource, videoSource, subtitleSource); 1028 } 1029 1030 // streamMask now only contains the types that need a new fetcher created. 1031 1032 if (streamMask != 0) { 1033 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1034 } 1035 1036 while (streamMask != 0) { 1037 StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1)); 1038 1039 AString uri; 1040 switch (streamType) { 1041 case STREAMTYPE_AUDIO: 1042 uri = audioURI; 1043 break; 1044 case STREAMTYPE_VIDEO: 1045 uri = videoURI; 1046 break; 1047 case STREAMTYPE_SUBTITLES: 1048 uri = subtitleURI; 1049 break; 1050 default: 1051 TRESPASS(); 1052 } 1053 1054 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1055 CHECK(fetcher != NULL); 1056 1057 sp<AnotherPacketSource> audioSource; 1058 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1059 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1060 audioSource->clear(); 1061 1062 streamMask &= ~STREAMTYPE_AUDIO; 1063 } 1064 1065 sp<AnotherPacketSource> videoSource; 1066 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1067 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1068 videoSource->clear(); 1069 1070 streamMask &= ~STREAMTYPE_VIDEO; 1071 } 1072 1073 sp<AnotherPacketSource> subtitleSource; 1074 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1075 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1076 subtitleSource->clear(); 1077 1078 streamMask &= ~STREAMTYPE_SUBTITLES; 1079 } 1080 1081 fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs); 1082 } 1083 1084 // All fetchers have now been started, the configuration change 1085 // has completed. 1086 1087 scheduleCheckBandwidthEvent(); 1088 1089 ALOGV("XXX configuration change completed."); 1090 1091 mReconfigurationInProgress = false; 1092 1093 if (mDisconnectReplyID != 0) { 1094 finishDisconnect(); 1095 } 1096 } 1097 1098 void LiveSession::scheduleCheckBandwidthEvent() { 1099 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1100 msg->setInt32("generation", mCheckBandwidthGeneration); 1101 msg->post(10000000ll); 1102 } 1103 1104 void LiveSession::cancelCheckBandwidthEvent() { 1105 ++mCheckBandwidthGeneration; 1106 } 1107 1108 void LiveSession::onCheckBandwidth() { 1109 if (mReconfigurationInProgress) { 1110 scheduleCheckBandwidthEvent(); 1111 return; 1112 } 1113 1114 size_t bandwidthIndex = getBandwidthIndex(); 1115 if (mPrevBandwidthIndex < 0 1116 || bandwidthIndex != (size_t)mPrevBandwidthIndex) { 1117 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1118 } 1119 1120 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1121 // schedule another one on return, only an explicit call to 1122 // scheduleCheckBandwidthEvent will do that. 1123 // This ensures that only one configuration change is ongoing at any 1124 // one time, once that completes it'll schedule another check bandwidth 1125 // event. 1126 } 1127 1128 void LiveSession::postPrepared(status_t err) { 1129 CHECK(mInPreparationPhase); 1130 1131 sp<AMessage> notify = mNotify->dup(); 1132 if (err == OK || err == ERROR_END_OF_STREAM) { 1133 notify->setInt32("what", kWhatPrepared); 1134 } else { 1135 notify->setInt32("what", kWhatPreparationFailed); 1136 notify->setInt32("err", err); 1137 } 1138 1139 notify->post(); 1140 1141 mInPreparationPhase = false; 1142 } 1143 1144 } // namespace android 1145 1146