Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "ARTPConnection"
     19 #include <utils/Log.h>
     20 
     21 #include "ARTPConnection.h"
     22 
     23 #include "ARTPSource.h"
     24 #include "ASessionDescription.h"
     25 
     26 #include <media/stagefright/foundation/ABuffer.h>
     27 #include <media/stagefright/foundation/ADebug.h>
     28 #include <media/stagefright/foundation/AMessage.h>
     29 #include <media/stagefright/foundation/AString.h>
     30 #include <media/stagefright/foundation/hexdump.h>
     31 
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
     34 
     35 namespace android {
     36 
// Capacity, in bytes, of the scratch buffer in which outgoing RTCP receiver
// reports are assembled (see onPollStreams).
static const size_t kMaxUDPSize = 1500;
     38 
     39 static uint16_t u16at(const uint8_t *data) {
     40     return data[0] << 8 | data[1];
     41 }
     42 
     43 static uint32_t u32at(const uint8_t *data) {
     44     return u16at(data) << 16 | u16at(&data[2]);
     45 }
     46 
     47 static uint64_t u64at(const uint8_t *data) {
     48     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
     49 }
     50 
// static
// Timeout, in microseconds, handed to select() on every polling pass
// (it is assigned to tv_usec in onPollStreams).
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
     53 
// Per-stream bookkeeping; one entry is appended to mStreams for every
// kWhatAddStream message.
struct ARTPConnection::StreamInfo {
    // Descriptors the stream is identified by. Socket-backed streams are
    // polled and read through them; for injected streams onInjectPacket
    // matches the packet's "index" against these two values instead.
    int mRTPSocket;
    int mRTCPSocket;

    // Session description and track index; both are forwarded unchanged to
    // every ARTPSource created for this stream.
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;

    // Template message: duplicated and posted on the first RTP and the first
    // RTCP packet, and also handed to each ARTPSource.
    sp<AMessage> mNotifyMsg;

    // Sources seen on this stream, keyed by the source id taken from the
    // packets (see findSource).
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Number of packets handed to parseRTCP / parseRTP so far.
    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;

    // Sender address captured by recvfrom() on the first RTCP datagram;
    // receiver reports are sent to it.
    struct sockaddr_in mRemoteRTCPAddr;

    // True if packets arrive through injectPacket() rather than the sockets;
    // such streams are skipped by the polling code.
    bool mIsInjected;
};
     68 
// Creates a connection without any streams.
//
// flags is stored in mFlags; onPollStreams tests it for
// kRegularlyRequestFIR. mLastReceiverReportTimeUs starts at -1, which
// onPollStreams treats as "a receiver report is due".
ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      mLastReceiverReportTimeUs(-1) {
}
     74 
// Nothing is released explicitly here.
// NOTE(review): the sockets registered through addStream() are never closed
// by the code in this file — presumably the caller owns them; confirm.
ARTPConnection::~ARTPConnection() {
}
     77 
     78 void ARTPConnection::addStream(
     79         int rtpSocket, int rtcpSocket,
     80         const sp<ASessionDescription> &sessionDesc,
     81         size_t index,
     82         const sp<AMessage> &notify,
     83         bool injected) {
     84     sp<AMessage> msg = new AMessage(kWhatAddStream, id());
     85     msg->setInt32("rtp-socket", rtpSocket);
     86     msg->setInt32("rtcp-socket", rtcpSocket);
     87     msg->setObject("session-desc", sessionDesc);
     88     msg->setSize("index", index);
     89     msg->setMessage("notify", notify);
     90     msg->setInt32("injected", injected);
     91     msg->post();
     92 }
     93 
     94 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
     95     sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
     96     msg->setInt32("rtp-socket", rtpSocket);
     97     msg->setInt32("rtcp-socket", rtcpSocket);
     98     msg->post();
     99 }
    100 
// Requests a 256 KiB receive buffer (SO_RCVBUF) for socket s.
// The CHECK_EQ asserts that setsockopt() returned 0.
static void bumpSocketBufferSize(int s) {
    int size = 256 * 1024;
    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
}
    105 
    106 // static
    107 void ARTPConnection::MakePortPair(
    108         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
    109     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    110     CHECK_GE(*rtpSocket, 0);
    111 
    112     bumpSocketBufferSize(*rtpSocket);
    113 
    114     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    115     CHECK_GE(*rtcpSocket, 0);
    116 
    117     bumpSocketBufferSize(*rtcpSocket);
    118 
    119     unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
    120     start &= ~1;
    121 
    122     for (unsigned port = start; port < 65536; port += 2) {
    123         struct sockaddr_in addr;
    124         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    125         addr.sin_family = AF_INET;
    126         addr.sin_addr.s_addr = htonl(INADDR_ANY);
    127         addr.sin_port = htons(port);
    128 
    129         if (bind(*rtpSocket,
    130                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
    131             continue;
    132         }
    133 
    134         addr.sin_port = htons(port + 1);
    135 
    136         if (bind(*rtcpSocket,
    137                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
    138             *rtpPort = port;
    139             return;
    140         }
    141     }
    142 
    143     TRESPASS();
    144 }
    145 
    146 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
    147     switch (msg->what()) {
    148         case kWhatAddStream:
    149         {
    150             onAddStream(msg);
    151             break;
    152         }
    153 
    154         case kWhatRemoveStream:
    155         {
    156             onRemoveStream(msg);
    157             break;
    158         }
    159 
    160         case kWhatPollStreams:
    161         {
    162             onPollStreams();
    163             break;
    164         }
    165 
    166         case kWhatInjectPacket:
    167         {
    168             onInjectPacket(msg);
    169             break;
    170         }
    171 
    172         default:
    173         {
    174             TRESPASS();
    175             break;
    176         }
    177     }
    178 }
    179 
    180 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    181     mStreams.push_back(StreamInfo());
    182     StreamInfo *info = &*--mStreams.end();
    183 
    184     int32_t s;
    185     CHECK(msg->findInt32("rtp-socket", &s));
    186     info->mRTPSocket = s;
    187     CHECK(msg->findInt32("rtcp-socket", &s));
    188     info->mRTCPSocket = s;
    189 
    190     int32_t injected;
    191     CHECK(msg->findInt32("injected", &injected));
    192 
    193     info->mIsInjected = injected;
    194 
    195     sp<RefBase> obj;
    196     CHECK(msg->findObject("session-desc", &obj));
    197     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
    198 
    199     CHECK(msg->findSize("index", &info->mIndex));
    200     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
    201 
    202     info->mNumRTCPPacketsReceived = 0;
    203     info->mNumRTPPacketsReceived = 0;
    204     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
    205 
    206     if (!injected) {
    207         postPollEvent();
    208     }
    209 }
    210 
    211 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
    212     int32_t rtpSocket, rtcpSocket;
    213     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
    214     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
    215 
    216     List<StreamInfo>::iterator it = mStreams.begin();
    217     while (it != mStreams.end()
    218            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
    219         ++it;
    220     }
    221 
    222     if (it == mStreams.end()) {
    223         return;
    224     }
    225 
    226     mStreams.erase(it);
    227 }
    228 
    229 void ARTPConnection::postPollEvent() {
    230     if (mPollEventPending) {
    231         return;
    232     }
    233 
    234     sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
    235     msg->post();
    236 
    237     mPollEventPending = true;
    238 }
    239 
    240 void ARTPConnection::onPollStreams() {
    241     mPollEventPending = false;
    242 
    243     if (mStreams.empty()) {
    244         return;
    245     }
    246 
    247     struct timeval tv;
    248     tv.tv_sec = 0;
    249     tv.tv_usec = kSelectTimeoutUs;
    250 
    251     fd_set rs;
    252     FD_ZERO(&rs);
    253 
    254     int maxSocket = -1;
    255     for (List<StreamInfo>::iterator it = mStreams.begin();
    256          it != mStreams.end(); ++it) {
    257         if ((*it).mIsInjected) {
    258             continue;
    259         }
    260 
    261         FD_SET(it->mRTPSocket, &rs);
    262         FD_SET(it->mRTCPSocket, &rs);
    263 
    264         if (it->mRTPSocket > maxSocket) {
    265             maxSocket = it->mRTPSocket;
    266         }
    267         if (it->mRTCPSocket > maxSocket) {
    268             maxSocket = it->mRTCPSocket;
    269         }
    270     }
    271 
    272     if (maxSocket == -1) {
    273         return;
    274     }
    275 
    276     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
    277 
    278     if (res > 0) {
    279         List<StreamInfo>::iterator it = mStreams.begin();
    280         while (it != mStreams.end()) {
    281             if ((*it).mIsInjected) {
    282                 ++it;
    283                 continue;
    284             }
    285 
    286             status_t err = OK;
    287             if (FD_ISSET(it->mRTPSocket, &rs)) {
    288                 err = receive(&*it, true);
    289             }
    290             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
    291                 err = receive(&*it, false);
    292             }
    293 
    294             if (err == -ECONNRESET) {
    295                 // socket failure, this stream is dead, Jim.
    296 
    297                 LOGW("failed to receive RTP/RTCP datagram.");
    298                 it = mStreams.erase(it);
    299                 continue;
    300             }
    301 
    302             ++it;
    303         }
    304     }
    305 
    306     int64_t nowUs = ALooper::GetNowUs();
    307     if (mLastReceiverReportTimeUs <= 0
    308             || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
    309         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
    310         List<StreamInfo>::iterator it = mStreams.begin();
    311         while (it != mStreams.end()) {
    312             StreamInfo *s = &*it;
    313 
    314             if (s->mIsInjected) {
    315                 ++it;
    316                 continue;
    317             }
    318 
    319             if (s->mNumRTCPPacketsReceived == 0) {
    320                 // We have never received any RTCP packets on this stream,
    321                 // we don't even know where to send a report.
    322                 ++it;
    323                 continue;
    324             }
    325 
    326             buffer->setRange(0, 0);
    327 
    328             for (size_t i = 0; i < s->mSources.size(); ++i) {
    329                 sp<ARTPSource> source = s->mSources.valueAt(i);
    330 
    331                 source->addReceiverReport(buffer);
    332 
    333                 if (mFlags & kRegularlyRequestFIR) {
    334                     source->addFIR(buffer);
    335                 }
    336             }
    337 
    338             if (buffer->size() > 0) {
    339                 LOGV("Sending RR...");
    340 
    341                 ssize_t n;
    342                 do {
    343                     n = sendto(
    344                         s->mRTCPSocket, buffer->data(), buffer->size(), 0,
    345                         (const struct sockaddr *)&s->mRemoteRTCPAddr,
    346                         sizeof(s->mRemoteRTCPAddr));
    347                 } while (n < 0 && errno == EINTR);
    348 
    349                 if (n <= 0) {
    350                     LOGW("failed to send RTCP receiver report (%s).",
    351                          n == 0 ? "connection gone" : strerror(errno));
    352 
    353                     it = mStreams.erase(it);
    354                     continue;
    355                 }
    356 
    357                 CHECK_EQ(n, (ssize_t)buffer->size());
    358 
    359                 mLastReceiverReportTimeUs = nowUs;
    360             }
    361 
    362             ++it;
    363         }
    364     }
    365 
    366     if (!mStreams.empty()) {
    367         postPollEvent();
    368     }
    369 }
    370 
    371 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
    372     LOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
    373 
    374     CHECK(!s->mIsInjected);
    375 
    376     sp<ABuffer> buffer = new ABuffer(65536);
    377 
    378     socklen_t remoteAddrLen =
    379         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
    380             ? sizeof(s->mRemoteRTCPAddr) : 0;
    381 
    382     ssize_t nbytes;
    383     do {
    384         nbytes = recvfrom(
    385             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
    386             buffer->data(),
    387             buffer->capacity(),
    388             0,
    389             remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
    390             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
    391     } while (nbytes < 0 && errno == EINTR);
    392 
    393     if (nbytes <= 0) {
    394         return -ECONNRESET;
    395     }
    396 
    397     buffer->setRange(0, nbytes);
    398 
    399     // LOGI("received %d bytes.", buffer->size());
    400 
    401     status_t err;
    402     if (receiveRTP) {
    403         err = parseRTP(s, buffer);
    404     } else {
    405         err = parseRTCP(s, buffer);
    406     }
    407 
    408     return err;
    409 }
    410 
    411 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
    412     if (s->mNumRTPPacketsReceived++ == 0) {
    413         sp<AMessage> notify = s->mNotifyMsg->dup();
    414         notify->setInt32("first-rtp", true);
    415         notify->post();
    416     }
    417 
    418     size_t size = buffer->size();
    419 
    420     if (size < 12) {
    421         // Too short to be a valid RTP header.
    422         return -1;
    423     }
    424 
    425     const uint8_t *data = buffer->data();
    426 
    427     if ((data[0] >> 6) != 2) {
    428         // Unsupported version.
    429         return -1;
    430     }
    431 
    432     if (data[0] & 0x20) {
    433         // Padding present.
    434 
    435         size_t paddingLength = data[size - 1];
    436 
    437         if (paddingLength + 12 > size) {
    438             // If we removed this much padding we'd end up with something
    439             // that's too short to be a valid RTP header.
    440             return -1;
    441         }
    442 
    443         size -= paddingLength;
    444     }
    445 
    446     int numCSRCs = data[0] & 0x0f;
    447 
    448     size_t payloadOffset = 12 + 4 * numCSRCs;
    449 
    450     if (size < payloadOffset) {
    451         // Not enough data to fit the basic header and all the CSRC entries.
    452         return -1;
    453     }
    454 
    455     if (data[0] & 0x10) {
    456         // Header eXtension present.
    457 
    458         if (size < payloadOffset + 4) {
    459             // Not enough data to fit the basic header, all CSRC entries
    460             // and the first 4 bytes of the extension header.
    461 
    462             return -1;
    463         }
    464 
    465         const uint8_t *extensionData = &data[payloadOffset];
    466 
    467         size_t extensionLength =
    468             4 * (extensionData[2] << 8 | extensionData[3]);
    469 
    470         if (size < payloadOffset + 4 + extensionLength) {
    471             return -1;
    472         }
    473 
    474         payloadOffset += 4 + extensionLength;
    475     }
    476 
    477     uint32_t srcId = u32at(&data[8]);
    478 
    479     sp<ARTPSource> source = findSource(s, srcId);
    480 
    481     uint32_t rtpTime = u32at(&data[4]);
    482 
    483     sp<AMessage> meta = buffer->meta();
    484     meta->setInt32("ssrc", srcId);
    485     meta->setInt32("rtp-time", rtpTime);
    486     meta->setInt32("PT", data[1] & 0x7f);
    487     meta->setInt32("M", data[1] >> 7);
    488 
    489     buffer->setInt32Data(u16at(&data[2]));
    490     buffer->setRange(payloadOffset, size - payloadOffset);
    491 
    492     source->processRTPPacket(buffer);
    493 
    494     return OK;
    495 }
    496 
    497 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
    498     if (s->mNumRTCPPacketsReceived++ == 0) {
    499         sp<AMessage> notify = s->mNotifyMsg->dup();
    500         notify->setInt32("first-rtcp", true);
    501         notify->post();
    502     }
    503 
    504     const uint8_t *data = buffer->data();
    505     size_t size = buffer->size();
    506 
    507     while (size > 0) {
    508         if (size < 8) {
    509             // Too short to be a valid RTCP header
    510             return -1;
    511         }
    512 
    513         if ((data[0] >> 6) != 2) {
    514             // Unsupported version.
    515             return -1;
    516         }
    517 
    518         if (data[0] & 0x20) {
    519             // Padding present.
    520 
    521             size_t paddingLength = data[size - 1];
    522 
    523             if (paddingLength + 12 > size) {
    524                 // If we removed this much padding we'd end up with something
    525                 // that's too short to be a valid RTP header.
    526                 return -1;
    527             }
    528 
    529             size -= paddingLength;
    530         }
    531 
    532         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    533 
    534         if (size < headerLength) {
    535             // Only received a partial packet?
    536             return -1;
    537         }
    538 
    539         switch (data[1]) {
    540             case 200:
    541             {
    542                 parseSR(s, data, headerLength);
    543                 break;
    544             }
    545 
    546             case 201:  // RR
    547             case 202:  // SDES
    548             case 204:  // APP
    549                 break;
    550 
    551             case 205:  // TSFB (transport layer specific feedback)
    552             case 206:  // PSFB (payload specific feedback)
    553                 // hexdump(data, headerLength);
    554                 break;
    555 
    556             case 203:
    557             {
    558                 parseBYE(s, data, headerLength);
    559                 break;
    560             }
    561 
    562             default:
    563             {
    564                 LOGW("Unknown RTCP packet type %u of size %d",
    565                      (unsigned)data[1], headerLength);
    566                 break;
    567             }
    568         }
    569 
    570         data += headerLength;
    571         size -= headerLength;
    572     }
    573 
    574     return OK;
    575 }
    576 
    577 status_t ARTPConnection::parseBYE(
    578         StreamInfo *s, const uint8_t *data, size_t size) {
    579     size_t SC = data[0] & 0x3f;
    580 
    581     if (SC == 0 || size < (4 + SC * 4)) {
    582         // Packet too short for the minimal BYE header.
    583         return -1;
    584     }
    585 
    586     uint32_t id = u32at(&data[4]);
    587 
    588     sp<ARTPSource> source = findSource(s, id);
    589 
    590     source->byeReceived();
    591 
    592     return OK;
    593 }
    594 
    595 status_t ARTPConnection::parseSR(
    596         StreamInfo *s, const uint8_t *data, size_t size) {
    597     size_t RC = data[0] & 0x1f;
    598 
    599     if (size < (7 + RC * 6) * 4) {
    600         // Packet too short for the minimal SR header.
    601         return -1;
    602     }
    603 
    604     uint32_t id = u32at(&data[4]);
    605     uint64_t ntpTime = u64at(&data[8]);
    606     uint32_t rtpTime = u32at(&data[16]);
    607 
    608 #if 0
    609     LOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
    610          id,
    611          rtpTime,
    612          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
    613 #endif
    614 
    615     sp<ARTPSource> source = findSource(s, id);
    616 
    617     source->timeUpdate(rtpTime, ntpTime);
    618 
    619     return 0;
    620 }
    621 
    622 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
    623     sp<ARTPSource> source;
    624     ssize_t index = info->mSources.indexOfKey(srcId);
    625     if (index < 0) {
    626         index = info->mSources.size();
    627 
    628         source = new ARTPSource(
    629                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
    630 
    631         info->mSources.add(srcId, source);
    632     } else {
    633         source = info->mSources.valueAt(index);
    634     }
    635 
    636     return source;
    637 }
    638 
    639 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
    640     sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
    641     msg->setInt32("index", index);
    642     msg->setObject("buffer", buffer);
    643     msg->post();
    644 }
    645 
    646 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
    647     int32_t index;
    648     CHECK(msg->findInt32("index", &index));
    649 
    650     sp<RefBase> obj;
    651     CHECK(msg->findObject("buffer", &obj));
    652 
    653     sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
    654 
    655     List<StreamInfo>::iterator it = mStreams.begin();
    656     while (it != mStreams.end()
    657            && it->mRTPSocket != index && it->mRTCPSocket != index) {
    658         ++it;
    659     }
    660 
    661     if (it == mStreams.end()) {
    662         TRESPASS();
    663     }
    664 
    665     StreamInfo *s = &*it;
    666 
    667     status_t err;
    668     if (it->mRTPSocket == index) {
    669         err = parseRTP(s, buffer);
    670     } else {
    671         err = parseRTCP(s, buffer);
    672     }
    673 }
    674 
    675 }  // namespace android
    676 
    677