Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "ARTPConnection"
     19 #include <utils/Log.h>
     20 
     21 #include "ARTPConnection.h"
     22 
     23 #include "ARTPSource.h"
     24 #include "ASessionDescription.h"
     25 
     26 #include <media/stagefright/foundation/ABuffer.h>
     27 #include <media/stagefright/foundation/ADebug.h>
     28 #include <media/stagefright/foundation/AMessage.h>
     29 #include <media/stagefright/foundation/AString.h>
     30 #include <media/stagefright/foundation/hexdump.h>
     31 
#include <arpa/inet.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
     34 
     35 namespace android {
     36 
// Capacity, in bytes, of the scratch buffer that onPollStreams() allocates
// to assemble outgoing RTCP receiver reports.
static const size_t kMaxUDPSize = 1500;
     38 
     39 static uint16_t u16at(const uint8_t *data) {
     40     return data[0] << 8 | data[1];
     41 }
     42 
     43 static uint32_t u32at(const uint8_t *data) {
     44     return u16at(data) << 16 | u16at(&data[2]);
     45 }
     46 
     47 static uint64_t u64at(const uint8_t *data) {
     48     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
     49 }
     50 
// static
// Timeout, in microseconds, that onPollStreams() hands to select() on every
// polling pass.
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
     53 
// Per-stream bookkeeping; one entry is appended to mStreams by onAddStream()
// for every stream registered through addStream().
struct ARTPConnection::StreamInfo {
    // Values of the "rtp-socket" / "rtcp-socket" message entries. Polled
    // with select() for non-injected streams; onInjectPacket() matches its
    // "index" argument against these same two fields.
    int mRTPSocket;
    int mRTCPSocket;
    // Session description and index within it; both are forwarded unchanged
    // to every ARTPSource created for this stream.
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;
    // Template message: duplicated to post the "first-rtp" / "first-rtcp"
    // notifications and also handed to each ARTPSource.
    sp<AMessage> mNotifyMsg;
    // Sources seen on this stream, keyed by the 32-bit source id read from
    // incoming packets (see findSource()).
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Number of packets handed to parseRTCP() / parseRTP() so far, counted
    // before any validation of the packet contents.
    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;
    // Sender address captured by receive() while no RTCP packet has been
    // counted yet; used as the destination of receiver reports.
    struct sockaddr_in mRemoteRTCPAddr;

    // True if packets arrive through injectPacket(); such streams are
    // skipped by onPollStreams().
    bool mIsInjected;
};
     68 
// Creates a connection with no poll event pending. "flags" is stored in
// mFlags, which onPollStreams() tests for kRegularlyRequestFIR.
ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      // A non-positive value makes onPollStreams() treat a receiver report
      // as due immediately.
      mLastReceiverReportTimeUs(-1) {
}
     74 
// The destructor body is empty: no explicit cleanup is performed here, and
// in particular the sockets passed to addStream() are not closed.
ARTPConnection::~ARTPConnection() {
}
     77 
     78 void ARTPConnection::addStream(
     79         int rtpSocket, int rtcpSocket,
     80         const sp<ASessionDescription> &sessionDesc,
     81         size_t index,
     82         const sp<AMessage> &notify,
     83         bool injected) {
     84     sp<AMessage> msg = new AMessage(kWhatAddStream, id());
     85     msg->setInt32("rtp-socket", rtpSocket);
     86     msg->setInt32("rtcp-socket", rtcpSocket);
     87     msg->setObject("session-desc", sessionDesc);
     88     msg->setSize("index", index);
     89     msg->setMessage("notify", notify);
     90     msg->setInt32("injected", injected);
     91     msg->post();
     92 }
     93 
     94 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
     95     sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
     96     msg->setInt32("rtp-socket", rtpSocket);
     97     msg->setInt32("rtcp-socket", rtcpSocket);
     98     msg->post();
     99 }
    100 
    101 static void bumpSocketBufferSize(int s) {
    102     int size = 256 * 1024;
    103     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
    104 }
    105 
    106 // static
    107 void ARTPConnection::MakePortPair(
    108         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
    109     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    110     CHECK_GE(*rtpSocket, 0);
    111 
    112     bumpSocketBufferSize(*rtpSocket);
    113 
    114     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    115     CHECK_GE(*rtcpSocket, 0);
    116 
    117     bumpSocketBufferSize(*rtcpSocket);
    118 
    119     unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
    120     start &= ~1;
    121 
    122     for (unsigned port = start; port < 65536; port += 2) {
    123         struct sockaddr_in addr;
    124         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    125         addr.sin_family = AF_INET;
    126         addr.sin_addr.s_addr = htonl(INADDR_ANY);
    127         addr.sin_port = htons(port);
    128 
    129         if (bind(*rtpSocket,
    130                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
    131             continue;
    132         }
    133 
    134         addr.sin_port = htons(port + 1);
    135 
    136         if (bind(*rtcpSocket,
    137                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
    138             *rtpPort = port;
    139             return;
    140         }
    141     }
    142 
    143     TRESPASS();
    144 }
    145 
    146 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
    147     switch (msg->what()) {
    148         case kWhatAddStream:
    149         {
    150             onAddStream(msg);
    151             break;
    152         }
    153 
    154         case kWhatRemoveStream:
    155         {
    156             onRemoveStream(msg);
    157             break;
    158         }
    159 
    160         case kWhatPollStreams:
    161         {
    162             onPollStreams();
    163             break;
    164         }
    165 
    166         case kWhatInjectPacket:
    167         {
    168             onInjectPacket(msg);
    169             break;
    170         }
    171 
    172         default:
    173         {
    174             TRESPASS();
    175             break;
    176         }
    177     }
    178 }
    179 
    180 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    181     mStreams.push_back(StreamInfo());
    182     StreamInfo *info = &*--mStreams.end();
    183 
    184     int32_t s;
    185     CHECK(msg->findInt32("rtp-socket", &s));
    186     info->mRTPSocket = s;
    187     CHECK(msg->findInt32("rtcp-socket", &s));
    188     info->mRTCPSocket = s;
    189 
    190     int32_t injected;
    191     CHECK(msg->findInt32("injected", &injected));
    192 
    193     info->mIsInjected = injected;
    194 
    195     sp<RefBase> obj;
    196     CHECK(msg->findObject("session-desc", &obj));
    197     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
    198 
    199     CHECK(msg->findSize("index", &info->mIndex));
    200     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
    201 
    202     info->mNumRTCPPacketsReceived = 0;
    203     info->mNumRTPPacketsReceived = 0;
    204     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
    205 
    206     if (!injected) {
    207         postPollEvent();
    208     }
    209 }
    210 
    211 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
    212     int32_t rtpSocket, rtcpSocket;
    213     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
    214     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
    215 
    216     List<StreamInfo>::iterator it = mStreams.begin();
    217     while (it != mStreams.end()
    218            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
    219         ++it;
    220     }
    221 
    222     if (it == mStreams.end()) {
    223         TRESPASS();
    224     }
    225 
    226     mStreams.erase(it);
    227 }
    228 
    229 void ARTPConnection::postPollEvent() {
    230     if (mPollEventPending) {
    231         return;
    232     }
    233 
    234     sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
    235     msg->post();
    236 
    237     mPollEventPending = true;
    238 }
    239 
    240 void ARTPConnection::onPollStreams() {
    241     mPollEventPending = false;
    242 
    243     if (mStreams.empty()) {
    244         return;
    245     }
    246 
    247     struct timeval tv;
    248     tv.tv_sec = 0;
    249     tv.tv_usec = kSelectTimeoutUs;
    250 
    251     fd_set rs;
    252     FD_ZERO(&rs);
    253 
    254     int maxSocket = -1;
    255     for (List<StreamInfo>::iterator it = mStreams.begin();
    256          it != mStreams.end(); ++it) {
    257         if ((*it).mIsInjected) {
    258             continue;
    259         }
    260 
    261         FD_SET(it->mRTPSocket, &rs);
    262         FD_SET(it->mRTCPSocket, &rs);
    263 
    264         if (it->mRTPSocket > maxSocket) {
    265             maxSocket = it->mRTPSocket;
    266         }
    267         if (it->mRTCPSocket > maxSocket) {
    268             maxSocket = it->mRTCPSocket;
    269         }
    270     }
    271 
    272     if (maxSocket == -1) {
    273         return;
    274     }
    275 
    276     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
    277     CHECK_GE(res, 0);
    278 
    279     if (res > 0) {
    280         for (List<StreamInfo>::iterator it = mStreams.begin();
    281              it != mStreams.end(); ++it) {
    282             if ((*it).mIsInjected) {
    283                 continue;
    284             }
    285 
    286             if (FD_ISSET(it->mRTPSocket, &rs)) {
    287                 receive(&*it, true);
    288             }
    289             if (FD_ISSET(it->mRTCPSocket, &rs)) {
    290                 receive(&*it, false);
    291             }
    292         }
    293     }
    294 
    295     postPollEvent();
    296 
    297     int64_t nowUs = ALooper::GetNowUs();
    298     if (mLastReceiverReportTimeUs <= 0
    299             || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
    300         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
    301         for (List<StreamInfo>::iterator it = mStreams.begin();
    302              it != mStreams.end(); ++it) {
    303             StreamInfo *s = &*it;
    304 
    305             if (s->mIsInjected) {
    306                 continue;
    307             }
    308 
    309             if (s->mNumRTCPPacketsReceived == 0) {
    310                 // We have never received any RTCP packets on this stream,
    311                 // we don't even know where to send a report.
    312                 continue;
    313             }
    314 
    315             buffer->setRange(0, 0);
    316 
    317             for (size_t i = 0; i < s->mSources.size(); ++i) {
    318                 sp<ARTPSource> source = s->mSources.valueAt(i);
    319 
    320                 source->addReceiverReport(buffer);
    321 
    322                 if (mFlags & kRegularlyRequestFIR) {
    323                     source->addFIR(buffer);
    324                 }
    325             }
    326 
    327             if (buffer->size() > 0) {
    328                 LOGV("Sending RR...");
    329 
    330                 ssize_t n = sendto(
    331                         s->mRTCPSocket, buffer->data(), buffer->size(), 0,
    332                         (const struct sockaddr *)&s->mRemoteRTCPAddr,
    333                         sizeof(s->mRemoteRTCPAddr));
    334                 CHECK_EQ(n, (ssize_t)buffer->size());
    335 
    336                 mLastReceiverReportTimeUs = nowUs;
    337             }
    338         }
    339     }
    340 }
    341 
    342 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
    343     LOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
    344 
    345     CHECK(!s->mIsInjected);
    346 
    347     sp<ABuffer> buffer = new ABuffer(65536);
    348 
    349     socklen_t remoteAddrLen =
    350         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
    351             ? sizeof(s->mRemoteRTCPAddr) : 0;
    352 
    353     ssize_t nbytes = recvfrom(
    354             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
    355             buffer->data(),
    356             buffer->capacity(),
    357             0,
    358             remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
    359             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
    360 
    361     if (nbytes < 0) {
    362         return -1;
    363     }
    364 
    365     buffer->setRange(0, nbytes);
    366 
    367     // LOGI("received %d bytes.", buffer->size());
    368 
    369     status_t err;
    370     if (receiveRTP) {
    371         err = parseRTP(s, buffer);
    372     } else {
    373         err = parseRTCP(s, buffer);
    374     }
    375 
    376     return err;
    377 }
    378 
    379 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
    380     if (s->mNumRTPPacketsReceived++ == 0) {
    381         sp<AMessage> notify = s->mNotifyMsg->dup();
    382         notify->setInt32("first-rtp", true);
    383         notify->post();
    384     }
    385 
    386     size_t size = buffer->size();
    387 
    388     if (size < 12) {
    389         // Too short to be a valid RTP header.
    390         return -1;
    391     }
    392 
    393     const uint8_t *data = buffer->data();
    394 
    395     if ((data[0] >> 6) != 2) {
    396         // Unsupported version.
    397         return -1;
    398     }
    399 
    400     if (data[0] & 0x20) {
    401         // Padding present.
    402 
    403         size_t paddingLength = data[size - 1];
    404 
    405         if (paddingLength + 12 > size) {
    406             // If we removed this much padding we'd end up with something
    407             // that's too short to be a valid RTP header.
    408             return -1;
    409         }
    410 
    411         size -= paddingLength;
    412     }
    413 
    414     int numCSRCs = data[0] & 0x0f;
    415 
    416     size_t payloadOffset = 12 + 4 * numCSRCs;
    417 
    418     if (size < payloadOffset) {
    419         // Not enough data to fit the basic header and all the CSRC entries.
    420         return -1;
    421     }
    422 
    423     if (data[0] & 0x10) {
    424         // Header eXtension present.
    425 
    426         if (size < payloadOffset + 4) {
    427             // Not enough data to fit the basic header, all CSRC entries
    428             // and the first 4 bytes of the extension header.
    429 
    430             return -1;
    431         }
    432 
    433         const uint8_t *extensionData = &data[payloadOffset];
    434 
    435         size_t extensionLength =
    436             4 * (extensionData[2] << 8 | extensionData[3]);
    437 
    438         if (size < payloadOffset + 4 + extensionLength) {
    439             return -1;
    440         }
    441 
    442         payloadOffset += 4 + extensionLength;
    443     }
    444 
    445     uint32_t srcId = u32at(&data[8]);
    446 
    447     sp<ARTPSource> source = findSource(s, srcId);
    448 
    449     uint32_t rtpTime = u32at(&data[4]);
    450 
    451     sp<AMessage> meta = buffer->meta();
    452     meta->setInt32("ssrc", srcId);
    453     meta->setInt32("rtp-time", rtpTime);
    454     meta->setInt32("PT", data[1] & 0x7f);
    455     meta->setInt32("M", data[1] >> 7);
    456 
    457     buffer->setInt32Data(u16at(&data[2]));
    458     buffer->setRange(payloadOffset, size - payloadOffset);
    459 
    460     source->processRTPPacket(buffer);
    461 
    462     return OK;
    463 }
    464 
    465 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
    466     if (s->mNumRTCPPacketsReceived++ == 0) {
    467         sp<AMessage> notify = s->mNotifyMsg->dup();
    468         notify->setInt32("first-rtcp", true);
    469         notify->post();
    470     }
    471 
    472     const uint8_t *data = buffer->data();
    473     size_t size = buffer->size();
    474 
    475     while (size > 0) {
    476         if (size < 8) {
    477             // Too short to be a valid RTCP header
    478             return -1;
    479         }
    480 
    481         if ((data[0] >> 6) != 2) {
    482             // Unsupported version.
    483             return -1;
    484         }
    485 
    486         if (data[0] & 0x20) {
    487             // Padding present.
    488 
    489             size_t paddingLength = data[size - 1];
    490 
    491             if (paddingLength + 12 > size) {
    492                 // If we removed this much padding we'd end up with something
    493                 // that's too short to be a valid RTP header.
    494                 return -1;
    495             }
    496 
    497             size -= paddingLength;
    498         }
    499 
    500         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    501 
    502         if (size < headerLength) {
    503             // Only received a partial packet?
    504             return -1;
    505         }
    506 
    507         switch (data[1]) {
    508             case 200:
    509             {
    510                 parseSR(s, data, headerLength);
    511                 break;
    512             }
    513 
    514             case 201:  // RR
    515             case 202:  // SDES
    516             case 204:  // APP
    517                 break;
    518 
    519             case 205:  // TSFB (transport layer specific feedback)
    520             case 206:  // PSFB (payload specific feedback)
    521                 // hexdump(data, headerLength);
    522                 break;
    523 
    524             case 203:
    525             {
    526                 parseBYE(s, data, headerLength);
    527                 break;
    528             }
    529 
    530             default:
    531             {
    532                 LOGW("Unknown RTCP packet type %u of size %d",
    533                      (unsigned)data[1], headerLength);
    534                 break;
    535             }
    536         }
    537 
    538         data += headerLength;
    539         size -= headerLength;
    540     }
    541 
    542     return OK;
    543 }
    544 
    545 status_t ARTPConnection::parseBYE(
    546         StreamInfo *s, const uint8_t *data, size_t size) {
    547     size_t SC = data[0] & 0x3f;
    548 
    549     if (SC == 0 || size < (4 + SC * 4)) {
    550         // Packet too short for the minimal BYE header.
    551         return -1;
    552     }
    553 
    554     uint32_t id = u32at(&data[4]);
    555 
    556     sp<ARTPSource> source = findSource(s, id);
    557 
    558     source->byeReceived();
    559 
    560     return OK;
    561 }
    562 
    563 status_t ARTPConnection::parseSR(
    564         StreamInfo *s, const uint8_t *data, size_t size) {
    565     size_t RC = data[0] & 0x1f;
    566 
    567     if (size < (7 + RC * 6) * 4) {
    568         // Packet too short for the minimal SR header.
    569         return -1;
    570     }
    571 
    572     uint32_t id = u32at(&data[4]);
    573     uint64_t ntpTime = u64at(&data[8]);
    574     uint32_t rtpTime = u32at(&data[16]);
    575 
    576 #if 0
    577     LOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
    578          id,
    579          rtpTime,
    580          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
    581 #endif
    582 
    583     sp<ARTPSource> source = findSource(s, id);
    584 
    585     source->timeUpdate(rtpTime, ntpTime);
    586 
    587     return 0;
    588 }
    589 
    590 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
    591     sp<ARTPSource> source;
    592     ssize_t index = info->mSources.indexOfKey(srcId);
    593     if (index < 0) {
    594         index = info->mSources.size();
    595 
    596         source = new ARTPSource(
    597                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
    598 
    599         info->mSources.add(srcId, source);
    600     } else {
    601         source = info->mSources.valueAt(index);
    602     }
    603 
    604     return source;
    605 }
    606 
    607 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
    608     sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
    609     msg->setInt32("index", index);
    610     msg->setObject("buffer", buffer);
    611     msg->post();
    612 }
    613 
    614 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
    615     int32_t index;
    616     CHECK(msg->findInt32("index", &index));
    617 
    618     sp<RefBase> obj;
    619     CHECK(msg->findObject("buffer", &obj));
    620 
    621     sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
    622 
    623     List<StreamInfo>::iterator it = mStreams.begin();
    624     while (it != mStreams.end()
    625            && it->mRTPSocket != index && it->mRTCPSocket != index) {
    626         ++it;
    627     }
    628 
    629     if (it == mStreams.end()) {
    630         TRESPASS();
    631     }
    632 
    633     StreamInfo *s = &*it;
    634 
    635     status_t err;
    636     if (it->mRTPSocket == index) {
    637         err = parseRTP(s, buffer);
    638     } else {
    639         err = parseRTCP(s, buffer);
    640     }
    641 }
    642 
    643 }  // namespace android
    644 
    645