1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MY_HANDLER_H_ 18 19 #define MY_HANDLER_H_ 20 21 //#define LOG_NDEBUG 0 22 #define LOG_TAG "MyHandler" 23 #include <utils/Log.h> 24 25 #include "APacketSource.h" 26 #include "ARTPConnection.h" 27 #include "ARTSPConnection.h" 28 #include "ASessionDescription.h" 29 30 #include <ctype.h> 31 #include <cutils/properties.h> 32 33 #include <media/stagefright/foundation/ABuffer.h> 34 #include <media/stagefright/foundation/ADebug.h> 35 #include <media/stagefright/foundation/ALooper.h> 36 #include <media/stagefright/foundation/AMessage.h> 37 #include <media/stagefright/MediaErrors.h> 38 39 #include <arpa/inet.h> 40 #include <sys/socket.h> 41 #include <netdb.h> 42 43 #include "HTTPBase.h" 44 45 // If no access units are received within 5 secs, assume that the rtp 46 // stream has ended and signal end of stream. 47 static int64_t kAccessUnitTimeoutUs = 5000000ll; 48 49 // If no access units arrive for the first 10 secs after starting the 50 // stream, assume none ever will and signal EOS or switch transports. 51 static int64_t kStartupTimeoutUs = 10000000ll; 52 53 namespace android { 54 55 static void MakeUserAgentString(AString *s) { 56 s->setTo("stagefright/1.1 (Linux;Android "); 57 58 #if (PROPERTY_VALUE_MAX < 8) 59 #error "PROPERTY_VALUE_MAX must be at least 8" 60 #endif 61 62 char value[PROPERTY_VALUE_MAX]; 63 property_get("ro.build.version.release", value, "Unknown"); 64 s->append(value); 65 s->append(")"); 66 } 67 68 static bool GetAttribute(const char *s, const char *key, AString *value) { 69 value->clear(); 70 71 size_t keyLen = strlen(key); 72 73 for (;;) { 74 while (isspace(*s)) { 75 ++s; 76 } 77 78 const char *colonPos = strchr(s, ';'); 79 80 size_t len = 81 (colonPos == NULL) ? strlen(s) : colonPos - s; 82 83 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) { 84 value->setTo(&s[keyLen + 1], len - keyLen - 1); 85 return true; 86 } 87 88 if (colonPos == NULL) { 89 return false; 90 } 91 92 s = colonPos + 1; 93 } 94 } 95 96 struct MyHandler : public AHandler { 97 MyHandler( 98 const char *url, const sp<ALooper> &looper, 99 bool uidValid = false, uid_t uid = 0) 100 : mUIDValid(uidValid), 101 mUID(uid), 102 mLooper(looper), 103 mNetLooper(new ALooper), 104 mConn(new ARTSPConnection(mUIDValid, mUID)), 105 mRTPConn(new ARTPConnection), 106 mOriginalSessionURL(url), 107 mSessionURL(url), 108 mSetupTracksSuccessful(false), 109 mSeekPending(false), 110 mFirstAccessUnit(true), 111 mNTPAnchorUs(-1), 112 mMediaAnchorUs(-1), 113 mLastMediaTimeUs(0), 114 mNumAccessUnitsReceived(0), 115 mCheckPending(false), 116 mCheckGeneration(0), 117 mTryTCPInterleaving(false), 118 mTryFakeRTCP(false), 119 mReceivedFirstRTCPPacket(false), 120 mReceivedFirstRTPPacket(false), 121 mSeekable(false) { 122 mNetLooper->setName("rtsp net"); 123 mNetLooper->start(false /* runOnCallingThread */, 124 false /* canCallJava */, 125 PRIORITY_HIGHEST); 126 127 // Strip any authentication info from the session url, we don't 128 // want to transmit user/pass in cleartext. 129 AString host, path, user, pass; 130 unsigned port; 131 CHECK(ARTSPConnection::ParseURL( 132 mSessionURL.c_str(), &host, &port, &path, &user, &pass)); 133 134 if (user.size() > 0) { 135 mSessionURL.clear(); 136 mSessionURL.append("rtsp://"); 137 mSessionURL.append(host); 138 mSessionURL.append(":"); 139 mSessionURL.append(StringPrintf("%u", port)); 140 mSessionURL.append(path); 141 142 LOGI("rewritten session url: '%s'", mSessionURL.c_str()); 143 } 144 145 mSessionHost = host; 146 } 147 148 void connect(const sp<AMessage> &doneMsg) { 149 mDoneMsg = doneMsg; 150 151 mLooper->registerHandler(this); 152 mLooper->registerHandler(mConn); 153 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 154 155 sp<AMessage> notify = new AMessage('biny', id()); 156 mConn->observeBinaryData(notify); 157 158 sp<AMessage> reply = new AMessage('conn', id()); 159 mConn->connect(mOriginalSessionURL.c_str(), reply); 160 } 161 162 void disconnect(const sp<AMessage> &doneMsg) { 163 mDoneMsg = doneMsg; 164 165 (new AMessage('abor', id()))->post(); 166 } 167 168 void seek(int64_t timeUs, const sp<AMessage> &doneMsg) { 169 sp<AMessage> msg = new AMessage('seek', id()); 170 msg->setInt64("time", timeUs); 171 msg->setMessage("doneMsg", doneMsg); 172 msg->post(); 173 } 174 175 int64_t getNormalPlayTimeUs() { 176 int64_t maxTimeUs = 0; 177 for (size_t i = 0; i < mTracks.size(); ++i) { 178 int64_t timeUs = mTracks.editItemAt(i).mPacketSource 179 ->getNormalPlayTimeUs(); 180 181 if (i == 0 || timeUs > maxTimeUs) { 182 maxTimeUs = timeUs; 183 } 184 } 185 186 return maxTimeUs; 187 } 188 189 static void addRR(const sp<ABuffer> &buf) { 190 uint8_t *ptr = buf->data() + buf->size(); 191 ptr[0] = 0x80 | 0; 192 ptr[1] = 201; // RR 193 ptr[2] = 0; 194 ptr[3] = 1; 195 ptr[4] = 0xde; // SSRC 196 ptr[5] = 0xad; 197 ptr[6] = 0xbe; 198 ptr[7] = 0xef; 199 200 buf->setRange(0, buf->size() + 8); 201 } 202 203 static void addSDES(int s, const sp<ABuffer> &buffer) { 204 struct sockaddr_in addr; 205 socklen_t addrSize = sizeof(addr); 206 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize)); 207 208 uint8_t *data = buffer->data() + buffer->size(); 209 data[0] = 0x80 | 1; 210 data[1] = 202; // SDES 211 data[4] = 0xde; // SSRC 212 data[5] = 0xad; 213 data[6] = 0xbe; 214 data[7] = 0xef; 215 216 size_t offset = 8; 217 218 data[offset++] = 1; // CNAME 219 220 AString cname = "stagefright@"; 221 cname.append(inet_ntoa(addr.sin_addr)); 222 data[offset++] = cname.size(); 223 224 memcpy(&data[offset], cname.c_str(), cname.size()); 225 offset += cname.size(); 226 227 data[offset++] = 6; // TOOL 228 229 AString tool; 230 MakeUserAgentString(&tool); 231 232 data[offset++] = tool.size(); 233 234 memcpy(&data[offset], tool.c_str(), tool.size()); 235 offset += tool.size(); 236 237 data[offset++] = 0; 238 239 if ((offset % 4) > 0) { 240 size_t count = 4 - (offset % 4); 241 switch (count) { 242 case 3: 243 data[offset++] = 0; 244 case 2: 245 data[offset++] = 0; 246 case 1: 247 data[offset++] = 0; 248 } 249 } 250 251 size_t numWords = (offset / 4) - 1; 252 data[2] = numWords >> 8; 253 data[3] = numWords & 0xff; 254 255 buffer->setRange(buffer->offset(), buffer->size() + offset); 256 } 257 258 // In case we're behind NAT, fire off two UDP packets to the remote 259 // rtp/rtcp ports to poke a hole into the firewall for future incoming 260 // packets. We're going to send an RR/SDES RTCP packet to both of them. 261 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) { 262 struct sockaddr_in addr; 263 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 264 addr.sin_family = AF_INET; 265 266 AString source; 267 AString server_port; 268 if (!GetAttribute(transport.c_str(), 269 "source", 270 &source)) { 271 LOGW("Missing 'source' field in Transport response. Using " 272 "RTSP endpoint address."); 273 274 struct hostent *ent = gethostbyname(mSessionHost.c_str()); 275 if (ent == NULL) { 276 LOGE("Failed to look up address of session host '%s'", 277 mSessionHost.c_str()); 278 279 return false; 280 } 281 282 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 283 } else { 284 addr.sin_addr.s_addr = inet_addr(source.c_str()); 285 } 286 287 if (!GetAttribute(transport.c_str(), 288 "server_port", 289 &server_port)) { 290 LOGI("Missing 'server_port' field in Transport response."); 291 return false; 292 } 293 294 int rtpPort, rtcpPort; 295 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2 296 || rtpPort <= 0 || rtpPort > 65535 297 || rtcpPort <=0 || rtcpPort > 65535 298 || rtcpPort != rtpPort + 1) { 299 LOGE("Server picked invalid RTP/RTCP port pair %s," 300 " RTP port must be even, RTCP port must be one higher.", 301 server_port.c_str()); 302 303 return false; 304 } 305 306 if (rtpPort & 1) { 307 LOGW("Server picked an odd RTP port, it should've picked an " 308 "even one, we'll let it pass for now, but this may break " 309 "in the future."); 310 } 311 312 if (addr.sin_addr.s_addr == INADDR_NONE) { 313 return true; 314 } 315 316 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) { 317 // No firewalls to traverse on the loopback interface. 318 return true; 319 } 320 321 // Make up an RR/SDES RTCP packet. 322 sp<ABuffer> buf = new ABuffer(65536); 323 buf->setRange(0, 0); 324 addRR(buf); 325 addSDES(rtpSocket, buf); 326 327 addr.sin_port = htons(rtpPort); 328 329 ssize_t n = sendto( 330 rtpSocket, buf->data(), buf->size(), 0, 331 (const sockaddr *)&addr, sizeof(addr)); 332 333 if (n < (ssize_t)buf->size()) { 334 LOGE("failed to poke a hole for RTP packets"); 335 return false; 336 } 337 338 addr.sin_port = htons(rtcpPort); 339 340 n = sendto( 341 rtcpSocket, buf->data(), buf->size(), 0, 342 (const sockaddr *)&addr, sizeof(addr)); 343 344 if (n < (ssize_t)buf->size()) { 345 LOGE("failed to poke a hole for RTCP packets"); 346 return false; 347 } 348 349 LOGV("successfully poked holes."); 350 351 return true; 352 } 353 354 virtual void onMessageReceived(const sp<AMessage> &msg) { 355 switch (msg->what()) { 356 case 'conn': 357 { 358 int32_t result; 359 CHECK(msg->findInt32("result", &result)); 360 361 LOGI("connection request completed with result %d (%s)", 362 result, strerror(-result)); 363 364 if (result == OK) { 365 AString request; 366 request = "DESCRIBE "; 367 request.append(mSessionURL); 368 request.append(" RTSP/1.0\r\n"); 369 request.append("Accept: application/sdp\r\n"); 370 request.append("\r\n"); 371 372 sp<AMessage> reply = new AMessage('desc', id()); 373 mConn->sendRequest(request.c_str(), reply); 374 } else { 375 (new AMessage('disc', id()))->post(); 376 } 377 break; 378 } 379 380 case 'disc': 381 { 382 int32_t reconnect; 383 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 384 sp<AMessage> reply = new AMessage('conn', id()); 385 mConn->connect(mOriginalSessionURL.c_str(), reply); 386 } else { 387 (new AMessage('quit', id()))->post(); 388 } 389 break; 390 } 391 392 case 'desc': 393 { 394 int32_t result; 395 CHECK(msg->findInt32("result", &result)); 396 397 LOGI("DESCRIBE completed with result %d (%s)", 398 result, strerror(-result)); 399 400 if (result == OK) { 401 sp<RefBase> obj; 402 CHECK(msg->findObject("response", &obj)); 403 sp<ARTSPResponse> response = 404 static_cast<ARTSPResponse *>(obj.get()); 405 406 if (response->mStatusCode == 302) { 407 ssize_t i = response->mHeaders.indexOfKey("location"); 408 CHECK_GE(i, 0); 409 410 mSessionURL = response->mHeaders.valueAt(i); 411 412 AString request; 413 request = "DESCRIBE "; 414 request.append(mSessionURL); 415 request.append(" RTSP/1.0\r\n"); 416 request.append("Accept: application/sdp\r\n"); 417 request.append("\r\n"); 418 419 sp<AMessage> reply = new AMessage('desc', id()); 420 mConn->sendRequest(request.c_str(), reply); 421 break; 422 } 423 424 if (response->mStatusCode != 200) { 425 result = UNKNOWN_ERROR; 426 } else { 427 mSessionDesc = new ASessionDescription; 428 429 mSessionDesc->setTo( 430 response->mContent->data(), 431 response->mContent->size()); 432 433 if (!mSessionDesc->isValid()) { 434 LOGE("Failed to parse session description."); 435 result = ERROR_MALFORMED; 436 } else { 437 ssize_t i = response->mHeaders.indexOfKey("content-base"); 438 if (i >= 0) { 439 mBaseURL = response->mHeaders.valueAt(i); 440 } else { 441 i = response->mHeaders.indexOfKey("content-location"); 442 if (i >= 0) { 443 mBaseURL = response->mHeaders.valueAt(i); 444 } else { 445 mBaseURL = mSessionURL; 446 } 447 } 448 449 if (!mBaseURL.startsWith("rtsp://")) { 450 // Some misbehaving servers specify a relative 451 // URL in one of the locations above, combine 452 // it with the absolute session URL to get 453 // something usable... 454 455 LOGW("Server specified a non-absolute base URL" 456 ", combining it with the session URL to " 457 "get something usable..."); 458 459 AString tmp; 460 CHECK(MakeURL( 461 mSessionURL.c_str(), 462 mBaseURL.c_str(), 463 &tmp)); 464 465 mBaseURL = tmp; 466 } 467 468 CHECK_GT(mSessionDesc->countTracks(), 1u); 469 setupTrack(1); 470 } 471 } 472 } 473 474 if (result != OK) { 475 sp<AMessage> reply = new AMessage('disc', id()); 476 mConn->disconnect(reply); 477 } 478 break; 479 } 480 481 case 'setu': 482 { 483 size_t index; 484 CHECK(msg->findSize("index", &index)); 485 486 TrackInfo *track = NULL; 487 size_t trackIndex; 488 if (msg->findSize("track-index", &trackIndex)) { 489 track = &mTracks.editItemAt(trackIndex); 490 } 491 492 int32_t result; 493 CHECK(msg->findInt32("result", &result)); 494 495 LOGI("SETUP(%d) completed with result %d (%s)", 496 index, result, strerror(-result)); 497 498 if (result == OK) { 499 CHECK(track != NULL); 500 501 sp<RefBase> obj; 502 CHECK(msg->findObject("response", &obj)); 503 sp<ARTSPResponse> response = 504 static_cast<ARTSPResponse *>(obj.get()); 505 506 if (response->mStatusCode != 200) { 507 result = UNKNOWN_ERROR; 508 } else { 509 ssize_t i = response->mHeaders.indexOfKey("session"); 510 CHECK_GE(i, 0); 511 512 mSessionID = response->mHeaders.valueAt(i); 513 i = mSessionID.find(";"); 514 if (i >= 0) { 515 // Remove options, i.e. ";timeout=90" 516 mSessionID.erase(i, mSessionID.size() - i); 517 } 518 519 sp<AMessage> notify = new AMessage('accu', id()); 520 notify->setSize("track-index", trackIndex); 521 522 i = response->mHeaders.indexOfKey("transport"); 523 CHECK_GE(i, 0); 524 525 if (!track->mUsingInterleavedTCP) { 526 AString transport = response->mHeaders.valueAt(i); 527 528 // We are going to continue even if we were 529 // unable to poke a hole into the firewall... 530 pokeAHole( 531 track->mRTPSocket, 532 track->mRTCPSocket, 533 transport); 534 } 535 536 mRTPConn->addStream( 537 track->mRTPSocket, track->mRTCPSocket, 538 mSessionDesc, index, 539 notify, track->mUsingInterleavedTCP); 540 541 mSetupTracksSuccessful = true; 542 } 543 } 544 545 if (result != OK) { 546 if (track) { 547 if (!track->mUsingInterleavedTCP) { 548 // Clear the tag 549 if (mUIDValid) { 550 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket); 551 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket); 552 } 553 554 close(track->mRTPSocket); 555 close(track->mRTCPSocket); 556 } 557 558 mTracks.removeItemsAt(trackIndex); 559 } 560 } 561 562 ++index; 563 if (index < mSessionDesc->countTracks()) { 564 setupTrack(index); 565 } else if (mSetupTracksSuccessful) { 566 AString request = "PLAY "; 567 request.append(mSessionURL); 568 request.append(" RTSP/1.0\r\n"); 569 570 request.append("Session: "); 571 request.append(mSessionID); 572 request.append("\r\n"); 573 574 request.append("\r\n"); 575 576 sp<AMessage> reply = new AMessage('play', id()); 577 mConn->sendRequest(request.c_str(), reply); 578 } else { 579 sp<AMessage> reply = new AMessage('disc', id()); 580 mConn->disconnect(reply); 581 } 582 break; 583 } 584 585 case 'play': 586 { 587 int32_t result; 588 CHECK(msg->findInt32("result", &result)); 589 590 LOGI("PLAY completed with result %d (%s)", 591 result, strerror(-result)); 592 593 if (result == OK) { 594 sp<RefBase> obj; 595 CHECK(msg->findObject("response", &obj)); 596 sp<ARTSPResponse> response = 597 static_cast<ARTSPResponse *>(obj.get()); 598 599 if (response->mStatusCode != 200) { 600 result = UNKNOWN_ERROR; 601 } else { 602 parsePlayResponse(response); 603 604 sp<AMessage> timeout = new AMessage('tiou', id()); 605 timeout->post(kStartupTimeoutUs); 606 } 607 } 608 609 if (result != OK) { 610 sp<AMessage> reply = new AMessage('disc', id()); 611 mConn->disconnect(reply); 612 } 613 614 break; 615 } 616 617 case 'abor': 618 { 619 for (size_t i = 0; i < mTracks.size(); ++i) { 620 TrackInfo *info = &mTracks.editItemAt(i); 621 622 info->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 623 624 if (!info->mUsingInterleavedTCP) { 625 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket); 626 627 // Clear the tag 628 if (mUIDValid) { 629 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket); 630 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket); 631 } 632 633 close(info->mRTPSocket); 634 close(info->mRTCPSocket); 635 } 636 } 637 mTracks.clear(); 638 mSetupTracksSuccessful = false; 639 mSeekPending = false; 640 mFirstAccessUnit = true; 641 mNTPAnchorUs = -1; 642 mMediaAnchorUs = -1; 643 mNumAccessUnitsReceived = 0; 644 mReceivedFirstRTCPPacket = false; 645 mReceivedFirstRTPPacket = false; 646 mSeekable = false; 647 648 sp<AMessage> reply = new AMessage('tear', id()); 649 650 int32_t reconnect; 651 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 652 reply->setInt32("reconnect", true); 653 } 654 655 AString request; 656 request = "TEARDOWN "; 657 658 // XXX should use aggregate url from SDP here... 659 request.append(mSessionURL); 660 request.append(" RTSP/1.0\r\n"); 661 662 request.append("Session: "); 663 request.append(mSessionID); 664 request.append("\r\n"); 665 666 request.append("\r\n"); 667 668 mConn->sendRequest(request.c_str(), reply); 669 break; 670 } 671 672 case 'tear': 673 { 674 int32_t result; 675 CHECK(msg->findInt32("result", &result)); 676 677 LOGI("TEARDOWN completed with result %d (%s)", 678 result, strerror(-result)); 679 680 sp<AMessage> reply = new AMessage('disc', id()); 681 682 int32_t reconnect; 683 if (msg->findInt32("reconnect", &reconnect) && reconnect) { 684 reply->setInt32("reconnect", true); 685 } 686 687 mConn->disconnect(reply); 688 break; 689 } 690 691 case 'quit': 692 { 693 if (mDoneMsg != NULL) { 694 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 695 mDoneMsg->post(); 696 mDoneMsg = NULL; 697 } 698 break; 699 } 700 701 case 'chek': 702 { 703 int32_t generation; 704 CHECK(msg->findInt32("generation", &generation)); 705 if (generation != mCheckGeneration) { 706 // This is an outdated message. Ignore. 707 break; 708 } 709 710 if (mNumAccessUnitsReceived == 0) { 711 LOGI("stream ended? aborting."); 712 (new AMessage('abor', id()))->post(); 713 break; 714 } 715 716 mNumAccessUnitsReceived = 0; 717 msg->post(kAccessUnitTimeoutUs); 718 break; 719 } 720 721 case 'accu': 722 { 723 int32_t timeUpdate; 724 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) { 725 size_t trackIndex; 726 CHECK(msg->findSize("track-index", &trackIndex)); 727 728 uint32_t rtpTime; 729 uint64_t ntpTime; 730 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime)); 731 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime)); 732 733 onTimeUpdate(trackIndex, rtpTime, ntpTime); 734 break; 735 } 736 737 int32_t first; 738 if (msg->findInt32("first-rtcp", &first)) { 739 mReceivedFirstRTCPPacket = true; 740 break; 741 } 742 743 if (msg->findInt32("first-rtp", &first)) { 744 mReceivedFirstRTPPacket = true; 745 break; 746 } 747 748 ++mNumAccessUnitsReceived; 749 postAccessUnitTimeoutCheck(); 750 751 size_t trackIndex; 752 CHECK(msg->findSize("track-index", &trackIndex)); 753 754 if (trackIndex >= mTracks.size()) { 755 LOGV("late packets ignored."); 756 break; 757 } 758 759 TrackInfo *track = &mTracks.editItemAt(trackIndex); 760 761 int32_t eos; 762 if (msg->findInt32("eos", &eos)) { 763 LOGI("received BYE on track index %d", trackIndex); 764 #if 0 765 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 766 #endif 767 return; 768 } 769 770 sp<RefBase> obj; 771 CHECK(msg->findObject("access-unit", &obj)); 772 773 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 774 775 uint32_t seqNum = (uint32_t)accessUnit->int32Data(); 776 777 if (mSeekPending) { 778 LOGV("we're seeking, dropping stale packet."); 779 break; 780 } 781 782 if (seqNum < track->mFirstSeqNumInSegment) { 783 LOGV("dropping stale access-unit (%d < %d)", 784 seqNum, track->mFirstSeqNumInSegment); 785 break; 786 } 787 788 if (track->mNewSegment) { 789 track->mNewSegment = false; 790 } 791 792 onAccessUnitComplete(trackIndex, accessUnit); 793 break; 794 } 795 796 case 'seek': 797 { 798 sp<AMessage> doneMsg; 799 CHECK(msg->findMessage("doneMsg", &doneMsg)); 800 801 if (mSeekPending) { 802 doneMsg->post(); 803 break; 804 } 805 806 if (!mSeekable) { 807 LOGW("This is a live stream, ignoring seek request."); 808 doneMsg->post(); 809 break; 810 } 811 812 int64_t timeUs; 813 CHECK(msg->findInt64("time", &timeUs)); 814 815 mSeekPending = true; 816 817 // Disable the access unit timeout until we resumed 818 // playback again. 819 mCheckPending = true; 820 ++mCheckGeneration; 821 822 AString request = "PAUSE "; 823 request.append(mSessionURL); 824 request.append(" RTSP/1.0\r\n"); 825 826 request.append("Session: "); 827 request.append(mSessionID); 828 request.append("\r\n"); 829 830 request.append("\r\n"); 831 832 sp<AMessage> reply = new AMessage('see1', id()); 833 reply->setInt64("time", timeUs); 834 reply->setMessage("doneMsg", doneMsg); 835 mConn->sendRequest(request.c_str(), reply); 836 break; 837 } 838 839 case 'see1': 840 { 841 // Session is paused now. 842 for (size_t i = 0; i < mTracks.size(); ++i) { 843 TrackInfo *info = &mTracks.editItemAt(i); 844 845 info->mPacketSource->flushQueue(); 846 info->mRTPAnchor = 0; 847 info->mNTPAnchorUs = -1; 848 } 849 850 mNTPAnchorUs = -1; 851 852 int64_t timeUs; 853 CHECK(msg->findInt64("time", &timeUs)); 854 855 AString request = "PLAY "; 856 request.append(mSessionURL); 857 request.append(" RTSP/1.0\r\n"); 858 859 request.append("Session: "); 860 request.append(mSessionID); 861 request.append("\r\n"); 862 863 request.append( 864 StringPrintf( 865 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 866 867 request.append("\r\n"); 868 869 sp<AMessage> doneMsg; 870 CHECK(msg->findMessage("doneMsg", &doneMsg)); 871 872 sp<AMessage> reply = new AMessage('see2', id()); 873 reply->setMessage("doneMsg", doneMsg); 874 mConn->sendRequest(request.c_str(), reply); 875 break; 876 } 877 878 case 'see2': 879 { 880 CHECK(mSeekPending); 881 882 int32_t result; 883 CHECK(msg->findInt32("result", &result)); 884 885 LOGI("PLAY completed with result %d (%s)", 886 result, strerror(-result)); 887 888 mCheckPending = false; 889 postAccessUnitTimeoutCheck(); 890 891 if (result == OK) { 892 sp<RefBase> obj; 893 CHECK(msg->findObject("response", &obj)); 894 sp<ARTSPResponse> response = 895 static_cast<ARTSPResponse *>(obj.get()); 896 897 if (response->mStatusCode != 200) { 898 result = UNKNOWN_ERROR; 899 } else { 900 parsePlayResponse(response); 901 902 ssize_t i = response->mHeaders.indexOfKey("rtp-info"); 903 CHECK_GE(i, 0); 904 905 LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str()); 906 907 LOGI("seek completed."); 908 } 909 } 910 911 if (result != OK) { 912 LOGE("seek failed, aborting."); 913 (new AMessage('abor', id()))->post(); 914 } 915 916 mSeekPending = false; 917 918 sp<AMessage> doneMsg; 919 CHECK(msg->findMessage("doneMsg", &doneMsg)); 920 921 doneMsg->post(); 922 break; 923 } 924 925 case 'biny': 926 { 927 sp<RefBase> obj; 928 CHECK(msg->findObject("buffer", &obj)); 929 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 930 931 int32_t index; 932 CHECK(buffer->meta()->findInt32("index", &index)); 933 934 mRTPConn->injectPacket(index, buffer); 935 break; 936 } 937 938 case 'tiou': 939 { 940 if (!mReceivedFirstRTCPPacket) { 941 if (mReceivedFirstRTPPacket && !mTryFakeRTCP) { 942 LOGW("We received RTP packets but no RTCP packets, " 943 "using fake timestamps."); 944 945 mTryFakeRTCP = true; 946 947 mReceivedFirstRTCPPacket = true; 948 949 fakeTimestamps(); 950 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) { 951 LOGW("Never received any data, switching transports."); 952 953 mTryTCPInterleaving = true; 954 955 sp<AMessage> msg = new AMessage('abor', id()); 956 msg->setInt32("reconnect", true); 957 msg->post(); 958 } else { 959 LOGW("Never received any data, disconnecting."); 960 (new AMessage('abor', id()))->post(); 961 } 962 } 963 break; 964 } 965 966 default: 967 TRESPASS(); 968 break; 969 } 970 } 971 972 void postAccessUnitTimeoutCheck() { 973 if (mCheckPending) { 974 return; 975 } 976 977 mCheckPending = true; 978 sp<AMessage> check = new AMessage('chek', id()); 979 check->setInt32("generation", mCheckGeneration); 980 check->post(kAccessUnitTimeoutUs); 981 } 982 983 static void SplitString( 984 const AString &s, const char *separator, List<AString> *items) { 985 items->clear(); 986 size_t start = 0; 987 while (start < s.size()) { 988 ssize_t offset = s.find(separator, start); 989 990 if (offset < 0) { 991 items->push_back(AString(s, start, s.size() - start)); 992 break; 993 } 994 995 items->push_back(AString(s, start, offset - start)); 996 start = offset + strlen(separator); 997 } 998 } 999 1000 void parsePlayResponse(const sp<ARTSPResponse> &response) { 1001 mSeekable = false; 1002 1003 ssize_t i = response->mHeaders.indexOfKey("range"); 1004 if (i < 0) { 1005 // Server doesn't even tell use what range it is going to 1006 // play, therefore we won't support seeking. 1007 return; 1008 } 1009 1010 AString range = response->mHeaders.valueAt(i); 1011 LOGV("Range: %s", range.c_str()); 1012 1013 AString val; 1014 CHECK(GetAttribute(range.c_str(), "npt", &val)); 1015 1016 float npt1, npt2; 1017 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) { 1018 // This is a live stream and therefore not seekable. 1019 return; 1020 } 1021 1022 i = response->mHeaders.indexOfKey("rtp-info"); 1023 CHECK_GE(i, 0); 1024 1025 AString rtpInfo = response->mHeaders.valueAt(i); 1026 List<AString> streamInfos; 1027 SplitString(rtpInfo, ",", &streamInfos); 1028 1029 int n = 1; 1030 for (List<AString>::iterator it = streamInfos.begin(); 1031 it != streamInfos.end(); ++it) { 1032 (*it).trim(); 1033 LOGV("streamInfo[%d] = %s", n, (*it).c_str()); 1034 1035 CHECK(GetAttribute((*it).c_str(), "url", &val)); 1036 1037 size_t trackIndex = 0; 1038 while (trackIndex < mTracks.size() 1039 && !(val == mTracks.editItemAt(trackIndex).mURL)) { 1040 ++trackIndex; 1041 } 1042 CHECK_LT(trackIndex, mTracks.size()); 1043 1044 CHECK(GetAttribute((*it).c_str(), "seq", &val)); 1045 1046 char *end; 1047 unsigned long seq = strtoul(val.c_str(), &end, 10); 1048 1049 TrackInfo *info = &mTracks.editItemAt(trackIndex); 1050 info->mFirstSeqNumInSegment = seq; 1051 info->mNewSegment = true; 1052 1053 CHECK(GetAttribute((*it).c_str(), "rtptime", &val)); 1054 1055 uint32_t rtpTime = strtoul(val.c_str(), &end, 10); 1056 1057 LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1); 1058 1059 info->mPacketSource->setNormalPlayTimeMapping( 1060 rtpTime, (int64_t)(npt1 * 1E6)); 1061 1062 ++n; 1063 } 1064 1065 mSeekable = true; 1066 } 1067 1068 sp<APacketSource> getPacketSource(size_t index) { 1069 CHECK_GE(index, 0u); 1070 CHECK_LT(index, mTracks.size()); 1071 1072 return mTracks.editItemAt(index).mPacketSource; 1073 } 1074 1075 size_t countTracks() const { 1076 return mTracks.size(); 1077 } 1078 1079 private: 1080 struct TrackInfo { 1081 AString mURL; 1082 int mRTPSocket; 1083 int mRTCPSocket; 1084 bool mUsingInterleavedTCP; 1085 uint32_t mFirstSeqNumInSegment; 1086 bool mNewSegment; 1087 1088 uint32_t mRTPAnchor; 1089 int64_t mNTPAnchorUs; 1090 int32_t mTimeScale; 1091 1092 sp<APacketSource> mPacketSource; 1093 1094 // Stores packets temporarily while no notion of time 1095 // has been established yet. 1096 List<sp<ABuffer> > mPackets; 1097 }; 1098 1099 bool mUIDValid; 1100 uid_t mUID; 1101 sp<ALooper> mLooper; 1102 sp<ALooper> mNetLooper; 1103 sp<ARTSPConnection> mConn; 1104 sp<ARTPConnection> mRTPConn; 1105 sp<ASessionDescription> mSessionDesc; 1106 AString mOriginalSessionURL; // This one still has user:pass@ 1107 AString mSessionURL; 1108 AString mSessionHost; 1109 AString mBaseURL; 1110 AString mSessionID; 1111 bool mSetupTracksSuccessful; 1112 bool mSeekPending; 1113 bool mFirstAccessUnit; 1114 1115 int64_t mNTPAnchorUs; 1116 int64_t mMediaAnchorUs; 1117 int64_t mLastMediaTimeUs; 1118 1119 int64_t mNumAccessUnitsReceived; 1120 bool mCheckPending; 1121 int32_t mCheckGeneration; 1122 bool mTryTCPInterleaving; 1123 bool mTryFakeRTCP; 1124 bool mReceivedFirstRTCPPacket; 1125 bool mReceivedFirstRTPPacket; 1126 bool mSeekable; 1127 1128 Vector<TrackInfo> mTracks; 1129 1130 sp<AMessage> mDoneMsg; 1131 1132 void setupTrack(size_t index) { 1133 sp<APacketSource> source = 1134 new APacketSource(mSessionDesc, index); 1135 1136 if (source->initCheck() != OK) { 1137 LOGW("Unsupported format. Ignoring track #%d.", index); 1138 1139 sp<AMessage> reply = new AMessage('setu', id()); 1140 reply->setSize("index", index); 1141 reply->setInt32("result", ERROR_UNSUPPORTED); 1142 reply->post(); 1143 return; 1144 } 1145 1146 AString url; 1147 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 1148 1149 AString trackURL; 1150 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 1151 1152 mTracks.push(TrackInfo()); 1153 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 1154 info->mURL = trackURL; 1155 info->mPacketSource = source; 1156 info->mUsingInterleavedTCP = false; 1157 info->mFirstSeqNumInSegment = 0; 1158 info->mNewSegment = true; 1159 info->mRTPAnchor = 0; 1160 info->mNTPAnchorUs = -1; 1161 1162 unsigned long PT; 1163 AString formatDesc; 1164 AString formatParams; 1165 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams); 1166 1167 int32_t timescale; 1168 int32_t numChannels; 1169 ASessionDescription::ParseFormatDesc( 1170 formatDesc.c_str(), ×cale, &numChannels); 1171 1172 info->mTimeScale = timescale; 1173 1174 LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str()); 1175 1176 AString request = "SETUP "; 1177 request.append(trackURL); 1178 request.append(" RTSP/1.0\r\n"); 1179 1180 if (mTryTCPInterleaving) { 1181 size_t interleaveIndex = 2 * (mTracks.size() - 1); 1182 info->mUsingInterleavedTCP = true; 1183 info->mRTPSocket = interleaveIndex; 1184 info->mRTCPSocket = interleaveIndex + 1; 1185 1186 request.append("Transport: RTP/AVP/TCP;interleaved="); 1187 request.append(interleaveIndex); 1188 request.append("-"); 1189 request.append(interleaveIndex + 1); 1190 } else { 1191 unsigned rtpPort; 1192 ARTPConnection::MakePortPair( 1193 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 1194 1195 if (mUIDValid) { 1196 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID, 1197 (uint32_t)*(uint32_t*) "RTP_"); 1198 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID, 1199 (uint32_t)*(uint32_t*) "RTP_"); 1200 } 1201 1202 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 1203 request.append(rtpPort); 1204 request.append("-"); 1205 request.append(rtpPort + 1); 1206 } 1207 1208 request.append("\r\n"); 1209 1210 if (index > 1) { 1211 request.append("Session: "); 1212 request.append(mSessionID); 1213 request.append("\r\n"); 1214 } 1215 1216 request.append("\r\n"); 1217 1218 sp<AMessage> reply = new AMessage('setu', id()); 1219 reply->setSize("index", index); 1220 reply->setSize("track-index", mTracks.size() - 1); 1221 mConn->sendRequest(request.c_str(), reply); 1222 } 1223 1224 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 1225 out->clear(); 1226 1227 if (strncasecmp("rtsp://", baseURL, 7)) { 1228 // Base URL must be absolute 1229 return false; 1230 } 1231 1232 if (!strncasecmp("rtsp://", url, 7)) { 1233 // "url" is already an absolute URL, ignore base URL. 1234 out->setTo(url); 1235 return true; 1236 } 1237 1238 size_t n = strlen(baseURL); 1239 if (baseURL[n - 1] == '/') { 1240 out->setTo(baseURL); 1241 out->append(url); 1242 } else { 1243 const char *slashPos = strrchr(baseURL, '/'); 1244 1245 if (slashPos > &baseURL[6]) { 1246 out->setTo(baseURL, slashPos - baseURL); 1247 } else { 1248 out->setTo(baseURL); 1249 } 1250 1251 out->append("/"); 1252 out->append(url); 1253 } 1254 1255 return true; 1256 } 1257 1258 void fakeTimestamps() { 1259 for (size_t i = 0; i < mTracks.size(); ++i) { 1260 onTimeUpdate(i, 0, 0ll); 1261 } 1262 } 1263 1264 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) { 1265 LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx", 1266 trackIndex, rtpTime, ntpTime); 1267 1268 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 1269 1270 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1271 1272 track->mRTPAnchor = rtpTime; 1273 track->mNTPAnchorUs = ntpTimeUs; 1274 1275 if (mNTPAnchorUs < 0) { 1276 mNTPAnchorUs = ntpTimeUs; 1277 mMediaAnchorUs = mLastMediaTimeUs; 1278 } 1279 } 1280 1281 void onAccessUnitComplete( 1282 int32_t trackIndex, const sp<ABuffer> &accessUnit) { 1283 LOGV("onAccessUnitComplete track %d", trackIndex); 1284 1285 if (mFirstAccessUnit) { 1286 mDoneMsg->setInt32("result", OK); 1287 mDoneMsg->post(); 1288 mDoneMsg = NULL; 1289 1290 mFirstAccessUnit = false; 1291 } 1292 1293 TrackInfo *track = &mTracks.editItemAt(trackIndex); 1294 1295 if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) { 1296 LOGV("storing accessUnit, no time established yet"); 1297 track->mPackets.push_back(accessUnit); 1298 return; 1299 } 1300 1301 while (!track->mPackets.empty()) { 1302 sp<ABuffer> accessUnit = *track->mPackets.begin(); 1303 track->mPackets.erase(track->mPackets.begin()); 1304 1305 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1306 track->mPacketSource->queueAccessUnit(accessUnit); 1307 } 1308 } 1309 1310 if (addMediaTimestamp(trackIndex, track, accessUnit)) { 1311 track->mPacketSource->queueAccessUnit(accessUnit); 1312 } 1313 } 1314 1315 bool addMediaTimestamp( 1316 int32_t trackIndex, const TrackInfo *track, 1317 const sp<ABuffer> &accessUnit) { 1318 uint32_t rtpTime; 1319 CHECK(accessUnit->meta()->findInt32( 1320 "rtp-time", (int32_t *)&rtpTime)); 1321 1322 int64_t relRtpTimeUs = 1323 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll) 1324 / track->mTimeScale; 1325 1326 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs; 1327 1328 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs; 1329 1330 if (mediaTimeUs > mLastMediaTimeUs) { 1331 mLastMediaTimeUs = mediaTimeUs; 1332 } 1333 1334 if (mediaTimeUs < 0) { 1335 LOGV("dropping early accessUnit."); 1336 return false; 1337 } 1338 1339 LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)", 1340 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6); 1341 1342 accessUnit->meta()->setInt64("timeUs", mediaTimeUs); 1343 1344 return true; 1345 } 1346 1347 1348 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 1349 }; 1350 1351 } // namespace android 1352 1353 #endif // MY_HANDLER_H_ 1354