1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 23 #ifndef LOG_TAG 24 #define LOG_TAG "MyHandler" 25 #endif 26 27 #include <utils/Log.h> 28 29 #include "APacketSource.h" 30 #include "ARTPConnection.h" 31 #include "ARTSPConnection.h" 32 #include "ASessionDescription.h" 33 34 #include <ctype.h> 35 36 #include <media/stagefright/foundation/ABuffer.h> 37 #include <media/stagefright/foundation/ADebug.h> 38 #include <media/stagefright/foundation/ALooper.h> 39 #include <media/stagefright/foundation/AMessage.h> 40 #include <media/stagefright/MediaErrors.h> 41 #include <media/stagefright/Utils.h> 42 43 #include <arpa/inet.h> 44 #include <sys/socket.h> 45 #include <netdb.h> 46 47 #include "HTTPBase.h" 48 49 #if LOG_NDEBUG 50 #define UNUSED_UNLESS_VERBOSE(x) (void)(x) 51 #else 52 #define UNUSED_UNLESS_VERBOSE(x) 53 #endif 54 55 // If no access units are received within 10 secs, assume that the rtp 56 // stream has ended and signal end of stream. 57 static int64_t kAccessUnitTimeoutUs = 10000000ll; 58 59 // If no access units arrive for the first 10 secs after starting the 60 // stream, assume none ever will and signal EOS or switch transports. 
61 static int64_t kStartupTimeoutUs = 10000000ll; 62 63 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 64 65 static int64_t kPauseDelayUs = 3000000ll; 66 67 // The allowed maximum number of stale access units at the beginning of 68 // a new sequence. 69 static int32_t kMaxAllowedStaleAccessUnits = 20; 70 71 namespace android { 72 73 static bool GetAttribute(const char *s, const char *key, AString *value) { 74 value->clear(); 75 76 size_t keyLen = strlen(key); 77 78 for (;;) { 79 while (isspace(*s)) { 80 ++s; 81 } 82 83 const char *colonPos = strchr(s, ';'); 84 85 size_t len = 86 (colonPos == NULL) ? strlen(s) : colonPos - s; 87 88 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 89 value->setTo(&s[keyLen + 1], len - keyLen - 1); 90 return true; 91 } 92 93 if (colonPos == NULL) { 94 return false; 95 } 96 97 s = colonPos + 1; 98 } 99 } 100 101 struct MyHandler : public AHandler { 102 enum { 103 kWhatConnected = 'conn', 104 kWhatDisconnected = 'disc', 105 kWhatSeekPaused = 'spau', 106 kWhatSeekDone = 'sdon', 107 108 kWhatAccessUnit = 'accU', 109 kWhatEOS = 'eos!', 110 kWhatSeekDiscontinuity = 'seeD', 111 kWhatNormalPlayTimeMapping = 'nptM', 112 }; 113 114 MyHandler( 115 const char *url, 116 const sp<AMessage> ¬ify, 117 bool uidValid = false, uid_t uid = 0) 118 : mNotify(notify), 119 mUIDValid(uidValid), 120 mUID(uid), 121 mNetLooper(new ALooper), 122 mConn(new ARTSPConnection(mUIDValid, mUID)), 123 mRTPConn(new ARTPConnection), 124 mOriginalSessionURL(url), 125 mSessionURL(url), 126 mSetupTracksSuccessful(false), 127 mSeekPending(false), 128 mFirstAccessUnit(true), 129 mAllTracksHaveTime(false), 130 mNTPAnchorUs(-1), 131 mMediaAnchorUs(-1), 132 mLastMediaTimeUs(0), 133 mNumAccessUnitsReceived(0), 134 mCheckPending(false), 135 mCheckGeneration(0), 136 mCheckTimeoutGeneration(0), 137 mTryTCPInterleaving(false), 138 mTryFakeRTCP(false), 139 mReceivedFirstRTCPPacket(false), 140 mReceivedFirstRTPPacket(false), 141 mSeekable(true), 142 
mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 143 mKeepAliveGeneration(0), 144 mPausing(false), 145 mPauseGeneration(0), 146 mPlayResponseParsed(false) { 147 mNetLooper->setName("rtsp net"); 148 mNetLooper->start(false /* runOnCallingThread */, 149 false /* canCallJava */, 150 PRIORITY_HIGHEST); 151 152 // Strip any authentication info from the session url, we don't 153 // want to transmit user/pass in cleartext. 154 AString host, path, user, pass; 155 unsigned port; 156 CHECK(ARTSPConnection::ParseURL( 157 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 158 159 if (user.size() > 0) { 160 mSessionURL.clear(); 161 mSessionURL.append("rtsp://"); 162 mSessionURL.append(host); 163 mSessionURL.append(":"); 164 mSessionURL.append(AStringPrintf("%u", port)); 165 mSessionURL.append(path); 166 167 ALOGV("rewritten session url: '%s'", mSessionURL.c_str()); 168 } 169 170 mSessionHost = host; 171 } 172 173 void connect() { 174 looper()->registerHandler(mConn); 175 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 176 177 sp<AMessage> notify = new AMessage('biny', this); 178 mConn->observeBinaryData(notify); 179 180 sp<AMessage> reply = new AMessage('conn', this); 181 mConn->connect(mOriginalSessionURL.c_str(), reply); 182 } 183 184 void loadSDP(const sp<ASessionDescription>& desc) { 185 looper()->registerHandler(mConn); 186 (1 ? 
mNetLooper : looper())->registerHandler(mRTPConn); 187 188 sp<AMessage> notify = new AMessage('biny', this); 189 mConn->observeBinaryData(notify); 190 191 sp<AMessage> reply = new AMessage('sdpl', this); 192 reply->setObject("description", desc); 193 mConn->connect(mOriginalSessionURL.c_str(), reply); 194 } 195 196 AString getControlURL() { 197 AString sessionLevelControlURL; 198 if (mSessionDesc->findAttribute( 199 0, 200 "a=control", 201 &sessionLevelControlURL)) { 202 if (sessionLevelControlURL.compare("*") == 0) { 203 return mBaseURL; 204 } else { 205 AString controlURL; 206 CHECK(MakeURL( 207 mBaseURL.c_str(), 208 sessionLevelControlURL.c_str(), 209 &controlURL)); 210 return controlURL; 211 } 212 } else { 213 return mSessionURL; 214 } 215 } 216 217 void disconnect() { 218 (new AMessage('abor', this))->post(); 219 } 220 221 void seek(int64_t timeUs) { 222 sp<AMessage> msg = new AMessage('seek', this); 223 msg->setInt64("time", timeUs); 224 mPauseGeneration++; 225 msg->post(); 226 } 227 228 void continueSeekAfterPause(int64_t timeUs) { 229 sp<AMessage> msg = new AMessage('see1', this); 230 msg->setInt64("time", timeUs); 231 msg->post(); 232 } 233 234 bool isSeekable() const { 235 return mSeekable; 236 } 237 238 void pause() { 239 sp<AMessage> msg = new AMessage('paus', this); 240 mPauseGeneration++; 241 msg->setInt32("pausecheck", mPauseGeneration); 242 msg->post(); 243 } 244 245 void resume() { 246 sp<AMessage> msg = new AMessage('resu', this); 247 mPauseGeneration++; 248 msg->post(); 249 } 250 251 static void addRR(const sp<ABuffer> &buf) { 252 uint8_t *ptr = buf->data() + buf->size(); 253 ptr[0] = 0x80 | 0; 254 ptr[1] = 201; // RR 255 ptr[2] = 0; 256 ptr[3] = 1; 257 ptr[4] = 0xde; // SSRC 258 ptr[5] = 0xad; 259 ptr[6] = 0xbe; 260 ptr[7] = 0xef; 261 262 buf->setRange(0, buf->size() + 8); 263 } 264 265 static void addSDES(int s, const sp<ABuffer> &buffer) { 266 struct sockaddr_in addr; 267 socklen_t addrSize = sizeof(addr); 268 if (getsockname(s, (sockaddr 
*)&addr, &addrSize) != 0) { 269 inet_aton("0.0.0.0", &(addr.sin_addr)); 270 } 271 272 uint8_t *data = buffer->data() + buffer->size(); 273 data[0] = 0x80 | 1; 274 data[1] = 202; // SDES 275 data[4] = 0xde; // SSRC 276 data[5] = 0xad; 277 data[6] = 0xbe; 278 data[7] = 0xef; 279 280 size_t offset = 8; 281 282 data[offset++] = 1; // CNAME 283 284 AString cname = "stagefright@"; 285 cname.append(inet_ntoa(addr.sin_addr)); 286 data[offset++] = cname.size(); 287 288 memcpy(&data[offset], cname.c_str(), cname.size()); 289 offset += cname.size(); 290 291 data[offset++] = 6; // TOOL 292 293 AString tool = MakeUserAgent(); 294 295 data[offset++] = tool.size(); 296 297 memcpy(&data[offset], tool.c_str(), tool.size()); 298 offset += tool.size(); 299 300 data[offset++] = 0; 301 302 if ((offset % 4) > 0) { 303 size_t count = 4 - (offset % 4); 304 switch (count) { 305 case 3: 306 data[offset++] = 0; 307 case 2: 308 data[offset++] = 0; 309 case 1: 310 data[offset++] = 0; 311 } 312 } 313 314 size_t numWords = (offset / 4) - 1; 315 data[2] = numWords >> 8; 316 data[3] = numWords & 0xff; 317 318 buffer->setRange(buffer->offset(), buffer->size() + offset); 319 } 320 321 // In case we're behind NAT, fire off two UDP packets to the remote 322 // rtp/rtcp ports to poke a hole into the firewall for future incoming 323 // packets. We're going to send an RR/SDES RTCP packet to both of them. 324 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 325 struct sockaddr_in addr; 326 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 327 addr.sin_family = AF_INET; 328 329 AString source; 330 AString server_port; 331 if (!GetAttribute(transport.c_str(), 332 "source", 333 &source)) { 334 ALOGW("Missing 'source' field in Transport response. 
Using " 335 "RTSP endpoint address."); 336 337 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 338 if (ent == NULL) { 339 ALOGE("Failed to look up address of session host '%s'", 340 mSessionHost.c_str()); 341 342 return false; 343 } 344 345 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 346 } else { 347 addr.sin_addr.s_addr = inet_addr(source.c_str()); 348 } 349 350 if (!GetAttribute(transport.c_str(), 351 "server_port", 352 &server_port)) { 353 ALOGI("Missing 'server_port' field in Transport response."); 354 return false; 355 } 356 357 int rtpPort, rtcpPort; 358 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 359 || rtpPort <= 0 || rtpPort > 65535 360 || rtcpPort <=0 || rtcpPort > 65535 361 || rtcpPort != rtpPort + 1) { 362 ALOGE("Server picked invalid RTP/RTCP port pair %s," 363 " RTP port must be even, RTCP port must be one higher.", 364 server_port.c_str()); 365 366 return false; 367 } 368 369 if (rtpPort & 1) { 370 ALOGW("Server picked an odd RTP port, it should've picked an " 371 "even one, we'll let it pass for now, but this may break " 372 "in the future."); 373 } 374 375 if (addr.sin_addr.s_addr == INADDR_NONE) { 376 return true; 377 } 378 379 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 380 // No firewalls to traverse on the loopback interface. 381 return true; 382 } 383 384 // Make up an RR/SDES RTCP packet. 
385 sp<ABuffer> buf = new ABuffer(65536); 386 buf->setRange(0, 0); 387 addRR(buf); 388 addSDES(rtpSocket, buf); 389 390 addr.sin_port = htons(rtpPort); 391 392 ssize_t n = sendto( 393 rtpSocket, buf->data(), buf->size(), 0, 394 (const sockaddr *)&addr, sizeof(addr)); 395 396 if (n < (ssize_t)buf->size()) { 397 ALOGE("failed to poke a hole for RTP packets"); 398 return false; 399 } 400 401 addr.sin_port = htons(rtcpPort); 402 403 n = sendto( 404 rtcpSocket, buf->data(), buf->size(), 0, 405 (const sockaddr *)&addr, sizeof(addr)); 406 407 if (n < (ssize_t)buf->size()) { 408 ALOGE("failed to poke a hole for RTCP packets"); 409 return false; 410 } 411 412 ALOGV("successfully poked holes."); 413 414 return true; 415 } 416 417 static bool isLiveStream(const sp<ASessionDescription> &desc) { 418 AString attrLiveStream; 419 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) { 420 ssize_t semicolonPos = attrLiveStream.find(";", 2); 421 422 const char* liveStreamValue; 423 if (semicolonPos < 0) { 424 liveStreamValue = attrLiveStream.c_str(); 425 } else { 426 AString valString; 427 valString.setTo(attrLiveStream, 428 semicolonPos + 1, 429 attrLiveStream.size() - semicolonPos - 1); 430 liveStreamValue = valString.c_str(); 431 } 432 433 uint32_t value = strtoul(liveStreamValue, NULL, 10); 434 if (value == 1) { 435 ALOGV("found live stream"); 436 return true; 437 } 438 } else { 439 // It is a live stream if no duration is returned 440 int64_t durationUs; 441 if (!desc->getDurationUs(&durationUs)) { 442 ALOGV("No duration found, assume live stream"); 443 return true; 444 } 445 } 446 447 return false; 448 } 449 450 virtual void onMessageReceived(const sp<AMessage> &msg) { 451 switch (msg->what()) { 452 case 'conn': 453 { 454 int32_t result; 455 CHECK(msg->findInt32("result", &result)); 456 457 ALOGI("connection request completed with result %d (%s)", 458 result, strerror(-result)); 459 460 if (result == OK) { 461 AString request; 462 request = "DESCRIBE "; 463 
request.append(mSessionURL); 464 request.append(" RTSP/1.0\r\n"); 465 request.append("Accept: application/sdp\r\n"); 466 request.append("\r\n"); 467 468 sp<AMessage> reply = new AMessage('desc', this); 469 mConn->sendRequest(request.c_str(), reply); 470 } else { 471 (new AMessage('disc', this))->post(); 472 } 473 break; 474 } 475 476 case 'disc': 477 { 478 ++mKeepAliveGeneration; 479 480 int32_t reconnect; 481 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 482 sp<AMessage> reply = new AMessage('conn', this); 483 mConn->connect(mOriginalSessionURL.c_str(), reply); 484 } else { 485 (new AMessage('quit', this))->post(); 486 } 487 break; 488 } 489 490 case 'desc': 491 { 492 int32_t result; 493 CHECK(msg->findInt32("result", &result)); 494 495 ALOGI("DESCRIBE completed with result %d (%s)", 496 result, strerror(-result)); 497 498 if (result == OK) { 499 sp<RefBase> obj; 500 CHECK(msg->findObject("response", &obj)); 501 sp<ARTSPResponse> response = 502 static_cast<ARTSPResponse *>(obj.get()); 503 504 if (response->mStatusCode == 301 || response->mStatusCode == 302) { 505 ssize_t i = response->mHeaders.indexOfKey("location"); 506 CHECK_GE(i, 0); 507 508 mOriginalSessionURL = response->mHeaders.valueAt(i); 509 mSessionURL = mOriginalSessionURL; 510 511 // Strip any authentication info from the session url, we don't 512 // want to transmit user/pass in cleartext. 
513 AString host, path, user, pass; 514 unsigned port; 515 if (ARTSPConnection::ParseURL( 516 mSessionURL.c_str(), &host, &port, &path, &user, &pass) 517 && user.size() > 0) { 518 mSessionURL.clear(); 519 mSessionURL.append("rtsp://"); 520 mSessionURL.append(host); 521 mSessionURL.append(":"); 522 mSessionURL.append(AStringPrintf("%u", port)); 523 mSessionURL.append(path); 524 525 ALOGI("rewritten session url: '%s'", mSessionURL.c_str()); 526 } 527 528 sp<AMessage> reply = new AMessage('conn', this); 529 mConn->connect(mOriginalSessionURL.c_str(), reply); 530 break; 531 } 532 533 if (response->mStatusCode != 200) { 534 result = UNKNOWN_ERROR; 535 } else if (response->mContent == NULL) { 536 result = ERROR_MALFORMED; 537 ALOGE("The response has no content."); 538 } else { 539 mSessionDesc = new ASessionDescription; 540 541 mSessionDesc->setTo( 542 response->mContent->data(), 543 response->mContent->size()); 544 545 if (!mSessionDesc->isValid()) { 546 ALOGE("Failed to parse session description."); 547 result = ERROR_MALFORMED; 548 } else { 549 ssize_t i = response->mHeaders.indexOfKey("content-base"); 550 if (i >= 0) { 551 mBaseURL = response->mHeaders.valueAt(i); 552 } else { 553 i = response->mHeaders.indexOfKey("content-location"); 554 if (i >= 0) { 555 mBaseURL = response->mHeaders.valueAt(i); 556 } else { 557 mBaseURL = mSessionURL; 558 } 559 } 560 561 mSeekable = !isLiveStream(mSessionDesc); 562 563 if (!mBaseURL.startsWith("rtsp://")) { 564 // Some misbehaving servers specify a relative 565 // URL in one of the locations above, combine 566 // it with the absolute session URL to get 567 // something usable... 
568 569 ALOGW("Server specified a non-absolute base URL" 570 ", combining it with the session URL to " 571 "get something usable..."); 572 573 AString tmp; 574 CHECK(MakeURL( 575 mSessionURL.c_str(), 576 mBaseURL.c_str(), 577 &tmp)); 578 579 mBaseURL = tmp; 580 } 581 582 mControlURL = getControlURL(); 583 584 if (mSessionDesc->countTracks() < 2) { 585 // There's no actual tracks in this session. 586 // The first "track" is merely session meta 587 // data. 588 589 ALOGW("Session doesn't contain any playable " 590 "tracks. Aborting."); 591 result = ERROR_UNSUPPORTED; 592 } else { 593 setupTrack(1); 594 } 595 } 596 } 597 } 598 599 if (result != OK) { 600 sp<AMessage> reply = new AMessage('disc', this); 601 mConn->disconnect(reply); 602 } 603 break; 604 } 605 606 case 'sdpl': 607 { 608 int32_t result; 609 CHECK(msg->findInt32("result", &result)); 610 611 ALOGI("SDP connection request completed with result %d (%s)", 612 result, strerror(-result)); 613 614 if (result == OK) { 615 sp<RefBase> obj; 616 CHECK(msg->findObject("description", &obj)); 617 mSessionDesc = 618 static_cast<ASessionDescription *>(obj.get()); 619 620 if (!mSessionDesc->isValid()) { 621 ALOGE("Failed to parse session description."); 622 result = ERROR_MALFORMED; 623 } else { 624 mBaseURL = mSessionURL; 625 626 mSeekable = !isLiveStream(mSessionDesc); 627 628 mControlURL = getControlURL(); 629 630 if (mSessionDesc->countTracks() < 2) { 631 // There's no actual tracks in this session. 632 // The first "track" is merely session meta 633 // data. 634 635 ALOGW("Session doesn't contain any playable " 636 "tracks. 
Aborting."); 637 result = ERROR_UNSUPPORTED; 638 } else { 639 setupTrack(1); 640 } 641 } 642 } 643 644 if (result != OK) { 645 sp<AMessage> reply = new AMessage('disc', this); 646 mConn->disconnect(reply); 647 } 648 break; 649 } 650 651 case 'setu': 652 { 653 size_t index; 654 CHECK(msg->findSize("index", &index)); 655 656 TrackInfo *track = NULL; 657 size_t trackIndex; 658 if (msg->findSize("track-index", &trackIndex)) { 659 track = &mTracks.editItemAt(trackIndex); 660 } 661 662 int32_t result; 663 CHECK(msg->findInt32("result", &result)); 664 665 ALOGI("SETUP(%zu) completed with result %d (%s)", 666 index, result, strerror(-result)); 667 668 if (result == OK) { 669 CHECK(track != NULL); 670 671 sp<RefBase> obj; 672 CHECK(msg->findObject("response", &obj)); 673 sp<ARTSPResponse> response = 674 static_cast<ARTSPResponse *>(obj.get()); 675 676 if (response->mStatusCode != 200) { 677 result = UNKNOWN_ERROR; 678 } else { 679 ssize_t i = response->mHeaders.indexOfKey("session"); 680 CHECK_GE(i, 0); 681 682 mSessionID = response->mHeaders.valueAt(i); 683 684 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 685 AString timeoutStr; 686 if (GetAttribute( 687 mSessionID.c_str(), "timeout", &timeoutStr)) { 688 char *end; 689 unsigned long timeoutSecs = 690 strtoul(timeoutStr.c_str(), &end, 10); 691 692 if (end == timeoutStr.c_str() || *end != '\0') { 693 ALOGW("server specified malformed timeout '%s'", 694 timeoutStr.c_str()); 695 696 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 697 } else if (timeoutSecs < 15) { 698 ALOGW("server specified too short a timeout " 699 "(%lu secs), using default.", 700 timeoutSecs); 701 702 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 703 } else { 704 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 705 706 ALOGI("server specified timeout of %lu secs.", 707 timeoutSecs); 708 } 709 } 710 711 i = mSessionID.find(";"); 712 if (i >= 0) { 713 // Remove options, i.e. 
";timeout=90" 714 mSessionID.erase(i, mSessionID.size() - i); 715 } 716 717 sp<AMessage> notify = new AMessage('accu', this); 718 notify->setSize("track-index", trackIndex); 719 720 i = response->mHeaders.indexOfKey("transport"); 721 CHECK_GE(i, 0); 722 723 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) { 724 if (!track->mUsingInterleavedTCP) { 725 AString transport = response->mHeaders.valueAt(i); 726 727 // We are going to continue even if we were 728 // unable to poke a hole into the firewall... 729 pokeAHole( 730 track->mRTPSocket, 731 track->mRTCPSocket, 732 transport); 733 } 734 735 mRTPConn->addStream( 736 track->mRTPSocket, track->mRTCPSocket, 737 mSessionDesc, index, 738 notify, track->mUsingInterleavedTCP); 739 740 mSetupTracksSuccessful = true; 741 } else { 742 result = BAD_VALUE; 743 } 744 } 745 } 746 747 if (result != OK) { 748 if (track) { 749 if (!track->mUsingInterleavedTCP) { 750 // Clear the tag 751 if (mUIDValid) { 752 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 753 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket); 754 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 755 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket); 756 } 757 758 close(track->mRTPSocket); 759 close(track->mRTCPSocket); 760 } 761 762 mTracks.removeItemsAt(trackIndex); 763 } 764 } 765 766 ++index; 767 if (result == OK && index < mSessionDesc->countTracks()) { 768 setupTrack(index); 769 } else if (mSetupTracksSuccessful) { 770 ++mKeepAliveGeneration; 771 postKeepAlive(); 772 773 AString request = "PLAY "; 774 request.append(mControlURL); 775 request.append(" RTSP/1.0\r\n"); 776 777 request.append("Session: "); 778 request.append(mSessionID); 779 request.append("\r\n"); 780 781 request.append("\r\n"); 782 783 sp<AMessage> reply = new AMessage('play', this); 784 mConn->sendRequest(request.c_str(), reply); 785 } else { 786 sp<AMessage> reply = new AMessage('disc', this); 787 mConn->disconnect(reply); 788 } 789 break; 790 } 791 792 case 'play': 
793 { 794 int32_t result; 795 CHECK(msg->findInt32("result", &result)); 796 797 ALOGI("PLAY completed with result %d (%s)", 798 result, strerror(-result)); 799 800 if (result == OK) { 801 sp<RefBase> obj; 802 CHECK(msg->findObject("response", &obj)); 803 sp<ARTSPResponse> response = 804 static_cast<ARTSPResponse *>(obj.get()); 805 806 if (response->mStatusCode != 200) { 807 result = UNKNOWN_ERROR; 808 } else { 809 parsePlayResponse(response); 810 811 sp<AMessage> timeout = new AMessage('tiou', this); 812 mCheckTimeoutGeneration++; 813 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 814 timeout->post(kStartupTimeoutUs); 815 } 816 } 817 818 if (result != OK) { 819 sp<AMessage> reply = new AMessage('disc', this); 820 mConn->disconnect(reply); 821 } 822 823 break; 824 } 825 826 case 'aliv': 827 { 828 int32_t generation; 829 CHECK(msg->findInt32("generation", &generation)); 830 831 if (generation != mKeepAliveGeneration) { 832 // obsolete event. 833 break; 834 } 835 836 AString request; 837 request.append("OPTIONS "); 838 request.append(mSessionURL); 839 request.append(" RTSP/1.0\r\n"); 840 request.append("Session: "); 841 request.append(mSessionID); 842 request.append("\r\n"); 843 request.append("\r\n"); 844 845 sp<AMessage> reply = new AMessage('opts', this); 846 reply->setInt32("generation", mKeepAliveGeneration); 847 mConn->sendRequest(request.c_str(), reply); 848 break; 849 } 850 851 case 'opts': 852 { 853 int32_t result; 854 CHECK(msg->findInt32("result", &result)); 855 856 ALOGI("OPTIONS completed with result %d (%s)", 857 result, strerror(-result)); 858 859 int32_t generation; 860 CHECK(msg->findInt32("generation", &generation)); 861 862 if (generation != mKeepAliveGeneration) { 863 // obsolete event. 
864 break; 865 } 866 867 postKeepAlive(); 868 break; 869 } 870 871 case 'abor': 872 { 873 for (size_t i = 0; i < mTracks.size(); ++i) { 874 TrackInfo *info = &mTracks.editItemAt(i); 875 876 if (!mFirstAccessUnit) { 877 postQueueEOS(i, ERROR_END_OF_STREAM); 878 } 879 880 if (!info->mUsingInterleavedTCP) { 881 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 882 883 // Clear the tag 884 if (mUIDValid) { 885 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 886 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket); 887 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 888 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket); 889 } 890 891 close(info->mRTPSocket); 892 close(info->mRTCPSocket); 893 } 894 } 895 mTracks.clear(); 896 mSetupTracksSuccessful = false; 897 mSeekPending = false; 898 mFirstAccessUnit = true; 899 mAllTracksHaveTime = false; 900 mNTPAnchorUs = -1; 901 mMediaAnchorUs = -1; 902 mNumAccessUnitsReceived = 0; 903 mReceivedFirstRTCPPacket = false; 904 mReceivedFirstRTPPacket = false; 905 mPausing = false; 906 mSeekable = true; 907 908 sp<AMessage> reply = new AMessage('tear', this); 909 910 int32_t reconnect; 911 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 912 reply->setInt32("reconnect", true); 913 } 914 915 AString request; 916 request = "TEARDOWN "; 917 918 // XXX should use aggregate url from SDP here... 
919 request.append(mSessionURL); 920 request.append(" RTSP/1.0\r\n"); 921 922 request.append("Session: "); 923 request.append(mSessionID); 924 request.append("\r\n"); 925 926 request.append("\r\n"); 927 928 mConn->sendRequest(request.c_str(), reply); 929 break; 930 } 931 932 case 'tear': 933 { 934 int32_t result; 935 CHECK(msg->findInt32("result", &result)); 936 937 ALOGI("TEARDOWN completed with result %d (%s)", 938 result, strerror(-result)); 939 940 sp<AMessage> reply = new AMessage('disc', this); 941 942 int32_t reconnect; 943 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 944 reply->setInt32("reconnect", true); 945 } 946 947 mConn->disconnect(reply); 948 break; 949 } 950 951 case 'quit': 952 { 953 sp<AMessage> msg = mNotify->dup(); 954 msg->setInt32("what", kWhatDisconnected); 955 msg->setInt32("result", UNKNOWN_ERROR); 956 msg->post(); 957 break; 958 } 959 960 case 'chek': 961 { 962 int32_t generation; 963 CHECK(msg->findInt32("generation", &generation)); 964 if (generation != mCheckGeneration) { 965 // This is an outdated message. Ignore. 966 break; 967 } 968 969 if (mNumAccessUnitsReceived == 0) { 970 #if 1 971 ALOGI("stream ended? 
aborting."); 972 (new AMessage('abor', this))->post(); 973 break; 974 #else 975 ALOGI("haven't seen an AU in a looong time."); 976 #endif 977 } 978 979 mNumAccessUnitsReceived = 0; 980 msg->post(kAccessUnitTimeoutUs); 981 break; 982 } 983 984 case 'accu': 985 { 986 if (mSeekPending) { 987 ALOGV("Stale access unit."); 988 break; 989 } 990 991 int32_t timeUpdate; 992 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 993 size_t trackIndex; 994 CHECK(msg->findSize("track-index", &trackIndex)); 995 996 uint32_t rtpTime; 997 uint64_t ntpTime; 998 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 999 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 1000 1001 onTimeUpdate(trackIndex, rtpTime, ntpTime); 1002 break; 1003 } 1004 1005 int32_t first; 1006 if (msg->findInt32("first-rtcp", &first)) { 1007 mReceivedFirstRTCPPacket = true; 1008 break; 1009 } 1010 1011 if (msg->findInt32("first-rtp", &first)) { 1012 mReceivedFirstRTPPacket = true; 1013 break; 1014 } 1015 1016 ++mNumAccessUnitsReceived; 1017 postAccessUnitTimeoutCheck(); 1018 1019 size_t trackIndex; 1020 CHECK(msg->findSize("track-index", &trackIndex)); 1021 1022 if (trackIndex >= mTracks.size()) { 1023 ALOGV("late packets ignored."); 1024 break; 1025 } 1026 1027 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1028 1029 int32_t eos; 1030 if (msg->findInt32("eos", &eos)) { 1031 ALOGI("received BYE on track index %zu", trackIndex); 1032 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1033 ALOGI("No time established => fake existing data"); 1034 1035 track->mEOSReceived = true; 1036 mTryFakeRTCP = true; 1037 mReceivedFirstRTCPPacket = true; 1038 fakeTimestamps(); 1039 } else { 1040 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1041 } 1042 return; 1043 } 1044 1045 sp<ABuffer> accessUnit; 1046 CHECK(msg->findBuffer("access-unit", &accessUnit)); 1047 1048 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 1049 1050 if (mSeekPending) { 1051 ALOGV("we're seeking, dropping stale 
packet."); 1052 break; 1053 } 1054 1055 if (track->mNewSegment) { 1056 // The sequence number from RTP packet has only 16 bits and is extended 1057 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of 1058 // RTSP "PLAY" command should be used to detect the first RTP packet 1059 // after seeking. 1060 if (track->mAllowedStaleAccessUnits > 0) { 1061 if ((((seqNum ^ track->mFirstSeqNumInSegment) & 0xffff) != 0)) { 1062 // Not the first rtp packet of the stream after seeking, discarding. 1063 track->mAllowedStaleAccessUnits--; 1064 ALOGV("discarding stale access unit (0x%x : 0x%x)", 1065 seqNum, track->mFirstSeqNumInSegment); 1066 break; 1067 } 1068 } else { // track->mAllowedStaleAccessUnits <= 0 1069 mNumAccessUnitsReceived = 0; 1070 ALOGW_IF(track->mAllowedStaleAccessUnits == 0, 1071 "Still no first rtp packet after %d stale ones", 1072 kMaxAllowedStaleAccessUnits); 1073 track->mAllowedStaleAccessUnits = -1; 1074 break; 1075 } 1076 1077 // Now found the first rtp packet of the stream after seeking. 
1078 track->mFirstSeqNumInSegment = seqNum; 1079 track->mNewSegment = false; 1080 } 1081 1082 if (seqNum < track->mFirstSeqNumInSegment) { 1083 ALOGV("dropping stale access-unit (%d < %d)", 1084 seqNum, track->mFirstSeqNumInSegment); 1085 break; 1086 } 1087 1088 onAccessUnitComplete(trackIndex, accessUnit); 1089 break; 1090 } 1091 1092 case 'paus': 1093 { 1094 int32_t generation; 1095 CHECK(msg->findInt32("pausecheck", &generation)); 1096 if (generation != mPauseGeneration) { 1097 ALOGV("Ignoring outdated pause message."); 1098 break; 1099 } 1100 1101 if (!mSeekable) { 1102 ALOGW("This is a live stream, ignoring pause request."); 1103 break; 1104 } 1105 1106 if (mPausing) { 1107 ALOGV("This stream is already paused."); 1108 break; 1109 } 1110 1111 mCheckPending = true; 1112 ++mCheckGeneration; 1113 mPausing = true; 1114 1115 AString request = "PAUSE "; 1116 request.append(mControlURL); 1117 request.append(" RTSP/1.0\r\n"); 1118 1119 request.append("Session: "); 1120 request.append(mSessionID); 1121 request.append("\r\n"); 1122 1123 request.append("\r\n"); 1124 1125 sp<AMessage> reply = new AMessage('pau2', this); 1126 mConn->sendRequest(request.c_str(), reply); 1127 break; 1128 } 1129 1130 case 'pau2': 1131 { 1132 int32_t result; 1133 CHECK(msg->findInt32("result", &result)); 1134 mCheckTimeoutGeneration++; 1135 1136 ALOGI("PAUSE completed with result %d (%s)", 1137 result, strerror(-result)); 1138 break; 1139 } 1140 1141 case 'resu': 1142 { 1143 if (mPausing && mSeekPending) { 1144 // If seeking, Play will be sent from see1 instead 1145 break; 1146 } 1147 1148 if (!mPausing) { 1149 // Dont send PLAY if we have not paused 1150 break; 1151 } 1152 AString request = "PLAY "; 1153 request.append(mControlURL); 1154 request.append(" RTSP/1.0\r\n"); 1155 1156 request.append("Session: "); 1157 request.append(mSessionID); 1158 request.append("\r\n"); 1159 1160 request.append("\r\n"); 1161 1162 sp<AMessage> reply = new AMessage('res2', this); 1163 
mConn->sendRequest(request.c_str(), reply); 1164 break; 1165 } 1166 1167 case 'res2': 1168 { 1169 int32_t result; 1170 CHECK(msg->findInt32("result", &result)); 1171 1172 ALOGI("PLAY (for resume) completed with result %d (%s)", 1173 result, strerror(-result)); 1174 1175 mCheckPending = false; 1176 ++mCheckGeneration; 1177 postAccessUnitTimeoutCheck(); 1178 1179 if (result == OK) { 1180 sp<RefBase> obj; 1181 CHECK(msg->findObject("response", &obj)); 1182 sp<ARTSPResponse> response = 1183 static_cast<ARTSPResponse *>(obj.get()); 1184 1185 if (response->mStatusCode != 200) { 1186 result = UNKNOWN_ERROR; 1187 } else { 1188 parsePlayResponse(response); 1189 1190 // Post new timeout in order to make sure to use 1191 // fake timestamps if no new Sender Reports arrive 1192 sp<AMessage> timeout = new AMessage('tiou', this); 1193 mCheckTimeoutGeneration++; 1194 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1195 timeout->post(kStartupTimeoutUs); 1196 } 1197 } 1198 1199 if (result != OK) { 1200 ALOGE("resume failed, aborting."); 1201 (new AMessage('abor', this))->post(); 1202 } 1203 1204 mPausing = false; 1205 break; 1206 } 1207 1208 case 'seek': 1209 { 1210 if (!mSeekable) { 1211 ALOGW("This is a live stream, ignoring seek request."); 1212 1213 sp<AMessage> msg = mNotify->dup(); 1214 msg->setInt32("what", kWhatSeekDone); 1215 msg->post(); 1216 break; 1217 } 1218 1219 int64_t timeUs; 1220 CHECK(msg->findInt64("time", &timeUs)); 1221 1222 mSeekPending = true; 1223 1224 // Disable the access unit timeout until we resumed 1225 // playback again. 
1226 mCheckPending = true; 1227 ++mCheckGeneration; 1228 1229 sp<AMessage> reply = new AMessage('see0', this); 1230 reply->setInt64("time", timeUs); 1231 1232 if (mPausing) { 1233 // PAUSE already sent 1234 ALOGI("Pause already sent"); 1235 reply->post(); 1236 break; 1237 } 1238 AString request = "PAUSE "; 1239 request.append(mControlURL); 1240 request.append(" RTSP/1.0\r\n"); 1241 1242 request.append("Session: "); 1243 request.append(mSessionID); 1244 request.append("\r\n"); 1245 1246 request.append("\r\n"); 1247 1248 mConn->sendRequest(request.c_str(), reply); 1249 break; 1250 } 1251 1252 case 'see0': 1253 { 1254 // Session is paused now. 1255 status_t err = OK; 1256 msg->findInt32("result", &err); 1257 1258 int64_t timeUs; 1259 CHECK(msg->findInt64("time", &timeUs)); 1260 1261 sp<AMessage> notify = mNotify->dup(); 1262 notify->setInt32("what", kWhatSeekPaused); 1263 notify->setInt32("err", err); 1264 notify->setInt64("time", timeUs); 1265 notify->post(); 1266 break; 1267 1268 } 1269 1270 case 'see1': 1271 { 1272 for (size_t i = 0; i < mTracks.size(); ++i) { 1273 TrackInfo *info = &mTracks.editItemAt(i); 1274 1275 postQueueSeekDiscontinuity(i); 1276 info->mEOSReceived = false; 1277 1278 info->mRTPAnchor = 0; 1279 info->mNTPAnchorUs = -1; 1280 } 1281 1282 mAllTracksHaveTime = false; 1283 mNTPAnchorUs = -1; 1284 1285 // Start new timeoutgeneration to avoid getting timeout 1286 // before PLAY response arrive 1287 sp<AMessage> timeout = new AMessage('tiou', this); 1288 mCheckTimeoutGeneration++; 1289 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1290 timeout->post(kStartupTimeoutUs); 1291 1292 int64_t timeUs; 1293 CHECK(msg->findInt64("time", &timeUs)); 1294 1295 AString request = "PLAY "; 1296 request.append(mControlURL); 1297 request.append(" RTSP/1.0\r\n"); 1298 1299 request.append("Session: "); 1300 request.append(mSessionID); 1301 request.append("\r\n"); 1302 1303 request.append( 1304 AStringPrintf( 1305 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 
1306 1307 request.append("\r\n"); 1308 1309 sp<AMessage> reply = new AMessage('see2', this); 1310 mConn->sendRequest(request.c_str(), reply); 1311 break; 1312 } 1313 1314 case 'see2': 1315 { 1316 if (mTracks.size() == 0) { 1317 // We have already hit abor, break 1318 break; 1319 } 1320 1321 int32_t result; 1322 CHECK(msg->findInt32("result", &result)); 1323 1324 ALOGI("PLAY (for seek) completed with result %d (%s)", 1325 result, strerror(-result)); 1326 1327 mCheckPending = false; 1328 ++mCheckGeneration; 1329 postAccessUnitTimeoutCheck(); 1330 1331 if (result == OK) { 1332 sp<RefBase> obj; 1333 CHECK(msg->findObject("response", &obj)); 1334 sp<ARTSPResponse> response = 1335 static_cast<ARTSPResponse *>(obj.get()); 1336 1337 if (response->mStatusCode != 200) { 1338 result = UNKNOWN_ERROR; 1339 } else { 1340 parsePlayResponse(response); 1341 1342 // Post new timeout in order to make sure to use 1343 // fake timestamps if no new Sender Reports arrive 1344 sp<AMessage> timeout = new AMessage('tiou', this); 1345 mCheckTimeoutGeneration++; 1346 timeout->setInt32("tioucheck", mCheckTimeoutGeneration); 1347 timeout->post(kStartupTimeoutUs); 1348 1349 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 1350 CHECK_GE(i, 0); 1351 1352 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 1353 1354 ALOGI("seek completed."); 1355 } 1356 } 1357 1358 if (result != OK) { 1359 ALOGE("seek failed, aborting."); 1360 (new AMessage('abor', this))->post(); 1361 } 1362 1363 mPausing = false; 1364 mSeekPending = false; 1365 1366 // Discard all stale access units. 
1367 for (size_t i = 0; i < mTracks.size(); ++i) { 1368 TrackInfo *track = &mTracks.editItemAt(i); 1369 track->mPackets.clear(); 1370 } 1371 1372 sp<AMessage> msg = mNotify->dup(); 1373 msg->setInt32("what", kWhatSeekDone); 1374 msg->post(); 1375 break; 1376 } 1377 1378 case 'biny': 1379 { 1380 sp<ABuffer> buffer; 1381 CHECK(msg->findBuffer("buffer", &buffer)); 1382 1383 int32_t index; 1384 CHECK(buffer->meta()->findInt32("index", &index)); 1385 1386 mRTPConn->injectPacket(index, buffer); 1387 break; 1388 } 1389 1390 case 'tiou': 1391 { 1392 int32_t timeoutGenerationCheck; 1393 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck)); 1394 if (timeoutGenerationCheck != mCheckTimeoutGeneration) { 1395 // This is an outdated message. Ignore. 1396 // This typically happens if a lot of seeks are 1397 // performed, since new timeout messages now are 1398 // posted at seek as well. 1399 break; 1400 } 1401 if (!mReceivedFirstRTCPPacket) { 1402 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) { 1403 ALOGW("We received RTP packets but no RTCP packets, " 1404 "using fake timestamps."); 1405 1406 mTryFakeRTCP = true; 1407 1408 mReceivedFirstRTCPPacket = true; 1409 1410 fakeTimestamps(); 1411 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1412 ALOGW("Never received any data, switching transports."); 1413 1414 mTryTCPInterleaving = true; 1415 1416 sp<AMessage> msg = new AMessage('abor', this); 1417 msg->setInt32("reconnect", true); 1418 msg->post(); 1419 } else { 1420 ALOGW("Never received any data, disconnecting."); 1421 (new AMessage('abor', this))->post(); 1422 } 1423 } else { 1424 if (!mAllTracksHaveTime) { 1425 ALOGW("We received some RTCP packets, but time " 1426 "could not be established on all tracks, now " 1427 "using fake timestamps"); 1428 1429 fakeTimestamps(); 1430 } 1431 } 1432 break; 1433 } 1434 1435 default: 1436 TRESPASS(); 1437 break; 1438 } 1439 } 1440 1441 void postKeepAlive() { 1442 sp<AMessage> msg = new AMessage('aliv', this); 1443 
msg->setInt32("generation", mKeepAliveGeneration); 1444 msg->post((mKeepAliveTimeoutUs * 9) / 10); 1445 } 1446 1447 void postAccessUnitTimeoutCheck() { 1448 if (mCheckPending) { 1449 return; 1450 } 1451 1452 mCheckPending = true; 1453 sp<AMessage> check = new AMessage('chek', this); 1454 check->setInt32("generation", mCheckGeneration); 1455 check->post(kAccessUnitTimeoutUs); 1456 } 1457 1458 static void SplitString( 1459 const AString &s, const char *separator, List<AString> *items) { 1460 items->clear(); 1461 size_t start = 0; 1462 while (start < s.size()) { 1463 ssize_t offset = s.find(separator, start); 1464 1465 if (offset < 0) { 1466 items->push_back(AString(s, start, s.size() - start)); 1467 break; 1468 } 1469 1470 items->push_back(AString(s, start, offset - start)); 1471 start = offset + strlen(separator); 1472 } 1473 } 1474 1475 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1476 mPlayResponseParsed = true; 1477 if (mTracks.size() == 0) { 1478 ALOGV("parsePlayResponse: late packets ignored."); 1479 return; 1480 } 1481 1482 ssize_t i = response->mHeaders.indexOfKey("range"); 1483 if (i < 0) { 1484 // Server doesn't even tell use what range it is going to 1485 // play, therefore we won't support seeking. 1486 return; 1487 } 1488 1489 AString range = response->mHeaders.valueAt(i); 1490 ALOGV("Range: %s", range.c_str()); 1491 1492 AString val; 1493 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1494 1495 float npt1, npt2; 1496 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1497 // This is a live stream and therefore not seekable. 
1498 1499 ALOGI("This is a live stream"); 1500 return; 1501 } 1502 1503 i = response->mHeaders.indexOfKey("rtp-info"); 1504 CHECK_GE(i, 0); 1505 1506 AString rtpInfo = response->mHeaders.valueAt(i); 1507 List<AString> streamInfos; 1508 SplitString(rtpInfo, ",", &streamInfos); 1509 1510 int n = 1; 1511 for (List<AString>::iterator it = streamInfos.begin(); 1512 it != streamInfos.end(); ++it) { 1513 (*it).trim(); 1514 ALOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1515 1516 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1517 1518 size_t trackIndex = 0; 1519 while (trackIndex < mTracks.size() 1520 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1521 ++trackIndex; 1522 } 1523 CHECK_LT(trackIndex, mTracks.size()); 1524 1525 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1526 1527 char *end; 1528 unsigned long seq = strtoul(val.c_str(), &end, 10); 1529 1530 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1531 info->mFirstSeqNumInSegment = seq; 1532 info->mNewSegment = true; 1533 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1534 1535 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1536 1537 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1538 1539 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1540 1541 info->mNormalPlayTimeRTP = rtpTime; 1542 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1543 1544 if (!mFirstAccessUnit) { 1545 postNormalPlayTimeMapping( 1546 trackIndex, 1547 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1548 } 1549 1550 ++n; 1551 } 1552 } 1553 1554 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1555 CHECK_GE(index, 0u); 1556 CHECK_LT(index, mTracks.size()); 1557 1558 const TrackInfo &info = mTracks.itemAt(index); 1559 1560 *timeScale = info.mTimeScale; 1561 1562 return info.mPacketSource->getFormat(); 1563 } 1564 1565 size_t countTracks() const { 1566 return mTracks.size(); 1567 } 1568 1569 private: 1570 struct TrackInfo { 1571 AString mURL; 1572 int mRTPSocket; 1573 int 
mRTCPSocket; 1574 bool mUsingInterleavedTCP; 1575 uint32_t mFirstSeqNumInSegment; 1576 bool mNewSegment; 1577 int32_t mAllowedStaleAccessUnits; 1578 1579 uint32_t mRTPAnchor; 1580 int64_t mNTPAnchorUs; 1581 int32_t mTimeScale; 1582 bool mEOSReceived; 1583 1584 uint32_t mNormalPlayTimeRTP; 1585 int64_t mNormalPlayTimeUs; 1586 1587 sp<APacketSource> mPacketSource; 1588 1589 // Stores packets temporarily while no notion of time 1590 // has been established yet. 1591 List<sp<ABuffer> > mPackets; 1592 }; 1593 1594 sp<AMessage> mNotify; 1595 bool mUIDValid; 1596 uid_t mUID; 1597 sp<ALooper> mNetLooper; 1598 sp<ARTSPConnection> mConn; 1599 sp<ARTPConnection> mRTPConn; 1600 sp<ASessionDescription> mSessionDesc; 1601 AString mOriginalSessionURL; // This one still has user:pass@ 1602 AString mSessionURL; 1603 AString mSessionHost; 1604 AString mBaseURL; 1605 AString mControlURL; 1606 AString mSessionID; 1607 bool mSetupTracksSuccessful; 1608 bool mSeekPending; 1609 bool mFirstAccessUnit; 1610 1611 bool mAllTracksHaveTime; 1612 int64_t mNTPAnchorUs; 1613 int64_t mMediaAnchorUs; 1614 int64_t mLastMediaTimeUs; 1615 1616 int64_t mNumAccessUnitsReceived; 1617 bool mCheckPending; 1618 int32_t mCheckGeneration; 1619 int32_t mCheckTimeoutGeneration; 1620 bool mTryTCPInterleaving; 1621 bool mTryFakeRTCP; 1622 bool mReceivedFirstRTCPPacket; 1623 bool mReceivedFirstRTPPacket; 1624 bool mSeekable; 1625 int64_t mKeepAliveTimeoutUs; 1626 int32_t mKeepAliveGeneration; 1627 bool mPausing; 1628 int32_t mPauseGeneration; 1629 1630 Vector<TrackInfo> mTracks; 1631 1632 bool mPlayResponseParsed; 1633 1634 void setupTrack(size_t index) { 1635 sp<APacketSource> source = 1636 new APacketSource(mSessionDesc, index); 1637 1638 if (source->initCheck() != OK) { 1639 ALOGW("Unsupported format. 
Ignoring track #%zu.", index); 1640 1641 sp<AMessage> reply = new AMessage('setu', this); 1642 reply->setSize("index", index); 1643 reply->setInt32("result", ERROR_UNSUPPORTED); 1644 reply->post(); 1645 return; 1646 } 1647 1648 AString url; 1649 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1650 1651 AString trackURL; 1652 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1653 1654 mTracks.push(TrackInfo()); 1655 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1656 info->mURL = trackURL; 1657 info->mPacketSource = source; 1658 info->mUsingInterleavedTCP = false; 1659 info->mFirstSeqNumInSegment = 0; 1660 info->mNewSegment = true; 1661 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits; 1662 info->mRTPSocket = -1; 1663 info->mRTCPSocket = -1; 1664 info->mRTPAnchor = 0; 1665 info->mNTPAnchorUs = -1; 1666 info->mNormalPlayTimeRTP = 0; 1667 info->mNormalPlayTimeUs = 0ll; 1668 1669 unsigned long PT; 1670 AString formatDesc; 1671 AString formatParams; 1672 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1673 1674 int32_t timescale; 1675 int32_t numChannels; 1676 ASessionDescription::ParseFormatDesc( 1677 formatDesc.c_str(), ×cale, &numChannels); 1678 1679 info->mTimeScale = timescale; 1680 info->mEOSReceived = false; 1681 1682 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str()); 1683 1684 AString request = "SETUP "; 1685 request.append(trackURL); 1686 request.append(" RTSP/1.0\r\n"); 1687 1688 if (mTryTCPInterleaving) { 1689 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1690 info->mUsingInterleavedTCP = true; 1691 info->mRTPSocket = interleaveIndex; 1692 info->mRTCPSocket = interleaveIndex + 1; 1693 1694 request.append("Transport: RTP/AVP/TCP;interleaved="); 1695 request.append(interleaveIndex); 1696 request.append("-"); 1697 request.append(interleaveIndex + 1); 1698 } else { 1699 unsigned rtpPort; 1700 ARTPConnection::MakePortPair( 1701 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1702 
1703 if (mUIDValid) { 1704 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1705 (uint32_t)*(uint32_t*) "RTP_"); 1706 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1707 (uint32_t)*(uint32_t*) "RTP_"); 1708 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID); 1709 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID); 1710 } 1711 1712 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1713 request.append(rtpPort); 1714 request.append("-"); 1715 request.append(rtpPort + 1); 1716 } 1717 1718 request.append("\r\n"); 1719 1720 if (index > 1) { 1721 request.append("Session: "); 1722 request.append(mSessionID); 1723 request.append("\r\n"); 1724 } 1725 1726 request.append("\r\n"); 1727 1728 sp<AMessage> reply = new AMessage('setu', this); 1729 reply->setSize("index", index); 1730 reply->setSize("track-index", mTracks.size() - 1); 1731 mConn->sendRequest(request.c_str(), reply); 1732 } 1733 1734 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1735 out->clear(); 1736 1737 if (strncasecmp("rtsp://", baseURL, 7)) { 1738 // Base URL must be absolute 1739 return false; 1740 } 1741 1742 if (!strncasecmp("rtsp://", url, 7)) { 1743 // "url" is already an absolute URL, ignore base URL. 
1744 out->setTo(url); 1745 return true; 1746 } 1747 1748 size_t n = strlen(baseURL); 1749 out->setTo(baseURL); 1750 if (baseURL[n - 1] != '/') { 1751 out->append("/"); 1752 } 1753 out->append(url); 1754 1755 return true; 1756 } 1757 1758 void fakeTimestamps() { 1759 mNTPAnchorUs = -1ll; 1760 for (size_t i = 0; i < mTracks.size(); ++i) { 1761 onTimeUpdate(i, 0, 0ll); 1762 } 1763 } 1764 1765 bool dataReceivedOnAllChannels() { 1766 TrackInfo *track; 1767 for (size_t i = 0; i < mTracks.size(); ++i) { 1768 track = &mTracks.editItemAt(i); 1769 if (track->mPackets.empty()) { 1770 return false; 1771 } 1772 } 1773 return true; 1774 } 1775 1776 void handleFirstAccessUnit() { 1777 if (mFirstAccessUnit) { 1778 sp<AMessage> msg = mNotify->dup(); 1779 msg->setInt32("what", kWhatConnected); 1780 msg->post(); 1781 1782 if (mSeekable) { 1783 for (size_t i = 0; i < mTracks.size(); ++i) { 1784 TrackInfo *info = &mTracks.editItemAt(i); 1785 1786 postNormalPlayTimeMapping( 1787 i, 1788 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1789 } 1790 } 1791 1792 mFirstAccessUnit = false; 1793 } 1794 } 1795 1796 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1797 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx", 1798 trackIndex, rtpTime, (long long)ntpTime); 1799 1800 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1801 1802 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1803 1804 track->mRTPAnchor = rtpTime; 1805 track->mNTPAnchorUs = ntpTimeUs; 1806 1807 if (mNTPAnchorUs < 0) { 1808 mNTPAnchorUs = ntpTimeUs; 1809 mMediaAnchorUs = mLastMediaTimeUs; 1810 } 1811 1812 if (!mAllTracksHaveTime) { 1813 bool allTracksHaveTime = (mTracks.size() > 0); 1814 for (size_t i = 0; i < mTracks.size(); ++i) { 1815 TrackInfo *track = &mTracks.editItemAt(i); 1816 if (track->mNTPAnchorUs < 0) { 1817 allTracksHaveTime = false; 1818 break; 1819 } 1820 } 1821 if (allTracksHaveTime) { 1822 mAllTracksHaveTime = true; 1823 ALOGI("Time now 
established for all tracks."); 1824 } 1825 } 1826 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) { 1827 handleFirstAccessUnit(); 1828 1829 // Time is now established, lets start timestamping immediately 1830 for (size_t i = 0; i < mTracks.size(); ++i) { 1831 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1832 while (!trackInfo->mPackets.empty()) { 1833 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin(); 1834 trackInfo->mPackets.erase(trackInfo->mPackets.begin()); 1835 1836 if (addMediaTimestamp(i, trackInfo, accessUnit)) { 1837 postQueueAccessUnit(i, accessUnit); 1838 } 1839 } 1840 } 1841 for (size_t i = 0; i < mTracks.size(); ++i) { 1842 TrackInfo *trackInfo = &mTracks.editItemAt(i); 1843 if (trackInfo->mEOSReceived) { 1844 postQueueEOS(i, ERROR_END_OF_STREAM); 1845 trackInfo->mEOSReceived = false; 1846 } 1847 } 1848 } 1849 } 1850 1851 void onAccessUnitComplete( 1852 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1853 ALOGV("onAccessUnitComplete track %d", trackIndex); 1854 1855 if(!mPlayResponseParsed){ 1856 ALOGI("play response is not parsed, storing accessunit"); 1857 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1858 track->mPackets.push_back(accessUnit); 1859 return; 1860 } 1861 1862 handleFirstAccessUnit(); 1863 1864 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1865 1866 if (!mAllTracksHaveTime) { 1867 ALOGV("storing accessUnit, no time established yet"); 1868 track->mPackets.push_back(accessUnit); 1869 return; 1870 } 1871 1872 while (!track->mPackets.empty()) { 1873 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1874 track->mPackets.erase(track->mPackets.begin()); 1875 1876 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1877 postQueueAccessUnit(trackIndex, accessUnit); 1878 } 1879 } 1880 1881 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1882 postQueueAccessUnit(trackIndex, accessUnit); 1883 } 1884 1885 if (track->mEOSReceived) { 1886 postQueueEOS(trackIndex, ERROR_END_OF_STREAM); 1887 track->mEOSReceived 
= false; 1888 } 1889 } 1890 1891 bool addMediaTimestamp( 1892 int32_t trackIndex, const TrackInfo *track, 1893 const sp<ABuffer> &accessUnit) { 1894 UNUSED_UNLESS_VERBOSE(trackIndex); 1895 1896 uint32_t rtpTime; 1897 CHECK(accessUnit->meta()->findInt32( 1898 "rtp-time", (int32_t *)&rtpTime)); 1899 1900 int64_t relRtpTimeUs = 1901 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1902 / track->mTimeScale; 1903 1904 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1905 1906 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1907 1908 if (mediaTimeUs > mLastMediaTimeUs) { 1909 mLastMediaTimeUs = mediaTimeUs; 1910 } 1911 1912 if (mediaTimeUs < 0) { 1913 ALOGV("dropping early accessUnit."); 1914 return false; 1915 } 1916 1917 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)", 1918 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6); 1919 1920 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1921 1922 return true; 1923 } 1924 1925 void postQueueAccessUnit( 1926 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1927 sp<AMessage> msg = mNotify->dup(); 1928 msg->setInt32("what", kWhatAccessUnit); 1929 msg->setSize("trackIndex", trackIndex); 1930 msg->setBuffer("accessUnit", accessUnit); 1931 msg->post(); 1932 } 1933 1934 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1935 sp<AMessage> msg = mNotify->dup(); 1936 msg->setInt32("what", kWhatEOS); 1937 msg->setSize("trackIndex", trackIndex); 1938 msg->setInt32("finalResult", finalResult); 1939 msg->post(); 1940 } 1941 1942 void postQueueSeekDiscontinuity(size_t trackIndex) { 1943 sp<AMessage> msg = mNotify->dup(); 1944 msg->setInt32("what", kWhatSeekDiscontinuity); 1945 msg->setSize("trackIndex", trackIndex); 1946 msg->post(); 1947 } 1948 1949 void postNormalPlayTimeMapping( 1950 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1951 sp<AMessage> msg = mNotify->dup(); 1952 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1953 
msg->setSize("trackIndex", trackIndex); 1954 msg->setInt32("rtpTime", rtpTime); 1955 msg->setInt64("nptUs", nptUs); 1956 msg->post(); 1957 } 1958 1959 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1960 }; 1961 1962 } // namespace android 1963 1964 #endif // MY_HANDLER_H_ 1965