Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "ARTPConnection"
     19 #include <utils/Log.h>
     20 
     21 #include "ARTPAssembler.h"
     22 #include "ARTPConnection.h"
     23 
     24 #include "ARTPSource.h"
     25 #include "ASessionDescription.h"
     26 
     27 #include <media/stagefright/foundation/ABuffer.h>
     28 #include <media/stagefright/foundation/ADebug.h>
     29 #include <media/stagefright/foundation/AMessage.h>
     30 #include <media/stagefright/foundation/AString.h>
     31 #include <media/stagefright/foundation/hexdump.h>
     32 
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
     35 
     36 namespace android {
     37 
// Capacity, in bytes, of the scratch buffer in which onPollStreams()
// assembles outgoing RTCP data (receiver reports and FIR requests).
static const size_t kMaxUDPSize = 1500;
     39 
     40 static uint16_t u16at(const uint8_t *data) {
     41     return data[0] << 8 | data[1];
     42 }
     43 
     44 static uint32_t u32at(const uint8_t *data) {
     45     return u16at(data) << 16 | u16at(&data[2]);
     46 }
     47 
     48 static uint64_t u64at(const uint8_t *data) {
     49     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
     50 }
     51 
// static
// Timeout, in microseconds, handed to select() on every polling pass made by
// onPollStreams().
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
     54 
// Book-keeping for one stream registered through addStream().
struct ARTPConnection::StreamInfo {
    // Sockets the stream's RTP and RTCP datagrams are read from. They also
    // identify the stream in removeStream() and in onInjectPacket(), where
    // they are compared against the "index" supplied with the packet.
    int mRTPSocket;
    int mRTCPSocket;
    // Session description and index as passed to addStream(); both are
    // forwarded to every ARTPSource created for this stream.
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;
    // Notification template: duplicated and posted when the first RTP and
    // the first RTCP packet are seen, and handed to each new ARTPSource.
    sp<AMessage> mNotifyMsg;
    // Sources seen on this stream, keyed by the source id read from the
    // packet headers (see findSource()).
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Number of packets handed to parseRTCP() / parseRTP() so far.
    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;
    // Sender address recorded while receiving the first RTCP datagram;
    // receiver reports are sent to this address.
    struct sockaddr_in mRemoteRTCPAddr;

    // True if packets are delivered through injectPacket(); such streams are
    // skipped when polling sockets and when sending receiver reports.
    bool mIsInjected;
};
     69 
// Stores |flags| (onPollStreams() tests them against kRegularlyRequestFIR).
// No poll event is pending initially, and a last-report time of -1 makes the
// first eligible polling pass send a receiver report.
ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      mLastReceiverReportTimeUs(-1) {
}
     75 
// Intentionally empty: no explicit teardown is performed here.
// NOTE(review): nothing in this file closes the stream sockets, so they are
// presumably owned by whoever called addStream() - confirm against callers.
ARTPConnection::~ARTPConnection() {
}
     78 
     79 void ARTPConnection::addStream(
     80         int rtpSocket, int rtcpSocket,
     81         const sp<ASessionDescription> &sessionDesc,
     82         size_t index,
     83         const sp<AMessage> &notify,
     84         bool injected) {
     85     sp<AMessage> msg = new AMessage(kWhatAddStream, id());
     86     msg->setInt32("rtp-socket", rtpSocket);
     87     msg->setInt32("rtcp-socket", rtcpSocket);
     88     msg->setObject("session-desc", sessionDesc);
     89     msg->setSize("index", index);
     90     msg->setMessage("notify", notify);
     91     msg->setInt32("injected", injected);
     92     msg->post();
     93 }
     94 
     95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
     96     sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
     97     msg->setInt32("rtp-socket", rtpSocket);
     98     msg->setInt32("rtcp-socket", rtcpSocket);
     99     msg->post();
    100 }
    101 
    102 static void bumpSocketBufferSize(int s) {
    103     int size = 256 * 1024;
    104     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
    105 }
    106 
    107 // static
    108 void ARTPConnection::MakePortPair(
    109         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
    110     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    111     CHECK_GE(*rtpSocket, 0);
    112 
    113     bumpSocketBufferSize(*rtpSocket);
    114 
    115     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    116     CHECK_GE(*rtcpSocket, 0);
    117 
    118     bumpSocketBufferSize(*rtcpSocket);
    119 
    120     unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
    121     start &= ~1;
    122 
    123     for (unsigned port = start; port < 65536; port += 2) {
    124         struct sockaddr_in addr;
    125         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    126         addr.sin_family = AF_INET;
    127         addr.sin_addr.s_addr = htonl(INADDR_ANY);
    128         addr.sin_port = htons(port);
    129 
    130         if (bind(*rtpSocket,
    131                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
    132             continue;
    133         }
    134 
    135         addr.sin_port = htons(port + 1);
    136 
    137         if (bind(*rtcpSocket,
    138                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
    139             *rtpPort = port;
    140             return;
    141         }
    142     }
    143 
    144     TRESPASS();
    145 }
    146 
    147 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
    148     switch (msg->what()) {
    149         case kWhatAddStream:
    150         {
    151             onAddStream(msg);
    152             break;
    153         }
    154 
    155         case kWhatRemoveStream:
    156         {
    157             onRemoveStream(msg);
    158             break;
    159         }
    160 
    161         case kWhatPollStreams:
    162         {
    163             onPollStreams();
    164             break;
    165         }
    166 
    167         case kWhatInjectPacket:
    168         {
    169             onInjectPacket(msg);
    170             break;
    171         }
    172 
    173         default:
    174         {
    175             TRESPASS();
    176             break;
    177         }
    178     }
    179 }
    180 
    181 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    182     mStreams.push_back(StreamInfo());
    183     StreamInfo *info = &*--mStreams.end();
    184 
    185     int32_t s;
    186     CHECK(msg->findInt32("rtp-socket", &s));
    187     info->mRTPSocket = s;
    188     CHECK(msg->findInt32("rtcp-socket", &s));
    189     info->mRTCPSocket = s;
    190 
    191     int32_t injected;
    192     CHECK(msg->findInt32("injected", &injected));
    193 
    194     info->mIsInjected = injected;
    195 
    196     sp<RefBase> obj;
    197     CHECK(msg->findObject("session-desc", &obj));
    198     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
    199 
    200     CHECK(msg->findSize("index", &info->mIndex));
    201     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
    202 
    203     info->mNumRTCPPacketsReceived = 0;
    204     info->mNumRTPPacketsReceived = 0;
    205     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
    206 
    207     if (!injected) {
    208         postPollEvent();
    209     }
    210 }
    211 
    212 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
    213     int32_t rtpSocket, rtcpSocket;
    214     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
    215     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
    216 
    217     List<StreamInfo>::iterator it = mStreams.begin();
    218     while (it != mStreams.end()
    219            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
    220         ++it;
    221     }
    222 
    223     if (it == mStreams.end()) {
    224         return;
    225     }
    226 
    227     mStreams.erase(it);
    228 }
    229 
    230 void ARTPConnection::postPollEvent() {
    231     if (mPollEventPending) {
    232         return;
    233     }
    234 
    235     sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
    236     msg->post();
    237 
    238     mPollEventPending = true;
    239 }
    240 
    241 void ARTPConnection::onPollStreams() {
    242     mPollEventPending = false;
    243 
    244     if (mStreams.empty()) {
    245         return;
    246     }
    247 
    248     struct timeval tv;
    249     tv.tv_sec = 0;
    250     tv.tv_usec = kSelectTimeoutUs;
    251 
    252     fd_set rs;
    253     FD_ZERO(&rs);
    254 
    255     int maxSocket = -1;
    256     for (List<StreamInfo>::iterator it = mStreams.begin();
    257          it != mStreams.end(); ++it) {
    258         if ((*it).mIsInjected) {
    259             continue;
    260         }
    261 
    262         FD_SET(it->mRTPSocket, &rs);
    263         FD_SET(it->mRTCPSocket, &rs);
    264 
    265         if (it->mRTPSocket > maxSocket) {
    266             maxSocket = it->mRTPSocket;
    267         }
    268         if (it->mRTCPSocket > maxSocket) {
    269             maxSocket = it->mRTCPSocket;
    270         }
    271     }
    272 
    273     if (maxSocket == -1) {
    274         return;
    275     }
    276 
    277     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
    278 
    279     if (res > 0) {
    280         List<StreamInfo>::iterator it = mStreams.begin();
    281         while (it != mStreams.end()) {
    282             if ((*it).mIsInjected) {
    283                 ++it;
    284                 continue;
    285             }
    286 
    287             status_t err = OK;
    288             if (FD_ISSET(it->mRTPSocket, &rs)) {
    289                 err = receive(&*it, true);
    290             }
    291             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
    292                 err = receive(&*it, false);
    293             }
    294 
    295             if (err == -ECONNRESET) {
    296                 // socket failure, this stream is dead, Jim.
    297 
    298                 ALOGW("failed to receive RTP/RTCP datagram.");
    299                 it = mStreams.erase(it);
    300                 continue;
    301             }
    302 
    303             ++it;
    304         }
    305     }
    306 
    307     int64_t nowUs = ALooper::GetNowUs();
    308     if (mLastReceiverReportTimeUs <= 0
    309             || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
    310         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
    311         List<StreamInfo>::iterator it = mStreams.begin();
    312         while (it != mStreams.end()) {
    313             StreamInfo *s = &*it;
    314 
    315             if (s->mIsInjected) {
    316                 ++it;
    317                 continue;
    318             }
    319 
    320             if (s->mNumRTCPPacketsReceived == 0) {
    321                 // We have never received any RTCP packets on this stream,
    322                 // we don't even know where to send a report.
    323                 ++it;
    324                 continue;
    325             }
    326 
    327             buffer->setRange(0, 0);
    328 
    329             for (size_t i = 0; i < s->mSources.size(); ++i) {
    330                 sp<ARTPSource> source = s->mSources.valueAt(i);
    331 
    332                 source->addReceiverReport(buffer);
    333 
    334                 if (mFlags & kRegularlyRequestFIR) {
    335                     source->addFIR(buffer);
    336                 }
    337             }
    338 
    339             if (buffer->size() > 0) {
    340                 ALOGV("Sending RR...");
    341 
    342                 ssize_t n;
    343                 do {
    344                     n = sendto(
    345                         s->mRTCPSocket, buffer->data(), buffer->size(), 0,
    346                         (const struct sockaddr *)&s->mRemoteRTCPAddr,
    347                         sizeof(s->mRemoteRTCPAddr));
    348                 } while (n < 0 && errno == EINTR);
    349 
    350                 if (n <= 0) {
    351                     ALOGW("failed to send RTCP receiver report (%s).",
    352                          n == 0 ? "connection gone" : strerror(errno));
    353 
    354                     it = mStreams.erase(it);
    355                     continue;
    356                 }
    357 
    358                 CHECK_EQ(n, (ssize_t)buffer->size());
    359 
    360                 mLastReceiverReportTimeUs = nowUs;
    361             }
    362 
    363             ++it;
    364         }
    365     }
    366 
    367     if (!mStreams.empty()) {
    368         postPollEvent();
    369     }
    370 }
    371 
    372 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
    373     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
    374 
    375     CHECK(!s->mIsInjected);
    376 
    377     sp<ABuffer> buffer = new ABuffer(65536);
    378 
    379     socklen_t remoteAddrLen =
    380         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
    381             ? sizeof(s->mRemoteRTCPAddr) : 0;
    382 
    383     ssize_t nbytes;
    384     do {
    385         nbytes = recvfrom(
    386             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
    387             buffer->data(),
    388             buffer->capacity(),
    389             0,
    390             remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
    391             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
    392     } while (nbytes < 0 && errno == EINTR);
    393 
    394     if (nbytes <= 0) {
    395         return -ECONNRESET;
    396     }
    397 
    398     buffer->setRange(0, nbytes);
    399 
    400     // ALOGI("received %d bytes.", buffer->size());
    401 
    402     status_t err;
    403     if (receiveRTP) {
    404         err = parseRTP(s, buffer);
    405     } else {
    406         err = parseRTCP(s, buffer);
    407     }
    408 
    409     return err;
    410 }
    411 
    412 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
    413     if (s->mNumRTPPacketsReceived++ == 0) {
    414         sp<AMessage> notify = s->mNotifyMsg->dup();
    415         notify->setInt32("first-rtp", true);
    416         notify->post();
    417     }
    418 
    419     size_t size = buffer->size();
    420 
    421     if (size < 12) {
    422         // Too short to be a valid RTP header.
    423         return -1;
    424     }
    425 
    426     const uint8_t *data = buffer->data();
    427 
    428     if ((data[0] >> 6) != 2) {
    429         // Unsupported version.
    430         return -1;
    431     }
    432 
    433     if (data[0] & 0x20) {
    434         // Padding present.
    435 
    436         size_t paddingLength = data[size - 1];
    437 
    438         if (paddingLength + 12 > size) {
    439             // If we removed this much padding we'd end up with something
    440             // that's too short to be a valid RTP header.
    441             return -1;
    442         }
    443 
    444         size -= paddingLength;
    445     }
    446 
    447     int numCSRCs = data[0] & 0x0f;
    448 
    449     size_t payloadOffset = 12 + 4 * numCSRCs;
    450 
    451     if (size < payloadOffset) {
    452         // Not enough data to fit the basic header and all the CSRC entries.
    453         return -1;
    454     }
    455 
    456     if (data[0] & 0x10) {
    457         // Header eXtension present.
    458 
    459         if (size < payloadOffset + 4) {
    460             // Not enough data to fit the basic header, all CSRC entries
    461             // and the first 4 bytes of the extension header.
    462 
    463             return -1;
    464         }
    465 
    466         const uint8_t *extensionData = &data[payloadOffset];
    467 
    468         size_t extensionLength =
    469             4 * (extensionData[2] << 8 | extensionData[3]);
    470 
    471         if (size < payloadOffset + 4 + extensionLength) {
    472             return -1;
    473         }
    474 
    475         payloadOffset += 4 + extensionLength;
    476     }
    477 
    478     uint32_t srcId = u32at(&data[8]);
    479 
    480     sp<ARTPSource> source = findSource(s, srcId);
    481 
    482     uint32_t rtpTime = u32at(&data[4]);
    483 
    484     sp<AMessage> meta = buffer->meta();
    485     meta->setInt32("ssrc", srcId);
    486     meta->setInt32("rtp-time", rtpTime);
    487     meta->setInt32("PT", data[1] & 0x7f);
    488     meta->setInt32("M", data[1] >> 7);
    489 
    490     buffer->setInt32Data(u16at(&data[2]));
    491     buffer->setRange(payloadOffset, size - payloadOffset);
    492 
    493     source->processRTPPacket(buffer);
    494 
    495     return OK;
    496 }
    497 
    498 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
    499     if (s->mNumRTCPPacketsReceived++ == 0) {
    500         sp<AMessage> notify = s->mNotifyMsg->dup();
    501         notify->setInt32("first-rtcp", true);
    502         notify->post();
    503     }
    504 
    505     const uint8_t *data = buffer->data();
    506     size_t size = buffer->size();
    507 
    508     while (size > 0) {
    509         if (size < 8) {
    510             // Too short to be a valid RTCP header
    511             return -1;
    512         }
    513 
    514         if ((data[0] >> 6) != 2) {
    515             // Unsupported version.
    516             return -1;
    517         }
    518 
    519         if (data[0] & 0x20) {
    520             // Padding present.
    521 
    522             size_t paddingLength = data[size - 1];
    523 
    524             if (paddingLength + 12 > size) {
    525                 // If we removed this much padding we'd end up with something
    526                 // that's too short to be a valid RTP header.
    527                 return -1;
    528             }
    529 
    530             size -= paddingLength;
    531         }
    532 
    533         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    534 
    535         if (size < headerLength) {
    536             // Only received a partial packet?
    537             return -1;
    538         }
    539 
    540         switch (data[1]) {
    541             case 200:
    542             {
    543                 parseSR(s, data, headerLength);
    544                 break;
    545             }
    546 
    547             case 201:  // RR
    548             case 202:  // SDES
    549             case 204:  // APP
    550                 break;
    551 
    552             case 205:  // TSFB (transport layer specific feedback)
    553             case 206:  // PSFB (payload specific feedback)
    554                 // hexdump(data, headerLength);
    555                 break;
    556 
    557             case 203:
    558             {
    559                 parseBYE(s, data, headerLength);
    560                 break;
    561             }
    562 
    563             default:
    564             {
    565                 ALOGW("Unknown RTCP packet type %u of size %d",
    566                      (unsigned)data[1], headerLength);
    567                 break;
    568             }
    569         }
    570 
    571         data += headerLength;
    572         size -= headerLength;
    573     }
    574 
    575     return OK;
    576 }
    577 
    578 status_t ARTPConnection::parseBYE(
    579         StreamInfo *s, const uint8_t *data, size_t size) {
    580     size_t SC = data[0] & 0x3f;
    581 
    582     if (SC == 0 || size < (4 + SC * 4)) {
    583         // Packet too short for the minimal BYE header.
    584         return -1;
    585     }
    586 
    587     uint32_t id = u32at(&data[4]);
    588 
    589     sp<ARTPSource> source = findSource(s, id);
    590 
    591     source->byeReceived();
    592 
    593     return OK;
    594 }
    595 
    596 status_t ARTPConnection::parseSR(
    597         StreamInfo *s, const uint8_t *data, size_t size) {
    598     size_t RC = data[0] & 0x1f;
    599 
    600     if (size < (7 + RC * 6) * 4) {
    601         // Packet too short for the minimal SR header.
    602         return -1;
    603     }
    604 
    605     uint32_t id = u32at(&data[4]);
    606     uint64_t ntpTime = u64at(&data[8]);
    607     uint32_t rtpTime = u32at(&data[16]);
    608 
    609 #if 0
    610     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
    611          id,
    612          rtpTime,
    613          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
    614 #endif
    615 
    616     sp<ARTPSource> source = findSource(s, id);
    617 
    618     source->timeUpdate(rtpTime, ntpTime);
    619 
    620     return 0;
    621 }
    622 
    623 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
    624     sp<ARTPSource> source;
    625     ssize_t index = info->mSources.indexOfKey(srcId);
    626     if (index < 0) {
    627         index = info->mSources.size();
    628 
    629         source = new ARTPSource(
    630                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
    631 
    632         info->mSources.add(srcId, source);
    633     } else {
    634         source = info->mSources.valueAt(index);
    635     }
    636 
    637     return source;
    638 }
    639 
    640 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
    641     sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
    642     msg->setInt32("index", index);
    643     msg->setBuffer("buffer", buffer);
    644     msg->post();
    645 }
    646 
    647 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
    648     int32_t index;
    649     CHECK(msg->findInt32("index", &index));
    650 
    651     sp<ABuffer> buffer;
    652     CHECK(msg->findBuffer("buffer", &buffer));
    653 
    654     List<StreamInfo>::iterator it = mStreams.begin();
    655     while (it != mStreams.end()
    656            && it->mRTPSocket != index && it->mRTCPSocket != index) {
    657         ++it;
    658     }
    659 
    660     if (it == mStreams.end()) {
    661         TRESPASS();
    662     }
    663 
    664     StreamInfo *s = &*it;
    665 
    666     status_t err;
    667     if (it->mRTPSocket == index) {
    668         err = parseRTP(s, buffer);
    669     } else {
    670         err = parseRTCP(s, buffer);
    671     }
    672 }
    673 
    674 }  // namespace android
    675 
    676