1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 #define LOG_TAG "MyHandler" 23 #include <utils/Log.h> 24 25 #include "APacketSource.h" 26 #include "ARTPConnection.h" 27 #include "ARTSPConnection.h" 28 #include "ASessionDescription.h" 29 30 #include <ctype.h> 31 #include <cutils/properties.h> 32 33 #include <media/stagefright/foundation/ABuffer.h> 34 #include <media/stagefright/foundation/ADebug.h> 35 #include <media/stagefright/foundation/ALooper.h> 36 #include <media/stagefright/foundation/AMessage.h> 37 #include <media/stagefright/MediaErrors.h> 38 39 #include <arpa/inet.h> 40 #include <sys/socket.h> 41 #include <netdb.h> 42 43 #include "HTTPBase.h" 44 45 // If no access units are received within 10 secs, assume that the rtp 46 // stream has ended and signal end of stream. 47 static int64_t kAccessUnitTimeoutUs = 10000000ll; 48 49 // If no access units arrive for the first 10 secs after starting the 50 // stream, assume none ever will and signal EOS or switch transports. 
51 static int64_t kStartupTimeoutUs = 10000000ll; 52 53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll; 54 55 namespace android { 56 57 static void MakeUserAgentString(AString *s) { 58 s->setTo("stagefright/1.1 (Linux;Android "); 59 60 #if (PROPERTY_VALUE_MAX < 8) 61 #error "PROPERTY_VALUE_MAX must be at least 8" 62 #endif 63 64 char value[PROPERTY_VALUE_MAX]; 65 property_get("ro.build.version.release", value, "Unknown"); 66 s->append(value); 67 s->append(")"); 68 } 69 70 static bool GetAttribute(const char *s, const char *key, AString *value) { 71 value->clear(); 72 73 size_t keyLen = strlen(key); 74 75 for (;;) { 76 while (isspace(*s)) { 77 ++s; 78 } 79 80 const char *colonPos = strchr(s, ';'); 81 82 size_t len = 83 (colonPos == NULL) ? strlen(s) : colonPos - s; 84 85 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 86 value->setTo(&s[keyLen + 1], len - keyLen - 1); 87 return true; 88 } 89 90 if (colonPos == NULL) { 91 return false; 92 } 93 94 s = colonPos + 1; 95 } 96 } 97 98 struct MyHandler : public AHandler { 99 enum { 100 kWhatConnected = 'conn', 101 kWhatDisconnected = 'disc', 102 kWhatSeekDone = 'sdon', 103 104 kWhatAccessUnit = 'accU', 105 kWhatEOS = 'eos!', 106 kWhatSeekDiscontinuity = 'seeD', 107 kWhatNormalPlayTimeMapping = 'nptM', 108 }; 109 110 MyHandler( 111 const char *url, 112 const sp<AMessage> ¬ify, 113 bool uidValid = false, uid_t uid = 0) 114 : mNotify(notify), 115 mUIDValid(uidValid), 116 mUID(uid), 117 mNetLooper(new ALooper), 118 mConn(new ARTSPConnection(mUIDValid, mUID)), 119 mRTPConn(new ARTPConnection), 120 mOriginalSessionURL(url), 121 mSessionURL(url), 122 mSetupTracksSuccessful(false), 123 mSeekPending(false), 124 mFirstAccessUnit(true), 125 mNTPAnchorUs(-1), 126 mMediaAnchorUs(-1), 127 mLastMediaTimeUs(0), 128 mNumAccessUnitsReceived(0), 129 mCheckPending(false), 130 mCheckGeneration(0), 131 mTryTCPInterleaving(false), 132 mTryFakeRTCP(false), 133 mReceivedFirstRTCPPacket(false), 134 
mReceivedFirstRTPPacket(false), 135 mSeekable(false), 136 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs), 137 mKeepAliveGeneration(0) { 138 mNetLooper->setName("rtsp net"); 139 mNetLooper->start(false /* runOnCallingThread */, 140 false /* canCallJava */, 141 PRIORITY_HIGHEST); 142 143 // Strip any authentication info from the session url, we don't 144 // want to transmit user/pass in cleartext. 145 AString host, path, user, pass; 146 unsigned port; 147 CHECK(ARTSPConnection::ParseURL( 148 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 149 150 if (user.size() > 0) { 151 mSessionURL.clear(); 152 mSessionURL.append("rtsp://"); 153 mSessionURL.append(host); 154 mSessionURL.append(":"); 155 mSessionURL.append(StringPrintf("%u", port)); 156 mSessionURL.append(path); 157 158 LOGI("rewritten session url: '%s'", mSessionURL.c_str()); 159 } 160 161 mSessionHost = host; 162 } 163 164 void connect() { 165 looper()->registerHandler(mConn); 166 (1 ? mNetLooper : looper())->registerHandler(mRTPConn); 167 168 sp<AMessage> notify = new AMessage('biny', id()); 169 mConn->observeBinaryData(notify); 170 171 sp<AMessage> reply = new AMessage('conn', id()); 172 mConn->connect(mOriginalSessionURL.c_str(), reply); 173 } 174 175 void disconnect() { 176 (new AMessage('abor', id()))->post(); 177 } 178 179 void seek(int64_t timeUs) { 180 sp<AMessage> msg = new AMessage('seek', id()); 181 msg->setInt64("time", timeUs); 182 msg->post(); 183 } 184 185 static void addRR(const sp<ABuffer> &buf) { 186 uint8_t *ptr = buf->data() + buf->size(); 187 ptr[0] = 0x80 | 0; 188 ptr[1] = 201; // RR 189 ptr[2] = 0; 190 ptr[3] = 1; 191 ptr[4] = 0xde; // SSRC 192 ptr[5] = 0xad; 193 ptr[6] = 0xbe; 194 ptr[7] = 0xef; 195 196 buf->setRange(0, buf->size() + 8); 197 } 198 199 static void addSDES(int s, const sp<ABuffer> &buffer) { 200 struct sockaddr_in addr; 201 socklen_t addrSize = sizeof(addr); 202 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 203 204 uint8_t *data = buffer->data() + 
buffer->size(); 205 data[0] = 0x80 | 1; 206 data[1] = 202; // SDES 207 data[4] = 0xde; // SSRC 208 data[5] = 0xad; 209 data[6] = 0xbe; 210 data[7] = 0xef; 211 212 size_t offset = 8; 213 214 data[offset++] = 1; // CNAME 215 216 AString cname = "stagefright@"; 217 cname.append(inet_ntoa(addr.sin_addr)); 218 data[offset++] = cname.size(); 219 220 memcpy(&data[offset], cname.c_str(), cname.size()); 221 offset += cname.size(); 222 223 data[offset++] = 6; // TOOL 224 225 AString tool; 226 MakeUserAgentString(&tool); 227 228 data[offset++] = tool.size(); 229 230 memcpy(&data[offset], tool.c_str(), tool.size()); 231 offset += tool.size(); 232 233 data[offset++] = 0; 234 235 if ((offset % 4) > 0) { 236 size_t count = 4 - (offset % 4); 237 switch (count) { 238 case 3: 239 data[offset++] = 0; 240 case 2: 241 data[offset++] = 0; 242 case 1: 243 data[offset++] = 0; 244 } 245 } 246 247 size_t numWords = (offset / 4) - 1; 248 data[2] = numWords >> 8; 249 data[3] = numWords & 0xff; 250 251 buffer->setRange(buffer->offset(), buffer->size() + offset); 252 } 253 254 // In case we're behind NAT, fire off two UDP packets to the remote 255 // rtp/rtcp ports to poke a hole into the firewall for future incoming 256 // packets. We're going to send an RR/SDES RTCP packet to both of them. 257 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 258 struct sockaddr_in addr; 259 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 260 addr.sin_family = AF_INET; 261 262 AString source; 263 AString server_port; 264 if (!GetAttribute(transport.c_str(), 265 "source", 266 &source)) { 267 LOGW("Missing 'source' field in Transport response. 
Using " 268 "RTSP endpoint address."); 269 270 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 271 if (ent == NULL) { 272 LOGE("Failed to look up address of session host '%s'", 273 mSessionHost.c_str()); 274 275 return false; 276 } 277 278 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 279 } else { 280 addr.sin_addr.s_addr = inet_addr(source.c_str()); 281 } 282 283 if (!GetAttribute(transport.c_str(), 284 "server_port", 285 &server_port)) { 286 LOGI("Missing 'server_port' field in Transport response."); 287 return false; 288 } 289 290 int rtpPort, rtcpPort; 291 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 292 || rtpPort <= 0 || rtpPort > 65535 293 || rtcpPort <=0 || rtcpPort > 65535 294 || rtcpPort != rtpPort + 1) { 295 LOGE("Server picked invalid RTP/RTCP port pair %s," 296 " RTP port must be even, RTCP port must be one higher.", 297 server_port.c_str()); 298 299 return false; 300 } 301 302 if (rtpPort & 1) { 303 LOGW("Server picked an odd RTP port, it should've picked an " 304 "even one, we'll let it pass for now, but this may break " 305 "in the future."); 306 } 307 308 if (addr.sin_addr.s_addr == INADDR_NONE) { 309 return true; 310 } 311 312 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 313 // No firewalls to traverse on the loopback interface. 314 return true; 315 } 316 317 // Make up an RR/SDES RTCP packet. 
318 sp<ABuffer> buf = new ABuffer(65536); 319 buf->setRange(0, 0); 320 addRR(buf); 321 addSDES(rtpSocket, buf); 322 323 addr.sin_port = htons(rtpPort); 324 325 ssize_t n = sendto( 326 rtpSocket, buf->data(), buf->size(), 0, 327 (const sockaddr *)&addr, sizeof(addr)); 328 329 if (n < (ssize_t)buf->size()) { 330 LOGE("failed to poke a hole for RTP packets"); 331 return false; 332 } 333 334 addr.sin_port = htons(rtcpPort); 335 336 n = sendto( 337 rtcpSocket, buf->data(), buf->size(), 0, 338 (const sockaddr *)&addr, sizeof(addr)); 339 340 if (n < (ssize_t)buf->size()) { 341 LOGE("failed to poke a hole for RTCP packets"); 342 return false; 343 } 344 345 LOGV("successfully poked holes."); 346 347 return true; 348 } 349 350 virtual void onMessageReceived(const sp<AMessage> &msg) { 351 switch (msg->what()) { 352 case 'conn': 353 { 354 int32_t result; 355 CHECK(msg->findInt32("result", &result)); 356 357 LOGI("connection request completed with result %d (%s)", 358 result, strerror(-result)); 359 360 if (result == OK) { 361 AString request; 362 request = "DESCRIBE "; 363 request.append(mSessionURL); 364 request.append(" RTSP/1.0\r\n"); 365 request.append("Accept: application/sdp\r\n"); 366 request.append("\r\n"); 367 368 sp<AMessage> reply = new AMessage('desc', id()); 369 mConn->sendRequest(request.c_str(), reply); 370 } else { 371 (new AMessage('disc', id()))->post(); 372 } 373 break; 374 } 375 376 case 'disc': 377 { 378 ++mKeepAliveGeneration; 379 380 int32_t reconnect; 381 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 382 sp<AMessage> reply = new AMessage('conn', id()); 383 mConn->connect(mOriginalSessionURL.c_str(), reply); 384 } else { 385 (new AMessage('quit', id()))->post(); 386 } 387 break; 388 } 389 390 case 'desc': 391 { 392 int32_t result; 393 CHECK(msg->findInt32("result", &result)); 394 395 LOGI("DESCRIBE completed with result %d (%s)", 396 result, strerror(-result)); 397 398 if (result == OK) { 399 sp<RefBase> obj; 400 
CHECK(msg->findObject("response", &obj)); 401 sp<ARTSPResponse> response = 402 static_cast<ARTSPResponse *>(obj.get()); 403 404 if (response->mStatusCode == 302) { 405 ssize_t i = response->mHeaders.indexOfKey("location"); 406 CHECK_GE(i, 0); 407 408 mSessionURL = response->mHeaders.valueAt(i); 409 410 AString request; 411 request = "DESCRIBE "; 412 request.append(mSessionURL); 413 request.append(" RTSP/1.0\r\n"); 414 request.append("Accept: application/sdp\r\n"); 415 request.append("\r\n"); 416 417 sp<AMessage> reply = new AMessage('desc', id()); 418 mConn->sendRequest(request.c_str(), reply); 419 break; 420 } 421 422 if (response->mStatusCode != 200) { 423 result = UNKNOWN_ERROR; 424 } else { 425 mSessionDesc = new ASessionDescription; 426 427 mSessionDesc->setTo( 428 response->mContent->data(), 429 response->mContent->size()); 430 431 if (!mSessionDesc->isValid()) { 432 LOGE("Failed to parse session description."); 433 result = ERROR_MALFORMED; 434 } else { 435 ssize_t i = response->mHeaders.indexOfKey("content-base"); 436 if (i >= 0) { 437 mBaseURL = response->mHeaders.valueAt(i); 438 } else { 439 i = response->mHeaders.indexOfKey("content-location"); 440 if (i >= 0) { 441 mBaseURL = response->mHeaders.valueAt(i); 442 } else { 443 mBaseURL = mSessionURL; 444 } 445 } 446 447 if (!mBaseURL.startsWith("rtsp://")) { 448 // Some misbehaving servers specify a relative 449 // URL in one of the locations above, combine 450 // it with the absolute session URL to get 451 // something usable... 452 453 LOGW("Server specified a non-absolute base URL" 454 ", combining it with the session URL to " 455 "get something usable..."); 456 457 AString tmp; 458 CHECK(MakeURL( 459 mSessionURL.c_str(), 460 mBaseURL.c_str(), 461 &tmp)); 462 463 mBaseURL = tmp; 464 } 465 466 if (mSessionDesc->countTracks() < 2) { 467 // There's no actual tracks in this session. 468 // The first "track" is merely session meta 469 // data. 470 471 LOGW("Session doesn't contain any playable " 472 "tracks. 
Aborting."); 473 result = ERROR_UNSUPPORTED; 474 } else { 475 setupTrack(1); 476 } 477 } 478 } 479 } 480 481 if (result != OK) { 482 sp<AMessage> reply = new AMessage('disc', id()); 483 mConn->disconnect(reply); 484 } 485 break; 486 } 487 488 case 'setu': 489 { 490 size_t index; 491 CHECK(msg->findSize("index", &index)); 492 493 TrackInfo *track = NULL; 494 size_t trackIndex; 495 if (msg->findSize("track-index", &trackIndex)) { 496 track = &mTracks.editItemAt(trackIndex); 497 } 498 499 int32_t result; 500 CHECK(msg->findInt32("result", &result)); 501 502 LOGI("SETUP(%d) completed with result %d (%s)", 503 index, result, strerror(-result)); 504 505 if (result == OK) { 506 CHECK(track != NULL); 507 508 sp<RefBase> obj; 509 CHECK(msg->findObject("response", &obj)); 510 sp<ARTSPResponse> response = 511 static_cast<ARTSPResponse *>(obj.get()); 512 513 if (response->mStatusCode != 200) { 514 result = UNKNOWN_ERROR; 515 } else { 516 ssize_t i = response->mHeaders.indexOfKey("session"); 517 CHECK_GE(i, 0); 518 519 mSessionID = response->mHeaders.valueAt(i); 520 521 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 522 AString timeoutStr; 523 if (GetAttribute( 524 mSessionID.c_str(), "timeout", &timeoutStr)) { 525 char *end; 526 unsigned long timeoutSecs = 527 strtoul(timeoutStr.c_str(), &end, 10); 528 529 if (end == timeoutStr.c_str() || *end != '\0') { 530 LOGW("server specified malformed timeout '%s'", 531 timeoutStr.c_str()); 532 533 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 534 } else if (timeoutSecs < 15) { 535 LOGW("server specified too short a timeout " 536 "(%lu secs), using default.", 537 timeoutSecs); 538 539 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs; 540 } else { 541 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll; 542 543 LOGI("server specified timeout of %lu secs.", 544 timeoutSecs); 545 } 546 } 547 548 i = mSessionID.find(";"); 549 if (i >= 0) { 550 // Remove options, i.e. 
";timeout=90" 551 mSessionID.erase(i, mSessionID.size() - i); 552 } 553 554 sp<AMessage> notify = new AMessage('accu', id()); 555 notify->setSize("track-index", trackIndex); 556 557 i = response->mHeaders.indexOfKey("transport"); 558 CHECK_GE(i, 0); 559 560 if (!track->mUsingInterleavedTCP) { 561 AString transport = response->mHeaders.valueAt(i); 562 563 // We are going to continue even if we were 564 // unable to poke a hole into the firewall... 565 pokeAHole( 566 track->mRTPSocket, 567 track->mRTCPSocket, 568 transport); 569 } 570 571 mRTPConn->addStream( 572 track->mRTPSocket, track->mRTCPSocket, 573 mSessionDesc, index, 574 notify, track->mUsingInterleavedTCP); 575 576 mSetupTracksSuccessful = true; 577 } 578 } 579 580 if (result != OK) { 581 if (track) { 582 if (!track->mUsingInterleavedTCP) { 583 // Clear the tag 584 if (mUIDValid) { 585 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 586 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 587 } 588 589 close(track->mRTPSocket); 590 close(track->mRTCPSocket); 591 } 592 593 mTracks.removeItemsAt(trackIndex); 594 } 595 } 596 597 ++index; 598 if (index < mSessionDesc->countTracks()) { 599 setupTrack(index); 600 } else if (mSetupTracksSuccessful) { 601 ++mKeepAliveGeneration; 602 postKeepAlive(); 603 604 AString request = "PLAY "; 605 request.append(mSessionURL); 606 request.append(" RTSP/1.0\r\n"); 607 608 request.append("Session: "); 609 request.append(mSessionID); 610 request.append("\r\n"); 611 612 request.append("\r\n"); 613 614 sp<AMessage> reply = new AMessage('play', id()); 615 mConn->sendRequest(request.c_str(), reply); 616 } else { 617 sp<AMessage> reply = new AMessage('disc', id()); 618 mConn->disconnect(reply); 619 } 620 break; 621 } 622 623 case 'play': 624 { 625 int32_t result; 626 CHECK(msg->findInt32("result", &result)); 627 628 LOGI("PLAY completed with result %d (%s)", 629 result, strerror(-result)); 630 631 if (result == OK) { 632 sp<RefBase> obj; 633 CHECK(msg->findObject("response", 
&obj)); 634 sp<ARTSPResponse> response = 635 static_cast<ARTSPResponse *>(obj.get()); 636 637 if (response->mStatusCode != 200) { 638 result = UNKNOWN_ERROR; 639 } else { 640 parsePlayResponse(response); 641 642 sp<AMessage> timeout = new AMessage('tiou', id()); 643 timeout->post(kStartupTimeoutUs); 644 } 645 } 646 647 if (result != OK) { 648 sp<AMessage> reply = new AMessage('disc', id()); 649 mConn->disconnect(reply); 650 } 651 652 break; 653 } 654 655 case 'aliv': 656 { 657 int32_t generation; 658 CHECK(msg->findInt32("generation", &generation)); 659 660 if (generation != mKeepAliveGeneration) { 661 // obsolete event. 662 break; 663 } 664 665 AString request; 666 request.append("OPTIONS "); 667 request.append(mSessionURL); 668 request.append(" RTSP/1.0\r\n"); 669 request.append("Session: "); 670 request.append(mSessionID); 671 request.append("\r\n"); 672 request.append("\r\n"); 673 674 sp<AMessage> reply = new AMessage('opts', id()); 675 reply->setInt32("generation", mKeepAliveGeneration); 676 mConn->sendRequest(request.c_str(), reply); 677 break; 678 } 679 680 case 'opts': 681 { 682 int32_t result; 683 CHECK(msg->findInt32("result", &result)); 684 685 LOGI("OPTIONS completed with result %d (%s)", 686 result, strerror(-result)); 687 688 int32_t generation; 689 CHECK(msg->findInt32("generation", &generation)); 690 691 if (generation != mKeepAliveGeneration) { 692 // obsolete event. 
693 break; 694 } 695 696 postKeepAlive(); 697 break; 698 } 699 700 case 'abor': 701 { 702 for (size_t i = 0; i < mTracks.size(); ++i) { 703 TrackInfo *info = &mTracks.editItemAt(i); 704 705 if (!mFirstAccessUnit) { 706 postQueueEOS(i, ERROR_END_OF_STREAM); 707 } 708 709 if (!info->mUsingInterleavedTCP) { 710 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 711 712 // Clear the tag 713 if (mUIDValid) { 714 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 715 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 716 } 717 718 close(info->mRTPSocket); 719 close(info->mRTCPSocket); 720 } 721 } 722 mTracks.clear(); 723 mSetupTracksSuccessful = false; 724 mSeekPending = false; 725 mFirstAccessUnit = true; 726 mNTPAnchorUs = -1; 727 mMediaAnchorUs = -1; 728 mNumAccessUnitsReceived = 0; 729 mReceivedFirstRTCPPacket = false; 730 mReceivedFirstRTPPacket = false; 731 mSeekable = false; 732 733 sp<AMessage> reply = new AMessage('tear', id()); 734 735 int32_t reconnect; 736 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 737 reply->setInt32("reconnect", true); 738 } 739 740 AString request; 741 request = "TEARDOWN "; 742 743 // XXX should use aggregate url from SDP here... 
744 request.append(mSessionURL); 745 request.append(" RTSP/1.0\r\n"); 746 747 request.append("Session: "); 748 request.append(mSessionID); 749 request.append("\r\n"); 750 751 request.append("\r\n"); 752 753 mConn->sendRequest(request.c_str(), reply); 754 break; 755 } 756 757 case 'tear': 758 { 759 int32_t result; 760 CHECK(msg->findInt32("result", &result)); 761 762 LOGI("TEARDOWN completed with result %d (%s)", 763 result, strerror(-result)); 764 765 sp<AMessage> reply = new AMessage('disc', id()); 766 767 int32_t reconnect; 768 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 769 reply->setInt32("reconnect", true); 770 } 771 772 mConn->disconnect(reply); 773 break; 774 } 775 776 case 'quit': 777 { 778 sp<AMessage> msg = mNotify->dup(); 779 msg->setInt32("what", kWhatDisconnected); 780 msg->setInt32("result", UNKNOWN_ERROR); 781 msg->post(); 782 break; 783 } 784 785 case 'chek': 786 { 787 int32_t generation; 788 CHECK(msg->findInt32("generation", &generation)); 789 if (generation != mCheckGeneration) { 790 // This is an outdated message. Ignore. 791 break; 792 } 793 794 if (mNumAccessUnitsReceived == 0) { 795 #if 1 796 LOGI("stream ended? 
aborting."); 797 (new AMessage('abor', id()))->post(); 798 break; 799 #else 800 LOGI("haven't seen an AU in a looong time."); 801 #endif 802 } 803 804 mNumAccessUnitsReceived = 0; 805 msg->post(kAccessUnitTimeoutUs); 806 break; 807 } 808 809 case 'accu': 810 { 811 int32_t timeUpdate; 812 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 813 size_t trackIndex; 814 CHECK(msg->findSize("track-index", &trackIndex)); 815 816 uint32_t rtpTime; 817 uint64_t ntpTime; 818 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 819 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 820 821 onTimeUpdate(trackIndex, rtpTime, ntpTime); 822 break; 823 } 824 825 int32_t first; 826 if (msg->findInt32("first-rtcp", &first)) { 827 mReceivedFirstRTCPPacket = true; 828 break; 829 } 830 831 if (msg->findInt32("first-rtp", &first)) { 832 mReceivedFirstRTPPacket = true; 833 break; 834 } 835 836 ++mNumAccessUnitsReceived; 837 postAccessUnitTimeoutCheck(); 838 839 size_t trackIndex; 840 CHECK(msg->findSize("track-index", &trackIndex)); 841 842 if (trackIndex >= mTracks.size()) { 843 LOGV("late packets ignored."); 844 break; 845 } 846 847 TrackInfo *track = &mTracks.editItemAt(trackIndex); 848 849 int32_t eos; 850 if (msg->findInt32("eos", &eos)) { 851 LOGI("received BYE on track index %d", trackIndex); 852 #if 0 853 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 854 #endif 855 return; 856 } 857 858 sp<RefBase> obj; 859 CHECK(msg->findObject("access-unit", &obj)); 860 861 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 862 863 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 864 865 if (mSeekPending) { 866 LOGV("we're seeking, dropping stale packet."); 867 break; 868 } 869 870 if (seqNum < track->mFirstSeqNumInSegment) { 871 LOGV("dropping stale access-unit (%d < %d)", 872 seqNum, track->mFirstSeqNumInSegment); 873 break; 874 } 875 876 if (track->mNewSegment) { 877 track->mNewSegment = false; 878 } 879 880 onAccessUnitComplete(trackIndex, 
accessUnit); 881 break; 882 } 883 884 case 'seek': 885 { 886 if (!mSeekable) { 887 LOGW("This is a live stream, ignoring seek request."); 888 889 sp<AMessage> msg = mNotify->dup(); 890 msg->setInt32("what", kWhatSeekDone); 891 msg->post(); 892 break; 893 } 894 895 int64_t timeUs; 896 CHECK(msg->findInt64("time", &timeUs)); 897 898 mSeekPending = true; 899 900 // Disable the access unit timeout until we resumed 901 // playback again. 902 mCheckPending = true; 903 ++mCheckGeneration; 904 905 AString request = "PAUSE "; 906 request.append(mSessionURL); 907 request.append(" RTSP/1.0\r\n"); 908 909 request.append("Session: "); 910 request.append(mSessionID); 911 request.append("\r\n"); 912 913 request.append("\r\n"); 914 915 sp<AMessage> reply = new AMessage('see1', id()); 916 reply->setInt64("time", timeUs); 917 mConn->sendRequest(request.c_str(), reply); 918 break; 919 } 920 921 case 'see1': 922 { 923 // Session is paused now. 924 for (size_t i = 0; i < mTracks.size(); ++i) { 925 TrackInfo *info = &mTracks.editItemAt(i); 926 927 postQueueSeekDiscontinuity(i); 928 929 info->mRTPAnchor = 0; 930 info->mNTPAnchorUs = -1; 931 } 932 933 mNTPAnchorUs = -1; 934 935 int64_t timeUs; 936 CHECK(msg->findInt64("time", &timeUs)); 937 938 AString request = "PLAY "; 939 request.append(mSessionURL); 940 request.append(" RTSP/1.0\r\n"); 941 942 request.append("Session: "); 943 request.append(mSessionID); 944 request.append("\r\n"); 945 946 request.append( 947 StringPrintf( 948 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 949 950 request.append("\r\n"); 951 952 sp<AMessage> reply = new AMessage('see2', id()); 953 mConn->sendRequest(request.c_str(), reply); 954 break; 955 } 956 957 case 'see2': 958 { 959 CHECK(mSeekPending); 960 961 int32_t result; 962 CHECK(msg->findInt32("result", &result)); 963 964 LOGI("PLAY completed with result %d (%s)", 965 result, strerror(-result)); 966 967 mCheckPending = false; 968 postAccessUnitTimeoutCheck(); 969 970 if (result == OK) { 971 sp<RefBase> obj; 
972 CHECK(msg->findObject("response", &obj)); 973 sp<ARTSPResponse> response = 974 static_cast<ARTSPResponse *>(obj.get()); 975 976 if (response->mStatusCode != 200) { 977 result = UNKNOWN_ERROR; 978 } else { 979 parsePlayResponse(response); 980 981 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 982 CHECK_GE(i, 0); 983 984 LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 985 986 LOGI("seek completed."); 987 } 988 } 989 990 if (result != OK) { 991 LOGE("seek failed, aborting."); 992 (new AMessage('abor', id()))->post(); 993 } 994 995 mSeekPending = false; 996 997 sp<AMessage> msg = mNotify->dup(); 998 msg->setInt32("what", kWhatSeekDone); 999 msg->post(); 1000 break; 1001 } 1002 1003 case 'biny': 1004 { 1005 sp<RefBase> obj; 1006 CHECK(msg->findObject("buffer", &obj)); 1007 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 1008 1009 int32_t index; 1010 CHECK(buffer->meta()->findInt32("index", &index)); 1011 1012 mRTPConn->injectPacket(index, buffer); 1013 break; 1014 } 1015 1016 case 'tiou': 1017 { 1018 if (!mReceivedFirstRTCPPacket) { 1019 if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { 1020 LOGW("We received RTP packets but no RTCP packets, " 1021 "using fake timestamps."); 1022 1023 mTryFakeRTCP = true; 1024 1025 mReceivedFirstRTCPPacket = true; 1026 1027 fakeTimestamps(); 1028 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 1029 LOGW("Never received any data, switching transports."); 1030 1031 mTryTCPInterleaving = true; 1032 1033 sp<AMessage> msg = new AMessage('abor', id()); 1034 msg->setInt32("reconnect", true); 1035 msg->post(); 1036 } else { 1037 LOGW("Never received any data, disconnecting."); 1038 (new AMessage('abor', id()))->post(); 1039 } 1040 } 1041 break; 1042 } 1043 1044 default: 1045 TRESPASS(); 1046 break; 1047 } 1048 } 1049 1050 void postKeepAlive() { 1051 sp<AMessage> msg = new AMessage('aliv', id()); 1052 msg->setInt32("generation", mKeepAliveGeneration); 1053 msg->post((mKeepAliveTimeoutUs * 9) / 10); 
1054 } 1055 1056 void postAccessUnitTimeoutCheck() { 1057 if (mCheckPending) { 1058 return; 1059 } 1060 1061 mCheckPending = true; 1062 sp<AMessage> check = new AMessage('chek', id()); 1063 check->setInt32("generation", mCheckGeneration); 1064 check->post(kAccessUnitTimeoutUs); 1065 } 1066 1067 static void SplitString( 1068 const AString &s, const char *separator, List<AString> *items) { 1069 items->clear(); 1070 size_t start = 0; 1071 while (start < s.size()) { 1072 ssize_t offset = s.find(separator, start); 1073 1074 if (offset < 0) { 1075 items->push_back(AString(s, start, s.size() - start)); 1076 break; 1077 } 1078 1079 items->push_back(AString(s, start, offset - start)); 1080 start = offset + strlen(separator); 1081 } 1082 } 1083 1084 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1085 mSeekable = false; 1086 1087 ssize_t i = response->mHeaders.indexOfKey("range"); 1088 if (i < 0) { 1089 // Server doesn't even tell us what range it is going to 1090 // play, therefore we won't support seeking. 1091 return; 1092 } 1093 1094 AString range = response->mHeaders.valueAt(i); 1095 LOGV("Range: %s", range.c_str()); 1096 1097 AString val; 1098 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1099 1100 float npt1, npt2; 1101 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1102 // This is a live stream and therefore not seekable. 
1103 1104 LOGI("This is a live stream"); 1105 return; 1106 } 1107 1108 i = response->mHeaders.indexOfKey("rtp-info"); 1109 CHECK_GE(i, 0); 1110 1111 AString rtpInfo = response->mHeaders.valueAt(i); 1112 List<AString> streamInfos; 1113 SplitString(rtpInfo, ",", &streamInfos); 1114 1115 int n = 1; 1116 for (List<AString>::iterator it = streamInfos.begin(); 1117 it != streamInfos.end(); ++it) { 1118 (*it).trim(); 1119 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1120 1121 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1122 1123 size_t trackIndex = 0; 1124 while (trackIndex < mTracks.size() 1125 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1126 ++trackIndex; 1127 } 1128 CHECK_LT(trackIndex, mTracks.size()); 1129 1130 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1131 1132 char *end; 1133 unsigned long seq = strtoul(val.c_str(), &end, 10); 1134 1135 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1136 info->mFirstSeqNumInSegment = seq; 1137 info->mNewSegment = true; 1138 1139 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1140 1141 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1142 1143 LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1144 1145 info->mNormalPlayTimeRTP = rtpTime; 1146 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6); 1147 1148 if (!mFirstAccessUnit) { 1149 postNormalPlayTimeMapping( 1150 trackIndex, 1151 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1152 } 1153 1154 ++n; 1155 } 1156 1157 mSeekable = true; 1158 } 1159 1160 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) { 1161 CHECK_GE(index, 0u); 1162 CHECK_LT(index, mTracks.size()); 1163 1164 const TrackInfo &info = mTracks.itemAt(index); 1165 1166 *timeScale = info.mTimeScale; 1167 1168 return info.mPacketSource->getFormat(); 1169 } 1170 1171 size_t countTracks() const { 1172 return mTracks.size(); 1173 } 1174 1175 private: 1176 struct TrackInfo { 1177 AString mURL; 1178 int mRTPSocket; 1179 int mRTCPSocket; 1180 bool mUsingInterleavedTCP; 
1181 uint32_t mFirstSeqNumInSegment; 1182 bool mNewSegment; 1183 1184 uint32_t mRTPAnchor; 1185 int64_t mNTPAnchorUs; 1186 int32_t mTimeScale; 1187 1188 uint32_t mNormalPlayTimeRTP; 1189 int64_t mNormalPlayTimeUs; 1190 1191 sp<APacketSource> mPacketSource; 1192 1193 // Stores packets temporarily while no notion of time 1194 // has been established yet. 1195 List<sp<ABuffer> > mPackets; 1196 }; 1197 1198 sp<AMessage> mNotify; 1199 bool mUIDValid; 1200 uid_t mUID; 1201 sp<ALooper> mNetLooper; 1202 sp<ARTSPConnection> mConn; 1203 sp<ARTPConnection> mRTPConn; 1204 sp<ASessionDescription> mSessionDesc; 1205 AString mOriginalSessionURL; // This one still has user:pass@ 1206 AString mSessionURL; 1207 AString mSessionHost; 1208 AString mBaseURL; 1209 AString mSessionID; 1210 bool mSetupTracksSuccessful; 1211 bool mSeekPending; 1212 bool mFirstAccessUnit; 1213 1214 int64_t mNTPAnchorUs; 1215 int64_t mMediaAnchorUs; 1216 int64_t mLastMediaTimeUs; 1217 1218 int64_t mNumAccessUnitsReceived; 1219 bool mCheckPending; 1220 int32_t mCheckGeneration; 1221 bool mTryTCPInterleaving; 1222 bool mTryFakeRTCP; 1223 bool mReceivedFirstRTCPPacket; 1224 bool mReceivedFirstRTPPacket; 1225 bool mSeekable; 1226 int64_t mKeepAliveTimeoutUs; 1227 int32_t mKeepAliveGeneration; 1228 1229 Vector<TrackInfo> mTracks; 1230 1231 void setupTrack(size_t index) { 1232 sp<APacketSource> source = 1233 new APacketSource(mSessionDesc, index); 1234 1235 if (source->initCheck() != OK) { 1236 LOGW("Unsupported format. 
Ignoring track #%d.", index); 1237 1238 sp<AMessage> reply = new AMessage('setu', id()); 1239 reply->setSize("index", index); 1240 reply->setInt32("result", ERROR_UNSUPPORTED); 1241 reply->post(); 1242 return; 1243 } 1244 1245 AString url; 1246 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1247 1248 AString trackURL; 1249 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1250 1251 mTracks.push(TrackInfo()); 1252 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1253 info->mURL = trackURL; 1254 info->mPacketSource = source; 1255 info->mUsingInterleavedTCP = false; 1256 info->mFirstSeqNumInSegment = 0; 1257 info->mNewSegment = true; 1258 info->mRTPAnchor = 0; 1259 info->mNTPAnchorUs = -1; 1260 info->mNormalPlayTimeRTP = 0; 1261 info->mNormalPlayTimeUs = 0ll; 1262 1263 unsigned long PT; 1264 AString formatDesc; 1265 AString formatParams; 1266 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1267 1268 int32_t timescale; 1269 int32_t numChannels; 1270 ASessionDescription::ParseFormatDesc( 1271 formatDesc.c_str(), &timescale, &numChannels); 1272 1273 info->mTimeScale = timescale; 1274 1275 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1276 1277 AString request = "SETUP "; 1278 request.append(trackURL); 1279 request.append(" RTSP/1.0\r\n"); 1280 1281 if (mTryTCPInterleaving) { 1282 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1283 info->mUsingInterleavedTCP = true; 1284 info->mRTPSocket = interleaveIndex; 1285 info->mRTCPSocket = interleaveIndex + 1; 1286 1287 request.append("Transport: RTP/AVP/TCP;interleaved="); 1288 request.append(interleaveIndex); 1289 request.append("-"); 1290 request.append(interleaveIndex + 1); 1291 } else { 1292 unsigned rtpPort; 1293 ARTPConnection::MakePortPair( 1294 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1295 1296 if (mUIDValid) { 1297 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1298 (uint32_t)*(uint32_t*) "RTP_"); 1299 
HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1300 (uint32_t)*(uint32_t*) "RTP_"); 1301 } 1302 1303 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1304 request.append(rtpPort); 1305 request.append("-"); 1306 request.append(rtpPort + 1); 1307 } 1308 1309 request.append("\r\n"); 1310 1311 if (index > 1) { 1312 request.append("Session: "); 1313 request.append(mSessionID); 1314 request.append("\r\n"); 1315 } 1316 1317 request.append("\r\n"); 1318 1319 sp<AMessage> reply = new AMessage('setu', id()); 1320 reply->setSize("index", index); 1321 reply->setSize("track-index", mTracks.size() - 1); 1322 mConn->sendRequest(request.c_str(), reply); 1323 } 1324 1325 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1326 out->clear(); 1327 1328 if (strncasecmp("rtsp://", baseURL, 7)) { 1329 // Base URL must be absolute 1330 return false; 1331 } 1332 1333 if (!strncasecmp("rtsp://", url, 7)) { 1334 // "url" is already an absolute URL, ignore base URL. 1335 out->setTo(url); 1336 return true; 1337 } 1338 1339 size_t n = strlen(baseURL); 1340 if (baseURL[n - 1] == '/') { 1341 out->setTo(baseURL); 1342 out->append(url); 1343 } else { 1344 const char *slashPos = strrchr(baseURL, '/'); 1345 1346 if (slashPos > &baseURL[6]) { 1347 out->setTo(baseURL, slashPos - baseURL); 1348 } else { 1349 out->setTo(baseURL); 1350 } 1351 1352 out->append("/"); 1353 out->append(url); 1354 } 1355 1356 return true; 1357 } 1358 1359 void fakeTimestamps() { 1360 for (size_t i = 0; i < mTracks.size(); ++i) { 1361 onTimeUpdate(i, 0, 0ll); 1362 } 1363 } 1364 1365 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1366 LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1367 trackIndex, rtpTime, ntpTime); 1368 1369 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1370 1371 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1372 1373 track->mRTPAnchor = rtpTime; 1374 track->mNTPAnchorUs = ntpTimeUs; 1375 1376 
if (mNTPAnchorUs < 0) { 1377 mNTPAnchorUs = ntpTimeUs; 1378 mMediaAnchorUs = mLastMediaTimeUs; 1379 } 1380 } 1381 1382 void onAccessUnitComplete( 1383 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1384 LOGV("onAccessUnitComplete track %d", trackIndex); 1385 1386 if (mFirstAccessUnit) { 1387 sp<AMessage> msg = mNotify->dup(); 1388 msg->setInt32("what", kWhatConnected); 1389 msg->post(); 1390 1391 if (mSeekable) { 1392 for (size_t i = 0; i < mTracks.size(); ++i) { 1393 TrackInfo *info = &mTracks.editItemAt(i); 1394 1395 postNormalPlayTimeMapping( 1396 i, 1397 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs); 1398 } 1399 } 1400 1401 mFirstAccessUnit = false; 1402 } 1403 1404 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1405 1406 if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) { 1407 LOGV("storing accessUnit, no time established yet"); 1408 track->mPackets.push_back(accessUnit); 1409 return; 1410 } 1411 1412 while (!track->mPackets.empty()) { 1413 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1414 track->mPackets.erase(track->mPackets.begin()); 1415 1416 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1417 postQueueAccessUnit(trackIndex, accessUnit); 1418 } 1419 } 1420 1421 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1422 postQueueAccessUnit(trackIndex, accessUnit); 1423 } 1424 } 1425 1426 bool addMediaTimestamp( 1427 int32_t trackIndex, const TrackInfo *track, 1428 const sp<ABuffer> &accessUnit) { 1429 uint32_t rtpTime; 1430 CHECK(accessUnit->meta()->findInt32( 1431 "rtp-time", (int32_t *)&rtpTime)); 1432 1433 int64_t relRtpTimeUs = 1434 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1435 / track->mTimeScale; 1436 1437 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1438 1439 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1440 1441 if (mediaTimeUs > mLastMediaTimeUs) { 1442 mLastMediaTimeUs = mediaTimeUs; 1443 } 1444 1445 if (mediaTimeUs < 0) { 1446 LOGV("dropping 
early accessUnit."); 1447 return false; 1448 } 1449 1450 LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1451 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1452 1453 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1454 1455 return true; 1456 } 1457 1458 void postQueueAccessUnit( 1459 size_t trackIndex, const sp<ABuffer> &accessUnit) { 1460 sp<AMessage> msg = mNotify->dup(); 1461 msg->setInt32("what", kWhatAccessUnit); 1462 msg->setSize("trackIndex", trackIndex); 1463 msg->setObject("accessUnit", accessUnit); 1464 msg->post(); 1465 } 1466 1467 void postQueueEOS(size_t trackIndex, status_t finalResult) { 1468 sp<AMessage> msg = mNotify->dup(); 1469 msg->setInt32("what", kWhatEOS); 1470 msg->setSize("trackIndex", trackIndex); 1471 msg->setInt32("finalResult", finalResult); 1472 msg->post(); 1473 } 1474 1475 void postQueueSeekDiscontinuity(size_t trackIndex) { 1476 sp<AMessage> msg = mNotify->dup(); 1477 msg->setInt32("what", kWhatSeekDiscontinuity); 1478 msg->setSize("trackIndex", trackIndex); 1479 msg->post(); 1480 } 1481 1482 void postNormalPlayTimeMapping( 1483 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) { 1484 sp<AMessage> msg = mNotify->dup(); 1485 msg->setInt32("what", kWhatNormalPlayTimeMapping); 1486 msg->setSize("trackIndex", trackIndex); 1487 msg->setInt32("rtpTime", rtpTime); 1488 msg->setInt64("nptUs", nptUs); 1489 msg->post(); 1490 } 1491 1492 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1493 }; 1494 1495 } // namespace android 1496 1497 #endif // MY_HANDLER_H_ 1498