1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "PlaylistFetcher" 19 #include <utils/Log.h> 20 21 #include "PlaylistFetcher.h" 22 23 #include "LiveDataSource.h" 24 #include "LiveSession.h" 25 #include "M3UParser.h" 26 27 #include "include/avc_utils.h" 28 #include "include/HTTPBase.h" 29 #include "include/ID3.h" 30 #include "mpeg2ts/AnotherPacketSource.h" 31 32 #include <media/IStreamSource.h> 33 #include <media/stagefright/foundation/ABitReader.h> 34 #include <media/stagefright/foundation/ABuffer.h> 35 #include <media/stagefright/foundation/ADebug.h> 36 #include <media/stagefright/foundation/hexdump.h> 37 #include <media/stagefright/FileSource.h> 38 #include <media/stagefright/MediaDefs.h> 39 #include <media/stagefright/MetaData.h> 40 #include <media/stagefright/Utils.h> 41 42 #include <ctype.h> 43 #include <inttypes.h> 44 #include <openssl/aes.h> 45 #include <openssl/md5.h> 46 47 namespace android { 48 49 // static 50 const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; 51 const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 52 // LCM of 188 (size of a TS packet) & 1k works well 53 const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; 54 const int32_t PlaylistFetcher::kNumSkipFrames = 5; 55 56 PlaylistFetcher::PlaylistFetcher( 57 const sp<AMessage> ¬ify, 58 const sp<LiveSession> &session, 59 const char *uri, 60 int32_t subtitleGeneration) 61 : mNotify(notify), 62 mStartTimeUsNotify(notify->dup()), 63 mSession(session), 64 mURI(uri), 65 mStreamTypeMask(0), 66 mStartTimeUs(-1ll), 67 mSegmentStartTimeUs(-1ll), 68 mDiscontinuitySeq(-1ll), 69 mStartTimeUsRelative(false), 70 mLastPlaylistFetchTimeUs(-1ll), 71 mSeqNumber(-1), 72 mNumRetries(0), 73 mStartup(true), 74 mAdaptive(false), 75 mPrepared(false), 76 mNextPTSTimeUs(-1ll), 77 mMonitorQueueGeneration(0), 78 mSubtitleGeneration(subtitleGeneration), 79 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 80 mFirstPTSValid(false), 81 mAbsoluteTimeAnchorUs(0ll), 82 mVideoBuffer(new AnotherPacketSource(NULL)) { 83 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 84 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 85 mStartTimeUsNotify->setInt32("streamMask", 0); 86 } 87 88 PlaylistFetcher::~PlaylistFetcher() { 89 } 90 91 int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 92 CHECK(mPlaylist != NULL); 93 94 int32_t firstSeqNumberInPlaylist; 95 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 96 "media-sequence", &firstSeqNumberInPlaylist)) { 97 firstSeqNumberInPlaylist = 0; 98 } 99 100 int32_t lastSeqNumberInPlaylist = 101 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 102 103 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 104 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 105 106 int64_t segmentStartUs = 0ll; 107 for (int32_t index = 0; 108 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 109 sp<AMessage> itemMeta; 110 CHECK(mPlaylist->itemAt( 111 index, NULL /* uri */, &itemMeta)); 112 113 int64_t itemDurationUs; 114 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 115 116 segmentStartUs += itemDurationUs; 117 } 118 119 return segmentStartUs; 120 } 121 122 int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 123 int64_t nowUs = ALooper::GetNowUs(); 124 125 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 126 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 127 return 0ll; 128 } 129 130 if (mPlaylist->isComplete()) { 131 return (~0llu >> 1); 132 } 133 134 int32_t targetDurationSecs; 135 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 136 137 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 138 139 int64_t minPlaylistAgeUs; 140 141 switch (mRefreshState) { 142 case INITIAL_MINIMUM_RELOAD_DELAY: 143 { 144 size_t n = mPlaylist->size(); 145 if (n > 0) { 146 sp<AMessage> itemMeta; 147 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 148 149 int64_t itemDurationUs; 150 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 151 152 minPlaylistAgeUs = itemDurationUs; 153 break; 154 } 155 156 // fall through 157 } 158 159 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 160 { 161 minPlaylistAgeUs = targetDurationUs / 2; 162 break; 163 } 164 165 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 166 { 167 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 168 break; 169 } 170 171 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 172 { 173 minPlaylistAgeUs = targetDurationUs * 3; 174 break; 175 } 176 177 default: 178 TRESPASS(); 179 break; 180 } 181 182 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 183 return delayUs > 0ll ? delayUs : 0ll; 184 } 185 186 status_t PlaylistFetcher::decryptBuffer( 187 size_t playlistIndex, const sp<ABuffer> &buffer, 188 bool first) { 189 sp<AMessage> itemMeta; 190 bool found = false; 191 AString method; 192 193 for (ssize_t i = playlistIndex; i >= 0; --i) { 194 AString uri; 195 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 196 197 if (itemMeta->findString("cipher-method", &method)) { 198 found = true; 199 break; 200 } 201 } 202 203 if (!found) { 204 method = "NONE"; 205 } 206 buffer->meta()->setString("cipher-method", method.c_str()); 207 208 if (method == "NONE") { 209 return OK; 210 } else if (!(method == "AES-128")) { 211 ALOGE("Unsupported cipher method '%s'", method.c_str()); 212 return ERROR_UNSUPPORTED; 213 } 214 215 AString keyURI; 216 if (!itemMeta->findString("cipher-uri", &keyURI)) { 217 ALOGE("Missing key uri"); 218 return ERROR_MALFORMED; 219 } 220 221 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 222 223 sp<ABuffer> key; 224 if (index >= 0) { 225 key = mAESKeyForURI.valueAt(index); 226 } else { 227 ssize_t err = mSession->fetchFile(keyURI.c_str(), &key); 228 229 if (err < 0) { 230 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 231 return ERROR_IO; 232 } else if (key->size() != 16) { 233 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 234 return ERROR_MALFORMED; 235 } 236 237 mAESKeyForURI.add(keyURI, key); 238 } 239 240 AES_KEY aes_key; 241 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 242 ALOGE("failed to set AES decryption key."); 243 return UNKNOWN_ERROR; 244 } 245 246 size_t n = buffer->size(); 247 if (!n) { 248 return OK; 249 } 250 CHECK(n % 16 == 0); 251 252 if (first) { 253 // If decrypting the first block in a file, read the iv from the manifest 254 // or derive the iv from the file's sequence number. 255 256 AString iv; 257 if (itemMeta->findString("cipher-iv", &iv)) { 258 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 259 || iv.size() != 16 * 2 + 2) { 260 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 261 return ERROR_MALFORMED; 262 } 263 264 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 265 for (size_t i = 0; i < 16; ++i) { 266 char c1 = tolower(iv.c_str()[2 + 2 * i]); 267 char c2 = tolower(iv.c_str()[3 + 2 * i]); 268 if (!isxdigit(c1) || !isxdigit(c2)) { 269 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 270 return ERROR_MALFORMED; 271 } 272 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 273 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 274 275 mAESInitVec[i] = nibble1 << 4 | nibble2; 276 } 277 } else { 278 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 279 mAESInitVec[15] = mSeqNumber & 0xff; 280 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 281 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 282 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 283 } 284 } 285 286 AES_cbc_encrypt( 287 buffer->data(), buffer->data(), buffer->size(), 288 &aes_key, mAESInitVec, AES_DECRYPT); 289 290 return OK; 291 } 292 293 status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 294 status_t err; 295 AString method; 296 CHECK(buffer->meta()->findString("cipher-method", &method)); 297 if (method == "NONE") { 298 return OK; 299 } 300 301 uint8_t padding = 0; 302 if (buffer->size() > 0) { 303 padding = buffer->data()[buffer->size() - 1]; 304 } 305 306 if (padding > 16) { 307 return ERROR_MALFORMED; 308 } 309 310 for (size_t i = buffer->size() - padding; i < padding; i++) { 311 if (buffer->data()[i] != padding) { 312 return ERROR_MALFORMED; 313 } 314 } 315 316 buffer->setRange(buffer->offset(), buffer->size() - padding); 317 return OK; 318 } 319 320 void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 321 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 322 if (maxDelayUs < minDelayUs) { 323 maxDelayUs = minDelayUs; 324 } 325 if (delayUs > maxDelayUs) { 326 ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs); 327 delayUs = maxDelayUs; 328 } 329 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 330 msg->setInt32("generation", mMonitorQueueGeneration); 331 msg->post(delayUs); 332 } 333 334 void PlaylistFetcher::cancelMonitorQueue() { 335 ++mMonitorQueueGeneration; 336 } 337 338 void PlaylistFetcher::startAsync( 339 const sp<AnotherPacketSource> &audioSource, 340 const sp<AnotherPacketSource> &videoSource, 341 const sp<AnotherPacketSource> &subtitleSource, 342 int64_t startTimeUs, 343 int64_t segmentStartTimeUs, 344 int32_t startDiscontinuitySeq, 345 bool adaptive) { 346 sp<AMessage> msg = new AMessage(kWhatStart, id()); 347 348 uint32_t streamTypeMask = 0ul; 349 350 if (audioSource != NULL) { 351 msg->setPointer("audioSource", audioSource.get()); 352 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 353 } 354 355 if (videoSource != NULL) { 356 msg->setPointer("videoSource", videoSource.get()); 357 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 358 } 359 360 if (subtitleSource != NULL) { 361 msg->setPointer("subtitleSource", subtitleSource.get()); 362 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 363 } 364 365 msg->setInt32("streamTypeMask", streamTypeMask); 366 msg->setInt64("startTimeUs", startTimeUs); 367 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); 368 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); 369 msg->setInt32("adaptive", adaptive); 370 msg->post(); 371 } 372 373 void PlaylistFetcher::pauseAsync() { 374 (new AMessage(kWhatPause, id()))->post(); 375 } 376 377 void PlaylistFetcher::stopAsync(bool clear) { 378 sp<AMessage> msg = new AMessage(kWhatStop, id()); 379 msg->setInt32("clear", clear); 380 msg->post(); 381 } 382 383 void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 384 AMessage* msg = new AMessage(kWhatResumeUntil, id()); 385 msg->setMessage("params", params); 386 msg->post(); 387 } 388 389 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 390 switch (msg->what()) { 391 case kWhatStart: 392 { 393 status_t err = onStart(msg); 394 395 sp<AMessage> notify = mNotify->dup(); 396 notify->setInt32("what", kWhatStarted); 397 notify->setInt32("err", err); 398 notify->post(); 399 break; 400 } 401 402 case kWhatPause: 403 { 404 onPause(); 405 406 sp<AMessage> notify = mNotify->dup(); 407 notify->setInt32("what", kWhatPaused); 408 notify->post(); 409 break; 410 } 411 412 case kWhatStop: 413 { 414 onStop(msg); 415 416 sp<AMessage> notify = mNotify->dup(); 417 notify->setInt32("what", kWhatStopped); 418 notify->post(); 419 break; 420 } 421 422 case kWhatMonitorQueue: 423 case kWhatDownloadNext: 424 { 425 int32_t generation; 426 CHECK(msg->findInt32("generation", &generation)); 427 428 if (generation != mMonitorQueueGeneration) { 429 // Stale event 430 break; 431 } 432 433 if (msg->what() == kWhatMonitorQueue) { 434 onMonitorQueue(); 435 } else { 436 onDownloadNext(); 437 } 438 break; 439 } 440 441 case kWhatResumeUntil: 442 { 443 onResumeUntil(msg); 444 break; 445 } 446 447 default: 448 TRESPASS(); 449 } 450 } 451 452 status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 453 mPacketSources.clear(); 454 455 uint32_t streamTypeMask; 456 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 457 458 int64_t startTimeUs; 459 int64_t segmentStartTimeUs; 460 int32_t startDiscontinuitySeq; 461 int32_t adaptive; 462 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 463 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); 464 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); 465 CHECK(msg->findInt32("adaptive", &adaptive)); 466 467 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 468 void *ptr; 469 CHECK(msg->findPointer("audioSource", &ptr)); 470 471 mPacketSources.add( 472 LiveSession::STREAMTYPE_AUDIO, 473 static_cast<AnotherPacketSource *>(ptr)); 474 } 475 476 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 477 void *ptr; 478 CHECK(msg->findPointer("videoSource", &ptr)); 479 480 mPacketSources.add( 481 LiveSession::STREAMTYPE_VIDEO, 482 static_cast<AnotherPacketSource *>(ptr)); 483 } 484 485 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 486 void *ptr; 487 CHECK(msg->findPointer("subtitleSource", &ptr)); 488 489 mPacketSources.add( 490 LiveSession::STREAMTYPE_SUBTITLES, 491 static_cast<AnotherPacketSource *>(ptr)); 492 } 493 494 mStreamTypeMask = streamTypeMask; 495 496 mSegmentStartTimeUs = segmentStartTimeUs; 497 mDiscontinuitySeq = startDiscontinuitySeq; 498 499 if (startTimeUs >= 0) { 500 mStartTimeUs = startTimeUs; 501 mSeqNumber = -1; 502 mStartup = true; 503 mPrepared = false; 504 mAdaptive = adaptive; 505 } 506 507 postMonitorQueue(); 508 509 return OK; 510 } 511 512 void PlaylistFetcher::onPause() { 513 cancelMonitorQueue(); 514 } 515 516 void PlaylistFetcher::onStop(const sp<AMessage> &msg) { 517 cancelMonitorQueue(); 518 519 int32_t clear; 520 CHECK(msg->findInt32("clear", &clear)); 521 if (clear) { 522 for (size_t i = 0; i < mPacketSources.size(); i++) { 523 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 524 packetSource->clear(); 525 } 526 } 527 528 mPacketSources.clear(); 529 mStreamTypeMask = 0; 530 } 531 532 // Resume until we have reached the boundary timestamps listed in `msg`; when 533 // the remaining time is too short (within a resume threshold) stop immediately 534 // instead. 535 status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { 536 sp<AMessage> params; 537 CHECK(msg->findMessage("params", ¶ms)); 538 539 bool stop = false; 540 for (size_t i = 0; i < mPacketSources.size(); i++) { 541 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 542 543 const char *stopKey; 544 int streamType = mPacketSources.keyAt(i); 545 switch (streamType) { 546 case LiveSession::STREAMTYPE_VIDEO: 547 stopKey = "timeUsVideo"; 548 break; 549 550 case LiveSession::STREAMTYPE_AUDIO: 551 stopKey = "timeUsAudio"; 552 break; 553 554 case LiveSession::STREAMTYPE_SUBTITLES: 555 stopKey = "timeUsSubtitle"; 556 break; 557 558 default: 559 TRESPASS(); 560 } 561 562 // Don't resume if we would stop within a resume threshold. 563 int32_t discontinuitySeq; 564 int64_t latestTimeUs = 0, stopTimeUs = 0; 565 sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta(); 566 if (latestMeta != NULL 567 && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq) 568 && discontinuitySeq == mDiscontinuitySeq 569 && latestMeta->findInt64("timeUs", &latestTimeUs) 570 && params->findInt64(stopKey, &stopTimeUs) 571 && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) { 572 stop = true; 573 } 574 } 575 576 if (stop) { 577 for (size_t i = 0; i < mPacketSources.size(); i++) { 578 mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); 579 } 580 stopAsync(/* clear = */ false); 581 return OK; 582 } 583 584 mStopParams = params; 585 postMonitorQueue(); 586 587 return OK; 588 } 589 590 void PlaylistFetcher::notifyError(status_t err) { 591 sp<AMessage> notify = mNotify->dup(); 592 notify->setInt32("what", kWhatError); 593 notify->setInt32("err", err); 594 notify->post(); 595 } 596 597 void PlaylistFetcher::queueDiscontinuity( 598 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 599 for (size_t i = 0; i < mPacketSources.size(); ++i) { 600 // do not discard buffer upon #EXT-X-DISCONTINUITY tag 601 // (seek will discard buffer by abandoning old fetchers) 602 mPacketSources.valueAt(i)->queueDiscontinuity( 603 type, extra, false /* discard */); 604 } 605 } 606 607 void PlaylistFetcher::onMonitorQueue() { 608 bool downloadMore = false; 609 refreshPlaylist(); 610 611 int32_t targetDurationSecs; 612 int64_t targetDurationUs = kMinBufferedDurationUs; 613 if (mPlaylist != NULL) { 614 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 615 "target-duration", &targetDurationSecs)) { 616 ALOGE("Playlist is missing required EXT-X-TARGETDURATION tag"); 617 notifyError(ERROR_MALFORMED); 618 return; 619 } 620 targetDurationUs = targetDurationSecs * 1000000ll; 621 } 622 623 // buffer at least 3 times the target duration, or up to 10 seconds 624 int64_t durationToBufferUs = targetDurationUs * 3; 625 if (durationToBufferUs > kMinBufferedDurationUs) { 626 durationToBufferUs = kMinBufferedDurationUs; 627 } 628 629 int64_t bufferedDurationUs = 0ll; 630 status_t finalResult = NOT_ENOUGH_DATA; 631 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 632 sp<AnotherPacketSource> packetSource = 633 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 634 635 bufferedDurationUs = 636 packetSource->getBufferedDurationUs(&finalResult); 637 finalResult = OK; 638 } else { 639 // Use max stream duration to prevent us from waiting on a non-existent stream; 640 // when we cannot make out from the manifest what streams are included in a playlist 641 // we might assume extra streams. 642 for (size_t i = 0; i < mPacketSources.size(); ++i) { 643 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 644 continue; 645 } 646 647 int64_t bufferedStreamDurationUs = 648 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 649 ALOGV("buffered %" PRId64 " for stream %d", 650 bufferedStreamDurationUs, mPacketSources.keyAt(i)); 651 if (bufferedStreamDurationUs > bufferedDurationUs) { 652 bufferedDurationUs = bufferedStreamDurationUs; 653 } 654 } 655 } 656 downloadMore = (bufferedDurationUs < durationToBufferUs); 657 658 // signal start if buffered up at least the target size 659 if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { 660 mPrepared = true; 661 662 ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "", 663 bufferedDurationUs, targetDurationUs); 664 sp<AMessage> msg = mNotify->dup(); 665 msg->setInt32("what", kWhatTemporarilyDoneFetching); 666 msg->post(); 667 } 668 669 if (finalResult == OK && downloadMore) { 670 ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "", 671 bufferedDurationUs, durationToBufferUs); 672 // delay the next download slightly; hopefully this gives other concurrent fetchers 673 // a better chance to run. 674 // onDownloadNext(); 675 sp<AMessage> msg = new AMessage(kWhatDownloadNext, id()); 676 msg->setInt32("generation", mMonitorQueueGeneration); 677 msg->post(1000l); 678 } else { 679 // Nothing to do yet, try again in a second. 680 681 sp<AMessage> msg = mNotify->dup(); 682 msg->setInt32("what", kWhatTemporarilyDoneFetching); 683 msg->post(); 684 685 int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; 686 ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "", 687 delayUs, bufferedDurationUs, durationToBufferUs); 688 // :TRICKY: need to enforce minimum delay because the delay to 689 // refresh the playlist will become 0 690 postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0); 691 } 692 } 693 694 status_t PlaylistFetcher::refreshPlaylist() { 695 if (delayUsToRefreshPlaylist() <= 0) { 696 bool unchanged; 697 sp<M3UParser> playlist = mSession->fetchPlaylist( 698 mURI.c_str(), mPlaylistHash, &unchanged); 699 700 if (playlist == NULL) { 701 if (unchanged) { 702 // We succeeded in fetching the playlist, but it was 703 // unchanged from the last time we tried. 704 705 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 706 mRefreshState = (RefreshState)(mRefreshState + 1); 707 } 708 } else { 709 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str()); 710 return ERROR_IO; 711 } 712 } else { 713 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 714 mPlaylist = playlist; 715 716 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 717 updateDuration(); 718 } 719 } 720 721 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 722 } 723 return OK; 724 } 725 726 // static 727 bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) { 728 return buffer->size() > 0 && buffer->data()[0] == 0x47; 729 } 730 731 void PlaylistFetcher::onDownloadNext() { 732 status_t err = refreshPlaylist(); 733 int32_t firstSeqNumberInPlaylist = 0; 734 int32_t lastSeqNumberInPlaylist = 0; 735 bool discontinuity = false; 736 737 if (mPlaylist != NULL) { 738 if (mPlaylist->meta() != NULL) { 739 mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist); 740 } 741 742 lastSeqNumberInPlaylist = 743 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 744 745 if (mDiscontinuitySeq < 0) { 746 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 747 } 748 } 749 750 if (mPlaylist != NULL && mSeqNumber < 0) { 751 CHECK_GE(mStartTimeUs, 0ll); 752 753 if (mSegmentStartTimeUs < 0) { 754 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 755 // If this is a live session, start 3 segments from the end on connect 756 mSeqNumber = lastSeqNumberInPlaylist - 3; 757 if (mSeqNumber < firstSeqNumberInPlaylist) { 758 mSeqNumber = firstSeqNumberInPlaylist; 759 } 760 } else { 761 // When seeking mSegmentStartTimeUs is unavailable (< 0), we 762 // use mStartTimeUs (client supplied timestamp) to determine both start segment 763 // and relative position inside a segment 764 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 765 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); 766 } 767 mStartTimeUsRelative = true; 768 ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)", 769 mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 770 lastSeqNumberInPlaylist); 771 } else { 772 // When adapting or track switching, mSegmentStartTimeUs (relative 773 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute 774 // timestamps coming from the media container) is used to determine the position 775 // inside a segments. 776 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); 777 if (mAdaptive) { 778 // avoid double fetch/decode 779 mSeqNumber += 1; 780 } 781 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); 782 if (mSeqNumber < minSeq) { 783 mSeqNumber = minSeq; 784 } 785 786 if (mSeqNumber < firstSeqNumberInPlaylist) { 787 mSeqNumber = firstSeqNumberInPlaylist; 788 } 789 790 if (mSeqNumber > lastSeqNumberInPlaylist) { 791 mSeqNumber = lastSeqNumberInPlaylist; 792 } 793 ALOGV("Initial sequence number for live event %d from (%d .. %d)", 794 mSeqNumber, firstSeqNumberInPlaylist, 795 lastSeqNumberInPlaylist); 796 } 797 } 798 799 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true 800 if (mSeqNumber < firstSeqNumberInPlaylist 801 || mSeqNumber > lastSeqNumberInPlaylist 802 || err != OK) { 803 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) { 804 ++mNumRetries; 805 806 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) { 807 // make sure we reach this retry logic on refresh failures 808 // by adding an err != OK clause to all enclosing if's. 809 810 // refresh in increasing fraction (1/2, 1/3, ...) of the 811 // playlist's target duration or 3 seconds, whichever is less 812 int64_t delayUs = kMaxMonitorDelayUs; 813 if (mPlaylist != NULL && mPlaylist->meta() != NULL) { 814 int32_t targetDurationSecs; 815 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 816 delayUs = mPlaylist->size() * targetDurationSecs * 817 1000000ll / (1 + mNumRetries); 818 } 819 if (delayUs > kMaxMonitorDelayUs) { 820 delayUs = kMaxMonitorDelayUs; 821 } 822 ALOGV("sequence number high: %d from (%d .. %d), " 823 "monitor in %" PRId64 " (retry=%d)", 824 mSeqNumber, firstSeqNumberInPlaylist, 825 lastSeqNumberInPlaylist, delayUs, mNumRetries); 826 postMonitorQueue(delayUs); 827 return; 828 } 829 830 if (err != OK) { 831 notifyError(err); 832 return; 833 } 834 835 // we've missed the boat, let's start 3 segments prior to the latest sequence 836 // number available and signal a discontinuity. 837 838 ALOGI("We've missed the boat, restarting playback." 839 " mStartup=%d, was looking for %d in %d-%d", 840 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 841 lastSeqNumberInPlaylist); 842 if (mStopParams != NULL) { 843 // we should have kept on fetching until we hit the boundaries in mStopParams, 844 // but since the segments we are supposed to fetch have already rolled off 845 // the playlist, i.e. we have already missed the boat, we inevitably have to 846 // skip. 847 for (size_t i = 0; i < mPacketSources.size(); i++) { 848 sp<ABuffer> formatChange = mSession->createFormatChangeBuffer(); 849 mPacketSources.valueAt(i)->queueAccessUnit(formatChange); 850 } 851 stopAsync(/* clear = */ false); 852 return; 853 } 854 mSeqNumber = lastSeqNumberInPlaylist - 3; 855 if (mSeqNumber < firstSeqNumberInPlaylist) { 856 mSeqNumber = firstSeqNumberInPlaylist; 857 } 858 discontinuity = true; 859 860 // fall through 861 } else { 862 ALOGE("Cannot find sequence number %d in playlist " 863 "(contains %d - %d)", 864 mSeqNumber, firstSeqNumberInPlaylist, 865 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); 866 867 notifyError(ERROR_END_OF_STREAM); 868 return; 869 } 870 } 871 872 mNumRetries = 0; 873 874 AString uri; 875 sp<AMessage> itemMeta; 876 CHECK(mPlaylist->itemAt( 877 mSeqNumber - firstSeqNumberInPlaylist, 878 &uri, 879 &itemMeta)); 880 881 int32_t val; 882 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 883 mDiscontinuitySeq++; 884 discontinuity = true; 885 } 886 887 int64_t range_offset, range_length; 888 if (!itemMeta->findInt64("range-offset", &range_offset) 889 || !itemMeta->findInt64("range-length", &range_length)) { 890 range_offset = 0; 891 range_length = -1; 892 } 893 894 ALOGV("fetching segment %d from (%d .. %d)", 895 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 896 897 ALOGV("fetching '%s'", uri.c_str()); 898 899 sp<DataSource> source; 900 sp<ABuffer> buffer, tsBuffer; 901 // decrypt a junk buffer to prefetch key; since a session uses only one http connection, 902 // this avoids interleaved connections to the key and segment file. 903 { 904 sp<ABuffer> junk = new ABuffer(16); 905 junk->setRange(0, 16); 906 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, 907 true /* first */); 908 if (err != OK) { 909 notifyError(err); 910 return; 911 } 912 } 913 914 // block-wise download 915 bool startup = mStartup; 916 ssize_t bytesRead; 917 do { 918 bytesRead = mSession->fetchFile( 919 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source); 920 921 if (bytesRead < 0) { 922 status_t err = bytesRead; 923 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 924 notifyError(err); 925 return; 926 } 927 928 CHECK(buffer != NULL); 929 930 size_t size = buffer->size(); 931 // Set decryption range. 932 buffer->setRange(size - bytesRead, bytesRead); 933 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, 934 buffer->offset() == 0 /* first */); 935 // Unset decryption range. 936 buffer->setRange(0, size); 937 938 if (err != OK) { 939 ALOGE("decryptBuffer failed w/ error %d", err); 940 941 notifyError(err); 942 return; 943 } 944 945 if (startup || discontinuity) { 946 // Signal discontinuity. 947 948 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 949 // If this was a live event this made no sense since 950 // we don't have access to all the segment before the current 951 // one. 952 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 953 } 954 955 if (discontinuity) { 956 ALOGI("queueing discontinuity (explicit=%d)", discontinuity); 957 958 queueDiscontinuity( 959 ATSParser::DISCONTINUITY_FORMATCHANGE, 960 NULL /* extra */); 961 962 discontinuity = false; 963 } 964 965 startup = false; 966 } 967 968 err = OK; 969 if (bufferStartsWithTsSyncByte(buffer)) { 970 // Incremental extraction is only supported for MPEG2 transport streams. 971 if (tsBuffer == NULL) { 972 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 973 tsBuffer->setRange(0, 0); 974 } else if (tsBuffer->capacity() != buffer->capacity()) { 975 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); 976 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 977 tsBuffer->setRange(tsOff, tsSize); 978 } 979 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); 980 981 err = extractAndQueueAccessUnitsFromTs(tsBuffer); 982 } 983 984 if (err == -EAGAIN) { 985 // starting sequence number too low/high 986 mTSParser.clear(); 987 for (size_t i = 0; i < mPacketSources.size(); i++) { 988 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 989 packetSource->clear(); 990 } 991 postMonitorQueue(); 992 return; 993 } else if (err == ERROR_OUT_OF_RANGE) { 994 // reached stopping point 995 stopAsync(/* clear = */ false); 996 return; 997 } else if (err != OK) { 998 notifyError(err); 999 return; 1000 } 1001 1002 } while (bytesRead != 0); 1003 1004 if (bufferStartsWithTsSyncByte(buffer)) { 1005 // If we don't see a stream in the program table after fetching a full ts segment 1006 // mark it as nonexistent. 1007 const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES; 1008 ATSParser::SourceType srcTypes[kNumTypes] = 1009 { ATSParser::VIDEO, ATSParser::AUDIO }; 1010 LiveSession::StreamType streamTypes[kNumTypes] = 1011 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; 1012 1013 for (size_t i = 0; i < kNumTypes; i++) { 1014 ATSParser::SourceType srcType = srcTypes[i]; 1015 LiveSession::StreamType streamType = streamTypes[i]; 1016 1017 sp<AnotherPacketSource> source = 1018 static_cast<AnotherPacketSource *>( 1019 mTSParser->getSource(srcType).get()); 1020 1021 if (!mTSParser->hasSource(srcType)) { 1022 ALOGW("MPEG2 Transport stream does not contain %s data.", 1023 srcType == ATSParser::VIDEO ? "video" : "audio"); 1024 1025 mStreamTypeMask &= ~streamType; 1026 mPacketSources.removeItem(streamType); 1027 } 1028 } 1029 1030 } 1031 1032 if (checkDecryptPadding(buffer) != OK) { 1033 ALOGE("Incorrect padding bytes after decryption."); 1034 notifyError(ERROR_MALFORMED); 1035 return; 1036 } 1037 1038 err = OK; 1039 if (tsBuffer != NULL) { 1040 AString method; 1041 CHECK(buffer->meta()->findString("cipher-method", &method)); 1042 if ((tsBuffer->size() > 0 && method == "NONE") 1043 || tsBuffer->size() > 16) { 1044 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 1045 "bytes in length."); 1046 notifyError(ERROR_MALFORMED); 1047 return; 1048 } 1049 } 1050 1051 // bulk extract non-ts files 1052 if (tsBuffer == NULL) { 1053 err = extractAndQueueAccessUnits(buffer, itemMeta); 1054 if (err == -EAGAIN) { 1055 // starting sequence number too low/high 1056 postMonitorQueue(); 1057 return; 1058 } else if (err == ERROR_OUT_OF_RANGE) { 1059 // reached stopping point 1060 stopAsync(/* clear = */false); 1061 return; 1062 } 1063 } 1064 1065 if (err != OK) { 1066 notifyError(err); 1067 return; 1068 } 1069 1070 ++mSeqNumber; 1071 1072 postMonitorQueue(); 1073 } 1074 1075 int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const { 1076 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 1077 if (mPlaylist->meta() == NULL 1078 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1079 firstSeqNumberInPlaylist = 0; 1080 } 1081 lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1; 1082 1083 int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1; 1084 while (index >= 0 && anchorTimeUs > mStartTimeUs) { 1085 sp<AMessage> itemMeta; 1086 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1087 1088 int64_t itemDurationUs; 1089 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1090 1091 anchorTimeUs -= itemDurationUs; 1092 --index; 1093 } 1094 1095 int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1; 1096 if (newSeqNumber <= lastSeqNumberInPlaylist) { 1097 return newSeqNumber; 1098 } else { 1099 return lastSeqNumberInPlaylist; 1100 } 1101 } 1102 1103 int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { 1104 int32_t firstSeqNumberInPlaylist; 1105 if (mPlaylist->meta() == NULL 1106 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1107 firstSeqNumberInPlaylist = 0; 1108 } 1109 1110 size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 1111 if (discontinuitySeq < curDiscontinuitySeq) { 1112 return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1); 1113 } 1114 1115 size_t index = 0; 1116 while (index < mPlaylist->size()) { 1117 sp<AMessage> itemMeta; 1118 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); 1119 1120 int64_t discontinuity; 1121 if (itemMeta->findInt64("discontinuity", &discontinuity)) { 1122 curDiscontinuitySeq++; 1123 } 1124 1125 if (curDiscontinuitySeq == discontinuitySeq) { 1126 return firstSeqNumberInPlaylist + index; 1127 } 1128 1129 ++index; 1130 } 1131 1132 return firstSeqNumberInPlaylist + mPlaylist->size(); 1133 } 1134 1135 int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 1136 int32_t firstSeqNumberInPlaylist; 1137 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1138 "media-sequence", &firstSeqNumberInPlaylist)) { 1139 firstSeqNumberInPlaylist = 0; 1140 } 1141 1142 size_t index = 0; 1143 int64_t segmentStartUs = 0; 1144 while (index < mPlaylist->size()) { 1145 sp<AMessage> itemMeta; 1146 CHECK(mPlaylist->itemAt( 1147 index, NULL /* uri */, &itemMeta)); 1148 1149 int64_t itemDurationUs; 1150 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1151 1152 if (timeUs < segmentStartUs + itemDurationUs) { 1153 break; 1154 } 1155 1156 segmentStartUs += itemDurationUs; 1157 ++index; 1158 } 1159 1160 if (index >= mPlaylist->size()) { 1161 index = mPlaylist->size() - 1; 1162 } 1163 1164 return firstSeqNumberInPlaylist + index; 1165 } 1166 1167 const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( 1168 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { 1169 sp<MetaData> format = source->getFormat(); 1170 if (format != NULL) { 1171 // for simplicity, store a reference to the format in each unit 1172 accessUnit->meta()->setObject("format", format); 1173 } 1174 1175 if (discard) { 1176 accessUnit->meta()->setInt32("discard", discard); 1177 } 1178 1179 int32_t targetDurationSecs; 1180 if (mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)) { 1181 accessUnit->meta()->setInt32("targetDuration", targetDurationSecs); 1182 } 1183 1184 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1185 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1186 return accessUnit; 1187 } 1188 1189 status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { 1190 if (mTSParser == NULL) { 1191 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 1192 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 1193 } 1194 1195 if (mNextPTSTimeUs >= 0ll) { 1196 sp<AMessage> extra = new AMessage; 1197 // Since we are using absolute timestamps, signal an offset of 0 to prevent 1198 // ATSParser from skewing the timestamps of access units. 1199 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 1200 1201 mTSParser->signalDiscontinuity( 1202 ATSParser::DISCONTINUITY_TIME, extra); 1203 1204 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1205 mNextPTSTimeUs = -1ll; 1206 mFirstPTSValid = false; 1207 } 1208 1209 size_t offset = 0; 1210 while (offset + 188 <= buffer->size()) { 1211 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 1212 1213 if (err != OK) { 1214 return err; 1215 } 1216 1217 offset += 188; 1218 } 1219 // setRange to indicate consumed bytes. 1220 buffer->setRange(buffer->offset() + offset, buffer->size() - offset); 1221 1222 status_t err = OK; 1223 for (size_t i = mPacketSources.size(); i-- > 0;) { 1224 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1225 1226 const char *key; 1227 ATSParser::SourceType type; 1228 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1229 switch (stream) { 1230 case LiveSession::STREAMTYPE_VIDEO: 1231 type = ATSParser::VIDEO; 1232 key = "timeUsVideo"; 1233 break; 1234 1235 case LiveSession::STREAMTYPE_AUDIO: 1236 type = ATSParser::AUDIO; 1237 key = "timeUsAudio"; 1238 break; 1239 1240 case LiveSession::STREAMTYPE_SUBTITLES: 1241 { 1242 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1243 return ERROR_MALFORMED; 1244 break; 1245 } 1246 1247 default: 1248 TRESPASS(); 1249 } 1250 1251 sp<AnotherPacketSource> source = 1252 static_cast<AnotherPacketSource *>( 1253 mTSParser->getSource(type).get()); 1254 1255 if (source == NULL) { 1256 continue; 1257 } 1258 1259 int64_t timeUs; 1260 sp<ABuffer> accessUnit; 1261 status_t finalResult; 1262 while (source->hasBufferAvailable(&finalResult) 1263 && source->dequeueAccessUnit(&accessUnit) == OK) { 1264 1265 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1266 1267 if (mStartup) { 1268 if (!mFirstPTSValid) { 1269 mFirstTimeUs = timeUs; 1270 mFirstPTSValid = true; 1271 } 1272 if (mStartTimeUsRelative) { 1273 timeUs -= mFirstTimeUs; 1274 if (timeUs < 0) { 1275 timeUs = 0; 1276 } 1277 } 1278 1279 if (timeUs < mStartTimeUs) { 1280 // buffer up to the closest preceding IDR frame 1281 ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", 1282 timeUs, mStartTimeUs); 1283 const char *mime; 1284 sp<MetaData> format = source->getFormat(); 1285 bool isAvc = false; 1286 if (format != NULL && format->findCString(kKeyMIMEType, &mime) 1287 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { 1288 isAvc = true; 1289 } 1290 if (isAvc && IsIDR(accessUnit)) { 1291 mVideoBuffer->clear(); 1292 } 1293 if (isAvc) { 1294 mVideoBuffer->queueAccessUnit(accessUnit); 1295 } 1296 1297 continue; 1298 } 1299 } 1300 1301 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1302 if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) { 1303 int32_t firstSeqNumberInPlaylist; 1304 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1305 "media-sequence", &firstSeqNumberInPlaylist)) { 1306 firstSeqNumberInPlaylist = 0; 1307 } 1308 1309 int32_t targetDurationSecs; 1310 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1311 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1312 // mStartup 1313 // mStartup is true until we have queued a packet for all the streams 1314 // we are fetching. We queue packets whose timestamps are greater than 1315 // mStartTimeUs. 1316 // mSegmentStartTimeUs >= 0 1317 // mSegmentStartTimeUs is non-negative when adapting or switching tracks 1318 // mSeqNumber > firstSeqNumberInPlaylist 1319 // don't decrement mSeqNumber if it already points to the 1st segment 1320 // timeUs - mStartTimeUs > targetDurationUs: 1321 // This and the 2 above conditions should only happen when adapting in a live 1322 // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher 1323 // would start fetching after timeUs, which should be greater than mStartTimeUs; 1324 // the old fetcher would then continue fetching data until timeUs. We don't want 1325 // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to 1326 // stop as early as possible. The definition of being "too far ahead" is 1327 // arbitrary; here we use targetDurationUs as threshold. 1328 if (mStartup && mSegmentStartTimeUs >= 0 1329 && mSeqNumber > firstSeqNumberInPlaylist 1330 && timeUs - mStartTimeUs > targetDurationUs) { 1331 // we just guessed a starting timestamp that is too high when adapting in a 1332 // live stream; re-adjust based on the actual timestamp extracted from the 1333 // media segment; if we didn't move backward after the re-adjustment 1334 // (newSeqNumber), start at least 1 segment prior. 1335 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1336 if (newSeqNumber >= mSeqNumber) { 1337 --mSeqNumber; 1338 } else { 1339 mSeqNumber = newSeqNumber; 1340 } 1341 mStartTimeUsNotify = mNotify->dup(); 1342 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 1343 return -EAGAIN; 1344 } 1345 1346 int32_t seq; 1347 if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { 1348 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1349 } 1350 int64_t startTimeUs; 1351 if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) { 1352 mStartTimeUsNotify->setInt64(key, timeUs); 1353 1354 uint32_t streamMask = 0; 1355 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1356 streamMask |= mPacketSources.keyAt(i); 1357 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1358 1359 if (streamMask == mStreamTypeMask) { 1360 mStartup = false; 1361 mStartTimeUsNotify->post(); 1362 mStartTimeUsNotify.clear(); 1363 } 1364 } 1365 } 1366 1367 if (mStopParams != NULL) { 1368 // Queue discontinuity in original stream. 1369 int32_t discontinuitySeq; 1370 int64_t stopTimeUs; 1371 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1372 || discontinuitySeq > mDiscontinuitySeq 1373 || !mStopParams->findInt64(key, &stopTimeUs) 1374 || (discontinuitySeq == mDiscontinuitySeq 1375 && timeUs >= stopTimeUs)) { 1376 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1377 mStreamTypeMask &= ~stream; 1378 mPacketSources.removeItemsAt(i); 1379 break; 1380 } 1381 } 1382 1383 // Note that we do NOT dequeue any discontinuities except for format change. 1384 if (stream == LiveSession::STREAMTYPE_VIDEO) { 1385 const bool discard = true; 1386 status_t status; 1387 while (mVideoBuffer->hasBufferAvailable(&status)) { 1388 sp<ABuffer> videoBuffer; 1389 mVideoBuffer->dequeueAccessUnit(&videoBuffer); 1390 setAccessUnitProperties(videoBuffer, source, discard); 1391 packetSource->queueAccessUnit(videoBuffer); 1392 } 1393 } 1394 1395 setAccessUnitProperties(accessUnit, source); 1396 packetSource->queueAccessUnit(accessUnit); 1397 } 1398 1399 if (err != OK) { 1400 break; 1401 } 1402 } 1403 1404 if (err != OK) { 1405 for (size_t i = mPacketSources.size(); i-- > 0;) { 1406 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1407 packetSource->clear(); 1408 } 1409 return err; 1410 } 1411 1412 if (!mStreamTypeMask) { 1413 // Signal gap is filled between original and new stream. 1414 ALOGV("ERROR OUT OF RANGE"); 1415 return ERROR_OUT_OF_RANGE; 1416 } 1417 1418 return OK; 1419 } 1420 1421 /* static */ 1422 bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( 1423 const sp<ABuffer> &buffer) { 1424 size_t pos = 0; 1425 1426 // skip possible BOM 1427 if (buffer->size() >= pos + 3 && 1428 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { 1429 pos += 3; 1430 } 1431 1432 // accept WEBVTT followed by SPACE, TAB or (CR) LF 1433 if (buffer->size() < pos + 6 || 1434 memcmp("WEBVTT", buffer->data() + pos, 6)) { 1435 return false; 1436 } 1437 pos += 6; 1438 1439 if (buffer->size() == pos) { 1440 return true; 1441 } 1442 1443 uint8_t sep = buffer->data()[pos]; 1444 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; 1445 } 1446 1447 status_t PlaylistFetcher::extractAndQueueAccessUnits( 1448 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 1449 if (bufferStartsWithWebVTTMagicSequence(buffer)) { 1450 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1451 ALOGE("This stream only contains subtitles."); 1452 return ERROR_MALFORMED; 1453 } 1454 1455 const sp<AnotherPacketSource> packetSource = 1456 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1457 1458 int64_t durationUs; 1459 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1460 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1461 buffer->meta()->setInt64("durationUs", durationUs); 1462 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1463 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1464 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); 1465 1466 packetSource->queueAccessUnit(buffer); 1467 return OK; 1468 } 1469 1470 if (mNextPTSTimeUs >= 0ll) { 1471 mFirstPTSValid = false; 1472 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1473 mNextPTSTimeUs = -1ll; 1474 } 1475 1476 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1477 // stream prefixed by an ID3 tag. 1478 1479 bool firstID3Tag = true; 1480 uint64_t PTS = 0; 1481 1482 for (;;) { 1483 // Make sure to skip all ID3 tags preceding the audio data. 1484 // At least one must be present to provide the PTS timestamp. 1485 1486 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1487 if (!id3.isValid()) { 1488 if (firstID3Tag) { 1489 ALOGE("Unable to parse ID3 tag."); 1490 return ERROR_MALFORMED; 1491 } else { 1492 break; 1493 } 1494 } 1495 1496 if (firstID3Tag) { 1497 bool found = false; 1498 1499 ID3::Iterator it(id3, "PRIV"); 1500 while (!it.done()) { 1501 size_t length; 1502 const uint8_t *data = it.getData(&length); 1503 1504 static const char *kMatchName = 1505 "com.apple.streaming.transportStreamTimestamp"; 1506 static const size_t kMatchNameLen = strlen(kMatchName); 1507 1508 if (length == kMatchNameLen + 1 + 8 1509 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1510 found = true; 1511 PTS = U64_AT(&data[kMatchNameLen + 1]); 1512 } 1513 1514 it.next(); 1515 } 1516 1517 if (!found) { 1518 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1519 return ERROR_MALFORMED; 1520 } 1521 } 1522 1523 // skip the ID3 tag 1524 buffer->setRange( 1525 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 1526 1527 firstID3Tag = false; 1528 } 1529 1530 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1531 ALOGW("This stream only contains audio data!"); 1532 1533 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1534 1535 if (mStreamTypeMask == 0) { 1536 return OK; 1537 } 1538 } 1539 1540 sp<AnotherPacketSource> packetSource = 1541 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1542 1543 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1544 ABitReader bits(buffer->data(), buffer->size()); 1545 1546 // adts_fixed_header 1547 1548 CHECK_EQ(bits.getBits(12), 0xfffu); 1549 bits.skipBits(3); // ID, layer 1550 bool protection_absent = bits.getBits(1) != 0; 1551 1552 unsigned profile = bits.getBits(2); 1553 CHECK_NE(profile, 3u); 1554 unsigned sampling_freq_index = bits.getBits(4); 1555 bits.getBits(1); // private_bit 1556 unsigned channel_configuration = bits.getBits(3); 1557 CHECK_NE(channel_configuration, 0u); 1558 bits.skipBits(2); // original_copy, home 1559 1560 sp<MetaData> meta = MakeAACCodecSpecificData( 1561 profile, sampling_freq_index, channel_configuration); 1562 1563 meta->setInt32(kKeyIsADTS, true); 1564 1565 packetSource->setFormat(meta); 1566 } 1567 1568 int64_t numSamples = 0ll; 1569 int32_t sampleRate; 1570 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1571 1572 int64_t timeUs = (PTS * 100ll) / 9ll; 1573 if (!mFirstPTSValid) { 1574 mFirstPTSValid = true; 1575 mFirstTimeUs = timeUs; 1576 } 1577 1578 size_t offset = 0; 1579 while (offset < buffer->size()) { 1580 const uint8_t *adtsHeader = buffer->data() + offset; 1581 CHECK_LT(offset + 5, buffer->size()); 1582 1583 unsigned aac_frame_length = 1584 ((adtsHeader[3] & 3) << 11) 1585 | (adtsHeader[4] << 3) 1586 | (adtsHeader[5] >> 5); 1587 1588 if (aac_frame_length == 0) { 1589 const uint8_t *id3Header = adtsHeader; 1590 if (!memcmp(id3Header, "ID3", 3)) { 1591 ID3 id3(id3Header, buffer->size() - offset, true); 1592 if (id3.isValid()) { 1593 offset += id3.rawSize(); 1594 continue; 1595 }; 1596 } 1597 return ERROR_MALFORMED; 1598 } 1599 1600 CHECK_LE(offset + aac_frame_length, buffer->size()); 1601 1602 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 1603 offset += aac_frame_length; 1604 1605 // Each AAC frame encodes 1024 samples. 1606 numSamples += 1024; 1607 1608 if (mStartup) { 1609 int64_t startTimeUs = unitTimeUs; 1610 if (mStartTimeUsRelative) { 1611 startTimeUs -= mFirstTimeUs; 1612 if (startTimeUs < 0) { 1613 startTimeUs = 0; 1614 } 1615 } 1616 if (startTimeUs < mStartTimeUs) { 1617 continue; 1618 } 1619 1620 if (mStartTimeUsNotify != NULL) { 1621 int32_t targetDurationSecs; 1622 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1623 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1624 1625 // Duplicated logic from how we handle .ts playlists. 1626 if (mStartup && mSegmentStartTimeUs >= 0 1627 && timeUs - mStartTimeUs > targetDurationUs) { 1628 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1629 if (newSeqNumber >= mSeqNumber) { 1630 --mSeqNumber; 1631 } else { 1632 mSeqNumber = newSeqNumber; 1633 } 1634 return -EAGAIN; 1635 } 1636 1637 mStartTimeUsNotify->setInt64("timeUsAudio", timeUs); 1638 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1639 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); 1640 mStartTimeUsNotify->post(); 1641 mStartTimeUsNotify.clear(); 1642 mStartup = false; 1643 } 1644 } 1645 1646 if (mStopParams != NULL) { 1647 // Queue discontinuity in original stream. 1648 int32_t discontinuitySeq; 1649 int64_t stopTimeUs; 1650 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1651 || discontinuitySeq > mDiscontinuitySeq 1652 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) 1653 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { 1654 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1655 mStreamTypeMask = 0; 1656 mPacketSources.clear(); 1657 return ERROR_OUT_OF_RANGE; 1658 } 1659 } 1660 1661 sp<ABuffer> unit = new ABuffer(aac_frame_length); 1662 memcpy(unit->data(), adtsHeader, aac_frame_length); 1663 1664 unit->meta()->setInt64("timeUs", unitTimeUs); 1665 setAccessUnitProperties(unit, packetSource); 1666 packetSource->queueAccessUnit(unit); 1667 } 1668 1669 return OK; 1670 } 1671 1672 void PlaylistFetcher::updateDuration() { 1673 int64_t durationUs = 0ll; 1674 for (size_t index = 0; index < mPlaylist->size(); ++index) { 1675 sp<AMessage> itemMeta; 1676 CHECK(mPlaylist->itemAt( 1677 index, NULL /* uri */, &itemMeta)); 1678 1679 int64_t itemDurationUs; 1680 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1681 1682 durationUs += itemDurationUs; 1683 } 1684 1685 sp<AMessage> msg = mNotify->dup(); 1686 msg->setInt32("what", kWhatDurationUpdate); 1687 msg->setInt64("durationUs", durationUs); 1688 msg->post(); 1689 } 1690 1691 int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) { 1692 int64_t durationUs, threshold; 1693 if (msg->findInt64("durationUs", &durationUs) && durationUs > 0) { 1694 return kNumSkipFrames * durationUs; 1695 } 1696 1697 sp<RefBase> obj; 1698 msg->findObject("format", &obj); 1699 MetaData *format = static_cast<MetaData *>(obj.get()); 1700 1701 const char *mime; 1702 CHECK(format->findCString(kKeyMIMEType, &mime)); 1703 bool audio = !strncasecmp(mime, "audio/", 6); 1704 if (audio) { 1705 // Assumes 1000 samples per frame. 1706 int32_t sampleRate; 1707 CHECK(format->findInt32(kKeySampleRate, &sampleRate)); 1708 return kNumSkipFrames /* frames */ * 1000 /* samples */ 1709 * (1000000 / sampleRate) /* sample duration (us) */; 1710 } else { 1711 int32_t frameRate; 1712 if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { 1713 return kNumSkipFrames * (1000000 / frameRate); 1714 } 1715 } 1716 1717 return 500000ll; 1718 } 1719 1720 } // namespace android 1721