1 /* 2 * Copyright 2012, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "NetworkSession" 19 #include <utils/Log.h> 20 21 #include "ANetworkSession.h" 22 #include "ParsedMessage.h" 23 24 #include <arpa/inet.h> 25 #include <fcntl.h> 26 #include <linux/tcp.h> 27 #include <net/if.h> 28 #include <netdb.h> 29 #include <netinet/in.h> 30 #include <sys/ioctl.h> 31 #include <sys/socket.h> 32 33 #include <media/stagefright/foundation/ABuffer.h> 34 #include <media/stagefright/foundation/ADebug.h> 35 #include <media/stagefright/foundation/AMessage.h> 36 #include <media/stagefright/foundation/hexdump.h> 37 #include <media/stagefright/Utils.h> 38 39 namespace android { 40 41 static const size_t kMaxUDPSize = 1500; 42 static const int32_t kMaxUDPRetries = 200; 43 44 struct ANetworkSession::NetworkThread : public Thread { 45 NetworkThread(ANetworkSession *session); 46 47 protected: 48 virtual ~NetworkThread(); 49 50 private: 51 ANetworkSession *mSession; 52 53 virtual bool threadLoop(); 54 55 DISALLOW_EVIL_CONSTRUCTORS(NetworkThread); 56 }; 57 58 struct ANetworkSession::Session : public RefBase { 59 enum State { 60 CONNECTING, 61 CONNECTED, 62 LISTENING_RTSP, 63 LISTENING_TCP_DGRAMS, 64 DATAGRAM, 65 }; 66 67 Session(int32_t sessionID, 68 State state, 69 int s, 70 const sp<AMessage> ¬ify); 71 72 int32_t sessionID() const; 73 int socket() const; 74 sp<AMessage> getNotificationMessage() const; 75 76 bool isRTSPServer() const; 77 bool isTCPDatagramServer() const; 78 79 bool wantsToRead(); 80 bool wantsToWrite(); 81 82 status_t readMore(); 83 status_t writeMore(); 84 85 status_t sendRequest( 86 const void *data, ssize_t size, bool timeValid, int64_t timeUs); 87 88 void setIsRTSPConnection(bool yesno); 89 90 protected: 91 virtual ~Session(); 92 93 private: 94 enum { 95 FRAGMENT_FLAG_TIME_VALID = 1, 96 }; 97 struct Fragment { 98 uint32_t mFlags; 99 int64_t mTimeUs; 100 sp<ABuffer> mBuffer; 101 }; 102 103 int32_t mSessionID; 104 State mState; 105 bool mIsRTSPConnection; 106 int mSocket; 107 sp<AMessage> mNotify; 108 bool mSawReceiveFailure, mSawSendFailure; 109 int32_t mUDPRetries; 110 111 List<Fragment> mOutFragments; 112 113 AString mInBuffer; 114 115 int64_t mLastStallReportUs; 116 117 void notifyError(bool send, status_t err, const char *detail); 118 void notify(NotificationReason reason); 119 120 void dumpFragmentStats(const Fragment &frag); 121 122 DISALLOW_EVIL_CONSTRUCTORS(Session); 123 }; 124 //////////////////////////////////////////////////////////////////////////////// 125 126 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session) 127 : mSession(session) { 128 } 129 130 ANetworkSession::NetworkThread::~NetworkThread() { 131 } 132 133 bool ANetworkSession::NetworkThread::threadLoop() { 134 mSession->threadLoop(); 135 136 return true; 137 } 138 139 //////////////////////////////////////////////////////////////////////////////// 140 141 ANetworkSession::Session::Session( 142 int32_t sessionID, 143 State state, 144 int s, 145 const sp<AMessage> ¬ify) 146 : mSessionID(sessionID), 147 mState(state), 148 mIsRTSPConnection(false), 149 mSocket(s), 150 mNotify(notify), 151 mSawReceiveFailure(false), 152 mSawSendFailure(false), 153 mUDPRetries(kMaxUDPRetries), 154 mLastStallReportUs(-1ll) { 155 if (mState == CONNECTED) { 156 struct sockaddr_in localAddr; 157 socklen_t localAddrLen = sizeof(localAddr); 158 159 int res = getsockname( 160 mSocket, (struct sockaddr *)&localAddr, &localAddrLen); 161 CHECK_GE(res, 0); 162 163 struct sockaddr_in remoteAddr; 164 socklen_t remoteAddrLen = sizeof(remoteAddr); 165 166 res = getpeername( 167 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 168 CHECK_GE(res, 0); 169 170 in_addr_t addr = ntohl(localAddr.sin_addr.s_addr); 171 AString localAddrString = StringPrintf( 172 "%d.%d.%d.%d", 173 (addr >> 24), 174 (addr >> 16) & 0xff, 175 (addr >> 8) & 0xff, 176 addr & 0xff); 177 178 addr = ntohl(remoteAddr.sin_addr.s_addr); 179 AString remoteAddrString = StringPrintf( 180 "%d.%d.%d.%d", 181 (addr >> 24), 182 (addr >> 16) & 0xff, 183 (addr >> 8) & 0xff, 184 addr & 0xff); 185 186 sp<AMessage> msg = mNotify->dup(); 187 msg->setInt32("sessionID", mSessionID); 188 msg->setInt32("reason", kWhatClientConnected); 189 msg->setString("server-ip", localAddrString.c_str()); 190 msg->setInt32("server-port", ntohs(localAddr.sin_port)); 191 msg->setString("client-ip", remoteAddrString.c_str()); 192 msg->setInt32("client-port", ntohs(remoteAddr.sin_port)); 193 msg->post(); 194 } 195 } 196 197 ANetworkSession::Session::~Session() { 198 ALOGV("Session %d gone", mSessionID); 199 200 close(mSocket); 201 mSocket = -1; 202 } 203 204 int32_t ANetworkSession::Session::sessionID() const { 205 return mSessionID; 206 } 207 208 int ANetworkSession::Session::socket() const { 209 return mSocket; 210 } 211 212 void ANetworkSession::Session::setIsRTSPConnection(bool yesno) { 213 mIsRTSPConnection = yesno; 214 } 215 216 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const { 217 return mNotify; 218 } 219 220 bool ANetworkSession::Session::isRTSPServer() const { 221 return mState == LISTENING_RTSP; 222 } 223 224 bool ANetworkSession::Session::isTCPDatagramServer() const { 225 return mState == LISTENING_TCP_DGRAMS; 226 } 227 228 bool ANetworkSession::Session::wantsToRead() { 229 return !mSawReceiveFailure && mState != CONNECTING; 230 } 231 232 bool ANetworkSession::Session::wantsToWrite() { 233 return !mSawSendFailure 234 && (mState == CONNECTING 235 || (mState == CONNECTED && !mOutFragments.empty()) 236 || (mState == DATAGRAM && !mOutFragments.empty())); 237 } 238 239 status_t ANetworkSession::Session::readMore() { 240 if (mState == DATAGRAM) { 241 status_t err; 242 do { 243 sp<ABuffer> buf = new ABuffer(kMaxUDPSize); 244 245 struct sockaddr_in remoteAddr; 246 socklen_t remoteAddrLen = sizeof(remoteAddr); 247 248 ssize_t n; 249 do { 250 n = recvfrom( 251 mSocket, buf->data(), buf->capacity(), 0, 252 (struct sockaddr *)&remoteAddr, &remoteAddrLen); 253 } while (n < 0 && errno == EINTR); 254 255 err = OK; 256 if (n < 0) { 257 err = -errno; 258 } else if (n == 0) { 259 err = -ECONNRESET; 260 } else { 261 buf->setRange(0, n); 262 263 int64_t nowUs = ALooper::GetNowUs(); 264 buf->meta()->setInt64("arrivalTimeUs", nowUs); 265 266 sp<AMessage> notify = mNotify->dup(); 267 notify->setInt32("sessionID", mSessionID); 268 notify->setInt32("reason", kWhatDatagram); 269 270 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr); 271 notify->setString( 272 "fromAddr", 273 StringPrintf( 274 "%u.%u.%u.%u", 275 ip >> 24, 276 (ip >> 16) & 0xff, 277 (ip >> 8) & 0xff, 278 ip & 0xff).c_str()); 279 280 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port)); 281 282 notify->setBuffer("data", buf); 283 notify->post(); 284 } 285 } while (err == OK); 286 287 if (err == -EAGAIN) { 288 err = OK; 289 } 290 291 if (err != OK) { 292 if (!mUDPRetries) { 293 notifyError(false /* send */, err, "Recvfrom failed."); 294 mSawReceiveFailure = true; 295 } else { 296 mUDPRetries--; 297 ALOGE("Recvfrom failed, %d/%d retries left", 298 mUDPRetries, kMaxUDPRetries); 299 err = OK; 300 } 301 } else { 302 mUDPRetries = kMaxUDPRetries; 303 } 304 305 return err; 306 } 307 308 char tmp[512]; 309 ssize_t n; 310 do { 311 n = recv(mSocket, tmp, sizeof(tmp), 0); 312 } while (n < 0 && errno == EINTR); 313 314 status_t err = OK; 315 316 if (n > 0) { 317 mInBuffer.append(tmp, n); 318 319 #if 0 320 ALOGI("in:"); 321 hexdump(tmp, n); 322 #endif 323 } else if (n < 0) { 324 err = -errno; 325 } else { 326 err = -ECONNRESET; 327 } 328 329 if (!mIsRTSPConnection) { 330 // TCP stream carrying 16-bit length-prefixed datagrams. 331 332 while (mInBuffer.size() >= 2) { 333 size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str()); 334 335 if (mInBuffer.size() < packetSize + 2) { 336 break; 337 } 338 339 sp<ABuffer> packet = new ABuffer(packetSize); 340 memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize); 341 342 int64_t nowUs = ALooper::GetNowUs(); 343 packet->meta()->setInt64("arrivalTimeUs", nowUs); 344 345 sp<AMessage> notify = mNotify->dup(); 346 notify->setInt32("sessionID", mSessionID); 347 notify->setInt32("reason", kWhatDatagram); 348 notify->setBuffer("data", packet); 349 notify->post(); 350 351 mInBuffer.erase(0, packetSize + 2); 352 } 353 } else { 354 for (;;) { 355 size_t length; 356 357 if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') { 358 if (mInBuffer.size() < 4) { 359 break; 360 } 361 362 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2); 363 364 if (mInBuffer.size() < 4 + length) { 365 break; 366 } 367 368 sp<AMessage> notify = mNotify->dup(); 369 notify->setInt32("sessionID", mSessionID); 370 notify->setInt32("reason", kWhatBinaryData); 371 notify->setInt32("channel", mInBuffer.c_str()[1]); 372 373 sp<ABuffer> data = new ABuffer(length); 374 memcpy(data->data(), mInBuffer.c_str() + 4, length); 375 376 int64_t nowUs = ALooper::GetNowUs(); 377 data->meta()->setInt64("arrivalTimeUs", nowUs); 378 379 notify->setBuffer("data", data); 380 notify->post(); 381 382 mInBuffer.erase(0, 4 + length); 383 continue; 384 } 385 386 sp<ParsedMessage> msg = 387 ParsedMessage::Parse( 388 mInBuffer.c_str(), mInBuffer.size(), err != OK, &length); 389 390 if (msg == NULL) { 391 break; 392 } 393 394 sp<AMessage> notify = mNotify->dup(); 395 notify->setInt32("sessionID", mSessionID); 396 notify->setInt32("reason", kWhatData); 397 notify->setObject("data", msg); 398 notify->post(); 399 400 #if 1 401 // XXX The (old) dongle sends the wrong content length header on a 402 // SET_PARAMETER request that signals a "wfd_idr_request". 403 // (17 instead of 19). 404 const char *content = msg->getContent(); 405 if (content 406 && !memcmp(content, "wfd_idr_request\r\n", 17) 407 && length >= 19 408 && mInBuffer.c_str()[length] == '\r' 409 && mInBuffer.c_str()[length + 1] == '\n') { 410 length += 2; 411 } 412 #endif 413 414 mInBuffer.erase(0, length); 415 416 if (err != OK) { 417 break; 418 } 419 } 420 } 421 422 if (err != OK) { 423 notifyError(false /* send */, err, "Recv failed."); 424 mSawReceiveFailure = true; 425 } 426 427 return err; 428 } 429 430 void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) { 431 #if 0 432 int64_t nowUs = ALooper::GetNowUs(); 433 int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll; 434 435 static const int64_t kMinDelayMs = 0; 436 static const int64_t kMaxDelayMs = 300; 437 438 const char *kPattern = "########################################"; 439 size_t kPatternSize = strlen(kPattern); 440 441 int n = (kPatternSize * (delayMs - kMinDelayMs)) 442 / (kMaxDelayMs - kMinDelayMs); 443 444 if (n < 0) { 445 n = 0; 446 } else if ((size_t)n > kPatternSize) { 447 n = kPatternSize; 448 } 449 450 ALOGI("[%lld]: (%4lld ms) %s\n", 451 frag.mTimeUs / 1000, 452 delayMs, 453 kPattern + kPatternSize - n); 454 #endif 455 } 456 457 status_t ANetworkSession::Session::writeMore() { 458 if (mState == DATAGRAM) { 459 CHECK(!mOutFragments.empty()); 460 461 status_t err; 462 do { 463 const Fragment &frag = *mOutFragments.begin(); 464 const sp<ABuffer> &datagram = frag.mBuffer; 465 466 int n; 467 do { 468 n = send(mSocket, datagram->data(), datagram->size(), 0); 469 } while (n < 0 && errno == EINTR); 470 471 err = OK; 472 473 if (n > 0) { 474 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { 475 dumpFragmentStats(frag); 476 } 477 478 mOutFragments.erase(mOutFragments.begin()); 479 } else if (n < 0) { 480 err = -errno; 481 } else if (n == 0) { 482 err = -ECONNRESET; 483 } 484 } while (err == OK && !mOutFragments.empty()); 485 486 if (err == -EAGAIN) { 487 if (!mOutFragments.empty()) { 488 ALOGI("%d datagrams remain queued.", mOutFragments.size()); 489 } 490 err = OK; 491 } 492 493 if (err != OK) { 494 if (!mUDPRetries) { 495 notifyError(true /* send */, err, "Send datagram failed."); 496 mSawSendFailure = true; 497 } else { 498 mUDPRetries--; 499 ALOGE("Send datagram failed, %d/%d retries left", 500 mUDPRetries, kMaxUDPRetries); 501 err = OK; 502 } 503 } else { 504 mUDPRetries = kMaxUDPRetries; 505 } 506 507 return err; 508 } 509 510 if (mState == CONNECTING) { 511 int err; 512 socklen_t optionLen = sizeof(err); 513 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); 514 CHECK_EQ(optionLen, (socklen_t)sizeof(err)); 515 516 if (err != 0) { 517 notifyError(kWhatError, -err, "Connection failed"); 518 mSawSendFailure = true; 519 520 return -err; 521 } 522 523 mState = CONNECTED; 524 notify(kWhatConnected); 525 526 return OK; 527 } 528 529 CHECK_EQ(mState, CONNECTED); 530 CHECK(!mOutFragments.empty()); 531 532 ssize_t n; 533 while (!mOutFragments.empty()) { 534 const Fragment &frag = *mOutFragments.begin(); 535 536 do { 537 n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0); 538 } while (n < 0 && errno == EINTR); 539 540 if (n <= 0) { 541 break; 542 } 543 544 frag.mBuffer->setRange( 545 frag.mBuffer->offset() + n, frag.mBuffer->size() - n); 546 547 if (frag.mBuffer->size() > 0) { 548 break; 549 } 550 551 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { 552 dumpFragmentStats(frag); 553 } 554 555 mOutFragments.erase(mOutFragments.begin()); 556 } 557 558 status_t err = OK; 559 560 if (n < 0) { 561 err = -errno; 562 } else if (n == 0) { 563 err = -ECONNRESET; 564 } 565 566 if (err != OK) { 567 notifyError(true /* send */, err, "Send failed."); 568 mSawSendFailure = true; 569 } 570 571 #if 0 572 int numBytesQueued; 573 int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued); 574 if (res == 0 && numBytesQueued > 50 * 1024) { 575 if (numBytesQueued > 409600) { 576 ALOGW("!!! numBytesQueued = %d", numBytesQueued); 577 } 578 579 int64_t nowUs = ALooper::GetNowUs(); 580 581 if (mLastStallReportUs < 0ll 582 || nowUs > mLastStallReportUs + 100000ll) { 583 sp<AMessage> msg = mNotify->dup(); 584 msg->setInt32("sessionID", mSessionID); 585 msg->setInt32("reason", kWhatNetworkStall); 586 msg->setSize("numBytesQueued", numBytesQueued); 587 msg->post(); 588 589 mLastStallReportUs = nowUs; 590 } 591 } 592 #endif 593 594 return err; 595 } 596 597 status_t ANetworkSession::Session::sendRequest( 598 const void *data, ssize_t size, bool timeValid, int64_t timeUs) { 599 CHECK(mState == CONNECTED || mState == DATAGRAM); 600 601 if (size < 0) { 602 size = strlen((const char *)data); 603 } 604 605 if (size == 0) { 606 return OK; 607 } 608 609 sp<ABuffer> buffer; 610 611 if (mState == CONNECTED && !mIsRTSPConnection) { 612 CHECK_LE(size, 65535); 613 614 buffer = new ABuffer(size + 2); 615 buffer->data()[0] = size >> 8; 616 buffer->data()[1] = size & 0xff; 617 memcpy(buffer->data() + 2, data, size); 618 } else { 619 buffer = new ABuffer(size); 620 memcpy(buffer->data(), data, size); 621 } 622 623 Fragment frag; 624 625 frag.mFlags = 0; 626 if (timeValid) { 627 frag.mFlags = FRAGMENT_FLAG_TIME_VALID; 628 frag.mTimeUs = timeUs; 629 } 630 631 frag.mBuffer = buffer; 632 633 mOutFragments.push_back(frag); 634 635 return OK; 636 } 637 638 void ANetworkSession::Session::notifyError( 639 bool send, status_t err, const char *detail) { 640 sp<AMessage> msg = mNotify->dup(); 641 msg->setInt32("sessionID", mSessionID); 642 msg->setInt32("reason", kWhatError); 643 msg->setInt32("send", send); 644 msg->setInt32("err", err); 645 msg->setString("detail", detail); 646 msg->post(); 647 } 648 649 void ANetworkSession::Session::notify(NotificationReason reason) { 650 sp<AMessage> msg = mNotify->dup(); 651 msg->setInt32("sessionID", mSessionID); 652 msg->setInt32("reason", reason); 653 msg->post(); 654 } 655 656 //////////////////////////////////////////////////////////////////////////////// 657 658 ANetworkSession::ANetworkSession() 659 : mNextSessionID(1) { 660 mPipeFd[0] = mPipeFd[1] = -1; 661 } 662 663 ANetworkSession::~ANetworkSession() { 664 stop(); 665 } 666 667 status_t ANetworkSession::start() { 668 if (mThread != NULL) { 669 return INVALID_OPERATION; 670 } 671 672 int res = pipe(mPipeFd); 673 if (res != 0) { 674 mPipeFd[0] = mPipeFd[1] = -1; 675 return -errno; 676 } 677 678 mThread = new NetworkThread(this); 679 680 status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO); 681 682 if (err != OK) { 683 mThread.clear(); 684 685 close(mPipeFd[0]); 686 close(mPipeFd[1]); 687 mPipeFd[0] = mPipeFd[1] = -1; 688 689 return err; 690 } 691 692 return OK; 693 } 694 695 status_t ANetworkSession::stop() { 696 if (mThread == NULL) { 697 return INVALID_OPERATION; 698 } 699 700 mThread->requestExit(); 701 interrupt(); 702 mThread->requestExitAndWait(); 703 704 mThread.clear(); 705 706 close(mPipeFd[0]); 707 close(mPipeFd[1]); 708 mPipeFd[0] = mPipeFd[1] = -1; 709 710 return OK; 711 } 712 713 status_t ANetworkSession::createRTSPClient( 714 const char *host, unsigned port, const sp<AMessage> ¬ify, 715 int32_t *sessionID) { 716 return createClientOrServer( 717 kModeCreateRTSPClient, 718 NULL /* addr */, 719 0 /* port */, 720 host, 721 port, 722 notify, 723 sessionID); 724 } 725 726 status_t ANetworkSession::createRTSPServer( 727 const struct in_addr &addr, unsigned port, 728 const sp<AMessage> ¬ify, int32_t *sessionID) { 729 return createClientOrServer( 730 kModeCreateRTSPServer, 731 &addr, 732 port, 733 NULL /* remoteHost */, 734 0 /* remotePort */, 735 notify, 736 sessionID); 737 } 738 739 status_t ANetworkSession::createUDPSession( 740 unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID) { 741 return createUDPSession(localPort, NULL, 0, notify, sessionID); 742 } 743 744 status_t ANetworkSession::createUDPSession( 745 unsigned localPort, 746 const char *remoteHost, 747 unsigned remotePort, 748 const sp<AMessage> ¬ify, 749 int32_t *sessionID) { 750 return createClientOrServer( 751 kModeCreateUDPSession, 752 NULL /* addr */, 753 localPort, 754 remoteHost, 755 remotePort, 756 notify, 757 sessionID); 758 } 759 760 status_t ANetworkSession::createTCPDatagramSession( 761 const struct in_addr &addr, unsigned port, 762 const sp<AMessage> ¬ify, int32_t *sessionID) { 763 return createClientOrServer( 764 kModeCreateTCPDatagramSessionPassive, 765 &addr, 766 port, 767 NULL /* remoteHost */, 768 0 /* remotePort */, 769 notify, 770 sessionID); 771 } 772 773 status_t ANetworkSession::createTCPDatagramSession( 774 unsigned localPort, 775 const char *remoteHost, 776 unsigned remotePort, 777 const sp<AMessage> ¬ify, 778 int32_t *sessionID) { 779 return createClientOrServer( 780 kModeCreateTCPDatagramSessionActive, 781 NULL /* addr */, 782 localPort, 783 remoteHost, 784 remotePort, 785 notify, 786 sessionID); 787 } 788 789 status_t ANetworkSession::destroySession(int32_t sessionID) { 790 Mutex::Autolock autoLock(mLock); 791 792 ssize_t index = mSessions.indexOfKey(sessionID); 793 794 if (index < 0) { 795 return -ENOENT; 796 } 797 798 mSessions.removeItemsAt(index); 799 800 interrupt(); 801 802 return OK; 803 } 804 805 // static 806 status_t ANetworkSession::MakeSocketNonBlocking(int s) { 807 int flags = fcntl(s, F_GETFL, 0); 808 if (flags < 0) { 809 flags = 0; 810 } 811 812 int res = fcntl(s, F_SETFL, flags | O_NONBLOCK); 813 if (res < 0) { 814 return -errno; 815 } 816 817 return OK; 818 } 819 820 status_t ANetworkSession::createClientOrServer( 821 Mode mode, 822 const struct in_addr *localAddr, 823 unsigned port, 824 const char *remoteHost, 825 unsigned remotePort, 826 const sp<AMessage> ¬ify, 827 int32_t *sessionID) { 828 Mutex::Autolock autoLock(mLock); 829 830 *sessionID = 0; 831 status_t err = OK; 832 int s, res; 833 sp<Session> session; 834 835 s = socket( 836 AF_INET, 837 (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM, 838 0); 839 840 if (s < 0) { 841 err = -errno; 842 goto bail; 843 } 844 845 if (mode == kModeCreateRTSPServer 846 || mode == kModeCreateTCPDatagramSessionPassive) { 847 const int yes = 1; 848 res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); 849 850 if (res < 0) { 851 err = -errno; 852 goto bail2; 853 } 854 } 855 856 if (mode == kModeCreateUDPSession) { 857 int size = 256 * 1024; 858 859 res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)); 860 861 if (res < 0) { 862 err = -errno; 863 goto bail2; 864 } 865 866 res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)); 867 868 if (res < 0) { 869 err = -errno; 870 goto bail2; 871 } 872 } else if (mode == kModeCreateTCPDatagramSessionActive) { 873 int flag = 1; 874 res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); 875 876 if (res < 0) { 877 err = -errno; 878 goto bail2; 879 } 880 881 int tos = 224; // VOICE 882 res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)); 883 884 if (res < 0) { 885 err = -errno; 886 goto bail2; 887 } 888 } 889 890 err = MakeSocketNonBlocking(s); 891 892 if (err != OK) { 893 goto bail2; 894 } 895 896 struct sockaddr_in addr; 897 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 898 addr.sin_family = AF_INET; 899 900 if (mode == kModeCreateRTSPClient 901 || mode == kModeCreateTCPDatagramSessionActive) { 902 struct hostent *ent= gethostbyname(remoteHost); 903 if (ent == NULL) { 904 err = -h_errno; 905 goto bail2; 906 } 907 908 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 909 addr.sin_port = htons(remotePort); 910 } else if (localAddr != NULL) { 911 addr.sin_addr = *localAddr; 912 addr.sin_port = htons(port); 913 } else { 914 addr.sin_addr.s_addr = htonl(INADDR_ANY); 915 addr.sin_port = htons(port); 916 } 917 918 if (mode == kModeCreateRTSPClient 919 || mode == kModeCreateTCPDatagramSessionActive) { 920 in_addr_t x = ntohl(addr.sin_addr.s_addr); 921 ALOGI("connecting socket %d to %d.%d.%d.%d:%d", 922 s, 923 (x >> 24), 924 (x >> 16) & 0xff, 925 (x >> 8) & 0xff, 926 x & 0xff, 927 ntohs(addr.sin_port)); 928 929 res = connect(s, (const struct sockaddr *)&addr, sizeof(addr)); 930 931 CHECK_LT(res, 0); 932 if (errno == EINPROGRESS) { 933 res = 0; 934 } 935 } else { 936 res = bind(s, (const struct sockaddr *)&addr, sizeof(addr)); 937 938 if (res == 0) { 939 if (mode == kModeCreateRTSPServer 940 || mode == kModeCreateTCPDatagramSessionPassive) { 941 res = listen(s, 4); 942 } else { 943 CHECK_EQ(mode, kModeCreateUDPSession); 944 945 if (remoteHost != NULL) { 946 struct sockaddr_in remoteAddr; 947 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 948 remoteAddr.sin_family = AF_INET; 949 remoteAddr.sin_port = htons(remotePort); 950 951 struct hostent *ent= gethostbyname(remoteHost); 952 if (ent == NULL) { 953 err = -h_errno; 954 goto bail2; 955 } 956 957 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 958 959 res = connect( 960 s, 961 (const struct sockaddr *)&remoteAddr, 962 sizeof(remoteAddr)); 963 } 964 } 965 } 966 } 967 968 if (res < 0) { 969 err = -errno; 970 goto bail2; 971 } 972 973 Session::State state; 974 switch (mode) { 975 case kModeCreateRTSPClient: 976 state = Session::CONNECTING; 977 break; 978 979 case kModeCreateTCPDatagramSessionActive: 980 state = Session::CONNECTING; 981 break; 982 983 case kModeCreateTCPDatagramSessionPassive: 984 state = Session::LISTENING_TCP_DGRAMS; 985 break; 986 987 case kModeCreateRTSPServer: 988 state = Session::LISTENING_RTSP; 989 break; 990 991 default: 992 CHECK_EQ(mode, kModeCreateUDPSession); 993 state = Session::DATAGRAM; 994 break; 995 } 996 997 session = new Session( 998 mNextSessionID++, 999 state, 1000 s, 1001 notify); 1002 1003 if (mode == kModeCreateTCPDatagramSessionActive) { 1004 session->setIsRTSPConnection(false); 1005 } else if (mode == kModeCreateRTSPClient) { 1006 session->setIsRTSPConnection(true); 1007 } 1008 1009 mSessions.add(session->sessionID(), session); 1010 1011 interrupt(); 1012 1013 *sessionID = session->sessionID(); 1014 1015 goto bail; 1016 1017 bail2: 1018 close(s); 1019 s = -1; 1020 1021 bail: 1022 return err; 1023 } 1024 1025 status_t ANetworkSession::connectUDPSession( 1026 int32_t sessionID, const char *remoteHost, unsigned remotePort) { 1027 Mutex::Autolock autoLock(mLock); 1028 1029 ssize_t index = mSessions.indexOfKey(sessionID); 1030 1031 if (index < 0) { 1032 return -ENOENT; 1033 } 1034 1035 const sp<Session> session = mSessions.valueAt(index); 1036 int s = session->socket(); 1037 1038 struct sockaddr_in remoteAddr; 1039 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 1040 remoteAddr.sin_family = AF_INET; 1041 remoteAddr.sin_port = htons(remotePort); 1042 1043 status_t err = OK; 1044 struct hostent *ent = gethostbyname(remoteHost); 1045 if (ent == NULL) { 1046 err = -h_errno; 1047 } else { 1048 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 1049 1050 int res = connect( 1051 s, 1052 (const struct sockaddr *)&remoteAddr, 1053 sizeof(remoteAddr)); 1054 1055 if (res < 0) { 1056 err = -errno; 1057 } 1058 } 1059 1060 return err; 1061 } 1062 1063 status_t ANetworkSession::sendRequest( 1064 int32_t sessionID, const void *data, ssize_t size, 1065 bool timeValid, int64_t timeUs) { 1066 Mutex::Autolock autoLock(mLock); 1067 1068 ssize_t index = mSessions.indexOfKey(sessionID); 1069 1070 if (index < 0) { 1071 return -ENOENT; 1072 } 1073 1074 const sp<Session> session = mSessions.valueAt(index); 1075 1076 status_t err = session->sendRequest(data, size, timeValid, timeUs); 1077 1078 interrupt(); 1079 1080 return err; 1081 } 1082 1083 void ANetworkSession::interrupt() { 1084 static const char dummy = 0; 1085 1086 ssize_t n; 1087 do { 1088 n = write(mPipeFd[1], &dummy, 1); 1089 } while (n < 0 && errno == EINTR); 1090 1091 if (n < 0) { 1092 ALOGW("Error writing to pipe (%s)", strerror(errno)); 1093 } 1094 } 1095 1096 void ANetworkSession::threadLoop() { 1097 fd_set rs, ws; 1098 FD_ZERO(&rs); 1099 FD_ZERO(&ws); 1100 1101 FD_SET(mPipeFd[0], &rs); 1102 int maxFd = mPipeFd[0]; 1103 1104 { 1105 Mutex::Autolock autoLock(mLock); 1106 1107 for (size_t i = 0; i < mSessions.size(); ++i) { 1108 const sp<Session> &session = mSessions.valueAt(i); 1109 1110 int s = session->socket(); 1111 1112 if (s < 0) { 1113 continue; 1114 } 1115 1116 if (session->wantsToRead()) { 1117 FD_SET(s, &rs); 1118 if (s > maxFd) { 1119 maxFd = s; 1120 } 1121 } 1122 1123 if (session->wantsToWrite()) { 1124 FD_SET(s, &ws); 1125 if (s > maxFd) { 1126 maxFd = s; 1127 } 1128 } 1129 } 1130 } 1131 1132 int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */); 1133 1134 if (res == 0) { 1135 return; 1136 } 1137 1138 if (res < 0) { 1139 if (errno == EINTR) { 1140 return; 1141 } 1142 1143 ALOGE("select failed w/ error %d (%s)", errno, strerror(errno)); 1144 return; 1145 } 1146 1147 if (FD_ISSET(mPipeFd[0], &rs)) { 1148 char c; 1149 ssize_t n; 1150 do { 1151 n = read(mPipeFd[0], &c, 1); 1152 } while (n < 0 && errno == EINTR); 1153 1154 if (n < 0) { 1155 ALOGW("Error reading from pipe (%s)", strerror(errno)); 1156 } 1157 1158 --res; 1159 } 1160 1161 { 1162 Mutex::Autolock autoLock(mLock); 1163 1164 List<sp<Session> > sessionsToAdd; 1165 1166 for (size_t i = mSessions.size(); res > 0 && i-- > 0;) { 1167 const sp<Session> &session = mSessions.valueAt(i); 1168 1169 int s = session->socket(); 1170 1171 if (s < 0) { 1172 continue; 1173 } 1174 1175 if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) { 1176 --res; 1177 } 1178 1179 if (FD_ISSET(s, &rs)) { 1180 if (session->isRTSPServer() || session->isTCPDatagramServer()) { 1181 struct sockaddr_in remoteAddr; 1182 socklen_t remoteAddrLen = sizeof(remoteAddr); 1183 1184 int clientSocket = accept( 1185 s, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 1186 1187 if (clientSocket >= 0) { 1188 status_t err = MakeSocketNonBlocking(clientSocket); 1189 1190 if (err != OK) { 1191 ALOGE("Unable to make client socket non blocking, " 1192 "failed w/ error %d (%s)", 1193 err, strerror(-err)); 1194 1195 close(clientSocket); 1196 clientSocket = -1; 1197 } else { 1198 in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr); 1199 1200 ALOGI("incoming connection from %d.%d.%d.%d:%d " 1201 "(socket %d)", 1202 (addr >> 24), 1203 (addr >> 16) & 0xff, 1204 (addr >> 8) & 0xff, 1205 addr & 0xff, 1206 ntohs(remoteAddr.sin_port), 1207 clientSocket); 1208 1209 sp<Session> clientSession = 1210 new Session( 1211 mNextSessionID++, 1212 Session::CONNECTED, 1213 clientSocket, 1214 session->getNotificationMessage()); 1215 1216 clientSession->setIsRTSPConnection( 1217 session->isRTSPServer()); 1218 1219 sessionsToAdd.push_back(clientSession); 1220 } 1221 } else { 1222 ALOGE("accept returned error %d (%s)", 1223 errno, strerror(errno)); 1224 } 1225 } else { 1226 status_t err = session->readMore(); 1227 if (err != OK) { 1228 ALOGE("readMore on socket %d failed w/ error %d (%s)", 1229 s, err, strerror(-err)); 1230 } 1231 } 1232 } 1233 1234 if (FD_ISSET(s, &ws)) { 1235 status_t err = session->writeMore(); 1236 if (err != OK) { 1237 ALOGE("writeMore on socket %d failed w/ error %d (%s)", 1238 s, err, strerror(-err)); 1239 } 1240 } 1241 } 1242 1243 while (!sessionsToAdd.empty()) { 1244 sp<Session> session = *sessionsToAdd.begin(); 1245 sessionsToAdd.erase(sessionsToAdd.begin()); 1246 1247 mSessions.add(session->sessionID(), session); 1248 1249 ALOGI("added clientSession %d", session->sessionID()); 1250 } 1251 } 1252 } 1253 1254 } // namespace android 1255 1256