Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "ARTPConnection"
     19 #include <utils/Log.h>
     20 
     21 #include "ARTPAssembler.h"
     22 #include "ARTPConnection.h"
     23 
     24 #include "ARTPSource.h"
     25 #include "ASessionDescription.h"
     26 
     27 #include <media/stagefright/foundation/ABuffer.h>
     28 #include <media/stagefright/foundation/ADebug.h>
     29 #include <media/stagefright/foundation/AMessage.h>
     30 #include <media/stagefright/foundation/AString.h>
     31 #include <media/stagefright/foundation/hexdump.h>
     32 
     33 #include <arpa/inet.h>
     34 #include <sys/socket.h>
     35 
     36 namespace android {
     37 
     38 static const size_t kMaxUDPSize = 1500;
     39 
     40 static uint16_t u16at(const uint8_t *data) {
     41     return data[0] << 8 | data[1];
     42 }
     43 
     44 static uint32_t u32at(const uint8_t *data) {
     45     return u16at(data) << 16 | u16at(&data[2]);
     46 }
     47 
     48 static uint64_t u64at(const uint8_t *data) {
     49     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
     50 }
     51 
     52 // static
     53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
     54 
     55 struct ARTPConnection::StreamInfo {
     56     int mRTPSocket;
     57     int mRTCPSocket;
     58     sp<ASessionDescription> mSessionDesc;
     59     size_t mIndex;
     60     sp<AMessage> mNotifyMsg;
     61     KeyedVector<uint32_t, sp<ARTPSource> > mSources;
     62 
     63     int64_t mNumRTCPPacketsReceived;
     64     int64_t mNumRTPPacketsReceived;
     65     struct sockaddr_in mRemoteRTCPAddr;
     66 
     67     bool mIsInjected;
     68 };
     69 
     70 ARTPConnection::ARTPConnection(uint32_t flags)
     71     : mFlags(flags),
     72       mPollEventPending(false),
     73       mLastReceiverReportTimeUs(-1) {
     74 }
     75 
     76 ARTPConnection::~ARTPConnection() {
     77 }
     78 
     79 void ARTPConnection::addStream(
     80         int rtpSocket, int rtcpSocket,
     81         const sp<ASessionDescription> &sessionDesc,
     82         size_t index,
     83         const sp<AMessage> &notify,
     84         bool injected) {
     85     sp<AMessage> msg = new AMessage(kWhatAddStream, id());
     86     msg->setInt32("rtp-socket", rtpSocket);
     87     msg->setInt32("rtcp-socket", rtcpSocket);
     88     msg->setObject("session-desc", sessionDesc);
     89     msg->setSize("index", index);
     90     msg->setMessage("notify", notify);
     91     msg->setInt32("injected", injected);
     92     msg->post();
     93 }
     94 
     95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
     96     sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
     97     msg->setInt32("rtp-socket", rtpSocket);
     98     msg->setInt32("rtcp-socket", rtcpSocket);
     99     msg->post();
    100 }
    101 
    102 static void bumpSocketBufferSize(int s) {
    103     int size = 256 * 1024;
    104     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
    105 }
    106 
    107 // static
    108 void ARTPConnection::MakePortPair(
    109         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
    110     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    111     CHECK_GE(*rtpSocket, 0);
    112 
    113     bumpSocketBufferSize(*rtpSocket);
    114 
    115     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    116     CHECK_GE(*rtcpSocket, 0);
    117 
    118     bumpSocketBufferSize(*rtcpSocket);
    119 
    120     /* rand() * 1000 may overflow int type, use long long */
    121     unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550;
    122     start &= ~1;
    123 
    124     for (unsigned port = start; port < 65536; port += 2) {
    125         struct sockaddr_in addr;
    126         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    127         addr.sin_family = AF_INET;
    128         addr.sin_addr.s_addr = htonl(INADDR_ANY);
    129         addr.sin_port = htons(port);
    130 
    131         if (bind(*rtpSocket,
    132                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
    133             continue;
    134         }
    135 
    136         addr.sin_port = htons(port + 1);
    137 
    138         if (bind(*rtcpSocket,
    139                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
    140             *rtpPort = port;
    141             return;
    142         }
    143     }
    144 
    145     TRESPASS();
    146 }
    147 
    148 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
    149     switch (msg->what()) {
    150         case kWhatAddStream:
    151         {
    152             onAddStream(msg);
    153             break;
    154         }
    155 
    156         case kWhatRemoveStream:
    157         {
    158             onRemoveStream(msg);
    159             break;
    160         }
    161 
    162         case kWhatPollStreams:
    163         {
    164             onPollStreams();
    165             break;
    166         }
    167 
    168         case kWhatInjectPacket:
    169         {
    170             onInjectPacket(msg);
    171             break;
    172         }
    173 
    174         default:
    175         {
    176             TRESPASS();
    177             break;
    178         }
    179     }
    180 }
    181 
    182 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    183     mStreams.push_back(StreamInfo());
    184     StreamInfo *info = &*--mStreams.end();
    185 
    186     int32_t s;
    187     CHECK(msg->findInt32("rtp-socket", &s));
    188     info->mRTPSocket = s;
    189     CHECK(msg->findInt32("rtcp-socket", &s));
    190     info->mRTCPSocket = s;
    191 
    192     int32_t injected;
    193     CHECK(msg->findInt32("injected", &injected));
    194 
    195     info->mIsInjected = injected;
    196 
    197     sp<RefBase> obj;
    198     CHECK(msg->findObject("session-desc", &obj));
    199     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
    200 
    201     CHECK(msg->findSize("index", &info->mIndex));
    202     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
    203 
    204     info->mNumRTCPPacketsReceived = 0;
    205     info->mNumRTPPacketsReceived = 0;
    206     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
    207 
    208     if (!injected) {
    209         postPollEvent();
    210     }
    211 }
    212 
    213 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
    214     int32_t rtpSocket, rtcpSocket;
    215     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
    216     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
    217 
    218     List<StreamInfo>::iterator it = mStreams.begin();
    219     while (it != mStreams.end()
    220            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
    221         ++it;
    222     }
    223 
    224     if (it == mStreams.end()) {
    225         return;
    226     }
    227 
    228     mStreams.erase(it);
    229 }
    230 
    231 void ARTPConnection::postPollEvent() {
    232     if (mPollEventPending) {
    233         return;
    234     }
    235 
    236     sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
    237     msg->post();
    238 
    239     mPollEventPending = true;
    240 }
    241 
    242 void ARTPConnection::onPollStreams() {
    243     mPollEventPending = false;
    244 
    245     if (mStreams.empty()) {
    246         return;
    247     }
    248 
    249     struct timeval tv;
    250     tv.tv_sec = 0;
    251     tv.tv_usec = kSelectTimeoutUs;
    252 
    253     fd_set rs;
    254     FD_ZERO(&rs);
    255 
    256     int maxSocket = -1;
    257     for (List<StreamInfo>::iterator it = mStreams.begin();
    258          it != mStreams.end(); ++it) {
    259         if ((*it).mIsInjected) {
    260             continue;
    261         }
    262 
    263         FD_SET(it->mRTPSocket, &rs);
    264         FD_SET(it->mRTCPSocket, &rs);
    265 
    266         if (it->mRTPSocket > maxSocket) {
    267             maxSocket = it->mRTPSocket;
    268         }
    269         if (it->mRTCPSocket > maxSocket) {
    270             maxSocket = it->mRTCPSocket;
    271         }
    272     }
    273 
    274     if (maxSocket == -1) {
    275         return;
    276     }
    277 
    278     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
    279 
    280     if (res > 0) {
    281         List<StreamInfo>::iterator it = mStreams.begin();
    282         while (it != mStreams.end()) {
    283             if ((*it).mIsInjected) {
    284                 ++it;
    285                 continue;
    286             }
    287 
    288             status_t err = OK;
    289             if (FD_ISSET(it->mRTPSocket, &rs)) {
    290                 err = receive(&*it, true);
    291             }
    292             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
    293                 err = receive(&*it, false);
    294             }
    295 
    296             if (err == -ECONNRESET) {
    297                 // socket failure, this stream is dead, Jim.
    298 
    299                 ALOGW("failed to receive RTP/RTCP datagram.");
    300                 it = mStreams.erase(it);
    301                 continue;
    302             }
    303 
    304             ++it;
    305         }
    306     }
    307 
    308     int64_t nowUs = ALooper::GetNowUs();
    309     if (mLastReceiverReportTimeUs <= 0
    310             || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
    311         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
    312         List<StreamInfo>::iterator it = mStreams.begin();
    313         while (it != mStreams.end()) {
    314             StreamInfo *s = &*it;
    315 
    316             if (s->mIsInjected) {
    317                 ++it;
    318                 continue;
    319             }
    320 
    321             if (s->mNumRTCPPacketsReceived == 0) {
    322                 // We have never received any RTCP packets on this stream,
    323                 // we don't even know where to send a report.
    324                 ++it;
    325                 continue;
    326             }
    327 
    328             buffer->setRange(0, 0);
    329 
    330             for (size_t i = 0; i < s->mSources.size(); ++i) {
    331                 sp<ARTPSource> source = s->mSources.valueAt(i);
    332 
    333                 source->addReceiverReport(buffer);
    334 
    335                 if (mFlags & kRegularlyRequestFIR) {
    336                     source->addFIR(buffer);
    337                 }
    338             }
    339 
    340             if (buffer->size() > 0) {
    341                 ALOGV("Sending RR...");
    342 
    343                 ssize_t n;
    344                 do {
    345                     n = sendto(
    346                         s->mRTCPSocket, buffer->data(), buffer->size(), 0,
    347                         (const struct sockaddr *)&s->mRemoteRTCPAddr,
    348                         sizeof(s->mRemoteRTCPAddr));
    349                 } while (n < 0 && errno == EINTR);
    350 
    351                 if (n <= 0) {
    352                     ALOGW("failed to send RTCP receiver report (%s).",
    353                          n == 0 ? "connection gone" : strerror(errno));
    354 
    355                     it = mStreams.erase(it);
    356                     continue;
    357                 }
    358 
    359                 CHECK_EQ(n, (ssize_t)buffer->size());
    360 
    361                 mLastReceiverReportTimeUs = nowUs;
    362             }
    363 
    364             ++it;
    365         }
    366     }
    367 
    368     if (!mStreams.empty()) {
    369         postPollEvent();
    370     }
    371 }
    372 
    373 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
    374     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
    375 
    376     CHECK(!s->mIsInjected);
    377 
    378     sp<ABuffer> buffer = new ABuffer(65536);
    379 
    380     socklen_t remoteAddrLen =
    381         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
    382             ? sizeof(s->mRemoteRTCPAddr) : 0;
    383 
    384     ssize_t nbytes;
    385     do {
    386         nbytes = recvfrom(
    387             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
    388             buffer->data(),
    389             buffer->capacity(),
    390             0,
    391             remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
    392             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
    393     } while (nbytes < 0 && errno == EINTR);
    394 
    395     if (nbytes <= 0) {
    396         return -ECONNRESET;
    397     }
    398 
    399     buffer->setRange(0, nbytes);
    400 
    401     // ALOGI("received %d bytes.", buffer->size());
    402 
    403     status_t err;
    404     if (receiveRTP) {
    405         err = parseRTP(s, buffer);
    406     } else {
    407         err = parseRTCP(s, buffer);
    408     }
    409 
    410     return err;
    411 }
    412 
    413 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
    414     if (s->mNumRTPPacketsReceived++ == 0) {
    415         sp<AMessage> notify = s->mNotifyMsg->dup();
    416         notify->setInt32("first-rtp", true);
    417         notify->post();
    418     }
    419 
    420     size_t size = buffer->size();
    421 
    422     if (size < 12) {
    423         // Too short to be a valid RTP header.
    424         return -1;
    425     }
    426 
    427     const uint8_t *data = buffer->data();
    428 
    429     if ((data[0] >> 6) != 2) {
    430         // Unsupported version.
    431         return -1;
    432     }
    433 
    434     if (data[0] & 0x20) {
    435         // Padding present.
    436 
    437         size_t paddingLength = data[size - 1];
    438 
    439         if (paddingLength + 12 > size) {
    440             // If we removed this much padding we'd end up with something
    441             // that's too short to be a valid RTP header.
    442             return -1;
    443         }
    444 
    445         size -= paddingLength;
    446     }
    447 
    448     int numCSRCs = data[0] & 0x0f;
    449 
    450     size_t payloadOffset = 12 + 4 * numCSRCs;
    451 
    452     if (size < payloadOffset) {
    453         // Not enough data to fit the basic header and all the CSRC entries.
    454         return -1;
    455     }
    456 
    457     if (data[0] & 0x10) {
    458         // Header eXtension present.
    459 
    460         if (size < payloadOffset + 4) {
    461             // Not enough data to fit the basic header, all CSRC entries
    462             // and the first 4 bytes of the extension header.
    463 
    464             return -1;
    465         }
    466 
    467         const uint8_t *extensionData = &data[payloadOffset];
    468 
    469         size_t extensionLength =
    470             4 * (extensionData[2] << 8 | extensionData[3]);
    471 
    472         if (size < payloadOffset + 4 + extensionLength) {
    473             return -1;
    474         }
    475 
    476         payloadOffset += 4 + extensionLength;
    477     }
    478 
    479     uint32_t srcId = u32at(&data[8]);
    480 
    481     sp<ARTPSource> source = findSource(s, srcId);
    482 
    483     uint32_t rtpTime = u32at(&data[4]);
    484 
    485     sp<AMessage> meta = buffer->meta();
    486     meta->setInt32("ssrc", srcId);
    487     meta->setInt32("rtp-time", rtpTime);
    488     meta->setInt32("PT", data[1] & 0x7f);
    489     meta->setInt32("M", data[1] >> 7);
    490 
    491     buffer->setInt32Data(u16at(&data[2]));
    492     buffer->setRange(payloadOffset, size - payloadOffset);
    493 
    494     source->processRTPPacket(buffer);
    495 
    496     return OK;
    497 }
    498 
    499 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
    500     if (s->mNumRTCPPacketsReceived++ == 0) {
    501         sp<AMessage> notify = s->mNotifyMsg->dup();
    502         notify->setInt32("first-rtcp", true);
    503         notify->post();
    504     }
    505 
    506     const uint8_t *data = buffer->data();
    507     size_t size = buffer->size();
    508 
    509     while (size > 0) {
    510         if (size < 8) {
    511             // Too short to be a valid RTCP header
    512             return -1;
    513         }
    514 
    515         if ((data[0] >> 6) != 2) {
    516             // Unsupported version.
    517             return -1;
    518         }
    519 
    520         if (data[0] & 0x20) {
    521             // Padding present.
    522 
    523             size_t paddingLength = data[size - 1];
    524 
    525             if (paddingLength + 12 > size) {
    526                 // If we removed this much padding we'd end up with something
    527                 // that's too short to be a valid RTP header.
    528                 return -1;
    529             }
    530 
    531             size -= paddingLength;
    532         }
    533 
    534         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    535 
    536         if (size < headerLength) {
    537             // Only received a partial packet?
    538             return -1;
    539         }
    540 
    541         switch (data[1]) {
    542             case 200:
    543             {
    544                 parseSR(s, data, headerLength);
    545                 break;
    546             }
    547 
    548             case 201:  // RR
    549             case 202:  // SDES
    550             case 204:  // APP
    551                 break;
    552 
    553             case 205:  // TSFB (transport layer specific feedback)
    554             case 206:  // PSFB (payload specific feedback)
    555                 // hexdump(data, headerLength);
    556                 break;
    557 
    558             case 203:
    559             {
    560                 parseBYE(s, data, headerLength);
    561                 break;
    562             }
    563 
    564             default:
    565             {
    566                 ALOGW("Unknown RTCP packet type %u of size %d",
    567                      (unsigned)data[1], headerLength);
    568                 break;
    569             }
    570         }
    571 
    572         data += headerLength;
    573         size -= headerLength;
    574     }
    575 
    576     return OK;
    577 }
    578 
    579 status_t ARTPConnection::parseBYE(
    580         StreamInfo *s, const uint8_t *data, size_t size) {
    581     size_t SC = data[0] & 0x3f;
    582 
    583     if (SC == 0 || size < (4 + SC * 4)) {
    584         // Packet too short for the minimal BYE header.
    585         return -1;
    586     }
    587 
    588     uint32_t id = u32at(&data[4]);
    589 
    590     sp<ARTPSource> source = findSource(s, id);
    591 
    592     source->byeReceived();
    593 
    594     return OK;
    595 }
    596 
    597 status_t ARTPConnection::parseSR(
    598         StreamInfo *s, const uint8_t *data, size_t size) {
    599     size_t RC = data[0] & 0x1f;
    600 
    601     if (size < (7 + RC * 6) * 4) {
    602         // Packet too short for the minimal SR header.
    603         return -1;
    604     }
    605 
    606     uint32_t id = u32at(&data[4]);
    607     uint64_t ntpTime = u64at(&data[8]);
    608     uint32_t rtpTime = u32at(&data[16]);
    609 
    610 #if 0
    611     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
    612          id,
    613          rtpTime,
    614          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
    615 #endif
    616 
    617     sp<ARTPSource> source = findSource(s, id);
    618 
    619     source->timeUpdate(rtpTime, ntpTime);
    620 
    621     return 0;
    622 }
    623 
    624 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
    625     sp<ARTPSource> source;
    626     ssize_t index = info->mSources.indexOfKey(srcId);
    627     if (index < 0) {
    628         index = info->mSources.size();
    629 
    630         source = new ARTPSource(
    631                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
    632 
    633         info->mSources.add(srcId, source);
    634     } else {
    635         source = info->mSources.valueAt(index);
    636     }
    637 
    638     return source;
    639 }
    640 
    641 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
    642     sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
    643     msg->setInt32("index", index);
    644     msg->setBuffer("buffer", buffer);
    645     msg->post();
    646 }
    647 
    648 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
    649     int32_t index;
    650     CHECK(msg->findInt32("index", &index));
    651 
    652     sp<ABuffer> buffer;
    653     CHECK(msg->findBuffer("buffer", &buffer));
    654 
    655     List<StreamInfo>::iterator it = mStreams.begin();
    656     while (it != mStreams.end()
    657            && it->mRTPSocket != index && it->mRTCPSocket != index) {
    658         ++it;
    659     }
    660 
    661     if (it == mStreams.end()) {
    662         TRESPASS();
    663     }
    664 
    665     StreamInfo *s = &*it;
    666 
    667     status_t err;
    668     if (it->mRTPSocket == index) {
    669         err = parseRTP(s, buffer);
    670     } else {
    671         err = parseRTCP(s, buffer);
    672     }
    673 }
    674 
    675 }  // namespace android
    676 
    677