Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 
     23 #ifndef LOG_TAG
     24 #define LOG_TAG "MyHandler"
     25 #endif
     26 
     27 #include <utils/Log.h>
     28 #include <cutils/properties.h> // for property_get
     29 
     30 #include "APacketSource.h"
     31 #include "ARTPConnection.h"
     32 #include "ARTSPConnection.h"
     33 #include "ASessionDescription.h"
     34 #include "NetworkUtils.h"
     35 
     36 #include <ctype.h>
     37 #include <cutils/properties.h>
     38 
     39 #include <media/stagefright/foundation/ABuffer.h>
     40 #include <media/stagefright/foundation/ADebug.h>
     41 #include <media/stagefright/foundation/ALooper.h>
     42 #include <media/stagefright/foundation/AMessage.h>
     43 #include <media/stagefright/MediaErrors.h>
     44 #include <media/stagefright/Utils.h>
     45 
     46 #include <arpa/inet.h>
     47 #include <sys/socket.h>
     48 #include <netdb.h>
     49 
     50 #include "HTTPBase.h"
     51 
     52 #if LOG_NDEBUG
     53 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
     54 #else
     55 #define UNUSED_UNLESS_VERBOSE(x)
     56 #endif
     57 
     58 #ifndef FALLTHROUGH_INTENDED
     59 #define FALLTHROUGH_INTENDED [[clang::fallthrough]]  // NOLINT
     60 #endif
     61 
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
     64 static int64_t kAccessUnitTimeoutUs = 10000000ll;
     65 
     66 // If no access units arrive for the first 10 secs after starting the
     67 // stream, assume none ever will and signal EOS or switch transports.
     68 static int64_t kStartupTimeoutUs = 10000000ll;
     69 
     70 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
     71 
     72 static int64_t kPauseDelayUs = 3000000ll;
     73 
     74 // The allowed maximum number of stale access units at the beginning of
     75 // a new sequence.
     76 static int32_t kMaxAllowedStaleAccessUnits = 20;
     77 
     78 static int64_t kTearDownTimeoutUs = 3000000ll;
     79 
     80 namespace android {
     81 
     82 static bool GetAttribute(const char *s, const char *key, AString *value) {
     83     value->clear();
     84 
     85     size_t keyLen = strlen(key);
     86 
     87     for (;;) {
     88         while (isspace(*s)) {
     89             ++s;
     90         }
     91 
     92         const char *colonPos = strchr(s, ';');
     93 
     94         size_t len =
     95             (colonPos == NULL) ? strlen(s) : colonPos - s;
     96 
     97         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     98             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     99             return true;
    100         }
    101 
    102         if (colonPos == NULL) {
    103             return false;
    104         }
    105 
    106         s = colonPos + 1;
    107     }
    108 }
    109 
    110 struct MyHandler : public AHandler {
    // Message identifiers published by this handler, written as
    // four-character codes.
    // NOTE(review): presumably these are the "what" values delivered through
    // the notify message handed to the constructor -- confirm against the
    // code that posts them.
    enum {
        // Connection and seek progress.
        kWhatConnected = 'conn',
        kWhatDisconnected = 'disc',
        kWhatSeekPaused = 'spau',
        kWhatSeekDone = 'sdon',

        // Stream data and timing events.
        kWhatAccessUnit = 'accU',
        kWhatEOS = 'eos!',
        kWhatSeekDiscontinuity = 'seeD',
        kWhatNormalPlayTimeMapping = 'nptM',
    };
    122 
    123     MyHandler(
    124             const char *url,
    125             const sp<AMessage> &notify,
    126             bool uidValid = false, uid_t uid = 0)
    127         : mNotify(notify),
    128           mUIDValid(uidValid),
    129           mUID(uid),
    130           mNetLooper(new ALooper),
    131           mConn(new ARTSPConnection(mUIDValid, mUID)),
    132           mRTPConn(new ARTPConnection),
    133           mOriginalSessionURL(url),
    134           mSessionURL(url),
    135           mSetupTracksSuccessful(false),
    136           mSeekPending(false),
    137           mFirstAccessUnit(true),
    138           mAllTracksHaveTime(false),
    139           mNTPAnchorUs(-1),
    140           mMediaAnchorUs(-1),
    141           mLastMediaTimeUs(0),
    142           mNumAccessUnitsReceived(0),
    143           mCheckPending(false),
    144           mCheckGeneration(0),
    145           mCheckTimeoutGeneration(0),
    146           mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
    147           mTryFakeRTCP(false),
    148           mReceivedFirstRTCPPacket(false),
    149           mReceivedFirstRTPPacket(false),
    150           mSeekable(true),
    151           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
    152           mKeepAliveGeneration(0),
    153           mPausing(false),
    154           mPauseGeneration(0),
    155           mPlayResponseParsed(false) {
    156         mNetLooper->setName("rtsp net");
    157         mNetLooper->start(false /* runOnCallingThread */,
    158                           false /* canCallJava */,
    159                           PRIORITY_HIGHEST);
    160 
    161         // Strip any authentication info from the session url, we don't
    162         // want to transmit user/pass in cleartext.
    163         AString host, path, user, pass;
    164         unsigned port;
    165         CHECK(ARTSPConnection::ParseURL(
    166                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    167 
    168         if (user.size() > 0) {
    169             mSessionURL.clear();
    170             mSessionURL.append("rtsp://");
    171             mSessionURL.append(host);
    172             mSessionURL.append(":");
    173             mSessionURL.append(AStringPrintf("%u", port));
    174             mSessionURL.append(path);
    175 
    176             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
    177         }
    178 
    179         mSessionHost = host;
    180     }
    181 
    182     void connect() {
    183         looper()->registerHandler(mConn);
    184         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    185 
    186         sp<AMessage> notify = new AMessage('biny', this);
    187         mConn->observeBinaryData(notify);
    188 
    189         sp<AMessage> reply = new AMessage('conn', this);
    190         mConn->connect(mOriginalSessionURL.c_str(), reply);
    191     }
    192 
    193     void loadSDP(const sp<ASessionDescription>& desc) {
    194         looper()->registerHandler(mConn);
    195         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    196 
    197         sp<AMessage> notify = new AMessage('biny', this);
    198         mConn->observeBinaryData(notify);
    199 
    200         sp<AMessage> reply = new AMessage('sdpl', this);
    201         reply->setObject("description", desc);
    202         mConn->connect(mOriginalSessionURL.c_str(), reply);
    203     }
    204 
    205     AString getControlURL() {
    206         AString sessionLevelControlURL;
    207         if (mSessionDesc->findAttribute(
    208                 0,
    209                 "a=control",
    210                 &sessionLevelControlURL)) {
    211             if (sessionLevelControlURL.compare("*") == 0) {
    212                 return mBaseURL;
    213             } else {
    214                 AString controlURL;
    215                 CHECK(MakeURL(
    216                         mBaseURL.c_str(),
    217                         sessionLevelControlURL.c_str(),
    218                         &controlURL));
    219                 return controlURL;
    220             }
    221         } else {
    222             return mSessionURL;
    223         }
    224     }
    225 
    226     void disconnect() {
    227         (new AMessage('abor', this))->post();
    228     }
    229 
    230     void seek(int64_t timeUs) {
    231         sp<AMessage> msg = new AMessage('seek', this);
    232         msg->setInt64("time", timeUs);
    233         mPauseGeneration++;
    234         msg->post();
    235     }
    236 
    237     void continueSeekAfterPause(int64_t timeUs) {
    238         sp<AMessage> msg = new AMessage('see1', this);
    239         msg->setInt64("time", timeUs);
    240         msg->post();
    241     }
    242 
    243     bool isSeekable() const {
    244         return mSeekable;
    245     }
    246 
    247     void pause() {
    248         sp<AMessage> msg = new AMessage('paus', this);
    249         mPauseGeneration++;
    250         msg->setInt32("pausecheck", mPauseGeneration);
    251         msg->post();
    252     }
    253 
    254     void resume() {
    255         sp<AMessage> msg = new AMessage('resu', this);
    256         mPauseGeneration++;
    257         msg->post();
    258     }
    259 
    260     static void addRR(const sp<ABuffer> &buf) {
    261         uint8_t *ptr = buf->data() + buf->size();
    262         ptr[0] = 0x80 | 0;
    263         ptr[1] = 201;  // RR
    264         ptr[2] = 0;
    265         ptr[3] = 1;
    266         ptr[4] = 0xde;  // SSRC
    267         ptr[5] = 0xad;
    268         ptr[6] = 0xbe;
    269         ptr[7] = 0xef;
    270 
    271         buf->setRange(0, buf->size() + 8);
    272     }
    273 
    274     static void addSDES(int s, const sp<ABuffer> &buffer) {
    275         struct sockaddr_in addr;
    276         socklen_t addrSize = sizeof(addr);
    277         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
    278             inet_aton("0.0.0.0", &(addr.sin_addr));
    279         }
    280 
    281         uint8_t *data = buffer->data() + buffer->size();
    282         data[0] = 0x80 | 1;
    283         data[1] = 202;  // SDES
    284         data[4] = 0xde;  // SSRC
    285         data[5] = 0xad;
    286         data[6] = 0xbe;
    287         data[7] = 0xef;
    288 
    289         size_t offset = 8;
    290 
    291         data[offset++] = 1;  // CNAME
    292 
    293         AString cname = "stagefright@";
    294         cname.append(inet_ntoa(addr.sin_addr));
    295         data[offset++] = cname.size();
    296 
    297         memcpy(&data[offset], cname.c_str(), cname.size());
    298         offset += cname.size();
    299 
    300         data[offset++] = 6;  // TOOL
    301 
    302         AString tool = MakeUserAgent();
    303 
    304         data[offset++] = tool.size();
    305 
    306         memcpy(&data[offset], tool.c_str(), tool.size());
    307         offset += tool.size();
    308 
    309         data[offset++] = 0;
    310 
    311         if ((offset % 4) > 0) {
    312             size_t count = 4 - (offset % 4);
    313             switch (count) {
    314                 case 3:
    315                     data[offset++] = 0;
    316                     FALLTHROUGH_INTENDED;
    317                 case 2:
    318                     data[offset++] = 0;
    319                     FALLTHROUGH_INTENDED;
    320                 case 1:
    321                     data[offset++] = 0;
    322             }
    323         }
    324 
    325         size_t numWords = (offset / 4) - 1;
    326         data[2] = numWords >> 8;
    327         data[3] = numWords & 0xff;
    328 
    329         buffer->setRange(buffer->offset(), buffer->size() + offset);
    330     }
    331 
    // In case we're behind NAT, fire off two UDP packets to the remote
    // rtp/rtcp ports to poke a hole into the firewall for future incoming
    // packets. We're going to send an RR/SDES RTCP packet to both of them.
    //
    // |rtpSocket| / |rtcpSocket| are the local UDP sockets to send from.
    // |transport| is the value of the "Transport" header of the SETUP
    // response; its "source" attribute (falling back to a lookup of the
    // session host) and "server_port" attribute give the destination.
    //
    // Returns false if the destination cannot be determined or a send comes
    // up short. Returns true after both packets were sent, and also (without
    // sending anything) when the source address does not parse as a
    // dotted-quad IPv4 address or is a loopback address.
    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;

        AString source;
        AString server_port;
        if (!GetAttribute(transport.c_str(),
                          "source",
                          &source)) {
            ALOGW("Missing 'source' field in Transport response. Using "
                 "RTSP endpoint address.");

            // Resolve with getaddrinfo() restricted to IPv4: unlike
            // gethostbyname() it hands back caller-owned results, and the
            // address family is guaranteed to match sockaddr_in.
            struct addrinfo hints;
            memset(&hints, 0, sizeof(hints));
            hints.ai_family = AF_INET;
            hints.ai_socktype = SOCK_DGRAM;

            struct addrinfo *resolved = NULL;
            if (getaddrinfo(mSessionHost.c_str(), NULL, &hints, &resolved) != 0
                    || resolved == NULL) {
                ALOGE("Failed to look up address of session host");

                return false;
            }

            addr.sin_addr =
                ((const struct sockaddr_in *)resolved->ai_addr)->sin_addr;

            freeaddrinfo(resolved);
        } else {
            addr.sin_addr.s_addr = inet_addr(source.c_str());
        }

        if (!GetAttribute(transport.c_str(),
                                 "server_port",
                                 &server_port)) {
            ALOGI("Missing 'server_port' field in Transport response.");
            return false;
        }

        int rtpPort, rtcpPort;
        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
                || rtpPort <= 0 || rtpPort > 65535
                || rtcpPort <=0 || rtcpPort > 65535
                || rtcpPort != rtpPort + 1) {
            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
                 " RTP port must be even, RTCP port must be one higher.",
                 server_port.c_str());

            return false;
        }

        if (rtpPort & 1) {
            ALOGW("Server picked an odd RTP port, it should've picked an "
                 "even one, we'll let it pass for now, but this may break "
                 "in the future.");
        }

        if (addr.sin_addr.s_addr == INADDR_NONE) {
            // inet_addr() could not parse the 'source' attribute; there is
            // nowhere to send to, which is not treated as a failure.
            return true;
        }

        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
            // No firewalls to traverse on the loopback interface.
            return true;
        }

        // Make up an RR/SDES RTCP packet.
        sp<ABuffer> buf = new ABuffer(65536);
        buf->setRange(0, 0);
        addRR(buf);
        addSDES(rtpSocket, buf);

        addr.sin_port = htons(rtpPort);

        ssize_t n = sendto(
                rtpSocket, buf->data(), buf->size(), 0,
                (const sockaddr *)&addr, sizeof(addr));

        if (n < (ssize_t)buf->size()) {
            ALOGE("failed to poke a hole for RTP packets");
            return false;
        }

        addr.sin_port = htons(rtcpPort);

        n = sendto(
                rtcpSocket, buf->data(), buf->size(), 0,
                (const sockaddr *)&addr, sizeof(addr));

        if (n < (ssize_t)buf->size()) {
            ALOGE("failed to poke a hole for RTCP packets");
            return false;
        }

        ALOGV("successfully poked holes.");

        return true;
    }
    426 
    427     static bool isLiveStream(const sp<ASessionDescription> &desc) {
    428         AString attrLiveStream;
    429         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
    430             ssize_t semicolonPos = attrLiveStream.find(";", 2);
    431 
    432             const char* liveStreamValue;
    433             if (semicolonPos < 0) {
    434                 liveStreamValue = attrLiveStream.c_str();
    435             } else {
    436                 AString valString;
    437                 valString.setTo(attrLiveStream,
    438                         semicolonPos + 1,
    439                         attrLiveStream.size() - semicolonPos - 1);
    440                 liveStreamValue = valString.c_str();
    441             }
    442 
    443             uint32_t value = strtoul(liveStreamValue, NULL, 10);
    444             if (value == 1) {
    445                 ALOGV("found live stream");
    446                 return true;
    447             }
    448         } else {
    449             // It is a live stream if no duration is returned
    450             int64_t durationUs;
    451             if (!desc->getDurationUs(&durationUs)) {
    452                 ALOGV("No duration found, assume live stream");
    453                 return true;
    454             }
    455         }
    456 
    457         return false;
    458     }
    459 
    460     virtual void onMessageReceived(const sp<AMessage> &msg) {
    461         switch (msg->what()) {
    462             case 'conn':
    463             {
    464                 int32_t result;
    465                 CHECK(msg->findInt32("result", &result));
    466 
    467                 ALOGI("connection request completed with result %d (%s)",
    468                      result, strerror(-result));
    469 
    470                 if (result == OK) {
    471                     AString request;
    472                     request = "DESCRIBE ";
    473                     request.append(mSessionURL);
    474                     request.append(" RTSP/1.0\r\n");
    475                     request.append("Accept: application/sdp\r\n");
    476                     request.append("\r\n");
    477 
    478                     sp<AMessage> reply = new AMessage('desc', this);
    479                     mConn->sendRequest(request.c_str(), reply);
    480                 } else {
    481                     (new AMessage('disc', this))->post();
    482                 }
    483                 break;
    484             }
    485 
    486             case 'disc':
    487             {
    488                 ++mKeepAliveGeneration;
    489 
    490                 int32_t reconnect;
    491                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    492                     sp<AMessage> reply = new AMessage('conn', this);
    493                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    494                 } else {
    495                     (new AMessage('quit', this))->post();
    496                 }
    497                 break;
    498             }
    499 
    500             case 'desc':
    501             {
    502                 int32_t result;
    503                 CHECK(msg->findInt32("result", &result));
    504 
    505                 ALOGI("DESCRIBE completed with result %d (%s)",
    506                      result, strerror(-result));
    507 
    508                 if (result == OK) {
    509                     sp<RefBase> obj;
    510                     CHECK(msg->findObject("response", &obj));
    511                     sp<ARTSPResponse> response =
    512                         static_cast<ARTSPResponse *>(obj.get());
    513 
    514                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
    515                         ssize_t i = response->mHeaders.indexOfKey("location");
    516                         CHECK_GE(i, 0);
    517 
    518                         mOriginalSessionURL = response->mHeaders.valueAt(i);
    519                         mSessionURL = mOriginalSessionURL;
    520 
    521                         // Strip any authentication info from the session url, we don't
    522                         // want to transmit user/pass in cleartext.
    523                         AString host, path, user, pass;
    524                         unsigned port;
    525                         if (ARTSPConnection::ParseURL(
    526                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
    527                                 && user.size() > 0) {
    528                             mSessionURL.clear();
    529                             mSessionURL.append("rtsp://");
    530                             mSessionURL.append(host);
    531                             mSessionURL.append(":");
    532                             mSessionURL.append(AStringPrintf("%u", port));
    533                             mSessionURL.append(path);
    534 
    535                             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
    536                         }
    537 
    538                         sp<AMessage> reply = new AMessage('conn', this);
    539                         mConn->connect(mOriginalSessionURL.c_str(), reply);
    540                         break;
    541                     }
    542 
    543                     if (response->mStatusCode != 200) {
    544                         result = UNKNOWN_ERROR;
    545                     } else if (response->mContent == NULL) {
    546                         result = ERROR_MALFORMED;
    547                         ALOGE("The response has no content.");
    548                     } else {
    549                         mSessionDesc = new ASessionDescription;
    550 
    551                         mSessionDesc->setTo(
    552                                 response->mContent->data(),
    553                                 response->mContent->size());
    554 
    555                         if (!mSessionDesc->isValid()) {
    556                             ALOGE("Failed to parse session description.");
    557                             result = ERROR_MALFORMED;
    558                         } else {
    559                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    560                             if (i >= 0) {
    561                                 mBaseURL = response->mHeaders.valueAt(i);
    562                             } else {
    563                                 i = response->mHeaders.indexOfKey("content-location");
    564                                 if (i >= 0) {
    565                                     mBaseURL = response->mHeaders.valueAt(i);
    566                                 } else {
    567                                     mBaseURL = mSessionURL;
    568                                 }
    569                             }
    570 
    571                             mSeekable = !isLiveStream(mSessionDesc);
    572 
    573                             if (!mBaseURL.startsWith("rtsp://")) {
    574                                 // Some misbehaving servers specify a relative
    575                                 // URL in one of the locations above, combine
    576                                 // it with the absolute session URL to get
    577                                 // something usable...
    578 
    579                                 ALOGW("Server specified a non-absolute base URL"
    580                                      ", combining it with the session URL to "
    581                                      "get something usable...");
    582 
    583                                 AString tmp;
    584                                 CHECK(MakeURL(
    585                                             mSessionURL.c_str(),
    586                                             mBaseURL.c_str(),
    587                                             &tmp));
    588 
    589                                 mBaseURL = tmp;
    590                             }
    591 
    592                             mControlURL = getControlURL();
    593 
    594                             if (mSessionDesc->countTracks() < 2) {
    595                                 // There's no actual tracks in this session.
    596                                 // The first "track" is merely session meta
    597                                 // data.
    598 
    599                                 ALOGW("Session doesn't contain any playable "
    600                                      "tracks. Aborting.");
    601                                 result = ERROR_UNSUPPORTED;
    602                             } else {
    603                                 setupTrack(1);
    604                             }
    605                         }
    606                     }
    607                 }
    608 
    609                 if (result != OK) {
    610                     sp<AMessage> reply = new AMessage('disc', this);
    611                     mConn->disconnect(reply);
    612                 }
    613                 break;
    614             }
    615 
    616             case 'sdpl':
    617             {
    618                 int32_t result;
    619                 CHECK(msg->findInt32("result", &result));
    620 
    621                 ALOGI("SDP connection request completed with result %d (%s)",
    622                      result, strerror(-result));
    623 
    624                 if (result == OK) {
    625                     sp<RefBase> obj;
    626                     CHECK(msg->findObject("description", &obj));
    627                     mSessionDesc =
    628                         static_cast<ASessionDescription *>(obj.get());
    629 
    630                     if (!mSessionDesc->isValid()) {
    631                         ALOGE("Failed to parse session description.");
    632                         result = ERROR_MALFORMED;
    633                     } else {
    634                         mBaseURL = mSessionURL;
    635 
    636                         mSeekable = !isLiveStream(mSessionDesc);
    637 
    638                         mControlURL = getControlURL();
    639 
    640                         if (mSessionDesc->countTracks() < 2) {
    641                             // There's no actual tracks in this session.
    642                             // The first "track" is merely session meta
    643                             // data.
    644 
    645                             ALOGW("Session doesn't contain any playable "
    646                                  "tracks. Aborting.");
    647                             result = ERROR_UNSUPPORTED;
    648                         } else {
    649                             setupTrack(1);
    650                         }
    651                     }
    652                 }
    653 
    654                 if (result != OK) {
    655                     sp<AMessage> reply = new AMessage('disc', this);
    656                     mConn->disconnect(reply);
    657                 }
    658                 break;
    659             }
    660 
    661             case 'setu':
    662             {
    663                 size_t index;
    664                 CHECK(msg->findSize("index", &index));
    665 
    666                 TrackInfo *track = NULL;
    667                 size_t trackIndex;
    668                 if (msg->findSize("track-index", &trackIndex)) {
    669                     track = &mTracks.editItemAt(trackIndex);
    670                 }
    671 
    672                 int32_t result;
    673                 CHECK(msg->findInt32("result", &result));
    674 
    675                 ALOGI("SETUP(%zu) completed with result %d (%s)",
    676                      index, result, strerror(-result));
    677 
    678                 if (result == OK) {
    679                     CHECK(track != NULL);
    680 
    681                     sp<RefBase> obj;
    682                     CHECK(msg->findObject("response", &obj));
    683                     sp<ARTSPResponse> response =
    684                         static_cast<ARTSPResponse *>(obj.get());
    685 
    686                     if (response->mStatusCode != 200) {
    687                         result = UNKNOWN_ERROR;
    688                     } else {
    689                         ssize_t i = response->mHeaders.indexOfKey("session");
    690                         CHECK_GE(i, 0);
    691 
    692                         mSessionID = response->mHeaders.valueAt(i);
    693 
    694                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    695                         AString timeoutStr;
    696                         if (GetAttribute(
    697                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
    698                             char *end;
    699                             unsigned long timeoutSecs =
    700                                 strtoul(timeoutStr.c_str(), &end, 10);
    701 
    702                             if (end == timeoutStr.c_str() || *end != '\0') {
    703                                 ALOGW("server specified malformed timeout '%s'",
    704                                      timeoutStr.c_str());
    705 
    706                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    707                             } else if (timeoutSecs < 15) {
    708                                 ALOGW("server specified too short a timeout "
    709                                      "(%lu secs), using default.",
    710                                      timeoutSecs);
    711 
    712                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    713                             } else {
    714                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
    715 
    716                                 ALOGI("server specified timeout of %lu secs.",
    717                                      timeoutSecs);
    718                             }
    719                         }
    720 
    721                         i = mSessionID.find(";");
    722                         if (i >= 0) {
    723                             // Remove options, i.e. ";timeout=90"
    724                             mSessionID.erase(i, mSessionID.size() - i);
    725                         }
    726 
    727                         sp<AMessage> notify = new AMessage('accu', this);
    728                         notify->setSize("track-index", trackIndex);
    729 
    730                         i = response->mHeaders.indexOfKey("transport");
    731                         CHECK_GE(i, 0);
    732 
    733                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
    734                             if (!track->mUsingInterleavedTCP) {
    735                                 AString transport = response->mHeaders.valueAt(i);
    736 
    737                                 // We are going to continue even if we were
    738                                 // unable to poke a hole into the firewall...
    739                                 pokeAHole(
    740                                         track->mRTPSocket,
    741                                         track->mRTCPSocket,
    742                                         transport);
    743                             }
    744 
    745                             mRTPConn->addStream(
    746                                     track->mRTPSocket, track->mRTCPSocket,
    747                                     mSessionDesc, index,
    748                                     notify, track->mUsingInterleavedTCP);
    749 
    750                             mSetupTracksSuccessful = true;
    751                         } else {
    752                             result = BAD_VALUE;
    753                         }
    754                     }
    755                 }
    756 
    757                 if (result != OK) {
    758                     if (track) {
    759                         if (!track->mUsingInterleavedTCP) {
    760                             // Clear the tag
    761                             if (mUIDValid) {
    762                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
    763                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
    764                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
    765                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
    766                             }
    767 
    768                             close(track->mRTPSocket);
    769                             close(track->mRTCPSocket);
    770                         }
    771 
    772                         mTracks.removeItemsAt(trackIndex);
    773                     }
    774                 }
    775 
    776                 ++index;
    777                 if (result == OK && index < mSessionDesc->countTracks()) {
    778                     setupTrack(index);
    779                 } else if (mSetupTracksSuccessful) {
    780                     ++mKeepAliveGeneration;
    781                     postKeepAlive();
    782 
    783                     AString request = "PLAY ";
    784                     request.append(mControlURL);
    785                     request.append(" RTSP/1.0\r\n");
    786 
    787                     request.append("Session: ");
    788                     request.append(mSessionID);
    789                     request.append("\r\n");
    790 
    791                     request.append("\r\n");
    792 
    793                     sp<AMessage> reply = new AMessage('play', this);
    794                     mConn->sendRequest(request.c_str(), reply);
    795                 } else {
    796                     sp<AMessage> reply = new AMessage('disc', this);
    797                     mConn->disconnect(reply);
    798                 }
    799                 break;
    800             }
    801 
    802             case 'play':
    803             {
    804                 int32_t result;
    805                 CHECK(msg->findInt32("result", &result));
    806 
    807                 ALOGI("PLAY completed with result %d (%s)",
    808                      result, strerror(-result));
    809 
    810                 if (result == OK) {
    811                     sp<RefBase> obj;
    812                     CHECK(msg->findObject("response", &obj));
    813                     sp<ARTSPResponse> response =
    814                         static_cast<ARTSPResponse *>(obj.get());
    815 
    816                     if (response->mStatusCode != 200) {
    817                         result = UNKNOWN_ERROR;
    818                     } else {
    819                         parsePlayResponse(response);
    820                         postTimeout();
    821                     }
    822                 }
    823 
    824                 if (result != OK) {
    825                     sp<AMessage> reply = new AMessage('disc', this);
    826                     mConn->disconnect(reply);
    827                 }
    828 
    829                 break;
    830             }
    831 
    832             case 'aliv':
    833             {
    834                 int32_t generation;
    835                 CHECK(msg->findInt32("generation", &generation));
    836 
    837                 if (generation != mKeepAliveGeneration) {
    838                     // obsolete event.
    839                     break;
    840                 }
    841 
    842                 AString request;
    843                 request.append("OPTIONS ");
    844                 request.append(mSessionURL);
    845                 request.append(" RTSP/1.0\r\n");
    846                 request.append("Session: ");
    847                 request.append(mSessionID);
    848                 request.append("\r\n");
    849                 request.append("\r\n");
    850 
    851                 sp<AMessage> reply = new AMessage('opts', this);
    852                 reply->setInt32("generation", mKeepAliveGeneration);
    853                 mConn->sendRequest(request.c_str(), reply);
    854                 break;
    855             }
    856 
    857             case 'opts':
    858             {
    859                 int32_t result;
    860                 CHECK(msg->findInt32("result", &result));
    861 
    862                 ALOGI("OPTIONS completed with result %d (%s)",
    863                      result, strerror(-result));
    864 
    865                 int32_t generation;
    866                 CHECK(msg->findInt32("generation", &generation));
    867 
    868                 if (generation != mKeepAliveGeneration) {
    869                     // obsolete event.
    870                     break;
    871                 }
    872 
    873                 postKeepAlive();
    874                 break;
    875             }
    876 
    877             case 'abor':
    878             {
    879                 for (size_t i = 0; i < mTracks.size(); ++i) {
    880                     TrackInfo *info = &mTracks.editItemAt(i);
    881 
    882                     if (!mFirstAccessUnit) {
    883                         postQueueEOS(i, ERROR_END_OF_STREAM);
    884                     }
    885 
    886                     if (!info->mUsingInterleavedTCP) {
    887                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    888 
    889                         // Clear the tag
    890                         if (mUIDValid) {
    891                             NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
    892                             NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
    893                             NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
    894                             NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
    895                         }
    896 
    897                         close(info->mRTPSocket);
    898                         close(info->mRTCPSocket);
    899                     }
    900                 }
    901                 mTracks.clear();
    902                 mSetupTracksSuccessful = false;
    903                 mSeekPending = false;
    904                 mFirstAccessUnit = true;
    905                 mAllTracksHaveTime = false;
    906                 mNTPAnchorUs = -1;
    907                 mMediaAnchorUs = -1;
    908                 mNumAccessUnitsReceived = 0;
    909                 mReceivedFirstRTCPPacket = false;
    910                 mReceivedFirstRTPPacket = false;
    911                 mPausing = false;
    912                 mSeekable = true;
    913 
    914                 sp<AMessage> reply = new AMessage('tear', this);
    915 
    916                 int32_t reconnect;
    917                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    918                     reply->setInt32("reconnect", true);
    919                 }
    920 
    921                 AString request;
    922                 request = "TEARDOWN ";
    923 
    924                 // XXX should use aggregate url from SDP here...
    925                 request.append(mSessionURL);
    926                 request.append(" RTSP/1.0\r\n");
    927 
    928                 request.append("Session: ");
    929                 request.append(mSessionID);
    930                 request.append("\r\n");
    931 
    932                 request.append("\r\n");
    933 
    934                 mConn->sendRequest(request.c_str(), reply);
    935 
    936                 // If the response of teardown hasn't been received in 3 seconds,
    937                 // post 'tear' message to avoid ANR.
    938                 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
    939                     sp<AMessage> teardown = reply->dup();
    940                     teardown->setInt32("result", -ECONNABORTED);
    941                     teardown->post(kTearDownTimeoutUs);
    942                 }
    943                 break;
    944             }
    945 
    946             case 'tear':
    947             {
    948                 int32_t result;
    949                 CHECK(msg->findInt32("result", &result));
    950 
    951                 ALOGI("TEARDOWN completed with result %d (%s)",
    952                      result, strerror(-result));
    953 
    954                 sp<AMessage> reply = new AMessage('disc', this);
    955 
    956                 int32_t reconnect;
    957                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    958                     reply->setInt32("reconnect", true);
    959                 }
    960 
    961                 mConn->disconnect(reply);
    962                 break;
    963             }
    964 
    965             case 'quit':
    966             {
    967                 sp<AMessage> msg = mNotify->dup();
    968                 msg->setInt32("what", kWhatDisconnected);
    969                 msg->setInt32("result", UNKNOWN_ERROR);
    970                 msg->post();
    971                 break;
    972             }
    973 
    974             case 'chek':
    975             {
    976                 int32_t generation;
    977                 CHECK(msg->findInt32("generation", &generation));
    978                 if (generation != mCheckGeneration) {
    979                     // This is an outdated message. Ignore.
    980                     break;
    981                 }
    982 
    983                 if (mNumAccessUnitsReceived == 0) {
    984 #if 1
    985                     ALOGI("stream ended? aborting.");
    986                     (new AMessage('abor', this))->post();
    987                     break;
    988 #else
    989                     ALOGI("haven't seen an AU in a looong time.");
    990 #endif
    991                 }
    992 
    993                 mNumAccessUnitsReceived = 0;
    994                 msg->post(kAccessUnitTimeoutUs);
    995                 break;
    996             }
    997 
    998             case 'accu':
    999             {
   1000                 if (mSeekPending) {
   1001                     ALOGV("Stale access unit.");
   1002                     break;
   1003                 }
   1004 
   1005                 int32_t timeUpdate;
   1006                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
   1007                     size_t trackIndex;
   1008                     CHECK(msg->findSize("track-index", &trackIndex));
   1009 
   1010                     uint32_t rtpTime;
   1011                     uint64_t ntpTime;
   1012                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
   1013                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
   1014 
   1015                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
   1016                     break;
   1017                 }
   1018 
   1019                 int32_t first;
   1020                 if (msg->findInt32("first-rtcp", &first)) {
   1021                     mReceivedFirstRTCPPacket = true;
   1022                     break;
   1023                 }
   1024 
   1025                 if (msg->findInt32("first-rtp", &first)) {
   1026                     mReceivedFirstRTPPacket = true;
   1027                     break;
   1028                 }
   1029 
   1030                 ++mNumAccessUnitsReceived;
   1031                 postAccessUnitTimeoutCheck();
   1032 
   1033                 size_t trackIndex;
   1034                 CHECK(msg->findSize("track-index", &trackIndex));
   1035 
   1036                 if (trackIndex >= mTracks.size()) {
   1037                     ALOGV("late packets ignored.");
   1038                     break;
   1039                 }
   1040 
   1041                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1042 
   1043                 int32_t eos;
   1044                 if (msg->findInt32("eos", &eos)) {
   1045                     ALOGI("received BYE on track index %zu", trackIndex);
   1046                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1047                         ALOGI("No time established => fake existing data");
   1048 
   1049                         track->mEOSReceived = true;
   1050                         mTryFakeRTCP = true;
   1051                         mReceivedFirstRTCPPacket = true;
   1052                         fakeTimestamps();
   1053                     } else {
   1054                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1055                     }
   1056                     return;
   1057                 }
   1058 
   1059                 if (mSeekPending) {
   1060                     ALOGV("we're seeking, dropping stale packet.");
   1061                     break;
   1062                 }
   1063 
   1064                 sp<ABuffer> accessUnit;
   1065                 CHECK(msg->findBuffer("access-unit", &accessUnit));
   1066                 onAccessUnitComplete(trackIndex, accessUnit);
   1067                 break;
   1068             }
   1069 
   1070             case 'paus':
   1071             {
   1072                 int32_t generation;
   1073                 CHECK(msg->findInt32("pausecheck", &generation));
   1074                 if (generation != mPauseGeneration) {
   1075                     ALOGV("Ignoring outdated pause message.");
   1076                     break;
   1077                 }
   1078 
   1079                 if (!mSeekable) {
   1080                     ALOGW("This is a live stream, ignoring pause request.");
   1081                     break;
   1082                 }
   1083 
   1084                 if (mPausing) {
   1085                     ALOGV("This stream is already paused.");
   1086                     break;
   1087                 }
   1088 
   1089                 mCheckPending = true;
   1090                 ++mCheckGeneration;
   1091                 mPausing = true;
   1092 
   1093                 AString request = "PAUSE ";
   1094                 request.append(mControlURL);
   1095                 request.append(" RTSP/1.0\r\n");
   1096 
   1097                 request.append("Session: ");
   1098                 request.append(mSessionID);
   1099                 request.append("\r\n");
   1100 
   1101                 request.append("\r\n");
   1102 
   1103                 sp<AMessage> reply = new AMessage('pau2', this);
   1104                 mConn->sendRequest(request.c_str(), reply);
   1105                 break;
   1106             }
   1107 
   1108             case 'pau2':
   1109             {
   1110                 int32_t result;
   1111                 CHECK(msg->findInt32("result", &result));
   1112                 mCheckTimeoutGeneration++;
   1113 
   1114                 ALOGI("PAUSE completed with result %d (%s)",
   1115                      result, strerror(-result));
   1116                 break;
   1117             }
   1118 
   1119             case 'resu':
   1120             {
   1121                 if (mPausing && mSeekPending) {
   1122                     // If seeking, Play will be sent from see1 instead
   1123                     break;
   1124                 }
   1125 
   1126                 if (!mPausing) {
    1127                     // Don't send PLAY if we have not paused
   1128                     break;
   1129                 }
   1130                 AString request = "PLAY ";
   1131                 request.append(mControlURL);
   1132                 request.append(" RTSP/1.0\r\n");
   1133 
   1134                 request.append("Session: ");
   1135                 request.append(mSessionID);
   1136                 request.append("\r\n");
   1137 
   1138                 request.append("\r\n");
   1139 
   1140                 sp<AMessage> reply = new AMessage('res2', this);
   1141                 mConn->sendRequest(request.c_str(), reply);
   1142                 break;
   1143             }
   1144 
   1145             case 'res2':
   1146             {
   1147                 int32_t result;
   1148                 CHECK(msg->findInt32("result", &result));
   1149 
   1150                 ALOGI("PLAY (for resume) completed with result %d (%s)",
   1151                      result, strerror(-result));
   1152 
   1153                 mCheckPending = false;
   1154                 ++mCheckGeneration;
   1155                 postAccessUnitTimeoutCheck();
   1156 
   1157                 if (result == OK) {
   1158                     sp<RefBase> obj;
   1159                     CHECK(msg->findObject("response", &obj));
   1160                     sp<ARTSPResponse> response =
   1161                         static_cast<ARTSPResponse *>(obj.get());
   1162 
   1163                     if (response->mStatusCode != 200) {
   1164                         result = UNKNOWN_ERROR;
   1165                     } else {
   1166                         parsePlayResponse(response);
   1167 
   1168                         // Post new timeout in order to make sure to use
   1169                         // fake timestamps if no new Sender Reports arrive
   1170                         postTimeout();
   1171                     }
   1172                 }
   1173 
   1174                 if (result != OK) {
   1175                     ALOGE("resume failed, aborting.");
   1176                     (new AMessage('abor', this))->post();
   1177                 }
   1178 
   1179                 mPausing = false;
   1180                 break;
   1181             }
   1182 
   1183             case 'seek':
   1184             {
   1185                 if (!mSeekable) {
   1186                     ALOGW("This is a live stream, ignoring seek request.");
   1187 
   1188                     sp<AMessage> msg = mNotify->dup();
   1189                     msg->setInt32("what", kWhatSeekDone);
   1190                     msg->post();
   1191                     break;
   1192                 }
   1193 
   1194                 int64_t timeUs;
   1195                 CHECK(msg->findInt64("time", &timeUs));
   1196 
   1197                 mSeekPending = true;
   1198 
   1199                 // Disable the access unit timeout until we resumed
   1200                 // playback again.
   1201                 mCheckPending = true;
   1202                 ++mCheckGeneration;
   1203 
   1204                 sp<AMessage> reply = new AMessage('see0', this);
   1205                 reply->setInt64("time", timeUs);
   1206 
   1207                 if (mPausing) {
   1208                     // PAUSE already sent
   1209                     ALOGI("Pause already sent");
   1210                     reply->post();
   1211                     break;
   1212                 }
   1213                 AString request = "PAUSE ";
   1214                 request.append(mControlURL);
   1215                 request.append(" RTSP/1.0\r\n");
   1216 
   1217                 request.append("Session: ");
   1218                 request.append(mSessionID);
   1219                 request.append("\r\n");
   1220 
   1221                 request.append("\r\n");
   1222 
   1223                 mConn->sendRequest(request.c_str(), reply);
   1224                 break;
   1225             }
   1226 
   1227             case 'see0':
   1228             {
   1229                 // Session is paused now.
   1230                 status_t err = OK;
   1231                 msg->findInt32("result", &err);
   1232 
   1233                 int64_t timeUs;
   1234                 CHECK(msg->findInt64("time", &timeUs));
   1235 
   1236                 sp<AMessage> notify = mNotify->dup();
   1237                 notify->setInt32("what", kWhatSeekPaused);
   1238                 notify->setInt32("err", err);
   1239                 notify->setInt64("time", timeUs);
   1240                 notify->post();
   1241                 break;
   1242 
   1243             }
   1244 
   1245             case 'see1':
   1246             {
   1247                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1248                     TrackInfo *info = &mTracks.editItemAt(i);
   1249 
   1250                     postQueueSeekDiscontinuity(i);
   1251                     info->mEOSReceived = false;
   1252 
   1253                     info->mRTPAnchor = 0;
   1254                     info->mNTPAnchorUs = -1;
   1255                 }
   1256 
   1257                 mAllTracksHaveTime = false;
   1258                 mNTPAnchorUs = -1;
   1259 
    1260                 // Start a new timeout generation to avoid getting a timeout
    1261                 // before the PLAY response arrives.
   1262                 postTimeout();
   1263 
   1264                 int64_t timeUs;
   1265                 CHECK(msg->findInt64("time", &timeUs));
   1266 
   1267                 AString request = "PLAY ";
   1268                 request.append(mControlURL);
   1269                 request.append(" RTSP/1.0\r\n");
   1270 
   1271                 request.append("Session: ");
   1272                 request.append(mSessionID);
   1273                 request.append("\r\n");
   1274 
   1275                 request.append(
   1276                         AStringPrintf(
   1277                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
   1278 
   1279                 request.append("\r\n");
   1280 
   1281                 sp<AMessage> reply = new AMessage('see2', this);
   1282                 mConn->sendRequest(request.c_str(), reply);
   1283                 break;
   1284             }
   1285 
   1286             case 'see2':
   1287             {
   1288                 if (mTracks.size() == 0) {
   1289                     // We have already hit abor, break
   1290                     break;
   1291                 }
   1292 
   1293                 int32_t result;
   1294                 CHECK(msg->findInt32("result", &result));
   1295 
   1296                 ALOGI("PLAY (for seek) completed with result %d (%s)",
   1297                      result, strerror(-result));
   1298 
   1299                 mCheckPending = false;
   1300                 ++mCheckGeneration;
   1301                 postAccessUnitTimeoutCheck();
   1302 
   1303                 if (result == OK) {
   1304                     sp<RefBase> obj;
   1305                     CHECK(msg->findObject("response", &obj));
   1306                     sp<ARTSPResponse> response =
   1307                         static_cast<ARTSPResponse *>(obj.get());
   1308 
   1309                     if (response->mStatusCode != 200) {
   1310                         result = UNKNOWN_ERROR;
   1311                     } else {
   1312                         parsePlayResponse(response);
   1313 
   1314                         // Post new timeout in order to make sure to use
   1315                         // fake timestamps if no new Sender Reports arrive
   1316                         postTimeout();
   1317 
   1318                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
   1319                         CHECK_GE(i, 0);
   1320 
   1321                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
   1322 
   1323                         ALOGI("seek completed.");
   1324                     }
   1325                 }
   1326 
   1327                 if (result != OK) {
   1328                     ALOGE("seek failed, aborting.");
   1329                     (new AMessage('abor', this))->post();
   1330                 }
   1331 
   1332                 mPausing = false;
   1333                 mSeekPending = false;
   1334 
   1335                 // Discard all stale access units.
   1336                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1337                     TrackInfo *track = &mTracks.editItemAt(i);
   1338                     track->mPackets.clear();
   1339                 }
   1340 
   1341                 sp<AMessage> msg = mNotify->dup();
   1342                 msg->setInt32("what", kWhatSeekDone);
   1343                 msg->post();
   1344                 break;
   1345             }
   1346 
   1347             case 'biny':
   1348             {
   1349                 sp<ABuffer> buffer;
   1350                 CHECK(msg->findBuffer("buffer", &buffer));
   1351 
   1352                 int32_t index;
   1353                 CHECK(buffer->meta()->findInt32("index", &index));
   1354 
   1355                 mRTPConn->injectPacket(index, buffer);
   1356                 break;
   1357             }
   1358 
   1359             case 'tiou':
   1360             {
   1361                 int32_t timeoutGenerationCheck;
   1362                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
   1363                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
   1364                     // This is an outdated message. Ignore.
   1365                     // This typically happens if a lot of seeks are
   1366                     // performed, since new timeout messages now are
   1367                     // posted at seek as well.
   1368                     break;
   1369                 }
   1370                 if (!mReceivedFirstRTCPPacket) {
   1371                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
   1372                         ALOGW("We received RTP packets but no RTCP packets, "
   1373                              "using fake timestamps.");
   1374 
   1375                         mTryFakeRTCP = true;
   1376 
   1377                         mReceivedFirstRTCPPacket = true;
   1378 
   1379                         fakeTimestamps();
   1380                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
   1381                         ALOGW("Never received any data, switching transports.");
   1382 
   1383                         mTryTCPInterleaving = true;
   1384 
   1385                         sp<AMessage> msg = new AMessage('abor', this);
   1386                         msg->setInt32("reconnect", true);
   1387                         msg->post();
   1388                     } else {
   1389                         ALOGW("Never received any data, disconnecting.");
   1390                         (new AMessage('abor', this))->post();
   1391                     }
   1392                 } else {
   1393                     if (!mAllTracksHaveTime) {
   1394                         ALOGW("We received some RTCP packets, but time "
   1395                               "could not be established on all tracks, now "
   1396                               "using fake timestamps");
   1397 
   1398                         fakeTimestamps();
   1399                     }
   1400                 }
   1401                 break;
   1402             }
   1403 
   1404             default:
   1405                 TRESPASS();
   1406                 break;
   1407         }
   1408     }
   1409 
   1410     void postKeepAlive() {
   1411         sp<AMessage> msg = new AMessage('aliv', this);
   1412         msg->setInt32("generation", mKeepAliveGeneration);
   1413         msg->post((mKeepAliveTimeoutUs * 9) / 10);
   1414     }
   1415 
   1416     void cancelAccessUnitTimeoutCheck() {
   1417         ALOGV("cancelAccessUnitTimeoutCheck");
   1418         ++mCheckGeneration;
   1419     }
   1420 
   1421     void postAccessUnitTimeoutCheck() {
   1422         if (mCheckPending) {
   1423             return;
   1424         }
   1425 
   1426         mCheckPending = true;
   1427         sp<AMessage> check = new AMessage('chek', this);
   1428         check->setInt32("generation", mCheckGeneration);
   1429         check->post(kAccessUnitTimeoutUs);
   1430     }
   1431 
   1432     static void SplitString(
   1433             const AString &s, const char *separator, List<AString> *items) {
   1434         items->clear();
   1435         size_t start = 0;
   1436         while (start < s.size()) {
   1437             ssize_t offset = s.find(separator, start);
   1438 
   1439             if (offset < 0) {
   1440                 items->push_back(AString(s, start, s.size() - start));
   1441                 break;
   1442             }
   1443 
   1444             items->push_back(AString(s, start, offset - start));
   1445             start = offset + strlen(separator);
   1446         }
   1447     }
   1448 
   1449     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1450         mPlayResponseParsed = true;
   1451         if (mTracks.size() == 0) {
   1452             ALOGV("parsePlayResponse: late packets ignored.");
   1453             return;
   1454         }
   1455 
   1456         ssize_t i = response->mHeaders.indexOfKey("range");
   1457         if (i < 0) {
   1458             // Server doesn't even tell use what range it is going to
   1459             // play, therefore we won't support seeking.
   1460             return;
   1461         }
   1462 
   1463         AString range = response->mHeaders.valueAt(i);
   1464         ALOGV("Range: %s", range.c_str());
   1465 
   1466         AString val;
   1467         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1468 
   1469         float npt1, npt2;
   1470         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1471             // This is a live stream and therefore not seekable.
   1472 
   1473             ALOGI("This is a live stream");
   1474             return;
   1475         }
   1476 
   1477         i = response->mHeaders.indexOfKey("rtp-info");
   1478         CHECK_GE(i, 0);
   1479 
   1480         AString rtpInfo = response->mHeaders.valueAt(i);
   1481         List<AString> streamInfos;
   1482         SplitString(rtpInfo, ",", &streamInfos);
   1483 
   1484         int n = 1;
   1485         for (List<AString>::iterator it = streamInfos.begin();
   1486              it != streamInfos.end(); ++it) {
   1487             (*it).trim();
   1488             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1489 
   1490             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1491 
   1492             size_t trackIndex = 0;
   1493             while (trackIndex < mTracks.size()
   1494                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1495                 ++trackIndex;
   1496             }
   1497             CHECK_LT(trackIndex, mTracks.size());
   1498 
   1499             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1500 
   1501             char *end;
   1502             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1503 
   1504             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1505             info->mFirstSeqNumInSegment = seq;
   1506             info->mNewSegment = true;
   1507             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
   1508 
   1509             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1510 
   1511             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1512 
   1513             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1514 
   1515             info->mNormalPlayTimeRTP = rtpTime;
   1516             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
   1517 
   1518             if (!mFirstAccessUnit) {
   1519                 postNormalPlayTimeMapping(
   1520                         trackIndex,
   1521                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1522             }
   1523 
   1524             ++n;
   1525         }
   1526     }
   1527 
   1528     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
   1529         CHECK_GE(index, 0u);
   1530         CHECK_LT(index, mTracks.size());
   1531 
   1532         const TrackInfo &info = mTracks.itemAt(index);
   1533 
   1534         *timeScale = info.mTimeScale;
   1535 
   1536         return info.mPacketSource->getFormat();
   1537     }
   1538 
   1539     size_t countTracks() const {
   1540         return mTracks.size();
   1541     }
   1542 
   1543 private:
   1544     struct TrackInfo {  // Per-track state; setupTrack() appends one entry per track.
   1545         AString mURL;  // Track URL used on the SETUP request line.
   1546         int mRTPSocket;  // UDP socket, or the even interleave channel when interleaved.
   1547         int mRTCPSocket;  // UDP socket, or the odd interleave channel when interleaved.
   1548         bool mUsingInterleavedTCP;  // True when SETUP requested RTP/AVP/TCP.
   1549         uint32_t mFirstSeqNumInSegment;  // "seq" from RTP-Info; later the first accepted packet's number.
   1550         bool mNewSegment;  // True until the first packet of the segment is accepted.
   1551         int32_t mAllowedStaleAccessUnits;  // Remaining stale units that may be discarded in a new segment.
   1552 
   1553         uint32_t mRTPAnchor;  // RTP timestamp stored by the latest onTimeUpdate().
   1554         int64_t mNTPAnchorUs;  // NTP time (us) paired with mRTPAnchor; negative until set.
   1555         int32_t mTimeScale;  // RTP ticks per second; divisor in addMediaTimestamp().
   1556         bool mEOSReceived;  // When set, EOS is posted after the queue is processed, then cleared.
   1557 
   1558         uint32_t mNormalPlayTimeRTP;  // "rtptime" taken from the RTP-Info header.
   1559         int64_t mNormalPlayTimeUs;  // Start of the npt range (us) matching mNormalPlayTimeRTP.
   1560 
   1561         sp<APacketSource> mPacketSource;  // Created by setupTrack(); provides the track format.
   1562 
   1563         // Stores packets temporarily while no notion of time
   1564         // has been established yet.
   1565         List<sp<ABuffer> > mPackets;
   1566     };
   1567 
   1568     sp<AMessage> mNotify;  // Duplicated (dup()) for every notification this handler posts.
   1569     bool mUIDValid;  // When true, setupTrack() tags and marks the UDP sockets with mUID.
   1570     uid_t mUID;
   1571     sp<ALooper> mNetLooper;
   1572     sp<ARTSPConnection> mConn;  // Connection on which setupTrack() sends the SETUP request.
   1573     sp<ARTPConnection> mRTPConn;
   1574     sp<ASessionDescription> mSessionDesc;  // Queried by setupTrack() for a=control and the format type.
   1575     AString mOriginalSessionURL;  // This one still has user:pass@
   1576     AString mSessionURL;
   1577     AString mSessionHost;
   1578     AString mBaseURL;  // Base against which setupTrack() resolves each track's control URL.
   1579     AString mControlURL;
   1580     AString mSessionID;  // Value of the "Session:" header appended to SETUP requests.
   1581     bool mSetupTracksSuccessful;
   1582     bool mSeekPending;
   1583     bool mFirstAccessUnit;  // True until handleFirstAccessUnit() has posted kWhatConnected.
   1584 
   1585     bool mAllTracksHaveTime;  // Set by onTimeUpdate() once every track has a non-negative NTP anchor.
   1586     int64_t mNTPAnchorUs;  // Taken from the first time update seen while this value is negative.
   1587     int64_t mMediaAnchorUs;  // Media time paired with mNTPAnchorUs (copied from mLastMediaTimeUs).
   1588     int64_t mLastMediaTimeUs;  // Raised by addMediaTimestamp() to the largest media time it computes.
   1589 
   1590     int64_t mNumAccessUnitsReceived;  // Reset to 0 by processAccessUnitQueue() when the stale budget runs out.
   1591     bool mCheckPending;
   1592     int32_t mCheckGeneration;
   1593     int32_t mCheckTimeoutGeneration;  // Incremented by postTimeout() and stored in the 'tiou' message.
   1594     bool mTryTCPInterleaving;  // Makes setupTrack() request interleaved TCP instead of a UDP port pair.
   1595     bool mTryFakeRTCP;
   1596     bool mReceivedFirstRTCPPacket;
   1597     bool mReceivedFirstRTPPacket;
   1598     bool mSeekable;  // Enables play-time mapping posts and stale-packet filtering after a seek.
   1599     int64_t mKeepAliveTimeoutUs;
   1600     int32_t mKeepAliveGeneration;
   1601     bool mPausing;
   1602     int32_t mPauseGeneration;
   1603 
   1604     Vector<TrackInfo> mTracks;  // One entry per track appended by setupTrack().
   1605 
   1606     bool mPlayResponseParsed;  // Until set, onAccessUnitComplete() only queues access units.
   1607 
   1608     void setupTrack(size_t index) {
   1609         sp<APacketSource> source =
   1610             new APacketSource(mSessionDesc, index);
   1611 
   1612         if (source->initCheck() != OK) {
   1613             ALOGW("Unsupported format. Ignoring track #%zu.", index);
   1614 
   1615             sp<AMessage> reply = new AMessage('setu', this);
   1616             reply->setSize("index", index);
   1617             reply->setInt32("result", ERROR_UNSUPPORTED);
   1618             reply->post();
   1619             return;
   1620         }
   1621 
   1622         AString url;
   1623         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1624 
   1625         AString trackURL;
   1626         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1627 
   1628         mTracks.push(TrackInfo());
   1629         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1630         info->mURL = trackURL;
   1631         info->mPacketSource = source;
   1632         info->mUsingInterleavedTCP = false;
   1633         info->mFirstSeqNumInSegment = 0;
   1634         info->mNewSegment = true;
   1635         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
   1636         info->mRTPSocket = -1;
   1637         info->mRTCPSocket = -1;
   1638         info->mRTPAnchor = 0;
   1639         info->mNTPAnchorUs = -1;
   1640         info->mNormalPlayTimeRTP = 0;
   1641         info->mNormalPlayTimeUs = 0ll;
   1642 
   1643         unsigned long PT;
   1644         AString formatDesc;
   1645         AString formatParams;
   1646         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1647 
   1648         int32_t timescale;
   1649         int32_t numChannels;
   1650         ASessionDescription::ParseFormatDesc(
   1651                 formatDesc.c_str(), &timescale, &numChannels);
   1652 
   1653         info->mTimeScale = timescale;
   1654         info->mEOSReceived = false;
   1655 
   1656         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
   1657 
   1658         AString request = "SETUP ";
   1659         request.append(trackURL);
   1660         request.append(" RTSP/1.0\r\n");
   1661 
   1662         if (mTryTCPInterleaving) {
   1663             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1664             info->mUsingInterleavedTCP = true;
   1665             info->mRTPSocket = interleaveIndex;
   1666             info->mRTCPSocket = interleaveIndex + 1;
   1667 
   1668             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1669             request.append(interleaveIndex);
   1670             request.append("-");
   1671             request.append(interleaveIndex + 1);
   1672         } else {
   1673             unsigned rtpPort;
   1674             ARTPConnection::MakePortPair(
   1675                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1676 
   1677             if (mUIDValid) {
   1678                 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1679                         (uint32_t)*(uint32_t*) "RTP_");
   1680                 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1681                         (uint32_t)*(uint32_t*) "RTP_");
   1682                 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
   1683                 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
   1684             }
   1685 
   1686             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1687             request.append(rtpPort);
   1688             request.append("-");
   1689             request.append(rtpPort + 1);
   1690         }
   1691 
   1692         request.append("\r\n");
   1693 
   1694         if (index > 1) {
   1695             request.append("Session: ");
   1696             request.append(mSessionID);
   1697             request.append("\r\n");
   1698         }
   1699 
   1700         request.append("\r\n");
   1701 
   1702         sp<AMessage> reply = new AMessage('setu', this);
   1703         reply->setSize("index", index);
   1704         reply->setSize("track-index", mTracks.size() - 1);
   1705         mConn->sendRequest(request.c_str(), reply);
   1706     }
   1707 
   1708     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1709         out->clear();
   1710 
   1711         if (strncasecmp("rtsp://", baseURL, 7)) {
   1712             // Base URL must be absolute
   1713             return false;
   1714         }
   1715 
   1716         if (!strncasecmp("rtsp://", url, 7)) {
   1717             // "url" is already an absolute URL, ignore base URL.
   1718             out->setTo(url);
   1719             return true;
   1720         }
   1721 
   1722         size_t n = strlen(baseURL);
   1723         out->setTo(baseURL);
   1724         if (baseURL[n - 1] != '/') {
   1725             out->append("/");
   1726         }
   1727         out->append(url);
   1728 
   1729         return true;
   1730     }
   1731 
   1732     void fakeTimestamps() {
   1733         mNTPAnchorUs = -1ll;
   1734         for (size_t i = 0; i < mTracks.size(); ++i) {
   1735             onTimeUpdate(i, 0, 0ll);
   1736         }
   1737     }
   1738 
   1739     bool dataReceivedOnAllChannels() {
   1740         TrackInfo *track;
   1741         for (size_t i = 0; i < mTracks.size(); ++i) {
   1742             track = &mTracks.editItemAt(i);
   1743             if (track->mPackets.empty()) {
   1744                 return false;
   1745             }
   1746         }
   1747         return true;
   1748     }
   1749 
   1750     void handleFirstAccessUnit() {
   1751         if (mFirstAccessUnit) {
   1752             sp<AMessage> msg = mNotify->dup();
   1753             msg->setInt32("what", kWhatConnected);
   1754             msg->post();
   1755 
   1756             if (mSeekable) {
   1757                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1758                     TrackInfo *info = &mTracks.editItemAt(i);
   1759 
   1760                     postNormalPlayTimeMapping(
   1761                             i,
   1762                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1763                 }
   1764             }
   1765 
   1766             mFirstAccessUnit = false;
   1767         }
   1768     }
   1769 
   1770     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1771         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
   1772              trackIndex, rtpTime, (long long)ntpTime);
   1773 
   1774         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1775 
   1776         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1777 
   1778         track->mRTPAnchor = rtpTime;
   1779         track->mNTPAnchorUs = ntpTimeUs;
   1780 
   1781         if (mNTPAnchorUs < 0) {
   1782             mNTPAnchorUs = ntpTimeUs;
   1783             mMediaAnchorUs = mLastMediaTimeUs;
   1784         }
   1785 
   1786         if (!mAllTracksHaveTime) {
   1787             bool allTracksHaveTime = (mTracks.size() > 0);
   1788             for (size_t i = 0; i < mTracks.size(); ++i) {
   1789                 TrackInfo *track = &mTracks.editItemAt(i);
   1790                 if (track->mNTPAnchorUs < 0) {
   1791                     allTracksHaveTime = false;
   1792                     break;
   1793                 }
   1794             }
   1795             if (allTracksHaveTime) {
   1796                 mAllTracksHaveTime = true;
   1797                 ALOGI("Time now established for all tracks.");
   1798             }
   1799         }
   1800         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1801             handleFirstAccessUnit();
   1802 
   1803             // Time is now established, lets start timestamping immediately
   1804             for (size_t i = 0; i < mTracks.size(); ++i) {
   1805                 if (OK != processAccessUnitQueue(i)) {
   1806                     return;
   1807                 }
   1808             }
   1809             for (size_t i = 0; i < mTracks.size(); ++i) {
   1810                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
   1811                 if (trackInfo->mEOSReceived) {
   1812                     postQueueEOS(i, ERROR_END_OF_STREAM);
   1813                     trackInfo->mEOSReceived = false;
   1814                 }
   1815             }
   1816         }
   1817     }
   1818 
   1819     status_t processAccessUnitQueue(int32_t trackIndex) {
   1820         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1821         while (!track->mPackets.empty()) {
   1822             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1823             track->mPackets.erase(track->mPackets.begin());
   1824 
   1825             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1826             if (track->mNewSegment) {
   1827                 // The sequence number from RTP packet has only 16 bits and is extended
   1828                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
   1829                 // RTSP "PLAY" command should be used to detect the first RTP packet
   1830                 // after seeking.
   1831                 if (mSeekable) {
   1832                     if (track->mAllowedStaleAccessUnits > 0) {
   1833                         uint32_t seqNum16 = seqNum & 0xffff;
   1834                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
   1835                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
   1836                                 || seqNum16 < firstSeqNumInSegment16) {
   1837                             // Not the first rtp packet of the stream after seeking, discarding.
   1838                             track->mAllowedStaleAccessUnits--;
   1839                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
   1840                                  seqNum, track->mFirstSeqNumInSegment);
   1841                             continue;
   1842                         }
   1843                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
   1844                                 "Missing the first packet(%u), now take packet(%u) as first one",
   1845                                 track->mFirstSeqNumInSegment, seqNum);
   1846                     } else { // track->mAllowedStaleAccessUnits <= 0
   1847                         mNumAccessUnitsReceived = 0;
   1848                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
   1849                              "Still no first rtp packet after %d stale ones",
   1850                              kMaxAllowedStaleAccessUnits);
   1851                         track->mAllowedStaleAccessUnits = -1;
   1852                         return UNKNOWN_ERROR;
   1853                     }
   1854                 }
   1855 
   1856                 // Now found the first rtp packet of the stream after seeking.
   1857                 track->mFirstSeqNumInSegment = seqNum;
   1858                 track->mNewSegment = false;
   1859             }
   1860 
   1861             if (seqNum < track->mFirstSeqNumInSegment) {
   1862                 ALOGV("dropping stale access-unit (%d < %d)",
   1863                      seqNum, track->mFirstSeqNumInSegment);
   1864                 continue;
   1865             }
   1866 
   1867             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1868                 postQueueAccessUnit(trackIndex, accessUnit);
   1869             }
   1870         }
   1871         return OK;
   1872     }
   1873 
   1874     void onAccessUnitComplete(
   1875             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1876         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1877         track->mPackets.push_back(accessUnit);
   1878 
   1879         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1880         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
   1881 
   1882         if(!mPlayResponseParsed){
   1883             ALOGV("play response is not parsed");
   1884             return;
   1885         }
   1886 
   1887         handleFirstAccessUnit();
   1888 
   1889         if (!mAllTracksHaveTime) {
   1890             ALOGV("storing accessUnit, no time established yet");
   1891             return;
   1892         }
   1893 
   1894         if (OK != processAccessUnitQueue(trackIndex)) {
   1895             return;
   1896         }
   1897 
   1898         if (track->mEOSReceived) {
   1899             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1900             track->mEOSReceived = false;
   1901         }
   1902     }
   1903 
   1904     bool addMediaTimestamp(
   1905             int32_t trackIndex, const TrackInfo *track,
   1906             const sp<ABuffer> &accessUnit) {
   1907         UNUSED_UNLESS_VERBOSE(trackIndex);
   1908 
   1909         uint32_t rtpTime;
   1910         CHECK(accessUnit->meta()->findInt32(
   1911                     "rtp-time", (int32_t *)&rtpTime));
   1912 
   1913         int64_t relRtpTimeUs =
   1914             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1915                 / track->mTimeScale;
   1916 
   1917         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1918 
   1919         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1920 
   1921         if (mediaTimeUs > mLastMediaTimeUs) {
   1922             mLastMediaTimeUs = mediaTimeUs;
   1923         }
   1924 
   1925         if (mediaTimeUs < 0 && !mSeekable) {
   1926             ALOGV("dropping early accessUnit.");
   1927             return false;
   1928         }
   1929 
   1930         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
   1931              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
   1932 
   1933         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1934 
   1935         return true;
   1936     }
   1937 
   1938     void postQueueAccessUnit(
   1939             size_t trackIndex, const sp<ABuffer> &accessUnit) {
   1940         sp<AMessage> msg = mNotify->dup();
   1941         msg->setInt32("what", kWhatAccessUnit);
   1942         msg->setSize("trackIndex", trackIndex);
   1943         msg->setBuffer("accessUnit", accessUnit);
   1944         msg->post();
   1945     }
   1946 
   1947     void postQueueEOS(size_t trackIndex, status_t finalResult) {
   1948         sp<AMessage> msg = mNotify->dup();
   1949         msg->setInt32("what", kWhatEOS);
   1950         msg->setSize("trackIndex", trackIndex);
   1951         msg->setInt32("finalResult", finalResult);
   1952         msg->post();
   1953     }
   1954 
   1955     void postQueueSeekDiscontinuity(size_t trackIndex) {
   1956         sp<AMessage> msg = mNotify->dup();
   1957         msg->setInt32("what", kWhatSeekDiscontinuity);
   1958         msg->setSize("trackIndex", trackIndex);
   1959         msg->post();
   1960     }
   1961 
   1962     void postNormalPlayTimeMapping(
   1963             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
   1964         sp<AMessage> msg = mNotify->dup();
   1965         msg->setInt32("what", kWhatNormalPlayTimeMapping);
   1966         msg->setSize("trackIndex", trackIndex);
   1967         msg->setInt32("rtpTime", rtpTime);
   1968         msg->setInt64("nptUs", nptUs);
   1969         msg->post();
   1970     }
   1971 
   1972     void postTimeout() {
   1973         sp<AMessage> timeout = new AMessage('tiou', this);
   1974         mCheckTimeoutGeneration++;
   1975         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1976 
   1977         int64_t startupTimeoutUs;
   1978         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
   1979         timeout->post(startupTimeoutUs);
   1980     }
   1981 
   1982     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1983 };
   1984 
   1985 }  // namespace android
   1986 
   1987 #endif  // MY_HANDLER_H_
   1988