1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 #define LOG_TAG "MyHandler" 23 #include <utils/Log.h> 24 25 #include "APacketSource.h" 26 #include "ARTPConnection.h" 27 #include "ARTSPConnection.h" 28 #include "ASessionDescription.h" 29 30 #include <ctype.h> 31 32 #include <media/stagefright/foundation/ABuffer.h> 33 #include <media/stagefright/foundation/ADebug.h> 34 #include <media/stagefright/foundation/ALooper.h> 35 #include <media/stagefright/foundation/AMessage.h> 36 #include <media/stagefright/MediaErrors.h> 37 #include <media/stagefright/Utils.h> 38 39 #include <arpa/inet.h> 40 #include <sys/socket.h> 41 #include <netdb.h> 42 43 #include "HTTPBase.h" 44 45 // If no access units are received within 5 secs, assume that the rtp 46 // stream has ended and signal end of stream. 47 static int64_t kAccessUnitTimeoutUs = 10000000ll; 48 49 // If no access units arrive for the first 10 secs after starting the 50 // stream, assume none ever will and signal EOS or switch transports. 51 static int64_t kStartupTimeoutUs = 10000000ll; 52 53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 54 55 static int64_t kPauseDelayUs = 3000000ll; 56 57 namespace android { 58 59 static bool GetAttribute(const char *s, const char *key, AString *value) { 60 value->clear(); 61 62 size_t keyLen = strlen(key); 63 64 for (;;) { 65 while (isspace(*s)) { 66 ++s; 67 } 68 69 const char *colonPos = strchr(s, ';'); 70 71 size_t len = 72 (colonPos == NULL) ? strlen(s) : colonPos - s; 73 74 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 75 value->setTo(&s[keyLen + 1], len - keyLen - 1); 76 return true; 77 } 78 79 if (colonPos == NULL) { 80 return false; 81 } 82 83 s = colonPos + 1; 84 } 85 } 86 87 struct MyHandler : public AHandler { 88 enum { 89 kWhatConnected = 'conn', 90 kWhatDisconnected = 'disc', 91 kWhatSeekDone = 'sdon', 92 93 kWhatAccessUnit = 'accU', 94 kWhatEOS = 'eos!', 95 kWhatSeekDiscontinuity = 'seeD', 96 kWhatNormalPlayTimeMapping = 'nptM', 97 }; 98 99 MyHandler( 100 const char *url, 101 const sp<AMessage> ¬ify, 102 bool uidValid = false, uid_t uid = 0) 103 : mNotify(notify), 104 mUIDValid(uidValid), 105 mUID(uid), 106 mNetLooper(new ALooper), 107 mConn(new ARTSPConnection(mUIDValid, mUID)), 108 mRTPConn(new ARTPConnection), 109 mOriginalSessionURL(url), 110 mSessionURL(url), 111 mSetupTracksSuccessful(false), 112 mSeekPending(false), 113 mFirstAccessUnit(true), 114 mAllTracksHaveTime(false), 115 mNTPAnchorUs(-1), 116 mMediaAnchorUs(-1), 117 mLastMediaTimeUs(0), 118 mNumAccessUnitsReceived(0), 119 mCheckPending(false), 120 mCheckGeneration(0), 121 mCheckTimeoutGeneration(0), 122 mTryTCPInterleaving(false), 123 mTryFakeRTCP(false), 124 mReceivedFirstRTCPPacket(false), 125 mReceivedFirstRTPPacket(false), 126 mSeekable(true), 127 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 128 mKeepAliveGeneration(0), 129 mPausing(false), 130 mPauseGeneration(0) { 131 mNetLooper->setName("rtsp net"); 132 mNetLooper->start(false /* runOnCallingThread */, 133 false /* canCallJava */, 134 PRIORITY_HIGHEST); 135 136 // Strip any authentication info from the session url, we don't 137 // want to transmit user/pass in cleartext. 138 AString host, path, user, pass; 139 unsigned port; 140 CHECK(ARTSPConnection::ParseURL( 141 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 142 143 if (user.size() > 0) { 144 mSessionURL.clear(); 145 mSessionURL.append("rtsp://"); 146 mSessionURL.append(host); 147 mSessionURL.append(":"); 148 mSessionURL.append(StringPrintf("%u", port)); 149 mSessionURL.append(path); 150 151 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 152 } 153 154 mSessionHost = host; 155 } 156 157 void connect() { 158 looper()->registerHandler(mConn); 159 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 160 161 sp<AMessage> notify = new AMessage('biny', id()); 162 mConn->observeBinaryData(notify); 163 164 sp<AMessage> reply = new AMessage('conn', id()); 165 mConn->connect(mOriginalSessionURL.c_str(), reply); 166 } 167 168 void loadSDP(const sp<ASessionDescription>& desc) { 169 looper()->registerHandler(mConn); 170 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 171 172 sp<AMessage> notify = new AMessage('biny', id()); 173 mConn->observeBinaryData(notify); 174 175 sp<AMessage> reply = new AMessage('sdpl', id()); 176 reply->setObject("description", desc); 177 mConn->connect(mOriginalSessionURL.c_str(), reply); 178 } 179 180 AString getControlURL(sp<ASessionDescription> desc) { 181 AString sessionLevelControlURL; 182 if (mSessionDesc->findAttribute( 183 0, 184 "a=control", 185 &sessionLevelControlURL)) { 186 if (sessionLevelControlURL.compare("*") == 0) { 187 return mBaseURL; 188 } else { 189 AString controlURL; 190 CHECK(MakeURL( 191 mBaseURL.c_str(), 192 sessionLevelControlURL.c_str(), 193 &controlURL)); 194 return controlURL; 195 } 196 } else { 197 return mSessionURL; 198 } 199 } 200 201 void disconnect() { 202 (new AMessage('abor', id()))->post(); 203 } 204 205 void seek(int64_t timeUs) { 206 sp<AMessage> msg = new AMessage('seek', id()); 207 msg->setInt64("time", timeUs); 208 mPauseGeneration++; 209 msg->post(); 210 } 211 212 bool isSeekable() const { 213 return mSeekable; 214 } 215 216 void pause() { 217 sp<AMessage> msg = new AMessage('paus', id()); 218 mPauseGeneration++; 219 msg->setInt32("pausecheck", mPauseGeneration); 220 msg->post(kPauseDelayUs); 221 } 222 223 void resume() { 224 sp<AMessage> msg = new AMessage('resu', id()); 225 mPauseGeneration++; 226 msg->post(); 227 } 228 229 static void addRR(const sp<ABuffer> &buf) { 230 uint8_t *ptr = buf->data() + buf->size(); 231 ptr[0] = 0x80 | 0; 232 ptr[1] = 201; // RR 233 ptr[2] = 0; 234 ptr[3] = 1; 235 ptr[4] = 0xde; // SSRC 236 ptr[5] = 0xad; 237 ptr[6] = 0xbe; 238 ptr[7] = 0xef; 239 240 buf->setRange(0, buf->size() + 8); 241 } 242 243 static void addSDES(int s, const sp<ABuffer> &buffer) { 244 struct sockaddr_in addr; 245 socklen_t addrSize = sizeof(addr); 246 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 247 248 uint8_t *data = buffer->data() + buffer->size(); 249 data[0] = 0x80 | 1; 250 data[1] = 202; // SDES 251 data[4] = 0xde; // SSRC 252 data[5] = 0xad; 253 data[6] = 0xbe; 254 data[7] = 0xef; 255 256 size_t offset = 8; 257 258 data[offset++] = 1; // CNAME 259 260 AString cname = "stagefright@"; 261 cname.append(inet_ntoa(addr.sin_addr)); 262 data[offset++] = cname.size(); 263 264 memcpy(&data[offset], cname.c_str(), cname.size()); 265 offset += cname.size(); 266 267 data[offset++] = 6; // TOOL 268 269 AString tool = MakeUserAgent(); 270 271 data[offset++] = tool.size(); 272 273 memcpy(&data[offset], tool.c_str(), tool.size()); 274 offset += tool.size(); 275 276 data[offset++] = 0; 277 278 if ((offset % 4) > 0) { 279 size_t count = 4 - (offset % 4); 280 switch (count) { 281 case 3: 282 data[offset++] = 0; 283 case 2: 284 data[offset++] = 0; 285 case 1: 286 data[offset++] = 0; 287 } 288 } 289 290 size_t numWords = (offset / 4) - 1; 291 data[2] = numWords >> 8; 292 data[3] = numWords & 0xff; 293 294 buffer->setRange(buffer->offset(), buffer->size() + offset); 295 } 296 297 // In case we're behind NAT, fire off two UDP packets to the remote 298 // rtp/rtcp ports to poke a hole into the firewall for future incoming 299 // packets. We're going to send an RR/SDES RTCP packet to both of them. 300 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 301 struct sockaddr_in addr; 302 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 303 addr.sin_family = AF_INET; 304 305 AString source; 306 AString server_port; 307 if (!GetAttribute(transport.c_str(), 308 "source", 309 &source)) { 310 ALOGW("Missing 'source' field in Transport response. Using " 311 "RTSP endpoint address."); 312 313 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 314 if (ent == NULL) { 315 ALOGE("Failed to look up address of session host '%s'", 316 mSessionHost.c_str()); 317 318 return false; 319 } 320 321 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 322 } else { 323 addr.sin_addr.s_addr = inet_addr(source.c_str()); 324 } 325 326 if (!GetAttribute(transport.c_str(), 327 "server_port", 328 &server_port)) { 329 ALOGI("Missing 'server_port' field in Transport response."); 330 return false; 331 } 332 333 int rtpPort, rtcpPort; 334 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 335 || rtpPort <= 0 || rtpPort > 65535 336 || rtcpPort <=0 || rtcpPort > 65535 337 || rtcpPort != rtpPort + 1) { 338 ALOGE("Server picked invalid RTP/RTCP port pair %s," 339 " RTP port must be even, RTCP port must be one higher.", 340 server_port.c_str()); 341 342 return false; 343 } 344 345 if (rtpPort & 1) { 346 ALOGW("Server picked an odd RTP port, it should've picked an " 347 "even one, we'll let it pass for now, but this may break " 348 "in the future."); 349 } 350 351 if (addr.sin_addr.s_addr == INADDR_NONE) { 352 return true; 353 } 354 355 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 356 // No firewalls to traverse on the loopback interface. 357 return true; 358 } 359 360 // Make up an RR/SDES RTCP packet. 361 sp<ABuffer> buf = new ABuffer(65536); 362 buf->setRange(0, 0); 363 addRR(buf); 364 addSDES(rtpSocket, buf); 365 366 addr.sin_port = htons(rtpPort); 367 368 ssize_t n = sendto( 369 rtpSocket, buf->data(), buf->size(), 0, 370 (const sockaddr *)&addr, sizeof(addr)); 371 372 if (n < (ssize_t)buf->size()) { 373 ALOGE("failed to poke a hole for RTP packets"); 374 return false; 375 } 376 377 addr.sin_port = htons(rtcpPort); 378 379 n = sendto( 380 rtcpSocket, buf->data(), buf->size(), 0, 381 (const sockaddr *)&addr, sizeof(addr)); 382 383 if (n < (ssize_t)buf->size()) { 384 ALOGE("failed to poke a hole for RTCP packets"); 385 return false; 386 } 387 388 ALOGV("successfully poked holes."); 389 390 return true; 391 } 392 393 static bool isLiveStream(const sp<ASessionDescription> &desc) { 394 AString attrLiveStream; 395 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 396 ssize_t semicolonPos = attrLiveStream.find(";", 2); 397 398 const char* liveStreamValue; 399 if (semicolonPos < 0) { 400 liveStreamValue = attrLiveStream.c_str(); 401 } else { 402 AString valString; 403 valString.setTo(attrLiveStream, 404 semicolonPos + 1, 405 attrLiveStream.size() - semicolonPos - 1); 406 liveStreamValue = valString.c_str(); 407 } 408 409 uint32_t value = strtoul(liveStreamValue, NULL, 10); 410 if (value == 1) { 411 ALOGV("found live stream"); 412 return true; 413 } 414 } else { 415 // It is a live stream if no duration is returned 416 int64_t durationUs; 417 if (!desc->getDurationUs(&durationUs)) { 418 ALOGV("No duration found, assume live stream"); 419 return true; 420 } 421 } 422 423 return false; 424 } 425 426 virtual void onMessageReceived(const sp<AMessage> &msg) { 427 switch (msg->what()) { 428 case 'conn': 429 { 430 int32_t result; 431 CHECK(msg->findInt32("result", &result)); 432 433 ALOGI("connection request completed with result %d (%s)", 434 result, strerror(-result)); 435 436 if (result == OK) { 437 AString request; 438 request = "DESCRIBE "; 439 request.append(mSessionURL); 440 request.append(" RTSP/1.0\r\n"); 441 request.append("Accept: application/sdp\r\n"); 442 request.append("\r\n"); 443 444 sp<AMessage> reply = new AMessage('desc', id()); 445 mConn->sendRequest(request.c_str(), reply); 446 } else { 447 (new AMessage('disc', id()))->post(); 448 } 449 break; 450 } 451 452 case 'disc': 453 { 454 ++mKeepAliveGeneration; 455 456 int32_t reconnect; 457 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 458 sp<AMessage> reply = new AMessage('conn', id()); 459 mConn->connect(mOriginalSessionURL.c_str(), reply); 460 } else { 461 (new AMessage('quit', id()))->post(); 462 } 463 break; 464 } 465 466 case 'desc': 467 { 468 int32_t result; 469 CHECK(msg->findInt32("result", &result)); 470 471 ALOGI("DESCRIBE completed with result %d (%s)", 472 result, strerror(-result)); 473 474 if (result == OK) { 475 sp<RefBase> obj; 476 CHECK(msg->findObject("response", &obj)); 477 sp<ARTSPResponse> response = 478 static_cast<ARTSPResponse *>(obj.get()); 479 480 if (response->mStatusCode == 302) { 481 ssize_t i = response->mHeaders.indexOfKey("location"); 482 CHECK_GE(i, 0); 483 484 mSessionURL = response->mHeaders.valueAt(i); 485 486 AString request; 487 request = "DESCRIBE "; 488 request.append(mSessionURL); 489 request.append(" RTSP/1.0\r\n"); 490 request.append("Accept: application/sdp\r\n"); 491 request.append("\r\n"); 492 493 sp<AMessage> reply = new AMessage('desc', id()); 494 mConn->sendRequest(request.c_str(), reply); 495 break; 496 } 497 498 if (response->mStatusCode != 200) { 499 result = UNKNOWN_ERROR; 500 } else if (response->mContent == NULL) { 501 result = ERROR_MALFORMED; 502 ALOGE("The response has no content."); 503 } else { 504 mSessionDesc = new ASessionDescription; 505 506 mSessionDesc->setTo( 507 response->mContent->data(), 508 response->mContent->size()); 509 510 if (!mSessionDesc->isValid()) { 511 ALOGE("Failed to parse session description."); 512 result = ERROR_MALFORMED; 513 } else { 514 ssize_t i = response->mHeaders.indexOfKey("content-base"); 515 if (i >= 0) { 516 mBaseURL = response->mHeaders.valueAt(i); 517 } else { 518 i = response->mHeaders.indexOfKey("content-location"); 519 if (i >= 0) { 520 mBaseURL = response->mHeaders.valueAt(i); 521 } else { 522 mBaseURL = mSessionURL; 523 } 524 } 525 526 mSeekable = !isLiveStream(mSessionDesc); 527 528 if (!mBaseURL.startsWith("rtsp://")) { 529 // Some misbehaving servers specify a relative 530 // URL in one of the locations above, combine 531 // it with the absolute session URL to get 532 // something usable... 533 534 ALOGW("Server specified a non-absolute base URL" 535 ", combining it with the session URL to " 536 "get something usable..."); 537 538 AString tmp; 539 CHECK(MakeURL( 540 mSessionURL.c_str(), 541 mBaseURL.c_str(), 542 &tmp)); 543 544 mBaseURL = tmp; 545 } 546 547 mControlURL = getControlURL(mSessionDesc); 548 549 if (mSessionDesc->countTracks() < 2) { 550 // There's no actual tracks in this session. 551 // The first "track" is merely session meta 552 // data. 553 554 ALOGW("Session doesn't contain any playable " 555 "tracks. Aborting."); 556 result = ERROR_UNSUPPORTED; 557 } else { 558 setupTrack(1); 559 } 560 } 561 } 562 } 563 564 if (result != OK) { 565 sp<AMessage> reply = new AMessage('disc', id()); 566 mConn->disconnect(reply); 567 } 568 break; 569 } 570 571 case 'sdpl': 572 { 573 int32_t result; 574 CHECK(msg->findInt32("result", &result)); 575 576 ALOGI("SDP connection request completed with result %d (%s)", 577 result, strerror(-result)); 578 579 if (result == OK) { 580 sp<RefBase> obj; 581 CHECK(msg->findObject("description", &obj)); 582 mSessionDesc = 583 static_cast<ASessionDescription *>(obj.get()); 584 585 if (!mSessionDesc->isValid()) { 586 ALOGE("Failed to parse session description."); 587 result = ERROR_MALFORMED; 588 } else { 589 mBaseURL = mSessionURL; 590 591 mSeekable = !isLiveStream(mSessionDesc); 592 593 mControlURL = getControlURL(mSessionDesc); 594 595 if (mSessionDesc->countTracks() < 2) { 596 // There's no actual tracks in this session. 597 // The first "track" is merely session meta 598 // data. 599 600 ALOGW("Session doesn't contain any playable " 601 "tracks. Aborting."); 602 result = ERROR_UNSUPPORTED; 603 } else { 604 setupTrack(1); 605 } 606 } 607 } 608 609 if (result != OK) { 610 sp<AMessage> reply = new AMessage('disc', id()); 611 mConn->disconnect(reply); 612 } 613 break; 614 } 615 616 case 'setu': 617 { 618 size_t index; 619 CHECK(msg->findSize("index", &index)); 620 621 TrackInfo *track = NULL; 622 size_t trackIndex; 623 if (msg->findSize("track-index", &trackIndex)) { 624 track = &mTracks.editItemAt(trackIndex); 625 } 626 627 int32_t result; 628 CHECK(msg->findInt32("result", &result)); 629 630 ALOGI("SETUP(%d) completed with result %d (%s)", 631 index, result, strerror(-result)); 632 633 if (result == OK) { 634 CHECK(track != NULL); 635 636 sp<RefBase> obj; 637 CHECK(msg->findObject("response", &obj)); 638 sp<ARTSPResponse> response = 639 static_cast<ARTSPResponse *>(obj.get()); 640 641 if (response->mStatusCode != 200) { 642 result = UNKNOWN_ERROR; 643 } else { 644 ssize_t i = response->mHeaders.indexOfKey("session"); 645 CHECK_GE(i, 0); 646 647 mSessionID = response->mHeaders.valueAt(i); 648 649 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 650 AString timeoutStr; 651 if (GetAttribute( 652 mSessionID.c_str(), "timeout", &timeoutStr)) { 653 char *end; 654 unsigned long timeoutSecs = 655 strtoul(timeoutStr.c_str(), &end, 10); 656 657 if (end == timeoutStr.c_str() || *end != '\0') { 658 ALOGW("server specified malformed timeout '%s'", 659 timeoutStr.c_str()); 660 661 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 662 } else if (timeoutSecs < 15) { 663 ALOGW("server specified too short a timeout " 664 "(%lu secs), using default.", 665 timeoutSecs); 666 667 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 668 } else { 669 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 670 671 ALOGI("server specified timeout of %lu secs.", 672 timeoutSecs); 673 } 674 } 675 676 i = mSessionID.find(";"); 677 if (i >= 0) { 678 // Remove options, i.e. ";timeout=90" 679 mSessionID.erase(i, mSessionID.size() - i); 680 } 681 682 sp<AMessage> notify = new AMessage('accu', id()); 683 notify->setSize("track-index", trackIndex); 684 685 i = response->mHeaders.indexOfKey("transport"); 686 CHECK_GE(i, 0); 687 688 if (!track->mUsingInterleavedTCP) { 689 AString transport = response->mHeaders.valueAt(i); 690 691 // We are going to continue even if we were 692 // unable to poke a hole into the firewall... 693 pokeAHole( 694 track->mRTPSocket, 695 track->mRTCPSocket, 696 transport); 697 } 698 699 mRTPConn->addStream( 700 track->mRTPSocket, track->mRTCPSocket, 701 mSessionDesc, index, 702 notify, track->mUsingInterleavedTCP); 703 704 mSetupTracksSuccessful = true; 705 } 706 } 707 708 if (result != OK) { 709 if (track) { 710 if (!track->mUsingInterleavedTCP) { 711 // Clear the tag 712 if (mUIDValid) { 713 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 714 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 715 } 716 717 close(track->mRTPSocket); 718 close(track->mRTCPSocket); 719 } 720 721 mTracks.removeItemsAt(trackIndex); 722 } 723 } 724 725 ++index; 726 if (index < mSessionDesc->countTracks()) { 727 setupTrack(index); 728 } else if (mSetupTracksSuccessful) { 729 ++mKeepAliveGeneration; 730 postKeepAlive(); 731 732 AString request = "PLAY "; 733 request.append(mControlURL); 734 request.append(" RTSP/1.0\r\n"); 735 736 request.append("Session: "); 737 request.append(mSessionID); 738 request.append("\r\n"); 739 740 request.append("\r\n"); 741 742 sp<AMessage> reply = new AMessage('play', id()); 743 mConn->sendRequest(request.c_str(), reply); 744 } else { 745 sp<AMessage> reply = new AMessage('disc', id()); 746 mConn->disconnect(reply); 747 } 748 break; 749 } 750 751 case 'play': 752 { 753 int32_t result; 754 CHECK(msg->findInt32("result", &result)); 755 756 ALOGI("PLAY completed with result %d (%s)", 757 result, strerror(-result)); 758 759 if (result == OK) { 760 sp<RefBase> obj; 761 CHECK(msg->findObject("response", &obj)); 762 sp<ARTSPResponse> response = 763 static_cast<ARTSPResponse *>(obj.get()); 764 765 if (response->mStatusCode != 200) { 766 result = UNKNOWN_ERROR; 767 } else { 768 parsePlayResponse(response); 769 770 sp<AMessage> timeout = new AMessage('tiou', id()); 771 mCheckTimeoutGeneration++; 772 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 773 timeout->post(kStartupTimeoutUs); 774 } 775 } 776 777 if (result != OK) { 778 sp<AMessage> reply = new AMessage('disc', id()); 779 mConn->disconnect(reply); 780 } 781 782 break; 783 } 784 785 case 'aliv': 786 { 787 int32_t generation; 788 CHECK(msg->findInt32("generation", &generation)); 789 790 if (generation != mKeepAliveGeneration) { 791 // obsolete event. 792 break; 793 } 794 795 AString request; 796 request.append("OPTIONS "); 797 request.append(mSessionURL); 798 request.append(" RTSP/1.0\r\n"); 799 request.append("Session: "); 800 request.append(mSessionID); 801 request.append("\r\n"); 802 request.append("\r\n"); 803 804 sp<AMessage> reply = new AMessage('opts', id()); 805 reply->setInt32("generation", mKeepAliveGeneration); 806 mConn->sendRequest(request.c_str(), reply); 807 break; 808 } 809 810 case 'opts': 811 { 812 int32_t result; 813 CHECK(msg->findInt32("result", &result)); 814 815 ALOGI("OPTIONS completed with result %d (%s)", 816 result, strerror(-result)); 817 818 int32_t generation; 819 CHECK(msg->findInt32("generation", &generation)); 820 821 if (generation != mKeepAliveGeneration) { 822 // obsolete event. 823 break; 824 } 825 826 postKeepAlive(); 827 break; 828 } 829 830 case 'abor': 831 { 832 for (size_t i = 0; i < mTracks.size(); ++i) { 833 TrackInfo *info = &mTracks.editItemAt(i); 834 835 if (!mFirstAccessUnit) { 836 postQueueEOS(i, ERROR_END_OF_STREAM); 837 } 838 839 if (!info->mUsingInterleavedTCP) { 840 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 841 842 // Clear the tag 843 if (mUIDValid) { 844 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 845 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 846 } 847 848 close(info->mRTPSocket); 849 close(info->mRTCPSocket); 850 } 851 } 852 mTracks.clear(); 853 mSetupTracksSuccessful = false; 854 mSeekPending = false; 855 mFirstAccessUnit = true; 856 mAllTracksHaveTime = false; 857 mNTPAnchorUs = -1; 858 mMediaAnchorUs = -1; 859 mNumAccessUnitsReceived = 0; 860 mReceivedFirstRTCPPacket = false; 861 mReceivedFirstRTPPacket = false; 862 mPausing = false; 863 mSeekable = true; 864 865 sp<AMessage> reply = new AMessage('tear', id()); 866 867 int32_t reconnect; 868 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 869 reply->setInt32("reconnect", true); 870 } 871 872 AString request; 873 request = "TEARDOWN "; 874 875 // XXX should use aggregate url from SDP here... 876 request.append(mSessionURL); 877 request.append(" RTSP/1.0\r\n"); 878 879 request.append("Session: "); 880 request.append(mSessionID); 881 request.append("\r\n"); 882 883 request.append("\r\n"); 884 885 mConn->sendRequest(request.c_str(), reply); 886 break; 887 } 888 889 case 'tear': 890 { 891 int32_t result; 892 CHECK(msg->findInt32("result", &result)); 893 894 ALOGI("TEARDOWN completed with result %d (%s)", 895 result, strerror(-result)); 896 897 sp<AMessage> reply = new AMessage('disc', id()); 898 899 int32_t reconnect; 900 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 901 reply->setInt32("reconnect", true); 902 } 903 904 mConn->disconnect(reply); 905 break; 906 } 907 908 case 'quit': 909 { 910 sp<AMessage> msg = mNotify->dup(); 911 msg->setInt32("what", kWhatDisconnected); 912 msg->setInt32("result", UNKNOWN_ERROR); 913 msg->post(); 914 break; 915 } 916 917 case 'chek': 918 { 919 int32_t generation; 920 CHECK(msg->findInt32("generation", &generation)); 921 if (generation != mCheckGeneration) { 922 // This is an outdated message. Ignore. 923 break; 924 } 925 926 if (mNumAccessUnitsReceived == 0) { 927 #if 1 928 ALOGI("stream ended? aborting."); 929 (new AMessage('abor', id()))->post(); 930 break; 931 #else 932 ALOGI("haven't seen an AU in a looong time."); 933 #endif 934 } 935 936 mNumAccessUnitsReceived = 0; 937 msg->post(kAccessUnitTimeoutUs); 938 break; 939 } 940 941 case 'accu': 942 { 943 int32_t timeUpdate; 944 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 945 size_t trackIndex; 946 CHECK(msg->findSize("track-index", &trackIndex)); 947 948 uint32_t rtpTime; 949 uint64_t ntpTime; 950 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 951 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 952 953 onTimeUpdate(trackIndex, rtpTime, ntpTime); 954 break; 955 } 956 957 int32_t first; 958 if (msg->findInt32("first-rtcp", &first)) { 959 mReceivedFirstRTCPPacket = true; 960 break; 961 } 962 963 if (msg->findInt32("first-rtp", &first)) { 964 mReceivedFirstRTPPacket = true; 965 break; 966 } 967 968 ++mNumAccessUnitsReceived; 969 postAccessUnitTimeoutCheck(); 970 971 size_t trackIndex; 972 CHECK(msg->findSize("track-index", &trackIndex)); 973 974 if (trackIndex >= mTracks.size()) { 975 ALOGV("late packets ignored."); 976 break; 977 } 978 979 TrackInfo *track = &mTracks.editItemAt(trackIndex); 980 981 int32_t eos; 982 if (msg->findInt32("eos", &eos)) { 983 ALOGI("received BYE on track index %d", trackIndex); 984 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 985 ALOGI("No time established => fake existing data"); 986 987 track->mEOSReceived = true; 988 mTryFakeRTCP = true; 989 mReceivedFirstRTCPPacket = true; 990 fakeTimestamps(); 991 } else { 992 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 993 } 994 return; 995 } 996 997 sp<ABuffer> accessUnit; 998 CHECK(msg->findBuffer("access-unit", &accessUnit)); 999 1000 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1001 1002 if (mSeekPending) { 1003 ALOGV("we're seeking, dropping stale packet."); 1004 break; 1005 } 1006 1007 if (seqNum < track->mFirstSeqNumInSegment) { 1008 ALOGV("dropping stale access-unit (%d < %d)", 1009 seqNum, track->mFirstSeqNumInSegment); 1010 break; 1011 } 1012 1013 if (track->mNewSegment) { 1014 track->mNewSegment = false; 1015 } 1016 1017 onAccessUnitComplete(trackIndex, accessUnit); 1018 break; 1019 } 1020 1021 case 'paus': 1022 { 1023 int32_t generation; 1024 CHECK(msg->findInt32("pausecheck", &generation)); 1025 if (generation != mPauseGeneration) { 1026 ALOGV("Ignoring outdated pause message."); 1027 break; 1028 } 1029 1030 if (!mSeekable) { 1031 ALOGW("This is a live stream, ignoring pause request."); 1032 break; 1033 } 1034 mCheckPending = true; 1035 ++mCheckGeneration; 1036 mPausing = true; 1037 1038 AString request = "PAUSE "; 1039 request.append(mControlURL); 1040 request.append(" RTSP/1.0\r\n"); 1041 1042 request.append("Session: "); 1043 request.append(mSessionID); 1044 request.append("\r\n"); 1045 1046 request.append("\r\n"); 1047 1048 sp<AMessage> reply = new AMessage('pau2', id()); 1049 mConn->sendRequest(request.c_str(), reply); 1050 break; 1051 } 1052 1053 case 'pau2': 1054 { 1055 int32_t result; 1056 CHECK(msg->findInt32("result", &result)); 1057 mCheckTimeoutGeneration++; 1058 1059 ALOGI("PAUSE completed with result %d (%s)", 1060 result, strerror(-result)); 1061 break; 1062 } 1063 1064 case 'resu': 1065 { 1066 if (mPausing && mSeekPending) { 1067 // If seeking, Play will be sent from see1 instead 1068 break; 1069 } 1070 1071 if (!mPausing) { 1072 // Dont send PLAY if we have not paused 1073 break; 1074 } 1075 AString request = "PLAY "; 1076 request.append(mControlURL); 1077 request.append(" RTSP/1.0\r\n"); 1078 1079 request.append("Session: "); 1080 request.append(mSessionID); 1081 request.append("\r\n"); 1082 1083 request.append("\r\n"); 1084 1085 sp<AMessage> reply = new AMessage('res2', id()); 1086 mConn->sendRequest(request.c_str(), reply); 1087 break; 1088 } 1089 1090 case 'res2': 1091 { 1092 int32_t result; 1093 CHECK(msg->findInt32("result", &result)); 1094 1095 ALOGI("PLAY completed with result %d (%s)", 1096 result, strerror(-result)); 1097 1098 mCheckPending = false; 1099 postAccessUnitTimeoutCheck(); 1100 1101 if (result == OK) { 1102 sp<RefBase> obj; 1103 CHECK(msg->findObject("response", &obj)); 1104 sp<ARTSPResponse> response = 1105 static_cast<ARTSPResponse *>(obj.get()); 1106 1107 if (response->mStatusCode != 200) { 1108 result = UNKNOWN_ERROR; 1109 } else { 1110 parsePlayResponse(response); 1111 1112 // Post new timeout in order to make sure to use 1113 // fake timestamps if no new Sender Reports arrive 1114 sp<AMessage> timeout = new AMessage('tiou', id()); 1115 mCheckTimeoutGeneration++; 1116 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1117 timeout->post(kStartupTimeoutUs); 1118 } 1119 } 1120 1121 if (result != OK) { 1122 ALOGE("resume failed, aborting."); 1123 (new AMessage('abor', id()))->post(); 1124 } 1125 1126 mPausing = false; 1127 break; 1128 } 1129 1130 case 'seek': 1131 { 1132 if (!mSeekable) { 1133 ALOGW("This is a live stream, ignoring seek request."); 1134 1135 sp<AMessage> msg = mNotify->dup(); 1136 msg->setInt32("what", kWhatSeekDone); 1137 msg->post(); 1138 break; 1139 } 1140 1141 int64_t timeUs; 1142 CHECK(msg->findInt64("time", &timeUs)); 1143 1144 mSeekPending = true; 1145 1146 // Disable the access unit timeout until we resumed 1147 // playback again. 1148 mCheckPending = true; 1149 ++mCheckGeneration; 1150 1151 sp<AMessage> reply = new AMessage('see1', id()); 1152 reply->setInt64("time", timeUs); 1153 1154 if (mPausing) { 1155 // PAUSE already sent 1156 ALOGI("Pause already sent"); 1157 reply->post(); 1158 break; 1159 } 1160 AString request = "PAUSE "; 1161 request.append(mControlURL); 1162 request.append(" RTSP/1.0\r\n"); 1163 1164 request.append("Session: "); 1165 request.append(mSessionID); 1166 request.append("\r\n"); 1167 1168 request.append("\r\n"); 1169 1170 mConn->sendRequest(request.c_str(), reply); 1171 break; 1172 } 1173 1174 case 'see1': 1175 { 1176 // Session is paused now. 1177 for (size_t i = 0; i < mTracks.size(); ++i) { 1178 TrackInfo *info = &mTracks.editItemAt(i); 1179 1180 postQueueSeekDiscontinuity(i); 1181 info->mEOSReceived = false; 1182 1183 info->mRTPAnchor = 0; 1184 info->mNTPAnchorUs = -1; 1185 } 1186 1187 mAllTracksHaveTime = false; 1188 mNTPAnchorUs = -1; 1189 1190 // Start new timeoutgeneration to avoid getting timeout 1191 // before PLAY response arrive 1192 sp<AMessage> timeout = new AMessage('tiou', id()); 1193 mCheckTimeoutGeneration++; 1194 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1195 timeout->post(kStartupTimeoutUs); 1196 1197 int64_t timeUs; 1198 CHECK(msg->findInt64("time", &timeUs)); 1199 1200 AString request = "PLAY "; 1201 request.append(mControlURL); 1202 request.append(" RTSP/1.0\r\n"); 1203 1204 request.append("Session: "); 1205 request.append(mSessionID); 1206 request.append("\r\n"); 1207 1208 request.append( 1209 StringPrintf( 1210 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1211 1212 request.append("\r\n"); 1213 1214 sp<AMessage> reply = new AMessage('see2', id()); 1215 mConn->sendRequest(request.c_str(), reply); 1216 break; 1217 } 1218 1219 case 'see2': 1220 { 1221 if (mTracks.size() == 0) { 1222 // We have already hit abor, break 1223 break; 1224 } 1225 1226 int32_t result; 1227 CHECK(msg->findInt32("result", &result)); 1228 1229 ALOGI("PLAY completed with result %d (%s)", 1230 result, strerror(-result)); 1231 1232 mCheckPending = false; 1233 postAccessUnitTimeoutCheck(); 1234 1235 if (result == OK) { 1236 sp<RefBase> obj; 1237 CHECK(msg->findObject("response", &obj)); 1238 sp<ARTSPResponse> response = 1239 static_cast<ARTSPResponse *>(obj.get()); 1240 1241 if (response->mStatusCode != 200) { 1242 result = UNKNOWN_ERROR; 1243 } else { 1244 parsePlayResponse(response); 1245 1246 // Post new timeout in order to make sure to use 1247 // fake timestamps if no new Sender Reports arrive 1248 sp<AMessage> timeout = new AMessage('tiou', id()); 1249 mCheckTimeoutGeneration++; 1250 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1251 timeout->post(kStartupTimeoutUs); 1252 1253 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1254 CHECK_GE(i, 0); 1255 1256 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1257 1258 ALOGI("seek completed."); 1259 } 1260 } 1261 1262 if (result != OK) { 1263 ALOGE("seek failed, aborting."); 1264 (new AMessage('abor', id()))->post(); 1265 } 1266 1267 mPausing = false; 1268 mSeekPending = false; 1269 1270 sp<AMessage> msg = mNotify->dup(); 1271 msg->setInt32("what", kWhatSeekDone); 1272 msg->post(); 1273 break; 1274 } 1275 1276 case 'biny': 1277 { 1278 sp<ABuffer> buffer; 1279 CHECK(msg->findBuffer("buffer", &buffer)); 1280 1281 int32_t index; 1282 CHECK(buffer->meta()->findInt32("index", &index)); 1283 1284 mRTPConn->injectPacket(index, buffer); 1285 break; 1286 } 1287 1288 case 'tiou': 1289 { 1290 int32_t timeoutGenerationCheck; 1291 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1292 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1293 // This is an outdated message. Ignore. 1294 // This typically happens if a lot of seeks are 1295 // performed, since new timeout messages now are 1296 // posted at seek as well. 1297 break; 1298 } 1299 if (!mReceivedFirstRTCPPacket) { 1300 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1301 ALOGW("We received RTP packets but no RTCP packets, " 1302 "using fake timestamps."); 1303 1304 mTryFakeRTCP = true; 1305 1306 mReceivedFirstRTCPPacket = true; 1307 1308 fakeTimestamps(); 1309 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1310 ALOGW("Never received any data, switching transports."); 1311 1312 mTryTCPInterleaving = true; 1313 1314 sp<AMessage> msg = new AMessage('abor', id()); 1315 msg->setInt32("reconnect", true); 1316 msg->post(); 1317 } else { 1318 ALOGW("Never received any data, disconnecting."); 1319 (new AMessage('abor', id()))->post(); 1320 } 1321 } else { 1322 if (!mAllTracksHaveTime) { 1323 ALOGW("We received some RTCP packets, but time " 1324 "could not be established on all tracks, now " 1325 "using fake timestamps"); 1326 1327 fakeTimestamps(); 1328 } 1329 } 1330 break; 1331 } 1332 1333 default: 1334 TRESPASS(); 1335 break; 1336 } 1337 } 1338 1339 void postKeepAlive() { 1340 sp<AMessage> msg = new AMessage('aliv', id()); 1341 msg->setInt32("generation", mKeepAliveGeneration); 1342 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1343 } 1344 1345 void postAccessUnitTimeoutCheck() { 1346 if (mCheckPending) { 1347 return; 1348 } 1349 1350 mCheckPending = true; 1351 sp<AMessage> check = new AMessage('chek', id()); 1352 check->setInt32("generation", mCheckGeneration); 1353 check->post(kAccessUnitTimeoutUs); 1354 } 1355 1356 static void SplitString( 1357 const AString &s, const char *separator, List<AString> *items) { 1358 items->clear(); 1359 size_t start = 0; 1360 while (start < s.size()) { 1361 ssize_t offset = s.find(separator, start); 1362 1363 if (offset < 0) { 1364 items->push_back(AString(s, start, s.size() - start)); 1365 break; 1366 } 1367 1368 items->push_back(AString(s, start, offset - start)); 1369 start = offset + strlen(separator); 1370 } 1371 } 1372 1373 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1374 if (mTracks.size() == 0) { 1375 ALOGV("parsePlayResponse: late packets ignored."); 1376 return; 1377 } 1378 1379 ssize_t i = response->mHeaders.indexOfKey("range"); 1380 if (i < 0) { 1381 // Server doesn't even tell use what range it is going to 1382 // play, therefore we won't support seeking. 1383 return; 1384 } 1385 1386 AString range = response->mHeaders.valueAt(i); 1387 ALOGV("Range: %s", range.c_str()); 1388 1389 AString val; 1390 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1391 1392 float npt1, npt2; 1393 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1394 // This is a live stream and therefore not seekable. 1395 1396 ALOGI("This is a live stream"); 1397 return; 1398 } 1399 1400 i = response->mHeaders.indexOfKey("rtp-info"); 1401 CHECK_GE(i, 0); 1402 1403 AString rtpInfo = response->mHeaders.valueAt(i); 1404 List<AString> streamInfos; 1405 SplitString(rtpInfo, ",", &streamInfos); 1406 1407 int n = 1; 1408 for (List<AString>::iterator it = streamInfos.begin(); 1409 it != streamInfos.end(); ++it) { 1410 (*it).trim(); 1411 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1412 1413 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1414 1415 size_t trackIndex = 0; 1416 while (trackIndex < mTracks.size() 1417 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1418 ++trackIndex; 1419 } 1420 CHECK_LT(trackIndex, mTracks.size()); 1421 1422 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1423 1424 char *end; 1425 unsigned long seq = strtoul(val.c_str(), &end, 10); 1426 1427 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1428 info->mFirstSeqNumInSegment = seq; 1429 info->mNewSegment = true; 1430 1431 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1432 1433 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1434 1435 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1436 1437 info->mNormalPlayTimeRTP = rtpTime; 1438 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1439 1440 if (!mFirstAccessUnit) { 1441 postNormalPlayTimeMapping( 1442 trackIndex, 1443 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1444 } 1445 1446 ++n; 1447 } 1448 } 1449 1450 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1451 CHECK_GE(index, 0u); 1452 CHECK_LT(index, mTracks.size()); 1453 1454 const TrackInfo &info = mTracks.itemAt(index); 1455 1456 *timeScale = info.mTimeScale; 1457 1458 return info.mPacketSource->getFormat(); 1459 } 1460 1461 size_t countTracks() const { 1462 return mTracks.size(); 1463 } 1464 1465 private: 1466 struct TrackInfo { 1467 AString mURL; 1468 int mRTPSocket; 1469 int mRTCPSocket; 1470 bool mUsingInterleavedTCP; 1471 uint32_t mFirstSeqNumInSegment; 1472 bool mNewSegment; 1473 1474 uint32_t mRTPAnchor; 1475 int64_t mNTPAnchorUs; 1476 int32_t mTimeScale; 1477 bool mEOSReceived; 1478 1479 uint32_t mNormalPlayTimeRTP; 1480 int64_t mNormalPlayTimeUs; 1481 1482 sp<APacketSource> mPacketSource; 1483 1484 // Stores packets temporarily while no notion of time 1485 // has been established yet. 1486 List<sp<ABuffer> > mPackets; 1487 }; 1488 1489 sp<AMessage> mNotify; 1490 bool mUIDValid; 1491 uid_t mUID; 1492 sp<ALooper> mNetLooper; 1493 sp<ARTSPConnection> mConn; 1494 sp<ARTPConnection> mRTPConn; 1495 sp<ASessionDescription> mSessionDesc; 1496 AString mOriginalSessionURL; // This one still has user:pass@ 1497 AString mSessionURL; 1498 AString mSessionHost; 1499 AString mBaseURL; 1500 AString mControlURL; 1501 AString mSessionID; 1502 bool mSetupTracksSuccessful; 1503 bool mSeekPending; 1504 bool mFirstAccessUnit; 1505 1506 bool mAllTracksHaveTime; 1507 int64_t mNTPAnchorUs; 1508 int64_t mMediaAnchorUs; 1509 int64_t mLastMediaTimeUs; 1510 1511 int64_t mNumAccessUnitsReceived; 1512 bool mCheckPending; 1513 int32_t mCheckGeneration; 1514 int32_t mCheckTimeoutGeneration; 1515 bool mTryTCPInterleaving; 1516 bool mTryFakeRTCP; 1517 bool mReceivedFirstRTCPPacket; 1518 bool mReceivedFirstRTPPacket; 1519 bool mSeekable; 1520 int64_t mKeepAliveTimeoutUs; 1521 int32_t mKeepAliveGeneration; 1522 bool mPausing; 1523 int32_t mPauseGeneration; 1524 1525 Vector<TrackInfo> mTracks; 1526 1527 void setupTrack(size_t index) { 1528 sp<APacketSource> source = 1529 new APacketSource(mSessionDesc, index); 1530 1531 if (source->initCheck() != OK) { 1532 ALOGW("Unsupported format. Ignoring track #%d.", index); 1533 1534 sp<AMessage> reply = new AMessage('setu', id()); 1535 reply->setSize("index", index); 1536 reply->setInt32("result", ERROR_UNSUPPORTED); 1537 reply->post(); 1538 return; 1539 } 1540 1541 AString url; 1542 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1543 1544 AString trackURL; 1545 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1546 1547 mTracks.push(TrackInfo()); 1548 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1549 info->mURL = trackURL; 1550 info->mPacketSource = source; 1551 info->mUsingInterleavedTCP = false; 1552 info->mFirstSeqNumInSegment = 0; 1553 info->mNewSegment = true; 1554 info->mRTPAnchor = 0; 1555 info->mNTPAnchorUs = -1; 1556 info->mNormalPlayTimeRTP = 0; 1557 info->mNormalPlayTimeUs = 0ll; 1558 1559 unsigned long PT; 1560 AString formatDesc; 1561 AString formatParams; 1562 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1563 1564 int32_t timescale; 1565 int32_t numChannels; 1566 ASessionDescription::ParseFormatDesc( 1567 formatDesc.c_str(), ×cale, &numChannels); 1568 1569 info->mTimeScale = timescale; 1570 info->mEOSReceived = false; 1571 1572 ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1573 1574 AString request = "SETUP "; 1575 request.append(trackURL); 1576 request.append(" RTSP/1.0\r\n"); 1577 1578 if (mTryTCPInterleaving) { 1579 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1580 info->mUsingInterleavedTCP = true; 1581 info->mRTPSocket = interleaveIndex; 1582 info->mRTCPSocket = interleaveIndex + 1; 1583 1584 request.append("Transport: RTP/AVP/TCP;interleaved="); 1585 request.append(interleaveIndex); 1586 request.append("-"); 1587 request.append(interleaveIndex + 1); 1588 } else { 1589 unsigned rtpPort; 1590 ARTPConnection::MakePortPair( 1591 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1592 1593 if (mUIDValid) { 1594 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1595 (uint32_t)*(uint32_t*) "RTP_"); 1596 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1597 (uint32_t)*(uint32_t*) "RTP_"); 1598 } 1599 1600 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1601 request.append(rtpPort); 1602 request.append("-"); 1603 request.append(rtpPort + 1); 1604 } 1605 1606 request.append("\r\n"); 1607 1608 if (index > 1) { 1609 request.append("Session: "); 1610 request.append(mSessionID); 1611 request.append("\r\n"); 1612 } 1613 1614 request.append("\r\n"); 1615 1616 sp<AMessage> reply = new AMessage('setu', id()); 1617 reply->setSize("index", index); 1618 reply->setSize("track-index", mTracks.size() - 1); 1619 mConn->sendRequest(request.c_str(), reply); 1620 } 1621 1622 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1623 out->clear(); 1624 1625 if (strncasecmp("rtsp://", baseURL, 7)) { 1626 // Base URL must be absolute 1627 return false; 1628 } 1629 1630 if (!strncasecmp("rtsp://", url, 7)) { 1631 // "url" is already an absolute URL, ignore base URL. 1632 out->setTo(url); 1633 return true; 1634 } 1635 1636 size_t n = strlen(baseURL); 1637 if (baseURL[n - 1] == '/') { 1638 out->setTo(baseURL); 1639 out->append(url); 1640 } else { 1641 const char *slashPos = strrchr(baseURL, '/'); 1642 1643 if (slashPos > &baseURL[6]) { 1644 out->setTo(baseURL, slashPos - baseURL); 1645 } else { 1646 out->setTo(baseURL); 1647 } 1648 1649 out->append("/"); 1650 out->append(url); 1651 } 1652 1653 return true; 1654 } 1655 1656 void fakeTimestamps() { 1657 mNTPAnchorUs = -1ll; 1658 for (size_t i = 0; i < mTracks.size(); ++i) { 1659 onTimeUpdate(i, 0, 0ll); 1660 } 1661 } 1662 1663 bool dataReceivedOnAllChannels() { 1664 TrackInfo *track; 1665 for (size_t i = 0; i < mTracks.size(); ++i) { 1666 track = &mTracks.editItemAt(i); 1667 if (track->mPackets.empty()) { 1668 return false; 1669 } 1670 } 1671 return true; 1672 } 1673 1674 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1675 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1676 trackIndex, rtpTime, ntpTime); 1677 1678 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1679 1680 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1681 1682 track->mRTPAnchor = rtpTime; 1683 track->mNTPAnchorUs = ntpTimeUs; 1684 1685 if (mNTPAnchorUs < 0) { 1686 mNTPAnchorUs = ntpTimeUs; 1687 mMediaAnchorUs = mLastMediaTimeUs; 1688 } 1689 1690 if (!mAllTracksHaveTime) { 1691 bool allTracksHaveTime = true; 1692 for (size_t i = 0; i < mTracks.size(); ++i) { 1693 TrackInfo *track = &mTracks.editItemAt(i); 1694 if (track->mNTPAnchorUs < 0) { 1695 allTracksHaveTime = false; 1696 break; 1697 } 1698 } 1699 if (allTracksHaveTime) { 1700 mAllTracksHaveTime = true; 1701 ALOGI("Time now established for all tracks."); 1702 } 1703 } 1704 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1705 // Time is now established, lets start timestamping immediately 1706 for (size_t i = 0; i < mTracks.size(); ++i) { 1707 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1708 while (!trackInfo->mPackets.empty()) { 1709 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin(); 1710 trackInfo->mPackets.erase(trackInfo->mPackets.begin()); 1711 1712 if (addMediaTimestamp(i, trackInfo, accessUnit)) { 1713 postQueueAccessUnit(i, accessUnit); 1714 } 1715 } 1716 } 1717 for (size_t i = 0; i < mTracks.size(); ++i) { 1718 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1719 if (trackInfo->mEOSReceived) { 1720 postQueueEOS(i, ERROR_END_OF_STREAM); 1721 trackInfo->mEOSReceived = false; 1722 } 1723 } 1724 } 1725 } 1726 1727 void onAccessUnitComplete( 1728 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1729 ALOGV("onAccessUnitComplete track %d", trackIndex); 1730 1731 if (mFirstAccessUnit) { 1732 sp<AMessage> msg = mNotify->dup(); 1733 msg->setInt32("what", kWhatConnected); 1734 msg->post(); 1735 1736 if (mSeekable) { 1737 for (size_t i = 0; i < mTracks.size(); ++i) { 1738 TrackInfo *info = &mTracks.editItemAt(i); 1739 1740 postNormalPlayTimeMapping( 1741 i, 1742 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1743 } 1744 } 1745 1746 mFirstAccessUnit = false; 1747 } 1748 1749 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1750 1751 if (!mAllTracksHaveTime) { 1752 ALOGV("storing accessUnit, no time established yet"); 1753 track->mPackets.push_back(accessUnit); 1754 return; 1755 } 1756 1757 while (!track->mPackets.empty()) { 1758 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1759 track->mPackets.erase(track->mPackets.begin()); 1760 1761 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1762 postQueueAccessUnit(trackIndex, accessUnit); 1763 } 1764 } 1765 1766 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1767 postQueueAccessUnit(trackIndex, accessUnit); 1768 } 1769 1770 if (track->mEOSReceived) { 1771 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1772 track->mEOSReceived = false; 1773 } 1774 } 1775 1776 bool addMediaTimestamp( 1777 int32_t trackIndex, const TrackInfo *track, 1778 const sp<ABuffer> &accessUnit) { 1779 uint32_t rtpTime; 1780 CHECK(accessUnit->meta()->findInt32( 1781 "rtp-time", (int32_t *)&rtpTime)); 1782 1783 int64_t relRtpTimeUs = 1784 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1785 / track->mTimeScale; 1786 1787 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1788 1789 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1790 1791 if (mediaTimeUs > mLastMediaTimeUs) { 1792 mLastMediaTimeUs = mediaTimeUs; 1793 } 1794 1795 if (mediaTimeUs < 0) { 1796 ALOGV("dropping early accessUnit."); 1797 return false; 1798 } 1799 1800 ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1801 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1802 1803 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1804 1805 return true; 1806 } 1807 1808 void postQueueAccessUnit( 1809 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1810 sp<AMessage> msg = mNotify->dup(); 1811 msg->setInt32("what", kWhatAccessUnit); 1812 msg->setSize("trackIndex", trackIndex); 1813 msg->setBuffer("accessUnit", accessUnit); 1814 msg->post(); 1815 } 1816 1817 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1818 sp<AMessage> msg = mNotify->dup(); 1819 msg->setInt32("what", kWhatEOS); 1820 msg->setSize("trackIndex", trackIndex); 1821 msg->setInt32("finalResult", finalResult); 1822 msg->post(); 1823 } 1824 1825 void postQueueSeekDiscontinuity(size_t trackIndex) { 1826 sp<AMessage> msg = mNotify->dup(); 1827 msg->setInt32("what", kWhatSeekDiscontinuity); 1828 msg->setSize("trackIndex", trackIndex); 1829 msg->post(); 1830 } 1831 1832 void postNormalPlayTimeMapping( 1833 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1834 sp<AMessage> msg = mNotify->dup(); 1835 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1836 msg->setSize("trackIndex", trackIndex); 1837 msg->setInt32("rtpTime", rtpTime); 1838 msg->setInt64("nptUs", nptUs); 1839 msg->post(); 1840 } 1841 1842 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1843 }; 1844 1845 } // namespace android 1846 1847 #endif // MY_HANDLER_H_ 1848