Home | History | Annotate | Download | only in wifi-display
      1 /*
      2  * Copyright 2012, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "NetworkSession"
     19 #include <utils/Log.h>
     20 
     21 #include "ANetworkSession.h"
     22 #include "ParsedMessage.h"
     23 
     24 #include <arpa/inet.h>
     25 #include <fcntl.h>
     26 #include <linux/tcp.h>
     27 #include <net/if.h>
     28 #include <netdb.h>
     29 #include <netinet/in.h>
     30 #include <sys/ioctl.h>
     31 #include <sys/socket.h>
     32 
     33 #include <media/stagefright/foundation/ABuffer.h>
     34 #include <media/stagefright/foundation/ADebug.h>
     35 #include <media/stagefright/foundation/AMessage.h>
     36 #include <media/stagefright/foundation/hexdump.h>
     37 #include <media/stagefright/Utils.h>
     38 
     39 namespace android {
     40 
     41 static const size_t kMaxUDPSize = 1500;
     42 static const int32_t kMaxUDPRetries = 200;
     43 
     44 struct ANetworkSession::NetworkThread : public Thread {
     45     NetworkThread(ANetworkSession *session);
     46 
     47 protected:
     48     virtual ~NetworkThread();
     49 
     50 private:
     51     ANetworkSession *mSession;
     52 
     53     virtual bool threadLoop();
     54 
     55     DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
     56 };
     57 
     58 struct ANetworkSession::Session : public RefBase {
     59     enum State {
     60         CONNECTING,
     61         CONNECTED,
     62         LISTENING_RTSP,
     63         LISTENING_TCP_DGRAMS,
     64         DATAGRAM,
     65     };
     66 
     67     Session(int32_t sessionID,
     68             State state,
     69             int s,
     70             const sp<AMessage> &notify);
     71 
     72     int32_t sessionID() const;
     73     int socket() const;
     74     sp<AMessage> getNotificationMessage() const;
     75 
     76     bool isRTSPServer() const;
     77     bool isTCPDatagramServer() const;
     78 
     79     bool wantsToRead();
     80     bool wantsToWrite();
     81 
     82     status_t readMore();
     83     status_t writeMore();
     84 
     85     status_t sendRequest(
     86             const void *data, ssize_t size, bool timeValid, int64_t timeUs);
     87 
     88     void setIsRTSPConnection(bool yesno);
     89 
     90 protected:
     91     virtual ~Session();
     92 
     93 private:
     94     enum {
     95         FRAGMENT_FLAG_TIME_VALID = 1,
     96     };
     97     struct Fragment {
     98         uint32_t mFlags;
     99         int64_t mTimeUs;
    100         sp<ABuffer> mBuffer;
    101     };
    102 
    103     int32_t mSessionID;
    104     State mState;
    105     bool mIsRTSPConnection;
    106     int mSocket;
    107     sp<AMessage> mNotify;
    108     bool mSawReceiveFailure, mSawSendFailure;
    109     int32_t mUDPRetries;
    110 
    111     List<Fragment> mOutFragments;
    112 
    113     AString mInBuffer;
    114 
    115     int64_t mLastStallReportUs;
    116 
    117     void notifyError(bool send, status_t err, const char *detail);
    118     void notify(NotificationReason reason);
    119 
    120     void dumpFragmentStats(const Fragment &frag);
    121 
    122     DISALLOW_EVIL_CONSTRUCTORS(Session);
    123 };
    124 ////////////////////////////////////////////////////////////////////////////////
    125 
    126 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
    127     : mSession(session) {
    128 }
    129 
    130 ANetworkSession::NetworkThread::~NetworkThread() {
    131 }
    132 
    133 bool ANetworkSession::NetworkThread::threadLoop() {
    134     mSession->threadLoop();
    135 
    136     return true;
    137 }
    138 
    139 ////////////////////////////////////////////////////////////////////////////////
    140 
    141 ANetworkSession::Session::Session(
    142         int32_t sessionID,
    143         State state,
    144         int s,
    145         const sp<AMessage> &notify)
    146     : mSessionID(sessionID),
    147       mState(state),
    148       mIsRTSPConnection(false),
    149       mSocket(s),
    150       mNotify(notify),
    151       mSawReceiveFailure(false),
    152       mSawSendFailure(false),
    153       mUDPRetries(kMaxUDPRetries),
    154       mLastStallReportUs(-1ll) {
    155     if (mState == CONNECTED) {
    156         struct sockaddr_in localAddr;
    157         socklen_t localAddrLen = sizeof(localAddr);
    158 
    159         int res = getsockname(
    160                 mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
    161         CHECK_GE(res, 0);
    162 
    163         struct sockaddr_in remoteAddr;
    164         socklen_t remoteAddrLen = sizeof(remoteAddr);
    165 
    166         res = getpeername(
    167                 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    168         CHECK_GE(res, 0);
    169 
    170         in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
    171         AString localAddrString = StringPrintf(
    172                 "%d.%d.%d.%d",
    173                 (addr >> 24),
    174                 (addr >> 16) & 0xff,
    175                 (addr >> 8) & 0xff,
    176                 addr & 0xff);
    177 
    178         addr = ntohl(remoteAddr.sin_addr.s_addr);
    179         AString remoteAddrString = StringPrintf(
    180                 "%d.%d.%d.%d",
    181                 (addr >> 24),
    182                 (addr >> 16) & 0xff,
    183                 (addr >> 8) & 0xff,
    184                 addr & 0xff);
    185 
    186         sp<AMessage> msg = mNotify->dup();
    187         msg->setInt32("sessionID", mSessionID);
    188         msg->setInt32("reason", kWhatClientConnected);
    189         msg->setString("server-ip", localAddrString.c_str());
    190         msg->setInt32("server-port", ntohs(localAddr.sin_port));
    191         msg->setString("client-ip", remoteAddrString.c_str());
    192         msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
    193         msg->post();
    194     }
    195 }
    196 
    197 ANetworkSession::Session::~Session() {
    198     ALOGV("Session %d gone", mSessionID);
    199 
    200     close(mSocket);
    201     mSocket = -1;
    202 }
    203 
    204 int32_t ANetworkSession::Session::sessionID() const {
    205     return mSessionID;
    206 }
    207 
    208 int ANetworkSession::Session::socket() const {
    209     return mSocket;
    210 }
    211 
    212 void ANetworkSession::Session::setIsRTSPConnection(bool yesno) {
    213     mIsRTSPConnection = yesno;
    214 }
    215 
    216 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
    217     return mNotify;
    218 }
    219 
    220 bool ANetworkSession::Session::isRTSPServer() const {
    221     return mState == LISTENING_RTSP;
    222 }
    223 
    224 bool ANetworkSession::Session::isTCPDatagramServer() const {
    225     return mState == LISTENING_TCP_DGRAMS;
    226 }
    227 
    228 bool ANetworkSession::Session::wantsToRead() {
    229     return !mSawReceiveFailure && mState != CONNECTING;
    230 }
    231 
    232 bool ANetworkSession::Session::wantsToWrite() {
    233     return !mSawSendFailure
    234         && (mState == CONNECTING
    235             || (mState == CONNECTED && !mOutFragments.empty())
    236             || (mState == DATAGRAM && !mOutFragments.empty()));
    237 }
    238 
    239 status_t ANetworkSession::Session::readMore() {
    240     if (mState == DATAGRAM) {
    241         status_t err;
    242         do {
    243             sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
    244 
    245             struct sockaddr_in remoteAddr;
    246             socklen_t remoteAddrLen = sizeof(remoteAddr);
    247 
    248             ssize_t n;
    249             do {
    250                 n = recvfrom(
    251                         mSocket, buf->data(), buf->capacity(), 0,
    252                         (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    253             } while (n < 0 && errno == EINTR);
    254 
    255             err = OK;
    256             if (n < 0) {
    257                 err = -errno;
    258             } else if (n == 0) {
    259                 err = -ECONNRESET;
    260             } else {
    261                 buf->setRange(0, n);
    262 
    263                 int64_t nowUs = ALooper::GetNowUs();
    264                 buf->meta()->setInt64("arrivalTimeUs", nowUs);
    265 
    266                 sp<AMessage> notify = mNotify->dup();
    267                 notify->setInt32("sessionID", mSessionID);
    268                 notify->setInt32("reason", kWhatDatagram);
    269 
    270                 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
    271                 notify->setString(
    272                         "fromAddr",
    273                         StringPrintf(
    274                             "%u.%u.%u.%u",
    275                             ip >> 24,
    276                             (ip >> 16) & 0xff,
    277                             (ip >> 8) & 0xff,
    278                             ip & 0xff).c_str());
    279 
    280                 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
    281 
    282                 notify->setBuffer("data", buf);
    283                 notify->post();
    284             }
    285         } while (err == OK);
    286 
    287         if (err == -EAGAIN) {
    288             err = OK;
    289         }
    290 
    291         if (err != OK) {
    292             if (!mUDPRetries) {
    293                 notifyError(false /* send */, err, "Recvfrom failed.");
    294                 mSawReceiveFailure = true;
    295             } else {
    296                 mUDPRetries--;
    297                 ALOGE("Recvfrom failed, %d/%d retries left",
    298                         mUDPRetries, kMaxUDPRetries);
    299                 err = OK;
    300             }
    301         } else {
    302             mUDPRetries = kMaxUDPRetries;
    303         }
    304 
    305         return err;
    306     }
    307 
    308     char tmp[512];
    309     ssize_t n;
    310     do {
    311         n = recv(mSocket, tmp, sizeof(tmp), 0);
    312     } while (n < 0 && errno == EINTR);
    313 
    314     status_t err = OK;
    315 
    316     if (n > 0) {
    317         mInBuffer.append(tmp, n);
    318 
    319 #if 0
    320         ALOGI("in:");
    321         hexdump(tmp, n);
    322 #endif
    323     } else if (n < 0) {
    324         err = -errno;
    325     } else {
    326         err = -ECONNRESET;
    327     }
    328 
    329     if (!mIsRTSPConnection) {
    330         // TCP stream carrying 16-bit length-prefixed datagrams.
    331 
    332         while (mInBuffer.size() >= 2) {
    333             size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
    334 
    335             if (mInBuffer.size() < packetSize + 2) {
    336                 break;
    337             }
    338 
    339             sp<ABuffer> packet = new ABuffer(packetSize);
    340             memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
    341 
    342             int64_t nowUs = ALooper::GetNowUs();
    343             packet->meta()->setInt64("arrivalTimeUs", nowUs);
    344 
    345             sp<AMessage> notify = mNotify->dup();
    346             notify->setInt32("sessionID", mSessionID);
    347             notify->setInt32("reason", kWhatDatagram);
    348             notify->setBuffer("data", packet);
    349             notify->post();
    350 
    351             mInBuffer.erase(0, packetSize + 2);
    352         }
    353     } else {
    354         for (;;) {
    355             size_t length;
    356 
    357             if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
    358                 if (mInBuffer.size() < 4) {
    359                     break;
    360                 }
    361 
    362                 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
    363 
    364                 if (mInBuffer.size() < 4 + length) {
    365                     break;
    366                 }
    367 
    368                 sp<AMessage> notify = mNotify->dup();
    369                 notify->setInt32("sessionID", mSessionID);
    370                 notify->setInt32("reason", kWhatBinaryData);
    371                 notify->setInt32("channel", mInBuffer.c_str()[1]);
    372 
    373                 sp<ABuffer> data = new ABuffer(length);
    374                 memcpy(data->data(), mInBuffer.c_str() + 4, length);
    375 
    376                 int64_t nowUs = ALooper::GetNowUs();
    377                 data->meta()->setInt64("arrivalTimeUs", nowUs);
    378 
    379                 notify->setBuffer("data", data);
    380                 notify->post();
    381 
    382                 mInBuffer.erase(0, 4 + length);
    383                 continue;
    384             }
    385 
    386             sp<ParsedMessage> msg =
    387                 ParsedMessage::Parse(
    388                         mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
    389 
    390             if (msg == NULL) {
    391                 break;
    392             }
    393 
    394             sp<AMessage> notify = mNotify->dup();
    395             notify->setInt32("sessionID", mSessionID);
    396             notify->setInt32("reason", kWhatData);
    397             notify->setObject("data", msg);
    398             notify->post();
    399 
    400 #if 1
    401             // XXX The (old) dongle sends the wrong content length header on a
    402             // SET_PARAMETER request that signals a "wfd_idr_request".
    403             // (17 instead of 19).
    404             const char *content = msg->getContent();
    405             if (content
    406                     && !memcmp(content, "wfd_idr_request\r\n", 17)
    407                     && length >= 19
    408                     && mInBuffer.c_str()[length] == '\r'
    409                     && mInBuffer.c_str()[length + 1] == '\n') {
    410                 length += 2;
    411             }
    412 #endif
    413 
    414             mInBuffer.erase(0, length);
    415 
    416             if (err != OK) {
    417                 break;
    418             }
    419         }
    420     }
    421 
    422     if (err != OK) {
    423         notifyError(false /* send */, err, "Recv failed.");
    424         mSawReceiveFailure = true;
    425     }
    426 
    427     return err;
    428 }
    429 
    430 void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) {
    431 #if 0
    432     int64_t nowUs = ALooper::GetNowUs();
    433     int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
    434 
    435     static const int64_t kMinDelayMs = 0;
    436     static const int64_t kMaxDelayMs = 300;
    437 
    438     const char *kPattern = "########################################";
    439     size_t kPatternSize = strlen(kPattern);
    440 
    441     int n = (kPatternSize * (delayMs - kMinDelayMs))
    442                 / (kMaxDelayMs - kMinDelayMs);
    443 
    444     if (n < 0) {
    445         n = 0;
    446     } else if ((size_t)n > kPatternSize) {
    447         n = kPatternSize;
    448     }
    449 
    450     ALOGI("[%lld]: (%4lld ms) %s\n",
    451           frag.mTimeUs / 1000,
    452           delayMs,
    453           kPattern + kPatternSize - n);
    454 #endif
    455 }
    456 
    457 status_t ANetworkSession::Session::writeMore() {
    458     if (mState == DATAGRAM) {
    459         CHECK(!mOutFragments.empty());
    460 
    461         status_t err;
    462         do {
    463             const Fragment &frag = *mOutFragments.begin();
    464             const sp<ABuffer> &datagram = frag.mBuffer;
    465 
    466             int n;
    467             do {
    468                 n = send(mSocket, datagram->data(), datagram->size(), 0);
    469             } while (n < 0 && errno == EINTR);
    470 
    471             err = OK;
    472 
    473             if (n > 0) {
    474                 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
    475                     dumpFragmentStats(frag);
    476                 }
    477 
    478                 mOutFragments.erase(mOutFragments.begin());
    479             } else if (n < 0) {
    480                 err = -errno;
    481             } else if (n == 0) {
    482                 err = -ECONNRESET;
    483             }
    484         } while (err == OK && !mOutFragments.empty());
    485 
    486         if (err == -EAGAIN) {
    487             if (!mOutFragments.empty()) {
    488                 ALOGI("%d datagrams remain queued.", mOutFragments.size());
    489             }
    490             err = OK;
    491         }
    492 
    493         if (err != OK) {
    494             if (!mUDPRetries) {
    495                 notifyError(true /* send */, err, "Send datagram failed.");
    496                 mSawSendFailure = true;
    497             } else {
    498                 mUDPRetries--;
    499                 ALOGE("Send datagram failed, %d/%d retries left",
    500                         mUDPRetries, kMaxUDPRetries);
    501                 err = OK;
    502             }
    503         } else {
    504             mUDPRetries = kMaxUDPRetries;
    505         }
    506 
    507         return err;
    508     }
    509 
    510     if (mState == CONNECTING) {
    511         int err;
    512         socklen_t optionLen = sizeof(err);
    513         CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
    514         CHECK_EQ(optionLen, (socklen_t)sizeof(err));
    515 
    516         if (err != 0) {
    517             notifyError(kWhatError, -err, "Connection failed");
    518             mSawSendFailure = true;
    519 
    520             return -err;
    521         }
    522 
    523         mState = CONNECTED;
    524         notify(kWhatConnected);
    525 
    526         return OK;
    527     }
    528 
    529     CHECK_EQ(mState, CONNECTED);
    530     CHECK(!mOutFragments.empty());
    531 
    532     ssize_t n;
    533     while (!mOutFragments.empty()) {
    534         const Fragment &frag = *mOutFragments.begin();
    535 
    536         do {
    537             n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
    538         } while (n < 0 && errno == EINTR);
    539 
    540         if (n <= 0) {
    541             break;
    542         }
    543 
    544         frag.mBuffer->setRange(
    545                 frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
    546 
    547         if (frag.mBuffer->size() > 0) {
    548             break;
    549         }
    550 
    551         if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
    552             dumpFragmentStats(frag);
    553         }
    554 
    555         mOutFragments.erase(mOutFragments.begin());
    556     }
    557 
    558     status_t err = OK;
    559 
    560     if (n < 0) {
    561         err = -errno;
    562     } else if (n == 0) {
    563         err = -ECONNRESET;
    564     }
    565 
    566     if (err != OK) {
    567         notifyError(true /* send */, err, "Send failed.");
    568         mSawSendFailure = true;
    569     }
    570 
    571 #if 0
    572     int numBytesQueued;
    573     int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
    574     if (res == 0 && numBytesQueued > 50 * 1024) {
    575         if (numBytesQueued > 409600) {
    576             ALOGW("!!! numBytesQueued = %d", numBytesQueued);
    577         }
    578 
    579         int64_t nowUs = ALooper::GetNowUs();
    580 
    581         if (mLastStallReportUs < 0ll
    582                 || nowUs > mLastStallReportUs + 100000ll) {
    583             sp<AMessage> msg = mNotify->dup();
    584             msg->setInt32("sessionID", mSessionID);
    585             msg->setInt32("reason", kWhatNetworkStall);
    586             msg->setSize("numBytesQueued", numBytesQueued);
    587             msg->post();
    588 
    589             mLastStallReportUs = nowUs;
    590         }
    591     }
    592 #endif
    593 
    594     return err;
    595 }
    596 
    597 status_t ANetworkSession::Session::sendRequest(
    598         const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
    599     CHECK(mState == CONNECTED || mState == DATAGRAM);
    600 
    601     if (size < 0) {
    602         size = strlen((const char *)data);
    603     }
    604 
    605     if (size == 0) {
    606         return OK;
    607     }
    608 
    609     sp<ABuffer> buffer;
    610 
    611     if (mState == CONNECTED && !mIsRTSPConnection) {
    612         CHECK_LE(size, 65535);
    613 
    614         buffer = new ABuffer(size + 2);
    615         buffer->data()[0] = size >> 8;
    616         buffer->data()[1] = size & 0xff;
    617         memcpy(buffer->data() + 2, data, size);
    618     } else {
    619         buffer = new ABuffer(size);
    620         memcpy(buffer->data(), data, size);
    621     }
    622 
    623     Fragment frag;
    624 
    625     frag.mFlags = 0;
    626     if (timeValid) {
    627         frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
    628         frag.mTimeUs = timeUs;
    629     }
    630 
    631     frag.mBuffer = buffer;
    632 
    633     mOutFragments.push_back(frag);
    634 
    635     return OK;
    636 }
    637 
    638 void ANetworkSession::Session::notifyError(
    639         bool send, status_t err, const char *detail) {
    640     sp<AMessage> msg = mNotify->dup();
    641     msg->setInt32("sessionID", mSessionID);
    642     msg->setInt32("reason", kWhatError);
    643     msg->setInt32("send", send);
    644     msg->setInt32("err", err);
    645     msg->setString("detail", detail);
    646     msg->post();
    647 }
    648 
    649 void ANetworkSession::Session::notify(NotificationReason reason) {
    650     sp<AMessage> msg = mNotify->dup();
    651     msg->setInt32("sessionID", mSessionID);
    652     msg->setInt32("reason", reason);
    653     msg->post();
    654 }
    655 
    656 ////////////////////////////////////////////////////////////////////////////////
    657 
    658 ANetworkSession::ANetworkSession()
    659     : mNextSessionID(1) {
    660     mPipeFd[0] = mPipeFd[1] = -1;
    661 }
    662 
    663 ANetworkSession::~ANetworkSession() {
    664     stop();
    665 }
    666 
    667 status_t ANetworkSession::start() {
    668     if (mThread != NULL) {
    669         return INVALID_OPERATION;
    670     }
    671 
    672     int res = pipe(mPipeFd);
    673     if (res != 0) {
    674         mPipeFd[0] = mPipeFd[1] = -1;
    675         return -errno;
    676     }
    677 
    678     mThread = new NetworkThread(this);
    679 
    680     status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
    681 
    682     if (err != OK) {
    683         mThread.clear();
    684 
    685         close(mPipeFd[0]);
    686         close(mPipeFd[1]);
    687         mPipeFd[0] = mPipeFd[1] = -1;
    688 
    689         return err;
    690     }
    691 
    692     return OK;
    693 }
    694 
    695 status_t ANetworkSession::stop() {
    696     if (mThread == NULL) {
    697         return INVALID_OPERATION;
    698     }
    699 
    700     mThread->requestExit();
    701     interrupt();
    702     mThread->requestExitAndWait();
    703 
    704     mThread.clear();
    705 
    706     close(mPipeFd[0]);
    707     close(mPipeFd[1]);
    708     mPipeFd[0] = mPipeFd[1] = -1;
    709 
    710     return OK;
    711 }
    712 
    713 status_t ANetworkSession::createRTSPClient(
    714         const char *host, unsigned port, const sp<AMessage> &notify,
    715         int32_t *sessionID) {
    716     return createClientOrServer(
    717             kModeCreateRTSPClient,
    718             NULL /* addr */,
    719             0 /* port */,
    720             host,
    721             port,
    722             notify,
    723             sessionID);
    724 }
    725 
    726 status_t ANetworkSession::createRTSPServer(
    727         const struct in_addr &addr, unsigned port,
    728         const sp<AMessage> &notify, int32_t *sessionID) {
    729     return createClientOrServer(
    730             kModeCreateRTSPServer,
    731             &addr,
    732             port,
    733             NULL /* remoteHost */,
    734             0 /* remotePort */,
    735             notify,
    736             sessionID);
    737 }
    738 
    739 status_t ANetworkSession::createUDPSession(
    740         unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
    741     return createUDPSession(localPort, NULL, 0, notify, sessionID);
    742 }
    743 
    744 status_t ANetworkSession::createUDPSession(
    745         unsigned localPort,
    746         const char *remoteHost,
    747         unsigned remotePort,
    748         const sp<AMessage> &notify,
    749         int32_t *sessionID) {
    750     return createClientOrServer(
    751             kModeCreateUDPSession,
    752             NULL /* addr */,
    753             localPort,
    754             remoteHost,
    755             remotePort,
    756             notify,
    757             sessionID);
    758 }
    759 
    760 status_t ANetworkSession::createTCPDatagramSession(
    761         const struct in_addr &addr, unsigned port,
    762         const sp<AMessage> &notify, int32_t *sessionID) {
    763     return createClientOrServer(
    764             kModeCreateTCPDatagramSessionPassive,
    765             &addr,
    766             port,
    767             NULL /* remoteHost */,
    768             0 /* remotePort */,
    769             notify,
    770             sessionID);
    771 }
    772 
    773 status_t ANetworkSession::createTCPDatagramSession(
    774         unsigned localPort,
    775         const char *remoteHost,
    776         unsigned remotePort,
    777         const sp<AMessage> &notify,
    778         int32_t *sessionID) {
    779     return createClientOrServer(
    780             kModeCreateTCPDatagramSessionActive,
    781             NULL /* addr */,
    782             localPort,
    783             remoteHost,
    784             remotePort,
    785             notify,
    786             sessionID);
    787 }
    788 
    789 status_t ANetworkSession::destroySession(int32_t sessionID) {
    790     Mutex::Autolock autoLock(mLock);
    791 
    792     ssize_t index = mSessions.indexOfKey(sessionID);
    793 
    794     if (index < 0) {
    795         return -ENOENT;
    796     }
    797 
    798     mSessions.removeItemsAt(index);
    799 
    800     interrupt();
    801 
    802     return OK;
    803 }
    804 
    805 // static
    806 status_t ANetworkSession::MakeSocketNonBlocking(int s) {
    807     int flags = fcntl(s, F_GETFL, 0);
    808     if (flags < 0) {
    809         flags = 0;
    810     }
    811 
    812     int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
    813     if (res < 0) {
    814         return -errno;
    815     }
    816 
    817     return OK;
    818 }
    819 
    820 status_t ANetworkSession::createClientOrServer(
    821         Mode mode,
    822         const struct in_addr *localAddr,
    823         unsigned port,
    824         const char *remoteHost,
    825         unsigned remotePort,
    826         const sp<AMessage> &notify,
    827         int32_t *sessionID) {
    828     Mutex::Autolock autoLock(mLock);
    829 
    830     *sessionID = 0;
    831     status_t err = OK;
    832     int s, res;
    833     sp<Session> session;
    834 
    835     s = socket(
    836             AF_INET,
    837             (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
    838             0);
    839 
    840     if (s < 0) {
    841         err = -errno;
    842         goto bail;
    843     }
    844 
    845     if (mode == kModeCreateRTSPServer
    846             || mode == kModeCreateTCPDatagramSessionPassive) {
    847         const int yes = 1;
    848         res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    849 
    850         if (res < 0) {
    851             err = -errno;
    852             goto bail2;
    853         }
    854     }
    855 
    856     if (mode == kModeCreateUDPSession) {
    857         int size = 256 * 1024;
    858 
    859         res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
    860 
    861         if (res < 0) {
    862             err = -errno;
    863             goto bail2;
    864         }
    865 
    866         res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
    867 
    868         if (res < 0) {
    869             err = -errno;
    870             goto bail2;
    871         }
    872     } else if (mode == kModeCreateTCPDatagramSessionActive) {
    873         int flag = 1;
    874         res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
    875 
    876         if (res < 0) {
    877             err = -errno;
    878             goto bail2;
    879         }
    880 
    881         int tos = 224;  // VOICE
    882         res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
    883 
    884         if (res < 0) {
    885             err = -errno;
    886             goto bail2;
    887         }
    888     }
    889 
    890     err = MakeSocketNonBlocking(s);
    891 
    892     if (err != OK) {
    893         goto bail2;
    894     }
    895 
    896     struct sockaddr_in addr;
    897     memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    898     addr.sin_family = AF_INET;
    899 
    900     if (mode == kModeCreateRTSPClient
    901             || mode == kModeCreateTCPDatagramSessionActive) {
    902         struct hostent *ent= gethostbyname(remoteHost);
    903         if (ent == NULL) {
    904             err = -h_errno;
    905             goto bail2;
    906         }
    907 
    908         addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    909         addr.sin_port = htons(remotePort);
    910     } else if (localAddr != NULL) {
    911         addr.sin_addr = *localAddr;
    912         addr.sin_port = htons(port);
    913     } else {
    914         addr.sin_addr.s_addr = htonl(INADDR_ANY);
    915         addr.sin_port = htons(port);
    916     }
    917 
    918     if (mode == kModeCreateRTSPClient
    919             || mode == kModeCreateTCPDatagramSessionActive) {
    920         in_addr_t x = ntohl(addr.sin_addr.s_addr);
    921         ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
    922               s,
    923               (x >> 24),
    924               (x >> 16) & 0xff,
    925               (x >> 8) & 0xff,
    926               x & 0xff,
    927               ntohs(addr.sin_port));
    928 
    929         res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
    930 
    931         CHECK_LT(res, 0);
    932         if (errno == EINPROGRESS) {
    933             res = 0;
    934         }
    935     } else {
    936         res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
    937 
    938         if (res == 0) {
    939             if (mode == kModeCreateRTSPServer
    940                     || mode == kModeCreateTCPDatagramSessionPassive) {
    941                 res = listen(s, 4);
    942             } else {
    943                 CHECK_EQ(mode, kModeCreateUDPSession);
    944 
    945                 if (remoteHost != NULL) {
    946                     struct sockaddr_in remoteAddr;
    947                     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
    948                     remoteAddr.sin_family = AF_INET;
    949                     remoteAddr.sin_port = htons(remotePort);
    950 
    951                     struct hostent *ent= gethostbyname(remoteHost);
    952                     if (ent == NULL) {
    953                         err = -h_errno;
    954                         goto bail2;
    955                     }
    956 
    957                     remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    958 
    959                     res = connect(
    960                             s,
    961                             (const struct sockaddr *)&remoteAddr,
    962                             sizeof(remoteAddr));
    963                 }
    964             }
    965         }
    966     }
    967 
    968     if (res < 0) {
    969         err = -errno;
    970         goto bail2;
    971     }
    972 
    973     Session::State state;
    974     switch (mode) {
    975         case kModeCreateRTSPClient:
    976             state = Session::CONNECTING;
    977             break;
    978 
    979         case kModeCreateTCPDatagramSessionActive:
    980             state = Session::CONNECTING;
    981             break;
    982 
    983         case kModeCreateTCPDatagramSessionPassive:
    984             state = Session::LISTENING_TCP_DGRAMS;
    985             break;
    986 
    987         case kModeCreateRTSPServer:
    988             state = Session::LISTENING_RTSP;
    989             break;
    990 
    991         default:
    992             CHECK_EQ(mode, kModeCreateUDPSession);
    993             state = Session::DATAGRAM;
    994             break;
    995     }
    996 
    997     session = new Session(
    998             mNextSessionID++,
    999             state,
   1000             s,
   1001             notify);
   1002 
   1003     if (mode == kModeCreateTCPDatagramSessionActive) {
   1004         session->setIsRTSPConnection(false);
   1005     } else if (mode == kModeCreateRTSPClient) {
   1006         session->setIsRTSPConnection(true);
   1007     }
   1008 
   1009     mSessions.add(session->sessionID(), session);
   1010 
   1011     interrupt();
   1012 
   1013     *sessionID = session->sessionID();
   1014 
   1015     goto bail;
   1016 
   1017 bail2:
   1018     close(s);
   1019     s = -1;
   1020 
   1021 bail:
   1022     return err;
   1023 }
   1024 
   1025 status_t ANetworkSession::connectUDPSession(
   1026         int32_t sessionID, const char *remoteHost, unsigned remotePort) {
   1027     Mutex::Autolock autoLock(mLock);
   1028 
   1029     ssize_t index = mSessions.indexOfKey(sessionID);
   1030 
   1031     if (index < 0) {
   1032         return -ENOENT;
   1033     }
   1034 
   1035     const sp<Session> session = mSessions.valueAt(index);
   1036     int s = session->socket();
   1037 
   1038     struct sockaddr_in remoteAddr;
   1039     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
   1040     remoteAddr.sin_family = AF_INET;
   1041     remoteAddr.sin_port = htons(remotePort);
   1042 
   1043     status_t err = OK;
   1044     struct hostent *ent = gethostbyname(remoteHost);
   1045     if (ent == NULL) {
   1046         err = -h_errno;
   1047     } else {
   1048         remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
   1049 
   1050         int res = connect(
   1051                 s,
   1052                 (const struct sockaddr *)&remoteAddr,
   1053                 sizeof(remoteAddr));
   1054 
   1055         if (res < 0) {
   1056             err = -errno;
   1057         }
   1058     }
   1059 
   1060     return err;
   1061 }
   1062 
   1063 status_t ANetworkSession::sendRequest(
   1064         int32_t sessionID, const void *data, ssize_t size,
   1065         bool timeValid, int64_t timeUs) {
   1066     Mutex::Autolock autoLock(mLock);
   1067 
   1068     ssize_t index = mSessions.indexOfKey(sessionID);
   1069 
   1070     if (index < 0) {
   1071         return -ENOENT;
   1072     }
   1073 
   1074     const sp<Session> session = mSessions.valueAt(index);
   1075 
   1076     status_t err = session->sendRequest(data, size, timeValid, timeUs);
   1077 
   1078     interrupt();
   1079 
   1080     return err;
   1081 }
   1082 
   1083 void ANetworkSession::interrupt() {
   1084     static const char dummy = 0;
   1085 
   1086     ssize_t n;
   1087     do {
   1088         n = write(mPipeFd[1], &dummy, 1);
   1089     } while (n < 0 && errno == EINTR);
   1090 
   1091     if (n < 0) {
   1092         ALOGW("Error writing to pipe (%s)", strerror(errno));
   1093     }
   1094 }
   1095 
   1096 void ANetworkSession::threadLoop() {
   1097     fd_set rs, ws;
   1098     FD_ZERO(&rs);
   1099     FD_ZERO(&ws);
   1100 
   1101     FD_SET(mPipeFd[0], &rs);
   1102     int maxFd = mPipeFd[0];
   1103 
   1104     {
   1105         Mutex::Autolock autoLock(mLock);
   1106 
   1107         for (size_t i = 0; i < mSessions.size(); ++i) {
   1108             const sp<Session> &session = mSessions.valueAt(i);
   1109 
   1110             int s = session->socket();
   1111 
   1112             if (s < 0) {
   1113                 continue;
   1114             }
   1115 
   1116             if (session->wantsToRead()) {
   1117                 FD_SET(s, &rs);
   1118                 if (s > maxFd) {
   1119                     maxFd = s;
   1120                 }
   1121             }
   1122 
   1123             if (session->wantsToWrite()) {
   1124                 FD_SET(s, &ws);
   1125                 if (s > maxFd) {
   1126                     maxFd = s;
   1127                 }
   1128             }
   1129         }
   1130     }
   1131 
   1132     int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
   1133 
   1134     if (res == 0) {
   1135         return;
   1136     }
   1137 
   1138     if (res < 0) {
   1139         if (errno == EINTR) {
   1140             return;
   1141         }
   1142 
   1143         ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
   1144         return;
   1145     }
   1146 
   1147     if (FD_ISSET(mPipeFd[0], &rs)) {
   1148         char c;
   1149         ssize_t n;
   1150         do {
   1151             n = read(mPipeFd[0], &c, 1);
   1152         } while (n < 0 && errno == EINTR);
   1153 
   1154         if (n < 0) {
   1155             ALOGW("Error reading from pipe (%s)", strerror(errno));
   1156         }
   1157 
   1158         --res;
   1159     }
   1160 
   1161     {
   1162         Mutex::Autolock autoLock(mLock);
   1163 
   1164         List<sp<Session> > sessionsToAdd;
   1165 
   1166         for (size_t i = mSessions.size(); res > 0 && i-- > 0;) {
   1167             const sp<Session> &session = mSessions.valueAt(i);
   1168 
   1169             int s = session->socket();
   1170 
   1171             if (s < 0) {
   1172                 continue;
   1173             }
   1174 
   1175             if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
   1176                 --res;
   1177             }
   1178 
   1179             if (FD_ISSET(s, &rs)) {
   1180                 if (session->isRTSPServer() || session->isTCPDatagramServer()) {
   1181                     struct sockaddr_in remoteAddr;
   1182                     socklen_t remoteAddrLen = sizeof(remoteAddr);
   1183 
   1184                     int clientSocket = accept(
   1185                             s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
   1186 
   1187                     if (clientSocket >= 0) {
   1188                         status_t err = MakeSocketNonBlocking(clientSocket);
   1189 
   1190                         if (err != OK) {
   1191                             ALOGE("Unable to make client socket non blocking, "
   1192                                   "failed w/ error %d (%s)",
   1193                                   err, strerror(-err));
   1194 
   1195                             close(clientSocket);
   1196                             clientSocket = -1;
   1197                         } else {
   1198                             in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
   1199 
   1200                             ALOGI("incoming connection from %d.%d.%d.%d:%d "
   1201                                   "(socket %d)",
   1202                                   (addr >> 24),
   1203                                   (addr >> 16) & 0xff,
   1204                                   (addr >> 8) & 0xff,
   1205                                   addr & 0xff,
   1206                                   ntohs(remoteAddr.sin_port),
   1207                                   clientSocket);
   1208 
   1209                             sp<Session> clientSession =
   1210                                 new Session(
   1211                                         mNextSessionID++,
   1212                                         Session::CONNECTED,
   1213                                         clientSocket,
   1214                                         session->getNotificationMessage());
   1215 
   1216                             clientSession->setIsRTSPConnection(
   1217                                     session->isRTSPServer());
   1218 
   1219                             sessionsToAdd.push_back(clientSession);
   1220                         }
   1221                     } else {
   1222                         ALOGE("accept returned error %d (%s)",
   1223                               errno, strerror(errno));
   1224                     }
   1225                 } else {
   1226                     status_t err = session->readMore();
   1227                     if (err != OK) {
   1228                         ALOGE("readMore on socket %d failed w/ error %d (%s)",
   1229                               s, err, strerror(-err));
   1230                     }
   1231                 }
   1232             }
   1233 
   1234             if (FD_ISSET(s, &ws)) {
   1235                 status_t err = session->writeMore();
   1236                 if (err != OK) {
   1237                     ALOGE("writeMore on socket %d failed w/ error %d (%s)",
   1238                           s, err, strerror(-err));
   1239                 }
   1240             }
   1241         }
   1242 
   1243         while (!sessionsToAdd.empty()) {
   1244             sp<Session> session = *sessionsToAdd.begin();
   1245             sessionsToAdd.erase(sessionsToAdd.begin());
   1246 
   1247             mSessions.add(session->sessionID(), session);
   1248 
   1249             ALOGI("added clientSession %d", session->sessionID());
   1250         }
   1251     }
   1252 }
   1253 
   1254 }  // namespace android
   1255 
   1256