Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 
     23 #ifndef LOG_TAG
     24 #define LOG_TAG "MyHandler"
     25 #endif
     26 
     27 #include <utils/Log.h>
     28 #include <cutils/properties.h> // for property_get
     29 
     30 #include "APacketSource.h"
     31 #include "ARTPConnection.h"
     32 #include "ARTSPConnection.h"
     33 #include "ASessionDescription.h"
     34 
     35 #include <ctype.h>
     36 #include <cutils/properties.h>
     37 
     38 #include <media/stagefright/foundation/ABuffer.h>
     39 #include <media/stagefright/foundation/ADebug.h>
     40 #include <media/stagefright/foundation/ALooper.h>
     41 #include <media/stagefright/foundation/AMessage.h>
     42 #include <media/stagefright/MediaErrors.h>
     43 #include <media/stagefright/Utils.h>
     44 
     45 #include <arpa/inet.h>
     46 #include <sys/socket.h>
     47 #include <netdb.h>
     48 
     49 #include "HTTPBase.h"
     50 
     51 #if LOG_NDEBUG
     52 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
     53 #else
     54 #define UNUSED_UNLESS_VERBOSE(x)
     55 #endif
     56 
     57 // If no access units are received within 5 secs, assume that the rtp
     58 // stream has ended and signal end of stream.
     59 static int64_t kAccessUnitTimeoutUs = 10000000ll;
     60 
     61 // If no access units arrive for the first 10 secs after starting the
     62 // stream, assume none ever will and signal EOS or switch transports.
     63 static int64_t kStartupTimeoutUs = 10000000ll;
     64 
     65 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
     66 
     67 static int64_t kPauseDelayUs = 3000000ll;
     68 
     69 // The allowed maximum number of stale access units at the beginning of
     70 // a new sequence.
     71 static int32_t kMaxAllowedStaleAccessUnits = 20;
     72 
     73 namespace android {
     74 
     75 static bool GetAttribute(const char *s, const char *key, AString *value) {
     76     value->clear();
     77 
     78     size_t keyLen = strlen(key);
     79 
     80     for (;;) {
     81         while (isspace(*s)) {
     82             ++s;
     83         }
     84 
     85         const char *colonPos = strchr(s, ';');
     86 
     87         size_t len =
     88             (colonPos == NULL) ? strlen(s) : colonPos - s;
     89 
     90         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     91             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     92             return true;
     93         }
     94 
     95         if (colonPos == NULL) {
     96             return false;
     97         }
     98 
     99         s = colonPos + 1;
    100     }
    101 }
    102 
    103 struct MyHandler : public AHandler {
    104     enum {
    105         kWhatConnected                  = 'conn',
    106         kWhatDisconnected               = 'disc',
    107         kWhatSeekPaused                 = 'spau',
    108         kWhatSeekDone                   = 'sdon',
    109 
    110         kWhatAccessUnit                 = 'accU',
    111         kWhatEOS                        = 'eos!',
    112         kWhatSeekDiscontinuity          = 'seeD',
    113         kWhatNormalPlayTimeMapping      = 'nptM',
    114     };
    115 
    116     MyHandler(
    117             const char *url,
    118             const sp<AMessage> &notify,
    119             bool uidValid = false, uid_t uid = 0)
    120         : mNotify(notify),
    121           mUIDValid(uidValid),
    122           mUID(uid),
    123           mNetLooper(new ALooper),
    124           mConn(new ARTSPConnection(mUIDValid, mUID)),
    125           mRTPConn(new ARTPConnection),
    126           mOriginalSessionURL(url),
    127           mSessionURL(url),
    128           mSetupTracksSuccessful(false),
    129           mSeekPending(false),
    130           mFirstAccessUnit(true),
    131           mAllTracksHaveTime(false),
    132           mNTPAnchorUs(-1),
    133           mMediaAnchorUs(-1),
    134           mLastMediaTimeUs(0),
    135           mNumAccessUnitsReceived(0),
    136           mCheckPending(false),
    137           mCheckGeneration(0),
    138           mCheckTimeoutGeneration(0),
    139           mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
    140           mTryFakeRTCP(false),
    141           mReceivedFirstRTCPPacket(false),
    142           mReceivedFirstRTPPacket(false),
    143           mSeekable(true),
    144           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
    145           mKeepAliveGeneration(0),
    146           mPausing(false),
    147           mPauseGeneration(0),
    148           mPlayResponseParsed(false) {
    149         mNetLooper->setName("rtsp net");
    150         mNetLooper->start(false /* runOnCallingThread */,
    151                           false /* canCallJava */,
    152                           PRIORITY_HIGHEST);
    153 
    154         // Strip any authentication info from the session url, we don't
    155         // want to transmit user/pass in cleartext.
    156         AString host, path, user, pass;
    157         unsigned port;
    158         CHECK(ARTSPConnection::ParseURL(
    159                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    160 
    161         if (user.size() > 0) {
    162             mSessionURL.clear();
    163             mSessionURL.append("rtsp://");
    164             mSessionURL.append(host);
    165             mSessionURL.append(":");
    166             mSessionURL.append(AStringPrintf("%u", port));
    167             mSessionURL.append(path);
    168 
    169             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
    170         }
    171 
    172         mSessionHost = host;
    173     }
    174 
    175     void connect() {
    176         looper()->registerHandler(mConn);
    177         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    178 
    179         sp<AMessage> notify = new AMessage('biny', this);
    180         mConn->observeBinaryData(notify);
    181 
    182         sp<AMessage> reply = new AMessage('conn', this);
    183         mConn->connect(mOriginalSessionURL.c_str(), reply);
    184     }
    185 
    186     void loadSDP(const sp<ASessionDescription>& desc) {
    187         looper()->registerHandler(mConn);
    188         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    189 
    190         sp<AMessage> notify = new AMessage('biny', this);
    191         mConn->observeBinaryData(notify);
    192 
    193         sp<AMessage> reply = new AMessage('sdpl', this);
    194         reply->setObject("description", desc);
    195         mConn->connect(mOriginalSessionURL.c_str(), reply);
    196     }
    197 
    198     AString getControlURL() {
    199         AString sessionLevelControlURL;
    200         if (mSessionDesc->findAttribute(
    201                 0,
    202                 "a=control",
    203                 &sessionLevelControlURL)) {
    204             if (sessionLevelControlURL.compare("*") == 0) {
    205                 return mBaseURL;
    206             } else {
    207                 AString controlURL;
    208                 CHECK(MakeURL(
    209                         mBaseURL.c_str(),
    210                         sessionLevelControlURL.c_str(),
    211                         &controlURL));
    212                 return controlURL;
    213             }
    214         } else {
    215             return mSessionURL;
    216         }
    217     }
    218 
    219     void disconnect() {
    220         (new AMessage('abor', this))->post();
    221     }
    222 
    223     void seek(int64_t timeUs) {
    224         sp<AMessage> msg = new AMessage('seek', this);
    225         msg->setInt64("time", timeUs);
    226         mPauseGeneration++;
    227         msg->post();
    228     }
    229 
    230     void continueSeekAfterPause(int64_t timeUs) {
    231         sp<AMessage> msg = new AMessage('see1', this);
    232         msg->setInt64("time", timeUs);
    233         msg->post();
    234     }
    235 
    236     bool isSeekable() const {
    237         return mSeekable;
    238     }
    239 
    240     void pause() {
    241         sp<AMessage> msg = new AMessage('paus', this);
    242         mPauseGeneration++;
    243         msg->setInt32("pausecheck", mPauseGeneration);
    244         msg->post();
    245     }
    246 
    247     void resume() {
    248         sp<AMessage> msg = new AMessage('resu', this);
    249         mPauseGeneration++;
    250         msg->post();
    251     }
    252 
    253     static void addRR(const sp<ABuffer> &buf) {
    254         uint8_t *ptr = buf->data() + buf->size();
    255         ptr[0] = 0x80 | 0;
    256         ptr[1] = 201;  // RR
    257         ptr[2] = 0;
    258         ptr[3] = 1;
    259         ptr[4] = 0xde;  // SSRC
    260         ptr[5] = 0xad;
    261         ptr[6] = 0xbe;
    262         ptr[7] = 0xef;
    263 
    264         buf->setRange(0, buf->size() + 8);
    265     }
    266 
    267     static void addSDES(int s, const sp<ABuffer> &buffer) {
    268         struct sockaddr_in addr;
    269         socklen_t addrSize = sizeof(addr);
    270         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
    271             inet_aton("0.0.0.0", &(addr.sin_addr));
    272         }
    273 
    274         uint8_t *data = buffer->data() + buffer->size();
    275         data[0] = 0x80 | 1;
    276         data[1] = 202;  // SDES
    277         data[4] = 0xde;  // SSRC
    278         data[5] = 0xad;
    279         data[6] = 0xbe;
    280         data[7] = 0xef;
    281 
    282         size_t offset = 8;
    283 
    284         data[offset++] = 1;  // CNAME
    285 
    286         AString cname = "stagefright@";
    287         cname.append(inet_ntoa(addr.sin_addr));
    288         data[offset++] = cname.size();
    289 
    290         memcpy(&data[offset], cname.c_str(), cname.size());
    291         offset += cname.size();
    292 
    293         data[offset++] = 6;  // TOOL
    294 
    295         AString tool = MakeUserAgent();
    296 
    297         data[offset++] = tool.size();
    298 
    299         memcpy(&data[offset], tool.c_str(), tool.size());
    300         offset += tool.size();
    301 
    302         data[offset++] = 0;
    303 
    304         if ((offset % 4) > 0) {
    305             size_t count = 4 - (offset % 4);
    306             switch (count) {
    307                 case 3:
    308                     data[offset++] = 0;
    309                 case 2:
    310                     data[offset++] = 0;
    311                 case 1:
    312                     data[offset++] = 0;
    313             }
    314         }
    315 
    316         size_t numWords = (offset / 4) - 1;
    317         data[2] = numWords >> 8;
    318         data[3] = numWords & 0xff;
    319 
    320         buffer->setRange(buffer->offset(), buffer->size() + offset);
    321     }
    322 
    323     // In case we're behind NAT, fire off two UDP packets to the remote
    324     // rtp/rtcp ports to poke a hole into the firewall for future incoming
    325     // packets. We're going to send an RR/SDES RTCP packet to both of them.
    326     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
    327         struct sockaddr_in addr;
    328         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    329         addr.sin_family = AF_INET;
    330 
    331         AString source;
    332         AString server_port;
    333         if (!GetAttribute(transport.c_str(),
    334                           "source",
    335                           &source)) {
    336             ALOGW("Missing 'source' field in Transport response. Using "
    337                  "RTSP endpoint address.");
    338 
    339             struct hostent *ent = gethostbyname(mSessionHost.c_str());
    340             if (ent == NULL) {
    341                 ALOGE("Failed to look up address of session host '%s'",
    342                      mSessionHost.c_str());
    343 
    344                 return false;
    345             }
    346 
    347             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    348         } else {
    349             addr.sin_addr.s_addr = inet_addr(source.c_str());
    350         }
    351 
    352         if (!GetAttribute(transport.c_str(),
    353                                  "server_port",
    354                                  &server_port)) {
    355             ALOGI("Missing 'server_port' field in Transport response.");
    356             return false;
    357         }
    358 
    359         int rtpPort, rtcpPort;
    360         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
    361                 || rtpPort <= 0 || rtpPort > 65535
    362                 || rtcpPort <=0 || rtcpPort > 65535
    363                 || rtcpPort != rtpPort + 1) {
    364             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
    365                  " RTP port must be even, RTCP port must be one higher.",
    366                  server_port.c_str());
    367 
    368             return false;
    369         }
    370 
    371         if (rtpPort & 1) {
    372             ALOGW("Server picked an odd RTP port, it should've picked an "
    373                  "even one, we'll let it pass for now, but this may break "
    374                  "in the future.");
    375         }
    376 
    377         if (addr.sin_addr.s_addr == INADDR_NONE) {
    378             return true;
    379         }
    380 
    381         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
    382             // No firewalls to traverse on the loopback interface.
    383             return true;
    384         }
    385 
    386         // Make up an RR/SDES RTCP packet.
    387         sp<ABuffer> buf = new ABuffer(65536);
    388         buf->setRange(0, 0);
    389         addRR(buf);
    390         addSDES(rtpSocket, buf);
    391 
    392         addr.sin_port = htons(rtpPort);
    393 
    394         ssize_t n = sendto(
    395                 rtpSocket, buf->data(), buf->size(), 0,
    396                 (const sockaddr *)&addr, sizeof(addr));
    397 
    398         if (n < (ssize_t)buf->size()) {
    399             ALOGE("failed to poke a hole for RTP packets");
    400             return false;
    401         }
    402 
    403         addr.sin_port = htons(rtcpPort);
    404 
    405         n = sendto(
    406                 rtcpSocket, buf->data(), buf->size(), 0,
    407                 (const sockaddr *)&addr, sizeof(addr));
    408 
    409         if (n < (ssize_t)buf->size()) {
    410             ALOGE("failed to poke a hole for RTCP packets");
    411             return false;
    412         }
    413 
    414         ALOGV("successfully poked holes.");
    415 
    416         return true;
    417     }
    418 
    419     static bool isLiveStream(const sp<ASessionDescription> &desc) {
    420         AString attrLiveStream;
    421         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
    422             ssize_t semicolonPos = attrLiveStream.find(";", 2);
    423 
    424             const char* liveStreamValue;
    425             if (semicolonPos < 0) {
    426                 liveStreamValue = attrLiveStream.c_str();
    427             } else {
    428                 AString valString;
    429                 valString.setTo(attrLiveStream,
    430                         semicolonPos + 1,
    431                         attrLiveStream.size() - semicolonPos - 1);
    432                 liveStreamValue = valString.c_str();
    433             }
    434 
    435             uint32_t value = strtoul(liveStreamValue, NULL, 10);
    436             if (value == 1) {
    437                 ALOGV("found live stream");
    438                 return true;
    439             }
    440         } else {
    441             // It is a live stream if no duration is returned
    442             int64_t durationUs;
    443             if (!desc->getDurationUs(&durationUs)) {
    444                 ALOGV("No duration found, assume live stream");
    445                 return true;
    446             }
    447         }
    448 
    449         return false;
    450     }
    451 
    452     virtual void onMessageReceived(const sp<AMessage> &msg) {
    453         switch (msg->what()) {
    454             case 'conn':
    455             {
    456                 int32_t result;
    457                 CHECK(msg->findInt32("result", &result));
    458 
    459                 ALOGI("connection request completed with result %d (%s)",
    460                      result, strerror(-result));
    461 
    462                 if (result == OK) {
    463                     AString request;
    464                     request = "DESCRIBE ";
    465                     request.append(mSessionURL);
    466                     request.append(" RTSP/1.0\r\n");
    467                     request.append("Accept: application/sdp\r\n");
    468                     request.append("\r\n");
    469 
    470                     sp<AMessage> reply = new AMessage('desc', this);
    471                     mConn->sendRequest(request.c_str(), reply);
    472                 } else {
    473                     (new AMessage('disc', this))->post();
    474                 }
    475                 break;
    476             }
    477 
    478             case 'disc':
    479             {
    480                 ++mKeepAliveGeneration;
    481 
    482                 int32_t reconnect;
    483                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    484                     sp<AMessage> reply = new AMessage('conn', this);
    485                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    486                 } else {
    487                     (new AMessage('quit', this))->post();
    488                 }
    489                 break;
    490             }
    491 
    492             case 'desc':
    493             {
    494                 int32_t result;
    495                 CHECK(msg->findInt32("result", &result));
    496 
    497                 ALOGI("DESCRIBE completed with result %d (%s)",
    498                      result, strerror(-result));
    499 
    500                 if (result == OK) {
    501                     sp<RefBase> obj;
    502                     CHECK(msg->findObject("response", &obj));
    503                     sp<ARTSPResponse> response =
    504                         static_cast<ARTSPResponse *>(obj.get());
    505 
    506                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
    507                         ssize_t i = response->mHeaders.indexOfKey("location");
    508                         CHECK_GE(i, 0);
    509 
    510                         mOriginalSessionURL = response->mHeaders.valueAt(i);
    511                         mSessionURL = mOriginalSessionURL;
    512 
    513                         // Strip any authentication info from the session url, we don't
    514                         // want to transmit user/pass in cleartext.
    515                         AString host, path, user, pass;
    516                         unsigned port;
    517                         if (ARTSPConnection::ParseURL(
    518                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
    519                                 && user.size() > 0) {
    520                             mSessionURL.clear();
    521                             mSessionURL.append("rtsp://");
    522                             mSessionURL.append(host);
    523                             mSessionURL.append(":");
    524                             mSessionURL.append(AStringPrintf("%u", port));
    525                             mSessionURL.append(path);
    526 
    527                             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
    528                         }
    529 
    530                         sp<AMessage> reply = new AMessage('conn', this);
    531                         mConn->connect(mOriginalSessionURL.c_str(), reply);
    532                         break;
    533                     }
    534 
    535                     if (response->mStatusCode != 200) {
    536                         result = UNKNOWN_ERROR;
    537                     } else if (response->mContent == NULL) {
    538                         result = ERROR_MALFORMED;
    539                         ALOGE("The response has no content.");
    540                     } else {
    541                         mSessionDesc = new ASessionDescription;
    542 
    543                         mSessionDesc->setTo(
    544                                 response->mContent->data(),
    545                                 response->mContent->size());
    546 
    547                         if (!mSessionDesc->isValid()) {
    548                             ALOGE("Failed to parse session description.");
    549                             result = ERROR_MALFORMED;
    550                         } else {
    551                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    552                             if (i >= 0) {
    553                                 mBaseURL = response->mHeaders.valueAt(i);
    554                             } else {
    555                                 i = response->mHeaders.indexOfKey("content-location");
    556                                 if (i >= 0) {
    557                                     mBaseURL = response->mHeaders.valueAt(i);
    558                                 } else {
    559                                     mBaseURL = mSessionURL;
    560                                 }
    561                             }
    562 
    563                             mSeekable = !isLiveStream(mSessionDesc);
    564 
    565                             if (!mBaseURL.startsWith("rtsp://")) {
    566                                 // Some misbehaving servers specify a relative
    567                                 // URL in one of the locations above, combine
    568                                 // it with the absolute session URL to get
    569                                 // something usable...
    570 
    571                                 ALOGW("Server specified a non-absolute base URL"
    572                                      ", combining it with the session URL to "
    573                                      "get something usable...");
    574 
    575                                 AString tmp;
    576                                 CHECK(MakeURL(
    577                                             mSessionURL.c_str(),
    578                                             mBaseURL.c_str(),
    579                                             &tmp));
    580 
    581                                 mBaseURL = tmp;
    582                             }
    583 
    584                             mControlURL = getControlURL();
    585 
    586                             if (mSessionDesc->countTracks() < 2) {
    587                                 // There's no actual tracks in this session.
    588                                 // The first "track" is merely session meta
    589                                 // data.
    590 
    591                                 ALOGW("Session doesn't contain any playable "
    592                                      "tracks. Aborting.");
    593                                 result = ERROR_UNSUPPORTED;
    594                             } else {
    595                                 setupTrack(1);
    596                             }
    597                         }
    598                     }
    599                 }
    600 
    601                 if (result != OK) {
    602                     sp<AMessage> reply = new AMessage('disc', this);
    603                     mConn->disconnect(reply);
    604                 }
    605                 break;
    606             }
    607 
    608             case 'sdpl':
    609             {
    610                 int32_t result;
    611                 CHECK(msg->findInt32("result", &result));
    612 
    613                 ALOGI("SDP connection request completed with result %d (%s)",
    614                      result, strerror(-result));
    615 
    616                 if (result == OK) {
    617                     sp<RefBase> obj;
    618                     CHECK(msg->findObject("description", &obj));
    619                     mSessionDesc =
    620                         static_cast<ASessionDescription *>(obj.get());
    621 
    622                     if (!mSessionDesc->isValid()) {
    623                         ALOGE("Failed to parse session description.");
    624                         result = ERROR_MALFORMED;
    625                     } else {
    626                         mBaseURL = mSessionURL;
    627 
    628                         mSeekable = !isLiveStream(mSessionDesc);
    629 
    630                         mControlURL = getControlURL();
    631 
    632                         if (mSessionDesc->countTracks() < 2) {
    633                             // There's no actual tracks in this session.
    634                             // The first "track" is merely session meta
    635                             // data.
    636 
    637                             ALOGW("Session doesn't contain any playable "
    638                                  "tracks. Aborting.");
    639                             result = ERROR_UNSUPPORTED;
    640                         } else {
    641                             setupTrack(1);
    642                         }
    643                     }
    644                 }
    645 
    646                 if (result != OK) {
    647                     sp<AMessage> reply = new AMessage('disc', this);
    648                     mConn->disconnect(reply);
    649                 }
    650                 break;
    651             }
    652 
    653             case 'setu':
    654             {
    655                 size_t index;
    656                 CHECK(msg->findSize("index", &index));
    657 
    658                 TrackInfo *track = NULL;
    659                 size_t trackIndex;
    660                 if (msg->findSize("track-index", &trackIndex)) {
    661                     track = &mTracks.editItemAt(trackIndex);
    662                 }
    663 
    664                 int32_t result;
    665                 CHECK(msg->findInt32("result", &result));
    666 
    667                 ALOGI("SETUP(%zu) completed with result %d (%s)",
    668                      index, result, strerror(-result));
    669 
    670                 if (result == OK) {
    671                     CHECK(track != NULL);
    672 
    673                     sp<RefBase> obj;
    674                     CHECK(msg->findObject("response", &obj));
    675                     sp<ARTSPResponse> response =
    676                         static_cast<ARTSPResponse *>(obj.get());
    677 
    678                     if (response->mStatusCode != 200) {
    679                         result = UNKNOWN_ERROR;
    680                     } else {
    681                         ssize_t i = response->mHeaders.indexOfKey("session");
    682                         CHECK_GE(i, 0);
    683 
    684                         mSessionID = response->mHeaders.valueAt(i);
    685 
    686                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    687                         AString timeoutStr;
    688                         if (GetAttribute(
    689                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
    690                             char *end;
    691                             unsigned long timeoutSecs =
    692                                 strtoul(timeoutStr.c_str(), &end, 10);
    693 
    694                             if (end == timeoutStr.c_str() || *end != '\0') {
    695                                 ALOGW("server specified malformed timeout '%s'",
    696                                      timeoutStr.c_str());
    697 
    698                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    699                             } else if (timeoutSecs < 15) {
    700                                 ALOGW("server specified too short a timeout "
    701                                      "(%lu secs), using default.",
    702                                      timeoutSecs);
    703 
    704                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    705                             } else {
    706                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
    707 
    708                                 ALOGI("server specified timeout of %lu secs.",
    709                                      timeoutSecs);
    710                             }
    711                         }
    712 
    713                         i = mSessionID.find(";");
    714                         if (i >= 0) {
    715                             // Remove options, i.e. ";timeout=90"
    716                             mSessionID.erase(i, mSessionID.size() - i);
    717                         }
    718 
    719                         sp<AMessage> notify = new AMessage('accu', this);
    720                         notify->setSize("track-index", trackIndex);
    721 
    722                         i = response->mHeaders.indexOfKey("transport");
    723                         CHECK_GE(i, 0);
    724 
    725                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
    726                             if (!track->mUsingInterleavedTCP) {
    727                                 AString transport = response->mHeaders.valueAt(i);
    728 
    729                                 // We are going to continue even if we were
    730                                 // unable to poke a hole into the firewall...
    731                                 pokeAHole(
    732                                         track->mRTPSocket,
    733                                         track->mRTCPSocket,
    734                                         transport);
    735                             }
    736 
    737                             mRTPConn->addStream(
    738                                     track->mRTPSocket, track->mRTCPSocket,
    739                                     mSessionDesc, index,
    740                                     notify, track->mUsingInterleavedTCP);
    741 
    742                             mSetupTracksSuccessful = true;
    743                         } else {
    744                             result = BAD_VALUE;
    745                         }
    746                     }
    747                 }
    748 
    749                 if (result != OK) {
    750                     if (track) {
    751                         if (!track->mUsingInterleavedTCP) {
    752                             // Clear the tag
    753                             if (mUIDValid) {
    754                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
    755                                 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
    756                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
    757                                 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
    758                             }
    759 
    760                             close(track->mRTPSocket);
    761                             close(track->mRTCPSocket);
    762                         }
    763 
    764                         mTracks.removeItemsAt(trackIndex);
    765                     }
    766                 }
    767 
    768                 ++index;
    769                 if (result == OK && index < mSessionDesc->countTracks()) {
    770                     setupTrack(index);
    771                 } else if (mSetupTracksSuccessful) {
    772                     ++mKeepAliveGeneration;
    773                     postKeepAlive();
    774 
    775                     AString request = "PLAY ";
    776                     request.append(mControlURL);
    777                     request.append(" RTSP/1.0\r\n");
    778 
    779                     request.append("Session: ");
    780                     request.append(mSessionID);
    781                     request.append("\r\n");
    782 
    783                     request.append("\r\n");
    784 
    785                     sp<AMessage> reply = new AMessage('play', this);
    786                     mConn->sendRequest(request.c_str(), reply);
    787                 } else {
    788                     sp<AMessage> reply = new AMessage('disc', this);
    789                     mConn->disconnect(reply);
    790                 }
    791                 break;
    792             }
    793 
    794             case 'play':
    795             {
    796                 int32_t result;
    797                 CHECK(msg->findInt32("result", &result));
    798 
    799                 ALOGI("PLAY completed with result %d (%s)",
    800                      result, strerror(-result));
    801 
    802                 if (result == OK) {
    803                     sp<RefBase> obj;
    804                     CHECK(msg->findObject("response", &obj));
    805                     sp<ARTSPResponse> response =
    806                         static_cast<ARTSPResponse *>(obj.get());
    807 
    808                     if (response->mStatusCode != 200) {
    809                         result = UNKNOWN_ERROR;
    810                     } else {
    811                         parsePlayResponse(response);
    812                         postTimeout();
    813                     }
    814                 }
    815 
    816                 if (result != OK) {
    817                     sp<AMessage> reply = new AMessage('disc', this);
    818                     mConn->disconnect(reply);
    819                 }
    820 
    821                 break;
    822             }
    823 
    824             case 'aliv':
    825             {
    826                 int32_t generation;
    827                 CHECK(msg->findInt32("generation", &generation));
    828 
    829                 if (generation != mKeepAliveGeneration) {
    830                     // obsolete event.
    831                     break;
    832                 }
    833 
    834                 AString request;
    835                 request.append("OPTIONS ");
    836                 request.append(mSessionURL);
    837                 request.append(" RTSP/1.0\r\n");
    838                 request.append("Session: ");
    839                 request.append(mSessionID);
    840                 request.append("\r\n");
    841                 request.append("\r\n");
    842 
    843                 sp<AMessage> reply = new AMessage('opts', this);
    844                 reply->setInt32("generation", mKeepAliveGeneration);
    845                 mConn->sendRequest(request.c_str(), reply);
    846                 break;
    847             }
    848 
    849             case 'opts':
    850             {
    851                 int32_t result;
    852                 CHECK(msg->findInt32("result", &result));
    853 
    854                 ALOGI("OPTIONS completed with result %d (%s)",
    855                      result, strerror(-result));
    856 
    857                 int32_t generation;
    858                 CHECK(msg->findInt32("generation", &generation));
    859 
    860                 if (generation != mKeepAliveGeneration) {
    861                     // obsolete event.
    862                     break;
    863                 }
    864 
    865                 postKeepAlive();
    866                 break;
    867             }
    868 
    869             case 'abor':
    870             {
    871                 for (size_t i = 0; i < mTracks.size(); ++i) {
    872                     TrackInfo *info = &mTracks.editItemAt(i);
    873 
    874                     if (!mFirstAccessUnit) {
    875                         postQueueEOS(i, ERROR_END_OF_STREAM);
    876                     }
    877 
    878                     if (!info->mUsingInterleavedTCP) {
    879                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    880 
    881                         // Clear the tag
    882                         if (mUIDValid) {
    883                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
    884                             HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
    885                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
    886                             HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
    887                         }
    888 
    889                         close(info->mRTPSocket);
    890                         close(info->mRTCPSocket);
    891                     }
    892                 }
    893                 mTracks.clear();
    894                 mSetupTracksSuccessful = false;
    895                 mSeekPending = false;
    896                 mFirstAccessUnit = true;
    897                 mAllTracksHaveTime = false;
    898                 mNTPAnchorUs = -1;
    899                 mMediaAnchorUs = -1;
    900                 mNumAccessUnitsReceived = 0;
    901                 mReceivedFirstRTCPPacket = false;
    902                 mReceivedFirstRTPPacket = false;
    903                 mPausing = false;
    904                 mSeekable = true;
    905 
    906                 sp<AMessage> reply = new AMessage('tear', this);
    907 
    908                 int32_t reconnect;
    909                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    910                     reply->setInt32("reconnect", true);
    911                 }
    912 
    913                 AString request;
    914                 request = "TEARDOWN ";
    915 
    916                 // XXX should use aggregate url from SDP here...
    917                 request.append(mSessionURL);
    918                 request.append(" RTSP/1.0\r\n");
    919 
    920                 request.append("Session: ");
    921                 request.append(mSessionID);
    922                 request.append("\r\n");
    923 
    924                 request.append("\r\n");
    925 
    926                 mConn->sendRequest(request.c_str(), reply);
    927                 break;
    928             }
    929 
    930             case 'tear':
    931             {
    932                 int32_t result;
    933                 CHECK(msg->findInt32("result", &result));
    934 
    935                 ALOGI("TEARDOWN completed with result %d (%s)",
    936                      result, strerror(-result));
    937 
    938                 sp<AMessage> reply = new AMessage('disc', this);
    939 
    940                 int32_t reconnect;
    941                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    942                     reply->setInt32("reconnect", true);
    943                 }
    944 
    945                 mConn->disconnect(reply);
    946                 break;
    947             }
    948 
    949             case 'quit':
    950             {
    951                 sp<AMessage> msg = mNotify->dup();
    952                 msg->setInt32("what", kWhatDisconnected);
    953                 msg->setInt32("result", UNKNOWN_ERROR);
    954                 msg->post();
    955                 break;
    956             }
    957 
    958             case 'chek':
    959             {
    960                 int32_t generation;
    961                 CHECK(msg->findInt32("generation", &generation));
    962                 if (generation != mCheckGeneration) {
    963                     // This is an outdated message. Ignore.
    964                     break;
    965                 }
    966 
    967                 if (mNumAccessUnitsReceived == 0) {
    968 #if 1
    969                     ALOGI("stream ended? aborting.");
    970                     (new AMessage('abor', this))->post();
    971                     break;
    972 #else
    973                     ALOGI("haven't seen an AU in a looong time.");
    974 #endif
    975                 }
    976 
    977                 mNumAccessUnitsReceived = 0;
    978                 msg->post(kAccessUnitTimeoutUs);
    979                 break;
    980             }
    981 
    982             case 'accu':
    983             {
    984                 if (mSeekPending) {
    985                     ALOGV("Stale access unit.");
    986                     break;
    987                 }
    988 
    989                 int32_t timeUpdate;
    990                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
    991                     size_t trackIndex;
    992                     CHECK(msg->findSize("track-index", &trackIndex));
    993 
    994                     uint32_t rtpTime;
    995                     uint64_t ntpTime;
    996                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
    997                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
    998 
    999                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
   1000                     break;
   1001                 }
   1002 
   1003                 int32_t first;
   1004                 if (msg->findInt32("first-rtcp", &first)) {
   1005                     mReceivedFirstRTCPPacket = true;
   1006                     break;
   1007                 }
   1008 
   1009                 if (msg->findInt32("first-rtp", &first)) {
   1010                     mReceivedFirstRTPPacket = true;
   1011                     break;
   1012                 }
   1013 
   1014                 ++mNumAccessUnitsReceived;
   1015                 postAccessUnitTimeoutCheck();
   1016 
   1017                 size_t trackIndex;
   1018                 CHECK(msg->findSize("track-index", &trackIndex));
   1019 
   1020                 if (trackIndex >= mTracks.size()) {
   1021                     ALOGV("late packets ignored.");
   1022                     break;
   1023                 }
   1024 
   1025                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1026 
   1027                 int32_t eos;
   1028                 if (msg->findInt32("eos", &eos)) {
   1029                     ALOGI("received BYE on track index %zu", trackIndex);
   1030                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1031                         ALOGI("No time established => fake existing data");
   1032 
   1033                         track->mEOSReceived = true;
   1034                         mTryFakeRTCP = true;
   1035                         mReceivedFirstRTCPPacket = true;
   1036                         fakeTimestamps();
   1037                     } else {
   1038                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1039                     }
   1040                     return;
   1041                 }
   1042 
   1043                 if (mSeekPending) {
   1044                     ALOGV("we're seeking, dropping stale packet.");
   1045                     break;
   1046                 }
   1047 
   1048                 sp<ABuffer> accessUnit;
   1049                 CHECK(msg->findBuffer("access-unit", &accessUnit));
   1050                 onAccessUnitComplete(trackIndex, accessUnit);
   1051                 break;
   1052             }
   1053 
   1054             case 'paus':
   1055             {
   1056                 int32_t generation;
   1057                 CHECK(msg->findInt32("pausecheck", &generation));
   1058                 if (generation != mPauseGeneration) {
   1059                     ALOGV("Ignoring outdated pause message.");
   1060                     break;
   1061                 }
   1062 
   1063                 if (!mSeekable) {
   1064                     ALOGW("This is a live stream, ignoring pause request.");
   1065                     break;
   1066                 }
   1067 
   1068                 if (mPausing) {
   1069                     ALOGV("This stream is already paused.");
   1070                     break;
   1071                 }
   1072 
   1073                 mCheckPending = true;
   1074                 ++mCheckGeneration;
   1075                 mPausing = true;
   1076 
   1077                 AString request = "PAUSE ";
   1078                 request.append(mControlURL);
   1079                 request.append(" RTSP/1.0\r\n");
   1080 
   1081                 request.append("Session: ");
   1082                 request.append(mSessionID);
   1083                 request.append("\r\n");
   1084 
   1085                 request.append("\r\n");
   1086 
   1087                 sp<AMessage> reply = new AMessage('pau2', this);
   1088                 mConn->sendRequest(request.c_str(), reply);
   1089                 break;
   1090             }
   1091 
   1092             case 'pau2':
   1093             {
   1094                 int32_t result;
   1095                 CHECK(msg->findInt32("result", &result));
   1096                 mCheckTimeoutGeneration++;
   1097 
   1098                 ALOGI("PAUSE completed with result %d (%s)",
   1099                      result, strerror(-result));
   1100                 break;
   1101             }
   1102 
   1103             case 'resu':
   1104             {
   1105                 if (mPausing && mSeekPending) {
   1106                     // If seeking, Play will be sent from see1 instead
   1107                     break;
   1108                 }
   1109 
   1110                 if (!mPausing) {
   1111                     // Dont send PLAY if we have not paused
   1112                     break;
   1113                 }
   1114                 AString request = "PLAY ";
   1115                 request.append(mControlURL);
   1116                 request.append(" RTSP/1.0\r\n");
   1117 
   1118                 request.append("Session: ");
   1119                 request.append(mSessionID);
   1120                 request.append("\r\n");
   1121 
   1122                 request.append("\r\n");
   1123 
   1124                 sp<AMessage> reply = new AMessage('res2', this);
   1125                 mConn->sendRequest(request.c_str(), reply);
   1126                 break;
   1127             }
   1128 
   1129             case 'res2':
   1130             {
   1131                 int32_t result;
   1132                 CHECK(msg->findInt32("result", &result));
   1133 
   1134                 ALOGI("PLAY (for resume) completed with result %d (%s)",
   1135                      result, strerror(-result));
   1136 
   1137                 mCheckPending = false;
   1138                 ++mCheckGeneration;
   1139                 postAccessUnitTimeoutCheck();
   1140 
   1141                 if (result == OK) {
   1142                     sp<RefBase> obj;
   1143                     CHECK(msg->findObject("response", &obj));
   1144                     sp<ARTSPResponse> response =
   1145                         static_cast<ARTSPResponse *>(obj.get());
   1146 
   1147                     if (response->mStatusCode != 200) {
   1148                         result = UNKNOWN_ERROR;
   1149                     } else {
   1150                         parsePlayResponse(response);
   1151 
   1152                         // Post new timeout in order to make sure to use
   1153                         // fake timestamps if no new Sender Reports arrive
   1154                         postTimeout();
   1155                     }
   1156                 }
   1157 
   1158                 if (result != OK) {
   1159                     ALOGE("resume failed, aborting.");
   1160                     (new AMessage('abor', this))->post();
   1161                 }
   1162 
   1163                 mPausing = false;
   1164                 break;
   1165             }
   1166 
   1167             case 'seek':
   1168             {
   1169                 if (!mSeekable) {
   1170                     ALOGW("This is a live stream, ignoring seek request.");
   1171 
   1172                     sp<AMessage> msg = mNotify->dup();
   1173                     msg->setInt32("what", kWhatSeekDone);
   1174                     msg->post();
   1175                     break;
   1176                 }
   1177 
   1178                 int64_t timeUs;
   1179                 CHECK(msg->findInt64("time", &timeUs));
   1180 
   1181                 mSeekPending = true;
   1182 
   1183                 // Disable the access unit timeout until we resumed
   1184                 // playback again.
   1185                 mCheckPending = true;
   1186                 ++mCheckGeneration;
   1187 
   1188                 sp<AMessage> reply = new AMessage('see0', this);
   1189                 reply->setInt64("time", timeUs);
   1190 
   1191                 if (mPausing) {
   1192                     // PAUSE already sent
   1193                     ALOGI("Pause already sent");
   1194                     reply->post();
   1195                     break;
   1196                 }
   1197                 AString request = "PAUSE ";
   1198                 request.append(mControlURL);
   1199                 request.append(" RTSP/1.0\r\n");
   1200 
   1201                 request.append("Session: ");
   1202                 request.append(mSessionID);
   1203                 request.append("\r\n");
   1204 
   1205                 request.append("\r\n");
   1206 
   1207                 mConn->sendRequest(request.c_str(), reply);
   1208                 break;
   1209             }
   1210 
   1211             case 'see0':
   1212             {
   1213                 // Session is paused now.
   1214                 status_t err = OK;
   1215                 msg->findInt32("result", &err);
   1216 
   1217                 int64_t timeUs;
   1218                 CHECK(msg->findInt64("time", &timeUs));
   1219 
   1220                 sp<AMessage> notify = mNotify->dup();
   1221                 notify->setInt32("what", kWhatSeekPaused);
   1222                 notify->setInt32("err", err);
   1223                 notify->setInt64("time", timeUs);
   1224                 notify->post();
   1225                 break;
   1226 
   1227             }
   1228 
   1229             case 'see1':
   1230             {
   1231                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1232                     TrackInfo *info = &mTracks.editItemAt(i);
   1233 
   1234                     postQueueSeekDiscontinuity(i);
   1235                     info->mEOSReceived = false;
   1236 
   1237                     info->mRTPAnchor = 0;
   1238                     info->mNTPAnchorUs = -1;
   1239                 }
   1240 
   1241                 mAllTracksHaveTime = false;
   1242                 mNTPAnchorUs = -1;
   1243 
   1244                 // Start new timeoutgeneration to avoid getting timeout
   1245                 // before PLAY response arrive
   1246                 postTimeout();
   1247 
   1248                 int64_t timeUs;
   1249                 CHECK(msg->findInt64("time", &timeUs));
   1250 
   1251                 AString request = "PLAY ";
   1252                 request.append(mControlURL);
   1253                 request.append(" RTSP/1.0\r\n");
   1254 
   1255                 request.append("Session: ");
   1256                 request.append(mSessionID);
   1257                 request.append("\r\n");
   1258 
   1259                 request.append(
   1260                         AStringPrintf(
   1261                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
   1262 
   1263                 request.append("\r\n");
   1264 
   1265                 sp<AMessage> reply = new AMessage('see2', this);
   1266                 mConn->sendRequest(request.c_str(), reply);
   1267                 break;
   1268             }
   1269 
   1270             case 'see2':
   1271             {
   1272                 if (mTracks.size() == 0) {
   1273                     // We have already hit abor, break
   1274                     break;
   1275                 }
   1276 
   1277                 int32_t result;
   1278                 CHECK(msg->findInt32("result", &result));
   1279 
   1280                 ALOGI("PLAY (for seek) completed with result %d (%s)",
   1281                      result, strerror(-result));
   1282 
   1283                 mCheckPending = false;
   1284                 ++mCheckGeneration;
   1285                 postAccessUnitTimeoutCheck();
   1286 
   1287                 if (result == OK) {
   1288                     sp<RefBase> obj;
   1289                     CHECK(msg->findObject("response", &obj));
   1290                     sp<ARTSPResponse> response =
   1291                         static_cast<ARTSPResponse *>(obj.get());
   1292 
   1293                     if (response->mStatusCode != 200) {
   1294                         result = UNKNOWN_ERROR;
   1295                     } else {
   1296                         parsePlayResponse(response);
   1297 
   1298                         // Post new timeout in order to make sure to use
   1299                         // fake timestamps if no new Sender Reports arrive
   1300                         postTimeout();
   1301 
   1302                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
   1303                         CHECK_GE(i, 0);
   1304 
   1305                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
   1306 
   1307                         ALOGI("seek completed.");
   1308                     }
   1309                 }
   1310 
   1311                 if (result != OK) {
   1312                     ALOGE("seek failed, aborting.");
   1313                     (new AMessage('abor', this))->post();
   1314                 }
   1315 
   1316                 mPausing = false;
   1317                 mSeekPending = false;
   1318 
   1319                 // Discard all stale access units.
   1320                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1321                     TrackInfo *track = &mTracks.editItemAt(i);
   1322                     track->mPackets.clear();
   1323                 }
   1324 
   1325                 sp<AMessage> msg = mNotify->dup();
   1326                 msg->setInt32("what", kWhatSeekDone);
   1327                 msg->post();
   1328                 break;
   1329             }
   1330 
   1331             case 'biny':
   1332             {
   1333                 sp<ABuffer> buffer;
   1334                 CHECK(msg->findBuffer("buffer", &buffer));
   1335 
   1336                 int32_t index;
   1337                 CHECK(buffer->meta()->findInt32("index", &index));
   1338 
   1339                 mRTPConn->injectPacket(index, buffer);
   1340                 break;
   1341             }
   1342 
   1343             case 'tiou':
   1344             {
   1345                 int32_t timeoutGenerationCheck;
   1346                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
   1347                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
   1348                     // This is an outdated message. Ignore.
   1349                     // This typically happens if a lot of seeks are
   1350                     // performed, since new timeout messages now are
   1351                     // posted at seek as well.
   1352                     break;
   1353                 }
   1354                 if (!mReceivedFirstRTCPPacket) {
   1355                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
   1356                         ALOGW("We received RTP packets but no RTCP packets, "
   1357                              "using fake timestamps.");
   1358 
   1359                         mTryFakeRTCP = true;
   1360 
   1361                         mReceivedFirstRTCPPacket = true;
   1362 
   1363                         fakeTimestamps();
   1364                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
   1365                         ALOGW("Never received any data, switching transports.");
   1366 
   1367                         mTryTCPInterleaving = true;
   1368 
   1369                         sp<AMessage> msg = new AMessage('abor', this);
   1370                         msg->setInt32("reconnect", true);
   1371                         msg->post();
   1372                     } else {
   1373                         ALOGW("Never received any data, disconnecting.");
   1374                         (new AMessage('abor', this))->post();
   1375                     }
   1376                 } else {
   1377                     if (!mAllTracksHaveTime) {
   1378                         ALOGW("We received some RTCP packets, but time "
   1379                               "could not be established on all tracks, now "
   1380                               "using fake timestamps");
   1381 
   1382                         fakeTimestamps();
   1383                     }
   1384                 }
   1385                 break;
   1386             }
   1387 
   1388             default:
   1389                 TRESPASS();
   1390                 break;
   1391         }
   1392     }
   1393 
   1394     void postKeepAlive() {
   1395         sp<AMessage> msg = new AMessage('aliv', this);
   1396         msg->setInt32("generation", mKeepAliveGeneration);
   1397         msg->post((mKeepAliveTimeoutUs * 9) / 10);
   1398     }
   1399 
   1400     void cancelAccessUnitTimeoutCheck() {
   1401         ALOGV("cancelAccessUnitTimeoutCheck");
   1402         ++mCheckGeneration;
   1403     }
   1404 
   1405     void postAccessUnitTimeoutCheck() {
   1406         if (mCheckPending) {
   1407             return;
   1408         }
   1409 
   1410         mCheckPending = true;
   1411         sp<AMessage> check = new AMessage('chek', this);
   1412         check->setInt32("generation", mCheckGeneration);
   1413         check->post(kAccessUnitTimeoutUs);
   1414     }
   1415 
   1416     static void SplitString(
   1417             const AString &s, const char *separator, List<AString> *items) {
   1418         items->clear();
   1419         size_t start = 0;
   1420         while (start < s.size()) {
   1421             ssize_t offset = s.find(separator, start);
   1422 
   1423             if (offset < 0) {
   1424                 items->push_back(AString(s, start, s.size() - start));
   1425                 break;
   1426             }
   1427 
   1428             items->push_back(AString(s, start, offset - start));
   1429             start = offset + strlen(separator);
   1430         }
   1431     }
   1432 
   1433     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1434         mPlayResponseParsed = true;
   1435         if (mTracks.size() == 0) {
   1436             ALOGV("parsePlayResponse: late packets ignored.");
   1437             return;
   1438         }
   1439 
   1440         ssize_t i = response->mHeaders.indexOfKey("range");
   1441         if (i < 0) {
   1442             // Server doesn't even tell use what range it is going to
   1443             // play, therefore we won't support seeking.
   1444             return;
   1445         }
   1446 
   1447         AString range = response->mHeaders.valueAt(i);
   1448         ALOGV("Range: %s", range.c_str());
   1449 
   1450         AString val;
   1451         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1452 
   1453         float npt1, npt2;
   1454         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1455             // This is a live stream and therefore not seekable.
   1456 
   1457             ALOGI("This is a live stream");
   1458             return;
   1459         }
   1460 
   1461         i = response->mHeaders.indexOfKey("rtp-info");
   1462         CHECK_GE(i, 0);
   1463 
   1464         AString rtpInfo = response->mHeaders.valueAt(i);
   1465         List<AString> streamInfos;
   1466         SplitString(rtpInfo, ",", &streamInfos);
   1467 
   1468         int n = 1;
   1469         for (List<AString>::iterator it = streamInfos.begin();
   1470              it != streamInfos.end(); ++it) {
   1471             (*it).trim();
   1472             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1473 
   1474             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1475 
   1476             size_t trackIndex = 0;
   1477             while (trackIndex < mTracks.size()
   1478                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1479                 ++trackIndex;
   1480             }
   1481             CHECK_LT(trackIndex, mTracks.size());
   1482 
   1483             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1484 
   1485             char *end;
   1486             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1487 
   1488             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1489             info->mFirstSeqNumInSegment = seq;
   1490             info->mNewSegment = true;
   1491             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
   1492 
   1493             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1494 
   1495             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1496 
   1497             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1498 
   1499             info->mNormalPlayTimeRTP = rtpTime;
   1500             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
   1501 
   1502             if (!mFirstAccessUnit) {
   1503                 postNormalPlayTimeMapping(
   1504                         trackIndex,
   1505                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1506             }
   1507 
   1508             ++n;
   1509         }
   1510     }
   1511 
   1512     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
   1513         CHECK_GE(index, 0u);
   1514         CHECK_LT(index, mTracks.size());
   1515 
   1516         const TrackInfo &info = mTracks.itemAt(index);
   1517 
   1518         *timeScale = info.mTimeScale;
   1519 
   1520         return info.mPacketSource->getFormat();
   1521     }
   1522 
   1523     size_t countTracks() const {
   1524         return mTracks.size();
   1525     }
   1526 
   1527 private:
   1528     struct TrackInfo {
   1529         AString mURL;
   1530         int mRTPSocket;
   1531         int mRTCPSocket;
   1532         bool mUsingInterleavedTCP;
   1533         uint32_t mFirstSeqNumInSegment;
   1534         bool mNewSegment;
   1535         int32_t mAllowedStaleAccessUnits;
   1536 
   1537         uint32_t mRTPAnchor;
   1538         int64_t mNTPAnchorUs;
   1539         int32_t mTimeScale;
   1540         bool mEOSReceived;
   1541 
   1542         uint32_t mNormalPlayTimeRTP;
   1543         int64_t mNormalPlayTimeUs;
   1544 
   1545         sp<APacketSource> mPacketSource;
   1546 
   1547         // Stores packets temporarily while no notion of time
   1548         // has been established yet.
   1549         List<sp<ABuffer> > mPackets;
   1550     };
   1551 
   1552     sp<AMessage> mNotify;
   1553     bool mUIDValid;
   1554     uid_t mUID;
   1555     sp<ALooper> mNetLooper;
   1556     sp<ARTSPConnection> mConn;
   1557     sp<ARTPConnection> mRTPConn;
   1558     sp<ASessionDescription> mSessionDesc;
   1559     AString mOriginalSessionURL;  // This one still has user:pass@
   1560     AString mSessionURL;
   1561     AString mSessionHost;
   1562     AString mBaseURL;
   1563     AString mControlURL;
   1564     AString mSessionID;
   1565     bool mSetupTracksSuccessful;
   1566     bool mSeekPending;
   1567     bool mFirstAccessUnit;
   1568 
   1569     bool mAllTracksHaveTime;
   1570     int64_t mNTPAnchorUs;
   1571     int64_t mMediaAnchorUs;
   1572     int64_t mLastMediaTimeUs;
   1573 
   1574     int64_t mNumAccessUnitsReceived;
   1575     bool mCheckPending;
   1576     int32_t mCheckGeneration;
   1577     int32_t mCheckTimeoutGeneration;
   1578     bool mTryTCPInterleaving;
   1579     bool mTryFakeRTCP;
   1580     bool mReceivedFirstRTCPPacket;
   1581     bool mReceivedFirstRTPPacket;
   1582     bool mSeekable;
   1583     int64_t mKeepAliveTimeoutUs;
   1584     int32_t mKeepAliveGeneration;
   1585     bool mPausing;
   1586     int32_t mPauseGeneration;
   1587 
   1588     Vector<TrackInfo> mTracks;
   1589 
   1590     bool mPlayResponseParsed;
   1591 
   1592     void setupTrack(size_t index) {
   1593         sp<APacketSource> source =
   1594             new APacketSource(mSessionDesc, index);
   1595 
   1596         if (source->initCheck() != OK) {
   1597             ALOGW("Unsupported format. Ignoring track #%zu.", index);
   1598 
   1599             sp<AMessage> reply = new AMessage('setu', this);
   1600             reply->setSize("index", index);
   1601             reply->setInt32("result", ERROR_UNSUPPORTED);
   1602             reply->post();
   1603             return;
   1604         }
   1605 
   1606         AString url;
   1607         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1608 
   1609         AString trackURL;
   1610         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1611 
   1612         mTracks.push(TrackInfo());
   1613         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1614         info->mURL = trackURL;
   1615         info->mPacketSource = source;
   1616         info->mUsingInterleavedTCP = false;
   1617         info->mFirstSeqNumInSegment = 0;
   1618         info->mNewSegment = true;
   1619         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
   1620         info->mRTPSocket = -1;
   1621         info->mRTCPSocket = -1;
   1622         info->mRTPAnchor = 0;
   1623         info->mNTPAnchorUs = -1;
   1624         info->mNormalPlayTimeRTP = 0;
   1625         info->mNormalPlayTimeUs = 0ll;
   1626 
   1627         unsigned long PT;
   1628         AString formatDesc;
   1629         AString formatParams;
   1630         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1631 
   1632         int32_t timescale;
   1633         int32_t numChannels;
   1634         ASessionDescription::ParseFormatDesc(
   1635                 formatDesc.c_str(), &timescale, &numChannels);
   1636 
   1637         info->mTimeScale = timescale;
   1638         info->mEOSReceived = false;
   1639 
   1640         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
   1641 
   1642         AString request = "SETUP ";
   1643         request.append(trackURL);
   1644         request.append(" RTSP/1.0\r\n");
   1645 
   1646         if (mTryTCPInterleaving) {
   1647             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1648             info->mUsingInterleavedTCP = true;
   1649             info->mRTPSocket = interleaveIndex;
   1650             info->mRTCPSocket = interleaveIndex + 1;
   1651 
   1652             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1653             request.append(interleaveIndex);
   1654             request.append("-");
   1655             request.append(interleaveIndex + 1);
   1656         } else {
   1657             unsigned rtpPort;
   1658             ARTPConnection::MakePortPair(
   1659                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1660 
   1661             if (mUIDValid) {
   1662                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1663                                                 (uint32_t)*(uint32_t*) "RTP_");
   1664                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1665                                                 (uint32_t)*(uint32_t*) "RTP_");
   1666                 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
   1667                 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
   1668             }
   1669 
   1670             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1671             request.append(rtpPort);
   1672             request.append("-");
   1673             request.append(rtpPort + 1);
   1674         }
   1675 
   1676         request.append("\r\n");
   1677 
   1678         if (index > 1) {
   1679             request.append("Session: ");
   1680             request.append(mSessionID);
   1681             request.append("\r\n");
   1682         }
   1683 
   1684         request.append("\r\n");
   1685 
   1686         sp<AMessage> reply = new AMessage('setu', this);
   1687         reply->setSize("index", index);
   1688         reply->setSize("track-index", mTracks.size() - 1);
   1689         mConn->sendRequest(request.c_str(), reply);
   1690     }
   1691 
   1692     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1693         out->clear();
   1694 
   1695         if (strncasecmp("rtsp://", baseURL, 7)) {
   1696             // Base URL must be absolute
   1697             return false;
   1698         }
   1699 
   1700         if (!strncasecmp("rtsp://", url, 7)) {
   1701             // "url" is already an absolute URL, ignore base URL.
   1702             out->setTo(url);
   1703             return true;
   1704         }
   1705 
   1706         size_t n = strlen(baseURL);
   1707         out->setTo(baseURL);
   1708         if (baseURL[n - 1] != '/') {
   1709             out->append("/");
   1710         }
   1711         out->append(url);
   1712 
   1713         return true;
   1714     }
   1715 
   1716     void fakeTimestamps() {
   1717         mNTPAnchorUs = -1ll;
   1718         for (size_t i = 0; i < mTracks.size(); ++i) {
   1719             onTimeUpdate(i, 0, 0ll);
   1720         }
   1721     }
   1722 
   1723     bool dataReceivedOnAllChannels() {
   1724         TrackInfo *track;
   1725         for (size_t i = 0; i < mTracks.size(); ++i) {
   1726             track = &mTracks.editItemAt(i);
   1727             if (track->mPackets.empty()) {
   1728                 return false;
   1729             }
   1730         }
   1731         return true;
   1732     }
   1733 
   1734     void handleFirstAccessUnit() {
   1735         if (mFirstAccessUnit) {
   1736             sp<AMessage> msg = mNotify->dup();
   1737             msg->setInt32("what", kWhatConnected);
   1738             msg->post();
   1739 
   1740             if (mSeekable) {
   1741                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1742                     TrackInfo *info = &mTracks.editItemAt(i);
   1743 
   1744                     postNormalPlayTimeMapping(
   1745                             i,
   1746                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1747                 }
   1748             }
   1749 
   1750             mFirstAccessUnit = false;
   1751         }
   1752     }
   1753 
   1754     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1755         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
   1756              trackIndex, rtpTime, (long long)ntpTime);
   1757 
   1758         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1759 
   1760         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1761 
   1762         track->mRTPAnchor = rtpTime;
   1763         track->mNTPAnchorUs = ntpTimeUs;
   1764 
   1765         if (mNTPAnchorUs < 0) {
   1766             mNTPAnchorUs = ntpTimeUs;
   1767             mMediaAnchorUs = mLastMediaTimeUs;
   1768         }
   1769 
   1770         if (!mAllTracksHaveTime) {
   1771             bool allTracksHaveTime = (mTracks.size() > 0);
   1772             for (size_t i = 0; i < mTracks.size(); ++i) {
   1773                 TrackInfo *track = &mTracks.editItemAt(i);
   1774                 if (track->mNTPAnchorUs < 0) {
   1775                     allTracksHaveTime = false;
   1776                     break;
   1777                 }
   1778             }
   1779             if (allTracksHaveTime) {
   1780                 mAllTracksHaveTime = true;
   1781                 ALOGI("Time now established for all tracks.");
   1782             }
   1783         }
   1784         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1785             handleFirstAccessUnit();
   1786 
   1787             // Time is now established, lets start timestamping immediately
   1788             for (size_t i = 0; i < mTracks.size(); ++i) {
   1789                 if (OK != processAccessUnitQueue(i)) {
   1790                     return;
   1791                 }
   1792             }
   1793             for (size_t i = 0; i < mTracks.size(); ++i) {
   1794                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
   1795                 if (trackInfo->mEOSReceived) {
   1796                     postQueueEOS(i, ERROR_END_OF_STREAM);
   1797                     trackInfo->mEOSReceived = false;
   1798                 }
   1799             }
   1800         }
   1801     }
   1802 
   1803     status_t processAccessUnitQueue(int32_t trackIndex) {
   1804         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1805         while (!track->mPackets.empty()) {
   1806             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1807             track->mPackets.erase(track->mPackets.begin());
   1808 
   1809             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1810             if (track->mNewSegment) {
   1811                 // The sequence number from RTP packet has only 16 bits and is extended
   1812                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
   1813                 // RTSP "PLAY" command should be used to detect the first RTP packet
   1814                 // after seeking.
   1815                 if (mSeekable) {
   1816                     if (track->mAllowedStaleAccessUnits > 0) {
   1817                         uint32_t seqNum16 = seqNum & 0xffff;
   1818                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
   1819                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
   1820                                 || seqNum16 < firstSeqNumInSegment16) {
   1821                             // Not the first rtp packet of the stream after seeking, discarding.
   1822                             track->mAllowedStaleAccessUnits--;
   1823                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
   1824                                  seqNum, track->mFirstSeqNumInSegment);
   1825                             continue;
   1826                         }
   1827                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
   1828                                 "Missing the first packet(%u), now take packet(%u) as first one",
   1829                                 track->mFirstSeqNumInSegment, seqNum);
   1830                     } else { // track->mAllowedStaleAccessUnits <= 0
   1831                         mNumAccessUnitsReceived = 0;
   1832                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
   1833                              "Still no first rtp packet after %d stale ones",
   1834                              kMaxAllowedStaleAccessUnits);
   1835                         track->mAllowedStaleAccessUnits = -1;
   1836                         return UNKNOWN_ERROR;
   1837                     }
   1838                 }
   1839 
   1840                 // Now found the first rtp packet of the stream after seeking.
   1841                 track->mFirstSeqNumInSegment = seqNum;
   1842                 track->mNewSegment = false;
   1843             }
   1844 
   1845             if (seqNum < track->mFirstSeqNumInSegment) {
   1846                 ALOGV("dropping stale access-unit (%d < %d)",
   1847                      seqNum, track->mFirstSeqNumInSegment);
   1848                 continue;
   1849             }
   1850 
   1851             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1852                 postQueueAccessUnit(trackIndex, accessUnit);
   1853             }
   1854         }
   1855         return OK;
   1856     }
   1857 
   1858     void onAccessUnitComplete(
   1859             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1860         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1861         track->mPackets.push_back(accessUnit);
   1862 
   1863         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1864         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
   1865 
   1866         if(!mPlayResponseParsed){
   1867             ALOGV("play response is not parsed");
   1868             return;
   1869         }
   1870 
   1871         handleFirstAccessUnit();
   1872 
   1873         if (!mAllTracksHaveTime) {
   1874             ALOGV("storing accessUnit, no time established yet");
   1875             return;
   1876         }
   1877 
   1878         if (OK != processAccessUnitQueue(trackIndex)) {
   1879             return;
   1880         }
   1881 
   1882         if (track->mEOSReceived) {
   1883             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1884             track->mEOSReceived = false;
   1885         }
   1886     }
   1887 
   1888     bool addMediaTimestamp(
   1889             int32_t trackIndex, const TrackInfo *track,
   1890             const sp<ABuffer> &accessUnit) {
   1891         UNUSED_UNLESS_VERBOSE(trackIndex);
   1892 
   1893         uint32_t rtpTime;
   1894         CHECK(accessUnit->meta()->findInt32(
   1895                     "rtp-time", (int32_t *)&rtpTime));
   1896 
   1897         int64_t relRtpTimeUs =
   1898             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1899                 / track->mTimeScale;
   1900 
   1901         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1902 
   1903         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1904 
   1905         if (mediaTimeUs > mLastMediaTimeUs) {
   1906             mLastMediaTimeUs = mediaTimeUs;
   1907         }
   1908 
   1909         if (mediaTimeUs < 0) {
   1910             ALOGV("dropping early accessUnit.");
   1911             return false;
   1912         }
   1913 
   1914         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
   1915              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
   1916 
   1917         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1918 
   1919         return true;
   1920     }
   1921 
   1922     void postQueueAccessUnit(
   1923             size_t trackIndex, const sp<ABuffer> &accessUnit) {
   1924         sp<AMessage> msg = mNotify->dup();
   1925         msg->setInt32("what", kWhatAccessUnit);
   1926         msg->setSize("trackIndex", trackIndex);
   1927         msg->setBuffer("accessUnit", accessUnit);
   1928         msg->post();
   1929     }
   1930 
   1931     void postQueueEOS(size_t trackIndex, status_t finalResult) {
   1932         sp<AMessage> msg = mNotify->dup();
   1933         msg->setInt32("what", kWhatEOS);
   1934         msg->setSize("trackIndex", trackIndex);
   1935         msg->setInt32("finalResult", finalResult);
   1936         msg->post();
   1937     }
   1938 
   1939     void postQueueSeekDiscontinuity(size_t trackIndex) {
   1940         sp<AMessage> msg = mNotify->dup();
   1941         msg->setInt32("what", kWhatSeekDiscontinuity);
   1942         msg->setSize("trackIndex", trackIndex);
   1943         msg->post();
   1944     }
   1945 
   1946     void postNormalPlayTimeMapping(
   1947             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
   1948         sp<AMessage> msg = mNotify->dup();
   1949         msg->setInt32("what", kWhatNormalPlayTimeMapping);
   1950         msg->setSize("trackIndex", trackIndex);
   1951         msg->setInt32("rtpTime", rtpTime);
   1952         msg->setInt64("nptUs", nptUs);
   1953         msg->post();
   1954     }
   1955 
   1956     void postTimeout() {
   1957         sp<AMessage> timeout = new AMessage('tiou', this);
   1958         mCheckTimeoutGeneration++;
   1959         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1960 
   1961         int64_t startupTimeoutUs;
   1962         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
   1963         timeout->post(startupTimeoutUs);
   1964     }
   1965 
   1966     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1967 };
   1968 
   1969 }  // namespace android
   1970 
   1971 #endif  // MY_HANDLER_H_
   1972