Home | History | Annotate | Download | only in foundation
      1 /*
      2  * Copyright 2012, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "NetworkSession"
     19 #include <utils/Log.h>
     20 
     21 #include "ANetworkSession.h"
     22 #include "ParsedMessage.h"
     23 
     24 #include <arpa/inet.h>
     25 #include <fcntl.h>
     26 #include <linux/tcp.h>
     27 #include <net/if.h>
     28 #include <netdb.h>
     29 #include <netinet/in.h>
     30 #include <sys/ioctl.h>
     31 #include <sys/socket.h>
     32 
     33 #include <media/stagefright/foundation/ABuffer.h>
     34 #include <media/stagefright/foundation/ADebug.h>
     35 #include <media/stagefright/foundation/AMessage.h>
     36 #include <media/stagefright/foundation/hexdump.h>
     37 
     38 namespace android {
     39 
     40 static uint16_t U16_AT(const uint8_t *ptr) {
     41     return ptr[0] << 8 | ptr[1];
     42 }
     43 
     44 static uint32_t U32_AT(const uint8_t *ptr) {
     45     return ptr[0] << 24 | ptr[1] << 16 | ptr[2] << 8 | ptr[3];
     46 }
     47 
     48 static uint64_t U64_AT(const uint8_t *ptr) {
     49     return ((uint64_t)U32_AT(ptr)) << 32 | U32_AT(ptr + 4);
     50 }
     51 
// Capacity, in bytes, of the buffer allocated for each recvfrom() call on a
// DATAGRAM session.
static const size_t kMaxUDPSize = 1500;
// Initial value of a session's UDP retry counter; the counter is decremented
// on each failed datagram read/write pass and reset to this value after a
// successful one.
static const int32_t kMaxUDPRetries = 200;
     54 
// Thread subclass whose loop body forwards to ANetworkSession::threadLoop()
// on the session it was constructed with.
struct ANetworkSession::NetworkThread : public Thread {
    explicit NetworkThread(ANetworkSession *session);

protected:
    virtual ~NetworkThread();

private:
    // Session whose threadLoop() is invoked; held as a raw pointer.
    ANetworkSession *mSession;

    // Calls mSession->threadLoop() once and returns true.
    virtual bool threadLoop();

    DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
};
     68 
     69 struct ANetworkSession::Session : public RefBase {
     70     enum Mode {
     71         MODE_RTSP,
     72         MODE_DATAGRAM,
     73         MODE_WEBSOCKET,
     74     };
     75 
     76     enum State {
     77         CONNECTING,
     78         CONNECTED,
     79         LISTENING_RTSP,
     80         LISTENING_TCP_DGRAMS,
     81         DATAGRAM,
     82     };
     83 
     84     Session(int32_t sessionID,
     85             State state,
     86             int s,
     87             const sp<AMessage> &notify);
     88 
     89     int32_t sessionID() const;
     90     int socket() const;
     91     sp<AMessage> getNotificationMessage() const;
     92 
     93     bool isRTSPServer() const;
     94     bool isTCPDatagramServer() const;
     95 
     96     bool wantsToRead();
     97     bool wantsToWrite();
     98 
     99     status_t readMore();
    100     status_t writeMore();
    101 
    102     status_t sendRequest(
    103             const void *data, ssize_t size, bool timeValid, int64_t timeUs);
    104 
    105     void setMode(Mode mode);
    106 
    107     status_t switchToWebSocketMode();
    108 
    109 protected:
    110     virtual ~Session();
    111 
    112 private:
    113     enum {
    114         FRAGMENT_FLAG_TIME_VALID = 1,
    115     };
    116     struct Fragment {
    117         uint32_t mFlags;
    118         int64_t mTimeUs;
    119         sp<ABuffer> mBuffer;
    120     };
    121 
    122     int32_t mSessionID;
    123     State mState;
    124     Mode mMode;
    125     int mSocket;
    126     sp<AMessage> mNotify;
    127     bool mSawReceiveFailure, mSawSendFailure;
    128     int32_t mUDPRetries;
    129 
    130     List<Fragment> mOutFragments;
    131 
    132     AString mInBuffer;
    133 
    134     int64_t mLastStallReportUs;
    135 
    136     void notifyError(bool send, status_t err, const char *detail);
    137     void notify(NotificationReason reason);
    138 
    139     void dumpFragmentStats(const Fragment &frag);
    140 
    141     DISALLOW_EVIL_CONSTRUCTORS(Session);
    142 };
    143 ////////////////////////////////////////////////////////////////////////////////
    144 
    145 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
    146     : mSession(session) {
    147 }
    148 
    149 ANetworkSession::NetworkThread::~NetworkThread() {
    150 }
    151 
    152 bool ANetworkSession::NetworkThread::threadLoop() {
    153     mSession->threadLoop();
    154 
    155     return true;
    156 }
    157 
    158 ////////////////////////////////////////////////////////////////////////////////
    159 
    160 ANetworkSession::Session::Session(
    161         int32_t sessionID,
    162         State state,
    163         int s,
    164         const sp<AMessage> &notify)
    165     : mSessionID(sessionID),
    166       mState(state),
    167       mMode(MODE_DATAGRAM),
    168       mSocket(s),
    169       mNotify(notify),
    170       mSawReceiveFailure(false),
    171       mSawSendFailure(false),
    172       mUDPRetries(kMaxUDPRetries),
    173       mLastStallReportUs(-1ll) {
    174     if (mState == CONNECTED) {
    175         struct sockaddr_in localAddr;
    176         socklen_t localAddrLen = sizeof(localAddr);
    177 
    178         int res = getsockname(
    179                 mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
    180         CHECK_GE(res, 0);
    181 
    182         struct sockaddr_in remoteAddr;
    183         socklen_t remoteAddrLen = sizeof(remoteAddr);
    184 
    185         res = getpeername(
    186                 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    187         CHECK_GE(res, 0);
    188 
    189         in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
    190         AString localAddrString = AStringPrintf(
    191                 "%d.%d.%d.%d",
    192                 (addr >> 24),
    193                 (addr >> 16) & 0xff,
    194                 (addr >> 8) & 0xff,
    195                 addr & 0xff);
    196 
    197         addr = ntohl(remoteAddr.sin_addr.s_addr);
    198         AString remoteAddrString = AStringPrintf(
    199                 "%d.%d.%d.%d",
    200                 (addr >> 24),
    201                 (addr >> 16) & 0xff,
    202                 (addr >> 8) & 0xff,
    203                 addr & 0xff);
    204 
    205         sp<AMessage> msg = mNotify->dup();
    206         msg->setInt32("sessionID", mSessionID);
    207         msg->setInt32("reason", kWhatClientConnected);
    208         msg->setString("server-ip", localAddrString.c_str());
    209         msg->setInt32("server-port", ntohs(localAddr.sin_port));
    210         msg->setString("client-ip", remoteAddrString.c_str());
    211         msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
    212         msg->post();
    213     }
    214 }
    215 
    216 ANetworkSession::Session::~Session() {
    217     ALOGV("Session %d gone", mSessionID);
    218 
    219     close(mSocket);
    220     mSocket = -1;
    221 }
    222 
    223 int32_t ANetworkSession::Session::sessionID() const {
    224     return mSessionID;
    225 }
    226 
    227 int ANetworkSession::Session::socket() const {
    228     return mSocket;
    229 }
    230 
    231 void ANetworkSession::Session::setMode(Mode mode) {
    232     mMode = mode;
    233 }
    234 
    235 status_t ANetworkSession::Session::switchToWebSocketMode() {
    236     if (mState != CONNECTED || mMode != MODE_RTSP) {
    237         return INVALID_OPERATION;
    238     }
    239 
    240     mMode = MODE_WEBSOCKET;
    241 
    242     return OK;
    243 }
    244 
    245 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
    246     return mNotify;
    247 }
    248 
    249 bool ANetworkSession::Session::isRTSPServer() const {
    250     return mState == LISTENING_RTSP;
    251 }
    252 
    253 bool ANetworkSession::Session::isTCPDatagramServer() const {
    254     return mState == LISTENING_TCP_DGRAMS;
    255 }
    256 
    257 bool ANetworkSession::Session::wantsToRead() {
    258     return !mSawReceiveFailure && mState != CONNECTING;
    259 }
    260 
    261 bool ANetworkSession::Session::wantsToWrite() {
    262     return !mSawSendFailure
    263         && (mState == CONNECTING
    264             || (mState == CONNECTED && !mOutFragments.empty())
    265             || (mState == DATAGRAM && !mOutFragments.empty()));
    266 }
    267 
    268 status_t ANetworkSession::Session::readMore() {
    269     if (mState == DATAGRAM) {
    270         CHECK_EQ(mMode, MODE_DATAGRAM);
    271 
    272         status_t err;
    273         do {
    274             sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
    275 
    276             struct sockaddr_in remoteAddr;
    277             socklen_t remoteAddrLen = sizeof(remoteAddr);
    278 
    279             ssize_t n;
    280             do {
    281                 n = recvfrom(
    282                         mSocket, buf->data(), buf->capacity(), 0,
    283                         (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    284             } while (n < 0 && errno == EINTR);
    285 
    286             err = OK;
    287             if (n < 0) {
    288                 err = -errno;
    289             } else if (n == 0) {
    290                 err = -ECONNRESET;
    291             } else {
    292                 buf->setRange(0, n);
    293 
    294                 int64_t nowUs = ALooper::GetNowUs();
    295                 buf->meta()->setInt64("arrivalTimeUs", nowUs);
    296 
    297                 sp<AMessage> notify = mNotify->dup();
    298                 notify->setInt32("sessionID", mSessionID);
    299                 notify->setInt32("reason", kWhatDatagram);
    300 
    301                 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
    302                 notify->setString(
    303                         "fromAddr",
    304                         AStringPrintf(
    305                             "%u.%u.%u.%u",
    306                             ip >> 24,
    307                             (ip >> 16) & 0xff,
    308                             (ip >> 8) & 0xff,
    309                             ip & 0xff).c_str());
    310 
    311                 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
    312 
    313                 notify->setBuffer("data", buf);
    314                 notify->post();
    315             }
    316         } while (err == OK);
    317 
    318         if (err == -EAGAIN) {
    319             err = OK;
    320         }
    321 
    322         if (err != OK) {
    323             if (!mUDPRetries) {
    324                 notifyError(false /* send */, err, "Recvfrom failed.");
    325                 mSawReceiveFailure = true;
    326             } else {
    327                 mUDPRetries--;
    328                 ALOGE("Recvfrom failed, %d/%d retries left",
    329                         mUDPRetries, kMaxUDPRetries);
    330                 err = OK;
    331             }
    332         } else {
    333             mUDPRetries = kMaxUDPRetries;
    334         }
    335 
    336         return err;
    337     }
    338 
    339     char tmp[512];
    340     ssize_t n;
    341     do {
    342         n = recv(mSocket, tmp, sizeof(tmp), 0);
    343     } while (n < 0 && errno == EINTR);
    344 
    345     status_t err = OK;
    346 
    347     if (n > 0) {
    348         mInBuffer.append(tmp, n);
    349 
    350 #if 0
    351         ALOGI("in:");
    352         hexdump(tmp, n);
    353 #endif
    354     } else if (n < 0) {
    355         err = -errno;
    356     } else {
    357         err = -ECONNRESET;
    358     }
    359 
    360     if (mMode == MODE_DATAGRAM) {
    361         // TCP stream carrying 16-bit length-prefixed datagrams.
    362 
    363         while (mInBuffer.size() >= 2) {
    364             size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
    365 
    366             if (mInBuffer.size() < packetSize + 2) {
    367                 break;
    368             }
    369 
    370             sp<ABuffer> packet = new ABuffer(packetSize);
    371             memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
    372 
    373             int64_t nowUs = ALooper::GetNowUs();
    374             packet->meta()->setInt64("arrivalTimeUs", nowUs);
    375 
    376             sp<AMessage> notify = mNotify->dup();
    377             notify->setInt32("sessionID", mSessionID);
    378             notify->setInt32("reason", kWhatDatagram);
    379             notify->setBuffer("data", packet);
    380             notify->post();
    381 
    382             mInBuffer.erase(0, packetSize + 2);
    383         }
    384     } else if (mMode == MODE_RTSP) {
    385         for (;;) {
    386             size_t length;
    387 
    388             if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
    389                 if (mInBuffer.size() < 4) {
    390                     break;
    391                 }
    392 
    393                 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
    394 
    395                 if (mInBuffer.size() < 4 + length) {
    396                     break;
    397                 }
    398 
    399                 sp<AMessage> notify = mNotify->dup();
    400                 notify->setInt32("sessionID", mSessionID);
    401                 notify->setInt32("reason", kWhatBinaryData);
    402                 notify->setInt32("channel", mInBuffer.c_str()[1]);
    403 
    404                 sp<ABuffer> data = new ABuffer(length);
    405                 memcpy(data->data(), mInBuffer.c_str() + 4, length);
    406 
    407                 int64_t nowUs = ALooper::GetNowUs();
    408                 data->meta()->setInt64("arrivalTimeUs", nowUs);
    409 
    410                 notify->setBuffer("data", data);
    411                 notify->post();
    412 
    413                 mInBuffer.erase(0, 4 + length);
    414                 continue;
    415             }
    416 
    417             sp<ParsedMessage> msg =
    418                 ParsedMessage::Parse(
    419                         mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
    420 
    421             if (msg == NULL) {
    422                 break;
    423             }
    424 
    425             sp<AMessage> notify = mNotify->dup();
    426             notify->setInt32("sessionID", mSessionID);
    427             notify->setInt32("reason", kWhatData);
    428             notify->setObject("data", msg);
    429             notify->post();
    430 
    431 #if 1
    432             // XXX The (old) dongle sends the wrong content length header on a
    433             // SET_PARAMETER request that signals a "wfd_idr_request".
    434             // (17 instead of 19).
    435             const char *content = msg->getContent();
    436             if (content
    437                     && !memcmp(content, "wfd_idr_request\r\n", 17)
    438                     && length >= 19
    439                     && mInBuffer.c_str()[length] == '\r'
    440                     && mInBuffer.c_str()[length + 1] == '\n') {
    441                 length += 2;
    442             }
    443 #endif
    444 
    445             mInBuffer.erase(0, length);
    446 
    447             if (err != OK) {
    448                 break;
    449             }
    450         }
    451     } else {
    452         CHECK_EQ(mMode, MODE_WEBSOCKET);
    453 
    454         const uint8_t *data = (const uint8_t *)mInBuffer.c_str();
    455         // hexdump(data, mInBuffer.size());
    456 
    457         while (mInBuffer.size() >= 2) {
    458             size_t offset = 2;
    459 
    460             uint64_t payloadLen = data[1] & 0x7f;
    461             if (payloadLen == 126) {
    462                 if (offset + 2 > mInBuffer.size()) {
    463                     break;
    464                 }
    465 
    466                 payloadLen = U16_AT(&data[offset]);
    467                 offset += 2;
    468             } else if (payloadLen == 127) {
    469                 if (offset + 8 > mInBuffer.size()) {
    470                     break;
    471                 }
    472 
    473                 payloadLen = U64_AT(&data[offset]);
    474                 offset += 8;
    475             }
    476 
    477             uint32_t mask = 0;
    478             if (data[1] & 0x80) {
    479                 // MASK==1
    480                 if (offset + 4 > mInBuffer.size()) {
    481                     break;
    482                 }
    483 
    484                 mask = U32_AT(&data[offset]);
    485                 offset += 4;
    486             }
    487 
    488             if (payloadLen > mInBuffer.size() || offset > mInBuffer.size() - payloadLen) {
    489                 break;
    490             }
    491 
    492             // We have the full message.
    493 
    494             sp<ABuffer> packet = new ABuffer(payloadLen);
    495             memcpy(packet->data(), &data[offset], payloadLen);
    496 
    497             if (mask != 0) {
    498                 for (size_t i = 0; i < payloadLen; ++i) {
    499                     packet->data()[i] =
    500                         data[offset + i]
    501                             ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
    502                 }
    503             }
    504 
    505             sp<AMessage> notify = mNotify->dup();
    506             notify->setInt32("sessionID", mSessionID);
    507             notify->setInt32("reason", kWhatWebSocketMessage);
    508             notify->setBuffer("data", packet);
    509             notify->setInt32("headerByte", data[0]);
    510             notify->post();
    511 
    512             mInBuffer.erase(0, offset + payloadLen);
    513         }
    514     }
    515 
    516     if (err != OK) {
    517         notifyError(false /* send */, err, "Recv failed.");
    518         mSawReceiveFailure = true;
    519     }
    520 
    521     return err;
    522 }
    523 
// Debug hook called by writeMore() for fragments flagged
// FRAGMENT_FLAG_TIME_VALID. The whole body is compiled out (#if 0), so the
// function currently does nothing. The disabled code logs the delay between
// the fragment's mTimeUs and now as a bar of '#' characters.
// NOTE(review): the parameter name is commented out, so the disabled code
// will not compile if re-enabled without restoring "frag".
void ANetworkSession::Session::dumpFragmentStats(const Fragment & /* frag */) {
#if 0
    int64_t nowUs = ALooper::GetNowUs();
    int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;

    static const int64_t kMinDelayMs = 0;
    static const int64_t kMaxDelayMs = 300;

    const char *kPattern = "########################################";
    size_t kPatternSize = strlen(kPattern);

    int n = (kPatternSize * (delayMs - kMinDelayMs))
                / (kMaxDelayMs - kMinDelayMs);

    if (n < 0) {
        n = 0;
    } else if ((size_t)n > kPatternSize) {
        n = kPatternSize;
    }

    ALOGI("[%lld]: (%4lld ms) %s\n",
          frag.mTimeUs / 1000,
          delayMs,
          kPattern + kPatternSize - n);
#endif
}
    550 
    551 status_t ANetworkSession::Session::writeMore() {
    552     if (mState == DATAGRAM) {
    553         CHECK(!mOutFragments.empty());
    554 
    555         status_t err;
    556         do {
    557             const Fragment &frag = *mOutFragments.begin();
    558             const sp<ABuffer> &datagram = frag.mBuffer;
    559 
    560             int n;
    561             do {
    562                 n = send(mSocket, datagram->data(), datagram->size(), 0);
    563             } while (n < 0 && errno == EINTR);
    564 
    565             err = OK;
    566 
    567             if (n > 0) {
    568                 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
    569                     dumpFragmentStats(frag);
    570                 }
    571 
    572                 mOutFragments.erase(mOutFragments.begin());
    573             } else if (n < 0) {
    574                 err = -errno;
    575             } else if (n == 0) {
    576                 err = -ECONNRESET;
    577             }
    578         } while (err == OK && !mOutFragments.empty());
    579 
    580         if (err == -EAGAIN) {
    581             if (!mOutFragments.empty()) {
    582                 ALOGI("%zu datagrams remain queued.", mOutFragments.size());
    583             }
    584             err = OK;
    585         }
    586 
    587         if (err != OK) {
    588             if (!mUDPRetries) {
    589                 notifyError(true /* send */, err, "Send datagram failed.");
    590                 mSawSendFailure = true;
    591             } else {
    592                 mUDPRetries--;
    593                 ALOGE("Send datagram failed, %d/%d retries left",
    594                         mUDPRetries, kMaxUDPRetries);
    595                 err = OK;
    596             }
    597         } else {
    598             mUDPRetries = kMaxUDPRetries;
    599         }
    600 
    601         return err;
    602     }
    603 
    604     if (mState == CONNECTING) {
    605         int err;
    606         socklen_t optionLen = sizeof(err);
    607         CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
    608         CHECK_EQ(optionLen, (socklen_t)sizeof(err));
    609 
    610         if (err != 0) {
    611             notifyError(kWhatError, -err, "Connection failed");
    612             mSawSendFailure = true;
    613 
    614             return -err;
    615         }
    616 
    617         mState = CONNECTED;
    618         notify(kWhatConnected);
    619 
    620         return OK;
    621     }
    622 
    623     CHECK_EQ(mState, CONNECTED);
    624     CHECK(!mOutFragments.empty());
    625 
    626     ssize_t n = -1;
    627     while (!mOutFragments.empty()) {
    628         const Fragment &frag = *mOutFragments.begin();
    629 
    630         do {
    631             n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
    632         } while (n < 0 && errno == EINTR);
    633 
    634         if (n <= 0) {
    635             break;
    636         }
    637 
    638         frag.mBuffer->setRange(
    639                 frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
    640 
    641         if (frag.mBuffer->size() > 0) {
    642             break;
    643         }
    644 
    645         if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
    646             dumpFragmentStats(frag);
    647         }
    648 
    649         mOutFragments.erase(mOutFragments.begin());
    650     }
    651 
    652     status_t err = OK;
    653 
    654     if (n < 0) {
    655         err = -errno;
    656     } else if (n == 0) {
    657         err = -ECONNRESET;
    658     }
    659 
    660     if (err != OK) {
    661         notifyError(true /* send */, err, "Send failed.");
    662         mSawSendFailure = true;
    663     }
    664 
    665 #if 0
    666     int numBytesQueued;
    667     int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
    668     if (res == 0 && numBytesQueued > 50 * 1024) {
    669         if (numBytesQueued > 409600) {
    670             ALOGW("!!! numBytesQueued = %d", numBytesQueued);
    671         }
    672 
    673         int64_t nowUs = ALooper::GetNowUs();
    674 
    675         if (mLastStallReportUs < 0ll
    676                 || nowUs > mLastStallReportUs + 100000ll) {
    677             sp<AMessage> msg = mNotify->dup();
    678             msg->setInt32("sessionID", mSessionID);
    679             msg->setInt32("reason", kWhatNetworkStall);
    680             msg->setSize("numBytesQueued", numBytesQueued);
    681             msg->post();
    682 
    683             mLastStallReportUs = nowUs;
    684         }
    685     }
    686 #endif
    687 
    688     return err;
    689 }
    690 
    691 status_t ANetworkSession::Session::sendRequest(
    692         const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
    693     CHECK(mState == CONNECTED || mState == DATAGRAM);
    694 
    695     if (size < 0) {
    696         size = strlen((const char *)data);
    697     }
    698 
    699     if (size == 0) {
    700         return OK;
    701     }
    702 
    703     sp<ABuffer> buffer;
    704 
    705     if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
    706         CHECK_LE(size, 65535);
    707 
    708         buffer = new ABuffer(size + 2);
    709         buffer->data()[0] = size >> 8;
    710         buffer->data()[1] = size & 0xff;
    711         memcpy(buffer->data() + 2, data, size);
    712     } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
    713         static const bool kUseMask = false;  // Chromium doesn't like it.
    714 
    715         size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
    716         if (size > 65535) {
    717             numHeaderBytes += 8;
    718         } else if (size > 125) {
    719             numHeaderBytes += 2;
    720         }
    721 
    722         buffer = new ABuffer(numHeaderBytes + size);
    723         buffer->data()[0] = 0x81;  // FIN==1 | opcode=1 (text)
    724         buffer->data()[1] = kUseMask ? 0x80 : 0x00;
    725 
    726         if (size > 65535) {
    727             buffer->data()[1] |= 127;
    728             buffer->data()[2] = 0x00;
    729             buffer->data()[3] = 0x00;
    730             buffer->data()[4] = 0x00;
    731             buffer->data()[5] = 0x00;
    732             buffer->data()[6] = (size >> 24) & 0xff;
    733             buffer->data()[7] = (size >> 16) & 0xff;
    734             buffer->data()[8] = (size >> 8) & 0xff;
    735             buffer->data()[9] = size & 0xff;
    736         } else if (size > 125) {
    737             buffer->data()[1] |= 126;
    738             buffer->data()[2] = (size >> 8) & 0xff;
    739             buffer->data()[3] = size & 0xff;
    740         } else {
    741             buffer->data()[1] |= size;
    742         }
    743 
    744         if (kUseMask) {
    745             uint32_t mask = rand();
    746 
    747             buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
    748             buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
    749             buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
    750             buffer->data()[numHeaderBytes - 1] = mask & 0xff;
    751 
    752             for (size_t i = 0; i < (size_t)size; ++i) {
    753                 buffer->data()[numHeaderBytes + i] =
    754                     ((const uint8_t *)data)[i]
    755                         ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
    756             }
    757         } else {
    758             memcpy(buffer->data() + numHeaderBytes, data, size);
    759         }
    760     } else {
    761         buffer = new ABuffer(size);
    762         memcpy(buffer->data(), data, size);
    763     }
    764 
    765     Fragment frag;
    766 
    767     frag.mFlags = 0;
    768     if (timeValid) {
    769         frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
    770         frag.mTimeUs = timeUs;
    771     }
    772 
    773     frag.mBuffer = buffer;
    774 
    775     mOutFragments.push_back(frag);
    776 
    777     return OK;
    778 }
    779 
    780 void ANetworkSession::Session::notifyError(
    781         bool send, status_t err, const char *detail) {
    782     sp<AMessage> msg = mNotify->dup();
    783     msg->setInt32("sessionID", mSessionID);
    784     msg->setInt32("reason", kWhatError);
    785     msg->setInt32("send", send);
    786     msg->setInt32("err", err);
    787     msg->setString("detail", detail);
    788     msg->post();
    789 }
    790 
    791 void ANetworkSession::Session::notify(NotificationReason reason) {
    792     sp<AMessage> msg = mNotify->dup();
    793     msg->setInt32("sessionID", mSessionID);
    794     msg->setInt32("reason", reason);
    795     msg->post();
    796 }
    797 
    798 ////////////////////////////////////////////////////////////////////////////////
    799 
    800 ANetworkSession::ANetworkSession()
    801     : mNextSessionID(1) {
    802     mPipeFd[0] = mPipeFd[1] = -1;
    803 }
    804 
    805 ANetworkSession::~ANetworkSession() {
    806     stop();
    807 }
    808 
    809 status_t ANetworkSession::start() {
    810     if (mThread != NULL) {
    811         return INVALID_OPERATION;
    812     }
    813 
    814     int res = pipe(mPipeFd);
    815     if (res != 0) {
    816         mPipeFd[0] = mPipeFd[1] = -1;
    817         return -errno;
    818     }
    819 
    820     mThread = new NetworkThread(this);
    821 
    822     status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
    823 
    824     if (err != OK) {
    825         mThread.clear();
    826 
    827         close(mPipeFd[0]);
    828         close(mPipeFd[1]);
    829         mPipeFd[0] = mPipeFd[1] = -1;
    830 
    831         return err;
    832     }
    833 
    834     return OK;
    835 }
    836 
    837 status_t ANetworkSession::stop() {
    838     if (mThread == NULL) {
    839         return INVALID_OPERATION;
    840     }
    841 
    842     mThread->requestExit();
    843     interrupt();
    844     mThread->requestExitAndWait();
    845 
    846     mThread.clear();
    847 
    848     close(mPipeFd[0]);
    849     close(mPipeFd[1]);
    850     mPipeFd[0] = mPipeFd[1] = -1;
    851 
    852     return OK;
    853 }
    854 
    855 status_t ANetworkSession::createRTSPClient(
    856         const char *host, unsigned port, const sp<AMessage> &notify,
    857         int32_t *sessionID) {
    858     return createClientOrServer(
    859             kModeCreateRTSPClient,
    860             NULL /* addr */,
    861             0 /* port */,
    862             host,
    863             port,
    864             notify,
    865             sessionID);
    866 }
    867 
    868 status_t ANetworkSession::createRTSPServer(
    869         const struct in_addr &addr, unsigned port,
    870         const sp<AMessage> &notify, int32_t *sessionID) {
    871     return createClientOrServer(
    872             kModeCreateRTSPServer,
    873             &addr,
    874             port,
    875             NULL /* remoteHost */,
    876             0 /* remotePort */,
    877             notify,
    878             sessionID);
    879 }
    880 
    881 status_t ANetworkSession::createUDPSession(
    882         unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
    883     return createUDPSession(localPort, NULL, 0, notify, sessionID);
    884 }
    885 
    886 status_t ANetworkSession::createUDPSession(
    887         unsigned localPort,
    888         const char *remoteHost,
    889         unsigned remotePort,
    890         const sp<AMessage> &notify,
    891         int32_t *sessionID) {
    892     return createClientOrServer(
    893             kModeCreateUDPSession,
    894             NULL /* addr */,
    895             localPort,
    896             remoteHost,
    897             remotePort,
    898             notify,
    899             sessionID);
    900 }
    901 
    902 status_t ANetworkSession::createTCPDatagramSession(
    903         const struct in_addr &addr, unsigned port,
    904         const sp<AMessage> &notify, int32_t *sessionID) {
    905     return createClientOrServer(
    906             kModeCreateTCPDatagramSessionPassive,
    907             &addr,
    908             port,
    909             NULL /* remoteHost */,
    910             0 /* remotePort */,
    911             notify,
    912             sessionID);
    913 }
    914 
    915 status_t ANetworkSession::createTCPDatagramSession(
    916         unsigned localPort,
    917         const char *remoteHost,
    918         unsigned remotePort,
    919         const sp<AMessage> &notify,
    920         int32_t *sessionID) {
    921     return createClientOrServer(
    922             kModeCreateTCPDatagramSessionActive,
    923             NULL /* addr */,
    924             localPort,
    925             remoteHost,
    926             remotePort,
    927             notify,
    928             sessionID);
    929 }
    930 
    931 status_t ANetworkSession::destroySession(int32_t sessionID) {
    932     Mutex::Autolock autoLock(mLock);
    933 
    934     ssize_t index = mSessions.indexOfKey(sessionID);
    935 
    936     if (index < 0) {
    937         return -ENOENT;
    938     }
    939 
    940     mSessions.removeItemsAt(index);
    941 
    942     interrupt();
    943 
    944     return OK;
    945 }
    946 
    947 // static
    948 status_t ANetworkSession::MakeSocketNonBlocking(int s) {
    949     int flags = fcntl(s, F_GETFL, 0);
    950     if (flags < 0) {
    951         flags = 0;
    952     }
    953 
    954     int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
    955     if (res < 0) {
    956         return -errno;
    957     }
    958 
    959     return OK;
    960 }
    961 
    962 status_t ANetworkSession::createClientOrServer(
    963         Mode mode,
    964         const struct in_addr *localAddr,
    965         unsigned port,
    966         const char *remoteHost,
    967         unsigned remotePort,
    968         const sp<AMessage> &notify,
    969         int32_t *sessionID) {
    970     Mutex::Autolock autoLock(mLock);
    971 
    972     *sessionID = 0;
    973     status_t err = OK;
    974     int s, res;
    975     sp<Session> session;
    976 
    977     s = socket(
    978             AF_INET,
    979             (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
    980             0);
    981 
    982     if (s < 0) {
    983         err = -errno;
    984         goto bail;
    985     }
    986 
    987     if (mode == kModeCreateRTSPServer
    988             || mode == kModeCreateTCPDatagramSessionPassive) {
    989         const int yes = 1;
    990         res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    991 
    992         if (res < 0) {
    993             err = -errno;
    994             goto bail2;
    995         }
    996     }
    997 
    998     if (mode == kModeCreateUDPSession) {
    999         int size = 256 * 1024;
   1000 
   1001         res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
   1002 
   1003         if (res < 0) {
   1004             err = -errno;
   1005             goto bail2;
   1006         }
   1007 
   1008         res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
   1009 
   1010         if (res < 0) {
   1011             err = -errno;
   1012             goto bail2;
   1013         }
   1014     } else if (mode == kModeCreateTCPDatagramSessionActive) {
   1015         int flag = 1;
   1016         res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
   1017 
   1018         if (res < 0) {
   1019             err = -errno;
   1020             goto bail2;
   1021         }
   1022 
   1023         int tos = 224;  // VOICE
   1024         res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
   1025 
   1026         if (res < 0) {
   1027             err = -errno;
   1028             goto bail2;
   1029         }
   1030     }
   1031 
   1032     err = MakeSocketNonBlocking(s);
   1033 
   1034     if (err != OK) {
   1035         goto bail2;
   1036     }
   1037 
   1038     struct sockaddr_in addr;
   1039     memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
   1040     addr.sin_family = AF_INET;
   1041 
   1042     if (mode == kModeCreateRTSPClient
   1043             || mode == kModeCreateTCPDatagramSessionActive) {
   1044         struct hostent *ent= gethostbyname(remoteHost);
   1045         if (ent == NULL) {
   1046             err = -h_errno;
   1047             goto bail2;
   1048         }
   1049 
   1050         addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
   1051         addr.sin_port = htons(remotePort);
   1052     } else if (localAddr != NULL) {
   1053         addr.sin_addr = *localAddr;
   1054         addr.sin_port = htons(port);
   1055     } else {
   1056         addr.sin_addr.s_addr = htonl(INADDR_ANY);
   1057         addr.sin_port = htons(port);
   1058     }
   1059 
   1060     if (mode == kModeCreateRTSPClient
   1061             || mode == kModeCreateTCPDatagramSessionActive) {
   1062         in_addr_t x = ntohl(addr.sin_addr.s_addr);
   1063         ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
   1064               s,
   1065               (x >> 24),
   1066               (x >> 16) & 0xff,
   1067               (x >> 8) & 0xff,
   1068               x & 0xff,
   1069               ntohs(addr.sin_port));
   1070 
   1071         res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
   1072 
   1073         CHECK_LT(res, 0);
   1074         if (errno == EINPROGRESS) {
   1075             res = 0;
   1076         }
   1077     } else {
   1078         res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
   1079 
   1080         if (res == 0) {
   1081             if (mode == kModeCreateRTSPServer
   1082                     || mode == kModeCreateTCPDatagramSessionPassive) {
   1083                 res = listen(s, 4);
   1084             } else {
   1085                 CHECK_EQ(mode, kModeCreateUDPSession);
   1086 
   1087                 if (remoteHost != NULL) {
   1088                     struct sockaddr_in remoteAddr;
   1089                     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
   1090                     remoteAddr.sin_family = AF_INET;
   1091                     remoteAddr.sin_port = htons(remotePort);
   1092 
   1093                     struct hostent *ent= gethostbyname(remoteHost);
   1094                     if (ent == NULL) {
   1095                         err = -h_errno;
   1096                         goto bail2;
   1097                     }
   1098 
   1099                     remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
   1100 
   1101                     res = connect(
   1102                             s,
   1103                             (const struct sockaddr *)&remoteAddr,
   1104                             sizeof(remoteAddr));
   1105                 }
   1106             }
   1107         }
   1108     }
   1109 
   1110     if (res < 0) {
   1111         err = -errno;
   1112         goto bail2;
   1113     }
   1114 
   1115     Session::State state;
   1116     switch (mode) {
   1117         case kModeCreateRTSPClient:
   1118             state = Session::CONNECTING;
   1119             break;
   1120 
   1121         case kModeCreateTCPDatagramSessionActive:
   1122             state = Session::CONNECTING;
   1123             break;
   1124 
   1125         case kModeCreateTCPDatagramSessionPassive:
   1126             state = Session::LISTENING_TCP_DGRAMS;
   1127             break;
   1128 
   1129         case kModeCreateRTSPServer:
   1130             state = Session::LISTENING_RTSP;
   1131             break;
   1132 
   1133         default:
   1134             CHECK_EQ(mode, kModeCreateUDPSession);
   1135             state = Session::DATAGRAM;
   1136             break;
   1137     }
   1138 
   1139     session = new Session(
   1140             mNextSessionID++,
   1141             state,
   1142             s,
   1143             notify);
   1144 
   1145     if (mode == kModeCreateTCPDatagramSessionActive) {
   1146         session->setMode(Session::MODE_DATAGRAM);
   1147     } else if (mode == kModeCreateRTSPClient) {
   1148         session->setMode(Session::MODE_RTSP);
   1149     }
   1150 
   1151     mSessions.add(session->sessionID(), session);
   1152 
   1153     interrupt();
   1154 
   1155     *sessionID = session->sessionID();
   1156 
   1157     goto bail;
   1158 
   1159 bail2:
   1160     close(s);
   1161     s = -1;
   1162 
   1163 bail:
   1164     return err;
   1165 }
   1166 
   1167 status_t ANetworkSession::connectUDPSession(
   1168         int32_t sessionID, const char *remoteHost, unsigned remotePort) {
   1169     Mutex::Autolock autoLock(mLock);
   1170 
   1171     ssize_t index = mSessions.indexOfKey(sessionID);
   1172 
   1173     if (index < 0) {
   1174         return -ENOENT;
   1175     }
   1176 
   1177     const sp<Session> session = mSessions.valueAt(index);
   1178     int s = session->socket();
   1179 
   1180     struct sockaddr_in remoteAddr;
   1181     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
   1182     remoteAddr.sin_family = AF_INET;
   1183     remoteAddr.sin_port = htons(remotePort);
   1184 
   1185     status_t err = OK;
   1186     struct hostent *ent = gethostbyname(remoteHost);
   1187     if (ent == NULL) {
   1188         err = -h_errno;
   1189     } else {
   1190         remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
   1191 
   1192         int res = connect(
   1193                 s,
   1194                 (const struct sockaddr *)&remoteAddr,
   1195                 sizeof(remoteAddr));
   1196 
   1197         if (res < 0) {
   1198             err = -errno;
   1199         }
   1200     }
   1201 
   1202     return err;
   1203 }
   1204 
   1205 status_t ANetworkSession::sendRequest(
   1206         int32_t sessionID, const void *data, ssize_t size,
   1207         bool timeValid, int64_t timeUs) {
   1208     Mutex::Autolock autoLock(mLock);
   1209 
   1210     ssize_t index = mSessions.indexOfKey(sessionID);
   1211 
   1212     if (index < 0) {
   1213         return -ENOENT;
   1214     }
   1215 
   1216     const sp<Session> session = mSessions.valueAt(index);
   1217 
   1218     status_t err = session->sendRequest(data, size, timeValid, timeUs);
   1219 
   1220     interrupt();
   1221 
   1222     return err;
   1223 }
   1224 
   1225 status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) {
   1226     Mutex::Autolock autoLock(mLock);
   1227 
   1228     ssize_t index = mSessions.indexOfKey(sessionID);
   1229 
   1230     if (index < 0) {
   1231         return -ENOENT;
   1232     }
   1233 
   1234     const sp<Session> session = mSessions.valueAt(index);
   1235     return session->switchToWebSocketMode();
   1236 }
   1237 
   1238 void ANetworkSession::interrupt() {
   1239     static const char dummy = 0;
   1240 
   1241     ssize_t n;
   1242     do {
   1243         n = write(mPipeFd[1], &dummy, 1);
   1244     } while (n < 0 && errno == EINTR);
   1245 
   1246     if (n < 0) {
   1247         ALOGW("Error writing to pipe (%s)", strerror(errno));
   1248     }
   1249 }
   1250 
   1251 void ANetworkSession::threadLoop() {
   1252     fd_set rs, ws;
   1253     FD_ZERO(&rs);
   1254     FD_ZERO(&ws);
   1255 
   1256     FD_SET(mPipeFd[0], &rs);
   1257     int maxFd = mPipeFd[0];
   1258 
   1259     {
   1260         Mutex::Autolock autoLock(mLock);
   1261 
   1262         for (size_t i = 0; i < mSessions.size(); ++i) {
   1263             const sp<Session> &session = mSessions.valueAt(i);
   1264 
   1265             int s = session->socket();
   1266 
   1267             if (s < 0) {
   1268                 continue;
   1269             }
   1270 
   1271             if (session->wantsToRead()) {
   1272                 FD_SET(s, &rs);
   1273                 if (s > maxFd) {
   1274                     maxFd = s;
   1275                 }
   1276             }
   1277 
   1278             if (session->wantsToWrite()) {
   1279                 FD_SET(s, &ws);
   1280                 if (s > maxFd) {
   1281                     maxFd = s;
   1282                 }
   1283             }
   1284         }
   1285     }
   1286 
   1287     int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
   1288 
   1289     if (res == 0) {
   1290         return;
   1291     }
   1292 
   1293     if (res < 0) {
   1294         if (errno == EINTR) {
   1295             return;
   1296         }
   1297 
   1298         ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
   1299         return;
   1300     }
   1301 
   1302     if (FD_ISSET(mPipeFd[0], &rs)) {
   1303         char c;
   1304         ssize_t n;
   1305         do {
   1306             n = read(mPipeFd[0], &c, 1);
   1307         } while (n < 0 && errno == EINTR);
   1308 
   1309         if (n < 0) {
   1310             ALOGW("Error reading from pipe (%s)", strerror(errno));
   1311         }
   1312 
   1313         --res;
   1314     }
   1315 
   1316     {
   1317         Mutex::Autolock autoLock(mLock);
   1318 
   1319         List<sp<Session> > sessionsToAdd;
   1320 
   1321         for (size_t i = mSessions.size(); res > 0 && i > 0;) {
   1322             i--;
   1323             const sp<Session> &session = mSessions.valueAt(i);
   1324 
   1325             int s = session->socket();
   1326 
   1327             if (s < 0) {
   1328                 continue;
   1329             }
   1330 
   1331             if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
   1332                 --res;
   1333             }
   1334 
   1335             if (FD_ISSET(s, &rs)) {
   1336                 if (session->isRTSPServer() || session->isTCPDatagramServer()) {
   1337                     struct sockaddr_in remoteAddr;
   1338                     socklen_t remoteAddrLen = sizeof(remoteAddr);
   1339 
   1340                     int clientSocket = accept(
   1341                             s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
   1342 
   1343                     if (clientSocket >= 0) {
   1344                         status_t err = MakeSocketNonBlocking(clientSocket);
   1345 
   1346                         if (err != OK) {
   1347                             ALOGE("Unable to make client socket non blocking, "
   1348                                   "failed w/ error %d (%s)",
   1349                                   err, strerror(-err));
   1350 
   1351                             close(clientSocket);
   1352                             clientSocket = -1;
   1353                         } else {
   1354                             in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
   1355 
   1356                             ALOGI("incoming connection from %d.%d.%d.%d:%d "
   1357                                   "(socket %d)",
   1358                                   (addr >> 24),
   1359                                   (addr >> 16) & 0xff,
   1360                                   (addr >> 8) & 0xff,
   1361                                   addr & 0xff,
   1362                                   ntohs(remoteAddr.sin_port),
   1363                                   clientSocket);
   1364 
   1365                             sp<Session> clientSession =
   1366                                 new Session(
   1367                                         mNextSessionID++,
   1368                                         Session::CONNECTED,
   1369                                         clientSocket,
   1370                                         session->getNotificationMessage());
   1371 
   1372                             clientSession->setMode(
   1373                                     session->isRTSPServer()
   1374                                         ? Session::MODE_RTSP
   1375                                         : Session::MODE_DATAGRAM);
   1376 
   1377                             sessionsToAdd.push_back(clientSession);
   1378                         }
   1379                     } else {
   1380                         ALOGE("accept returned error %d (%s)",
   1381                               errno, strerror(errno));
   1382                     }
   1383                 } else {
   1384                     status_t err = session->readMore();
   1385                     if (err != OK) {
   1386                         ALOGE("readMore on socket %d failed w/ error %d (%s)",
   1387                               s, err, strerror(-err));
   1388                     }
   1389                 }
   1390             }
   1391 
   1392             if (FD_ISSET(s, &ws)) {
   1393                 status_t err = session->writeMore();
   1394                 if (err != OK) {
   1395                     ALOGE("writeMore on socket %d failed w/ error %d (%s)",
   1396                           s, err, strerror(-err));
   1397                 }
   1398             }
   1399         }
   1400 
   1401         while (!sessionsToAdd.empty()) {
   1402             sp<Session> session = *sessionsToAdd.begin();
   1403             sessionsToAdd.erase(sessionsToAdd.begin());
   1404 
   1405             mSessions.add(session->sessionID(), session);
   1406 
   1407             ALOGI("added clientSession %d", session->sessionID());
   1408         }
   1409     }
   1410 }
   1411 
   1412 }  // namespace android
   1413