Home | History | Annotate | Download | only in wifi-display
      1 /*
      2  * Copyright 2012, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "NetworkSession"
     19 #include <utils/Log.h>
     20 
     21 #include "ANetworkSession.h"
     22 #include "ParsedMessage.h"
     23 
     24 #include <arpa/inet.h>
     25 #include <fcntl.h>
     26 #include <net/if.h>
     27 #include <netdb.h>
     28 #include <netinet/in.h>
     29 #include <sys/socket.h>
     30 
     31 #include <media/stagefright/foundation/ABuffer.h>
     32 #include <media/stagefright/foundation/ADebug.h>
     33 #include <media/stagefright/foundation/AMessage.h>
     34 #include <media/stagefright/foundation/hexdump.h>
     35 #include <media/stagefright/Utils.h>
     36 
     37 namespace android {
     38 
     39 static const size_t kMaxUDPSize = 1500;
     40 
     41 struct ANetworkSession::NetworkThread : public Thread {
     42     NetworkThread(ANetworkSession *session);
     43 
     44 protected:
     45     virtual ~NetworkThread();
     46 
     47 private:
     48     ANetworkSession *mSession;
     49 
     50     virtual bool threadLoop();
     51 
     52     DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
     53 };
     54 
     55 struct ANetworkSession::Session : public RefBase {
     56     enum State {
     57         CONNECTING,
     58         CONNECTED,
     59         LISTENING_RTSP,
     60         LISTENING_TCP_DGRAMS,
     61         DATAGRAM,
     62     };
     63 
     64     Session(int32_t sessionID,
     65             State state,
     66             int s,
     67             const sp<AMessage> &notify);
     68 
     69     int32_t sessionID() const;
     70     int socket() const;
     71     sp<AMessage> getNotificationMessage() const;
     72 
     73     bool isRTSPServer() const;
     74     bool isTCPDatagramServer() const;
     75 
     76     bool wantsToRead();
     77     bool wantsToWrite();
     78 
     79     status_t readMore();
     80     status_t writeMore();
     81 
     82     status_t sendRequest(const void *data, ssize_t size);
     83 
     84     void setIsRTSPConnection(bool yesno);
     85 
     86 protected:
     87     virtual ~Session();
     88 
     89 private:
     90     int32_t mSessionID;
     91     State mState;
     92     bool mIsRTSPConnection;
     93     int mSocket;
     94     sp<AMessage> mNotify;
     95     bool mSawReceiveFailure, mSawSendFailure;
     96 
     97     // for TCP / stream data
     98     AString mOutBuffer;
     99 
    100     // for UDP / datagrams
    101     List<sp<ABuffer> > mOutDatagrams;
    102 
    103     AString mInBuffer;
    104 
    105     void notifyError(bool send, status_t err, const char *detail);
    106     void notify(NotificationReason reason);
    107 
    108     DISALLOW_EVIL_CONSTRUCTORS(Session);
    109 };
    110 ////////////////////////////////////////////////////////////////////////////////
    111 
    112 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
    113     : mSession(session) {
    114 }
    115 
    116 ANetworkSession::NetworkThread::~NetworkThread() {
    117 }
    118 
    119 bool ANetworkSession::NetworkThread::threadLoop() {
    120     mSession->threadLoop();
    121 
    122     return true;
    123 }
    124 
    125 ////////////////////////////////////////////////////////////////////////////////
    126 
    127 ANetworkSession::Session::Session(
    128         int32_t sessionID,
    129         State state,
    130         int s,
    131         const sp<AMessage> &notify)
    132     : mSessionID(sessionID),
    133       mState(state),
    134       mIsRTSPConnection(false),
    135       mSocket(s),
    136       mNotify(notify),
    137       mSawReceiveFailure(false),
    138       mSawSendFailure(false) {
    139     if (mState == CONNECTED) {
    140         struct sockaddr_in localAddr;
    141         socklen_t localAddrLen = sizeof(localAddr);
    142 
    143         int res = getsockname(
    144                 mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
    145         CHECK_GE(res, 0);
    146 
    147         struct sockaddr_in remoteAddr;
    148         socklen_t remoteAddrLen = sizeof(remoteAddr);
    149 
    150         res = getpeername(
    151                 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    152         CHECK_GE(res, 0);
    153 
    154         in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
    155         AString localAddrString = StringPrintf(
    156                 "%d.%d.%d.%d",
    157                 (addr >> 24),
    158                 (addr >> 16) & 0xff,
    159                 (addr >> 8) & 0xff,
    160                 addr & 0xff);
    161 
    162         addr = ntohl(remoteAddr.sin_addr.s_addr);
    163         AString remoteAddrString = StringPrintf(
    164                 "%d.%d.%d.%d",
    165                 (addr >> 24),
    166                 (addr >> 16) & 0xff,
    167                 (addr >> 8) & 0xff,
    168                 addr & 0xff);
    169 
    170         sp<AMessage> msg = mNotify->dup();
    171         msg->setInt32("sessionID", mSessionID);
    172         msg->setInt32("reason", kWhatClientConnected);
    173         msg->setString("server-ip", localAddrString.c_str());
    174         msg->setInt32("server-port", ntohs(localAddr.sin_port));
    175         msg->setString("client-ip", remoteAddrString.c_str());
    176         msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
    177         msg->post();
    178     }
    179 }
    180 
    181 ANetworkSession::Session::~Session() {
    182     ALOGV("Session %d gone", mSessionID);
    183 
    184     close(mSocket);
    185     mSocket = -1;
    186 }
    187 
    188 int32_t ANetworkSession::Session::sessionID() const {
    189     return mSessionID;
    190 }
    191 
    192 int ANetworkSession::Session::socket() const {
    193     return mSocket;
    194 }
    195 
    196 void ANetworkSession::Session::setIsRTSPConnection(bool yesno) {
    197     mIsRTSPConnection = yesno;
    198 }
    199 
    200 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
    201     return mNotify;
    202 }
    203 
    204 bool ANetworkSession::Session::isRTSPServer() const {
    205     return mState == LISTENING_RTSP;
    206 }
    207 
    208 bool ANetworkSession::Session::isTCPDatagramServer() const {
    209     return mState == LISTENING_TCP_DGRAMS;
    210 }
    211 
    212 bool ANetworkSession::Session::wantsToRead() {
    213     return !mSawReceiveFailure && mState != CONNECTING;
    214 }
    215 
    216 bool ANetworkSession::Session::wantsToWrite() {
    217     return !mSawSendFailure
    218         && (mState == CONNECTING
    219             || (mState == CONNECTED && !mOutBuffer.empty())
    220             || (mState == DATAGRAM && !mOutDatagrams.empty()));
    221 }
    222 
    223 status_t ANetworkSession::Session::readMore() {
    224     if (mState == DATAGRAM) {
    225         status_t err;
    226         do {
    227             sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
    228 
    229             struct sockaddr_in remoteAddr;
    230             socklen_t remoteAddrLen = sizeof(remoteAddr);
    231 
    232             ssize_t n;
    233             do {
    234                 n = recvfrom(
    235                         mSocket, buf->data(), buf->capacity(), 0,
    236                         (struct sockaddr *)&remoteAddr, &remoteAddrLen);
    237             } while (n < 0 && errno == EINTR);
    238 
    239             err = OK;
    240             if (n < 0) {
    241                 err = -errno;
    242             } else if (n == 0) {
    243                 err = -ECONNRESET;
    244             } else {
    245                 buf->setRange(0, n);
    246 
    247                 int64_t nowUs = ALooper::GetNowUs();
    248                 buf->meta()->setInt64("arrivalTimeUs", nowUs);
    249 
    250                 sp<AMessage> notify = mNotify->dup();
    251                 notify->setInt32("sessionID", mSessionID);
    252                 notify->setInt32("reason", kWhatDatagram);
    253 
    254                 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
    255                 notify->setString(
    256                         "fromAddr",
    257                         StringPrintf(
    258                             "%u.%u.%u.%u",
    259                             ip >> 24,
    260                             (ip >> 16) & 0xff,
    261                             (ip >> 8) & 0xff,
    262                             ip & 0xff).c_str());
    263 
    264                 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
    265 
    266                 notify->setBuffer("data", buf);
    267                 notify->post();
    268             }
    269         } while (err == OK);
    270 
    271         if (err == -EAGAIN) {
    272             err = OK;
    273         }
    274 
    275         if (err != OK) {
    276             notifyError(false /* send */, err, "Recvfrom failed.");
    277             mSawReceiveFailure = true;
    278         }
    279 
    280         return err;
    281     }
    282 
    283     char tmp[512];
    284     ssize_t n;
    285     do {
    286         n = recv(mSocket, tmp, sizeof(tmp), 0);
    287     } while (n < 0 && errno == EINTR);
    288 
    289     status_t err = OK;
    290 
    291     if (n > 0) {
    292         mInBuffer.append(tmp, n);
    293 
    294 #if 0
    295         ALOGI("in:");
    296         hexdump(tmp, n);
    297 #endif
    298     } else if (n < 0) {
    299         err = -errno;
    300     } else {
    301         err = -ECONNRESET;
    302     }
    303 
    304     if (!mIsRTSPConnection) {
    305         // TCP stream carrying 16-bit length-prefixed datagrams.
    306 
    307         while (mInBuffer.size() >= 2) {
    308             size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
    309 
    310             if (mInBuffer.size() < packetSize + 2) {
    311                 break;
    312             }
    313 
    314             sp<ABuffer> packet = new ABuffer(packetSize);
    315             memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
    316 
    317             sp<AMessage> notify = mNotify->dup();
    318             notify->setInt32("sessionID", mSessionID);
    319             notify->setInt32("reason", kWhatDatagram);
    320             notify->setBuffer("data", packet);
    321             notify->post();
    322 
    323             mInBuffer.erase(0, packetSize + 2);
    324         }
    325     } else {
    326         for (;;) {
    327             size_t length;
    328 
    329             if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
    330                 if (mInBuffer.size() < 4) {
    331                     break;
    332                 }
    333 
    334                 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
    335 
    336                 if (mInBuffer.size() < 4 + length) {
    337                     break;
    338                 }
    339 
    340                 sp<AMessage> notify = mNotify->dup();
    341                 notify->setInt32("sessionID", mSessionID);
    342                 notify->setInt32("reason", kWhatBinaryData);
    343                 notify->setInt32("channel", mInBuffer.c_str()[1]);
    344 
    345                 sp<ABuffer> data = new ABuffer(length);
    346                 memcpy(data->data(), mInBuffer.c_str() + 4, length);
    347 
    348                 int64_t nowUs = ALooper::GetNowUs();
    349                 data->meta()->setInt64("arrivalTimeUs", nowUs);
    350 
    351                 notify->setBuffer("data", data);
    352                 notify->post();
    353 
    354                 mInBuffer.erase(0, 4 + length);
    355                 continue;
    356             }
    357 
    358             sp<ParsedMessage> msg =
    359                 ParsedMessage::Parse(
    360                         mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
    361 
    362             if (msg == NULL) {
    363                 break;
    364             }
    365 
    366             sp<AMessage> notify = mNotify->dup();
    367             notify->setInt32("sessionID", mSessionID);
    368             notify->setInt32("reason", kWhatData);
    369             notify->setObject("data", msg);
    370             notify->post();
    371 
    372 #if 1
    373             // XXX The (old) dongle sends the wrong content length header on a
    374             // SET_PARAMETER request that signals a "wfd_idr_request".
    375             // (17 instead of 19).
    376             const char *content = msg->getContent();
    377             if (content
    378                     && !memcmp(content, "wfd_idr_request\r\n", 17)
    379                     && length >= 19
    380                     && mInBuffer.c_str()[length] == '\r'
    381                     && mInBuffer.c_str()[length + 1] == '\n') {
    382                 length += 2;
    383             }
    384 #endif
    385 
    386             mInBuffer.erase(0, length);
    387 
    388             if (err != OK) {
    389                 break;
    390             }
    391         }
    392     }
    393 
    394     if (err != OK) {
    395         notifyError(false /* send */, err, "Recv failed.");
    396         mSawReceiveFailure = true;
    397     }
    398 
    399     return err;
    400 }
    401 
    402 status_t ANetworkSession::Session::writeMore() {
    403     if (mState == DATAGRAM) {
    404         CHECK(!mOutDatagrams.empty());
    405 
    406         status_t err;
    407         do {
    408             const sp<ABuffer> &datagram = *mOutDatagrams.begin();
    409 
    410             int n;
    411             do {
    412                 n = send(mSocket, datagram->data(), datagram->size(), 0);
    413             } while (n < 0 && errno == EINTR);
    414 
    415             err = OK;
    416 
    417             if (n > 0) {
    418                 mOutDatagrams.erase(mOutDatagrams.begin());
    419             } else if (n < 0) {
    420                 err = -errno;
    421             } else if (n == 0) {
    422                 err = -ECONNRESET;
    423             }
    424         } while (err == OK && !mOutDatagrams.empty());
    425 
    426         if (err == -EAGAIN) {
    427             err = OK;
    428         }
    429 
    430         if (err != OK) {
    431             notifyError(true /* send */, err, "Send datagram failed.");
    432             mSawSendFailure = true;
    433         }
    434 
    435         return err;
    436     }
    437 
    438     if (mState == CONNECTING) {
    439         int err;
    440         socklen_t optionLen = sizeof(err);
    441         CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
    442         CHECK_EQ(optionLen, (socklen_t)sizeof(err));
    443 
    444         if (err != 0) {
    445             notifyError(kWhatError, -err, "Connection failed");
    446             mSawSendFailure = true;
    447 
    448             return -err;
    449         }
    450 
    451         mState = CONNECTED;
    452         notify(kWhatConnected);
    453 
    454         return OK;
    455     }
    456 
    457     CHECK_EQ(mState, CONNECTED);
    458     CHECK(!mOutBuffer.empty());
    459 
    460     ssize_t n;
    461     do {
    462         n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0);
    463     } while (n < 0 && errno == EINTR);
    464 
    465     status_t err = OK;
    466 
    467     if (n > 0) {
    468 #if 0
    469         ALOGI("out:");
    470         hexdump(mOutBuffer.c_str(), n);
    471 #endif
    472 
    473         mOutBuffer.erase(0, n);
    474     } else if (n < 0) {
    475         err = -errno;
    476     } else if (n == 0) {
    477         err = -ECONNRESET;
    478     }
    479 
    480     if (err != OK) {
    481         notifyError(true /* send */, err, "Send failed.");
    482         mSawSendFailure = true;
    483     }
    484 
    485     return err;
    486 }
    487 
    488 status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) {
    489     CHECK(mState == CONNECTED || mState == DATAGRAM);
    490 
    491     if (mState == DATAGRAM) {
    492         CHECK_GE(size, 0);
    493 
    494         sp<ABuffer> datagram = new ABuffer(size);
    495         memcpy(datagram->data(), data, size);
    496 
    497         mOutDatagrams.push_back(datagram);
    498         return OK;
    499     }
    500 
    501     if (mState == CONNECTED && !mIsRTSPConnection) {
    502         CHECK_LE(size, 65535);
    503 
    504         uint8_t prefix[2];
    505         prefix[0] = size >> 8;
    506         prefix[1] = size & 0xff;
    507 
    508         mOutBuffer.append((const char *)prefix, sizeof(prefix));
    509     }
    510 
    511     mOutBuffer.append(
    512             (const char *)data,
    513             (size >= 0) ? size : strlen((const char *)data));
    514 
    515     return OK;
    516 }
    517 
    518 void ANetworkSession::Session::notifyError(
    519         bool send, status_t err, const char *detail) {
    520     sp<AMessage> msg = mNotify->dup();
    521     msg->setInt32("sessionID", mSessionID);
    522     msg->setInt32("reason", kWhatError);
    523     msg->setInt32("send", send);
    524     msg->setInt32("err", err);
    525     msg->setString("detail", detail);
    526     msg->post();
    527 }
    528 
    529 void ANetworkSession::Session::notify(NotificationReason reason) {
    530     sp<AMessage> msg = mNotify->dup();
    531     msg->setInt32("sessionID", mSessionID);
    532     msg->setInt32("reason", reason);
    533     msg->post();
    534 }
    535 
    536 ////////////////////////////////////////////////////////////////////////////////
    537 
    538 ANetworkSession::ANetworkSession()
    539     : mNextSessionID(1) {
    540     mPipeFd[0] = mPipeFd[1] = -1;
    541 }
    542 
    543 ANetworkSession::~ANetworkSession() {
    544     stop();
    545 }
    546 
    547 status_t ANetworkSession::start() {
    548     if (mThread != NULL) {
    549         return INVALID_OPERATION;
    550     }
    551 
    552     int res = pipe(mPipeFd);
    553     if (res != 0) {
    554         mPipeFd[0] = mPipeFd[1] = -1;
    555         return -errno;
    556     }
    557 
    558     mThread = new NetworkThread(this);
    559 
    560     status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
    561 
    562     if (err != OK) {
    563         mThread.clear();
    564 
    565         close(mPipeFd[0]);
    566         close(mPipeFd[1]);
    567         mPipeFd[0] = mPipeFd[1] = -1;
    568 
    569         return err;
    570     }
    571 
    572     return OK;
    573 }
    574 
    575 status_t ANetworkSession::stop() {
    576     if (mThread == NULL) {
    577         return INVALID_OPERATION;
    578     }
    579 
    580     mThread->requestExit();
    581     interrupt();
    582     mThread->requestExitAndWait();
    583 
    584     mThread.clear();
    585 
    586     close(mPipeFd[0]);
    587     close(mPipeFd[1]);
    588     mPipeFd[0] = mPipeFd[1] = -1;
    589 
    590     return OK;
    591 }
    592 
    593 status_t ANetworkSession::createRTSPClient(
    594         const char *host, unsigned port, const sp<AMessage> &notify,
    595         int32_t *sessionID) {
    596     return createClientOrServer(
    597             kModeCreateRTSPClient,
    598             NULL /* addr */,
    599             0 /* port */,
    600             host,
    601             port,
    602             notify,
    603             sessionID);
    604 }
    605 
    606 status_t ANetworkSession::createRTSPServer(
    607         const struct in_addr &addr, unsigned port,
    608         const sp<AMessage> &notify, int32_t *sessionID) {
    609     return createClientOrServer(
    610             kModeCreateRTSPServer,
    611             &addr,
    612             port,
    613             NULL /* remoteHost */,
    614             0 /* remotePort */,
    615             notify,
    616             sessionID);
    617 }
    618 
    619 status_t ANetworkSession::createUDPSession(
    620         unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
    621     return createUDPSession(localPort, NULL, 0, notify, sessionID);
    622 }
    623 
    624 status_t ANetworkSession::createUDPSession(
    625         unsigned localPort,
    626         const char *remoteHost,
    627         unsigned remotePort,
    628         const sp<AMessage> &notify,
    629         int32_t *sessionID) {
    630     return createClientOrServer(
    631             kModeCreateUDPSession,
    632             NULL /* addr */,
    633             localPort,
    634             remoteHost,
    635             remotePort,
    636             notify,
    637             sessionID);
    638 }
    639 
    640 status_t ANetworkSession::createTCPDatagramSession(
    641         const struct in_addr &addr, unsigned port,
    642         const sp<AMessage> &notify, int32_t *sessionID) {
    643     return createClientOrServer(
    644             kModeCreateTCPDatagramSessionPassive,
    645             &addr,
    646             port,
    647             NULL /* remoteHost */,
    648             0 /* remotePort */,
    649             notify,
    650             sessionID);
    651 }
    652 
    653 status_t ANetworkSession::createTCPDatagramSession(
    654         unsigned localPort,
    655         const char *remoteHost,
    656         unsigned remotePort,
    657         const sp<AMessage> &notify,
    658         int32_t *sessionID) {
    659     return createClientOrServer(
    660             kModeCreateTCPDatagramSessionActive,
    661             NULL /* addr */,
    662             localPort,
    663             remoteHost,
    664             remotePort,
    665             notify,
    666             sessionID);
    667 }
    668 
    669 status_t ANetworkSession::destroySession(int32_t sessionID) {
    670     Mutex::Autolock autoLock(mLock);
    671 
    672     ssize_t index = mSessions.indexOfKey(sessionID);
    673 
    674     if (index < 0) {
    675         return -ENOENT;
    676     }
    677 
    678     mSessions.removeItemsAt(index);
    679 
    680     interrupt();
    681 
    682     return OK;
    683 }
    684 
    685 // static
    686 status_t ANetworkSession::MakeSocketNonBlocking(int s) {
    687     int flags = fcntl(s, F_GETFL, 0);
    688     if (flags < 0) {
    689         flags = 0;
    690     }
    691 
    692     int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
    693     if (res < 0) {
    694         return -errno;
    695     }
    696 
    697     return OK;
    698 }
    699 
    700 status_t ANetworkSession::createClientOrServer(
    701         Mode mode,
    702         const struct in_addr *localAddr,
    703         unsigned port,
    704         const char *remoteHost,
    705         unsigned remotePort,
    706         const sp<AMessage> &notify,
    707         int32_t *sessionID) {
    708     Mutex::Autolock autoLock(mLock);
    709 
    710     *sessionID = 0;
    711     status_t err = OK;
    712     int s, res;
    713     sp<Session> session;
    714 
    715     s = socket(
    716             AF_INET,
    717             (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
    718             0);
    719 
    720     if (s < 0) {
    721         err = -errno;
    722         goto bail;
    723     }
    724 
    725     if (mode == kModeCreateRTSPServer
    726             || mode == kModeCreateTCPDatagramSessionPassive) {
    727         const int yes = 1;
    728         res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    729 
    730         if (res < 0) {
    731             err = -errno;
    732             goto bail2;
    733         }
    734     }
    735 
    736     if (mode == kModeCreateUDPSession) {
    737         int size = 256 * 1024;
    738 
    739         res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
    740 
    741         if (res < 0) {
    742             err = -errno;
    743             goto bail2;
    744         }
    745 
    746         res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
    747 
    748         if (res < 0) {
    749             err = -errno;
    750             goto bail2;
    751         }
    752     }
    753 
    754     err = MakeSocketNonBlocking(s);
    755 
    756     if (err != OK) {
    757         goto bail2;
    758     }
    759 
    760     struct sockaddr_in addr;
    761     memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
    762     addr.sin_family = AF_INET;
    763 
    764     if (mode == kModeCreateRTSPClient
    765             || mode == kModeCreateTCPDatagramSessionActive) {
    766         struct hostent *ent= gethostbyname(remoteHost);
    767         if (ent == NULL) {
    768             err = -h_errno;
    769             goto bail2;
    770         }
    771 
    772         addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    773         addr.sin_port = htons(remotePort);
    774     } else if (localAddr != NULL) {
    775         addr.sin_addr = *localAddr;
    776         addr.sin_port = htons(port);
    777     } else {
    778         addr.sin_addr.s_addr = htonl(INADDR_ANY);
    779         addr.sin_port = htons(port);
    780     }
    781 
    782     if (mode == kModeCreateRTSPClient
    783             || mode == kModeCreateTCPDatagramSessionActive) {
    784         in_addr_t x = ntohl(addr.sin_addr.s_addr);
    785         ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
    786               s,
    787               (x >> 24),
    788               (x >> 16) & 0xff,
    789               (x >> 8) & 0xff,
    790               x & 0xff,
    791               ntohs(addr.sin_port));
    792 
    793         res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
    794 
    795         CHECK_LT(res, 0);
    796         if (errno == EINPROGRESS) {
    797             res = 0;
    798         }
    799     } else {
    800         res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
    801 
    802         if (res == 0) {
    803             if (mode == kModeCreateRTSPServer
    804                     || mode == kModeCreateTCPDatagramSessionPassive) {
    805                 res = listen(s, 4);
    806             } else {
    807                 CHECK_EQ(mode, kModeCreateUDPSession);
    808 
    809                 if (remoteHost != NULL) {
    810                     struct sockaddr_in remoteAddr;
    811                     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
    812                     remoteAddr.sin_family = AF_INET;
    813                     remoteAddr.sin_port = htons(remotePort);
    814 
    815                     struct hostent *ent= gethostbyname(remoteHost);
    816                     if (ent == NULL) {
    817                         err = -h_errno;
    818                         goto bail2;
    819                     }
    820 
    821                     remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    822 
    823                     res = connect(
    824                             s,
    825                             (const struct sockaddr *)&remoteAddr,
    826                             sizeof(remoteAddr));
    827                 }
    828             }
    829         }
    830     }
    831 
    832     if (res < 0) {
    833         err = -errno;
    834         goto bail2;
    835     }
    836 
    837     Session::State state;
    838     switch (mode) {
    839         case kModeCreateRTSPClient:
    840             state = Session::CONNECTING;
    841             break;
    842 
    843         case kModeCreateTCPDatagramSessionActive:
    844             state = Session::CONNECTING;
    845             break;
    846 
    847         case kModeCreateTCPDatagramSessionPassive:
    848             state = Session::LISTENING_TCP_DGRAMS;
    849             break;
    850 
    851         case kModeCreateRTSPServer:
    852             state = Session::LISTENING_RTSP;
    853             break;
    854 
    855         default:
    856             CHECK_EQ(mode, kModeCreateUDPSession);
    857             state = Session::DATAGRAM;
    858             break;
    859     }
    860 
    861     session = new Session(
    862             mNextSessionID++,
    863             state,
    864             s,
    865             notify);
    866 
    867     if (mode == kModeCreateTCPDatagramSessionActive) {
    868         session->setIsRTSPConnection(false);
    869     } else if (mode == kModeCreateRTSPClient) {
    870         session->setIsRTSPConnection(true);
    871     }
    872 
    873     mSessions.add(session->sessionID(), session);
    874 
    875     interrupt();
    876 
    877     *sessionID = session->sessionID();
    878 
    879     goto bail;
    880 
    881 bail2:
    882     close(s);
    883     s = -1;
    884 
    885 bail:
    886     return err;
    887 }
    888 
    889 status_t ANetworkSession::connectUDPSession(
    890         int32_t sessionID, const char *remoteHost, unsigned remotePort) {
    891     Mutex::Autolock autoLock(mLock);
    892 
    893     ssize_t index = mSessions.indexOfKey(sessionID);
    894 
    895     if (index < 0) {
    896         return -ENOENT;
    897     }
    898 
    899     const sp<Session> session = mSessions.valueAt(index);
    900     int s = session->socket();
    901 
    902     struct sockaddr_in remoteAddr;
    903     memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
    904     remoteAddr.sin_family = AF_INET;
    905     remoteAddr.sin_port = htons(remotePort);
    906 
    907     status_t err = OK;
    908     struct hostent *ent = gethostbyname(remoteHost);
    909     if (ent == NULL) {
    910         err = -h_errno;
    911     } else {
    912         remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
    913 
    914         int res = connect(
    915                 s,
    916                 (const struct sockaddr *)&remoteAddr,
    917                 sizeof(remoteAddr));
    918 
    919         if (res < 0) {
    920             err = -errno;
    921         }
    922     }
    923 
    924     return err;
    925 }
    926 
    927 status_t ANetworkSession::sendRequest(
    928         int32_t sessionID, const void *data, ssize_t size) {
    929     Mutex::Autolock autoLock(mLock);
    930 
    931     ssize_t index = mSessions.indexOfKey(sessionID);
    932 
    933     if (index < 0) {
    934         return -ENOENT;
    935     }
    936 
    937     const sp<Session> session = mSessions.valueAt(index);
    938 
    939     status_t err = session->sendRequest(data, size);
    940 
    941     interrupt();
    942 
    943     return err;
    944 }
    945 
    946 void ANetworkSession::interrupt() {
    947     static const char dummy = 0;
    948 
    949     ssize_t n;
    950     do {
    951         n = write(mPipeFd[1], &dummy, 1);
    952     } while (n < 0 && errno == EINTR);
    953 
    954     if (n < 0) {
    955         ALOGW("Error writing to pipe (%s)", strerror(errno));
    956     }
    957 }
    958 
    959 void ANetworkSession::threadLoop() {
    960     fd_set rs, ws;
    961     FD_ZERO(&rs);
    962     FD_ZERO(&ws);
    963 
    964     FD_SET(mPipeFd[0], &rs);
    965     int maxFd = mPipeFd[0];
    966 
    967     {
    968         Mutex::Autolock autoLock(mLock);
    969 
    970         for (size_t i = 0; i < mSessions.size(); ++i) {
    971             const sp<Session> &session = mSessions.valueAt(i);
    972 
    973             int s = session->socket();
    974 
    975             if (s < 0) {
    976                 continue;
    977             }
    978 
    979             if (session->wantsToRead()) {
    980                 FD_SET(s, &rs);
    981                 if (s > maxFd) {
    982                     maxFd = s;
    983                 }
    984             }
    985 
    986             if (session->wantsToWrite()) {
    987                 FD_SET(s, &ws);
    988                 if (s > maxFd) {
    989                     maxFd = s;
    990                 }
    991             }
    992         }
    993     }
    994 
    995     int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
    996 
    997     if (res == 0) {
    998         return;
    999     }
   1000 
   1001     if (res < 0) {
   1002         if (errno == EINTR) {
   1003             return;
   1004         }
   1005 
   1006         ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
   1007         return;
   1008     }
   1009 
   1010     if (FD_ISSET(mPipeFd[0], &rs)) {
   1011         char c;
   1012         ssize_t n;
   1013         do {
   1014             n = read(mPipeFd[0], &c, 1);
   1015         } while (n < 0 && errno == EINTR);
   1016 
   1017         if (n < 0) {
   1018             ALOGW("Error reading from pipe (%s)", strerror(errno));
   1019         }
   1020 
   1021         --res;
   1022     }
   1023 
   1024     {
   1025         Mutex::Autolock autoLock(mLock);
   1026 
   1027         List<sp<Session> > sessionsToAdd;
   1028 
   1029         for (size_t i = mSessions.size(); res > 0 && i-- > 0;) {
   1030             const sp<Session> &session = mSessions.valueAt(i);
   1031 
   1032             int s = session->socket();
   1033 
   1034             if (s < 0) {
   1035                 continue;
   1036             }
   1037 
   1038             if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
   1039                 --res;
   1040             }
   1041 
   1042             if (FD_ISSET(s, &rs)) {
   1043                 if (session->isRTSPServer() || session->isTCPDatagramServer()) {
   1044                     struct sockaddr_in remoteAddr;
   1045                     socklen_t remoteAddrLen = sizeof(remoteAddr);
   1046 
   1047                     int clientSocket = accept(
   1048                             s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
   1049 
   1050                     if (clientSocket >= 0) {
   1051                         status_t err = MakeSocketNonBlocking(clientSocket);
   1052 
   1053                         if (err != OK) {
   1054                             ALOGE("Unable to make client socket non blocking, "
   1055                                   "failed w/ error %d (%s)",
   1056                                   err, strerror(-err));
   1057 
   1058                             close(clientSocket);
   1059                             clientSocket = -1;
   1060                         } else {
   1061                             in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
   1062 
   1063                             ALOGI("incoming connection from %d.%d.%d.%d:%d "
   1064                                   "(socket %d)",
   1065                                   (addr >> 24),
   1066                                   (addr >> 16) & 0xff,
   1067                                   (addr >> 8) & 0xff,
   1068                                   addr & 0xff,
   1069                                   ntohs(remoteAddr.sin_port),
   1070                                   clientSocket);
   1071 
   1072                             sp<Session> clientSession =
   1073                                 // using socket sd as sessionID
   1074                                 new Session(
   1075                                         mNextSessionID++,
   1076                                         Session::CONNECTED,
   1077                                         clientSocket,
   1078                                         session->getNotificationMessage());
   1079 
   1080                             clientSession->setIsRTSPConnection(
   1081                                     session->isRTSPServer());
   1082 
   1083                             sessionsToAdd.push_back(clientSession);
   1084                         }
   1085                     } else {
   1086                         ALOGE("accept returned error %d (%s)",
   1087                               errno, strerror(errno));
   1088                     }
   1089                 } else {
   1090                     status_t err = session->readMore();
   1091                     if (err != OK) {
   1092                         ALOGE("readMore on socket %d failed w/ error %d (%s)",
   1093                               s, err, strerror(-err));
   1094                     }
   1095                 }
   1096             }
   1097 
   1098             if (FD_ISSET(s, &ws)) {
   1099                 status_t err = session->writeMore();
   1100                 if (err != OK) {
   1101                     ALOGE("writeMore on socket %d failed w/ error %d (%s)",
   1102                           s, err, strerror(-err));
   1103                 }
   1104             }
   1105         }
   1106 
   1107         while (!sessionsToAdd.empty()) {
   1108             sp<Session> session = *sessionsToAdd.begin();
   1109             sessionsToAdd.erase(sessionsToAdd.begin());
   1110 
   1111             mSessions.add(session->sessionID(), session);
   1112 
   1113             ALOGI("added clientSession %d", session->sessionID());
   1114         }
   1115     }
   1116 }
   1117 
   1118 }  // namespace android
   1119 
   1120