Home | History | Annotate | Download | only in rtp
      1 /*
      2  * Copyright 2013, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "RTPSender"
     19 #include <utils/Log.h>
     20 
     21 #include "RTPSender.h"
     22 
     23 #include <media/stagefright/foundation/ABuffer.h>
     24 #include <media/stagefright/foundation/ADebug.h>
     25 #include <media/stagefright/foundation/AMessage.h>
     26 #include <media/stagefright/foundation/ANetworkSession.h>
     27 #include <media/stagefright/foundation/hexdump.h>
     28 #include <media/stagefright/MediaErrors.h>
     29 #include <media/stagefright/Utils.h>
     30 
     31 #include "include/avc_utils.h"
     32 
     33 namespace android {
     34 
     35 RTPSender::RTPSender(
     36         const sp<ANetworkSession> &netSession,
     37         const sp<AMessage> &notify)
     38     : mNetSession(netSession),
     39       mNotify(notify),
     40       mRTPMode(TRANSPORT_UNDEFINED),
     41       mRTCPMode(TRANSPORT_UNDEFINED),
     42       mRTPSessionID(0),
     43       mRTCPSessionID(0),
     44       mRTPConnected(false),
     45       mRTCPConnected(false),
     46       mLastNTPTime(0),
     47       mLastRTPTime(0),
     48       mNumRTPSent(0),
     49       mNumRTPOctetsSent(0),
     50       mNumSRsSent(0),
     51       mRTPSeqNo(0),
     52       mHistorySize(0) {
     53 }
     54 
     55 RTPSender::~RTPSender() {
     56     if (mRTCPSessionID != 0) {
     57         mNetSession->destroySession(mRTCPSessionID);
     58         mRTCPSessionID = 0;
     59     }
     60 
     61     if (mRTPSessionID != 0) {
     62         mNetSession->destroySession(mRTPSessionID);
     63         mRTPSessionID = 0;
     64     }
     65 }
     66 
     67 // static
     68 int32_t RTPBase::PickRandomRTPPort() {
     69     // Pick an even integer in range [1024, 65534)
     70 
     71     static const size_t kRange = (65534 - 1024) / 2;
     72 
     73     return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024;
     74 }
     75 
     76 status_t RTPSender::initAsync(
     77         const char *remoteHost,
     78         int32_t remoteRTPPort,
     79         TransportMode rtpMode,
     80         int32_t remoteRTCPPort,
     81         TransportMode rtcpMode,
     82         int32_t *outLocalRTPPort) {
     83     if (mRTPMode != TRANSPORT_UNDEFINED
     84             || rtpMode == TRANSPORT_UNDEFINED
     85             || rtpMode == TRANSPORT_NONE
     86             || rtcpMode == TRANSPORT_UNDEFINED) {
     87         return INVALID_OPERATION;
     88     }
     89 
     90     CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED);
     91     CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED);
     92 
     93     if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0)
     94             || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) {
     95         return INVALID_OPERATION;
     96     }
     97 
     98     sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, this);
     99 
    100     sp<AMessage> rtcpNotify;
    101     if (remoteRTCPPort >= 0) {
    102         rtcpNotify = new AMessage(kWhatRTCPNotify, this);
    103     }
    104 
    105     CHECK_EQ(mRTPSessionID, 0);
    106     CHECK_EQ(mRTCPSessionID, 0);
    107 
    108     int32_t localRTPPort;
    109 
    110     for (;;) {
    111         localRTPPort = PickRandomRTPPort();
    112 
    113         status_t err;
    114         if (rtpMode == TRANSPORT_UDP) {
    115             err = mNetSession->createUDPSession(
    116                     localRTPPort,
    117                     remoteHost,
    118                     remoteRTPPort,
    119                     rtpNotify,
    120                     &mRTPSessionID);
    121         } else {
    122             CHECK_EQ(rtpMode, TRANSPORT_TCP);
    123             err = mNetSession->createTCPDatagramSession(
    124                     localRTPPort,
    125                     remoteHost,
    126                     remoteRTPPort,
    127                     rtpNotify,
    128                     &mRTPSessionID);
    129         }
    130 
    131         if (err != OK) {
    132             continue;
    133         }
    134 
    135         if (remoteRTCPPort < 0) {
    136             break;
    137         }
    138 
    139         if (rtcpMode == TRANSPORT_UDP) {
    140             err = mNetSession->createUDPSession(
    141                     localRTPPort + 1,
    142                     remoteHost,
    143                     remoteRTCPPort,
    144                     rtcpNotify,
    145                     &mRTCPSessionID);
    146         } else {
    147             CHECK_EQ(rtcpMode, TRANSPORT_TCP);
    148             err = mNetSession->createTCPDatagramSession(
    149                     localRTPPort + 1,
    150                     remoteHost,
    151                     remoteRTCPPort,
    152                     rtcpNotify,
    153                     &mRTCPSessionID);
    154         }
    155 
    156         if (err == OK) {
    157             break;
    158         }
    159 
    160         mNetSession->destroySession(mRTPSessionID);
    161         mRTPSessionID = 0;
    162     }
    163 
    164     if (rtpMode == TRANSPORT_UDP) {
    165         mRTPConnected = true;
    166     }
    167 
    168     if (rtcpMode == TRANSPORT_UDP) {
    169         mRTCPConnected = true;
    170     }
    171 
    172     mRTPMode = rtpMode;
    173     mRTCPMode = rtcpMode;
    174     *outLocalRTPPort = localRTPPort;
    175 
    176     if (mRTPMode == TRANSPORT_UDP
    177             && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) {
    178         notifyInitDone(OK);
    179     }
    180 
    181     return OK;
    182 }
    183 
    184 status_t RTPSender::queueBuffer(
    185         const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) {
    186     status_t err;
    187 
    188     switch (mode) {
    189         case PACKETIZATION_NONE:
    190             err = queueRawPacket(buffer, packetType);
    191             break;
    192 
    193         case PACKETIZATION_TRANSPORT_STREAM:
    194             err = queueTSPackets(buffer, packetType);
    195             break;
    196 
    197         case PACKETIZATION_H264:
    198             err  = queueAVCBuffer(buffer, packetType);
    199             break;
    200 
    201         default:
    202             TRESPASS();
    203     }
    204 
    205     return err;
    206 }
    207 
    208 status_t RTPSender::queueRawPacket(
    209         const sp<ABuffer> &packet, uint8_t packetType) {
    210     CHECK_LE(packet->size(), kMaxUDPPacketSize - 12);
    211 
    212     int64_t timeUs;
    213     CHECK(packet->meta()->findInt64("timeUs", &timeUs));
    214 
    215     sp<ABuffer> udpPacket = new ABuffer(12 + packet->size());
    216 
    217     udpPacket->setInt32Data(mRTPSeqNo);
    218 
    219     uint8_t *rtp = udpPacket->data();
    220     rtp[0] = 0x80;
    221     rtp[1] = packetType;
    222 
    223     rtp[2] = (mRTPSeqNo >> 8) & 0xff;
    224     rtp[3] = mRTPSeqNo & 0xff;
    225     ++mRTPSeqNo;
    226 
    227     uint32_t rtpTime = (timeUs * 9) / 100ll;
    228 
    229     rtp[4] = rtpTime >> 24;
    230     rtp[5] = (rtpTime >> 16) & 0xff;
    231     rtp[6] = (rtpTime >> 8) & 0xff;
    232     rtp[7] = rtpTime & 0xff;
    233 
    234     rtp[8] = kSourceID >> 24;
    235     rtp[9] = (kSourceID >> 16) & 0xff;
    236     rtp[10] = (kSourceID >> 8) & 0xff;
    237     rtp[11] = kSourceID & 0xff;
    238 
    239     memcpy(&rtp[12], packet->data(), packet->size());
    240 
    241     return sendRTPPacket(
    242             udpPacket,
    243             true /* storeInHistory */,
    244             true /* timeValid */,
    245             ALooper::GetNowUs());
    246 }
    247 
    248 status_t RTPSender::queueTSPackets(
    249         const sp<ABuffer> &tsPackets, uint8_t packetType) {
    250     CHECK_EQ(0, tsPackets->size() % 188);
    251 
    252     int64_t timeUs;
    253     CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));
    254 
    255     size_t srcOffset = 0;
    256     while (srcOffset < tsPackets->size()) {
    257         sp<ABuffer> udpPacket =
    258             new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
    259 
    260         udpPacket->setInt32Data(mRTPSeqNo);
    261 
    262         uint8_t *rtp = udpPacket->data();
    263         rtp[0] = 0x80;
    264         rtp[1] = packetType;
    265 
    266         rtp[2] = (mRTPSeqNo >> 8) & 0xff;
    267         rtp[3] = mRTPSeqNo & 0xff;
    268         ++mRTPSeqNo;
    269 
    270         int64_t nowUs = ALooper::GetNowUs();
    271         uint32_t rtpTime = (nowUs * 9) / 100ll;
    272 
    273         rtp[4] = rtpTime >> 24;
    274         rtp[5] = (rtpTime >> 16) & 0xff;
    275         rtp[6] = (rtpTime >> 8) & 0xff;
    276         rtp[7] = rtpTime & 0xff;
    277 
    278         rtp[8] = kSourceID >> 24;
    279         rtp[9] = (kSourceID >> 16) & 0xff;
    280         rtp[10] = (kSourceID >> 8) & 0xff;
    281         rtp[11] = kSourceID & 0xff;
    282 
    283         size_t numTSPackets = (tsPackets->size() - srcOffset) / 188;
    284         if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) {
    285             numTSPackets = kMaxNumTSPacketsPerRTPPacket;
    286         }
    287 
    288         memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
    289 
    290         udpPacket->setRange(0, 12 + numTSPackets * 188);
    291 
    292         srcOffset += numTSPackets * 188;
    293         bool isLastPacket = (srcOffset == tsPackets->size());
    294 
    295         status_t err = sendRTPPacket(
    296                 udpPacket,
    297                 true /* storeInHistory */,
    298                 isLastPacket /* timeValid */,
    299                 timeUs);
    300 
    301         if (err != OK) {
    302             return err;
    303         }
    304     }
    305 
    306     return OK;
    307 }
    308 
    309 status_t RTPSender::queueAVCBuffer(
    310         const sp<ABuffer> &accessUnit, uint8_t packetType) {
    311     int64_t timeUs;
    312     CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
    313 
    314     uint32_t rtpTime = (timeUs * 9 / 100ll);
    315 
    316     List<sp<ABuffer> > packets;
    317 
    318     sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize);
    319     size_t outBytesUsed = 12;  // Placeholder for RTP header.
    320 
    321     const uint8_t *data = accessUnit->data();
    322     size_t size = accessUnit->size();
    323     const uint8_t *nalStart;
    324     size_t nalSize;
    325     while (getNextNALUnit(
    326                 &data, &size, &nalStart, &nalSize,
    327                 true /* startCodeFollows */) == OK) {
    328         size_t bytesNeeded = nalSize + 2;
    329         if (outBytesUsed == 12) {
    330             ++bytesNeeded;
    331         }
    332 
    333         if (outBytesUsed + bytesNeeded > out->capacity()) {
    334             bool emitSingleNALPacket = false;
    335 
    336             if (outBytesUsed == 12
    337                     && outBytesUsed + nalSize <= out->capacity()) {
    338                 // We haven't emitted anything into the current packet yet and
    339                 // this NAL unit fits into a single-NAL-unit-packet while
    340                 // it wouldn't have fit as part of a STAP-A packet.
    341 
    342                 memcpy(out->data() + outBytesUsed, nalStart, nalSize);
    343                 outBytesUsed += nalSize;
    344 
    345                 emitSingleNALPacket = true;
    346             }
    347 
    348             if (outBytesUsed > 12) {
    349                 out->setRange(0, outBytesUsed);
    350                 packets.push_back(out);
    351                 out = new ABuffer(kMaxUDPPacketSize);
    352                 outBytesUsed = 12;  // Placeholder for RTP header
    353             }
    354 
    355             if (emitSingleNALPacket) {
    356                 continue;
    357             }
    358         }
    359 
    360         if (outBytesUsed + bytesNeeded <= out->capacity()) {
    361             uint8_t *dst = out->data() + outBytesUsed;
    362 
    363             if (outBytesUsed == 12) {
    364                 *dst++ = 24;  // STAP-A header
    365             }
    366 
    367             *dst++ = (nalSize >> 8) & 0xff;
    368             *dst++ = nalSize & 0xff;
    369             memcpy(dst, nalStart, nalSize);
    370 
    371             outBytesUsed += bytesNeeded;
    372             continue;
    373         }
    374 
    375         // This single NAL unit does not fit into a single RTP packet,
    376         // we need to emit an FU-A.
    377 
    378         CHECK_EQ(outBytesUsed, 12u);
    379 
    380         uint8_t nalType = nalStart[0] & 0x1f;
    381         uint8_t nri = (nalStart[0] >> 5) & 3;
    382 
    383         size_t srcOffset = 1;
    384         while (srcOffset < nalSize) {
    385             size_t copy = out->capacity() - outBytesUsed - 2;
    386             if (copy > nalSize - srcOffset) {
    387                 copy = nalSize - srcOffset;
    388             }
    389 
    390             uint8_t *dst = out->data() + outBytesUsed;
    391             dst[0] = (nri << 5) | 28;
    392 
    393             dst[1] = nalType;
    394 
    395             if (srcOffset == 1) {
    396                 dst[1] |= 0x80;
    397             }
    398 
    399             if (srcOffset + copy == nalSize) {
    400                 dst[1] |= 0x40;
    401             }
    402 
    403             memcpy(&dst[2], nalStart + srcOffset, copy);
    404             srcOffset += copy;
    405 
    406             out->setRange(0, outBytesUsed + copy + 2);
    407 
    408             packets.push_back(out);
    409             out = new ABuffer(kMaxUDPPacketSize);
    410             outBytesUsed = 12;  // Placeholder for RTP header
    411         }
    412     }
    413 
    414     if (outBytesUsed > 12) {
    415         out->setRange(0, outBytesUsed);
    416         packets.push_back(out);
    417     }
    418 
    419     while (!packets.empty()) {
    420         sp<ABuffer> out = *packets.begin();
    421         packets.erase(packets.begin());
    422 
    423         out->setInt32Data(mRTPSeqNo);
    424 
    425         bool last = packets.empty();
    426 
    427         uint8_t *dst = out->data();
    428 
    429         dst[0] = 0x80;
    430 
    431         dst[1] = packetType;
    432         if (last) {
    433             dst[1] |= 1 << 7;  // M-bit
    434         }
    435 
    436         dst[2] = (mRTPSeqNo >> 8) & 0xff;
    437         dst[3] = mRTPSeqNo & 0xff;
    438         ++mRTPSeqNo;
    439 
    440         dst[4] = rtpTime >> 24;
    441         dst[5] = (rtpTime >> 16) & 0xff;
    442         dst[6] = (rtpTime >> 8) & 0xff;
    443         dst[7] = rtpTime & 0xff;
    444         dst[8] = kSourceID >> 24;
    445         dst[9] = (kSourceID >> 16) & 0xff;
    446         dst[10] = (kSourceID >> 8) & 0xff;
    447         dst[11] = kSourceID & 0xff;
    448 
    449         status_t err = sendRTPPacket(out, true /* storeInHistory */);
    450 
    451         if (err != OK) {
    452             return err;
    453         }
    454     }
    455 
    456     return OK;
    457 }
    458 
    459 status_t RTPSender::sendRTPPacket(
    460         const sp<ABuffer> &buffer, bool storeInHistory,
    461         bool timeValid, int64_t timeUs) {
    462     CHECK(mRTPConnected);
    463 
    464     status_t err = mNetSession->sendRequest(
    465             mRTPSessionID, buffer->data(), buffer->size(),
    466             timeValid, timeUs);
    467 
    468     if (err != OK) {
    469         return err;
    470     }
    471 
    472     mLastNTPTime = GetNowNTP();
    473     mLastRTPTime = U32_AT(buffer->data() + 4);
    474 
    475     ++mNumRTPSent;
    476     mNumRTPOctetsSent += buffer->size() - 12;
    477 
    478     if (storeInHistory) {
    479         if (mHistorySize == kMaxHistorySize) {
    480             mHistory.erase(mHistory.begin());
    481         } else {
    482             ++mHistorySize;
    483         }
    484         mHistory.push_back(buffer);
    485     }
    486 
    487     return OK;
    488 }
    489 
    490 // static
    491 uint64_t RTPSender::GetNowNTP() {
    492     struct timeval tv;
    493     gettimeofday(&tv, NULL /* timezone */);
    494 
    495     uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec;
    496 
    497     nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
    498 
    499     uint64_t hi = nowUs / 1000000ll;
    500     uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
    501 
    502     return (hi << 32) | lo;
    503 }
    504 
    505 void RTPSender::onMessageReceived(const sp<AMessage> &msg) {
    506     switch (msg->what()) {
    507         case kWhatRTPNotify:
    508         case kWhatRTCPNotify:
    509             onNetNotify(msg->what() == kWhatRTPNotify, msg);
    510             break;
    511 
    512         default:
    513             TRESPASS();
    514     }
    515 }
    516 
    517 void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
    518     int32_t reason;
    519     CHECK(msg->findInt32("reason", &reason));
    520 
    521     switch (reason) {
    522         case ANetworkSession::kWhatError:
    523         {
    524             int32_t sessionID;
    525             CHECK(msg->findInt32("sessionID", &sessionID));
    526 
    527             int32_t err;
    528             CHECK(msg->findInt32("err", &err));
    529 
    530             int32_t errorOccuredDuringSend;
    531             CHECK(msg->findInt32("send", &errorOccuredDuringSend));
    532 
    533             AString detail;
    534             CHECK(msg->findString("detail", &detail));
    535 
    536             ALOGE("An error occurred during %s in session %d "
    537                   "(%d, '%s' (%s)).",
    538                   errorOccuredDuringSend ? "send" : "receive",
    539                   sessionID,
    540                   err,
    541                   detail.c_str(),
    542                   strerror(-err));
    543 
    544             mNetSession->destroySession(sessionID);
    545 
    546             if (sessionID == mRTPSessionID) {
    547                 mRTPSessionID = 0;
    548             } else if (sessionID == mRTCPSessionID) {
    549                 mRTCPSessionID = 0;
    550             }
    551 
    552             if (!mRTPConnected
    553                     || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) {
    554                 // We haven't completed initialization, attach the error
    555                 // to the notification instead.
    556                 notifyInitDone(err);
    557                 break;
    558             }
    559 
    560             notifyError(err);
    561             break;
    562         }
    563 
    564         case ANetworkSession::kWhatDatagram:
    565         {
    566             sp<ABuffer> data;
    567             CHECK(msg->findBuffer("data", &data));
    568 
    569             if (isRTP) {
    570                 ALOGW("Huh? Received data on RTP connection...");
    571             } else {
    572                 onRTCPData(data);
    573             }
    574             break;
    575         }
    576 
    577         case ANetworkSession::kWhatConnected:
    578         {
    579             int32_t sessionID;
    580             CHECK(msg->findInt32("sessionID", &sessionID));
    581 
    582             if  (isRTP) {
    583                 CHECK_EQ(mRTPMode, TRANSPORT_TCP);
    584                 CHECK_EQ(sessionID, mRTPSessionID);
    585                 mRTPConnected = true;
    586             } else {
    587                 CHECK_EQ(mRTCPMode, TRANSPORT_TCP);
    588                 CHECK_EQ(sessionID, mRTCPSessionID);
    589                 mRTCPConnected = true;
    590             }
    591 
    592             if (mRTPConnected
    593                     && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) {
    594                 notifyInitDone(OK);
    595             }
    596             break;
    597         }
    598 
    599         case ANetworkSession::kWhatNetworkStall:
    600         {
    601             size_t numBytesQueued;
    602             CHECK(msg->findSize("numBytesQueued", &numBytesQueued));
    603 
    604             notifyNetworkStall(numBytesQueued);
    605             break;
    606         }
    607 
    608         default:
    609             TRESPASS();
    610     }
    611 }
    612 
    613 status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) {
    614     const uint8_t *data = buffer->data();
    615     size_t size = buffer->size();
    616 
    617     while (size > 0) {
    618         if (size < 8) {
    619             // Too short to be a valid RTCP header
    620             return ERROR_MALFORMED;
    621         }
    622 
    623         if ((data[0] >> 6) != 2) {
    624             // Unsupported version.
    625             return ERROR_UNSUPPORTED;
    626         }
    627 
    628         if (data[0] & 0x20) {
    629             // Padding present.
    630 
    631             size_t paddingLength = data[size - 1];
    632 
    633             if (paddingLength + 12 > size) {
    634                 // If we removed this much padding we'd end up with something
    635                 // that's too short to be a valid RTP header.
    636                 return ERROR_MALFORMED;
    637             }
    638 
    639             size -= paddingLength;
    640         }
    641 
    642         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    643 
    644         if (size < headerLength) {
    645             // Only received a partial packet?
    646             return ERROR_MALFORMED;
    647         }
    648 
    649         switch (data[1]) {
    650             case 200:
    651             case 201:  // RR
    652                 parseReceiverReport(data, headerLength);
    653                 break;
    654 
    655             case 202:  // SDES
    656             case 203:
    657                 break;
    658 
    659             case 204:  // APP
    660                 parseAPP(data, headerLength);
    661                 break;
    662 
    663             case 205:  // TSFB (transport layer specific feedback)
    664                 parseTSFB(data, headerLength);
    665                 break;
    666 
    667             case 206:  // PSFB (payload specific feedback)
    668                 // hexdump(data, headerLength);
    669                 break;
    670 
    671             default:
    672             {
    673                 ALOGW("Unknown RTCP packet type %u of size %zu",
    674                         (unsigned)data[1], headerLength);
    675                 break;
    676             }
    677         }
    678 
    679         data += headerLength;
    680         size -= headerLength;
    681     }
    682 
    683     return OK;
    684 }
    685 
    686 status_t RTPSender::parseReceiverReport(
    687         const uint8_t *data, size_t /* size */) {
    688     float fractionLost = data[12] / 256.0f;
    689 
    690     ALOGI("lost %.2f %% of packets during report interval.",
    691           100.0f * fractionLost);
    692 
    693     return OK;
    694 }
    695 
    696 status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) {
    697     if ((data[0] & 0x1f) != 1) {
    698         return ERROR_UNSUPPORTED;  // We only support NACK for now.
    699     }
    700 
    701     uint32_t srcId = U32_AT(&data[8]);
    702     if (srcId != kSourceID) {
    703         return ERROR_MALFORMED;
    704     }
    705 
    706     for (size_t i = 12; i < size; i += 4) {
    707         uint16_t seqNo = U16_AT(&data[i]);
    708         uint16_t blp = U16_AT(&data[i + 2]);
    709 
    710         List<sp<ABuffer> >::iterator it = mHistory.begin();
    711         bool foundSeqNo = false;
    712         while (it != mHistory.end()) {
    713             const sp<ABuffer> &buffer = *it;
    714 
    715             uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
    716 
    717             bool retransmit = false;
    718             if (bufferSeqNo == seqNo) {
    719                 retransmit = true;
    720             } else if (blp != 0) {
    721                 for (size_t i = 0; i < 16; ++i) {
    722                     if ((blp & (1 << i))
    723                         && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
    724                         blp &= ~(1 << i);
    725                         retransmit = true;
    726                     }
    727                 }
    728             }
    729 
    730             if (retransmit) {
    731                 ALOGV("retransmitting seqNo %d", bufferSeqNo);
    732 
    733                 CHECK_EQ((status_t)OK,
    734                          sendRTPPacket(buffer, false /* storeInHistory */));
    735 
    736                 if (bufferSeqNo == seqNo) {
    737                     foundSeqNo = true;
    738                 }
    739 
    740                 if (foundSeqNo && blp == 0) {
    741                     break;
    742                 }
    743             }
    744 
    745             ++it;
    746         }
    747 
    748         if (!foundSeqNo || blp != 0) {
    749             ALOGI("Some sequence numbers were no longer available for "
    750                   "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)",
    751                   seqNo, foundSeqNo, blp);
    752 
    753             if (!mHistory.empty()) {
    754                 int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff;
    755                 int32_t latest = (*--mHistory.end())->int32Data() & 0xffff;
    756 
    757                 ALOGI("have seq numbers from %d - %d", earliest, latest);
    758             }
    759         }
    760     }
    761 
    762     return OK;
    763 }
    764 
    765 status_t RTPSender::parseAPP(const uint8_t *data, size_t size __unused) {
    766     if (!memcmp("late", &data[8], 4)) {
    767         int64_t avgLatencyUs = (int64_t)U64_AT(&data[12]);
    768         int64_t maxLatencyUs = (int64_t)U64_AT(&data[20]);
    769 
    770         sp<AMessage> notify = mNotify->dup();
    771         notify->setInt32("what", kWhatInformSender);
    772         notify->setInt64("avgLatencyUs", avgLatencyUs);
    773         notify->setInt64("maxLatencyUs", maxLatencyUs);
    774         notify->post();
    775     }
    776 
    777     return OK;
    778 }
    779 
    780 void RTPSender::notifyInitDone(status_t err) {
    781     sp<AMessage> notify = mNotify->dup();
    782     notify->setInt32("what", kWhatInitDone);
    783     notify->setInt32("err", err);
    784     notify->post();
    785 }
    786 
    787 void RTPSender::notifyError(status_t err) {
    788     sp<AMessage> notify = mNotify->dup();
    789     notify->setInt32("what", kWhatError);
    790     notify->setInt32("err", err);
    791     notify->post();
    792 }
    793 
    794 void RTPSender::notifyNetworkStall(size_t numBytesQueued) {
    795     sp<AMessage> notify = mNotify->dup();
    796     notify->setInt32("what", kWhatNetworkStall);
    797     notify->setSize("numBytesQueued", numBytesQueued);
    798     notify->post();
    799 }
    800 
    801 }  // namespace android
    802 
    803