1 /* 2 * Copyright 2012, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "NetworkSession" 19 #include <utils/Log.h> 20 21 #include "ANetworkSession.h" 22 #include "ParsedMessage.h" 23 24 #include <arpa/inet.h> 25 #include <fcntl.h> 26 #include <net/if.h> 27 #include <netdb.h> 28 #include <netinet/in.h> 29 #include <sys/socket.h> 30 31 #include <media/stagefright/foundation/ABuffer.h> 32 #include <media/stagefright/foundation/ADebug.h> 33 #include <media/stagefright/foundation/AMessage.h> 34 #include <media/stagefright/foundation/hexdump.h> 35 #include <media/stagefright/Utils.h> 36 37 namespace android { 38 39 static const size_t kMaxUDPSize = 1500; 40 41 struct ANetworkSession::NetworkThread : public Thread { 42 NetworkThread(ANetworkSession *session); 43 44 protected: 45 virtual ~NetworkThread(); 46 47 private: 48 ANetworkSession *mSession; 49 50 virtual bool threadLoop(); 51 52 DISALLOW_EVIL_CONSTRUCTORS(NetworkThread); 53 }; 54 55 struct ANetworkSession::Session : public RefBase { 56 enum State { 57 CONNECTING, 58 CONNECTED, 59 LISTENING_RTSP, 60 LISTENING_TCP_DGRAMS, 61 DATAGRAM, 62 }; 63 64 Session(int32_t sessionID, 65 State state, 66 int s, 67 const sp<AMessage> ¬ify); 68 69 int32_t sessionID() const; 70 int socket() const; 71 sp<AMessage> getNotificationMessage() const; 72 73 bool isRTSPServer() const; 74 bool isTCPDatagramServer() const; 75 76 bool wantsToRead(); 77 bool wantsToWrite(); 78 79 status_t readMore(); 80 status_t writeMore(); 81 82 status_t sendRequest(const void *data, ssize_t size); 83 84 void setIsRTSPConnection(bool yesno); 85 86 protected: 87 virtual ~Session(); 88 89 private: 90 int32_t mSessionID; 91 State mState; 92 bool mIsRTSPConnection; 93 int mSocket; 94 sp<AMessage> mNotify; 95 bool mSawReceiveFailure, mSawSendFailure; 96 97 // for TCP / stream data 98 AString mOutBuffer; 99 100 // for UDP / datagrams 101 List<sp<ABuffer> > mOutDatagrams; 102 103 AString mInBuffer; 104 105 void notifyError(bool send, status_t err, const char *detail); 106 void notify(NotificationReason reason); 107 108 DISALLOW_EVIL_CONSTRUCTORS(Session); 109 }; 110 //////////////////////////////////////////////////////////////////////////////// 111 112 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session) 113 : mSession(session) { 114 } 115 116 ANetworkSession::NetworkThread::~NetworkThread() { 117 } 118 119 bool ANetworkSession::NetworkThread::threadLoop() { 120 mSession->threadLoop(); 121 122 return true; 123 } 124 125 //////////////////////////////////////////////////////////////////////////////// 126 127 ANetworkSession::Session::Session( 128 int32_t sessionID, 129 State state, 130 int s, 131 const sp<AMessage> ¬ify) 132 : mSessionID(sessionID), 133 mState(state), 134 mIsRTSPConnection(false), 135 mSocket(s), 136 mNotify(notify), 137 mSawReceiveFailure(false), 138 mSawSendFailure(false) { 139 if (mState == CONNECTED) { 140 struct sockaddr_in localAddr; 141 socklen_t localAddrLen = sizeof(localAddr); 142 143 int res = getsockname( 144 mSocket, (struct sockaddr *)&localAddr, &localAddrLen); 145 CHECK_GE(res, 0); 146 147 struct sockaddr_in remoteAddr; 148 socklen_t remoteAddrLen = sizeof(remoteAddr); 149 150 res = getpeername( 151 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 152 CHECK_GE(res, 0); 153 154 in_addr_t addr = ntohl(localAddr.sin_addr.s_addr); 155 AString localAddrString = StringPrintf( 156 "%d.%d.%d.%d", 157 (addr >> 24), 158 (addr >> 16) & 0xff, 159 (addr >> 8) & 0xff, 160 addr & 0xff); 161 162 addr = ntohl(remoteAddr.sin_addr.s_addr); 163 AString remoteAddrString = StringPrintf( 164 "%d.%d.%d.%d", 165 (addr >> 24), 166 (addr >> 16) & 0xff, 167 (addr >> 8) & 0xff, 168 addr & 0xff); 169 170 sp<AMessage> msg = mNotify->dup(); 171 msg->setInt32("sessionID", mSessionID); 172 msg->setInt32("reason", kWhatClientConnected); 173 msg->setString("server-ip", localAddrString.c_str()); 174 msg->setInt32("server-port", ntohs(localAddr.sin_port)); 175 msg->setString("client-ip", remoteAddrString.c_str()); 176 msg->setInt32("client-port", ntohs(remoteAddr.sin_port)); 177 msg->post(); 178 } 179 } 180 181 ANetworkSession::Session::~Session() { 182 ALOGV("Session %d gone", mSessionID); 183 184 close(mSocket); 185 mSocket = -1; 186 } 187 188 int32_t ANetworkSession::Session::sessionID() const { 189 return mSessionID; 190 } 191 192 int ANetworkSession::Session::socket() const { 193 return mSocket; 194 } 195 196 void ANetworkSession::Session::setIsRTSPConnection(bool yesno) { 197 mIsRTSPConnection = yesno; 198 } 199 200 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const { 201 return mNotify; 202 } 203 204 bool ANetworkSession::Session::isRTSPServer() const { 205 return mState == LISTENING_RTSP; 206 } 207 208 bool ANetworkSession::Session::isTCPDatagramServer() const { 209 return mState == LISTENING_TCP_DGRAMS; 210 } 211 212 bool ANetworkSession::Session::wantsToRead() { 213 return !mSawReceiveFailure && mState != CONNECTING; 214 } 215 216 bool ANetworkSession::Session::wantsToWrite() { 217 return !mSawSendFailure 218 && (mState == CONNECTING 219 || (mState == CONNECTED && !mOutBuffer.empty()) 220 || (mState == DATAGRAM && !mOutDatagrams.empty())); 221 } 222 223 status_t ANetworkSession::Session::readMore() { 224 if (mState == DATAGRAM) { 225 status_t err; 226 do { 227 sp<ABuffer> buf = new ABuffer(kMaxUDPSize); 228 229 struct sockaddr_in remoteAddr; 230 socklen_t remoteAddrLen = sizeof(remoteAddr); 231 232 ssize_t n; 233 do { 234 n = recvfrom( 235 mSocket, buf->data(), buf->capacity(), 0, 236 (struct sockaddr *)&remoteAddr, &remoteAddrLen); 237 } while (n < 0 && errno == EINTR); 238 239 err = OK; 240 if (n < 0) { 241 err = -errno; 242 } else if (n == 0) { 243 err = -ECONNRESET; 244 } else { 245 buf->setRange(0, n); 246 247 int64_t nowUs = ALooper::GetNowUs(); 248 buf->meta()->setInt64("arrivalTimeUs", nowUs); 249 250 sp<AMessage> notify = mNotify->dup(); 251 notify->setInt32("sessionID", mSessionID); 252 notify->setInt32("reason", kWhatDatagram); 253 254 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr); 255 notify->setString( 256 "fromAddr", 257 StringPrintf( 258 "%u.%u.%u.%u", 259 ip >> 24, 260 (ip >> 16) & 0xff, 261 (ip >> 8) & 0xff, 262 ip & 0xff).c_str()); 263 264 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port)); 265 266 notify->setBuffer("data", buf); 267 notify->post(); 268 } 269 } while (err == OK); 270 271 if (err == -EAGAIN) { 272 err = OK; 273 } 274 275 if (err != OK) { 276 notifyError(false /* send */, err, "Recvfrom failed."); 277 mSawReceiveFailure = true; 278 } 279 280 return err; 281 } 282 283 char tmp[512]; 284 ssize_t n; 285 do { 286 n = recv(mSocket, tmp, sizeof(tmp), 0); 287 } while (n < 0 && errno == EINTR); 288 289 status_t err = OK; 290 291 if (n > 0) { 292 mInBuffer.append(tmp, n); 293 294 #if 0 295 ALOGI("in:"); 296 hexdump(tmp, n); 297 #endif 298 } else if (n < 0) { 299 err = -errno; 300 } else { 301 err = -ECONNRESET; 302 } 303 304 if (!mIsRTSPConnection) { 305 // TCP stream carrying 16-bit length-prefixed datagrams. 306 307 while (mInBuffer.size() >= 2) { 308 size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str()); 309 310 if (mInBuffer.size() < packetSize + 2) { 311 break; 312 } 313 314 sp<ABuffer> packet = new ABuffer(packetSize); 315 memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize); 316 317 sp<AMessage> notify = mNotify->dup(); 318 notify->setInt32("sessionID", mSessionID); 319 notify->setInt32("reason", kWhatDatagram); 320 notify->setBuffer("data", packet); 321 notify->post(); 322 323 mInBuffer.erase(0, packetSize + 2); 324 } 325 } else { 326 for (;;) { 327 size_t length; 328 329 if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') { 330 if (mInBuffer.size() < 4) { 331 break; 332 } 333 334 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2); 335 336 if (mInBuffer.size() < 4 + length) { 337 break; 338 } 339 340 sp<AMessage> notify = mNotify->dup(); 341 notify->setInt32("sessionID", mSessionID); 342 notify->setInt32("reason", kWhatBinaryData); 343 notify->setInt32("channel", mInBuffer.c_str()[1]); 344 345 sp<ABuffer> data = new ABuffer(length); 346 memcpy(data->data(), mInBuffer.c_str() + 4, length); 347 348 int64_t nowUs = ALooper::GetNowUs(); 349 data->meta()->setInt64("arrivalTimeUs", nowUs); 350 351 notify->setBuffer("data", data); 352 notify->post(); 353 354 mInBuffer.erase(0, 4 + length); 355 continue; 356 } 357 358 sp<ParsedMessage> msg = 359 ParsedMessage::Parse( 360 mInBuffer.c_str(), mInBuffer.size(), err != OK, &length); 361 362 if (msg == NULL) { 363 break; 364 } 365 366 sp<AMessage> notify = mNotify->dup(); 367 notify->setInt32("sessionID", mSessionID); 368 notify->setInt32("reason", kWhatData); 369 notify->setObject("data", msg); 370 notify->post(); 371 372 #if 1 373 // XXX The (old) dongle sends the wrong content length header on a 374 // SET_PARAMETER request that signals a "wfd_idr_request". 375 // (17 instead of 19). 376 const char *content = msg->getContent(); 377 if (content 378 && !memcmp(content, "wfd_idr_request\r\n", 17) 379 && length >= 19 380 && mInBuffer.c_str()[length] == '\r' 381 && mInBuffer.c_str()[length + 1] == '\n') { 382 length += 2; 383 } 384 #endif 385 386 mInBuffer.erase(0, length); 387 388 if (err != OK) { 389 break; 390 } 391 } 392 } 393 394 if (err != OK) { 395 notifyError(false /* send */, err, "Recv failed."); 396 mSawReceiveFailure = true; 397 } 398 399 return err; 400 } 401 402 status_t ANetworkSession::Session::writeMore() { 403 if (mState == DATAGRAM) { 404 CHECK(!mOutDatagrams.empty()); 405 406 status_t err; 407 do { 408 const sp<ABuffer> &datagram = *mOutDatagrams.begin(); 409 410 int n; 411 do { 412 n = send(mSocket, datagram->data(), datagram->size(), 0); 413 } while (n < 0 && errno == EINTR); 414 415 err = OK; 416 417 if (n > 0) { 418 mOutDatagrams.erase(mOutDatagrams.begin()); 419 } else if (n < 0) { 420 err = -errno; 421 } else if (n == 0) { 422 err = -ECONNRESET; 423 } 424 } while (err == OK && !mOutDatagrams.empty()); 425 426 if (err == -EAGAIN) { 427 err = OK; 428 } 429 430 if (err != OK) { 431 notifyError(true /* send */, err, "Send datagram failed."); 432 mSawSendFailure = true; 433 } 434 435 return err; 436 } 437 438 if (mState == CONNECTING) { 439 int err; 440 socklen_t optionLen = sizeof(err); 441 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); 442 CHECK_EQ(optionLen, (socklen_t)sizeof(err)); 443 444 if (err != 0) { 445 notifyError(kWhatError, -err, "Connection failed"); 446 mSawSendFailure = true; 447 448 return -err; 449 } 450 451 mState = CONNECTED; 452 notify(kWhatConnected); 453 454 return OK; 455 } 456 457 CHECK_EQ(mState, CONNECTED); 458 CHECK(!mOutBuffer.empty()); 459 460 ssize_t n; 461 do { 462 n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0); 463 } while (n < 0 && errno == EINTR); 464 465 status_t err = OK; 466 467 if (n > 0) { 468 #if 0 469 ALOGI("out:"); 470 hexdump(mOutBuffer.c_str(), n); 471 #endif 472 473 mOutBuffer.erase(0, n); 474 } else if (n < 0) { 475 err = -errno; 476 } else if (n == 0) { 477 err = -ECONNRESET; 478 } 479 480 if (err != OK) { 481 notifyError(true /* send */, err, "Send failed."); 482 mSawSendFailure = true; 483 } 484 485 return err; 486 } 487 488 status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) { 489 CHECK(mState == CONNECTED || mState == DATAGRAM); 490 491 if (mState == DATAGRAM) { 492 CHECK_GE(size, 0); 493 494 sp<ABuffer> datagram = new ABuffer(size); 495 memcpy(datagram->data(), data, size); 496 497 mOutDatagrams.push_back(datagram); 498 return OK; 499 } 500 501 if (mState == CONNECTED && !mIsRTSPConnection) { 502 CHECK_LE(size, 65535); 503 504 uint8_t prefix[2]; 505 prefix[0] = size >> 8; 506 prefix[1] = size & 0xff; 507 508 mOutBuffer.append((const char *)prefix, sizeof(prefix)); 509 } 510 511 mOutBuffer.append( 512 (const char *)data, 513 (size >= 0) ? size : strlen((const char *)data)); 514 515 return OK; 516 } 517 518 void ANetworkSession::Session::notifyError( 519 bool send, status_t err, const char *detail) { 520 sp<AMessage> msg = mNotify->dup(); 521 msg->setInt32("sessionID", mSessionID); 522 msg->setInt32("reason", kWhatError); 523 msg->setInt32("send", send); 524 msg->setInt32("err", err); 525 msg->setString("detail", detail); 526 msg->post(); 527 } 528 529 void ANetworkSession::Session::notify(NotificationReason reason) { 530 sp<AMessage> msg = mNotify->dup(); 531 msg->setInt32("sessionID", mSessionID); 532 msg->setInt32("reason", reason); 533 msg->post(); 534 } 535 536 //////////////////////////////////////////////////////////////////////////////// 537 538 ANetworkSession::ANetworkSession() 539 : mNextSessionID(1) { 540 mPipeFd[0] = mPipeFd[1] = -1; 541 } 542 543 ANetworkSession::~ANetworkSession() { 544 stop(); 545 } 546 547 status_t ANetworkSession::start() { 548 if (mThread != NULL) { 549 return INVALID_OPERATION; 550 } 551 552 int res = pipe(mPipeFd); 553 if (res != 0) { 554 mPipeFd[0] = mPipeFd[1] = -1; 555 return -errno; 556 } 557 558 mThread = new NetworkThread(this); 559 560 status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO); 561 562 if (err != OK) { 563 mThread.clear(); 564 565 close(mPipeFd[0]); 566 close(mPipeFd[1]); 567 mPipeFd[0] = mPipeFd[1] = -1; 568 569 return err; 570 } 571 572 return OK; 573 } 574 575 status_t ANetworkSession::stop() { 576 if (mThread == NULL) { 577 return INVALID_OPERATION; 578 } 579 580 mThread->requestExit(); 581 interrupt(); 582 mThread->requestExitAndWait(); 583 584 mThread.clear(); 585 586 close(mPipeFd[0]); 587 close(mPipeFd[1]); 588 mPipeFd[0] = mPipeFd[1] = -1; 589 590 return OK; 591 } 592 593 status_t ANetworkSession::createRTSPClient( 594 const char *host, unsigned port, const sp<AMessage> ¬ify, 595 int32_t *sessionID) { 596 return createClientOrServer( 597 kModeCreateRTSPClient, 598 NULL /* addr */, 599 0 /* port */, 600 host, 601 port, 602 notify, 603 sessionID); 604 } 605 606 status_t ANetworkSession::createRTSPServer( 607 const struct in_addr &addr, unsigned port, 608 const sp<AMessage> ¬ify, int32_t *sessionID) { 609 return createClientOrServer( 610 kModeCreateRTSPServer, 611 &addr, 612 port, 613 NULL /* remoteHost */, 614 0 /* remotePort */, 615 notify, 616 sessionID); 617 } 618 619 status_t ANetworkSession::createUDPSession( 620 unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID) { 621 return createUDPSession(localPort, NULL, 0, notify, sessionID); 622 } 623 624 status_t ANetworkSession::createUDPSession( 625 unsigned localPort, 626 const char *remoteHost, 627 unsigned remotePort, 628 const sp<AMessage> ¬ify, 629 int32_t *sessionID) { 630 return createClientOrServer( 631 kModeCreateUDPSession, 632 NULL /* addr */, 633 localPort, 634 remoteHost, 635 remotePort, 636 notify, 637 sessionID); 638 } 639 640 status_t ANetworkSession::createTCPDatagramSession( 641 const struct in_addr &addr, unsigned port, 642 const sp<AMessage> ¬ify, int32_t *sessionID) { 643 return createClientOrServer( 644 kModeCreateTCPDatagramSessionPassive, 645 &addr, 646 port, 647 NULL /* remoteHost */, 648 0 /* remotePort */, 649 notify, 650 sessionID); 651 } 652 653 status_t ANetworkSession::createTCPDatagramSession( 654 unsigned localPort, 655 const char *remoteHost, 656 unsigned remotePort, 657 const sp<AMessage> ¬ify, 658 int32_t *sessionID) { 659 return createClientOrServer( 660 kModeCreateTCPDatagramSessionActive, 661 NULL /* addr */, 662 localPort, 663 remoteHost, 664 remotePort, 665 notify, 666 sessionID); 667 } 668 669 status_t ANetworkSession::destroySession(int32_t sessionID) { 670 Mutex::Autolock autoLock(mLock); 671 672 ssize_t index = mSessions.indexOfKey(sessionID); 673 674 if (index < 0) { 675 return -ENOENT; 676 } 677 678 mSessions.removeItemsAt(index); 679 680 interrupt(); 681 682 return OK; 683 } 684 685 // static 686 status_t ANetworkSession::MakeSocketNonBlocking(int s) { 687 int flags = fcntl(s, F_GETFL, 0); 688 if (flags < 0) { 689 flags = 0; 690 } 691 692 int res = fcntl(s, F_SETFL, flags | O_NONBLOCK); 693 if (res < 0) { 694 return -errno; 695 } 696 697 return OK; 698 } 699 700 status_t ANetworkSession::createClientOrServer( 701 Mode mode, 702 const struct in_addr *localAddr, 703 unsigned port, 704 const char *remoteHost, 705 unsigned remotePort, 706 const sp<AMessage> ¬ify, 707 int32_t *sessionID) { 708 Mutex::Autolock autoLock(mLock); 709 710 *sessionID = 0; 711 status_t err = OK; 712 int s, res; 713 sp<Session> session; 714 715 s = socket( 716 AF_INET, 717 (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM, 718 0); 719 720 if (s < 0) { 721 err = -errno; 722 goto bail; 723 } 724 725 if (mode == kModeCreateRTSPServer 726 || mode == kModeCreateTCPDatagramSessionPassive) { 727 const int yes = 1; 728 res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); 729 730 if (res < 0) { 731 err = -errno; 732 goto bail2; 733 } 734 } 735 736 if (mode == kModeCreateUDPSession) { 737 int size = 256 * 1024; 738 739 res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)); 740 741 if (res < 0) { 742 err = -errno; 743 goto bail2; 744 } 745 746 res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)); 747 748 if (res < 0) { 749 err = -errno; 750 goto bail2; 751 } 752 } 753 754 err = MakeSocketNonBlocking(s); 755 756 if (err != OK) { 757 goto bail2; 758 } 759 760 struct sockaddr_in addr; 761 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 762 addr.sin_family = AF_INET; 763 764 if (mode == kModeCreateRTSPClient 765 || mode == kModeCreateTCPDatagramSessionActive) { 766 struct hostent *ent= gethostbyname(remoteHost); 767 if (ent == NULL) { 768 err = -h_errno; 769 goto bail2; 770 } 771 772 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 773 addr.sin_port = htons(remotePort); 774 } else if (localAddr != NULL) { 775 addr.sin_addr = *localAddr; 776 addr.sin_port = htons(port); 777 } else { 778 addr.sin_addr.s_addr = htonl(INADDR_ANY); 779 addr.sin_port = htons(port); 780 } 781 782 if (mode == kModeCreateRTSPClient 783 || mode == kModeCreateTCPDatagramSessionActive) { 784 in_addr_t x = ntohl(addr.sin_addr.s_addr); 785 ALOGI("connecting socket %d to %d.%d.%d.%d:%d", 786 s, 787 (x >> 24), 788 (x >> 16) & 0xff, 789 (x >> 8) & 0xff, 790 x & 0xff, 791 ntohs(addr.sin_port)); 792 793 res = connect(s, (const struct sockaddr *)&addr, sizeof(addr)); 794 795 CHECK_LT(res, 0); 796 if (errno == EINPROGRESS) { 797 res = 0; 798 } 799 } else { 800 res = bind(s, (const struct sockaddr *)&addr, sizeof(addr)); 801 802 if (res == 0) { 803 if (mode == kModeCreateRTSPServer 804 || mode == kModeCreateTCPDatagramSessionPassive) { 805 res = listen(s, 4); 806 } else { 807 CHECK_EQ(mode, kModeCreateUDPSession); 808 809 if (remoteHost != NULL) { 810 struct sockaddr_in remoteAddr; 811 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 812 remoteAddr.sin_family = AF_INET; 813 remoteAddr.sin_port = htons(remotePort); 814 815 struct hostent *ent= gethostbyname(remoteHost); 816 if (ent == NULL) { 817 err = -h_errno; 818 goto bail2; 819 } 820 821 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 822 823 res = connect( 824 s, 825 (const struct sockaddr *)&remoteAddr, 826 sizeof(remoteAddr)); 827 } 828 } 829 } 830 } 831 832 if (res < 0) { 833 err = -errno; 834 goto bail2; 835 } 836 837 Session::State state; 838 switch (mode) { 839 case kModeCreateRTSPClient: 840 state = Session::CONNECTING; 841 break; 842 843 case kModeCreateTCPDatagramSessionActive: 844 state = Session::CONNECTING; 845 break; 846 847 case kModeCreateTCPDatagramSessionPassive: 848 state = Session::LISTENING_TCP_DGRAMS; 849 break; 850 851 case kModeCreateRTSPServer: 852 state = Session::LISTENING_RTSP; 853 break; 854 855 default: 856 CHECK_EQ(mode, kModeCreateUDPSession); 857 state = Session::DATAGRAM; 858 break; 859 } 860 861 session = new Session( 862 mNextSessionID++, 863 state, 864 s, 865 notify); 866 867 if (mode == kModeCreateTCPDatagramSessionActive) { 868 session->setIsRTSPConnection(false); 869 } else if (mode == kModeCreateRTSPClient) { 870 session->setIsRTSPConnection(true); 871 } 872 873 mSessions.add(session->sessionID(), session); 874 875 interrupt(); 876 877 *sessionID = session->sessionID(); 878 879 goto bail; 880 881 bail2: 882 close(s); 883 s = -1; 884 885 bail: 886 return err; 887 } 888 889 status_t ANetworkSession::connectUDPSession( 890 int32_t sessionID, const char *remoteHost, unsigned remotePort) { 891 Mutex::Autolock autoLock(mLock); 892 893 ssize_t index = mSessions.indexOfKey(sessionID); 894 895 if (index < 0) { 896 return -ENOENT; 897 } 898 899 const sp<Session> session = mSessions.valueAt(index); 900 int s = session->socket(); 901 902 struct sockaddr_in remoteAddr; 903 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 904 remoteAddr.sin_family = AF_INET; 905 remoteAddr.sin_port = htons(remotePort); 906 907 status_t err = OK; 908 struct hostent *ent = gethostbyname(remoteHost); 909 if (ent == NULL) { 910 err = -h_errno; 911 } else { 912 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 913 914 int res = connect( 915 s, 916 (const struct sockaddr *)&remoteAddr, 917 sizeof(remoteAddr)); 918 919 if (res < 0) { 920 err = -errno; 921 } 922 } 923 924 return err; 925 } 926 927 status_t ANetworkSession::sendRequest( 928 int32_t sessionID, const void *data, ssize_t size) { 929 Mutex::Autolock autoLock(mLock); 930 931 ssize_t index = mSessions.indexOfKey(sessionID); 932 933 if (index < 0) { 934 return -ENOENT; 935 } 936 937 const sp<Session> session = mSessions.valueAt(index); 938 939 status_t err = session->sendRequest(data, size); 940 941 interrupt(); 942 943 return err; 944 } 945 946 void ANetworkSession::interrupt() { 947 static const char dummy = 0; 948 949 ssize_t n; 950 do { 951 n = write(mPipeFd[1], &dummy, 1); 952 } while (n < 0 && errno == EINTR); 953 954 if (n < 0) { 955 ALOGW("Error writing to pipe (%s)", strerror(errno)); 956 } 957 } 958 959 void ANetworkSession::threadLoop() { 960 fd_set rs, ws; 961 FD_ZERO(&rs); 962 FD_ZERO(&ws); 963 964 FD_SET(mPipeFd[0], &rs); 965 int maxFd = mPipeFd[0]; 966 967 { 968 Mutex::Autolock autoLock(mLock); 969 970 for (size_t i = 0; i < mSessions.size(); ++i) { 971 const sp<Session> &session = mSessions.valueAt(i); 972 973 int s = session->socket(); 974 975 if (s < 0) { 976 continue; 977 } 978 979 if (session->wantsToRead()) { 980 FD_SET(s, &rs); 981 if (s > maxFd) { 982 maxFd = s; 983 } 984 } 985 986 if (session->wantsToWrite()) { 987 FD_SET(s, &ws); 988 if (s > maxFd) { 989 maxFd = s; 990 } 991 } 992 } 993 } 994 995 int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */); 996 997 if (res == 0) { 998 return; 999 } 1000 1001 if (res < 0) { 1002 if (errno == EINTR) { 1003 return; 1004 } 1005 1006 ALOGE("select failed w/ error %d (%s)", errno, strerror(errno)); 1007 return; 1008 } 1009 1010 if (FD_ISSET(mPipeFd[0], &rs)) { 1011 char c; 1012 ssize_t n; 1013 do { 1014 n = read(mPipeFd[0], &c, 1); 1015 } while (n < 0 && errno == EINTR); 1016 1017 if (n < 0) { 1018 ALOGW("Error reading from pipe (%s)", strerror(errno)); 1019 } 1020 1021 --res; 1022 } 1023 1024 { 1025 Mutex::Autolock autoLock(mLock); 1026 1027 List<sp<Session> > sessionsToAdd; 1028 1029 for (size_t i = mSessions.size(); res > 0 && i-- > 0;) { 1030 const sp<Session> &session = mSessions.valueAt(i); 1031 1032 int s = session->socket(); 1033 1034 if (s < 0) { 1035 continue; 1036 } 1037 1038 if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) { 1039 --res; 1040 } 1041 1042 if (FD_ISSET(s, &rs)) { 1043 if (session->isRTSPServer() || session->isTCPDatagramServer()) { 1044 struct sockaddr_in remoteAddr; 1045 socklen_t remoteAddrLen = sizeof(remoteAddr); 1046 1047 int clientSocket = accept( 1048 s, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 1049 1050 if (clientSocket >= 0) { 1051 status_t err = MakeSocketNonBlocking(clientSocket); 1052 1053 if (err != OK) { 1054 ALOGE("Unable to make client socket non blocking, " 1055 "failed w/ error %d (%s)", 1056 err, strerror(-err)); 1057 1058 close(clientSocket); 1059 clientSocket = -1; 1060 } else { 1061 in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr); 1062 1063 ALOGI("incoming connection from %d.%d.%d.%d:%d " 1064 "(socket %d)", 1065 (addr >> 24), 1066 (addr >> 16) & 0xff, 1067 (addr >> 8) & 0xff, 1068 addr & 0xff, 1069 ntohs(remoteAddr.sin_port), 1070 clientSocket); 1071 1072 sp<Session> clientSession = 1073 // using socket sd as sessionID 1074 new Session( 1075 mNextSessionID++, 1076 Session::CONNECTED, 1077 clientSocket, 1078 session->getNotificationMessage()); 1079 1080 clientSession->setIsRTSPConnection( 1081 session->isRTSPServer()); 1082 1083 sessionsToAdd.push_back(clientSession); 1084 } 1085 } else { 1086 ALOGE("accept returned error %d (%s)", 1087 errno, strerror(errno)); 1088 } 1089 } else { 1090 status_t err = session->readMore(); 1091 if (err != OK) { 1092 ALOGE("readMore on socket %d failed w/ error %d (%s)", 1093 s, err, strerror(-err)); 1094 } 1095 } 1096 } 1097 1098 if (FD_ISSET(s, &ws)) { 1099 status_t err = session->writeMore(); 1100 if (err != OK) { 1101 ALOGE("writeMore on socket %d failed w/ error %d (%s)", 1102 s, err, strerror(-err)); 1103 } 1104 } 1105 } 1106 1107 while (!sessionsToAdd.empty()) { 1108 sp<Session> session = *sessionsToAdd.begin(); 1109 sessionsToAdd.erase(sessionsToAdd.begin()); 1110 1111 mSessions.add(session->sessionID(), session); 1112 1113 ALOGI("added clientSession %d", session->sessionID()); 1114 } 1115 } 1116 } 1117 1118 } // namespace android 1119 1120