Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 
     23 #ifndef LOG_TAG
     24 #define LOG_TAG "MyHandler"
     25 #endif
     26 
     27 #include <utils/Log.h>
     28 
     29 #include "APacketSource.h"
     30 #include "ARTPConnection.h"
     31 #include "ARTSPConnection.h"
     32 #include "ASessionDescription.h"
     33 
     34 #include <ctype.h>
     35 
     36 #include <media/stagefright/foundation/ABuffer.h>
     37 #include <media/stagefright/foundation/ADebug.h>
     38 #include <media/stagefright/foundation/ALooper.h>
     39 #include <media/stagefright/foundation/AMessage.h>
     40 #include <media/stagefright/MediaErrors.h>
     41 #include <media/stagefright/Utils.h>
     42 
     43 #include <arpa/inet.h>
     44 #include <sys/socket.h>
     45 #include <netdb.h>
     46 
     47 #include "HTTPBase.h"
     48 
     49 #if LOG_NDEBUG
     50 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
     51 #else
     52 #define UNUSED_UNLESS_VERBOSE(x)
     53 #endif
     54 
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
     57 static int64_t kAccessUnitTimeoutUs = 10000000ll;
     58 
     59 // If no access units arrive for the first 10 secs after starting the
     60 // stream, assume none ever will and signal EOS or switch transports.
     61 static int64_t kStartupTimeoutUs = 10000000ll;
     62 
     63 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
     64 
     65 static int64_t kPauseDelayUs = 3000000ll;
     66 
     67 // The allowed maximum number of stale access units at the beginning of
     68 // a new sequence.
     69 static int32_t kMaxAllowedStaleAccessUnits = 20;
     70 
     71 namespace android {
     72 
     73 static bool GetAttribute(const char *s, const char *key, AString *value) {
     74     value->clear();
     75 
     76     size_t keyLen = strlen(key);
     77 
     78     for (;;) {
     79         while (isspace(*s)) {
     80             ++s;
     81         }
     82 
     83         const char *colonPos = strchr(s, ';');
     84 
     85         size_t len =
     86             (colonPos == NULL) ? strlen(s) : colonPos - s;
     87 
     88         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     89             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     90             return true;
     91         }
     92 
     93         if (colonPos == NULL) {
     94             return false;
     95         }
     96 
     97         s = colonPos + 1;
     98     }
     99 }
    100 
    101 struct MyHandler : public AHandler {
    102     enum {
    103         kWhatConnected                  = 'conn',
    104         kWhatDisconnected               = 'disc',
    105         kWhatSeekPaused                 = 'spau',
    106         kWhatSeekDone                   = 'sdon',
    107 
    108         kWhatAccessUnit                 = 'accU',
    109         kWhatEOS                        = 'eos!',
    110         kWhatSeekDiscontinuity          = 'seeD',
    111         kWhatNormalPlayTimeMapping      = 'nptM',
    112     };
    113 
    114     MyHandler(
    115             const char *url,
    116             const sp<AMessage> &notify,
    117             bool uidValid = false, uid_t uid = 0)
    118         : mNotify(notify),
    119           mUIDValid(uidValid),
    120           mUID(uid),
    121           mNetLooper(new ALooper),
    122           mConn(new ARTSPConnection(mUIDValid, mUID)),
    123           mRTPConn(new ARTPConnection),
    124           mOriginalSessionURL(url),
    125           mSessionURL(url),
    126           mSetupTracksSuccessful(false),
    127           mSeekPending(false),
    128           mFirstAccessUnit(true),
    129           mAllTracksHaveTime(false),
    130           mNTPAnchorUs(-1),
    131           mMediaAnchorUs(-1),
    132           mLastMediaTimeUs(0),
    133           mNumAccessUnitsReceived(0),
    134           mCheckPending(false),
    135           mCheckGeneration(0),
    136           mCheckTimeoutGeneration(0),
    137           mTryTCPInterleaving(false),
    138           mTryFakeRTCP(false),
    139           mReceivedFirstRTCPPacket(false),
    140           mReceivedFirstRTPPacket(false),
    141           mSeekable(true),
    142           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
    143           mKeepAliveGeneration(0),
    144           mPausing(false),
    145           mPauseGeneration(0),
    146           mPlayResponseParsed(false) {
    147         mNetLooper->setName("rtsp net");
    148         mNetLooper->start(false /* runOnCallingThread */,
    149                           false /* canCallJava */,
    150                           PRIORITY_HIGHEST);
    151 
    152         // Strip any authentication info from the session url, we don't
    153         // want to transmit user/pass in cleartext.
    154         AString host, path, user, pass;
    155         unsigned port;
    156         CHECK(ARTSPConnection::ParseURL(
    157                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    158 
    159         if (user.size() > 0) {
    160             mSessionURL.clear();
    161             mSessionURL.append("rtsp://");
    162             mSessionURL.append(host);
    163             mSessionURL.append(":");
    164             mSessionURL.append(AStringPrintf("%u", port));
    165             mSessionURL.append(path);
    166 
    167             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
    168         }
    169 
    170         mSessionHost = host;
    171     }
    172 
    173     void connect() {
    174         looper()->registerHandler(mConn);
    175         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    176 
    177         sp<AMessage> notify = new AMessage('biny', this);
    178         mConn->observeBinaryData(notify);
    179 
    180         sp<AMessage> reply = new AMessage('conn', this);
    181         mConn->connect(mOriginalSessionURL.c_str(), reply);
    182     }
    183 
    184     void loadSDP(const sp<ASessionDescription>& desc) {
    185         looper()->registerHandler(mConn);
    186         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
    187 
    188         sp<AMessage> notify = new AMessage('biny', this);
    189         mConn->observeBinaryData(notify);
    190 
    191         sp<AMessage> reply = new AMessage('sdpl', this);
    192         reply->setObject("description", desc);
    193         mConn->connect(mOriginalSessionURL.c_str(), reply);
    194     }
    195 
    196     AString getControlURL() {
    197         AString sessionLevelControlURL;
    198         if (mSessionDesc->findAttribute(
    199                 0,
    200                 "a=control",
    201                 &sessionLevelControlURL)) {
    202             if (sessionLevelControlURL.compare("*") == 0) {
    203                 return mBaseURL;
    204             } else {
    205                 AString controlURL;
    206                 CHECK(MakeURL(
    207                         mBaseURL.c_str(),
    208                         sessionLevelControlURL.c_str(),
    209                         &controlURL));
    210                 return controlURL;
    211             }
    212         } else {
    213             return mSessionURL;
    214         }
    215     }
    216 
    217     void disconnect() {
    218         (new AMessage('abor', this))->post();
    219     }
    220 
    221     void seek(int64_t timeUs) {
    222         sp<AMessage> msg = new AMessage('seek', this);
    223         msg->setInt64("time", timeUs);
    224         mPauseGeneration++;
    225         msg->post();
    226     }
    227 
    228     void continueSeekAfterPause(int64_t timeUs) {
    229         sp<AMessage> msg = new AMessage('see1', this);
    230         msg->setInt64("time", timeUs);
    231         msg->post();
    232     }
    233 
    234     bool isSeekable() const {
    235         return mSeekable;
    236     }
    237 
    238     void pause() {
    239         sp<AMessage> msg = new AMessage('paus', this);
    240         mPauseGeneration++;
    241         msg->setInt32("pausecheck", mPauseGeneration);
    242         msg->post();
    243     }
    244 
    245     void resume() {
    246         sp<AMessage> msg = new AMessage('resu', this);
    247         mPauseGeneration++;
    248         msg->post();
    249     }
    250 
    251     static void addRR(const sp<ABuffer> &buf) {
    252         uint8_t *ptr = buf->data() + buf->size();
    253         ptr[0] = 0x80 | 0;
    254         ptr[1] = 201;  // RR
    255         ptr[2] = 0;
    256         ptr[3] = 1;
    257         ptr[4] = 0xde;  // SSRC
    258         ptr[5] = 0xad;
    259         ptr[6] = 0xbe;
    260         ptr[7] = 0xef;
    261 
    262         buf->setRange(0, buf->size() + 8);
    263     }
    264 
    265     static void addSDES(int s, const sp<ABuffer> &buffer) {
    266         struct sockaddr_in addr;
    267         socklen_t addrSize = sizeof(addr);
    268         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
    269             inet_aton("0.0.0.0", &(addr.sin_addr));
    270         }
    271 
    272         uint8_t *data = buffer->data() + buffer->size();
    273         data[0] = 0x80 | 1;
    274         data[1] = 202;  // SDES
    275         data[4] = 0xde;  // SSRC
    276         data[5] = 0xad;
    277         data[6] = 0xbe;
    278         data[7] = 0xef;
    279 
    280         size_t offset = 8;
    281 
    282         data[offset++] = 1;  // CNAME
    283 
    284         AString cname = "stagefright@";
    285         cname.append(inet_ntoa(addr.sin_addr));
    286         data[offset++] = cname.size();
    287 
    288         memcpy(&data[offset], cname.c_str(), cname.size());
    289         offset += cname.size();
    290 
    291         data[offset++] = 6;  // TOOL
    292 
    293         AString tool = MakeUserAgent();
    294 
    295         data[offset++] = tool.size();
    296 
    297         memcpy(&data[offset], tool.c_str(), tool.size());
    298         offset += tool.size();
    299 
    300         data[offset++] = 0;
    301 
    302         if ((offset % 4) > 0) {
    303             size_t count = 4 - (offset % 4);
    304             switch (count) {
    305                 case 3:
    306                     data[offset++] = 0;
    307                 case 2:
    308                     data[offset++] = 0;
    309                 case 1:
    310                     data[offset++] = 0;
    311             }
    312         }
    313 
    314         size_t numWords = (offset / 4) - 1;
    315         data[2] = numWords >> 8;
    316         data[3] = numWords & 0xff;
    317 
    318         buffer->setRange(buffer->offset(), buffer->size() + offset);
    319     }
    320 
    321     // In case we're behind NAT, fire off two UDP packets to the remote
    322     // rtp/rtcp ports to poke a hole into the firewall for future incoming
    323     // packets. We're going to send an RR/SDES RTCP packet to both of them.
    324     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
    325         struct sockaddr_in addr;
    326         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    327         addr.sin_family = AF_INET;
    328 
    329         AString source;
    330         AString server_port;
    331         if (!GetAttribute(transport.c_str(),
    332                           "source",
    333                           &source)) {
    334             ALOGW("Missing 'source' field in Transport response. Using "
    335                  "RTSP endpoint address.");
    336 
    337             struct hostent *ent = gethostbyname(mSessionHost.c_str());
    338             if (ent == NULL) {
    339                 ALOGE("Failed to look up address of session host '%s'",
    340                      mSessionHost.c_str());
    341 
    342                 return false;
    343             }
    344 
    345             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    346         } else {
    347             addr.sin_addr.s_addr = inet_addr(source.c_str());
    348         }
    349 
    350         if (!GetAttribute(transport.c_str(),
    351                                  "server_port",
    352                                  &server_port)) {
    353             ALOGI("Missing 'server_port' field in Transport response.");
    354             return false;
    355         }
    356 
    357         int rtpPort, rtcpPort;
    358         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
    359                 || rtpPort <= 0 || rtpPort > 65535
    360                 || rtcpPort <=0 || rtcpPort > 65535
    361                 || rtcpPort != rtpPort + 1) {
    362             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
    363                  " RTP port must be even, RTCP port must be one higher.",
    364                  server_port.c_str());
    365 
    366             return false;
    367         }
    368 
    369         if (rtpPort & 1) {
    370             ALOGW("Server picked an odd RTP port, it should've picked an "
    371                  "even one, we'll let it pass for now, but this may break "
    372                  "in the future.");
    373         }
    374 
    375         if (addr.sin_addr.s_addr == INADDR_NONE) {
    376             return true;
    377         }
    378 
    379         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
    380             // No firewalls to traverse on the loopback interface.
    381             return true;
    382         }
    383 
    384         // Make up an RR/SDES RTCP packet.
    385         sp<ABuffer> buf = new ABuffer(65536);
    386         buf->setRange(0, 0);
    387         addRR(buf);
    388         addSDES(rtpSocket, buf);
    389 
    390         addr.sin_port = htons(rtpPort);
    391 
    392         ssize_t n = sendto(
    393                 rtpSocket, buf->data(), buf->size(), 0,
    394                 (const sockaddr *)&addr, sizeof(addr));
    395 
    396         if (n < (ssize_t)buf->size()) {
    397             ALOGE("failed to poke a hole for RTP packets");
    398             return false;
    399         }
    400 
    401         addr.sin_port = htons(rtcpPort);
    402 
    403         n = sendto(
    404                 rtcpSocket, buf->data(), buf->size(), 0,
    405                 (const sockaddr *)&addr, sizeof(addr));
    406 
    407         if (n < (ssize_t)buf->size()) {
    408             ALOGE("failed to poke a hole for RTCP packets");
    409             return false;
    410         }
    411 
    412         ALOGV("successfully poked holes.");
    413 
    414         return true;
    415     }
    416 
    417     static bool isLiveStream(const sp<ASessionDescription> &desc) {
    418         AString attrLiveStream;
    419         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
    420             ssize_t semicolonPos = attrLiveStream.find(";", 2);
    421 
    422             const char* liveStreamValue;
    423             if (semicolonPos < 0) {
    424                 liveStreamValue = attrLiveStream.c_str();
    425             } else {
    426                 AString valString;
    427                 valString.setTo(attrLiveStream,
    428                         semicolonPos + 1,
    429                         attrLiveStream.size() - semicolonPos - 1);
    430                 liveStreamValue = valString.c_str();
    431             }
    432 
    433             uint32_t value = strtoul(liveStreamValue, NULL, 10);
    434             if (value == 1) {
    435                 ALOGV("found live stream");
    436                 return true;
    437             }
    438         } else {
    439             // It is a live stream if no duration is returned
    440             int64_t durationUs;
    441             if (!desc->getDurationUs(&durationUs)) {
    442                 ALOGV("No duration found, assume live stream");
    443                 return true;
    444             }
    445         }
    446 
    447         return false;
    448     }
    449 
    450     virtual void onMessageReceived(const sp<AMessage> &msg) {
    451         switch (msg->what()) {
    452             case 'conn':
    453             {
    454                 int32_t result;
    455                 CHECK(msg->findInt32("result", &result));
    456 
    457                 ALOGI("connection request completed with result %d (%s)",
    458                      result, strerror(-result));
    459 
    460                 if (result == OK) {
    461                     AString request;
    462                     request = "DESCRIBE ";
    463                     request.append(mSessionURL);
    464                     request.append(" RTSP/1.0\r\n");
    465                     request.append("Accept: application/sdp\r\n");
    466                     request.append("\r\n");
    467 
    468                     sp<AMessage> reply = new AMessage('desc', this);
    469                     mConn->sendRequest(request.c_str(), reply);
    470                 } else {
    471                     (new AMessage('disc', this))->post();
    472                 }
    473                 break;
    474             }
    475 
    476             case 'disc':
    477             {
    478                 ++mKeepAliveGeneration;
    479 
    480                 int32_t reconnect;
    481                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    482                     sp<AMessage> reply = new AMessage('conn', this);
    483                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    484                 } else {
    485                     (new AMessage('quit', this))->post();
    486                 }
    487                 break;
    488             }
    489 
    490             case 'desc':
    491             {
    492                 int32_t result;
    493                 CHECK(msg->findInt32("result", &result));
    494 
    495                 ALOGI("DESCRIBE completed with result %d (%s)",
    496                      result, strerror(-result));
    497 
    498                 if (result == OK) {
    499                     sp<RefBase> obj;
    500                     CHECK(msg->findObject("response", &obj));
    501                     sp<ARTSPResponse> response =
    502                         static_cast<ARTSPResponse *>(obj.get());
    503 
    504                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
    505                         ssize_t i = response->mHeaders.indexOfKey("location");
    506                         CHECK_GE(i, 0);
    507 
    508                         mOriginalSessionURL = response->mHeaders.valueAt(i);
    509                         mSessionURL = mOriginalSessionURL;
    510 
    511                         // Strip any authentication info from the session url, we don't
    512                         // want to transmit user/pass in cleartext.
    513                         AString host, path, user, pass;
    514                         unsigned port;
    515                         if (ARTSPConnection::ParseURL(
    516                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
    517                                 && user.size() > 0) {
    518                             mSessionURL.clear();
    519                             mSessionURL.append("rtsp://");
    520                             mSessionURL.append(host);
    521                             mSessionURL.append(":");
    522                             mSessionURL.append(AStringPrintf("%u", port));
    523                             mSessionURL.append(path);
    524 
    525                             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
    526                         }
    527 
    528                         sp<AMessage> reply = new AMessage('conn', this);
    529                         mConn->connect(mOriginalSessionURL.c_str(), reply);
    530                         break;
    531                     }
    532 
    533                     if (response->mStatusCode != 200) {
    534                         result = UNKNOWN_ERROR;
    535                     } else if (response->mContent == NULL) {
    536                         result = ERROR_MALFORMED;
    537                         ALOGE("The response has no content.");
    538                     } else {
    539                         mSessionDesc = new ASessionDescription;
    540 
    541                         mSessionDesc->setTo(
    542                                 response->mContent->data(),
    543                                 response->mContent->size());
    544 
    545                         if (!mSessionDesc->isValid()) {
    546                             ALOGE("Failed to parse session description.");
    547                             result = ERROR_MALFORMED;
    548                         } else {
    549                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    550                             if (i >= 0) {
    551                                 mBaseURL = response->mHeaders.valueAt(i);
    552                             } else {
    553                                 i = response->mHeaders.indexOfKey("content-location");
    554                                 if (i >= 0) {
    555                                     mBaseURL = response->mHeaders.valueAt(i);
    556                                 } else {
    557                                     mBaseURL = mSessionURL;
    558                                 }
    559                             }
    560 
    561                             mSeekable = !isLiveStream(mSessionDesc);
    562 
    563                             if (!mBaseURL.startsWith("rtsp://")) {
    564                                 // Some misbehaving servers specify a relative
    565                                 // URL in one of the locations above, combine
    566                                 // it with the absolute session URL to get
    567                                 // something usable...
    568 
    569                                 ALOGW("Server specified a non-absolute base URL"
    570                                      ", combining it with the session URL to "
    571                                      "get something usable...");
    572 
    573                                 AString tmp;
    574                                 CHECK(MakeURL(
    575                                             mSessionURL.c_str(),
    576                                             mBaseURL.c_str(),
    577                                             &tmp));
    578 
    579                                 mBaseURL = tmp;
    580                             }
    581 
    582                             mControlURL = getControlURL();
    583 
    584                             if (mSessionDesc->countTracks() < 2) {
    585                                 // There's no actual tracks in this session.
    586                                 // The first "track" is merely session meta
    587                                 // data.
    588 
    589                                 ALOGW("Session doesn't contain any playable "
    590                                      "tracks. Aborting.");
    591                                 result = ERROR_UNSUPPORTED;
    592                             } else {
    593                                 setupTrack(1);
    594                             }
    595                         }
    596                     }
    597                 }
    598 
    599                 if (result != OK) {
    600                     sp<AMessage> reply = new AMessage('disc', this);
    601                     mConn->disconnect(reply);
    602                 }
    603                 break;
    604             }
    605 
    606             case 'sdpl':
    607             {
    608                 int32_t result;
    609                 CHECK(msg->findInt32("result", &result));
    610 
    611                 ALOGI("SDP connection request completed with result %d (%s)",
    612                      result, strerror(-result));
    613 
    614                 if (result == OK) {
    615                     sp<RefBase> obj;
    616                     CHECK(msg->findObject("description", &obj));
    617                     mSessionDesc =
    618                         static_cast<ASessionDescription *>(obj.get());
    619 
    620                     if (!mSessionDesc->isValid()) {
    621                         ALOGE("Failed to parse session description.");
    622                         result = ERROR_MALFORMED;
    623                     } else {
    624                         mBaseURL = mSessionURL;
    625 
    626                         mSeekable = !isLiveStream(mSessionDesc);
    627 
    628                         mControlURL = getControlURL();
    629 
    630                         if (mSessionDesc->countTracks() < 2) {
    631                             // There's no actual tracks in this session.
    632                             // The first "track" is merely session meta
    633                             // data.
    634 
    635                             ALOGW("Session doesn't contain any playable "
    636                                  "tracks. Aborting.");
    637                             result = ERROR_UNSUPPORTED;
    638                         } else {
    639                             setupTrack(1);
    640                         }
    641                     }
    642                 }
    643 
    644                 if (result != OK) {
    645                     sp<AMessage> reply = new AMessage('disc', this);
    646                     mConn->disconnect(reply);
    647                 }
    648                 break;
    649             }
    650 
    651             case 'setu':
    652             {
    653                 size_t index;
    654                 CHECK(msg->findSize("index", &index));
    655 
    656                 TrackInfo *track = NULL;
    657                 size_t trackIndex;
    658                 if (msg->findSize("track-index", &trackIndex)) {
    659                     track = &mTracks.editItemAt(trackIndex);
    660                 }
    661 
    662                 int32_t result;
    663                 CHECK(msg->findInt32("result", &result));
    664 
    665                 ALOGI("SETUP(%zu) completed with result %d (%s)",
    666                      index, result, strerror(-result));
    667 
    668                 if (result == OK) {
    669                     CHECK(track != NULL);
    670 
    671                     sp<RefBase> obj;
    672                     CHECK(msg->findObject("response", &obj));
    673                     sp<ARTSPResponse> response =
    674                         static_cast<ARTSPResponse *>(obj.get());
    675 
    676                     if (response->mStatusCode != 200) {
    677                         result = UNKNOWN_ERROR;
    678                     } else {
    679                         ssize_t i = response->mHeaders.indexOfKey("session");
    680                         CHECK_GE(i, 0);
    681 
    682                         mSessionID = response->mHeaders.valueAt(i);
    683 
    684                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    685                         AString timeoutStr;
    686                         if (GetAttribute(
    687                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
    688                             char *end;
    689                             unsigned long timeoutSecs =
    690                                 strtoul(timeoutStr.c_str(), &end, 10);
    691 
    692                             if (end == timeoutStr.c_str() || *end != '\0') {
    693                                 ALOGW("server specified malformed timeout '%s'",
    694                                      timeoutStr.c_str());
    695 
    696                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    697                             } else if (timeoutSecs < 15) {
    698                                 ALOGW("server specified too short a timeout "
    699                                      "(%lu secs), using default.",
    700                                      timeoutSecs);
    701 
    702                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
    703                             } else {
    704                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
    705 
    706                                 ALOGI("server specified timeout of %lu secs.",
    707                                      timeoutSecs);
    708                             }
    709                         }
    710 
    711                         i = mSessionID.find(";");
    712                         if (i >= 0) {
    713                             // Remove options, i.e. ";timeout=90"
    714                             mSessionID.erase(i, mSessionID.size() - i);
    715                         }
    716 
    717                         sp<AMessage> notify = new AMessage('accu', this);
    718                         notify->setSize("track-index", trackIndex);
    719 
    720                         i = response->mHeaders.indexOfKey("transport");
    721                         CHECK_GE(i, 0);
    722 
    723                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
    724                             if (!track->mUsingInterleavedTCP) {
    725                                 AString transport = response->mHeaders.valueAt(i);
    726 
    727                                 // We are going to continue even if we were
    728                                 // unable to poke a hole into the firewall...
    729                                 pokeAHole(
    730                                         track->mRTPSocket,
    731                                         track->mRTCPSocket,
    732                                         transport);
    733                             }
    734 
    735                             mRTPConn->addStream(
    736                                     track->mRTPSocket, track->mRTCPSocket,
    737                                     mSessionDesc, index,
    738                                     notify, track->mUsingInterleavedTCP);
    739 
    740                             mSetupTracksSuccessful = true;
    741                         } else {
    742                             result = BAD_VALUE;
    743                         }
    744                     }
    745                 }
    746 
    747                 if (result != OK) {
    748                     if (track) {
    749                         if (!track->mUsingInterleavedTCP) {
    750                             // Clear the tag
    751                             if (mUIDValid) {
    752                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
    753                                 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
    754                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
    755                                 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
    756                             }
    757 
    758                             close(track->mRTPSocket);
    759                             close(track->mRTCPSocket);
    760                         }
    761 
    762                         mTracks.removeItemsAt(trackIndex);
    763                     }
    764                 }
    765 
    766                 ++index;
    767                 if (result == OK && index < mSessionDesc->countTracks()) {
    768                     setupTrack(index);
    769                 } else if (mSetupTracksSuccessful) {
    770                     ++mKeepAliveGeneration;
    771                     postKeepAlive();
    772 
    773                     AString request = "PLAY ";
    774                     request.append(mControlURL);
    775                     request.append(" RTSP/1.0\r\n");
    776 
    777                     request.append("Session: ");
    778                     request.append(mSessionID);
    779                     request.append("\r\n");
    780 
    781                     request.append("\r\n");
    782 
    783                     sp<AMessage> reply = new AMessage('play', this);
    784                     mConn->sendRequest(request.c_str(), reply);
    785                 } else {
    786                     sp<AMessage> reply = new AMessage('disc', this);
    787                     mConn->disconnect(reply);
    788                 }
    789                 break;
    790             }
    791 
    792             case 'play':
    793             {
    794                 int32_t result;
    795                 CHECK(msg->findInt32("result", &result));
    796 
    797                 ALOGI("PLAY completed with result %d (%s)",
    798                      result, strerror(-result));
    799 
    800                 if (result == OK) {
    801                     sp<RefBase> obj;
    802                     CHECK(msg->findObject("response", &obj));
    803                     sp<ARTSPResponse> response =
    804                         static_cast<ARTSPResponse *>(obj.get());
    805 
    806                     if (response->mStatusCode != 200) {
    807                         result = UNKNOWN_ERROR;
    808                     } else {
    809                         parsePlayResponse(response);
    810 
    811                         sp<AMessage> timeout = new AMessage('tiou', this);
    812                         mCheckTimeoutGeneration++;
    813                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
    814                         timeout->post(kStartupTimeoutUs);
    815                     }
    816                 }
    817 
    818                 if (result != OK) {
    819                     sp<AMessage> reply = new AMessage('disc', this);
    820                     mConn->disconnect(reply);
    821                 }
    822 
    823                 break;
    824             }
    825 
    826             case 'aliv':
    827             {
    828                 int32_t generation;
    829                 CHECK(msg->findInt32("generation", &generation));
    830 
    831                 if (generation != mKeepAliveGeneration) {
    832                     // obsolete event.
    833                     break;
    834                 }
    835 
    836                 AString request;
    837                 request.append("OPTIONS ");
    838                 request.append(mSessionURL);
    839                 request.append(" RTSP/1.0\r\n");
    840                 request.append("Session: ");
    841                 request.append(mSessionID);
    842                 request.append("\r\n");
    843                 request.append("\r\n");
    844 
    845                 sp<AMessage> reply = new AMessage('opts', this);
    846                 reply->setInt32("generation", mKeepAliveGeneration);
    847                 mConn->sendRequest(request.c_str(), reply);
    848                 break;
    849             }
    850 
    851             case 'opts':
    852             {
    853                 int32_t result;
    854                 CHECK(msg->findInt32("result", &result));
    855 
    856                 ALOGI("OPTIONS completed with result %d (%s)",
    857                      result, strerror(-result));
    858 
    859                 int32_t generation;
    860                 CHECK(msg->findInt32("generation", &generation));
    861 
    862                 if (generation != mKeepAliveGeneration) {
    863                     // obsolete event.
    864                     break;
    865                 }
    866 
    867                 postKeepAlive();
    868                 break;
    869             }
    870 
    871             case 'abor':
    872             {
    873                 for (size_t i = 0; i < mTracks.size(); ++i) {
    874                     TrackInfo *info = &mTracks.editItemAt(i);
    875 
    876                     if (!mFirstAccessUnit) {
    877                         postQueueEOS(i, ERROR_END_OF_STREAM);
    878                     }
    879 
    880                     if (!info->mUsingInterleavedTCP) {
    881                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    882 
    883                         // Clear the tag
    884                         if (mUIDValid) {
    885                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
    886                             HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
    887                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
    888                             HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
    889                         }
    890 
    891                         close(info->mRTPSocket);
    892                         close(info->mRTCPSocket);
    893                     }
    894                 }
    895                 mTracks.clear();
    896                 mSetupTracksSuccessful = false;
    897                 mSeekPending = false;
    898                 mFirstAccessUnit = true;
    899                 mAllTracksHaveTime = false;
    900                 mNTPAnchorUs = -1;
    901                 mMediaAnchorUs = -1;
    902                 mNumAccessUnitsReceived = 0;
    903                 mReceivedFirstRTCPPacket = false;
    904                 mReceivedFirstRTPPacket = false;
    905                 mPausing = false;
    906                 mSeekable = true;
    907 
    908                 sp<AMessage> reply = new AMessage('tear', this);
    909 
    910                 int32_t reconnect;
    911                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    912                     reply->setInt32("reconnect", true);
    913                 }
    914 
    915                 AString request;
    916                 request = "TEARDOWN ";
    917 
    918                 // XXX should use aggregate url from SDP here...
    919                 request.append(mSessionURL);
    920                 request.append(" RTSP/1.0\r\n");
    921 
    922                 request.append("Session: ");
    923                 request.append(mSessionID);
    924                 request.append("\r\n");
    925 
    926                 request.append("\r\n");
    927 
    928                 mConn->sendRequest(request.c_str(), reply);
    929                 break;
    930             }
    931 
    932             case 'tear':
    933             {
    934                 int32_t result;
    935                 CHECK(msg->findInt32("result", &result));
    936 
    937                 ALOGI("TEARDOWN completed with result %d (%s)",
    938                      result, strerror(-result));
    939 
    940                 sp<AMessage> reply = new AMessage('disc', this);
    941 
    942                 int32_t reconnect;
    943                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    944                     reply->setInt32("reconnect", true);
    945                 }
    946 
    947                 mConn->disconnect(reply);
    948                 break;
    949             }
    950 
    951             case 'quit':
    952             {
    953                 sp<AMessage> msg = mNotify->dup();
    954                 msg->setInt32("what", kWhatDisconnected);
    955                 msg->setInt32("result", UNKNOWN_ERROR);
    956                 msg->post();
    957                 break;
    958             }
    959 
    960             case 'chek':
    961             {
    962                 int32_t generation;
    963                 CHECK(msg->findInt32("generation", &generation));
    964                 if (generation != mCheckGeneration) {
    965                     // This is an outdated message. Ignore.
    966                     break;
    967                 }
    968 
    969                 if (mNumAccessUnitsReceived == 0) {
    970 #if 1
    971                     ALOGI("stream ended? aborting.");
    972                     (new AMessage('abor', this))->post();
    973                     break;
    974 #else
    975                     ALOGI("haven't seen an AU in a looong time.");
    976 #endif
    977                 }
    978 
    979                 mNumAccessUnitsReceived = 0;
    980                 msg->post(kAccessUnitTimeoutUs);
    981                 break;
    982             }
    983 
    984             case 'accu':
    985             {
    986                 if (mSeekPending) {
    987                     ALOGV("Stale access unit.");
    988                     break;
    989                 }
    990 
    991                 int32_t timeUpdate;
    992                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
    993                     size_t trackIndex;
    994                     CHECK(msg->findSize("track-index", &trackIndex));
    995 
    996                     uint32_t rtpTime;
    997                     uint64_t ntpTime;
    998                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
    999                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
   1000 
   1001                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
   1002                     break;
   1003                 }
   1004 
   1005                 int32_t first;
   1006                 if (msg->findInt32("first-rtcp", &first)) {
   1007                     mReceivedFirstRTCPPacket = true;
   1008                     break;
   1009                 }
   1010 
   1011                 if (msg->findInt32("first-rtp", &first)) {
   1012                     mReceivedFirstRTPPacket = true;
   1013                     break;
   1014                 }
   1015 
   1016                 ++mNumAccessUnitsReceived;
   1017                 postAccessUnitTimeoutCheck();
   1018 
   1019                 size_t trackIndex;
   1020                 CHECK(msg->findSize("track-index", &trackIndex));
   1021 
   1022                 if (trackIndex >= mTracks.size()) {
   1023                     ALOGV("late packets ignored.");
   1024                     break;
   1025                 }
   1026 
   1027                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1028 
   1029                 int32_t eos;
   1030                 if (msg->findInt32("eos", &eos)) {
   1031                     ALOGI("received BYE on track index %zu", trackIndex);
   1032                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1033                         ALOGI("No time established => fake existing data");
   1034 
   1035                         track->mEOSReceived = true;
   1036                         mTryFakeRTCP = true;
   1037                         mReceivedFirstRTCPPacket = true;
   1038                         fakeTimestamps();
   1039                     } else {
   1040                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1041                     }
   1042                     return;
   1043                 }
   1044 
   1045                 if (mSeekPending) {
   1046                     ALOGV("we're seeking, dropping stale packet.");
   1047                     break;
   1048                 }
   1049 
   1050                 sp<ABuffer> accessUnit;
   1051                 CHECK(msg->findBuffer("access-unit", &accessUnit));
   1052                 onAccessUnitComplete(trackIndex, accessUnit);
   1053                 break;
   1054             }
   1055 
   1056             case 'paus':
   1057             {
   1058                 int32_t generation;
   1059                 CHECK(msg->findInt32("pausecheck", &generation));
   1060                 if (generation != mPauseGeneration) {
   1061                     ALOGV("Ignoring outdated pause message.");
   1062                     break;
   1063                 }
   1064 
   1065                 if (!mSeekable) {
   1066                     ALOGW("This is a live stream, ignoring pause request.");
   1067                     break;
   1068                 }
   1069 
   1070                 if (mPausing) {
   1071                     ALOGV("This stream is already paused.");
   1072                     break;
   1073                 }
   1074 
   1075                 mCheckPending = true;
   1076                 ++mCheckGeneration;
   1077                 mPausing = true;
   1078 
   1079                 AString request = "PAUSE ";
   1080                 request.append(mControlURL);
   1081                 request.append(" RTSP/1.0\r\n");
   1082 
   1083                 request.append("Session: ");
   1084                 request.append(mSessionID);
   1085                 request.append("\r\n");
   1086 
   1087                 request.append("\r\n");
   1088 
   1089                 sp<AMessage> reply = new AMessage('pau2', this);
   1090                 mConn->sendRequest(request.c_str(), reply);
   1091                 break;
   1092             }
   1093 
   1094             case 'pau2':
   1095             {
   1096                 int32_t result;
   1097                 CHECK(msg->findInt32("result", &result));
   1098                 mCheckTimeoutGeneration++;
   1099 
   1100                 ALOGI("PAUSE completed with result %d (%s)",
   1101                      result, strerror(-result));
   1102                 break;
   1103             }
   1104 
   1105             case 'resu':
   1106             {
   1107                 if (mPausing && mSeekPending) {
   1108                     // If seeking, Play will be sent from see1 instead
   1109                     break;
   1110                 }
   1111 
   1112                 if (!mPausing) {
   1113                     // Dont send PLAY if we have not paused
   1114                     break;
   1115                 }
   1116                 AString request = "PLAY ";
   1117                 request.append(mControlURL);
   1118                 request.append(" RTSP/1.0\r\n");
   1119 
   1120                 request.append("Session: ");
   1121                 request.append(mSessionID);
   1122                 request.append("\r\n");
   1123 
   1124                 request.append("\r\n");
   1125 
   1126                 sp<AMessage> reply = new AMessage('res2', this);
   1127                 mConn->sendRequest(request.c_str(), reply);
   1128                 break;
   1129             }
   1130 
   1131             case 'res2':
   1132             {
   1133                 int32_t result;
   1134                 CHECK(msg->findInt32("result", &result));
   1135 
   1136                 ALOGI("PLAY (for resume) completed with result %d (%s)",
   1137                      result, strerror(-result));
   1138 
   1139                 mCheckPending = false;
   1140                 ++mCheckGeneration;
   1141                 postAccessUnitTimeoutCheck();
   1142 
   1143                 if (result == OK) {
   1144                     sp<RefBase> obj;
   1145                     CHECK(msg->findObject("response", &obj));
   1146                     sp<ARTSPResponse> response =
   1147                         static_cast<ARTSPResponse *>(obj.get());
   1148 
   1149                     if (response->mStatusCode != 200) {
   1150                         result = UNKNOWN_ERROR;
   1151                     } else {
   1152                         parsePlayResponse(response);
   1153 
   1154                         // Post new timeout in order to make sure to use
   1155                         // fake timestamps if no new Sender Reports arrive
   1156                         sp<AMessage> timeout = new AMessage('tiou', this);
   1157                         mCheckTimeoutGeneration++;
   1158                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1159                         timeout->post(kStartupTimeoutUs);
   1160                     }
   1161                 }
   1162 
   1163                 if (result != OK) {
   1164                     ALOGE("resume failed, aborting.");
   1165                     (new AMessage('abor', this))->post();
   1166                 }
   1167 
   1168                 mPausing = false;
   1169                 break;
   1170             }
   1171 
   1172             case 'seek':
   1173             {
   1174                 if (!mSeekable) {
   1175                     ALOGW("This is a live stream, ignoring seek request.");
   1176 
   1177                     sp<AMessage> msg = mNotify->dup();
   1178                     msg->setInt32("what", kWhatSeekDone);
   1179                     msg->post();
   1180                     break;
   1181                 }
   1182 
   1183                 int64_t timeUs;
   1184                 CHECK(msg->findInt64("time", &timeUs));
   1185 
   1186                 mSeekPending = true;
   1187 
   1188                 // Disable the access unit timeout until we resumed
   1189                 // playback again.
   1190                 mCheckPending = true;
   1191                 ++mCheckGeneration;
   1192 
   1193                 sp<AMessage> reply = new AMessage('see0', this);
   1194                 reply->setInt64("time", timeUs);
   1195 
   1196                 if (mPausing) {
   1197                     // PAUSE already sent
   1198                     ALOGI("Pause already sent");
   1199                     reply->post();
   1200                     break;
   1201                 }
   1202                 AString request = "PAUSE ";
   1203                 request.append(mControlURL);
   1204                 request.append(" RTSP/1.0\r\n");
   1205 
   1206                 request.append("Session: ");
   1207                 request.append(mSessionID);
   1208                 request.append("\r\n");
   1209 
   1210                 request.append("\r\n");
   1211 
   1212                 mConn->sendRequest(request.c_str(), reply);
   1213                 break;
   1214             }
   1215 
   1216             case 'see0':
   1217             {
   1218                 // Session is paused now.
   1219                 status_t err = OK;
   1220                 msg->findInt32("result", &err);
   1221 
   1222                 int64_t timeUs;
   1223                 CHECK(msg->findInt64("time", &timeUs));
   1224 
   1225                 sp<AMessage> notify = mNotify->dup();
   1226                 notify->setInt32("what", kWhatSeekPaused);
   1227                 notify->setInt32("err", err);
   1228                 notify->setInt64("time", timeUs);
   1229                 notify->post();
   1230                 break;
   1231 
   1232             }
   1233 
   1234             case 'see1':
   1235             {
   1236                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1237                     TrackInfo *info = &mTracks.editItemAt(i);
   1238 
   1239                     postQueueSeekDiscontinuity(i);
   1240                     info->mEOSReceived = false;
   1241 
   1242                     info->mRTPAnchor = 0;
   1243                     info->mNTPAnchorUs = -1;
   1244                 }
   1245 
   1246                 mAllTracksHaveTime = false;
   1247                 mNTPAnchorUs = -1;
   1248 
   1249                 // Start new timeoutgeneration to avoid getting timeout
   1250                 // before PLAY response arrive
   1251                 sp<AMessage> timeout = new AMessage('tiou', this);
   1252                 mCheckTimeoutGeneration++;
   1253                 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1254                 timeout->post(kStartupTimeoutUs);
   1255 
   1256                 int64_t timeUs;
   1257                 CHECK(msg->findInt64("time", &timeUs));
   1258 
   1259                 AString request = "PLAY ";
   1260                 request.append(mControlURL);
   1261                 request.append(" RTSP/1.0\r\n");
   1262 
   1263                 request.append("Session: ");
   1264                 request.append(mSessionID);
   1265                 request.append("\r\n");
   1266 
   1267                 request.append(
   1268                         AStringPrintf(
   1269                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
   1270 
   1271                 request.append("\r\n");
   1272 
   1273                 sp<AMessage> reply = new AMessage('see2', this);
   1274                 mConn->sendRequest(request.c_str(), reply);
   1275                 break;
   1276             }
   1277 
   1278             case 'see2':
   1279             {
   1280                 if (mTracks.size() == 0) {
   1281                     // We have already hit abor, break
   1282                     break;
   1283                 }
   1284 
   1285                 int32_t result;
   1286                 CHECK(msg->findInt32("result", &result));
   1287 
   1288                 ALOGI("PLAY (for seek) completed with result %d (%s)",
   1289                      result, strerror(-result));
   1290 
   1291                 mCheckPending = false;
   1292                 ++mCheckGeneration;
   1293                 postAccessUnitTimeoutCheck();
   1294 
   1295                 if (result == OK) {
   1296                     sp<RefBase> obj;
   1297                     CHECK(msg->findObject("response", &obj));
   1298                     sp<ARTSPResponse> response =
   1299                         static_cast<ARTSPResponse *>(obj.get());
   1300 
   1301                     if (response->mStatusCode != 200) {
   1302                         result = UNKNOWN_ERROR;
   1303                     } else {
   1304                         parsePlayResponse(response);
   1305 
   1306                         // Post new timeout in order to make sure to use
   1307                         // fake timestamps if no new Sender Reports arrive
   1308                         sp<AMessage> timeout = new AMessage('tiou', this);
   1309                         mCheckTimeoutGeneration++;
   1310                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
   1311                         timeout->post(kStartupTimeoutUs);
   1312 
   1313                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
   1314                         CHECK_GE(i, 0);
   1315 
   1316                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
   1317 
   1318                         ALOGI("seek completed.");
   1319                     }
   1320                 }
   1321 
   1322                 if (result != OK) {
   1323                     ALOGE("seek failed, aborting.");
   1324                     (new AMessage('abor', this))->post();
   1325                 }
   1326 
   1327                 mPausing = false;
   1328                 mSeekPending = false;
   1329 
   1330                 // Discard all stale access units.
   1331                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1332                     TrackInfo *track = &mTracks.editItemAt(i);
   1333                     track->mPackets.clear();
   1334                 }
   1335 
   1336                 sp<AMessage> msg = mNotify->dup();
   1337                 msg->setInt32("what", kWhatSeekDone);
   1338                 msg->post();
   1339                 break;
   1340             }
   1341 
   1342             case 'biny':
   1343             {
   1344                 sp<ABuffer> buffer;
   1345                 CHECK(msg->findBuffer("buffer", &buffer));
   1346 
   1347                 int32_t index;
   1348                 CHECK(buffer->meta()->findInt32("index", &index));
   1349 
   1350                 mRTPConn->injectPacket(index, buffer);
   1351                 break;
   1352             }
   1353 
   1354             case 'tiou':
   1355             {
   1356                 int32_t timeoutGenerationCheck;
   1357                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
   1358                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
   1359                     // This is an outdated message. Ignore.
   1360                     // This typically happens if a lot of seeks are
   1361                     // performed, since new timeout messages now are
   1362                     // posted at seek as well.
   1363                     break;
   1364                 }
   1365                 if (!mReceivedFirstRTCPPacket) {
   1366                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
   1367                         ALOGW("We received RTP packets but no RTCP packets, "
   1368                              "using fake timestamps.");
   1369 
   1370                         mTryFakeRTCP = true;
   1371 
   1372                         mReceivedFirstRTCPPacket = true;
   1373 
   1374                         fakeTimestamps();
   1375                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
   1376                         ALOGW("Never received any data, switching transports.");
   1377 
   1378                         mTryTCPInterleaving = true;
   1379 
   1380                         sp<AMessage> msg = new AMessage('abor', this);
   1381                         msg->setInt32("reconnect", true);
   1382                         msg->post();
   1383                     } else {
   1384                         ALOGW("Never received any data, disconnecting.");
   1385                         (new AMessage('abor', this))->post();
   1386                     }
   1387                 } else {
   1388                     if (!mAllTracksHaveTime) {
   1389                         ALOGW("We received some RTCP packets, but time "
   1390                               "could not be established on all tracks, now "
   1391                               "using fake timestamps");
   1392 
   1393                         fakeTimestamps();
   1394                     }
   1395                 }
   1396                 break;
   1397             }
   1398 
   1399             default:
   1400                 TRESPASS();
   1401                 break;
   1402         }
   1403     }
   1404 
   1405     void postKeepAlive() {
   1406         sp<AMessage> msg = new AMessage('aliv', this);
   1407         msg->setInt32("generation", mKeepAliveGeneration);
   1408         msg->post((mKeepAliveTimeoutUs * 9) / 10);
   1409     }
   1410 
   1411     void cancelAccessUnitTimeoutCheck() {
   1412         ALOGV("cancelAccessUnitTimeoutCheck");
   1413         ++mCheckGeneration;
   1414     }
   1415 
   1416     void postAccessUnitTimeoutCheck() {
   1417         if (mCheckPending) {
   1418             return;
   1419         }
   1420 
   1421         mCheckPending = true;
   1422         sp<AMessage> check = new AMessage('chek', this);
   1423         check->setInt32("generation", mCheckGeneration);
   1424         check->post(kAccessUnitTimeoutUs);
   1425     }
   1426 
   1427     static void SplitString(
   1428             const AString &s, const char *separator, List<AString> *items) {
   1429         items->clear();
   1430         size_t start = 0;
   1431         while (start < s.size()) {
   1432             ssize_t offset = s.find(separator, start);
   1433 
   1434             if (offset < 0) {
   1435                 items->push_back(AString(s, start, s.size() - start));
   1436                 break;
   1437             }
   1438 
   1439             items->push_back(AString(s, start, offset - start));
   1440             start = offset + strlen(separator);
   1441         }
   1442     }
   1443 
   1444     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1445         mPlayResponseParsed = true;
   1446         if (mTracks.size() == 0) {
   1447             ALOGV("parsePlayResponse: late packets ignored.");
   1448             return;
   1449         }
   1450 
   1451         ssize_t i = response->mHeaders.indexOfKey("range");
   1452         if (i < 0) {
   1453             // Server doesn't even tell use what range it is going to
   1454             // play, therefore we won't support seeking.
   1455             return;
   1456         }
   1457 
   1458         AString range = response->mHeaders.valueAt(i);
   1459         ALOGV("Range: %s", range.c_str());
   1460 
   1461         AString val;
   1462         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1463 
   1464         float npt1, npt2;
   1465         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1466             // This is a live stream and therefore not seekable.
   1467 
   1468             ALOGI("This is a live stream");
   1469             return;
   1470         }
   1471 
   1472         i = response->mHeaders.indexOfKey("rtp-info");
   1473         CHECK_GE(i, 0);
   1474 
   1475         AString rtpInfo = response->mHeaders.valueAt(i);
   1476         List<AString> streamInfos;
   1477         SplitString(rtpInfo, ",", &streamInfos);
   1478 
   1479         int n = 1;
   1480         for (List<AString>::iterator it = streamInfos.begin();
   1481              it != streamInfos.end(); ++it) {
   1482             (*it).trim();
   1483             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1484 
   1485             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1486 
   1487             size_t trackIndex = 0;
   1488             while (trackIndex < mTracks.size()
   1489                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1490                 ++trackIndex;
   1491             }
   1492             CHECK_LT(trackIndex, mTracks.size());
   1493 
   1494             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1495 
   1496             char *end;
   1497             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1498 
   1499             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1500             info->mFirstSeqNumInSegment = seq;
   1501             info->mNewSegment = true;
   1502             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
   1503 
   1504             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1505 
   1506             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1507 
   1508             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1509 
   1510             info->mNormalPlayTimeRTP = rtpTime;
   1511             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
   1512 
   1513             if (!mFirstAccessUnit) {
   1514                 postNormalPlayTimeMapping(
   1515                         trackIndex,
   1516                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1517             }
   1518 
   1519             ++n;
   1520         }
   1521     }
   1522 
   1523     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
   1524         CHECK_GE(index, 0u);
   1525         CHECK_LT(index, mTracks.size());
   1526 
   1527         const TrackInfo &info = mTracks.itemAt(index);
   1528 
   1529         *timeScale = info.mTimeScale;
   1530 
   1531         return info.mPacketSource->getFormat();
   1532     }
   1533 
   1534     size_t countTracks() const {
   1535         return mTracks.size();
   1536     }
   1537 
   1538 private:
   1539     struct TrackInfo {
   1540         AString mURL;
   1541         int mRTPSocket;
   1542         int mRTCPSocket;
   1543         bool mUsingInterleavedTCP;
   1544         uint32_t mFirstSeqNumInSegment;
   1545         bool mNewSegment;
   1546         int32_t mAllowedStaleAccessUnits;
   1547 
   1548         uint32_t mRTPAnchor;
   1549         int64_t mNTPAnchorUs;
   1550         int32_t mTimeScale;
   1551         bool mEOSReceived;
   1552 
   1553         uint32_t mNormalPlayTimeRTP;
   1554         int64_t mNormalPlayTimeUs;
   1555 
   1556         sp<APacketSource> mPacketSource;
   1557 
   1558         // Stores packets temporarily while no notion of time
   1559         // has been established yet.
   1560         List<sp<ABuffer> > mPackets;
   1561     };
   1562 
   1563     sp<AMessage> mNotify;
   1564     bool mUIDValid;
   1565     uid_t mUID;
   1566     sp<ALooper> mNetLooper;
   1567     sp<ARTSPConnection> mConn;
   1568     sp<ARTPConnection> mRTPConn;
   1569     sp<ASessionDescription> mSessionDesc;
   1570     AString mOriginalSessionURL;  // This one still has user:pass@
   1571     AString mSessionURL;
   1572     AString mSessionHost;
   1573     AString mBaseURL;
   1574     AString mControlURL;
   1575     AString mSessionID;
   1576     bool mSetupTracksSuccessful;
   1577     bool mSeekPending;
   1578     bool mFirstAccessUnit;
   1579 
   1580     bool mAllTracksHaveTime;
   1581     int64_t mNTPAnchorUs;
   1582     int64_t mMediaAnchorUs;
   1583     int64_t mLastMediaTimeUs;
   1584 
   1585     int64_t mNumAccessUnitsReceived;
   1586     bool mCheckPending;
   1587     int32_t mCheckGeneration;
   1588     int32_t mCheckTimeoutGeneration;
   1589     bool mTryTCPInterleaving;
   1590     bool mTryFakeRTCP;
   1591     bool mReceivedFirstRTCPPacket;
   1592     bool mReceivedFirstRTPPacket;
   1593     bool mSeekable;
   1594     int64_t mKeepAliveTimeoutUs;
   1595     int32_t mKeepAliveGeneration;
   1596     bool mPausing;
   1597     int32_t mPauseGeneration;
   1598 
   1599     Vector<TrackInfo> mTracks;
   1600 
   1601     bool mPlayResponseParsed;
   1602 
   1603     void setupTrack(size_t index) {
   1604         sp<APacketSource> source =
   1605             new APacketSource(mSessionDesc, index);
   1606 
   1607         if (source->initCheck() != OK) {
   1608             ALOGW("Unsupported format. Ignoring track #%zu.", index);
   1609 
   1610             sp<AMessage> reply = new AMessage('setu', this);
   1611             reply->setSize("index", index);
   1612             reply->setInt32("result", ERROR_UNSUPPORTED);
   1613             reply->post();
   1614             return;
   1615         }
   1616 
   1617         AString url;
   1618         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1619 
   1620         AString trackURL;
   1621         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1622 
   1623         mTracks.push(TrackInfo());
   1624         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1625         info->mURL = trackURL;
   1626         info->mPacketSource = source;
   1627         info->mUsingInterleavedTCP = false;
   1628         info->mFirstSeqNumInSegment = 0;
   1629         info->mNewSegment = true;
   1630         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
   1631         info->mRTPSocket = -1;
   1632         info->mRTCPSocket = -1;
   1633         info->mRTPAnchor = 0;
   1634         info->mNTPAnchorUs = -1;
   1635         info->mNormalPlayTimeRTP = 0;
   1636         info->mNormalPlayTimeUs = 0ll;
   1637 
   1638         unsigned long PT;
   1639         AString formatDesc;
   1640         AString formatParams;
   1641         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1642 
   1643         int32_t timescale;
   1644         int32_t numChannels;
   1645         ASessionDescription::ParseFormatDesc(
   1646                 formatDesc.c_str(), &timescale, &numChannels);
   1647 
   1648         info->mTimeScale = timescale;
   1649         info->mEOSReceived = false;
   1650 
   1651         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
   1652 
   1653         AString request = "SETUP ";
   1654         request.append(trackURL);
   1655         request.append(" RTSP/1.0\r\n");
   1656 
   1657         if (mTryTCPInterleaving) {
   1658             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1659             info->mUsingInterleavedTCP = true;
   1660             info->mRTPSocket = interleaveIndex;
   1661             info->mRTCPSocket = interleaveIndex + 1;
   1662 
   1663             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1664             request.append(interleaveIndex);
   1665             request.append("-");
   1666             request.append(interleaveIndex + 1);
   1667         } else {
   1668             unsigned rtpPort;
   1669             ARTPConnection::MakePortPair(
   1670                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1671 
   1672             if (mUIDValid) {
   1673                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1674                                                 (uint32_t)*(uint32_t*) "RTP_");
   1675                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1676                                                 (uint32_t)*(uint32_t*) "RTP_");
   1677                 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
   1678                 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
   1679             }
   1680 
   1681             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1682             request.append(rtpPort);
   1683             request.append("-");
   1684             request.append(rtpPort + 1);
   1685         }
   1686 
   1687         request.append("\r\n");
   1688 
   1689         if (index > 1) {
   1690             request.append("Session: ");
   1691             request.append(mSessionID);
   1692             request.append("\r\n");
   1693         }
   1694 
   1695         request.append("\r\n");
   1696 
   1697         sp<AMessage> reply = new AMessage('setu', this);
   1698         reply->setSize("index", index);
   1699         reply->setSize("track-index", mTracks.size() - 1);
   1700         mConn->sendRequest(request.c_str(), reply);
   1701     }
   1702 
   1703     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1704         out->clear();
   1705 
   1706         if (strncasecmp("rtsp://", baseURL, 7)) {
   1707             // Base URL must be absolute
   1708             return false;
   1709         }
   1710 
   1711         if (!strncasecmp("rtsp://", url, 7)) {
   1712             // "url" is already an absolute URL, ignore base URL.
   1713             out->setTo(url);
   1714             return true;
   1715         }
   1716 
   1717         size_t n = strlen(baseURL);
   1718         out->setTo(baseURL);
   1719         if (baseURL[n - 1] != '/') {
   1720             out->append("/");
   1721         }
   1722         out->append(url);
   1723 
   1724         return true;
   1725     }
   1726 
   1727     void fakeTimestamps() {
   1728         mNTPAnchorUs = -1ll;
   1729         for (size_t i = 0; i < mTracks.size(); ++i) {
   1730             onTimeUpdate(i, 0, 0ll);
   1731         }
   1732     }
   1733 
   1734     bool dataReceivedOnAllChannels() {
   1735         TrackInfo *track;
   1736         for (size_t i = 0; i < mTracks.size(); ++i) {
   1737             track = &mTracks.editItemAt(i);
   1738             if (track->mPackets.empty()) {
   1739                 return false;
   1740             }
   1741         }
   1742         return true;
   1743     }
   1744 
   1745     void handleFirstAccessUnit() {
   1746         if (mFirstAccessUnit) {
   1747             sp<AMessage> msg = mNotify->dup();
   1748             msg->setInt32("what", kWhatConnected);
   1749             msg->post();
   1750 
   1751             if (mSeekable) {
   1752                 for (size_t i = 0; i < mTracks.size(); ++i) {
   1753                     TrackInfo *info = &mTracks.editItemAt(i);
   1754 
   1755                     postNormalPlayTimeMapping(
   1756                             i,
   1757                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
   1758                 }
   1759             }
   1760 
   1761             mFirstAccessUnit = false;
   1762         }
   1763     }
   1764 
   1765     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1766         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
   1767              trackIndex, rtpTime, (long long)ntpTime);
   1768 
   1769         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1770 
   1771         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1772 
   1773         track->mRTPAnchor = rtpTime;
   1774         track->mNTPAnchorUs = ntpTimeUs;
   1775 
   1776         if (mNTPAnchorUs < 0) {
   1777             mNTPAnchorUs = ntpTimeUs;
   1778             mMediaAnchorUs = mLastMediaTimeUs;
   1779         }
   1780 
   1781         if (!mAllTracksHaveTime) {
   1782             bool allTracksHaveTime = (mTracks.size() > 0);
   1783             for (size_t i = 0; i < mTracks.size(); ++i) {
   1784                 TrackInfo *track = &mTracks.editItemAt(i);
   1785                 if (track->mNTPAnchorUs < 0) {
   1786                     allTracksHaveTime = false;
   1787                     break;
   1788                 }
   1789             }
   1790             if (allTracksHaveTime) {
   1791                 mAllTracksHaveTime = true;
   1792                 ALOGI("Time now established for all tracks.");
   1793             }
   1794         }
   1795         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
   1796             handleFirstAccessUnit();
   1797 
   1798             // Time is now established, lets start timestamping immediately
   1799             for (size_t i = 0; i < mTracks.size(); ++i) {
   1800                 if (OK != processAccessUnitQueue(i)) {
   1801                     return;
   1802                 }
   1803             }
   1804             for (size_t i = 0; i < mTracks.size(); ++i) {
   1805                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
   1806                 if (trackInfo->mEOSReceived) {
   1807                     postQueueEOS(i, ERROR_END_OF_STREAM);
   1808                     trackInfo->mEOSReceived = false;
   1809                 }
   1810             }
   1811         }
   1812     }
   1813 
   1814     status_t processAccessUnitQueue(int32_t trackIndex) {
   1815         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1816         while (!track->mPackets.empty()) {
   1817             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1818             track->mPackets.erase(track->mPackets.begin());
   1819 
   1820             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1821             if (track->mNewSegment) {
   1822                 // The sequence number from RTP packet has only 16 bits and is extended
   1823                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
   1824                 // RTSP "PLAY" command should be used to detect the first RTP packet
   1825                 // after seeking.
   1826                 if (mSeekable) {
   1827                     if (track->mAllowedStaleAccessUnits > 0) {
   1828                         uint32_t seqNum16 = seqNum & 0xffff;
   1829                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
   1830                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
   1831                                 || seqNum16 < firstSeqNumInSegment16) {
   1832                             // Not the first rtp packet of the stream after seeking, discarding.
   1833                             track->mAllowedStaleAccessUnits--;
   1834                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
   1835                                  seqNum, track->mFirstSeqNumInSegment);
   1836                             continue;
   1837                         }
   1838                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
   1839                                 "Missing the first packet(%u), now take packet(%u) as first one",
   1840                                 track->mFirstSeqNumInSegment, seqNum);
   1841                     } else { // track->mAllowedStaleAccessUnits <= 0
   1842                         mNumAccessUnitsReceived = 0;
   1843                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
   1844                              "Still no first rtp packet after %d stale ones",
   1845                              kMaxAllowedStaleAccessUnits);
   1846                         track->mAllowedStaleAccessUnits = -1;
   1847                         return UNKNOWN_ERROR;
   1848                     }
   1849                 }
   1850 
   1851                 // Now found the first rtp packet of the stream after seeking.
   1852                 track->mFirstSeqNumInSegment = seqNum;
   1853                 track->mNewSegment = false;
   1854             }
   1855 
   1856             if (seqNum < track->mFirstSeqNumInSegment) {
   1857                 ALOGV("dropping stale access-unit (%d < %d)",
   1858                      seqNum, track->mFirstSeqNumInSegment);
   1859                 continue;
   1860             }
   1861 
   1862             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1863                 postQueueAccessUnit(trackIndex, accessUnit);
   1864             }
   1865         }
   1866         return OK;
   1867     }
   1868 
   1869     void onAccessUnitComplete(
   1870             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1871         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1872         track->mPackets.push_back(accessUnit);
   1873 
   1874         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
   1875         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
   1876 
   1877         if(!mPlayResponseParsed){
   1878             ALOGV("play response is not parsed");
   1879             return;
   1880         }
   1881 
   1882         handleFirstAccessUnit();
   1883 
   1884         if (!mAllTracksHaveTime) {
   1885             ALOGV("storing accessUnit, no time established yet");
   1886             return;
   1887         }
   1888 
   1889         if (OK != processAccessUnitQueue(trackIndex)) {
   1890             return;
   1891         }
   1892 
   1893         if (track->mEOSReceived) {
   1894             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
   1895             track->mEOSReceived = false;
   1896         }
   1897     }
   1898 
   1899     bool addMediaTimestamp(
   1900             int32_t trackIndex, const TrackInfo *track,
   1901             const sp<ABuffer> &accessUnit) {
   1902         UNUSED_UNLESS_VERBOSE(trackIndex);
   1903 
   1904         uint32_t rtpTime;
   1905         CHECK(accessUnit->meta()->findInt32(
   1906                     "rtp-time", (int32_t *)&rtpTime));
   1907 
   1908         int64_t relRtpTimeUs =
   1909             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1910                 / track->mTimeScale;
   1911 
   1912         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1913 
   1914         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1915 
   1916         if (mediaTimeUs > mLastMediaTimeUs) {
   1917             mLastMediaTimeUs = mediaTimeUs;
   1918         }
   1919 
   1920         if (mediaTimeUs < 0) {
   1921             ALOGV("dropping early accessUnit.");
   1922             return false;
   1923         }
   1924 
   1925         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
   1926              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
   1927 
   1928         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1929 
   1930         return true;
   1931     }
   1932 
   1933     void postQueueAccessUnit(
   1934             size_t trackIndex, const sp<ABuffer> &accessUnit) {
   1935         sp<AMessage> msg = mNotify->dup();
   1936         msg->setInt32("what", kWhatAccessUnit);
   1937         msg->setSize("trackIndex", trackIndex);
   1938         msg->setBuffer("accessUnit", accessUnit);
   1939         msg->post();
   1940     }
   1941 
   1942     void postQueueEOS(size_t trackIndex, status_t finalResult) {
   1943         sp<AMessage> msg = mNotify->dup();
   1944         msg->setInt32("what", kWhatEOS);
   1945         msg->setSize("trackIndex", trackIndex);
   1946         msg->setInt32("finalResult", finalResult);
   1947         msg->post();
   1948     }
   1949 
   1950     void postQueueSeekDiscontinuity(size_t trackIndex) {
   1951         sp<AMessage> msg = mNotify->dup();
   1952         msg->setInt32("what", kWhatSeekDiscontinuity);
   1953         msg->setSize("trackIndex", trackIndex);
   1954         msg->post();
   1955     }
   1956 
   1957     void postNormalPlayTimeMapping(
   1958             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
   1959         sp<AMessage> msg = mNotify->dup();
   1960         msg->setInt32("what", kWhatNormalPlayTimeMapping);
   1961         msg->setSize("trackIndex", trackIndex);
   1962         msg->setInt32("rtpTime", rtpTime);
   1963         msg->setInt64("nptUs", nptUs);
   1964         msg->post();
   1965     }
   1966 
   1967     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1968 };
   1969 
   1970 }  // namespace android
   1971 
   1972 #endif  // MY_HANDLER_H_
   1973