1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "ARTPConnection" 19 #include <utils/Log.h> 20 21 #include "ARTPAssembler.h" 22 #include "ARTPConnection.h" 23 24 #include "ARTPSource.h" 25 #include "ASessionDescription.h" 26 27 #include <media/stagefright/foundation/ABuffer.h> 28 #include <media/stagefright/foundation/ADebug.h> 29 #include <media/stagefright/foundation/AMessage.h> 30 #include <media/stagefright/foundation/AString.h> 31 #include <media/stagefright/foundation/hexdump.h> 32 33 #include <arpa/inet.h> 34 #include <sys/socket.h> 35 36 namespace android { 37 38 static const size_t kMaxUDPSize = 1500; 39 40 static uint16_t u16at(const uint8_t *data) { 41 return data[0] << 8 | data[1]; 42 } 43 44 static uint32_t u32at(const uint8_t *data) { 45 return u16at(data) << 16 | u16at(&data[2]); 46 } 47 48 static uint64_t u64at(const uint8_t *data) { 49 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 50 } 51 52 // static 53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 54 55 struct ARTPConnection::StreamInfo { 56 int mRTPSocket; 57 int mRTCPSocket; 58 sp<ASessionDescription> mSessionDesc; 59 size_t mIndex; 60 sp<AMessage> mNotifyMsg; 61 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 62 63 int64_t mNumRTCPPacketsReceived; 64 int64_t mNumRTPPacketsReceived; 65 struct sockaddr_in mRemoteRTCPAddr; 66 67 bool mIsInjected; 68 }; 69 70 ARTPConnection::ARTPConnection(uint32_t flags) 71 : mFlags(flags), 72 mPollEventPending(false), 73 mLastReceiverReportTimeUs(-1) { 74 } 75 76 ARTPConnection::~ARTPConnection() { 77 } 78 79 void ARTPConnection::addStream( 80 int rtpSocket, int rtcpSocket, 81 const sp<ASessionDescription> &sessionDesc, 82 size_t index, 83 const sp<AMessage> ¬ify, 84 bool injected) { 85 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 86 msg->setInt32("rtp-socket", rtpSocket); 87 msg->setInt32("rtcp-socket", rtcpSocket); 88 msg->setObject("session-desc", sessionDesc); 89 msg->setSize("index", index); 90 msg->setMessage("notify", notify); 91 msg->setInt32("injected", injected); 92 msg->post(); 93 } 94 95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 96 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 97 msg->setInt32("rtp-socket", rtpSocket); 98 msg->setInt32("rtcp-socket", rtcpSocket); 99 msg->post(); 100 } 101 102 static void bumpSocketBufferSize(int s) { 103 int size = 256 * 1024; 104 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 105 } 106 107 // static 108 void ARTPConnection::MakePortPair( 109 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 110 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 111 CHECK_GE(*rtpSocket, 0); 112 113 bumpSocketBufferSize(*rtpSocket); 114 115 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 116 CHECK_GE(*rtcpSocket, 0); 117 118 bumpSocketBufferSize(*rtcpSocket); 119 120 /* rand() * 1000 may overflow int type, use long long */ 121 unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550; 122 start &= ~1; 123 124 for (unsigned port = start; port < 65536; port += 2) { 125 struct sockaddr_in addr; 126 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 127 addr.sin_family = AF_INET; 128 addr.sin_addr.s_addr = htonl(INADDR_ANY); 129 addr.sin_port = htons(port); 130 131 if (bind(*rtpSocket, 132 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 133 continue; 134 } 135 136 addr.sin_port = htons(port + 1); 137 138 if (bind(*rtcpSocket, 139 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 140 *rtpPort = port; 141 return; 142 } 143 } 144 145 TRESPASS(); 146 } 147 148 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 149 switch (msg->what()) { 150 case kWhatAddStream: 151 { 152 onAddStream(msg); 153 break; 154 } 155 156 case kWhatRemoveStream: 157 { 158 onRemoveStream(msg); 159 break; 160 } 161 162 case kWhatPollStreams: 163 { 164 onPollStreams(); 165 break; 166 } 167 168 case kWhatInjectPacket: 169 { 170 onInjectPacket(msg); 171 break; 172 } 173 174 default: 175 { 176 TRESPASS(); 177 break; 178 } 179 } 180 } 181 182 void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 183 mStreams.push_back(StreamInfo()); 184 StreamInfo *info = &*--mStreams.end(); 185 186 int32_t s; 187 CHECK(msg->findInt32("rtp-socket", &s)); 188 info->mRTPSocket = s; 189 CHECK(msg->findInt32("rtcp-socket", &s)); 190 info->mRTCPSocket = s; 191 192 int32_t injected; 193 CHECK(msg->findInt32("injected", &injected)); 194 195 info->mIsInjected = injected; 196 197 sp<RefBase> obj; 198 CHECK(msg->findObject("session-desc", &obj)); 199 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 200 201 CHECK(msg->findSize("index", &info->mIndex)); 202 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 203 204 info->mNumRTCPPacketsReceived = 0; 205 info->mNumRTPPacketsReceived = 0; 206 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 207 208 if (!injected) { 209 postPollEvent(); 210 } 211 } 212 213 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 214 int32_t rtpSocket, rtcpSocket; 215 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 216 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 217 218 List<StreamInfo>::iterator it = mStreams.begin(); 219 while (it != mStreams.end() 220 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 221 ++it; 222 } 223 224 if (it == mStreams.end()) { 225 return; 226 } 227 228 mStreams.erase(it); 229 } 230 231 void ARTPConnection::postPollEvent() { 232 if (mPollEventPending) { 233 return; 234 } 235 236 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 237 msg->post(); 238 239 mPollEventPending = true; 240 } 241 242 void ARTPConnection::onPollStreams() { 243 mPollEventPending = false; 244 245 if (mStreams.empty()) { 246 return; 247 } 248 249 struct timeval tv; 250 tv.tv_sec = 0; 251 tv.tv_usec = kSelectTimeoutUs; 252 253 fd_set rs; 254 FD_ZERO(&rs); 255 256 int maxSocket = -1; 257 for (List<StreamInfo>::iterator it = mStreams.begin(); 258 it != mStreams.end(); ++it) { 259 if ((*it).mIsInjected) { 260 continue; 261 } 262 263 FD_SET(it->mRTPSocket, &rs); 264 FD_SET(it->mRTCPSocket, &rs); 265 266 if (it->mRTPSocket > maxSocket) { 267 maxSocket = it->mRTPSocket; 268 } 269 if (it->mRTCPSocket > maxSocket) { 270 maxSocket = it->mRTCPSocket; 271 } 272 } 273 274 if (maxSocket == -1) { 275 return; 276 } 277 278 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 279 280 if (res > 0) { 281 List<StreamInfo>::iterator it = mStreams.begin(); 282 while (it != mStreams.end()) { 283 if ((*it).mIsInjected) { 284 ++it; 285 continue; 286 } 287 288 status_t err = OK; 289 if (FD_ISSET(it->mRTPSocket, &rs)) { 290 err = receive(&*it, true); 291 } 292 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) { 293 err = receive(&*it, false); 294 } 295 296 if (err == -ECONNRESET) { 297 // socket failure, this stream is dead, Jim. 298 299 ALOGW("failed to receive RTP/RTCP datagram."); 300 it = mStreams.erase(it); 301 continue; 302 } 303 304 ++it; 305 } 306 } 307 308 int64_t nowUs = ALooper::GetNowUs(); 309 if (mLastReceiverReportTimeUs <= 0 310 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 311 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 312 List<StreamInfo>::iterator it = mStreams.begin(); 313 while (it != mStreams.end()) { 314 StreamInfo *s = &*it; 315 316 if (s->mIsInjected) { 317 ++it; 318 continue; 319 } 320 321 if (s->mNumRTCPPacketsReceived == 0) { 322 // We have never received any RTCP packets on this stream, 323 // we don't even know where to send a report. 324 ++it; 325 continue; 326 } 327 328 buffer->setRange(0, 0); 329 330 for (size_t i = 0; i < s->mSources.size(); ++i) { 331 sp<ARTPSource> source = s->mSources.valueAt(i); 332 333 source->addReceiverReport(buffer); 334 335 if (mFlags & kRegularlyRequestFIR) { 336 source->addFIR(buffer); 337 } 338 } 339 340 if (buffer->size() > 0) { 341 ALOGV("Sending RR..."); 342 343 ssize_t n; 344 do { 345 n = sendto( 346 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 347 (const struct sockaddr *)&s->mRemoteRTCPAddr, 348 sizeof(s->mRemoteRTCPAddr)); 349 } while (n < 0 && errno == EINTR); 350 351 if (n <= 0) { 352 ALOGW("failed to send RTCP receiver report (%s).", 353 n == 0 ? "connection gone" : strerror(errno)); 354 355 it = mStreams.erase(it); 356 continue; 357 } 358 359 CHECK_EQ(n, (ssize_t)buffer->size()); 360 361 mLastReceiverReportTimeUs = nowUs; 362 } 363 364 ++it; 365 } 366 } 367 368 if (!mStreams.empty()) { 369 postPollEvent(); 370 } 371 } 372 373 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 374 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP"); 375 376 CHECK(!s->mIsInjected); 377 378 sp<ABuffer> buffer = new ABuffer(65536); 379 380 socklen_t remoteAddrLen = 381 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 382 ? sizeof(s->mRemoteRTCPAddr) : 0; 383 384 ssize_t nbytes; 385 do { 386 nbytes = recvfrom( 387 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 388 buffer->data(), 389 buffer->capacity(), 390 0, 391 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 392 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 393 } while (nbytes < 0 && errno == EINTR); 394 395 if (nbytes <= 0) { 396 return -ECONNRESET; 397 } 398 399 buffer->setRange(0, nbytes); 400 401 // ALOGI("received %d bytes.", buffer->size()); 402 403 status_t err; 404 if (receiveRTP) { 405 err = parseRTP(s, buffer); 406 } else { 407 err = parseRTCP(s, buffer); 408 } 409 410 return err; 411 } 412 413 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 414 if (s->mNumRTPPacketsReceived++ == 0) { 415 sp<AMessage> notify = s->mNotifyMsg->dup(); 416 notify->setInt32("first-rtp", true); 417 notify->post(); 418 } 419 420 size_t size = buffer->size(); 421 422 if (size < 12) { 423 // Too short to be a valid RTP header. 424 return -1; 425 } 426 427 const uint8_t *data = buffer->data(); 428 429 if ((data[0] >> 6) != 2) { 430 // Unsupported version. 431 return -1; 432 } 433 434 if (data[0] & 0x20) { 435 // Padding present. 436 437 size_t paddingLength = data[size - 1]; 438 439 if (paddingLength + 12 > size) { 440 // If we removed this much padding we'd end up with something 441 // that's too short to be a valid RTP header. 442 return -1; 443 } 444 445 size -= paddingLength; 446 } 447 448 int numCSRCs = data[0] & 0x0f; 449 450 size_t payloadOffset = 12 + 4 * numCSRCs; 451 452 if (size < payloadOffset) { 453 // Not enough data to fit the basic header and all the CSRC entries. 454 return -1; 455 } 456 457 if (data[0] & 0x10) { 458 // Header eXtension present. 459 460 if (size < payloadOffset + 4) { 461 // Not enough data to fit the basic header, all CSRC entries 462 // and the first 4 bytes of the extension header. 463 464 return -1; 465 } 466 467 const uint8_t *extensionData = &data[payloadOffset]; 468 469 size_t extensionLength = 470 4 * (extensionData[2] << 8 | extensionData[3]); 471 472 if (size < payloadOffset + 4 + extensionLength) { 473 return -1; 474 } 475 476 payloadOffset += 4 + extensionLength; 477 } 478 479 uint32_t srcId = u32at(&data[8]); 480 481 sp<ARTPSource> source = findSource(s, srcId); 482 483 uint32_t rtpTime = u32at(&data[4]); 484 485 sp<AMessage> meta = buffer->meta(); 486 meta->setInt32("ssrc", srcId); 487 meta->setInt32("rtp-time", rtpTime); 488 meta->setInt32("PT", data[1] & 0x7f); 489 meta->setInt32("M", data[1] >> 7); 490 491 buffer->setInt32Data(u16at(&data[2])); 492 buffer->setRange(payloadOffset, size - payloadOffset); 493 494 source->processRTPPacket(buffer); 495 496 return OK; 497 } 498 499 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 500 if (s->mNumRTCPPacketsReceived++ == 0) { 501 sp<AMessage> notify = s->mNotifyMsg->dup(); 502 notify->setInt32("first-rtcp", true); 503 notify->post(); 504 } 505 506 const uint8_t *data = buffer->data(); 507 size_t size = buffer->size(); 508 509 while (size > 0) { 510 if (size < 8) { 511 // Too short to be a valid RTCP header 512 return -1; 513 } 514 515 if ((data[0] >> 6) != 2) { 516 // Unsupported version. 517 return -1; 518 } 519 520 if (data[0] & 0x20) { 521 // Padding present. 522 523 size_t paddingLength = data[size - 1]; 524 525 if (paddingLength + 12 > size) { 526 // If we removed this much padding we'd end up with something 527 // that's too short to be a valid RTP header. 528 return -1; 529 } 530 531 size -= paddingLength; 532 } 533 534 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 535 536 if (size < headerLength) { 537 // Only received a partial packet? 538 return -1; 539 } 540 541 switch (data[1]) { 542 case 200: 543 { 544 parseSR(s, data, headerLength); 545 break; 546 } 547 548 case 201: // RR 549 case 202: // SDES 550 case 204: // APP 551 break; 552 553 case 205: // TSFB (transport layer specific feedback) 554 case 206: // PSFB (payload specific feedback) 555 // hexdump(data, headerLength); 556 break; 557 558 case 203: 559 { 560 parseBYE(s, data, headerLength); 561 break; 562 } 563 564 default: 565 { 566 ALOGW("Unknown RTCP packet type %u of size %d", 567 (unsigned)data[1], headerLength); 568 break; 569 } 570 } 571 572 data += headerLength; 573 size -= headerLength; 574 } 575 576 return OK; 577 } 578 579 status_t ARTPConnection::parseBYE( 580 StreamInfo *s, const uint8_t *data, size_t size) { 581 size_t SC = data[0] & 0x3f; 582 583 if (SC == 0 || size < (4 + SC * 4)) { 584 // Packet too short for the minimal BYE header. 585 return -1; 586 } 587 588 uint32_t id = u32at(&data[4]); 589 590 sp<ARTPSource> source = findSource(s, id); 591 592 source->byeReceived(); 593 594 return OK; 595 } 596 597 status_t ARTPConnection::parseSR( 598 StreamInfo *s, const uint8_t *data, size_t size) { 599 size_t RC = data[0] & 0x1f; 600 601 if (size < (7 + RC * 6) * 4) { 602 // Packet too short for the minimal SR header. 603 return -1; 604 } 605 606 uint32_t id = u32at(&data[4]); 607 uint64_t ntpTime = u64at(&data[8]); 608 uint32_t rtpTime = u32at(&data[16]); 609 610 #if 0 611 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 612 id, 613 rtpTime, 614 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 615 #endif 616 617 sp<ARTPSource> source = findSource(s, id); 618 619 source->timeUpdate(rtpTime, ntpTime); 620 621 return 0; 622 } 623 624 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 625 sp<ARTPSource> source; 626 ssize_t index = info->mSources.indexOfKey(srcId); 627 if (index < 0) { 628 index = info->mSources.size(); 629 630 source = new ARTPSource( 631 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 632 633 info->mSources.add(srcId, source); 634 } else { 635 source = info->mSources.valueAt(index); 636 } 637 638 return source; 639 } 640 641 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { 642 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); 643 msg->setInt32("index", index); 644 msg->setBuffer("buffer", buffer); 645 msg->post(); 646 } 647 648 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { 649 int32_t index; 650 CHECK(msg->findInt32("index", &index)); 651 652 sp<ABuffer> buffer; 653 CHECK(msg->findBuffer("buffer", &buffer)); 654 655 List<StreamInfo>::iterator it = mStreams.begin(); 656 while (it != mStreams.end() 657 && it->mRTPSocket != index && it->mRTCPSocket != index) { 658 ++it; 659 } 660 661 if (it == mStreams.end()) { 662 TRESPASS(); 663 } 664 665 StreamInfo *s = &*it; 666 667 status_t err; 668 if (it->mRTPSocket == index) { 669 err = parseRTP(s, buffer); 670 } else { 671 err = parseRTCP(s, buffer); 672 } 673 } 674 675 } // namespace android 676 677