1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "ARTPConnection" 19 #include <utils/Log.h> 20 21 #include "ARTPAssembler.h" 22 #include "ARTPConnection.h" 23 24 #include "ARTPSource.h" 25 #include "ASessionDescription.h" 26 27 #include <media/stagefright/foundation/ABuffer.h> 28 #include <media/stagefright/foundation/ADebug.h> 29 #include <media/stagefright/foundation/AMessage.h> 30 #include <media/stagefright/foundation/AString.h> 31 #include <media/stagefright/foundation/hexdump.h> 32 33 #include <arpa/inet.h> 34 #include <sys/socket.h> 35 36 namespace android { 37 38 static const size_t kMaxUDPSize = 1500; 39 40 static uint16_t u16at(const uint8_t *data) { 41 return data[0] << 8 | data[1]; 42 } 43 44 static uint32_t u32at(const uint8_t *data) { 45 return u16at(data) << 16 | u16at(&data[2]); 46 } 47 48 static uint64_t u64at(const uint8_t *data) { 49 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]); 50 } 51 52 // static 53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll; 54 55 struct ARTPConnection::StreamInfo { 56 int mRTPSocket; 57 int mRTCPSocket; 58 sp<ASessionDescription> mSessionDesc; 59 size_t mIndex; 60 sp<AMessage> mNotifyMsg; 61 KeyedVector<uint32_t, sp<ARTPSource> > mSources; 62 63 int64_t mNumRTCPPacketsReceived; 64 int64_t mNumRTPPacketsReceived; 65 struct sockaddr_in mRemoteRTCPAddr; 66 67 bool mIsInjected; 68 }; 69 70 ARTPConnection::ARTPConnection(uint32_t flags) 71 : mFlags(flags), 72 mPollEventPending(false), 73 mLastReceiverReportTimeUs(-1) { 74 } 75 76 ARTPConnection::~ARTPConnection() { 77 } 78 79 void ARTPConnection::addStream( 80 int rtpSocket, int rtcpSocket, 81 const sp<ASessionDescription> &sessionDesc, 82 size_t index, 83 const sp<AMessage> ¬ify, 84 bool injected) { 85 sp<AMessage> msg = new AMessage(kWhatAddStream, id()); 86 msg->setInt32("rtp-socket", rtpSocket); 87 msg->setInt32("rtcp-socket", rtcpSocket); 88 msg->setObject("session-desc", sessionDesc); 89 msg->setSize("index", index); 90 msg->setMessage("notify", notify); 91 msg->setInt32("injected", injected); 92 msg->post(); 93 } 94 95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) { 96 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id()); 97 msg->setInt32("rtp-socket", rtpSocket); 98 msg->setInt32("rtcp-socket", rtcpSocket); 99 msg->post(); 100 } 101 102 static void bumpSocketBufferSize(int s) { 103 int size = 256 * 1024; 104 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0); 105 } 106 107 // static 108 void ARTPConnection::MakePortPair( 109 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) { 110 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0); 111 CHECK_GE(*rtpSocket, 0); 112 113 bumpSocketBufferSize(*rtpSocket); 114 115 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0); 116 CHECK_GE(*rtcpSocket, 0); 117 118 bumpSocketBufferSize(*rtcpSocket); 119 120 unsigned start = (rand() * 1000)/ RAND_MAX + 15550; 121 start &= ~1; 122 123 for (unsigned port = start; port < 65536; port += 2) { 124 struct sockaddr_in addr; 125 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 126 addr.sin_family = AF_INET; 127 addr.sin_addr.s_addr = htonl(INADDR_ANY); 128 addr.sin_port = htons(port); 129 130 if (bind(*rtpSocket, 131 (const struct sockaddr *)&addr, sizeof(addr)) < 0) { 132 continue; 133 } 134 135 addr.sin_port = htons(port + 1); 136 137 if (bind(*rtcpSocket, 138 (const struct sockaddr *)&addr, sizeof(addr)) == 0) { 139 *rtpPort = port; 140 return; 141 } 142 } 143 144 TRESPASS(); 145 } 146 147 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) { 148 switch (msg->what()) { 149 case kWhatAddStream: 150 { 151 onAddStream(msg); 152 break; 153 } 154 155 case kWhatRemoveStream: 156 { 157 onRemoveStream(msg); 158 break; 159 } 160 161 case kWhatPollStreams: 162 { 163 onPollStreams(); 164 break; 165 } 166 167 case kWhatInjectPacket: 168 { 169 onInjectPacket(msg); 170 break; 171 } 172 173 default: 174 { 175 TRESPASS(); 176 break; 177 } 178 } 179 } 180 181 void ARTPConnection::onAddStream(const sp<AMessage> &msg) { 182 mStreams.push_back(StreamInfo()); 183 StreamInfo *info = &*--mStreams.end(); 184 185 int32_t s; 186 CHECK(msg->findInt32("rtp-socket", &s)); 187 info->mRTPSocket = s; 188 CHECK(msg->findInt32("rtcp-socket", &s)); 189 info->mRTCPSocket = s; 190 191 int32_t injected; 192 CHECK(msg->findInt32("injected", &injected)); 193 194 info->mIsInjected = injected; 195 196 sp<RefBase> obj; 197 CHECK(msg->findObject("session-desc", &obj)); 198 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); 199 200 CHECK(msg->findSize("index", &info->mIndex)); 201 CHECK(msg->findMessage("notify", &info->mNotifyMsg)); 202 203 info->mNumRTCPPacketsReceived = 0; 204 info->mNumRTPPacketsReceived = 0; 205 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); 206 207 if (!injected) { 208 postPollEvent(); 209 } 210 } 211 212 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { 213 int32_t rtpSocket, rtcpSocket; 214 CHECK(msg->findInt32("rtp-socket", &rtpSocket)); 215 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket)); 216 217 List<StreamInfo>::iterator it = mStreams.begin(); 218 while (it != mStreams.end() 219 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) { 220 ++it; 221 } 222 223 if (it == mStreams.end()) { 224 return; 225 } 226 227 mStreams.erase(it); 228 } 229 230 void ARTPConnection::postPollEvent() { 231 if (mPollEventPending) { 232 return; 233 } 234 235 sp<AMessage> msg = new AMessage(kWhatPollStreams, id()); 236 msg->post(); 237 238 mPollEventPending = true; 239 } 240 241 void ARTPConnection::onPollStreams() { 242 mPollEventPending = false; 243 244 if (mStreams.empty()) { 245 return; 246 } 247 248 struct timeval tv; 249 tv.tv_sec = 0; 250 tv.tv_usec = kSelectTimeoutUs; 251 252 fd_set rs; 253 FD_ZERO(&rs); 254 255 int maxSocket = -1; 256 for (List<StreamInfo>::iterator it = mStreams.begin(); 257 it != mStreams.end(); ++it) { 258 if ((*it).mIsInjected) { 259 continue; 260 } 261 262 FD_SET(it->mRTPSocket, &rs); 263 FD_SET(it->mRTCPSocket, &rs); 264 265 if (it->mRTPSocket > maxSocket) { 266 maxSocket = it->mRTPSocket; 267 } 268 if (it->mRTCPSocket > maxSocket) { 269 maxSocket = it->mRTCPSocket; 270 } 271 } 272 273 if (maxSocket == -1) { 274 return; 275 } 276 277 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv); 278 279 if (res > 0) { 280 List<StreamInfo>::iterator it = mStreams.begin(); 281 while (it != mStreams.end()) { 282 if ((*it).mIsInjected) { 283 ++it; 284 continue; 285 } 286 287 status_t err = OK; 288 if (FD_ISSET(it->mRTPSocket, &rs)) { 289 err = receive(&*it, true); 290 } 291 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) { 292 err = receive(&*it, false); 293 } 294 295 if (err == -ECONNRESET) { 296 // socket failure, this stream is dead, Jim. 297 298 ALOGW("failed to receive RTP/RTCP datagram."); 299 it = mStreams.erase(it); 300 continue; 301 } 302 303 ++it; 304 } 305 } 306 307 int64_t nowUs = ALooper::GetNowUs(); 308 if (mLastReceiverReportTimeUs <= 0 309 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) { 310 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize); 311 List<StreamInfo>::iterator it = mStreams.begin(); 312 while (it != mStreams.end()) { 313 StreamInfo *s = &*it; 314 315 if (s->mIsInjected) { 316 ++it; 317 continue; 318 } 319 320 if (s->mNumRTCPPacketsReceived == 0) { 321 // We have never received any RTCP packets on this stream, 322 // we don't even know where to send a report. 323 ++it; 324 continue; 325 } 326 327 buffer->setRange(0, 0); 328 329 for (size_t i = 0; i < s->mSources.size(); ++i) { 330 sp<ARTPSource> source = s->mSources.valueAt(i); 331 332 source->addReceiverReport(buffer); 333 334 if (mFlags & kRegularlyRequestFIR) { 335 source->addFIR(buffer); 336 } 337 } 338 339 if (buffer->size() > 0) { 340 ALOGV("Sending RR..."); 341 342 ssize_t n; 343 do { 344 n = sendto( 345 s->mRTCPSocket, buffer->data(), buffer->size(), 0, 346 (const struct sockaddr *)&s->mRemoteRTCPAddr, 347 sizeof(s->mRemoteRTCPAddr)); 348 } while (n < 0 && errno == EINTR); 349 350 if (n <= 0) { 351 ALOGW("failed to send RTCP receiver report (%s).", 352 n == 0 ? "connection gone" : strerror(errno)); 353 354 it = mStreams.erase(it); 355 continue; 356 } 357 358 CHECK_EQ(n, (ssize_t)buffer->size()); 359 360 mLastReceiverReportTimeUs = nowUs; 361 } 362 363 ++it; 364 } 365 } 366 367 if (!mStreams.empty()) { 368 postPollEvent(); 369 } 370 } 371 372 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { 373 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP"); 374 375 CHECK(!s->mIsInjected); 376 377 sp<ABuffer> buffer = new ABuffer(65536); 378 379 socklen_t remoteAddrLen = 380 (!receiveRTP && s->mNumRTCPPacketsReceived == 0) 381 ? sizeof(s->mRemoteRTCPAddr) : 0; 382 383 ssize_t nbytes; 384 do { 385 nbytes = recvfrom( 386 receiveRTP ? s->mRTPSocket : s->mRTCPSocket, 387 buffer->data(), 388 buffer->capacity(), 389 0, 390 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL, 391 remoteAddrLen > 0 ? &remoteAddrLen : NULL); 392 } while (nbytes < 0 && errno == EINTR); 393 394 if (nbytes <= 0) { 395 return -ECONNRESET; 396 } 397 398 buffer->setRange(0, nbytes); 399 400 // ALOGI("received %d bytes.", buffer->size()); 401 402 status_t err; 403 if (receiveRTP) { 404 err = parseRTP(s, buffer); 405 } else { 406 err = parseRTCP(s, buffer); 407 } 408 409 return err; 410 } 411 412 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) { 413 if (s->mNumRTPPacketsReceived++ == 0) { 414 sp<AMessage> notify = s->mNotifyMsg->dup(); 415 notify->setInt32("first-rtp", true); 416 notify->post(); 417 } 418 419 size_t size = buffer->size(); 420 421 if (size < 12) { 422 // Too short to be a valid RTP header. 423 return -1; 424 } 425 426 const uint8_t *data = buffer->data(); 427 428 if ((data[0] >> 6) != 2) { 429 // Unsupported version. 430 return -1; 431 } 432 433 if (data[0] & 0x20) { 434 // Padding present. 435 436 size_t paddingLength = data[size - 1]; 437 438 if (paddingLength + 12 > size) { 439 // If we removed this much padding we'd end up with something 440 // that's too short to be a valid RTP header. 441 return -1; 442 } 443 444 size -= paddingLength; 445 } 446 447 int numCSRCs = data[0] & 0x0f; 448 449 size_t payloadOffset = 12 + 4 * numCSRCs; 450 451 if (size < payloadOffset) { 452 // Not enough data to fit the basic header and all the CSRC entries. 453 return -1; 454 } 455 456 if (data[0] & 0x10) { 457 // Header eXtension present. 458 459 if (size < payloadOffset + 4) { 460 // Not enough data to fit the basic header, all CSRC entries 461 // and the first 4 bytes of the extension header. 462 463 return -1; 464 } 465 466 const uint8_t *extensionData = &data[payloadOffset]; 467 468 size_t extensionLength = 469 4 * (extensionData[2] << 8 | extensionData[3]); 470 471 if (size < payloadOffset + 4 + extensionLength) { 472 return -1; 473 } 474 475 payloadOffset += 4 + extensionLength; 476 } 477 478 uint32_t srcId = u32at(&data[8]); 479 480 sp<ARTPSource> source = findSource(s, srcId); 481 482 uint32_t rtpTime = u32at(&data[4]); 483 484 sp<AMessage> meta = buffer->meta(); 485 meta->setInt32("ssrc", srcId); 486 meta->setInt32("rtp-time", rtpTime); 487 meta->setInt32("PT", data[1] & 0x7f); 488 meta->setInt32("M", data[1] >> 7); 489 490 buffer->setInt32Data(u16at(&data[2])); 491 buffer->setRange(payloadOffset, size - payloadOffset); 492 493 source->processRTPPacket(buffer); 494 495 return OK; 496 } 497 498 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) { 499 if (s->mNumRTCPPacketsReceived++ == 0) { 500 sp<AMessage> notify = s->mNotifyMsg->dup(); 501 notify->setInt32("first-rtcp", true); 502 notify->post(); 503 } 504 505 const uint8_t *data = buffer->data(); 506 size_t size = buffer->size(); 507 508 while (size > 0) { 509 if (size < 8) { 510 // Too short to be a valid RTCP header 511 return -1; 512 } 513 514 if ((data[0] >> 6) != 2) { 515 // Unsupported version. 516 return -1; 517 } 518 519 if (data[0] & 0x20) { 520 // Padding present. 521 522 size_t paddingLength = data[size - 1]; 523 524 if (paddingLength + 12 > size) { 525 // If we removed this much padding we'd end up with something 526 // that's too short to be a valid RTP header. 527 return -1; 528 } 529 530 size -= paddingLength; 531 } 532 533 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 534 535 if (size < headerLength) { 536 // Only received a partial packet? 537 return -1; 538 } 539 540 switch (data[1]) { 541 case 200: 542 { 543 parseSR(s, data, headerLength); 544 break; 545 } 546 547 case 201: // RR 548 case 202: // SDES 549 case 204: // APP 550 break; 551 552 case 205: // TSFB (transport layer specific feedback) 553 case 206: // PSFB (payload specific feedback) 554 // hexdump(data, headerLength); 555 break; 556 557 case 203: 558 { 559 parseBYE(s, data, headerLength); 560 break; 561 } 562 563 default: 564 { 565 ALOGW("Unknown RTCP packet type %u of size %d", 566 (unsigned)data[1], headerLength); 567 break; 568 } 569 } 570 571 data += headerLength; 572 size -= headerLength; 573 } 574 575 return OK; 576 } 577 578 status_t ARTPConnection::parseBYE( 579 StreamInfo *s, const uint8_t *data, size_t size) { 580 size_t SC = data[0] & 0x3f; 581 582 if (SC == 0 || size < (4 + SC * 4)) { 583 // Packet too short for the minimal BYE header. 584 return -1; 585 } 586 587 uint32_t id = u32at(&data[4]); 588 589 sp<ARTPSource> source = findSource(s, id); 590 591 source->byeReceived(); 592 593 return OK; 594 } 595 596 status_t ARTPConnection::parseSR( 597 StreamInfo *s, const uint8_t *data, size_t size) { 598 size_t RC = data[0] & 0x1f; 599 600 if (size < (7 + RC * 6) * 4) { 601 // Packet too short for the minimal SR header. 602 return -1; 603 } 604 605 uint32_t id = u32at(&data[4]); 606 uint64_t ntpTime = u64at(&data[8]); 607 uint32_t rtpTime = u32at(&data[16]); 608 609 #if 0 610 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f", 611 id, 612 rtpTime, 613 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32)); 614 #endif 615 616 sp<ARTPSource> source = findSource(s, id); 617 618 source->timeUpdate(rtpTime, ntpTime); 619 620 return 0; 621 } 622 623 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) { 624 sp<ARTPSource> source; 625 ssize_t index = info->mSources.indexOfKey(srcId); 626 if (index < 0) { 627 index = info->mSources.size(); 628 629 source = new ARTPSource( 630 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg); 631 632 info->mSources.add(srcId, source); 633 } else { 634 source = info->mSources.valueAt(index); 635 } 636 637 return source; 638 } 639 640 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) { 641 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id()); 642 msg->setInt32("index", index); 643 msg->setBuffer("buffer", buffer); 644 msg->post(); 645 } 646 647 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) { 648 int32_t index; 649 CHECK(msg->findInt32("index", &index)); 650 651 sp<ABuffer> buffer; 652 CHECK(msg->findBuffer("buffer", &buffer)); 653 654 List<StreamInfo>::iterator it = mStreams.begin(); 655 while (it != mStreams.end() 656 && it->mRTPSocket != index && it->mRTCPSocket != index) { 657 ++it; 658 } 659 660 if (it == mStreams.end()) { 661 TRESPASS(); 662 } 663 664 StreamInfo *s = &*it; 665 666 status_t err; 667 if (it->mRTPSocket == index) { 668 err = parseRTP(s, buffer); 669 } else { 670 err = parseRTCP(s, buffer); 671 } 672 } 673 674 } // namespace android 675 676