Home | History | Annotate | Download | only in foundation
      1 /*
      2  * Copyright 2012, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "NetworkSession"
     19 #include <utils/Log.h>
     20 
     21 #include "ANetworkSession.h"
     22 #include "ParsedMessage.h"
     23 
     24 #include <arpa/inet.h>
     25 #include <fcntl.h>
     26 #include <linux/tcp.h>
     27 #include <net/if.h>
     28 #include <netdb.h>
     29 #include <netinet/in.h>
     30 #include <sys/ioctl.h>
     31 #include <sys/socket.h>
     32 
     33 #include <media/stagefright/foundation/ABuffer.h>
     34 #include <media/stagefright/foundation/ADebug.h>
     35 #include <media/stagefright/foundation/AMessage.h>
     36 #include <media/stagefright/foundation/ByteUtils.h>
     37 #include <media/stagefright/foundation/hexdump.h>
     38 
     39 namespace android {
     40 
     41 static const size_t kMaxUDPSize = 1500;
     42 static const int32_t kMaxUDPRetries = 200;
     43 
     44 struct ANetworkSession::NetworkThread : public Thread {
     45     explicit NetworkThread(ANetworkSession *session);
     46 
     47 protected:
     48     virtual ~NetworkThread();
     49 
     50 private:
     51     ANetworkSession *mSession;
     52 
     53     virtual bool threadLoop();
     54 
     55     DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
     56 };
     57 
     58 struct ANetworkSession::Session : public RefBase {
     59     enum Mode {
     60         MODE_RTSP,
     61         MODE_DATAGRAM,
     62         MODE_WEBSOCKET,
     63     };
     64 
     65     enum State {
     66         CONNECTING,
     67         CONNECTED,
     68         LISTENING_RTSP,
     69         LISTENING_TCP_DGRAMS,
     70         DATAGRAM,
     71     };
     72 
     73     Session(int32_t sessionID,
     74             State state,
     75             int s,
     76             const sp<AMessage> &notify);
     77 
     78     int32_t sessionID() const;
     79     int socket() const;
     80     sp<AMessage> getNotificationMessage() const;
     81 
     82     bool isRTSPServer() const;
     83     bool isTCPDatagramServer() const;
     84 
     85     bool wantsToRead();
     86     bool wantsToWrite();
     87 
     88     status_t readMore();
     89     status_t writeMore();
     90 
     91     status_t sendRequest(
     92             const void *data, ssize_t size, bool timeValid, int64_t timeUs);
     93 
     94     void setMode(Mode mode);
     95 
     96     status_t switchToWebSocketMode();
     97 
     98 protected:
     99     virtual ~Session();
    100 
    101 private:
    102     enum {
    103         FRAGMENT_FLAG_TIME_VALID = 1,
    104     };
    105     struct Fragment {
    106         uint32_t mFlags;
    107         int64_t mTimeUs;
    108         sp<ABuffer> mBuffer;
    109     };
    110 
    111     int32_t mSessionID;
    112     State mState;
    113     Mode mMode;
    114     int mSocket;
    115     sp<AMessage> mNotify;
    116     bool mSawReceiveFailure, mSawSendFailure;
    117     int32_t mUDPRetries;
    118 
    119     List<Fragment> mOutFragments;
    120 
    121     AString mInBuffer;
    122 
    123     int64_t mLastStallReportUs;
    124 
    125     void notifyError(bool send, status_t err, const char *detail);
    126     void notify(NotificationReason reason);
    127 
    128     void dumpFragmentStats(const Fragment &frag);
    129 
    130     DISALLOW_EVIL_CONSTRUCTORS(Session);
    131 };
    132 ////////////////////////////////////////////////////////////////////////////////
    133 
    134 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
    135     : mSession(session) {
    136 }
    137 
    138 ANetworkSession::NetworkThread::~NetworkThread() {
    139 }
    140 
    141 bool ANetworkSession::NetworkThread::threadLoop() {
    142     mSession->threadLoop();
    143 
    144     return true;
    145 }
    146 
    147 ////////////////////////////////////////////////////////////////////////////////
    148 
    149 ANetworkSession::Session::Session(
    150         int32_t sessionID,
    151         State state,
    152         int s,
    153         const sp<AMessage> &notify)
    154     : mSessionID(sessionID),
    155       mState(state),
    156       mMode(MODE_DATAGRAM),
    157       mSocket(s),
    158       mNotify(notify),
    159       mSawReceiveFailure(false),
    160       mSawSendFailure(false),
    161       mUDPRetries(kMaxUDPRetries),
    162       mLastStallReportUs(-1ll) {
    163     if (mState == CONNECTED) {
    164         struct sockaddr_in localAddr;
    165         socklen_t localAddrLen = sizeof(localAddr);
    166 
    167         int res = getsockname(
    168                 mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
    169         CHECK_GE(res, 0);
    170 
    171         struct sockaddr_in remoteAddr;
    172         socklen_t remoteAddrLen = sizeof(remoteAddr);
    173 
    174         res = getpeername(
    175                 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    176         CHECK_GE(res, 0);
    177 
    178         in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
    179         AString localAddrString = AStringPrintf(
    180                 "%d.%d.%d.%d",
    181                 (addr >> 24),
    182                 (addr >> 16) & 0xff,
    183                 (addr >> 8) & 0xff,
    184                 addr & 0xff);
    185 
    186         addr = ntohl(remoteAddr.sin_addr.s_addr);
    187         AString remoteAddrString = AStringPrintf(
    188                 "%d.%d.%d.%d",
    189                 (addr >> 24),
    190                 (addr >> 16) & 0xff,
    191                 (addr >> 8) & 0xff,
    192                 addr & 0xff);
    193 
    194         sp<AMessage> msg = mNotify->dup();
    195         msg->setInt32("sessionID", mSessionID);
    196         msg->setInt32("reason", kWhatClientConnected);
    197         msg->setString("server-ip", localAddrString.c_str());
    198         msg->setInt32("server-port", ntohs(localAddr.sin_port));
    199         msg->setString("client-ip", remoteAddrString.c_str());
    200         msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
    201         msg->post();
    202     }
    203 }
    204 
    205 ANetworkSession::Session::~Session() {
    206     ALOGV("Session %d gone", mSessionID);
    207 
    208     close(mSocket);
    209     mSocket = -1;
    210 }
    211 
    212 int32_t ANetworkSession::Session::sessionID() const {
    213     return mSessionID;
    214 }
    215 
    216 int ANetworkSession::Session::socket() const {
    217     return mSocket;
    218 }
    219 
    220 void ANetworkSession::Session::setMode(Mode mode) {
    221     mMode = mode;
    222 }
    223 
    224 status_t ANetworkSession::Session::switchToWebSocketMode() {
    225     if (mState != CONNECTED || mMode != MODE_RTSP) {
    226         return INVALID_OPERATION;
    227     }
    228 
    229     mMode = MODE_WEBSOCKET;
    230 
    231     return OK;
    232 }
    233 
    234 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
    235     return mNotify;
    236 }
    237 
    238 bool ANetworkSession::Session::isRTSPServer() const {
    239     return mState == LISTENING_RTSP;
    240 }
    241 
    242 bool ANetworkSession::Session::isTCPDatagramServer() const {
    243     return mState == LISTENING_TCP_DGRAMS;
    244 }
    245 
    246 bool ANetworkSession::Session::wantsToRead() {
    247     return !mSawReceiveFailure && mState != CONNECTING;
    248 }
    249 
    250 bool ANetworkSession::Session::wantsToWrite() {
    251     return !mSawSendFailure
    252         && (mState == CONNECTING
    253             || (mState == CONNECTED && !mOutFragments.empty())
    254             || (mState == DATAGRAM && !mOutFragments.empty()));
    255 }
    256 
    257 status_t ANetworkSession::Session::readMore() {
    258     if (mState == DATAGRAM) {
    259         CHECK_EQ(mMode, MODE_DATAGRAM);
    260 
    261         status_t err;
    262         do {
    263             sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
    264 
    265             struct sockaddr_in remoteAddr;
    266             socklen_t remoteAddrLen = sizeof(remoteAddr);
    267 
    268             ssize_t n;
    269             do {
    270                 n = recvfrom(
    271                         mSocket, buf->data(), buf->capacity(), 0,
    272                         (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    273             } while (n < 0 && errno == EINTR);
    274 
    275             err = OK;
    276             if (n < 0) {
    277                 err = -errno;
    278             } else if (n == 0) {
    279                 err = -ECONNRESET;
    280             } else {
    281                 buf->setRange(0, n);
    282 
    283                 int64_t nowUs = ALooper::GetNowUs();
    284                 buf->meta()->setInt64("arrivalTimeUs", nowUs);
    285 
    286                 sp<AMessage> notify = mNotify->dup();
    287                 notify->setInt32("sessionID", mSessionID);
    288                 notify->setInt32("reason", kWhatDatagram);
    289 
    290                 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
    291                 notify->setString(
    292                         "fromAddr",
    293                         AStringPrintf(
    294                             "%u.%u.%u.%u",
    295                             ip >> 24,
    296                             (ip >> 16) & 0xff,
    297                             (ip >> 8) & 0xff,
    298                             ip & 0xff).c_str());
    299 
    300                 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
    301 
    302                 notify->setBuffer("data", buf);
    303                 notify->post();
    304             }
    305         } while (err == OK);
    306 
    307         if (err == -EAGAIN) {
    308             err = OK;
    309         }
    310 
    311         if (err != OK) {
    312             if (!mUDPRetries) {
    313                 notifyError(false /* send */, err, "Recvfrom failed.");
    314                 mSawReceiveFailure = true;
    315             } else {
    316                 mUDPRetries--;
    317                 ALOGE("Recvfrom failed, %d/%d retries left",
    318                         mUDPRetries, kMaxUDPRetries);
    319                 err = OK;
    320             }
    321         } else {
    322             mUDPRetries = kMaxUDPRetries;
    323         }
    324 
    325         return err;
    326     }
    327 
    328     char tmp[512];
    329     ssize_t n;
    330     do {
    331         n = recv(mSocket, tmp, sizeof(tmp), 0);
    332     } while (n < 0 && errno == EINTR);
    333 
    334     status_t err = OK;
    335 
    336     if (n > 0) {
    337         mInBuffer.append(tmp, n);
    338 
    339 #if 0
    340         ALOGI("in:");
    341         hexdump(tmp, n);
    342 #endif
    343     } else if (n < 0) {
    344         err = -errno;
    345     } else {
    346         err = -ECONNRESET;
    347     }
    348 
    349     if (mMode == MODE_DATAGRAM) {
    350         // TCP stream carrying 16-bit length-prefixed datagrams.
    351 
    352         while (mInBuffer.size() >= 2) {
    353             size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
    354 
    355             if (mInBuffer.size() < packetSize + 2) {
    356                 break;
    357             }
    358 
    359             sp<ABuffer> packet = new ABuffer(packetSize);
    360             memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
    361 
    362             int64_t nowUs = ALooper::GetNowUs();
    363             packet->meta()->setInt64("arrivalTimeUs", nowUs);
    364 
    365             sp<AMessage> notify = mNotify->dup();
    366             notify->setInt32("sessionID", mSessionID);
    367             notify->setInt32("reason", kWhatDatagram);
    368             notify->setBuffer("data", packet);
    369             notify->post();
    370 
    371             mInBuffer.erase(0, packetSize + 2);
    372         }
    373     } else if (mMode == MODE_RTSP) {
    374         for (;;) {
    375             size_t length;
    376 
    377             if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
    378                 if (mInBuffer.size() < 4) {
    379                     break;
    380                 }
    381 
    382                 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
    383 
    384                 if (mInBuffer.size() < 4 + length) {
    385                     break;
    386                 }
    387 
    388                 sp<AMessage> notify = mNotify->dup();
    389                 notify->setInt32("sessionID", mSessionID);
    390                 notify->setInt32("reason", kWhatBinaryData);
    391                 notify->setInt32("channel", mInBuffer.c_str()[1]);
    392 
    393                 sp<ABuffer> data = new ABuffer(length);
    394                 memcpy(data->data(), mInBuffer.c_str() + 4, length);
    395 
    396                 int64_t nowUs = ALooper::GetNowUs();
    397                 data->meta()->setInt64("arrivalTimeUs", nowUs);
    398 
    399                 notify->setBuffer("data", data);
    400                 notify->post();
    401 
    402                 mInBuffer.erase(0, 4 + length);
    403                 continue;
    404             }
    405 
    406             sp<ParsedMessage> msg =
    407                 ParsedMessage::Parse(
    408                         mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
    409 
    410             if (msg == NULL) {
    411                 break;
    412             }
    413 
    414             sp<AMessage> notify = mNotify->dup();
    415             notify->setInt32("sessionID", mSessionID);
    416             notify->setInt32("reason", kWhatData);
    417             notify->setObject("data", msg);
    418             notify->post();
    419 
    420 #if 1
    421             // XXX The (old) dongle sends the wrong content length header on a
    422             // SET_PARAMETER request that signals a "wfd_idr_request".
    423             // (17 instead of 19).
    424             const char *content = msg->getContent();
    425             if (content
    426                     && !memcmp(content, "wfd_idr_request\r\n", 17)
    427                     && length >= 19
    428                     && mInBuffer.c_str()[length] == '\r'
    429                     && mInBuffer.c_str()[length + 1] == '\n') {
    430                 length += 2;
    431             }
    432 #endif
    433 
    434             mInBuffer.erase(0, length);
    435 
    436             if (err != OK) {
    437                 break;
    438             }
    439         }
    440     } else {
    441         CHECK_EQ(mMode, MODE_WEBSOCKET);
    442 
    443         const uint8_t *data = (const uint8_t *)mInBuffer.c_str();
    444         // hexdump(data, mInBuffer.size());
    445 
    446         while (mInBuffer.size() >= 2) {
    447             size_t offset = 2;
    448 
    449             uint64_t payloadLen = data[1] & 0x7f;
    450             if (payloadLen == 126) {
    451                 if (offset + 2 > mInBuffer.size()) {
    452                     break;
    453                 }
    454 
    455                 payloadLen = U16_AT(&data[offset]);
    456                 offset += 2;
    457             } else if (payloadLen == 127) {
    458                 if (offset + 8 > mInBuffer.size()) {
    459                     break;
    460                 }
    461 
    462                 payloadLen = U64_AT(&data[offset]);
    463                 offset += 8;
    464             }
    465 
    466             uint32_t mask = 0;
    467             if (data[1] & 0x80) {
    468                 // MASK==1
    469                 if (offset + 4 > mInBuffer.size()) {
    470                     break;
    471                 }
    472 
    473                 mask = U32_AT(&data[offset]);
    474                 offset += 4;
    475             }
    476 
    477             if (payloadLen > mInBuffer.size() || offset > mInBuffer.size() - payloadLen) {
    478                 break;
    479             }
    480 
    481             // We have the full message.
    482 
    483             sp<ABuffer> packet = new ABuffer(payloadLen);
    484             memcpy(packet->data(), &data[offset], payloadLen);
    485 
    486             if (mask != 0) {
    487                 for (size_t i = 0; i < payloadLen; ++i) {
    488                     packet->data()[i] =
    489                         data[offset + i]
    490                             ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
    491                 }
    492             }
    493 
    494             sp<AMessage> notify = mNotify->dup();
    495             notify->setInt32("sessionID", mSessionID);
    496             notify->setInt32("reason", kWhatWebSocketMessage);
    497             notify->setBuffer("data", packet);
    498             notify->setInt32("headerByte", data[0]);
    499             notify->post();
    500 
    501             mInBuffer.erase(0, offset + payloadLen);
    502         }
    503     }
    504 
    505     if (err != OK) {
    506         notifyError(false /* send */, err, "Recv failed.");
    507         mSawReceiveFailure = true;
    508     }
    509 
    510     return err;
    511 }
    512 
    513 void ANetworkSession::Session::dumpFragmentStats(const Fragment & /* frag */) {
    514 #if 0
    515     int64_t nowUs = ALooper::GetNowUs();
    516     int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
    517 
    518     static const int64_t kMinDelayMs = 0;
    519     static const int64_t kMaxDelayMs = 300;
    520 
    521     const char *kPattern = "########################################";
    522     size_t kPatternSize = strlen(kPattern);
    523 
    524     int n = (kPatternSize * (delayMs - kMinDelayMs))
    525                 / (kMaxDelayMs - kMinDelayMs);
    526 
    527     if (n < 0) {
    528         n = 0;
    529     } else if ((size_t)n > kPatternSize) {
    530         n = kPatternSize;
    531     }
    532 
    533     ALOGI("[%lld]: (%4lld ms) %s\n",
    534           frag.mTimeUs / 1000,
    535           delayMs,
    536           kPattern + kPatternSize - n);
    537 #endif
    538 }
    539 
    540 status_t ANetworkSession::Session::writeMore() {
    541     if (mState == DATAGRAM) {
    542         CHECK(!mOutFragments.empty());
    543 
    544         status_t err;
    545         do {
    546             const Fragment &frag = *mOutFragments.begin();
    547             const sp<ABuffer> &datagram = frag.mBuffer;
    548 
    549             int n;
    550             do {
    551                 n = send(mSocket, datagram->data(), datagram->size(), 0);
    552             } while (n < 0 && errno == EINTR);
    553 
    554             err = OK;
    555 
    556             if (n > 0) {
    557                 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
    558                     dumpFragmentStats(frag);
    559                 }
    560 
    561                 mOutFragments.erase(mOutFragments.begin());
    562             } else if (n < 0) {
    563                 err = -errno;
    564             } else if (n == 0) {
    565                 err = -ECONNRESET;
    566             }
    567         } while (err == OK && !mOutFragments.empty());
    568 
    569         if (err == -EAGAIN) {
    570             if (!mOutFragments.empty()) {
    571                 ALOGI("%zu datagrams remain queued.", mOutFragments.size());
    572             }
    573             err = OK;
    574         }
    575 
    576         if (err != OK) {
    577             if (!mUDPRetries) {
    578                 notifyError(true /* send */, err, "Send datagram failed.");
    579                 mSawSendFailure = true;
    580             } else {
    581                 mUDPRetries--;
    582                 ALOGE("Send datagram failed, %d/%d retries left",
    583                         mUDPRetries, kMaxUDPRetries);
    584                 err = OK;
    585             }
    586         } else {
    587             mUDPRetries = kMaxUDPRetries;
    588         }
    589 
    590         return err;
    591     }
    592 
    593     if (mState == CONNECTING) {
    594         int err;
    595         socklen_t optionLen = sizeof(err);
    596         CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
    597         CHECK_EQ(optionLen, (socklen_t)sizeof(err));
    598 
    599         if (err != 0) {
    600             notifyError(kWhatError, -err, "Connection failed");
    601             mSawSendFailure = true;
    602 
    603             return -err;
    604         }
    605 
    606         mState = CONNECTED;
    607         notify(kWhatConnected);
    608 
    609         return OK;
    610     }
    611 
    612     CHECK_EQ(mState, CONNECTED);
    613     CHECK(!mOutFragments.empty());
    614 
    615     ssize_t n = -1;
    616     while (!mOutFragments.empty()) {
    617         const Fragment &frag = *mOutFragments.begin();
    618 
    619         do {
    620             n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
    621         } while (n < 0 && errno == EINTR);
    622 
    623         if (n <= 0) {
    624             break;
    625         }
    626 
    627         frag.mBuffer->setRange(
    628                 frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
    629 
    630         if (frag.mBuffer->size() > 0) {
    631             break;
    632         }
    633 
    634         if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
    635             dumpFragmentStats(frag);
    636         }
    637 
    638         mOutFragments.erase(mOutFragments.begin());
    639     }
    640 
    641     status_t err = OK;
    642 
    643     if (n < 0) {
    644         err = -errno;
    645     } else if (n == 0) {
    646         err = -ECONNRESET;
    647     }
    648 
    649     if (err != OK) {
    650         notifyError(true /* send */, err, "Send failed.");
    651         mSawSendFailure = true;
    652     }
    653 
    654 #if 0
    655     int numBytesQueued;
    656     int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
    657     if (res == 0 && numBytesQueued > 50 * 1024) {
    658         if (numBytesQueued > 409600) {
    659             ALOGW("!!! numBytesQueued = %d", numBytesQueued);
    660         }
    661 
    662         int64_t nowUs = ALooper::GetNowUs();
    663 
    664         if (mLastStallReportUs < 0ll
    665                 || nowUs > mLastStallReportUs + 100000ll) {
    666             sp<AMessage> msg = mNotify->dup();
    667             msg->setInt32("sessionID", mSessionID);
    668             msg->setInt32("reason", kWhatNetworkStall);
    669             msg->setSize("numBytesQueued", numBytesQueued);
    670             msg->post();
    671 
    672             mLastStallReportUs = nowUs;
    673         }
    674     }
    675 #endif
    676 
    677     return err;
    678 }
    679 
    680 status_t ANetworkSession::Session::sendRequest(
    681         const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
    682     CHECK(mState == CONNECTED || mState == DATAGRAM);
    683 
    684     if (size < 0) {
    685         size = strlen((const char *)data);
    686     }
    687 
    688     if (size == 0) {
    689         return OK;
    690     }
    691 
    692     sp<ABuffer> buffer;
    693 
    694     if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
    695         CHECK_LE(size, 65535);
    696 
    697         buffer = new ABuffer(size + 2);
    698         buffer->data()[0] = size >> 8;
    699         buffer->data()[1] = size & 0xff;
    700         memcpy(buffer->data() + 2, data, size);
    701     } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
    702         static const bool kUseMask = false;  // Chromium doesn't like it.
    703 
    704         size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
    705         if (size > 65535) {
    706             numHeaderBytes += 8;
    707         } else if (size > 125) {
    708             numHeaderBytes += 2;
    709         }
    710 
    711         buffer = new ABuffer(numHeaderBytes + size);
    712         buffer->data()[0] = 0x81;  // FIN==1 | opcode=1 (text)
    713         buffer->data()[1] = kUseMask ? 0x80 : 0x00;
    714 
    715         if (size > 65535) {
    716             buffer->data()[1] |= 127;
    717             buffer->data()[2] = 0x00;
    718             buffer->data()[3] = 0x00;
    719             buffer->data()[4] = 0x00;
    720             buffer->data()[5] = 0x00;
    721             buffer->data()[6] = (size >> 24) & 0xff;
    722             buffer->data()[7] = (size >> 16) & 0xff;
    723             buffer->data()[8] = (size >> 8) & 0xff;
    724             buffer->data()[9] = size & 0xff;
    725         } else if (size > 125) {
    726             buffer->data()[1] |= 126;
    727             buffer->data()[2] = (size >> 8) & 0xff;
    728             buffer->data()[3] = size & 0xff;
    729         } else {
    730             buffer->data()[1] |= size;
    731         }
    732 
    733         if (kUseMask) {
    734             uint32_t mask = rand();
    735 
    736             buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
    737             buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
    738             buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
    739             buffer->data()[numHeaderBytes - 1] = mask & 0xff;
    740 
    741             for (size_t i = 0; i < (size_t)size; ++i) {
    742                 buffer->data()[numHeaderBytes + i] =
    743                     ((const uint8_t *)data)[i]
    744                         ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
    745             }
    746         } else {
    747             memcpy(buffer->data() + numHeaderBytes, data, size);
    748         }
    749     } else {
    750         buffer = new ABuffer(size);
    751         memcpy(buffer->data(), data, size);
    752     }
    753 
    754     Fragment frag;
    755 
    756     frag.mFlags = 0;
    757     if (timeValid) {
    758         frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
    759         frag.mTimeUs = timeUs;
    760     }
    761 
    762     frag.mBuffer = buffer;
    763 
    764     mOutFragments.push_back(frag);
    765 
    766     return OK;
    767 }
    768 
    769 void ANetworkSession::Session::notifyError(
    770         bool send, status_t err, const char *detail) {
    771     sp<AMessage> msg = mNotify->dup();
    772     msg->setInt32("sessionID", mSessionID);
    773     msg->setInt32("reason", kWhatError);
    774     msg->setInt32("send", send);
    775     msg->setInt32("err", err);
    776     msg->setString("detail", detail);
    777     msg->post();
    778 }
    779 
    780 void ANetworkSession::Session::notify(NotificationReason reason) {
    781     sp<AMessage> msg = mNotify->dup();
    782     msg->setInt32("sessionID", mSessionID);
    783     msg->setInt32("reason", reason);
    784     msg->post();
    785 }
    786 
    787 ////////////////////////////////////////////////////////////////////////////////
    788 
    789 ANetworkSession::ANetworkSession()
    790     : mNextSessionID(1) {
    791     mPipeFd[0] = mPipeFd[1] = -1;
    792 }
    793 
    794 ANetworkSession::~ANetworkSession() {
    795     stop();
    796 }
    797 
    798 status_t ANetworkSession::start() {
    799     if (mThread != NULL) {
    800         return INVALID_OPERATION;
    801     }
    802 
    803     int res = pipe(mPipeFd);
    804     if (res != 0) {
    805         mPipeFd[0] = mPipeFd[1] = -1;
    806         return -errno;
    807     }
    808 
    809     mThread = new NetworkThread(this);
    810 
    811     status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
    812 
    813     if (err != OK) {
    814         mThread.clear();
    815 
    816         close(mPipeFd[0]);
    817         close(mPipeFd[1]);
    818         mPipeFd[0] = mPipeFd[1] = -1;
    819 
    820         return err;
    821     }
    822 
    823     return OK;
    824 }
    825 
    826 status_t ANetworkSession::stop() {
    827     if (mThread == NULL) {
    828         return INVALID_OPERATION;
    829     }
    830 
    831     mThread->requestExit();
    832     interrupt();
    833     mThread->requestExitAndWait();
    834 
    835     mThread.clear();
    836 
    837     close(mPipeFd[0]);
    838     close(mPipeFd[1]);
    839     mPipeFd[0] = mPipeFd[1] = -1;
    840 
    841     return OK;
    842 }
    843 
    844 status_t ANetworkSession::createRTSPClient(
    845         const char *host, unsigned port, const sp<AMessage> &notify,
    846         int32_t *sessionID) {
    847     return createClientOrServer(
    848             kModeCreateRTSPClient,
    849             NULL /* addr */,
    850             0 /* port */,
    851             host,
    852             port,
    853             notify,
    854             sessionID);
    855 }
    856 
    857 status_t ANetworkSession::createRTSPServer(
    858         const struct in_addr &addr, unsigned port,
    859         const sp<AMessage> &notify, int32_t *sessionID) {
    860     return createClientOrServer(
    861             kModeCreateRTSPServer,
    862             &addr,
    863             port,
    864             NULL /* remoteHost */,
    865             0 /* remotePort */,
    866             notify,
    867             sessionID);
    868 }
    869 
    870 status_t ANetworkSession::createUDPSession(
    871         unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
    872     return createUDPSession(localPort, NULL, 0, notify, sessionID);
    873 }
    874 
    875 status_t ANetworkSession::createUDPSession(
    876         unsigned localPort,
    877         const char *remoteHost,
    878         unsigned remotePort,
    879         const sp<AMessage> &notify,
    880         int32_t *sessionID) {
    881     return createClientOrServer(
    882             kModeCreateUDPSession,
    883             NULL /* addr */,
    884             localPort,
    885             remoteHost,
    886             remotePort,
    887             notify,
    888             sessionID);
    889 }
    890 
    891 status_t ANetworkSession::createTCPDatagramSession(
    892         const struct in_addr &addr, unsigned port,
    893         const sp<AMessage> &notify, int32_t *sessionID) {
    894     return createClientOrServer(
    895             kModeCreateTCPDatagramSessionPassive,
    896             &addr,
    897             port,
    898             NULL /* remoteHost */,
    899             0 /* remotePort */,
    900             notify,
    901             sessionID);
    902 }
    903 
    904 status_t ANetworkSession::createTCPDatagramSession(
    905         unsigned localPort,
    906         const char *remoteHost,
    907         unsigned remotePort,
    908         const sp<AMessage> &notify,
    909         int32_t *sessionID) {
    910     return createClientOrServer(
    911             kModeCreateTCPDatagramSessionActive,
    912             NULL /* addr */,
    913             localPort,
    914             remoteHost,
    915             remotePort,
    916             notify,
    917             sessionID);
    918 }
    919 
    920 status_t ANetworkSession::destroySession(int32_t sessionID) {
    921     Mutex::Autolock autoLock(mLock);
    922 
    923     ssize_t index = mSessions.indexOfKey(sessionID);
    924 
    925     if (index < 0) {
    926         return -ENOENT;
    927     }
    928 
    929     mSessions.removeItemsAt(index);
    930 
    931     interrupt();
    932 
    933     return OK;
    934 }
    935 
    936 // static
    937 status_t ANetworkSession::MakeSocketNonBlocking(int s) {
    938     int flags = fcntl(s, F_GETFL, 0);
    939     if (flags < 0) {
    940         flags = 0;
    941     }
    942 
    943     int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
    944     if (res < 0) {
    945         return -errno;
    946     }
    947 
    948     return OK;
    949 }
    950 
    951 status_t ANetworkSession::createClientOrServer(
    952         Mode mode,
    953         const struct in_addr *localAddr,
    954         unsigned port,
    955         const char *remoteHost,
    956         unsigned remotePort,
    957         const sp<AMessage> &notify,
    958         int32_t *sessionID) {
    959     Mutex::Autolock autoLock(mLock);
    960 
    961     *sessionID = 0;
    962     status_t err = OK;
    963     int s, res;
    964     sp<Session> session;
    965 
    966     s = socket(
    967             AF_INET,
    968             (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
    969             0);
    970 
    971     if (s < 0) {
    972         err = -errno;
    973         goto bail;
    974     }
    975 
    976     if (mode == kModeCreateRTSPServer
    977             || mode == kModeCreateTCPDatagramSessionPassive) {
    978         const int yes = 1;
    979         res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    980 
    981         if (res < 0) {
    982             err = -errno;
    983             goto bail2;
    984         }
    985     }
    986 
    987     if (mode == kModeCreateUDPSession) {
    988         int size = 256 * 1024;
    989 
    990         res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
    991 
    992         if (res < 0) {
    993             err = -errno;
    994             goto bail2;
    995         }
    996 
    997         res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
    998 
    999         if (res < 0) {
   1000             err = -errno;
   1001             goto bail2;
   1002         }
   1003     } else if (mode == kModeCreateTCPDatagramSessionActive) {
   1004         int flag = 1;
   1005         res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
   1006 
   1007         if (res < 0) {
   1008             err = -errno;
   1009             goto bail2;
   1010         }
   1011 
   1012         int tos = 224;  // VOICE
   1013         res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
   1014 
   1015         if (res < 0) {
   1016             err = -errno;
   1017             goto bail2;
   1018         }
   1019     }
   1020 
   1021     err = MakeSocketNonBlocking(s);
   1022 
   1023     if (err != OK) {
   1024         goto bail2;
   1025     }
   1026 
   1027     struct sockaddr_in addr;
   1028     memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
   1029     addr.sin_family = AF_INET;
   1030 
   1031     if (mode == kModeCreateRTSPClient
   1032             || mode == kModeCreateTCPDatagramSessionActive) {
   1033         struct hostent *ent= gethostbyname(remoteHost);
   1034         if (ent == NULL) {
   1035             err = -h_errno;
   1036             goto bail2;
   1037         }
   1038 
   1039         addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
   1040         addr.sin_port = htons(remotePort);
   1041     } else if (localAddr != NULL) {
   1042         addr.sin_addr = *localAddr;
   1043         addr.sin_port = htons(port);
   1044     } else {
   1045         addr.sin_addr.s_addr = htonl(INADDR_ANY);
   1046         addr.sin_port = htons(port);
   1047     }
   1048 
   1049     if (mode == kModeCreateRTSPClient
   1050             || mode == kModeCreateTCPDatagramSessionActive) {
   1051         in_addr_t x = ntohl(addr.sin_addr.s_addr);
   1052         ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
   1053               s,
   1054               (x >> 24),
   1055               (x >> 16) & 0xff,
   1056               (x >> 8) & 0xff,
   1057               x & 0xff,
   1058               ntohs(addr.sin_port));
   1059 
   1060         res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
   1061 
   1062         CHECK_LT(res, 0);
   1063         if (errno == EINPROGRESS) {
   1064             res = 0;
   1065         }
   1066     } else {
   1067         res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
   1068 
   1069         if (res == 0) {
   1070             if (mode == kModeCreateRTSPServer
   1071                     || mode == kModeCreateTCPDatagramSessionPassive) {
   1072                 res = listen(s, 4);
   1073             } else {
   1074                 CHECK_EQ(mode, kModeCreateUDPSession);
   1075 
   1076                 if (remoteHost != NULL) {
   1077                     struct sockaddr_in remoteAddr;
   1078                     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
   1079                     remoteAddr.sin_family = AF_INET;
   1080                     remoteAddr.sin_port = htons(remotePort);
   1081 
   1082                     struct hostent *ent= gethostbyname(remoteHost);
   1083                     if (ent == NULL) {
   1084                         err = -h_errno;
   1085                         goto bail2;
   1086                     }
   1087 
   1088                     remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
   1089 
   1090                     res = connect(
   1091                             s,
   1092                             (const struct sockaddr *)&remoteAddr,
   1093                             sizeof(remoteAddr));
   1094                 }
   1095             }
   1096         }
   1097     }
   1098 
   1099     if (res < 0) {
   1100         err = -errno;
   1101         goto bail2;
   1102     }
   1103 
   1104     Session::State state;
   1105     switch (mode) {
   1106         case kModeCreateRTSPClient:
   1107             state = Session::CONNECTING;
   1108             break;
   1109 
   1110         case kModeCreateTCPDatagramSessionActive:
   1111             state = Session::CONNECTING;
   1112             break;
   1113 
   1114         case kModeCreateTCPDatagramSessionPassive:
   1115             state = Session::LISTENING_TCP_DGRAMS;
   1116             break;
   1117 
   1118         case kModeCreateRTSPServer:
   1119             state = Session::LISTENING_RTSP;
   1120             break;
   1121 
   1122         default:
   1123             CHECK_EQ(mode, kModeCreateUDPSession);
   1124             state = Session::DATAGRAM;
   1125             break;
   1126     }
   1127 
   1128     session = new Session(
   1129             mNextSessionID++,
   1130             state,
   1131             s,
   1132             notify);
   1133 
   1134     if (mode == kModeCreateTCPDatagramSessionActive) {
   1135         session->setMode(Session::MODE_DATAGRAM);
   1136     } else if (mode == kModeCreateRTSPClient) {
   1137         session->setMode(Session::MODE_RTSP);
   1138     }
   1139 
   1140     mSessions.add(session->sessionID(), session);
   1141 
   1142     interrupt();
   1143 
   1144     *sessionID = session->sessionID();
   1145 
   1146     goto bail;
   1147 
   1148 bail2:
   1149     close(s);
   1150     s = -1;
   1151 
   1152 bail:
   1153     return err;
   1154 }
   1155 
   1156 status_t ANetworkSession::connectUDPSession(
   1157         int32_t sessionID, const char *remoteHost, unsigned remotePort) {
   1158     Mutex::Autolock autoLock(mLock);
   1159 
   1160     ssize_t index = mSessions.indexOfKey(sessionID);
   1161 
   1162     if (index < 0) {
   1163         return -ENOENT;
   1164     }
   1165 
   1166     const sp<Session> session = mSessions.valueAt(index);
   1167     int s = session->socket();
   1168 
   1169     struct sockaddr_in remoteAddr;
   1170     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
   1171     remoteAddr.sin_family = AF_INET;
   1172     remoteAddr.sin_port = htons(remotePort);
   1173 
   1174     status_t err = OK;
   1175     struct hostent *ent = gethostbyname(remoteHost);
   1176     if (ent == NULL) {
   1177         err = -h_errno;
   1178     } else {
   1179         remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
   1180 
   1181         int res = connect(
   1182                 s,
   1183                 (const struct sockaddr *)&remoteAddr,
   1184                 sizeof(remoteAddr));
   1185 
   1186         if (res < 0) {
   1187             err = -errno;
   1188         }
   1189     }
   1190 
   1191     return err;
   1192 }
   1193 
   1194 status_t ANetworkSession::sendRequest(
   1195         int32_t sessionID, const void *data, ssize_t size,
   1196         bool timeValid, int64_t timeUs) {
   1197     Mutex::Autolock autoLock(mLock);
   1198 
   1199     ssize_t index = mSessions.indexOfKey(sessionID);
   1200 
   1201     if (index < 0) {
   1202         return -ENOENT;
   1203     }
   1204 
   1205     const sp<Session> session = mSessions.valueAt(index);
   1206 
   1207     status_t err = session->sendRequest(data, size, timeValid, timeUs);
   1208 
   1209     interrupt();
   1210 
   1211     return err;
   1212 }
   1213 
   1214 status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) {
   1215     Mutex::Autolock autoLock(mLock);
   1216 
   1217     ssize_t index = mSessions.indexOfKey(sessionID);
   1218 
   1219     if (index < 0) {
   1220         return -ENOENT;
   1221     }
   1222 
   1223     const sp<Session> session = mSessions.valueAt(index);
   1224     return session->switchToWebSocketMode();
   1225 }
   1226 
   1227 void ANetworkSession::interrupt() {
   1228     static const char dummy = 0;
   1229 
   1230     ssize_t n;
   1231     do {
   1232         n = write(mPipeFd[1], &dummy, 1);
   1233     } while (n < 0 && errno == EINTR);
   1234 
   1235     if (n < 0) {
   1236         ALOGW("Error writing to pipe (%s)", strerror(errno));
   1237     }
   1238 }
   1239 
   1240 void ANetworkSession::threadLoop() {
   1241     fd_set rs, ws;
   1242     FD_ZERO(&rs);
   1243     FD_ZERO(&ws);
   1244 
   1245     FD_SET(mPipeFd[0], &rs);
   1246     int maxFd = mPipeFd[0];
   1247 
   1248     {
   1249         Mutex::Autolock autoLock(mLock);
   1250 
   1251         for (size_t i = 0; i < mSessions.size(); ++i) {
   1252             const sp<Session> &session = mSessions.valueAt(i);
   1253 
   1254             int s = session->socket();
   1255 
   1256             if (s < 0) {
   1257                 continue;
   1258             }
   1259 
   1260             if (session->wantsToRead()) {
   1261                 FD_SET(s, &rs);
   1262                 if (s > maxFd) {
   1263                     maxFd = s;
   1264                 }
   1265             }
   1266 
   1267             if (session->wantsToWrite()) {
   1268                 FD_SET(s, &ws);
   1269                 if (s > maxFd) {
   1270                     maxFd = s;
   1271                 }
   1272             }
   1273         }
   1274     }
   1275 
   1276     int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
   1277 
   1278     if (res == 0) {
   1279         return;
   1280     }
   1281 
   1282     if (res < 0) {
   1283         if (errno == EINTR) {
   1284             return;
   1285         }
   1286 
   1287         ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
   1288         return;
   1289     }
   1290 
   1291     if (FD_ISSET(mPipeFd[0], &rs)) {
   1292         char c;
   1293         ssize_t n;
   1294         do {
   1295             n = read(mPipeFd[0], &c, 1);
   1296         } while (n < 0 && errno == EINTR);
   1297 
   1298         if (n < 0) {
   1299             ALOGW("Error reading from pipe (%s)", strerror(errno));
   1300         }
   1301 
   1302         --res;
   1303     }
   1304 
   1305     {
   1306         Mutex::Autolock autoLock(mLock);
   1307 
   1308         List<sp<Session> > sessionsToAdd;
   1309 
   1310         for (size_t i = mSessions.size(); res > 0 && i > 0;) {
   1311             i--;
   1312             const sp<Session> &session = mSessions.valueAt(i);
   1313 
   1314             int s = session->socket();
   1315 
   1316             if (s < 0) {
   1317                 continue;
   1318             }
   1319 
   1320             if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
   1321                 --res;
   1322             }
   1323 
   1324             if (FD_ISSET(s, &rs)) {
   1325                 if (session->isRTSPServer() || session->isTCPDatagramServer()) {
   1326                     struct sockaddr_in remoteAddr;
   1327                     socklen_t remoteAddrLen = sizeof(remoteAddr);
   1328 
   1329                     int clientSocket = accept(
   1330                             s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
   1331 
   1332                     if (clientSocket >= 0) {
   1333                         status_t err = MakeSocketNonBlocking(clientSocket);
   1334 
   1335                         if (err != OK) {
   1336                             ALOGE("Unable to make client socket non blocking, "
   1337                                   "failed w/ error %d (%s)",
   1338                                   err, strerror(-err));
   1339 
   1340                             close(clientSocket);
   1341                             clientSocket = -1;
   1342                         } else {
   1343                             in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
   1344 
   1345                             ALOGI("incoming connection from %d.%d.%d.%d:%d "
   1346                                   "(socket %d)",
   1347                                   (addr >> 24),
   1348                                   (addr >> 16) & 0xff,
   1349                                   (addr >> 8) & 0xff,
   1350                                   addr & 0xff,
   1351                                   ntohs(remoteAddr.sin_port),
   1352                                   clientSocket);
   1353 
   1354                             sp<Session> clientSession =
   1355                                 new Session(
   1356                                         mNextSessionID++,
   1357                                         Session::CONNECTED,
   1358                                         clientSocket,
   1359                                         session->getNotificationMessage());
   1360 
   1361                             clientSession->setMode(
   1362                                     session->isRTSPServer()
   1363                                         ? Session::MODE_RTSP
   1364                                         : Session::MODE_DATAGRAM);
   1365 
   1366                             sessionsToAdd.push_back(clientSession);
   1367                         }
   1368                     } else {
   1369                         ALOGE("accept returned error %d (%s)",
   1370                               errno, strerror(errno));
   1371                     }
   1372                 } else {
   1373                     status_t err = session->readMore();
   1374                     if (err != OK) {
   1375                         ALOGE("readMore on socket %d failed w/ error %d (%s)",
   1376                               s, err, strerror(-err));
   1377                     }
   1378                 }
   1379             }
   1380 
   1381             if (FD_ISSET(s, &ws)) {
   1382                 status_t err = session->writeMore();
   1383                 if (err != OK) {
   1384                     ALOGE("writeMore on socket %d failed w/ error %d (%s)",
   1385                           s, err, strerror(-err));
   1386                 }
   1387             }
   1388         }
   1389 
   1390         while (!sessionsToAdd.empty()) {
   1391             sp<Session> session = *sessionsToAdd.begin();
   1392             sessionsToAdd.erase(sessionsToAdd.begin());
   1393 
   1394             mSessions.add(session->sessionID(), session);
   1395 
   1396             ALOGI("added clientSession %d", session->sessionID());
   1397         }
   1398     }
   1399 }
   1400 
   1401 }  // namespace android
   1402