Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 #define LOG_TAG "MyHandler"
     23 #include <utils/Log.h>
     24 
     25 #include "APacketSource.h"
     26 #include "ARTPConnection.h"
     27 #include "ARTSPConnection.h"
     28 #include "ASessionDescription.h"
     29 
     30 #include <ctype.h>
     31 #include <cutils/properties.h>
     32 
     33 #include <media/stagefright/foundation/ABuffer.h>
     34 #include <media/stagefright/foundation/ADebug.h>
     35 #include <media/stagefright/foundation/ALooper.h>
     36 #include <media/stagefright/foundation/AMessage.h>
     37 #include <media/stagefright/MediaErrors.h>
     38 
     39 #include <arpa/inet.h>
     40 #include <sys/socket.h>
     41 #include <netdb.h>
     42 
     43 #include "HTTPBase.h"
     44 
     45 // If no access units are received within 5 secs, assume that the rtp
     46 // stream has ended and signal end of stream.
     47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
     48 
     49 // If no access units arrive for the first 10 secs after starting the
     50 // stream, assume none ever will and signal EOS or switch transports.
     51 static int64_t kStartupTimeoutUs = 10000000ll;
     52 
     53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
     54 
     55 namespace android {
     56 
     57 static void MakeUserAgentString(AString *s) {
     58     s->setTo("stagefright/1.1 (Linux;Android ");
     59 
     60 #if (PROPERTY_VALUE_MAX < 8)
     61 #error "PROPERTY_VALUE_MAX must be at least 8"
     62 #endif
     63 
     64     char value[PROPERTY_VALUE_MAX];
     65     property_get("ro.build.version.release", value, "Unknown");
     66     s->append(value);
     67     s->append(")");
     68 }
     69 
     70 static bool GetAttribute(const char *s, const char *key, AString *value) {
     71     value->clear();
     72 
     73     size_t keyLen = strlen(key);
     74 
     75     for (;;) {
     76         while (isspace(*s)) {
     77             ++s;
     78         }
     79 
     80         const char *colonPos = strchr(s, ';');
     81 
     82         size_t len =
     83             (colonPos == NULL) ? strlen(s) : colonPos - s;
     84 
     85         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     86             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     87             return true;
     88         }
     89 
     90         if (colonPos == NULL) {
     91             return false;
     92         }
     93 
     94         s = colonPos + 1;
     95     }
     96 }
     97 
     98 struct MyHandler : public AHandler {
     99     enum {
    100         kWhatConnected                  = 'conn',
    101         kWhatDisconnected               = 'disc',
    102         kWhatSeekDone                   = 'sdon',
    103 
    104         kWhatAccessUnit                 = 'accU',
    105         kWhatEOS                        = 'eos!',
    106         kWhatSeekDiscontinuity          = 'seeD',
    107         kWhatNormalPlayTimeMapping      = 'nptM',
    108     };
    109 
    110     MyHandler(
    111             const char *url,
    112             const sp<AMessage> &notify,
    113             bool uidValid = false, uid_t uid = 0)
    114         : mNotify(notify),
    115           mUIDValid(uidValid),
    116           mUID(uid),
    117           mNetLooper(new ALooper),
    118           mConn(new ARTSPConnection(mUIDValid, mUID)),
    119           mRTPConn(new ARTPConnection),
    120           mOriginalSessionURL(url),
    121           mSessionURL(url),
    122           mSetupTracksSuccessful(false),
    123           mSeekPending(false),
    124           mFirstAccessUnit(true),
    125           mNTPAnchorUs(-1),
    126           mMediaAnchorUs(-1),
    127           mLastMediaTimeUs(0),
    128           mNumAccessUnitsReceived(0),
    129           mCheckPending(false),
    130           mCheckGeneration(0),
    131           mTryTCPInterleaving(false),
    132           mTryFakeRTCP(false),
    133           mReceivedFirstRTCPPacket(false),
    134           mReceivedFirstRTPPacket(false),
    135           mSeekable(false),
    136           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
    137           mKeepAliveGeneration(0) {
    138         mNetLooper->setName("rtsp net");
    139         mNetLooper->start(false /* runOnCallingThread */,
    140                           false /* canCallJava */,
    141                           PRIORITY_HIGHEST);
    142 
    143         // Strip any authentication info from the session url, we don't
    144         // want to transmit user/pass in cleartext.
    145         AString host, path, user, pass;
    146         unsigned port;
    147         CHECK(ARTSPConnection::ParseURL(
    148                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    149 
    150         if (user.size() > 0) {
    151             mSessionURL.clear();
    152             mSessionURL.append("rtsp://");
    153             mSessionURL.append(host);
    154             mSessionURL.append(":");
    155             mSessionURL.append(StringPrintf("%u", port));
    156             mSessionURL.append(path);
    157 
    158             LOGI("rewritten session url: '%s'", mSessionURL.c_str());
    159         }
    160 
    161         mSessionHost = host;
    162     }
    163 
    164     void connect() {
    165         looper()->registerHandler(mConn);
    166         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    167 
    168         sp<AMessage> notify = new AMessage('biny', id());
    169         mConn->observeBinaryData(notify);
    170 
    171         sp<AMessage> reply = new AMessage('conn', id());
    172         mConn->connect(mOriginalSessionURL.c_str(), reply);
    173     }
    174 
    175     void disconnect() {
    176         (new AMessage('abor', id()))->post();
    177     }
    178 
    179     void seek(int64_t timeUs) {
    180         sp<AMessage> msg = new AMessage('seek', id());
    181         msg->setInt64("time", timeUs);
    182         msg->post();
    183     }
    184 
    185     static void addRR(const sp<ABuffer> &buf) {
    186         uint8_t *ptr = buf->data() + buf->size();
    187         ptr[0] = 0x80 | 0;
    188         ptr[1] = 201;  // RR
    189         ptr[2] = 0;
    190         ptr[3] = 1;
    191         ptr[4] = 0xde;  // SSRC
    192         ptr[5] = 0xad;
    193         ptr[6] = 0xbe;
    194         ptr[7] = 0xef;
    195 
    196         buf->setRange(0, buf->size() + 8);
    197     }
    198 
    199     static void addSDES(int s, const sp<ABuffer> &buffer) {
    200         struct sockaddr_in addr;
    201         socklen_t addrSize = sizeof(addr);
    202         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
    203 
    204         uint8_t *data = buffer->data() + buffer->size();
    205         data[0] = 0x80 | 1;
    206         data[1] = 202;  // SDES
    207         data[4] = 0xde;  // SSRC
    208         data[5] = 0xad;
    209         data[6] = 0xbe;
    210         data[7] = 0xef;
    211 
    212         size_t offset = 8;
    213 
    214         data[offset++] = 1;  // CNAME
    215 
    216         AString cname = "stagefright@";
    217         cname.append(inet_ntoa(addr.sin_addr));
    218         data[offset++] = cname.size();
    219 
    220         memcpy(&data[offset], cname.c_str(), cname.size());
    221         offset += cname.size();
    222 
    223         data[offset++] = 6;  // TOOL
    224 
    225         AString tool;
    226         MakeUserAgentString(&tool);
    227 
    228         data[offset++] = tool.size();
    229 
    230         memcpy(&data[offset], tool.c_str(), tool.size());
    231         offset += tool.size();
    232 
    233         data[offset++] = 0;
    234 
    235         if ((offset % 4) > 0) {
    236             size_t count = 4 - (offset % 4);
    237             switch (count) {
    238                 case 3:
    239                     data[offset++] = 0;
    240                 case 2:
    241                     data[offset++] = 0;
    242                 case 1:
    243                     data[offset++] = 0;
    244             }
    245         }
    246 
    247         size_t numWords = (offset / 4) - 1;
    248         data[2] = numWords >> 8;
    249         data[3] = numWords & 0xff;
    250 
    251         buffer->setRange(buffer->offset(), buffer->size() + offset);
    252     }
    253 
    254     // In case we're behind NAT, fire off two UDP packets to the remote
    255     // rtp/rtcp ports to poke a hole into the firewall for future incoming
    256     // packets. We're going to send an RR/SDES RTCP packet to both of them.
    257     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
    258         struct sockaddr_in addr;
    259         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    260         addr.sin_family = AF_INET;
    261 
    262         AString source;
    263         AString server_port;
    264         if (!GetAttribute(transport.c_str(),
    265                           "source",
    266                           &source)) {
    267             LOGW("Missing 'source' field in Transport response. Using "
    268                  "RTSP endpoint address.");
    269 
    270             struct hostent *ent = gethostbyname(mSessionHost.c_str());
    271             if (ent == NULL) {
    272                 LOGE("Failed to look up address of session host '%s'",
    273                      mSessionHost.c_str());
    274 
    275                 return false;
    276             }
    277 
    278             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    279         } else {
    280             addr.sin_addr.s_addr = inet_addr(source.c_str());
    281         }
    282 
    283         if (!GetAttribute(transport.c_str(),
    284                                  "server_port",
    285                                  &server_port)) {
    286             LOGI("Missing 'server_port' field in Transport response.");
    287             return false;
    288         }
    289 
    290         int rtpPort, rtcpPort;
    291         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
    292                 || rtpPort <= 0 || rtpPort > 65535
    293                 || rtcpPort <=0 || rtcpPort > 65535
    294                 || rtcpPort != rtpPort + 1) {
    295             LOGE("Server picked invalid RTP/RTCP port pair %s,"
    296                  " RTP port must be even, RTCP port must be one higher.",
    297                  server_port.c_str());
    298 
    299             return false;
    300         }
    301 
    302         if (rtpPort & 1) {
    303             LOGW("Server picked an odd RTP port, it should've picked an "
    304                  "even one, we'll let it pass for now, but this may break "
    305                  "in the future.");
    306         }
    307 
    308         if (addr.sin_addr.s_addr == INADDR_NONE) {
    309             return true;
    310         }
    311 
    312         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
    313             // No firewalls to traverse on the loopback interface.
    314             return true;
    315         }
    316 
    317         // Make up an RR/SDES RTCP packet.
    318         sp<ABuffer> buf = new ABuffer(65536);
    319         buf->setRange(0, 0);
    320         addRR(buf);
    321         addSDES(rtpSocket, buf);
    322 
    323         addr.sin_port = htons(rtpPort);
    324 
    325         ssize_t n = sendto(
    326                 rtpSocket, buf->data(), buf->size(), 0,
    327                 (const sockaddr *)&addr, sizeof(addr));
    328 
    329         if (n < (ssize_t)buf->size()) {
    330             LOGE("failed to poke a hole for RTP packets");
    331             return false;
    332         }
    333 
    334         addr.sin_port = htons(rtcpPort);
    335 
    336         n = sendto(
    337                 rtcpSocket, buf->data(), buf->size(), 0,
    338                 (const sockaddr *)&addr, sizeof(addr));
    339 
    340         if (n < (ssize_t)buf->size()) {
    341             LOGE("failed to poke a hole for RTCP packets");
    342             return false;
    343         }
    344 
    345         LOGV("successfully poked holes.");
    346 
    347         return true;
    348     }
    349 
    350     virtual void onMessageReceived(const sp<AMessage> &msg) {
    351         switch (msg->what()) {
    352             case 'conn':
    353             {
    354                 int32_t result;
    355                 CHECK(msg->findInt32("result", &result));
    356 
    357                 LOGI("connection request completed with result %d (%s)",
    358                      result, strerror(-result));
    359 
    360                 if (result == OK) {
    361                     AString request;
    362                     request = "DESCRIBE ";
    363                     request.append(mSessionURL);
    364                     request.append(" RTSP/1.0\r\n");
    365                     request.append("Accept: application/sdp\r\n");
    366                     request.append("\r\n");
    367 
    368                     sp<AMessage> reply = new AMessage('desc', id());
    369                     mConn->sendRequest(request.c_str(), reply);
    370                 } else {
    371                     (new AMessage('disc', id()))->post();
    372                 }
    373                 break;
    374             }
    375 
    376             case 'disc':
    377             {
    378                 ++mKeepAliveGeneration;
    379 
    380                 int32_t reconnect;
    381                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    382                     sp<AMessage> reply = new AMessage('conn', id());
    383                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    384                 } else {
    385                     (new AMessage('quit', id()))->post();
    386                 }
    387                 break;
    388             }
    389 
    390             case 'desc':
    391             {
    392                 int32_t result;
    393                 CHECK(msg->findInt32("result", &result));
    394 
    395                 LOGI("DESCRIBE completed with result %d (%s)",
    396                      result, strerror(-result));
    397 
    398                 if (result == OK) {
    399                     sp<RefBase> obj;
    400                     CHECK(msg->findObject("response", &obj));
    401                     sp<ARTSPResponse> response =
    402                         static_cast<ARTSPResponse *>(obj.get());
    403 
    404                     if (response->mStatusCode == 302) {
    405                         ssize_t i = response->mHeaders.indexOfKey("location");
    406                         CHECK_GE(i, 0);
    407 
    408                         mSessionURL = response->mHeaders.valueAt(i);
    409 
    410                         AString request;
    411                         request = "DESCRIBE ";
    412                         request.append(mSessionURL);
    413                         request.append(" RTSP/1.0\r\n");
    414                         request.append("Accept: application/sdp\r\n");
    415                         request.append("\r\n");
    416 
    417                         sp<AMessage> reply = new AMessage('desc', id());
    418                         mConn->sendRequest(request.c_str(), reply);
    419                         break;
    420                     }
    421 
    422                     if (response->mStatusCode != 200) {
    423                         result = UNKNOWN_ERROR;
    424                     } else {
    425                         mSessionDesc = new ASessionDescription;
    426 
    427                         mSessionDesc->setTo(
    428                                 response->mContent->data(),
    429                                 response->mContent->size());
    430 
    431                         if (!mSessionDesc->isValid()) {
    432                             LOGE("Failed to parse session description.");
    433                             result = ERROR_MALFORMED;
    434                         } else {
    435                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    436                             if (i >= 0) {
    437                                 mBaseURL = response->mHeaders.valueAt(i);
    438                             } else {
    439                                 i = response->mHeaders.indexOfKey("content-location");
    440                                 if (i >= 0) {
    441                                     mBaseURL = response->mHeaders.valueAt(i);
    442                                 } else {
    443                                     mBaseURL = mSessionURL;
    444                                 }
    445                             }
    446 
    447                             if (!mBaseURL.startsWith("rtsp://")) {
    448                                 // Some misbehaving servers specify a relative
    449                                 // URL in one of the locations above, combine
    450                                 // it with the absolute session URL to get
    451                                 // something usable...
    452 
    453                                 LOGW("Server specified a non-absolute base URL"
    454                                      ", combining it with the session URL to "
    455                                      "get something usable...");
    456 
    457                                 AString tmp;
    458                                 CHECK(MakeURL(
    459                                             mSessionURL.c_str(),
    460                                             mBaseURL.c_str(),
    461                                             &tmp));
    462 
    463                                 mBaseURL = tmp;
    464                             }
    465 
    466                             if (mSessionDesc->countTracks() < 2) {
    467                                 // There's no actual tracks in this session.
    468                                 // The first "track" is merely session meta
    469                                 // data.
    470 
    471                                 LOGW("Session doesn't contain any playable "
    472                                      "tracks. Aborting.");
    473                                 result = ERROR_UNSUPPORTED;
    474                             } else {
    475                                 setupTrack(1);
    476                             }
    477                         }
    478                     }
    479                 }
    480 
    481                 if (result != OK) {
    482                     sp<AMessage> reply = new AMessage('disc', id());
    483                     mConn->disconnect(reply);
    484                 }
    485                 break;
    486             }
    487 
    488             case 'setu':
    489             {
    490                 size_t index;
    491                 CHECK(msg->findSize("index", &index));
    492 
    493                 TrackInfo *track = NULL;
    494                 size_t trackIndex;
    495                 if (msg->findSize("track-index", &trackIndex)) {
    496                     track = &mTracks.editItemAt(trackIndex);
    497                 }
    498 
    499                 int32_t result;
    500                 CHECK(msg->findInt32("result", &result));
    501 
    502                 LOGI("SETUP(%d) completed with result %d (%s)",
    503                      index, result, strerror(-result));
    504 
    505                 if (result == OK) {
    506                     CHECK(track != NULL);
    507 
    508                     sp<RefBase> obj;
    509                     CHECK(msg->findObject("response", &obj));
    510                     sp<ARTSPResponse> response =
    511                         static_cast<ARTSPResponse *>(obj.get());
    512 
    513                     if (response->mStatusCode != 200) {
    514                         result = UNKNOWN_ERROR;
    515                     } else {
    516                         ssize_t i = response->mHeaders.indexOfKey("session");
    517                         CHECK_GE(i, 0);
    518 
    519                         mSessionID = response->mHeaders.valueAt(i);
    520 
    521                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    522                         AString timeoutStr;
    523                         if (GetAttribute(
    524                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
    525                             char *end;
    526                             unsigned long timeoutSecs =
    527                                 strtoul(timeoutStr.c_str(), &end, 10);
    528 
    529                             if (end == timeoutStr.c_str() || *end != '\0') {
    530                                 LOGW("server specified malformed timeout '%s'",
    531                                      timeoutStr.c_str());
    532 
    533                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    534                             } else if (timeoutSecs < 15) {
    535                                 LOGW("server specified too short a timeout "
    536                                      "(%lu secs), using default.",
    537                                      timeoutSecs);
    538 
    539                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    540                             } else {
    541                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
    542 
    543                                 LOGI("server specified timeout of %lu secs.",
    544                                      timeoutSecs);
    545                             }
    546                         }
    547 
    548                         i = mSessionID.find(";");
    549                         if (i >= 0) {
    550                             // Remove options, i.e. ";timeout=90"
    551                             mSessionID.erase(i, mSessionID.size() - i);
    552                         }
    553 
    554                         sp<AMessage> notify = new AMessage('accu', id());
    555                         notify->setSize("track-index", trackIndex);
    556 
    557                         i = response->mHeaders.indexOfKey("transport");
    558                         CHECK_GE(i, 0);
    559 
    560                         if (!track->mUsingInterleavedTCP) {
    561                             AString transport = response->mHeaders.valueAt(i);
    562 
    563                             // We are going to continue even if we were
    564                             // unable to poke a hole into the firewall...
    565                             pokeAHole(
    566                                     track->mRTPSocket,
    567                                     track->mRTCPSocket,
    568                                     transport);
    569                         }
    570 
    571                         mRTPConn->addStream(
    572                                 track->mRTPSocket, track->mRTCPSocket,
    573                                 mSessionDesc, index,
    574                                 notify, track->mUsingInterleavedTCP);
    575 
    576                         mSetupTracksSuccessful = true;
    577                     }
    578                 }
    579 
    580                 if (result != OK) {
    581                     if (track) {
    582                         if (!track->mUsingInterleavedTCP) {
    583                             // Clear the tag
    584                             if (mUIDValid) {
    585                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
    586                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
    587                             }
    588 
    589                             close(track->mRTPSocket);
    590                             close(track->mRTCPSocket);
    591                         }
    592 
    593                         mTracks.removeItemsAt(trackIndex);
    594                     }
    595                 }
    596 
    597                 ++index;
    598                 if (index < mSessionDesc->countTracks()) {
    599                     setupTrack(index);
    600                 } else if (mSetupTracksSuccessful) {
    601                     ++mKeepAliveGeneration;
    602                     postKeepAlive();
    603 
    604                     AString request = "PLAY ";
    605                     request.append(mSessionURL);
    606                     request.append(" RTSP/1.0\r\n");
    607 
    608                     request.append("Session: ");
    609                     request.append(mSessionID);
    610                     request.append("\r\n");
    611 
    612                     request.append("\r\n");
    613 
    614                     sp<AMessage> reply = new AMessage('play', id());
    615                     mConn->sendRequest(request.c_str(), reply);
    616                 } else {
    617                     sp<AMessage> reply = new AMessage('disc', id());
    618                     mConn->disconnect(reply);
    619                 }
    620                 break;
    621             }
    622 
    623             case 'play':
    624             {
    625                 int32_t result;
    626                 CHECK(msg->findInt32("result", &result));
    627 
    628                 LOGI("PLAY completed with result %d (%s)",
    629                      result, strerror(-result));
    630 
    631                 if (result == OK) {
    632                     sp<RefBase> obj;
    633                     CHECK(msg->findObject("response", &obj));
    634                     sp<ARTSPResponse> response =
    635                         static_cast<ARTSPResponse *>(obj.get());
    636 
    637                     if (response->mStatusCode != 200) {
    638                         result = UNKNOWN_ERROR;
    639                     } else {
    640                         parsePlayResponse(response);
    641 
    642                         sp<AMessage> timeout = new AMessage('tiou', id());
    643                         timeout->post(kStartupTimeoutUs);
    644                     }
    645                 }
    646 
    647                 if (result != OK) {
    648                     sp<AMessage> reply = new AMessage('disc', id());
    649                     mConn->disconnect(reply);
    650                 }
    651 
    652                 break;
    653             }
    654 
    655             case 'aliv':
    656             {
    657                 int32_t generation;
    658                 CHECK(msg->findInt32("generation", &generation));
    659 
    660                 if (generation != mKeepAliveGeneration) {
    661                     // obsolete event.
    662                     break;
    663                 }
    664 
    665                 AString request;
    666                 request.append("OPTIONS ");
    667                 request.append(mSessionURL);
    668                 request.append(" RTSP/1.0\r\n");
    669                 request.append("Session: ");
    670                 request.append(mSessionID);
    671                 request.append("\r\n");
    672                 request.append("\r\n");
    673 
    674                 sp<AMessage> reply = new AMessage('opts', id());
    675                 reply->setInt32("generation", mKeepAliveGeneration);
    676                 mConn->sendRequest(request.c_str(), reply);
    677                 break;
    678             }
    679 
    680             case 'opts':
    681             {
    682                 int32_t result;
    683                 CHECK(msg->findInt32("result", &result));
    684 
    685                 LOGI("OPTIONS completed with result %d (%s)",
    686                      result, strerror(-result));
    687 
    688                 int32_t generation;
    689                 CHECK(msg->findInt32("generation", &generation));
    690 
    691                 if (generation != mKeepAliveGeneration) {
    692                     // obsolete event.
    693                     break;
    694                 }
    695 
    696                 postKeepAlive();
    697                 break;
    698             }
    699 
    700             case 'abor':
    701             {
    702                 for (size_t i = 0; i < mTracks.size(); ++i) {
    703                     TrackInfo *info = &mTracks.editItemAt(i);
    704 
    705                     if (!mFirstAccessUnit) {
    706                         postQueueEOS(i, ERROR_END_OF_STREAM);
    707                     }
    708 
    709                     if (!info->mUsingInterleavedTCP) {
    710                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    711 
    712                         // Clear the tag
    713                         if (mUIDValid) {
    714                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
    715                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
    716                         }
    717 
    718                         close(info->mRTPSocket);
    719                         close(info->mRTCPSocket);
    720                     }
    721                 }
    722                 mTracks.clear();
    723                 mSetupTracksSuccessful = false;
    724                 mSeekPending = false;
    725                 mFirstAccessUnit = true;
    726                 mNTPAnchorUs = -1;
    727                 mMediaAnchorUs = -1;
    728                 mNumAccessUnitsReceived = 0;
    729                 mReceivedFirstRTCPPacket = false;
    730                 mReceivedFirstRTPPacket = false;
    731                 mSeekable = false;
    732 
    733                 sp<AMessage> reply = new AMessage('tear', id());
    734 
    735                 int32_t reconnect;
    736                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    737                     reply->setInt32("reconnect", true);
    738                 }
    739 
    740                 AString request;
    741                 request = "TEARDOWN ";
    742 
    743                 // XXX should use aggregate url from SDP here...
    744                 request.append(mSessionURL);
    745                 request.append(" RTSP/1.0\r\n");
    746 
    747                 request.append("Session: ");
    748                 request.append(mSessionID);
    749                 request.append("\r\n");
    750 
    751                 request.append("\r\n");
    752 
    753                 mConn->sendRequest(request.c_str(), reply);
    754                 break;
    755             }
    756 
    757             case 'tear':
    758             {
    759                 int32_t result;
    760                 CHECK(msg->findInt32("result", &result));
    761 
    762                 LOGI("TEARDOWN completed with result %d (%s)",
    763                      result, strerror(-result));
    764 
    765                 sp<AMessage> reply = new AMessage('disc', id());
    766 
    767                 int32_t reconnect;
    768                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    769                     reply->setInt32("reconnect", true);
    770                 }
    771 
    772                 mConn->disconnect(reply);
    773                 break;
    774             }
    775 
    776             case 'quit':
    777             {
    778                 sp<AMessage> msg = mNotify->dup();
    779                 msg->setInt32("what", kWhatDisconnected);
    780                 msg->setInt32("result", UNKNOWN_ERROR);
    781                 msg->post();
    782                 break;
    783             }
    784 
    785             case 'chek':
    786             {
    787                 int32_t generation;
    788                 CHECK(msg->findInt32("generation", &generation));
    789                 if (generation != mCheckGeneration) {
    790                     // This is an outdated message. Ignore.
    791                     break;
    792                 }
    793 
    794                 if (mNumAccessUnitsReceived == 0) {
    795 #if 1
    796                     LOGI("stream ended? aborting.");
    797                     (new AMessage('abor', id()))->post();
    798                     break;
    799 #else
    800                     LOGI("haven't seen an AU in a looong time.");
    801 #endif
    802                 }
    803 
    804                 mNumAccessUnitsReceived = 0;
    805                 msg->post(kAccessUnitTimeoutUs);
    806                 break;
    807             }
    808 
    809             case 'accu':
    810             {
    811                 int32_t timeUpdate;
    812                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
    813                     size_t trackIndex;
    814                     CHECK(msg->findSize("track-index", &trackIndex));
    815 
    816                     uint32_t rtpTime;
    817                     uint64_t ntpTime;
    818                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
    819                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
    820 
    821                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
    822                     break;
    823                 }
    824 
    825                 int32_t first;
    826                 if (msg->findInt32("first-rtcp", &first)) {
    827                     mReceivedFirstRTCPPacket = true;
    828                     break;
    829                 }
    830 
    831                 if (msg->findInt32("first-rtp", &first)) {
    832                     mReceivedFirstRTPPacket = true;
    833                     break;
    834                 }
    835 
    836                 ++mNumAccessUnitsReceived;
    837                 postAccessUnitTimeoutCheck();
    838 
    839                 size_t trackIndex;
    840                 CHECK(msg->findSize("track-index", &trackIndex));
    841 
    842                 if (trackIndex >= mTracks.size()) {
    843                     LOGV("late packets ignored.");
    844                     break;
    845                 }
    846 
    847                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
    848 
    849                 int32_t eos;
    850                 if (msg->findInt32("eos", &eos)) {
    851                     LOGI("received BYE on track index %d", trackIndex);
    852 #if 0
    853                     track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
    854 #endif
    855                     return;
    856                 }
    857 
    858                 sp<RefBase> obj;
    859                 CHECK(msg->findObject("access-unit", &obj));
    860 
    861                 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
    862 
    863                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
    864 
    865                 if (mSeekPending) {
    866                     LOGV("we're seeking, dropping stale packet.");
    867                     break;
    868                 }
    869 
    870                 if (seqNum < track->mFirstSeqNumInSegment) {
    871                     LOGV("dropping stale access-unit (%d < %d)",
    872                          seqNum, track->mFirstSeqNumInSegment);
    873                     break;
    874                 }
    875 
    876                 if (track->mNewSegment) {
    877                     track->mNewSegment = false;
    878                 }
    879 
    880                 onAccessUnitComplete(trackIndex, accessUnit);
    881                 break;
    882             }
    883 
    884             case 'seek':
    885             {
    886                 if (!mSeekable) {
    887                     LOGW("This is a live stream, ignoring seek request.");
    888 
    889                     sp<AMessage> msg = mNotify->dup();
    890                     msg->setInt32("what", kWhatSeekDone);
    891                     msg->post();
    892                     break;
    893                 }
    894 
    895                 int64_t timeUs;
    896                 CHECK(msg->findInt64("time", &timeUs));
    897 
    898                 mSeekPending = true;
    899 
    900                 // Disable the access unit timeout until we resumed
    901                 // playback again.
    902                 mCheckPending = true;
    903                 ++mCheckGeneration;
    904 
    905                 AString request = "PAUSE ";
    906                 request.append(mSessionURL);
    907                 request.append(" RTSP/1.0\r\n");
    908 
    909                 request.append("Session: ");
    910                 request.append(mSessionID);
    911                 request.append("\r\n");
    912 
    913                 request.append("\r\n");
    914 
    915                 sp<AMessage> reply = new AMessage('see1', id());
    916                 reply->setInt64("time", timeUs);
    917                 mConn->sendRequest(request.c_str(), reply);
    918                 break;
    919             }
    920 
    921             case 'see1':
    922             {
    923                 // Session is paused now.
    924                 for (size_t i = 0; i < mTracks.size(); ++i) {
    925                     TrackInfo *info = &mTracks.editItemAt(i);
    926 
    927                     postQueueSeekDiscontinuity(i);
    928 
    929                     info->mRTPAnchor = 0;
    930                     info->mNTPAnchorUs = -1;
    931                 }
    932 
    933                 mNTPAnchorUs = -1;
    934 
    935                 int64_t timeUs;
    936                 CHECK(msg->findInt64("time", &timeUs));
    937 
    938                 AString request = "PLAY ";
    939                 request.append(mSessionURL);
    940                 request.append(" RTSP/1.0\r\n");
    941 
    942                 request.append("Session: ");
    943                 request.append(mSessionID);
    944                 request.append("\r\n");
    945 
    946                 request.append(
    947                         StringPrintf(
    948                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
    949 
    950                 request.append("\r\n");
    951 
    952                 sp<AMessage> reply = new AMessage('see2', id());
    953                 mConn->sendRequest(request.c_str(), reply);
    954                 break;
    955             }
    956 
    957             case 'see2':
    958             {
    959                 CHECK(mSeekPending);
    960 
    961                 int32_t result;
    962                 CHECK(msg->findInt32("result", &result));
    963 
    964                 LOGI("PLAY completed with result %d (%s)",
    965                      result, strerror(-result));
    966 
    967                 mCheckPending = false;
    968                 postAccessUnitTimeoutCheck();
    969 
    970                 if (result == OK) {
    971                     sp<RefBase> obj;
    972                     CHECK(msg->findObject("response", &obj));
    973                     sp<ARTSPResponse> response =
    974                         static_cast<ARTSPResponse *>(obj.get());
    975 
    976                     if (response->mStatusCode != 200) {
    977                         result = UNKNOWN_ERROR;
    978                     } else {
    979                         parsePlayResponse(response);
    980 
    981                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
    982                         CHECK_GE(i, 0);
    983 
    984                         LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
    985 
    986                         LOGI("seek completed.");
    987                     }
    988                 }
    989 
    990                 if (result != OK) {
    991                     LOGE("seek failed, aborting.");
    992                     (new AMessage('abor', id()))->post();
    993                 }
    994 
    995                 mSeekPending = false;
    996 
    997                 sp<AMessage> msg = mNotify->dup();
    998                 msg->setInt32("what", kWhatSeekDone);
    999                 msg->post();
   1000                 break;
   1001             }
   1002 
   1003             case 'biny':
   1004             {
   1005                 sp<RefBase> obj;
   1006                 CHECK(msg->findObject("buffer", &obj));
   1007                 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
   1008 
   1009                 int32_t index;
   1010                 CHECK(buffer->meta()->findInt32("index", &index));
   1011 
   1012                 mRTPConn->injectPacket(index, buffer);
   1013                 break;
   1014             }
   1015 
   1016             case 'tiou':
   1017             {
   1018                 if (!mReceivedFirstRTCPPacket) {
   1019                     if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
   1020                         LOGW("We received RTP packets but no RTCP packets, "
   1021                              "using fake timestamps.");
   1022 
   1023                         mTryFakeRTCP = true;
   1024 
   1025                         mReceivedFirstRTCPPacket = true;
   1026 
   1027                         fakeTimestamps();
   1028                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
   1029                         LOGW("Never received any data, switching transports.");
   1030 
   1031                         mTryTCPInterleaving = true;
   1032 
   1033                         sp<AMessage> msg = new AMessage('abor', id());
   1034                         msg->setInt32("reconnect", true);
   1035                         msg->post();
   1036                     } else {
   1037                         LOGW("Never received any data, disconnecting.");
   1038                         (new AMessage('abor', id()))->post();
   1039                     }
   1040                 }
   1041                 break;
   1042             }
   1043 
   1044             default:
   1045                 TRESPASS();
   1046                 break;
   1047         }
   1048     }
   1049 
   1050     void postKeepAlive() {
   1051         sp<AMessage> msg = new AMessage('aliv', id());
   1052         msg->setInt32("generation", mKeepAliveGeneration);
   1053         msg->post((mKeepAliveTimeoutUs * 9) / 10);
   1054     }
   1055 
   1056     void postAccessUnitTimeoutCheck() {
   1057         if (mCheckPending) {
   1058             return;
   1059         }
   1060 
   1061         mCheckPending = true;
   1062         sp<AMessage> check = new AMessage('chek', id());
   1063         check->setInt32("generation", mCheckGeneration);
   1064         check->post(kAccessUnitTimeoutUs);
   1065     }
   1066 
   1067     static void SplitString(
   1068             const AString &s, const char *separator, List<AString> *items) {
   1069         items->clear();
   1070         size_t start = 0;
   1071         while (start < s.size()) {
   1072             ssize_t offset = s.find(separator, start);
   1073 
   1074             if (offset < 0) {
   1075                 items->push_back(AString(s, start, s.size() - start));
   1076                 break;
   1077             }
   1078 
   1079             items->push_back(AString(s, start, offset - start));
   1080             start = offset + strlen(separator);
   1081         }
   1082     }
   1083 
   1084     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1085         mSeekable = false;
   1086 
   1087         ssize_t i = response->mHeaders.indexOfKey("range");
   1088         if (i < 0) {
   1089             // Server doesn't even tell use what range it is going to
   1090             // play, therefore we won't support seeking.
   1091             return;
   1092         }
   1093 
   1094         AString range = response->mHeaders.valueAt(i);
   1095         LOGV("Range: %s", range.c_str());
   1096 
   1097         AString val;
   1098         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1099 
   1100         float npt1, npt2;
   1101         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1102             // This is a live stream and therefore not seekable.
   1103 
   1104             LOGI("This is a live stream");
   1105             return;
   1106         }
   1107 
   1108         i = response->mHeaders.indexOfKey("rtp-info");
   1109         CHECK_GE(i, 0);
   1110 
   1111         AString rtpInfo = response->mHeaders.valueAt(i);
   1112         List<AString> streamInfos;
   1113         SplitString(rtpInfo, ",", &streamInfos);
   1114 
   1115         int n = 1;
   1116         for (List<AString>::iterator it = streamInfos.begin();
   1117              it != streamInfos.end(); ++it) {
   1118             (*it).trim();
   1119             LOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1120 
   1121             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1122 
   1123             size_t trackIndex = 0;
   1124             while (trackIndex < mTracks.size()
   1125                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1126                 ++trackIndex;
   1127             }
   1128             CHECK_LT(trackIndex, mTracks.size());
   1129 
   1130             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1131 
   1132             char *end;
   1133             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1134 
   1135             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1136             info->mFirstSeqNumInSegment = seq;
   1137             info->mNewSegment = true;
   1138 
   1139             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1140 
   1141             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1142 
   1143             LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1144 
   1145             info->mNormalPlayTimeRTP = rtpTime;
   1146             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
   1147 
   1148             if (!mFirstAccessUnit) {
   1149                 postNormalPlayTimeMapping(
   1150                         trackIndex,
   1151                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1152             }
   1153 
   1154             ++n;
   1155         }
   1156 
   1157         mSeekable = true;
   1158     }
   1159 
   1160     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
   1161         CHECK_GE(index, 0u);
   1162         CHECK_LT(index, mTracks.size());
   1163 
   1164         const TrackInfo &info = mTracks.itemAt(index);
   1165 
   1166         *timeScale = info.mTimeScale;
   1167 
   1168         return info.mPacketSource->getFormat();
   1169     }
   1170 
   1171     size_t countTracks() const {
   1172         return mTracks.size();
   1173     }
   1174 
   1175 private:
   1176     struct TrackInfo {
   1177         AString mURL;
   1178         int mRTPSocket;
   1179         int mRTCPSocket;
   1180         bool mUsingInterleavedTCP;
   1181         uint32_t mFirstSeqNumInSegment;
   1182         bool mNewSegment;
   1183 
   1184         uint32_t mRTPAnchor;
   1185         int64_t mNTPAnchorUs;
   1186         int32_t mTimeScale;
   1187 
   1188         uint32_t mNormalPlayTimeRTP;
   1189         int64_t mNormalPlayTimeUs;
   1190 
   1191         sp<APacketSource> mPacketSource;
   1192 
   1193         // Stores packets temporarily while no notion of time
   1194         // has been established yet.
   1195         List<sp<ABuffer> > mPackets;
   1196     };
   1197 
   1198     sp<AMessage> mNotify;
   1199     bool mUIDValid;
   1200     uid_t mUID;
   1201     sp<ALooper> mNetLooper;
   1202     sp<ARTSPConnection> mConn;
   1203     sp<ARTPConnection> mRTPConn;
   1204     sp<ASessionDescription> mSessionDesc;
   1205     AString mOriginalSessionURL;  // This one still has user:pass@
   1206     AString mSessionURL;
   1207     AString mSessionHost;
   1208     AString mBaseURL;
   1209     AString mSessionID;
   1210     bool mSetupTracksSuccessful;
   1211     bool mSeekPending;
   1212     bool mFirstAccessUnit;
   1213 
   1214     int64_t mNTPAnchorUs;
   1215     int64_t mMediaAnchorUs;
   1216     int64_t mLastMediaTimeUs;
   1217 
   1218     int64_t mNumAccessUnitsReceived;
   1219     bool mCheckPending;
   1220     int32_t mCheckGeneration;
   1221     bool mTryTCPInterleaving;
   1222     bool mTryFakeRTCP;
   1223     bool mReceivedFirstRTCPPacket;
   1224     bool mReceivedFirstRTPPacket;
   1225     bool mSeekable;
   1226     int64_t mKeepAliveTimeoutUs;
   1227     int32_t mKeepAliveGeneration;
   1228 
   1229     Vector<TrackInfo> mTracks;
   1230 
   1231     void setupTrack(size_t index) {
   1232         sp<APacketSource> source =
   1233             new APacketSource(mSessionDesc, index);
   1234 
   1235         if (source->initCheck() != OK) {
   1236             LOGW("Unsupported format. Ignoring track #%d.", index);
   1237 
   1238             sp<AMessage> reply = new AMessage('setu', id());
   1239             reply->setSize("index", index);
   1240             reply->setInt32("result", ERROR_UNSUPPORTED);
   1241             reply->post();
   1242             return;
   1243         }
   1244 
   1245         AString url;
   1246         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1247 
   1248         AString trackURL;
   1249         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1250 
   1251         mTracks.push(TrackInfo());
   1252         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1253         info->mURL = trackURL;
   1254         info->mPacketSource = source;
   1255         info->mUsingInterleavedTCP = false;
   1256         info->mFirstSeqNumInSegment = 0;
   1257         info->mNewSegment = true;
   1258         info->mRTPAnchor = 0;
   1259         info->mNTPAnchorUs = -1;
   1260         info->mNormalPlayTimeRTP = 0;
   1261         info->mNormalPlayTimeUs = 0ll;
   1262 
   1263         unsigned long PT;
   1264         AString formatDesc;
   1265         AString formatParams;
   1266         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1267 
   1268         int32_t timescale;
   1269         int32_t numChannels;
   1270         ASessionDescription::ParseFormatDesc(
   1271                 formatDesc.c_str(), &timescale, &numChannels);
   1272 
   1273         info->mTimeScale = timescale;
   1274 
   1275         LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
   1276 
   1277         AString request = "SETUP ";
   1278         request.append(trackURL);
   1279         request.append(" RTSP/1.0\r\n");
   1280 
   1281         if (mTryTCPInterleaving) {
   1282             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1283             info->mUsingInterleavedTCP = true;
   1284             info->mRTPSocket = interleaveIndex;
   1285             info->mRTCPSocket = interleaveIndex + 1;
   1286 
   1287             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1288             request.append(interleaveIndex);
   1289             request.append("-");
   1290             request.append(interleaveIndex + 1);
   1291         } else {
   1292             unsigned rtpPort;
   1293             ARTPConnection::MakePortPair(
   1294                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1295 
   1296             if (mUIDValid) {
   1297                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1298                                                 (uint32_t)*(uint32_t*) "RTP_");
   1299                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1300                                                 (uint32_t)*(uint32_t*) "RTP_");
   1301             }
   1302 
   1303             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1304             request.append(rtpPort);
   1305             request.append("-");
   1306             request.append(rtpPort + 1);
   1307         }
   1308 
   1309         request.append("\r\n");
   1310 
   1311         if (index > 1) {
   1312             request.append("Session: ");
   1313             request.append(mSessionID);
   1314             request.append("\r\n");
   1315         }
   1316 
   1317         request.append("\r\n");
   1318 
   1319         sp<AMessage> reply = new AMessage('setu', id());
   1320         reply->setSize("index", index);
   1321         reply->setSize("track-index", mTracks.size() - 1);
   1322         mConn->sendRequest(request.c_str(), reply);
   1323     }
   1324 
   1325     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1326         out->clear();
   1327 
   1328         if (strncasecmp("rtsp://", baseURL, 7)) {
   1329             // Base URL must be absolute
   1330             return false;
   1331         }
   1332 
   1333         if (!strncasecmp("rtsp://", url, 7)) {
   1334             // "url" is already an absolute URL, ignore base URL.
   1335             out->setTo(url);
   1336             return true;
   1337         }
   1338 
   1339         size_t n = strlen(baseURL);
   1340         if (baseURL[n - 1] == '/') {
   1341             out->setTo(baseURL);
   1342             out->append(url);
   1343         } else {
   1344             const char *slashPos = strrchr(baseURL, '/');
   1345 
   1346             if (slashPos > &baseURL[6]) {
   1347                 out->setTo(baseURL, slashPos - baseURL);
   1348             } else {
   1349                 out->setTo(baseURL);
   1350             }
   1351 
   1352             out->append("/");
   1353             out->append(url);
   1354         }
   1355 
   1356         return true;
   1357     }
   1358 
   1359     void fakeTimestamps() {
   1360         for (size_t i = 0; i < mTracks.size(); ++i) {
   1361             onTimeUpdate(i, 0, 0ll);
   1362         }
   1363     }
   1364 
   1365     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1366         LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
   1367              trackIndex, rtpTime, ntpTime);
   1368 
   1369         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1370 
   1371         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1372 
   1373         track->mRTPAnchor = rtpTime;
   1374         track->mNTPAnchorUs = ntpTimeUs;
   1375 
   1376         if (mNTPAnchorUs < 0) {
   1377             mNTPAnchorUs = ntpTimeUs;
   1378             mMediaAnchorUs = mLastMediaTimeUs;
   1379         }
   1380     }
   1381 
   1382     void onAccessUnitComplete(
   1383             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1384         LOGV("onAccessUnitComplete track %d", trackIndex);
   1385 
   1386         if (mFirstAccessUnit) {
   1387             sp<AMessage> msg = mNotify->dup();
   1388             msg->setInt32("what", kWhatConnected);
   1389             msg->post();
   1390 
   1391             if (mSeekable) {
   1392                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1393                     TrackInfo *info = &mTracks.editItemAt(i);
   1394 
   1395                     postNormalPlayTimeMapping(
   1396                             i,
   1397                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1398                 }
   1399             }
   1400 
   1401             mFirstAccessUnit = false;
   1402         }
   1403 
   1404         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1405 
   1406         if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) {
   1407             LOGV("storing accessUnit, no time established yet");
   1408             track->mPackets.push_back(accessUnit);
   1409             return;
   1410         }
   1411 
   1412         while (!track->mPackets.empty()) {
   1413             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1414             track->mPackets.erase(track->mPackets.begin());
   1415 
   1416             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1417                 postQueueAccessUnit(trackIndex, accessUnit);
   1418             }
   1419         }
   1420 
   1421         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1422             postQueueAccessUnit(trackIndex, accessUnit);
   1423         }
   1424     }
   1425 
   1426     bool addMediaTimestamp(
   1427             int32_t trackIndex, const TrackInfo *track,
   1428             const sp<ABuffer> &accessUnit) {
   1429         uint32_t rtpTime;
   1430         CHECK(accessUnit->meta()->findInt32(
   1431                     "rtp-time", (int32_t *)&rtpTime));
   1432 
   1433         int64_t relRtpTimeUs =
   1434             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1435                 / track->mTimeScale;
   1436 
   1437         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1438 
   1439         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1440 
   1441         if (mediaTimeUs > mLastMediaTimeUs) {
   1442             mLastMediaTimeUs = mediaTimeUs;
   1443         }
   1444 
   1445         if (mediaTimeUs < 0) {
   1446             LOGV("dropping early accessUnit.");
   1447             return false;
   1448         }
   1449 
   1450         LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
   1451              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
   1452 
   1453         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1454 
   1455         return true;
   1456     }
   1457 
   1458     void postQueueAccessUnit(
   1459             size_t trackIndex, const sp<ABuffer> &accessUnit) {
   1460         sp<AMessage> msg = mNotify->dup();
   1461         msg->setInt32("what", kWhatAccessUnit);
   1462         msg->setSize("trackIndex", trackIndex);
   1463         msg->setObject("accessUnit", accessUnit);
   1464         msg->post();
   1465     }
   1466 
   1467     void postQueueEOS(size_t trackIndex, status_t finalResult) {
   1468         sp<AMessage> msg = mNotify->dup();
   1469         msg->setInt32("what", kWhatEOS);
   1470         msg->setSize("trackIndex", trackIndex);
   1471         msg->setInt32("finalResult", finalResult);
   1472         msg->post();
   1473     }
   1474 
   1475     void postQueueSeekDiscontinuity(size_t trackIndex) {
   1476         sp<AMessage> msg = mNotify->dup();
   1477         msg->setInt32("what", kWhatSeekDiscontinuity);
   1478         msg->setSize("trackIndex", trackIndex);
   1479         msg->post();
   1480     }
   1481 
   1482     void postNormalPlayTimeMapping(
   1483             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
   1484         sp<AMessage> msg = mNotify->dup();
   1485         msg->setInt32("what", kWhatNormalPlayTimeMapping);
   1486         msg->setSize("trackIndex", trackIndex);
   1487         msg->setInt32("rtpTime", rtpTime);
   1488         msg->setInt64("nptUs", nptUs);
   1489         msg->post();
   1490     }
   1491 
   1492     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1493 };
   1494 
   1495 }  // namespace android
   1496 
   1497 #endif  // MY_HANDLER_H_
   1498