Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 #define LOG_TAG "MyHandler"
     23 #include <utils/Log.h>
     24 
     25 #include "APacketSource.h"
     26 #include "ARTPConnection.h"
     27 #include "ARTSPConnection.h"
     28 #include "ASessionDescription.h"
     29 
     30 #include <ctype.h>
     31 
     32 #include <media/stagefright/foundation/ABuffer.h>
     33 #include <media/stagefright/foundation/ADebug.h>
     34 #include <media/stagefright/foundation/ALooper.h>
     35 #include <media/stagefright/foundation/AMessage.h>
     36 #include <media/stagefright/MediaErrors.h>
     37 #include <media/stagefright/Utils.h>
     38 
     39 #include <arpa/inet.h>
     40 #include <sys/socket.h>
     41 #include <netdb.h>
     42 
     43 #include "HTTPBase.h"
     44 
     45 // If no access units are received within 5 secs, assume that the rtp
     46 // stream has ended and signal end of stream.
     47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
     48 
     49 // If no access units arrive for the first 10 secs after starting the
     50 // stream, assume none ever will and signal EOS or switch transports.
     51 static int64_t kStartupTimeoutUs = 10000000ll;
     52 
     53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
     54 
     55 static int64_t kPauseDelayUs = 3000000ll;
     56 
     57 namespace android {
     58 
     59 static bool GetAttribute(const char *s, const char *key, AString *value) {
     60     value->clear();
     61 
     62     size_t keyLen = strlen(key);
     63 
     64     for (;;) {
     65         while (isspace(*s)) {
     66             ++s;
     67         }
     68 
     69         const char *colonPos = strchr(s, ';');
     70 
     71         size_t len =
     72             (colonPos == NULL) ? strlen(s) : colonPos - s;
     73 
     74         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     75             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     76             return true;
     77         }
     78 
     79         if (colonPos == NULL) {
     80             return false;
     81         }
     82 
     83         s = colonPos + 1;
     84     }
     85 }
     86 
     87 struct MyHandler : public AHandler {
     88     enum {
     89         kWhatConnected                  = 'conn',
     90         kWhatDisconnected               = 'disc',
     91         kWhatSeekDone                   = 'sdon',
     92 
     93         kWhatAccessUnit                 = 'accU',
     94         kWhatEOS                        = 'eos!',
     95         kWhatSeekDiscontinuity          = 'seeD',
     96         kWhatNormalPlayTimeMapping      = 'nptM',
     97     };
     98 
     99     MyHandler(
    100             const char *url,
    101             const sp<AMessage> &notify,
    102             bool uidValid = false, uid_t uid = 0)
    103         : mNotify(notify),
    104           mUIDValid(uidValid),
    105           mUID(uid),
    106           mNetLooper(new ALooper),
    107           mConn(new ARTSPConnection(mUIDValid, mUID)),
    108           mRTPConn(new ARTPConnection),
    109           mOriginalSessionURL(url),
    110           mSessionURL(url),
    111           mSetupTracksSuccessful(false),
    112           mSeekPending(false),
    113           mFirstAccessUnit(true),
    114           mAllTracksHaveTime(false),
    115           mNTPAnchorUs(-1),
    116           mMediaAnchorUs(-1),
    117           mLastMediaTimeUs(0),
    118           mNumAccessUnitsReceived(0),
    119           mCheckPending(false),
    120           mCheckGeneration(0),
    121           mCheckTimeoutGeneration(0),
    122           mTryTCPInterleaving(false),
    123           mTryFakeRTCP(false),
    124           mReceivedFirstRTCPPacket(false),
    125           mReceivedFirstRTPPacket(false),
    126           mSeekable(true),
    127           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
    128           mKeepAliveGeneration(0),
    129           mPausing(false),
    130           mPauseGeneration(0) {
    131         mNetLooper->setName("rtsp net");
    132         mNetLooper->start(false /* runOnCallingThread */,
    133                           false /* canCallJava */,
    134                           PRIORITY_HIGHEST);
    135 
    136         // Strip any authentication info from the session url, we don't
    137         // want to transmit user/pass in cleartext.
    138         AString host, path, user, pass;
    139         unsigned port;
    140         CHECK(ARTSPConnection::ParseURL(
    141                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    142 
    143         if (user.size() > 0) {
    144             mSessionURL.clear();
    145             mSessionURL.append("rtsp://");
    146             mSessionURL.append(host);
    147             mSessionURL.append(":");
    148             mSessionURL.append(StringPrintf("%u", port));
    149             mSessionURL.append(path);
    150 
    151             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
    152         }
    153 
    154         mSessionHost = host;
    155     }
    156 
    157     void connect() {
    158         looper()->registerHandler(mConn);
    159         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    160 
    161         sp<AMessage> notify = new AMessage('biny', id());
    162         mConn->observeBinaryData(notify);
    163 
    164         sp<AMessage> reply = new AMessage('conn', id());
    165         mConn->connect(mOriginalSessionURL.c_str(), reply);
    166     }
    167 
    168     void loadSDP(const sp<ASessionDescription>& desc) {
    169         looper()->registerHandler(mConn);
    170         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    171 
    172         sp<AMessage> notify = new AMessage('biny', id());
    173         mConn->observeBinaryData(notify);
    174 
    175         sp<AMessage> reply = new AMessage('sdpl', id());
    176         reply->setObject("description", desc);
    177         mConn->connect(mOriginalSessionURL.c_str(), reply);
    178     }
    179 
    180     AString getControlURL(sp<ASessionDescription> desc) {
    181         AString sessionLevelControlURL;
    182         if (mSessionDesc->findAttribute(
    183                 0,
    184                 "a=control",
    185                 &sessionLevelControlURL)) {
    186             if (sessionLevelControlURL.compare("*") == 0) {
    187                 return mBaseURL;
    188             } else {
    189                 AString controlURL;
    190                 CHECK(MakeURL(
    191                         mBaseURL.c_str(),
    192                         sessionLevelControlURL.c_str(),
    193                         &controlURL));
    194                 return controlURL;
    195             }
    196         } else {
    197             return mSessionURL;
    198         }
    199     }
    200 
    201     void disconnect() {
    202         (new AMessage('abor', id()))->post();
    203     }
    204 
    205     void seek(int64_t timeUs) {
    206         sp<AMessage> msg = new AMessage('seek', id());
    207         msg->setInt64("time", timeUs);
    208         mPauseGeneration++;
    209         msg->post();
    210     }
    211 
    212     bool isSeekable() const {
    213         return mSeekable;
    214     }
    215 
    216     void pause() {
    217         sp<AMessage> msg = new AMessage('paus', id());
    218         mPauseGeneration++;
    219         msg->setInt32("pausecheck", mPauseGeneration);
    220         msg->post(kPauseDelayUs);
    221     }
    222 
    223     void resume() {
    224         sp<AMessage> msg = new AMessage('resu', id());
    225         mPauseGeneration++;
    226         msg->post();
    227     }
    228 
    229     static void addRR(const sp<ABuffer> &buf) {
    230         uint8_t *ptr = buf->data() + buf->size();
    231         ptr[0] = 0x80 | 0;
    232         ptr[1] = 201;  // RR
    233         ptr[2] = 0;
    234         ptr[3] = 1;
    235         ptr[4] = 0xde;  // SSRC
    236         ptr[5] = 0xad;
    237         ptr[6] = 0xbe;
    238         ptr[7] = 0xef;
    239 
    240         buf->setRange(0, buf->size() + 8);
    241     }
    242 
    243     static void addSDES(int s, const sp<ABuffer> &buffer) {
    244         struct sockaddr_in addr;
    245         socklen_t addrSize = sizeof(addr);
    246         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
    247 
    248         uint8_t *data = buffer->data() + buffer->size();
    249         data[0] = 0x80 | 1;
    250         data[1] = 202;  // SDES
    251         data[4] = 0xde;  // SSRC
    252         data[5] = 0xad;
    253         data[6] = 0xbe;
    254         data[7] = 0xef;
    255 
    256         size_t offset = 8;
    257 
    258         data[offset++] = 1;  // CNAME
    259 
    260         AString cname = "stagefright@";
    261         cname.append(inet_ntoa(addr.sin_addr));
    262         data[offset++] = cname.size();
    263 
    264         memcpy(&data[offset], cname.c_str(), cname.size());
    265         offset += cname.size();
    266 
    267         data[offset++] = 6;  // TOOL
    268 
    269         AString tool = MakeUserAgent();
    270 
    271         data[offset++] = tool.size();
    272 
    273         memcpy(&data[offset], tool.c_str(), tool.size());
    274         offset += tool.size();
    275 
    276         data[offset++] = 0;
    277 
    278         if ((offset % 4) > 0) {
    279             size_t count = 4 - (offset % 4);
    280             switch (count) {
    281                 case 3:
    282                     data[offset++] = 0;
    283                 case 2:
    284                     data[offset++] = 0;
    285                 case 1:
    286                     data[offset++] = 0;
    287             }
    288         }
    289 
    290         size_t numWords = (offset / 4) - 1;
    291         data[2] = numWords >> 8;
    292         data[3] = numWords & 0xff;
    293 
    294         buffer->setRange(buffer->offset(), buffer->size() + offset);
    295     }
    296 
    297     // In case we're behind NAT, fire off two UDP packets to the remote
    298     // rtp/rtcp ports to poke a hole into the firewall for future incoming
    299     // packets. We're going to send an RR/SDES RTCP packet to both of them.
    300     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
    301         struct sockaddr_in addr;
    302         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    303         addr.sin_family = AF_INET;
    304 
    305         AString source;
    306         AString server_port;
    307         if (!GetAttribute(transport.c_str(),
    308                           "source",
    309                           &source)) {
    310             ALOGW("Missing 'source' field in Transport response. Using "
    311                  "RTSP endpoint address.");
    312 
    313             struct hostent *ent = gethostbyname(mSessionHost.c_str());
    314             if (ent == NULL) {
    315                 ALOGE("Failed to look up address of session host '%s'",
    316                      mSessionHost.c_str());
    317 
    318                 return false;
    319             }
    320 
    321             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    322         } else {
    323             addr.sin_addr.s_addr = inet_addr(source.c_str());
    324         }
    325 
    326         if (!GetAttribute(transport.c_str(),
    327                                  "server_port",
    328                                  &server_port)) {
    329             ALOGI("Missing 'server_port' field in Transport response.");
    330             return false;
    331         }
    332 
    333         int rtpPort, rtcpPort;
    334         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
    335                 || rtpPort <= 0 || rtpPort > 65535
    336                 || rtcpPort <=0 || rtcpPort > 65535
    337                 || rtcpPort != rtpPort + 1) {
    338             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
    339                  " RTP port must be even, RTCP port must be one higher.",
    340                  server_port.c_str());
    341 
    342             return false;
    343         }
    344 
    345         if (rtpPort & 1) {
    346             ALOGW("Server picked an odd RTP port, it should've picked an "
    347                  "even one, we'll let it pass for now, but this may break "
    348                  "in the future.");
    349         }
    350 
    351         if (addr.sin_addr.s_addr == INADDR_NONE) {
    352             return true;
    353         }
    354 
    355         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
    356             // No firewalls to traverse on the loopback interface.
    357             return true;
    358         }
    359 
    360         // Make up an RR/SDES RTCP packet.
    361         sp<ABuffer> buf = new ABuffer(65536);
    362         buf->setRange(0, 0);
    363         addRR(buf);
    364         addSDES(rtpSocket, buf);
    365 
    366         addr.sin_port = htons(rtpPort);
    367 
    368         ssize_t n = sendto(
    369                 rtpSocket, buf->data(), buf->size(), 0,
    370                 (const sockaddr *)&addr, sizeof(addr));
    371 
    372         if (n < (ssize_t)buf->size()) {
    373             ALOGE("failed to poke a hole for RTP packets");
    374             return false;
    375         }
    376 
    377         addr.sin_port = htons(rtcpPort);
    378 
    379         n = sendto(
    380                 rtcpSocket, buf->data(), buf->size(), 0,
    381                 (const sockaddr *)&addr, sizeof(addr));
    382 
    383         if (n < (ssize_t)buf->size()) {
    384             ALOGE("failed to poke a hole for RTCP packets");
    385             return false;
    386         }
    387 
    388         ALOGV("successfully poked holes.");
    389 
    390         return true;
    391     }
    392 
    393     static bool isLiveStream(const sp<ASessionDescription> &desc) {
    394         AString attrLiveStream;
    395         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
    396             ssize_t semicolonPos = attrLiveStream.find(";", 2);
    397 
    398             const char* liveStreamValue;
    399             if (semicolonPos < 0) {
    400                 liveStreamValue = attrLiveStream.c_str();
    401             } else {
    402                 AString valString;
    403                 valString.setTo(attrLiveStream,
    404                         semicolonPos + 1,
    405                         attrLiveStream.size() - semicolonPos - 1);
    406                 liveStreamValue = valString.c_str();
    407             }
    408 
    409             uint32_t value = strtoul(liveStreamValue, NULL, 10);
    410             if (value == 1) {
    411                 ALOGV("found live stream");
    412                 return true;
    413             }
    414         } else {
    415             // It is a live stream if no duration is returned
    416             int64_t durationUs;
    417             if (!desc->getDurationUs(&durationUs)) {
    418                 ALOGV("No duration found, assume live stream");
    419                 return true;
    420             }
    421         }
    422 
    423         return false;
    424     }
    425 
    426     virtual void onMessageReceived(const sp<AMessage> &msg) {
    427         switch (msg->what()) {
    428             case 'conn':
    429             {
    430                 int32_t result;
    431                 CHECK(msg->findInt32("result", &result));
    432 
    433                 ALOGI("connection request completed with result %d (%s)",
    434                      result, strerror(-result));
    435 
    436                 if (result == OK) {
    437                     AString request;
    438                     request = "DESCRIBE ";
    439                     request.append(mSessionURL);
    440                     request.append(" RTSP/1.0\r\n");
    441                     request.append("Accept: application/sdp\r\n");
    442                     request.append("\r\n");
    443 
    444                     sp<AMessage> reply = new AMessage('desc', id());
    445                     mConn->sendRequest(request.c_str(), reply);
    446                 } else {
    447                     (new AMessage('disc', id()))->post();
    448                 }
    449                 break;
    450             }
    451 
    452             case 'disc':
    453             {
    454                 ++mKeepAliveGeneration;
    455 
    456                 int32_t reconnect;
    457                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    458                     sp<AMessage> reply = new AMessage('conn', id());
    459                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    460                 } else {
    461                     (new AMessage('quit', id()))->post();
    462                 }
    463                 break;
    464             }
    465 
    466             case 'desc':
    467             {
    468                 int32_t result;
    469                 CHECK(msg->findInt32("result", &result));
    470 
    471                 ALOGI("DESCRIBE completed with result %d (%s)",
    472                      result, strerror(-result));
    473 
    474                 if (result == OK) {
    475                     sp<RefBase> obj;
    476                     CHECK(msg->findObject("response", &obj));
    477                     sp<ARTSPResponse> response =
    478                         static_cast<ARTSPResponse *>(obj.get());
    479 
    480                     if (response->mStatusCode == 302) {
    481                         ssize_t i = response->mHeaders.indexOfKey("location");
    482                         CHECK_GE(i, 0);
    483 
    484                         mSessionURL = response->mHeaders.valueAt(i);
    485 
    486                         AString request;
    487                         request = "DESCRIBE ";
    488                         request.append(mSessionURL);
    489                         request.append(" RTSP/1.0\r\n");
    490                         request.append("Accept: application/sdp\r\n");
    491                         request.append("\r\n");
    492 
    493                         sp<AMessage> reply = new AMessage('desc', id());
    494                         mConn->sendRequest(request.c_str(), reply);
    495                         break;
    496                     }
    497 
    498                     if (response->mStatusCode != 200) {
    499                         result = UNKNOWN_ERROR;
    500                     } else if (response->mContent == NULL) {
    501                         result = ERROR_MALFORMED;
    502                         ALOGE("The response has no content.");
    503                     } else {
    504                         mSessionDesc = new ASessionDescription;
    505 
    506                         mSessionDesc->setTo(
    507                                 response->mContent->data(),
    508                                 response->mContent->size());
    509 
    510                         if (!mSessionDesc->isValid()) {
    511                             ALOGE("Failed to parse session description.");
    512                             result = ERROR_MALFORMED;
    513                         } else {
    514                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    515                             if (i >= 0) {
    516                                 mBaseURL = response->mHeaders.valueAt(i);
    517                             } else {
    518                                 i = response->mHeaders.indexOfKey("content-location");
    519                                 if (i >= 0) {
    520                                     mBaseURL = response->mHeaders.valueAt(i);
    521                                 } else {
    522                                     mBaseURL = mSessionURL;
    523                                 }
    524                             }
    525 
    526                             mSeekable = !isLiveStream(mSessionDesc);
    527 
    528                             if (!mBaseURL.startsWith("rtsp://")) {
    529                                 // Some misbehaving servers specify a relative
    530                                 // URL in one of the locations above, combine
    531                                 // it with the absolute session URL to get
    532                                 // something usable...
    533 
    534                                 ALOGW("Server specified a non-absolute base URL"
    535                                      ", combining it with the session URL to "
    536                                      "get something usable...");
    537 
    538                                 AString tmp;
    539                                 CHECK(MakeURL(
    540                                             mSessionURL.c_str(),
    541                                             mBaseURL.c_str(),
    542                                             &tmp));
    543 
    544                                 mBaseURL = tmp;
    545                             }
    546 
    547                             mControlURL = getControlURL(mSessionDesc);
    548 
    549                             if (mSessionDesc->countTracks() < 2) {
    550                                 // There's no actual tracks in this session.
    551                                 // The first "track" is merely session meta
    552                                 // data.
    553 
    554                                 ALOGW("Session doesn't contain any playable "
    555                                      "tracks. Aborting.");
    556                                 result = ERROR_UNSUPPORTED;
    557                             } else {
    558                                 setupTrack(1);
    559                             }
    560                         }
    561                     }
    562                 }
    563 
    564                 if (result != OK) {
    565                     sp<AMessage> reply = new AMessage('disc', id());
    566                     mConn->disconnect(reply);
    567                 }
    568                 break;
    569             }
    570 
    571             case 'sdpl':
    572             {
    573                 int32_t result;
    574                 CHECK(msg->findInt32("result", &result));
    575 
    576                 ALOGI("SDP connection request completed with result %d (%s)",
    577                      result, strerror(-result));
    578 
    579                 if (result == OK) {
    580                     sp<RefBase> obj;
    581                     CHECK(msg->findObject("description", &obj));
    582                     mSessionDesc =
    583                         static_cast<ASessionDescription *>(obj.get());
    584 
    585                     if (!mSessionDesc->isValid()) {
    586                         ALOGE("Failed to parse session description.");
    587                         result = ERROR_MALFORMED;
    588                     } else {
    589                         mBaseURL = mSessionURL;
    590 
    591                         mSeekable = !isLiveStream(mSessionDesc);
    592 
    593                         mControlURL = getControlURL(mSessionDesc);
    594 
    595                         if (mSessionDesc->countTracks() < 2) {
    596                             // There's no actual tracks in this session.
    597                             // The first "track" is merely session meta
    598                             // data.
    599 
    600                             ALOGW("Session doesn't contain any playable "
    601                                  "tracks. Aborting.");
    602                             result = ERROR_UNSUPPORTED;
    603                         } else {
    604                             setupTrack(1);
    605                         }
    606                     }
    607                 }
    608 
    609                 if (result != OK) {
    610                     sp<AMessage> reply = new AMessage('disc', id());
    611                     mConn->disconnect(reply);
    612                 }
    613                 break;
    614             }
    615 
    616             case 'setu':
    617             {
    618                 size_t index;
    619                 CHECK(msg->findSize("index", &index));
    620 
    621                 TrackInfo *track = NULL;
    622                 size_t trackIndex;
    623                 if (msg->findSize("track-index", &trackIndex)) {
    624                     track = &mTracks.editItemAt(trackIndex);
    625                 }
    626 
    627                 int32_t result;
    628                 CHECK(msg->findInt32("result", &result));
    629 
    630                 ALOGI("SETUP(%d) completed with result %d (%s)",
    631                      index, result, strerror(-result));
    632 
    633                 if (result == OK) {
    634                     CHECK(track != NULL);
    635 
    636                     sp<RefBase> obj;
    637                     CHECK(msg->findObject("response", &obj));
    638                     sp<ARTSPResponse> response =
    639                         static_cast<ARTSPResponse *>(obj.get());
    640 
    641                     if (response->mStatusCode != 200) {
    642                         result = UNKNOWN_ERROR;
    643                     } else {
    644                         ssize_t i = response->mHeaders.indexOfKey("session");
    645                         CHECK_GE(i, 0);
    646 
    647                         mSessionID = response->mHeaders.valueAt(i);
    648 
    649                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    650                         AString timeoutStr;
    651                         if (GetAttribute(
    652                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
    653                             char *end;
    654                             unsigned long timeoutSecs =
    655                                 strtoul(timeoutStr.c_str(), &end, 10);
    656 
    657                             if (end == timeoutStr.c_str() || *end != '\0') {
    658                                 ALOGW("server specified malformed timeout '%s'",
    659                                      timeoutStr.c_str());
    660 
    661                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    662                             } else if (timeoutSecs < 15) {
    663                                 ALOGW("server specified too short a timeout "
    664                                      "(%lu secs), using default.",
    665                                      timeoutSecs);
    666 
    667                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    668                             } else {
    669                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
    670 
    671                                 ALOGI("server specified timeout of %lu secs.",
    672                                      timeoutSecs);
    673                             }
    674                         }
    675 
    676                         i = mSessionID.find(";");
    677                         if (i >= 0) {
    678                             // Remove options, i.e. ";timeout=90"
    679                             mSessionID.erase(i, mSessionID.size() - i);
    680                         }
    681 
    682                         sp<AMessage> notify = new AMessage('accu', id());
    683                         notify->setSize("track-index", trackIndex);
    684 
    685                         i = response->mHeaders.indexOfKey("transport");
    686                         CHECK_GE(i, 0);
    687 
    688                         if (!track->mUsingInterleavedTCP) {
    689                             AString transport = response->mHeaders.valueAt(i);
    690 
    691                             // We are going to continue even if we were
    692                             // unable to poke a hole into the firewall...
    693                             pokeAHole(
    694                                     track->mRTPSocket,
    695                                     track->mRTCPSocket,
    696                                     transport);
    697                         }
    698 
    699                         mRTPConn->addStream(
    700                                 track->mRTPSocket, track->mRTCPSocket,
    701                                 mSessionDesc, index,
    702                                 notify, track->mUsingInterleavedTCP);
    703 
    704                         mSetupTracksSuccessful = true;
    705                     }
    706                 }
    707 
    708                 if (result != OK) {
    709                     if (track) {
    710                         if (!track->mUsingInterleavedTCP) {
    711                             // Clear the tag
    712                             if (mUIDValid) {
    713                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
    714                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
    715                             }
    716 
    717                             close(track->mRTPSocket);
    718                             close(track->mRTCPSocket);
    719                         }
    720 
    721                         mTracks.removeItemsAt(trackIndex);
    722                     }
    723                 }
    724 
    725                 ++index;
    726                 if (index < mSessionDesc->countTracks()) {
    727                     setupTrack(index);
    728                 } else if (mSetupTracksSuccessful) {
    729                     ++mKeepAliveGeneration;
    730                     postKeepAlive();
    731 
    732                     AString request = "PLAY ";
    733                     request.append(mControlURL);
    734                     request.append(" RTSP/1.0\r\n");
    735 
    736                     request.append("Session: ");
    737                     request.append(mSessionID);
    738                     request.append("\r\n");
    739 
    740                     request.append("\r\n");
    741 
    742                     sp<AMessage> reply = new AMessage('play', id());
    743                     mConn->sendRequest(request.c_str(), reply);
    744                 } else {
    745                     sp<AMessage> reply = new AMessage('disc', id());
    746                     mConn->disconnect(reply);
    747                 }
    748                 break;
    749             }
    750 
    751             case 'play':
    752             {
    753                 int32_t result;
    754                 CHECK(msg->findInt32("result", &result));
    755 
    756                 ALOGI("PLAY completed with result %d (%s)",
    757                      result, strerror(-result));
    758 
    759                 if (result == OK) {
    760                     sp<RefBase> obj;
    761                     CHECK(msg->findObject("response", &obj));
    762                     sp<ARTSPResponse> response =
    763                         static_cast<ARTSPResponse *>(obj.get());
    764 
    765                     if (response->mStatusCode != 200) {
    766                         result = UNKNOWN_ERROR;
    767                     } else {
    768                         parsePlayResponse(response);
    769 
    770                         sp<AMessage> timeout = new AMessage('tiou', id());
    771                         mCheckTimeoutGeneration++;
    772                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
    773                         timeout->post(kStartupTimeoutUs);
    774                     }
    775                 }
    776 
    777                 if (result != OK) {
    778                     sp<AMessage> reply = new AMessage('disc', id());
    779                     mConn->disconnect(reply);
    780                 }
    781 
    782                 break;
    783             }
    784 
    785             case 'aliv':
    786             {
    787                 int32_t generation;
    788                 CHECK(msg->findInt32("generation", &generation));
    789 
    790                 if (generation != mKeepAliveGeneration) {
    791                     // obsolete event.
    792                     break;
    793                 }
    794 
    795                 AString request;
    796                 request.append("OPTIONS ");
    797                 request.append(mSessionURL);
    798                 request.append(" RTSP/1.0\r\n");
    799                 request.append("Session: ");
    800                 request.append(mSessionID);
    801                 request.append("\r\n");
    802                 request.append("\r\n");
    803 
    804                 sp<AMessage> reply = new AMessage('opts', id());
    805                 reply->setInt32("generation", mKeepAliveGeneration);
    806                 mConn->sendRequest(request.c_str(), reply);
    807                 break;
    808             }
    809 
    810             case 'opts':
    811             {
    812                 int32_t result;
    813                 CHECK(msg->findInt32("result", &result));
    814 
    815                 ALOGI("OPTIONS completed with result %d (%s)",
    816                      result, strerror(-result));
    817 
    818                 int32_t generation;
    819                 CHECK(msg->findInt32("generation", &generation));
    820 
    821                 if (generation != mKeepAliveGeneration) {
    822                     // obsolete event.
    823                     break;
    824                 }
    825 
    826                 postKeepAlive();
    827                 break;
    828             }
    829 
    830             case 'abor':
    831             {
    832                 for (size_t i = 0; i < mTracks.size(); ++i) {
    833                     TrackInfo *info = &mTracks.editItemAt(i);
    834 
    835                     if (!mFirstAccessUnit) {
    836                         postQueueEOS(i, ERROR_END_OF_STREAM);
    837                     }
    838 
    839                     if (!info->mUsingInterleavedTCP) {
    840                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    841 
    842                         // Clear the tag
    843                         if (mUIDValid) {
    844                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
    845                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
    846                         }
    847 
    848                         close(info->mRTPSocket);
    849                         close(info->mRTCPSocket);
    850                     }
    851                 }
    852                 mTracks.clear();
    853                 mSetupTracksSuccessful = false;
    854                 mSeekPending = false;
    855                 mFirstAccessUnit = true;
    856                 mAllTracksHaveTime = false;
    857                 mNTPAnchorUs = -1;
    858                 mMediaAnchorUs = -1;
    859                 mNumAccessUnitsReceived = 0;
    860                 mReceivedFirstRTCPPacket = false;
    861                 mReceivedFirstRTPPacket = false;
    862                 mPausing = false;
    863                 mSeekable = true;
    864 
    865                 sp<AMessage> reply = new AMessage('tear', id());
    866 
    867                 int32_t reconnect;
    868                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    869                     reply->setInt32("reconnect", true);
    870                 }
    871 
    872                 AString request;
    873                 request = "TEARDOWN ";
    874 
    875                 // XXX should use aggregate url from SDP here...
    876                 request.append(mSessionURL);
    877                 request.append(" RTSP/1.0\r\n");
    878 
    879                 request.append("Session: ");
    880                 request.append(mSessionID);
    881                 request.append("\r\n");
    882 
    883                 request.append("\r\n");
    884 
    885                 mConn->sendRequest(request.c_str(), reply);
    886                 break;
    887             }
    888 
    889             case 'tear':
    890             {
    891                 int32_t result;
    892                 CHECK(msg->findInt32("result", &result));
    893 
    894                 ALOGI("TEARDOWN completed with result %d (%s)",
    895                      result, strerror(-result));
    896 
    897                 sp<AMessage> reply = new AMessage('disc', id());
    898 
    899                 int32_t reconnect;
    900                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    901                     reply->setInt32("reconnect", true);
    902                 }
    903 
    904                 mConn->disconnect(reply);
    905                 break;
    906             }
    907 
    908             case 'quit':
    909             {
    910                 sp<AMessage> msg = mNotify->dup();
    911                 msg->setInt32("what", kWhatDisconnected);
    912                 msg->setInt32("result", UNKNOWN_ERROR);
    913                 msg->post();
    914                 break;
    915             }
    916 
    917             case 'chek':
    918             {
    919                 int32_t generation;
    920                 CHECK(msg->findInt32("generation", &generation));
    921                 if (generation != mCheckGeneration) {
    922                     // This is an outdated message. Ignore.
    923                     break;
    924                 }
    925 
    926                 if (mNumAccessUnitsReceived == 0) {
    927 #if 1
    928                     ALOGI("stream ended? aborting.");
    929                     (new AMessage('abor', id()))->post();
    930                     break;
    931 #else
    932                     ALOGI("haven't seen an AU in a looong time.");
    933 #endif
    934                 }
    935 
    936                 mNumAccessUnitsReceived = 0;
    937                 msg->post(kAccessUnitTimeoutUs);
    938                 break;
    939             }
    940 
    941             case 'accu':
    942             {
    943                 int32_t timeUpdate;
    944                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
    945                     size_t trackIndex;
    946                     CHECK(msg->findSize("track-index", &trackIndex));
    947 
    948                     uint32_t rtpTime;
    949                     uint64_t ntpTime;
    950                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
    951                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
    952 
    953                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
    954                     break;
    955                 }
    956 
    957                 int32_t first;
    958                 if (msg->findInt32("first-rtcp", &first)) {
    959                     mReceivedFirstRTCPPacket = true;
    960                     break;
    961                 }
    962 
    963                 if (msg->findInt32("first-rtp", &first)) {
    964                     mReceivedFirstRTPPacket = true;
    965                     break;
    966                 }
    967 
    968                 ++mNumAccessUnitsReceived;
    969                 postAccessUnitTimeoutCheck();
    970 
    971                 size_t trackIndex;
    972                 CHECK(msg->findSize("track-index", &trackIndex));
    973 
    974                 if (trackIndex >= mTracks.size()) {
    975                     ALOGV("late packets ignored.");
    976                     break;
    977                 }
    978 
    979                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
    980 
    981                 int32_t eos;
    982                 if (msg->findInt32("eos", &eos)) {
    983                     ALOGI("received BYE on track index %d", trackIndex);
    984                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
    985                         ALOGI("No time established => fake existing data");
    986 
    987                         track->mEOSReceived = true;
    988                         mTryFakeRTCP = true;
    989                         mReceivedFirstRTCPPacket = true;
    990                         fakeTimestamps();
    991                     } else {
    992                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
    993                     }
    994                     return;
    995                 }
    996 
    997                 sp<ABuffer> accessUnit;
    998                 CHECK(msg->findBuffer("access-unit", &accessUnit));
    999 
   1000                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1001 
   1002                 if (mSeekPending) {
   1003                     ALOGV("we're seeking, dropping stale packet.");
   1004                     break;
   1005                 }
   1006 
   1007                 if (seqNum < track->mFirstSeqNumInSegment) {
   1008                     ALOGV("dropping stale access-unit (%d < %d)",
   1009                          seqNum, track->mFirstSeqNumInSegment);
   1010                     break;
   1011                 }
   1012 
   1013                 if (track->mNewSegment) {
   1014                     track->mNewSegment = false;
   1015                 }
   1016 
   1017                 onAccessUnitComplete(trackIndex, accessUnit);
   1018                 break;
   1019             }
   1020 
   1021             case 'paus':
   1022             {
   1023                 int32_t generation;
   1024                 CHECK(msg->findInt32("pausecheck", &generation));
   1025                 if (generation != mPauseGeneration) {
   1026                     ALOGV("Ignoring outdated pause message.");
   1027                     break;
   1028                 }
   1029 
   1030                 if (!mSeekable) {
   1031                     ALOGW("This is a live stream, ignoring pause request.");
   1032                     break;
   1033                 }
   1034                 mCheckPending = true;
   1035                 ++mCheckGeneration;
   1036                 mPausing = true;
   1037 
   1038                 AString request = "PAUSE ";
   1039                 request.append(mControlURL);
   1040                 request.append(" RTSP/1.0\r\n");
   1041 
   1042                 request.append("Session: ");
   1043                 request.append(mSessionID);
   1044                 request.append("\r\n");
   1045 
   1046                 request.append("\r\n");
   1047 
   1048                 sp<AMessage> reply = new AMessage('pau2', id());
   1049                 mConn->sendRequest(request.c_str(), reply);
   1050                 break;
   1051             }
   1052 
   1053             case 'pau2':
   1054             {
   1055                 int32_t result;
   1056                 CHECK(msg->findInt32("result", &result));
   1057                 mCheckTimeoutGeneration++;
   1058 
   1059                 ALOGI("PAUSE completed with result %d (%s)",
   1060                      result, strerror(-result));
   1061                 break;
   1062             }
   1063 
   1064             case 'resu':
   1065             {
   1066                 if (mPausing && mSeekPending) {
   1067                     // If seeking, Play will be sent from see1 instead
   1068                     break;
   1069                 }
   1070 
   1071                 if (!mPausing) {
   1072                     // Dont send PLAY if we have not paused
   1073                     break;
   1074                 }
   1075                 AString request = "PLAY ";
   1076                 request.append(mControlURL);
   1077                 request.append(" RTSP/1.0\r\n");
   1078 
   1079                 request.append("Session: ");
   1080                 request.append(mSessionID);
   1081                 request.append("\r\n");
   1082 
   1083                 request.append("\r\n");
   1084 
   1085                 sp<AMessage> reply = new AMessage('res2', id());
   1086                 mConn->sendRequest(request.c_str(), reply);
   1087                 break;
   1088             }
   1089 
   1090             case 'res2':
   1091             {
   1092                 int32_t result;
   1093                 CHECK(msg->findInt32("result", &result));
   1094 
   1095                 ALOGI("PLAY completed with result %d (%s)",
   1096                      result, strerror(-result));
   1097 
   1098                 mCheckPending = false;
   1099                 postAccessUnitTimeoutCheck();
   1100 
   1101                 if (result == OK) {
   1102                     sp<RefBase> obj;
   1103                     CHECK(msg->findObject("response", &obj));
   1104                     sp<ARTSPResponse> response =
   1105                         static_cast<ARTSPResponse *>(obj.get());
   1106 
   1107                     if (response->mStatusCode != 200) {
   1108                         result = UNKNOWN_ERROR;
   1109                     } else {
   1110                         parsePlayResponse(response);
   1111 
   1112                         // Post new timeout in order to make sure to use
   1113                         // fake timestamps if no new Sender Reports arrive
   1114                         sp<AMessage> timeout = new AMessage('tiou', id());
   1115                         mCheckTimeoutGeneration++;
   1116                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1117                         timeout->post(kStartupTimeoutUs);
   1118                     }
   1119                 }
   1120 
   1121                 if (result != OK) {
   1122                     ALOGE("resume failed, aborting.");
   1123                     (new AMessage('abor', id()))->post();
   1124                 }
   1125 
   1126                 mPausing = false;
   1127                 break;
   1128             }
   1129 
   1130             case 'seek':
   1131             {
   1132                 if (!mSeekable) {
   1133                     ALOGW("This is a live stream, ignoring seek request.");
   1134 
   1135                     sp<AMessage> msg = mNotify->dup();
   1136                     msg->setInt32("what", kWhatSeekDone);
   1137                     msg->post();
   1138                     break;
   1139                 }
   1140 
   1141                 int64_t timeUs;
   1142                 CHECK(msg->findInt64("time", &timeUs));
   1143 
   1144                 mSeekPending = true;
   1145 
   1146                 // Disable the access unit timeout until we resumed
   1147                 // playback again.
   1148                 mCheckPending = true;
   1149                 ++mCheckGeneration;
   1150 
   1151                 sp<AMessage> reply = new AMessage('see1', id());
   1152                 reply->setInt64("time", timeUs);
   1153 
   1154                 if (mPausing) {
   1155                     // PAUSE already sent
   1156                     ALOGI("Pause already sent");
   1157                     reply->post();
   1158                     break;
   1159                 }
   1160                 AString request = "PAUSE ";
   1161                 request.append(mControlURL);
   1162                 request.append(" RTSP/1.0\r\n");
   1163 
   1164                 request.append("Session: ");
   1165                 request.append(mSessionID);
   1166                 request.append("\r\n");
   1167 
   1168                 request.append("\r\n");
   1169 
   1170                 mConn->sendRequest(request.c_str(), reply);
   1171                 break;
   1172             }
   1173 
   1174             case 'see1':
   1175             {
   1176                 // Session is paused now.
   1177                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1178                     TrackInfo *info = &mTracks.editItemAt(i);
   1179 
   1180                     postQueueSeekDiscontinuity(i);
   1181                     info->mEOSReceived = false;
   1182 
   1183                     info->mRTPAnchor = 0;
   1184                     info->mNTPAnchorUs = -1;
   1185                 }
   1186 
   1187                 mAllTracksHaveTime = false;
   1188                 mNTPAnchorUs = -1;
   1189 
   1190                 // Start new timeoutgeneration to avoid getting timeout
   1191                 // before PLAY response arrive
   1192                 sp<AMessage> timeout = new AMessage('tiou', id());
   1193                 mCheckTimeoutGeneration++;
   1194                 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1195                 timeout->post(kStartupTimeoutUs);
   1196 
   1197                 int64_t timeUs;
   1198                 CHECK(msg->findInt64("time", &timeUs));
   1199 
   1200                 AString request = "PLAY ";
   1201                 request.append(mControlURL);
   1202                 request.append(" RTSP/1.0\r\n");
   1203 
   1204                 request.append("Session: ");
   1205                 request.append(mSessionID);
   1206                 request.append("\r\n");
   1207 
   1208                 request.append(
   1209                         StringPrintf(
   1210                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
   1211 
   1212                 request.append("\r\n");
   1213 
   1214                 sp<AMessage> reply = new AMessage('see2', id());
   1215                 mConn->sendRequest(request.c_str(), reply);
   1216                 break;
   1217             }
   1218 
   1219             case 'see2':
   1220             {
   1221                 if (mTracks.size() == 0) {
   1222                     // We have already hit abor, break
   1223                     break;
   1224                 }
   1225 
   1226                 int32_t result;
   1227                 CHECK(msg->findInt32("result", &result));
   1228 
   1229                 ALOGI("PLAY completed with result %d (%s)",
   1230                      result, strerror(-result));
   1231 
   1232                 mCheckPending = false;
   1233                 postAccessUnitTimeoutCheck();
   1234 
   1235                 if (result == OK) {
   1236                     sp<RefBase> obj;
   1237                     CHECK(msg->findObject("response", &obj));
   1238                     sp<ARTSPResponse> response =
   1239                         static_cast<ARTSPResponse *>(obj.get());
   1240 
   1241                     if (response->mStatusCode != 200) {
   1242                         result = UNKNOWN_ERROR;
   1243                     } else {
   1244                         parsePlayResponse(response);
   1245 
   1246                         // Post new timeout in order to make sure to use
   1247                         // fake timestamps if no new Sender Reports arrive
   1248                         sp<AMessage> timeout = new AMessage('tiou', id());
   1249                         mCheckTimeoutGeneration++;
   1250                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1251                         timeout->post(kStartupTimeoutUs);
   1252 
   1253                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
   1254                         CHECK_GE(i, 0);
   1255 
   1256                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
   1257 
   1258                         ALOGI("seek completed.");
   1259                     }
   1260                 }
   1261 
   1262                 if (result != OK) {
   1263                     ALOGE("seek failed, aborting.");
   1264                     (new AMessage('abor', id()))->post();
   1265                 }
   1266 
   1267                 mPausing = false;
   1268                 mSeekPending = false;
   1269 
   1270                 sp<AMessage> msg = mNotify->dup();
   1271                 msg->setInt32("what", kWhatSeekDone);
   1272                 msg->post();
   1273                 break;
   1274             }
   1275 
   1276             case 'biny':
   1277             {
   1278                 sp<ABuffer> buffer;
   1279                 CHECK(msg->findBuffer("buffer", &buffer));
   1280 
   1281                 int32_t index;
   1282                 CHECK(buffer->meta()->findInt32("index", &index));
   1283 
   1284                 mRTPConn->injectPacket(index, buffer);
   1285                 break;
   1286             }
   1287 
   1288             case 'tiou':
   1289             {
   1290                 int32_t timeoutGenerationCheck;
   1291                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
   1292                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
   1293                     // This is an outdated message. Ignore.
   1294                     // This typically happens if a lot of seeks are
   1295                     // performed, since new timeout messages now are
   1296                     // posted at seek as well.
   1297                     break;
   1298                 }
   1299                 if (!mReceivedFirstRTCPPacket) {
   1300                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
   1301                         ALOGW("We received RTP packets but no RTCP packets, "
   1302                              "using fake timestamps.");
   1303 
   1304                         mTryFakeRTCP = true;
   1305 
   1306                         mReceivedFirstRTCPPacket = true;
   1307 
   1308                         fakeTimestamps();
   1309                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
   1310                         ALOGW("Never received any data, switching transports.");
   1311 
   1312                         mTryTCPInterleaving = true;
   1313 
   1314                         sp<AMessage> msg = new AMessage('abor', id());
   1315                         msg->setInt32("reconnect", true);
   1316                         msg->post();
   1317                     } else {
   1318                         ALOGW("Never received any data, disconnecting.");
   1319                         (new AMessage('abor', id()))->post();
   1320                     }
   1321                 } else {
   1322                     if (!mAllTracksHaveTime) {
   1323                         ALOGW("We received some RTCP packets, but time "
   1324                               "could not be established on all tracks, now "
   1325                               "using fake timestamps");
   1326 
   1327                         fakeTimestamps();
   1328                     }
   1329                 }
   1330                 break;
   1331             }
   1332 
   1333             default:
   1334                 TRESPASS();
   1335                 break;
   1336         }
   1337     }
   1338 
   1339     void postKeepAlive() {
   1340         sp<AMessage> msg = new AMessage('aliv', id());
   1341         msg->setInt32("generation", mKeepAliveGeneration);
   1342         msg->post((mKeepAliveTimeoutUs * 9) / 10);
   1343     }
   1344 
   1345     void postAccessUnitTimeoutCheck() {
   1346         if (mCheckPending) {
   1347             return;
   1348         }
   1349 
   1350         mCheckPending = true;
   1351         sp<AMessage> check = new AMessage('chek', id());
   1352         check->setInt32("generation", mCheckGeneration);
   1353         check->post(kAccessUnitTimeoutUs);
   1354     }
   1355 
   1356     static void SplitString(
   1357             const AString &s, const char *separator, List<AString> *items) {
   1358         items->clear();
   1359         size_t start = 0;
   1360         while (start < s.size()) {
   1361             ssize_t offset = s.find(separator, start);
   1362 
   1363             if (offset < 0) {
   1364                 items->push_back(AString(s, start, s.size() - start));
   1365                 break;
   1366             }
   1367 
   1368             items->push_back(AString(s, start, offset - start));
   1369             start = offset + strlen(separator);
   1370         }
   1371     }
   1372 
   1373     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1374         if (mTracks.size() == 0) {
   1375             ALOGV("parsePlayResponse: late packets ignored.");
   1376             return;
   1377         }
   1378 
   1379         ssize_t i = response->mHeaders.indexOfKey("range");
   1380         if (i < 0) {
   1381             // Server doesn't even tell use what range it is going to
   1382             // play, therefore we won't support seeking.
   1383             return;
   1384         }
   1385 
   1386         AString range = response->mHeaders.valueAt(i);
   1387         ALOGV("Range: %s", range.c_str());
   1388 
   1389         AString val;
   1390         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1391 
   1392         float npt1, npt2;
   1393         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1394             // This is a live stream and therefore not seekable.
   1395 
   1396             ALOGI("This is a live stream");
   1397             return;
   1398         }
   1399 
   1400         i = response->mHeaders.indexOfKey("rtp-info");
   1401         CHECK_GE(i, 0);
   1402 
   1403         AString rtpInfo = response->mHeaders.valueAt(i);
   1404         List<AString> streamInfos;
   1405         SplitString(rtpInfo, ",", &streamInfos);
   1406 
   1407         int n = 1;
   1408         for (List<AString>::iterator it = streamInfos.begin();
   1409              it != streamInfos.end(); ++it) {
   1410             (*it).trim();
   1411             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1412 
   1413             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1414 
   1415             size_t trackIndex = 0;
   1416             while (trackIndex < mTracks.size()
   1417                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1418                 ++trackIndex;
   1419             }
   1420             CHECK_LT(trackIndex, mTracks.size());
   1421 
   1422             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1423 
   1424             char *end;
   1425             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1426 
   1427             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1428             info->mFirstSeqNumInSegment = seq;
   1429             info->mNewSegment = true;
   1430 
   1431             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1432 
   1433             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1434 
   1435             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1436 
   1437             info->mNormalPlayTimeRTP = rtpTime;
   1438             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
   1439 
   1440             if (!mFirstAccessUnit) {
   1441                 postNormalPlayTimeMapping(
   1442                         trackIndex,
   1443                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1444             }
   1445 
   1446             ++n;
   1447         }
   1448     }
   1449 
   1450     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
   1451         CHECK_GE(index, 0u);
   1452         CHECK_LT(index, mTracks.size());
   1453 
   1454         const TrackInfo &info = mTracks.itemAt(index);
   1455 
   1456         *timeScale = info.mTimeScale;
   1457 
   1458         return info.mPacketSource->getFormat();
   1459     }
   1460 
   1461     size_t countTracks() const {
   1462         return mTracks.size();
   1463     }
   1464 
   1465 private:
   1466     struct TrackInfo {
   1467         AString mURL;
   1468         int mRTPSocket;
   1469         int mRTCPSocket;
   1470         bool mUsingInterleavedTCP;
   1471         uint32_t mFirstSeqNumInSegment;
   1472         bool mNewSegment;
   1473 
   1474         uint32_t mRTPAnchor;
   1475         int64_t mNTPAnchorUs;
   1476         int32_t mTimeScale;
   1477         bool mEOSReceived;
   1478 
   1479         uint32_t mNormalPlayTimeRTP;
   1480         int64_t mNormalPlayTimeUs;
   1481 
   1482         sp<APacketSource> mPacketSource;
   1483 
   1484         // Stores packets temporarily while no notion of time
   1485         // has been established yet.
   1486         List<sp<ABuffer> > mPackets;
   1487     };
   1488 
   1489     sp<AMessage> mNotify;
   1490     bool mUIDValid;
   1491     uid_t mUID;
   1492     sp<ALooper> mNetLooper;
   1493     sp<ARTSPConnection> mConn;
   1494     sp<ARTPConnection> mRTPConn;
   1495     sp<ASessionDescription> mSessionDesc;
   1496     AString mOriginalSessionURL;  // This one still has user:pass@
   1497     AString mSessionURL;
   1498     AString mSessionHost;
   1499     AString mBaseURL;
   1500     AString mControlURL;
   1501     AString mSessionID;
   1502     bool mSetupTracksSuccessful;
   1503     bool mSeekPending;
   1504     bool mFirstAccessUnit;
   1505 
   1506     bool mAllTracksHaveTime;
   1507     int64_t mNTPAnchorUs;
   1508     int64_t mMediaAnchorUs;
   1509     int64_t mLastMediaTimeUs;
   1510 
   1511     int64_t mNumAccessUnitsReceived;
   1512     bool mCheckPending;
   1513     int32_t mCheckGeneration;
   1514     int32_t mCheckTimeoutGeneration;
   1515     bool mTryTCPInterleaving;
   1516     bool mTryFakeRTCP;
   1517     bool mReceivedFirstRTCPPacket;
   1518     bool mReceivedFirstRTPPacket;
   1519     bool mSeekable;
   1520     int64_t mKeepAliveTimeoutUs;
   1521     int32_t mKeepAliveGeneration;
   1522     bool mPausing;
   1523     int32_t mPauseGeneration;
   1524 
   1525     Vector<TrackInfo> mTracks;
   1526 
   1527     void setupTrack(size_t index) {
   1528         sp<APacketSource> source =
   1529             new APacketSource(mSessionDesc, index);
   1530 
   1531         if (source->initCheck() != OK) {
   1532             ALOGW("Unsupported format. Ignoring track #%d.", index);
   1533 
   1534             sp<AMessage> reply = new AMessage('setu', id());
   1535             reply->setSize("index", index);
   1536             reply->setInt32("result", ERROR_UNSUPPORTED);
   1537             reply->post();
   1538             return;
   1539         }
   1540 
   1541         AString url;
   1542         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1543 
   1544         AString trackURL;
   1545         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1546 
   1547         mTracks.push(TrackInfo());
   1548         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1549         info->mURL = trackURL;
   1550         info->mPacketSource = source;
   1551         info->mUsingInterleavedTCP = false;
   1552         info->mFirstSeqNumInSegment = 0;
   1553         info->mNewSegment = true;
   1554         info->mRTPAnchor = 0;
   1555         info->mNTPAnchorUs = -1;
   1556         info->mNormalPlayTimeRTP = 0;
   1557         info->mNormalPlayTimeUs = 0ll;
   1558 
   1559         unsigned long PT;
   1560         AString formatDesc;
   1561         AString formatParams;
   1562         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1563 
   1564         int32_t timescale;
   1565         int32_t numChannels;
   1566         ASessionDescription::ParseFormatDesc(
   1567                 formatDesc.c_str(), &timescale, &numChannels);
   1568 
   1569         info->mTimeScale = timescale;
   1570         info->mEOSReceived = false;
   1571 
   1572         ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
   1573 
   1574         AString request = "SETUP ";
   1575         request.append(trackURL);
   1576         request.append(" RTSP/1.0\r\n");
   1577 
   1578         if (mTryTCPInterleaving) {
   1579             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1580             info->mUsingInterleavedTCP = true;
   1581             info->mRTPSocket = interleaveIndex;
   1582             info->mRTCPSocket = interleaveIndex + 1;
   1583 
   1584             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1585             request.append(interleaveIndex);
   1586             request.append("-");
   1587             request.append(interleaveIndex + 1);
   1588         } else {
   1589             unsigned rtpPort;
   1590             ARTPConnection::MakePortPair(
   1591                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1592 
   1593             if (mUIDValid) {
   1594                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1595                                                 (uint32_t)*(uint32_t*) "RTP_");
   1596                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1597                                                 (uint32_t)*(uint32_t*) "RTP_");
   1598             }
   1599 
   1600             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1601             request.append(rtpPort);
   1602             request.append("-");
   1603             request.append(rtpPort + 1);
   1604         }
   1605 
   1606         request.append("\r\n");
   1607 
   1608         if (index > 1) {
   1609             request.append("Session: ");
   1610             request.append(mSessionID);
   1611             request.append("\r\n");
   1612         }
   1613 
   1614         request.append("\r\n");
   1615 
   1616         sp<AMessage> reply = new AMessage('setu', id());
   1617         reply->setSize("index", index);
   1618         reply->setSize("track-index", mTracks.size() - 1);
   1619         mConn->sendRequest(request.c_str(), reply);
   1620     }
   1621 
   1622     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1623         out->clear();
   1624 
   1625         if (strncasecmp("rtsp://", baseURL, 7)) {
   1626             // Base URL must be absolute
   1627             return false;
   1628         }
   1629 
   1630         if (!strncasecmp("rtsp://", url, 7)) {
   1631             // "url" is already an absolute URL, ignore base URL.
   1632             out->setTo(url);
   1633             return true;
   1634         }
   1635 
   1636         size_t n = strlen(baseURL);
   1637         if (baseURL[n - 1] == '/') {
   1638             out->setTo(baseURL);
   1639             out->append(url);
   1640         } else {
   1641             const char *slashPos = strrchr(baseURL, '/');
   1642 
   1643             if (slashPos > &baseURL[6]) {
   1644                 out->setTo(baseURL, slashPos - baseURL);
   1645             } else {
   1646                 out->setTo(baseURL);
   1647             }
   1648 
   1649             out->append("/");
   1650             out->append(url);
   1651         }
   1652 
   1653         return true;
   1654     }
   1655 
   1656     void fakeTimestamps() {
   1657         mNTPAnchorUs = -1ll;
   1658         for (size_t i = 0; i < mTracks.size(); ++i) {
   1659             onTimeUpdate(i, 0, 0ll);
   1660         }
   1661     }
   1662 
   1663     bool dataReceivedOnAllChannels() {
   1664         TrackInfo *track;
   1665         for (size_t i = 0; i < mTracks.size(); ++i) {
   1666             track = &mTracks.editItemAt(i);
   1667             if (track->mPackets.empty()) {
   1668                 return false;
   1669             }
   1670         }
   1671         return true;
   1672     }
   1673 
   1674     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1675         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
   1676              trackIndex, rtpTime, ntpTime);
   1677 
   1678         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1679 
   1680         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1681 
   1682         track->mRTPAnchor = rtpTime;
   1683         track->mNTPAnchorUs = ntpTimeUs;
   1684 
   1685         if (mNTPAnchorUs < 0) {
   1686             mNTPAnchorUs = ntpTimeUs;
   1687             mMediaAnchorUs = mLastMediaTimeUs;
   1688         }
   1689 
   1690         if (!mAllTracksHaveTime) {
   1691             bool allTracksHaveTime = true;
   1692             for (size_t i = 0; i < mTracks.size(); ++i) {
   1693                 TrackInfo *track = &mTracks.editItemAt(i);
   1694                 if (track->mNTPAnchorUs < 0) {
   1695                     allTracksHaveTime = false;
   1696                     break;
   1697                 }
   1698             }
   1699             if (allTracksHaveTime) {
   1700                 mAllTracksHaveTime = true;
   1701                 ALOGI("Time now established for all tracks.");
   1702             }
   1703         }
   1704         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1705             // Time is now established, lets start timestamping immediately
   1706             for (size_t i = 0; i < mTracks.size(); ++i) {
   1707                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
   1708                 while (!trackInfo->mPackets.empty()) {
   1709                     sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
   1710                     trackInfo->mPackets.erase(trackInfo->mPackets.begin());
   1711 
   1712                     if (addMediaTimestamp(i, trackInfo, accessUnit)) {
   1713                         postQueueAccessUnit(i, accessUnit);
   1714                     }
   1715                 }
   1716             }
   1717             for (size_t i = 0; i < mTracks.size(); ++i) {
   1718                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
   1719                 if (trackInfo->mEOSReceived) {
   1720                     postQueueEOS(i, ERROR_END_OF_STREAM);
   1721                     trackInfo->mEOSReceived = false;
   1722                 }
   1723             }
   1724         }
   1725     }
   1726 
   1727     void onAccessUnitComplete(
   1728             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1729         ALOGV("onAccessUnitComplete track %d", trackIndex);
   1730 
   1731         if (mFirstAccessUnit) {
   1732             sp<AMessage> msg = mNotify->dup();
   1733             msg->setInt32("what", kWhatConnected);
   1734             msg->post();
   1735 
   1736             if (mSeekable) {
   1737                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1738                     TrackInfo *info = &mTracks.editItemAt(i);
   1739 
   1740                     postNormalPlayTimeMapping(
   1741                             i,
   1742                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1743                 }
   1744             }
   1745 
   1746             mFirstAccessUnit = false;
   1747         }
   1748 
   1749         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1750 
   1751         if (!mAllTracksHaveTime) {
   1752             ALOGV("storing accessUnit, no time established yet");
   1753             track->mPackets.push_back(accessUnit);
   1754             return;
   1755         }
   1756 
   1757         while (!track->mPackets.empty()) {
   1758             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1759             track->mPackets.erase(track->mPackets.begin());
   1760 
   1761             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1762                 postQueueAccessUnit(trackIndex, accessUnit);
   1763             }
   1764         }
   1765 
   1766         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1767             postQueueAccessUnit(trackIndex, accessUnit);
   1768         }
   1769 
   1770         if (track->mEOSReceived) {
   1771             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1772             track->mEOSReceived = false;
   1773         }
   1774     }
   1775 
   1776     bool addMediaTimestamp(
   1777             int32_t trackIndex, const TrackInfo *track,
   1778             const sp<ABuffer> &accessUnit) {
   1779         uint32_t rtpTime;
   1780         CHECK(accessUnit->meta()->findInt32(
   1781                     "rtp-time", (int32_t *)&rtpTime));
   1782 
   1783         int64_t relRtpTimeUs =
   1784             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1785                 / track->mTimeScale;
   1786 
   1787         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1788 
   1789         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1790 
   1791         if (mediaTimeUs > mLastMediaTimeUs) {
   1792             mLastMediaTimeUs = mediaTimeUs;
   1793         }
   1794 
   1795         if (mediaTimeUs < 0) {
   1796             ALOGV("dropping early accessUnit.");
   1797             return false;
   1798         }
   1799 
   1800         ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
   1801              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
   1802 
   1803         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1804 
   1805         return true;
   1806     }
   1807 
   1808     void postQueueAccessUnit(
   1809             size_t trackIndex, const sp<ABuffer> &accessUnit) {
   1810         sp<AMessage> msg = mNotify->dup();
   1811         msg->setInt32("what", kWhatAccessUnit);
   1812         msg->setSize("trackIndex", trackIndex);
   1813         msg->setBuffer("accessUnit", accessUnit);
   1814         msg->post();
   1815     }
   1816 
   1817     void postQueueEOS(size_t trackIndex, status_t finalResult) {
   1818         sp<AMessage> msg = mNotify->dup();
   1819         msg->setInt32("what", kWhatEOS);
   1820         msg->setSize("trackIndex", trackIndex);
   1821         msg->setInt32("finalResult", finalResult);
   1822         msg->post();
   1823     }
   1824 
   1825     void postQueueSeekDiscontinuity(size_t trackIndex) {
   1826         sp<AMessage> msg = mNotify->dup();
   1827         msg->setInt32("what", kWhatSeekDiscontinuity);
   1828         msg->setSize("trackIndex", trackIndex);
   1829         msg->post();
   1830     }
   1831 
   1832     void postNormalPlayTimeMapping(
   1833             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
   1834         sp<AMessage> msg = mNotify->dup();
   1835         msg->setInt32("what", kWhatNormalPlayTimeMapping);
   1836         msg->setSize("trackIndex", trackIndex);
   1837         msg->setInt32("rtpTime", rtpTime);
   1838         msg->setInt64("nptUs", nptUs);
   1839         msg->post();
   1840     }
   1841 
   1842     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1843 };
   1844 
   1845 }  // namespace android
   1846 
   1847 #endif  // MY_HANDLER_H_
   1848