1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 #define LOG_TAG "MyHandler" 23 #include <utils/Log.h> 24 25 #include "APacketSource.h" 26 #include "ARTPConnection.h" 27 #include "ARTSPConnection.h" 28 #include "ASessionDescription.h" 29 30 #include <ctype.h> 31 32 #include <media/stagefright/foundation/ABuffer.h> 33 #include <media/stagefright/foundation/ADebug.h> 34 #include <media/stagefright/foundation/ALooper.h> 35 #include <media/stagefright/foundation/AMessage.h> 36 #include <media/stagefright/MediaErrors.h> 37 #include <media/stagefright/Utils.h> 38 39 #include <arpa/inet.h> 40 #include <sys/socket.h> 41 #include <netdb.h> 42 43 #include "HTTPBase.h" 44 45 // If no access units are received within 5 secs, assume that the rtp 46 // stream has ended and signal end of stream. 47 static int64_t kAccessUnitTimeoutUs = 10000000ll; 48 49 // If no access units arrive for the first 10 secs after starting the 50 // stream, assume none ever will and signal EOS or switch transports. 51 static int64_t kStartupTimeoutUs = 10000000ll; 52 53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 54 55 static int64_t kPauseDelayUs = 3000000ll; 56 57 namespace android { 58 59 static bool GetAttribute(const char *s, const char *key, AString *value) { 60 value->clear(); 61 62 size_t keyLen = strlen(key); 63 64 for (;;) { 65 while (isspace(*s)) { 66 ++s; 67 } 68 69 const char *colonPos = strchr(s, ';'); 70 71 size_t len = 72 (colonPos == NULL) ? strlen(s) : colonPos - s; 73 74 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 75 value->setTo(&s[keyLen + 1], len - keyLen - 1); 76 return true; 77 } 78 79 if (colonPos == NULL) { 80 return false; 81 } 82 83 s = colonPos + 1; 84 } 85 } 86 87 struct MyHandler : public AHandler { 88 enum { 89 kWhatConnected = 'conn', 90 kWhatDisconnected = 'disc', 91 kWhatSeekDone = 'sdon', 92 93 kWhatAccessUnit = 'accU', 94 kWhatEOS = 'eos!', 95 kWhatSeekDiscontinuity = 'seeD', 96 kWhatNormalPlayTimeMapping = 'nptM', 97 }; 98 99 MyHandler( 100 const char *url, 101 const sp<AMessage> ¬ify, 102 bool uidValid = false, uid_t uid = 0) 103 : mNotify(notify), 104 mUIDValid(uidValid), 105 mUID(uid), 106 mNetLooper(new ALooper), 107 mConn(new ARTSPConnection(mUIDValid, mUID)), 108 mRTPConn(new ARTPConnection), 109 mOriginalSessionURL(url), 110 mSessionURL(url), 111 mSetupTracksSuccessful(false), 112 mSeekPending(false), 113 mFirstAccessUnit(true), 114 mAllTracksHaveTime(false), 115 mNTPAnchorUs(-1), 116 mMediaAnchorUs(-1), 117 mLastMediaTimeUs(0), 118 mNumAccessUnitsReceived(0), 119 mCheckPending(false), 120 mCheckGeneration(0), 121 mCheckTimeoutGeneration(0), 122 mTryTCPInterleaving(false), 123 mTryFakeRTCP(false), 124 mReceivedFirstRTCPPacket(false), 125 mReceivedFirstRTPPacket(false), 126 mSeekable(true), 127 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 128 mKeepAliveGeneration(0), 129 mPausing(false), 130 mPauseGeneration(0), 131 mPlayResponseParsed(false) { 132 mNetLooper->setName("rtsp net"); 133 mNetLooper->start(false /* runOnCallingThread */, 134 false /* canCallJava */, 135 PRIORITY_HIGHEST); 136 137 // Strip any authentication info from the session url, we don't 138 // want to transmit user/pass in cleartext. 139 AString host, path, user, pass; 140 unsigned port; 141 CHECK(ARTSPConnection::ParseURL( 142 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 143 144 if (user.size() > 0) { 145 mSessionURL.clear(); 146 mSessionURL.append("rtsp://"); 147 mSessionURL.append(host); 148 mSessionURL.append(":"); 149 mSessionURL.append(StringPrintf("%u", port)); 150 mSessionURL.append(path); 151 152 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 153 } 154 155 mSessionHost = host; 156 } 157 158 void connect() { 159 looper()->registerHandler(mConn); 160 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 161 162 sp<AMessage> notify = new AMessage('biny', id()); 163 mConn->observeBinaryData(notify); 164 165 sp<AMessage> reply = new AMessage('conn', id()); 166 mConn->connect(mOriginalSessionURL.c_str(), reply); 167 } 168 169 void loadSDP(const sp<ASessionDescription>& desc) { 170 looper()->registerHandler(mConn); 171 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 172 173 sp<AMessage> notify = new AMessage('biny', id()); 174 mConn->observeBinaryData(notify); 175 176 sp<AMessage> reply = new AMessage('sdpl', id()); 177 reply->setObject("description", desc); 178 mConn->connect(mOriginalSessionURL.c_str(), reply); 179 } 180 181 AString getControlURL(sp<ASessionDescription> desc) { 182 AString sessionLevelControlURL; 183 if (mSessionDesc->findAttribute( 184 0, 185 "a=control", 186 &sessionLevelControlURL)) { 187 if (sessionLevelControlURL.compare("*") == 0) { 188 return mBaseURL; 189 } else { 190 AString controlURL; 191 CHECK(MakeURL( 192 mBaseURL.c_str(), 193 sessionLevelControlURL.c_str(), 194 &controlURL)); 195 return controlURL; 196 } 197 } else { 198 return mSessionURL; 199 } 200 } 201 202 void disconnect() { 203 (new AMessage('abor', id()))->post(); 204 } 205 206 void seek(int64_t timeUs) { 207 sp<AMessage> msg = new AMessage('seek', id()); 208 msg->setInt64("time", timeUs); 209 mPauseGeneration++; 210 msg->post(); 211 } 212 213 bool isSeekable() const { 214 return mSeekable; 215 } 216 217 void pause() { 218 sp<AMessage> msg = new AMessage('paus', id()); 219 mPauseGeneration++; 220 msg->setInt32("pausecheck", mPauseGeneration); 221 msg->post(kPauseDelayUs); 222 } 223 224 void resume() { 225 sp<AMessage> msg = new AMessage('resu', id()); 226 mPauseGeneration++; 227 msg->post(); 228 } 229 230 static void addRR(const sp<ABuffer> &buf) { 231 uint8_t *ptr = buf->data() + buf->size(); 232 ptr[0] = 0x80 | 0; 233 ptr[1] = 201; // RR 234 ptr[2] = 0; 235 ptr[3] = 1; 236 ptr[4] = 0xde; // SSRC 237 ptr[5] = 0xad; 238 ptr[6] = 0xbe; 239 ptr[7] = 0xef; 240 241 buf->setRange(0, buf->size() + 8); 242 } 243 244 static void addSDES(int s, const sp<ABuffer> &buffer) { 245 struct sockaddr_in addr; 246 socklen_t addrSize = sizeof(addr); 247 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 248 249 uint8_t *data = buffer->data() + buffer->size(); 250 data[0] = 0x80 | 1; 251 data[1] = 202; // SDES 252 data[4] = 0xde; // SSRC 253 data[5] = 0xad; 254 data[6] = 0xbe; 255 data[7] = 0xef; 256 257 size_t offset = 8; 258 259 data[offset++] = 1; // CNAME 260 261 AString cname = "stagefright@"; 262 cname.append(inet_ntoa(addr.sin_addr)); 263 data[offset++] = cname.size(); 264 265 memcpy(&data[offset], cname.c_str(), cname.size()); 266 offset += cname.size(); 267 268 data[offset++] = 6; // TOOL 269 270 AString tool = MakeUserAgent(); 271 272 data[offset++] = tool.size(); 273 274 memcpy(&data[offset], tool.c_str(), tool.size()); 275 offset += tool.size(); 276 277 data[offset++] = 0; 278 279 if ((offset % 4) > 0) { 280 size_t count = 4 - (offset % 4); 281 switch (count) { 282 case 3: 283 data[offset++] = 0; 284 case 2: 285 data[offset++] = 0; 286 case 1: 287 data[offset++] = 0; 288 } 289 } 290 291 size_t numWords = (offset / 4) - 1; 292 data[2] = numWords >> 8; 293 data[3] = numWords & 0xff; 294 295 buffer->setRange(buffer->offset(), buffer->size() + offset); 296 } 297 298 // In case we're behind NAT, fire off two UDP packets to the remote 299 // rtp/rtcp ports to poke a hole into the firewall for future incoming 300 // packets. We're going to send an RR/SDES RTCP packet to both of them. 301 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 302 struct sockaddr_in addr; 303 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 304 addr.sin_family = AF_INET; 305 306 AString source; 307 AString server_port; 308 if (!GetAttribute(transport.c_str(), 309 "source", 310 &source)) { 311 ALOGW("Missing 'source' field in Transport response. Using " 312 "RTSP endpoint address."); 313 314 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 315 if (ent == NULL) { 316 ALOGE("Failed to look up address of session host '%s'", 317 mSessionHost.c_str()); 318 319 return false; 320 } 321 322 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 323 } else { 324 addr.sin_addr.s_addr = inet_addr(source.c_str()); 325 } 326 327 if (!GetAttribute(transport.c_str(), 328 "server_port", 329 &server_port)) { 330 ALOGI("Missing 'server_port' field in Transport response."); 331 return false; 332 } 333 334 int rtpPort, rtcpPort; 335 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 336 || rtpPort <= 0 || rtpPort > 65535 337 || rtcpPort <=0 || rtcpPort > 65535 338 || rtcpPort != rtpPort + 1) { 339 ALOGE("Server picked invalid RTP/RTCP port pair %s," 340 " RTP port must be even, RTCP port must be one higher.", 341 server_port.c_str()); 342 343 return false; 344 } 345 346 if (rtpPort & 1) { 347 ALOGW("Server picked an odd RTP port, it should've picked an " 348 "even one, we'll let it pass for now, but this may break " 349 "in the future."); 350 } 351 352 if (addr.sin_addr.s_addr == INADDR_NONE) { 353 return true; 354 } 355 356 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 357 // No firewalls to traverse on the loopback interface. 358 return true; 359 } 360 361 // Make up an RR/SDES RTCP packet. 362 sp<ABuffer> buf = new ABuffer(65536); 363 buf->setRange(0, 0); 364 addRR(buf); 365 addSDES(rtpSocket, buf); 366 367 addr.sin_port = htons(rtpPort); 368 369 ssize_t n = sendto( 370 rtpSocket, buf->data(), buf->size(), 0, 371 (const sockaddr *)&addr, sizeof(addr)); 372 373 if (n < (ssize_t)buf->size()) { 374 ALOGE("failed to poke a hole for RTP packets"); 375 return false; 376 } 377 378 addr.sin_port = htons(rtcpPort); 379 380 n = sendto( 381 rtcpSocket, buf->data(), buf->size(), 0, 382 (const sockaddr *)&addr, sizeof(addr)); 383 384 if (n < (ssize_t)buf->size()) { 385 ALOGE("failed to poke a hole for RTCP packets"); 386 return false; 387 } 388 389 ALOGV("successfully poked holes."); 390 391 return true; 392 } 393 394 static bool isLiveStream(const sp<ASessionDescription> &desc) { 395 AString attrLiveStream; 396 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 397 ssize_t semicolonPos = attrLiveStream.find(";", 2); 398 399 const char* liveStreamValue; 400 if (semicolonPos < 0) { 401 liveStreamValue = attrLiveStream.c_str(); 402 } else { 403 AString valString; 404 valString.setTo(attrLiveStream, 405 semicolonPos + 1, 406 attrLiveStream.size() - semicolonPos - 1); 407 liveStreamValue = valString.c_str(); 408 } 409 410 uint32_t value = strtoul(liveStreamValue, NULL, 10); 411 if (value == 1) { 412 ALOGV("found live stream"); 413 return true; 414 } 415 } else { 416 // It is a live stream if no duration is returned 417 int64_t durationUs; 418 if (!desc->getDurationUs(&durationUs)) { 419 ALOGV("No duration found, assume live stream"); 420 return true; 421 } 422 } 423 424 return false; 425 } 426 427 virtual void onMessageReceived(const sp<AMessage> &msg) { 428 switch (msg->what()) { 429 case 'conn': 430 { 431 int32_t result; 432 CHECK(msg->findInt32("result", &result)); 433 434 ALOGI("connection request completed with result %d (%s)", 435 result, strerror(-result)); 436 437 if (result == OK) { 438 AString request; 439 request = "DESCRIBE "; 440 request.append(mSessionURL); 441 request.append(" RTSP/1.0\r\n"); 442 request.append("Accept: application/sdp\r\n"); 443 request.append("\r\n"); 444 445 sp<AMessage> reply = new AMessage('desc', id()); 446 mConn->sendRequest(request.c_str(), reply); 447 } else { 448 (new AMessage('disc', id()))->post(); 449 } 450 break; 451 } 452 453 case 'disc': 454 { 455 ++mKeepAliveGeneration; 456 457 int32_t reconnect; 458 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 459 sp<AMessage> reply = new AMessage('conn', id()); 460 mConn->connect(mOriginalSessionURL.c_str(), reply); 461 } else { 462 (new AMessage('quit', id()))->post(); 463 } 464 break; 465 } 466 467 case 'desc': 468 { 469 int32_t result; 470 CHECK(msg->findInt32("result", &result)); 471 472 ALOGI("DESCRIBE completed with result %d (%s)", 473 result, strerror(-result)); 474 475 if (result == OK) { 476 sp<RefBase> obj; 477 CHECK(msg->findObject("response", &obj)); 478 sp<ARTSPResponse> response = 479 static_cast<ARTSPResponse *>(obj.get()); 480 481 if (response->mStatusCode == 302) { 482 ssize_t i = response->mHeaders.indexOfKey("location"); 483 CHECK_GE(i, 0); 484 485 mSessionURL = response->mHeaders.valueAt(i); 486 487 AString request; 488 request = "DESCRIBE "; 489 request.append(mSessionURL); 490 request.append(" RTSP/1.0\r\n"); 491 request.append("Accept: application/sdp\r\n"); 492 request.append("\r\n"); 493 494 sp<AMessage> reply = new AMessage('desc', id()); 495 mConn->sendRequest(request.c_str(), reply); 496 break; 497 } 498 499 if (response->mStatusCode != 200) { 500 result = UNKNOWN_ERROR; 501 } else if (response->mContent == NULL) { 502 result = ERROR_MALFORMED; 503 ALOGE("The response has no content."); 504 } else { 505 mSessionDesc = new ASessionDescription; 506 507 mSessionDesc->setTo( 508 response->mContent->data(), 509 response->mContent->size()); 510 511 if (!mSessionDesc->isValid()) { 512 ALOGE("Failed to parse session description."); 513 result = ERROR_MALFORMED; 514 } else { 515 ssize_t i = response->mHeaders.indexOfKey("content-base"); 516 if (i >= 0) { 517 mBaseURL = response->mHeaders.valueAt(i); 518 } else { 519 i = response->mHeaders.indexOfKey("content-location"); 520 if (i >= 0) { 521 mBaseURL = response->mHeaders.valueAt(i); 522 } else { 523 mBaseURL = mSessionURL; 524 } 525 } 526 527 mSeekable = !isLiveStream(mSessionDesc); 528 529 if (!mBaseURL.startsWith("rtsp://")) { 530 // Some misbehaving servers specify a relative 531 // URL in one of the locations above, combine 532 // it with the absolute session URL to get 533 // something usable... 534 535 ALOGW("Server specified a non-absolute base URL" 536 ", combining it with the session URL to " 537 "get something usable..."); 538 539 AString tmp; 540 CHECK(MakeURL( 541 mSessionURL.c_str(), 542 mBaseURL.c_str(), 543 &tmp)); 544 545 mBaseURL = tmp; 546 } 547 548 mControlURL = getControlURL(mSessionDesc); 549 550 if (mSessionDesc->countTracks() < 2) { 551 // There's no actual tracks in this session. 552 // The first "track" is merely session meta 553 // data. 554 555 ALOGW("Session doesn't contain any playable " 556 "tracks. Aborting."); 557 result = ERROR_UNSUPPORTED; 558 } else { 559 setupTrack(1); 560 } 561 } 562 } 563 } 564 565 if (result != OK) { 566 sp<AMessage> reply = new AMessage('disc', id()); 567 mConn->disconnect(reply); 568 } 569 break; 570 } 571 572 case 'sdpl': 573 { 574 int32_t result; 575 CHECK(msg->findInt32("result", &result)); 576 577 ALOGI("SDP connection request completed with result %d (%s)", 578 result, strerror(-result)); 579 580 if (result == OK) { 581 sp<RefBase> obj; 582 CHECK(msg->findObject("description", &obj)); 583 mSessionDesc = 584 static_cast<ASessionDescription *>(obj.get()); 585 586 if (!mSessionDesc->isValid()) { 587 ALOGE("Failed to parse session description."); 588 result = ERROR_MALFORMED; 589 } else { 590 mBaseURL = mSessionURL; 591 592 mSeekable = !isLiveStream(mSessionDesc); 593 594 mControlURL = getControlURL(mSessionDesc); 595 596 if (mSessionDesc->countTracks() < 2) { 597 // There's no actual tracks in this session. 598 // The first "track" is merely session meta 599 // data. 600 601 ALOGW("Session doesn't contain any playable " 602 "tracks. Aborting."); 603 result = ERROR_UNSUPPORTED; 604 } else { 605 setupTrack(1); 606 } 607 } 608 } 609 610 if (result != OK) { 611 sp<AMessage> reply = new AMessage('disc', id()); 612 mConn->disconnect(reply); 613 } 614 break; 615 } 616 617 case 'setu': 618 { 619 size_t index; 620 CHECK(msg->findSize("index", &index)); 621 622 TrackInfo *track = NULL; 623 size_t trackIndex; 624 if (msg->findSize("track-index", &trackIndex)) { 625 track = &mTracks.editItemAt(trackIndex); 626 } 627 628 int32_t result; 629 CHECK(msg->findInt32("result", &result)); 630 631 ALOGI("SETUP(%d) completed with result %d (%s)", 632 index, result, strerror(-result)); 633 634 if (result == OK) { 635 CHECK(track != NULL); 636 637 sp<RefBase> obj; 638 CHECK(msg->findObject("response", &obj)); 639 sp<ARTSPResponse> response = 640 static_cast<ARTSPResponse *>(obj.get()); 641 642 if (response->mStatusCode != 200) { 643 result = UNKNOWN_ERROR; 644 } else { 645 ssize_t i = response->mHeaders.indexOfKey("session"); 646 CHECK_GE(i, 0); 647 648 mSessionID = response->mHeaders.valueAt(i); 649 650 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 651 AString timeoutStr; 652 if (GetAttribute( 653 mSessionID.c_str(), "timeout", &timeoutStr)) { 654 char *end; 655 unsigned long timeoutSecs = 656 strtoul(timeoutStr.c_str(), &end, 10); 657 658 if (end == timeoutStr.c_str() || *end != '\0') { 659 ALOGW("server specified malformed timeout '%s'", 660 timeoutStr.c_str()); 661 662 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 663 } else if (timeoutSecs < 15) { 664 ALOGW("server specified too short a timeout " 665 "(%lu secs), using default.", 666 timeoutSecs); 667 668 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 669 } else { 670 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 671 672 ALOGI("server specified timeout of %lu secs.", 673 timeoutSecs); 674 } 675 } 676 677 i = mSessionID.find(";"); 678 if (i >= 0) { 679 // Remove options, i.e. ";timeout=90" 680 mSessionID.erase(i, mSessionID.size() - i); 681 } 682 683 sp<AMessage> notify = new AMessage('accu', id()); 684 notify->setSize("track-index", trackIndex); 685 686 i = response->mHeaders.indexOfKey("transport"); 687 CHECK_GE(i, 0); 688 689 if (!track->mUsingInterleavedTCP) { 690 AString transport = response->mHeaders.valueAt(i); 691 692 // We are going to continue even if we were 693 // unable to poke a hole into the firewall... 694 pokeAHole( 695 track->mRTPSocket, 696 track->mRTCPSocket, 697 transport); 698 } 699 700 mRTPConn->addStream( 701 track->mRTPSocket, track->mRTCPSocket, 702 mSessionDesc, index, 703 notify, track->mUsingInterleavedTCP); 704 705 mSetupTracksSuccessful = true; 706 } 707 } 708 709 if (result != OK) { 710 if (track) { 711 if (!track->mUsingInterleavedTCP) { 712 // Clear the tag 713 if (mUIDValid) { 714 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 715 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 716 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 717 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 718 } 719 720 close(track->mRTPSocket); 721 close(track->mRTCPSocket); 722 } 723 724 mTracks.removeItemsAt(trackIndex); 725 } 726 } 727 728 ++index; 729 if (index < mSessionDesc->countTracks()) { 730 setupTrack(index); 731 } else if (mSetupTracksSuccessful) { 732 ++mKeepAliveGeneration; 733 postKeepAlive(); 734 735 AString request = "PLAY "; 736 request.append(mControlURL); 737 request.append(" RTSP/1.0\r\n"); 738 739 request.append("Session: "); 740 request.append(mSessionID); 741 request.append("\r\n"); 742 743 request.append("\r\n"); 744 745 sp<AMessage> reply = new AMessage('play', id()); 746 mConn->sendRequest(request.c_str(), reply); 747 } else { 748 sp<AMessage> reply = new AMessage('disc', id()); 749 mConn->disconnect(reply); 750 } 751 break; 752 } 753 754 case 'play': 755 { 756 int32_t result; 757 CHECK(msg->findInt32("result", &result)); 758 759 ALOGI("PLAY completed with result %d (%s)", 760 result, strerror(-result)); 761 762 if (result == OK) { 763 sp<RefBase> obj; 764 CHECK(msg->findObject("response", &obj)); 765 sp<ARTSPResponse> response = 766 static_cast<ARTSPResponse *>(obj.get()); 767 768 if (response->mStatusCode != 200) { 769 result = UNKNOWN_ERROR; 770 } else { 771 parsePlayResponse(response); 772 773 sp<AMessage> timeout = new AMessage('tiou', id()); 774 mCheckTimeoutGeneration++; 775 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 776 timeout->post(kStartupTimeoutUs); 777 } 778 } 779 780 if (result != OK) { 781 sp<AMessage> reply = new AMessage('disc', id()); 782 mConn->disconnect(reply); 783 } 784 785 break; 786 } 787 788 case 'aliv': 789 { 790 int32_t generation; 791 CHECK(msg->findInt32("generation", &generation)); 792 793 if (generation != mKeepAliveGeneration) { 794 // obsolete event. 795 break; 796 } 797 798 AString request; 799 request.append("OPTIONS "); 800 request.append(mSessionURL); 801 request.append(" RTSP/1.0\r\n"); 802 request.append("Session: "); 803 request.append(mSessionID); 804 request.append("\r\n"); 805 request.append("\r\n"); 806 807 sp<AMessage> reply = new AMessage('opts', id()); 808 reply->setInt32("generation", mKeepAliveGeneration); 809 mConn->sendRequest(request.c_str(), reply); 810 break; 811 } 812 813 case 'opts': 814 { 815 int32_t result; 816 CHECK(msg->findInt32("result", &result)); 817 818 ALOGI("OPTIONS completed with result %d (%s)", 819 result, strerror(-result)); 820 821 int32_t generation; 822 CHECK(msg->findInt32("generation", &generation)); 823 824 if (generation != mKeepAliveGeneration) { 825 // obsolete event. 826 break; 827 } 828 829 postKeepAlive(); 830 break; 831 } 832 833 case 'abor': 834 { 835 for (size_t i = 0; i < mTracks.size(); ++i) { 836 TrackInfo *info = &mTracks.editItemAt(i); 837 838 if (!mFirstAccessUnit) { 839 postQueueEOS(i, ERROR_END_OF_STREAM); 840 } 841 842 if (!info->mUsingInterleavedTCP) { 843 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 844 845 // Clear the tag 846 if (mUIDValid) { 847 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 848 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 849 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 850 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 851 } 852 853 close(info->mRTPSocket); 854 close(info->mRTCPSocket); 855 } 856 } 857 mTracks.clear(); 858 mSetupTracksSuccessful = false; 859 mSeekPending = false; 860 mFirstAccessUnit = true; 861 mAllTracksHaveTime = false; 862 mNTPAnchorUs = -1; 863 mMediaAnchorUs = -1; 864 mNumAccessUnitsReceived = 0; 865 mReceivedFirstRTCPPacket = false; 866 mReceivedFirstRTPPacket = false; 867 mPausing = false; 868 mSeekable = true; 869 870 sp<AMessage> reply = new AMessage('tear', id()); 871 872 int32_t reconnect; 873 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 874 reply->setInt32("reconnect", true); 875 } 876 877 AString request; 878 request = "TEARDOWN "; 879 880 // XXX should use aggregate url from SDP here... 881 request.append(mSessionURL); 882 request.append(" RTSP/1.0\r\n"); 883 884 request.append("Session: "); 885 request.append(mSessionID); 886 request.append("\r\n"); 887 888 request.append("\r\n"); 889 890 mConn->sendRequest(request.c_str(), reply); 891 break; 892 } 893 894 case 'tear': 895 { 896 int32_t result; 897 CHECK(msg->findInt32("result", &result)); 898 899 ALOGI("TEARDOWN completed with result %d (%s)", 900 result, strerror(-result)); 901 902 sp<AMessage> reply = new AMessage('disc', id()); 903 904 int32_t reconnect; 905 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 906 reply->setInt32("reconnect", true); 907 } 908 909 mConn->disconnect(reply); 910 break; 911 } 912 913 case 'quit': 914 { 915 sp<AMessage> msg = mNotify->dup(); 916 msg->setInt32("what", kWhatDisconnected); 917 msg->setInt32("result", UNKNOWN_ERROR); 918 msg->post(); 919 break; 920 } 921 922 case 'chek': 923 { 924 int32_t generation; 925 CHECK(msg->findInt32("generation", &generation)); 926 if (generation != mCheckGeneration) { 927 // This is an outdated message. Ignore. 928 break; 929 } 930 931 if (mNumAccessUnitsReceived == 0) { 932 #if 1 933 ALOGI("stream ended? aborting."); 934 (new AMessage('abor', id()))->post(); 935 break; 936 #else 937 ALOGI("haven't seen an AU in a looong time."); 938 #endif 939 } 940 941 mNumAccessUnitsReceived = 0; 942 msg->post(kAccessUnitTimeoutUs); 943 break; 944 } 945 946 case 'accu': 947 { 948 int32_t timeUpdate; 949 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 950 size_t trackIndex; 951 CHECK(msg->findSize("track-index", &trackIndex)); 952 953 uint32_t rtpTime; 954 uint64_t ntpTime; 955 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 956 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 957 958 onTimeUpdate(trackIndex, rtpTime, ntpTime); 959 break; 960 } 961 962 int32_t first; 963 if (msg->findInt32("first-rtcp", &first)) { 964 mReceivedFirstRTCPPacket = true; 965 break; 966 } 967 968 if (msg->findInt32("first-rtp", &first)) { 969 mReceivedFirstRTPPacket = true; 970 break; 971 } 972 973 ++mNumAccessUnitsReceived; 974 postAccessUnitTimeoutCheck(); 975 976 size_t trackIndex; 977 CHECK(msg->findSize("track-index", &trackIndex)); 978 979 if (trackIndex >= mTracks.size()) { 980 ALOGV("late packets ignored."); 981 break; 982 } 983 984 TrackInfo *track = &mTracks.editItemAt(trackIndex); 985 986 int32_t eos; 987 if (msg->findInt32("eos", &eos)) { 988 ALOGI("received BYE on track index %d", trackIndex); 989 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 990 ALOGI("No time established => fake existing data"); 991 992 track->mEOSReceived = true; 993 mTryFakeRTCP = true; 994 mReceivedFirstRTCPPacket = true; 995 fakeTimestamps(); 996 } else { 997 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 998 } 999 return; 1000 } 1001 1002 sp<ABuffer> accessUnit; 1003 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1004 1005 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1006 1007 if (mSeekPending) { 1008 ALOGV("we're seeking, dropping stale packet."); 1009 break; 1010 } 1011 1012 if (seqNum < track->mFirstSeqNumInSegment) { 1013 ALOGV("dropping stale access-unit (%d < %d)", 1014 seqNum, track->mFirstSeqNumInSegment); 1015 break; 1016 } 1017 1018 if (track->mNewSegment) { 1019 track->mNewSegment = false; 1020 } 1021 1022 onAccessUnitComplete(trackIndex, accessUnit); 1023 break; 1024 } 1025 1026 case 'paus': 1027 { 1028 int32_t generation; 1029 CHECK(msg->findInt32("pausecheck", &generation)); 1030 if (generation != mPauseGeneration) { 1031 ALOGV("Ignoring outdated pause message."); 1032 break; 1033 } 1034 1035 if (!mSeekable) { 1036 ALOGW("This is a live stream, ignoring pause request."); 1037 break; 1038 } 1039 mCheckPending = true; 1040 ++mCheckGeneration; 1041 mPausing = true; 1042 1043 AString request = "PAUSE "; 1044 request.append(mControlURL); 1045 request.append(" RTSP/1.0\r\n"); 1046 1047 request.append("Session: "); 1048 request.append(mSessionID); 1049 request.append("\r\n"); 1050 1051 request.append("\r\n"); 1052 1053 sp<AMessage> reply = new AMessage('pau2', id()); 1054 mConn->sendRequest(request.c_str(), reply); 1055 break; 1056 } 1057 1058 case 'pau2': 1059 { 1060 int32_t result; 1061 CHECK(msg->findInt32("result", &result)); 1062 mCheckTimeoutGeneration++; 1063 1064 ALOGI("PAUSE completed with result %d (%s)", 1065 result, strerror(-result)); 1066 break; 1067 } 1068 1069 case 'resu': 1070 { 1071 if (mPausing && mSeekPending) { 1072 // If seeking, Play will be sent from see1 instead 1073 break; 1074 } 1075 1076 if (!mPausing) { 1077 // Dont send PLAY if we have not paused 1078 break; 1079 } 1080 AString request = "PLAY "; 1081 request.append(mControlURL); 1082 request.append(" RTSP/1.0\r\n"); 1083 1084 request.append("Session: "); 1085 request.append(mSessionID); 1086 request.append("\r\n"); 1087 1088 request.append("\r\n"); 1089 1090 sp<AMessage> reply = new AMessage('res2', id()); 1091 mConn->sendRequest(request.c_str(), reply); 1092 break; 1093 } 1094 1095 case 'res2': 1096 { 1097 int32_t result; 1098 CHECK(msg->findInt32("result", &result)); 1099 1100 ALOGI("PLAY completed with result %d (%s)", 1101 result, strerror(-result)); 1102 1103 mCheckPending = false; 1104 postAccessUnitTimeoutCheck(); 1105 1106 if (result == OK) { 1107 sp<RefBase> obj; 1108 CHECK(msg->findObject("response", &obj)); 1109 sp<ARTSPResponse> response = 1110 static_cast<ARTSPResponse *>(obj.get()); 1111 1112 if (response->mStatusCode != 200) { 1113 result = UNKNOWN_ERROR; 1114 } else { 1115 parsePlayResponse(response); 1116 1117 // Post new timeout in order to make sure to use 1118 // fake timestamps if no new Sender Reports arrive 1119 sp<AMessage> timeout = new AMessage('tiou', id()); 1120 mCheckTimeoutGeneration++; 1121 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1122 timeout->post(kStartupTimeoutUs); 1123 } 1124 } 1125 1126 if (result != OK) { 1127 ALOGE("resume failed, aborting."); 1128 (new AMessage('abor', id()))->post(); 1129 } 1130 1131 mPausing = false; 1132 break; 1133 } 1134 1135 case 'seek': 1136 { 1137 if (!mSeekable) { 1138 ALOGW("This is a live stream, ignoring seek request."); 1139 1140 sp<AMessage> msg = mNotify->dup(); 1141 msg->setInt32("what", kWhatSeekDone); 1142 msg->post(); 1143 break; 1144 } 1145 1146 int64_t timeUs; 1147 CHECK(msg->findInt64("time", &timeUs)); 1148 1149 mSeekPending = true; 1150 1151 // Disable the access unit timeout until we resumed 1152 // playback again. 1153 mCheckPending = true; 1154 ++mCheckGeneration; 1155 1156 sp<AMessage> reply = new AMessage('see1', id()); 1157 reply->setInt64("time", timeUs); 1158 1159 if (mPausing) { 1160 // PAUSE already sent 1161 ALOGI("Pause already sent"); 1162 reply->post(); 1163 break; 1164 } 1165 AString request = "PAUSE "; 1166 request.append(mControlURL); 1167 request.append(" RTSP/1.0\r\n"); 1168 1169 request.append("Session: "); 1170 request.append(mSessionID); 1171 request.append("\r\n"); 1172 1173 request.append("\r\n"); 1174 1175 mConn->sendRequest(request.c_str(), reply); 1176 break; 1177 } 1178 1179 case 'see1': 1180 { 1181 // Session is paused now. 1182 for (size_t i = 0; i < mTracks.size(); ++i) { 1183 TrackInfo *info = &mTracks.editItemAt(i); 1184 1185 postQueueSeekDiscontinuity(i); 1186 info->mEOSReceived = false; 1187 1188 info->mRTPAnchor = 0; 1189 info->mNTPAnchorUs = -1; 1190 } 1191 1192 mAllTracksHaveTime = false; 1193 mNTPAnchorUs = -1; 1194 1195 // Start new timeoutgeneration to avoid getting timeout 1196 // before PLAY response arrive 1197 sp<AMessage> timeout = new AMessage('tiou', id()); 1198 mCheckTimeoutGeneration++; 1199 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1200 timeout->post(kStartupTimeoutUs); 1201 1202 int64_t timeUs; 1203 CHECK(msg->findInt64("time", &timeUs)); 1204 1205 AString request = "PLAY "; 1206 request.append(mControlURL); 1207 request.append(" RTSP/1.0\r\n"); 1208 1209 request.append("Session: "); 1210 request.append(mSessionID); 1211 request.append("\r\n"); 1212 1213 request.append( 1214 StringPrintf( 1215 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1216 1217 request.append("\r\n"); 1218 1219 sp<AMessage> reply = new AMessage('see2', id()); 1220 mConn->sendRequest(request.c_str(), reply); 1221 break; 1222 } 1223 1224 case 'see2': 1225 { 1226 if (mTracks.size() == 0) { 1227 // We have already hit abor, break 1228 break; 1229 } 1230 1231 int32_t result; 1232 CHECK(msg->findInt32("result", &result)); 1233 1234 ALOGI("PLAY completed with result %d (%s)", 1235 result, strerror(-result)); 1236 1237 mCheckPending = false; 1238 postAccessUnitTimeoutCheck(); 1239 1240 if (result == OK) { 1241 sp<RefBase> obj; 1242 CHECK(msg->findObject("response", &obj)); 1243 sp<ARTSPResponse> response = 1244 static_cast<ARTSPResponse *>(obj.get()); 1245 1246 if (response->mStatusCode != 200) { 1247 result = UNKNOWN_ERROR; 1248 } else { 1249 parsePlayResponse(response); 1250 1251 // Post new timeout in order to make sure to use 1252 // fake timestamps if no new Sender Reports arrive 1253 sp<AMessage> timeout = new AMessage('tiou', id()); 1254 mCheckTimeoutGeneration++; 1255 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1256 timeout->post(kStartupTimeoutUs); 1257 1258 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1259 CHECK_GE(i, 0); 1260 1261 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1262 1263 ALOGI("seek completed."); 1264 } 1265 } 1266 1267 if (result != OK) { 1268 ALOGE("seek failed, aborting."); 1269 (new AMessage('abor', id()))->post(); 1270 } 1271 1272 mPausing = false; 1273 mSeekPending = false; 1274 1275 sp<AMessage> msg = mNotify->dup(); 1276 msg->setInt32("what", kWhatSeekDone); 1277 msg->post(); 1278 break; 1279 } 1280 1281 case 'biny': 1282 { 1283 sp<ABuffer> buffer; 1284 CHECK(msg->findBuffer("buffer", &buffer)); 1285 1286 int32_t index; 1287 CHECK(buffer->meta()->findInt32("index", &index)); 1288 1289 mRTPConn->injectPacket(index, buffer); 1290 break; 1291 } 1292 1293 case 'tiou': 1294 { 1295 int32_t timeoutGenerationCheck; 1296 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1297 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1298 // This is an outdated message. Ignore. 1299 // This typically happens if a lot of seeks are 1300 // performed, since new timeout messages now are 1301 // posted at seek as well. 1302 break; 1303 } 1304 if (!mReceivedFirstRTCPPacket) { 1305 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1306 ALOGW("We received RTP packets but no RTCP packets, " 1307 "using fake timestamps."); 1308 1309 mTryFakeRTCP = true; 1310 1311 mReceivedFirstRTCPPacket = true; 1312 1313 fakeTimestamps(); 1314 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1315 ALOGW("Never received any data, switching transports."); 1316 1317 mTryTCPInterleaving = true; 1318 1319 sp<AMessage> msg = new AMessage('abor', id()); 1320 msg->setInt32("reconnect", true); 1321 msg->post(); 1322 } else { 1323 ALOGW("Never received any data, disconnecting."); 1324 (new AMessage('abor', id()))->post(); 1325 } 1326 } else { 1327 if (!mAllTracksHaveTime) { 1328 ALOGW("We received some RTCP packets, but time " 1329 "could not be established on all tracks, now " 1330 "using fake timestamps"); 1331 1332 fakeTimestamps(); 1333 } 1334 } 1335 break; 1336 } 1337 1338 default: 1339 TRESPASS(); 1340 break; 1341 } 1342 } 1343 1344 void postKeepAlive() { 1345 sp<AMessage> msg = new AMessage('aliv', id()); 1346 msg->setInt32("generation", mKeepAliveGeneration); 1347 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1348 } 1349 1350 void postAccessUnitTimeoutCheck() { 1351 if (mCheckPending) { 1352 return; 1353 } 1354 1355 mCheckPending = true; 1356 sp<AMessage> check = new AMessage('chek', id()); 1357 check->setInt32("generation", mCheckGeneration); 1358 check->post(kAccessUnitTimeoutUs); 1359 } 1360 1361 static void SplitString( 1362 const AString &s, const char *separator, List<AString> *items) { 1363 items->clear(); 1364 size_t start = 0; 1365 while (start < s.size()) { 1366 ssize_t offset = s.find(separator, start); 1367 1368 if (offset < 0) { 1369 items->push_back(AString(s, start, s.size() - start)); 1370 break; 1371 } 1372 1373 items->push_back(AString(s, start, offset - start)); 1374 start = offset + strlen(separator); 1375 } 1376 } 1377 1378 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1379 mPlayResponseParsed = true; 1380 if (mTracks.size() == 0) { 1381 ALOGV("parsePlayResponse: late packets ignored."); 1382 return; 1383 } 1384 1385 ssize_t i = response->mHeaders.indexOfKey("range"); 1386 if (i < 0) { 1387 // Server doesn't even tell use what range it is going to 1388 // play, therefore we won't support seeking. 1389 return; 1390 } 1391 1392 AString range = response->mHeaders.valueAt(i); 1393 ALOGV("Range: %s", range.c_str()); 1394 1395 AString val; 1396 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1397 1398 float npt1, npt2; 1399 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1400 // This is a live stream and therefore not seekable. 1401 1402 ALOGI("This is a live stream"); 1403 return; 1404 } 1405 1406 i = response->mHeaders.indexOfKey("rtp-info"); 1407 CHECK_GE(i, 0); 1408 1409 AString rtpInfo = response->mHeaders.valueAt(i); 1410 List<AString> streamInfos; 1411 SplitString(rtpInfo, ",", &streamInfos); 1412 1413 int n = 1; 1414 for (List<AString>::iterator it = streamInfos.begin(); 1415 it != streamInfos.end(); ++it) { 1416 (*it).trim(); 1417 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1418 1419 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1420 1421 size_t trackIndex = 0; 1422 while (trackIndex < mTracks.size() 1423 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1424 ++trackIndex; 1425 } 1426 CHECK_LT(trackIndex, mTracks.size()); 1427 1428 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1429 1430 char *end; 1431 unsigned long seq = strtoul(val.c_str(), &end, 10); 1432 1433 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1434 info->mFirstSeqNumInSegment = seq; 1435 info->mNewSegment = true; 1436 1437 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1438 1439 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1440 1441 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1442 1443 info->mNormalPlayTimeRTP = rtpTime; 1444 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1445 1446 if (!mFirstAccessUnit) { 1447 postNormalPlayTimeMapping( 1448 trackIndex, 1449 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1450 } 1451 1452 ++n; 1453 } 1454 } 1455 1456 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1457 CHECK_GE(index, 0u); 1458 CHECK_LT(index, mTracks.size()); 1459 1460 const TrackInfo &info = mTracks.itemAt(index); 1461 1462 *timeScale = info.mTimeScale; 1463 1464 return info.mPacketSource->getFormat(); 1465 } 1466 1467 size_t countTracks() const { 1468 return mTracks.size(); 1469 } 1470 1471 private: 1472 struct TrackInfo { 1473 AString mURL; 1474 int mRTPSocket; 1475 int mRTCPSocket; 1476 bool mUsingInterleavedTCP; 1477 uint32_t mFirstSeqNumInSegment; 1478 bool mNewSegment; 1479 1480 uint32_t mRTPAnchor; 1481 int64_t mNTPAnchorUs; 1482 int32_t mTimeScale; 1483 bool mEOSReceived; 1484 1485 uint32_t mNormalPlayTimeRTP; 1486 int64_t mNormalPlayTimeUs; 1487 1488 sp<APacketSource> mPacketSource; 1489 1490 // Stores packets temporarily while no notion of time 1491 // has been established yet. 1492 List<sp<ABuffer> > mPackets; 1493 }; 1494 1495 sp<AMessage> mNotify; 1496 bool mUIDValid; 1497 uid_t mUID; 1498 sp<ALooper> mNetLooper; 1499 sp<ARTSPConnection> mConn; 1500 sp<ARTPConnection> mRTPConn; 1501 sp<ASessionDescription> mSessionDesc; 1502 AString mOriginalSessionURL; // This one still has user:pass@ 1503 AString mSessionURL; 1504 AString mSessionHost; 1505 AString mBaseURL; 1506 AString mControlURL; 1507 AString mSessionID; 1508 bool mSetupTracksSuccessful; 1509 bool mSeekPending; 1510 bool mFirstAccessUnit; 1511 1512 bool mAllTracksHaveTime; 1513 int64_t mNTPAnchorUs; 1514 int64_t mMediaAnchorUs; 1515 int64_t mLastMediaTimeUs; 1516 1517 int64_t mNumAccessUnitsReceived; 1518 bool mCheckPending; 1519 int32_t mCheckGeneration; 1520 int32_t mCheckTimeoutGeneration; 1521 bool mTryTCPInterleaving; 1522 bool mTryFakeRTCP; 1523 bool mReceivedFirstRTCPPacket; 1524 bool mReceivedFirstRTPPacket; 1525 bool mSeekable; 1526 int64_t mKeepAliveTimeoutUs; 1527 int32_t mKeepAliveGeneration; 1528 bool mPausing; 1529 int32_t mPauseGeneration; 1530 1531 Vector<TrackInfo> mTracks; 1532 1533 bool mPlayResponseParsed; 1534 1535 void setupTrack(size_t index) { 1536 sp<APacketSource> source = 1537 new APacketSource(mSessionDesc, index); 1538 1539 if (source->initCheck() != OK) { 1540 ALOGW("Unsupported format. Ignoring track #%d.", index); 1541 1542 sp<AMessage> reply = new AMessage('setu', id()); 1543 reply->setSize("index", index); 1544 reply->setInt32("result", ERROR_UNSUPPORTED); 1545 reply->post(); 1546 return; 1547 } 1548 1549 AString url; 1550 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1551 1552 AString trackURL; 1553 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1554 1555 mTracks.push(TrackInfo()); 1556 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1557 info->mURL = trackURL; 1558 info->mPacketSource = source; 1559 info->mUsingInterleavedTCP = false; 1560 info->mFirstSeqNumInSegment = 0; 1561 info->mNewSegment = true; 1562 info->mRTPAnchor = 0; 1563 info->mNTPAnchorUs = -1; 1564 info->mNormalPlayTimeRTP = 0; 1565 info->mNormalPlayTimeUs = 0ll; 1566 1567 unsigned long PT; 1568 AString formatDesc; 1569 AString formatParams; 1570 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1571 1572 int32_t timescale; 1573 int32_t numChannels; 1574 ASessionDescription::ParseFormatDesc( 1575 formatDesc.c_str(), ×cale, &numChannels); 1576 1577 info->mTimeScale = timescale; 1578 info->mEOSReceived = false; 1579 1580 ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1581 1582 AString request = "SETUP "; 1583 request.append(trackURL); 1584 request.append(" RTSP/1.0\r\n"); 1585 1586 if (mTryTCPInterleaving) { 1587 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1588 info->mUsingInterleavedTCP = true; 1589 info->mRTPSocket = interleaveIndex; 1590 info->mRTCPSocket = interleaveIndex + 1; 1591 1592 request.append("Transport: RTP/AVP/TCP;interleaved="); 1593 request.append(interleaveIndex); 1594 request.append("-"); 1595 request.append(interleaveIndex + 1); 1596 } else { 1597 unsigned rtpPort; 1598 ARTPConnection::MakePortPair( 1599 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1600 1601 if (mUIDValid) { 1602 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1603 (uint32_t)*(uint32_t*) "RTP_"); 1604 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1605 (uint32_t)*(uint32_t*) "RTP_"); 1606 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1607 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1608 } 1609 1610 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1611 request.append(rtpPort); 1612 request.append("-"); 1613 request.append(rtpPort + 1); 1614 } 1615 1616 request.append("\r\n"); 1617 1618 if (index > 1) { 1619 request.append("Session: "); 1620 request.append(mSessionID); 1621 request.append("\r\n"); 1622 } 1623 1624 request.append("\r\n"); 1625 1626 sp<AMessage> reply = new AMessage('setu', id()); 1627 reply->setSize("index", index); 1628 reply->setSize("track-index", mTracks.size() - 1); 1629 mConn->sendRequest(request.c_str(), reply); 1630 } 1631 1632 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1633 out->clear(); 1634 1635 if (strncasecmp("rtsp://", baseURL, 7)) { 1636 // Base URL must be absolute 1637 return false; 1638 } 1639 1640 if (!strncasecmp("rtsp://", url, 7)) { 1641 // "url" is already an absolute URL, ignore base URL. 1642 out->setTo(url); 1643 return true; 1644 } 1645 1646 size_t n = strlen(baseURL); 1647 if (baseURL[n - 1] == '/') { 1648 out->setTo(baseURL); 1649 out->append(url); 1650 } else { 1651 const char *slashPos = strrchr(baseURL, '/'); 1652 1653 if (slashPos > &baseURL[6]) { 1654 out->setTo(baseURL, slashPos - baseURL); 1655 } else { 1656 out->setTo(baseURL); 1657 } 1658 1659 out->append("/"); 1660 out->append(url); 1661 } 1662 1663 return true; 1664 } 1665 1666 void fakeTimestamps() { 1667 mNTPAnchorUs = -1ll; 1668 for (size_t i = 0; i < mTracks.size(); ++i) { 1669 onTimeUpdate(i, 0, 0ll); 1670 } 1671 } 1672 1673 bool dataReceivedOnAllChannels() { 1674 TrackInfo *track; 1675 for (size_t i = 0; i < mTracks.size(); ++i) { 1676 track = &mTracks.editItemAt(i); 1677 if (track->mPackets.empty()) { 1678 return false; 1679 } 1680 } 1681 return true; 1682 } 1683 1684 void handleFirstAccessUnit() { 1685 if (mFirstAccessUnit) { 1686 sp<AMessage> msg = mNotify->dup(); 1687 msg->setInt32("what", kWhatConnected); 1688 msg->post(); 1689 1690 if (mSeekable) { 1691 for (size_t i = 0; i < mTracks.size(); ++i) { 1692 TrackInfo *info = &mTracks.editItemAt(i); 1693 1694 postNormalPlayTimeMapping( 1695 i, 1696 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1697 } 1698 } 1699 1700 mFirstAccessUnit = false; 1701 } 1702 } 1703 1704 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1705 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1706 trackIndex, rtpTime, ntpTime); 1707 1708 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1709 1710 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1711 1712 track->mRTPAnchor = rtpTime; 1713 track->mNTPAnchorUs = ntpTimeUs; 1714 1715 if (mNTPAnchorUs < 0) { 1716 mNTPAnchorUs = ntpTimeUs; 1717 mMediaAnchorUs = mLastMediaTimeUs; 1718 } 1719 1720 if (!mAllTracksHaveTime) { 1721 bool allTracksHaveTime = true; 1722 for (size_t i = 0; i < mTracks.size(); ++i) { 1723 TrackInfo *track = &mTracks.editItemAt(i); 1724 if (track->mNTPAnchorUs < 0) { 1725 allTracksHaveTime = false; 1726 break; 1727 } 1728 } 1729 if (allTracksHaveTime) { 1730 mAllTracksHaveTime = true; 1731 ALOGI("Time now established for all tracks."); 1732 } 1733 } 1734 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1735 handleFirstAccessUnit(); 1736 1737 // Time is now established, lets start timestamping immediately 1738 for (size_t i = 0; i < mTracks.size(); ++i) { 1739 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1740 while (!trackInfo->mPackets.empty()) { 1741 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin(); 1742 trackInfo->mPackets.erase(trackInfo->mPackets.begin()); 1743 1744 if (addMediaTimestamp(i, trackInfo, accessUnit)) { 1745 postQueueAccessUnit(i, accessUnit); 1746 } 1747 } 1748 } 1749 for (size_t i = 0; i < mTracks.size(); ++i) { 1750 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1751 if (trackInfo->mEOSReceived) { 1752 postQueueEOS(i, ERROR_END_OF_STREAM); 1753 trackInfo->mEOSReceived = false; 1754 } 1755 } 1756 } 1757 } 1758 1759 void onAccessUnitComplete( 1760 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1761 ALOGV("onAccessUnitComplete track %d", trackIndex); 1762 1763 if(!mPlayResponseParsed){ 1764 ALOGI("play response is not parsed, storing accessunit"); 1765 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1766 track->mPackets.push_back(accessUnit); 1767 return; 1768 } 1769 1770 handleFirstAccessUnit(); 1771 1772 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1773 1774 if (!mAllTracksHaveTime) { 1775 ALOGV("storing accessUnit, no time established yet"); 1776 track->mPackets.push_back(accessUnit); 1777 return; 1778 } 1779 1780 while (!track->mPackets.empty()) { 1781 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1782 track->mPackets.erase(track->mPackets.begin()); 1783 1784 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1785 postQueueAccessUnit(trackIndex, accessUnit); 1786 } 1787 } 1788 1789 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1790 postQueueAccessUnit(trackIndex, accessUnit); 1791 } 1792 1793 if (track->mEOSReceived) { 1794 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1795 track->mEOSReceived = false; 1796 } 1797 } 1798 1799 bool addMediaTimestamp( 1800 int32_t trackIndex, const TrackInfo *track, 1801 const sp<ABuffer> &accessUnit) { 1802 uint32_t rtpTime; 1803 CHECK(accessUnit->meta()->findInt32( 1804 "rtp-time", (int32_t *)&rtpTime)); 1805 1806 int64_t relRtpTimeUs = 1807 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1808 / track->mTimeScale; 1809 1810 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1811 1812 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1813 1814 if (mediaTimeUs > mLastMediaTimeUs) { 1815 mLastMediaTimeUs = mediaTimeUs; 1816 } 1817 1818 if (mediaTimeUs < 0) { 1819 ALOGV("dropping early accessUnit."); 1820 return false; 1821 } 1822 1823 ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1824 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1825 1826 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1827 1828 return true; 1829 } 1830 1831 void postQueueAccessUnit( 1832 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1833 sp<AMessage> msg = mNotify->dup(); 1834 msg->setInt32("what", kWhatAccessUnit); 1835 msg->setSize("trackIndex", trackIndex); 1836 msg->setBuffer("accessUnit", accessUnit); 1837 msg->post(); 1838 } 1839 1840 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1841 sp<AMessage> msg = mNotify->dup(); 1842 msg->setInt32("what", kWhatEOS); 1843 msg->setSize("trackIndex", trackIndex); 1844 msg->setInt32("finalResult", finalResult); 1845 msg->post(); 1846 } 1847 1848 void postQueueSeekDiscontinuity(size_t trackIndex) { 1849 sp<AMessage> msg = mNotify->dup(); 1850 msg->setInt32("what", kWhatSeekDiscontinuity); 1851 msg->setSize("trackIndex", trackIndex); 1852 msg->post(); 1853 } 1854 1855 void postNormalPlayTimeMapping( 1856 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1857 sp<AMessage> msg = mNotify->dup(); 1858 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1859 msg->setSize("trackIndex", trackIndex); 1860 msg->setInt32("rtpTime", rtpTime); 1861 msg->setInt64("nptUs", nptUs); 1862 msg->post(); 1863 } 1864 1865 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1866 }; 1867 1868 } // namespace android 1869 1870 #endif // MY_HANDLER_H_ 1871