Home | History | Annotate | Download | only in libaah_rtp
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #define LOG_TAG "LibAAH_RTP"
     18 //#define LOG_NDEBUG 0
     19 #include <utils/Log.h>
     20 
     21 #include <fcntl.h>
     22 #include <poll.h>
     23 #include <sys/socket.h>
     24 #include <time.h>
     25 #include <utils/misc.h>
     26 
     27 #include <media/stagefright/Utils.h>
     28 
     29 #include "aah_rx_player.h"
     30 #include "aah_tx_packet.h"
     31 
     32 namespace android {
     33 
     34 const uint32_t AAH_RXPlayer::kRetransRequestMagic =
     35     FOURCC('T','r','e','q');
     36 const uint32_t AAH_RXPlayer::kRetransNAKMagic =
     37     FOURCC('T','n','a','k');
     38 const uint32_t AAH_RXPlayer::kFastStartRequestMagic =
     39     FOURCC('T','f','s','t');
     40 const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000;
     41 const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000;
     42 const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000;
     43 
     44 static inline int16_t fetchInt16(uint8_t* data) {
     45     return static_cast<int16_t>(U16_AT(data));
     46 }
     47 
     48 static inline int32_t fetchInt32(uint8_t* data) {
     49     return static_cast<int32_t>(U32_AT(data));
     50 }
     51 
     52 static inline int64_t fetchInt64(uint8_t* data) {
     53     return static_cast<int64_t>(U64_AT(data));
     54 }
     55 
     56 uint64_t AAH_RXPlayer::monotonicUSecNow() {
     57     struct timespec now;
     58     int res = clock_gettime(CLOCK_MONOTONIC, &now);
     59     CHECK(res >= 0);
     60 
     61     uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000;
     62     ret += now.tv_nsec / 1000;
     63 
     64     return ret;
     65 }
     66 
     67 status_t AAH_RXPlayer::startWorkThread() {
     68     status_t res;
     69     stopWorkThread();
     70     res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO);
     71 
     72     if (res != OK) {
     73         ALOGE("Failed to start work thread (res = %d)", res);
     74     }
     75 
     76     return res;
     77 }
     78 
     79 void AAH_RXPlayer::stopWorkThread() {
     80     thread_wrapper_->requestExit();  // set the exit pending flag
     81     wakeup_work_thread_evt_.setEvent();
     82 
     83     status_t res;
     84     res = thread_wrapper_->requestExitAndWait(); // block until thread exit.
     85     if (res != OK) {
     86         ALOGE("Failed to stop work thread (res = %d)", res);
     87     }
     88 
     89     wakeup_work_thread_evt_.clearPendingEvents();
     90 }
     91 
     92 void AAH_RXPlayer::cleanupSocket() {
     93     if (sock_fd_ >= 0) {
     94         if (multicast_joined_) {
     95             int res;
     96             struct ip_mreq mreq;
     97             mreq.imr_multiaddr = listen_addr_.sin_addr;
     98             mreq.imr_interface.s_addr = htonl(INADDR_ANY);
     99             res = setsockopt(sock_fd_,
    100                              IPPROTO_IP,
    101                              IP_DROP_MEMBERSHIP,
    102                              &mreq, sizeof(mreq));
    103             if (res < 0) {
    104                 ALOGW("Failed to leave multicast group. (%d, %d)", res, errno);
    105             }
    106             multicast_joined_ = false;
    107         }
    108 
    109         close(sock_fd_);
    110         sock_fd_ = -1;
    111     }
    112 
    113     resetPipeline();
    114 }
    115 
    116 void AAH_RXPlayer::resetPipeline() {
    117     ring_buffer_.reset();
    118 
    119     // Explicitly shudown all of the active substreams, then call clear out the
    120     // collection.  Failure to clear out a substream can result in its decoder
    121     // holding a reference to itself and therefor not going away when the
    122     // collection is cleared.
    123     for (size_t i = 0; i < substreams_.size(); ++i)
    124         substreams_.valueAt(i)->shutdown();
    125 
    126     substreams_.clear();
    127 
    128     current_gap_status_ = kGS_NoGap;
    129 }
    130 
    131 bool AAH_RXPlayer::setupSocket() {
    132     long flags;
    133     int  res, buf_size;
    134     socklen_t opt_size;
    135 
    136     cleanupSocket();
    137     CHECK(sock_fd_ < 0);
    138 
    139     // Make the socket
    140     sock_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    141     if (sock_fd_ < 0) {
    142         ALOGE("Failed to create listen socket (errno %d)", errno);
    143         goto bailout;
    144     }
    145 
    146     // Set non-blocking operation
    147     flags = fcntl(sock_fd_, F_GETFL);
    148     res   = fcntl(sock_fd_, F_SETFL, flags | O_NONBLOCK);
    149     if (res < 0) {
    150         ALOGE("Failed to set socket (%d) to non-blocking mode (errno %d)",
    151               sock_fd_, errno);
    152         goto bailout;
    153     }
    154 
    155     // Bind to our port
    156     struct sockaddr_in bind_addr;
    157     memset(&bind_addr, 0, sizeof(bind_addr));
    158     bind_addr.sin_family = AF_INET;
    159     bind_addr.sin_addr.s_addr = INADDR_ANY;
    160     bind_addr.sin_port = listen_addr_.sin_port;
    161     res = bind(sock_fd_,
    162                reinterpret_cast<const sockaddr*>(&bind_addr),
    163                sizeof(bind_addr));
    164     if (res < 0) {
    165         uint32_t a = ntohl(bind_addr.sin_addr.s_addr);
    166         uint16_t p = ntohs(bind_addr.sin_port);
    167         ALOGE("Failed to bind socket (%d) to %d.%d.%d.%d:%hd. (errno %d)",
    168               sock_fd_,
    169               (a >> 24) & 0xFF,
    170               (a >> 16) & 0xFF,
    171               (a >>  8) & 0xFF,
    172               (a      ) & 0xFF,
    173               p,
    174               errno);
    175 
    176         goto bailout;
    177     }
    178 
    179     buf_size = 1 << 16;   // 64k
    180     res = setsockopt(sock_fd_,
    181                      SOL_SOCKET, SO_RCVBUF,
    182                      &buf_size, sizeof(buf_size));
    183     if (res < 0) {
    184         ALOGW("Failed to increase socket buffer size to %d.  (errno %d)",
    185               buf_size, errno);
    186     }
    187 
    188     buf_size = 0;
    189     opt_size = sizeof(buf_size);
    190     res = getsockopt(sock_fd_,
    191                      SOL_SOCKET, SO_RCVBUF,
    192                      &buf_size, &opt_size);
    193     if (res < 0) {
    194         ALOGW("Failed to fetch socket buffer size.  (errno %d)", errno);
    195     } else {
    196         ALOGI("RX socket buffer size is now %d bytes",  buf_size);
    197     }
    198 
    199     if (listen_addr_.sin_addr.s_addr) {
    200         // Join the multicast group and we should be good to go.
    201         struct ip_mreq mreq;
    202         mreq.imr_multiaddr = listen_addr_.sin_addr;
    203         mreq.imr_interface.s_addr = htonl(INADDR_ANY);
    204         res = setsockopt(sock_fd_,
    205                          IPPROTO_IP,
    206                          IP_ADD_MEMBERSHIP,
    207                          &mreq, sizeof(mreq));
    208         if (res < 0) {
    209             ALOGE("Failed to join multicast group. (errno %d)", errno);
    210             goto bailout;
    211         }
    212         multicast_joined_ = true;
    213     }
    214 
    215     return true;
    216 
    217 bailout:
    218     cleanupSocket();
    219     return false;
    220 }
    221 
    222 bool AAH_RXPlayer::threadLoop() {
    223     struct pollfd poll_fds[2];
    224     bool process_more_right_now = false;
    225 
    226     if (!setupSocket()) {
    227         sendEvent(MEDIA_ERROR);
    228         goto bailout;
    229     }
    230 
    231     while (!thread_wrapper_->exitPending()) {
    232         // Step 1: Wait until there is something to do.
    233         int gap_timeout = computeNextGapRetransmitTimeout();
    234         int ring_timeout = ring_buffer_.computeInactivityTimeout();
    235         int timeout = -1;
    236 
    237         if (!ring_timeout) {
    238             ALOGW("RTP inactivity timeout reached, resetting pipeline.");
    239             resetPipeline();
    240             timeout = gap_timeout;
    241         } else {
    242             if (gap_timeout < 0) {
    243                 timeout = ring_timeout;
    244             } else if (ring_timeout < 0) {
    245                 timeout = gap_timeout;
    246             } else {
    247                 timeout = (gap_timeout < ring_timeout) ? gap_timeout
    248                                                        : ring_timeout;
    249             }
    250         }
    251 
    252         if ((0 != timeout) && (!process_more_right_now)) {
    253             // Set up the events to wait on.  Start with the wakeup pipe.
    254             memset(&poll_fds, 0, sizeof(poll_fds));
    255             poll_fds[0].fd     = wakeup_work_thread_evt_.getWakeupHandle();
    256             poll_fds[0].events = POLLIN;
    257 
    258             // Add the RX socket.
    259             poll_fds[1].fd     = sock_fd_;
    260             poll_fds[1].events = POLLIN;
    261 
    262             // Wait for something interesing to happen.
    263             int poll_res = poll(poll_fds, NELEM(poll_fds), timeout);
    264             if (poll_res < 0) {
    265                 ALOGE("Fatal error (%d,%d) while waiting on events",
    266                       poll_res, errno);
    267                 sendEvent(MEDIA_ERROR);
    268                 goto bailout;
    269             }
    270         }
    271 
    272         if (thread_wrapper_->exitPending()) {
    273             break;
    274         }
    275 
    276         wakeup_work_thread_evt_.clearPendingEvents();
    277         process_more_right_now = false;
    278 
    279         // Step 2: Do we have data waiting in the socket?  If so, drain the
    280         // socket moving valid RTP information into the ring buffer to be
    281         // processed.
    282         if (poll_fds[1].revents) {
    283             struct sockaddr_in from;
    284             socklen_t from_len;
    285 
    286             ssize_t res = 0;
    287             while (!thread_wrapper_->exitPending()) {
    288                 // Check the size of any pending packet.
    289                 res = recv(sock_fd_, NULL, 0, MSG_PEEK | MSG_TRUNC);
    290 
    291                 // Error?
    292                 if (res < 0) {
    293                     // If the error is anything other than would block,
    294                     // something has gone very wrong.
    295                     if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
    296                         ALOGE("Fatal socket error during recvfrom (%d, %d)",
    297                               (int)res, errno);
    298                         goto bailout;
    299                     }
    300 
    301                     // Socket is out of data, just break out of processing and
    302                     // wait for more.
    303                     break;
    304                 }
    305 
    306                 // Allocate a payload.
    307                 PacketBuffer* pb = PacketBuffer::allocate(res);
    308                 if (NULL == pb) {
    309                     ALOGE("Fatal error, failed to allocate packet buffer of"
    310                           " length %u", static_cast<uint32_t>(res));
    311                     goto bailout;
    312                 }
    313 
    314                 // Fetch the data.
    315                 from_len = sizeof(from);
    316                 res = recvfrom(sock_fd_, pb->data_, pb->length_, 0,
    317                                reinterpret_cast<struct sockaddr*>(&from),
    318                                &from_len);
    319                 if (res != pb->length_) {
    320                     ALOGE("Fatal error, fetched packet length (%d) does not"
    321                           " match peeked packet length (%u).  This should never"
    322                           " happen.  (errno = %d)",
    323                           static_cast<int>(res),
    324                           static_cast<uint32_t>(pb->length_),
    325                           errno);
    326                 }
    327 
    328                 bool drop_packet = false;
    329                 if (transmitter_known_) {
    330                     if (from.sin_addr.s_addr !=
    331                         transmitter_addr_.sin_addr.s_addr) {
    332                         uint32_t a = ntohl(from.sin_addr.s_addr);
    333                         uint16_t p = ntohs(from.sin_port);
    334                         ALOGV("Dropping packet from unknown transmitter"
    335                               " %u.%u.%u.%u:%hu",
    336                               ((a >> 24) & 0xFF),
    337                               ((a >> 16) & 0xFF),
    338                               ((a >>  8) & 0xFF),
    339                               ( a        & 0xFF),
    340                               p);
    341 
    342                         drop_packet = true;
    343                     } else {
    344                         transmitter_addr_.sin_port = from.sin_port;
    345                     }
    346                 } else {
    347                     memcpy(&transmitter_addr_, &from, sizeof(from));
    348                     transmitter_known_ = true;
    349                 }
    350 
    351                 if (!drop_packet) {
    352                     bool serious_error = !processRX(pb);
    353 
    354                     if (serious_error) {
    355                         // Something went "seriously wrong".  Currently, the
    356                         // only trigger for this should be a ring buffer
    357                         // overflow.  The current failsafe behavior for when
    358                         // something goes seriously wrong is to just reset the
    359                         // pipeline.  The system should behave as if this
    360                         // AAH_RXPlayer was just set up for the first time.
    361                         ALOGE("Something just went seriously wrong with the"
    362                               " pipeline.  Resetting.");
    363                         resetPipeline();
    364                     }
    365                 } else {
    366                     PacketBuffer::destroy(pb);
    367                 }
    368             }
    369         }
    370 
    371         // Step 3: Process any data we mave have accumulated in the ring buffer
    372         // so far.
    373         if (!thread_wrapper_->exitPending()) {
    374             processRingBuffer();
    375         }
    376 
    377         // Step 4: At this point in time, the ring buffer should either be
    378         // empty, or stalled in front of a gap caused by some dropped packets.
    379         // Check on the current gap situation and deal with it in an appropriate
    380         // fashion.  If processGaps returns true, it means that it has given up
    381         // on a gap and that we should try to process some more data
    382         // immediately.
    383         if (!thread_wrapper_->exitPending()) {
    384             process_more_right_now = processGaps();
    385         }
    386 
    387         // Step 5: Check for fatal errors.  If any of our substreams has
    388         // encountered a fatal, unrecoverable, error, then propagate the error
    389         // up to user level and shut down.
    390         for (size_t i = 0; i < substreams_.size(); ++i) {
    391             status_t status;
    392             CHECK(substreams_.valueAt(i) != NULL);
    393 
    394             status = substreams_.valueAt(i)->getStatus();
    395             if (OK != status) {
    396                 ALOGE("Substream index %d has encountered an unrecoverable"
    397                       " error (%d).  Signalling application level and shutting"
    398                       " down.", i, status);
    399                 sendEvent(MEDIA_ERROR);
    400                 goto bailout;
    401             }
    402         }
    403     }
    404 
    405 bailout:
    406     cleanupSocket();
    407     return false;
    408 }
    409 
    410 bool AAH_RXPlayer::processRX(PacketBuffer* pb) {
    411     CHECK(NULL != pb);
    412 
    413     uint8_t* data = pb->data_;
    414     ssize_t  amt  = pb->length_;
    415     uint32_t nak_magic;
    416     uint16_t seq_no;
    417     uint32_t epoch;
    418 
    419     // Every packet either starts with an RTP header which is at least 12 bytes
    420     // long or is a retry NAK which is 14 bytes long.  If there are fewer than
    421     // 12 bytes here, this cannot be a proper RTP packet.
    422     if (amt < 12) {
    423         ALOGV("Dropping packet, too short to contain RTP header (%u bytes)",
    424               static_cast<uint32_t>(amt));
    425         goto drop_packet;
    426     }
    427 
    428     // Check to see if this is the special case of a NAK packet.
    429     nak_magic = ntohl(*(reinterpret_cast<uint32_t*>(data)));
    430     if (nak_magic == kRetransNAKMagic) {
    431         // Looks like a NAK packet; make sure its long enough.
    432 
    433         if (amt < static_cast<ssize_t>(sizeof(RetransRequest))) {
    434             ALOGV("Dropping packet, too short to contain NAK payload"
    435                   " (%u bytes)", static_cast<uint32_t>(amt));
    436             goto drop_packet;
    437         }
    438 
    439         SeqNoGap gap;
    440         RetransRequest* rtr = reinterpret_cast<RetransRequest*>(data);
    441         gap.start_seq_ = ntohs(rtr->start_seq_);
    442         gap.end_seq_   = ntohs(rtr->end_seq_);
    443 
    444         ALOGV("Process NAK for gap at [%hu, %hu]",
    445                 gap.start_seq_, gap.end_seq_);
    446         ring_buffer_.processNAK(&gap);
    447 
    448         return true;
    449     }
    450 
    451     // According to the TRTP spec, version should be 2, padding should be 0,
    452     // extension should be 0 and CSRCCnt should be 0.  If any of these tests
    453     // fail, we chuck the packet.
    454     if (data[0] != 0x80) {
    455         ALOGV("Dropping packet, bad V/P/X/CSRCCnt field (0x%02x)",
    456               data[0]);
    457         goto drop_packet;
    458     }
    459 
    460     // Check the payload type.  For TRTP, it should always be 100.
    461     if ((data[1] & 0x7F) != 100) {
    462         ALOGV("Dropping packet, bad payload type. (%u)",
    463               data[1] & 0x7F);
    464         goto drop_packet;
    465     }
    466 
    467     // Check whether the transmitter has begun a new epoch.
    468     epoch = (U32_AT(data + 8) >> 10) & 0x3FFFFF;
    469     if (current_epoch_known_) {
    470         if (epoch != current_epoch_) {
    471             ALOGV("%s: new epoch %u", __PRETTY_FUNCTION__, epoch);
    472             current_epoch_ = epoch;
    473             resetPipeline();
    474         }
    475     } else {
    476         current_epoch_ = epoch;
    477         current_epoch_known_ = true;
    478     }
    479 
    480     // Extract the sequence number and hand the packet off to the ring buffer
    481     // for dropped packet detection and later processing.
    482     seq_no = U16_AT(data + 2);
    483     return ring_buffer_.pushBuffer(pb, seq_no);
    484 
    485 drop_packet:
    486     PacketBuffer::destroy(pb);
    487     return true;
    488 }
    489 
    490 void AAH_RXPlayer::processRingBuffer() {
    491     PacketBuffer* pb;
    492     bool is_discon;
    493     sp<Substream> substream;
    494     LinearTransform trans;
    495     bool foundTrans = false;
    496 
    497     while (NULL != (pb = ring_buffer_.fetchBuffer(&is_discon))) {
    498         if (is_discon) {
    499             // Abort all partially assembled payloads.
    500             for (size_t i = 0; i < substreams_.size(); ++i) {
    501                 CHECK(substreams_.valueAt(i) != NULL);
    502                 substreams_.valueAt(i)->cleanupBufferInProgress();
    503             }
    504         }
    505 
    506         uint8_t* data = pb->data_;
    507         ssize_t  amt  = pb->length_;
    508 
    509         // Should not have any non-RTP packets in the ring buffer.  RTP packets
    510         // must be at least 12 bytes long.
    511         CHECK(amt >= 12);
    512 
    513         // Extract the marker bit and the SSRC field.
    514         bool     marker = (data[1] & 0x80) != 0;
    515         uint32_t ssrc   = U32_AT(data + 8);
    516 
    517         // Is this the start of a new TRTP payload?  If so, the marker bit
    518         // should be set and there are some things we should be checking for.
    519         if (marker) {
    520             // TRTP headers need to have at least a byte for version, a byte for
    521             // payload type and flags, and 4 bytes for length.
    522             if (amt < 18) {
    523                 ALOGV("Dropping packet, too short to contain TRTP header"
    524                       " (%u bytes)", static_cast<uint32_t>(amt));
    525                 goto process_next_packet;
    526             }
    527 
    528             // Check the TRTP version and extract the payload type/flags.
    529             uint8_t trtp_version =  data[12];
    530             uint8_t payload_type = (data[13] >> 4) & 0xF;
    531             uint8_t trtp_flags   =  data[13]       & 0xF;
    532 
    533             if (1 != trtp_version) {
    534                 ALOGV("Dropping packet, bad trtp version %hhu", trtp_version);
    535                 goto process_next_packet;
    536             }
    537 
    538             // Is there a timestamp transformation present on this packet?  If
    539             // so, extract it and pass it to the appropriate substreams.
    540             if (trtp_flags & 0x02) {
    541                 ssize_t offset = 18 + ((trtp_flags & 0x01) ? 4 : 0);
    542                 if (amt < (offset + 24)) {
    543                     ALOGV("Dropping packet, too short to contain TRTP Timestamp"
    544                           " Transformation (%u bytes)",
    545                           static_cast<uint32_t>(amt));
    546                     goto process_next_packet;
    547                 }
    548 
    549                 trans.a_zero = fetchInt64(data + offset);
    550                 trans.b_zero = fetchInt64(data + offset + 16);
    551                 trans.a_to_b_numer = static_cast<int32_t>(
    552                         fetchInt32 (data + offset + 8));
    553                 trans.a_to_b_denom = U32_AT(data + offset + 12);
    554                 foundTrans = true;
    555 
    556                 uint32_t program_id = (ssrc >> 5) & 0x1F;
    557                 for (size_t i = 0; i < substreams_.size(); ++i) {
    558                     sp<Substream> iter = substreams_.valueAt(i);
    559                     CHECK(iter != NULL);
    560 
    561                     if (iter->getProgramID() == program_id) {
    562                         iter->processTSTransform(trans);
    563                     }
    564                 }
    565             }
    566 
    567             // Is this a command packet?  If so, its not necessarily associate
    568             // with one particular substream.  Just give it to the command
    569             // packet handler and then move on.
    570             if (4 == payload_type) {
    571                 processCommandPacket(pb);
    572                 goto process_next_packet;
    573             }
    574         }
    575 
    576         // If we got to here, then we are a normal packet.  Find (or allocate)
    577         // the substream we belong to and send the packet off to be processed.
    578         substream = substreams_.valueFor(ssrc);
    579         if (substream == NULL) {
    580             substream = new Substream(ssrc, omx_);
    581             if (substream == NULL) {
    582                 ALOGE("Failed to allocate substream for SSRC 0x%08x", ssrc);
    583                 goto process_next_packet;
    584             }
    585             substreams_.add(ssrc, substream);
    586 
    587             if (foundTrans) {
    588                 substream->processTSTransform(trans);
    589             }
    590         }
    591 
    592         CHECK(substream != NULL);
    593 
    594         if (marker) {
    595             // Start of a new TRTP payload for this substream.  Extract the
    596             // lower 32 bits of the timestamp and hand the buffer to the
    597             // substream for processing.
    598             uint32_t ts_lower = U32_AT(data + 4);
    599             substream->processPayloadStart(data + 12, amt - 12, ts_lower);
    600         } else {
    601             // Continuation of an existing TRTP payload.  Just hand it off to
    602             // the substream for processing.
    603             substream->processPayloadCont(data + 12, amt - 12);
    604         }
    605 
    606 process_next_packet:
    607         PacketBuffer::destroy(pb);
    608     }  // end of main processing while loop.
    609 }
    610 
    611 void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) {
    612     CHECK(NULL != pb);
    613 
    614     uint8_t* data = pb->data_;
    615     ssize_t  amt  = pb->length_;
    616 
    617     // verify that this packet meets the minimum length of a command packet
    618     if (amt < 20) {
    619         return;
    620     }
    621 
    622     uint8_t trtp_version =  data[12];
    623     uint8_t trtp_flags   =  data[13]       & 0xF;
    624 
    625     if (1 != trtp_version) {
    626         ALOGV("Dropping packet, bad trtp version %hhu", trtp_version);
    627         return;
    628     }
    629 
    630     // calculate the start of the command payload
    631     ssize_t offset = 18;
    632     if (trtp_flags & 0x01) {
    633         // timestamp is present (4 bytes)
    634         offset += 4;
    635     }
    636     if (trtp_flags & 0x02) {
    637         // transform is present (24 bytes)
    638         offset += 24;
    639     }
    640 
    641     // the packet must contain 2 bytes of command payload beyond the TRTP header
    642     if (amt < offset + 2) {
    643         return;
    644     }
    645 
    646     uint16_t command_id = U16_AT(data + offset);
    647 
    648     switch (command_id) {
    649         case TRTPControlPacket::kCommandNop:
    650             break;
    651 
    652         case TRTPControlPacket::kCommandEOS:
    653         case TRTPControlPacket::kCommandFlush: {
    654             uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F;
    655             ALOGI("*** %s flushing program_id=%d",
    656                   __PRETTY_FUNCTION__, program_id);
    657 
    658             Vector<uint32_t> substreams_to_remove;
    659             for (size_t i = 0; i < substreams_.size(); ++i) {
    660                 sp<Substream> iter = substreams_.valueAt(i);
    661                 if (iter->getProgramID() == program_id) {
    662                     iter->shutdown();
    663                     substreams_to_remove.add(iter->getSSRC());
    664                 }
    665             }
    666 
    667             for (size_t i = 0; i < substreams_to_remove.size(); ++i) {
    668                 substreams_.removeItem(substreams_to_remove[i]);
    669             }
    670         } break;
    671     }
    672 }
    673 
    674 bool AAH_RXPlayer::processGaps() {
    675     // Deal with the current gap situation.  Specifically...
    676     //
    677     // 1) If a new gap has shown up, send a retransmit request to the
    678     //    transmitter.
    679     // 2) If a gap we were working on has had a packet in the middle or at
    680     //    the end filled in, send another retransmit request for the begining
    681     //    portion of the gap.  TRTP was designed for LANs where packet
    682     //    re-ordering is very unlikely; so see the middle or end of a gap
    683     //    filled in before the begining is an almost certain indication that
    684     //    a retransmission packet was also dropped.
    685     // 3) If we have been working on a gap for a while and it still has not
    686     //    been filled in, send another retransmit request.
    687     // 4) If the are no more gaps in the ring, clear the current_gap_status_
    688     //    flag to indicate that all is well again.
    689 
    690     // Start by fetching the active gap status.
    691     SeqNoGap gap;
    692     bool send_retransmit_request = false;
    693     bool ret_val = false;
    694     GapStatus gap_status;
    695     if (kGS_NoGap != (gap_status = ring_buffer_.fetchCurrentGap(&gap))) {
    696         // Note: checking for a change in the end sequence number should cover
    697         // moving on to an entirely new gap for case #1 as well as resending the
    698         // begining of a gap range for case #2.
    699         send_retransmit_request = (kGS_NoGap == current_gap_status_) ||
    700                                   (current_gap_.end_seq_ != gap.end_seq_);
    701 
    702         // If this is the same gap we have been working on, and it has timed
    703         // out, then check to see if our substreams are about to underflow.  If
    704         // so, instead of sending another retransmit request, just give up on
    705         // this gap and move on.
    706         if (!send_retransmit_request &&
    707            (kGS_NoGap != current_gap_status_) &&
    708            (0 == computeNextGapRetransmitTimeout())) {
    709 
    710             // If out current gap is the fast-start gap, don't bother to skip it
    711             // because substreams look like the are about to underflow.
    712             if ((kGS_FastStartGap != gap_status) ||
    713                 (current_gap_.end_seq_ != gap.end_seq_)) {
    714                 for (size_t i = 0; i < substreams_.size(); ++i) {
    715                     if (substreams_.valueAt(i)->isAboutToUnderflow()) {
    716                         ALOGV("About to underflow, giving up on gap [%hu, %hu]",
    717                               gap.start_seq_, gap.end_seq_);
    718                         ring_buffer_.processNAK();
    719                         current_gap_status_ = kGS_NoGap;
    720                         return true;
    721                     }
    722                 }
    723             }
    724 
    725             // Looks like no one is about to underflow.  Just go ahead and send
    726             // the request.
    727             send_retransmit_request = true;
    728         }
    729     } else {
    730         current_gap_status_ = kGS_NoGap;
    731     }
    732 
    733     if (send_retransmit_request) {
    734         // If we have been working on a fast start, and it is still not filled
    735         // in, even after the extended retransmit time out, give up and skip it.
    736         // The system should fall back into its normal slow-start behavior.
    737         if ((kGS_FastStartGap == current_gap_status_) &&
    738             (current_gap_.end_seq_ == gap.end_seq_)) {
    739             ALOGV("Fast start is taking forever; giving up.");
    740             ring_buffer_.processNAK();
    741             current_gap_status_ = kGS_NoGap;
    742             return true;
    743         }
    744 
    745         // Send the request.
    746         RetransRequest req;
    747         uint32_t magic  = (kGS_FastStartGap == gap_status)
    748                         ? kFastStartRequestMagic
    749                         : kRetransRequestMagic;
    750         req.magic_      = htonl(magic);
    751         req.mcast_ip_   = listen_addr_.sin_addr.s_addr;
    752         req.mcast_port_ = listen_addr_.sin_port;
    753         req.start_seq_  = htons(gap.start_seq_);
    754         req.end_seq_    = htons(gap.end_seq_);
    755 
    756         {
    757             uint32_t a = ntohl(transmitter_addr_.sin_addr.s_addr);
    758             uint16_t p = ntohs(transmitter_addr_.sin_port);
    759             ALOGV("Sending to transmitter %u.%u.%u.%u:%hu",
    760                     ((a >> 24) & 0xFF),
    761                     ((a >> 16) & 0xFF),
    762                     ((a >>  8) & 0xFF),
    763                     ( a        & 0xFF),
    764                     p);
    765         }
    766 
    767         int res = sendto(sock_fd_, &req, sizeof(req), 0,
    768                          reinterpret_cast<struct sockaddr*>(&transmitter_addr_),
    769                          sizeof(transmitter_addr_));
    770         if (res < 0) {
    771             ALOGE("Error when sending retransmit request (%d)", errno);
    772         } else {
    773             ALOGV("%s request for range [%hu, %hu] sent",
    774                   (kGS_FastStartGap == gap_status) ? "Fast Start"
    775                                                    : "Retransmit",
    776                   gap.start_seq_, gap.end_seq_);
    777         }
    778 
    779         // Update the current gap info.
    780         current_gap_ = gap;
    781         current_gap_status_ = gap_status;
    782         next_retrans_req_time_ = monotonicUSecNow() +
    783                                ((kGS_FastStartGap == current_gap_status_)
    784                                 ? kFastStartTimeoutUSec
    785                                 : kGapRerequestTimeoutUSec);
    786     }
    787 
    788     return false;
    789 }
    790 
    791 // Compute when its time to send the next gap retransmission in milliseconds.
    792 // Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit
    793 // right now.
    794 int AAH_RXPlayer::computeNextGapRetransmitTimeout() {
    795     if (kGS_NoGap == current_gap_status_) {
    796         return -1;
    797     }
    798 
    799     int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow();
    800 
    801     timeout_delta /= 1000;
    802     if (timeout_delta <= 0) {
    803         return 0;
    804     }
    805 
    806     return static_cast<uint32_t>(timeout_delta);
    807 }
    808 
    809 }  // namespace android
    810