1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 23 #ifndef LOG_TAG 24 #define LOG_TAG "MyHandler" 25 #endif 26 27 #include <utils/Log.h> 28 29 #include "APacketSource.h" 30 #include "ARTPConnection.h" 31 #include "ARTSPConnection.h" 32 #include "ASessionDescription.h" 33 34 #include <ctype.h> 35 36 #include <media/stagefright/foundation/ABuffer.h> 37 #include <media/stagefright/foundation/ADebug.h> 38 #include <media/stagefright/foundation/ALooper.h> 39 #include <media/stagefright/foundation/AMessage.h> 40 #include <media/stagefright/MediaErrors.h> 41 #include <media/stagefright/Utils.h> 42 43 #include <arpa/inet.h> 44 #include <sys/socket.h> 45 #include <netdb.h> 46 47 #include "HTTPBase.h" 48 49 #if LOG_NDEBUG 50 #define UNUSED_UNLESS_VERBOSE(x) (void)(x) 51 #else 52 #define UNUSED_UNLESS_VERBOSE(x) 53 #endif 54 55 // If no access units are received within 5 secs, assume that the rtp 56 // stream has ended and signal end of stream. 57 static int64_t kAccessUnitTimeoutUs = 10000000ll; 58 59 // If no access units arrive for the first 10 secs after starting the 60 // stream, assume none ever will and signal EOS or switch transports. 61 static int64_t kStartupTimeoutUs = 10000000ll; 62 63 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 64 65 static int64_t kPauseDelayUs = 3000000ll; 66 67 // The allowed maximum number of stale access units at the beginning of 68 // a new sequence. 69 static int32_t kMaxAllowedStaleAccessUnits = 20; 70 71 namespace android { 72 73 static bool GetAttribute(const char *s, const char *key, AString *value) { 74 value->clear(); 75 76 size_t keyLen = strlen(key); 77 78 for (;;) { 79 while (isspace(*s)) { 80 ++s; 81 } 82 83 const char *colonPos = strchr(s, ';'); 84 85 size_t len = 86 (colonPos == NULL) ? strlen(s) : colonPos - s; 87 88 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 89 value->setTo(&s[keyLen + 1], len - keyLen - 1); 90 return true; 91 } 92 93 if (colonPos == NULL) { 94 return false; 95 } 96 97 s = colonPos + 1; 98 } 99 } 100 101 struct MyHandler : public AHandler { 102 enum { 103 kWhatConnected = 'conn', 104 kWhatDisconnected = 'disc', 105 kWhatSeekPaused = 'spau', 106 kWhatSeekDone = 'sdon', 107 108 kWhatAccessUnit = 'accU', 109 kWhatEOS = 'eos!', 110 kWhatSeekDiscontinuity = 'seeD', 111 kWhatNormalPlayTimeMapping = 'nptM', 112 }; 113 114 MyHandler( 115 const char *url, 116 const sp<AMessage> ¬ify, 117 bool uidValid = false, uid_t uid = 0) 118 : mNotify(notify), 119 mUIDValid(uidValid), 120 mUID(uid), 121 mNetLooper(new ALooper), 122 mConn(new ARTSPConnection(mUIDValid, mUID)), 123 mRTPConn(new ARTPConnection), 124 mOriginalSessionURL(url), 125 mSessionURL(url), 126 mSetupTracksSuccessful(false), 127 mSeekPending(false), 128 mFirstAccessUnit(true), 129 mAllTracksHaveTime(false), 130 mNTPAnchorUs(-1), 131 mMediaAnchorUs(-1), 132 mLastMediaTimeUs(0), 133 mNumAccessUnitsReceived(0), 134 mCheckPending(false), 135 mCheckGeneration(0), 136 mCheckTimeoutGeneration(0), 137 mTryTCPInterleaving(false), 138 mTryFakeRTCP(false), 139 mReceivedFirstRTCPPacket(false), 140 mReceivedFirstRTPPacket(false), 141 mSeekable(true), 142 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 143 mKeepAliveGeneration(0), 144 mPausing(false), 145 mPauseGeneration(0), 146 mPlayResponseParsed(false) { 147 mNetLooper->setName("rtsp net"); 148 mNetLooper->start(false /* runOnCallingThread */, 149 false /* canCallJava */, 150 PRIORITY_HIGHEST); 151 152 // Strip any authentication info from the session url, we don't 153 // want to transmit user/pass in cleartext. 154 AString host, path, user, pass; 155 unsigned port; 156 CHECK(ARTSPConnection::ParseURL( 157 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 158 159 if (user.size() > 0) { 160 mSessionURL.clear(); 161 mSessionURL.append("rtsp://"); 162 mSessionURL.append(host); 163 mSessionURL.append(":"); 164 mSessionURL.append(AStringPrintf("%u", port)); 165 mSessionURL.append(path); 166 167 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 168 } 169 170 mSessionHost = host; 171 } 172 173 void connect() { 174 looper()->registerHandler(mConn); 175 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 176 177 sp<AMessage> notify = new AMessage('biny', this); 178 mConn->observeBinaryData(notify); 179 180 sp<AMessage> reply = new AMessage('conn', this); 181 mConn->connect(mOriginalSessionURL.c_str(), reply); 182 } 183 184 void loadSDP(const sp<ASessionDescription>& desc) { 185 looper()->registerHandler(mConn); 186 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 187 188 sp<AMessage> notify = new AMessage('biny', this); 189 mConn->observeBinaryData(notify); 190 191 sp<AMessage> reply = new AMessage('sdpl', this); 192 reply->setObject("description", desc); 193 mConn->connect(mOriginalSessionURL.c_str(), reply); 194 } 195 196 AString getControlURL() { 197 AString sessionLevelControlURL; 198 if (mSessionDesc->findAttribute( 199 0, 200 "a=control", 201 &sessionLevelControlURL)) { 202 if (sessionLevelControlURL.compare("*") == 0) { 203 return mBaseURL; 204 } else { 205 AString controlURL; 206 CHECK(MakeURL( 207 mBaseURL.c_str(), 208 sessionLevelControlURL.c_str(), 209 &controlURL)); 210 return controlURL; 211 } 212 } else { 213 return mSessionURL; 214 } 215 } 216 217 void disconnect() { 218 (new AMessage('abor', this))->post(); 219 } 220 221 void seek(int64_t timeUs) { 222 sp<AMessage> msg = new AMessage('seek', this); 223 msg->setInt64("time", timeUs); 224 mPauseGeneration++; 225 msg->post(); 226 } 227 228 void continueSeekAfterPause(int64_t timeUs) { 229 sp<AMessage> msg = new AMessage('see1', this); 230 msg->setInt64("time", timeUs); 231 msg->post(); 232 } 233 234 bool isSeekable() const { 235 return mSeekable; 236 } 237 238 void pause() { 239 sp<AMessage> msg = new AMessage('paus', this); 240 mPauseGeneration++; 241 msg->setInt32("pausecheck", mPauseGeneration); 242 msg->post(); 243 } 244 245 void resume() { 246 sp<AMessage> msg = new AMessage('resu', this); 247 mPauseGeneration++; 248 msg->post(); 249 } 250 251 static void addRR(const sp<ABuffer> &buf) { 252 uint8_t *ptr = buf->data() + buf->size(); 253 ptr[0] = 0x80 | 0; 254 ptr[1] = 201; // RR 255 ptr[2] = 0; 256 ptr[3] = 1; 257 ptr[4] = 0xde; // SSRC 258 ptr[5] = 0xad; 259 ptr[6] = 0xbe; 260 ptr[7] = 0xef; 261 262 buf->setRange(0, buf->size() + 8); 263 } 264 265 static void addSDES(int s, const sp<ABuffer> &buffer) { 266 struct sockaddr_in addr; 267 socklen_t addrSize = sizeof(addr); 268 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) { 269 inet_aton("0.0.0.0", &(addr.sin_addr)); 270 } 271 272 uint8_t *data = buffer->data() + buffer->size(); 273 data[0] = 0x80 | 1; 274 data[1] = 202; // SDES 275 data[4] = 0xde; // SSRC 276 data[5] = 0xad; 277 data[6] = 0xbe; 278 data[7] = 0xef; 279 280 size_t offset = 8; 281 282 data[offset++] = 1; // CNAME 283 284 AString cname = "stagefright@"; 285 cname.append(inet_ntoa(addr.sin_addr)); 286 data[offset++] = cname.size(); 287 288 memcpy(&data[offset], cname.c_str(), cname.size()); 289 offset += cname.size(); 290 291 data[offset++] = 6; // TOOL 292 293 AString tool = MakeUserAgent(); 294 295 data[offset++] = tool.size(); 296 297 memcpy(&data[offset], tool.c_str(), tool.size()); 298 offset += tool.size(); 299 300 data[offset++] = 0; 301 302 if ((offset % 4) > 0) { 303 size_t count = 4 - (offset % 4); 304 switch (count) { 305 case 3: 306 data[offset++] = 0; 307 case 2: 308 data[offset++] = 0; 309 case 1: 310 data[offset++] = 0; 311 } 312 } 313 314 size_t numWords = (offset / 4) - 1; 315 data[2] = numWords >> 8; 316 data[3] = numWords & 0xff; 317 318 buffer->setRange(buffer->offset(), buffer->size() + offset); 319 } 320 321 // In case we're behind NAT, fire off two UDP packets to the remote 322 // rtp/rtcp ports to poke a hole into the firewall for future incoming 323 // packets. We're going to send an RR/SDES RTCP packet to both of them. 324 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 325 struct sockaddr_in addr; 326 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 327 addr.sin_family = AF_INET; 328 329 AString source; 330 AString server_port; 331 if (!GetAttribute(transport.c_str(), 332 "source", 333 &source)) { 334 ALOGW("Missing 'source' field in Transport response. Using " 335 "RTSP endpoint address."); 336 337 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 338 if (ent == NULL) { 339 ALOGE("Failed to look up address of session host '%s'", 340 mSessionHost.c_str()); 341 342 return false; 343 } 344 345 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 346 } else { 347 addr.sin_addr.s_addr = inet_addr(source.c_str()); 348 } 349 350 if (!GetAttribute(transport.c_str(), 351 "server_port", 352 &server_port)) { 353 ALOGI("Missing 'server_port' field in Transport response."); 354 return false; 355 } 356 357 int rtpPort, rtcpPort; 358 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 359 || rtpPort <= 0 || rtpPort > 65535 360 || rtcpPort <=0 || rtcpPort > 65535 361 || rtcpPort != rtpPort + 1) { 362 ALOGE("Server picked invalid RTP/RTCP port pair %s," 363 " RTP port must be even, RTCP port must be one higher.", 364 server_port.c_str()); 365 366 return false; 367 } 368 369 if (rtpPort & 1) { 370 ALOGW("Server picked an odd RTP port, it should've picked an " 371 "even one, we'll let it pass for now, but this may break " 372 "in the future."); 373 } 374 375 if (addr.sin_addr.s_addr == INADDR_NONE) { 376 return true; 377 } 378 379 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 380 // No firewalls to traverse on the loopback interface. 381 return true; 382 } 383 384 // Make up an RR/SDES RTCP packet. 385 sp<ABuffer> buf = new ABuffer(65536); 386 buf->setRange(0, 0); 387 addRR(buf); 388 addSDES(rtpSocket, buf); 389 390 addr.sin_port = htons(rtpPort); 391 392 ssize_t n = sendto( 393 rtpSocket, buf->data(), buf->size(), 0, 394 (const sockaddr *)&addr, sizeof(addr)); 395 396 if (n < (ssize_t)buf->size()) { 397 ALOGE("failed to poke a hole for RTP packets"); 398 return false; 399 } 400 401 addr.sin_port = htons(rtcpPort); 402 403 n = sendto( 404 rtcpSocket, buf->data(), buf->size(), 0, 405 (const sockaddr *)&addr, sizeof(addr)); 406 407 if (n < (ssize_t)buf->size()) { 408 ALOGE("failed to poke a hole for RTCP packets"); 409 return false; 410 } 411 412 ALOGV("successfully poked holes."); 413 414 return true; 415 } 416 417 static bool isLiveStream(const sp<ASessionDescription> &desc) { 418 AString attrLiveStream; 419 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 420 ssize_t semicolonPos = attrLiveStream.find(";", 2); 421 422 const char* liveStreamValue; 423 if (semicolonPos < 0) { 424 liveStreamValue = attrLiveStream.c_str(); 425 } else { 426 AString valString; 427 valString.setTo(attrLiveStream, 428 semicolonPos + 1, 429 attrLiveStream.size() - semicolonPos - 1); 430 liveStreamValue = valString.c_str(); 431 } 432 433 uint32_t value = strtoul(liveStreamValue, NULL, 10); 434 if (value == 1) { 435 ALOGV("found live stream"); 436 return true; 437 } 438 } else { 439 // It is a live stream if no duration is returned 440 int64_t durationUs; 441 if (!desc->getDurationUs(&durationUs)) { 442 ALOGV("No duration found, assume live stream"); 443 return true; 444 } 445 } 446 447 return false; 448 } 449 450 virtual void onMessageReceived(const sp<AMessage> &msg) { 451 switch (msg->what()) { 452 case 'conn': 453 { 454 int32_t result; 455 CHECK(msg->findInt32("result", &result)); 456 457 ALOGI("connection request completed with result %d (%s)", 458 result, strerror(-result)); 459 460 if (result == OK) { 461 AString request; 462 request = "DESCRIBE "; 463 request.append(mSessionURL); 464 request.append(" RTSP/1.0\r\n"); 465 request.append("Accept: application/sdp\r\n"); 466 request.append("\r\n"); 467 468 sp<AMessage> reply = new AMessage('desc', this); 469 mConn->sendRequest(request.c_str(), reply); 470 } else { 471 (new AMessage('disc', this))->post(); 472 } 473 break; 474 } 475 476 case 'disc': 477 { 478 ++mKeepAliveGeneration; 479 480 int32_t reconnect; 481 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 482 sp<AMessage> reply = new AMessage('conn', this); 483 mConn->connect(mOriginalSessionURL.c_str(), reply); 484 } else { 485 (new AMessage('quit', this))->post(); 486 } 487 break; 488 } 489 490 case 'desc': 491 { 492 int32_t result; 493 CHECK(msg->findInt32("result", &result)); 494 495 ALOGI("DESCRIBE completed with result %d (%s)", 496 result, strerror(-result)); 497 498 if (result == OK) { 499 sp<RefBase> obj; 500 CHECK(msg->findObject("response", &obj)); 501 sp<ARTSPResponse> response = 502 static_cast<ARTSPResponse *>(obj.get()); 503 504 if (response->mStatusCode == 301 || response->mStatusCode == 302) { 505 ssize_t i = response->mHeaders.indexOfKey("location"); 506 CHECK_GE(i, 0); 507 508 mOriginalSessionURL = response->mHeaders.valueAt(i); 509 mSessionURL = mOriginalSessionURL; 510 511 // Strip any authentication info from the session url, we don't 512 // want to transmit user/pass in cleartext. 513 AString host, path, user, pass; 514 unsigned port; 515 if (ARTSPConnection::ParseURL( 516 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 517 && user.size() > 0) { 518 mSessionURL.clear(); 519 mSessionURL.append("rtsp://"); 520 mSessionURL.append(host); 521 mSessionURL.append(":"); 522 mSessionURL.append(AStringPrintf("%u", port)); 523 mSessionURL.append(path); 524 525 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 526 } 527 528 sp<AMessage> reply = new AMessage('conn', this); 529 mConn->connect(mOriginalSessionURL.c_str(), reply); 530 break; 531 } 532 533 if (response->mStatusCode != 200) { 534 result = UNKNOWN_ERROR; 535 } else if (response->mContent == NULL) { 536 result = ERROR_MALFORMED; 537 ALOGE("The response has no content."); 538 } else { 539 mSessionDesc = new ASessionDescription; 540 541 mSessionDesc->setTo( 542 response->mContent->data(), 543 response->mContent->size()); 544 545 if (!mSessionDesc->isValid()) { 546 ALOGE("Failed to parse session description."); 547 result = ERROR_MALFORMED; 548 } else { 549 ssize_t i = response->mHeaders.indexOfKey("content-base"); 550 if (i >= 0) { 551 mBaseURL = response->mHeaders.valueAt(i); 552 } else { 553 i = response->mHeaders.indexOfKey("content-location"); 554 if (i >= 0) { 555 mBaseURL = response->mHeaders.valueAt(i); 556 } else { 557 mBaseURL = mSessionURL; 558 } 559 } 560 561 mSeekable = !isLiveStream(mSessionDesc); 562 563 if (!mBaseURL.startsWith("rtsp://")) { 564 // Some misbehaving servers specify a relative 565 // URL in one of the locations above, combine 566 // it with the absolute session URL to get 567 // something usable... 568 569 ALOGW("Server specified a non-absolute base URL" 570 ", combining it with the session URL to " 571 "get something usable..."); 572 573 AString tmp; 574 CHECK(MakeURL( 575 mSessionURL.c_str(), 576 mBaseURL.c_str(), 577 &tmp)); 578 579 mBaseURL = tmp; 580 } 581 582 mControlURL = getControlURL(); 583 584 if (mSessionDesc->countTracks() < 2) { 585 // There's no actual tracks in this session. 586 // The first "track" is merely session meta 587 // data. 588 589 ALOGW("Session doesn't contain any playable " 590 "tracks. Aborting."); 591 result = ERROR_UNSUPPORTED; 592 } else { 593 setupTrack(1); 594 } 595 } 596 } 597 } 598 599 if (result != OK) { 600 sp<AMessage> reply = new AMessage('disc', this); 601 mConn->disconnect(reply); 602 } 603 break; 604 } 605 606 case 'sdpl': 607 { 608 int32_t result; 609 CHECK(msg->findInt32("result", &result)); 610 611 ALOGI("SDP connection request completed with result %d (%s)", 612 result, strerror(-result)); 613 614 if (result == OK) { 615 sp<RefBase> obj; 616 CHECK(msg->findObject("description", &obj)); 617 mSessionDesc = 618 static_cast<ASessionDescription *>(obj.get()); 619 620 if (!mSessionDesc->isValid()) { 621 ALOGE("Failed to parse session description."); 622 result = ERROR_MALFORMED; 623 } else { 624 mBaseURL = mSessionURL; 625 626 mSeekable = !isLiveStream(mSessionDesc); 627 628 mControlURL = getControlURL(); 629 630 if (mSessionDesc->countTracks() < 2) { 631 // There's no actual tracks in this session. 632 // The first "track" is merely session meta 633 // data. 634 635 ALOGW("Session doesn't contain any playable " 636 "tracks. Aborting."); 637 result = ERROR_UNSUPPORTED; 638 } else { 639 setupTrack(1); 640 } 641 } 642 } 643 644 if (result != OK) { 645 sp<AMessage> reply = new AMessage('disc', this); 646 mConn->disconnect(reply); 647 } 648 break; 649 } 650 651 case 'setu': 652 { 653 size_t index; 654 CHECK(msg->findSize("index", &index)); 655 656 TrackInfo *track = NULL; 657 size_t trackIndex; 658 if (msg->findSize("track-index", &trackIndex)) { 659 track = &mTracks.editItemAt(trackIndex); 660 } 661 662 int32_t result; 663 CHECK(msg->findInt32("result", &result)); 664 665 ALOGI("SETUP(%zu) completed with result %d (%s)", 666 index, result, strerror(-result)); 667 668 if (result == OK) { 669 CHECK(track != NULL); 670 671 sp<RefBase> obj; 672 CHECK(msg->findObject("response", &obj)); 673 sp<ARTSPResponse> response = 674 static_cast<ARTSPResponse *>(obj.get()); 675 676 if (response->mStatusCode != 200) { 677 result = UNKNOWN_ERROR; 678 } else { 679 ssize_t i = response->mHeaders.indexOfKey("session"); 680 CHECK_GE(i, 0); 681 682 mSessionID = response->mHeaders.valueAt(i); 683 684 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 685 AString timeoutStr; 686 if (GetAttribute( 687 mSessionID.c_str(), "timeout", &timeoutStr)) { 688 char *end; 689 unsigned long timeoutSecs = 690 strtoul(timeoutStr.c_str(), &end, 10); 691 692 if (end == timeoutStr.c_str() || *end != '\0') { 693 ALOGW("server specified malformed timeout '%s'", 694 timeoutStr.c_str()); 695 696 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 697 } else if (timeoutSecs < 15) { 698 ALOGW("server specified too short a timeout " 699 "(%lu secs), using default.", 700 timeoutSecs); 701 702 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 703 } else { 704 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 705 706 ALOGI("server specified timeout of %lu secs.", 707 timeoutSecs); 708 } 709 } 710 711 i = mSessionID.find(";"); 712 if (i >= 0) { 713 // Remove options, i.e. ";timeout=90" 714 mSessionID.erase(i, mSessionID.size() - i); 715 } 716 717 sp<AMessage> notify = new AMessage('accu', this); 718 notify->setSize("track-index", trackIndex); 719 720 i = response->mHeaders.indexOfKey("transport"); 721 CHECK_GE(i, 0); 722 723 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 724 if (!track->mUsingInterleavedTCP) { 725 AString transport = response->mHeaders.valueAt(i); 726 727 // We are going to continue even if we were 728 // unable to poke a hole into the firewall... 729 pokeAHole( 730 track->mRTPSocket, 731 track->mRTCPSocket, 732 transport); 733 } 734 735 mRTPConn->addStream( 736 track->mRTPSocket, track->mRTCPSocket, 737 mSessionDesc, index, 738 notify, track->mUsingInterleavedTCP); 739 740 mSetupTracksSuccessful = true; 741 } else { 742 result = BAD_VALUE; 743 } 744 } 745 } 746 747 if (result != OK) { 748 if (track) { 749 if (!track->mUsingInterleavedTCP) { 750 // Clear the tag 751 if (mUIDValid) { 752 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 753 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 754 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 755 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 756 } 757 758 close(track->mRTPSocket); 759 close(track->mRTCPSocket); 760 } 761 762 mTracks.removeItemsAt(trackIndex); 763 } 764 } 765 766 ++index; 767 if (result == OK && index < mSessionDesc->countTracks()) { 768 setupTrack(index); 769 } else if (mSetupTracksSuccessful) { 770 ++mKeepAliveGeneration; 771 postKeepAlive(); 772 773 AString request = "PLAY "; 774 request.append(mControlURL); 775 request.append(" RTSP/1.0\r\n"); 776 777 request.append("Session: "); 778 request.append(mSessionID); 779 request.append("\r\n"); 780 781 request.append("\r\n"); 782 783 sp<AMessage> reply = new AMessage('play', this); 784 mConn->sendRequest(request.c_str(), reply); 785 } else { 786 sp<AMessage> reply = new AMessage('disc', this); 787 mConn->disconnect(reply); 788 } 789 break; 790 } 791 792 case 'play': 793 { 794 int32_t result; 795 CHECK(msg->findInt32("result", &result)); 796 797 ALOGI("PLAY completed with result %d (%s)", 798 result, strerror(-result)); 799 800 if (result == OK) { 801 sp<RefBase> obj; 802 CHECK(msg->findObject("response", &obj)); 803 sp<ARTSPResponse> response = 804 static_cast<ARTSPResponse *>(obj.get()); 805 806 if (response->mStatusCode != 200) { 807 result = UNKNOWN_ERROR; 808 } else { 809 parsePlayResponse(response); 810 811 sp<AMessage> timeout = new AMessage('tiou', this); 812 mCheckTimeoutGeneration++; 813 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 814 timeout->post(kStartupTimeoutUs); 815 } 816 } 817 818 if (result != OK) { 819 sp<AMessage> reply = new AMessage('disc', this); 820 mConn->disconnect(reply); 821 } 822 823 break; 824 } 825 826 case 'aliv': 827 { 828 int32_t generation; 829 CHECK(msg->findInt32("generation", &generation)); 830 831 if (generation != mKeepAliveGeneration) { 832 // obsolete event. 833 break; 834 } 835 836 AString request; 837 request.append("OPTIONS "); 838 request.append(mSessionURL); 839 request.append(" RTSP/1.0\r\n"); 840 request.append("Session: "); 841 request.append(mSessionID); 842 request.append("\r\n"); 843 request.append("\r\n"); 844 845 sp<AMessage> reply = new AMessage('opts', this); 846 reply->setInt32("generation", mKeepAliveGeneration); 847 mConn->sendRequest(request.c_str(), reply); 848 break; 849 } 850 851 case 'opts': 852 { 853 int32_t result; 854 CHECK(msg->findInt32("result", &result)); 855 856 ALOGI("OPTIONS completed with result %d (%s)", 857 result, strerror(-result)); 858 859 int32_t generation; 860 CHECK(msg->findInt32("generation", &generation)); 861 862 if (generation != mKeepAliveGeneration) { 863 // obsolete event. 864 break; 865 } 866 867 postKeepAlive(); 868 break; 869 } 870 871 case 'abor': 872 { 873 for (size_t i = 0; i < mTracks.size(); ++i) { 874 TrackInfo *info = &mTracks.editItemAt(i); 875 876 if (!mFirstAccessUnit) { 877 postQueueEOS(i, ERROR_END_OF_STREAM); 878 } 879 880 if (!info->mUsingInterleavedTCP) { 881 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 882 883 // Clear the tag 884 if (mUIDValid) { 885 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 886 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 887 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 888 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 889 } 890 891 close(info->mRTPSocket); 892 close(info->mRTCPSocket); 893 } 894 } 895 mTracks.clear(); 896 mSetupTracksSuccessful = false; 897 mSeekPending = false; 898 mFirstAccessUnit = true; 899 mAllTracksHaveTime = false; 900 mNTPAnchorUs = -1; 901 mMediaAnchorUs = -1; 902 mNumAccessUnitsReceived = 0; 903 mReceivedFirstRTCPPacket = false; 904 mReceivedFirstRTPPacket = false; 905 mPausing = false; 906 mSeekable = true; 907 908 sp<AMessage> reply = new AMessage('tear', this); 909 910 int32_t reconnect; 911 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 912 reply->setInt32("reconnect", true); 913 } 914 915 AString request; 916 request = "TEARDOWN "; 917 918 // XXX should use aggregate url from SDP here... 919 request.append(mSessionURL); 920 request.append(" RTSP/1.0\r\n"); 921 922 request.append("Session: "); 923 request.append(mSessionID); 924 request.append("\r\n"); 925 926 request.append("\r\n"); 927 928 mConn->sendRequest(request.c_str(), reply); 929 break; 930 } 931 932 case 'tear': 933 { 934 int32_t result; 935 CHECK(msg->findInt32("result", &result)); 936 937 ALOGI("TEARDOWN completed with result %d (%s)", 938 result, strerror(-result)); 939 940 sp<AMessage> reply = new AMessage('disc', this); 941 942 int32_t reconnect; 943 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 944 reply->setInt32("reconnect", true); 945 } 946 947 mConn->disconnect(reply); 948 break; 949 } 950 951 case 'quit': 952 { 953 sp<AMessage> msg = mNotify->dup(); 954 msg->setInt32("what", kWhatDisconnected); 955 msg->setInt32("result", UNKNOWN_ERROR); 956 msg->post(); 957 break; 958 } 959 960 case 'chek': 961 { 962 int32_t generation; 963 CHECK(msg->findInt32("generation", &generation)); 964 if (generation != mCheckGeneration) { 965 // This is an outdated message. Ignore. 966 break; 967 } 968 969 if (mNumAccessUnitsReceived == 0) { 970 #if 1 971 ALOGI("stream ended? aborting."); 972 (new AMessage('abor', this))->post(); 973 break; 974 #else 975 ALOGI("haven't seen an AU in a looong time."); 976 #endif 977 } 978 979 mNumAccessUnitsReceived = 0; 980 msg->post(kAccessUnitTimeoutUs); 981 break; 982 } 983 984 case 'accu': 985 { 986 if (mSeekPending) { 987 ALOGV("Stale access unit."); 988 break; 989 } 990 991 int32_t timeUpdate; 992 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 993 size_t trackIndex; 994 CHECK(msg->findSize("track-index", &trackIndex)); 995 996 uint32_t rtpTime; 997 uint64_t ntpTime; 998 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 999 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 1000 1001 onTimeUpdate(trackIndex, rtpTime, ntpTime); 1002 break; 1003 } 1004 1005 int32_t first; 1006 if (msg->findInt32("first-rtcp", &first)) { 1007 mReceivedFirstRTCPPacket = true; 1008 break; 1009 } 1010 1011 if (msg->findInt32("first-rtp", &first)) { 1012 mReceivedFirstRTPPacket = true; 1013 break; 1014 } 1015 1016 ++mNumAccessUnitsReceived; 1017 postAccessUnitTimeoutCheck(); 1018 1019 size_t trackIndex; 1020 CHECK(msg->findSize("track-index", &trackIndex)); 1021 1022 if (trackIndex >= mTracks.size()) { 1023 ALOGV("late packets ignored."); 1024 break; 1025 } 1026 1027 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1028 1029 int32_t eos; 1030 if (msg->findInt32("eos", &eos)) { 1031 ALOGI("received BYE on track index %zu", trackIndex); 1032 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1033 ALOGI("No time established => fake existing data"); 1034 1035 track->mEOSReceived = true; 1036 mTryFakeRTCP = true; 1037 mReceivedFirstRTCPPacket = true; 1038 fakeTimestamps(); 1039 } else { 1040 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1041 } 1042 return; 1043 } 1044 1045 if (mSeekPending) { 1046 ALOGV("we're seeking, dropping stale packet."); 1047 break; 1048 } 1049 1050 sp<ABuffer> accessUnit; 1051 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1052 onAccessUnitComplete(trackIndex, accessUnit); 1053 break; 1054 } 1055 1056 case 'paus': 1057 { 1058 int32_t generation; 1059 CHECK(msg->findInt32("pausecheck", &generation)); 1060 if (generation != mPauseGeneration) { 1061 ALOGV("Ignoring outdated pause message."); 1062 break; 1063 } 1064 1065 if (!mSeekable) { 1066 ALOGW("This is a live stream, ignoring pause request."); 1067 break; 1068 } 1069 1070 if (mPausing) { 1071 ALOGV("This stream is already paused."); 1072 break; 1073 } 1074 1075 mCheckPending = true; 1076 ++mCheckGeneration; 1077 mPausing = true; 1078 1079 AString request = "PAUSE "; 1080 request.append(mControlURL); 1081 request.append(" RTSP/1.0\r\n"); 1082 1083 request.append("Session: "); 1084 request.append(mSessionID); 1085 request.append("\r\n"); 1086 1087 request.append("\r\n"); 1088 1089 sp<AMessage> reply = new AMessage('pau2', this); 1090 mConn->sendRequest(request.c_str(), reply); 1091 break; 1092 } 1093 1094 case 'pau2': 1095 { 1096 int32_t result; 1097 CHECK(msg->findInt32("result", &result)); 1098 mCheckTimeoutGeneration++; 1099 1100 ALOGI("PAUSE completed with result %d (%s)", 1101 result, strerror(-result)); 1102 break; 1103 } 1104 1105 case 'resu': 1106 { 1107 if (mPausing && mSeekPending) { 1108 // If seeking, Play will be sent from see1 instead 1109 break; 1110 } 1111 1112 if (!mPausing) { 1113 // Dont send PLAY if we have not paused 1114 break; 1115 } 1116 AString request = "PLAY "; 1117 request.append(mControlURL); 1118 request.append(" RTSP/1.0\r\n"); 1119 1120 request.append("Session: "); 1121 request.append(mSessionID); 1122 request.append("\r\n"); 1123 1124 request.append("\r\n"); 1125 1126 sp<AMessage> reply = new AMessage('res2', this); 1127 mConn->sendRequest(request.c_str(), reply); 1128 break; 1129 } 1130 1131 case 'res2': 1132 { 1133 int32_t result; 1134 CHECK(msg->findInt32("result", &result)); 1135 1136 ALOGI("PLAY (for resume) completed with result %d (%s)", 1137 result, strerror(-result)); 1138 1139 mCheckPending = false; 1140 ++mCheckGeneration; 1141 postAccessUnitTimeoutCheck(); 1142 1143 if (result == OK) { 1144 sp<RefBase> obj; 1145 CHECK(msg->findObject("response", &obj)); 1146 sp<ARTSPResponse> response = 1147 static_cast<ARTSPResponse *>(obj.get()); 1148 1149 if (response->mStatusCode != 200) { 1150 result = UNKNOWN_ERROR; 1151 } else { 1152 parsePlayResponse(response); 1153 1154 // Post new timeout in order to make sure to use 1155 // fake timestamps if no new Sender Reports arrive 1156 sp<AMessage> timeout = new AMessage('tiou', this); 1157 mCheckTimeoutGeneration++; 1158 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1159 timeout->post(kStartupTimeoutUs); 1160 } 1161 } 1162 1163 if (result != OK) { 1164 ALOGE("resume failed, aborting."); 1165 (new AMessage('abor', this))->post(); 1166 } 1167 1168 mPausing = false; 1169 break; 1170 } 1171 1172 case 'seek': 1173 { 1174 if (!mSeekable) { 1175 ALOGW("This is a live stream, ignoring seek request."); 1176 1177 sp<AMessage> msg = mNotify->dup(); 1178 msg->setInt32("what", kWhatSeekDone); 1179 msg->post(); 1180 break; 1181 } 1182 1183 int64_t timeUs; 1184 CHECK(msg->findInt64("time", &timeUs)); 1185 1186 mSeekPending = true; 1187 1188 // Disable the access unit timeout until we resumed 1189 // playback again. 1190 mCheckPending = true; 1191 ++mCheckGeneration; 1192 1193 sp<AMessage> reply = new AMessage('see0', this); 1194 reply->setInt64("time", timeUs); 1195 1196 if (mPausing) { 1197 // PAUSE already sent 1198 ALOGI("Pause already sent"); 1199 reply->post(); 1200 break; 1201 } 1202 AString request = "PAUSE "; 1203 request.append(mControlURL); 1204 request.append(" RTSP/1.0\r\n"); 1205 1206 request.append("Session: "); 1207 request.append(mSessionID); 1208 request.append("\r\n"); 1209 1210 request.append("\r\n"); 1211 1212 mConn->sendRequest(request.c_str(), reply); 1213 break; 1214 } 1215 1216 case 'see0': 1217 { 1218 // Session is paused now. 1219 status_t err = OK; 1220 msg->findInt32("result", &err); 1221 1222 int64_t timeUs; 1223 CHECK(msg->findInt64("time", &timeUs)); 1224 1225 sp<AMessage> notify = mNotify->dup(); 1226 notify->setInt32("what", kWhatSeekPaused); 1227 notify->setInt32("err", err); 1228 notify->setInt64("time", timeUs); 1229 notify->post(); 1230 break; 1231 1232 } 1233 1234 case 'see1': 1235 { 1236 for (size_t i = 0; i < mTracks.size(); ++i) { 1237 TrackInfo *info = &mTracks.editItemAt(i); 1238 1239 postQueueSeekDiscontinuity(i); 1240 info->mEOSReceived = false; 1241 1242 info->mRTPAnchor = 0; 1243 info->mNTPAnchorUs = -1; 1244 } 1245 1246 mAllTracksHaveTime = false; 1247 mNTPAnchorUs = -1; 1248 1249 // Start new timeoutgeneration to avoid getting timeout 1250 // before PLAY response arrive 1251 sp<AMessage> timeout = new AMessage('tiou', this); 1252 mCheckTimeoutGeneration++; 1253 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1254 timeout->post(kStartupTimeoutUs); 1255 1256 int64_t timeUs; 1257 CHECK(msg->findInt64("time", &timeUs)); 1258 1259 AString request = "PLAY "; 1260 request.append(mControlURL); 1261 request.append(" RTSP/1.0\r\n"); 1262 1263 request.append("Session: "); 1264 request.append(mSessionID); 1265 request.append("\r\n"); 1266 1267 request.append( 1268 AStringPrintf( 1269 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1270 1271 request.append("\r\n"); 1272 1273 sp<AMessage> reply = new AMessage('see2', this); 1274 mConn->sendRequest(request.c_str(), reply); 1275 break; 1276 } 1277 1278 case 'see2': 1279 { 1280 if (mTracks.size() == 0) { 1281 // We have already hit abor, break 1282 break; 1283 } 1284 1285 int32_t result; 1286 CHECK(msg->findInt32("result", &result)); 1287 1288 ALOGI("PLAY (for seek) completed with result %d (%s)", 1289 result, strerror(-result)); 1290 1291 mCheckPending = false; 1292 ++mCheckGeneration; 1293 postAccessUnitTimeoutCheck(); 1294 1295 if (result == OK) { 1296 sp<RefBase> obj; 1297 CHECK(msg->findObject("response", &obj)); 1298 sp<ARTSPResponse> response = 1299 static_cast<ARTSPResponse *>(obj.get()); 1300 1301 if (response->mStatusCode != 200) { 1302 result = UNKNOWN_ERROR; 1303 } else { 1304 parsePlayResponse(response); 1305 1306 // Post new timeout in order to make sure to use 1307 // fake timestamps if no new Sender Reports arrive 1308 sp<AMessage> timeout = new AMessage('tiou', this); 1309 mCheckTimeoutGeneration++; 1310 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1311 timeout->post(kStartupTimeoutUs); 1312 1313 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1314 CHECK_GE(i, 0); 1315 1316 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1317 1318 ALOGI("seek completed."); 1319 } 1320 } 1321 1322 if (result != OK) { 1323 ALOGE("seek failed, aborting."); 1324 (new AMessage('abor', this))->post(); 1325 } 1326 1327 mPausing = false; 1328 mSeekPending = false; 1329 1330 // Discard all stale access units. 1331 for (size_t i = 0; i < mTracks.size(); ++i) { 1332 TrackInfo *track = &mTracks.editItemAt(i); 1333 track->mPackets.clear(); 1334 } 1335 1336 sp<AMessage> msg = mNotify->dup(); 1337 msg->setInt32("what", kWhatSeekDone); 1338 msg->post(); 1339 break; 1340 } 1341 1342 case 'biny': 1343 { 1344 sp<ABuffer> buffer; 1345 CHECK(msg->findBuffer("buffer", &buffer)); 1346 1347 int32_t index; 1348 CHECK(buffer->meta()->findInt32("index", &index)); 1349 1350 mRTPConn->injectPacket(index, buffer); 1351 break; 1352 } 1353 1354 case 'tiou': 1355 { 1356 int32_t timeoutGenerationCheck; 1357 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1358 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1359 // This is an outdated message. Ignore. 1360 // This typically happens if a lot of seeks are 1361 // performed, since new timeout messages now are 1362 // posted at seek as well. 1363 break; 1364 } 1365 if (!mReceivedFirstRTCPPacket) { 1366 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1367 ALOGW("We received RTP packets but no RTCP packets, " 1368 "using fake timestamps."); 1369 1370 mTryFakeRTCP = true; 1371 1372 mReceivedFirstRTCPPacket = true; 1373 1374 fakeTimestamps(); 1375 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1376 ALOGW("Never received any data, switching transports."); 1377 1378 mTryTCPInterleaving = true; 1379 1380 sp<AMessage> msg = new AMessage('abor', this); 1381 msg->setInt32("reconnect", true); 1382 msg->post(); 1383 } else { 1384 ALOGW("Never received any data, disconnecting."); 1385 (new AMessage('abor', this))->post(); 1386 } 1387 } else { 1388 if (!mAllTracksHaveTime) { 1389 ALOGW("We received some RTCP packets, but time " 1390 "could not be established on all tracks, now " 1391 "using fake timestamps"); 1392 1393 fakeTimestamps(); 1394 } 1395 } 1396 break; 1397 } 1398 1399 default: 1400 TRESPASS(); 1401 break; 1402 } 1403 } 1404 1405 void postKeepAlive() { 1406 sp<AMessage> msg = new AMessage('aliv', this); 1407 msg->setInt32("generation", mKeepAliveGeneration); 1408 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1409 } 1410 1411 void cancelAccessUnitTimeoutCheck() { 1412 ALOGV("cancelAccessUnitTimeoutCheck"); 1413 ++mCheckGeneration; 1414 } 1415 1416 void postAccessUnitTimeoutCheck() { 1417 if (mCheckPending) { 1418 return; 1419 } 1420 1421 mCheckPending = true; 1422 sp<AMessage> check = new AMessage('chek', this); 1423 check->setInt32("generation", mCheckGeneration); 1424 check->post(kAccessUnitTimeoutUs); 1425 } 1426 1427 static void SplitString( 1428 const AString &s, const char *separator, List<AString> *items) { 1429 items->clear(); 1430 size_t start = 0; 1431 while (start < s.size()) { 1432 ssize_t offset = s.find(separator, start); 1433 1434 if (offset < 0) { 1435 items->push_back(AString(s, start, s.size() - start)); 1436 break; 1437 } 1438 1439 items->push_back(AString(s, start, offset - start)); 1440 start = offset + strlen(separator); 1441 } 1442 } 1443 1444 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1445 mPlayResponseParsed = true; 1446 if (mTracks.size() == 0) { 1447 ALOGV("parsePlayResponse: late packets ignored."); 1448 return; 1449 } 1450 1451 ssize_t i = response->mHeaders.indexOfKey("range"); 1452 if (i < 0) { 1453 // Server doesn't even tell use what range it is going to 1454 // play, therefore we won't support seeking. 1455 return; 1456 } 1457 1458 AString range = response->mHeaders.valueAt(i); 1459 ALOGV("Range: %s", range.c_str()); 1460 1461 AString val; 1462 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1463 1464 float npt1, npt2; 1465 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1466 // This is a live stream and therefore not seekable. 1467 1468 ALOGI("This is a live stream"); 1469 return; 1470 } 1471 1472 i = response->mHeaders.indexOfKey("rtp-info"); 1473 CHECK_GE(i, 0); 1474 1475 AString rtpInfo = response->mHeaders.valueAt(i); 1476 List<AString> streamInfos; 1477 SplitString(rtpInfo, ",", &streamInfos); 1478 1479 int n = 1; 1480 for (List<AString>::iterator it = streamInfos.begin(); 1481 it != streamInfos.end(); ++it) { 1482 (*it).trim(); 1483 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1484 1485 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1486 1487 size_t trackIndex = 0; 1488 while (trackIndex < mTracks.size() 1489 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1490 ++trackIndex; 1491 } 1492 CHECK_LT(trackIndex, mTracks.size()); 1493 1494 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1495 1496 char *end; 1497 unsigned long seq = strtoul(val.c_str(), &end, 10); 1498 1499 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1500 info->mFirstSeqNumInSegment = seq; 1501 info->mNewSegment = true; 1502 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1503 1504 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1505 1506 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1507 1508 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1509 1510 info->mNormalPlayTimeRTP = rtpTime; 1511 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1512 1513 if (!mFirstAccessUnit) { 1514 postNormalPlayTimeMapping( 1515 trackIndex, 1516 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1517 } 1518 1519 ++n; 1520 } 1521 } 1522 1523 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1524 CHECK_GE(index, 0u); 1525 CHECK_LT(index, mTracks.size()); 1526 1527 const TrackInfo &info = mTracks.itemAt(index); 1528 1529 *timeScale = info.mTimeScale; 1530 1531 return info.mPacketSource->getFormat(); 1532 } 1533 1534 size_t countTracks() const { 1535 return mTracks.size(); 1536 } 1537 1538 private: 1539 struct TrackInfo { 1540 AString mURL; 1541 int mRTPSocket; 1542 int mRTCPSocket; 1543 bool mUsingInterleavedTCP; 1544 uint32_t mFirstSeqNumInSegment; 1545 bool mNewSegment; 1546 int32_t mAllowedStaleAccessUnits; 1547 1548 uint32_t mRTPAnchor; 1549 int64_t mNTPAnchorUs; 1550 int32_t mTimeScale; 1551 bool mEOSReceived; 1552 1553 uint32_t mNormalPlayTimeRTP; 1554 int64_t mNormalPlayTimeUs; 1555 1556 sp<APacketSource> mPacketSource; 1557 1558 // Stores packets temporarily while no notion of time 1559 // has been established yet. 1560 List<sp<ABuffer> > mPackets; 1561 }; 1562 1563 sp<AMessage> mNotify; 1564 bool mUIDValid; 1565 uid_t mUID; 1566 sp<ALooper> mNetLooper; 1567 sp<ARTSPConnection> mConn; 1568 sp<ARTPConnection> mRTPConn; 1569 sp<ASessionDescription> mSessionDesc; 1570 AString mOriginalSessionURL; // This one still has user:pass@ 1571 AString mSessionURL; 1572 AString mSessionHost; 1573 AString mBaseURL; 1574 AString mControlURL; 1575 AString mSessionID; 1576 bool mSetupTracksSuccessful; 1577 bool mSeekPending; 1578 bool mFirstAccessUnit; 1579 1580 bool mAllTracksHaveTime; 1581 int64_t mNTPAnchorUs; 1582 int64_t mMediaAnchorUs; 1583 int64_t mLastMediaTimeUs; 1584 1585 int64_t mNumAccessUnitsReceived; 1586 bool mCheckPending; 1587 int32_t mCheckGeneration; 1588 int32_t mCheckTimeoutGeneration; 1589 bool mTryTCPInterleaving; 1590 bool mTryFakeRTCP; 1591 bool mReceivedFirstRTCPPacket; 1592 bool mReceivedFirstRTPPacket; 1593 bool mSeekable; 1594 int64_t mKeepAliveTimeoutUs; 1595 int32_t mKeepAliveGeneration; 1596 bool mPausing; 1597 int32_t mPauseGeneration; 1598 1599 Vector<TrackInfo> mTracks; 1600 1601 bool mPlayResponseParsed; 1602 1603 void setupTrack(size_t index) { 1604 sp<APacketSource> source = 1605 new APacketSource(mSessionDesc, index); 1606 1607 if (source->initCheck() != OK) { 1608 ALOGW("Unsupported format. Ignoring track #%zu.", index); 1609 1610 sp<AMessage> reply = new AMessage('setu', this); 1611 reply->setSize("index", index); 1612 reply->setInt32("result", ERROR_UNSUPPORTED); 1613 reply->post(); 1614 return; 1615 } 1616 1617 AString url; 1618 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1619 1620 AString trackURL; 1621 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1622 1623 mTracks.push(TrackInfo()); 1624 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1625 info->mURL = trackURL; 1626 info->mPacketSource = source; 1627 info->mUsingInterleavedTCP = false; 1628 info->mFirstSeqNumInSegment = 0; 1629 info->mNewSegment = true; 1630 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1631 info->mRTPSocket = -1; 1632 info->mRTCPSocket = -1; 1633 info->mRTPAnchor = 0; 1634 info->mNTPAnchorUs = -1; 1635 info->mNormalPlayTimeRTP = 0; 1636 info->mNormalPlayTimeUs = 0ll; 1637 1638 unsigned long PT; 1639 AString formatDesc; 1640 AString formatParams; 1641 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1642 1643 int32_t timescale; 1644 int32_t numChannels; 1645 ASessionDescription::ParseFormatDesc( 1646 formatDesc.c_str(), ×cale, &numChannels); 1647 1648 info->mTimeScale = timescale; 1649 info->mEOSReceived = false; 1650 1651 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str()); 1652 1653 AString request = "SETUP "; 1654 request.append(trackURL); 1655 request.append(" RTSP/1.0\r\n"); 1656 1657 if (mTryTCPInterleaving) { 1658 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1659 info->mUsingInterleavedTCP = true; 1660 info->mRTPSocket = interleaveIndex; 1661 info->mRTCPSocket = interleaveIndex + 1; 1662 1663 request.append("Transport: RTP/AVP/TCP;interleaved="); 1664 request.append(interleaveIndex); 1665 request.append("-"); 1666 request.append(interleaveIndex + 1); 1667 } else { 1668 unsigned rtpPort; 1669 ARTPConnection::MakePortPair( 1670 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1671 1672 if (mUIDValid) { 1673 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1674 (uint32_t)*(uint32_t*) "RTP_"); 1675 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1676 (uint32_t)*(uint32_t*) "RTP_"); 1677 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1678 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1679 } 1680 1681 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1682 request.append(rtpPort); 1683 request.append("-"); 1684 request.append(rtpPort + 1); 1685 } 1686 1687 request.append("\r\n"); 1688 1689 if (index > 1) { 1690 request.append("Session: "); 1691 request.append(mSessionID); 1692 request.append("\r\n"); 1693 } 1694 1695 request.append("\r\n"); 1696 1697 sp<AMessage> reply = new AMessage('setu', this); 1698 reply->setSize("index", index); 1699 reply->setSize("track-index", mTracks.size() - 1); 1700 mConn->sendRequest(request.c_str(), reply); 1701 } 1702 1703 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1704 out->clear(); 1705 1706 if (strncasecmp("rtsp://", baseURL, 7)) { 1707 // Base URL must be absolute 1708 return false; 1709 } 1710 1711 if (!strncasecmp("rtsp://", url, 7)) { 1712 // "url" is already an absolute URL, ignore base URL. 1713 out->setTo(url); 1714 return true; 1715 } 1716 1717 size_t n = strlen(baseURL); 1718 out->setTo(baseURL); 1719 if (baseURL[n - 1] != '/') { 1720 out->append("/"); 1721 } 1722 out->append(url); 1723 1724 return true; 1725 } 1726 1727 void fakeTimestamps() { 1728 mNTPAnchorUs = -1ll; 1729 for (size_t i = 0; i < mTracks.size(); ++i) { 1730 onTimeUpdate(i, 0, 0ll); 1731 } 1732 } 1733 1734 bool dataReceivedOnAllChannels() { 1735 TrackInfo *track; 1736 for (size_t i = 0; i < mTracks.size(); ++i) { 1737 track = &mTracks.editItemAt(i); 1738 if (track->mPackets.empty()) { 1739 return false; 1740 } 1741 } 1742 return true; 1743 } 1744 1745 void handleFirstAccessUnit() { 1746 if (mFirstAccessUnit) { 1747 sp<AMessage> msg = mNotify->dup(); 1748 msg->setInt32("what", kWhatConnected); 1749 msg->post(); 1750 1751 if (mSeekable) { 1752 for (size_t i = 0; i < mTracks.size(); ++i) { 1753 TrackInfo *info = &mTracks.editItemAt(i); 1754 1755 postNormalPlayTimeMapping( 1756 i, 1757 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1758 } 1759 } 1760 1761 mFirstAccessUnit = false; 1762 } 1763 } 1764 1765 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1766 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx", 1767 trackIndex, rtpTime, (long long)ntpTime); 1768 1769 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1770 1771 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1772 1773 track->mRTPAnchor = rtpTime; 1774 track->mNTPAnchorUs = ntpTimeUs; 1775 1776 if (mNTPAnchorUs < 0) { 1777 mNTPAnchorUs = ntpTimeUs; 1778 mMediaAnchorUs = mLastMediaTimeUs; 1779 } 1780 1781 if (!mAllTracksHaveTime) { 1782 bool allTracksHaveTime = (mTracks.size() > 0); 1783 for (size_t i = 0; i < mTracks.size(); ++i) { 1784 TrackInfo *track = &mTracks.editItemAt(i); 1785 if (track->mNTPAnchorUs < 0) { 1786 allTracksHaveTime = false; 1787 break; 1788 } 1789 } 1790 if (allTracksHaveTime) { 1791 mAllTracksHaveTime = true; 1792 ALOGI("Time now established for all tracks."); 1793 } 1794 } 1795 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1796 handleFirstAccessUnit(); 1797 1798 // Time is now established, lets start timestamping immediately 1799 for (size_t i = 0; i < mTracks.size(); ++i) { 1800 if (OK != processAccessUnitQueue(i)) { 1801 return; 1802 } 1803 } 1804 for (size_t i = 0; i < mTracks.size(); ++i) { 1805 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1806 if (trackInfo->mEOSReceived) { 1807 postQueueEOS(i, ERROR_END_OF_STREAM); 1808 trackInfo->mEOSReceived = false; 1809 } 1810 } 1811 } 1812 } 1813 1814 status_t processAccessUnitQueue(int32_t trackIndex) { 1815 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1816 while (!track->mPackets.empty()) { 1817 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1818 track->mPackets.erase(track->mPackets.begin()); 1819 1820 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1821 if (track->mNewSegment) { 1822 // The sequence number from RTP packet has only 16 bits and is extended 1823 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of 1824 // RTSP "PLAY" command should be used to detect the first RTP packet 1825 // after seeking. 1826 if (mSeekable) { 1827 if (track->mAllowedStaleAccessUnits > 0) { 1828 uint32_t seqNum16 = seqNum & 0xffff; 1829 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff; 1830 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits 1831 || seqNum16 < firstSeqNumInSegment16) { 1832 // Not the first rtp packet of the stream after seeking, discarding. 1833 track->mAllowedStaleAccessUnits--; 1834 ALOGV("discarding stale access unit (0x%x : 0x%x)", 1835 seqNum, track->mFirstSeqNumInSegment); 1836 continue; 1837 } 1838 ALOGW_IF(seqNum16 != firstSeqNumInSegment16, 1839 "Missing the first packet(%u), now take packet(%u) as first one", 1840 track->mFirstSeqNumInSegment, seqNum); 1841 } else { // track->mAllowedStaleAccessUnits <= 0 1842 mNumAccessUnitsReceived = 0; 1843 ALOGW_IF(track->mAllowedStaleAccessUnits == 0, 1844 "Still no first rtp packet after %d stale ones", 1845 kMaxAllowedStaleAccessUnits); 1846 track->mAllowedStaleAccessUnits = -1; 1847 return UNKNOWN_ERROR; 1848 } 1849 } 1850 1851 // Now found the first rtp packet of the stream after seeking. 1852 track->mFirstSeqNumInSegment = seqNum; 1853 track->mNewSegment = false; 1854 } 1855 1856 if (seqNum < track->mFirstSeqNumInSegment) { 1857 ALOGV("dropping stale access-unit (%d < %d)", 1858 seqNum, track->mFirstSeqNumInSegment); 1859 continue; 1860 } 1861 1862 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1863 postQueueAccessUnit(trackIndex, accessUnit); 1864 } 1865 } 1866 return OK; 1867 } 1868 1869 void onAccessUnitComplete( 1870 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1871 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1872 track->mPackets.push_back(accessUnit); 1873 1874 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1875 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum); 1876 1877 if(!mPlayResponseParsed){ 1878 ALOGV("play response is not parsed"); 1879 return; 1880 } 1881 1882 handleFirstAccessUnit(); 1883 1884 if (!mAllTracksHaveTime) { 1885 ALOGV("storing accessUnit, no time established yet"); 1886 return; 1887 } 1888 1889 if (OK != processAccessUnitQueue(trackIndex)) { 1890 return; 1891 } 1892 1893 if (track->mEOSReceived) { 1894 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1895 track->mEOSReceived = false; 1896 } 1897 } 1898 1899 bool addMediaTimestamp( 1900 int32_t trackIndex, const TrackInfo *track, 1901 const sp<ABuffer> &accessUnit) { 1902 UNUSED_UNLESS_VERBOSE(trackIndex); 1903 1904 uint32_t rtpTime; 1905 CHECK(accessUnit->meta()->findInt32( 1906 "rtp-time", (int32_t *)&rtpTime)); 1907 1908 int64_t relRtpTimeUs = 1909 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1910 / track->mTimeScale; 1911 1912 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1913 1914 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1915 1916 if (mediaTimeUs > mLastMediaTimeUs) { 1917 mLastMediaTimeUs = mediaTimeUs; 1918 } 1919 1920 if (mediaTimeUs < 0) { 1921 ALOGV("dropping early accessUnit."); 1922 return false; 1923 } 1924 1925 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)", 1926 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6); 1927 1928 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1929 1930 return true; 1931 } 1932 1933 void postQueueAccessUnit( 1934 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1935 sp<AMessage> msg = mNotify->dup(); 1936 msg->setInt32("what", kWhatAccessUnit); 1937 msg->setSize("trackIndex", trackIndex); 1938 msg->setBuffer("accessUnit", accessUnit); 1939 msg->post(); 1940 } 1941 1942 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1943 sp<AMessage> msg = mNotify->dup(); 1944 msg->setInt32("what", kWhatEOS); 1945 msg->setSize("trackIndex", trackIndex); 1946 msg->setInt32("finalResult", finalResult); 1947 msg->post(); 1948 } 1949 1950 void postQueueSeekDiscontinuity(size_t trackIndex) { 1951 sp<AMessage> msg = mNotify->dup(); 1952 msg->setInt32("what", kWhatSeekDiscontinuity); 1953 msg->setSize("trackIndex", trackIndex); 1954 msg->post(); 1955 } 1956 1957 void postNormalPlayTimeMapping( 1958 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1959 sp<AMessage> msg = mNotify->dup(); 1960 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1961 msg->setSize("trackIndex", trackIndex); 1962 msg->setInt32("rtpTime", rtpTime); 1963 msg->setInt64("nptUs", nptUs); 1964 msg->post(); 1965 } 1966 1967 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1968 }; 1969 1970 } // namespace android 1971 1972 #endif // MY_HANDLER_H_ 1973