Home | History | Annotate | Download | only in rtp
      1 /*
      2  * Copyright 2013, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "RTPSender"
     19 #include <utils/Log.h>
     20 
     21 #include "RTPSender.h"
     22 
     23 #include <media/stagefright/foundation/ABuffer.h>
     24 #include <media/stagefright/foundation/ADebug.h>
     25 #include <media/stagefright/foundation/AMessage.h>
     26 #include <media/stagefright/foundation/ANetworkSession.h>
     27 #include <media/stagefright/foundation/hexdump.h>
     28 #include <media/stagefright/MediaErrors.h>
     29 #include <media/stagefright/Utils.h>
     30 
     31 #include "include/avc_utils.h"
     32 
     33 namespace android {
     34 
     35 RTPSender::RTPSender(
     36         const sp<ANetworkSession> &netSession,
     37         const sp<AMessage> &notify)
     38     : mNetSession(netSession),
     39       mNotify(notify),
     40       mRTPMode(TRANSPORT_UNDEFINED),
     41       mRTCPMode(TRANSPORT_UNDEFINED),
     42       mRTPSessionID(0),
     43       mRTCPSessionID(0),
     44       mRTPConnected(false),
     45       mRTCPConnected(false),
     46       mLastNTPTime(0),
     47       mLastRTPTime(0),
     48       mNumRTPSent(0),
     49       mNumRTPOctetsSent(0),
     50       mNumSRsSent(0),
     51       mRTPSeqNo(0),
     52       mHistorySize(0) {
     53 }
     54 
     55 RTPSender::~RTPSender() {
     56     if (mRTCPSessionID != 0) {
     57         mNetSession->destroySession(mRTCPSessionID);
     58         mRTCPSessionID = 0;
     59     }
     60 
     61     if (mRTPSessionID != 0) {
     62         mNetSession->destroySession(mRTPSessionID);
     63         mRTPSessionID = 0;
     64     }
     65 }
     66 
     67 // static
     68 int32_t RTPBase::PickRandomRTPPort() {
     69     // Pick an even integer in range [1024, 65534)
     70 
     71     static const size_t kRange = (65534 - 1024) / 2;
     72 
     73     return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024;
     74 }
     75 
     76 status_t RTPSender::initAsync(
     77         const char *remoteHost,
     78         int32_t remoteRTPPort,
     79         TransportMode rtpMode,
     80         int32_t remoteRTCPPort,
     81         TransportMode rtcpMode,
     82         int32_t *outLocalRTPPort) {
     83     if (mRTPMode != TRANSPORT_UNDEFINED
     84             || rtpMode == TRANSPORT_UNDEFINED
     85             || rtpMode == TRANSPORT_NONE
     86             || rtcpMode == TRANSPORT_UNDEFINED) {
     87         return INVALID_OPERATION;
     88     }
     89 
     90     CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED);
     91     CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED);
     92 
     93     if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0)
     94             || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) {
     95         return INVALID_OPERATION;
     96     }
     97 
     98     sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
     99 
    100     sp<AMessage> rtcpNotify;
    101     if (remoteRTCPPort >= 0) {
    102         rtcpNotify = new AMessage(kWhatRTCPNotify, id());
    103     }
    104 
    105     CHECK_EQ(mRTPSessionID, 0);
    106     CHECK_EQ(mRTCPSessionID, 0);
    107 
    108     int32_t localRTPPort;
    109 
    110     for (;;) {
    111         localRTPPort = PickRandomRTPPort();
    112 
    113         status_t err;
    114         if (rtpMode == TRANSPORT_UDP) {
    115             err = mNetSession->createUDPSession(
    116                     localRTPPort,
    117                     remoteHost,
    118                     remoteRTPPort,
    119                     rtpNotify,
    120                     &mRTPSessionID);
    121         } else {
    122             CHECK_EQ(rtpMode, TRANSPORT_TCP);
    123             err = mNetSession->createTCPDatagramSession(
    124                     localRTPPort,
    125                     remoteHost,
    126                     remoteRTPPort,
    127                     rtpNotify,
    128                     &mRTPSessionID);
    129         }
    130 
    131         if (err != OK) {
    132             continue;
    133         }
    134 
    135         if (remoteRTCPPort < 0) {
    136             break;
    137         }
    138 
    139         if (rtcpMode == TRANSPORT_UDP) {
    140             err = mNetSession->createUDPSession(
    141                     localRTPPort + 1,
    142                     remoteHost,
    143                     remoteRTCPPort,
    144                     rtcpNotify,
    145                     &mRTCPSessionID);
    146         } else {
    147             CHECK_EQ(rtcpMode, TRANSPORT_TCP);
    148             err = mNetSession->createTCPDatagramSession(
    149                     localRTPPort + 1,
    150                     remoteHost,
    151                     remoteRTCPPort,
    152                     rtcpNotify,
    153                     &mRTCPSessionID);
    154         }
    155 
    156         if (err == OK) {
    157             break;
    158         }
    159 
    160         mNetSession->destroySession(mRTPSessionID);
    161         mRTPSessionID = 0;
    162     }
    163 
    164     if (rtpMode == TRANSPORT_UDP) {
    165         mRTPConnected = true;
    166     }
    167 
    168     if (rtcpMode == TRANSPORT_UDP) {
    169         mRTCPConnected = true;
    170     }
    171 
    172     mRTPMode = rtpMode;
    173     mRTCPMode = rtcpMode;
    174     *outLocalRTPPort = localRTPPort;
    175 
    176     if (mRTPMode == TRANSPORT_UDP
    177             && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) {
    178         notifyInitDone(OK);
    179     }
    180 
    181     return OK;
    182 }
    183 
    184 status_t RTPSender::queueBuffer(
    185         const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) {
    186     status_t err;
    187 
    188     switch (mode) {
    189         case PACKETIZATION_NONE:
    190             err = queueRawPacket(buffer, packetType);
    191             break;
    192 
    193         case PACKETIZATION_TRANSPORT_STREAM:
    194             err = queueTSPackets(buffer, packetType);
    195             break;
    196 
    197         case PACKETIZATION_H264:
    198             err  = queueAVCBuffer(buffer, packetType);
    199             break;
    200 
    201         default:
    202             TRESPASS();
    203     }
    204 
    205     return err;
    206 }
    207 
    208 status_t RTPSender::queueRawPacket(
    209         const sp<ABuffer> &packet, uint8_t packetType) {
    210     CHECK_LE(packet->size(), kMaxUDPPacketSize - 12);
    211 
    212     int64_t timeUs;
    213     CHECK(packet->meta()->findInt64("timeUs", &timeUs));
    214 
    215     sp<ABuffer> udpPacket = new ABuffer(12 + packet->size());
    216 
    217     udpPacket->setInt32Data(mRTPSeqNo);
    218 
    219     uint8_t *rtp = udpPacket->data();
    220     rtp[0] = 0x80;
    221     rtp[1] = packetType;
    222 
    223     rtp[2] = (mRTPSeqNo >> 8) & 0xff;
    224     rtp[3] = mRTPSeqNo & 0xff;
    225     ++mRTPSeqNo;
    226 
    227     uint32_t rtpTime = (timeUs * 9) / 100ll;
    228 
    229     rtp[4] = rtpTime >> 24;
    230     rtp[5] = (rtpTime >> 16) & 0xff;
    231     rtp[6] = (rtpTime >> 8) & 0xff;
    232     rtp[7] = rtpTime & 0xff;
    233 
    234     rtp[8] = kSourceID >> 24;
    235     rtp[9] = (kSourceID >> 16) & 0xff;
    236     rtp[10] = (kSourceID >> 8) & 0xff;
    237     rtp[11] = kSourceID & 0xff;
    238 
    239     memcpy(&rtp[12], packet->data(), packet->size());
    240 
    241     return sendRTPPacket(
    242             udpPacket,
    243             true /* storeInHistory */,
    244             true /* timeValid */,
    245             ALooper::GetNowUs());
    246 }
    247 
    248 status_t RTPSender::queueTSPackets(
    249         const sp<ABuffer> &tsPackets, uint8_t packetType) {
    250     CHECK_EQ(0, tsPackets->size() % 188);
    251 
    252     int64_t timeUs;
    253     CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));
    254 
    255     const size_t numTSPackets = tsPackets->size() / 188;
    256 
    257     size_t srcOffset = 0;
    258     while (srcOffset < tsPackets->size()) {
    259         sp<ABuffer> udpPacket =
    260             new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
    261 
    262         udpPacket->setInt32Data(mRTPSeqNo);
    263 
    264         uint8_t *rtp = udpPacket->data();
    265         rtp[0] = 0x80;
    266         rtp[1] = packetType;
    267 
    268         rtp[2] = (mRTPSeqNo >> 8) & 0xff;
    269         rtp[3] = mRTPSeqNo & 0xff;
    270         ++mRTPSeqNo;
    271 
    272         int64_t nowUs = ALooper::GetNowUs();
    273         uint32_t rtpTime = (nowUs * 9) / 100ll;
    274 
    275         rtp[4] = rtpTime >> 24;
    276         rtp[5] = (rtpTime >> 16) & 0xff;
    277         rtp[6] = (rtpTime >> 8) & 0xff;
    278         rtp[7] = rtpTime & 0xff;
    279 
    280         rtp[8] = kSourceID >> 24;
    281         rtp[9] = (kSourceID >> 16) & 0xff;
    282         rtp[10] = (kSourceID >> 8) & 0xff;
    283         rtp[11] = kSourceID & 0xff;
    284 
    285         size_t numTSPackets = (tsPackets->size() - srcOffset) / 188;
    286         if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) {
    287             numTSPackets = kMaxNumTSPacketsPerRTPPacket;
    288         }
    289 
    290         memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
    291 
    292         udpPacket->setRange(0, 12 + numTSPackets * 188);
    293 
    294         srcOffset += numTSPackets * 188;
    295         bool isLastPacket = (srcOffset == tsPackets->size());
    296 
    297         status_t err = sendRTPPacket(
    298                 udpPacket,
    299                 true /* storeInHistory */,
    300                 isLastPacket /* timeValid */,
    301                 timeUs);
    302 
    303         if (err != OK) {
    304             return err;
    305         }
    306     }
    307 
    308     return OK;
    309 }
    310 
    311 status_t RTPSender::queueAVCBuffer(
    312         const sp<ABuffer> &accessUnit, uint8_t packetType) {
    313     int64_t timeUs;
    314     CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
    315 
    316     uint32_t rtpTime = (timeUs * 9 / 100ll);
    317 
    318     List<sp<ABuffer> > packets;
    319 
    320     sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize);
    321     size_t outBytesUsed = 12;  // Placeholder for RTP header.
    322 
    323     const uint8_t *data = accessUnit->data();
    324     size_t size = accessUnit->size();
    325     const uint8_t *nalStart;
    326     size_t nalSize;
    327     while (getNextNALUnit(
    328                 &data, &size, &nalStart, &nalSize,
    329                 true /* startCodeFollows */) == OK) {
    330         size_t bytesNeeded = nalSize + 2;
    331         if (outBytesUsed == 12) {
    332             ++bytesNeeded;
    333         }
    334 
    335         if (outBytesUsed + bytesNeeded > out->capacity()) {
    336             bool emitSingleNALPacket = false;
    337 
    338             if (outBytesUsed == 12
    339                     && outBytesUsed + nalSize <= out->capacity()) {
    340                 // We haven't emitted anything into the current packet yet and
    341                 // this NAL unit fits into a single-NAL-unit-packet while
    342                 // it wouldn't have fit as part of a STAP-A packet.
    343 
    344                 memcpy(out->data() + outBytesUsed, nalStart, nalSize);
    345                 outBytesUsed += nalSize;
    346 
    347                 emitSingleNALPacket = true;
    348             }
    349 
    350             if (outBytesUsed > 12) {
    351                 out->setRange(0, outBytesUsed);
    352                 packets.push_back(out);
    353                 out = new ABuffer(kMaxUDPPacketSize);
    354                 outBytesUsed = 12;  // Placeholder for RTP header
    355             }
    356 
    357             if (emitSingleNALPacket) {
    358                 continue;
    359             }
    360         }
    361 
    362         if (outBytesUsed + bytesNeeded <= out->capacity()) {
    363             uint8_t *dst = out->data() + outBytesUsed;
    364 
    365             if (outBytesUsed == 12) {
    366                 *dst++ = 24;  // STAP-A header
    367             }
    368 
    369             *dst++ = (nalSize >> 8) & 0xff;
    370             *dst++ = nalSize & 0xff;
    371             memcpy(dst, nalStart, nalSize);
    372 
    373             outBytesUsed += bytesNeeded;
    374             continue;
    375         }
    376 
    377         // This single NAL unit does not fit into a single RTP packet,
    378         // we need to emit an FU-A.
    379 
    380         CHECK_EQ(outBytesUsed, 12u);
    381 
    382         uint8_t nalType = nalStart[0] & 0x1f;
    383         uint8_t nri = (nalStart[0] >> 5) & 3;
    384 
    385         size_t srcOffset = 1;
    386         while (srcOffset < nalSize) {
    387             size_t copy = out->capacity() - outBytesUsed - 2;
    388             if (copy > nalSize - srcOffset) {
    389                 copy = nalSize - srcOffset;
    390             }
    391 
    392             uint8_t *dst = out->data() + outBytesUsed;
    393             dst[0] = (nri << 5) | 28;
    394 
    395             dst[1] = nalType;
    396 
    397             if (srcOffset == 1) {
    398                 dst[1] |= 0x80;
    399             }
    400 
    401             if (srcOffset + copy == nalSize) {
    402                 dst[1] |= 0x40;
    403             }
    404 
    405             memcpy(&dst[2], nalStart + srcOffset, copy);
    406             srcOffset += copy;
    407 
    408             out->setRange(0, outBytesUsed + copy + 2);
    409 
    410             packets.push_back(out);
    411             out = new ABuffer(kMaxUDPPacketSize);
    412             outBytesUsed = 12;  // Placeholder for RTP header
    413         }
    414     }
    415 
    416     if (outBytesUsed > 12) {
    417         out->setRange(0, outBytesUsed);
    418         packets.push_back(out);
    419     }
    420 
    421     while (!packets.empty()) {
    422         sp<ABuffer> out = *packets.begin();
    423         packets.erase(packets.begin());
    424 
    425         out->setInt32Data(mRTPSeqNo);
    426 
    427         bool last = packets.empty();
    428 
    429         uint8_t *dst = out->data();
    430 
    431         dst[0] = 0x80;
    432 
    433         dst[1] = packetType;
    434         if (last) {
    435             dst[1] |= 1 << 7;  // M-bit
    436         }
    437 
    438         dst[2] = (mRTPSeqNo >> 8) & 0xff;
    439         dst[3] = mRTPSeqNo & 0xff;
    440         ++mRTPSeqNo;
    441 
    442         dst[4] = rtpTime >> 24;
    443         dst[5] = (rtpTime >> 16) & 0xff;
    444         dst[6] = (rtpTime >> 8) & 0xff;
    445         dst[7] = rtpTime & 0xff;
    446         dst[8] = kSourceID >> 24;
    447         dst[9] = (kSourceID >> 16) & 0xff;
    448         dst[10] = (kSourceID >> 8) & 0xff;
    449         dst[11] = kSourceID & 0xff;
    450 
    451         status_t err = sendRTPPacket(out, true /* storeInHistory */);
    452 
    453         if (err != OK) {
    454             return err;
    455         }
    456     }
    457 
    458     return OK;
    459 }
    460 
    461 status_t RTPSender::sendRTPPacket(
    462         const sp<ABuffer> &buffer, bool storeInHistory,
    463         bool timeValid, int64_t timeUs) {
    464     CHECK(mRTPConnected);
    465 
    466     status_t err = mNetSession->sendRequest(
    467             mRTPSessionID, buffer->data(), buffer->size(),
    468             timeValid, timeUs);
    469 
    470     if (err != OK) {
    471         return err;
    472     }
    473 
    474     mLastNTPTime = GetNowNTP();
    475     mLastRTPTime = U32_AT(buffer->data() + 4);
    476 
    477     ++mNumRTPSent;
    478     mNumRTPOctetsSent += buffer->size() - 12;
    479 
    480     if (storeInHistory) {
    481         if (mHistorySize == kMaxHistorySize) {
    482             mHistory.erase(mHistory.begin());
    483         } else {
    484             ++mHistorySize;
    485         }
    486         mHistory.push_back(buffer);
    487     }
    488 
    489     return OK;
    490 }
    491 
    492 // static
    493 uint64_t RTPSender::GetNowNTP() {
    494     struct timeval tv;
    495     gettimeofday(&tv, NULL /* timezone */);
    496 
    497     uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec;
    498 
    499     nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
    500 
    501     uint64_t hi = nowUs / 1000000ll;
    502     uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
    503 
    504     return (hi << 32) | lo;
    505 }
    506 
    507 void RTPSender::onMessageReceived(const sp<AMessage> &msg) {
    508     switch (msg->what()) {
    509         case kWhatRTPNotify:
    510         case kWhatRTCPNotify:
    511             onNetNotify(msg->what() == kWhatRTPNotify, msg);
    512             break;
    513 
    514         default:
    515             TRESPASS();
    516     }
    517 }
    518 
    519 void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
    520     int32_t reason;
    521     CHECK(msg->findInt32("reason", &reason));
    522 
    523     switch (reason) {
    524         case ANetworkSession::kWhatError:
    525         {
    526             int32_t sessionID;
    527             CHECK(msg->findInt32("sessionID", &sessionID));
    528 
    529             int32_t err;
    530             CHECK(msg->findInt32("err", &err));
    531 
    532             int32_t errorOccuredDuringSend;
    533             CHECK(msg->findInt32("send", &errorOccuredDuringSend));
    534 
    535             AString detail;
    536             CHECK(msg->findString("detail", &detail));
    537 
    538             ALOGE("An error occurred during %s in session %d "
    539                   "(%d, '%s' (%s)).",
    540                   errorOccuredDuringSend ? "send" : "receive",
    541                   sessionID,
    542                   err,
    543                   detail.c_str(),
    544                   strerror(-err));
    545 
    546             mNetSession->destroySession(sessionID);
    547 
    548             if (sessionID == mRTPSessionID) {
    549                 mRTPSessionID = 0;
    550             } else if (sessionID == mRTCPSessionID) {
    551                 mRTCPSessionID = 0;
    552             }
    553 
    554             if (!mRTPConnected
    555                     || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) {
    556                 // We haven't completed initialization, attach the error
    557                 // to the notification instead.
    558                 notifyInitDone(err);
    559                 break;
    560             }
    561 
    562             notifyError(err);
    563             break;
    564         }
    565 
    566         case ANetworkSession::kWhatDatagram:
    567         {
    568             sp<ABuffer> data;
    569             CHECK(msg->findBuffer("data", &data));
    570 
    571             if (isRTP) {
    572                 ALOGW("Huh? Received data on RTP connection...");
    573             } else {
    574                 onRTCPData(data);
    575             }
    576             break;
    577         }
    578 
    579         case ANetworkSession::kWhatConnected:
    580         {
    581             int32_t sessionID;
    582             CHECK(msg->findInt32("sessionID", &sessionID));
    583 
    584             if  (isRTP) {
    585                 CHECK_EQ(mRTPMode, TRANSPORT_TCP);
    586                 CHECK_EQ(sessionID, mRTPSessionID);
    587                 mRTPConnected = true;
    588             } else {
    589                 CHECK_EQ(mRTCPMode, TRANSPORT_TCP);
    590                 CHECK_EQ(sessionID, mRTCPSessionID);
    591                 mRTCPConnected = true;
    592             }
    593 
    594             if (mRTPConnected
    595                     && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) {
    596                 notifyInitDone(OK);
    597             }
    598             break;
    599         }
    600 
    601         case ANetworkSession::kWhatNetworkStall:
    602         {
    603             size_t numBytesQueued;
    604             CHECK(msg->findSize("numBytesQueued", &numBytesQueued));
    605 
    606             notifyNetworkStall(numBytesQueued);
    607             break;
    608         }
    609 
    610         default:
    611             TRESPASS();
    612     }
    613 }
    614 
    615 status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) {
    616     const uint8_t *data = buffer->data();
    617     size_t size = buffer->size();
    618 
    619     while (size > 0) {
    620         if (size < 8) {
    621             // Too short to be a valid RTCP header
    622             return ERROR_MALFORMED;
    623         }
    624 
    625         if ((data[0] >> 6) != 2) {
    626             // Unsupported version.
    627             return ERROR_UNSUPPORTED;
    628         }
    629 
    630         if (data[0] & 0x20) {
    631             // Padding present.
    632 
    633             size_t paddingLength = data[size - 1];
    634 
    635             if (paddingLength + 12 > size) {
    636                 // If we removed this much padding we'd end up with something
    637                 // that's too short to be a valid RTP header.
    638                 return ERROR_MALFORMED;
    639             }
    640 
    641             size -= paddingLength;
    642         }
    643 
    644         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    645 
    646         if (size < headerLength) {
    647             // Only received a partial packet?
    648             return ERROR_MALFORMED;
    649         }
    650 
    651         switch (data[1]) {
    652             case 200:
    653             case 201:  // RR
    654                 parseReceiverReport(data, headerLength);
    655                 break;
    656 
    657             case 202:  // SDES
    658             case 203:
    659                 break;
    660 
    661             case 204:  // APP
    662                 parseAPP(data, headerLength);
    663                 break;
    664 
    665             case 205:  // TSFB (transport layer specific feedback)
    666                 parseTSFB(data, headerLength);
    667                 break;
    668 
    669             case 206:  // PSFB (payload specific feedback)
    670                 // hexdump(data, headerLength);
    671                 break;
    672 
    673             default:
    674             {
    675                 ALOGW("Unknown RTCP packet type %u of size %d",
    676                      (unsigned)data[1], headerLength);
    677                 break;
    678             }
    679         }
    680 
    681         data += headerLength;
    682         size -= headerLength;
    683     }
    684 
    685     return OK;
    686 }
    687 
    688 status_t RTPSender::parseReceiverReport(
    689         const uint8_t *data, size_t /* size */) {
    690     float fractionLost = data[12] / 256.0f;
    691 
    692     ALOGI("lost %.2f %% of packets during report interval.",
    693           100.0f * fractionLost);
    694 
    695     return OK;
    696 }
    697 
    698 status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) {
    699     if ((data[0] & 0x1f) != 1) {
    700         return ERROR_UNSUPPORTED;  // We only support NACK for now.
    701     }
    702 
    703     uint32_t srcId = U32_AT(&data[8]);
    704     if (srcId != kSourceID) {
    705         return ERROR_MALFORMED;
    706     }
    707 
    708     for (size_t i = 12; i < size; i += 4) {
    709         uint16_t seqNo = U16_AT(&data[i]);
    710         uint16_t blp = U16_AT(&data[i + 2]);
    711 
    712         List<sp<ABuffer> >::iterator it = mHistory.begin();
    713         bool foundSeqNo = false;
    714         while (it != mHistory.end()) {
    715             const sp<ABuffer> &buffer = *it;
    716 
    717             uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
    718 
    719             bool retransmit = false;
    720             if (bufferSeqNo == seqNo) {
    721                 retransmit = true;
    722             } else if (blp != 0) {
    723                 for (size_t i = 0; i < 16; ++i) {
    724                     if ((blp & (1 << i))
    725                         && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
    726                         blp &= ~(1 << i);
    727                         retransmit = true;
    728                     }
    729                 }
    730             }
    731 
    732             if (retransmit) {
    733                 ALOGV("retransmitting seqNo %d", bufferSeqNo);
    734 
    735                 CHECK_EQ((status_t)OK,
    736                          sendRTPPacket(buffer, false /* storeInHistory */));
    737 
    738                 if (bufferSeqNo == seqNo) {
    739                     foundSeqNo = true;
    740                 }
    741 
    742                 if (foundSeqNo && blp == 0) {
    743                     break;
    744                 }
    745             }
    746 
    747             ++it;
    748         }
    749 
    750         if (!foundSeqNo || blp != 0) {
    751             ALOGI("Some sequence numbers were no longer available for "
    752                   "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)",
    753                   seqNo, foundSeqNo, blp);
    754 
    755             if (!mHistory.empty()) {
    756                 int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff;
    757                 int32_t latest = (*--mHistory.end())->int32Data() & 0xffff;
    758 
    759                 ALOGI("have seq numbers from %d - %d", earliest, latest);
    760             }
    761         }
    762     }
    763 
    764     return OK;
    765 }
    766 
    767 status_t RTPSender::parseAPP(const uint8_t *data, size_t size) {
    768     if (!memcmp("late", &data[8], 4)) {
    769         int64_t avgLatencyUs = (int64_t)U64_AT(&data[12]);
    770         int64_t maxLatencyUs = (int64_t)U64_AT(&data[20]);
    771 
    772         sp<AMessage> notify = mNotify->dup();
    773         notify->setInt32("what", kWhatInformSender);
    774         notify->setInt64("avgLatencyUs", avgLatencyUs);
    775         notify->setInt64("maxLatencyUs", maxLatencyUs);
    776         notify->post();
    777     }
    778 
    779     return OK;
    780 }
    781 
    782 void RTPSender::notifyInitDone(status_t err) {
    783     sp<AMessage> notify = mNotify->dup();
    784     notify->setInt32("what", kWhatInitDone);
    785     notify->setInt32("err", err);
    786     notify->post();
    787 }
    788 
    789 void RTPSender::notifyError(status_t err) {
    790     sp<AMessage> notify = mNotify->dup();
    791     notify->setInt32("what", kWhatError);
    792     notify->setInt32("err", err);
    793     notify->post();
    794 }
    795 
    796 void RTPSender::notifyNetworkStall(size_t numBytesQueued) {
    797     sp<AMessage> notify = mNotify->dup();
    798     notify->setInt32("what", kWhatNetworkStall);
    799     notify->setSize("numBytesQueued", numBytesQueued);
    800     notify->post();
    801 }
    802 
    803 }  // namespace android
    804 
    805