Home | History | Annotate | Download | only in source
      1 /*
      2  * Copyright 2012, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "Sender"
     19 #include <utils/Log.h>
     20 
     21 #include "Sender.h"
     22 
     23 #include "ANetworkSession.h"
     24 
     25 #include <media/stagefright/foundation/ABuffer.h>
     26 #include <media/stagefright/foundation/ADebug.h>
     27 #include <media/stagefright/foundation/AMessage.h>
     28 #include <media/stagefright/foundation/hexdump.h>
     29 #include <media/stagefright/MediaErrors.h>
     30 #include <media/stagefright/Utils.h>
     31 
     32 #include <math.h>
     33 
     34 #define DEBUG_JITTER    0
     35 
     36 namespace android {
     37 
     38 ////////////////////////////////////////////////////////////////////////////////
     39 
     40 #if DEBUG_JITTER
     41 struct TimeSeries {
     42     TimeSeries();
     43 
     44     void add(double val);
     45 
     46     double mean() const;
     47     double sdev() const;
     48 
     49 private:
     50     enum {
     51         kHistorySize = 20
     52     };
     53     double mValues[kHistorySize];
     54 
     55     size_t mCount;
     56     double mSum;
     57 };
     58 
     59 TimeSeries::TimeSeries()
     60     : mCount(0),
     61       mSum(0.0) {
     62 }
     63 
     64 void TimeSeries::add(double val) {
     65     if (mCount < kHistorySize) {
     66         mValues[mCount++] = val;
     67         mSum += val;
     68     } else {
     69         mSum -= mValues[0];
     70         memmove(&mValues[0], &mValues[1], (kHistorySize - 1) * sizeof(double));
     71         mValues[kHistorySize - 1] = val;
     72         mSum += val;
     73     }
     74 }
     75 
     76 double TimeSeries::mean() const {
     77     if (mCount < 1) {
     78         return 0.0;
     79     }
     80 
     81     return mSum / mCount;
     82 }
     83 
     84 double TimeSeries::sdev() const {
     85     if (mCount < 1) {
     86         return 0.0;
     87     }
     88 
     89     double m = mean();
     90 
     91     double sum = 0.0;
     92     for (size_t i = 0; i < mCount; ++i) {
     93         double tmp = mValues[i] - m;
     94         tmp *= tmp;
     95 
     96         sum += tmp;
     97     }
     98 
     99     return sqrt(sum / mCount);
    100 }
    101 #endif  // DEBUG_JITTER
    102 
    103 ////////////////////////////////////////////////////////////////////////////////
    104 
    105 static size_t kMaxRTPPacketSize = 1500;
    106 static size_t kMaxNumTSPacketsPerRTPPacket = (kMaxRTPPacketSize - 12) / 188;
    107 
    108 Sender::Sender(
    109         const sp<ANetworkSession> &netSession,
    110         const sp<AMessage> &notify)
    111     : mNetSession(netSession),
    112       mNotify(notify),
    113       mTSQueue(new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188)),
    114       mTransportMode(TRANSPORT_UDP),
    115       mRTPChannel(0),
    116       mRTCPChannel(0),
    117       mRTPPort(0),
    118       mRTPSessionID(0),
    119       mRTCPSessionID(0),
    120 #if ENABLE_RETRANSMISSION
    121       mRTPRetransmissionSessionID(0),
    122       mRTCPRetransmissionSessionID(0),
    123 #endif
    124       mClientRTPPort(0),
    125       mClientRTCPPort(0),
    126       mRTPConnected(false),
    127       mRTCPConnected(false),
    128       mFirstOutputBufferReadyTimeUs(-1ll),
    129       mFirstOutputBufferSentTimeUs(-1ll),
    130       mRTPSeqNo(0),
    131 #if ENABLE_RETRANSMISSION
    132       mRTPRetransmissionSeqNo(0),
    133 #endif
    134       mLastNTPTime(0),
    135       mLastRTPTime(0),
    136       mNumRTPSent(0),
    137       mNumRTPOctetsSent(0),
    138       mNumSRsSent(0),
    139       mSendSRPending(false)
    140 #if ENABLE_RETRANSMISSION
    141       ,mHistoryLength(0)
    142 #endif
    143 #if TRACK_BANDWIDTH
    144       ,mFirstPacketTimeUs(-1ll)
    145       ,mTotalBytesSent(0ll)
    146 #endif
    147 #if LOG_TRANSPORT_STREAM
    148     ,mLogFile(NULL)
    149 #endif
    150 {
    151     mTSQueue->setRange(0, 12);
    152 
    153 #if LOG_TRANSPORT_STREAM
    154     mLogFile = fopen("/system/etc/log.ts", "wb");
    155 #endif
    156 }
    157 
    158 Sender::~Sender() {
    159 #if ENABLE_RETRANSMISSION
    160     if (mRTCPRetransmissionSessionID != 0) {
    161         mNetSession->destroySession(mRTCPRetransmissionSessionID);
    162     }
    163 
    164     if (mRTPRetransmissionSessionID != 0) {
    165         mNetSession->destroySession(mRTPRetransmissionSessionID);
    166     }
    167 #endif
    168 
    169     if (mRTCPSessionID != 0) {
    170         mNetSession->destroySession(mRTCPSessionID);
    171     }
    172 
    173     if (mRTPSessionID != 0) {
    174         mNetSession->destroySession(mRTPSessionID);
    175     }
    176 
    177 #if LOG_TRANSPORT_STREAM
    178     if (mLogFile != NULL) {
    179         fclose(mLogFile);
    180         mLogFile = NULL;
    181     }
    182 #endif
    183 }
    184 
    185 status_t Sender::init(
    186         const char *clientIP, int32_t clientRtp, int32_t clientRtcp,
    187         TransportMode transportMode) {
    188     mClientIP = clientIP;
    189     mTransportMode = transportMode;
    190 
    191     if (transportMode == TRANSPORT_TCP_INTERLEAVED) {
    192         mRTPChannel = clientRtp;
    193         mRTCPChannel = clientRtcp;
    194         mRTPPort = 0;
    195         mRTPSessionID = 0;
    196         mRTCPSessionID = 0;
    197         return OK;
    198     }
    199 
    200     mRTPChannel = 0;
    201     mRTCPChannel = 0;
    202 
    203     if (mTransportMode == TRANSPORT_TCP) {
    204         // XXX This is wrong, we need to allocate sockets here, we only
    205         // need to do this because the dongles are not establishing their
    206         // end until after PLAY instead of before SETUP.
    207         mRTPPort = 20000;
    208         mRTPSessionID = 0;
    209         mRTCPSessionID = 0;
    210         mClientRTPPort = clientRtp;
    211         mClientRTCPPort = clientRtcp;
    212         return OK;
    213     }
    214 
    215     int serverRtp;
    216 
    217     sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
    218     sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id());
    219 
    220 #if ENABLE_RETRANSMISSION
    221     sp<AMessage> rtpRetransmissionNotify =
    222         new AMessage(kWhatRTPRetransmissionNotify, id());
    223 
    224     sp<AMessage> rtcpRetransmissionNotify =
    225         new AMessage(kWhatRTCPRetransmissionNotify, id());
    226 #endif
    227 
    228     status_t err;
    229     for (serverRtp = 15550;; serverRtp += 2) {
    230         int32_t rtpSession;
    231         if (mTransportMode == TRANSPORT_UDP) {
    232             err = mNetSession->createUDPSession(
    233                         serverRtp, clientIP, clientRtp,
    234                         rtpNotify, &rtpSession);
    235         } else {
    236             err = mNetSession->createTCPDatagramSession(
    237                         serverRtp, clientIP, clientRtp,
    238                         rtpNotify, &rtpSession);
    239         }
    240 
    241         if (err != OK) {
    242             ALOGI("failed to create RTP socket on port %d", serverRtp);
    243             continue;
    244         }
    245 
    246         int32_t rtcpSession = 0;
    247 
    248         if (clientRtcp >= 0) {
    249             if (mTransportMode == TRANSPORT_UDP) {
    250                 err = mNetSession->createUDPSession(
    251                         serverRtp + 1, clientIP, clientRtcp,
    252                         rtcpNotify, &rtcpSession);
    253             } else {
    254                 err = mNetSession->createTCPDatagramSession(
    255                         serverRtp + 1, clientIP, clientRtcp,
    256                         rtcpNotify, &rtcpSession);
    257             }
    258 
    259             if (err != OK) {
    260                 ALOGI("failed to create RTCP socket on port %d", serverRtp + 1);
    261 
    262                 mNetSession->destroySession(rtpSession);
    263                 continue;
    264             }
    265         }
    266 
    267 #if ENABLE_RETRANSMISSION
    268         if (mTransportMode == TRANSPORT_UDP) {
    269             int32_t rtpRetransmissionSession;
    270 
    271             err = mNetSession->createUDPSession(
    272                         serverRtp + kRetransmissionPortOffset,
    273                         clientIP,
    274                         clientRtp + kRetransmissionPortOffset,
    275                         rtpRetransmissionNotify,
    276                         &rtpRetransmissionSession);
    277 
    278             if (err != OK) {
    279                 mNetSession->destroySession(rtcpSession);
    280                 mNetSession->destroySession(rtpSession);
    281                 continue;
    282             }
    283 
    284             CHECK_GE(clientRtcp, 0);
    285 
    286             int32_t rtcpRetransmissionSession;
    287             err = mNetSession->createUDPSession(
    288                         serverRtp + 1 + kRetransmissionPortOffset,
    289                         clientIP,
    290                         clientRtp + 1 + kRetransmissionPortOffset,
    291                         rtcpRetransmissionNotify,
    292                         &rtcpRetransmissionSession);
    293 
    294             if (err != OK) {
    295                 mNetSession->destroySession(rtpRetransmissionSession);
    296                 mNetSession->destroySession(rtcpSession);
    297                 mNetSession->destroySession(rtpSession);
    298                 continue;
    299             }
    300 
    301             mRTPRetransmissionSessionID = rtpRetransmissionSession;
    302             mRTCPRetransmissionSessionID = rtcpRetransmissionSession;
    303 
    304             ALOGI("rtpRetransmissionSessionID = %d, "
    305                   "rtcpRetransmissionSessionID = %d",
    306                   rtpRetransmissionSession, rtcpRetransmissionSession);
    307         }
    308 #endif
    309 
    310         mRTPPort = serverRtp;
    311         mRTPSessionID = rtpSession;
    312         mRTCPSessionID = rtcpSession;
    313 
    314         ALOGI("rtpSessionID = %d, rtcpSessionID = %d", rtpSession, rtcpSession);
    315         break;
    316     }
    317 
    318     if (mRTPPort == 0) {
    319         return UNKNOWN_ERROR;
    320     }
    321 
    322     return OK;
    323 }
    324 
    325 status_t Sender::finishInit() {
    326     if (mTransportMode != TRANSPORT_TCP) {
    327         notifyInitDone();
    328         return OK;
    329     }
    330 
    331     sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
    332 
    333     status_t err = mNetSession->createTCPDatagramSession(
    334                 mRTPPort, mClientIP.c_str(), mClientRTPPort,
    335                 rtpNotify, &mRTPSessionID);
    336 
    337     if (err != OK) {
    338         return err;
    339     }
    340 
    341     if (mClientRTCPPort >= 0) {
    342         sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id());
    343 
    344         err = mNetSession->createTCPDatagramSession(
    345                 mRTPPort + 1, mClientIP.c_str(), mClientRTCPPort,
    346                 rtcpNotify, &mRTCPSessionID);
    347 
    348         if (err != OK) {
    349             return err;
    350         }
    351     }
    352 
    353     return OK;
    354 }
    355 
    356 int32_t Sender::getRTPPort() const {
    357     return mRTPPort;
    358 }
    359 
    360 void Sender::queuePackets(
    361         int64_t timeUs, const sp<ABuffer> &packets) {
    362     bool isVideo = false;
    363 
    364     int32_t dummy;
    365     if (packets->meta()->findInt32("isVideo", &dummy)) {
    366         isVideo = true;
    367     }
    368 
    369     int64_t delayUs;
    370     int64_t whenUs;
    371 
    372     if (mFirstOutputBufferReadyTimeUs < 0ll) {
    373         mFirstOutputBufferReadyTimeUs = timeUs;
    374         mFirstOutputBufferSentTimeUs = whenUs = ALooper::GetNowUs();
    375         delayUs = 0ll;
    376     } else {
    377         int64_t nowUs = ALooper::GetNowUs();
    378 
    379         whenUs = (timeUs - mFirstOutputBufferReadyTimeUs)
    380                 + mFirstOutputBufferSentTimeUs;
    381 
    382         delayUs = whenUs - nowUs;
    383     }
    384 
    385     sp<AMessage> msg = new AMessage(kWhatQueuePackets, id());
    386     msg->setBuffer("packets", packets);
    387 
    388     packets->meta()->setInt64("timeUs", timeUs);
    389     packets->meta()->setInt64("whenUs", whenUs);
    390     packets->meta()->setInt64("delayUs", delayUs);
    391     msg->post(delayUs > 0 ? delayUs : 0);
    392 }
    393 
    394 void Sender::onMessageReceived(const sp<AMessage> &msg) {
    395     switch (msg->what()) {
    396         case kWhatRTPNotify:
    397         case kWhatRTCPNotify:
    398 #if ENABLE_RETRANSMISSION
    399         case kWhatRTPRetransmissionNotify:
    400         case kWhatRTCPRetransmissionNotify:
    401 #endif
    402         {
    403             int32_t reason;
    404             CHECK(msg->findInt32("reason", &reason));
    405 
    406             switch (reason) {
    407                 case ANetworkSession::kWhatError:
    408                 {
    409                     int32_t sessionID;
    410                     CHECK(msg->findInt32("sessionID", &sessionID));
    411 
    412                     int32_t err;
    413                     CHECK(msg->findInt32("err", &err));
    414 
    415                     int32_t errorOccuredDuringSend;
    416                     CHECK(msg->findInt32("send", &errorOccuredDuringSend));
    417 
    418                     AString detail;
    419                     CHECK(msg->findString("detail", &detail));
    420 
    421                     if ((msg->what() == kWhatRTPNotify
    422 #if ENABLE_RETRANSMISSION
    423                             || msg->what() == kWhatRTPRetransmissionNotify
    424 #endif
    425                         ) && !errorOccuredDuringSend) {
    426                         // This is ok, we don't expect to receive anything on
    427                         // the RTP socket.
    428                         break;
    429                     }
    430 
    431                     ALOGE("An error occurred during %s in session %d "
    432                           "(%d, '%s' (%s)).",
    433                           errorOccuredDuringSend ? "send" : "receive",
    434                           sessionID,
    435                           err,
    436                           detail.c_str(),
    437                           strerror(-err));
    438 
    439                     mNetSession->destroySession(sessionID);
    440 
    441                     if (sessionID == mRTPSessionID) {
    442                         mRTPSessionID = 0;
    443                     } else if (sessionID == mRTCPSessionID) {
    444                         mRTCPSessionID = 0;
    445                     }
    446 #if ENABLE_RETRANSMISSION
    447                     else if (sessionID == mRTPRetransmissionSessionID) {
    448                         mRTPRetransmissionSessionID = 0;
    449                     } else if (sessionID == mRTCPRetransmissionSessionID) {
    450                         mRTCPRetransmissionSessionID = 0;
    451                     }
    452 #endif
    453 
    454                     notifySessionDead();
    455                     break;
    456                 }
    457 
    458                 case ANetworkSession::kWhatDatagram:
    459                 {
    460                     int32_t sessionID;
    461                     CHECK(msg->findInt32("sessionID", &sessionID));
    462 
    463                     sp<ABuffer> data;
    464                     CHECK(msg->findBuffer("data", &data));
    465 
    466                     status_t err;
    467                     if (msg->what() == kWhatRTCPNotify
    468 #if ENABLE_RETRANSMISSION
    469                             || msg->what() == kWhatRTCPRetransmissionNotify
    470 #endif
    471                        )
    472                     {
    473                         err = parseRTCP(data);
    474                     }
    475                     break;
    476                 }
    477 
    478                 case ANetworkSession::kWhatConnected:
    479                 {
    480                     CHECK_EQ(mTransportMode, TRANSPORT_TCP);
    481 
    482                     int32_t sessionID;
    483                     CHECK(msg->findInt32("sessionID", &sessionID));
    484 
    485                     if (sessionID == mRTPSessionID) {
    486                         CHECK(!mRTPConnected);
    487                         mRTPConnected = true;
    488                         ALOGI("RTP Session now connected.");
    489                     } else if (sessionID == mRTCPSessionID) {
    490                         CHECK(!mRTCPConnected);
    491                         mRTCPConnected = true;
    492                         ALOGI("RTCP Session now connected.");
    493                     } else {
    494                         TRESPASS();
    495                     }
    496 
    497                     if (mRTPConnected
    498                             && (mClientRTCPPort < 0 || mRTCPConnected)) {
    499                         notifyInitDone();
    500                     }
    501                     break;
    502                 }
    503 
    504                 default:
    505                     TRESPASS();
    506             }
    507             break;
    508         }
    509 
    510         case kWhatQueuePackets:
    511         {
    512             sp<ABuffer> packets;
    513             CHECK(msg->findBuffer("packets", &packets));
    514 
    515             onQueuePackets(packets);
    516             break;
    517         }
    518 
    519         case kWhatSendSR:
    520         {
    521             mSendSRPending = false;
    522 
    523             if (mRTCPSessionID == 0) {
    524                 break;
    525             }
    526 
    527             onSendSR();
    528 
    529             scheduleSendSR();
    530             break;
    531         }
    532     }
    533 }
    534 
    535 void Sender::onQueuePackets(const sp<ABuffer> &packets) {
    536 #if DEBUG_JITTER
    537     int32_t dummy;
    538     if (packets->meta()->findInt32("isVideo", &dummy)) {
    539         static int64_t lastTimeUs = 0ll;
    540         int64_t nowUs = ALooper::GetNowUs();
    541 
    542         static TimeSeries series;
    543         series.add((double)(nowUs - lastTimeUs));
    544 
    545         ALOGI("deltaTimeUs = %lld us, mean %.2f, sdev %.2f",
    546               nowUs - lastTimeUs, series.mean(), series.sdev());
    547 
    548         lastTimeUs = nowUs;
    549     }
    550 #endif
    551 
    552     int64_t startTimeUs = ALooper::GetNowUs();
    553 
    554     for (size_t offset = 0;
    555             offset < packets->size(); offset += 188) {
    556         bool lastTSPacket = (offset + 188 >= packets->size());
    557 
    558         appendTSData(
    559                 packets->data() + offset,
    560                 188,
    561                 true /* timeDiscontinuity */,
    562                 lastTSPacket /* flush */);
    563     }
    564 
    565 #if 0
    566     int64_t netTimeUs = ALooper::GetNowUs() - startTimeUs;
    567 
    568     int64_t whenUs;
    569     CHECK(packets->meta()->findInt64("whenUs", &whenUs));
    570 
    571     int64_t delayUs;
    572     CHECK(packets->meta()->findInt64("delayUs", &delayUs));
    573 
    574     bool isVideo = false;
    575     int32_t dummy;
    576     if (packets->meta()->findInt32("isVideo", &dummy)) {
    577         isVideo = true;
    578     }
    579 
    580     int64_t nowUs = ALooper::GetNowUs();
    581 
    582     if (nowUs - whenUs > 2000) {
    583         ALOGI("[%s] delayUs = %lld us, delta = %lld us",
    584               isVideo ? "video" : "audio", delayUs, nowUs - netTimeUs - whenUs);
    585     }
    586 #endif
    587 
    588 #if LOG_TRANSPORT_STREAM
    589     if (mLogFile != NULL) {
    590         fwrite(packets->data(), 1, packets->size(), mLogFile);
    591     }
    592 #endif
    593 }
    594 
    595 ssize_t Sender::appendTSData(
    596         const void *data, size_t size, bool timeDiscontinuity, bool flush) {
    597     CHECK_EQ(size, 188);
    598 
    599     CHECK_LE(mTSQueue->size() + size, mTSQueue->capacity());
    600 
    601     memcpy(mTSQueue->data() + mTSQueue->size(), data, size);
    602     mTSQueue->setRange(0, mTSQueue->size() + size);
    603 
    604     if (flush || mTSQueue->size() == mTSQueue->capacity()) {
    605         // flush
    606 
    607         int64_t nowUs = ALooper::GetNowUs();
    608 
    609 #if TRACK_BANDWIDTH
    610         if (mFirstPacketTimeUs < 0ll) {
    611             mFirstPacketTimeUs = nowUs;
    612         }
    613 #endif
    614 
    615         // 90kHz time scale
    616         uint32_t rtpTime = (nowUs * 9ll) / 100ll;
    617 
    618         uint8_t *rtp = mTSQueue->data();
    619         rtp[0] = 0x80;
    620         rtp[1] = 33 | (timeDiscontinuity ? (1 << 7) : 0);  // M-bit
    621         rtp[2] = (mRTPSeqNo >> 8) & 0xff;
    622         rtp[3] = mRTPSeqNo & 0xff;
    623         rtp[4] = rtpTime >> 24;
    624         rtp[5] = (rtpTime >> 16) & 0xff;
    625         rtp[6] = (rtpTime >> 8) & 0xff;
    626         rtp[7] = rtpTime & 0xff;
    627         rtp[8] = kSourceID >> 24;
    628         rtp[9] = (kSourceID >> 16) & 0xff;
    629         rtp[10] = (kSourceID >> 8) & 0xff;
    630         rtp[11] = kSourceID & 0xff;
    631 
    632         ++mRTPSeqNo;
    633         ++mNumRTPSent;
    634         mNumRTPOctetsSent += mTSQueue->size() - 12;
    635 
    636         mLastRTPTime = rtpTime;
    637         mLastNTPTime = GetNowNTP();
    638 
    639         if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) {
    640             sp<AMessage> notify = mNotify->dup();
    641             notify->setInt32("what", kWhatBinaryData);
    642 
    643             sp<ABuffer> data = new ABuffer(mTSQueue->size());
    644             memcpy(data->data(), rtp, mTSQueue->size());
    645 
    646             notify->setInt32("channel", mRTPChannel);
    647             notify->setBuffer("data", data);
    648             notify->post();
    649         } else {
    650             sendPacket(mRTPSessionID, rtp, mTSQueue->size());
    651 
    652 #if TRACK_BANDWIDTH
    653             mTotalBytesSent += mTSQueue->size();
    654             int64_t delayUs = ALooper::GetNowUs() - mFirstPacketTimeUs;
    655 
    656             if (delayUs > 0ll) {
    657                 ALOGI("approx. net bandwidth used: %.2f Mbit/sec",
    658                         mTotalBytesSent * 8.0 / delayUs);
    659             }
    660 #endif
    661         }
    662 
    663 #if ENABLE_RETRANSMISSION
    664         mTSQueue->setInt32Data(mRTPSeqNo - 1);
    665 
    666         mHistory.push_back(mTSQueue);
    667         ++mHistoryLength;
    668 
    669         if (mHistoryLength > kMaxHistoryLength) {
    670             mTSQueue = *mHistory.begin();
    671             mHistory.erase(mHistory.begin());
    672 
    673             --mHistoryLength;
    674         } else {
    675             mTSQueue = new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
    676         }
    677 #endif
    678 
    679         mTSQueue->setRange(0, 12);
    680     }
    681 
    682     return size;
    683 }
    684 
    685 void Sender::scheduleSendSR() {
    686     if (mSendSRPending || mRTCPSessionID == 0) {
    687         return;
    688     }
    689 
    690     mSendSRPending = true;
    691     (new AMessage(kWhatSendSR, id()))->post(kSendSRIntervalUs);
    692 }
    693 
    694 void Sender::addSR(const sp<ABuffer> &buffer) {
    695     uint8_t *data = buffer->data() + buffer->size();
    696 
    697     // TODO: Use macros/utility functions to clean up all the bitshifts below.
    698 
    699     data[0] = 0x80 | 0;
    700     data[1] = 200;  // SR
    701     data[2] = 0;
    702     data[3] = 6;
    703     data[4] = kSourceID >> 24;
    704     data[5] = (kSourceID >> 16) & 0xff;
    705     data[6] = (kSourceID >> 8) & 0xff;
    706     data[7] = kSourceID & 0xff;
    707 
    708     data[8] = mLastNTPTime >> (64 - 8);
    709     data[9] = (mLastNTPTime >> (64 - 16)) & 0xff;
    710     data[10] = (mLastNTPTime >> (64 - 24)) & 0xff;
    711     data[11] = (mLastNTPTime >> 32) & 0xff;
    712     data[12] = (mLastNTPTime >> 24) & 0xff;
    713     data[13] = (mLastNTPTime >> 16) & 0xff;
    714     data[14] = (mLastNTPTime >> 8) & 0xff;
    715     data[15] = mLastNTPTime & 0xff;
    716 
    717     data[16] = (mLastRTPTime >> 24) & 0xff;
    718     data[17] = (mLastRTPTime >> 16) & 0xff;
    719     data[18] = (mLastRTPTime >> 8) & 0xff;
    720     data[19] = mLastRTPTime & 0xff;
    721 
    722     data[20] = mNumRTPSent >> 24;
    723     data[21] = (mNumRTPSent >> 16) & 0xff;
    724     data[22] = (mNumRTPSent >> 8) & 0xff;
    725     data[23] = mNumRTPSent & 0xff;
    726 
    727     data[24] = mNumRTPOctetsSent >> 24;
    728     data[25] = (mNumRTPOctetsSent >> 16) & 0xff;
    729     data[26] = (mNumRTPOctetsSent >> 8) & 0xff;
    730     data[27] = mNumRTPOctetsSent & 0xff;
    731 
    732     buffer->setRange(buffer->offset(), buffer->size() + 28);
    733 }
    734 
    735 void Sender::addSDES(const sp<ABuffer> &buffer) {
    736     uint8_t *data = buffer->data() + buffer->size();
    737     data[0] = 0x80 | 1;
    738     data[1] = 202;  // SDES
    739     data[4] = kSourceID >> 24;
    740     data[5] = (kSourceID >> 16) & 0xff;
    741     data[6] = (kSourceID >> 8) & 0xff;
    742     data[7] = kSourceID & 0xff;
    743 
    744     size_t offset = 8;
    745 
    746     data[offset++] = 1;  // CNAME
    747 
    748     static const char *kCNAME = "someone@somewhere";
    749     data[offset++] = strlen(kCNAME);
    750 
    751     memcpy(&data[offset], kCNAME, strlen(kCNAME));
    752     offset += strlen(kCNAME);
    753 
    754     data[offset++] = 7;  // NOTE
    755 
    756     static const char *kNOTE = "Hell's frozen over.";
    757     data[offset++] = strlen(kNOTE);
    758 
    759     memcpy(&data[offset], kNOTE, strlen(kNOTE));
    760     offset += strlen(kNOTE);
    761 
    762     data[offset++] = 0;
    763 
    764     if ((offset % 4) > 0) {
    765         size_t count = 4 - (offset % 4);
    766         switch (count) {
    767             case 3:
    768                 data[offset++] = 0;
    769             case 2:
    770                 data[offset++] = 0;
    771             case 1:
    772                 data[offset++] = 0;
    773         }
    774     }
    775 
    776     size_t numWords = (offset / 4) - 1;
    777     data[2] = numWords >> 8;
    778     data[3] = numWords & 0xff;
    779 
    780     buffer->setRange(buffer->offset(), buffer->size() + offset);
    781 }
    782 
    783 // static
    784 uint64_t Sender::GetNowNTP() {
    785     uint64_t nowUs = ALooper::GetNowUs();
    786 
    787     nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
    788 
    789     uint64_t hi = nowUs / 1000000ll;
    790     uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
    791 
    792     return (hi << 32) | lo;
    793 }
    794 
    795 void Sender::onSendSR() {
    796     sp<ABuffer> buffer = new ABuffer(1500);
    797     buffer->setRange(0, 0);
    798 
    799     addSR(buffer);
    800     addSDES(buffer);
    801 
    802     if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) {
    803         sp<AMessage> notify = mNotify->dup();
    804         notify->setInt32("what", kWhatBinaryData);
    805         notify->setInt32("channel", mRTCPChannel);
    806         notify->setBuffer("data", buffer);
    807         notify->post();
    808     } else {
    809         sendPacket(mRTCPSessionID, buffer->data(), buffer->size());
    810     }
    811 
    812     ++mNumSRsSent;
    813 }
    814 
    815 #if ENABLE_RETRANSMISSION
    816 status_t Sender::parseTSFB(
    817         const uint8_t *data, size_t size) {
    818     if ((data[0] & 0x1f) != 1) {
    819         return ERROR_UNSUPPORTED;  // We only support NACK for now.
    820     }
    821 
    822     uint32_t srcId = U32_AT(&data[8]);
    823     if (srcId != kSourceID) {
    824         return ERROR_MALFORMED;
    825     }
    826 
    827     for (size_t i = 12; i < size; i += 4) {
    828         uint16_t seqNo = U16_AT(&data[i]);
    829         uint16_t blp = U16_AT(&data[i + 2]);
    830 
    831         List<sp<ABuffer> >::iterator it = mHistory.begin();
    832         bool foundSeqNo = false;
    833         while (it != mHistory.end()) {
    834             const sp<ABuffer> &buffer = *it;
    835 
    836             uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
    837 
    838             bool retransmit = false;
    839             if (bufferSeqNo == seqNo) {
    840                 retransmit = true;
    841             } else if (blp != 0) {
    842                 for (size_t i = 0; i < 16; ++i) {
    843                     if ((blp & (1 << i))
    844                         && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
    845                         blp &= ~(1 << i);
    846                         retransmit = true;
    847                     }
    848                 }
    849             }
    850 
    851             if (retransmit) {
    852                 ALOGI("retransmitting seqNo %d", bufferSeqNo);
    853 
    854                 sp<ABuffer> retransRTP = new ABuffer(2 + buffer->size());
    855                 uint8_t *rtp = retransRTP->data();
    856                 memcpy(rtp, buffer->data(), 12);
    857                 rtp[2] = (mRTPRetransmissionSeqNo >> 8) & 0xff;
    858                 rtp[3] = mRTPRetransmissionSeqNo & 0xff;
    859                 rtp[12] = (bufferSeqNo >> 8) & 0xff;
    860                 rtp[13] = bufferSeqNo & 0xff;
    861                 memcpy(&rtp[14], buffer->data() + 12, buffer->size() - 12);
    862 
    863                 ++mRTPRetransmissionSeqNo;
    864 
    865                 sendPacket(
    866                         mRTPRetransmissionSessionID,
    867                         retransRTP->data(), retransRTP->size());
    868 
    869                 if (bufferSeqNo == seqNo) {
    870                     foundSeqNo = true;
    871                 }
    872 
    873                 if (foundSeqNo && blp == 0) {
    874                     break;
    875                 }
    876             }
    877 
    878             ++it;
    879         }
    880 
    881         if (!foundSeqNo || blp != 0) {
    882             ALOGI("Some sequence numbers were no longer available for "
    883                   "retransmission");
    884         }
    885     }
    886 
    887     return OK;
    888 }
    889 #endif
    890 
    891 status_t Sender::parseRTCP(
    892         const sp<ABuffer> &buffer) {
    893     const uint8_t *data = buffer->data();
    894     size_t size = buffer->size();
    895 
    896     while (size > 0) {
    897         if (size < 8) {
    898             // Too short to be a valid RTCP header
    899             return ERROR_MALFORMED;
    900         }
    901 
    902         if ((data[0] >> 6) != 2) {
    903             // Unsupported version.
    904             return ERROR_UNSUPPORTED;
    905         }
    906 
    907         if (data[0] & 0x20) {
    908             // Padding present.
    909 
    910             size_t paddingLength = data[size - 1];
    911 
    912             if (paddingLength + 12 > size) {
    913                 // If we removed this much padding we'd end up with something
    914                 // that's too short to be a valid RTP header.
    915                 return ERROR_MALFORMED;
    916             }
    917 
    918             size -= paddingLength;
    919         }
    920 
    921         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
    922 
    923         if (size < headerLength) {
    924             // Only received a partial packet?
    925             return ERROR_MALFORMED;
    926         }
    927 
    928         switch (data[1]) {
    929             case 200:
    930             case 201:  // RR
    931             case 202:  // SDES
    932             case 203:
    933             case 204:  // APP
    934                 break;
    935 
    936 #if ENABLE_RETRANSMISSION
    937             case 205:  // TSFB (transport layer specific feedback)
    938                 parseTSFB(data, headerLength);
    939                 break;
    940 #endif
    941 
    942             case 206:  // PSFB (payload specific feedback)
    943                 hexdump(data, headerLength);
    944                 break;
    945 
    946             default:
    947             {
    948                 ALOGW("Unknown RTCP packet type %u of size %d",
    949                      (unsigned)data[1], headerLength);
    950                 break;
    951             }
    952         }
    953 
    954         data += headerLength;
    955         size -= headerLength;
    956     }
    957 
    958     return OK;
    959 }
    960 
    961 status_t Sender::sendPacket(
    962         int32_t sessionID, const void *data, size_t size) {
    963     return mNetSession->sendRequest(sessionID, data, size);
    964 }
    965 
    966 void Sender::notifyInitDone() {
    967     sp<AMessage> notify = mNotify->dup();
    968     notify->setInt32("what", kWhatInitDone);
    969     notify->post();
    970 }
    971 
    972 void Sender::notifySessionDead() {
    973     sp<AMessage> notify = mNotify->dup();
    974     notify->setInt32("what", kWhatSessionDead);
    975     notify->post();
    976 }
    977 
    978 }  // namespace android
    979 
    980