1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "PlaylistFetcher" 19 #include <utils/Log.h> 20 #include <utils/misc.h> 21 22 #include "PlaylistFetcher.h" 23 #include "HTTPDownloader.h" 24 #include "LiveSession.h" 25 #include "M3UParser.h" 26 #include "include/avc_utils.h" 27 #include "include/ID3.h" 28 #include "mpeg2ts/AnotherPacketSource.h" 29 30 #include <media/stagefright/foundation/ABitReader.h> 31 #include <media/stagefright/foundation/ABuffer.h> 32 #include <media/stagefright/foundation/ADebug.h> 33 #include <media/stagefright/MediaDefs.h> 34 #include <media/stagefright/MetaData.h> 35 #include <media/stagefright/Utils.h> 36 37 #include <ctype.h> 38 #include <inttypes.h> 39 #include <openssl/aes.h> 40 41 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__) 42 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \ 43 LiveSession::getNameForStream(stream), ##__VA_ARGS__) 44 45 namespace android { 46 47 // static 48 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll; 49 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 50 // LCM of 188 (size of a TS packet) & 1k works well 51 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; 52 53 struct PlaylistFetcher::DownloadState : public RefBase { 54 DownloadState(); 55 void resetState(); 56 bool hasSavedState() const; 57 void restoreState( 58 AString &uri, 59 sp<AMessage> &itemMeta, 60 sp<ABuffer> &buffer, 61 sp<ABuffer> &tsBuffer, 62 int32_t &firstSeqNumberInPlaylist, 63 int32_t &lastSeqNumberInPlaylist); 64 void saveState( 65 AString &uri, 66 sp<AMessage> &itemMeta, 67 sp<ABuffer> &buffer, 68 sp<ABuffer> &tsBuffer, 69 int32_t &firstSeqNumberInPlaylist, 70 int32_t &lastSeqNumberInPlaylist); 71 72 private: 73 bool mHasSavedState; 74 AString mUri; 75 sp<AMessage> mItemMeta; 76 sp<ABuffer> mBuffer; 77 sp<ABuffer> mTsBuffer; 78 int32_t mFirstSeqNumberInPlaylist; 79 int32_t mLastSeqNumberInPlaylist; 80 }; 81 82 PlaylistFetcher::DownloadState::DownloadState() { 83 resetState(); 84 } 85 86 bool PlaylistFetcher::DownloadState::hasSavedState() const { 87 return mHasSavedState; 88 } 89 90 void PlaylistFetcher::DownloadState::resetState() { 91 mHasSavedState = false; 92 93 mUri.clear(); 94 mItemMeta = NULL; 95 mBuffer = NULL; 96 mTsBuffer = NULL; 97 mFirstSeqNumberInPlaylist = 0; 98 mLastSeqNumberInPlaylist = 0; 99 } 100 101 void PlaylistFetcher::DownloadState::restoreState( 102 AString &uri, 103 sp<AMessage> &itemMeta, 104 sp<ABuffer> &buffer, 105 sp<ABuffer> &tsBuffer, 106 int32_t &firstSeqNumberInPlaylist, 107 int32_t &lastSeqNumberInPlaylist) { 108 if (!mHasSavedState) { 109 return; 110 } 111 112 uri = mUri; 113 itemMeta = mItemMeta; 114 buffer = mBuffer; 115 tsBuffer = mTsBuffer; 116 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist; 117 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist; 118 119 resetState(); 120 } 121 122 void PlaylistFetcher::DownloadState::saveState( 123 AString &uri, 124 sp<AMessage> &itemMeta, 125 sp<ABuffer> &buffer, 126 sp<ABuffer> &tsBuffer, 127 int32_t &firstSeqNumberInPlaylist, 128 int32_t &lastSeqNumberInPlaylist) { 129 mHasSavedState = true; 130 131 mUri = uri; 132 mItemMeta = itemMeta; 133 mBuffer = buffer; 134 mTsBuffer = tsBuffer; 135 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist; 136 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist; 137 } 138 139 PlaylistFetcher::PlaylistFetcher( 140 const sp<AMessage> ¬ify, 141 const sp<LiveSession> &session, 142 const char *uri, 143 int32_t id, 144 int32_t subtitleGeneration) 145 : mNotify(notify), 146 mSession(session), 147 mURI(uri), 148 mFetcherID(id), 149 mStreamTypeMask(0), 150 mStartTimeUs(-1ll), 151 mSegmentStartTimeUs(-1ll), 152 mDiscontinuitySeq(-1ll), 153 mStartTimeUsRelative(false), 154 mLastPlaylistFetchTimeUs(-1ll), 155 mPlaylistTimeUs(-1ll), 156 mSeqNumber(-1), 157 mNumRetries(0), 158 mStartup(true), 159 mIDRFound(false), 160 mSeekMode(LiveSession::kSeekModeExactPosition), 161 mTimeChangeSignaled(false), 162 mNextPTSTimeUs(-1ll), 163 mMonitorQueueGeneration(0), 164 mSubtitleGeneration(subtitleGeneration), 165 mLastDiscontinuitySeq(-1ll), 166 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 167 mFirstPTSValid(false), 168 mFirstTimeUs(-1ll), 169 mVideoBuffer(new AnotherPacketSource(NULL)), 170 mThresholdRatio(-1.0f), 171 mDownloadState(new DownloadState()), 172 mHasMetadata(false) { 173 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 174 mHTTPDownloader = mSession->getHTTPDownloader(); 175 } 176 177 PlaylistFetcher::~PlaylistFetcher() { 178 } 179 180 int32_t PlaylistFetcher::getFetcherID() const { 181 return mFetcherID; 182 } 183 184 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 185 CHECK(mPlaylist != NULL); 186 187 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 188 mPlaylist->getSeqNumberRange( 189 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); 190 191 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 192 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 193 194 int64_t segmentStartUs = 0ll; 195 for (int32_t index = 0; 196 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 197 sp<AMessage> itemMeta; 198 CHECK(mPlaylist->itemAt( 199 index, NULL /* uri */, &itemMeta)); 200 201 int64_t itemDurationUs; 202 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 203 204 segmentStartUs += itemDurationUs; 205 } 206 207 return segmentStartUs; 208 } 209 210 int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const { 211 CHECK(mPlaylist != NULL); 212 213 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 214 mPlaylist->getSeqNumberRange( 215 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); 216 217 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 218 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 219 220 int32_t index = seqNumber - firstSeqNumberInPlaylist; 221 sp<AMessage> itemMeta; 222 CHECK(mPlaylist->itemAt( 223 index, NULL /* uri */, &itemMeta)); 224 225 int64_t itemDurationUs; 226 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 227 228 return itemDurationUs; 229 } 230 231 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 232 int64_t nowUs = ALooper::GetNowUs(); 233 234 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 235 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 236 return 0ll; 237 } 238 239 if (mPlaylist->isComplete()) { 240 return (~0llu >> 1); 241 } 242 243 int64_t targetDurationUs = mPlaylist->getTargetDuration(); 244 245 int64_t minPlaylistAgeUs; 246 247 switch (mRefreshState) { 248 case INITIAL_MINIMUM_RELOAD_DELAY: 249 { 250 size_t n = mPlaylist->size(); 251 if (n > 0) { 252 sp<AMessage> itemMeta; 253 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 254 255 int64_t itemDurationUs; 256 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 257 258 minPlaylistAgeUs = itemDurationUs; 259 break; 260 } 261 262 // fall through 263 } 264 265 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 266 { 267 minPlaylistAgeUs = targetDurationUs / 2; 268 break; 269 } 270 271 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 272 { 273 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 274 break; 275 } 276 277 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 278 { 279 minPlaylistAgeUs = targetDurationUs * 3; 280 break; 281 } 282 283 default: 284 TRESPASS(); 285 break; 286 } 287 288 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 289 return delayUs > 0ll ? delayUs : 0ll; 290 } 291 292 status_t PlaylistFetcher::decryptBuffer( 293 size_t playlistIndex, const sp<ABuffer> &buffer, 294 bool first) { 295 sp<AMessage> itemMeta; 296 bool found = false; 297 AString method; 298 299 for (ssize_t i = playlistIndex; i >= 0; --i) { 300 AString uri; 301 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 302 303 if (itemMeta->findString("cipher-method", &method)) { 304 found = true; 305 break; 306 } 307 } 308 309 if (!found) { 310 method = "NONE"; 311 } 312 buffer->meta()->setString("cipher-method", method.c_str()); 313 314 if (method == "NONE") { 315 return OK; 316 } else if (!(method == "AES-128")) { 317 ALOGE("Unsupported cipher method '%s'", method.c_str()); 318 return ERROR_UNSUPPORTED; 319 } 320 321 AString keyURI; 322 if (!itemMeta->findString("cipher-uri", &keyURI)) { 323 ALOGE("Missing key uri"); 324 return ERROR_MALFORMED; 325 } 326 327 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 328 329 sp<ABuffer> key; 330 if (index >= 0) { 331 key = mAESKeyForURI.valueAt(index); 332 } else { 333 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key); 334 335 if (err == ERROR_NOT_CONNECTED) { 336 return ERROR_NOT_CONNECTED; 337 } else if (err < 0) { 338 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 339 return ERROR_IO; 340 } else if (key->size() != 16) { 341 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 342 return ERROR_MALFORMED; 343 } 344 345 mAESKeyForURI.add(keyURI, key); 346 } 347 348 AES_KEY aes_key; 349 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 350 ALOGE("failed to set AES decryption key."); 351 return UNKNOWN_ERROR; 352 } 353 354 size_t n = buffer->size(); 355 if (!n) { 356 return OK; 357 } 358 359 if (n < 16 || n % 16) { 360 ALOGE("not enough or trailing bytes (%zu) in encrypted buffer", n); 361 return ERROR_MALFORMED; 362 } 363 364 if (first) { 365 // If decrypting the first block in a file, read the iv from the manifest 366 // or derive the iv from the file's sequence number. 367 368 AString iv; 369 if (itemMeta->findString("cipher-iv", &iv)) { 370 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 371 || iv.size() > 16 * 2 + 2) { 372 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 373 return ERROR_MALFORMED; 374 } 375 376 while (iv.size() < 16 * 2 + 2) { 377 iv.insert("0", 1, 2); 378 } 379 380 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 381 for (size_t i = 0; i < 16; ++i) { 382 char c1 = tolower(iv.c_str()[2 + 2 * i]); 383 char c2 = tolower(iv.c_str()[3 + 2 * i]); 384 if (!isxdigit(c1) || !isxdigit(c2)) { 385 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 386 return ERROR_MALFORMED; 387 } 388 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 389 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 390 391 mAESInitVec[i] = nibble1 << 4 | nibble2; 392 } 393 } else { 394 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 395 mAESInitVec[15] = mSeqNumber & 0xff; 396 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 397 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 398 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 399 } 400 } 401 402 AES_cbc_encrypt( 403 buffer->data(), buffer->data(), buffer->size(), 404 &aes_key, mAESInitVec, AES_DECRYPT); 405 406 return OK; 407 } 408 409 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 410 AString method; 411 CHECK(buffer->meta()->findString("cipher-method", &method)); 412 if (method == "NONE") { 413 return OK; 414 } 415 416 uint8_t padding = 0; 417 if (buffer->size() > 0) { 418 padding = buffer->data()[buffer->size() - 1]; 419 } 420 421 if (padding > 16) { 422 return ERROR_MALFORMED; 423 } 424 425 for (size_t i = buffer->size() - padding; i < padding; i++) { 426 if (buffer->data()[i] != padding) { 427 return ERROR_MALFORMED; 428 } 429 } 430 431 buffer->setRange(buffer->offset(), buffer->size() - padding); 432 return OK; 433 } 434 435 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 436 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 437 if (maxDelayUs < minDelayUs) { 438 maxDelayUs = minDelayUs; 439 } 440 if (delayUs > maxDelayUs) { 441 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs); 442 delayUs = maxDelayUs; 443 } 444 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this); 445 msg->setInt32("generation", mMonitorQueueGeneration); 446 msg->post(delayUs); 447 } 448 449 void PlaylistFetcher::cancelMonitorQueue() { 450 ++mMonitorQueueGeneration; 451 } 452 453 void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) { 454 { 455 AutoMutex _l(mThresholdLock); 456 mThresholdRatio = thresholdRatio; 457 } 458 if (disconnect) { 459 mHTTPDownloader->disconnect(); 460 } 461 } 462 463 void PlaylistFetcher::resetStoppingThreshold(bool disconnect) { 464 { 465 AutoMutex _l(mThresholdLock); 466 mThresholdRatio = -1.0f; 467 } 468 if (disconnect) { 469 mHTTPDownloader->disconnect(); 470 } else { 471 // allow reconnect 472 mHTTPDownloader->reconnect(); 473 } 474 } 475 476 float PlaylistFetcher::getStoppingThreshold() { 477 AutoMutex _l(mThresholdLock); 478 return mThresholdRatio; 479 } 480 481 void PlaylistFetcher::startAsync( 482 const sp<AnotherPacketSource> &audioSource, 483 const sp<AnotherPacketSource> &videoSource, 484 const sp<AnotherPacketSource> &subtitleSource, 485 const sp<AnotherPacketSource> &metadataSource, 486 int64_t startTimeUs, 487 int64_t segmentStartTimeUs, 488 int32_t startDiscontinuitySeq, 489 LiveSession::SeekMode seekMode) { 490 sp<AMessage> msg = new AMessage(kWhatStart, this); 491 492 uint32_t streamTypeMask = 0ul; 493 494 if (audioSource != NULL) { 495 msg->setPointer("audioSource", audioSource.get()); 496 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 497 } 498 499 if (videoSource != NULL) { 500 msg->setPointer("videoSource", videoSource.get()); 501 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 502 } 503 504 if (subtitleSource != NULL) { 505 msg->setPointer("subtitleSource", subtitleSource.get()); 506 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 507 } 508 509 if (metadataSource != NULL) { 510 msg->setPointer("metadataSource", metadataSource.get()); 511 // metadataSource does not affect streamTypeMask. 512 } 513 514 msg->setInt32("streamTypeMask", streamTypeMask); 515 msg->setInt64("startTimeUs", startTimeUs); 516 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); 517 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); 518 msg->setInt32("seekMode", seekMode); 519 msg->post(); 520 } 521 522 /* 523 * pauseAsync 524 * 525 * threshold: 0.0f - pause after current fetch block (default 47Kbytes) 526 * -1.0f - pause after finishing current segment 527 * 0.0~1.0f - pause if remaining of current segment exceeds threshold 528 */ 529 void PlaylistFetcher::pauseAsync( 530 float thresholdRatio, bool disconnect) { 531 setStoppingThreshold(thresholdRatio, disconnect); 532 533 (new AMessage(kWhatPause, this))->post(); 534 } 535 536 void PlaylistFetcher::stopAsync(bool clear) { 537 setStoppingThreshold(0.0f, true /* disconncect */); 538 539 sp<AMessage> msg = new AMessage(kWhatStop, this); 540 msg->setInt32("clear", clear); 541 msg->post(); 542 } 543 544 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 545 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str()); 546 547 AMessage* msg = new AMessage(kWhatResumeUntil, this); 548 msg->setMessage("params", params); 549 msg->post(); 550 } 551 552 void PlaylistFetcher::fetchPlaylistAsync() { 553 (new AMessage(kWhatFetchPlaylist, this))->post(); 554 } 555 556 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 557 switch (msg->what()) { 558 case kWhatStart: 559 { 560 status_t err = onStart(msg); 561 562 sp<AMessage> notify = mNotify->dup(); 563 notify->setInt32("what", kWhatStarted); 564 notify->setInt32("err", err); 565 notify->post(); 566 break; 567 } 568 569 case kWhatPause: 570 { 571 onPause(); 572 573 sp<AMessage> notify = mNotify->dup(); 574 notify->setInt32("what", kWhatPaused); 575 notify->setInt32("seekMode", 576 mDownloadState->hasSavedState() 577 ? LiveSession::kSeekModeNextSample 578 : LiveSession::kSeekModeNextSegment); 579 notify->post(); 580 break; 581 } 582 583 case kWhatStop: 584 { 585 onStop(msg); 586 587 sp<AMessage> notify = mNotify->dup(); 588 notify->setInt32("what", kWhatStopped); 589 notify->post(); 590 break; 591 } 592 593 case kWhatFetchPlaylist: 594 { 595 bool unchanged; 596 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist( 597 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged); 598 599 sp<AMessage> notify = mNotify->dup(); 600 notify->setInt32("what", kWhatPlaylistFetched); 601 notify->setObject("playlist", playlist); 602 notify->post(); 603 break; 604 } 605 606 case kWhatMonitorQueue: 607 case kWhatDownloadNext: 608 { 609 int32_t generation; 610 CHECK(msg->findInt32("generation", &generation)); 611 612 if (generation != mMonitorQueueGeneration) { 613 // Stale event 614 break; 615 } 616 617 if (msg->what() == kWhatMonitorQueue) { 618 onMonitorQueue(); 619 } else { 620 onDownloadNext(); 621 } 622 break; 623 } 624 625 case kWhatResumeUntil: 626 { 627 onResumeUntil(msg); 628 break; 629 } 630 631 default: 632 TRESPASS(); 633 } 634 } 635 636 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 637 mPacketSources.clear(); 638 mStopParams.clear(); 639 mStartTimeUsNotify = mNotify->dup(); 640 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 641 mStartTimeUsNotify->setString("uri", mURI); 642 643 uint32_t streamTypeMask; 644 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 645 646 int64_t startTimeUs; 647 int64_t segmentStartTimeUs; 648 int32_t startDiscontinuitySeq; 649 int32_t seekMode; 650 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 651 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); 652 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); 653 CHECK(msg->findInt32("seekMode", &seekMode)); 654 655 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 656 void *ptr; 657 CHECK(msg->findPointer("audioSource", &ptr)); 658 659 mPacketSources.add( 660 LiveSession::STREAMTYPE_AUDIO, 661 static_cast<AnotherPacketSource *>(ptr)); 662 } 663 664 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 665 void *ptr; 666 CHECK(msg->findPointer("videoSource", &ptr)); 667 668 mPacketSources.add( 669 LiveSession::STREAMTYPE_VIDEO, 670 static_cast<AnotherPacketSource *>(ptr)); 671 } 672 673 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 674 void *ptr; 675 CHECK(msg->findPointer("subtitleSource", &ptr)); 676 677 mPacketSources.add( 678 LiveSession::STREAMTYPE_SUBTITLES, 679 static_cast<AnotherPacketSource *>(ptr)); 680 } 681 682 void *ptr; 683 // metadataSource is not part of streamTypeMask 684 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO)) 685 && msg->findPointer("metadataSource", &ptr)) { 686 mPacketSources.add( 687 LiveSession::STREAMTYPE_METADATA, 688 static_cast<AnotherPacketSource *>(ptr)); 689 } 690 691 mStreamTypeMask = streamTypeMask; 692 693 mSegmentStartTimeUs = segmentStartTimeUs; 694 695 if (startDiscontinuitySeq >= 0) { 696 mDiscontinuitySeq = startDiscontinuitySeq; 697 } 698 699 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 700 mSeekMode = (LiveSession::SeekMode) seekMode; 701 702 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) { 703 mStartup = true; 704 mIDRFound = false; 705 mVideoBuffer->clear(); 706 } 707 708 if (startTimeUs >= 0) { 709 mStartTimeUs = startTimeUs; 710 mFirstPTSValid = false; 711 mSeqNumber = -1; 712 mTimeChangeSignaled = false; 713 mDownloadState->resetState(); 714 } 715 716 postMonitorQueue(); 717 718 return OK; 719 } 720 721 void PlaylistFetcher::onPause() { 722 cancelMonitorQueue(); 723 mLastDiscontinuitySeq = mDiscontinuitySeq; 724 725 resetStoppingThreshold(false /* disconnect */); 726 } 727 728 void PlaylistFetcher::onStop(const sp<AMessage> &msg) { 729 cancelMonitorQueue(); 730 731 int32_t clear; 732 CHECK(msg->findInt32("clear", &clear)); 733 if (clear) { 734 for (size_t i = 0; i < mPacketSources.size(); i++) { 735 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 736 packetSource->clear(); 737 } 738 } 739 740 mDownloadState->resetState(); 741 mPacketSources.clear(); 742 mStreamTypeMask = 0; 743 744 resetStoppingThreshold(true /* disconnect */); 745 } 746 747 // Resume until we have reached the boundary timestamps listed in `msg`; when 748 // the remaining time is too short (within a resume threshold) stop immediately 749 // instead. 750 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { 751 sp<AMessage> params; 752 CHECK(msg->findMessage("params", ¶ms)); 753 754 mStopParams = params; 755 onDownloadNext(); 756 757 return OK; 758 } 759 760 void PlaylistFetcher::notifyStopReached() { 761 sp<AMessage> notify = mNotify->dup(); 762 notify->setInt32("what", kWhatStopReached); 763 notify->post(); 764 } 765 766 void PlaylistFetcher::notifyError(status_t err) { 767 sp<AMessage> notify = mNotify->dup(); 768 notify->setInt32("what", kWhatError); 769 notify->setInt32("err", err); 770 notify->post(); 771 } 772 773 void PlaylistFetcher::queueDiscontinuity( 774 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 775 for (size_t i = 0; i < mPacketSources.size(); ++i) { 776 // do not discard buffer upon #EXT-X-DISCONTINUITY tag 777 // (seek will discard buffer by abandoning old fetchers) 778 mPacketSources.valueAt(i)->queueDiscontinuity( 779 type, extra, false /* discard */); 780 } 781 } 782 783 void PlaylistFetcher::onMonitorQueue() { 784 // in the middle of an unfinished download, delay 785 // playlist refresh as it'll change seq numbers 786 if (!mDownloadState->hasSavedState()) { 787 refreshPlaylist(); 788 } 789 790 int64_t targetDurationUs = kMinBufferedDurationUs; 791 if (mPlaylist != NULL) { 792 targetDurationUs = mPlaylist->getTargetDuration(); 793 } 794 795 int64_t bufferedDurationUs = 0ll; 796 status_t finalResult = OK; 797 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 798 sp<AnotherPacketSource> packetSource = 799 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 800 801 bufferedDurationUs = 802 packetSource->getBufferedDurationUs(&finalResult); 803 } else { 804 // Use min stream duration, but ignore streams that never have any packet 805 // enqueued to prevent us from waiting on a non-existent stream; 806 // when we cannot make out from the manifest what streams are included in 807 // a playlist we might assume extra streams. 808 bufferedDurationUs = -1ll; 809 for (size_t i = 0; i < mPacketSources.size(); ++i) { 810 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0 811 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) { 812 continue; 813 } 814 815 int64_t bufferedStreamDurationUs = 816 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 817 818 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs); 819 820 if (bufferedDurationUs == -1ll 821 || bufferedStreamDurationUs < bufferedDurationUs) { 822 bufferedDurationUs = bufferedStreamDurationUs; 823 } 824 } 825 if (bufferedDurationUs == -1ll) { 826 bufferedDurationUs = 0ll; 827 } 828 } 829 830 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) { 831 FLOGV("monitoring, buffered=%lld < %lld", 832 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs); 833 834 // delay the next download slightly; hopefully this gives other concurrent fetchers 835 // a better chance to run. 836 // onDownloadNext(); 837 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this); 838 msg->setInt32("generation", mMonitorQueueGeneration); 839 msg->post(1000l); 840 } else { 841 // We'd like to maintain buffering above durationToBufferUs, so try 842 // again when buffer just about to go below durationToBufferUs 843 // (or after targetDurationUs / 2, whichever is smaller). 844 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll; 845 if (delayUs > targetDurationUs / 2) { 846 delayUs = targetDurationUs / 2; 847 } 848 849 FLOGV("pausing for %lld, buffered=%lld > %lld", 850 (long long)delayUs, 851 (long long)bufferedDurationUs, 852 (long long)kMinBufferedDurationUs); 853 854 postMonitorQueue(delayUs); 855 } 856 } 857 858 status_t PlaylistFetcher::refreshPlaylist() { 859 if (delayUsToRefreshPlaylist() <= 0) { 860 bool unchanged; 861 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist( 862 mURI.c_str(), mPlaylistHash, &unchanged); 863 864 if (playlist == NULL) { 865 if (unchanged) { 866 // We succeeded in fetching the playlist, but it was 867 // unchanged from the last time we tried. 868 869 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 870 mRefreshState = (RefreshState)(mRefreshState + 1); 871 } 872 } else { 873 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str()); 874 return ERROR_IO; 875 } 876 } else { 877 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 878 mPlaylist = playlist; 879 880 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 881 updateDuration(); 882 } 883 // Notify LiveSession to use target-duration based buffering level 884 // for up/down switch. Default LiveSession::kUpSwitchMark may not 885 // be reachable for live streams, as our max buffering amount is 886 // limited to 3 segments. 887 if (!mPlaylist->isComplete()) { 888 updateTargetDuration(); 889 } 890 mPlaylistTimeUs = ALooper::GetNowUs(); 891 } 892 893 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 894 } 895 return OK; 896 } 897 898 // static 899 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) { 900 return buffer->size() > 0 && buffer->data()[0] == 0x47; 901 } 902 903 bool PlaylistFetcher::shouldPauseDownload() { 904 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 905 // doesn't apply to subtitles 906 return false; 907 } 908 909 // Calculate threshold to abort current download 910 float thresholdRatio = getStoppingThreshold(); 911 912 if (thresholdRatio < 0.0f) { 913 // never abort 914 return false; 915 } else if (thresholdRatio == 0.0f) { 916 // immediately abort 917 return true; 918 } 919 920 // now we have a positive thresholdUs, abort if remaining 921 // portion to download is over that threshold. 922 if (mSegmentFirstPTS < 0) { 923 // this means we haven't even find the first access unit, 924 // abort now as we must be very far away from the end. 925 return true; 926 } 927 int64_t lastEnqueueUs = mSegmentFirstPTS; 928 for (size_t i = 0; i < mPacketSources.size(); ++i) { 929 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 930 continue; 931 } 932 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 933 int32_t type; 934 if (meta == NULL || meta->findInt32("discontinuity", &type)) { 935 continue; 936 } 937 int64_t tmpUs; 938 CHECK(meta->findInt64("timeUs", &tmpUs)); 939 if (tmpUs > lastEnqueueUs) { 940 lastEnqueueUs = tmpUs; 941 } 942 } 943 lastEnqueueUs -= mSegmentFirstPTS; 944 945 int64_t targetDurationUs = mPlaylist->getTargetDuration(); 946 int64_t thresholdUs = thresholdRatio * targetDurationUs; 947 948 FLOGV("%spausing now, thresholdUs %lld, remaining %lld", 949 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ", 950 (long long)thresholdUs, 951 (long long)(targetDurationUs - lastEnqueueUs)); 952 953 if (targetDurationUs - lastEnqueueUs > thresholdUs) { 954 return true; 955 } 956 return false; 957 } 958 959 bool PlaylistFetcher::initDownloadState( 960 AString &uri, 961 sp<AMessage> &itemMeta, 962 int32_t &firstSeqNumberInPlaylist, 963 int32_t &lastSeqNumberInPlaylist) { 964 status_t err = refreshPlaylist(); 965 firstSeqNumberInPlaylist = 0; 966 lastSeqNumberInPlaylist = 0; 967 bool discontinuity = false; 968 969 if (mPlaylist != NULL) { 970 mPlaylist->getSeqNumberRange( 971 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); 972 973 if (mDiscontinuitySeq < 0) { 974 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 975 } 976 } 977 978 mSegmentFirstPTS = -1ll; 979 980 if (mPlaylist != NULL && mSeqNumber < 0) { 981 CHECK_GE(mStartTimeUs, 0ll); 982 983 if (mSegmentStartTimeUs < 0) { 984 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 985 // If this is a live session, start 3 segments from the end on connect 986 mSeqNumber = lastSeqNumberInPlaylist - 3; 987 if (mSeqNumber < firstSeqNumberInPlaylist) { 988 mSeqNumber = firstSeqNumberInPlaylist; 989 } 990 } else { 991 // When seeking mSegmentStartTimeUs is unavailable (< 0), we 992 // use mStartTimeUs (client supplied timestamp) to determine both start segment 993 // and relative position inside a segment 994 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 995 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); 996 } 997 mStartTimeUsRelative = true; 998 FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)", 999 (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 1000 lastSeqNumberInPlaylist); 1001 } else { 1002 // When adapting or track switching, mSegmentStartTimeUs (relative 1003 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute 1004 // timestamps coming from the media container) is used to determine the position 1005 // inside a segments. 1006 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES 1007 && mSeekMode != LiveSession::kSeekModeNextSample) { 1008 // avoid double fetch/decode 1009 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search 1010 // for the starting segment in new variant. 1011 // If the two variants' segments are aligned, this gives the 1012 // next segment. If they're not aligned, this gives the segment 1013 // that overlaps no more than 1/2 * targetDurationUs. 1014 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs 1015 + mPlaylist->getTargetDuration() / 2); 1016 } else { 1017 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); 1018 } 1019 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); 1020 if (mSeqNumber < minSeq) { 1021 mSeqNumber = minSeq; 1022 } 1023 1024 if (mSeqNumber < firstSeqNumberInPlaylist) { 1025 mSeqNumber = firstSeqNumberInPlaylist; 1026 } 1027 1028 if (mSeqNumber > lastSeqNumberInPlaylist) { 1029 mSeqNumber = lastSeqNumberInPlaylist; 1030 } 1031 FLOGV("Initial sequence number is %d from (%d .. %d)", 1032 mSeqNumber, firstSeqNumberInPlaylist, 1033 lastSeqNumberInPlaylist); 1034 } 1035 } 1036 1037 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true 1038 if (mSeqNumber < firstSeqNumberInPlaylist 1039 || mSeqNumber > lastSeqNumberInPlaylist 1040 || err != OK) { 1041 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) { 1042 ++mNumRetries; 1043 1044 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) { 1045 // make sure we reach this retry logic on refresh failures 1046 // by adding an err != OK clause to all enclosing if's. 1047 1048 // refresh in increasing fraction (1/2, 1/3, ...) of the 1049 // playlist's target duration or 3 seconds, whichever is less 1050 int64_t delayUs = kMaxMonitorDelayUs; 1051 if (mPlaylist != NULL) { 1052 delayUs = mPlaylist->size() * mPlaylist->getTargetDuration() 1053 / (1 + mNumRetries); 1054 } 1055 if (delayUs > kMaxMonitorDelayUs) { 1056 delayUs = kMaxMonitorDelayUs; 1057 } 1058 FLOGV("sequence number high: %d from (%d .. %d), " 1059 "monitor in %lld (retry=%d)", 1060 mSeqNumber, firstSeqNumberInPlaylist, 1061 lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries); 1062 postMonitorQueue(delayUs); 1063 return false; 1064 } 1065 1066 if (err != OK) { 1067 notifyError(err); 1068 return false; 1069 } 1070 1071 // we've missed the boat, let's start 3 segments prior to the latest sequence 1072 // number available and signal a discontinuity. 1073 1074 ALOGI("We've missed the boat, restarting playback." 1075 " mStartup=%d, was looking for %d in %d-%d", 1076 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 1077 lastSeqNumberInPlaylist); 1078 if (mStopParams != NULL) { 1079 // we should have kept on fetching until we hit the boundaries in mStopParams, 1080 // but since the segments we are supposed to fetch have already rolled off 1081 // the playlist, i.e. we have already missed the boat, we inevitably have to 1082 // skip. 1083 notifyStopReached(); 1084 return false; 1085 } 1086 mSeqNumber = lastSeqNumberInPlaylist - 3; 1087 if (mSeqNumber < firstSeqNumberInPlaylist) { 1088 mSeqNumber = firstSeqNumberInPlaylist; 1089 } 1090 discontinuity = true; 1091 1092 // fall through 1093 } else { 1094 if (mPlaylist != NULL) { 1095 if (mSeqNumber >= firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() 1096 && !mPlaylist->isComplete()) { 1097 // Live playlists 1098 ALOGW("sequence number %d not yet available", mSeqNumber); 1099 postMonitorQueue(delayUsToRefreshPlaylist()); 1100 return false; 1101 } 1102 ALOGE("Cannot find sequence number %d in playlist " 1103 "(contains %d - %d)", 1104 mSeqNumber, firstSeqNumberInPlaylist, 1105 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); 1106 1107 if (mTSParser != NULL) { 1108 mTSParser->signalEOS(ERROR_END_OF_STREAM); 1109 // Use an empty buffer; we don't have any new data, just want to extract 1110 // potential new access units after flush. Reset mSeqNumber to 1111 // lastSeqNumberInPlaylist such that we set the correct access unit 1112 // properties in extractAndQueueAccessUnitsFromTs. 1113 sp<ABuffer> buffer = new ABuffer(0); 1114 mSeqNumber = lastSeqNumberInPlaylist; 1115 extractAndQueueAccessUnitsFromTs(buffer); 1116 } 1117 notifyError(ERROR_END_OF_STREAM); 1118 } else { 1119 // It's possible that we were never able to download the playlist. 1120 // In this case we should notify error, instead of EOS, as EOS during 1121 // prepare means we succeeded in downloading everything. 1122 ALOGE("Failed to download playlist!"); 1123 notifyError(ERROR_IO); 1124 } 1125 1126 return false; 1127 } 1128 } 1129 1130 mNumRetries = 0; 1131 1132 CHECK(mPlaylist->itemAt( 1133 mSeqNumber - firstSeqNumberInPlaylist, 1134 &uri, 1135 &itemMeta)); 1136 1137 CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq)); 1138 1139 int32_t val; 1140 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 1141 discontinuity = true; 1142 } else if (mLastDiscontinuitySeq >= 0 1143 && mDiscontinuitySeq != mLastDiscontinuitySeq) { 1144 // Seek jumped to a new discontinuity sequence. We need to signal 1145 // a format change to decoder. Decoder needs to shutdown and be 1146 // created again if seamless format change is unsupported. 1147 FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, " 1148 "mDiscontinuitySeq %d, mStartTimeUs %lld", 1149 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs); 1150 discontinuity = true; 1151 } 1152 mLastDiscontinuitySeq = -1; 1153 1154 // decrypt a junk buffer to prefetch key; since a session uses only one http connection, 1155 // this avoids interleaved connections to the key and segment file. 1156 { 1157 sp<ABuffer> junk = new ABuffer(16); 1158 junk->setRange(0, 16); 1159 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, 1160 true /* first */); 1161 if (err == ERROR_NOT_CONNECTED) { 1162 return false; 1163 } else if (err != OK) { 1164 notifyError(err); 1165 return false; 1166 } 1167 } 1168 1169 if ((mStartup && !mTimeChangeSignaled) || discontinuity) { 1170 // We need to signal a time discontinuity to ATSParser on the 1171 // first segment after start, or on a discontinuity segment. 1172 // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX() 1173 // to send the time discontinuity. 1174 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 1175 // If this was a live event this made no sense since 1176 // we don't have access to all the segment before the current 1177 // one. 1178 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 1179 } 1180 1181 // Setting mTimeChangeSignaled to true, so that if start time 1182 // searching goes into 2nd segment (without a discontinuity), 1183 // we don't reset time again. It causes corruption when pending 1184 // data in ATSParser is cleared. 1185 mTimeChangeSignaled = true; 1186 } 1187 1188 if (discontinuity) { 1189 ALOGI("queueing discontinuity (explicit=%d)", discontinuity); 1190 1191 // Signal a format discontinuity to ATSParser to clear partial data 1192 // from previous streams. Not doing this causes bitstream corruption. 1193 if (mTSParser != NULL) { 1194 mTSParser.clear(); 1195 } 1196 1197 queueDiscontinuity( 1198 ATSParser::DISCONTINUITY_FORMAT_ONLY, 1199 NULL /* extra */); 1200 1201 if (mStartup && mStartTimeUsRelative && mFirstPTSValid) { 1202 // This means we guessed mStartTimeUs to be in the previous 1203 // segment (likely very close to the end), but either video or 1204 // audio has not found start by the end of that segment. 1205 // 1206 // If this new segment is not a discontinuity, keep searching. 1207 // 1208 // If this new segment even got a discontinuity marker, just 1209 // set mStartTimeUs=0, and take all samples from now on. 1210 mStartTimeUs = 0; 1211 mFirstPTSValid = false; 1212 mIDRFound = false; 1213 mVideoBuffer->clear(); 1214 } 1215 } 1216 1217 FLOGV("fetching segment %d from (%d .. %d)", 1218 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 1219 return true; 1220 } 1221 1222 void PlaylistFetcher::onDownloadNext() { 1223 AString uri; 1224 sp<AMessage> itemMeta; 1225 sp<ABuffer> buffer; 1226 sp<ABuffer> tsBuffer; 1227 int32_t firstSeqNumberInPlaylist = 0; 1228 int32_t lastSeqNumberInPlaylist = 0; 1229 bool connectHTTP = true; 1230 1231 if (mDownloadState->hasSavedState()) { 1232 mDownloadState->restoreState( 1233 uri, 1234 itemMeta, 1235 buffer, 1236 tsBuffer, 1237 firstSeqNumberInPlaylist, 1238 lastSeqNumberInPlaylist); 1239 connectHTTP = false; 1240 FLOGV("resuming: '%s'", uri.c_str()); 1241 } else { 1242 if (!initDownloadState( 1243 uri, 1244 itemMeta, 1245 firstSeqNumberInPlaylist, 1246 lastSeqNumberInPlaylist)) { 1247 return; 1248 } 1249 FLOGV("fetching: '%s'", uri.c_str()); 1250 } 1251 1252 int64_t range_offset, range_length; 1253 if (!itemMeta->findInt64("range-offset", &range_offset) 1254 || !itemMeta->findInt64("range-length", &range_length)) { 1255 range_offset = 0; 1256 range_length = -1; 1257 } 1258 1259 // block-wise download 1260 bool shouldPause = false; 1261 ssize_t bytesRead; 1262 do { 1263 int64_t startUs = ALooper::GetNowUs(); 1264 bytesRead = mHTTPDownloader->fetchBlock( 1265 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, 1266 NULL /* actualURL */, connectHTTP); 1267 int64_t delayUs = ALooper::GetNowUs() - startUs; 1268 1269 if (bytesRead == ERROR_NOT_CONNECTED) { 1270 return; 1271 } 1272 if (bytesRead < 0) { 1273 status_t err = bytesRead; 1274 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 1275 notifyError(err); 1276 return; 1277 } 1278 1279 // add sample for bandwidth estimation, excluding samples from subtitles (as 1280 // its too small), or during startup/resumeUntil (when we could have more than 1281 // one connection open which affects bandwidth) 1282 if (!mStartup && mStopParams == NULL && bytesRead > 0 1283 && (mStreamTypeMask 1284 & (LiveSession::STREAMTYPE_AUDIO 1285 | LiveSession::STREAMTYPE_VIDEO))) { 1286 mSession->addBandwidthMeasurement(bytesRead, delayUs); 1287 if (delayUs > 2000000ll) { 1288 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip", 1289 bytesRead, (double)delayUs / 1.0e6); 1290 } 1291 } 1292 1293 connectHTTP = false; 1294 1295 CHECK(buffer != NULL); 1296 1297 size_t size = buffer->size(); 1298 // Set decryption range. 1299 buffer->setRange(size - bytesRead, bytesRead); 1300 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, 1301 buffer->offset() == 0 /* first */); 1302 // Unset decryption range. 1303 buffer->setRange(0, size); 1304 1305 if (err != OK) { 1306 ALOGE("decryptBuffer failed w/ error %d", err); 1307 1308 notifyError(err); 1309 return; 1310 } 1311 1312 bool startUp = mStartup; // save current start up state 1313 1314 err = OK; 1315 if (bufferStartsWithTsSyncByte(buffer)) { 1316 // Incremental extraction is only supported for MPEG2 transport streams. 1317 if (tsBuffer == NULL) { 1318 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 1319 tsBuffer->setRange(0, 0); 1320 } else if (tsBuffer->capacity() != buffer->capacity()) { 1321 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); 1322 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 1323 tsBuffer->setRange(tsOff, tsSize); 1324 } 1325 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); 1326 err = extractAndQueueAccessUnitsFromTs(tsBuffer); 1327 } 1328 1329 if (err == -EAGAIN) { 1330 // starting sequence number too low/high 1331 mTSParser.clear(); 1332 for (size_t i = 0; i < mPacketSources.size(); i++) { 1333 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1334 packetSource->clear(); 1335 } 1336 postMonitorQueue(); 1337 return; 1338 } else if (err == ERROR_OUT_OF_RANGE) { 1339 // reached stopping point 1340 notifyStopReached(); 1341 return; 1342 } else if (err != OK) { 1343 notifyError(err); 1344 return; 1345 } 1346 // If we're switching, post start notification 1347 // this should only be posted when the last chunk is full processed by TSParser 1348 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { 1349 CHECK(mStartTimeUsNotify != NULL); 1350 mStartTimeUsNotify->post(); 1351 mStartTimeUsNotify.clear(); 1352 shouldPause = true; 1353 } 1354 if (shouldPause || shouldPauseDownload()) { 1355 // save state and return if this is not the last chunk, 1356 // leaving the fetcher in paused state. 1357 if (bytesRead != 0) { 1358 mDownloadState->saveState( 1359 uri, 1360 itemMeta, 1361 buffer, 1362 tsBuffer, 1363 firstSeqNumberInPlaylist, 1364 lastSeqNumberInPlaylist); 1365 return; 1366 } 1367 shouldPause = true; 1368 } 1369 } while (bytesRead != 0); 1370 1371 if (bufferStartsWithTsSyncByte(buffer)) { 1372 // If we don't see a stream in the program table after fetching a full ts segment 1373 // mark it as nonexistent. 1374 ATSParser::SourceType srcTypes[] = 1375 { ATSParser::VIDEO, ATSParser::AUDIO }; 1376 LiveSession::StreamType streamTypes[] = 1377 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; 1378 const size_t kNumTypes = NELEM(srcTypes); 1379 1380 for (size_t i = 0; i < kNumTypes; i++) { 1381 ATSParser::SourceType srcType = srcTypes[i]; 1382 LiveSession::StreamType streamType = streamTypes[i]; 1383 1384 sp<AnotherPacketSource> source = 1385 static_cast<AnotherPacketSource *>( 1386 mTSParser->getSource(srcType).get()); 1387 1388 if (!mTSParser->hasSource(srcType)) { 1389 ALOGW("MPEG2 Transport stream does not contain %s data.", 1390 srcType == ATSParser::VIDEO ? "video" : "audio"); 1391 1392 mStreamTypeMask &= ~streamType; 1393 mPacketSources.removeItem(streamType); 1394 } 1395 } 1396 1397 } 1398 1399 if (checkDecryptPadding(buffer) != OK) { 1400 ALOGE("Incorrect padding bytes after decryption."); 1401 notifyError(ERROR_MALFORMED); 1402 return; 1403 } 1404 1405 if (tsBuffer != NULL) { 1406 AString method; 1407 CHECK(buffer->meta()->findString("cipher-method", &method)); 1408 if ((tsBuffer->size() > 0 && method == "NONE") 1409 || tsBuffer->size() > 16) { 1410 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 1411 "bytes in length."); 1412 notifyError(ERROR_MALFORMED); 1413 return; 1414 } 1415 } 1416 1417 // bulk extract non-ts files 1418 bool startUp = mStartup; 1419 if (tsBuffer == NULL) { 1420 status_t err = extractAndQueueAccessUnits(buffer, itemMeta); 1421 if (err == -EAGAIN) { 1422 // starting sequence number too low/high 1423 postMonitorQueue(); 1424 return; 1425 } else if (err == ERROR_OUT_OF_RANGE) { 1426 // reached stopping point 1427 notifyStopReached(); 1428 return; 1429 } else if (err != OK) { 1430 notifyError(err); 1431 return; 1432 } 1433 } 1434 1435 ++mSeqNumber; 1436 1437 // if adapting, pause after found the next starting point 1438 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { 1439 CHECK(mStartTimeUsNotify != NULL); 1440 mStartTimeUsNotify->post(); 1441 mStartTimeUsNotify.clear(); 1442 shouldPause = true; 1443 } 1444 1445 if (!shouldPause) { 1446 postMonitorQueue(); 1447 } 1448 } 1449 1450 /* 1451 * returns true if we need to adjust mSeqNumber 1452 */ 1453 bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) { 1454 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber(); 1455 1456 int64_t minDiffUs, maxDiffUs; 1457 if (mSeekMode == LiveSession::kSeekModeNextSample) { 1458 // if the previous fetcher paused in the middle of a segment, we 1459 // want to start at a segment that overlaps the last sample 1460 minDiffUs = -mPlaylist->getTargetDuration(); 1461 maxDiffUs = 0ll; 1462 } else { 1463 // if the previous fetcher paused at the end of a segment, ideally 1464 // we want to start at the segment that's roughly aligned with its 1465 // next segment, but if the two variants are not well aligned we 1466 // adjust the diff to within (-T/2, T/2) 1467 minDiffUs = -mPlaylist->getTargetDuration() / 2; 1468 maxDiffUs = mPlaylist->getTargetDuration() / 2; 1469 } 1470 1471 int32_t oldSeqNumber = mSeqNumber; 1472 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist; 1473 1474 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs 1475 int64_t diffUs = anchorTimeUs - mStartTimeUs; 1476 if (diffUs > maxDiffUs) { 1477 while (index > 0 && diffUs > maxDiffUs) { 1478 --index; 1479 1480 sp<AMessage> itemMeta; 1481 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1482 1483 int64_t itemDurationUs; 1484 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1485 1486 diffUs -= itemDurationUs; 1487 } 1488 } else if (diffUs < minDiffUs) { 1489 while (index + 1 < (ssize_t) mPlaylist->size() 1490 && diffUs < minDiffUs) { 1491 ++index; 1492 1493 sp<AMessage> itemMeta; 1494 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1495 1496 int64_t itemDurationUs; 1497 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1498 1499 diffUs += itemDurationUs; 1500 } 1501 } 1502 1503 mSeqNumber = firstSeqNumberInPlaylist + index; 1504 1505 if (mSeqNumber != oldSeqNumber) { 1506 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]", 1507 (long long) anchorTimeUs - mStartTimeUs, 1508 (long long) minDiffUs, 1509 (long long) maxDiffUs); 1510 return true; 1511 } 1512 return false; 1513 } 1514 1515 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { 1516 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber(); 1517 1518 size_t index = 0; 1519 while (index < mPlaylist->size()) { 1520 sp<AMessage> itemMeta; 1521 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); 1522 size_t curDiscontinuitySeq; 1523 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq)); 1524 int32_t seqNumber = firstSeqNumberInPlaylist + index; 1525 if (curDiscontinuitySeq == discontinuitySeq) { 1526 return seqNumber; 1527 } else if (curDiscontinuitySeq > discontinuitySeq) { 1528 return seqNumber <= 0 ? 0 : seqNumber - 1; 1529 } 1530 1531 ++index; 1532 } 1533 1534 return firstSeqNumberInPlaylist + mPlaylist->size(); 1535 } 1536 1537 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 1538 size_t index = 0; 1539 int64_t segmentStartUs = 0; 1540 while (index < mPlaylist->size()) { 1541 sp<AMessage> itemMeta; 1542 CHECK(mPlaylist->itemAt( 1543 index, NULL /* uri */, &itemMeta)); 1544 1545 int64_t itemDurationUs; 1546 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1547 1548 if (timeUs < segmentStartUs + itemDurationUs) { 1549 break; 1550 } 1551 1552 segmentStartUs += itemDurationUs; 1553 ++index; 1554 } 1555 1556 if (index >= mPlaylist->size()) { 1557 index = mPlaylist->size() - 1; 1558 } 1559 1560 return mPlaylist->getFirstSeqNumber() + index; 1561 } 1562 1563 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( 1564 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { 1565 sp<MetaData> format = source->getFormat(); 1566 if (format != NULL) { 1567 // for simplicity, store a reference to the format in each unit 1568 accessUnit->meta()->setObject("format", format); 1569 } 1570 1571 if (discard) { 1572 accessUnit->meta()->setInt32("discard", discard); 1573 } 1574 1575 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1576 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1577 accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS); 1578 accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber)); 1579 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 1580 accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs); 1581 } 1582 return accessUnit; 1583 } 1584 1585 bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) { 1586 if (!mFirstPTSValid) { 1587 mFirstTimeUs = timeUs; 1588 mFirstPTSValid = true; 1589 } 1590 bool startTimeReached = true; 1591 if (mStartTimeUsRelative) { 1592 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld", 1593 (long long)timeUs, 1594 (long long)mFirstTimeUs, 1595 (long long)(timeUs - mFirstTimeUs)); 1596 timeUs -= mFirstTimeUs; 1597 if (timeUs < 0) { 1598 FLOGV("clamp negative timeUs to 0"); 1599 timeUs = 0; 1600 } 1601 startTimeReached = (timeUs >= mStartTimeUs); 1602 } 1603 return startTimeReached; 1604 } 1605 1606 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { 1607 if (mTSParser == NULL) { 1608 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 1609 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 1610 } 1611 1612 if (mNextPTSTimeUs >= 0ll) { 1613 sp<AMessage> extra = new AMessage; 1614 // Since we are using absolute timestamps, signal an offset of 0 to prevent 1615 // ATSParser from skewing the timestamps of access units. 1616 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 1617 1618 // When adapting, signal a recent media time to the parser, 1619 // so that PTS wrap around is handled for the new variant. 1620 if (mStartTimeUs >= 0 && !mStartTimeUsRelative) { 1621 extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs); 1622 } 1623 1624 mTSParser->signalDiscontinuity( 1625 ATSParser::DISCONTINUITY_TIME, extra); 1626 1627 mNextPTSTimeUs = -1ll; 1628 } 1629 1630 size_t offset = 0; 1631 while (offset + 188 <= buffer->size()) { 1632 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 1633 1634 if (err != OK) { 1635 return err; 1636 } 1637 1638 offset += 188; 1639 } 1640 // setRange to indicate consumed bytes. 1641 buffer->setRange(buffer->offset() + offset, buffer->size() - offset); 1642 1643 if (mSegmentFirstPTS < 0ll) { 1644 // get the smallest first PTS from all streams present in this parser 1645 for (size_t i = mPacketSources.size(); i > 0;) { 1646 i--; 1647 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1648 if (stream == LiveSession::STREAMTYPE_SUBTITLES) { 1649 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1650 return ERROR_MALFORMED; 1651 } 1652 if (stream == LiveSession::STREAMTYPE_METADATA) { 1653 continue; 1654 } 1655 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream); 1656 sp<AnotherPacketSource> source = 1657 static_cast<AnotherPacketSource *>( 1658 mTSParser->getSource(type).get()); 1659 1660 if (source == NULL) { 1661 continue; 1662 } 1663 sp<AMessage> meta = source->getMetaAfterLastDequeued(0); 1664 if (meta != NULL) { 1665 int64_t timeUs; 1666 CHECK(meta->findInt64("timeUs", &timeUs)); 1667 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) { 1668 mSegmentFirstPTS = timeUs; 1669 } 1670 } 1671 } 1672 if (mSegmentFirstPTS < 0ll) { 1673 // didn't find any TS packet, can return early 1674 return OK; 1675 } 1676 if (!mStartTimeUsRelative) { 1677 // mStartup 1678 // mStartup is true until we have queued a packet for all the streams 1679 // we are fetching. We queue packets whose timestamps are greater than 1680 // mStartTimeUs. 1681 // mSegmentStartTimeUs >= 0 1682 // mSegmentStartTimeUs is non-negative when adapting or switching tracks 1683 // adjustSeqNumberWithAnchorTime(timeUs) == true 1684 // we guessed a seq number that's either too large or too small. 1685 // If this happens, we'll adjust mSeqNumber and restart fetching from new 1686 // location. Note that we only want to adjust once, so set mSegmentStartTimeUs 1687 // to -1 so that we don't enter this chunk next time. 1688 if (mStartup && mSegmentStartTimeUs >= 0 1689 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) { 1690 mStartTimeUsNotify = mNotify->dup(); 1691 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 1692 mStartTimeUsNotify->setString("uri", mURI); 1693 mIDRFound = false; 1694 mSegmentStartTimeUs = -1; 1695 return -EAGAIN; 1696 } 1697 } 1698 } 1699 1700 status_t err = OK; 1701 for (size_t i = mPacketSources.size(); i > 0;) { 1702 i--; 1703 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1704 1705 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1706 if (stream == LiveSession::STREAMTYPE_SUBTITLES) { 1707 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1708 return ERROR_MALFORMED; 1709 } 1710 1711 const char *key = LiveSession::getKeyForStream(stream); 1712 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream); 1713 1714 sp<AnotherPacketSource> source = 1715 static_cast<AnotherPacketSource *>( 1716 mTSParser->getSource(type).get()); 1717 1718 if (source == NULL) { 1719 continue; 1720 } 1721 1722 const char *mime; 1723 sp<MetaData> format = source->getFormat(); 1724 bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime) 1725 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); 1726 1727 sp<ABuffer> accessUnit; 1728 status_t finalResult; 1729 while (source->hasBufferAvailable(&finalResult) 1730 && source->dequeueAccessUnit(&accessUnit) == OK) { 1731 1732 int64_t timeUs; 1733 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1734 1735 if (mStartup) { 1736 bool startTimeReached = isStartTimeReached(timeUs); 1737 1738 if (!startTimeReached || (isAvc && !mIDRFound)) { 1739 // buffer up to the closest preceding IDR frame in the next segement, 1740 // or the closest succeeding IDR frame after the exact position 1741 FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d", 1742 (long long)timeUs, 1743 (long long)mStartTimeUs, 1744 (long long)timeUs - mStartTimeUs, 1745 mIDRFound); 1746 if (isAvc) { 1747 if (IsIDR(accessUnit)) { 1748 mVideoBuffer->clear(); 1749 FSLOGV(stream, "found IDR, clear mVideoBuffer"); 1750 mIDRFound = true; 1751 } 1752 if (mIDRFound && mStartTimeUsRelative && !startTimeReached) { 1753 mVideoBuffer->queueAccessUnit(accessUnit); 1754 FSLOGV(stream, "saving AVC video AccessUnit"); 1755 } 1756 } 1757 if (!startTimeReached || (isAvc && !mIDRFound)) { 1758 continue; 1759 } 1760 } 1761 } 1762 1763 if (mStartTimeUsNotify != NULL) { 1764 uint32_t streamMask = 0; 1765 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1766 if ((mStreamTypeMask & mPacketSources.keyAt(i)) 1767 && !(streamMask & mPacketSources.keyAt(i))) { 1768 streamMask |= mPacketSources.keyAt(i); 1769 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1770 FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x", 1771 (long long)timeUs, streamMask); 1772 1773 if (streamMask == mStreamTypeMask) { 1774 FLOGV("found start point for all streams"); 1775 mStartup = false; 1776 } 1777 } 1778 } 1779 1780 if (mStopParams != NULL) { 1781 int32_t discontinuitySeq; 1782 int64_t stopTimeUs; 1783 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1784 || discontinuitySeq > mDiscontinuitySeq 1785 || !mStopParams->findInt64(key, &stopTimeUs) 1786 || (discontinuitySeq == mDiscontinuitySeq 1787 && timeUs >= stopTimeUs)) { 1788 FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs); 1789 mStreamTypeMask &= ~stream; 1790 mPacketSources.removeItemsAt(i); 1791 break; 1792 } 1793 } 1794 1795 if (stream == LiveSession::STREAMTYPE_VIDEO) { 1796 const bool discard = true; 1797 status_t status; 1798 while (mVideoBuffer->hasBufferAvailable(&status)) { 1799 sp<ABuffer> videoBuffer; 1800 mVideoBuffer->dequeueAccessUnit(&videoBuffer); 1801 setAccessUnitProperties(videoBuffer, source, discard); 1802 packetSource->queueAccessUnit(videoBuffer); 1803 int64_t bufferTimeUs; 1804 CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs)); 1805 FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld", 1806 (long long)bufferTimeUs); 1807 } 1808 } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) { 1809 mHasMetadata = true; 1810 sp<AMessage> notify = mNotify->dup(); 1811 notify->setInt32("what", kWhatMetadataDetected); 1812 notify->post(); 1813 } 1814 1815 setAccessUnitProperties(accessUnit, source); 1816 packetSource->queueAccessUnit(accessUnit); 1817 FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs); 1818 } 1819 1820 if (err != OK) { 1821 break; 1822 } 1823 } 1824 1825 if (err != OK) { 1826 for (size_t i = mPacketSources.size(); i > 0;) { 1827 i--; 1828 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1829 packetSource->clear(); 1830 } 1831 return err; 1832 } 1833 1834 if (!mStreamTypeMask) { 1835 // Signal gap is filled between original and new stream. 1836 FLOGV("reached stop point for all streams"); 1837 return ERROR_OUT_OF_RANGE; 1838 } 1839 1840 return OK; 1841 } 1842 1843 /* static */ 1844 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( 1845 const sp<ABuffer> &buffer) { 1846 size_t pos = 0; 1847 1848 // skip possible BOM 1849 if (buffer->size() >= pos + 3 && 1850 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { 1851 pos += 3; 1852 } 1853 1854 // accept WEBVTT followed by SPACE, TAB or (CR) LF 1855 if (buffer->size() < pos + 6 || 1856 memcmp("WEBVTT", buffer->data() + pos, 6)) { 1857 return false; 1858 } 1859 pos += 6; 1860 1861 if (buffer->size() == pos) { 1862 return true; 1863 } 1864 1865 uint8_t sep = buffer->data()[pos]; 1866 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; 1867 } 1868 1869 status_t PlaylistFetcher::extractAndQueueAccessUnits( 1870 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 1871 if (bufferStartsWithWebVTTMagicSequence(buffer)) { 1872 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1873 ALOGE("This stream only contains subtitles."); 1874 return ERROR_MALFORMED; 1875 } 1876 1877 const sp<AnotherPacketSource> packetSource = 1878 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1879 1880 int64_t durationUs; 1881 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1882 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1883 buffer->meta()->setInt64("durationUs", durationUs); 1884 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1885 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1886 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); 1887 packetSource->queueAccessUnit(buffer); 1888 return OK; 1889 } 1890 1891 if (mNextPTSTimeUs >= 0ll) { 1892 mNextPTSTimeUs = -1ll; 1893 } 1894 1895 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1896 // stream prefixed by an ID3 tag. 1897 1898 bool firstID3Tag = true; 1899 uint64_t PTS = 0; 1900 1901 for (;;) { 1902 // Make sure to skip all ID3 tags preceding the audio data. 1903 // At least one must be present to provide the PTS timestamp. 1904 1905 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1906 if (!id3.isValid()) { 1907 if (firstID3Tag) { 1908 ALOGE("Unable to parse ID3 tag."); 1909 return ERROR_MALFORMED; 1910 } else { 1911 break; 1912 } 1913 } 1914 1915 if (firstID3Tag) { 1916 bool found = false; 1917 1918 ID3::Iterator it(id3, "PRIV"); 1919 while (!it.done()) { 1920 size_t length; 1921 const uint8_t *data = it.getData(&length); 1922 if (!data) { 1923 return ERROR_MALFORMED; 1924 } 1925 1926 static const char *kMatchName = 1927 "com.apple.streaming.transportStreamTimestamp"; 1928 static const size_t kMatchNameLen = strlen(kMatchName); 1929 1930 if (length == kMatchNameLen + 1 + 8 1931 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1932 found = true; 1933 PTS = U64_AT(&data[kMatchNameLen + 1]); 1934 } 1935 1936 it.next(); 1937 } 1938 1939 if (!found) { 1940 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1941 return ERROR_MALFORMED; 1942 } 1943 } 1944 1945 // skip the ID3 tag 1946 buffer->setRange( 1947 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 1948 1949 firstID3Tag = false; 1950 } 1951 1952 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1953 ALOGW("This stream only contains audio data!"); 1954 1955 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1956 1957 if (mStreamTypeMask == 0) { 1958 return OK; 1959 } 1960 } 1961 1962 sp<AnotherPacketSource> packetSource = 1963 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1964 1965 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1966 ABitReader bits(buffer->data(), buffer->size()); 1967 1968 // adts_fixed_header 1969 1970 CHECK_EQ(bits.getBits(12), 0xfffu); 1971 bits.skipBits(3); // ID, layer 1972 bool protection_absent __unused = bits.getBits(1) != 0; 1973 1974 unsigned profile = bits.getBits(2); 1975 CHECK_NE(profile, 3u); 1976 unsigned sampling_freq_index = bits.getBits(4); 1977 bits.getBits(1); // private_bit 1978 unsigned channel_configuration = bits.getBits(3); 1979 CHECK_NE(channel_configuration, 0u); 1980 bits.skipBits(2); // original_copy, home 1981 1982 sp<MetaData> meta = MakeAACCodecSpecificData( 1983 profile, sampling_freq_index, channel_configuration); 1984 1985 meta->setInt32(kKeyIsADTS, true); 1986 1987 packetSource->setFormat(meta); 1988 } 1989 1990 int64_t numSamples = 0ll; 1991 int32_t sampleRate; 1992 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1993 1994 int64_t timeUs = (PTS * 100ll) / 9ll; 1995 if (mStartup && !mFirstPTSValid) { 1996 mFirstPTSValid = true; 1997 mFirstTimeUs = timeUs; 1998 } 1999 2000 if (mSegmentFirstPTS < 0ll) { 2001 mSegmentFirstPTS = timeUs; 2002 if (!mStartTimeUsRelative) { 2003 // Duplicated logic from how we handle .ts playlists. 2004 if (mStartup && mSegmentStartTimeUs >= 0 2005 && adjustSeqNumberWithAnchorTime(timeUs)) { 2006 mSegmentStartTimeUs = -1; 2007 return -EAGAIN; 2008 } 2009 } 2010 } 2011 2012 size_t offset = 0; 2013 while (offset < buffer->size()) { 2014 const uint8_t *adtsHeader = buffer->data() + offset; 2015 CHECK_LT(offset + 5, buffer->size()); 2016 2017 unsigned aac_frame_length = 2018 ((adtsHeader[3] & 3) << 11) 2019 | (adtsHeader[4] << 3) 2020 | (adtsHeader[5] >> 5); 2021 2022 if (aac_frame_length == 0) { 2023 const uint8_t *id3Header = adtsHeader; 2024 if (!memcmp(id3Header, "ID3", 3)) { 2025 ID3 id3(id3Header, buffer->size() - offset, true); 2026 if (id3.isValid()) { 2027 offset += id3.rawSize(); 2028 continue; 2029 }; 2030 } 2031 return ERROR_MALFORMED; 2032 } 2033 2034 CHECK_LE(offset + aac_frame_length, buffer->size()); 2035 2036 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 2037 offset += aac_frame_length; 2038 2039 // Each AAC frame encodes 1024 samples. 2040 numSamples += 1024; 2041 2042 if (mStartup) { 2043 int64_t startTimeUs = unitTimeUs; 2044 if (mStartTimeUsRelative) { 2045 startTimeUs -= mFirstTimeUs; 2046 if (startTimeUs < 0) { 2047 startTimeUs = 0; 2048 } 2049 } 2050 if (startTimeUs < mStartTimeUs) { 2051 continue; 2052 } 2053 2054 if (mStartTimeUsNotify != NULL) { 2055 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); 2056 mStartup = false; 2057 } 2058 } 2059 2060 if (mStopParams != NULL) { 2061 int32_t discontinuitySeq; 2062 int64_t stopTimeUs; 2063 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 2064 || discontinuitySeq > mDiscontinuitySeq 2065 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) 2066 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { 2067 mStreamTypeMask = 0; 2068 mPacketSources.clear(); 2069 return ERROR_OUT_OF_RANGE; 2070 } 2071 } 2072 2073 sp<ABuffer> unit = new ABuffer(aac_frame_length); 2074 memcpy(unit->data(), adtsHeader, aac_frame_length); 2075 2076 unit->meta()->setInt64("timeUs", unitTimeUs); 2077 setAccessUnitProperties(unit, packetSource); 2078 packetSource->queueAccessUnit(unit); 2079 } 2080 2081 return OK; 2082 } 2083 2084 void PlaylistFetcher::updateDuration() { 2085 int64_t durationUs = 0ll; 2086 for (size_t index = 0; index < mPlaylist->size(); ++index) { 2087 sp<AMessage> itemMeta; 2088 CHECK(mPlaylist->itemAt( 2089 index, NULL /* uri */, &itemMeta)); 2090 2091 int64_t itemDurationUs; 2092 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 2093 2094 durationUs += itemDurationUs; 2095 } 2096 2097 sp<AMessage> msg = mNotify->dup(); 2098 msg->setInt32("what", kWhatDurationUpdate); 2099 msg->setInt64("durationUs", durationUs); 2100 msg->post(); 2101 } 2102 2103 void PlaylistFetcher::updateTargetDuration() { 2104 sp<AMessage> msg = mNotify->dup(); 2105 msg->setInt32("what", kWhatTargetDurationUpdate); 2106 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration()); 2107 msg->post(); 2108 } 2109 2110 } // namespace android 2111