1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 23 #ifndef LOG_TAG 24 #define LOG_TAG "MyHandler" 25 #endif 26 27 #include <utils/Log.h> 28 #include <cutils/properties.h> // for property_get 29 30 #include "APacketSource.h" 31 #include "ARTPConnection.h" 32 #include "ARTSPConnection.h" 33 #include "ASessionDescription.h" 34 #include "NetworkUtils.h" 35 36 #include <ctype.h> 37 #include <cutils/properties.h> 38 39 #include <media/stagefright/foundation/ABuffer.h> 40 #include <media/stagefright/foundation/ADebug.h> 41 #include <media/stagefright/foundation/ALooper.h> 42 #include <media/stagefright/foundation/AMessage.h> 43 #include <media/stagefright/MediaErrors.h> 44 #include <media/stagefright/Utils.h> 45 46 #include <arpa/inet.h> 47 #include <sys/socket.h> 48 #include <netdb.h> 49 50 #include "HTTPBase.h" 51 52 #if LOG_NDEBUG 53 #define UNUSED_UNLESS_VERBOSE(x) (void)(x) 54 #else 55 #define UNUSED_UNLESS_VERBOSE(x) 56 #endif 57 58 #ifndef FALLTHROUGH_INTENDED 59 #define FALLTHROUGH_INTENDED [[clang::fallthrough]] // NOLINT 60 #endif 61 62 // If no access units are received within 10 secs, assume that the rtp 63 // stream has ended and signal end of stream.
64 static int64_t kAccessUnitTimeoutUs = 10000000ll; 65 66 // If no access units arrive for the first 10 secs after starting the 67 // stream, assume none ever will and signal EOS or switch transports. 68 static int64_t kStartupTimeoutUs = 10000000ll; 69 70 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 71 72 static int64_t kPauseDelayUs = 3000000ll; 73 74 // The allowed maximum number of stale access units at the beginning of 75 // a new sequence. 76 static int32_t kMaxAllowedStaleAccessUnits = 20; 77 78 static int64_t kTearDownTimeoutUs = 3000000ll; 79 80 namespace android { 81 82 static bool GetAttribute(const char *s, const char *key, AString *value) { 83 value->clear(); 84 85 size_t keyLen = strlen(key); 86 87 for (;;) { 88 while (isspace(*s)) { 89 ++s; 90 } 91 92 const char *colonPos = strchr(s, ';'); 93 94 size_t len = 95 (colonPos == NULL) ? strlen(s) : colonPos - s; 96 97 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 98 value->setTo(&s[keyLen + 1], len - keyLen - 1); 99 return true; 100 } 101 102 if (colonPos == NULL) { 103 return false; 104 } 105 106 s = colonPos + 1; 107 } 108 } 109 110 struct MyHandler : public AHandler { 111 enum { 112 kWhatConnected = 'conn', 113 kWhatDisconnected = 'disc', 114 kWhatSeekPaused = 'spau', 115 kWhatSeekDone = 'sdon', 116 117 kWhatAccessUnit = 'accU', 118 kWhatEOS = 'eos!', 119 kWhatSeekDiscontinuity = 'seeD', 120 kWhatNormalPlayTimeMapping = 'nptM', 121 }; 122 123 MyHandler( 124 const char *url, 125 const sp<AMessage> ¬ify, 126 bool uidValid = false, uid_t uid = 0) 127 : mNotify(notify), 128 mUIDValid(uidValid), 129 mUID(uid), 130 mNetLooper(new ALooper), 131 mConn(new ARTSPConnection(mUIDValid, mUID)), 132 mRTPConn(new ARTPConnection), 133 mOriginalSessionURL(url), 134 mSessionURL(url), 135 mSetupTracksSuccessful(false), 136 mSeekPending(false), 137 mFirstAccessUnit(true), 138 mAllTracksHaveTime(false), 139 mNTPAnchorUs(-1), 140 mMediaAnchorUs(-1), 141 mLastMediaTimeUs(0), 142 
mNumAccessUnitsReceived(0), 143 mCheckPending(false), 144 mCheckGeneration(0), 145 mCheckTimeoutGeneration(0), 146 mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)), 147 mTryFakeRTCP(false), 148 mReceivedFirstRTCPPacket(false), 149 mReceivedFirstRTPPacket(false), 150 mSeekable(true), 151 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 152 mKeepAliveGeneration(0), 153 mPausing(false), 154 mPauseGeneration(0), 155 mPlayResponseParsed(false) { 156 mNetLooper->setName("rtsp net"); 157 mNetLooper->start(false /* runOnCallingThread */, 158 false /* canCallJava */, 159 PRIORITY_HIGHEST); 160 161 // Strip any authentication info from the session url, we don't 162 // want to transmit user/pass in cleartext. 163 AString host, path, user, pass; 164 unsigned port; 165 CHECK(ARTSPConnection::ParseURL( 166 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 167 168 if (user.size() > 0) { 169 mSessionURL.clear(); 170 mSessionURL.append("rtsp://"); 171 mSessionURL.append(host); 172 mSessionURL.append(":"); 173 mSessionURL.append(AStringPrintf("%u", port)); 174 mSessionURL.append(path); 175 176 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 177 } 178 179 mSessionHost = host; 180 } 181 182 void connect() { 183 looper()->registerHandler(mConn); 184 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 185 186 sp<AMessage> notify = new AMessage('biny', this); 187 mConn->observeBinaryData(notify); 188 189 sp<AMessage> reply = new AMessage('conn', this); 190 mConn->connect(mOriginalSessionURL.c_str(), reply); 191 } 192 193 void loadSDP(const sp<ASessionDescription>& desc) { 194 looper()->registerHandler(mConn); 195 (1 ? 
mNetLooper : looper())->registerHandler(mRTPConn); 196 197 sp<AMessage> notify = new AMessage('biny', this); 198 mConn->observeBinaryData(notify); 199 200 sp<AMessage> reply = new AMessage('sdpl', this); 201 reply->setObject("description", desc); 202 mConn->connect(mOriginalSessionURL.c_str(), reply); 203 } 204 205 AString getControlURL() { 206 AString sessionLevelControlURL; 207 if (mSessionDesc->findAttribute( 208 0, 209 "a=control", 210 &sessionLevelControlURL)) { 211 if (sessionLevelControlURL.compare("*") == 0) { 212 return mBaseURL; 213 } else { 214 AString controlURL; 215 CHECK(MakeURL( 216 mBaseURL.c_str(), 217 sessionLevelControlURL.c_str(), 218 &controlURL)); 219 return controlURL; 220 } 221 } else { 222 return mSessionURL; 223 } 224 } 225 226 void disconnect() { 227 (new AMessage('abor', this))->post(); 228 } 229 230 void seek(int64_t timeUs) { 231 sp<AMessage> msg = new AMessage('seek', this); 232 msg->setInt64("time", timeUs); 233 mPauseGeneration++; 234 msg->post(); 235 } 236 237 void continueSeekAfterPause(int64_t timeUs) { 238 sp<AMessage> msg = new AMessage('see1', this); 239 msg->setInt64("time", timeUs); 240 msg->post(); 241 } 242 243 bool isSeekable() const { 244 return mSeekable; 245 } 246 247 void pause() { 248 sp<AMessage> msg = new AMessage('paus', this); 249 mPauseGeneration++; 250 msg->setInt32("pausecheck", mPauseGeneration); 251 msg->post(); 252 } 253 254 void resume() { 255 sp<AMessage> msg = new AMessage('resu', this); 256 mPauseGeneration++; 257 msg->post(); 258 } 259 260 static void addRR(const sp<ABuffer> &buf) { 261 uint8_t *ptr = buf->data() + buf->size(); 262 ptr[0] = 0x80 | 0; 263 ptr[1] = 201; // RR 264 ptr[2] = 0; 265 ptr[3] = 1; 266 ptr[4] = 0xde; // SSRC 267 ptr[5] = 0xad; 268 ptr[6] = 0xbe; 269 ptr[7] = 0xef; 270 271 buf->setRange(0, buf->size() + 8); 272 } 273 274 static void addSDES(int s, const sp<ABuffer> &buffer) { 275 struct sockaddr_in addr; 276 socklen_t addrSize = sizeof(addr); 277 if (getsockname(s, (sockaddr 
*)&addr, &addrSize) != 0) { 278 inet_aton("0.0.0.0", &(addr.sin_addr)); 279 } 280 281 uint8_t *data = buffer->data() + buffer->size(); 282 data[0] = 0x80 | 1; 283 data[1] = 202; // SDES 284 data[4] = 0xde; // SSRC 285 data[5] = 0xad; 286 data[6] = 0xbe; 287 data[7] = 0xef; 288 289 size_t offset = 8; 290 291 data[offset++] = 1; // CNAME 292 293 AString cname = "stagefright@"; 294 cname.append(inet_ntoa(addr.sin_addr)); 295 data[offset++] = cname.size(); 296 297 memcpy(&data[offset], cname.c_str(), cname.size()); 298 offset += cname.size(); 299 300 data[offset++] = 6; // TOOL 301 302 AString tool = MakeUserAgent(); 303 304 data[offset++] = tool.size(); 305 306 memcpy(&data[offset], tool.c_str(), tool.size()); 307 offset += tool.size(); 308 309 data[offset++] = 0; 310 311 if ((offset % 4) > 0) { 312 size_t count = 4 - (offset % 4); 313 switch (count) { 314 case 3: 315 data[offset++] = 0; 316 FALLTHROUGH_INTENDED; 317 case 2: 318 data[offset++] = 0; 319 FALLTHROUGH_INTENDED; 320 case 1: 321 data[offset++] = 0; 322 } 323 } 324 325 size_t numWords = (offset / 4) - 1; 326 data[2] = numWords >> 8; 327 data[3] = numWords & 0xff; 328 329 buffer->setRange(buffer->offset(), buffer->size() + offset); 330 } 331 332 // In case we're behind NAT, fire off two UDP packets to the remote 333 // rtp/rtcp ports to poke a hole into the firewall for future incoming 334 // packets. We're going to send an RR/SDES RTCP packet to both of them. 335 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 336 struct sockaddr_in addr; 337 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 338 addr.sin_family = AF_INET; 339 340 AString source; 341 AString server_port; 342 if (!GetAttribute(transport.c_str(), 343 "source", 344 &source)) { 345 ALOGW("Missing 'source' field in Transport response. 
Using " 346 "RTSP endpoint address."); 347 348 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 349 if (ent == NULL) { 350 ALOGE("Failed to look up address of session host"); 351 352 return false; 353 } 354 355 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 356 } else { 357 addr.sin_addr.s_addr = inet_addr(source.c_str()); 358 } 359 360 if (!GetAttribute(transport.c_str(), 361 "server_port", 362 &server_port)) { 363 ALOGI("Missing 'server_port' field in Transport response."); 364 return false; 365 } 366 367 int rtpPort, rtcpPort; 368 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 369 || rtpPort <= 0 || rtpPort > 65535 370 || rtcpPort <=0 || rtcpPort > 65535 371 || rtcpPort != rtpPort + 1) { 372 ALOGE("Server picked invalid RTP/RTCP port pair %s," 373 " RTP port must be even, RTCP port must be one higher.", 374 server_port.c_str()); 375 376 return false; 377 } 378 379 if (rtpPort & 1) { 380 ALOGW("Server picked an odd RTP port, it should've picked an " 381 "even one, we'll let it pass for now, but this may break " 382 "in the future."); 383 } 384 385 if (addr.sin_addr.s_addr == INADDR_NONE) { 386 return true; 387 } 388 389 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 390 // No firewalls to traverse on the loopback interface. 391 return true; 392 } 393 394 // Make up an RR/SDES RTCP packet. 
395 sp<ABuffer> buf = new ABuffer(65536); 396 buf->setRange(0, 0); 397 addRR(buf); 398 addSDES(rtpSocket, buf); 399 400 addr.sin_port = htons(rtpPort); 401 402 ssize_t n = sendto( 403 rtpSocket, buf->data(), buf->size(), 0, 404 (const sockaddr *)&addr, sizeof(addr)); 405 406 if (n < (ssize_t)buf->size()) { 407 ALOGE("failed to poke a hole for RTP packets"); 408 return false; 409 } 410 411 addr.sin_port = htons(rtcpPort); 412 413 n = sendto( 414 rtcpSocket, buf->data(), buf->size(), 0, 415 (const sockaddr *)&addr, sizeof(addr)); 416 417 if (n < (ssize_t)buf->size()) { 418 ALOGE("failed to poke a hole for RTCP packets"); 419 return false; 420 } 421 422 ALOGV("successfully poked holes."); 423 424 return true; 425 } 426 427 static bool isLiveStream(const sp<ASessionDescription> &desc) { 428 AString attrLiveStream; 429 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 430 ssize_t semicolonPos = attrLiveStream.find(";", 2); 431 432 const char* liveStreamValue; 433 if (semicolonPos < 0) { 434 liveStreamValue = attrLiveStream.c_str(); 435 } else { 436 AString valString; 437 valString.setTo(attrLiveStream, 438 semicolonPos + 1, 439 attrLiveStream.size() - semicolonPos - 1); 440 liveStreamValue = valString.c_str(); 441 } 442 443 uint32_t value = strtoul(liveStreamValue, NULL, 10); 444 if (value == 1) { 445 ALOGV("found live stream"); 446 return true; 447 } 448 } else { 449 // It is a live stream if no duration is returned 450 int64_t durationUs; 451 if (!desc->getDurationUs(&durationUs)) { 452 ALOGV("No duration found, assume live stream"); 453 return true; 454 } 455 } 456 457 return false; 458 } 459 460 virtual void onMessageReceived(const sp<AMessage> &msg) { 461 switch (msg->what()) { 462 case 'conn': 463 { 464 int32_t result; 465 CHECK(msg->findInt32("result", &result)); 466 467 ALOGI("connection request completed with result %d (%s)", 468 result, strerror(-result)); 469 470 if (result == OK) { 471 AString request; 472 request = "DESCRIBE "; 473 
request.append(mSessionURL); 474 request.append(" RTSP/1.0\r\n"); 475 request.append("Accept: application/sdp\r\n"); 476 request.append("\r\n"); 477 478 sp<AMessage> reply = new AMessage('desc', this); 479 mConn->sendRequest(request.c_str(), reply); 480 } else { 481 (new AMessage('disc', this))->post(); 482 } 483 break; 484 } 485 486 case 'disc': 487 { 488 ++mKeepAliveGeneration; 489 490 int32_t reconnect; 491 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 492 sp<AMessage> reply = new AMessage('conn', this); 493 mConn->connect(mOriginalSessionURL.c_str(), reply); 494 } else { 495 (new AMessage('quit', this))->post(); 496 } 497 break; 498 } 499 500 case 'desc': 501 { 502 int32_t result; 503 CHECK(msg->findInt32("result", &result)); 504 505 ALOGI("DESCRIBE completed with result %d (%s)", 506 result, strerror(-result)); 507 508 if (result == OK) { 509 sp<RefBase> obj; 510 CHECK(msg->findObject("response", &obj)); 511 sp<ARTSPResponse> response = 512 static_cast<ARTSPResponse *>(obj.get()); 513 514 if (response->mStatusCode == 301 || response->mStatusCode == 302) { 515 ssize_t i = response->mHeaders.indexOfKey("location"); 516 CHECK_GE(i, 0); 517 518 mOriginalSessionURL = response->mHeaders.valueAt(i); 519 mSessionURL = mOriginalSessionURL; 520 521 // Strip any authentication info from the session url, we don't 522 // want to transmit user/pass in cleartext. 
523 AString host, path, user, pass; 524 unsigned port; 525 if (ARTSPConnection::ParseURL( 526 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 527 && user.size() > 0) { 528 mSessionURL.clear(); 529 mSessionURL.append("rtsp://"); 530 mSessionURL.append(host); 531 mSessionURL.append(":"); 532 mSessionURL.append(AStringPrintf("%u", port)); 533 mSessionURL.append(path); 534 535 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 536 } 537 538 sp<AMessage> reply = new AMessage('conn', this); 539 mConn->connect(mOriginalSessionURL.c_str(), reply); 540 break; 541 } 542 543 if (response->mStatusCode != 200) { 544 result = UNKNOWN_ERROR; 545 } else if (response->mContent == NULL) { 546 result = ERROR_MALFORMED; 547 ALOGE("The response has no content."); 548 } else { 549 mSessionDesc = new ASessionDescription; 550 551 mSessionDesc->setTo( 552 response->mContent->data(), 553 response->mContent->size()); 554 555 if (!mSessionDesc->isValid()) { 556 ALOGE("Failed to parse session description."); 557 result = ERROR_MALFORMED; 558 } else { 559 ssize_t i = response->mHeaders.indexOfKey("content-base"); 560 if (i >= 0) { 561 mBaseURL = response->mHeaders.valueAt(i); 562 } else { 563 i = response->mHeaders.indexOfKey("content-location"); 564 if (i >= 0) { 565 mBaseURL = response->mHeaders.valueAt(i); 566 } else { 567 mBaseURL = mSessionURL; 568 } 569 } 570 571 mSeekable = !isLiveStream(mSessionDesc); 572 573 if (!mBaseURL.startsWith("rtsp://")) { 574 // Some misbehaving servers specify a relative 575 // URL in one of the locations above, combine 576 // it with the absolute session URL to get 577 // something usable... 
578 579 ALOGW("Server specified a non-absolute base URL" 580 ", combining it with the session URL to " 581 "get something usable..."); 582 583 AString tmp; 584 CHECK(MakeURL( 585 mSessionURL.c_str(), 586 mBaseURL.c_str(), 587 &tmp)); 588 589 mBaseURL = tmp; 590 } 591 592 mControlURL = getControlURL(); 593 594 if (mSessionDesc->countTracks() < 2) { 595 // There's no actual tracks in this session. 596 // The first "track" is merely session meta 597 // data. 598 599 ALOGW("Session doesn't contain any playable " 600 "tracks. Aborting."); 601 result = ERROR_UNSUPPORTED; 602 } else { 603 setupTrack(1); 604 } 605 } 606 } 607 } 608 609 if (result != OK) { 610 sp<AMessage> reply = new AMessage('disc', this); 611 mConn->disconnect(reply); 612 } 613 break; 614 } 615 616 case 'sdpl': 617 { 618 int32_t result; 619 CHECK(msg->findInt32("result", &result)); 620 621 ALOGI("SDP connection request completed with result %d (%s)", 622 result, strerror(-result)); 623 624 if (result == OK) { 625 sp<RefBase> obj; 626 CHECK(msg->findObject("description", &obj)); 627 mSessionDesc = 628 static_cast<ASessionDescription *>(obj.get()); 629 630 if (!mSessionDesc->isValid()) { 631 ALOGE("Failed to parse session description."); 632 result = ERROR_MALFORMED; 633 } else { 634 mBaseURL = mSessionURL; 635 636 mSeekable = !isLiveStream(mSessionDesc); 637 638 mControlURL = getControlURL(); 639 640 if (mSessionDesc->countTracks() < 2) { 641 // There's no actual tracks in this session. 642 // The first "track" is merely session meta 643 // data. 644 645 ALOGW("Session doesn't contain any playable " 646 "tracks. 
Aborting."); 647 result = ERROR_UNSUPPORTED; 648 } else { 649 setupTrack(1); 650 } 651 } 652 } 653 654 if (result != OK) { 655 sp<AMessage> reply = new AMessage('disc', this); 656 mConn->disconnect(reply); 657 } 658 break; 659 } 660 661 case 'setu': 662 { 663 size_t index; 664 CHECK(msg->findSize("index", &index)); 665 666 TrackInfo *track = NULL; 667 size_t trackIndex; 668 if (msg->findSize("track-index", &trackIndex)) { 669 track = &mTracks.editItemAt(trackIndex); 670 } 671 672 int32_t result; 673 CHECK(msg->findInt32("result", &result)); 674 675 ALOGI("SETUP(%zu) completed with result %d (%s)", 676 index, result, strerror(-result)); 677 678 if (result == OK) { 679 CHECK(track != NULL); 680 681 sp<RefBase> obj; 682 CHECK(msg->findObject("response", &obj)); 683 sp<ARTSPResponse> response = 684 static_cast<ARTSPResponse *>(obj.get()); 685 686 if (response->mStatusCode != 200) { 687 result = UNKNOWN_ERROR; 688 } else { 689 ssize_t i = response->mHeaders.indexOfKey("session"); 690 CHECK_GE(i, 0); 691 692 mSessionID = response->mHeaders.valueAt(i); 693 694 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 695 AString timeoutStr; 696 if (GetAttribute( 697 mSessionID.c_str(), "timeout", &timeoutStr)) { 698 char *end; 699 unsigned long timeoutSecs = 700 strtoul(timeoutStr.c_str(), &end, 10); 701 702 if (end == timeoutStr.c_str() || *end != '\0') { 703 ALOGW("server specified malformed timeout '%s'", 704 timeoutStr.c_str()); 705 706 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 707 } else if (timeoutSecs < 15) { 708 ALOGW("server specified too short a timeout " 709 "(%lu secs), using default.", 710 timeoutSecs); 711 712 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 713 } else { 714 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 715 716 ALOGI("server specified timeout of %lu secs.", 717 timeoutSecs); 718 } 719 } 720 721 i = mSessionID.find(";"); 722 if (i >= 0) { 723 // Remove options, i.e. 
";timeout=90" 724 mSessionID.erase(i, mSessionID.size() - i); 725 } 726 727 sp<AMessage> notify = new AMessage('accu', this); 728 notify->setSize("track-index", trackIndex); 729 730 i = response->mHeaders.indexOfKey("transport"); 731 CHECK_GE(i, 0); 732 733 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 734 if (!track->mUsingInterleavedTCP) { 735 AString transport = response->mHeaders.valueAt(i); 736 737 // We are going to continue even if we were 738 // unable to poke a hole into the firewall... 739 pokeAHole( 740 track->mRTPSocket, 741 track->mRTCPSocket, 742 transport); 743 } 744 745 mRTPConn->addStream( 746 track->mRTPSocket, track->mRTCPSocket, 747 mSessionDesc, index, 748 notify, track->mUsingInterleavedTCP); 749 750 mSetupTracksSuccessful = true; 751 } else { 752 result = BAD_VALUE; 753 } 754 } 755 } 756 757 if (result != OK) { 758 if (track) { 759 if (!track->mUsingInterleavedTCP) { 760 // Clear the tag 761 if (mUIDValid) { 762 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket); 763 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket); 764 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket); 765 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket); 766 } 767 768 close(track->mRTPSocket); 769 close(track->mRTCPSocket); 770 } 771 772 mTracks.removeItemsAt(trackIndex); 773 } 774 } 775 776 ++index; 777 if (result == OK && index < mSessionDesc->countTracks()) { 778 setupTrack(index); 779 } else if (mSetupTracksSuccessful) { 780 ++mKeepAliveGeneration; 781 postKeepAlive(); 782 783 AString request = "PLAY "; 784 request.append(mControlURL); 785 request.append(" RTSP/1.0\r\n"); 786 787 request.append("Session: "); 788 request.append(mSessionID); 789 request.append("\r\n"); 790 791 request.append("\r\n"); 792 793 sp<AMessage> reply = new AMessage('play', this); 794 mConn->sendRequest(request.c_str(), reply); 795 } else { 796 sp<AMessage> reply = new AMessage('disc', this); 797 mConn->disconnect(reply); 798 } 799 break; 800 } 801 
802 case 'play': 803 { 804 int32_t result; 805 CHECK(msg->findInt32("result", &result)); 806 807 ALOGI("PLAY completed with result %d (%s)", 808 result, strerror(-result)); 809 810 if (result == OK) { 811 sp<RefBase> obj; 812 CHECK(msg->findObject("response", &obj)); 813 sp<ARTSPResponse> response = 814 static_cast<ARTSPResponse *>(obj.get()); 815 816 if (response->mStatusCode != 200) { 817 result = UNKNOWN_ERROR; 818 } else { 819 parsePlayResponse(response); 820 postTimeout(); 821 } 822 } 823 824 if (result != OK) { 825 sp<AMessage> reply = new AMessage('disc', this); 826 mConn->disconnect(reply); 827 } 828 829 break; 830 } 831 832 case 'aliv': 833 { 834 int32_t generation; 835 CHECK(msg->findInt32("generation", &generation)); 836 837 if (generation != mKeepAliveGeneration) { 838 // obsolete event. 839 break; 840 } 841 842 AString request; 843 request.append("OPTIONS "); 844 request.append(mSessionURL); 845 request.append(" RTSP/1.0\r\n"); 846 request.append("Session: "); 847 request.append(mSessionID); 848 request.append("\r\n"); 849 request.append("\r\n"); 850 851 sp<AMessage> reply = new AMessage('opts', this); 852 reply->setInt32("generation", mKeepAliveGeneration); 853 mConn->sendRequest(request.c_str(), reply); 854 break; 855 } 856 857 case 'opts': 858 { 859 int32_t result; 860 CHECK(msg->findInt32("result", &result)); 861 862 ALOGI("OPTIONS completed with result %d (%s)", 863 result, strerror(-result)); 864 865 int32_t generation; 866 CHECK(msg->findInt32("generation", &generation)); 867 868 if (generation != mKeepAliveGeneration) { 869 // obsolete event. 
870 break; 871 } 872 873 postKeepAlive(); 874 break; 875 } 876 877 case 'abor': 878 { 879 for (size_t i = 0; i < mTracks.size(); ++i) { 880 TrackInfo *info = &mTracks.editItemAt(i); 881 882 if (!mFirstAccessUnit) { 883 postQueueEOS(i, ERROR_END_OF_STREAM); 884 } 885 886 if (!info->mUsingInterleavedTCP) { 887 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 888 889 // Clear the tag 890 if (mUIDValid) { 891 NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket); 892 NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket); 893 NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket); 894 NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket); 895 } 896 897 close(info->mRTPSocket); 898 close(info->mRTCPSocket); 899 } 900 } 901 mTracks.clear(); 902 mSetupTracksSuccessful = false; 903 mSeekPending = false; 904 mFirstAccessUnit = true; 905 mAllTracksHaveTime = false; 906 mNTPAnchorUs = -1; 907 mMediaAnchorUs = -1; 908 mNumAccessUnitsReceived = 0; 909 mReceivedFirstRTCPPacket = false; 910 mReceivedFirstRTPPacket = false; 911 mPausing = false; 912 mSeekable = true; 913 914 sp<AMessage> reply = new AMessage('tear', this); 915 916 int32_t reconnect; 917 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 918 reply->setInt32("reconnect", true); 919 } 920 921 AString request; 922 request = "TEARDOWN "; 923 924 // XXX should use aggregate url from SDP here... 925 request.append(mSessionURL); 926 request.append(" RTSP/1.0\r\n"); 927 928 request.append("Session: "); 929 request.append(mSessionID); 930 request.append("\r\n"); 931 932 request.append("\r\n"); 933 934 mConn->sendRequest(request.c_str(), reply); 935 936 // If the response of teardown hasn't been received in 3 seconds, 937 // post 'tear' message to avoid ANR. 
938 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) { 939 sp<AMessage> teardown = reply->dup(); 940 teardown->setInt32("result", -ECONNABORTED); 941 teardown->post(kTearDownTimeoutUs); 942 } 943 break; 944 } 945 946 case 'tear': 947 { 948 int32_t result; 949 CHECK(msg->findInt32("result", &result)); 950 951 ALOGI("TEARDOWN completed with result %d (%s)", 952 result, strerror(-result)); 953 954 sp<AMessage> reply = new AMessage('disc', this); 955 956 int32_t reconnect; 957 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 958 reply->setInt32("reconnect", true); 959 } 960 961 mConn->disconnect(reply); 962 break; 963 } 964 965 case 'quit': 966 { 967 sp<AMessage> msg = mNotify->dup(); 968 msg->setInt32("what", kWhatDisconnected); 969 msg->setInt32("result", UNKNOWN_ERROR); 970 msg->post(); 971 break; 972 } 973 974 case 'chek': 975 { 976 int32_t generation; 977 CHECK(msg->findInt32("generation", &generation)); 978 if (generation != mCheckGeneration) { 979 // This is an outdated message. Ignore. 980 break; 981 } 982 983 if (mNumAccessUnitsReceived == 0) { 984 #if 1 985 ALOGI("stream ended? 
aborting."); 986 (new AMessage('abor', this))->post(); 987 break; 988 #else 989 ALOGI("haven't seen an AU in a looong time."); 990 #endif 991 } 992 993 mNumAccessUnitsReceived = 0; 994 msg->post(kAccessUnitTimeoutUs); 995 break; 996 } 997 998 case 'accu': 999 { 1000 if (mSeekPending) { 1001 ALOGV("Stale access unit."); 1002 break; 1003 } 1004 1005 int32_t timeUpdate; 1006 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 1007 size_t trackIndex; 1008 CHECK(msg->findSize("track-index", &trackIndex)); 1009 1010 uint32_t rtpTime; 1011 uint64_t ntpTime; 1012 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 1013 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 1014 1015 onTimeUpdate(trackIndex, rtpTime, ntpTime); 1016 break; 1017 } 1018 1019 int32_t first; 1020 if (msg->findInt32("first-rtcp", &first)) { 1021 mReceivedFirstRTCPPacket = true; 1022 break; 1023 } 1024 1025 if (msg->findInt32("first-rtp", &first)) { 1026 mReceivedFirstRTPPacket = true; 1027 break; 1028 } 1029 1030 ++mNumAccessUnitsReceived; 1031 postAccessUnitTimeoutCheck(); 1032 1033 size_t trackIndex; 1034 CHECK(msg->findSize("track-index", &trackIndex)); 1035 1036 if (trackIndex >= mTracks.size()) { 1037 ALOGV("late packets ignored."); 1038 break; 1039 } 1040 1041 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1042 1043 int32_t eos; 1044 if (msg->findInt32("eos", &eos)) { 1045 ALOGI("received BYE on track index %zu", trackIndex); 1046 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1047 ALOGI("No time established => fake existing data"); 1048 1049 track->mEOSReceived = true; 1050 mTryFakeRTCP = true; 1051 mReceivedFirstRTCPPacket = true; 1052 fakeTimestamps(); 1053 } else { 1054 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1055 } 1056 return; 1057 } 1058 1059 if (mSeekPending) { 1060 ALOGV("we're seeking, dropping stale packet."); 1061 break; 1062 } 1063 1064 sp<ABuffer> accessUnit; 1065 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1066 
onAccessUnitComplete(trackIndex, accessUnit); 1067 break; 1068 } 1069 1070 case 'paus': 1071 { 1072 int32_t generation; 1073 CHECK(msg->findInt32("pausecheck", &generation)); 1074 if (generation != mPauseGeneration) { 1075 ALOGV("Ignoring outdated pause message."); 1076 break; 1077 } 1078 1079 if (!mSeekable) { 1080 ALOGW("This is a live stream, ignoring pause request."); 1081 break; 1082 } 1083 1084 if (mPausing) { 1085 ALOGV("This stream is already paused."); 1086 break; 1087 } 1088 1089 mCheckPending = true; 1090 ++mCheckGeneration; 1091 mPausing = true; 1092 1093 AString request = "PAUSE "; 1094 request.append(mControlURL); 1095 request.append(" RTSP/1.0\r\n"); 1096 1097 request.append("Session: "); 1098 request.append(mSessionID); 1099 request.append("\r\n"); 1100 1101 request.append("\r\n"); 1102 1103 sp<AMessage> reply = new AMessage('pau2', this); 1104 mConn->sendRequest(request.c_str(), reply); 1105 break; 1106 } 1107 1108 case 'pau2': 1109 { 1110 int32_t result; 1111 CHECK(msg->findInt32("result", &result)); 1112 mCheckTimeoutGeneration++; 1113 1114 ALOGI("PAUSE completed with result %d (%s)", 1115 result, strerror(-result)); 1116 break; 1117 } 1118 1119 case 'resu': 1120 { 1121 if (mPausing && mSeekPending) { 1122 // If seeking, Play will be sent from see1 instead 1123 break; 1124 } 1125 1126 if (!mPausing) { 1127 // Dont send PLAY if we have not paused 1128 break; 1129 } 1130 AString request = "PLAY "; 1131 request.append(mControlURL); 1132 request.append(" RTSP/1.0\r\n"); 1133 1134 request.append("Session: "); 1135 request.append(mSessionID); 1136 request.append("\r\n"); 1137 1138 request.append("\r\n"); 1139 1140 sp<AMessage> reply = new AMessage('res2', this); 1141 mConn->sendRequest(request.c_str(), reply); 1142 break; 1143 } 1144 1145 case 'res2': 1146 { 1147 int32_t result; 1148 CHECK(msg->findInt32("result", &result)); 1149 1150 ALOGI("PLAY (for resume) completed with result %d (%s)", 1151 result, strerror(-result)); 1152 1153 mCheckPending = 
false; 1154 ++mCheckGeneration; 1155 postAccessUnitTimeoutCheck(); 1156 1157 if (result == OK) { 1158 sp<RefBase> obj; 1159 CHECK(msg->findObject("response", &obj)); 1160 sp<ARTSPResponse> response = 1161 static_cast<ARTSPResponse *>(obj.get()); 1162 1163 if (response->mStatusCode != 200) { 1164 result = UNKNOWN_ERROR; 1165 } else { 1166 parsePlayResponse(response); 1167 1168 // Post new timeout in order to make sure to use 1169 // fake timestamps if no new Sender Reports arrive 1170 postTimeout(); 1171 } 1172 } 1173 1174 if (result != OK) { 1175 ALOGE("resume failed, aborting."); 1176 (new AMessage('abor', this))->post(); 1177 } 1178 1179 mPausing = false; 1180 break; 1181 } 1182 1183 case 'seek': 1184 { 1185 if (!mSeekable) { 1186 ALOGW("This is a live stream, ignoring seek request."); 1187 1188 sp<AMessage> msg = mNotify->dup(); 1189 msg->setInt32("what", kWhatSeekDone); 1190 msg->post(); 1191 break; 1192 } 1193 1194 int64_t timeUs; 1195 CHECK(msg->findInt64("time", &timeUs)); 1196 1197 mSeekPending = true; 1198 1199 // Disable the access unit timeout until we resumed 1200 // playback again. 1201 mCheckPending = true; 1202 ++mCheckGeneration; 1203 1204 sp<AMessage> reply = new AMessage('see0', this); 1205 reply->setInt64("time", timeUs); 1206 1207 if (mPausing) { 1208 // PAUSE already sent 1209 ALOGI("Pause already sent"); 1210 reply->post(); 1211 break; 1212 } 1213 AString request = "PAUSE "; 1214 request.append(mControlURL); 1215 request.append(" RTSP/1.0\r\n"); 1216 1217 request.append("Session: "); 1218 request.append(mSessionID); 1219 request.append("\r\n"); 1220 1221 request.append("\r\n"); 1222 1223 mConn->sendRequest(request.c_str(), reply); 1224 break; 1225 } 1226 1227 case 'see0': 1228 { 1229 // Session is paused now. 
1230 status_t err = OK; 1231 msg->findInt32("result", &err); 1232 1233 int64_t timeUs; 1234 CHECK(msg->findInt64("time", &timeUs)); 1235 1236 sp<AMessage> notify = mNotify->dup(); 1237 notify->setInt32("what", kWhatSeekPaused); 1238 notify->setInt32("err", err); 1239 notify->setInt64("time", timeUs); 1240 notify->post(); 1241 break; 1242 1243 } 1244 1245 case 'see1': 1246 { 1247 for (size_t i = 0; i < mTracks.size(); ++i) { 1248 TrackInfo *info = &mTracks.editItemAt(i); 1249 1250 postQueueSeekDiscontinuity(i); 1251 info->mEOSReceived = false; 1252 1253 info->mRTPAnchor = 0; 1254 info->mNTPAnchorUs = -1; 1255 } 1256 1257 mAllTracksHaveTime = false; 1258 mNTPAnchorUs = -1; 1259 1260 // Start new timeoutgeneration to avoid getting timeout 1261 // before PLAY response arrive 1262 postTimeout(); 1263 1264 int64_t timeUs; 1265 CHECK(msg->findInt64("time", &timeUs)); 1266 1267 AString request = "PLAY "; 1268 request.append(mControlURL); 1269 request.append(" RTSP/1.0\r\n"); 1270 1271 request.append("Session: "); 1272 request.append(mSessionID); 1273 request.append("\r\n"); 1274 1275 request.append( 1276 AStringPrintf( 1277 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 1278 1279 request.append("\r\n"); 1280 1281 sp<AMessage> reply = new AMessage('see2', this); 1282 mConn->sendRequest(request.c_str(), reply); 1283 break; 1284 } 1285 1286 case 'see2': 1287 { 1288 if (mTracks.size() == 0) { 1289 // We have already hit abor, break 1290 break; 1291 } 1292 1293 int32_t result; 1294 CHECK(msg->findInt32("result", &result)); 1295 1296 ALOGI("PLAY (for seek) completed with result %d (%s)", 1297 result, strerror(-result)); 1298 1299 mCheckPending = false; 1300 ++mCheckGeneration; 1301 postAccessUnitTimeoutCheck(); 1302 1303 if (result == OK) { 1304 sp<RefBase> obj; 1305 CHECK(msg->findObject("response", &obj)); 1306 sp<ARTSPResponse> response = 1307 static_cast<ARTSPResponse *>(obj.get()); 1308 1309 if (response->mStatusCode != 200) { 1310 result = UNKNOWN_ERROR; 1311 } else { 1312 
parsePlayResponse(response); 1313 1314 // Post new timeout in order to make sure to use 1315 // fake timestamps if no new Sender Reports arrive 1316 postTimeout(); 1317 1318 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1319 CHECK_GE(i, 0); 1320 1321 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1322 1323 ALOGI("seek completed."); 1324 } 1325 } 1326 1327 if (result != OK) { 1328 ALOGE("seek failed, aborting."); 1329 (new AMessage('abor', this))->post(); 1330 } 1331 1332 mPausing = false; 1333 mSeekPending = false; 1334 1335 // Discard all stale access units. 1336 for (size_t i = 0; i < mTracks.size(); ++i) { 1337 TrackInfo *track = &mTracks.editItemAt(i); 1338 track->mPackets.clear(); 1339 } 1340 1341 sp<AMessage> msg = mNotify->dup(); 1342 msg->setInt32("what", kWhatSeekDone); 1343 msg->post(); 1344 break; 1345 } 1346 1347 case 'biny': 1348 { 1349 sp<ABuffer> buffer; 1350 CHECK(msg->findBuffer("buffer", &buffer)); 1351 1352 int32_t index; 1353 CHECK(buffer->meta()->findInt32("index", &index)); 1354 1355 mRTPConn->injectPacket(index, buffer); 1356 break; 1357 } 1358 1359 case 'tiou': 1360 { 1361 int32_t timeoutGenerationCheck; 1362 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1363 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1364 // This is an outdated message. Ignore. 1365 // This typically happens if a lot of seeks are 1366 // performed, since new timeout messages now are 1367 // posted at seek as well. 
1368 break; 1369 } 1370 if (!mReceivedFirstRTCPPacket) { 1371 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1372 ALOGW("We received RTP packets but no RTCP packets, " 1373 "using fake timestamps."); 1374 1375 mTryFakeRTCP = true; 1376 1377 mReceivedFirstRTCPPacket = true; 1378 1379 fakeTimestamps(); 1380 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1381 ALOGW("Never received any data, switching transports."); 1382 1383 mTryTCPInterleaving = true; 1384 1385 sp<AMessage> msg = new AMessage('abor', this); 1386 msg->setInt32("reconnect", true); 1387 msg->post(); 1388 } else { 1389 ALOGW("Never received any data, disconnecting."); 1390 (new AMessage('abor', this))->post(); 1391 } 1392 } else { 1393 if (!mAllTracksHaveTime) { 1394 ALOGW("We received some RTCP packets, but time " 1395 "could not be established on all tracks, now " 1396 "using fake timestamps"); 1397 1398 fakeTimestamps(); 1399 } 1400 } 1401 break; 1402 } 1403 1404 default: 1405 TRESPASS(); 1406 break; 1407 } 1408 } 1409 1410 void postKeepAlive() { 1411 sp<AMessage> msg = new AMessage('aliv', this); 1412 msg->setInt32("generation", mKeepAliveGeneration); 1413 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1414 } 1415 1416 void cancelAccessUnitTimeoutCheck() { 1417 ALOGV("cancelAccessUnitTimeoutCheck"); 1418 ++mCheckGeneration; 1419 } 1420 1421 void postAccessUnitTimeoutCheck() { 1422 if (mCheckPending) { 1423 return; 1424 } 1425 1426 mCheckPending = true; 1427 sp<AMessage> check = new AMessage('chek', this); 1428 check->setInt32("generation", mCheckGeneration); 1429 check->post(kAccessUnitTimeoutUs); 1430 } 1431 1432 static void SplitString( 1433 const AString &s, const char *separator, List<AString> *items) { 1434 items->clear(); 1435 size_t start = 0; 1436 while (start < s.size()) { 1437 ssize_t offset = s.find(separator, start); 1438 1439 if (offset < 0) { 1440 items->push_back(AString(s, start, s.size() - start)); 1441 break; 1442 } 1443 1444 items->push_back(AString(s, start, 
offset - start)); 1445 start = offset + strlen(separator); 1446 } 1447 } 1448 1449 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1450 mPlayResponseParsed = true; 1451 if (mTracks.size() == 0) { 1452 ALOGV("parsePlayResponse: late packets ignored."); 1453 return; 1454 } 1455 1456 ssize_t i = response->mHeaders.indexOfKey("range"); 1457 if (i < 0) { 1458 // Server doesn't even tell use what range it is going to 1459 // play, therefore we won't support seeking. 1460 return; 1461 } 1462 1463 AString range = response->mHeaders.valueAt(i); 1464 ALOGV("Range: %s", range.c_str()); 1465 1466 AString val; 1467 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1468 1469 float npt1, npt2; 1470 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1471 // This is a live stream and therefore not seekable. 1472 1473 ALOGI("This is a live stream"); 1474 return; 1475 } 1476 1477 i = response->mHeaders.indexOfKey("rtp-info"); 1478 CHECK_GE(i, 0); 1479 1480 AString rtpInfo = response->mHeaders.valueAt(i); 1481 List<AString> streamInfos; 1482 SplitString(rtpInfo, ",", &streamInfos); 1483 1484 int n = 1; 1485 for (List<AString>::iterator it = streamInfos.begin(); 1486 it != streamInfos.end(); ++it) { 1487 (*it).trim(); 1488 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1489 1490 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1491 1492 size_t trackIndex = 0; 1493 while (trackIndex < mTracks.size() 1494 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1495 ++trackIndex; 1496 } 1497 CHECK_LT(trackIndex, mTracks.size()); 1498 1499 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1500 1501 char *end; 1502 unsigned long seq = strtoul(val.c_str(), &end, 10); 1503 1504 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1505 info->mFirstSeqNumInSegment = seq; 1506 info->mNewSegment = true; 1507 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1508 1509 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1510 1511 uint32_t rtpTime = 
strtoul(val.c_str(), &end, 10); 1512 1513 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1514 1515 info->mNormalPlayTimeRTP = rtpTime; 1516 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1517 1518 if (!mFirstAccessUnit) { 1519 postNormalPlayTimeMapping( 1520 trackIndex, 1521 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1522 } 1523 1524 ++n; 1525 } 1526 } 1527 1528 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1529 CHECK_GE(index, 0u); 1530 CHECK_LT(index, mTracks.size()); 1531 1532 const TrackInfo &info = mTracks.itemAt(index); 1533 1534 *timeScale = info.mTimeScale; 1535 1536 return info.mPacketSource->getFormat(); 1537 } 1538 1539 size_t countTracks() const { 1540 return mTracks.size(); 1541 } 1542 1543 private: 1544 struct TrackInfo { 1545 AString mURL; 1546 int mRTPSocket; 1547 int mRTCPSocket; 1548 bool mUsingInterleavedTCP; 1549 uint32_t mFirstSeqNumInSegment; 1550 bool mNewSegment; 1551 int32_t mAllowedStaleAccessUnits; 1552 1553 uint32_t mRTPAnchor; 1554 int64_t mNTPAnchorUs; 1555 int32_t mTimeScale; 1556 bool mEOSReceived; 1557 1558 uint32_t mNormalPlayTimeRTP; 1559 int64_t mNormalPlayTimeUs; 1560 1561 sp<APacketSource> mPacketSource; 1562 1563 // Stores packets temporarily while no notion of time 1564 // has been established yet. 
1565 List<sp<ABuffer> > mPackets; 1566 }; 1567 1568 sp<AMessage> mNotify; 1569 bool mUIDValid; 1570 uid_t mUID; 1571 sp<ALooper> mNetLooper; 1572 sp<ARTSPConnection> mConn; 1573 sp<ARTPConnection> mRTPConn; 1574 sp<ASessionDescription> mSessionDesc; 1575 AString mOriginalSessionURL; // This one still has user:pass@ 1576 AString mSessionURL; 1577 AString mSessionHost; 1578 AString mBaseURL; 1579 AString mControlURL; 1580 AString mSessionID; 1581 bool mSetupTracksSuccessful; 1582 bool mSeekPending; 1583 bool mFirstAccessUnit; 1584 1585 bool mAllTracksHaveTime; 1586 int64_t mNTPAnchorUs; 1587 int64_t mMediaAnchorUs; 1588 int64_t mLastMediaTimeUs; 1589 1590 int64_t mNumAccessUnitsReceived; 1591 bool mCheckPending; 1592 int32_t mCheckGeneration; 1593 int32_t mCheckTimeoutGeneration; 1594 bool mTryTCPInterleaving; 1595 bool mTryFakeRTCP; 1596 bool mReceivedFirstRTCPPacket; 1597 bool mReceivedFirstRTPPacket; 1598 bool mSeekable; 1599 int64_t mKeepAliveTimeoutUs; 1600 int32_t mKeepAliveGeneration; 1601 bool mPausing; 1602 int32_t mPauseGeneration; 1603 1604 Vector<TrackInfo> mTracks; 1605 1606 bool mPlayResponseParsed; 1607 1608 void setupTrack(size_t index) { 1609 sp<APacketSource> source = 1610 new APacketSource(mSessionDesc, index); 1611 1612 if (source->initCheck() != OK) { 1613 ALOGW("Unsupported format. 
Ignoring track #%zu.", index); 1614 1615 sp<AMessage> reply = new AMessage('setu', this); 1616 reply->setSize("index", index); 1617 reply->setInt32("result", ERROR_UNSUPPORTED); 1618 reply->post(); 1619 return; 1620 } 1621 1622 AString url; 1623 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1624 1625 AString trackURL; 1626 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1627 1628 mTracks.push(TrackInfo()); 1629 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1630 info->mURL = trackURL; 1631 info->mPacketSource = source; 1632 info->mUsingInterleavedTCP = false; 1633 info->mFirstSeqNumInSegment = 0; 1634 info->mNewSegment = true; 1635 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1636 info->mRTPSocket = -1; 1637 info->mRTCPSocket = -1; 1638 info->mRTPAnchor = 0; 1639 info->mNTPAnchorUs = -1; 1640 info->mNormalPlayTimeRTP = 0; 1641 info->mNormalPlayTimeUs = 0ll; 1642 1643 unsigned long PT; 1644 AString formatDesc; 1645 AString formatParams; 1646 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1647 1648 int32_t timescale; 1649 int32_t numChannels; 1650 ASessionDescription::ParseFormatDesc( 1651 formatDesc.c_str(), ×cale, &numChannels); 1652 1653 info->mTimeScale = timescale; 1654 info->mEOSReceived = false; 1655 1656 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str()); 1657 1658 AString request = "SETUP "; 1659 request.append(trackURL); 1660 request.append(" RTSP/1.0\r\n"); 1661 1662 if (mTryTCPInterleaving) { 1663 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1664 info->mUsingInterleavedTCP = true; 1665 info->mRTPSocket = interleaveIndex; 1666 info->mRTCPSocket = interleaveIndex + 1; 1667 1668 request.append("Transport: RTP/AVP/TCP;interleaved="); 1669 request.append(interleaveIndex); 1670 request.append("-"); 1671 request.append(interleaveIndex + 1); 1672 } else { 1673 unsigned rtpPort; 1674 ARTPConnection::MakePortPair( 1675 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1676 
1677 if (mUIDValid) { 1678 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID, 1679 (uint32_t)*(uint32_t*) "RTP_"); 1680 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1681 (uint32_t)*(uint32_t*) "RTP_"); 1682 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID); 1683 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1684 } 1685 1686 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1687 request.append(rtpPort); 1688 request.append("-"); 1689 request.append(rtpPort + 1); 1690 } 1691 1692 request.append("\r\n"); 1693 1694 if (index > 1) { 1695 request.append("Session: "); 1696 request.append(mSessionID); 1697 request.append("\r\n"); 1698 } 1699 1700 request.append("\r\n"); 1701 1702 sp<AMessage> reply = new AMessage('setu', this); 1703 reply->setSize("index", index); 1704 reply->setSize("track-index", mTracks.size() - 1); 1705 mConn->sendRequest(request.c_str(), reply); 1706 } 1707 1708 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1709 out->clear(); 1710 1711 if (strncasecmp("rtsp://", baseURL, 7)) { 1712 // Base URL must be absolute 1713 return false; 1714 } 1715 1716 if (!strncasecmp("rtsp://", url, 7)) { 1717 // "url" is already an absolute URL, ignore base URL. 
1718 out->setTo(url); 1719 return true; 1720 } 1721 1722 size_t n = strlen(baseURL); 1723 out->setTo(baseURL); 1724 if (baseURL[n - 1] != '/') { 1725 out->append("/"); 1726 } 1727 out->append(url); 1728 1729 return true; 1730 } 1731 1732 void fakeTimestamps() { 1733 mNTPAnchorUs = -1ll; 1734 for (size_t i = 0; i < mTracks.size(); ++i) { 1735 onTimeUpdate(i, 0, 0ll); 1736 } 1737 } 1738 1739 bool dataReceivedOnAllChannels() { 1740 TrackInfo *track; 1741 for (size_t i = 0; i < mTracks.size(); ++i) { 1742 track = &mTracks.editItemAt(i); 1743 if (track->mPackets.empty()) { 1744 return false; 1745 } 1746 } 1747 return true; 1748 } 1749 1750 void handleFirstAccessUnit() { 1751 if (mFirstAccessUnit) { 1752 sp<AMessage> msg = mNotify->dup(); 1753 msg->setInt32("what", kWhatConnected); 1754 msg->post(); 1755 1756 if (mSeekable) { 1757 for (size_t i = 0; i < mTracks.size(); ++i) { 1758 TrackInfo *info = &mTracks.editItemAt(i); 1759 1760 postNormalPlayTimeMapping( 1761 i, 1762 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1763 } 1764 } 1765 1766 mFirstAccessUnit = false; 1767 } 1768 } 1769 1770 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1771 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx", 1772 trackIndex, rtpTime, (long long)ntpTime); 1773 1774 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1775 1776 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1777 1778 track->mRTPAnchor = rtpTime; 1779 track->mNTPAnchorUs = ntpTimeUs; 1780 1781 if (mNTPAnchorUs < 0) { 1782 mNTPAnchorUs = ntpTimeUs; 1783 mMediaAnchorUs = mLastMediaTimeUs; 1784 } 1785 1786 if (!mAllTracksHaveTime) { 1787 bool allTracksHaveTime = (mTracks.size() > 0); 1788 for (size_t i = 0; i < mTracks.size(); ++i) { 1789 TrackInfo *track = &mTracks.editItemAt(i); 1790 if (track->mNTPAnchorUs < 0) { 1791 allTracksHaveTime = false; 1792 break; 1793 } 1794 } 1795 if (allTracksHaveTime) { 1796 mAllTracksHaveTime = true; 1797 ALOGI("Time now 
established for all tracks."); 1798 } 1799 } 1800 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1801 handleFirstAccessUnit(); 1802 1803 // Time is now established, lets start timestamping immediately 1804 for (size_t i = 0; i < mTracks.size(); ++i) { 1805 if (OK != processAccessUnitQueue(i)) { 1806 return; 1807 } 1808 } 1809 for (size_t i = 0; i < mTracks.size(); ++i) { 1810 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1811 if (trackInfo->mEOSReceived) { 1812 postQueueEOS(i, ERROR_END_OF_STREAM); 1813 trackInfo->mEOSReceived = false; 1814 } 1815 } 1816 } 1817 } 1818 1819 status_t processAccessUnitQueue(int32_t trackIndex) { 1820 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1821 while (!track->mPackets.empty()) { 1822 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1823 track->mPackets.erase(track->mPackets.begin()); 1824 1825 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1826 if (track->mNewSegment) { 1827 // The sequence number from RTP packet has only 16 bits and is extended 1828 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of 1829 // RTSP "PLAY" command should be used to detect the first RTP packet 1830 // after seeking. 1831 if (mSeekable) { 1832 if (track->mAllowedStaleAccessUnits > 0) { 1833 uint32_t seqNum16 = seqNum & 0xffff; 1834 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff; 1835 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits 1836 || seqNum16 < firstSeqNumInSegment16) { 1837 // Not the first rtp packet of the stream after seeking, discarding. 
1838 track->mAllowedStaleAccessUnits--; 1839 ALOGV("discarding stale access unit (0x%x : 0x%x)", 1840 seqNum, track->mFirstSeqNumInSegment); 1841 continue; 1842 } 1843 ALOGW_IF(seqNum16 != firstSeqNumInSegment16, 1844 "Missing the first packet(%u), now take packet(%u) as first one", 1845 track->mFirstSeqNumInSegment, seqNum); 1846 } else { // track->mAllowedStaleAccessUnits <= 0 1847 mNumAccessUnitsReceived = 0; 1848 ALOGW_IF(track->mAllowedStaleAccessUnits == 0, 1849 "Still no first rtp packet after %d stale ones", 1850 kMaxAllowedStaleAccessUnits); 1851 track->mAllowedStaleAccessUnits = -1; 1852 return UNKNOWN_ERROR; 1853 } 1854 } 1855 1856 // Now found the first rtp packet of the stream after seeking. 1857 track->mFirstSeqNumInSegment = seqNum; 1858 track->mNewSegment = false; 1859 } 1860 1861 if (seqNum < track->mFirstSeqNumInSegment) { 1862 ALOGV("dropping stale access-unit (%d < %d)", 1863 seqNum, track->mFirstSeqNumInSegment); 1864 continue; 1865 } 1866 1867 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1868 postQueueAccessUnit(trackIndex, accessUnit); 1869 } 1870 } 1871 return OK; 1872 } 1873 1874 void onAccessUnitComplete( 1875 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1876 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1877 track->mPackets.push_back(accessUnit); 1878 1879 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1880 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum); 1881 1882 if(!mPlayResponseParsed){ 1883 ALOGV("play response is not parsed"); 1884 return; 1885 } 1886 1887 handleFirstAccessUnit(); 1888 1889 if (!mAllTracksHaveTime) { 1890 ALOGV("storing accessUnit, no time established yet"); 1891 return; 1892 } 1893 1894 if (OK != processAccessUnitQueue(trackIndex)) { 1895 return; 1896 } 1897 1898 if (track->mEOSReceived) { 1899 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1900 track->mEOSReceived = false; 1901 } 1902 } 1903 1904 bool addMediaTimestamp( 1905 int32_t trackIndex, const 
TrackInfo *track, 1906 const sp<ABuffer> &accessUnit) { 1907 UNUSED_UNLESS_VERBOSE(trackIndex); 1908 1909 uint32_t rtpTime; 1910 CHECK(accessUnit->meta()->findInt32( 1911 "rtp-time", (int32_t *)&rtpTime)); 1912 1913 int64_t relRtpTimeUs = 1914 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1915 / track->mTimeScale; 1916 1917 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1918 1919 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1920 1921 if (mediaTimeUs > mLastMediaTimeUs) { 1922 mLastMediaTimeUs = mediaTimeUs; 1923 } 1924 1925 if (mediaTimeUs < 0 && !mSeekable) { 1926 ALOGV("dropping early accessUnit."); 1927 return false; 1928 } 1929 1930 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)", 1931 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6); 1932 1933 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1934 1935 return true; 1936 } 1937 1938 void postQueueAccessUnit( 1939 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1940 sp<AMessage> msg = mNotify->dup(); 1941 msg->setInt32("what", kWhatAccessUnit); 1942 msg->setSize("trackIndex", trackIndex); 1943 msg->setBuffer("accessUnit", accessUnit); 1944 msg->post(); 1945 } 1946 1947 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1948 sp<AMessage> msg = mNotify->dup(); 1949 msg->setInt32("what", kWhatEOS); 1950 msg->setSize("trackIndex", trackIndex); 1951 msg->setInt32("finalResult", finalResult); 1952 msg->post(); 1953 } 1954 1955 void postQueueSeekDiscontinuity(size_t trackIndex) { 1956 sp<AMessage> msg = mNotify->dup(); 1957 msg->setInt32("what", kWhatSeekDiscontinuity); 1958 msg->setSize("trackIndex", trackIndex); 1959 msg->post(); 1960 } 1961 1962 void postNormalPlayTimeMapping( 1963 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1964 sp<AMessage> msg = mNotify->dup(); 1965 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1966 msg->setSize("trackIndex", trackIndex); 1967 msg->setInt32("rtpTime", rtpTime); 1968 
msg->setInt64("nptUs", nptUs); 1969 msg->post(); 1970 } 1971 1972 void postTimeout() { 1973 sp<AMessage> timeout = new AMessage('tiou', this); 1974 mCheckTimeoutGeneration++; 1975 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1976 1977 int64_t startupTimeoutUs; 1978 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs); 1979 timeout->post(startupTimeoutUs); 1980 } 1981 1982 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1983 }; 1984 1985 } // namespace android 1986 1987 #endif // MY_HANDLER_H_ 1988