Home | History | Annotate | Download | only in rtsp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "ARTSPConnection"
     19 #include <utils/Log.h>
     20 
     21 #include "ARTSPConnection.h"
     22 
     23 #include <media/stagefright/foundation/ABuffer.h>
     24 #include <media/stagefright/foundation/ADebug.h>
     25 #include <media/stagefright/foundation/AMessage.h>
     26 #include <media/stagefright/MediaErrors.h>
     27 
     28 #include <arpa/inet.h>
     29 #include <fcntl.h>
     30 #include <netdb.h>
     31 #include <sys/socket.h>
     32 
     33 namespace android {
     34 
// static
// Timeout, in microseconds, assigned to tv.tv_usec for the select() calls
// that poll the socket for connection completion and for incoming data.
const int64_t ARTSPConnection::kSelectTimeoutUs = 1000ll;
     37 
     38 ARTSPConnection::ARTSPConnection()
     39     : mState(DISCONNECTED),
     40       mSocket(-1),
     41       mConnectionID(0),
     42       mNextCSeq(0),
     43       mReceiveResponseEventPending(false) {
     44 }
     45 
     46 ARTSPConnection::~ARTSPConnection() {
     47     if (mSocket >= 0) {
     48         LOGE("Connection is still open, closing the socket.");
     49         close(mSocket);
     50         mSocket = -1;
     51     }
     52 }
     53 
     54 void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) {
     55     sp<AMessage> msg = new AMessage(kWhatConnect, id());
     56     msg->setString("url", url);
     57     msg->setMessage("reply", reply);
     58     msg->post();
     59 }
     60 
     61 void ARTSPConnection::disconnect(const sp<AMessage> &reply) {
     62     sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
     63     msg->setMessage("reply", reply);
     64     msg->post();
     65 }
     66 
     67 void ARTSPConnection::sendRequest(
     68         const char *request, const sp<AMessage> &reply) {
     69     sp<AMessage> msg = new AMessage(kWhatSendRequest, id());
     70     msg->setString("request", request);
     71     msg->setMessage("reply", reply);
     72     msg->post();
     73 }
     74 
     75 void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) {
     76     sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id());
     77     msg->setMessage("reply", reply);
     78     msg->post();
     79 }
     80 
     81 void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
     82     switch (msg->what()) {
     83         case kWhatConnect:
     84             onConnect(msg);
     85             break;
     86 
     87         case kWhatDisconnect:
     88             onDisconnect(msg);
     89             break;
     90 
     91         case kWhatCompleteConnection:
     92             onCompleteConnection(msg);
     93             break;
     94 
     95         case kWhatSendRequest:
     96             onSendRequest(msg);
     97             break;
     98 
     99         case kWhatReceiveResponse:
    100             onReceiveResponse();
    101             break;
    102 
    103         case kWhatObserveBinaryData:
    104         {
    105             CHECK(msg->findMessage("reply", &mObserveBinaryMessage));
    106             break;
    107         }
    108 
    109         default:
    110             TRESPASS();
    111             break;
    112     }
    113 }
    114 
    115 // static
    116 bool ARTSPConnection::ParseURL(
    117         const char *url, AString *host, unsigned *port, AString *path) {
    118     host->clear();
    119     *port = 0;
    120     path->clear();
    121 
    122     if (strncasecmp("rtsp://", url, 7)) {
    123         return false;
    124     }
    125 
    126     const char *slashPos = strchr(&url[7], '/');
    127 
    128     if (slashPos == NULL) {
    129         host->setTo(&url[7]);
    130         path->setTo("/");
    131     } else {
    132         host->setTo(&url[7], slashPos - &url[7]);
    133         path->setTo(slashPos);
    134     }
    135 
    136     char *colonPos = strchr(host->c_str(), ':');
    137 
    138     if (colonPos != NULL) {
    139         unsigned long x;
    140         if (!ParseSingleUnsignedLong(colonPos + 1, &x) || x >= 65536) {
    141             return false;
    142         }
    143 
    144         *port = x;
    145 
    146         size_t colonOffset = colonPos - host->c_str();
    147         size_t trailing = host->size() - colonOffset;
    148         host->erase(colonOffset, trailing);
    149     } else {
    150         *port = 554;
    151     }
    152 
    153     return true;
    154 }
    155 
    156 static void MakeSocketBlocking(int s, bool blocking) {
    157     // Make socket non-blocking.
    158     int flags = fcntl(s, F_GETFL, 0);
    159     CHECK_NE(flags, -1);
    160 
    161     if (blocking) {
    162         flags &= ~O_NONBLOCK;
    163     } else {
    164         flags |= O_NONBLOCK;
    165     }
    166 
    167     CHECK_NE(fcntl(s, F_SETFL, flags), -1);
    168 }
    169 
    170 void ARTSPConnection::onConnect(const sp<AMessage> &msg) {
    171     ++mConnectionID;
    172 
    173     if (mState != DISCONNECTED) {
    174         close(mSocket);
    175         mSocket = -1;
    176 
    177         flushPendingRequests();
    178     }
    179 
    180     mState = CONNECTING;
    181 
    182     AString url;
    183     CHECK(msg->findString("url", &url));
    184 
    185     sp<AMessage> reply;
    186     CHECK(msg->findMessage("reply", &reply));
    187 
    188     AString host, path;
    189     unsigned port;
    190     if (!ParseURL(url.c_str(), &host, &port, &path)) {
    191         LOGE("Malformed rtsp url %s", url.c_str());
    192 
    193         reply->setInt32("result", ERROR_MALFORMED);
    194         reply->post();
    195 
    196         mState = DISCONNECTED;
    197         return;
    198     }
    199 
    200     struct hostent *ent = gethostbyname(host.c_str());
    201     if (ent == NULL) {
    202         LOGE("Unknown host %s", host.c_str());
    203 
    204         reply->setInt32("result", -ENOENT);
    205         reply->post();
    206 
    207         mState = DISCONNECTED;
    208         return;
    209     }
    210 
    211     mSocket = socket(AF_INET, SOCK_STREAM, 0);
    212 
    213     MakeSocketBlocking(mSocket, false);
    214 
    215     struct sockaddr_in remote;
    216     memset(remote.sin_zero, 0, sizeof(remote.sin_zero));
    217     remote.sin_family = AF_INET;
    218     remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    219     remote.sin_port = htons(port);
    220 
    221     int err = ::connect(
    222             mSocket, (const struct sockaddr *)&remote, sizeof(remote));
    223 
    224     reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr));
    225 
    226     if (err < 0) {
    227         if (errno == EINPROGRESS) {
    228             sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id());
    229             msg->setMessage("reply", reply);
    230             msg->setInt32("connection-id", mConnectionID);
    231             msg->post();
    232             return;
    233         }
    234 
    235         reply->setInt32("result", -errno);
    236         mState = DISCONNECTED;
    237 
    238         close(mSocket);
    239         mSocket = -1;
    240     } else {
    241         reply->setInt32("result", OK);
    242         mState = CONNECTED;
    243         mNextCSeq = 1;
    244 
    245         postReceiveReponseEvent();
    246     }
    247 
    248     reply->post();
    249 }
    250 
    251 void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) {
    252     if (mState == CONNECTED || mState == CONNECTING) {
    253         close(mSocket);
    254         mSocket = -1;
    255 
    256         flushPendingRequests();
    257     }
    258 
    259     sp<AMessage> reply;
    260     CHECK(msg->findMessage("reply", &reply));
    261 
    262     reply->setInt32("result", OK);
    263     mState = DISCONNECTED;
    264 
    265     reply->post();
    266 }
    267 
    268 void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) {
    269     sp<AMessage> reply;
    270     CHECK(msg->findMessage("reply", &reply));
    271 
    272     int32_t connectionID;
    273     CHECK(msg->findInt32("connection-id", &connectionID));
    274 
    275     if ((connectionID != mConnectionID) || mState != CONNECTING) {
    276         // While we were attempting to connect, the attempt was
    277         // cancelled.
    278         reply->setInt32("result", -ECONNABORTED);
    279         reply->post();
    280         return;
    281     }
    282 
    283     struct timeval tv;
    284     tv.tv_sec = 0;
    285     tv.tv_usec = kSelectTimeoutUs;
    286 
    287     fd_set ws;
    288     FD_ZERO(&ws);
    289     FD_SET(mSocket, &ws);
    290 
    291     int res = select(mSocket + 1, NULL, &ws, NULL, &tv);
    292     CHECK_GE(res, 0);
    293 
    294     if (res == 0) {
    295         // Timed out. Not yet connected.
    296 
    297         msg->post();
    298         return;
    299     }
    300 
    301     int err;
    302     socklen_t optionLen = sizeof(err);
    303     CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
    304     CHECK_EQ(optionLen, (socklen_t)sizeof(err));
    305 
    306     if (err != 0) {
    307         LOGE("err = %d (%s)", err, strerror(err));
    308 
    309         reply->setInt32("result", -err);
    310 
    311         mState = DISCONNECTED;
    312         close(mSocket);
    313         mSocket = -1;
    314     } else {
    315         reply->setInt32("result", OK);
    316         mState = CONNECTED;
    317         mNextCSeq = 1;
    318 
    319         postReceiveReponseEvent();
    320     }
    321 
    322     reply->post();
    323 }
    324 
    325 void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) {
    326     sp<AMessage> reply;
    327     CHECK(msg->findMessage("reply", &reply));
    328 
    329     if (mState != CONNECTED) {
    330         reply->setInt32("result", -ENOTCONN);
    331         reply->post();
    332         return;
    333     }
    334 
    335     AString request;
    336     CHECK(msg->findString("request", &request));
    337 
    338     // Find the boundary between headers and the body.
    339     ssize_t i = request.find("\r\n\r\n");
    340     CHECK_GE(i, 0);
    341 
    342     int32_t cseq = mNextCSeq++;
    343 
    344     AString cseqHeader = "CSeq: ";
    345     cseqHeader.append(cseq);
    346     cseqHeader.append("\r\n");
    347 
    348     request.insert(cseqHeader, i + 2);
    349 
    350     LOGV("%s", request.c_str());
    351 
    352     size_t numBytesSent = 0;
    353     while (numBytesSent < request.size()) {
    354         ssize_t n =
    355             send(mSocket, request.c_str() + numBytesSent,
    356                  request.size() - numBytesSent, 0);
    357 
    358         if (n == 0) {
    359             // Server closed the connection.
    360             LOGE("Server unexpectedly closed the connection.");
    361 
    362             reply->setInt32("result", ERROR_IO);
    363             reply->post();
    364             return;
    365         } else if (n < 0) {
    366             if (errno == EINTR) {
    367                 continue;
    368             }
    369 
    370             LOGE("Error sending rtsp request.");
    371             reply->setInt32("result", -errno);
    372             reply->post();
    373             return;
    374         }
    375 
    376         numBytesSent += (size_t)n;
    377     }
    378 
    379     mPendingRequests.add(cseq, reply);
    380 }
    381 
    382 void ARTSPConnection::onReceiveResponse() {
    383     mReceiveResponseEventPending = false;
    384 
    385     if (mState != CONNECTED) {
    386         return;
    387     }
    388 
    389     struct timeval tv;
    390     tv.tv_sec = 0;
    391     tv.tv_usec = kSelectTimeoutUs;
    392 
    393     fd_set rs;
    394     FD_ZERO(&rs);
    395     FD_SET(mSocket, &rs);
    396 
    397     int res = select(mSocket + 1, &rs, NULL, NULL, &tv);
    398     CHECK_GE(res, 0);
    399 
    400     if (res == 1) {
    401         MakeSocketBlocking(mSocket, true);
    402 
    403         bool success = receiveRTSPReponse();
    404 
    405         MakeSocketBlocking(mSocket, false);
    406 
    407         if (!success) {
    408             // Something horrible, irreparable has happened.
    409             flushPendingRequests();
    410             return;
    411         }
    412     }
    413 
    414     postReceiveReponseEvent();
    415 }
    416 
    417 void ARTSPConnection::flushPendingRequests() {
    418     for (size_t i = 0; i < mPendingRequests.size(); ++i) {
    419         sp<AMessage> reply = mPendingRequests.valueAt(i);
    420 
    421         reply->setInt32("result", -ECONNABORTED);
    422         reply->post();
    423     }
    424 
    425     mPendingRequests.clear();
    426 }
    427 
    428 void ARTSPConnection::postReceiveReponseEvent() {
    429     if (mReceiveResponseEventPending) {
    430         return;
    431     }
    432 
    433     sp<AMessage> msg = new AMessage(kWhatReceiveResponse, id());
    434     msg->post();
    435 
    436     mReceiveResponseEventPending = true;
    437 }
    438 
    439 status_t ARTSPConnection::receive(void *data, size_t size) {
    440     size_t offset = 0;
    441     while (offset < size) {
    442         ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0);
    443         if (n == 0) {
    444             // Server closed the connection.
    445             LOGE("Server unexpectedly closed the connection.");
    446             return ERROR_IO;
    447         } else if (n < 0) {
    448             if (errno == EINTR) {
    449                 continue;
    450             }
    451 
    452             LOGE("Error reading rtsp response.");
    453             return -errno;
    454         }
    455 
    456         offset += (size_t)n;
    457     }
    458 
    459     return OK;
    460 }
    461 
    462 bool ARTSPConnection::receiveLine(AString *line) {
    463     line->clear();
    464 
    465     bool sawCR = false;
    466     for (;;) {
    467         char c;
    468         if (receive(&c, 1) != OK) {
    469             return false;
    470         }
    471 
    472         if (sawCR && c == '\n') {
    473             line->erase(line->size() - 1, 1);
    474             return true;
    475         }
    476 
    477         line->append(&c, 1);
    478 
    479         if (c == '$' && line->size() == 1) {
    480             // Special-case for interleaved binary data.
    481             return true;
    482         }
    483 
    484         sawCR = (c == '\r');
    485     }
    486 }
    487 
    488 sp<ABuffer> ARTSPConnection::receiveBinaryData() {
    489     uint8_t x[3];
    490     if (receive(x, 3) != OK) {
    491         return NULL;
    492     }
    493 
    494     sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]);
    495     if (receive(buffer->data(), buffer->size()) != OK) {
    496         return NULL;
    497     }
    498 
    499     buffer->meta()->setInt32("index", (int32_t)x[0]);
    500 
    501     return buffer;
    502 }
    503 
    504 bool ARTSPConnection::receiveRTSPReponse() {
    505     AString statusLine;
    506 
    507     if (!receiveLine(&statusLine)) {
    508         return false;
    509     }
    510 
    511     if (statusLine == "$") {
    512         sp<ABuffer> buffer = receiveBinaryData();
    513 
    514         if (buffer == NULL) {
    515             return false;
    516         }
    517 
    518         if (mObserveBinaryMessage != NULL) {
    519             sp<AMessage> notify = mObserveBinaryMessage->dup();
    520             notify->setObject("buffer", buffer);
    521             notify->post();
    522         } else {
    523             LOGW("received binary data, but no one cares.");
    524         }
    525 
    526         return true;
    527     }
    528 
    529     sp<ARTSPResponse> response = new ARTSPResponse;
    530     response->mStatusLine = statusLine;
    531 
    532     LOGI("status: %s", response->mStatusLine.c_str());
    533 
    534     ssize_t space1 = response->mStatusLine.find(" ");
    535     if (space1 < 0) {
    536         return false;
    537     }
    538     ssize_t space2 = response->mStatusLine.find(" ", space1 + 1);
    539     if (space2 < 0) {
    540         return false;
    541     }
    542 
    543     AString statusCodeStr(
    544             response->mStatusLine, space1 + 1, space2 - space1 - 1);
    545 
    546     if (!ParseSingleUnsignedLong(
    547                 statusCodeStr.c_str(), &response->mStatusCode)
    548             || response->mStatusCode < 100 || response->mStatusCode > 999) {
    549         return false;
    550     }
    551 
    552     AString line;
    553     for (;;) {
    554         if (!receiveLine(&line)) {
    555             break;
    556         }
    557 
    558         if (line.empty()) {
    559             break;
    560         }
    561 
    562         LOGV("line: %s", line.c_str());
    563 
    564         ssize_t colonPos = line.find(":");
    565         if (colonPos < 0) {
    566             // Malformed header line.
    567             return false;
    568         }
    569 
    570         AString key(line, 0, colonPos);
    571         key.trim();
    572         key.tolower();
    573 
    574         line.erase(0, colonPos + 1);
    575         line.trim();
    576 
    577         response->mHeaders.add(key, line);
    578     }
    579 
    580     unsigned long contentLength = 0;
    581 
    582     ssize_t i = response->mHeaders.indexOfKey("content-length");
    583 
    584     if (i >= 0) {
    585         AString value = response->mHeaders.valueAt(i);
    586         if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) {
    587             return false;
    588         }
    589     }
    590 
    591     if (contentLength > 0) {
    592         response->mContent = new ABuffer(contentLength);
    593 
    594         size_t numBytesRead = 0;
    595         while (numBytesRead < contentLength) {
    596             ssize_t n = recv(
    597                     mSocket, response->mContent->data() + numBytesRead,
    598                     contentLength - numBytesRead, 0);
    599 
    600             if (n == 0) {
    601                 // Server closed the connection.
    602                 TRESPASS();
    603             } else if (n < 0) {
    604                 if (errno == EINTR) {
    605                     continue;
    606                 }
    607 
    608                 TRESPASS();
    609             }
    610 
    611             numBytesRead += (size_t)n;
    612         }
    613     }
    614 
    615     return notifyResponseListener(response);
    616 }
    617 
    618 // static
    619 bool ARTSPConnection::ParseSingleUnsignedLong(
    620         const char *from, unsigned long *x) {
    621     char *end;
    622     *x = strtoul(from, &end, 10);
    623 
    624     if (end == from || *end != '\0') {
    625         return false;
    626     }
    627 
    628     return true;
    629 }
    630 
    631 bool ARTSPConnection::notifyResponseListener(
    632         const sp<ARTSPResponse> &response) {
    633     ssize_t i = response->mHeaders.indexOfKey("cseq");
    634 
    635     if (i < 0) {
    636         return true;
    637     }
    638 
    639     AString value = response->mHeaders.valueAt(i);
    640 
    641     unsigned long cseq;
    642     if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) {
    643         return false;
    644     }
    645 
    646     i = mPendingRequests.indexOfKey(cseq);
    647 
    648     if (i < 0) {
    649         // Unsolicited response?
    650         TRESPASS();
    651     }
    652 
    653     sp<AMessage> reply = mPendingRequests.valueAt(i);
    654     mPendingRequests.removeItemsAt(i);
    655 
    656     reply->setInt32("result", OK);
    657     reply->setObject("response", response);
    658     reply->post();
    659 
    660     return true;
    661 }
    662 
    663 }  // namespace android
    664