1 /* 2 * Copyright 2012, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "NetworkSession" 19 #include <utils/Log.h> 20 21 #include "ANetworkSession.h" 22 #include "ParsedMessage.h" 23 24 #include <arpa/inet.h> 25 #include <fcntl.h> 26 #include <linux/tcp.h> 27 #include <net/if.h> 28 #include <netdb.h> 29 #include <netinet/in.h> 30 #include <sys/ioctl.h> 31 #include <sys/socket.h> 32 33 #include <media/stagefright/foundation/ABuffer.h> 34 #include <media/stagefright/foundation/ADebug.h> 35 #include <media/stagefright/foundation/AMessage.h> 36 #include <media/stagefright/foundation/ByteUtils.h> 37 #include <media/stagefright/foundation/hexdump.h> 38 39 namespace android { 40 41 static const size_t kMaxUDPSize = 1500; 42 static const int32_t kMaxUDPRetries = 200; 43 44 struct ANetworkSession::NetworkThread : public Thread { 45 explicit NetworkThread(ANetworkSession *session); 46 47 protected: 48 virtual ~NetworkThread(); 49 50 private: 51 ANetworkSession *mSession; 52 53 virtual bool threadLoop(); 54 55 DISALLOW_EVIL_CONSTRUCTORS(NetworkThread); 56 }; 57 58 struct ANetworkSession::Session : public RefBase { 59 enum Mode { 60 MODE_RTSP, 61 MODE_DATAGRAM, 62 MODE_WEBSOCKET, 63 }; 64 65 enum State { 66 CONNECTING, 67 CONNECTED, 68 LISTENING_RTSP, 69 LISTENING_TCP_DGRAMS, 70 DATAGRAM, 71 }; 72 73 Session(int32_t sessionID, 74 State state, 75 int s, 76 const sp<AMessage> ¬ify); 77 78 int32_t sessionID() const; 79 int socket() const; 80 sp<AMessage> getNotificationMessage() const; 81 82 bool isRTSPServer() const; 83 bool isTCPDatagramServer() const; 84 85 bool wantsToRead(); 86 bool wantsToWrite(); 87 88 status_t readMore(); 89 status_t writeMore(); 90 91 status_t sendRequest( 92 const void *data, ssize_t size, bool timeValid, int64_t timeUs); 93 94 void setMode(Mode mode); 95 96 status_t switchToWebSocketMode(); 97 98 protected: 99 virtual ~Session(); 100 101 private: 102 enum { 103 FRAGMENT_FLAG_TIME_VALID = 1, 104 }; 105 struct Fragment { 106 uint32_t mFlags; 107 int64_t mTimeUs; 108 sp<ABuffer> mBuffer; 109 }; 110 111 int32_t mSessionID; 112 State mState; 113 Mode mMode; 114 int mSocket; 115 sp<AMessage> mNotify; 116 bool mSawReceiveFailure, mSawSendFailure; 117 int32_t mUDPRetries; 118 119 List<Fragment> mOutFragments; 120 121 AString mInBuffer; 122 123 int64_t mLastStallReportUs; 124 125 void notifyError(bool send, status_t err, const char *detail); 126 void notify(NotificationReason reason); 127 128 void dumpFragmentStats(const Fragment &frag); 129 130 DISALLOW_EVIL_CONSTRUCTORS(Session); 131 }; 132 //////////////////////////////////////////////////////////////////////////////// 133 134 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session) 135 : mSession(session) { 136 } 137 138 ANetworkSession::NetworkThread::~NetworkThread() { 139 } 140 141 bool ANetworkSession::NetworkThread::threadLoop() { 142 mSession->threadLoop(); 143 144 return true; 145 } 146 147 //////////////////////////////////////////////////////////////////////////////// 148 149 ANetworkSession::Session::Session( 150 int32_t sessionID, 151 State state, 152 int s, 153 const sp<AMessage> ¬ify) 154 : mSessionID(sessionID), 155 mState(state), 156 mMode(MODE_DATAGRAM), 157 mSocket(s), 158 mNotify(notify), 159 mSawReceiveFailure(false), 160 mSawSendFailure(false), 161 mUDPRetries(kMaxUDPRetries), 162 mLastStallReportUs(-1ll) { 163 if (mState == CONNECTED) { 164 struct sockaddr_in localAddr; 165 socklen_t localAddrLen = sizeof(localAddr); 166 167 int res = getsockname( 168 mSocket, (struct sockaddr *)&localAddr, &localAddrLen); 169 CHECK_GE(res, 0); 170 171 struct sockaddr_in remoteAddr; 172 socklen_t remoteAddrLen = sizeof(remoteAddr); 173 174 res = getpeername( 175 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 176 CHECK_GE(res, 0); 177 178 in_addr_t addr = ntohl(localAddr.sin_addr.s_addr); 179 AString localAddrString = AStringPrintf( 180 "%d.%d.%d.%d", 181 (addr >> 24), 182 (addr >> 16) & 0xff, 183 (addr >> 8) & 0xff, 184 addr & 0xff); 185 186 addr = ntohl(remoteAddr.sin_addr.s_addr); 187 AString remoteAddrString = AStringPrintf( 188 "%d.%d.%d.%d", 189 (addr >> 24), 190 (addr >> 16) & 0xff, 191 (addr >> 8) & 0xff, 192 addr & 0xff); 193 194 sp<AMessage> msg = mNotify->dup(); 195 msg->setInt32("sessionID", mSessionID); 196 msg->setInt32("reason", kWhatClientConnected); 197 msg->setString("server-ip", localAddrString.c_str()); 198 msg->setInt32("server-port", ntohs(localAddr.sin_port)); 199 msg->setString("client-ip", remoteAddrString.c_str()); 200 msg->setInt32("client-port", ntohs(remoteAddr.sin_port)); 201 msg->post(); 202 } 203 } 204 205 ANetworkSession::Session::~Session() { 206 ALOGV("Session %d gone", mSessionID); 207 208 close(mSocket); 209 mSocket = -1; 210 } 211 212 int32_t ANetworkSession::Session::sessionID() const { 213 return mSessionID; 214 } 215 216 int ANetworkSession::Session::socket() const { 217 return mSocket; 218 } 219 220 void ANetworkSession::Session::setMode(Mode mode) { 221 mMode = mode; 222 } 223 224 status_t ANetworkSession::Session::switchToWebSocketMode() { 225 if (mState != CONNECTED || mMode != MODE_RTSP) { 226 return INVALID_OPERATION; 227 } 228 229 mMode = MODE_WEBSOCKET; 230 231 return OK; 232 } 233 234 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const { 235 return mNotify; 236 } 237 238 bool ANetworkSession::Session::isRTSPServer() const { 239 return mState == LISTENING_RTSP; 240 } 241 242 bool ANetworkSession::Session::isTCPDatagramServer() const { 243 return mState == LISTENING_TCP_DGRAMS; 244 } 245 246 bool ANetworkSession::Session::wantsToRead() { 247 return !mSawReceiveFailure && mState != CONNECTING; 248 } 249 250 bool ANetworkSession::Session::wantsToWrite() { 251 return !mSawSendFailure 252 && (mState == CONNECTING 253 || (mState == CONNECTED && !mOutFragments.empty()) 254 || (mState == DATAGRAM && !mOutFragments.empty())); 255 } 256 257 status_t ANetworkSession::Session::readMore() { 258 if (mState == DATAGRAM) { 259 CHECK_EQ(mMode, MODE_DATAGRAM); 260 261 status_t err; 262 do { 263 sp<ABuffer> buf = new ABuffer(kMaxUDPSize); 264 265 struct sockaddr_in remoteAddr; 266 socklen_t remoteAddrLen = sizeof(remoteAddr); 267 268 ssize_t n; 269 do { 270 n = recvfrom( 271 mSocket, buf->data(), buf->capacity(), 0, 272 (struct sockaddr *)&remoteAddr, &remoteAddrLen); 273 } while (n < 0 && errno == EINTR); 274 275 err = OK; 276 if (n < 0) { 277 err = -errno; 278 } else if (n == 0) { 279 err = -ECONNRESET; 280 } else { 281 buf->setRange(0, n); 282 283 int64_t nowUs = ALooper::GetNowUs(); 284 buf->meta()->setInt64("arrivalTimeUs", nowUs); 285 286 sp<AMessage> notify = mNotify->dup(); 287 notify->setInt32("sessionID", mSessionID); 288 notify->setInt32("reason", kWhatDatagram); 289 290 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr); 291 notify->setString( 292 "fromAddr", 293 AStringPrintf( 294 "%u.%u.%u.%u", 295 ip >> 24, 296 (ip >> 16) & 0xff, 297 (ip >> 8) & 0xff, 298 ip & 0xff).c_str()); 299 300 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port)); 301 302 notify->setBuffer("data", buf); 303 notify->post(); 304 } 305 } while (err == OK); 306 307 if (err == -EAGAIN) { 308 err = OK; 309 } 310 311 if (err != OK) { 312 if (!mUDPRetries) { 313 notifyError(false /* send */, err, "Recvfrom failed."); 314 mSawReceiveFailure = true; 315 } else { 316 mUDPRetries--; 317 ALOGE("Recvfrom failed, %d/%d retries left", 318 mUDPRetries, kMaxUDPRetries); 319 err = OK; 320 } 321 } else { 322 mUDPRetries = kMaxUDPRetries; 323 } 324 325 return err; 326 } 327 328 char tmp[512]; 329 ssize_t n; 330 do { 331 n = recv(mSocket, tmp, sizeof(tmp), 0); 332 } while (n < 0 && errno == EINTR); 333 334 status_t err = OK; 335 336 if (n > 0) { 337 mInBuffer.append(tmp, n); 338 339 #if 0 340 ALOGI("in:"); 341 hexdump(tmp, n); 342 #endif 343 } else if (n < 0) { 344 err = -errno; 345 } else { 346 err = -ECONNRESET; 347 } 348 349 if (mMode == MODE_DATAGRAM) { 350 // TCP stream carrying 16-bit length-prefixed datagrams. 351 352 while (mInBuffer.size() >= 2) { 353 size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str()); 354 355 if (mInBuffer.size() < packetSize + 2) { 356 break; 357 } 358 359 sp<ABuffer> packet = new ABuffer(packetSize); 360 memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize); 361 362 int64_t nowUs = ALooper::GetNowUs(); 363 packet->meta()->setInt64("arrivalTimeUs", nowUs); 364 365 sp<AMessage> notify = mNotify->dup(); 366 notify->setInt32("sessionID", mSessionID); 367 notify->setInt32("reason", kWhatDatagram); 368 notify->setBuffer("data", packet); 369 notify->post(); 370 371 mInBuffer.erase(0, packetSize + 2); 372 } 373 } else if (mMode == MODE_RTSP) { 374 for (;;) { 375 size_t length; 376 377 if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') { 378 if (mInBuffer.size() < 4) { 379 break; 380 } 381 382 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2); 383 384 if (mInBuffer.size() < 4 + length) { 385 break; 386 } 387 388 sp<AMessage> notify = mNotify->dup(); 389 notify->setInt32("sessionID", mSessionID); 390 notify->setInt32("reason", kWhatBinaryData); 391 notify->setInt32("channel", mInBuffer.c_str()[1]); 392 393 sp<ABuffer> data = new ABuffer(length); 394 memcpy(data->data(), mInBuffer.c_str() + 4, length); 395 396 int64_t nowUs = ALooper::GetNowUs(); 397 data->meta()->setInt64("arrivalTimeUs", nowUs); 398 399 notify->setBuffer("data", data); 400 notify->post(); 401 402 mInBuffer.erase(0, 4 + length); 403 continue; 404 } 405 406 sp<ParsedMessage> msg = 407 ParsedMessage::Parse( 408 mInBuffer.c_str(), mInBuffer.size(), err != OK, &length); 409 410 if (msg == NULL) { 411 break; 412 } 413 414 sp<AMessage> notify = mNotify->dup(); 415 notify->setInt32("sessionID", mSessionID); 416 notify->setInt32("reason", kWhatData); 417 notify->setObject("data", msg); 418 notify->post(); 419 420 #if 1 421 // XXX The (old) dongle sends the wrong content length header on a 422 // SET_PARAMETER request that signals a "wfd_idr_request". 423 // (17 instead of 19). 424 const char *content = msg->getContent(); 425 if (content 426 && !memcmp(content, "wfd_idr_request\r\n", 17) 427 && length >= 19 428 && mInBuffer.c_str()[length] == '\r' 429 && mInBuffer.c_str()[length + 1] == '\n') { 430 length += 2; 431 } 432 #endif 433 434 mInBuffer.erase(0, length); 435 436 if (err != OK) { 437 break; 438 } 439 } 440 } else { 441 CHECK_EQ(mMode, MODE_WEBSOCKET); 442 443 const uint8_t *data = (const uint8_t *)mInBuffer.c_str(); 444 // hexdump(data, mInBuffer.size()); 445 446 while (mInBuffer.size() >= 2) { 447 size_t offset = 2; 448 449 uint64_t payloadLen = data[1] & 0x7f; 450 if (payloadLen == 126) { 451 if (offset + 2 > mInBuffer.size()) { 452 break; 453 } 454 455 payloadLen = U16_AT(&data[offset]); 456 offset += 2; 457 } else if (payloadLen == 127) { 458 if (offset + 8 > mInBuffer.size()) { 459 break; 460 } 461 462 payloadLen = U64_AT(&data[offset]); 463 offset += 8; 464 } 465 466 uint32_t mask = 0; 467 if (data[1] & 0x80) { 468 // MASK==1 469 if (offset + 4 > mInBuffer.size()) { 470 break; 471 } 472 473 mask = U32_AT(&data[offset]); 474 offset += 4; 475 } 476 477 if (payloadLen > mInBuffer.size() || offset > mInBuffer.size() - payloadLen) { 478 break; 479 } 480 481 // We have the full message. 482 483 sp<ABuffer> packet = new ABuffer(payloadLen); 484 memcpy(packet->data(), &data[offset], payloadLen); 485 486 if (mask != 0) { 487 for (size_t i = 0; i < payloadLen; ++i) { 488 packet->data()[i] = 489 data[offset + i] 490 ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff); 491 } 492 } 493 494 sp<AMessage> notify = mNotify->dup(); 495 notify->setInt32("sessionID", mSessionID); 496 notify->setInt32("reason", kWhatWebSocketMessage); 497 notify->setBuffer("data", packet); 498 notify->setInt32("headerByte", data[0]); 499 notify->post(); 500 501 mInBuffer.erase(0, offset + payloadLen); 502 } 503 } 504 505 if (err != OK) { 506 notifyError(false /* send */, err, "Recv failed."); 507 mSawReceiveFailure = true; 508 } 509 510 return err; 511 } 512 513 void ANetworkSession::Session::dumpFragmentStats(const Fragment & /* frag */) { 514 #if 0 515 int64_t nowUs = ALooper::GetNowUs(); 516 int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll; 517 518 static const int64_t kMinDelayMs = 0; 519 static const int64_t kMaxDelayMs = 300; 520 521 const char *kPattern = "########################################"; 522 size_t kPatternSize = strlen(kPattern); 523 524 int n = (kPatternSize * (delayMs - kMinDelayMs)) 525 / (kMaxDelayMs - kMinDelayMs); 526 527 if (n < 0) { 528 n = 0; 529 } else if ((size_t)n > kPatternSize) { 530 n = kPatternSize; 531 } 532 533 ALOGI("[%lld]: (%4lld ms) %s\n", 534 frag.mTimeUs / 1000, 535 delayMs, 536 kPattern + kPatternSize - n); 537 #endif 538 } 539 540 status_t ANetworkSession::Session::writeMore() { 541 if (mState == DATAGRAM) { 542 CHECK(!mOutFragments.empty()); 543 544 status_t err; 545 do { 546 const Fragment &frag = *mOutFragments.begin(); 547 const sp<ABuffer> &datagram = frag.mBuffer; 548 549 int n; 550 do { 551 n = send(mSocket, datagram->data(), datagram->size(), 0); 552 } while (n < 0 && errno == EINTR); 553 554 err = OK; 555 556 if (n > 0) { 557 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { 558 dumpFragmentStats(frag); 559 } 560 561 mOutFragments.erase(mOutFragments.begin()); 562 } else if (n < 0) { 563 err = -errno; 564 } else if (n == 0) { 565 err = -ECONNRESET; 566 } 567 } while (err == OK && !mOutFragments.empty()); 568 569 if (err == -EAGAIN) { 570 if (!mOutFragments.empty()) { 571 ALOGI("%zu datagrams remain queued.", mOutFragments.size()); 572 } 573 err = OK; 574 } 575 576 if (err != OK) { 577 if (!mUDPRetries) { 578 notifyError(true /* send */, err, "Send datagram failed."); 579 mSawSendFailure = true; 580 } else { 581 mUDPRetries--; 582 ALOGE("Send datagram failed, %d/%d retries left", 583 mUDPRetries, kMaxUDPRetries); 584 err = OK; 585 } 586 } else { 587 mUDPRetries = kMaxUDPRetries; 588 } 589 590 return err; 591 } 592 593 if (mState == CONNECTING) { 594 int err; 595 socklen_t optionLen = sizeof(err); 596 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); 597 CHECK_EQ(optionLen, (socklen_t)sizeof(err)); 598 599 if (err != 0) { 600 notifyError(kWhatError, -err, "Connection failed"); 601 mSawSendFailure = true; 602 603 return -err; 604 } 605 606 mState = CONNECTED; 607 notify(kWhatConnected); 608 609 return OK; 610 } 611 612 CHECK_EQ(mState, CONNECTED); 613 CHECK(!mOutFragments.empty()); 614 615 ssize_t n = -1; 616 while (!mOutFragments.empty()) { 617 const Fragment &frag = *mOutFragments.begin(); 618 619 do { 620 n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0); 621 } while (n < 0 && errno == EINTR); 622 623 if (n <= 0) { 624 break; 625 } 626 627 frag.mBuffer->setRange( 628 frag.mBuffer->offset() + n, frag.mBuffer->size() - n); 629 630 if (frag.mBuffer->size() > 0) { 631 break; 632 } 633 634 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { 635 dumpFragmentStats(frag); 636 } 637 638 mOutFragments.erase(mOutFragments.begin()); 639 } 640 641 status_t err = OK; 642 643 if (n < 0) { 644 err = -errno; 645 } else if (n == 0) { 646 err = -ECONNRESET; 647 } 648 649 if (err != OK) { 650 notifyError(true /* send */, err, "Send failed."); 651 mSawSendFailure = true; 652 } 653 654 #if 0 655 int numBytesQueued; 656 int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued); 657 if (res == 0 && numBytesQueued > 50 * 1024) { 658 if (numBytesQueued > 409600) { 659 ALOGW("!!! numBytesQueued = %d", numBytesQueued); 660 } 661 662 int64_t nowUs = ALooper::GetNowUs(); 663 664 if (mLastStallReportUs < 0ll 665 || nowUs > mLastStallReportUs + 100000ll) { 666 sp<AMessage> msg = mNotify->dup(); 667 msg->setInt32("sessionID", mSessionID); 668 msg->setInt32("reason", kWhatNetworkStall); 669 msg->setSize("numBytesQueued", numBytesQueued); 670 msg->post(); 671 672 mLastStallReportUs = nowUs; 673 } 674 } 675 #endif 676 677 return err; 678 } 679 680 status_t ANetworkSession::Session::sendRequest( 681 const void *data, ssize_t size, bool timeValid, int64_t timeUs) { 682 CHECK(mState == CONNECTED || mState == DATAGRAM); 683 684 if (size < 0) { 685 size = strlen((const char *)data); 686 } 687 688 if (size == 0) { 689 return OK; 690 } 691 692 sp<ABuffer> buffer; 693 694 if (mState == CONNECTED && mMode == MODE_DATAGRAM) { 695 CHECK_LE(size, 65535); 696 697 buffer = new ABuffer(size + 2); 698 buffer->data()[0] = size >> 8; 699 buffer->data()[1] = size & 0xff; 700 memcpy(buffer->data() + 2, data, size); 701 } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) { 702 static const bool kUseMask = false; // Chromium doesn't like it. 703 704 size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0); 705 if (size > 65535) { 706 numHeaderBytes += 8; 707 } else if (size > 125) { 708 numHeaderBytes += 2; 709 } 710 711 buffer = new ABuffer(numHeaderBytes + size); 712 buffer->data()[0] = 0x81; // FIN==1 | opcode=1 (text) 713 buffer->data()[1] = kUseMask ? 0x80 : 0x00; 714 715 if (size > 65535) { 716 buffer->data()[1] |= 127; 717 buffer->data()[2] = 0x00; 718 buffer->data()[3] = 0x00; 719 buffer->data()[4] = 0x00; 720 buffer->data()[5] = 0x00; 721 buffer->data()[6] = (size >> 24) & 0xff; 722 buffer->data()[7] = (size >> 16) & 0xff; 723 buffer->data()[8] = (size >> 8) & 0xff; 724 buffer->data()[9] = size & 0xff; 725 } else if (size > 125) { 726 buffer->data()[1] |= 126; 727 buffer->data()[2] = (size >> 8) & 0xff; 728 buffer->data()[3] = size & 0xff; 729 } else { 730 buffer->data()[1] |= size; 731 } 732 733 if (kUseMask) { 734 uint32_t mask = rand(); 735 736 buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff; 737 buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff; 738 buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff; 739 buffer->data()[numHeaderBytes - 1] = mask & 0xff; 740 741 for (size_t i = 0; i < (size_t)size; ++i) { 742 buffer->data()[numHeaderBytes + i] = 743 ((const uint8_t *)data)[i] 744 ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff); 745 } 746 } else { 747 memcpy(buffer->data() + numHeaderBytes, data, size); 748 } 749 } else { 750 buffer = new ABuffer(size); 751 memcpy(buffer->data(), data, size); 752 } 753 754 Fragment frag; 755 756 frag.mFlags = 0; 757 if (timeValid) { 758 frag.mFlags = FRAGMENT_FLAG_TIME_VALID; 759 frag.mTimeUs = timeUs; 760 } 761 762 frag.mBuffer = buffer; 763 764 mOutFragments.push_back(frag); 765 766 return OK; 767 } 768 769 void ANetworkSession::Session::notifyError( 770 bool send, status_t err, const char *detail) { 771 sp<AMessage> msg = mNotify->dup(); 772 msg->setInt32("sessionID", mSessionID); 773 msg->setInt32("reason", kWhatError); 774 msg->setInt32("send", send); 775 msg->setInt32("err", err); 776 msg->setString("detail", detail); 777 msg->post(); 778 } 779 780 void ANetworkSession::Session::notify(NotificationReason reason) { 781 sp<AMessage> msg = mNotify->dup(); 782 msg->setInt32("sessionID", mSessionID); 783 msg->setInt32("reason", reason); 784 msg->post(); 785 } 786 787 //////////////////////////////////////////////////////////////////////////////// 788 789 ANetworkSession::ANetworkSession() 790 : mNextSessionID(1) { 791 mPipeFd[0] = mPipeFd[1] = -1; 792 } 793 794 ANetworkSession::~ANetworkSession() { 795 stop(); 796 } 797 798 status_t ANetworkSession::start() { 799 if (mThread != NULL) { 800 return INVALID_OPERATION; 801 } 802 803 int res = pipe(mPipeFd); 804 if (res != 0) { 805 mPipeFd[0] = mPipeFd[1] = -1; 806 return -errno; 807 } 808 809 mThread = new NetworkThread(this); 810 811 status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO); 812 813 if (err != OK) { 814 mThread.clear(); 815 816 close(mPipeFd[0]); 817 close(mPipeFd[1]); 818 mPipeFd[0] = mPipeFd[1] = -1; 819 820 return err; 821 } 822 823 return OK; 824 } 825 826 status_t ANetworkSession::stop() { 827 if (mThread == NULL) { 828 return INVALID_OPERATION; 829 } 830 831 mThread->requestExit(); 832 interrupt(); 833 mThread->requestExitAndWait(); 834 835 mThread.clear(); 836 837 close(mPipeFd[0]); 838 close(mPipeFd[1]); 839 mPipeFd[0] = mPipeFd[1] = -1; 840 841 return OK; 842 } 843 844 status_t ANetworkSession::createRTSPClient( 845 const char *host, unsigned port, const sp<AMessage> ¬ify, 846 int32_t *sessionID) { 847 return createClientOrServer( 848 kModeCreateRTSPClient, 849 NULL /* addr */, 850 0 /* port */, 851 host, 852 port, 853 notify, 854 sessionID); 855 } 856 857 status_t ANetworkSession::createRTSPServer( 858 const struct in_addr &addr, unsigned port, 859 const sp<AMessage> ¬ify, int32_t *sessionID) { 860 return createClientOrServer( 861 kModeCreateRTSPServer, 862 &addr, 863 port, 864 NULL /* remoteHost */, 865 0 /* remotePort */, 866 notify, 867 sessionID); 868 } 869 870 status_t ANetworkSession::createUDPSession( 871 unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID) { 872 return createUDPSession(localPort, NULL, 0, notify, sessionID); 873 } 874 875 status_t ANetworkSession::createUDPSession( 876 unsigned localPort, 877 const char *remoteHost, 878 unsigned remotePort, 879 const sp<AMessage> ¬ify, 880 int32_t *sessionID) { 881 return createClientOrServer( 882 kModeCreateUDPSession, 883 NULL /* addr */, 884 localPort, 885 remoteHost, 886 remotePort, 887 notify, 888 sessionID); 889 } 890 891 status_t ANetworkSession::createTCPDatagramSession( 892 const struct in_addr &addr, unsigned port, 893 const sp<AMessage> ¬ify, int32_t *sessionID) { 894 return createClientOrServer( 895 kModeCreateTCPDatagramSessionPassive, 896 &addr, 897 port, 898 NULL /* remoteHost */, 899 0 /* remotePort */, 900 notify, 901 sessionID); 902 } 903 904 status_t ANetworkSession::createTCPDatagramSession( 905 unsigned localPort, 906 const char *remoteHost, 907 unsigned remotePort, 908 const sp<AMessage> ¬ify, 909 int32_t *sessionID) { 910 return createClientOrServer( 911 kModeCreateTCPDatagramSessionActive, 912 NULL /* addr */, 913 localPort, 914 remoteHost, 915 remotePort, 916 notify, 917 sessionID); 918 } 919 920 status_t ANetworkSession::destroySession(int32_t sessionID) { 921 Mutex::Autolock autoLock(mLock); 922 923 ssize_t index = mSessions.indexOfKey(sessionID); 924 925 if (index < 0) { 926 return -ENOENT; 927 } 928 929 mSessions.removeItemsAt(index); 930 931 interrupt(); 932 933 return OK; 934 } 935 936 // static 937 status_t ANetworkSession::MakeSocketNonBlocking(int s) { 938 int flags = fcntl(s, F_GETFL, 0); 939 if (flags < 0) { 940 flags = 0; 941 } 942 943 int res = fcntl(s, F_SETFL, flags | O_NONBLOCK); 944 if (res < 0) { 945 return -errno; 946 } 947 948 return OK; 949 } 950 951 status_t ANetworkSession::createClientOrServer( 952 Mode mode, 953 const struct in_addr *localAddr, 954 unsigned port, 955 const char *remoteHost, 956 unsigned remotePort, 957 const sp<AMessage> ¬ify, 958 int32_t *sessionID) { 959 Mutex::Autolock autoLock(mLock); 960 961 *sessionID = 0; 962 status_t err = OK; 963 int s, res; 964 sp<Session> session; 965 966 s = socket( 967 AF_INET, 968 (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM, 969 0); 970 971 if (s < 0) { 972 err = -errno; 973 goto bail; 974 } 975 976 if (mode == kModeCreateRTSPServer 977 || mode == kModeCreateTCPDatagramSessionPassive) { 978 const int yes = 1; 979 res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); 980 981 if (res < 0) { 982 err = -errno; 983 goto bail2; 984 } 985 } 986 987 if (mode == kModeCreateUDPSession) { 988 int size = 256 * 1024; 989 990 res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)); 991 992 if (res < 0) { 993 err = -errno; 994 goto bail2; 995 } 996 997 res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)); 998 999 if (res < 0) { 1000 err = -errno; 1001 goto bail2; 1002 } 1003 } else if (mode == kModeCreateTCPDatagramSessionActive) { 1004 int flag = 1; 1005 res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); 1006 1007 if (res < 0) { 1008 err = -errno; 1009 goto bail2; 1010 } 1011 1012 int tos = 224; // VOICE 1013 res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)); 1014 1015 if (res < 0) { 1016 err = -errno; 1017 goto bail2; 1018 } 1019 } 1020 1021 err = MakeSocketNonBlocking(s); 1022 1023 if (err != OK) { 1024 goto bail2; 1025 } 1026 1027 struct sockaddr_in addr; 1028 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 1029 addr.sin_family = AF_INET; 1030 1031 if (mode == kModeCreateRTSPClient 1032 || mode == kModeCreateTCPDatagramSessionActive) { 1033 struct hostent *ent= gethostbyname(remoteHost); 1034 if (ent == NULL) { 1035 err = -h_errno; 1036 goto bail2; 1037 } 1038 1039 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 1040 addr.sin_port = htons(remotePort); 1041 } else if (localAddr != NULL) { 1042 addr.sin_addr = *localAddr; 1043 addr.sin_port = htons(port); 1044 } else { 1045 addr.sin_addr.s_addr = htonl(INADDR_ANY); 1046 addr.sin_port = htons(port); 1047 } 1048 1049 if (mode == kModeCreateRTSPClient 1050 || mode == kModeCreateTCPDatagramSessionActive) { 1051 in_addr_t x = ntohl(addr.sin_addr.s_addr); 1052 ALOGI("connecting socket %d to %d.%d.%d.%d:%d", 1053 s, 1054 (x >> 24), 1055 (x >> 16) & 0xff, 1056 (x >> 8) & 0xff, 1057 x & 0xff, 1058 ntohs(addr.sin_port)); 1059 1060 res = connect(s, (const struct sockaddr *)&addr, sizeof(addr)); 1061 1062 CHECK_LT(res, 0); 1063 if (errno == EINPROGRESS) { 1064 res = 0; 1065 } 1066 } else { 1067 res = bind(s, (const struct sockaddr *)&addr, sizeof(addr)); 1068 1069 if (res == 0) { 1070 if (mode == kModeCreateRTSPServer 1071 || mode == kModeCreateTCPDatagramSessionPassive) { 1072 res = listen(s, 4); 1073 } else { 1074 CHECK_EQ(mode, kModeCreateUDPSession); 1075 1076 if (remoteHost != NULL) { 1077 struct sockaddr_in remoteAddr; 1078 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 1079 remoteAddr.sin_family = AF_INET; 1080 remoteAddr.sin_port = htons(remotePort); 1081 1082 struct hostent *ent= gethostbyname(remoteHost); 1083 if (ent == NULL) { 1084 err = -h_errno; 1085 goto bail2; 1086 } 1087 1088 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 1089 1090 res = connect( 1091 s, 1092 (const struct sockaddr *)&remoteAddr, 1093 sizeof(remoteAddr)); 1094 } 1095 } 1096 } 1097 } 1098 1099 if (res < 0) { 1100 err = -errno; 1101 goto bail2; 1102 } 1103 1104 Session::State state; 1105 switch (mode) { 1106 case kModeCreateRTSPClient: 1107 state = Session::CONNECTING; 1108 break; 1109 1110 case kModeCreateTCPDatagramSessionActive: 1111 state = Session::CONNECTING; 1112 break; 1113 1114 case kModeCreateTCPDatagramSessionPassive: 1115 state = Session::LISTENING_TCP_DGRAMS; 1116 break; 1117 1118 case kModeCreateRTSPServer: 1119 state = Session::LISTENING_RTSP; 1120 break; 1121 1122 default: 1123 CHECK_EQ(mode, kModeCreateUDPSession); 1124 state = Session::DATAGRAM; 1125 break; 1126 } 1127 1128 session = new Session( 1129 mNextSessionID++, 1130 state, 1131 s, 1132 notify); 1133 1134 if (mode == kModeCreateTCPDatagramSessionActive) { 1135 session->setMode(Session::MODE_DATAGRAM); 1136 } else if (mode == kModeCreateRTSPClient) { 1137 session->setMode(Session::MODE_RTSP); 1138 } 1139 1140 mSessions.add(session->sessionID(), session); 1141 1142 interrupt(); 1143 1144 *sessionID = session->sessionID(); 1145 1146 goto bail; 1147 1148 bail2: 1149 close(s); 1150 s = -1; 1151 1152 bail: 1153 return err; 1154 } 1155 1156 status_t ANetworkSession::connectUDPSession( 1157 int32_t sessionID, const char *remoteHost, unsigned remotePort) { 1158 Mutex::Autolock autoLock(mLock); 1159 1160 ssize_t index = mSessions.indexOfKey(sessionID); 1161 1162 if (index < 0) { 1163 return -ENOENT; 1164 } 1165 1166 const sp<Session> session = mSessions.valueAt(index); 1167 int s = session->socket(); 1168 1169 struct sockaddr_in remoteAddr; 1170 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 1171 remoteAddr.sin_family = AF_INET; 1172 remoteAddr.sin_port = htons(remotePort); 1173 1174 status_t err = OK; 1175 struct hostent *ent = gethostbyname(remoteHost); 1176 if (ent == NULL) { 1177 err = -h_errno; 1178 } else { 1179 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 1180 1181 int res = connect( 1182 s, 1183 (const struct sockaddr *)&remoteAddr, 1184 sizeof(remoteAddr)); 1185 1186 if (res < 0) { 1187 err = -errno; 1188 } 1189 } 1190 1191 return err; 1192 } 1193 1194 status_t ANetworkSession::sendRequest( 1195 int32_t sessionID, const void *data, ssize_t size, 1196 bool timeValid, int64_t timeUs) { 1197 Mutex::Autolock autoLock(mLock); 1198 1199 ssize_t index = mSessions.indexOfKey(sessionID); 1200 1201 if (index < 0) { 1202 return -ENOENT; 1203 } 1204 1205 const sp<Session> session = mSessions.valueAt(index); 1206 1207 status_t err = session->sendRequest(data, size, timeValid, timeUs); 1208 1209 interrupt(); 1210 1211 return err; 1212 } 1213 1214 status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) { 1215 Mutex::Autolock autoLock(mLock); 1216 1217 ssize_t index = mSessions.indexOfKey(sessionID); 1218 1219 if (index < 0) { 1220 return -ENOENT; 1221 } 1222 1223 const sp<Session> session = mSessions.valueAt(index); 1224 return session->switchToWebSocketMode(); 1225 } 1226 1227 void ANetworkSession::interrupt() { 1228 static const char dummy = 0; 1229 1230 ssize_t n; 1231 do { 1232 n = write(mPipeFd[1], &dummy, 1); 1233 } while (n < 0 && errno == EINTR); 1234 1235 if (n < 0) { 1236 ALOGW("Error writing to pipe (%s)", strerror(errno)); 1237 } 1238 } 1239 1240 void ANetworkSession::threadLoop() { 1241 fd_set rs, ws; 1242 FD_ZERO(&rs); 1243 FD_ZERO(&ws); 1244 1245 FD_SET(mPipeFd[0], &rs); 1246 int maxFd = mPipeFd[0]; 1247 1248 { 1249 Mutex::Autolock autoLock(mLock); 1250 1251 for (size_t i = 0; i < mSessions.size(); ++i) { 1252 const sp<Session> &session = mSessions.valueAt(i); 1253 1254 int s = session->socket(); 1255 1256 if (s < 0) { 1257 continue; 1258 } 1259 1260 if (session->wantsToRead()) { 1261 FD_SET(s, &rs); 1262 if (s > maxFd) { 1263 maxFd = s; 1264 } 1265 } 1266 1267 if (session->wantsToWrite()) { 1268 FD_SET(s, &ws); 1269 if (s > maxFd) { 1270 maxFd = s; 1271 } 1272 } 1273 } 1274 } 1275 1276 int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */); 1277 1278 if (res == 0) { 1279 return; 1280 } 1281 1282 if (res < 0) { 1283 if (errno == EINTR) { 1284 return; 1285 } 1286 1287 ALOGE("select failed w/ error %d (%s)", errno, strerror(errno)); 1288 return; 1289 } 1290 1291 if (FD_ISSET(mPipeFd[0], &rs)) { 1292 char c; 1293 ssize_t n; 1294 do { 1295 n = read(mPipeFd[0], &c, 1); 1296 } while (n < 0 && errno == EINTR); 1297 1298 if (n < 0) { 1299 ALOGW("Error reading from pipe (%s)", strerror(errno)); 1300 } 1301 1302 --res; 1303 } 1304 1305 { 1306 Mutex::Autolock autoLock(mLock); 1307 1308 List<sp<Session> > sessionsToAdd; 1309 1310 for (size_t i = mSessions.size(); res > 0 && i > 0;) { 1311 i--; 1312 const sp<Session> &session = mSessions.valueAt(i); 1313 1314 int s = session->socket(); 1315 1316 if (s < 0) { 1317 continue; 1318 } 1319 1320 if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) { 1321 --res; 1322 } 1323 1324 if (FD_ISSET(s, &rs)) { 1325 if (session->isRTSPServer() || session->isTCPDatagramServer()) { 1326 struct sockaddr_in remoteAddr; 1327 socklen_t remoteAddrLen = sizeof(remoteAddr); 1328 1329 int clientSocket = accept( 1330 s, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 1331 1332 if (clientSocket >= 0) { 1333 status_t err = MakeSocketNonBlocking(clientSocket); 1334 1335 if (err != OK) { 1336 ALOGE("Unable to make client socket non blocking, " 1337 "failed w/ error %d (%s)", 1338 err, strerror(-err)); 1339 1340 close(clientSocket); 1341 clientSocket = -1; 1342 } else { 1343 in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr); 1344 1345 ALOGI("incoming connection from %d.%d.%d.%d:%d " 1346 "(socket %d)", 1347 (addr >> 24), 1348 (addr >> 16) & 0xff, 1349 (addr >> 8) & 0xff, 1350 addr & 0xff, 1351 ntohs(remoteAddr.sin_port), 1352 clientSocket); 1353 1354 sp<Session> clientSession = 1355 new Session( 1356 mNextSessionID++, 1357 Session::CONNECTED, 1358 clientSocket, 1359 session->getNotificationMessage()); 1360 1361 clientSession->setMode( 1362 session->isRTSPServer() 1363 ? Session::MODE_RTSP 1364 : Session::MODE_DATAGRAM); 1365 1366 sessionsToAdd.push_back(clientSession); 1367 } 1368 } else { 1369 ALOGE("accept returned error %d (%s)", 1370 errno, strerror(errno)); 1371 } 1372 } else { 1373 status_t err = session->readMore(); 1374 if (err != OK) { 1375 ALOGE("readMore on socket %d failed w/ error %d (%s)", 1376 s, err, strerror(-err)); 1377 } 1378 } 1379 } 1380 1381 if (FD_ISSET(s, &ws)) { 1382 status_t err = session->writeMore(); 1383 if (err != OK) { 1384 ALOGE("writeMore on socket %d failed w/ error %d (%s)", 1385 s, err, strerror(-err)); 1386 } 1387 } 1388 } 1389 1390 while (!sessionsToAdd.empty()) { 1391 sp<Session> session = *sessionsToAdd.begin(); 1392 sessionsToAdd.erase(sessionsToAdd.begin()); 1393 1394 mSessions.add(session->sessionID(), session); 1395 1396 ALOGI("added clientSession %d", session->sessionID()); 1397 } 1398 } 1399 } 1400 1401 } // namespace android 1402