Home | History | Annotate | Download | only in sink
      1 /*
      2  * Copyright 2012, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "RTPSink"
     19 #include <utils/Log.h>
     20 
     21 #include "RTPSink.h"
     22 
     23 #include "ANetworkSession.h"
     24 #include "TunnelRenderer.h"
     25 
     26 #include <media/stagefright/foundation/ABuffer.h>
     27 #include <media/stagefright/foundation/ADebug.h>
     28 #include <media/stagefright/foundation/AMessage.h>
     29 #include <media/stagefright/foundation/hexdump.h>
     30 #include <media/stagefright/MediaErrors.h>
     31 #include <media/stagefright/Utils.h>
     32 
     33 namespace android {
     34 
     35 struct RTPSink::Source : public RefBase {
     36     Source(uint16_t seq, const sp<ABuffer> &buffer,
     37            const sp<AMessage> queueBufferMsg);
     38 
     39     bool updateSeq(uint16_t seq, const sp<ABuffer> &buffer);
     40 
     41     void addReportBlock(uint32_t ssrc, const sp<ABuffer> &buf);
     42 
     43 protected:
     44     virtual ~Source();
     45 
     46 private:
     47     static const uint32_t kMinSequential = 2;
     48     static const uint32_t kMaxDropout = 3000;
     49     static const uint32_t kMaxMisorder = 100;
     50     static const uint32_t kRTPSeqMod = 1u << 16;
     51 
     52     sp<AMessage> mQueueBufferMsg;
     53 
     54     uint16_t mMaxSeq;
     55     uint32_t mCycles;
     56     uint32_t mBaseSeq;
     57     uint32_t mBadSeq;
     58     uint32_t mProbation;
     59     uint32_t mReceived;
     60     uint32_t mExpectedPrior;
     61     uint32_t mReceivedPrior;
     62 
     63     void initSeq(uint16_t seq);
     64     void queuePacket(const sp<ABuffer> &buffer);
     65 
     66     DISALLOW_EVIL_CONSTRUCTORS(Source);
     67 };
     68 
     69 ////////////////////////////////////////////////////////////////////////////////
     70 
     71 RTPSink::Source::Source(
     72         uint16_t seq, const sp<ABuffer> &buffer,
     73         const sp<AMessage> queueBufferMsg)
     74     : mQueueBufferMsg(queueBufferMsg),
     75       mProbation(kMinSequential) {
     76     initSeq(seq);
     77     mMaxSeq = seq - 1;
     78 
     79     buffer->setInt32Data(mCycles | seq);
     80     queuePacket(buffer);
     81 }
     82 
     83 RTPSink::Source::~Source() {
     84 }
     85 
     86 void RTPSink::Source::initSeq(uint16_t seq) {
     87     mMaxSeq = seq;
     88     mCycles = 0;
     89     mBaseSeq = seq;
     90     mBadSeq = kRTPSeqMod + 1;
     91     mReceived = 0;
     92     mExpectedPrior = 0;
     93     mReceivedPrior = 0;
     94 }
     95 
     96 bool RTPSink::Source::updateSeq(uint16_t seq, const sp<ABuffer> &buffer) {
     97     uint16_t udelta = seq - mMaxSeq;
     98 
     99     if (mProbation) {
    100         // Startup phase
    101 
    102         if (seq == mMaxSeq + 1) {
    103             buffer->setInt32Data(mCycles | seq);
    104             queuePacket(buffer);
    105 
    106             --mProbation;
    107             mMaxSeq = seq;
    108             if (mProbation == 0) {
    109                 initSeq(seq);
    110                 ++mReceived;
    111 
    112                 return true;
    113             }
    114         } else {
    115             // Packet out of sequence, restart startup phase
    116 
    117             mProbation = kMinSequential - 1;
    118             mMaxSeq = seq;
    119 
    120 #if 0
    121             mPackets.clear();
    122             mTotalBytesQueued = 0;
    123             ALOGI("XXX cleared packets");
    124 #endif
    125 
    126             buffer->setInt32Data(mCycles | seq);
    127             queuePacket(buffer);
    128         }
    129 
    130         return false;
    131     }
    132 
    133     if (udelta < kMaxDropout) {
    134         // In order, with permissible gap.
    135 
    136         if (seq < mMaxSeq) {
    137             // Sequence number wrapped - count another 64K cycle
    138             mCycles += kRTPSeqMod;
    139         }
    140 
    141         mMaxSeq = seq;
    142     } else if (udelta <= kRTPSeqMod - kMaxMisorder) {
    143         // The sequence number made a very large jump
    144 
    145         if (seq == mBadSeq) {
    146             // Two sequential packets -- assume that the other side
    147             // restarted without telling us so just re-sync
    148             // (i.e. pretend this was the first packet)
    149 
    150             initSeq(seq);
    151         } else {
    152             mBadSeq = (seq + 1) & (kRTPSeqMod - 1);
    153 
    154             return false;
    155         }
    156     } else {
    157         // Duplicate or reordered packet.
    158     }
    159 
    160     ++mReceived;
    161 
    162     buffer->setInt32Data(mCycles | seq);
    163     queuePacket(buffer);
    164 
    165     return true;
    166 }
    167 
    168 void RTPSink::Source::queuePacket(const sp<ABuffer> &buffer) {
    169     sp<AMessage> msg = mQueueBufferMsg->dup();
    170     msg->setBuffer("buffer", buffer);
    171     msg->post();
    172 }
    173 
    174 void RTPSink::Source::addReportBlock(
    175         uint32_t ssrc, const sp<ABuffer> &buf) {
    176     uint32_t extMaxSeq = mMaxSeq | mCycles;
    177     uint32_t expected = extMaxSeq - mBaseSeq + 1;
    178 
    179     int64_t lost = (int64_t)expected - (int64_t)mReceived;
    180     if (lost > 0x7fffff) {
    181         lost = 0x7fffff;
    182     } else if (lost < -0x800000) {
    183         lost = -0x800000;
    184     }
    185 
    186     uint32_t expectedInterval = expected - mExpectedPrior;
    187     mExpectedPrior = expected;
    188 
    189     uint32_t receivedInterval = mReceived - mReceivedPrior;
    190     mReceivedPrior = mReceived;
    191 
    192     int64_t lostInterval = expectedInterval - receivedInterval;
    193 
    194     uint8_t fractionLost;
    195     if (expectedInterval == 0 || lostInterval <=0) {
    196         fractionLost = 0;
    197     } else {
    198         fractionLost = (lostInterval << 8) / expectedInterval;
    199     }
    200 
    201     uint8_t *ptr = buf->data() + buf->size();
    202 
    203     ptr[0] = ssrc >> 24;
    204     ptr[1] = (ssrc >> 16) & 0xff;
    205     ptr[2] = (ssrc >> 8) & 0xff;
    206     ptr[3] = ssrc & 0xff;
    207 
    208     ptr[4] = fractionLost;
    209 
    210     ptr[5] = (lost >> 16) & 0xff;
    211     ptr[6] = (lost >> 8) & 0xff;
    212     ptr[7] = lost & 0xff;
    213 
    214     ptr[8] = extMaxSeq >> 24;
    215     ptr[9] = (extMaxSeq >> 16) & 0xff;
    216     ptr[10] = (extMaxSeq >> 8) & 0xff;
    217     ptr[11] = extMaxSeq & 0xff;
    218 
    219     // XXX TODO:
    220 
    221     ptr[12] = 0x00;  // interarrival jitter
    222     ptr[13] = 0x00;
    223     ptr[14] = 0x00;
    224     ptr[15] = 0x00;
    225 
    226     ptr[16] = 0x00;  // last SR
    227     ptr[17] = 0x00;
    228     ptr[18] = 0x00;
    229     ptr[19] = 0x00;
    230 
    231     ptr[20] = 0x00;  // delay since last SR
    232     ptr[21] = 0x00;
    233     ptr[22] = 0x00;
    234     ptr[23] = 0x00;
    235 }
    236 
    237 ////////////////////////////////////////////////////////////////////////////////
    238 
    239 RTPSink::RTPSink(
    240         const sp<ANetworkSession> &netSession,
    241         const sp<ISurfaceTexture> &surfaceTex)
    242     : mNetSession(netSession),
    243       mSurfaceTex(surfaceTex),
    244       mRTPPort(0),
    245       mRTPSessionID(0),
    246       mRTCPSessionID(0),
    247       mFirstArrivalTimeUs(-1ll),
    248       mNumPacketsReceived(0ll),
    249       mRegression(1000),
    250       mMaxDelayMs(-1ll) {
    251 }
    252 
    253 RTPSink::~RTPSink() {
    254     if (mRTCPSessionID != 0) {
    255         mNetSession->destroySession(mRTCPSessionID);
    256     }
    257 
    258     if (mRTPSessionID != 0) {
    259         mNetSession->destroySession(mRTPSessionID);
    260     }
    261 }
    262 
    263 status_t RTPSink::init(bool useTCPInterleaving) {
    264     if (useTCPInterleaving) {
    265         return OK;
    266     }
    267 
    268     int clientRtp;
    269 
    270     sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
    271     sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id());
    272     for (clientRtp = 15550;; clientRtp += 2) {
    273         int32_t rtpSession;
    274         status_t err = mNetSession->createUDPSession(
    275                     clientRtp, rtpNotify, &rtpSession);
    276 
    277         if (err != OK) {
    278             ALOGI("failed to create RTP socket on port %d", clientRtp);
    279             continue;
    280         }
    281 
    282         int32_t rtcpSession;
    283         err = mNetSession->createUDPSession(
    284                 clientRtp + 1, rtcpNotify, &rtcpSession);
    285 
    286         if (err == OK) {
    287             mRTPPort = clientRtp;
    288             mRTPSessionID = rtpSession;
    289             mRTCPSessionID = rtcpSession;
    290             break;
    291         }
    292 
    293         ALOGI("failed to create RTCP socket on port %d", clientRtp + 1);
    294         mNetSession->destroySession(rtpSession);
    295     }
    296 
    297     if (mRTPPort == 0) {
    298         return UNKNOWN_ERROR;
    299     }
    300 
    301     return OK;
    302 }
    303 
    304 int32_t RTPSink::getRTPPort() const {
    305     return mRTPPort;
    306 }
    307 
    308 void RTPSink::onMessageReceived(const sp<AMessage> &msg) {
    309     switch (msg->what()) {
    310         case kWhatRTPNotify:
    311         case kWhatRTCPNotify:
    312         {
    313             int32_t reason;
    314             CHECK(msg->findInt32("reason", &reason));
    315 
    316             switch (reason) {
    317                 case ANetworkSession::kWhatError:
    318                 {
    319                     int32_t sessionID;
    320                     CHECK(msg->findInt32("sessionID", &sessionID));
    321 
    322                     int32_t err;
    323                     CHECK(msg->findInt32("err", &err));
    324 
    325                     AString detail;
    326                     CHECK(msg->findString("detail", &detail));
    327 
    328                     ALOGE("An error occurred in session %d (%d, '%s/%s').",
    329                           sessionID,
    330                           err,
    331                           detail.c_str(),
    332                           strerror(-err));
    333 
    334                     mNetSession->destroySession(sessionID);
    335 
    336                     if (sessionID == mRTPSessionID) {
    337                         mRTPSessionID = 0;
    338                     } else if (sessionID == mRTCPSessionID) {
    339                         mRTCPSessionID = 0;
    340                     }
    341                     break;
    342                 }
    343 
    344                 case ANetworkSession::kWhatDatagram:
    345                 {
    346                     int32_t sessionID;
    347                     CHECK(msg->findInt32("sessionID", &sessionID));
    348 
    349                     sp<ABuffer> data;
    350                     CHECK(msg->findBuffer("data", &data));
    351 
    352                     status_t err;
    353                     if (msg->what() == kWhatRTPNotify) {
    354                         err = parseRTP(data);
    355                     } else {
    356                         err = parseRTCP(data);
    357                     }
    358                     break;
    359                 }
    360 
    361                 default:
    362                     TRESPASS();
    363             }
    364             break;
    365         }
    366 
    367         case kWhatSendRR:
    368         {
    369             onSendRR();
    370             break;
    371         }
    372 
    373         case kWhatPacketLost:
    374         {
    375             onPacketLost(msg);
    376             break;
    377         }
    378 
    379         case kWhatInject:
    380         {
    381             int32_t isRTP;
    382             CHECK(msg->findInt32("isRTP", &isRTP));
    383 
    384             sp<ABuffer> buffer;
    385             CHECK(msg->findBuffer("buffer", &buffer));
    386 
    387             status_t err;
    388             if (isRTP) {
    389                 err = parseRTP(buffer);
    390             } else {
    391                 err = parseRTCP(buffer);
    392             }
    393             break;
    394         }
    395 
    396         default:
    397             TRESPASS();
    398     }
    399 }
    400 
    401 status_t RTPSink::injectPacket(bool isRTP, const sp<ABuffer> &buffer) {
    402     sp<AMessage> msg = new AMessage(kWhatInject, id());
    403     msg->setInt32("isRTP", isRTP);
    404     msg->setBuffer("buffer", buffer);
    405     msg->post();
    406 
    407     return OK;
    408 }
    409 
    410 status_t RTPSink::parseRTP(const sp<ABuffer> &buffer) {
    411     size_t size = buffer->size();
    412     if (size < 12) {
    413         // Too short to be a valid RTP header.
    414         return ERROR_MALFORMED;
    415     }
    416 
    417     const uint8_t *data = buffer->data();
    418 
    419     if ((data[0] >> 6) != 2) {
    420         // Unsupported version.
    421         return ERROR_UNSUPPORTED;
    422     }
    423 
    424     if (data[0] & 0x20) {
    425         // Padding present.
    426 
    427         size_t paddingLength = data[size - 1];
    428 
    429         if (paddingLength + 12 > size) {
    430             // If we removed this much padding we'd end up with something
    431             // that's too short to be a valid RTP header.
    432             return ERROR_MALFORMED;
    433         }
    434 
    435         size -= paddingLength;
    436     }
    437 
    438     int numCSRCs = data[0] & 0x0f;
    439 
    440     size_t payloadOffset = 12 + 4 * numCSRCs;
    441 
    442     if (size < payloadOffset) {
    443         // Not enough data to fit the basic header and all the CSRC entries.
    444         return ERROR_MALFORMED;
    445     }
    446 
    447     if (data[0] & 0x10) {
    448         // Header eXtension present.
    449 
    450         if (size < payloadOffset + 4) {
    451             // Not enough data to fit the basic header, all CSRC entries
    452             // and the first 4 bytes of the extension header.
    453 
    454             return ERROR_MALFORMED;
    455         }
    456 
    457         const uint8_t *extensionData = &data[payloadOffset];
    458 
    459         size_t extensionLength =
    460             4 * (extensionData[2] << 8 | extensionData[3]);
    461 
    462         if (size < payloadOffset + 4 + extensionLength) {
    463             return ERROR_MALFORMED;
    464         }
    465 
    466         payloadOffset += 4 + extensionLength;
    467     }
    468 
    469     uint32_t srcId = U32_AT(&data[8]);
    470     uint32_t rtpTime = U32_AT(&data[4]);
    471     uint16_t seqNo = U16_AT(&data[2]);
    472 
    473     int64_t arrivalTimeUs;
    474     CHECK(buffer->meta()->findInt64("arrivalTimeUs", &arrivalTimeUs));
    475 
    476     if (mFirstArrivalTimeUs < 0ll) {
    477         mFirstArrivalTimeUs = arrivalTimeUs;
    478     }
    479     arrivalTimeUs -= mFirstArrivalTimeUs;
    480 
    481     int64_t arrivalTimeMedia = (arrivalTimeUs * 9ll) / 100ll;
    482 
    483     ALOGV("seqNo: %d, SSRC 0x%08x, diff %lld",
    484             seqNo, srcId, rtpTime - arrivalTimeMedia);
    485 
    486     mRegression.addPoint((float)rtpTime, (float)arrivalTimeMedia);
    487 
    488     ++mNumPacketsReceived;
    489 
    490     float n1, n2, b;
    491     if (mRegression.approxLine(&n1, &n2, &b)) {
    492         ALOGV("Line %lld: %.2f %.2f %.2f, slope %.2f",
    493               mNumPacketsReceived, n1, n2, b, -n1 / n2);
    494 
    495         float expectedArrivalTimeMedia = (b - n1 * (float)rtpTime) / n2;
    496         float latenessMs = (arrivalTimeMedia - expectedArrivalTimeMedia) / 90.0;
    497 
    498         if (mMaxDelayMs < 0ll || latenessMs > mMaxDelayMs) {
    499             mMaxDelayMs = latenessMs;
    500             ALOGI("packet was %.2f ms late", latenessMs);
    501         }
    502     }
    503 
    504     sp<AMessage> meta = buffer->meta();
    505     meta->setInt32("ssrc", srcId);
    506     meta->setInt32("rtp-time", rtpTime);
    507     meta->setInt32("PT", data[1] & 0x7f);
    508     meta->setInt32("M", data[1] >> 7);
    509 
    510     buffer->setRange(payloadOffset, size - payloadOffset);
    511 
    512     ssize_t index = mSources.indexOfKey(srcId);
    513     if (index < 0) {
    514         if (mRenderer == NULL) {
    515             sp<AMessage> notifyLost = new AMessage(kWhatPacketLost, id());
    516             notifyLost->setInt32("ssrc", srcId);
    517 
    518             mRenderer = new TunnelRenderer(notifyLost, mSurfaceTex);
    519             looper()->registerHandler(mRenderer);
    520         }
    521 
    522         sp<AMessage> queueBufferMsg =
    523             new AMessage(TunnelRenderer::kWhatQueueBuffer, mRenderer->id());
    524 
    525         sp<Source> source = new Source(seqNo, buffer, queueBufferMsg);
    526         mSources.add(srcId, source);
    527     } else {
    528         mSources.valueAt(index)->updateSeq(seqNo, buffer);
    529     }
    530 
    531     return OK;
    532 }
    533 
    534 status_t RTPSink::parseRTCP(const sp<ABuffer> &buffer) {
    535     const uint8_t *data = buffer->data();
    536     size_t size = buffer->size();
    537 
    538     while (size > 0) {
    539         if (size < 8) {
    540             // Too short to be a valid RTCP header
    541             return ERROR_MALFORMED;
    542         }
    543 
    544         if ((data[0] >> 6) != 2) {
    545             // Unsupported version.
    546             return ERROR_UNSUPPORTED;
    547         }
    548 
    549         if (data[0] & 0x20) {
    550             // Padding present.
    551 
    552             size_t paddingLength = data[size - 1];
    553 
    554             if (paddingLength + 12 > size) {
    555                 // If we removed this much padding we'd end up with something
    556                 // that's too short to be a valid RTP header.
    557                 return ERROR_MALFORMED;
    558             }
    559 
    560             size -= paddingLength;
    561         }
    562 
    563         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    564 
    565         if (size < headerLength) {
    566             // Only received a partial packet?
    567             return ERROR_MALFORMED;
    568         }
    569 
    570         switch (data[1]) {
    571             case 200:
    572             {
    573                 parseSR(data, headerLength);
    574                 break;
    575             }
    576 
    577             case 201:  // RR
    578             case 202:  // SDES
    579             case 204:  // APP
    580                 break;
    581 
    582             case 205:  // TSFB (transport layer specific feedback)
    583             case 206:  // PSFB (payload specific feedback)
    584                 // hexdump(data, headerLength);
    585                 break;
    586 
    587             case 203:
    588             {
    589                 parseBYE(data, headerLength);
    590                 break;
    591             }
    592 
    593             default:
    594             {
    595                 ALOGW("Unknown RTCP packet type %u of size %d",
    596                      (unsigned)data[1], headerLength);
    597                 break;
    598             }
    599         }
    600 
    601         data += headerLength;
    602         size -= headerLength;
    603     }
    604 
    605     return OK;
    606 }
    607 
    608 status_t RTPSink::parseBYE(const uint8_t *data, size_t size) {
    609     size_t SC = data[0] & 0x3f;
    610 
    611     if (SC == 0 || size < (4 + SC * 4)) {
    612         // Packet too short for the minimal BYE header.
    613         return ERROR_MALFORMED;
    614     }
    615 
    616     uint32_t id = U32_AT(&data[4]);
    617 
    618     return OK;
    619 }
    620 
    621 status_t RTPSink::parseSR(const uint8_t *data, size_t size) {
    622     size_t RC = data[0] & 0x1f;
    623 
    624     if (size < (7 + RC * 6) * 4) {
    625         // Packet too short for the minimal SR header.
    626         return ERROR_MALFORMED;
    627     }
    628 
    629     uint32_t id = U32_AT(&data[4]);
    630     uint64_t ntpTime = U64_AT(&data[8]);
    631     uint32_t rtpTime = U32_AT(&data[16]);
    632 
    633     ALOGV("SR: ssrc 0x%08x, ntpTime 0x%016llx, rtpTime 0x%08x",
    634           id, ntpTime, rtpTime);
    635 
    636     return OK;
    637 }
    638 
    639 status_t RTPSink::connect(
    640         const char *host, int32_t remoteRtpPort, int32_t remoteRtcpPort) {
    641     ALOGI("connecting RTP/RTCP sockets to %s:{%d,%d}",
    642           host, remoteRtpPort, remoteRtcpPort);
    643 
    644     status_t err =
    645         mNetSession->connectUDPSession(mRTPSessionID, host, remoteRtpPort);
    646 
    647     if (err != OK) {
    648         return err;
    649     }
    650 
    651     err = mNetSession->connectUDPSession(mRTCPSessionID, host, remoteRtcpPort);
    652 
    653     if (err != OK) {
    654         return err;
    655     }
    656 
    657 #if 0
    658     sp<ABuffer> buf = new ABuffer(1500);
    659     memset(buf->data(), 0, buf->size());
    660 
    661     mNetSession->sendRequest(
    662             mRTPSessionID, buf->data(), buf->size());
    663 
    664     mNetSession->sendRequest(
    665             mRTCPSessionID, buf->data(), buf->size());
    666 #endif
    667 
    668     scheduleSendRR();
    669 
    670     return OK;
    671 }
    672 
    673 void RTPSink::scheduleSendRR() {
    674     (new AMessage(kWhatSendRR, id()))->post(2000000ll);
    675 }
    676 
    677 void RTPSink::addSDES(const sp<ABuffer> &buffer) {
    678     uint8_t *data = buffer->data() + buffer->size();
    679     data[0] = 0x80 | 1;
    680     data[1] = 202;  // SDES
    681     data[4] = 0xde;  // SSRC
    682     data[5] = 0xad;
    683     data[6] = 0xbe;
    684     data[7] = 0xef;
    685 
    686     size_t offset = 8;
    687 
    688     data[offset++] = 1;  // CNAME
    689 
    690     AString cname = "stagefright@somewhere";
    691     data[offset++] = cname.size();
    692 
    693     memcpy(&data[offset], cname.c_str(), cname.size());
    694     offset += cname.size();
    695 
    696     data[offset++] = 6;  // TOOL
    697 
    698     AString tool = "stagefright/1.0";
    699     data[offset++] = tool.size();
    700 
    701     memcpy(&data[offset], tool.c_str(), tool.size());
    702     offset += tool.size();
    703 
    704     data[offset++] = 0;
    705 
    706     if ((offset % 4) > 0) {
    707         size_t count = 4 - (offset % 4);
    708         switch (count) {
    709             case 3:
    710                 data[offset++] = 0;
    711             case 2:
    712                 data[offset++] = 0;
    713             case 1:
    714                 data[offset++] = 0;
    715         }
    716     }
    717 
    718     size_t numWords = (offset / 4) - 1;
    719     data[2] = numWords >> 8;
    720     data[3] = numWords & 0xff;
    721 
    722     buffer->setRange(buffer->offset(), buffer->size() + offset);
    723 }
    724 
    725 void RTPSink::onSendRR() {
    726     sp<ABuffer> buf = new ABuffer(1500);
    727     buf->setRange(0, 0);
    728 
    729     uint8_t *ptr = buf->data();
    730     ptr[0] = 0x80 | 0;
    731     ptr[1] = 201;  // RR
    732     ptr[2] = 0;
    733     ptr[3] = 1;
    734     ptr[4] = 0xde;  // SSRC
    735     ptr[5] = 0xad;
    736     ptr[6] = 0xbe;
    737     ptr[7] = 0xef;
    738 
    739     buf->setRange(0, 8);
    740 
    741     size_t numReportBlocks = 0;
    742     for (size_t i = 0; i < mSources.size(); ++i) {
    743         uint32_t ssrc = mSources.keyAt(i);
    744         sp<Source> source = mSources.valueAt(i);
    745 
    746         if (numReportBlocks > 31 || buf->size() + 24 > buf->capacity()) {
    747             // Cannot fit another report block.
    748             break;
    749         }
    750 
    751         source->addReportBlock(ssrc, buf);
    752         ++numReportBlocks;
    753     }
    754 
    755     ptr[0] |= numReportBlocks;  // 5 bit
    756 
    757     size_t sizeInWordsMinus1 = 1 + 6 * numReportBlocks;
    758     ptr[2] = sizeInWordsMinus1 >> 8;
    759     ptr[3] = sizeInWordsMinus1 & 0xff;
    760 
    761     buf->setRange(0, (sizeInWordsMinus1 + 1) * 4);
    762 
    763     addSDES(buf);
    764 
    765     mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size());
    766 
    767     scheduleSendRR();
    768 }
    769 
    770 void RTPSink::onPacketLost(const sp<AMessage> &msg) {
    771     uint32_t srcId;
    772     CHECK(msg->findInt32("ssrc", (int32_t *)&srcId));
    773 
    774     int32_t seqNo;
    775     CHECK(msg->findInt32("seqNo", &seqNo));
    776 
    777     int32_t blp = 0;
    778 
    779     sp<ABuffer> buf = new ABuffer(1500);
    780     buf->setRange(0, 0);
    781 
    782     uint8_t *ptr = buf->data();
    783     ptr[0] = 0x80 | 1;  // generic NACK
    784     ptr[1] = 205;  // RTPFB
    785     ptr[2] = 0;
    786     ptr[3] = 3;
    787     ptr[4] = 0xde;  // sender SSRC
    788     ptr[5] = 0xad;
    789     ptr[6] = 0xbe;
    790     ptr[7] = 0xef;
    791     ptr[8] = (srcId >> 24) & 0xff;
    792     ptr[9] = (srcId >> 16) & 0xff;
    793     ptr[10] = (srcId >> 8) & 0xff;
    794     ptr[11] = (srcId & 0xff);
    795     ptr[12] = (seqNo >> 8) & 0xff;
    796     ptr[13] = (seqNo & 0xff);
    797     ptr[14] = (blp >> 8) & 0xff;
    798     ptr[15] = (blp & 0xff);
    799 
    800     buf->setRange(0, 16);
    801 
    802     mNetSession->sendRequest(mRTCPSessionID, buf->data(), buf->size());
    803 }
    804 
    805 }  // namespace android
    806 
    807