Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 #define LOG_TAG "MyHandler"
     23 #include <utils/Log.h>
     24 
     25 #include "APacketSource.h"
     26 #include "ARTPConnection.h"
     27 #include "ARTSPConnection.h"
     28 #include "ASessionDescription.h"
     29 
     30 #include <ctype.h>
     31 #include <cutils/properties.h>
     32 
     33 #include <media/stagefright/foundation/ABuffer.h>
     34 #include <media/stagefright/foundation/ADebug.h>
     35 #include <media/stagefright/foundation/ALooper.h>
     36 #include <media/stagefright/foundation/AMessage.h>
     37 #include <media/stagefright/MediaErrors.h>
     38 
     39 #include <arpa/inet.h>
     40 #include <sys/socket.h>
     41 #include <netdb.h>
     42 
     43 #include "HTTPBase.h"
     44 
     45 // If no access units are received within 5 secs, assume that the rtp
     46 // stream has ended and signal end of stream.
     47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
     48 
     49 // If no access units arrive for the first 10 secs after starting the
     50 // stream, assume none ever will and signal EOS or switch transports.
     51 static int64_t kStartupTimeoutUs = 10000000ll;
     52 
     53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
     54 
     55 namespace android {
     56 
     57 static void MakeUserAgentString(AString *s) {
     58     s->setTo("stagefright/1.1 (Linux;Android ");
     59 
     60 #if (PROPERTY_VALUE_MAX < 8)
     61 #error "PROPERTY_VALUE_MAX must be at least 8"
     62 #endif
     63 
     64     char value[PROPERTY_VALUE_MAX];
     65     property_get("ro.build.version.release", value, "Unknown");
     66     s->append(value);
     67     s->append(")");
     68 }
     69 
     70 static bool GetAttribute(const char *s, const char *key, AString *value) {
     71     value->clear();
     72 
     73     size_t keyLen = strlen(key);
     74 
     75     for (;;) {
     76         while (isspace(*s)) {
     77             ++s;
     78         }
     79 
     80         const char *colonPos = strchr(s, ';');
     81 
     82         size_t len =
     83             (colonPos == NULL) ? strlen(s) : colonPos - s;
     84 
     85         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     86             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     87             return true;
     88         }
     89 
     90         if (colonPos == NULL) {
     91             return false;
     92         }
     93 
     94         s = colonPos + 1;
     95     }
     96 }
     97 
     98 struct MyHandler : public AHandler {
     99     enum {
    100         kWhatConnected                  = 'conn',
    101         kWhatDisconnected               = 'disc',
    102         kWhatSeekDone                   = 'sdon',
    103 
    104         kWhatAccessUnit                 = 'accU',
    105         kWhatEOS                        = 'eos!',
    106         kWhatSeekDiscontinuity          = 'seeD',
    107         kWhatNormalPlayTimeMapping      = 'nptM',
    108     };
    109 
    110     MyHandler(
    111             const char *url,
    112             const sp<AMessage> &notify,
    113             bool uidValid = false, uid_t uid = 0)
    114         : mNotify(notify),
    115           mUIDValid(uidValid),
    116           mUID(uid),
    117           mNetLooper(new ALooper),
    118           mConn(new ARTSPConnection(mUIDValid, mUID)),
    119           mRTPConn(new ARTPConnection),
    120           mOriginalSessionURL(url),
    121           mSessionURL(url),
    122           mSetupTracksSuccessful(false),
    123           mSeekPending(false),
    124           mFirstAccessUnit(true),
    125           mAllTracksHaveTime(false),
    126           mNTPAnchorUs(-1),
    127           mMediaAnchorUs(-1),
    128           mLastMediaTimeUs(0),
    129           mNumAccessUnitsReceived(0),
    130           mCheckPending(false),
    131           mCheckGeneration(0),
    132           mTryTCPInterleaving(false),
    133           mTryFakeRTCP(false),
    134           mReceivedFirstRTCPPacket(false),
    135           mReceivedFirstRTPPacket(false),
    136           mSeekable(false),
    137           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
    138           mKeepAliveGeneration(0) {
    139         mNetLooper->setName("rtsp net");
    140         mNetLooper->start(false /* runOnCallingThread */,
    141                           false /* canCallJava */,
    142                           PRIORITY_HIGHEST);
    143 
    144         // Strip any authentication info from the session url, we don't
    145         // want to transmit user/pass in cleartext.
    146         AString host, path, user, pass;
    147         unsigned port;
    148         CHECK(ARTSPConnection::ParseURL(
    149                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    150 
    151         if (user.size() > 0) {
    152             mSessionURL.clear();
    153             mSessionURL.append("rtsp://");
    154             mSessionURL.append(host);
    155             mSessionURL.append(":");
    156             mSessionURL.append(StringPrintf("%u", port));
    157             mSessionURL.append(path);
    158 
    159             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
    160         }
    161 
    162         mSessionHost = host;
    163     }
    164 
    165     void connect() {
    166         looper()->registerHandler(mConn);
    167         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    168 
    169         sp<AMessage> notify = new AMessage('biny', id());
    170         mConn->observeBinaryData(notify);
    171 
    172         sp<AMessage> reply = new AMessage('conn', id());
    173         mConn->connect(mOriginalSessionURL.c_str(), reply);
    174     }
    175 
    176     void disconnect() {
    177         (new AMessage('abor', id()))->post();
    178     }
    179 
    180     void seek(int64_t timeUs) {
    181         sp<AMessage> msg = new AMessage('seek', id());
    182         msg->setInt64("time", timeUs);
    183         msg->post();
    184     }
    185 
    186     static void addRR(const sp<ABuffer> &buf) {
    187         uint8_t *ptr = buf->data() + buf->size();
    188         ptr[0] = 0x80 | 0;
    189         ptr[1] = 201;  // RR
    190         ptr[2] = 0;
    191         ptr[3] = 1;
    192         ptr[4] = 0xde;  // SSRC
    193         ptr[5] = 0xad;
    194         ptr[6] = 0xbe;
    195         ptr[7] = 0xef;
    196 
    197         buf->setRange(0, buf->size() + 8);
    198     }
    199 
    200     static void addSDES(int s, const sp<ABuffer> &buffer) {
    201         struct sockaddr_in addr;
    202         socklen_t addrSize = sizeof(addr);
    203         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
    204 
    205         uint8_t *data = buffer->data() + buffer->size();
    206         data[0] = 0x80 | 1;
    207         data[1] = 202;  // SDES
    208         data[4] = 0xde;  // SSRC
    209         data[5] = 0xad;
    210         data[6] = 0xbe;
    211         data[7] = 0xef;
    212 
    213         size_t offset = 8;
    214 
    215         data[offset++] = 1;  // CNAME
    216 
    217         AString cname = "stagefright@";
    218         cname.append(inet_ntoa(addr.sin_addr));
    219         data[offset++] = cname.size();
    220 
    221         memcpy(&data[offset], cname.c_str(), cname.size());
    222         offset += cname.size();
    223 
    224         data[offset++] = 6;  // TOOL
    225 
    226         AString tool;
    227         MakeUserAgentString(&tool);
    228 
    229         data[offset++] = tool.size();
    230 
    231         memcpy(&data[offset], tool.c_str(), tool.size());
    232         offset += tool.size();
    233 
    234         data[offset++] = 0;
    235 
    236         if ((offset % 4) > 0) {
    237             size_t count = 4 - (offset % 4);
    238             switch (count) {
    239                 case 3:
    240                     data[offset++] = 0;
    241                 case 2:
    242                     data[offset++] = 0;
    243                 case 1:
    244                     data[offset++] = 0;
    245             }
    246         }
    247 
    248         size_t numWords = (offset / 4) - 1;
    249         data[2] = numWords >> 8;
    250         data[3] = numWords & 0xff;
    251 
    252         buffer->setRange(buffer->offset(), buffer->size() + offset);
    253     }
    254 
    255     // In case we're behind NAT, fire off two UDP packets to the remote
    256     // rtp/rtcp ports to poke a hole into the firewall for future incoming
    257     // packets. We're going to send an RR/SDES RTCP packet to both of them.
    258     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
    259         struct sockaddr_in addr;
    260         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    261         addr.sin_family = AF_INET;
    262 
    263         AString source;
    264         AString server_port;
    265         if (!GetAttribute(transport.c_str(),
    266                           "source",
    267                           &source)) {
    268             ALOGW("Missing 'source' field in Transport response. Using "
    269                  "RTSP endpoint address.");
    270 
    271             struct hostent *ent = gethostbyname(mSessionHost.c_str());
    272             if (ent == NULL) {
    273                 ALOGE("Failed to look up address of session host '%s'",
    274                      mSessionHost.c_str());
    275 
    276                 return false;
    277             }
    278 
    279             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    280         } else {
    281             addr.sin_addr.s_addr = inet_addr(source.c_str());
    282         }
    283 
    284         if (!GetAttribute(transport.c_str(),
    285                                  "server_port",
    286                                  &server_port)) {
    287             ALOGI("Missing 'server_port' field in Transport response.");
    288             return false;
    289         }
    290 
    291         int rtpPort, rtcpPort;
    292         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
    293                 || rtpPort <= 0 || rtpPort > 65535
    294                 || rtcpPort <=0 || rtcpPort > 65535
    295                 || rtcpPort != rtpPort + 1) {
    296             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
    297                  " RTP port must be even, RTCP port must be one higher.",
    298                  server_port.c_str());
    299 
    300             return false;
    301         }
    302 
    303         if (rtpPort & 1) {
    304             ALOGW("Server picked an odd RTP port, it should've picked an "
    305                  "even one, we'll let it pass for now, but this may break "
    306                  "in the future.");
    307         }
    308 
    309         if (addr.sin_addr.s_addr == INADDR_NONE) {
    310             return true;
    311         }
    312 
    313         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
    314             // No firewalls to traverse on the loopback interface.
    315             return true;
    316         }
    317 
    318         // Make up an RR/SDES RTCP packet.
    319         sp<ABuffer> buf = new ABuffer(65536);
    320         buf->setRange(0, 0);
    321         addRR(buf);
    322         addSDES(rtpSocket, buf);
    323 
    324         addr.sin_port = htons(rtpPort);
    325 
    326         ssize_t n = sendto(
    327                 rtpSocket, buf->data(), buf->size(), 0,
    328                 (const sockaddr *)&addr, sizeof(addr));
    329 
    330         if (n < (ssize_t)buf->size()) {
    331             ALOGE("failed to poke a hole for RTP packets");
    332             return false;
    333         }
    334 
    335         addr.sin_port = htons(rtcpPort);
    336 
    337         n = sendto(
    338                 rtcpSocket, buf->data(), buf->size(), 0,
    339                 (const sockaddr *)&addr, sizeof(addr));
    340 
    341         if (n < (ssize_t)buf->size()) {
    342             ALOGE("failed to poke a hole for RTCP packets");
    343             return false;
    344         }
    345 
    346         ALOGV("successfully poked holes.");
    347 
    348         return true;
    349     }
    350 
    351     virtual void onMessageReceived(const sp<AMessage> &msg) {
    352         switch (msg->what()) {
    353             case 'conn':
    354             {
    355                 int32_t result;
    356                 CHECK(msg->findInt32("result", &result));
    357 
    358                 ALOGI("connection request completed with result %d (%s)",
    359                      result, strerror(-result));
    360 
    361                 if (result == OK) {
    362                     AString request;
    363                     request = "DESCRIBE ";
    364                     request.append(mSessionURL);
    365                     request.append(" RTSP/1.0\r\n");
    366                     request.append("Accept: application/sdp\r\n");
    367                     request.append("\r\n");
    368 
    369                     sp<AMessage> reply = new AMessage('desc', id());
    370                     mConn->sendRequest(request.c_str(), reply);
    371                 } else {
    372                     (new AMessage('disc', id()))->post();
    373                 }
    374                 break;
    375             }
    376 
    377             case 'disc':
    378             {
    379                 ++mKeepAliveGeneration;
    380 
    381                 int32_t reconnect;
    382                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    383                     sp<AMessage> reply = new AMessage('conn', id());
    384                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    385                 } else {
    386                     (new AMessage('quit', id()))->post();
    387                 }
    388                 break;
    389             }
    390 
    391             case 'desc':
    392             {
    393                 int32_t result;
    394                 CHECK(msg->findInt32("result", &result));
    395 
    396                 ALOGI("DESCRIBE completed with result %d (%s)",
    397                      result, strerror(-result));
    398 
    399                 if (result == OK) {
    400                     sp<RefBase> obj;
    401                     CHECK(msg->findObject("response", &obj));
    402                     sp<ARTSPResponse> response =
    403                         static_cast<ARTSPResponse *>(obj.get());
    404 
    405                     if (response->mStatusCode == 302) {
    406                         ssize_t i = response->mHeaders.indexOfKey("location");
    407                         CHECK_GE(i, 0);
    408 
    409                         mSessionURL = response->mHeaders.valueAt(i);
    410 
    411                         AString request;
    412                         request = "DESCRIBE ";
    413                         request.append(mSessionURL);
    414                         request.append(" RTSP/1.0\r\n");
    415                         request.append("Accept: application/sdp\r\n");
    416                         request.append("\r\n");
    417 
    418                         sp<AMessage> reply = new AMessage('desc', id());
    419                         mConn->sendRequest(request.c_str(), reply);
    420                         break;
    421                     }
    422 
    423                     if (response->mStatusCode != 200) {
    424                         result = UNKNOWN_ERROR;
    425                     } else {
    426                         mSessionDesc = new ASessionDescription;
    427 
    428                         mSessionDesc->setTo(
    429                                 response->mContent->data(),
    430                                 response->mContent->size());
    431 
    432                         if (!mSessionDesc->isValid()) {
    433                             ALOGE("Failed to parse session description.");
    434                             result = ERROR_MALFORMED;
    435                         } else {
    436                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    437                             if (i >= 0) {
    438                                 mBaseURL = response->mHeaders.valueAt(i);
    439                             } else {
    440                                 i = response->mHeaders.indexOfKey("content-location");
    441                                 if (i >= 0) {
    442                                     mBaseURL = response->mHeaders.valueAt(i);
    443                                 } else {
    444                                     mBaseURL = mSessionURL;
    445                                 }
    446                             }
    447 
    448                             if (!mBaseURL.startsWith("rtsp://")) {
    449                                 // Some misbehaving servers specify a relative
    450                                 // URL in one of the locations above, combine
    451                                 // it with the absolute session URL to get
    452                                 // something usable...
    453 
    454                                 ALOGW("Server specified a non-absolute base URL"
    455                                      ", combining it with the session URL to "
    456                                      "get something usable...");
    457 
    458                                 AString tmp;
    459                                 CHECK(MakeURL(
    460                                             mSessionURL.c_str(),
    461                                             mBaseURL.c_str(),
    462                                             &tmp));
    463 
    464                                 mBaseURL = tmp;
    465                             }
    466 
    467                             if (mSessionDesc->countTracks() < 2) {
    468                                 // There's no actual tracks in this session.
    469                                 // The first "track" is merely session meta
    470                                 // data.
    471 
    472                                 ALOGW("Session doesn't contain any playable "
    473                                      "tracks. Aborting.");
    474                                 result = ERROR_UNSUPPORTED;
    475                             } else {
    476                                 setupTrack(1);
    477                             }
    478                         }
    479                     }
    480                 }
    481 
    482                 if (result != OK) {
    483                     sp<AMessage> reply = new AMessage('disc', id());
    484                     mConn->disconnect(reply);
    485                 }
    486                 break;
    487             }
    488 
    489             case 'setu':
    490             {
    491                 size_t index;
    492                 CHECK(msg->findSize("index", &index));
    493 
    494                 TrackInfo *track = NULL;
    495                 size_t trackIndex;
    496                 if (msg->findSize("track-index", &trackIndex)) {
    497                     track = &mTracks.editItemAt(trackIndex);
    498                 }
    499 
    500                 int32_t result;
    501                 CHECK(msg->findInt32("result", &result));
    502 
    503                 ALOGI("SETUP(%d) completed with result %d (%s)",
    504                      index, result, strerror(-result));
    505 
    506                 if (result == OK) {
    507                     CHECK(track != NULL);
    508 
    509                     sp<RefBase> obj;
    510                     CHECK(msg->findObject("response", &obj));
    511                     sp<ARTSPResponse> response =
    512                         static_cast<ARTSPResponse *>(obj.get());
    513 
    514                     if (response->mStatusCode != 200) {
    515                         result = UNKNOWN_ERROR;
    516                     } else {
    517                         ssize_t i = response->mHeaders.indexOfKey("session");
    518                         CHECK_GE(i, 0);
    519 
    520                         mSessionID = response->mHeaders.valueAt(i);
    521 
    522                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    523                         AString timeoutStr;
    524                         if (GetAttribute(
    525                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
    526                             char *end;
    527                             unsigned long timeoutSecs =
    528                                 strtoul(timeoutStr.c_str(), &end, 10);
    529 
    530                             if (end == timeoutStr.c_str() || *end != '\0') {
    531                                 ALOGW("server specified malformed timeout '%s'",
    532                                      timeoutStr.c_str());
    533 
    534                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    535                             } else if (timeoutSecs < 15) {
    536                                 ALOGW("server specified too short a timeout "
    537                                      "(%lu secs), using default.",
    538                                      timeoutSecs);
    539 
    540                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    541                             } else {
    542                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
    543 
    544                                 ALOGI("server specified timeout of %lu secs.",
    545                                      timeoutSecs);
    546                             }
    547                         }
    548 
    549                         i = mSessionID.find(";");
    550                         if (i >= 0) {
    551                             // Remove options, i.e. ";timeout=90"
    552                             mSessionID.erase(i, mSessionID.size() - i);
    553                         }
    554 
    555                         sp<AMessage> notify = new AMessage('accu', id());
    556                         notify->setSize("track-index", trackIndex);
    557 
    558                         i = response->mHeaders.indexOfKey("transport");
    559                         CHECK_GE(i, 0);
    560 
    561                         if (!track->mUsingInterleavedTCP) {
    562                             AString transport = response->mHeaders.valueAt(i);
    563 
    564                             // We are going to continue even if we were
    565                             // unable to poke a hole into the firewall...
    566                             pokeAHole(
    567                                     track->mRTPSocket,
    568                                     track->mRTCPSocket,
    569                                     transport);
    570                         }
    571 
    572                         mRTPConn->addStream(
    573                                 track->mRTPSocket, track->mRTCPSocket,
    574                                 mSessionDesc, index,
    575                                 notify, track->mUsingInterleavedTCP);
    576 
    577                         mSetupTracksSuccessful = true;
    578                     }
    579                 }
    580 
    581                 if (result != OK) {
    582                     if (track) {
    583                         if (!track->mUsingInterleavedTCP) {
    584                             // Clear the tag
    585                             if (mUIDValid) {
    586                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
    587                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
    588                             }
    589 
    590                             close(track->mRTPSocket);
    591                             close(track->mRTCPSocket);
    592                         }
    593 
    594                         mTracks.removeItemsAt(trackIndex);
    595                     }
    596                 }
    597 
    598                 ++index;
    599                 if (index < mSessionDesc->countTracks()) {
    600                     setupTrack(index);
    601                 } else if (mSetupTracksSuccessful) {
    602                     ++mKeepAliveGeneration;
    603                     postKeepAlive();
    604 
    605                     AString request = "PLAY ";
    606                     request.append(mSessionURL);
    607                     request.append(" RTSP/1.0\r\n");
    608 
    609                     request.append("Session: ");
    610                     request.append(mSessionID);
    611                     request.append("\r\n");
    612 
    613                     request.append("\r\n");
    614 
    615                     sp<AMessage> reply = new AMessage('play', id());
    616                     mConn->sendRequest(request.c_str(), reply);
    617                 } else {
    618                     sp<AMessage> reply = new AMessage('disc', id());
    619                     mConn->disconnect(reply);
    620                 }
    621                 break;
    622             }
    623 
    624             case 'play':
    625             {
    626                 int32_t result;
    627                 CHECK(msg->findInt32("result", &result));
    628 
    629                 ALOGI("PLAY completed with result %d (%s)",
    630                      result, strerror(-result));
    631 
    632                 if (result == OK) {
    633                     sp<RefBase> obj;
    634                     CHECK(msg->findObject("response", &obj));
    635                     sp<ARTSPResponse> response =
    636                         static_cast<ARTSPResponse *>(obj.get());
    637 
    638                     if (response->mStatusCode != 200) {
    639                         result = UNKNOWN_ERROR;
    640                     } else {
    641                         parsePlayResponse(response);
    642 
    643                         sp<AMessage> timeout = new AMessage('tiou', id());
    644                         timeout->post(kStartupTimeoutUs);
    645                     }
    646                 }
    647 
    648                 if (result != OK) {
    649                     sp<AMessage> reply = new AMessage('disc', id());
    650                     mConn->disconnect(reply);
    651                 }
    652 
    653                 break;
    654             }
    655 
    656             case 'aliv':
    657             {
    658                 int32_t generation;
    659                 CHECK(msg->findInt32("generation", &generation));
    660 
    661                 if (generation != mKeepAliveGeneration) {
    662                     // obsolete event.
    663                     break;
    664                 }
    665 
    666                 AString request;
    667                 request.append("OPTIONS ");
    668                 request.append(mSessionURL);
    669                 request.append(" RTSP/1.0\r\n");
    670                 request.append("Session: ");
    671                 request.append(mSessionID);
    672                 request.append("\r\n");
    673                 request.append("\r\n");
    674 
    675                 sp<AMessage> reply = new AMessage('opts', id());
    676                 reply->setInt32("generation", mKeepAliveGeneration);
    677                 mConn->sendRequest(request.c_str(), reply);
    678                 break;
    679             }
    680 
    681             case 'opts':
    682             {
    683                 int32_t result;
    684                 CHECK(msg->findInt32("result", &result));
    685 
    686                 ALOGI("OPTIONS completed with result %d (%s)",
    687                      result, strerror(-result));
    688 
    689                 int32_t generation;
    690                 CHECK(msg->findInt32("generation", &generation));
    691 
    692                 if (generation != mKeepAliveGeneration) {
    693                     // obsolete event.
    694                     break;
    695                 }
    696 
    697                 postKeepAlive();
    698                 break;
    699             }
    700 
    701             case 'abor':
    702             {
    703                 for (size_t i = 0; i < mTracks.size(); ++i) {
    704                     TrackInfo *info = &mTracks.editItemAt(i);
    705 
    706                     if (!mFirstAccessUnit) {
    707                         postQueueEOS(i, ERROR_END_OF_STREAM);
    708                     }
    709 
    710                     if (!info->mUsingInterleavedTCP) {
    711                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    712 
    713                         // Clear the tag
    714                         if (mUIDValid) {
    715                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
    716                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
    717                         }
    718 
    719                         close(info->mRTPSocket);
    720                         close(info->mRTCPSocket);
    721                     }
    722                 }
    723                 mTracks.clear();
    724                 mSetupTracksSuccessful = false;
    725                 mSeekPending = false;
    726                 mFirstAccessUnit = true;
    727                 mAllTracksHaveTime = false;
    728                 mNTPAnchorUs = -1;
    729                 mMediaAnchorUs = -1;
    730                 mNumAccessUnitsReceived = 0;
    731                 mReceivedFirstRTCPPacket = false;
    732                 mReceivedFirstRTPPacket = false;
    733                 mSeekable = false;
    734 
    735                 sp<AMessage> reply = new AMessage('tear', id());
    736 
    737                 int32_t reconnect;
    738                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    739                     reply->setInt32("reconnect", true);
    740                 }
    741 
    742                 AString request;
    743                 request = "TEARDOWN ";
    744 
    745                 // XXX should use aggregate url from SDP here...
    746                 request.append(mSessionURL);
    747                 request.append(" RTSP/1.0\r\n");
    748 
    749                 request.append("Session: ");
    750                 request.append(mSessionID);
    751                 request.append("\r\n");
    752 
    753                 request.append("\r\n");
    754 
    755                 mConn->sendRequest(request.c_str(), reply);
    756                 break;
    757             }
    758 
    759             case 'tear':
    760             {
    761                 int32_t result;
    762                 CHECK(msg->findInt32("result", &result));
    763 
    764                 ALOGI("TEARDOWN completed with result %d (%s)",
    765                      result, strerror(-result));
    766 
    767                 sp<AMessage> reply = new AMessage('disc', id());
    768 
    769                 int32_t reconnect;
    770                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    771                     reply->setInt32("reconnect", true);
    772                 }
    773 
    774                 mConn->disconnect(reply);
    775                 break;
    776             }
    777 
    778             case 'quit':
    779             {
    780                 sp<AMessage> msg = mNotify->dup();
    781                 msg->setInt32("what", kWhatDisconnected);
    782                 msg->setInt32("result", UNKNOWN_ERROR);
    783                 msg->post();
    784                 break;
    785             }
    786 
    787             case 'chek':
    788             {
    789                 int32_t generation;
    790                 CHECK(msg->findInt32("generation", &generation));
    791                 if (generation != mCheckGeneration) {
    792                     // This is an outdated message. Ignore.
    793                     break;
    794                 }
    795 
    796                 if (mNumAccessUnitsReceived == 0) {
    797 #if 1
    798                     ALOGI("stream ended? aborting.");
    799                     (new AMessage('abor', id()))->post();
    800                     break;
    801 #else
    802                     ALOGI("haven't seen an AU in a looong time.");
    803 #endif
    804                 }
    805 
    806                 mNumAccessUnitsReceived = 0;
    807                 msg->post(kAccessUnitTimeoutUs);
    808                 break;
    809             }
    810 
    811             case 'accu':
    812             {
    813                 int32_t timeUpdate;
    814                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
    815                     size_t trackIndex;
    816                     CHECK(msg->findSize("track-index", &trackIndex));
    817 
    818                     uint32_t rtpTime;
    819                     uint64_t ntpTime;
    820                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
    821                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
    822 
    823                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
    824                     break;
    825                 }
    826 
    827                 int32_t first;
    828                 if (msg->findInt32("first-rtcp", &first)) {
    829                     mReceivedFirstRTCPPacket = true;
    830                     break;
    831                 }
    832 
    833                 if (msg->findInt32("first-rtp", &first)) {
    834                     mReceivedFirstRTPPacket = true;
    835                     break;
    836                 }
    837 
    838                 ++mNumAccessUnitsReceived;
    839                 postAccessUnitTimeoutCheck();
    840 
    841                 size_t trackIndex;
    842                 CHECK(msg->findSize("track-index", &trackIndex));
    843 
    844                 if (trackIndex >= mTracks.size()) {
    845                     ALOGV("late packets ignored.");
    846                     break;
    847                 }
    848 
    849                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
    850 
    851                 int32_t eos;
    852                 if (msg->findInt32("eos", &eos)) {
    853                     ALOGI("received BYE on track index %d", trackIndex);
    854 #if 0
    855                     track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
    856 #endif
    857                     return;
    858                 }
    859 
    860                 sp<ABuffer> accessUnit;
    861                 CHECK(msg->findBuffer("access-unit", &accessUnit));
    862 
    863                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
    864 
    865                 if (mSeekPending) {
    866                     ALOGV("we're seeking, dropping stale packet.");
    867                     break;
    868                 }
    869 
    870                 if (seqNum < track->mFirstSeqNumInSegment) {
    871                     ALOGV("dropping stale access-unit (%d < %d)",
    872                          seqNum, track->mFirstSeqNumInSegment);
    873                     break;
    874                 }
    875 
    876                 if (track->mNewSegment) {
    877                     track->mNewSegment = false;
    878                 }
    879 
    880                 onAccessUnitComplete(trackIndex, accessUnit);
    881                 break;
    882             }
    883 
    884             case 'seek':
    885             {
    886                 if (!mSeekable) {
    887                     ALOGW("This is a live stream, ignoring seek request.");
    888 
    889                     sp<AMessage> msg = mNotify->dup();
    890                     msg->setInt32("what", kWhatSeekDone);
    891                     msg->post();
    892                     break;
    893                 }
    894 
    895                 int64_t timeUs;
    896                 CHECK(msg->findInt64("time", &timeUs));
    897 
    898                 mSeekPending = true;
    899 
    900                 // Disable the access unit timeout until we resumed
    901                 // playback again.
    902                 mCheckPending = true;
    903                 ++mCheckGeneration;
    904 
    905                 AString request = "PAUSE ";
    906                 request.append(mSessionURL);
    907                 request.append(" RTSP/1.0\r\n");
    908 
    909                 request.append("Session: ");
    910                 request.append(mSessionID);
    911                 request.append("\r\n");
    912 
    913                 request.append("\r\n");
    914 
    915                 sp<AMessage> reply = new AMessage('see1', id());
    916                 reply->setInt64("time", timeUs);
    917                 mConn->sendRequest(request.c_str(), reply);
    918                 break;
    919             }
    920 
    921             case 'see1':
    922             {
    923                 // Session is paused now.
    924                 for (size_t i = 0; i < mTracks.size(); ++i) {
    925                     TrackInfo *info = &mTracks.editItemAt(i);
    926 
    927                     postQueueSeekDiscontinuity(i);
    928 
    929                     info->mRTPAnchor = 0;
    930                     info->mNTPAnchorUs = -1;
    931                 }
    932 
    933                 mAllTracksHaveTime = false;
    934                 mNTPAnchorUs = -1;
    935 
    936                 int64_t timeUs;
    937                 CHECK(msg->findInt64("time", &timeUs));
    938 
    939                 AString request = "PLAY ";
    940                 request.append(mSessionURL);
    941                 request.append(" RTSP/1.0\r\n");
    942 
    943                 request.append("Session: ");
    944                 request.append(mSessionID);
    945                 request.append("\r\n");
    946 
    947                 request.append(
    948                         StringPrintf(
    949                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
    950 
    951                 request.append("\r\n");
    952 
    953                 sp<AMessage> reply = new AMessage('see2', id());
    954                 mConn->sendRequest(request.c_str(), reply);
    955                 break;
    956             }
    957 
    958             case 'see2':
    959             {
    960                 CHECK(mSeekPending);
    961 
    962                 int32_t result;
    963                 CHECK(msg->findInt32("result", &result));
    964 
    965                 ALOGI("PLAY completed with result %d (%s)",
    966                      result, strerror(-result));
    967 
    968                 mCheckPending = false;
    969                 postAccessUnitTimeoutCheck();
    970 
    971                 if (result == OK) {
    972                     sp<RefBase> obj;
    973                     CHECK(msg->findObject("response", &obj));
    974                     sp<ARTSPResponse> response =
    975                         static_cast<ARTSPResponse *>(obj.get());
    976 
    977                     if (response->mStatusCode != 200) {
    978                         result = UNKNOWN_ERROR;
    979                     } else {
    980                         parsePlayResponse(response);
    981 
    982                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
    983                         CHECK_GE(i, 0);
    984 
    985                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
    986 
    987                         ALOGI("seek completed.");
    988                     }
    989                 }
    990 
    991                 if (result != OK) {
    992                     ALOGE("seek failed, aborting.");
    993                     (new AMessage('abor', id()))->post();
    994                 }
    995 
    996                 mSeekPending = false;
    997 
    998                 sp<AMessage> msg = mNotify->dup();
    999                 msg->setInt32("what", kWhatSeekDone);
   1000                 msg->post();
   1001                 break;
   1002             }
   1003 
   1004             case 'biny':
   1005             {
   1006                 sp<ABuffer> buffer;
   1007                 CHECK(msg->findBuffer("buffer", &buffer));
   1008 
   1009                 int32_t index;
   1010                 CHECK(buffer->meta()->findInt32("index", &index));
   1011 
   1012                 mRTPConn->injectPacket(index, buffer);
   1013                 break;
   1014             }
   1015 
   1016             case 'tiou':
   1017             {
   1018                 if (!mReceivedFirstRTCPPacket) {
   1019                     if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
   1020                         ALOGW("We received RTP packets but no RTCP packets, "
   1021                              "using fake timestamps.");
   1022 
   1023                         mTryFakeRTCP = true;
   1024 
   1025                         mReceivedFirstRTCPPacket = true;
   1026 
   1027                         fakeTimestamps();
   1028                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
   1029                         ALOGW("Never received any data, switching transports.");
   1030 
   1031                         mTryTCPInterleaving = true;
   1032 
   1033                         sp<AMessage> msg = new AMessage('abor', id());
   1034                         msg->setInt32("reconnect", true);
   1035                         msg->post();
   1036                     } else {
   1037                         ALOGW("Never received any data, disconnecting.");
   1038                         (new AMessage('abor', id()))->post();
   1039                     }
   1040                 } else {
   1041                     if (!mAllTracksHaveTime) {
   1042                         ALOGW("We received some RTCP packets, but time "
   1043                               "could not be established on all tracks, now "
   1044                               "using fake timestamps");
   1045 
   1046                         fakeTimestamps();
   1047                     }
   1048                 }
   1049                 break;
   1050             }
   1051 
   1052             default:
   1053                 TRESPASS();
   1054                 break;
   1055         }
   1056     }
   1057 
   1058     void postKeepAlive() {
   1059         sp<AMessage> msg = new AMessage('aliv', id());
   1060         msg->setInt32("generation", mKeepAliveGeneration);
   1061         msg->post((mKeepAliveTimeoutUs * 9) / 10);
   1062     }
   1063 
   1064     void postAccessUnitTimeoutCheck() {
   1065         if (mCheckPending) {
   1066             return;
   1067         }
   1068 
   1069         mCheckPending = true;
   1070         sp<AMessage> check = new AMessage('chek', id());
   1071         check->setInt32("generation", mCheckGeneration);
   1072         check->post(kAccessUnitTimeoutUs);
   1073     }
   1074 
   1075     static void SplitString(
   1076             const AString &s, const char *separator, List<AString> *items) {
   1077         items->clear();
   1078         size_t start = 0;
   1079         while (start < s.size()) {
   1080             ssize_t offset = s.find(separator, start);
   1081 
   1082             if (offset < 0) {
   1083                 items->push_back(AString(s, start, s.size() - start));
   1084                 break;
   1085             }
   1086 
   1087             items->push_back(AString(s, start, offset - start));
   1088             start = offset + strlen(separator);
   1089         }
   1090     }
   1091 
   1092     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1093         mSeekable = false;
   1094 
   1095         ssize_t i = response->mHeaders.indexOfKey("range");
   1096         if (i < 0) {
   1097             // Server doesn't even tell use what range it is going to
   1098             // play, therefore we won't support seeking.
   1099             return;
   1100         }
   1101 
   1102         AString range = response->mHeaders.valueAt(i);
   1103         ALOGV("Range: %s", range.c_str());
   1104 
   1105         AString val;
   1106         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1107 
   1108         float npt1, npt2;
   1109         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1110             // This is a live stream and therefore not seekable.
   1111 
   1112             ALOGI("This is a live stream");
   1113             return;
   1114         }
   1115 
   1116         i = response->mHeaders.indexOfKey("rtp-info");
   1117         CHECK_GE(i, 0);
   1118 
   1119         AString rtpInfo = response->mHeaders.valueAt(i);
   1120         List<AString> streamInfos;
   1121         SplitString(rtpInfo, ",", &streamInfos);
   1122 
   1123         int n = 1;
   1124         for (List<AString>::iterator it = streamInfos.begin();
   1125              it != streamInfos.end(); ++it) {
   1126             (*it).trim();
   1127             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1128 
   1129             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1130 
   1131             size_t trackIndex = 0;
   1132             while (trackIndex < mTracks.size()
   1133                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1134                 ++trackIndex;
   1135             }
   1136             CHECK_LT(trackIndex, mTracks.size());
   1137 
   1138             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1139 
   1140             char *end;
   1141             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1142 
   1143             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1144             info->mFirstSeqNumInSegment = seq;
   1145             info->mNewSegment = true;
   1146 
   1147             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1148 
   1149             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1150 
   1151             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1152 
   1153             info->mNormalPlayTimeRTP = rtpTime;
   1154             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
   1155 
   1156             if (!mFirstAccessUnit) {
   1157                 postNormalPlayTimeMapping(
   1158                         trackIndex,
   1159                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1160             }
   1161 
   1162             ++n;
   1163         }
   1164 
   1165         mSeekable = true;
   1166     }
   1167 
   1168     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
   1169         CHECK_GE(index, 0u);
   1170         CHECK_LT(index, mTracks.size());
   1171 
   1172         const TrackInfo &info = mTracks.itemAt(index);
   1173 
   1174         *timeScale = info.mTimeScale;
   1175 
   1176         return info.mPacketSource->getFormat();
   1177     }
   1178 
   1179     size_t countTracks() const {
   1180         return mTracks.size();
   1181     }
   1182 
   1183 private:
   1184     struct TrackInfo {
   1185         AString mURL;
   1186         int mRTPSocket;
   1187         int mRTCPSocket;
   1188         bool mUsingInterleavedTCP;
   1189         uint32_t mFirstSeqNumInSegment;
   1190         bool mNewSegment;
   1191 
   1192         uint32_t mRTPAnchor;
   1193         int64_t mNTPAnchorUs;
   1194         int32_t mTimeScale;
   1195 
   1196         uint32_t mNormalPlayTimeRTP;
   1197         int64_t mNormalPlayTimeUs;
   1198 
   1199         sp<APacketSource> mPacketSource;
   1200 
   1201         // Stores packets temporarily while no notion of time
   1202         // has been established yet.
   1203         List<sp<ABuffer> > mPackets;
   1204     };
   1205 
   1206     sp<AMessage> mNotify;
   1207     bool mUIDValid;
   1208     uid_t mUID;
   1209     sp<ALooper> mNetLooper;
   1210     sp<ARTSPConnection> mConn;
   1211     sp<ARTPConnection> mRTPConn;
   1212     sp<ASessionDescription> mSessionDesc;
   1213     AString mOriginalSessionURL;  // This one still has user:pass@
   1214     AString mSessionURL;
   1215     AString mSessionHost;
   1216     AString mBaseURL;
   1217     AString mSessionID;
   1218     bool mSetupTracksSuccessful;
   1219     bool mSeekPending;
   1220     bool mFirstAccessUnit;
   1221 
   1222     bool mAllTracksHaveTime;
   1223     int64_t mNTPAnchorUs;
   1224     int64_t mMediaAnchorUs;
   1225     int64_t mLastMediaTimeUs;
   1226 
   1227     int64_t mNumAccessUnitsReceived;
   1228     bool mCheckPending;
   1229     int32_t mCheckGeneration;
   1230     bool mTryTCPInterleaving;
   1231     bool mTryFakeRTCP;
   1232     bool mReceivedFirstRTCPPacket;
   1233     bool mReceivedFirstRTPPacket;
   1234     bool mSeekable;
   1235     int64_t mKeepAliveTimeoutUs;
   1236     int32_t mKeepAliveGeneration;
   1237 
   1238     Vector<TrackInfo> mTracks;
   1239 
   1240     void setupTrack(size_t index) {
   1241         sp<APacketSource> source =
   1242             new APacketSource(mSessionDesc, index);
   1243 
   1244         if (source->initCheck() != OK) {
   1245             ALOGW("Unsupported format. Ignoring track #%d.", index);
   1246 
   1247             sp<AMessage> reply = new AMessage('setu', id());
   1248             reply->setSize("index", index);
   1249             reply->setInt32("result", ERROR_UNSUPPORTED);
   1250             reply->post();
   1251             return;
   1252         }
   1253 
   1254         AString url;
   1255         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1256 
   1257         AString trackURL;
   1258         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1259 
   1260         mTracks.push(TrackInfo());
   1261         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1262         info->mURL = trackURL;
   1263         info->mPacketSource = source;
   1264         info->mUsingInterleavedTCP = false;
   1265         info->mFirstSeqNumInSegment = 0;
   1266         info->mNewSegment = true;
   1267         info->mRTPAnchor = 0;
   1268         info->mNTPAnchorUs = -1;
   1269         info->mNormalPlayTimeRTP = 0;
   1270         info->mNormalPlayTimeUs = 0ll;
   1271 
   1272         unsigned long PT;
   1273         AString formatDesc;
   1274         AString formatParams;
   1275         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1276 
   1277         int32_t timescale;
   1278         int32_t numChannels;
   1279         ASessionDescription::ParseFormatDesc(
   1280                 formatDesc.c_str(), &timescale, &numChannels);
   1281 
   1282         info->mTimeScale = timescale;
   1283 
   1284         ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
   1285 
   1286         AString request = "SETUP ";
   1287         request.append(trackURL);
   1288         request.append(" RTSP/1.0\r\n");
   1289 
   1290         if (mTryTCPInterleaving) {
   1291             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1292             info->mUsingInterleavedTCP = true;
   1293             info->mRTPSocket = interleaveIndex;
   1294             info->mRTCPSocket = interleaveIndex + 1;
   1295 
   1296             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1297             request.append(interleaveIndex);
   1298             request.append("-");
   1299             request.append(interleaveIndex + 1);
   1300         } else {
   1301             unsigned rtpPort;
   1302             ARTPConnection::MakePortPair(
   1303                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1304 
   1305             if (mUIDValid) {
   1306                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1307                                                 (uint32_t)*(uint32_t*) "RTP_");
   1308                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1309                                                 (uint32_t)*(uint32_t*) "RTP_");
   1310             }
   1311 
   1312             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1313             request.append(rtpPort);
   1314             request.append("-");
   1315             request.append(rtpPort + 1);
   1316         }
   1317 
   1318         request.append("\r\n");
   1319 
   1320         if (index > 1) {
   1321             request.append("Session: ");
   1322             request.append(mSessionID);
   1323             request.append("\r\n");
   1324         }
   1325 
   1326         request.append("\r\n");
   1327 
   1328         sp<AMessage> reply = new AMessage('setu', id());
   1329         reply->setSize("index", index);
   1330         reply->setSize("track-index", mTracks.size() - 1);
   1331         mConn->sendRequest(request.c_str(), reply);
   1332     }
   1333 
   1334     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1335         out->clear();
   1336 
   1337         if (strncasecmp("rtsp://", baseURL, 7)) {
   1338             // Base URL must be absolute
   1339             return false;
   1340         }
   1341 
   1342         if (!strncasecmp("rtsp://", url, 7)) {
   1343             // "url" is already an absolute URL, ignore base URL.
   1344             out->setTo(url);
   1345             return true;
   1346         }
   1347 
   1348         size_t n = strlen(baseURL);
   1349         if (baseURL[n - 1] == '/') {
   1350             out->setTo(baseURL);
   1351             out->append(url);
   1352         } else {
   1353             const char *slashPos = strrchr(baseURL, '/');
   1354 
   1355             if (slashPos > &baseURL[6]) {
   1356                 out->setTo(baseURL, slashPos - baseURL);
   1357             } else {
   1358                 out->setTo(baseURL);
   1359             }
   1360 
   1361             out->append("/");
   1362             out->append(url);
   1363         }
   1364 
   1365         return true;
   1366     }
   1367 
   1368     void fakeTimestamps() {
   1369         mNTPAnchorUs = -1ll;
   1370         for (size_t i = 0; i < mTracks.size(); ++i) {
   1371             onTimeUpdate(i, 0, 0ll);
   1372         }
   1373     }
   1374 
   1375     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1376         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
   1377              trackIndex, rtpTime, ntpTime);
   1378 
   1379         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1380 
   1381         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1382 
   1383         track->mRTPAnchor = rtpTime;
   1384         track->mNTPAnchorUs = ntpTimeUs;
   1385 
   1386         if (mNTPAnchorUs < 0) {
   1387             mNTPAnchorUs = ntpTimeUs;
   1388             mMediaAnchorUs = mLastMediaTimeUs;
   1389         }
   1390 
   1391         if (!mAllTracksHaveTime) {
   1392             bool allTracksHaveTime = true;
   1393             for (size_t i = 0; i < mTracks.size(); ++i) {
   1394                 TrackInfo *track = &mTracks.editItemAt(i);
   1395                 if (track->mNTPAnchorUs < 0) {
   1396                     allTracksHaveTime = false;
   1397                     break;
   1398                 }
   1399             }
   1400             if (allTracksHaveTime) {
   1401                 mAllTracksHaveTime = true;
   1402                 ALOGI("Time now established for all tracks.");
   1403             }
   1404         }
   1405     }
   1406 
   1407     void onAccessUnitComplete(
   1408             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1409         ALOGV("onAccessUnitComplete track %d", trackIndex);
   1410 
   1411         if (mFirstAccessUnit) {
   1412             sp<AMessage> msg = mNotify->dup();
   1413             msg->setInt32("what", kWhatConnected);
   1414             msg->post();
   1415 
   1416             if (mSeekable) {
   1417                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1418                     TrackInfo *info = &mTracks.editItemAt(i);
   1419 
   1420                     postNormalPlayTimeMapping(
   1421                             i,
   1422                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1423                 }
   1424             }
   1425 
   1426             mFirstAccessUnit = false;
   1427         }
   1428 
   1429         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1430 
   1431         if (!mAllTracksHaveTime) {
   1432             ALOGV("storing accessUnit, no time established yet");
   1433             track->mPackets.push_back(accessUnit);
   1434             return;
   1435         }
   1436 
   1437         while (!track->mPackets.empty()) {
   1438             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1439             track->mPackets.erase(track->mPackets.begin());
   1440 
   1441             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1442                 postQueueAccessUnit(trackIndex, accessUnit);
   1443             }
   1444         }
   1445 
   1446         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1447             postQueueAccessUnit(trackIndex, accessUnit);
   1448         }
   1449     }
   1450 
   1451     bool addMediaTimestamp(
   1452             int32_t trackIndex, const TrackInfo *track,
   1453             const sp<ABuffer> &accessUnit) {
   1454         uint32_t rtpTime;
   1455         CHECK(accessUnit->meta()->findInt32(
   1456                     "rtp-time", (int32_t *)&rtpTime));
   1457 
   1458         int64_t relRtpTimeUs =
   1459             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1460                 / track->mTimeScale;
   1461 
   1462         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1463 
   1464         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1465 
   1466         if (mediaTimeUs > mLastMediaTimeUs) {
   1467             mLastMediaTimeUs = mediaTimeUs;
   1468         }
   1469 
   1470         if (mediaTimeUs < 0) {
   1471             ALOGV("dropping early accessUnit.");
   1472             return false;
   1473         }
   1474 
   1475         ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
   1476              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
   1477 
   1478         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1479 
   1480         return true;
   1481     }
   1482 
   1483     void postQueueAccessUnit(
   1484             size_t trackIndex, const sp<ABuffer> &accessUnit) {
   1485         sp<AMessage> msg = mNotify->dup();
   1486         msg->setInt32("what", kWhatAccessUnit);
   1487         msg->setSize("trackIndex", trackIndex);
   1488         msg->setBuffer("accessUnit", accessUnit);
   1489         msg->post();
   1490     }
   1491 
   1492     void postQueueEOS(size_t trackIndex, status_t finalResult) {
   1493         sp<AMessage> msg = mNotify->dup();
   1494         msg->setInt32("what", kWhatEOS);
   1495         msg->setSize("trackIndex", trackIndex);
   1496         msg->setInt32("finalResult", finalResult);
   1497         msg->post();
   1498     }
   1499 
   1500     void postQueueSeekDiscontinuity(size_t trackIndex) {
   1501         sp<AMessage> msg = mNotify->dup();
   1502         msg->setInt32("what", kWhatSeekDiscontinuity);
   1503         msg->setSize("trackIndex", trackIndex);
   1504         msg->post();
   1505     }
   1506 
   1507     void postNormalPlayTimeMapping(
   1508             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
   1509         sp<AMessage> msg = mNotify->dup();
   1510         msg->setInt32("what", kWhatNormalPlayTimeMapping);
   1511         msg->setSize("trackIndex", trackIndex);
   1512         msg->setInt32("rtpTime", rtpTime);
   1513         msg->setInt64("nptUs", nptUs);
   1514         msg->post();
   1515     }
   1516 
   1517     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1518 };
   1519 
   1520 }  // namespace android
   1521 
   1522 #endif  // MY_HANDLER_H_
   1523