1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 23 #ifndef LOG_TAG 24 #define LOG_TAG "MyHandler" 25 #endif 26 27 #include <utils/Log.h> 28 #include <cutils/properties.h> // for property_get 29 30 #include "APacketSource.h" 31 #include "ARTPConnection.h" 32 #include "ARTSPConnection.h" 33 #include "ASessionDescription.h" 34 35 #include <ctype.h> 36 #include <cutils/properties.h> 37 38 #include <media/stagefright/foundation/ABuffer.h> 39 #include <media/stagefright/foundation/ADebug.h> 40 #include <media/stagefright/foundation/ALooper.h> 41 #include <media/stagefright/foundation/AMessage.h> 42 #include <media/stagefright/MediaErrors.h> 43 #include <media/stagefright/Utils.h> 44 45 #include <arpa/inet.h> 46 #include <sys/socket.h> 47 #include <netdb.h> 48 49 #include "HTTPBase.h" 50 51 #if LOG_NDEBUG 52 #define UNUSED_UNLESS_VERBOSE(x) (void)(x) 53 #else 54 #define UNUSED_UNLESS_VERBOSE(x) 55 #endif 56 57 // If no access units are received within 10 secs, assume that the rtp 58 // stream has ended and signal end of stream. 59 static int64_t kAccessUnitTimeoutUs = 10000000ll; 60 61 // If no access units arrive for the first 10 secs after starting the 62 // stream, assume none ever will and signal EOS or switch transports. 
63 static int64_t kStartupTimeoutUs = 10000000ll; 64 65 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 66 67 static int64_t kPauseDelayUs = 3000000ll; 68 69 // The allowed maximum number of stale access units at the beginning of 70 // a new sequence. 71 static int32_t kMaxAllowedStaleAccessUnits = 20; 72 73 namespace android { 74 75 static bool GetAttribute(const char *s, const char *key, AString *value) { 76 value->clear(); 77 78 size_t keyLen = strlen(key); 79 80 for (;;) { 81 while (isspace(*s)) { 82 ++s; 83 } 84 85 const char *colonPos = strchr(s, ';'); 86 87 size_t len = 88 (colonPos == NULL) ? strlen(s) : colonPos - s; 89 90 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 91 value->setTo(&s[keyLen + 1], len - keyLen - 1); 92 return true; 93 } 94 95 if (colonPos == NULL) { 96 return false; 97 } 98 99 s = colonPos + 1; 100 } 101 } 102 103 struct MyHandler : public AHandler { 104 enum { 105 kWhatConnected = 'conn', 106 kWhatDisconnected = 'disc', 107 kWhatSeekPaused = 'spau', 108 kWhatSeekDone = 'sdon', 109 110 kWhatAccessUnit = 'accU', 111 kWhatEOS = 'eos!', 112 kWhatSeekDiscontinuity = 'seeD', 113 kWhatNormalPlayTimeMapping = 'nptM', 114 }; 115 116 MyHandler( 117 const char *url, 118 const sp<AMessage> ¬ify, 119 bool uidValid = false, uid_t uid = 0) 120 : mNotify(notify), 121 mUIDValid(uidValid), 122 mUID(uid), 123 mNetLooper(new ALooper), 124 mConn(new ARTSPConnection(mUIDValid, mUID)), 125 mRTPConn(new ARTPConnection), 126 mOriginalSessionURL(url), 127 mSessionURL(url), 128 mSetupTracksSuccessful(false), 129 mSeekPending(false), 130 mFirstAccessUnit(true), 131 mAllTracksHaveTime(false), 132 mNTPAnchorUs(-1), 133 mMediaAnchorUs(-1), 134 mLastMediaTimeUs(0), 135 mNumAccessUnitsReceived(0), 136 mCheckPending(false), 137 mCheckGeneration(0), 138 mCheckTimeoutGeneration(0), 139 mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)), 140 mTryFakeRTCP(false), 141 mReceivedFirstRTCPPacket(false), 142 
mReceivedFirstRTPPacket(false), 143 mSeekable(true), 144 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 145 mKeepAliveGeneration(0), 146 mPausing(false), 147 mPauseGeneration(0), 148 mPlayResponseParsed(false) { 149 mNetLooper->setName("rtsp net"); 150 mNetLooper->start(false /* runOnCallingThread */, 151 false /* canCallJava */, 152 PRIORITY_HIGHEST); 153 154 // Strip any authentication info from the session url, we don't 155 // want to transmit user/pass in cleartext. 156 AString host, path, user, pass; 157 unsigned port; 158 CHECK(ARTSPConnection::ParseURL( 159 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 160 161 if (user.size() > 0) { 162 mSessionURL.clear(); 163 mSessionURL.append("rtsp://"); 164 mSessionURL.append(host); 165 mSessionURL.append(":"); 166 mSessionURL.append(AStringPrintf("%u", port)); 167 mSessionURL.append(path); 168 169 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 170 } 171 172 mSessionHost = host; 173 } 174 175 void connect() { 176 looper()->registerHandler(mConn); 177 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 178 179 sp<AMessage> notify = new AMessage('biny', this); 180 mConn->observeBinaryData(notify); 181 182 sp<AMessage> reply = new AMessage('conn', this); 183 mConn->connect(mOriginalSessionURL.c_str(), reply); 184 } 185 186 void loadSDP(const sp<ASessionDescription>& desc) { 187 looper()->registerHandler(mConn); 188 (1 ? 
mNetLooper : looper())->registerHandler(mRTPConn); 189 190 sp<AMessage> notify = new AMessage('biny', this); 191 mConn->observeBinaryData(notify); 192 193 sp<AMessage> reply = new AMessage('sdpl', this); 194 reply->setObject("description", desc); 195 mConn->connect(mOriginalSessionURL.c_str(), reply); 196 } 197 198 AString getControlURL() { 199 AString sessionLevelControlURL; 200 if (mSessionDesc->findAttribute( 201 0, 202 "a=control", 203 &sessionLevelControlURL)) { 204 if (sessionLevelControlURL.compare("*") == 0) { 205 return mBaseURL; 206 } else { 207 AString controlURL; 208 CHECK(MakeURL( 209 mBaseURL.c_str(), 210 sessionLevelControlURL.c_str(), 211 &controlURL)); 212 return controlURL; 213 } 214 } else { 215 return mSessionURL; 216 } 217 } 218 219 void disconnect() { 220 (new AMessage('abor', this))->post(); 221 } 222 223 void seek(int64_t timeUs) { 224 sp<AMessage> msg = new AMessage('seek', this); 225 msg->setInt64("time", timeUs); 226 mPauseGeneration++; 227 msg->post(); 228 } 229 230 void continueSeekAfterPause(int64_t timeUs) { 231 sp<AMessage> msg = new AMessage('see1', this); 232 msg->setInt64("time", timeUs); 233 msg->post(); 234 } 235 236 bool isSeekable() const { 237 return mSeekable; 238 } 239 240 void pause() { 241 sp<AMessage> msg = new AMessage('paus', this); 242 mPauseGeneration++; 243 msg->setInt32("pausecheck", mPauseGeneration); 244 msg->post(); 245 } 246 247 void resume() { 248 sp<AMessage> msg = new AMessage('resu', this); 249 mPauseGeneration++; 250 msg->post(); 251 } 252 253 static void addRR(const sp<ABuffer> &buf) { 254 uint8_t *ptr = buf->data() + buf->size(); 255 ptr[0] = 0x80 | 0; 256 ptr[1] = 201; // RR 257 ptr[2] = 0; 258 ptr[3] = 1; 259 ptr[4] = 0xde; // SSRC 260 ptr[5] = 0xad; 261 ptr[6] = 0xbe; 262 ptr[7] = 0xef; 263 264 buf->setRange(0, buf->size() + 8); 265 } 266 267 static void addSDES(int s, const sp<ABuffer> &buffer) { 268 struct sockaddr_in addr; 269 socklen_t addrSize = sizeof(addr); 270 if (getsockname(s, (sockaddr 
*)&addr, &addrSize) != 0) { 271 inet_aton("0.0.0.0", &(addr.sin_addr)); 272 } 273 274 uint8_t *data = buffer->data() + buffer->size(); 275 data[0] = 0x80 | 1; 276 data[1] = 202; // SDES 277 data[4] = 0xde; // SSRC 278 data[5] = 0xad; 279 data[6] = 0xbe; 280 data[7] = 0xef; 281 282 size_t offset = 8; 283 284 data[offset++] = 1; // CNAME 285 286 AString cname = "stagefright@"; 287 cname.append(inet_ntoa(addr.sin_addr)); 288 data[offset++] = cname.size(); 289 290 memcpy(&data[offset], cname.c_str(), cname.size()); 291 offset += cname.size(); 292 293 data[offset++] = 6; // TOOL 294 295 AString tool = MakeUserAgent(); 296 297 data[offset++] = tool.size(); 298 299 memcpy(&data[offset], tool.c_str(), tool.size()); 300 offset += tool.size(); 301 302 data[offset++] = 0; 303 304 if ((offset % 4) > 0) { 305 size_t count = 4 - (offset % 4); 306 switch (count) { 307 case 3: 308 data[offset++] = 0; 309 case 2: 310 data[offset++] = 0; 311 case 1: 312 data[offset++] = 0; 313 } 314 } 315 316 size_t numWords = (offset / 4) - 1; 317 data[2] = numWords >> 8; 318 data[3] = numWords & 0xff; 319 320 buffer->setRange(buffer->offset(), buffer->size() + offset); 321 } 322 323 // In case we're behind NAT, fire off two UDP packets to the remote 324 // rtp/rtcp ports to poke a hole into the firewall for future incoming 325 // packets. We're going to send an RR/SDES RTCP packet to both of them. 326 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 327 struct sockaddr_in addr; 328 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 329 addr.sin_family = AF_INET; 330 331 AString source; 332 AString server_port; 333 if (!GetAttribute(transport.c_str(), 334 "source", 335 &source)) { 336 ALOGW("Missing 'source' field in Transport response. 
Using " 337 "RTSP endpoint address."); 338 339 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 340 if (ent == NULL) { 341 ALOGE("Failed to look up address of session host '%s'", 342 mSessionHost.c_str()); 343 344 return false; 345 } 346 347 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 348 } else { 349 addr.sin_addr.s_addr = inet_addr(source.c_str()); 350 } 351 352 if (!GetAttribute(transport.c_str(), 353 "server_port", 354 &server_port)) { 355 ALOGI("Missing 'server_port' field in Transport response."); 356 return false; 357 } 358 359 int rtpPort, rtcpPort; 360 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 361 || rtpPort <= 0 || rtpPort > 65535 362 || rtcpPort <=0 || rtcpPort > 65535 363 || rtcpPort != rtpPort + 1) { 364 ALOGE("Server picked invalid RTP/RTCP port pair %s," 365 " RTP port must be even, RTCP port must be one higher.", 366 server_port.c_str()); 367 368 return false; 369 } 370 371 if (rtpPort & 1) { 372 ALOGW("Server picked an odd RTP port, it should've picked an " 373 "even one, we'll let it pass for now, but this may break " 374 "in the future."); 375 } 376 377 if (addr.sin_addr.s_addr == INADDR_NONE) { 378 return true; 379 } 380 381 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 382 // No firewalls to traverse on the loopback interface. 383 return true; 384 } 385 386 // Make up an RR/SDES RTCP packet. 
387 sp<ABuffer> buf = new ABuffer(65536); 388 buf->setRange(0, 0); 389 addRR(buf); 390 addSDES(rtpSocket, buf); 391 392 addr.sin_port = htons(rtpPort); 393 394 ssize_t n = sendto( 395 rtpSocket, buf->data(), buf->size(), 0, 396 (const sockaddr *)&addr, sizeof(addr)); 397 398 if (n < (ssize_t)buf->size()) { 399 ALOGE("failed to poke a hole for RTP packets"); 400 return false; 401 } 402 403 addr.sin_port = htons(rtcpPort); 404 405 n = sendto( 406 rtcpSocket, buf->data(), buf->size(), 0, 407 (const sockaddr *)&addr, sizeof(addr)); 408 409 if (n < (ssize_t)buf->size()) { 410 ALOGE("failed to poke a hole for RTCP packets"); 411 return false; 412 } 413 414 ALOGV("successfully poked holes."); 415 416 return true; 417 } 418 419 static bool isLiveStream(const sp<ASessionDescription> &desc) { 420 AString attrLiveStream; 421 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 422 ssize_t semicolonPos = attrLiveStream.find(";", 2); 423 424 const char* liveStreamValue; 425 if (semicolonPos < 0) { 426 liveStreamValue = attrLiveStream.c_str(); 427 } else { 428 AString valString; 429 valString.setTo(attrLiveStream, 430 semicolonPos + 1, 431 attrLiveStream.size() - semicolonPos - 1); 432 liveStreamValue = valString.c_str(); 433 } 434 435 uint32_t value = strtoul(liveStreamValue, NULL, 10); 436 if (value == 1) { 437 ALOGV("found live stream"); 438 return true; 439 } 440 } else { 441 // It is a live stream if no duration is returned 442 int64_t durationUs; 443 if (!desc->getDurationUs(&durationUs)) { 444 ALOGV("No duration found, assume live stream"); 445 return true; 446 } 447 } 448 449 return false; 450 } 451 452 virtual void onMessageReceived(const sp<AMessage> &msg) { 453 switch (msg->what()) { 454 case 'conn': 455 { 456 int32_t result; 457 CHECK(msg->findInt32("result", &result)); 458 459 ALOGI("connection request completed with result %d (%s)", 460 result, strerror(-result)); 461 462 if (result == OK) { 463 AString request; 464 request = "DESCRIBE "; 465 
request.append(mSessionURL); 466 request.append(" RTSP/1.0\r\n"); 467 request.append("Accept: application/sdp\r\n"); 468 request.append("\r\n"); 469 470 sp<AMessage> reply = new AMessage('desc', this); 471 mConn->sendRequest(request.c_str(), reply); 472 } else { 473 (new AMessage('disc', this))->post(); 474 } 475 break; 476 } 477 478 case 'disc': 479 { 480 ++mKeepAliveGeneration; 481 482 int32_t reconnect; 483 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 484 sp<AMessage> reply = new AMessage('conn', this); 485 mConn->connect(mOriginalSessionURL.c_str(), reply); 486 } else { 487 (new AMessage('quit', this))->post(); 488 } 489 break; 490 } 491 492 case 'desc': 493 { 494 int32_t result; 495 CHECK(msg->findInt32("result", &result)); 496 497 ALOGI("DESCRIBE completed with result %d (%s)", 498 result, strerror(-result)); 499 500 if (result == OK) { 501 sp<RefBase> obj; 502 CHECK(msg->findObject("response", &obj)); 503 sp<ARTSPResponse> response = 504 static_cast<ARTSPResponse *>(obj.get()); 505 506 if (response->mStatusCode == 301 || response->mStatusCode == 302) { 507 ssize_t i = response->mHeaders.indexOfKey("location"); 508 CHECK_GE(i, 0); 509 510 mOriginalSessionURL = response->mHeaders.valueAt(i); 511 mSessionURL = mOriginalSessionURL; 512 513 // Strip any authentication info from the session url, we don't 514 // want to transmit user/pass in cleartext. 
515 AString host, path, user, pass; 516 unsigned port; 517 if (ARTSPConnection::ParseURL( 518 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 519 && user.size() > 0) { 520 mSessionURL.clear(); 521 mSessionURL.append("rtsp://"); 522 mSessionURL.append(host); 523 mSessionURL.append(":"); 524 mSessionURL.append(AStringPrintf("%u", port)); 525 mSessionURL.append(path); 526 527 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 528 } 529 530 sp<AMessage> reply = new AMessage('conn', this); 531 mConn->connect(mOriginalSessionURL.c_str(), reply); 532 break; 533 } 534 535 if (response->mStatusCode != 200) { 536 result = UNKNOWN_ERROR; 537 } else if (response->mContent == NULL) { 538 result = ERROR_MALFORMED; 539 ALOGE("The response has no content."); 540 } else { 541 mSessionDesc = new ASessionDescription; 542 543 mSessionDesc->setTo( 544 response->mContent->data(), 545 response->mContent->size()); 546 547 if (!mSessionDesc->isValid()) { 548 ALOGE("Failed to parse session description."); 549 result = ERROR_MALFORMED; 550 } else { 551 ssize_t i = response->mHeaders.indexOfKey("content-base"); 552 if (i >= 0) { 553 mBaseURL = response->mHeaders.valueAt(i); 554 } else { 555 i = response->mHeaders.indexOfKey("content-location"); 556 if (i >= 0) { 557 mBaseURL = response->mHeaders.valueAt(i); 558 } else { 559 mBaseURL = mSessionURL; 560 } 561 } 562 563 mSeekable = !isLiveStream(mSessionDesc); 564 565 if (!mBaseURL.startsWith("rtsp://")) { 566 // Some misbehaving servers specify a relative 567 // URL in one of the locations above, combine 568 // it with the absolute session URL to get 569 // something usable... 
570 571 ALOGW("Server specified a non-absolute base URL" 572 ", combining it with the session URL to " 573 "get something usable..."); 574 575 AString tmp; 576 CHECK(MakeURL( 577 mSessionURL.c_str(), 578 mBaseURL.c_str(), 579 &tmp)); 580 581 mBaseURL = tmp; 582 } 583 584 mControlURL = getControlURL(); 585 586 if (mSessionDesc->countTracks() < 2) { 587 // There's no actual tracks in this session. 588 // The first "track" is merely session meta 589 // data. 590 591 ALOGW("Session doesn't contain any playable " 592 "tracks. Aborting."); 593 result = ERROR_UNSUPPORTED; 594 } else { 595 setupTrack(1); 596 } 597 } 598 } 599 } 600 601 if (result != OK) { 602 sp<AMessage> reply = new AMessage('disc', this); 603 mConn->disconnect(reply); 604 } 605 break; 606 } 607 608 case 'sdpl': 609 { 610 int32_t result; 611 CHECK(msg->findInt32("result", &result)); 612 613 ALOGI("SDP connection request completed with result %d (%s)", 614 result, strerror(-result)); 615 616 if (result == OK) { 617 sp<RefBase> obj; 618 CHECK(msg->findObject("description", &obj)); 619 mSessionDesc = 620 static_cast<ASessionDescription *>(obj.get()); 621 622 if (!mSessionDesc->isValid()) { 623 ALOGE("Failed to parse session description."); 624 result = ERROR_MALFORMED; 625 } else { 626 mBaseURL = mSessionURL; 627 628 mSeekable = !isLiveStream(mSessionDesc); 629 630 mControlURL = getControlURL(); 631 632 if (mSessionDesc->countTracks() < 2) { 633 // There's no actual tracks in this session. 634 // The first "track" is merely session meta 635 // data. 636 637 ALOGW("Session doesn't contain any playable " 638 "tracks. 
Aborting."); 639 result = ERROR_UNSUPPORTED; 640 } else { 641 setupTrack(1); 642 } 643 } 644 } 645 646 if (result != OK) { 647 sp<AMessage> reply = new AMessage('disc', this); 648 mConn->disconnect(reply); 649 } 650 break; 651 } 652 653 case 'setu': 654 { 655 size_t index; 656 CHECK(msg->findSize("index", &index)); 657 658 TrackInfo *track = NULL; 659 size_t trackIndex; 660 if (msg->findSize("track-index", &trackIndex)) { 661 track = &mTracks.editItemAt(trackIndex); 662 } 663 664 int32_t result; 665 CHECK(msg->findInt32("result", &result)); 666 667 ALOGI("SETUP(%zu) completed with result %d (%s)", 668 index, result, strerror(-result)); 669 670 if (result == OK) { 671 CHECK(track != NULL); 672 673 sp<RefBase> obj; 674 CHECK(msg->findObject("response", &obj)); 675 sp<ARTSPResponse> response = 676 static_cast<ARTSPResponse *>(obj.get()); 677 678 if (response->mStatusCode != 200) { 679 result = UNKNOWN_ERROR; 680 } else { 681 ssize_t i = response->mHeaders.indexOfKey("session"); 682 CHECK_GE(i, 0); 683 684 mSessionID = response->mHeaders.valueAt(i); 685 686 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 687 AString timeoutStr; 688 if (GetAttribute( 689 mSessionID.c_str(), "timeout", &timeoutStr)) { 690 char *end; 691 unsigned long timeoutSecs = 692 strtoul(timeoutStr.c_str(), &end, 10); 693 694 if (end == timeoutStr.c_str() || *end != '\0') { 695 ALOGW("server specified malformed timeout '%s'", 696 timeoutStr.c_str()); 697 698 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 699 } else if (timeoutSecs < 15) { 700 ALOGW("server specified too short a timeout " 701 "(%lu secs), using default.", 702 timeoutSecs); 703 704 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 705 } else { 706 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 707 708 ALOGI("server specified timeout of %lu secs.", 709 timeoutSecs); 710 } 711 } 712 713 i = mSessionID.find(";"); 714 if (i >= 0) { 715 // Remove options, i.e. 
";timeout=90" 716 mSessionID.erase(i, mSessionID.size() - i); 717 } 718 719 sp<AMessage> notify = new AMessage('accu', this); 720 notify->setSize("track-index", trackIndex); 721 722 i = response->mHeaders.indexOfKey("transport"); 723 CHECK_GE(i, 0); 724 725 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 726 if (!track->mUsingInterleavedTCP) { 727 AString transport = response->mHeaders.valueAt(i); 728 729 // We are going to continue even if we were 730 // unable to poke a hole into the firewall... 731 pokeAHole( 732 track->mRTPSocket, 733 track->mRTCPSocket, 734 transport); 735 } 736 737 mRTPConn->addStream( 738 track->mRTPSocket, track->mRTCPSocket, 739 mSessionDesc, index, 740 notify, track->mUsingInterleavedTCP); 741 742 mSetupTracksSuccessful = true; 743 } else { 744 result = BAD_VALUE; 745 } 746 } 747 } 748 749 if (result != OK) { 750 if (track) { 751 if (!track->mUsingInterleavedTCP) { 752 // Clear the tag 753 if (mUIDValid) { 754 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 755 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 756 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 757 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 758 } 759 760 close(track->mRTPSocket); 761 close(track->mRTCPSocket); 762 } 763 764 mTracks.removeItemsAt(trackIndex); 765 } 766 } 767 768 ++index; 769 if (result == OK && index < mSessionDesc->countTracks()) { 770 setupTrack(index); 771 } else if (mSetupTracksSuccessful) { 772 ++mKeepAliveGeneration; 773 postKeepAlive(); 774 775 AString request = "PLAY "; 776 request.append(mControlURL); 777 request.append(" RTSP/1.0\r\n"); 778 779 request.append("Session: "); 780 request.append(mSessionID); 781 request.append("\r\n"); 782 783 request.append("\r\n"); 784 785 sp<AMessage> reply = new AMessage('play', this); 786 mConn->sendRequest(request.c_str(), reply); 787 } else { 788 sp<AMessage> reply = new AMessage('disc', this); 789 mConn->disconnect(reply); 790 } 791 break; 792 } 793 794 case 'play': 
795 { 796 int32_t result; 797 CHECK(msg->findInt32("result", &result)); 798 799 ALOGI("PLAY completed with result %d (%s)", 800 result, strerror(-result)); 801 802 if (result == OK) { 803 sp<RefBase> obj; 804 CHECK(msg->findObject("response", &obj)); 805 sp<ARTSPResponse> response = 806 static_cast<ARTSPResponse *>(obj.get()); 807 808 if (response->mStatusCode != 200) { 809 result = UNKNOWN_ERROR; 810 } else { 811 parsePlayResponse(response); 812 postTimeout(); 813 } 814 } 815 816 if (result != OK) { 817 sp<AMessage> reply = new AMessage('disc', this); 818 mConn->disconnect(reply); 819 } 820 821 break; 822 } 823 824 case 'aliv': 825 { 826 int32_t generation; 827 CHECK(msg->findInt32("generation", &generation)); 828 829 if (generation != mKeepAliveGeneration) { 830 // obsolete event. 831 break; 832 } 833 834 AString request; 835 request.append("OPTIONS "); 836 request.append(mSessionURL); 837 request.append(" RTSP/1.0\r\n"); 838 request.append("Session: "); 839 request.append(mSessionID); 840 request.append("\r\n"); 841 request.append("\r\n"); 842 843 sp<AMessage> reply = new AMessage('opts', this); 844 reply->setInt32("generation", mKeepAliveGeneration); 845 mConn->sendRequest(request.c_str(), reply); 846 break; 847 } 848 849 case 'opts': 850 { 851 int32_t result; 852 CHECK(msg->findInt32("result", &result)); 853 854 ALOGI("OPTIONS completed with result %d (%s)", 855 result, strerror(-result)); 856 857 int32_t generation; 858 CHECK(msg->findInt32("generation", &generation)); 859 860 if (generation != mKeepAliveGeneration) { 861 // obsolete event. 
862 break; 863 } 864 865 postKeepAlive(); 866 break; 867 } 868 869 case 'abor': 870 { 871 for (size_t i = 0; i < mTracks.size(); ++i) { 872 TrackInfo *info = &mTracks.editItemAt(i); 873 874 if (!mFirstAccessUnit) { 875 postQueueEOS(i, ERROR_END_OF_STREAM); 876 } 877 878 if (!info->mUsingInterleavedTCP) { 879 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 880 881 // Clear the tag 882 if (mUIDValid) { 883 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 884 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 885 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 886 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 887 } 888 889 close(info->mRTPSocket); 890 close(info->mRTCPSocket); 891 } 892 } 893 mTracks.clear(); 894 mSetupTracksSuccessful = false; 895 mSeekPending = false; 896 mFirstAccessUnit = true; 897 mAllTracksHaveTime = false; 898 mNTPAnchorUs = -1; 899 mMediaAnchorUs = -1; 900 mNumAccessUnitsReceived = 0; 901 mReceivedFirstRTCPPacket = false; 902 mReceivedFirstRTPPacket = false; 903 mPausing = false; 904 mSeekable = true; 905 906 sp<AMessage> reply = new AMessage('tear', this); 907 908 int32_t reconnect; 909 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 910 reply->setInt32("reconnect", true); 911 } 912 913 AString request; 914 request = "TEARDOWN "; 915 916 // XXX should use aggregate url from SDP here... 
917 request.append(mSessionURL); 918 request.append(" RTSP/1.0\r\n"); 919 920 request.append("Session: "); 921 request.append(mSessionID); 922 request.append("\r\n"); 923 924 request.append("\r\n"); 925 926 mConn->sendRequest(request.c_str(), reply); 927 break; 928 } 929 930 case 'tear': 931 { 932 int32_t result; 933 CHECK(msg->findInt32("result", &result)); 934 935 ALOGI("TEARDOWN completed with result %d (%s)", 936 result, strerror(-result)); 937 938 sp<AMessage> reply = new AMessage('disc', this); 939 940 int32_t reconnect; 941 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 942 reply->setInt32("reconnect", true); 943 } 944 945 mConn->disconnect(reply); 946 break; 947 } 948 949 case 'quit': 950 { 951 sp<AMessage> msg = mNotify->dup(); 952 msg->setInt32("what", kWhatDisconnected); 953 msg->setInt32("result", UNKNOWN_ERROR); 954 msg->post(); 955 break; 956 } 957 958 case 'chek': 959 { 960 int32_t generation; 961 CHECK(msg->findInt32("generation", &generation)); 962 if (generation != mCheckGeneration) { 963 // This is an outdated message. Ignore. 964 break; 965 } 966 967 if (mNumAccessUnitsReceived == 0) { 968 #if 1 969 ALOGI("stream ended? 
aborting."); 970 (new AMessage('abor', this))->post(); 971 break; 972 #else 973 ALOGI("haven't seen an AU in a looong time."); 974 #endif 975 } 976 977 mNumAccessUnitsReceived = 0; 978 msg->post(kAccessUnitTimeoutUs); 979 break; 980 } 981 982 case 'accu': 983 { 984 if (mSeekPending) { 985 ALOGV("Stale access unit."); 986 break; 987 } 988 989 int32_t timeUpdate; 990 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 991 size_t trackIndex; 992 CHECK(msg->findSize("track-index", &trackIndex)); 993 994 uint32_t rtpTime; 995 uint64_t ntpTime; 996 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 997 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 998 999 onTimeUpdate(trackIndex, rtpTime, ntpTime); 1000 break; 1001 } 1002 1003 int32_t first; 1004 if (msg->findInt32("first-rtcp", &first)) { 1005 mReceivedFirstRTCPPacket = true; 1006 break; 1007 } 1008 1009 if (msg->findInt32("first-rtp", &first)) { 1010 mReceivedFirstRTPPacket = true; 1011 break; 1012 } 1013 1014 ++mNumAccessUnitsReceived; 1015 postAccessUnitTimeoutCheck(); 1016 1017 size_t trackIndex; 1018 CHECK(msg->findSize("track-index", &trackIndex)); 1019 1020 if (trackIndex >= mTracks.size()) { 1021 ALOGV("late packets ignored."); 1022 break; 1023 } 1024 1025 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1026 1027 int32_t eos; 1028 if (msg->findInt32("eos", &eos)) { 1029 ALOGI("received BYE on track index %zu", trackIndex); 1030 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1031 ALOGI("No time established => fake existing data"); 1032 1033 track->mEOSReceived = true; 1034 mTryFakeRTCP = true; 1035 mReceivedFirstRTCPPacket = true; 1036 fakeTimestamps(); 1037 } else { 1038 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1039 } 1040 return; 1041 } 1042 1043 if (mSeekPending) { 1044 ALOGV("we're seeking, dropping stale packet."); 1045 break; 1046 } 1047 1048 sp<ABuffer> accessUnit; 1049 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1050 onAccessUnitComplete(trackIndex, 
accessUnit); 1051 break; 1052 } 1053 1054 case 'paus': 1055 { 1056 int32_t generation; 1057 CHECK(msg->findInt32("pausecheck", &generation)); 1058 if (generation != mPauseGeneration) { 1059 ALOGV("Ignoring outdated pause message."); 1060 break; 1061 } 1062 1063 if (!mSeekable) { 1064 ALOGW("This is a live stream, ignoring pause request."); 1065 break; 1066 } 1067 1068 if (mPausing) { 1069 ALOGV("This stream is already paused."); 1070 break; 1071 } 1072 1073 mCheckPending = true; 1074 ++mCheckGeneration; 1075 mPausing = true; 1076 1077 AString request = "PAUSE "; 1078 request.append(mControlURL); 1079 request.append(" RTSP/1.0\r\n"); 1080 1081 request.append("Session: "); 1082 request.append(mSessionID); 1083 request.append("\r\n"); 1084 1085 request.append("\r\n"); 1086 1087 sp<AMessage> reply = new AMessage('pau2', this); 1088 mConn->sendRequest(request.c_str(), reply); 1089 break; 1090 } 1091 1092 case 'pau2': 1093 { 1094 int32_t result; 1095 CHECK(msg->findInt32("result", &result)); 1096 mCheckTimeoutGeneration++; 1097 1098 ALOGI("PAUSE completed with result %d (%s)", 1099 result, strerror(-result)); 1100 break; 1101 } 1102 1103 case 'resu': 1104 { 1105 if (mPausing && mSeekPending) { 1106 // If seeking, Play will be sent from see1 instead 1107 break; 1108 } 1109 1110 if (!mPausing) { 1111 // Dont send PLAY if we have not paused 1112 break; 1113 } 1114 AString request = "PLAY "; 1115 request.append(mControlURL); 1116 request.append(" RTSP/1.0\r\n"); 1117 1118 request.append("Session: "); 1119 request.append(mSessionID); 1120 request.append("\r\n"); 1121 1122 request.append("\r\n"); 1123 1124 sp<AMessage> reply = new AMessage('res2', this); 1125 mConn->sendRequest(request.c_str(), reply); 1126 break; 1127 } 1128 1129 case 'res2': 1130 { 1131 int32_t result; 1132 CHECK(msg->findInt32("result", &result)); 1133 1134 ALOGI("PLAY (for resume) completed with result %d (%s)", 1135 result, strerror(-result)); 1136 1137 mCheckPending = false; 1138 ++mCheckGeneration; 1139 
postAccessUnitTimeoutCheck(); 1140 1141 if (result == OK) { 1142 sp<RefBase> obj; 1143 CHECK(msg->findObject("response", &obj)); 1144 sp<ARTSPResponse> response = 1145 static_cast<ARTSPResponse *>(obj.get()); 1146 1147 if (response->mStatusCode != 200) { 1148 result = UNKNOWN_ERROR; 1149 } else { 1150 parsePlayResponse(response); 1151 1152 // Post new timeout in order to make sure to use 1153 // fake timestamps if no new Sender Reports arrive 1154 postTimeout(); 1155 } 1156 } 1157 1158 if (result != OK) { 1159 ALOGE("resume failed, aborting."); 1160 (new AMessage('abor', this))->post(); 1161 } 1162 1163 mPausing = false; 1164 break; 1165 } 1166 1167 case 'seek': 1168 { 1169 if (!mSeekable) { 1170 ALOGW("This is a live stream, ignoring seek request."); 1171 1172 sp<AMessage> msg = mNotify->dup(); 1173 msg->setInt32("what", kWhatSeekDone); 1174 msg->post(); 1175 break; 1176 } 1177 1178 int64_t timeUs; 1179 CHECK(msg->findInt64("time", &timeUs)); 1180 1181 mSeekPending = true; 1182 1183 // Disable the access unit timeout until we resumed 1184 // playback again. 1185 mCheckPending = true; 1186 ++mCheckGeneration; 1187 1188 sp<AMessage> reply = new AMessage('see0', this); 1189 reply->setInt64("time", timeUs); 1190 1191 if (mPausing) { 1192 // PAUSE already sent 1193 ALOGI("Pause already sent"); 1194 reply->post(); 1195 break; 1196 } 1197 AString request = "PAUSE "; 1198 request.append(mControlURL); 1199 request.append(" RTSP/1.0\r\n"); 1200 1201 request.append("Session: "); 1202 request.append(mSessionID); 1203 request.append("\r\n"); 1204 1205 request.append("\r\n"); 1206 1207 mConn->sendRequest(request.c_str(), reply); 1208 break; 1209 } 1210 1211 case 'see0': 1212 { 1213 // Session is paused now. 
1214 status_t err = OK; 1215 msg->findInt32("result", &err); 1216 1217 int64_t timeUs; 1218 CHECK(msg->findInt64("time", &timeUs)); 1219 1220 sp<AMessage> notify = mNotify->dup(); 1221 notify->setInt32("what", kWhatSeekPaused); 1222 notify->setInt32("err", err); 1223 notify->setInt64("time", timeUs); 1224 notify->post(); 1225 break; 1226 1227 } 1228 1229 case 'see1': 1230 { 1231 for (size_t i = 0; i < mTracks.size(); ++i) { 1232 TrackInfo *info = &mTracks.editItemAt(i); 1233 1234 postQueueSeekDiscontinuity(i); 1235 info->mEOSReceived = false; 1236 1237 info->mRTPAnchor = 0; 1238 info->mNTPAnchorUs = -1; 1239 } 1240 1241 mAllTracksHaveTime = false; 1242 mNTPAnchorUs = -1; 1243 1244 // Start new timeoutgeneration to avoid getting timeout 1245 // before PLAY response arrive 1246 postTimeout(); 1247 1248 int64_t timeUs; 1249 CHECK(msg->findInt64("time", &timeUs)); 1250 1251 AString request = "PLAY "; 1252 request.append(mControlURL); 1253 request.append(" RTSP/1.0\r\n"); 1254 1255 request.append("Session: "); 1256 request.append(mSessionID); 1257 request.append("\r\n"); 1258 1259 request.append( 1260 AStringPrintf( 1261 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1262 1263 request.append("\r\n"); 1264 1265 sp<AMessage> reply = new AMessage('see2', this); 1266 mConn->sendRequest(request.c_str(), reply); 1267 break; 1268 } 1269 1270 case 'see2': 1271 { 1272 if (mTracks.size() == 0) { 1273 // We have already hit abor, break 1274 break; 1275 } 1276 1277 int32_t result; 1278 CHECK(msg->findInt32("result", &result)); 1279 1280 ALOGI("PLAY (for seek) completed with result %d (%s)", 1281 result, strerror(-result)); 1282 1283 mCheckPending = false; 1284 ++mCheckGeneration; 1285 postAccessUnitTimeoutCheck(); 1286 1287 if (result == OK) { 1288 sp<RefBase> obj; 1289 CHECK(msg->findObject("response", &obj)); 1290 sp<ARTSPResponse> response = 1291 static_cast<ARTSPResponse *>(obj.get()); 1292 1293 if (response->mStatusCode != 200) { 1294 result = UNKNOWN_ERROR; 1295 } else { 1296 
parsePlayResponse(response); 1297 1298 // Post new timeout in order to make sure to use 1299 // fake timestamps if no new Sender Reports arrive 1300 postTimeout(); 1301 1302 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1303 CHECK_GE(i, 0); 1304 1305 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1306 1307 ALOGI("seek completed."); 1308 } 1309 } 1310 1311 if (result != OK) { 1312 ALOGE("seek failed, aborting."); 1313 (new AMessage('abor', this))->post(); 1314 } 1315 1316 mPausing = false; 1317 mSeekPending = false; 1318 1319 // Discard all stale access units. 1320 for (size_t i = 0; i < mTracks.size(); ++i) { 1321 TrackInfo *track = &mTracks.editItemAt(i); 1322 track->mPackets.clear(); 1323 } 1324 1325 sp<AMessage> msg = mNotify->dup(); 1326 msg->setInt32("what", kWhatSeekDone); 1327 msg->post(); 1328 break; 1329 } 1330 1331 case 'biny': 1332 { 1333 sp<ABuffer> buffer; 1334 CHECK(msg->findBuffer("buffer", &buffer)); 1335 1336 int32_t index; 1337 CHECK(buffer->meta()->findInt32("index", &index)); 1338 1339 mRTPConn->injectPacket(index, buffer); 1340 break; 1341 } 1342 1343 case 'tiou': 1344 { 1345 int32_t timeoutGenerationCheck; 1346 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1347 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1348 // This is an outdated message. Ignore. 1349 // This typically happens if a lot of seeks are 1350 // performed, since new timeout messages now are 1351 // posted at seek as well. 
1352 break; 1353 } 1354 if (!mReceivedFirstRTCPPacket) { 1355 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1356 ALOGW("We received RTP packets but no RTCP packets, " 1357 "using fake timestamps."); 1358 1359 mTryFakeRTCP = true; 1360 1361 mReceivedFirstRTCPPacket = true; 1362 1363 fakeTimestamps(); 1364 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1365 ALOGW("Never received any data, switching transports."); 1366 1367 mTryTCPInterleaving = true; 1368 1369 sp<AMessage> msg = new AMessage('abor', this); 1370 msg->setInt32("reconnect", true); 1371 msg->post(); 1372 } else { 1373 ALOGW("Never received any data, disconnecting."); 1374 (new AMessage('abor', this))->post(); 1375 } 1376 } else { 1377 if (!mAllTracksHaveTime) { 1378 ALOGW("We received some RTCP packets, but time " 1379 "could not be established on all tracks, now " 1380 "using fake timestamps"); 1381 1382 fakeTimestamps(); 1383 } 1384 } 1385 break; 1386 } 1387 1388 default: 1389 TRESPASS(); 1390 break; 1391 } 1392 } 1393 1394 void postKeepAlive() { 1395 sp<AMessage> msg = new AMessage('aliv', this); 1396 msg->setInt32("generation", mKeepAliveGeneration); 1397 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1398 } 1399 1400 void cancelAccessUnitTimeoutCheck() { 1401 ALOGV("cancelAccessUnitTimeoutCheck"); 1402 ++mCheckGeneration; 1403 } 1404 1405 void postAccessUnitTimeoutCheck() { 1406 if (mCheckPending) { 1407 return; 1408 } 1409 1410 mCheckPending = true; 1411 sp<AMessage> check = new AMessage('chek', this); 1412 check->setInt32("generation", mCheckGeneration); 1413 check->post(kAccessUnitTimeoutUs); 1414 } 1415 1416 static void SplitString( 1417 const AString &s, const char *separator, List<AString> *items) { 1418 items->clear(); 1419 size_t start = 0; 1420 while (start < s.size()) { 1421 ssize_t offset = s.find(separator, start); 1422 1423 if (offset < 0) { 1424 items->push_back(AString(s, start, s.size() - start)); 1425 break; 1426 } 1427 1428 items->push_back(AString(s, start, 
offset - start)); 1429 start = offset + strlen(separator); 1430 } 1431 } 1432 1433 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1434 mPlayResponseParsed = true; 1435 if (mTracks.size() == 0) { 1436 ALOGV("parsePlayResponse: late packets ignored."); 1437 return; 1438 } 1439 1440 ssize_t i = response->mHeaders.indexOfKey("range"); 1441 if (i < 0) { 1442 // Server doesn't even tell use what range it is going to 1443 // play, therefore we won't support seeking. 1444 return; 1445 } 1446 1447 AString range = response->mHeaders.valueAt(i); 1448 ALOGV("Range: %s", range.c_str()); 1449 1450 AString val; 1451 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1452 1453 float npt1, npt2; 1454 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1455 // This is a live stream and therefore not seekable. 1456 1457 ALOGI("This is a live stream"); 1458 return; 1459 } 1460 1461 i = response->mHeaders.indexOfKey("rtp-info"); 1462 CHECK_GE(i, 0); 1463 1464 AString rtpInfo = response->mHeaders.valueAt(i); 1465 List<AString> streamInfos; 1466 SplitString(rtpInfo, ",", &streamInfos); 1467 1468 int n = 1; 1469 for (List<AString>::iterator it = streamInfos.begin(); 1470 it != streamInfos.end(); ++it) { 1471 (*it).trim(); 1472 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1473 1474 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1475 1476 size_t trackIndex = 0; 1477 while (trackIndex < mTracks.size() 1478 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1479 ++trackIndex; 1480 } 1481 CHECK_LT(trackIndex, mTracks.size()); 1482 1483 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1484 1485 char *end; 1486 unsigned long seq = strtoul(val.c_str(), &end, 10); 1487 1488 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1489 info->mFirstSeqNumInSegment = seq; 1490 info->mNewSegment = true; 1491 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1492 1493 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1494 1495 uint32_t rtpTime = 
strtoul(val.c_str(), &end, 10); 1496 1497 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1498 1499 info->mNormalPlayTimeRTP = rtpTime; 1500 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1501 1502 if (!mFirstAccessUnit) { 1503 postNormalPlayTimeMapping( 1504 trackIndex, 1505 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1506 } 1507 1508 ++n; 1509 } 1510 } 1511 1512 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1513 CHECK_GE(index, 0u); 1514 CHECK_LT(index, mTracks.size()); 1515 1516 const TrackInfo &info = mTracks.itemAt(index); 1517 1518 *timeScale = info.mTimeScale; 1519 1520 return info.mPacketSource->getFormat(); 1521 } 1522 1523 size_t countTracks() const { 1524 return mTracks.size(); 1525 } 1526 1527 private: 1528 struct TrackInfo { 1529 AString mURL; 1530 int mRTPSocket; 1531 int mRTCPSocket; 1532 bool mUsingInterleavedTCP; 1533 uint32_t mFirstSeqNumInSegment; 1534 bool mNewSegment; 1535 int32_t mAllowedStaleAccessUnits; 1536 1537 uint32_t mRTPAnchor; 1538 int64_t mNTPAnchorUs; 1539 int32_t mTimeScale; 1540 bool mEOSReceived; 1541 1542 uint32_t mNormalPlayTimeRTP; 1543 int64_t mNormalPlayTimeUs; 1544 1545 sp<APacketSource> mPacketSource; 1546 1547 // Stores packets temporarily while no notion of time 1548 // has been established yet. 
1549 List<sp<ABuffer> > mPackets; 1550 }; 1551 1552 sp<AMessage> mNotify; 1553 bool mUIDValid; 1554 uid_t mUID; 1555 sp<ALooper> mNetLooper; 1556 sp<ARTSPConnection> mConn; 1557 sp<ARTPConnection> mRTPConn; 1558 sp<ASessionDescription> mSessionDesc; 1559 AString mOriginalSessionURL; // This one still has user:pass@ 1560 AString mSessionURL; 1561 AString mSessionHost; 1562 AString mBaseURL; 1563 AString mControlURL; 1564 AString mSessionID; 1565 bool mSetupTracksSuccessful; 1566 bool mSeekPending; 1567 bool mFirstAccessUnit; 1568 1569 bool mAllTracksHaveTime; 1570 int64_t mNTPAnchorUs; 1571 int64_t mMediaAnchorUs; 1572 int64_t mLastMediaTimeUs; 1573 1574 int64_t mNumAccessUnitsReceived; 1575 bool mCheckPending; 1576 int32_t mCheckGeneration; 1577 int32_t mCheckTimeoutGeneration; 1578 bool mTryTCPInterleaving; 1579 bool mTryFakeRTCP; 1580 bool mReceivedFirstRTCPPacket; 1581 bool mReceivedFirstRTPPacket; 1582 bool mSeekable; 1583 int64_t mKeepAliveTimeoutUs; 1584 int32_t mKeepAliveGeneration; 1585 bool mPausing; 1586 int32_t mPauseGeneration; 1587 1588 Vector<TrackInfo> mTracks; 1589 1590 bool mPlayResponseParsed; 1591 1592 void setupTrack(size_t index) { 1593 sp<APacketSource> source = 1594 new APacketSource(mSessionDesc, index); 1595 1596 if (source->initCheck() != OK) { 1597 ALOGW("Unsupported format. 
Ignoring track #%zu.", index); 1598 1599 sp<AMessage> reply = new AMessage('setu', this); 1600 reply->setSize("index", index); 1601 reply->setInt32("result", ERROR_UNSUPPORTED); 1602 reply->post(); 1603 return; 1604 } 1605 1606 AString url; 1607 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1608 1609 AString trackURL; 1610 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1611 1612 mTracks.push(TrackInfo()); 1613 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1614 info->mURL = trackURL; 1615 info->mPacketSource = source; 1616 info->mUsingInterleavedTCP = false; 1617 info->mFirstSeqNumInSegment = 0; 1618 info->mNewSegment = true; 1619 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1620 info->mRTPSocket = -1; 1621 info->mRTCPSocket = -1; 1622 info->mRTPAnchor = 0; 1623 info->mNTPAnchorUs = -1; 1624 info->mNormalPlayTimeRTP = 0; 1625 info->mNormalPlayTimeUs = 0ll; 1626 1627 unsigned long PT; 1628 AString formatDesc; 1629 AString formatParams; 1630 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1631 1632 int32_t timescale; 1633 int32_t numChannels; 1634 ASessionDescription::ParseFormatDesc( 1635 formatDesc.c_str(), ×cale, &numChannels); 1636 1637 info->mTimeScale = timescale; 1638 info->mEOSReceived = false; 1639 1640 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str()); 1641 1642 AString request = "SETUP "; 1643 request.append(trackURL); 1644 request.append(" RTSP/1.0\r\n"); 1645 1646 if (mTryTCPInterleaving) { 1647 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1648 info->mUsingInterleavedTCP = true; 1649 info->mRTPSocket = interleaveIndex; 1650 info->mRTCPSocket = interleaveIndex + 1; 1651 1652 request.append("Transport: RTP/AVP/TCP;interleaved="); 1653 request.append(interleaveIndex); 1654 request.append("-"); 1655 request.append(interleaveIndex + 1); 1656 } else { 1657 unsigned rtpPort; 1658 ARTPConnection::MakePortPair( 1659 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1660 
1661 if (mUIDValid) { 1662 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1663 (uint32_t)*(uint32_t*) "RTP_"); 1664 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1665 (uint32_t)*(uint32_t*) "RTP_"); 1666 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1667 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1668 } 1669 1670 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1671 request.append(rtpPort); 1672 request.append("-"); 1673 request.append(rtpPort + 1); 1674 } 1675 1676 request.append("\r\n"); 1677 1678 if (index > 1) { 1679 request.append("Session: "); 1680 request.append(mSessionID); 1681 request.append("\r\n"); 1682 } 1683 1684 request.append("\r\n"); 1685 1686 sp<AMessage> reply = new AMessage('setu', this); 1687 reply->setSize("index", index); 1688 reply->setSize("track-index", mTracks.size() - 1); 1689 mConn->sendRequest(request.c_str(), reply); 1690 } 1691 1692 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1693 out->clear(); 1694 1695 if (strncasecmp("rtsp://", baseURL, 7)) { 1696 // Base URL must be absolute 1697 return false; 1698 } 1699 1700 if (!strncasecmp("rtsp://", url, 7)) { 1701 // "url" is already an absolute URL, ignore base URL. 
1702 out->setTo(url); 1703 return true; 1704 } 1705 1706 size_t n = strlen(baseURL); 1707 out->setTo(baseURL); 1708 if (baseURL[n - 1] != '/') { 1709 out->append("/"); 1710 } 1711 out->append(url); 1712 1713 return true; 1714 } 1715 1716 void fakeTimestamps() { 1717 mNTPAnchorUs = -1ll; 1718 for (size_t i = 0; i < mTracks.size(); ++i) { 1719 onTimeUpdate(i, 0, 0ll); 1720 } 1721 } 1722 1723 bool dataReceivedOnAllChannels() { 1724 TrackInfo *track; 1725 for (size_t i = 0; i < mTracks.size(); ++i) { 1726 track = &mTracks.editItemAt(i); 1727 if (track->mPackets.empty()) { 1728 return false; 1729 } 1730 } 1731 return true; 1732 } 1733 1734 void handleFirstAccessUnit() { 1735 if (mFirstAccessUnit) { 1736 sp<AMessage> msg = mNotify->dup(); 1737 msg->setInt32("what", kWhatConnected); 1738 msg->post(); 1739 1740 if (mSeekable) { 1741 for (size_t i = 0; i < mTracks.size(); ++i) { 1742 TrackInfo *info = &mTracks.editItemAt(i); 1743 1744 postNormalPlayTimeMapping( 1745 i, 1746 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1747 } 1748 } 1749 1750 mFirstAccessUnit = false; 1751 } 1752 } 1753 1754 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1755 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx", 1756 trackIndex, rtpTime, (long long)ntpTime); 1757 1758 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1759 1760 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1761 1762 track->mRTPAnchor = rtpTime; 1763 track->mNTPAnchorUs = ntpTimeUs; 1764 1765 if (mNTPAnchorUs < 0) { 1766 mNTPAnchorUs = ntpTimeUs; 1767 mMediaAnchorUs = mLastMediaTimeUs; 1768 } 1769 1770 if (!mAllTracksHaveTime) { 1771 bool allTracksHaveTime = (mTracks.size() > 0); 1772 for (size_t i = 0; i < mTracks.size(); ++i) { 1773 TrackInfo *track = &mTracks.editItemAt(i); 1774 if (track->mNTPAnchorUs < 0) { 1775 allTracksHaveTime = false; 1776 break; 1777 } 1778 } 1779 if (allTracksHaveTime) { 1780 mAllTracksHaveTime = true; 1781 ALOGI("Time now 
established for all tracks."); 1782 } 1783 } 1784 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1785 handleFirstAccessUnit(); 1786 1787 // Time is now established, lets start timestamping immediately 1788 for (size_t i = 0; i < mTracks.size(); ++i) { 1789 if (OK != processAccessUnitQueue(i)) { 1790 return; 1791 } 1792 } 1793 for (size_t i = 0; i < mTracks.size(); ++i) { 1794 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1795 if (trackInfo->mEOSReceived) { 1796 postQueueEOS(i, ERROR_END_OF_STREAM); 1797 trackInfo->mEOSReceived = false; 1798 } 1799 } 1800 } 1801 } 1802 1803 status_t processAccessUnitQueue(int32_t trackIndex) { 1804 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1805 while (!track->mPackets.empty()) { 1806 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1807 track->mPackets.erase(track->mPackets.begin()); 1808 1809 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1810 if (track->mNewSegment) { 1811 // The sequence number from RTP packet has only 16 bits and is extended 1812 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of 1813 // RTSP "PLAY" command should be used to detect the first RTP packet 1814 // after seeking. 1815 if (mSeekable) { 1816 if (track->mAllowedStaleAccessUnits > 0) { 1817 uint32_t seqNum16 = seqNum & 0xffff; 1818 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff; 1819 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits 1820 || seqNum16 < firstSeqNumInSegment16) { 1821 // Not the first rtp packet of the stream after seeking, discarding. 
1822 track->mAllowedStaleAccessUnits--; 1823 ALOGV("discarding stale access unit (0x%x : 0x%x)", 1824 seqNum, track->mFirstSeqNumInSegment); 1825 continue; 1826 } 1827 ALOGW_IF(seqNum16 != firstSeqNumInSegment16, 1828 "Missing the first packet(%u), now take packet(%u) as first one", 1829 track->mFirstSeqNumInSegment, seqNum); 1830 } else { // track->mAllowedStaleAccessUnits <= 0 1831 mNumAccessUnitsReceived = 0; 1832 ALOGW_IF(track->mAllowedStaleAccessUnits == 0, 1833 "Still no first rtp packet after %d stale ones", 1834 kMaxAllowedStaleAccessUnits); 1835 track->mAllowedStaleAccessUnits = -1; 1836 return UNKNOWN_ERROR; 1837 } 1838 } 1839 1840 // Now found the first rtp packet of the stream after seeking. 1841 track->mFirstSeqNumInSegment = seqNum; 1842 track->mNewSegment = false; 1843 } 1844 1845 if (seqNum < track->mFirstSeqNumInSegment) { 1846 ALOGV("dropping stale access-unit (%d < %d)", 1847 seqNum, track->mFirstSeqNumInSegment); 1848 continue; 1849 } 1850 1851 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1852 postQueueAccessUnit(trackIndex, accessUnit); 1853 } 1854 } 1855 return OK; 1856 } 1857 1858 void onAccessUnitComplete( 1859 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1860 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1861 track->mPackets.push_back(accessUnit); 1862 1863 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1864 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum); 1865 1866 if(!mPlayResponseParsed){ 1867 ALOGV("play response is not parsed"); 1868 return; 1869 } 1870 1871 handleFirstAccessUnit(); 1872 1873 if (!mAllTracksHaveTime) { 1874 ALOGV("storing accessUnit, no time established yet"); 1875 return; 1876 } 1877 1878 if (OK != processAccessUnitQueue(trackIndex)) { 1879 return; 1880 } 1881 1882 if (track->mEOSReceived) { 1883 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1884 track->mEOSReceived = false; 1885 } 1886 } 1887 1888 bool addMediaTimestamp( 1889 int32_t trackIndex, const 
TrackInfo *track, 1890 const sp<ABuffer> &accessUnit) { 1891 UNUSED_UNLESS_VERBOSE(trackIndex); 1892 1893 uint32_t rtpTime; 1894 CHECK(accessUnit->meta()->findInt32( 1895 "rtp-time", (int32_t *)&rtpTime)); 1896 1897 int64_t relRtpTimeUs = 1898 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1899 / track->mTimeScale; 1900 1901 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1902 1903 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1904 1905 if (mediaTimeUs > mLastMediaTimeUs) { 1906 mLastMediaTimeUs = mediaTimeUs; 1907 } 1908 1909 if (mediaTimeUs < 0) { 1910 ALOGV("dropping early accessUnit."); 1911 return false; 1912 } 1913 1914 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)", 1915 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6); 1916 1917 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1918 1919 return true; 1920 } 1921 1922 void postQueueAccessUnit( 1923 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1924 sp<AMessage> msg = mNotify->dup(); 1925 msg->setInt32("what", kWhatAccessUnit); 1926 msg->setSize("trackIndex", trackIndex); 1927 msg->setBuffer("accessUnit", accessUnit); 1928 msg->post(); 1929 } 1930 1931 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1932 sp<AMessage> msg = mNotify->dup(); 1933 msg->setInt32("what", kWhatEOS); 1934 msg->setSize("trackIndex", trackIndex); 1935 msg->setInt32("finalResult", finalResult); 1936 msg->post(); 1937 } 1938 1939 void postQueueSeekDiscontinuity(size_t trackIndex) { 1940 sp<AMessage> msg = mNotify->dup(); 1941 msg->setInt32("what", kWhatSeekDiscontinuity); 1942 msg->setSize("trackIndex", trackIndex); 1943 msg->post(); 1944 } 1945 1946 void postNormalPlayTimeMapping( 1947 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1948 sp<AMessage> msg = mNotify->dup(); 1949 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1950 msg->setSize("trackIndex", trackIndex); 1951 msg->setInt32("rtpTime", rtpTime); 1952 msg->setInt64("nptUs", 
nptUs); 1953 msg->post(); 1954 } 1955 1956 void postTimeout() { 1957 sp<AMessage> timeout = new AMessage('tiou', this); 1958 mCheckTimeoutGeneration++; 1959 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1960 1961 int64_t startupTimeoutUs; 1962 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs); 1963 timeout->post(startupTimeoutUs); 1964 } 1965 1966 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1967 }; 1968 1969 } // namespace android 1970 1971 #endif // MY_HANDLER_H_ 1972