Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 #define LOG_TAG "MyHandler"
     23 #include <utils/Log.h>
     24 
     25 #include "APacketSource.h"
     26 #include "ARTPConnection.h"
     27 #include "ARTSPConnection.h"
     28 #include "ASessionDescription.h"
     29 
     30 #include <ctype.h>
     31 
     32 #include <media/stagefright/foundation/ABuffer.h>
     33 #include <media/stagefright/foundation/ADebug.h>
     34 #include <media/stagefright/foundation/ALooper.h>
     35 #include <media/stagefright/foundation/AMessage.h>
     36 #include <media/stagefright/MediaErrors.h>
     37 #include <media/stagefright/Utils.h>
     38 
     39 #include <arpa/inet.h>
     40 #include <sys/socket.h>
     41 #include <netdb.h>
     42 
     43 #include "HTTPBase.h"
     44 
     45 // If no access units are received within 5 secs, assume that the rtp
     46 // stream has ended and signal end of stream.
     47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
     48 
     49 // If no access units arrive for the first 10 secs after starting the
     50 // stream, assume none ever will and signal EOS or switch transports.
     51 static int64_t kStartupTimeoutUs = 10000000ll;
     52 
     53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
     54 
     55 static int64_t kPauseDelayUs = 3000000ll;
     56 
     57 namespace android {
     58 
     59 static bool GetAttribute(const char *s, const char *key, AString *value) {
     60     value->clear();
     61 
     62     size_t keyLen = strlen(key);
     63 
     64     for (;;) {
     65         while (isspace(*s)) {
     66             ++s;
     67         }
     68 
     69         const char *colonPos = strchr(s, ';');
     70 
     71         size_t len =
     72             (colonPos == NULL) ? strlen(s) : colonPos - s;
     73 
     74         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     75             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     76             return true;
     77         }
     78 
     79         if (colonPos == NULL) {
     80             return false;
     81         }
     82 
     83         s = colonPos + 1;
     84     }
     85 }
     86 
     87 struct MyHandler : public AHandler {
     88     enum {
     89         kWhatConnected                  = 'conn',
     90         kWhatDisconnected               = 'disc',
     91         kWhatSeekDone                   = 'sdon',
     92 
     93         kWhatAccessUnit                 = 'accU',
     94         kWhatEOS                        = 'eos!',
     95         kWhatSeekDiscontinuity          = 'seeD',
     96         kWhatNormalPlayTimeMapping      = 'nptM',
     97     };
     98 
     99     MyHandler(
    100             const char *url,
    101             const sp<AMessage> &notify,
    102             bool uidValid = false, uid_t uid = 0)
    103         : mNotify(notify),
    104           mUIDValid(uidValid),
    105           mUID(uid),
    106           mNetLooper(new ALooper),
    107           mConn(new ARTSPConnection(mUIDValid, mUID)),
    108           mRTPConn(new ARTPConnection),
    109           mOriginalSessionURL(url),
    110           mSessionURL(url),
    111           mSetupTracksSuccessful(false),
    112           mSeekPending(false),
    113           mFirstAccessUnit(true),
    114           mAllTracksHaveTime(false),
    115           mNTPAnchorUs(-1),
    116           mMediaAnchorUs(-1),
    117           mLastMediaTimeUs(0),
    118           mNumAccessUnitsReceived(0),
    119           mCheckPending(false),
    120           mCheckGeneration(0),
    121           mCheckTimeoutGeneration(0),
    122           mTryTCPInterleaving(false),
    123           mTryFakeRTCP(false),
    124           mReceivedFirstRTCPPacket(false),
    125           mReceivedFirstRTPPacket(false),
    126           mSeekable(true),
    127           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
    128           mKeepAliveGeneration(0),
    129           mPausing(false),
    130           mPauseGeneration(0),
    131           mPlayResponseParsed(false) {
    132         mNetLooper->setName("rtsp net");
    133         mNetLooper->start(false /* runOnCallingThread */,
    134                           false /* canCallJava */,
    135                           PRIORITY_HIGHEST);
    136 
    137         // Strip any authentication info from the session url, we don't
    138         // want to transmit user/pass in cleartext.
    139         AString host, path, user, pass;
    140         unsigned port;
    141         CHECK(ARTSPConnection::ParseURL(
    142                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    143 
    144         if (user.size() > 0) {
    145             mSessionURL.clear();
    146             mSessionURL.append("rtsp://");
    147             mSessionURL.append(host);
    148             mSessionURL.append(":");
    149             mSessionURL.append(StringPrintf("%u", port));
    150             mSessionURL.append(path);
    151 
    152             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
    153         }
    154 
    155         mSessionHost = host;
    156     }
    157 
    158     void connect() {
    159         looper()->registerHandler(mConn);
    160         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    161 
    162         sp<AMessage> notify = new AMessage('biny', id());
    163         mConn->observeBinaryData(notify);
    164 
    165         sp<AMessage> reply = new AMessage('conn', id());
    166         mConn->connect(mOriginalSessionURL.c_str(), reply);
    167     }
    168 
    169     void loadSDP(const sp<ASessionDescription>& desc) {
    170         looper()->registerHandler(mConn);
    171         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    172 
    173         sp<AMessage> notify = new AMessage('biny', id());
    174         mConn->observeBinaryData(notify);
    175 
    176         sp<AMessage> reply = new AMessage('sdpl', id());
    177         reply->setObject("description", desc);
    178         mConn->connect(mOriginalSessionURL.c_str(), reply);
    179     }
    180 
    181     AString getControlURL(sp<ASessionDescription> desc) {
    182         AString sessionLevelControlURL;
    183         if (mSessionDesc->findAttribute(
    184                 0,
    185                 "a=control",
    186                 &sessionLevelControlURL)) {
    187             if (sessionLevelControlURL.compare("*") == 0) {
    188                 return mBaseURL;
    189             } else {
    190                 AString controlURL;
    191                 CHECK(MakeURL(
    192                         mBaseURL.c_str(),
    193                         sessionLevelControlURL.c_str(),
    194                         &controlURL));
    195                 return controlURL;
    196             }
    197         } else {
    198             return mSessionURL;
    199         }
    200     }
    201 
    202     void disconnect() {
    203         (new AMessage('abor', id()))->post();
    204     }
    205 
    206     void seek(int64_t timeUs) {
    207         sp<AMessage> msg = new AMessage('seek', id());
    208         msg->setInt64("time", timeUs);
    209         mPauseGeneration++;
    210         msg->post();
    211     }
    212 
    213     bool isSeekable() const {
    214         return mSeekable;
    215     }
    216 
    217     void pause() {
    218         sp<AMessage> msg = new AMessage('paus', id());
    219         mPauseGeneration++;
    220         msg->setInt32("pausecheck", mPauseGeneration);
    221         msg->post(kPauseDelayUs);
    222     }
    223 
    224     void resume() {
    225         sp<AMessage> msg = new AMessage('resu', id());
    226         mPauseGeneration++;
    227         msg->post();
    228     }
    229 
    230     static void addRR(const sp<ABuffer> &buf) {
    231         uint8_t *ptr = buf->data() + buf->size();
    232         ptr[0] = 0x80 | 0;
    233         ptr[1] = 201;  // RR
    234         ptr[2] = 0;
    235         ptr[3] = 1;
    236         ptr[4] = 0xde;  // SSRC
    237         ptr[5] = 0xad;
    238         ptr[6] = 0xbe;
    239         ptr[7] = 0xef;
    240 
    241         buf->setRange(0, buf->size() + 8);
    242     }
    243 
    244     static void addSDES(int s, const sp<ABuffer> &buffer) {
    245         struct sockaddr_in addr;
    246         socklen_t addrSize = sizeof(addr);
    247         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
    248 
    249         uint8_t *data = buffer->data() + buffer->size();
    250         data[0] = 0x80 | 1;
    251         data[1] = 202;  // SDES
    252         data[4] = 0xde;  // SSRC
    253         data[5] = 0xad;
    254         data[6] = 0xbe;
    255         data[7] = 0xef;
    256 
    257         size_t offset = 8;
    258 
    259         data[offset++] = 1;  // CNAME
    260 
    261         AString cname = "stagefright@";
    262         cname.append(inet_ntoa(addr.sin_addr));
    263         data[offset++] = cname.size();
    264 
    265         memcpy(&data[offset], cname.c_str(), cname.size());
    266         offset += cname.size();
    267 
    268         data[offset++] = 6;  // TOOL
    269 
    270         AString tool = MakeUserAgent();
    271 
    272         data[offset++] = tool.size();
    273 
    274         memcpy(&data[offset], tool.c_str(), tool.size());
    275         offset += tool.size();
    276 
    277         data[offset++] = 0;
    278 
    279         if ((offset % 4) > 0) {
    280             size_t count = 4 - (offset % 4);
    281             switch (count) {
    282                 case 3:
    283                     data[offset++] = 0;
    284                 case 2:
    285                     data[offset++] = 0;
    286                 case 1:
    287                     data[offset++] = 0;
    288             }
    289         }
    290 
    291         size_t numWords = (offset / 4) - 1;
    292         data[2] = numWords >> 8;
    293         data[3] = numWords & 0xff;
    294 
    295         buffer->setRange(buffer->offset(), buffer->size() + offset);
    296     }
    297 
    298     // In case we're behind NAT, fire off two UDP packets to the remote
    299     // rtp/rtcp ports to poke a hole into the firewall for future incoming
    300     // packets. We're going to send an RR/SDES RTCP packet to both of them.
    301     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
    302         struct sockaddr_in addr;
    303         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    304         addr.sin_family = AF_INET;
    305 
    306         AString source;
    307         AString server_port;
    308         if (!GetAttribute(transport.c_str(),
    309                           "source",
    310                           &source)) {
    311             ALOGW("Missing 'source' field in Transport response. Using "
    312                  "RTSP endpoint address.");
    313 
    314             struct hostent *ent = gethostbyname(mSessionHost.c_str());
    315             if (ent == NULL) {
    316                 ALOGE("Failed to look up address of session host '%s'",
    317                      mSessionHost.c_str());
    318 
    319                 return false;
    320             }
    321 
    322             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    323         } else {
    324             addr.sin_addr.s_addr = inet_addr(source.c_str());
    325         }
    326 
    327         if (!GetAttribute(transport.c_str(),
    328                                  "server_port",
    329                                  &server_port)) {
    330             ALOGI("Missing 'server_port' field in Transport response.");
    331             return false;
    332         }
    333 
    334         int rtpPort, rtcpPort;
    335         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
    336                 || rtpPort <= 0 || rtpPort > 65535
    337                 || rtcpPort <=0 || rtcpPort > 65535
    338                 || rtcpPort != rtpPort + 1) {
    339             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
    340                  " RTP port must be even, RTCP port must be one higher.",
    341                  server_port.c_str());
    342 
    343             return false;
    344         }
    345 
    346         if (rtpPort & 1) {
    347             ALOGW("Server picked an odd RTP port, it should've picked an "
    348                  "even one, we'll let it pass for now, but this may break "
    349                  "in the future.");
    350         }
    351 
    352         if (addr.sin_addr.s_addr == INADDR_NONE) {
    353             return true;
    354         }
    355 
    356         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
    357             // No firewalls to traverse on the loopback interface.
    358             return true;
    359         }
    360 
    361         // Make up an RR/SDES RTCP packet.
    362         sp<ABuffer> buf = new ABuffer(65536);
    363         buf->setRange(0, 0);
    364         addRR(buf);
    365         addSDES(rtpSocket, buf);
    366 
    367         addr.sin_port = htons(rtpPort);
    368 
    369         ssize_t n = sendto(
    370                 rtpSocket, buf->data(), buf->size(), 0,
    371                 (const sockaddr *)&addr, sizeof(addr));
    372 
    373         if (n < (ssize_t)buf->size()) {
    374             ALOGE("failed to poke a hole for RTP packets");
    375             return false;
    376         }
    377 
    378         addr.sin_port = htons(rtcpPort);
    379 
    380         n = sendto(
    381                 rtcpSocket, buf->data(), buf->size(), 0,
    382                 (const sockaddr *)&addr, sizeof(addr));
    383 
    384         if (n < (ssize_t)buf->size()) {
    385             ALOGE("failed to poke a hole for RTCP packets");
    386             return false;
    387         }
    388 
    389         ALOGV("successfully poked holes.");
    390 
    391         return true;
    392     }
    393 
    394     static bool isLiveStream(const sp<ASessionDescription> &desc) {
    395         AString attrLiveStream;
    396         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
    397             ssize_t semicolonPos = attrLiveStream.find(";", 2);
    398 
    399             const char* liveStreamValue;
    400             if (semicolonPos < 0) {
    401                 liveStreamValue = attrLiveStream.c_str();
    402             } else {
    403                 AString valString;
    404                 valString.setTo(attrLiveStream,
    405                         semicolonPos + 1,
    406                         attrLiveStream.size() - semicolonPos - 1);
    407                 liveStreamValue = valString.c_str();
    408             }
    409 
    410             uint32_t value = strtoul(liveStreamValue, NULL, 10);
    411             if (value == 1) {
    412                 ALOGV("found live stream");
    413                 return true;
    414             }
    415         } else {
    416             // It is a live stream if no duration is returned
    417             int64_t durationUs;
    418             if (!desc->getDurationUs(&durationUs)) {
    419                 ALOGV("No duration found, assume live stream");
    420                 return true;
    421             }
    422         }
    423 
    424         return false;
    425     }
    426 
    427     virtual void onMessageReceived(const sp<AMessage> &msg) {
    428         switch (msg->what()) {
    429             case 'conn':
    430             {
    431                 int32_t result;
    432                 CHECK(msg->findInt32("result", &result));
    433 
    434                 ALOGI("connection request completed with result %d (%s)",
    435                      result, strerror(-result));
    436 
    437                 if (result == OK) {
    438                     AString request;
    439                     request = "DESCRIBE ";
    440                     request.append(mSessionURL);
    441                     request.append(" RTSP/1.0\r\n");
    442                     request.append("Accept: application/sdp\r\n");
    443                     request.append("\r\n");
    444 
    445                     sp<AMessage> reply = new AMessage('desc', id());
    446                     mConn->sendRequest(request.c_str(), reply);
    447                 } else {
    448                     (new AMessage('disc', id()))->post();
    449                 }
    450                 break;
    451             }
    452 
    453             case 'disc':
    454             {
    455                 ++mKeepAliveGeneration;
    456 
    457                 int32_t reconnect;
    458                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    459                     sp<AMessage> reply = new AMessage('conn', id());
    460                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    461                 } else {
    462                     (new AMessage('quit', id()))->post();
    463                 }
    464                 break;
    465             }
    466 
    467             case 'desc':
    468             {
    469                 int32_t result;
    470                 CHECK(msg->findInt32("result", &result));
    471 
    472                 ALOGI("DESCRIBE completed with result %d (%s)",
    473                      result, strerror(-result));
    474 
    475                 if (result == OK) {
    476                     sp<RefBase> obj;
    477                     CHECK(msg->findObject("response", &obj));
    478                     sp<ARTSPResponse> response =
    479                         static_cast<ARTSPResponse *>(obj.get());
    480 
    481                     if (response->mStatusCode == 302) {
    482                         ssize_t i = response->mHeaders.indexOfKey("location");
    483                         CHECK_GE(i, 0);
    484 
    485                         mSessionURL = response->mHeaders.valueAt(i);
    486 
    487                         AString request;
    488                         request = "DESCRIBE ";
    489                         request.append(mSessionURL);
    490                         request.append(" RTSP/1.0\r\n");
    491                         request.append("Accept: application/sdp\r\n");
    492                         request.append("\r\n");
    493 
    494                         sp<AMessage> reply = new AMessage('desc', id());
    495                         mConn->sendRequest(request.c_str(), reply);
    496                         break;
    497                     }
    498 
    499                     if (response->mStatusCode != 200) {
    500                         result = UNKNOWN_ERROR;
    501                     } else if (response->mContent == NULL) {
    502                         result = ERROR_MALFORMED;
    503                         ALOGE("The response has no content.");
    504                     } else {
    505                         mSessionDesc = new ASessionDescription;
    506 
    507                         mSessionDesc->setTo(
    508                                 response->mContent->data(),
    509                                 response->mContent->size());
    510 
    511                         if (!mSessionDesc->isValid()) {
    512                             ALOGE("Failed to parse session description.");
    513                             result = ERROR_MALFORMED;
    514                         } else {
    515                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    516                             if (i >= 0) {
    517                                 mBaseURL = response->mHeaders.valueAt(i);
    518                             } else {
    519                                 i = response->mHeaders.indexOfKey("content-location");
    520                                 if (i >= 0) {
    521                                     mBaseURL = response->mHeaders.valueAt(i);
    522                                 } else {
    523                                     mBaseURL = mSessionURL;
    524                                 }
    525                             }
    526 
    527                             mSeekable = !isLiveStream(mSessionDesc);
    528 
    529                             if (!mBaseURL.startsWith("rtsp://")) {
    530                                 // Some misbehaving servers specify a relative
    531                                 // URL in one of the locations above, combine
    532                                 // it with the absolute session URL to get
    533                                 // something usable...
    534 
    535                                 ALOGW("Server specified a non-absolute base URL"
    536                                      ", combining it with the session URL to "
    537                                      "get something usable...");
    538 
    539                                 AString tmp;
    540                                 CHECK(MakeURL(
    541                                             mSessionURL.c_str(),
    542                                             mBaseURL.c_str(),
    543                                             &tmp));
    544 
    545                                 mBaseURL = tmp;
    546                             }
    547 
    548                             mControlURL = getControlURL(mSessionDesc);
    549 
    550                             if (mSessionDesc->countTracks() < 2) {
    551                                 // There's no actual tracks in this session.
    552                                 // The first "track" is merely session meta
    553                                 // data.
    554 
    555                                 ALOGW("Session doesn't contain any playable "
    556                                      "tracks. Aborting.");
    557                                 result = ERROR_UNSUPPORTED;
    558                             } else {
    559                                 setupTrack(1);
    560                             }
    561                         }
    562                     }
    563                 }
    564 
    565                 if (result != OK) {
    566                     sp<AMessage> reply = new AMessage('disc', id());
    567                     mConn->disconnect(reply);
    568                 }
    569                 break;
    570             }
    571 
    572             case 'sdpl':
    573             {
    574                 int32_t result;
    575                 CHECK(msg->findInt32("result", &result));
    576 
    577                 ALOGI("SDP connection request completed with result %d (%s)",
    578                      result, strerror(-result));
    579 
    580                 if (result == OK) {
    581                     sp<RefBase> obj;
    582                     CHECK(msg->findObject("description", &obj));
    583                     mSessionDesc =
    584                         static_cast<ASessionDescription *>(obj.get());
    585 
    586                     if (!mSessionDesc->isValid()) {
    587                         ALOGE("Failed to parse session description.");
    588                         result = ERROR_MALFORMED;
    589                     } else {
    590                         mBaseURL = mSessionURL;
    591 
    592                         mSeekable = !isLiveStream(mSessionDesc);
    593 
    594                         mControlURL = getControlURL(mSessionDesc);
    595 
    596                         if (mSessionDesc->countTracks() < 2) {
    597                             // There's no actual tracks in this session.
    598                             // The first "track" is merely session meta
    599                             // data.
    600 
    601                             ALOGW("Session doesn't contain any playable "
    602                                  "tracks. Aborting.");
    603                             result = ERROR_UNSUPPORTED;
    604                         } else {
    605                             setupTrack(1);
    606                         }
    607                     }
    608                 }
    609 
    610                 if (result != OK) {
    611                     sp<AMessage> reply = new AMessage('disc', id());
    612                     mConn->disconnect(reply);
    613                 }
    614                 break;
    615             }
    616 
    617             case 'setu':
    618             {
    619                 size_t index;
    620                 CHECK(msg->findSize("index", &index));
    621 
    622                 TrackInfo *track = NULL;
    623                 size_t trackIndex;
    624                 if (msg->findSize("track-index", &trackIndex)) {
    625                     track = &mTracks.editItemAt(trackIndex);
    626                 }
    627 
    628                 int32_t result;
    629                 CHECK(msg->findInt32("result", &result));
    630 
    631                 ALOGI("SETUP(%d) completed with result %d (%s)",
    632                      index, result, strerror(-result));
    633 
    634                 if (result == OK) {
    635                     CHECK(track != NULL);
    636 
    637                     sp<RefBase> obj;
    638                     CHECK(msg->findObject("response", &obj));
    639                     sp<ARTSPResponse> response =
    640                         static_cast<ARTSPResponse *>(obj.get());
    641 
    642                     if (response->mStatusCode != 200) {
    643                         result = UNKNOWN_ERROR;
    644                     } else {
    645                         ssize_t i = response->mHeaders.indexOfKey("session");
    646                         CHECK_GE(i, 0);
    647 
    648                         mSessionID = response->mHeaders.valueAt(i);
    649 
    650                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    651                         AString timeoutStr;
    652                         if (GetAttribute(
    653                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
    654                             char *end;
    655                             unsigned long timeoutSecs =
    656                                 strtoul(timeoutStr.c_str(), &end, 10);
    657 
    658                             if (end == timeoutStr.c_str() || *end != '\0') {
    659                                 ALOGW("server specified malformed timeout '%s'",
    660                                      timeoutStr.c_str());
    661 
    662                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    663                             } else if (timeoutSecs < 15) {
    664                                 ALOGW("server specified too short a timeout "
    665                                      "(%lu secs), using default.",
    666                                      timeoutSecs);
    667 
    668                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    669                             } else {
    670                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
    671 
    672                                 ALOGI("server specified timeout of %lu secs.",
    673                                      timeoutSecs);
    674                             }
    675                         }
    676 
    677                         i = mSessionID.find(";");
    678                         if (i >= 0) {
    679                             // Remove options, i.e. ";timeout=90"
    680                             mSessionID.erase(i, mSessionID.size() - i);
    681                         }
    682 
    683                         sp<AMessage> notify = new AMessage('accu', id());
    684                         notify->setSize("track-index", trackIndex);
    685 
    686                         i = response->mHeaders.indexOfKey("transport");
    687                         CHECK_GE(i, 0);
    688 
    689                         if (!track->mUsingInterleavedTCP) {
    690                             AString transport = response->mHeaders.valueAt(i);
    691 
    692                             // We are going to continue even if we were
    693                             // unable to poke a hole into the firewall...
    694                             pokeAHole(
    695                                     track->mRTPSocket,
    696                                     track->mRTCPSocket,
    697                                     transport);
    698                         }
    699 
    700                         mRTPConn->addStream(
    701                                 track->mRTPSocket, track->mRTCPSocket,
    702                                 mSessionDesc, index,
    703                                 notify, track->mUsingInterleavedTCP);
    704 
    705                         mSetupTracksSuccessful = true;
    706                     }
    707                 }
    708 
    709                 if (result != OK) {
    710                     if (track) {
    711                         if (!track->mUsingInterleavedTCP) {
    712                             // Clear the tag
    713                             if (mUIDValid) {
    714                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
    715                                 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
    716                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
    717                                 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
    718                             }
    719 
    720                             close(track->mRTPSocket);
    721                             close(track->mRTCPSocket);
    722                         }
    723 
    724                         mTracks.removeItemsAt(trackIndex);
    725                     }
    726                 }
    727 
    728                 ++index;
    729                 if (index < mSessionDesc->countTracks()) {
    730                     setupTrack(index);
    731                 } else if (mSetupTracksSuccessful) {
    732                     ++mKeepAliveGeneration;
    733                     postKeepAlive();
    734 
    735                     AString request = "PLAY ";
    736                     request.append(mControlURL);
    737                     request.append(" RTSP/1.0\r\n");
    738 
    739                     request.append("Session: ");
    740                     request.append(mSessionID);
    741                     request.append("\r\n");
    742 
    743                     request.append("\r\n");
    744 
    745                     sp<AMessage> reply = new AMessage('play', id());
    746                     mConn->sendRequest(request.c_str(), reply);
    747                 } else {
    748                     sp<AMessage> reply = new AMessage('disc', id());
    749                     mConn->disconnect(reply);
    750                 }
    751                 break;
    752             }
    753 
    754             case 'play':
    755             {
    756                 int32_t result;
    757                 CHECK(msg->findInt32("result", &result));
    758 
    759                 ALOGI("PLAY completed with result %d (%s)",
    760                      result, strerror(-result));
    761 
    762                 if (result == OK) {
    763                     sp<RefBase> obj;
    764                     CHECK(msg->findObject("response", &obj));
    765                     sp<ARTSPResponse> response =
    766                         static_cast<ARTSPResponse *>(obj.get());
    767 
    768                     if (response->mStatusCode != 200) {
    769                         result = UNKNOWN_ERROR;
    770                     } else {
    771                         parsePlayResponse(response);
    772 
    773                         sp<AMessage> timeout = new AMessage('tiou', id());
    774                         mCheckTimeoutGeneration++;
    775                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
    776                         timeout->post(kStartupTimeoutUs);
    777                     }
    778                 }
    779 
    780                 if (result != OK) {
    781                     sp<AMessage> reply = new AMessage('disc', id());
    782                     mConn->disconnect(reply);
    783                 }
    784 
    785                 break;
    786             }
    787 
    788             case 'aliv':
    789             {
    790                 int32_t generation;
    791                 CHECK(msg->findInt32("generation", &generation));
    792 
    793                 if (generation != mKeepAliveGeneration) {
    794                     // obsolete event.
    795                     break;
    796                 }
    797 
    798                 AString request;
    799                 request.append("OPTIONS ");
    800                 request.append(mSessionURL);
    801                 request.append(" RTSP/1.0\r\n");
    802                 request.append("Session: ");
    803                 request.append(mSessionID);
    804                 request.append("\r\n");
    805                 request.append("\r\n");
    806 
    807                 sp<AMessage> reply = new AMessage('opts', id());
    808                 reply->setInt32("generation", mKeepAliveGeneration);
    809                 mConn->sendRequest(request.c_str(), reply);
    810                 break;
    811             }
    812 
    813             case 'opts':
    814             {
    815                 int32_t result;
    816                 CHECK(msg->findInt32("result", &result));
    817 
    818                 ALOGI("OPTIONS completed with result %d (%s)",
    819                      result, strerror(-result));
    820 
    821                 int32_t generation;
    822                 CHECK(msg->findInt32("generation", &generation));
    823 
    824                 if (generation != mKeepAliveGeneration) {
    825                     // obsolete event.
    826                     break;
    827                 }
    828 
    829                 postKeepAlive();
    830                 break;
    831             }
    832 
    833             case 'abor':
    834             {
    835                 for (size_t i = 0; i < mTracks.size(); ++i) {
    836                     TrackInfo *info = &mTracks.editItemAt(i);
    837 
    838                     if (!mFirstAccessUnit) {
    839                         postQueueEOS(i, ERROR_END_OF_STREAM);
    840                     }
    841 
    842                     if (!info->mUsingInterleavedTCP) {
    843                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    844 
    845                         // Clear the tag
    846                         if (mUIDValid) {
    847                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
    848                             HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
    849                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
    850                             HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
    851                         }
    852 
    853                         close(info->mRTPSocket);
    854                         close(info->mRTCPSocket);
    855                     }
    856                 }
    857                 mTracks.clear();
    858                 mSetupTracksSuccessful = false;
    859                 mSeekPending = false;
    860                 mFirstAccessUnit = true;
    861                 mAllTracksHaveTime = false;
    862                 mNTPAnchorUs = -1;
    863                 mMediaAnchorUs = -1;
    864                 mNumAccessUnitsReceived = 0;
    865                 mReceivedFirstRTCPPacket = false;
    866                 mReceivedFirstRTPPacket = false;
    867                 mPausing = false;
    868                 mSeekable = true;
    869 
    870                 sp<AMessage> reply = new AMessage('tear', id());
    871 
    872                 int32_t reconnect;
    873                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    874                     reply->setInt32("reconnect", true);
    875                 }
    876 
    877                 AString request;
    878                 request = "TEARDOWN ";
    879 
    880                 // XXX should use aggregate url from SDP here...
    881                 request.append(mSessionURL);
    882                 request.append(" RTSP/1.0\r\n");
    883 
    884                 request.append("Session: ");
    885                 request.append(mSessionID);
    886                 request.append("\r\n");
    887 
    888                 request.append("\r\n");
    889 
    890                 mConn->sendRequest(request.c_str(), reply);
    891                 break;
    892             }
    893 
    894             case 'tear':
    895             {
    896                 int32_t result;
    897                 CHECK(msg->findInt32("result", &result));
    898 
    899                 ALOGI("TEARDOWN completed with result %d (%s)",
    900                      result, strerror(-result));
    901 
    902                 sp<AMessage> reply = new AMessage('disc', id());
    903 
    904                 int32_t reconnect;
    905                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    906                     reply->setInt32("reconnect", true);
    907                 }
    908 
    909                 mConn->disconnect(reply);
    910                 break;
    911             }
    912 
    913             case 'quit':
    914             {
    915                 sp<AMessage> msg = mNotify->dup();
    916                 msg->setInt32("what", kWhatDisconnected);
    917                 msg->setInt32("result", UNKNOWN_ERROR);
    918                 msg->post();
    919                 break;
    920             }
    921 
    922             case 'chek':
    923             {
    924                 int32_t generation;
    925                 CHECK(msg->findInt32("generation", &generation));
    926                 if (generation != mCheckGeneration) {
    927                     // This is an outdated message. Ignore.
    928                     break;
    929                 }
    930 
    931                 if (mNumAccessUnitsReceived == 0) {
    932 #if 1
    933                     ALOGI("stream ended? aborting.");
    934                     (new AMessage('abor', id()))->post();
    935                     break;
    936 #else
    937                     ALOGI("haven't seen an AU in a looong time.");
    938 #endif
    939                 }
    940 
    941                 mNumAccessUnitsReceived = 0;
    942                 msg->post(kAccessUnitTimeoutUs);
    943                 break;
    944             }
    945 
    946             case 'accu':
    947             {
    948                 int32_t timeUpdate;
    949                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
    950                     size_t trackIndex;
    951                     CHECK(msg->findSize("track-index", &trackIndex));
    952 
    953                     uint32_t rtpTime;
    954                     uint64_t ntpTime;
    955                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
    956                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
    957 
    958                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
    959                     break;
    960                 }
    961 
    962                 int32_t first;
    963                 if (msg->findInt32("first-rtcp", &first)) {
    964                     mReceivedFirstRTCPPacket = true;
    965                     break;
    966                 }
    967 
    968                 if (msg->findInt32("first-rtp", &first)) {
    969                     mReceivedFirstRTPPacket = true;
    970                     break;
    971                 }
    972 
    973                 ++mNumAccessUnitsReceived;
    974                 postAccessUnitTimeoutCheck();
    975 
    976                 size_t trackIndex;
    977                 CHECK(msg->findSize("track-index", &trackIndex));
    978 
    979                 if (trackIndex >= mTracks.size()) {
    980                     ALOGV("late packets ignored.");
    981                     break;
    982                 }
    983 
    984                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
    985 
    986                 int32_t eos;
    987                 if (msg->findInt32("eos", &eos)) {
    988                     ALOGI("received BYE on track index %d", trackIndex);
    989                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
    990                         ALOGI("No time established => fake existing data");
    991 
    992                         track->mEOSReceived = true;
    993                         mTryFakeRTCP = true;
    994                         mReceivedFirstRTCPPacket = true;
    995                         fakeTimestamps();
    996                     } else {
    997                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
    998                     }
    999                     return;
   1000                 }
   1001 
   1002                 sp<ABuffer> accessUnit;
   1003                 CHECK(msg->findBuffer("access-unit", &accessUnit));
   1004 
   1005                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1006 
   1007                 if (mSeekPending) {
   1008                     ALOGV("we're seeking, dropping stale packet.");
   1009                     break;
   1010                 }
   1011 
   1012                 if (seqNum < track->mFirstSeqNumInSegment) {
   1013                     ALOGV("dropping stale access-unit (%d < %d)",
   1014                          seqNum, track->mFirstSeqNumInSegment);
   1015                     break;
   1016                 }
   1017 
   1018                 if (track->mNewSegment) {
   1019                     track->mNewSegment = false;
   1020                 }
   1021 
   1022                 onAccessUnitComplete(trackIndex, accessUnit);
   1023                 break;
   1024             }
   1025 
   1026             case 'paus':
   1027             {
   1028                 int32_t generation;
   1029                 CHECK(msg->findInt32("pausecheck", &generation));
   1030                 if (generation != mPauseGeneration) {
   1031                     ALOGV("Ignoring outdated pause message.");
   1032                     break;
   1033                 }
   1034 
   1035                 if (!mSeekable) {
   1036                     ALOGW("This is a live stream, ignoring pause request.");
   1037                     break;
   1038                 }
   1039                 mCheckPending = true;
   1040                 ++mCheckGeneration;
   1041                 mPausing = true;
   1042 
   1043                 AString request = "PAUSE ";
   1044                 request.append(mControlURL);
   1045                 request.append(" RTSP/1.0\r\n");
   1046 
   1047                 request.append("Session: ");
   1048                 request.append(mSessionID);
   1049                 request.append("\r\n");
   1050 
   1051                 request.append("\r\n");
   1052 
   1053                 sp<AMessage> reply = new AMessage('pau2', id());
   1054                 mConn->sendRequest(request.c_str(), reply);
   1055                 break;
   1056             }
   1057 
   1058             case 'pau2':
   1059             {
   1060                 int32_t result;
   1061                 CHECK(msg->findInt32("result", &result));
   1062                 mCheckTimeoutGeneration++;
   1063 
   1064                 ALOGI("PAUSE completed with result %d (%s)",
   1065                      result, strerror(-result));
   1066                 break;
   1067             }
   1068 
   1069             case 'resu':
   1070             {
   1071                 if (mPausing && mSeekPending) {
   1072                     // If seeking, Play will be sent from see1 instead
   1073                     break;
   1074                 }
   1075 
   1076                 if (!mPausing) {
   1077                     // Dont send PLAY if we have not paused
   1078                     break;
   1079                 }
   1080                 AString request = "PLAY ";
   1081                 request.append(mControlURL);
   1082                 request.append(" RTSP/1.0\r\n");
   1083 
   1084                 request.append("Session: ");
   1085                 request.append(mSessionID);
   1086                 request.append("\r\n");
   1087 
   1088                 request.append("\r\n");
   1089 
   1090                 sp<AMessage> reply = new AMessage('res2', id());
   1091                 mConn->sendRequest(request.c_str(), reply);
   1092                 break;
   1093             }
   1094 
   1095             case 'res2':
   1096             {
   1097                 int32_t result;
   1098                 CHECK(msg->findInt32("result", &result));
   1099 
   1100                 ALOGI("PLAY completed with result %d (%s)",
   1101                      result, strerror(-result));
   1102 
   1103                 mCheckPending = false;
   1104                 postAccessUnitTimeoutCheck();
   1105 
   1106                 if (result == OK) {
   1107                     sp<RefBase> obj;
   1108                     CHECK(msg->findObject("response", &obj));
   1109                     sp<ARTSPResponse> response =
   1110                         static_cast<ARTSPResponse *>(obj.get());
   1111 
   1112                     if (response->mStatusCode != 200) {
   1113                         result = UNKNOWN_ERROR;
   1114                     } else {
   1115                         parsePlayResponse(response);
   1116 
   1117                         // Post new timeout in order to make sure to use
   1118                         // fake timestamps if no new Sender Reports arrive
   1119                         sp<AMessage> timeout = new AMessage('tiou', id());
   1120                         mCheckTimeoutGeneration++;
   1121                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1122                         timeout->post(kStartupTimeoutUs);
   1123                     }
   1124                 }
   1125 
   1126                 if (result != OK) {
   1127                     ALOGE("resume failed, aborting.");
   1128                     (new AMessage('abor', id()))->post();
   1129                 }
   1130 
   1131                 mPausing = false;
   1132                 break;
   1133             }
   1134 
   1135             case 'seek':
   1136             {
   1137                 if (!mSeekable) {
   1138                     ALOGW("This is a live stream, ignoring seek request.");
   1139 
   1140                     sp<AMessage> msg = mNotify->dup();
   1141                     msg->setInt32("what", kWhatSeekDone);
   1142                     msg->post();
   1143                     break;
   1144                 }
   1145 
   1146                 int64_t timeUs;
   1147                 CHECK(msg->findInt64("time", &timeUs));
   1148 
   1149                 mSeekPending = true;
   1150 
   1151                 // Disable the access unit timeout until we resumed
   1152                 // playback again.
   1153                 mCheckPending = true;
   1154                 ++mCheckGeneration;
   1155 
   1156                 sp<AMessage> reply = new AMessage('see1', id());
   1157                 reply->setInt64("time", timeUs);
   1158 
   1159                 if (mPausing) {
   1160                     // PAUSE already sent
   1161                     ALOGI("Pause already sent");
   1162                     reply->post();
   1163                     break;
   1164                 }
   1165                 AString request = "PAUSE ";
   1166                 request.append(mControlURL);
   1167                 request.append(" RTSP/1.0\r\n");
   1168 
   1169                 request.append("Session: ");
   1170                 request.append(mSessionID);
   1171                 request.append("\r\n");
   1172 
   1173                 request.append("\r\n");
   1174 
   1175                 mConn->sendRequest(request.c_str(), reply);
   1176                 break;
   1177             }
   1178 
   1179             case 'see1':
   1180             {
   1181                 // Session is paused now.
   1182                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1183                     TrackInfo *info = &mTracks.editItemAt(i);
   1184 
   1185                     postQueueSeekDiscontinuity(i);
   1186                     info->mEOSReceived = false;
   1187 
   1188                     info->mRTPAnchor = 0;
   1189                     info->mNTPAnchorUs = -1;
   1190                 }
   1191 
   1192                 mAllTracksHaveTime = false;
   1193                 mNTPAnchorUs = -1;
   1194 
   1195                 // Start new timeoutgeneration to avoid getting timeout
   1196                 // before PLAY response arrive
   1197                 sp<AMessage> timeout = new AMessage('tiou', id());
   1198                 mCheckTimeoutGeneration++;
   1199                 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1200                 timeout->post(kStartupTimeoutUs);
   1201 
   1202                 int64_t timeUs;
   1203                 CHECK(msg->findInt64("time", &timeUs));
   1204 
   1205                 AString request = "PLAY ";
   1206                 request.append(mControlURL);
   1207                 request.append(" RTSP/1.0\r\n");
   1208 
   1209                 request.append("Session: ");
   1210                 request.append(mSessionID);
   1211                 request.append("\r\n");
   1212 
   1213                 request.append(
   1214                         StringPrintf(
   1215                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
   1216 
   1217                 request.append("\r\n");
   1218 
   1219                 sp<AMessage> reply = new AMessage('see2', id());
   1220                 mConn->sendRequest(request.c_str(), reply);
   1221                 break;
   1222             }
   1223 
   1224             case 'see2':
   1225             {
   1226                 if (mTracks.size() == 0) {
   1227                     // We have already hit abor, break
   1228                     break;
   1229                 }
   1230 
   1231                 int32_t result;
   1232                 CHECK(msg->findInt32("result", &result));
   1233 
   1234                 ALOGI("PLAY completed with result %d (%s)",
   1235                      result, strerror(-result));
   1236 
   1237                 mCheckPending = false;
   1238                 postAccessUnitTimeoutCheck();
   1239 
   1240                 if (result == OK) {
   1241                     sp<RefBase> obj;
   1242                     CHECK(msg->findObject("response", &obj));
   1243                     sp<ARTSPResponse> response =
   1244                         static_cast<ARTSPResponse *>(obj.get());
   1245 
   1246                     if (response->mStatusCode != 200) {
   1247                         result = UNKNOWN_ERROR;
   1248                     } else {
   1249                         parsePlayResponse(response);
   1250 
   1251                         // Post new timeout in order to make sure to use
   1252                         // fake timestamps if no new Sender Reports arrive
   1253                         sp<AMessage> timeout = new AMessage('tiou', id());
   1254                         mCheckTimeoutGeneration++;
   1255                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1256                         timeout->post(kStartupTimeoutUs);
   1257 
   1258                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
   1259                         CHECK_GE(i, 0);
   1260 
   1261                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
   1262 
   1263                         ALOGI("seek completed.");
   1264                     }
   1265                 }
   1266 
   1267                 if (result != OK) {
   1268                     ALOGE("seek failed, aborting.");
   1269                     (new AMessage('abor', id()))->post();
   1270                 }
   1271 
   1272                 mPausing = false;
   1273                 mSeekPending = false;
   1274 
   1275                 sp<AMessage> msg = mNotify->dup();
   1276                 msg->setInt32("what", kWhatSeekDone);
   1277                 msg->post();
   1278                 break;
   1279             }
   1280 
   1281             case 'biny':
   1282             {
   1283                 sp<ABuffer> buffer;
   1284                 CHECK(msg->findBuffer("buffer", &buffer));
   1285 
   1286                 int32_t index;
   1287                 CHECK(buffer->meta()->findInt32("index", &index));
   1288 
   1289                 mRTPConn->injectPacket(index, buffer);
   1290                 break;
   1291             }
   1292 
   1293             case 'tiou':
   1294             {
   1295                 int32_t timeoutGenerationCheck;
   1296                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
   1297                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
   1298                     // This is an outdated message. Ignore.
   1299                     // This typically happens if a lot of seeks are
   1300                     // performed, since new timeout messages now are
   1301                     // posted at seek as well.
   1302                     break;
   1303                 }
   1304                 if (!mReceivedFirstRTCPPacket) {
   1305                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
   1306                         ALOGW("We received RTP packets but no RTCP packets, "
   1307                              "using fake timestamps.");
   1308 
   1309                         mTryFakeRTCP = true;
   1310 
   1311                         mReceivedFirstRTCPPacket = true;
   1312 
   1313                         fakeTimestamps();
   1314                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
   1315                         ALOGW("Never received any data, switching transports.");
   1316 
   1317                         mTryTCPInterleaving = true;
   1318 
   1319                         sp<AMessage> msg = new AMessage('abor', id());
   1320                         msg->setInt32("reconnect", true);
   1321                         msg->post();
   1322                     } else {
   1323                         ALOGW("Never received any data, disconnecting.");
   1324                         (new AMessage('abor', id()))->post();
   1325                     }
   1326                 } else {
   1327                     if (!mAllTracksHaveTime) {
   1328                         ALOGW("We received some RTCP packets, but time "
   1329                               "could not be established on all tracks, now "
   1330                               "using fake timestamps");
   1331 
   1332                         fakeTimestamps();
   1333                     }
   1334                 }
   1335                 break;
   1336             }
   1337 
   1338             default:
   1339                 TRESPASS();
   1340                 break;
   1341         }
   1342     }
   1343 
   1344     void postKeepAlive() {
   1345         sp<AMessage> msg = new AMessage('aliv', id());
   1346         msg->setInt32("generation", mKeepAliveGeneration);
   1347         msg->post((mKeepAliveTimeoutUs * 9) / 10);
   1348     }
   1349 
   1350     void postAccessUnitTimeoutCheck() {
   1351         if (mCheckPending) {
   1352             return;
   1353         }
   1354 
   1355         mCheckPending = true;
   1356         sp<AMessage> check = new AMessage('chek', id());
   1357         check->setInt32("generation", mCheckGeneration);
   1358         check->post(kAccessUnitTimeoutUs);
   1359     }
   1360 
   1361     static void SplitString(
   1362             const AString &s, const char *separator, List<AString> *items) {
   1363         items->clear();
   1364         size_t start = 0;
   1365         while (start < s.size()) {
   1366             ssize_t offset = s.find(separator, start);
   1367 
   1368             if (offset < 0) {
   1369                 items->push_back(AString(s, start, s.size() - start));
   1370                 break;
   1371             }
   1372 
   1373             items->push_back(AString(s, start, offset - start));
   1374             start = offset + strlen(separator);
   1375         }
   1376     }
   1377 
   1378     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1379         mPlayResponseParsed = true;
   1380         if (mTracks.size() == 0) {
   1381             ALOGV("parsePlayResponse: late packets ignored.");
   1382             return;
   1383         }
   1384 
   1385         ssize_t i = response->mHeaders.indexOfKey("range");
   1386         if (i < 0) {
   1387             // Server doesn't even tell use what range it is going to
   1388             // play, therefore we won't support seeking.
   1389             return;
   1390         }
   1391 
   1392         AString range = response->mHeaders.valueAt(i);
   1393         ALOGV("Range: %s", range.c_str());
   1394 
   1395         AString val;
   1396         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1397 
   1398         float npt1, npt2;
   1399         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1400             // This is a live stream and therefore not seekable.
   1401 
   1402             ALOGI("This is a live stream");
   1403             return;
   1404         }
   1405 
   1406         i = response->mHeaders.indexOfKey("rtp-info");
   1407         CHECK_GE(i, 0);
   1408 
   1409         AString rtpInfo = response->mHeaders.valueAt(i);
   1410         List<AString> streamInfos;
   1411         SplitString(rtpInfo, ",", &streamInfos);
   1412 
   1413         int n = 1;
   1414         for (List<AString>::iterator it = streamInfos.begin();
   1415              it != streamInfos.end(); ++it) {
   1416             (*it).trim();
   1417             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1418 
   1419             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1420 
   1421             size_t trackIndex = 0;
   1422             while (trackIndex < mTracks.size()
   1423                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1424                 ++trackIndex;
   1425             }
   1426             CHECK_LT(trackIndex, mTracks.size());
   1427 
   1428             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1429 
   1430             char *end;
   1431             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1432 
   1433             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1434             info->mFirstSeqNumInSegment = seq;
   1435             info->mNewSegment = true;
   1436 
   1437             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1438 
   1439             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1440 
   1441             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1442 
   1443             info->mNormalPlayTimeRTP = rtpTime;
   1444             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
   1445 
   1446             if (!mFirstAccessUnit) {
   1447                 postNormalPlayTimeMapping(
   1448                         trackIndex,
   1449                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1450             }
   1451 
   1452             ++n;
   1453         }
   1454     }
   1455 
   1456     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
   1457         CHECK_GE(index, 0u);
   1458         CHECK_LT(index, mTracks.size());
   1459 
   1460         const TrackInfo &info = mTracks.itemAt(index);
   1461 
   1462         *timeScale = info.mTimeScale;
   1463 
   1464         return info.mPacketSource->getFormat();
   1465     }
   1466 
   1467     size_t countTracks() const {
   1468         return mTracks.size();
   1469     }
   1470 
   1471 private:
   1472     struct TrackInfo {
   1473         AString mURL;
   1474         int mRTPSocket;
   1475         int mRTCPSocket;
   1476         bool mUsingInterleavedTCP;
   1477         uint32_t mFirstSeqNumInSegment;
   1478         bool mNewSegment;
   1479 
   1480         uint32_t mRTPAnchor;
   1481         int64_t mNTPAnchorUs;
   1482         int32_t mTimeScale;
   1483         bool mEOSReceived;
   1484 
   1485         uint32_t mNormalPlayTimeRTP;
   1486         int64_t mNormalPlayTimeUs;
   1487 
   1488         sp<APacketSource> mPacketSource;
   1489 
   1490         // Stores packets temporarily while no notion of time
   1491         // has been established yet.
   1492         List<sp<ABuffer> > mPackets;
   1493     };
   1494 
   1495     sp<AMessage> mNotify;
   1496     bool mUIDValid;
   1497     uid_t mUID;
   1498     sp<ALooper> mNetLooper;
   1499     sp<ARTSPConnection> mConn;
   1500     sp<ARTPConnection> mRTPConn;
   1501     sp<ASessionDescription> mSessionDesc;
   1502     AString mOriginalSessionURL;  // This one still has user:pass@
   1503     AString mSessionURL;
   1504     AString mSessionHost;
   1505     AString mBaseURL;
   1506     AString mControlURL;
   1507     AString mSessionID;
   1508     bool mSetupTracksSuccessful;
   1509     bool mSeekPending;
   1510     bool mFirstAccessUnit;
   1511 
   1512     bool mAllTracksHaveTime;
   1513     int64_t mNTPAnchorUs;
   1514     int64_t mMediaAnchorUs;
   1515     int64_t mLastMediaTimeUs;
   1516 
   1517     int64_t mNumAccessUnitsReceived;
   1518     bool mCheckPending;
   1519     int32_t mCheckGeneration;
   1520     int32_t mCheckTimeoutGeneration;
   1521     bool mTryTCPInterleaving;
   1522     bool mTryFakeRTCP;
   1523     bool mReceivedFirstRTCPPacket;
   1524     bool mReceivedFirstRTPPacket;
   1525     bool mSeekable;
   1526     int64_t mKeepAliveTimeoutUs;
   1527     int32_t mKeepAliveGeneration;
   1528     bool mPausing;
   1529     int32_t mPauseGeneration;
   1530 
   1531     Vector<TrackInfo> mTracks;
   1532 
   1533     bool mPlayResponseParsed;
   1534 
   1535     void setupTrack(size_t index) {
   1536         sp<APacketSource> source =
   1537             new APacketSource(mSessionDesc, index);
   1538 
   1539         if (source->initCheck() != OK) {
   1540             ALOGW("Unsupported format. Ignoring track #%d.", index);
   1541 
   1542             sp<AMessage> reply = new AMessage('setu', id());
   1543             reply->setSize("index", index);
   1544             reply->setInt32("result", ERROR_UNSUPPORTED);
   1545             reply->post();
   1546             return;
   1547         }
   1548 
   1549         AString url;
   1550         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1551 
   1552         AString trackURL;
   1553         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1554 
   1555         mTracks.push(TrackInfo());
   1556         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1557         info->mURL = trackURL;
   1558         info->mPacketSource = source;
   1559         info->mUsingInterleavedTCP = false;
   1560         info->mFirstSeqNumInSegment = 0;
   1561         info->mNewSegment = true;
   1562         info->mRTPAnchor = 0;
   1563         info->mNTPAnchorUs = -1;
   1564         info->mNormalPlayTimeRTP = 0;
   1565         info->mNormalPlayTimeUs = 0ll;
   1566 
   1567         unsigned long PT;
   1568         AString formatDesc;
   1569         AString formatParams;
   1570         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1571 
   1572         int32_t timescale;
   1573         int32_t numChannels;
   1574         ASessionDescription::ParseFormatDesc(
   1575                 formatDesc.c_str(), &timescale, &numChannels);
   1576 
   1577         info->mTimeScale = timescale;
   1578         info->mEOSReceived = false;
   1579 
   1580         ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
   1581 
   1582         AString request = "SETUP ";
   1583         request.append(trackURL);
   1584         request.append(" RTSP/1.0\r\n");
   1585 
   1586         if (mTryTCPInterleaving) {
   1587             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1588             info->mUsingInterleavedTCP = true;
   1589             info->mRTPSocket = interleaveIndex;
   1590             info->mRTCPSocket = interleaveIndex + 1;
   1591 
   1592             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1593             request.append(interleaveIndex);
   1594             request.append("-");
   1595             request.append(interleaveIndex + 1);
   1596         } else {
   1597             unsigned rtpPort;
   1598             ARTPConnection::MakePortPair(
   1599                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1600 
   1601             if (mUIDValid) {
   1602                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1603                                                 (uint32_t)*(uint32_t*) "RTP_");
   1604                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1605                                                 (uint32_t)*(uint32_t*) "RTP_");
   1606                 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
   1607                 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
   1608             }
   1609 
   1610             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1611             request.append(rtpPort);
   1612             request.append("-");
   1613             request.append(rtpPort + 1);
   1614         }
   1615 
   1616         request.append("\r\n");
   1617 
   1618         if (index > 1) {
   1619             request.append("Session: ");
   1620             request.append(mSessionID);
   1621             request.append("\r\n");
   1622         }
   1623 
   1624         request.append("\r\n");
   1625 
   1626         sp<AMessage> reply = new AMessage('setu', id());
   1627         reply->setSize("index", index);
   1628         reply->setSize("track-index", mTracks.size() - 1);
   1629         mConn->sendRequest(request.c_str(), reply);
   1630     }
   1631 
   1632     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1633         out->clear();
   1634 
   1635         if (strncasecmp("rtsp://", baseURL, 7)) {
   1636             // Base URL must be absolute
   1637             return false;
   1638         }
   1639 
   1640         if (!strncasecmp("rtsp://", url, 7)) {
   1641             // "url" is already an absolute URL, ignore base URL.
   1642             out->setTo(url);
   1643             return true;
   1644         }
   1645 
   1646         size_t n = strlen(baseURL);
   1647         if (baseURL[n - 1] == '/') {
   1648             out->setTo(baseURL);
   1649             out->append(url);
   1650         } else {
   1651             const char *slashPos = strrchr(baseURL, '/');
   1652 
   1653             if (slashPos > &baseURL[6]) {
   1654                 out->setTo(baseURL, slashPos - baseURL);
   1655             } else {
   1656                 out->setTo(baseURL);
   1657             }
   1658 
   1659             out->append("/");
   1660             out->append(url);
   1661         }
   1662 
   1663         return true;
   1664     }
   1665 
   1666     void fakeTimestamps() {
   1667         mNTPAnchorUs = -1ll;
   1668         for (size_t i = 0; i < mTracks.size(); ++i) {
   1669             onTimeUpdate(i, 0, 0ll);
   1670         }
   1671     }
   1672 
   1673     bool dataReceivedOnAllChannels() {
   1674         TrackInfo *track;
   1675         for (size_t i = 0; i < mTracks.size(); ++i) {
   1676             track = &mTracks.editItemAt(i);
   1677             if (track->mPackets.empty()) {
   1678                 return false;
   1679             }
   1680         }
   1681         return true;
   1682     }
   1683 
   1684     void handleFirstAccessUnit() {
   1685         if (mFirstAccessUnit) {
   1686             sp<AMessage> msg = mNotify->dup();
   1687             msg->setInt32("what", kWhatConnected);
   1688             msg->post();
   1689 
   1690             if (mSeekable) {
   1691                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1692                     TrackInfo *info = &mTracks.editItemAt(i);
   1693 
   1694                     postNormalPlayTimeMapping(
   1695                             i,
   1696                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1697                 }
   1698             }
   1699 
   1700             mFirstAccessUnit = false;
   1701         }
   1702     }
   1703 
   1704     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1705         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
   1706              trackIndex, rtpTime, ntpTime);
   1707 
   1708         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1709 
   1710         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1711 
   1712         track->mRTPAnchor = rtpTime;
   1713         track->mNTPAnchorUs = ntpTimeUs;
   1714 
   1715         if (mNTPAnchorUs < 0) {
   1716             mNTPAnchorUs = ntpTimeUs;
   1717             mMediaAnchorUs = mLastMediaTimeUs;
   1718         }
   1719 
   1720         if (!mAllTracksHaveTime) {
   1721             bool allTracksHaveTime = true;
   1722             for (size_t i = 0; i < mTracks.size(); ++i) {
   1723                 TrackInfo *track = &mTracks.editItemAt(i);
   1724                 if (track->mNTPAnchorUs < 0) {
   1725                     allTracksHaveTime = false;
   1726                     break;
   1727                 }
   1728             }
   1729             if (allTracksHaveTime) {
   1730                 mAllTracksHaveTime = true;
   1731                 ALOGI("Time now established for all tracks.");
   1732             }
   1733         }
   1734         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1735             handleFirstAccessUnit();
   1736 
   1737             // Time is now established, lets start timestamping immediately
   1738             for (size_t i = 0; i < mTracks.size(); ++i) {
   1739                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
   1740                 while (!trackInfo->mPackets.empty()) {
   1741                     sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
   1742                     trackInfo->mPackets.erase(trackInfo->mPackets.begin());
   1743 
   1744                     if (addMediaTimestamp(i, trackInfo, accessUnit)) {
   1745                         postQueueAccessUnit(i, accessUnit);
   1746                     }
   1747                 }
   1748             }
   1749             for (size_t i = 0; i < mTracks.size(); ++i) {
   1750                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
   1751                 if (trackInfo->mEOSReceived) {
   1752                     postQueueEOS(i, ERROR_END_OF_STREAM);
   1753                     trackInfo->mEOSReceived = false;
   1754                 }
   1755             }
   1756         }
   1757     }
   1758 
   1759     void onAccessUnitComplete(
   1760             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1761         ALOGV("onAccessUnitComplete track %d", trackIndex);
   1762 
   1763         if(!mPlayResponseParsed){
   1764             ALOGI("play response is not parsed, storing accessunit");
   1765             TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1766             track->mPackets.push_back(accessUnit);
   1767             return;
   1768         }
   1769 
   1770         handleFirstAccessUnit();
   1771 
   1772         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1773 
   1774         if (!mAllTracksHaveTime) {
   1775             ALOGV("storing accessUnit, no time established yet");
   1776             track->mPackets.push_back(accessUnit);
   1777             return;
   1778         }
   1779 
   1780         while (!track->mPackets.empty()) {
   1781             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1782             track->mPackets.erase(track->mPackets.begin());
   1783 
   1784             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1785                 postQueueAccessUnit(trackIndex, accessUnit);
   1786             }
   1787         }
   1788 
   1789         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1790             postQueueAccessUnit(trackIndex, accessUnit);
   1791         }
   1792 
   1793         if (track->mEOSReceived) {
   1794             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1795             track->mEOSReceived = false;
   1796         }
   1797     }
   1798 
   1799     bool addMediaTimestamp(
   1800             int32_t trackIndex, const TrackInfo *track,
   1801             const sp<ABuffer> &accessUnit) {
   1802         uint32_t rtpTime;
   1803         CHECK(accessUnit->meta()->findInt32(
   1804                     "rtp-time", (int32_t *)&rtpTime));
   1805 
   1806         int64_t relRtpTimeUs =
   1807             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1808                 / track->mTimeScale;
   1809 
   1810         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1811 
   1812         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1813 
   1814         if (mediaTimeUs > mLastMediaTimeUs) {
   1815             mLastMediaTimeUs = mediaTimeUs;
   1816         }
   1817 
   1818         if (mediaTimeUs < 0) {
   1819             ALOGV("dropping early accessUnit.");
   1820             return false;
   1821         }
   1822 
   1823         ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
   1824              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
   1825 
   1826         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1827 
   1828         return true;
   1829     }
   1830 
   1831     void postQueueAccessUnit(
   1832             size_t trackIndex, const sp<ABuffer> &accessUnit) {
   1833         sp<AMessage> msg = mNotify->dup();
   1834         msg->setInt32("what", kWhatAccessUnit);
   1835         msg->setSize("trackIndex", trackIndex);
   1836         msg->setBuffer("accessUnit", accessUnit);
   1837         msg->post();
   1838     }
   1839 
   1840     void postQueueEOS(size_t trackIndex, status_t finalResult) {
   1841         sp<AMessage> msg = mNotify->dup();
   1842         msg->setInt32("what", kWhatEOS);
   1843         msg->setSize("trackIndex", trackIndex);
   1844         msg->setInt32("finalResult", finalResult);
   1845         msg->post();
   1846     }
   1847 
   1848     void postQueueSeekDiscontinuity(size_t trackIndex) {
   1849         sp<AMessage> msg = mNotify->dup();
   1850         msg->setInt32("what", kWhatSeekDiscontinuity);
   1851         msg->setSize("trackIndex", trackIndex);
   1852         msg->post();
   1853     }
   1854 
   1855     void postNormalPlayTimeMapping(
   1856             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
   1857         sp<AMessage> msg = mNotify->dup();
   1858         msg->setInt32("what", kWhatNormalPlayTimeMapping);
   1859         msg->setSize("trackIndex", trackIndex);
   1860         msg->setInt32("rtpTime", rtpTime);
   1861         msg->setInt64("nptUs", nptUs);
   1862         msg->post();
   1863     }
   1864 
   1865     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1866 };
   1867 
   1868 }  // namespace android
   1869 
   1870 #endif  // MY_HANDLER_H_
   1871