Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #ifndef MY_HANDLER_H_
     18 
     19 #define MY_HANDLER_H_
     20 
     21 //#define LOG_NDEBUG 0
     22 #define LOG_TAG "MyHandler"
     23 #include <utils/Log.h>
     24 
     25 #include "APacketSource.h"
     26 #include "ARTPConnection.h"
     27 #include "ARTSPConnection.h"
     28 #include "ASessionDescription.h"
     29 
     30 #include <ctype.h>
     31 #include <cutils/properties.h>
     32 
     33 #include <media/stagefright/foundation/ABuffer.h>
     34 #include <media/stagefright/foundation/ADebug.h>
     35 #include <media/stagefright/foundation/ALooper.h>
     36 #include <media/stagefright/foundation/AMessage.h>
     37 #include <media/stagefright/MediaErrors.h>
     38 
     39 #include <arpa/inet.h>
     40 #include <sys/socket.h>
     41 #include <netdb.h>
     42 
     43 #include "HTTPBase.h"
     44 
     45 // If no access units are received within 5 secs, assume that the rtp
     46 // stream has ended and signal end of stream.
     47 static int64_t kAccessUnitTimeoutUs = 5000000ll;
     48 
     49 // If no access units arrive for the first 10 secs after starting the
     50 // stream, assume none ever will and signal EOS or switch transports.
     51 static int64_t kStartupTimeoutUs = 10000000ll;
     52 
     53 namespace android {
     54 
     55 static void MakeUserAgentString(AString *s) {
     56     s->setTo("stagefright/1.1 (Linux;Android ");
     57 
     58 #if (PROPERTY_VALUE_MAX < 8)
     59 #error "PROPERTY_VALUE_MAX must be at least 8"
     60 #endif
     61 
     62     char value[PROPERTY_VALUE_MAX];
     63     property_get("ro.build.version.release", value, "Unknown");
     64     s->append(value);
     65     s->append(")");
     66 }
     67 
     68 static bool GetAttribute(const char *s, const char *key, AString *value) {
     69     value->clear();
     70 
     71     size_t keyLen = strlen(key);
     72 
     73     for (;;) {
     74         while (isspace(*s)) {
     75             ++s;
     76         }
     77 
     78         const char *colonPos = strchr(s, ';');
     79 
     80         size_t len =
     81             (colonPos == NULL) ? strlen(s) : colonPos - s;
     82 
     83         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
     84             value->setTo(&s[keyLen + 1], len - keyLen - 1);
     85             return true;
     86         }
     87 
     88         if (colonPos == NULL) {
     89             return false;
     90         }
     91 
     92         s = colonPos + 1;
     93     }
     94 }
     95 
     96 struct MyHandler : public AHandler {
     97     MyHandler(
     98             const char *url, const sp<ALooper> &looper,
     99             bool uidValid = false, uid_t uid = 0)
    100         : mUIDValid(uidValid),
    101           mUID(uid),
    102           mLooper(looper),
    103           mNetLooper(new ALooper),
    104           mConn(new ARTSPConnection(mUIDValid, mUID)),
    105           mRTPConn(new ARTPConnection),
    106           mOriginalSessionURL(url),
    107           mSessionURL(url),
    108           mSetupTracksSuccessful(false),
    109           mSeekPending(false),
    110           mFirstAccessUnit(true),
    111           mNTPAnchorUs(-1),
    112           mMediaAnchorUs(-1),
    113           mLastMediaTimeUs(0),
    114           mNumAccessUnitsReceived(0),
    115           mCheckPending(false),
    116           mCheckGeneration(0),
    117           mTryTCPInterleaving(false),
    118           mTryFakeRTCP(false),
    119           mReceivedFirstRTCPPacket(false),
    120           mReceivedFirstRTPPacket(false),
    121           mSeekable(false) {
    122         mNetLooper->setName("rtsp net");
    123         mNetLooper->start(false /* runOnCallingThread */,
    124                           false /* canCallJava */,
    125                           PRIORITY_HIGHEST);
    126 
    127         // Strip any authentication info from the session url, we don't
    128         // want to transmit user/pass in cleartext.
    129         AString host, path, user, pass;
    130         unsigned port;
    131         CHECK(ARTSPConnection::ParseURL(
    132                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
    133 
    134         if (user.size() > 0) {
    135             mSessionURL.clear();
    136             mSessionURL.append("rtsp://");
    137             mSessionURL.append(host);
    138             mSessionURL.append(":");
    139             mSessionURL.append(StringPrintf("%u", port));
    140             mSessionURL.append(path);
    141 
    142             LOGI("rewritten session url: '%s'", mSessionURL.c_str());
    143         }
    144 
    145         mSessionHost = host;
    146     }
    147 
    148     void connect(const sp<AMessage> &doneMsg) {
    149         mDoneMsg = doneMsg;
    150 
    151         mLooper->registerHandler(this);
    152         mLooper->registerHandler(mConn);
    153         (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
    154 
    155         sp<AMessage> notify = new AMessage('biny', id());
    156         mConn->observeBinaryData(notify);
    157 
    158         sp<AMessage> reply = new AMessage('conn', id());
    159         mConn->connect(mOriginalSessionURL.c_str(), reply);
    160     }
    161 
    162     void disconnect(const sp<AMessage> &doneMsg) {
    163         mDoneMsg = doneMsg;
    164 
    165         (new AMessage('abor', id()))->post();
    166     }
    167 
    168     void seek(int64_t timeUs, const sp<AMessage> &doneMsg) {
    169         sp<AMessage> msg = new AMessage('seek', id());
    170         msg->setInt64("time", timeUs);
    171         msg->setMessage("doneMsg", doneMsg);
    172         msg->post();
    173     }
    174 
    175     int64_t getNormalPlayTimeUs() {
    176         int64_t maxTimeUs = 0;
    177         for (size_t i = 0; i < mTracks.size(); ++i) {
    178             int64_t timeUs = mTracks.editItemAt(i).mPacketSource
    179                 ->getNormalPlayTimeUs();
    180 
    181             if (i == 0 || timeUs > maxTimeUs) {
    182                 maxTimeUs = timeUs;
    183             }
    184         }
    185 
    186         return maxTimeUs;
    187     }
    188 
    189     static void addRR(const sp<ABuffer> &buf) {
    190         uint8_t *ptr = buf->data() + buf->size();
    191         ptr[0] = 0x80 | 0;
    192         ptr[1] = 201;  // RR
    193         ptr[2] = 0;
    194         ptr[3] = 1;
    195         ptr[4] = 0xde;  // SSRC
    196         ptr[5] = 0xad;
    197         ptr[6] = 0xbe;
    198         ptr[7] = 0xef;
    199 
    200         buf->setRange(0, buf->size() + 8);
    201     }
    202 
    203     static void addSDES(int s, const sp<ABuffer> &buffer) {
    204         struct sockaddr_in addr;
    205         socklen_t addrSize = sizeof(addr);
    206         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
    207 
    208         uint8_t *data = buffer->data() + buffer->size();
    209         data[0] = 0x80 | 1;
    210         data[1] = 202;  // SDES
    211         data[4] = 0xde;  // SSRC
    212         data[5] = 0xad;
    213         data[6] = 0xbe;
    214         data[7] = 0xef;
    215 
    216         size_t offset = 8;
    217 
    218         data[offset++] = 1;  // CNAME
    219 
    220         AString cname = "stagefright@";
    221         cname.append(inet_ntoa(addr.sin_addr));
    222         data[offset++] = cname.size();
    223 
    224         memcpy(&data[offset], cname.c_str(), cname.size());
    225         offset += cname.size();
    226 
    227         data[offset++] = 6;  // TOOL
    228 
    229         AString tool;
    230         MakeUserAgentString(&tool);
    231 
    232         data[offset++] = tool.size();
    233 
    234         memcpy(&data[offset], tool.c_str(), tool.size());
    235         offset += tool.size();
    236 
    237         data[offset++] = 0;
    238 
    239         if ((offset % 4) > 0) {
    240             size_t count = 4 - (offset % 4);
    241             switch (count) {
    242                 case 3:
    243                     data[offset++] = 0;
    244                 case 2:
    245                     data[offset++] = 0;
    246                 case 1:
    247                     data[offset++] = 0;
    248             }
    249         }
    250 
    251         size_t numWords = (offset / 4) - 1;
    252         data[2] = numWords >> 8;
    253         data[3] = numWords & 0xff;
    254 
    255         buffer->setRange(buffer->offset(), buffer->size() + offset);
    256     }
    257 
    258     // In case we're behind NAT, fire off two UDP packets to the remote
    259     // rtp/rtcp ports to poke a hole into the firewall for future incoming
    260     // packets. We're going to send an RR/SDES RTCP packet to both of them.
    261     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
    262         struct sockaddr_in addr;
    263         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    264         addr.sin_family = AF_INET;
    265 
    266         AString source;
    267         AString server_port;
    268         if (!GetAttribute(transport.c_str(),
    269                           "source",
    270                           &source)) {
    271             LOGW("Missing 'source' field in Transport response. Using "
    272                  "RTSP endpoint address.");
    273 
    274             struct hostent *ent = gethostbyname(mSessionHost.c_str());
    275             if (ent == NULL) {
    276                 LOGE("Failed to look up address of session host '%s'",
    277                      mSessionHost.c_str());
    278 
    279                 return false;
    280             }
    281 
    282             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    283         } else {
    284             addr.sin_addr.s_addr = inet_addr(source.c_str());
    285         }
    286 
    287         if (!GetAttribute(transport.c_str(),
    288                                  "server_port",
    289                                  &server_port)) {
    290             LOGI("Missing 'server_port' field in Transport response.");
    291             return false;
    292         }
    293 
    294         int rtpPort, rtcpPort;
    295         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
    296                 || rtpPort <= 0 || rtpPort > 65535
    297                 || rtcpPort <=0 || rtcpPort > 65535
    298                 || rtcpPort != rtpPort + 1) {
    299             LOGE("Server picked invalid RTP/RTCP port pair %s,"
    300                  " RTP port must be even, RTCP port must be one higher.",
    301                  server_port.c_str());
    302 
    303             return false;
    304         }
    305 
    306         if (rtpPort & 1) {
    307             LOGW("Server picked an odd RTP port, it should've picked an "
    308                  "even one, we'll let it pass for now, but this may break "
    309                  "in the future.");
    310         }
    311 
    312         if (addr.sin_addr.s_addr == INADDR_NONE) {
    313             return true;
    314         }
    315 
    316         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
    317             // No firewalls to traverse on the loopback interface.
    318             return true;
    319         }
    320 
    321         // Make up an RR/SDES RTCP packet.
    322         sp<ABuffer> buf = new ABuffer(65536);
    323         buf->setRange(0, 0);
    324         addRR(buf);
    325         addSDES(rtpSocket, buf);
    326 
    327         addr.sin_port = htons(rtpPort);
    328 
    329         ssize_t n = sendto(
    330                 rtpSocket, buf->data(), buf->size(), 0,
    331                 (const sockaddr *)&addr, sizeof(addr));
    332 
    333         if (n < (ssize_t)buf->size()) {
    334             LOGE("failed to poke a hole for RTP packets");
    335             return false;
    336         }
    337 
    338         addr.sin_port = htons(rtcpPort);
    339 
    340         n = sendto(
    341                 rtcpSocket, buf->data(), buf->size(), 0,
    342                 (const sockaddr *)&addr, sizeof(addr));
    343 
    344         if (n < (ssize_t)buf->size()) {
    345             LOGE("failed to poke a hole for RTCP packets");
    346             return false;
    347         }
    348 
    349         LOGV("successfully poked holes.");
    350 
    351         return true;
    352     }
    353 
    354     virtual void onMessageReceived(const sp<AMessage> &msg) {
    355         switch (msg->what()) {
    356             case 'conn':
    357             {
    358                 int32_t result;
    359                 CHECK(msg->findInt32("result", &result));
    360 
    361                 LOGI("connection request completed with result %d (%s)",
    362                      result, strerror(-result));
    363 
    364                 if (result == OK) {
    365                     AString request;
    366                     request = "DESCRIBE ";
    367                     request.append(mSessionURL);
    368                     request.append(" RTSP/1.0\r\n");
    369                     request.append("Accept: application/sdp\r\n");
    370                     request.append("\r\n");
    371 
    372                     sp<AMessage> reply = new AMessage('desc', id());
    373                     mConn->sendRequest(request.c_str(), reply);
    374                 } else {
    375                     (new AMessage('disc', id()))->post();
    376                 }
    377                 break;
    378             }
    379 
    380             case 'disc':
    381             {
    382                 int32_t reconnect;
    383                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    384                     sp<AMessage> reply = new AMessage('conn', id());
    385                     mConn->connect(mOriginalSessionURL.c_str(), reply);
    386                 } else {
    387                     (new AMessage('quit', id()))->post();
    388                 }
    389                 break;
    390             }
    391 
    392             case 'desc':
    393             {
    394                 int32_t result;
    395                 CHECK(msg->findInt32("result", &result));
    396 
    397                 LOGI("DESCRIBE completed with result %d (%s)",
    398                      result, strerror(-result));
    399 
    400                 if (result == OK) {
    401                     sp<RefBase> obj;
    402                     CHECK(msg->findObject("response", &obj));
    403                     sp<ARTSPResponse> response =
    404                         static_cast<ARTSPResponse *>(obj.get());
    405 
    406                     if (response->mStatusCode == 302) {
    407                         ssize_t i = response->mHeaders.indexOfKey("location");
    408                         CHECK_GE(i, 0);
    409 
    410                         mSessionURL = response->mHeaders.valueAt(i);
    411 
    412                         AString request;
    413                         request = "DESCRIBE ";
    414                         request.append(mSessionURL);
    415                         request.append(" RTSP/1.0\r\n");
    416                         request.append("Accept: application/sdp\r\n");
    417                         request.append("\r\n");
    418 
    419                         sp<AMessage> reply = new AMessage('desc', id());
    420                         mConn->sendRequest(request.c_str(), reply);
    421                         break;
    422                     }
    423 
    424                     if (response->mStatusCode != 200) {
    425                         result = UNKNOWN_ERROR;
    426                     } else {
    427                         mSessionDesc = new ASessionDescription;
    428 
    429                         mSessionDesc->setTo(
    430                                 response->mContent->data(),
    431                                 response->mContent->size());
    432 
    433                         if (!mSessionDesc->isValid()) {
    434                             LOGE("Failed to parse session description.");
    435                             result = ERROR_MALFORMED;
    436                         } else {
    437                             ssize_t i = response->mHeaders.indexOfKey("content-base");
    438                             if (i >= 0) {
    439                                 mBaseURL = response->mHeaders.valueAt(i);
    440                             } else {
    441                                 i = response->mHeaders.indexOfKey("content-location");
    442                                 if (i >= 0) {
    443                                     mBaseURL = response->mHeaders.valueAt(i);
    444                                 } else {
    445                                     mBaseURL = mSessionURL;
    446                                 }
    447                             }
    448 
    449                             if (!mBaseURL.startsWith("rtsp://")) {
    450                                 // Some misbehaving servers specify a relative
    451                                 // URL in one of the locations above, combine
    452                                 // it with the absolute session URL to get
    453                                 // something usable...
    454 
    455                                 LOGW("Server specified a non-absolute base URL"
    456                                      ", combining it with the session URL to "
    457                                      "get something usable...");
    458 
    459                                 AString tmp;
    460                                 CHECK(MakeURL(
    461                                             mSessionURL.c_str(),
    462                                             mBaseURL.c_str(),
    463                                             &tmp));
    464 
    465                                 mBaseURL = tmp;
    466                             }
    467 
    468                             CHECK_GT(mSessionDesc->countTracks(), 1u);
    469                             setupTrack(1);
    470                         }
    471                     }
    472                 }
    473 
    474                 if (result != OK) {
    475                     sp<AMessage> reply = new AMessage('disc', id());
    476                     mConn->disconnect(reply);
    477                 }
    478                 break;
    479             }
    480 
    481             case 'setu':
    482             {
    483                 size_t index;
    484                 CHECK(msg->findSize("index", &index));
    485 
    486                 TrackInfo *track = NULL;
    487                 size_t trackIndex;
    488                 if (msg->findSize("track-index", &trackIndex)) {
    489                     track = &mTracks.editItemAt(trackIndex);
    490                 }
    491 
    492                 int32_t result;
    493                 CHECK(msg->findInt32("result", &result));
    494 
    495                 LOGI("SETUP(%d) completed with result %d (%s)",
    496                      index, result, strerror(-result));
    497 
    498                 if (result == OK) {
    499                     CHECK(track != NULL);
    500 
    501                     sp<RefBase> obj;
    502                     CHECK(msg->findObject("response", &obj));
    503                     sp<ARTSPResponse> response =
    504                         static_cast<ARTSPResponse *>(obj.get());
    505 
    506                     if (response->mStatusCode != 200) {
    507                         result = UNKNOWN_ERROR;
    508                     } else {
    509                         ssize_t i = response->mHeaders.indexOfKey("session");
    510                         CHECK_GE(i, 0);
    511 
    512                         mSessionID = response->mHeaders.valueAt(i);
    513                         i = mSessionID.find(";");
    514                         if (i >= 0) {
    515                             // Remove options, i.e. ";timeout=90"
    516                             mSessionID.erase(i, mSessionID.size() - i);
    517                         }
    518 
    519                         sp<AMessage> notify = new AMessage('accu', id());
    520                         notify->setSize("track-index", trackIndex);
    521 
    522                         i = response->mHeaders.indexOfKey("transport");
    523                         CHECK_GE(i, 0);
    524 
    525                         if (!track->mUsingInterleavedTCP) {
    526                             AString transport = response->mHeaders.valueAt(i);
    527 
    528                             // We are going to continue even if we were
    529                             // unable to poke a hole into the firewall...
    530                             pokeAHole(
    531                                     track->mRTPSocket,
    532                                     track->mRTCPSocket,
    533                                     transport);
    534                         }
    535 
    536                         mRTPConn->addStream(
    537                                 track->mRTPSocket, track->mRTCPSocket,
    538                                 mSessionDesc, index,
    539                                 notify, track->mUsingInterleavedTCP);
    540 
    541                         mSetupTracksSuccessful = true;
    542                     }
    543                 }
    544 
    545                 if (result != OK) {
    546                     if (track) {
    547                         if (!track->mUsingInterleavedTCP) {
    548                             // Clear the tag
    549                             if (mUIDValid) {
    550                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
    551                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
    552                             }
    553 
    554                             close(track->mRTPSocket);
    555                             close(track->mRTCPSocket);
    556                         }
    557 
    558                         mTracks.removeItemsAt(trackIndex);
    559                     }
    560                 }
    561 
    562                 ++index;
    563                 if (index < mSessionDesc->countTracks()) {
    564                     setupTrack(index);
    565                 } else if (mSetupTracksSuccessful) {
    566                     AString request = "PLAY ";
    567                     request.append(mSessionURL);
    568                     request.append(" RTSP/1.0\r\n");
    569 
    570                     request.append("Session: ");
    571                     request.append(mSessionID);
    572                     request.append("\r\n");
    573 
    574                     request.append("\r\n");
    575 
    576                     sp<AMessage> reply = new AMessage('play', id());
    577                     mConn->sendRequest(request.c_str(), reply);
    578                 } else {
    579                     sp<AMessage> reply = new AMessage('disc', id());
    580                     mConn->disconnect(reply);
    581                 }
    582                 break;
    583             }
    584 
    585             case 'play':
    586             {
    587                 int32_t result;
    588                 CHECK(msg->findInt32("result", &result));
    589 
    590                 LOGI("PLAY completed with result %d (%s)",
    591                      result, strerror(-result));
    592 
    593                 if (result == OK) {
    594                     sp<RefBase> obj;
    595                     CHECK(msg->findObject("response", &obj));
    596                     sp<ARTSPResponse> response =
    597                         static_cast<ARTSPResponse *>(obj.get());
    598 
    599                     if (response->mStatusCode != 200) {
    600                         result = UNKNOWN_ERROR;
    601                     } else {
    602                         parsePlayResponse(response);
    603 
    604                         sp<AMessage> timeout = new AMessage('tiou', id());
    605                         timeout->post(kStartupTimeoutUs);
    606                     }
    607                 }
    608 
    609                 if (result != OK) {
    610                     sp<AMessage> reply = new AMessage('disc', id());
    611                     mConn->disconnect(reply);
    612                 }
    613 
    614                 break;
    615             }
    616 
    617             case 'abor':
    618             {
    619                 for (size_t i = 0; i < mTracks.size(); ++i) {
    620                     TrackInfo *info = &mTracks.editItemAt(i);
    621 
    622                     info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
    623 
    624                     if (!info->mUsingInterleavedTCP) {
    625                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
    626 
    627                         // Clear the tag
    628                         if (mUIDValid) {
    629                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
    630                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
    631                         }
    632 
    633                         close(info->mRTPSocket);
    634                         close(info->mRTCPSocket);
    635                     }
    636                 }
    637                 mTracks.clear();
    638                 mSetupTracksSuccessful = false;
    639                 mSeekPending = false;
    640                 mFirstAccessUnit = true;
    641                 mNTPAnchorUs = -1;
    642                 mMediaAnchorUs = -1;
    643                 mNumAccessUnitsReceived = 0;
    644                 mReceivedFirstRTCPPacket = false;
    645                 mReceivedFirstRTPPacket = false;
    646                 mSeekable = false;
    647 
    648                 sp<AMessage> reply = new AMessage('tear', id());
    649 
    650                 int32_t reconnect;
    651                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    652                     reply->setInt32("reconnect", true);
    653                 }
    654 
    655                 AString request;
    656                 request = "TEARDOWN ";
    657 
    658                 // XXX should use aggregate url from SDP here...
    659                 request.append(mSessionURL);
    660                 request.append(" RTSP/1.0\r\n");
    661 
    662                 request.append("Session: ");
    663                 request.append(mSessionID);
    664                 request.append("\r\n");
    665 
    666                 request.append("\r\n");
    667 
    668                 mConn->sendRequest(request.c_str(), reply);
    669                 break;
    670             }
    671 
    672             case 'tear':
    673             {
    674                 int32_t result;
    675                 CHECK(msg->findInt32("result", &result));
    676 
    677                 LOGI("TEARDOWN completed with result %d (%s)",
    678                      result, strerror(-result));
    679 
    680                 sp<AMessage> reply = new AMessage('disc', id());
    681 
    682                 int32_t reconnect;
    683                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
    684                     reply->setInt32("reconnect", true);
    685                 }
    686 
    687                 mConn->disconnect(reply);
    688                 break;
    689             }
    690 
    691             case 'quit':
    692             {
    693                 if (mDoneMsg != NULL) {
    694                     mDoneMsg->setInt32("result", UNKNOWN_ERROR);
    695                     mDoneMsg->post();
    696                     mDoneMsg = NULL;
    697                 }
    698                 break;
    699             }
    700 
    701             case 'chek':
    702             {
    703                 int32_t generation;
    704                 CHECK(msg->findInt32("generation", &generation));
    705                 if (generation != mCheckGeneration) {
    706                     // This is an outdated message. Ignore.
    707                     break;
    708                 }
    709 
    710                 if (mNumAccessUnitsReceived == 0) {
    711                     LOGI("stream ended? aborting.");
    712                     (new AMessage('abor', id()))->post();
    713                     break;
    714                 }
    715 
    716                 mNumAccessUnitsReceived = 0;
    717                 msg->post(kAccessUnitTimeoutUs);
    718                 break;
    719             }
    720 
    721             case 'accu':
    722             {
    723                 int32_t timeUpdate;
    724                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
    725                     size_t trackIndex;
    726                     CHECK(msg->findSize("track-index", &trackIndex));
    727 
    728                     uint32_t rtpTime;
    729                     uint64_t ntpTime;
    730                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
    731                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
    732 
    733                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
    734                     break;
    735                 }
    736 
    737                 int32_t first;
    738                 if (msg->findInt32("first-rtcp", &first)) {
    739                     mReceivedFirstRTCPPacket = true;
    740                     break;
    741                 }
    742 
    743                 if (msg->findInt32("first-rtp", &first)) {
    744                     mReceivedFirstRTPPacket = true;
    745                     break;
    746                 }
    747 
    748                 ++mNumAccessUnitsReceived;
    749                 postAccessUnitTimeoutCheck();
    750 
    751                 size_t trackIndex;
    752                 CHECK(msg->findSize("track-index", &trackIndex));
    753 
    754                 if (trackIndex >= mTracks.size()) {
    755                     LOGV("late packets ignored.");
    756                     break;
    757                 }
    758 
    759                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
    760 
    761                 int32_t eos;
    762                 if (msg->findInt32("eos", &eos)) {
    763                     LOGI("received BYE on track index %d", trackIndex);
    764 #if 0
    765                     track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
    766 #endif
    767                     return;
    768                 }
    769 
    770                 sp<RefBase> obj;
    771                 CHECK(msg->findObject("access-unit", &obj));
    772 
    773                 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
    774 
    775                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
    776 
    777                 if (mSeekPending) {
    778                     LOGV("we're seeking, dropping stale packet.");
    779                     break;
    780                 }
    781 
    782                 if (seqNum < track->mFirstSeqNumInSegment) {
    783                     LOGV("dropping stale access-unit (%d < %d)",
    784                          seqNum, track->mFirstSeqNumInSegment);
    785                     break;
    786                 }
    787 
    788                 if (track->mNewSegment) {
    789                     track->mNewSegment = false;
    790                 }
    791 
    792                 onAccessUnitComplete(trackIndex, accessUnit);
    793                 break;
    794             }
    795 
    796             case 'seek':
    797             {
    798                 sp<AMessage> doneMsg;
    799                 CHECK(msg->findMessage("doneMsg", &doneMsg));
    800 
    801                 if (mSeekPending) {
    802                     doneMsg->post();
    803                     break;
    804                 }
    805 
    806                 if (!mSeekable) {
    807                     LOGW("This is a live stream, ignoring seek request.");
    808                     doneMsg->post();
    809                     break;
    810                 }
    811 
    812                 int64_t timeUs;
    813                 CHECK(msg->findInt64("time", &timeUs));
    814 
    815                 mSeekPending = true;
    816 
    817                 // Disable the access unit timeout until we resumed
    818                 // playback again.
    819                 mCheckPending = true;
    820                 ++mCheckGeneration;
    821 
    822                 AString request = "PAUSE ";
    823                 request.append(mSessionURL);
    824                 request.append(" RTSP/1.0\r\n");
    825 
    826                 request.append("Session: ");
    827                 request.append(mSessionID);
    828                 request.append("\r\n");
    829 
    830                 request.append("\r\n");
    831 
    832                 sp<AMessage> reply = new AMessage('see1', id());
    833                 reply->setInt64("time", timeUs);
    834                 reply->setMessage("doneMsg", doneMsg);
    835                 mConn->sendRequest(request.c_str(), reply);
    836                 break;
    837             }
    838 
    839             case 'see1':
    840             {
    841                 // Session is paused now.
    842                 for (size_t i = 0; i < mTracks.size(); ++i) {
    843                     TrackInfo *info = &mTracks.editItemAt(i);
    844 
    845                     info->mPacketSource->flushQueue();
    846                     info->mRTPAnchor = 0;
    847                     info->mNTPAnchorUs = -1;
    848                 }
    849 
    850                 mNTPAnchorUs = -1;
    851 
    852                 int64_t timeUs;
    853                 CHECK(msg->findInt64("time", &timeUs));
    854 
    855                 AString request = "PLAY ";
    856                 request.append(mSessionURL);
    857                 request.append(" RTSP/1.0\r\n");
    858 
    859                 request.append("Session: ");
    860                 request.append(mSessionID);
    861                 request.append("\r\n");
    862 
    863                 request.append(
    864                         StringPrintf(
    865                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
    866 
    867                 request.append("\r\n");
    868 
    869                 sp<AMessage> doneMsg;
    870                 CHECK(msg->findMessage("doneMsg", &doneMsg));
    871 
    872                 sp<AMessage> reply = new AMessage('see2', id());
    873                 reply->setMessage("doneMsg", doneMsg);
    874                 mConn->sendRequest(request.c_str(), reply);
    875                 break;
    876             }
    877 
    878             case 'see2':
    879             {
    880                 CHECK(mSeekPending);
    881 
    882                 int32_t result;
    883                 CHECK(msg->findInt32("result", &result));
    884 
    885                 LOGI("PLAY completed with result %d (%s)",
    886                      result, strerror(-result));
    887 
    888                 mCheckPending = false;
    889                 postAccessUnitTimeoutCheck();
    890 
    891                 if (result == OK) {
    892                     sp<RefBase> obj;
    893                     CHECK(msg->findObject("response", &obj));
    894                     sp<ARTSPResponse> response =
    895                         static_cast<ARTSPResponse *>(obj.get());
    896 
    897                     if (response->mStatusCode != 200) {
    898                         result = UNKNOWN_ERROR;
    899                     } else {
    900                         parsePlayResponse(response);
    901 
    902                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
    903                         CHECK_GE(i, 0);
    904 
    905                         LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
    906 
    907                         LOGI("seek completed.");
    908                     }
    909                 }
    910 
    911                 if (result != OK) {
    912                     LOGE("seek failed, aborting.");
    913                     (new AMessage('abor', id()))->post();
    914                 }
    915 
    916                 mSeekPending = false;
    917 
    918                 sp<AMessage> doneMsg;
    919                 CHECK(msg->findMessage("doneMsg", &doneMsg));
    920 
    921                 doneMsg->post();
    922                 break;
    923             }
    924 
    925             case 'biny':
    926             {
    927                 sp<RefBase> obj;
    928                 CHECK(msg->findObject("buffer", &obj));
    929                 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
    930 
    931                 int32_t index;
    932                 CHECK(buffer->meta()->findInt32("index", &index));
    933 
    934                 mRTPConn->injectPacket(index, buffer);
    935                 break;
    936             }
    937 
    938             case 'tiou':
    939             {
    940                 if (!mReceivedFirstRTCPPacket) {
    941                     if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
    942                         LOGW("We received RTP packets but no RTCP packets, "
    943                              "using fake timestamps.");
    944 
    945                         mTryFakeRTCP = true;
    946 
    947                         mReceivedFirstRTCPPacket = true;
    948 
    949                         fakeTimestamps();
    950                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
    951                         LOGW("Never received any data, switching transports.");
    952 
    953                         mTryTCPInterleaving = true;
    954 
    955                         sp<AMessage> msg = new AMessage('abor', id());
    956                         msg->setInt32("reconnect", true);
    957                         msg->post();
    958                     } else {
    959                         LOGW("Never received any data, disconnecting.");
    960                         (new AMessage('abor', id()))->post();
    961                     }
    962                 }
    963                 break;
    964             }
    965 
    966             default:
    967                 TRESPASS();
    968                 break;
    969         }
    970     }
    971 
    972     void postAccessUnitTimeoutCheck() {
    973         if (mCheckPending) {
    974             return;
    975         }
    976 
    977         mCheckPending = true;
    978         sp<AMessage> check = new AMessage('chek', id());
    979         check->setInt32("generation", mCheckGeneration);
    980         check->post(kAccessUnitTimeoutUs);
    981     }
    982 
    983     static void SplitString(
    984             const AString &s, const char *separator, List<AString> *items) {
    985         items->clear();
    986         size_t start = 0;
    987         while (start < s.size()) {
    988             ssize_t offset = s.find(separator, start);
    989 
    990             if (offset < 0) {
    991                 items->push_back(AString(s, start, s.size() - start));
    992                 break;
    993             }
    994 
    995             items->push_back(AString(s, start, offset - start));
    996             start = offset + strlen(separator);
    997         }
    998     }
    999 
   1000     void parsePlayResponse(const sp<ARTSPResponse> &response) {
   1001         mSeekable = false;
   1002 
   1003         ssize_t i = response->mHeaders.indexOfKey("range");
   1004         if (i < 0) {
   1005             // Server doesn't even tell use what range it is going to
   1006             // play, therefore we won't support seeking.
   1007             return;
   1008         }
   1009 
   1010         AString range = response->mHeaders.valueAt(i);
   1011         LOGV("Range: %s", range.c_str());
   1012 
   1013         AString val;
   1014         CHECK(GetAttribute(range.c_str(), "npt", &val));
   1015 
   1016         float npt1, npt2;
   1017         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
   1018             // This is a live stream and therefore not seekable.
   1019             return;
   1020         }
   1021 
   1022         i = response->mHeaders.indexOfKey("rtp-info");
   1023         CHECK_GE(i, 0);
   1024 
   1025         AString rtpInfo = response->mHeaders.valueAt(i);
   1026         List<AString> streamInfos;
   1027         SplitString(rtpInfo, ",", &streamInfos);
   1028 
   1029         int n = 1;
   1030         for (List<AString>::iterator it = streamInfos.begin();
   1031              it != streamInfos.end(); ++it) {
   1032             (*it).trim();
   1033             LOGV("streamInfo[%d] = %s", n, (*it).c_str());
   1034 
   1035             CHECK(GetAttribute((*it).c_str(), "url", &val));
   1036 
   1037             size_t trackIndex = 0;
   1038             while (trackIndex < mTracks.size()
   1039                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
   1040                 ++trackIndex;
   1041             }
   1042             CHECK_LT(trackIndex, mTracks.size());
   1043 
   1044             CHECK(GetAttribute((*it).c_str(), "seq", &val));
   1045 
   1046             char *end;
   1047             unsigned long seq = strtoul(val.c_str(), &end, 10);
   1048 
   1049             TrackInfo *info = &mTracks.editItemAt(trackIndex);
   1050             info->mFirstSeqNumInSegment = seq;
   1051             info->mNewSegment = true;
   1052 
   1053             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
   1054 
   1055             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
   1056 
   1057             LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
   1058 
   1059             info->mPacketSource->setNormalPlayTimeMapping(
   1060                     rtpTime, (int64_t)(npt1 * 1E6));
   1061 
   1062             ++n;
   1063         }
   1064 
   1065         mSeekable = true;
   1066     }
   1067 
   1068     sp<APacketSource> getPacketSource(size_t index) {
   1069         CHECK_GE(index, 0u);
   1070         CHECK_LT(index, mTracks.size());
   1071 
   1072         return mTracks.editItemAt(index).mPacketSource;
   1073     }
   1074 
   1075     size_t countTracks() const {
   1076         return mTracks.size();
   1077     }
   1078 
   1079 private:
   1080     struct TrackInfo {
   1081         AString mURL;
   1082         int mRTPSocket;
   1083         int mRTCPSocket;
   1084         bool mUsingInterleavedTCP;
   1085         uint32_t mFirstSeqNumInSegment;
   1086         bool mNewSegment;
   1087 
   1088         uint32_t mRTPAnchor;
   1089         int64_t mNTPAnchorUs;
   1090         int32_t mTimeScale;
   1091 
   1092         sp<APacketSource> mPacketSource;
   1093 
   1094         // Stores packets temporarily while no notion of time
   1095         // has been established yet.
   1096         List<sp<ABuffer> > mPackets;
   1097     };
   1098 
   1099     bool mUIDValid;
   1100     uid_t mUID;
   1101     sp<ALooper> mLooper;
   1102     sp<ALooper> mNetLooper;
   1103     sp<ARTSPConnection> mConn;
   1104     sp<ARTPConnection> mRTPConn;
   1105     sp<ASessionDescription> mSessionDesc;
   1106     AString mOriginalSessionURL;  // This one still has user:pass@
   1107     AString mSessionURL;
   1108     AString mSessionHost;
   1109     AString mBaseURL;
   1110     AString mSessionID;
   1111     bool mSetupTracksSuccessful;
   1112     bool mSeekPending;
   1113     bool mFirstAccessUnit;
   1114 
   1115     int64_t mNTPAnchorUs;
   1116     int64_t mMediaAnchorUs;
   1117     int64_t mLastMediaTimeUs;
   1118 
   1119     int64_t mNumAccessUnitsReceived;
   1120     bool mCheckPending;
   1121     int32_t mCheckGeneration;
   1122     bool mTryTCPInterleaving;
   1123     bool mTryFakeRTCP;
   1124     bool mReceivedFirstRTCPPacket;
   1125     bool mReceivedFirstRTPPacket;
   1126     bool mSeekable;
   1127 
   1128     Vector<TrackInfo> mTracks;
   1129 
   1130     sp<AMessage> mDoneMsg;
   1131 
   1132     void setupTrack(size_t index) {
   1133         sp<APacketSource> source =
   1134             new APacketSource(mSessionDesc, index);
   1135 
   1136         if (source->initCheck() != OK) {
   1137             LOGW("Unsupported format. Ignoring track #%d.", index);
   1138 
   1139             sp<AMessage> reply = new AMessage('setu', id());
   1140             reply->setSize("index", index);
   1141             reply->setInt32("result", ERROR_UNSUPPORTED);
   1142             reply->post();
   1143             return;
   1144         }
   1145 
   1146         AString url;
   1147         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
   1148 
   1149         AString trackURL;
   1150         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
   1151 
   1152         mTracks.push(TrackInfo());
   1153         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
   1154         info->mURL = trackURL;
   1155         info->mPacketSource = source;
   1156         info->mUsingInterleavedTCP = false;
   1157         info->mFirstSeqNumInSegment = 0;
   1158         info->mNewSegment = true;
   1159         info->mRTPAnchor = 0;
   1160         info->mNTPAnchorUs = -1;
   1161 
   1162         unsigned long PT;
   1163         AString formatDesc;
   1164         AString formatParams;
   1165         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
   1166 
   1167         int32_t timescale;
   1168         int32_t numChannels;
   1169         ASessionDescription::ParseFormatDesc(
   1170                 formatDesc.c_str(), &timescale, &numChannels);
   1171 
   1172         info->mTimeScale = timescale;
   1173 
   1174         LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
   1175 
   1176         AString request = "SETUP ";
   1177         request.append(trackURL);
   1178         request.append(" RTSP/1.0\r\n");
   1179 
   1180         if (mTryTCPInterleaving) {
   1181             size_t interleaveIndex = 2 * (mTracks.size() - 1);
   1182             info->mUsingInterleavedTCP = true;
   1183             info->mRTPSocket = interleaveIndex;
   1184             info->mRTCPSocket = interleaveIndex + 1;
   1185 
   1186             request.append("Transport: RTP/AVP/TCP;interleaved=");
   1187             request.append(interleaveIndex);
   1188             request.append("-");
   1189             request.append(interleaveIndex + 1);
   1190         } else {
   1191             unsigned rtpPort;
   1192             ARTPConnection::MakePortPair(
   1193                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
   1194 
   1195             if (mUIDValid) {
   1196                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
   1197                                                 (uint32_t)*(uint32_t*) "RTP_");
   1198                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
   1199                                                 (uint32_t)*(uint32_t*) "RTP_");
   1200             }
   1201 
   1202             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
   1203             request.append(rtpPort);
   1204             request.append("-");
   1205             request.append(rtpPort + 1);
   1206         }
   1207 
   1208         request.append("\r\n");
   1209 
   1210         if (index > 1) {
   1211             request.append("Session: ");
   1212             request.append(mSessionID);
   1213             request.append("\r\n");
   1214         }
   1215 
   1216         request.append("\r\n");
   1217 
   1218         sp<AMessage> reply = new AMessage('setu', id());
   1219         reply->setSize("index", index);
   1220         reply->setSize("track-index", mTracks.size() - 1);
   1221         mConn->sendRequest(request.c_str(), reply);
   1222     }
   1223 
   1224     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
   1225         out->clear();
   1226 
   1227         if (strncasecmp("rtsp://", baseURL, 7)) {
   1228             // Base URL must be absolute
   1229             return false;
   1230         }
   1231 
   1232         if (!strncasecmp("rtsp://", url, 7)) {
   1233             // "url" is already an absolute URL, ignore base URL.
   1234             out->setTo(url);
   1235             return true;
   1236         }
   1237 
   1238         size_t n = strlen(baseURL);
   1239         if (baseURL[n - 1] == '/') {
   1240             out->setTo(baseURL);
   1241             out->append(url);
   1242         } else {
   1243             const char *slashPos = strrchr(baseURL, '/');
   1244 
   1245             if (slashPos > &baseURL[6]) {
   1246                 out->setTo(baseURL, slashPos - baseURL);
   1247             } else {
   1248                 out->setTo(baseURL);
   1249             }
   1250 
   1251             out->append("/");
   1252             out->append(url);
   1253         }
   1254 
   1255         return true;
   1256     }
   1257 
   1258     void fakeTimestamps() {
   1259         for (size_t i = 0; i < mTracks.size(); ++i) {
   1260             onTimeUpdate(i, 0, 0ll);
   1261         }
   1262     }
   1263 
   1264     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
   1265         LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
   1266              trackIndex, rtpTime, ntpTime);
   1267 
   1268         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
   1269 
   1270         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1271 
   1272         track->mRTPAnchor = rtpTime;
   1273         track->mNTPAnchorUs = ntpTimeUs;
   1274 
   1275         if (mNTPAnchorUs < 0) {
   1276             mNTPAnchorUs = ntpTimeUs;
   1277             mMediaAnchorUs = mLastMediaTimeUs;
   1278         }
   1279     }
   1280 
   1281     void onAccessUnitComplete(
   1282             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
   1283         LOGV("onAccessUnitComplete track %d", trackIndex);
   1284 
   1285         if (mFirstAccessUnit) {
   1286             mDoneMsg->setInt32("result", OK);
   1287             mDoneMsg->post();
   1288             mDoneMsg = NULL;
   1289 
   1290             mFirstAccessUnit = false;
   1291         }
   1292 
   1293         TrackInfo *track = &mTracks.editItemAt(trackIndex);
   1294 
   1295         if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) {
   1296             LOGV("storing accessUnit, no time established yet");
   1297             track->mPackets.push_back(accessUnit);
   1298             return;
   1299         }
   1300 
   1301         while (!track->mPackets.empty()) {
   1302             sp<ABuffer> accessUnit = *track->mPackets.begin();
   1303             track->mPackets.erase(track->mPackets.begin());
   1304 
   1305             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1306                 track->mPacketSource->queueAccessUnit(accessUnit);
   1307             }
   1308         }
   1309 
   1310         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
   1311             track->mPacketSource->queueAccessUnit(accessUnit);
   1312         }
   1313     }
   1314 
   1315     bool addMediaTimestamp(
   1316             int32_t trackIndex, const TrackInfo *track,
   1317             const sp<ABuffer> &accessUnit) {
   1318         uint32_t rtpTime;
   1319         CHECK(accessUnit->meta()->findInt32(
   1320                     "rtp-time", (int32_t *)&rtpTime));
   1321 
   1322         int64_t relRtpTimeUs =
   1323             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
   1324                 / track->mTimeScale;
   1325 
   1326         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
   1327 
   1328         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
   1329 
   1330         if (mediaTimeUs > mLastMediaTimeUs) {
   1331             mLastMediaTimeUs = mediaTimeUs;
   1332         }
   1333 
   1334         if (mediaTimeUs < 0) {
   1335             LOGV("dropping early accessUnit.");
   1336             return false;
   1337         }
   1338 
   1339         LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
   1340              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
   1341 
   1342         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
   1343 
   1344         return true;
   1345     }
   1346 
   1347 
   1348     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
   1349 };
   1350 
   1351 }  // namespace android
   1352 
   1353 #endif  // MY_HANDLER_H_
   1354