1 /* 2 * Copyright 2012, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "NetworkSession" 19 #include <utils/Log.h> 20 21 #include "ANetworkSession.h" 22 #include "ParsedMessage.h" 23 24 #include <arpa/inet.h> 25 #include <fcntl.h> 26 #include <linux/tcp.h> 27 #include <net/if.h> 28 #include <netdb.h> 29 #include <netinet/in.h> 30 #include <sys/ioctl.h> 31 #include <sys/socket.h> 32 33 #include <media/stagefright/foundation/ABuffer.h> 34 #include <media/stagefright/foundation/ADebug.h> 35 #include <media/stagefright/foundation/AMessage.h> 36 #include <media/stagefright/foundation/hexdump.h> 37 38 namespace android { 39 40 static uint16_t U16_AT(const uint8_t *ptr) { 41 return ptr[0] << 8 | ptr[1]; 42 } 43 44 static uint32_t U32_AT(const uint8_t *ptr) { 45 return ptr[0] << 24 | ptr[1] << 16 | ptr[2] << 8 | ptr[3]; 46 } 47 48 static uint64_t U64_AT(const uint8_t *ptr) { 49 return ((uint64_t)U32_AT(ptr)) << 32 | U32_AT(ptr + 4); 50 } 51 52 static const size_t kMaxUDPSize = 1500; 53 static const int32_t kMaxUDPRetries = 200; 54 55 struct ANetworkSession::NetworkThread : public Thread { 56 NetworkThread(ANetworkSession *session); 57 58 protected: 59 virtual ~NetworkThread(); 60 61 private: 62 ANetworkSession *mSession; 63 64 virtual bool threadLoop(); 65 66 DISALLOW_EVIL_CONSTRUCTORS(NetworkThread); 67 }; 68 69 struct ANetworkSession::Session : public RefBase { 70 enum Mode { 71 MODE_RTSP, 72 MODE_DATAGRAM, 73 MODE_WEBSOCKET, 74 }; 75 76 enum State { 77 CONNECTING, 78 CONNECTED, 79 LISTENING_RTSP, 80 LISTENING_TCP_DGRAMS, 81 DATAGRAM, 82 }; 83 84 Session(int32_t sessionID, 85 State state, 86 int s, 87 const sp<AMessage> ¬ify); 88 89 int32_t sessionID() const; 90 int socket() const; 91 sp<AMessage> getNotificationMessage() const; 92 93 bool isRTSPServer() const; 94 bool isTCPDatagramServer() const; 95 96 bool wantsToRead(); 97 bool wantsToWrite(); 98 99 status_t readMore(); 100 status_t writeMore(); 101 102 status_t sendRequest( 103 const void *data, ssize_t size, bool timeValid, int64_t timeUs); 104 105 void setMode(Mode mode); 106 107 status_t switchToWebSocketMode(); 108 109 protected: 110 virtual ~Session(); 111 112 private: 113 enum { 114 FRAGMENT_FLAG_TIME_VALID = 1, 115 }; 116 struct Fragment { 117 uint32_t mFlags; 118 int64_t mTimeUs; 119 sp<ABuffer> mBuffer; 120 }; 121 122 int32_t mSessionID; 123 State mState; 124 Mode mMode; 125 int mSocket; 126 sp<AMessage> mNotify; 127 bool mSawReceiveFailure, mSawSendFailure; 128 int32_t mUDPRetries; 129 130 List<Fragment> mOutFragments; 131 132 AString mInBuffer; 133 134 int64_t mLastStallReportUs; 135 136 void notifyError(bool send, status_t err, const char *detail); 137 void notify(NotificationReason reason); 138 139 void dumpFragmentStats(const Fragment &frag); 140 141 DISALLOW_EVIL_CONSTRUCTORS(Session); 142 }; 143 //////////////////////////////////////////////////////////////////////////////// 144 145 ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session) 146 : mSession(session) { 147 } 148 149 ANetworkSession::NetworkThread::~NetworkThread() { 150 } 151 152 bool ANetworkSession::NetworkThread::threadLoop() { 153 mSession->threadLoop(); 154 155 return true; 156 } 157 158 //////////////////////////////////////////////////////////////////////////////// 159 160 ANetworkSession::Session::Session( 161 int32_t sessionID, 162 State state, 163 int s, 164 const sp<AMessage> ¬ify) 165 : mSessionID(sessionID), 166 mState(state), 167 mMode(MODE_DATAGRAM), 168 mSocket(s), 169 mNotify(notify), 170 mSawReceiveFailure(false), 171 mSawSendFailure(false), 172 mUDPRetries(kMaxUDPRetries), 173 mLastStallReportUs(-1ll) { 174 if (mState == CONNECTED) { 175 struct sockaddr_in localAddr; 176 socklen_t localAddrLen = sizeof(localAddr); 177 178 int res = getsockname( 179 mSocket, (struct sockaddr *)&localAddr, &localAddrLen); 180 CHECK_GE(res, 0); 181 182 struct sockaddr_in remoteAddr; 183 socklen_t remoteAddrLen = sizeof(remoteAddr); 184 185 res = getpeername( 186 mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 187 CHECK_GE(res, 0); 188 189 in_addr_t addr = ntohl(localAddr.sin_addr.s_addr); 190 AString localAddrString = StringPrintf( 191 "%d.%d.%d.%d", 192 (addr >> 24), 193 (addr >> 16) & 0xff, 194 (addr >> 8) & 0xff, 195 addr & 0xff); 196 197 addr = ntohl(remoteAddr.sin_addr.s_addr); 198 AString remoteAddrString = StringPrintf( 199 "%d.%d.%d.%d", 200 (addr >> 24), 201 (addr >> 16) & 0xff, 202 (addr >> 8) & 0xff, 203 addr & 0xff); 204 205 sp<AMessage> msg = mNotify->dup(); 206 msg->setInt32("sessionID", mSessionID); 207 msg->setInt32("reason", kWhatClientConnected); 208 msg->setString("server-ip", localAddrString.c_str()); 209 msg->setInt32("server-port", ntohs(localAddr.sin_port)); 210 msg->setString("client-ip", remoteAddrString.c_str()); 211 msg->setInt32("client-port", ntohs(remoteAddr.sin_port)); 212 msg->post(); 213 } 214 } 215 216 ANetworkSession::Session::~Session() { 217 ALOGV("Session %d gone", mSessionID); 218 219 close(mSocket); 220 mSocket = -1; 221 } 222 223 int32_t ANetworkSession::Session::sessionID() const { 224 return mSessionID; 225 } 226 227 int ANetworkSession::Session::socket() const { 228 return mSocket; 229 } 230 231 void ANetworkSession::Session::setMode(Mode mode) { 232 mMode = mode; 233 } 234 235 status_t ANetworkSession::Session::switchToWebSocketMode() { 236 if (mState != CONNECTED || mMode != MODE_RTSP) { 237 return INVALID_OPERATION; 238 } 239 240 mMode = MODE_WEBSOCKET; 241 242 return OK; 243 } 244 245 sp<AMessage> ANetworkSession::Session::getNotificationMessage() const { 246 return mNotify; 247 } 248 249 bool ANetworkSession::Session::isRTSPServer() const { 250 return mState == LISTENING_RTSP; 251 } 252 253 bool ANetworkSession::Session::isTCPDatagramServer() const { 254 return mState == LISTENING_TCP_DGRAMS; 255 } 256 257 bool ANetworkSession::Session::wantsToRead() { 258 return !mSawReceiveFailure && mState != CONNECTING; 259 } 260 261 bool ANetworkSession::Session::wantsToWrite() { 262 return !mSawSendFailure 263 && (mState == CONNECTING 264 || (mState == CONNECTED && !mOutFragments.empty()) 265 || (mState == DATAGRAM && !mOutFragments.empty())); 266 } 267 268 status_t ANetworkSession::Session::readMore() { 269 if (mState == DATAGRAM) { 270 CHECK_EQ(mMode, MODE_DATAGRAM); 271 272 status_t err; 273 do { 274 sp<ABuffer> buf = new ABuffer(kMaxUDPSize); 275 276 struct sockaddr_in remoteAddr; 277 socklen_t remoteAddrLen = sizeof(remoteAddr); 278 279 ssize_t n; 280 do { 281 n = recvfrom( 282 mSocket, buf->data(), buf->capacity(), 0, 283 (struct sockaddr *)&remoteAddr, &remoteAddrLen); 284 } while (n < 0 && errno == EINTR); 285 286 err = OK; 287 if (n < 0) { 288 err = -errno; 289 } else if (n == 0) { 290 err = -ECONNRESET; 291 } else { 292 buf->setRange(0, n); 293 294 int64_t nowUs = ALooper::GetNowUs(); 295 buf->meta()->setInt64("arrivalTimeUs", nowUs); 296 297 sp<AMessage> notify = mNotify->dup(); 298 notify->setInt32("sessionID", mSessionID); 299 notify->setInt32("reason", kWhatDatagram); 300 301 uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr); 302 notify->setString( 303 "fromAddr", 304 StringPrintf( 305 "%u.%u.%u.%u", 306 ip >> 24, 307 (ip >> 16) & 0xff, 308 (ip >> 8) & 0xff, 309 ip & 0xff).c_str()); 310 311 notify->setInt32("fromPort", ntohs(remoteAddr.sin_port)); 312 313 notify->setBuffer("data", buf); 314 notify->post(); 315 } 316 } while (err == OK); 317 318 if (err == -EAGAIN) { 319 err = OK; 320 } 321 322 if (err != OK) { 323 if (!mUDPRetries) { 324 notifyError(false /* send */, err, "Recvfrom failed."); 325 mSawReceiveFailure = true; 326 } else { 327 mUDPRetries--; 328 ALOGE("Recvfrom failed, %d/%d retries left", 329 mUDPRetries, kMaxUDPRetries); 330 err = OK; 331 } 332 } else { 333 mUDPRetries = kMaxUDPRetries; 334 } 335 336 return err; 337 } 338 339 char tmp[512]; 340 ssize_t n; 341 do { 342 n = recv(mSocket, tmp, sizeof(tmp), 0); 343 } while (n < 0 && errno == EINTR); 344 345 status_t err = OK; 346 347 if (n > 0) { 348 mInBuffer.append(tmp, n); 349 350 #if 0 351 ALOGI("in:"); 352 hexdump(tmp, n); 353 #endif 354 } else if (n < 0) { 355 err = -errno; 356 } else { 357 err = -ECONNRESET; 358 } 359 360 if (mMode == MODE_DATAGRAM) { 361 // TCP stream carrying 16-bit length-prefixed datagrams. 362 363 while (mInBuffer.size() >= 2) { 364 size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str()); 365 366 if (mInBuffer.size() < packetSize + 2) { 367 break; 368 } 369 370 sp<ABuffer> packet = new ABuffer(packetSize); 371 memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize); 372 373 int64_t nowUs = ALooper::GetNowUs(); 374 packet->meta()->setInt64("arrivalTimeUs", nowUs); 375 376 sp<AMessage> notify = mNotify->dup(); 377 notify->setInt32("sessionID", mSessionID); 378 notify->setInt32("reason", kWhatDatagram); 379 notify->setBuffer("data", packet); 380 notify->post(); 381 382 mInBuffer.erase(0, packetSize + 2); 383 } 384 } else if (mMode == MODE_RTSP) { 385 for (;;) { 386 size_t length; 387 388 if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') { 389 if (mInBuffer.size() < 4) { 390 break; 391 } 392 393 length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2); 394 395 if (mInBuffer.size() < 4 + length) { 396 break; 397 } 398 399 sp<AMessage> notify = mNotify->dup(); 400 notify->setInt32("sessionID", mSessionID); 401 notify->setInt32("reason", kWhatBinaryData); 402 notify->setInt32("channel", mInBuffer.c_str()[1]); 403 404 sp<ABuffer> data = new ABuffer(length); 405 memcpy(data->data(), mInBuffer.c_str() + 4, length); 406 407 int64_t nowUs = ALooper::GetNowUs(); 408 data->meta()->setInt64("arrivalTimeUs", nowUs); 409 410 notify->setBuffer("data", data); 411 notify->post(); 412 413 mInBuffer.erase(0, 4 + length); 414 continue; 415 } 416 417 sp<ParsedMessage> msg = 418 ParsedMessage::Parse( 419 mInBuffer.c_str(), mInBuffer.size(), err != OK, &length); 420 421 if (msg == NULL) { 422 break; 423 } 424 425 sp<AMessage> notify = mNotify->dup(); 426 notify->setInt32("sessionID", mSessionID); 427 notify->setInt32("reason", kWhatData); 428 notify->setObject("data", msg); 429 notify->post(); 430 431 #if 1 432 // XXX The (old) dongle sends the wrong content length header on a 433 // SET_PARAMETER request that signals a "wfd_idr_request". 434 // (17 instead of 19). 435 const char *content = msg->getContent(); 436 if (content 437 && !memcmp(content, "wfd_idr_request\r\n", 17) 438 && length >= 19 439 && mInBuffer.c_str()[length] == '\r' 440 && mInBuffer.c_str()[length + 1] == '\n') { 441 length += 2; 442 } 443 #endif 444 445 mInBuffer.erase(0, length); 446 447 if (err != OK) { 448 break; 449 } 450 } 451 } else { 452 CHECK_EQ(mMode, MODE_WEBSOCKET); 453 454 const uint8_t *data = (const uint8_t *)mInBuffer.c_str(); 455 // hexdump(data, mInBuffer.size()); 456 457 while (mInBuffer.size() >= 2) { 458 size_t offset = 2; 459 460 unsigned payloadLen = data[1] & 0x7f; 461 if (payloadLen == 126) { 462 if (offset + 2 > mInBuffer.size()) { 463 break; 464 } 465 466 payloadLen = U16_AT(&data[offset]); 467 offset += 2; 468 } else if (payloadLen == 127) { 469 if (offset + 8 > mInBuffer.size()) { 470 break; 471 } 472 473 payloadLen = U64_AT(&data[offset]); 474 offset += 8; 475 } 476 477 uint32_t mask = 0; 478 if (data[1] & 0x80) { 479 // MASK==1 480 if (offset + 4 > mInBuffer.size()) { 481 break; 482 } 483 484 mask = U32_AT(&data[offset]); 485 offset += 4; 486 } 487 488 if (offset + payloadLen > mInBuffer.size()) { 489 break; 490 } 491 492 // We have the full message. 493 494 sp<ABuffer> packet = new ABuffer(payloadLen); 495 memcpy(packet->data(), &data[offset], payloadLen); 496 497 if (mask != 0) { 498 for (size_t i = 0; i < payloadLen; ++i) { 499 packet->data()[i] = 500 data[offset + i] 501 ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff); 502 } 503 } 504 505 sp<AMessage> notify = mNotify->dup(); 506 notify->setInt32("sessionID", mSessionID); 507 notify->setInt32("reason", kWhatWebSocketMessage); 508 notify->setBuffer("data", packet); 509 notify->setInt32("headerByte", data[0]); 510 notify->post(); 511 512 mInBuffer.erase(0, offset + payloadLen); 513 } 514 } 515 516 if (err != OK) { 517 notifyError(false /* send */, err, "Recv failed."); 518 mSawReceiveFailure = true; 519 } 520 521 return err; 522 } 523 524 void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) { 525 #if 0 526 int64_t nowUs = ALooper::GetNowUs(); 527 int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll; 528 529 static const int64_t kMinDelayMs = 0; 530 static const int64_t kMaxDelayMs = 300; 531 532 const char *kPattern = "########################################"; 533 size_t kPatternSize = strlen(kPattern); 534 535 int n = (kPatternSize * (delayMs - kMinDelayMs)) 536 / (kMaxDelayMs - kMinDelayMs); 537 538 if (n < 0) { 539 n = 0; 540 } else if ((size_t)n > kPatternSize) { 541 n = kPatternSize; 542 } 543 544 ALOGI("[%lld]: (%4lld ms) %s\n", 545 frag.mTimeUs / 1000, 546 delayMs, 547 kPattern + kPatternSize - n); 548 #endif 549 } 550 551 status_t ANetworkSession::Session::writeMore() { 552 if (mState == DATAGRAM) { 553 CHECK(!mOutFragments.empty()); 554 555 status_t err; 556 do { 557 const Fragment &frag = *mOutFragments.begin(); 558 const sp<ABuffer> &datagram = frag.mBuffer; 559 560 int n; 561 do { 562 n = send(mSocket, datagram->data(), datagram->size(), 0); 563 } while (n < 0 && errno == EINTR); 564 565 err = OK; 566 567 if (n > 0) { 568 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { 569 dumpFragmentStats(frag); 570 } 571 572 mOutFragments.erase(mOutFragments.begin()); 573 } else if (n < 0) { 574 err = -errno; 575 } else if (n == 0) { 576 err = -ECONNRESET; 577 } 578 } while (err == OK && !mOutFragments.empty()); 579 580 if (err == -EAGAIN) { 581 if (!mOutFragments.empty()) { 582 ALOGI("%d datagrams remain queued.", mOutFragments.size()); 583 } 584 err = OK; 585 } 586 587 if (err != OK) { 588 if (!mUDPRetries) { 589 notifyError(true /* send */, err, "Send datagram failed."); 590 mSawSendFailure = true; 591 } else { 592 mUDPRetries--; 593 ALOGE("Send datagram failed, %d/%d retries left", 594 mUDPRetries, kMaxUDPRetries); 595 err = OK; 596 } 597 } else { 598 mUDPRetries = kMaxUDPRetries; 599 } 600 601 return err; 602 } 603 604 if (mState == CONNECTING) { 605 int err; 606 socklen_t optionLen = sizeof(err); 607 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0); 608 CHECK_EQ(optionLen, (socklen_t)sizeof(err)); 609 610 if (err != 0) { 611 notifyError(kWhatError, -err, "Connection failed"); 612 mSawSendFailure = true; 613 614 return -err; 615 } 616 617 mState = CONNECTED; 618 notify(kWhatConnected); 619 620 return OK; 621 } 622 623 CHECK_EQ(mState, CONNECTED); 624 CHECK(!mOutFragments.empty()); 625 626 ssize_t n; 627 while (!mOutFragments.empty()) { 628 const Fragment &frag = *mOutFragments.begin(); 629 630 do { 631 n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0); 632 } while (n < 0 && errno == EINTR); 633 634 if (n <= 0) { 635 break; 636 } 637 638 frag.mBuffer->setRange( 639 frag.mBuffer->offset() + n, frag.mBuffer->size() - n); 640 641 if (frag.mBuffer->size() > 0) { 642 break; 643 } 644 645 if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) { 646 dumpFragmentStats(frag); 647 } 648 649 mOutFragments.erase(mOutFragments.begin()); 650 } 651 652 status_t err = OK; 653 654 if (n < 0) { 655 err = -errno; 656 } else if (n == 0) { 657 err = -ECONNRESET; 658 } 659 660 if (err != OK) { 661 notifyError(true /* send */, err, "Send failed."); 662 mSawSendFailure = true; 663 } 664 665 #if 0 666 int numBytesQueued; 667 int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued); 668 if (res == 0 && numBytesQueued > 50 * 1024) { 669 if (numBytesQueued > 409600) { 670 ALOGW("!!! numBytesQueued = %d", numBytesQueued); 671 } 672 673 int64_t nowUs = ALooper::GetNowUs(); 674 675 if (mLastStallReportUs < 0ll 676 || nowUs > mLastStallReportUs + 100000ll) { 677 sp<AMessage> msg = mNotify->dup(); 678 msg->setInt32("sessionID", mSessionID); 679 msg->setInt32("reason", kWhatNetworkStall); 680 msg->setSize("numBytesQueued", numBytesQueued); 681 msg->post(); 682 683 mLastStallReportUs = nowUs; 684 } 685 } 686 #endif 687 688 return err; 689 } 690 691 status_t ANetworkSession::Session::sendRequest( 692 const void *data, ssize_t size, bool timeValid, int64_t timeUs) { 693 CHECK(mState == CONNECTED || mState == DATAGRAM); 694 695 if (size < 0) { 696 size = strlen((const char *)data); 697 } 698 699 if (size == 0) { 700 return OK; 701 } 702 703 sp<ABuffer> buffer; 704 705 if (mState == CONNECTED && mMode == MODE_DATAGRAM) { 706 CHECK_LE(size, 65535); 707 708 buffer = new ABuffer(size + 2); 709 buffer->data()[0] = size >> 8; 710 buffer->data()[1] = size & 0xff; 711 memcpy(buffer->data() + 2, data, size); 712 } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) { 713 static const bool kUseMask = false; // Chromium doesn't like it. 714 715 size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0); 716 if (size > 65535) { 717 numHeaderBytes += 8; 718 } else if (size > 125) { 719 numHeaderBytes += 2; 720 } 721 722 buffer = new ABuffer(numHeaderBytes + size); 723 buffer->data()[0] = 0x81; // FIN==1 | opcode=1 (text) 724 buffer->data()[1] = kUseMask ? 0x80 : 0x00; 725 726 if (size > 65535) { 727 buffer->data()[1] |= 127; 728 buffer->data()[2] = 0x00; 729 buffer->data()[3] = 0x00; 730 buffer->data()[4] = 0x00; 731 buffer->data()[5] = 0x00; 732 buffer->data()[6] = (size >> 24) & 0xff; 733 buffer->data()[7] = (size >> 16) & 0xff; 734 buffer->data()[8] = (size >> 8) & 0xff; 735 buffer->data()[9] = size & 0xff; 736 } else if (size > 125) { 737 buffer->data()[1] |= 126; 738 buffer->data()[2] = (size >> 8) & 0xff; 739 buffer->data()[3] = size & 0xff; 740 } else { 741 buffer->data()[1] |= size; 742 } 743 744 if (kUseMask) { 745 uint32_t mask = rand(); 746 747 buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff; 748 buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff; 749 buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff; 750 buffer->data()[numHeaderBytes - 1] = mask & 0xff; 751 752 for (size_t i = 0; i < (size_t)size; ++i) { 753 buffer->data()[numHeaderBytes + i] = 754 ((const uint8_t *)data)[i] 755 ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff); 756 } 757 } else { 758 memcpy(buffer->data() + numHeaderBytes, data, size); 759 } 760 } else { 761 buffer = new ABuffer(size); 762 memcpy(buffer->data(), data, size); 763 } 764 765 Fragment frag; 766 767 frag.mFlags = 0; 768 if (timeValid) { 769 frag.mFlags = FRAGMENT_FLAG_TIME_VALID; 770 frag.mTimeUs = timeUs; 771 } 772 773 frag.mBuffer = buffer; 774 775 mOutFragments.push_back(frag); 776 777 return OK; 778 } 779 780 void ANetworkSession::Session::notifyError( 781 bool send, status_t err, const char *detail) { 782 sp<AMessage> msg = mNotify->dup(); 783 msg->setInt32("sessionID", mSessionID); 784 msg->setInt32("reason", kWhatError); 785 msg->setInt32("send", send); 786 msg->setInt32("err", err); 787 msg->setString("detail", detail); 788 msg->post(); 789 } 790 791 void ANetworkSession::Session::notify(NotificationReason reason) { 792 sp<AMessage> msg = mNotify->dup(); 793 msg->setInt32("sessionID", mSessionID); 794 msg->setInt32("reason", reason); 795 msg->post(); 796 } 797 798 //////////////////////////////////////////////////////////////////////////////// 799 800 ANetworkSession::ANetworkSession() 801 : mNextSessionID(1) { 802 mPipeFd[0] = mPipeFd[1] = -1; 803 } 804 805 ANetworkSession::~ANetworkSession() { 806 stop(); 807 } 808 809 status_t ANetworkSession::start() { 810 if (mThread != NULL) { 811 return INVALID_OPERATION; 812 } 813 814 int res = pipe(mPipeFd); 815 if (res != 0) { 816 mPipeFd[0] = mPipeFd[1] = -1; 817 return -errno; 818 } 819 820 mThread = new NetworkThread(this); 821 822 status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO); 823 824 if (err != OK) { 825 mThread.clear(); 826 827 close(mPipeFd[0]); 828 close(mPipeFd[1]); 829 mPipeFd[0] = mPipeFd[1] = -1; 830 831 return err; 832 } 833 834 return OK; 835 } 836 837 status_t ANetworkSession::stop() { 838 if (mThread == NULL) { 839 return INVALID_OPERATION; 840 } 841 842 mThread->requestExit(); 843 interrupt(); 844 mThread->requestExitAndWait(); 845 846 mThread.clear(); 847 848 close(mPipeFd[0]); 849 close(mPipeFd[1]); 850 mPipeFd[0] = mPipeFd[1] = -1; 851 852 return OK; 853 } 854 855 status_t ANetworkSession::createRTSPClient( 856 const char *host, unsigned port, const sp<AMessage> ¬ify, 857 int32_t *sessionID) { 858 return createClientOrServer( 859 kModeCreateRTSPClient, 860 NULL /* addr */, 861 0 /* port */, 862 host, 863 port, 864 notify, 865 sessionID); 866 } 867 868 status_t ANetworkSession::createRTSPServer( 869 const struct in_addr &addr, unsigned port, 870 const sp<AMessage> ¬ify, int32_t *sessionID) { 871 return createClientOrServer( 872 kModeCreateRTSPServer, 873 &addr, 874 port, 875 NULL /* remoteHost */, 876 0 /* remotePort */, 877 notify, 878 sessionID); 879 } 880 881 status_t ANetworkSession::createUDPSession( 882 unsigned localPort, const sp<AMessage> ¬ify, int32_t *sessionID) { 883 return createUDPSession(localPort, NULL, 0, notify, sessionID); 884 } 885 886 status_t ANetworkSession::createUDPSession( 887 unsigned localPort, 888 const char *remoteHost, 889 unsigned remotePort, 890 const sp<AMessage> ¬ify, 891 int32_t *sessionID) { 892 return createClientOrServer( 893 kModeCreateUDPSession, 894 NULL /* addr */, 895 localPort, 896 remoteHost, 897 remotePort, 898 notify, 899 sessionID); 900 } 901 902 status_t ANetworkSession::createTCPDatagramSession( 903 const struct in_addr &addr, unsigned port, 904 const sp<AMessage> ¬ify, int32_t *sessionID) { 905 return createClientOrServer( 906 kModeCreateTCPDatagramSessionPassive, 907 &addr, 908 port, 909 NULL /* remoteHost */, 910 0 /* remotePort */, 911 notify, 912 sessionID); 913 } 914 915 status_t ANetworkSession::createTCPDatagramSession( 916 unsigned localPort, 917 const char *remoteHost, 918 unsigned remotePort, 919 const sp<AMessage> ¬ify, 920 int32_t *sessionID) { 921 return createClientOrServer( 922 kModeCreateTCPDatagramSessionActive, 923 NULL /* addr */, 924 localPort, 925 remoteHost, 926 remotePort, 927 notify, 928 sessionID); 929 } 930 931 status_t ANetworkSession::destroySession(int32_t sessionID) { 932 Mutex::Autolock autoLock(mLock); 933 934 ssize_t index = mSessions.indexOfKey(sessionID); 935 936 if (index < 0) { 937 return -ENOENT; 938 } 939 940 mSessions.removeItemsAt(index); 941 942 interrupt(); 943 944 return OK; 945 } 946 947 // static 948 status_t ANetworkSession::MakeSocketNonBlocking(int s) { 949 int flags = fcntl(s, F_GETFL, 0); 950 if (flags < 0) { 951 flags = 0; 952 } 953 954 int res = fcntl(s, F_SETFL, flags | O_NONBLOCK); 955 if (res < 0) { 956 return -errno; 957 } 958 959 return OK; 960 } 961 962 status_t ANetworkSession::createClientOrServer( 963 Mode mode, 964 const struct in_addr *localAddr, 965 unsigned port, 966 const char *remoteHost, 967 unsigned remotePort, 968 const sp<AMessage> ¬ify, 969 int32_t *sessionID) { 970 Mutex::Autolock autoLock(mLock); 971 972 *sessionID = 0; 973 status_t err = OK; 974 int s, res; 975 sp<Session> session; 976 977 s = socket( 978 AF_INET, 979 (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM, 980 0); 981 982 if (s < 0) { 983 err = -errno; 984 goto bail; 985 } 986 987 if (mode == kModeCreateRTSPServer 988 || mode == kModeCreateTCPDatagramSessionPassive) { 989 const int yes = 1; 990 res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); 991 992 if (res < 0) { 993 err = -errno; 994 goto bail2; 995 } 996 } 997 998 if (mode == kModeCreateUDPSession) { 999 int size = 256 * 1024; 1000 1001 res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)); 1002 1003 if (res < 0) { 1004 err = -errno; 1005 goto bail2; 1006 } 1007 1008 res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)); 1009 1010 if (res < 0) { 1011 err = -errno; 1012 goto bail2; 1013 } 1014 } else if (mode == kModeCreateTCPDatagramSessionActive) { 1015 int flag = 1; 1016 res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); 1017 1018 if (res < 0) { 1019 err = -errno; 1020 goto bail2; 1021 } 1022 1023 int tos = 224; // VOICE 1024 res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)); 1025 1026 if (res < 0) { 1027 err = -errno; 1028 goto bail2; 1029 } 1030 } 1031 1032 err = MakeSocketNonBlocking(s); 1033 1034 if (err != OK) { 1035 goto bail2; 1036 } 1037 1038 struct sockaddr_in addr; 1039 memset(addr.sin_zero, 0, sizeof(addr.sin_zero)); 1040 addr.sin_family = AF_INET; 1041 1042 if (mode == kModeCreateRTSPClient 1043 || mode == kModeCreateTCPDatagramSessionActive) { 1044 struct hostent *ent= gethostbyname(remoteHost); 1045 if (ent == NULL) { 1046 err = -h_errno; 1047 goto bail2; 1048 } 1049 1050 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 1051 addr.sin_port = htons(remotePort); 1052 } else if (localAddr != NULL) { 1053 addr.sin_addr = *localAddr; 1054 addr.sin_port = htons(port); 1055 } else { 1056 addr.sin_addr.s_addr = htonl(INADDR_ANY); 1057 addr.sin_port = htons(port); 1058 } 1059 1060 if (mode == kModeCreateRTSPClient 1061 || mode == kModeCreateTCPDatagramSessionActive) { 1062 in_addr_t x = ntohl(addr.sin_addr.s_addr); 1063 ALOGI("connecting socket %d to %d.%d.%d.%d:%d", 1064 s, 1065 (x >> 24), 1066 (x >> 16) & 0xff, 1067 (x >> 8) & 0xff, 1068 x & 0xff, 1069 ntohs(addr.sin_port)); 1070 1071 res = connect(s, (const struct sockaddr *)&addr, sizeof(addr)); 1072 1073 CHECK_LT(res, 0); 1074 if (errno == EINPROGRESS) { 1075 res = 0; 1076 } 1077 } else { 1078 res = bind(s, (const struct sockaddr *)&addr, sizeof(addr)); 1079 1080 if (res == 0) { 1081 if (mode == kModeCreateRTSPServer 1082 || mode == kModeCreateTCPDatagramSessionPassive) { 1083 res = listen(s, 4); 1084 } else { 1085 CHECK_EQ(mode, kModeCreateUDPSession); 1086 1087 if (remoteHost != NULL) { 1088 struct sockaddr_in remoteAddr; 1089 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 1090 remoteAddr.sin_family = AF_INET; 1091 remoteAddr.sin_port = htons(remotePort); 1092 1093 struct hostent *ent= gethostbyname(remoteHost); 1094 if (ent == NULL) { 1095 err = -h_errno; 1096 goto bail2; 1097 } 1098 1099 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 1100 1101 res = connect( 1102 s, 1103 (const struct sockaddr *)&remoteAddr, 1104 sizeof(remoteAddr)); 1105 } 1106 } 1107 } 1108 } 1109 1110 if (res < 0) { 1111 err = -errno; 1112 goto bail2; 1113 } 1114 1115 Session::State state; 1116 switch (mode) { 1117 case kModeCreateRTSPClient: 1118 state = Session::CONNECTING; 1119 break; 1120 1121 case kModeCreateTCPDatagramSessionActive: 1122 state = Session::CONNECTING; 1123 break; 1124 1125 case kModeCreateTCPDatagramSessionPassive: 1126 state = Session::LISTENING_TCP_DGRAMS; 1127 break; 1128 1129 case kModeCreateRTSPServer: 1130 state = Session::LISTENING_RTSP; 1131 break; 1132 1133 default: 1134 CHECK_EQ(mode, kModeCreateUDPSession); 1135 state = Session::DATAGRAM; 1136 break; 1137 } 1138 1139 session = new Session( 1140 mNextSessionID++, 1141 state, 1142 s, 1143 notify); 1144 1145 if (mode == kModeCreateTCPDatagramSessionActive) { 1146 session->setMode(Session::MODE_DATAGRAM); 1147 } else if (mode == kModeCreateRTSPClient) { 1148 session->setMode(Session::MODE_RTSP); 1149 } 1150 1151 mSessions.add(session->sessionID(), session); 1152 1153 interrupt(); 1154 1155 *sessionID = session->sessionID(); 1156 1157 goto bail; 1158 1159 bail2: 1160 close(s); 1161 s = -1; 1162 1163 bail: 1164 return err; 1165 } 1166 1167 status_t ANetworkSession::connectUDPSession( 1168 int32_t sessionID, const char *remoteHost, unsigned remotePort) { 1169 Mutex::Autolock autoLock(mLock); 1170 1171 ssize_t index = mSessions.indexOfKey(sessionID); 1172 1173 if (index < 0) { 1174 return -ENOENT; 1175 } 1176 1177 const sp<Session> session = mSessions.valueAt(index); 1178 int s = session->socket(); 1179 1180 struct sockaddr_in remoteAddr; 1181 memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero)); 1182 remoteAddr.sin_family = AF_INET; 1183 remoteAddr.sin_port = htons(remotePort); 1184 1185 status_t err = OK; 1186 struct hostent *ent = gethostbyname(remoteHost); 1187 if (ent == NULL) { 1188 err = -h_errno; 1189 } else { 1190 remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; 1191 1192 int res = connect( 1193 s, 1194 (const struct sockaddr *)&remoteAddr, 1195 sizeof(remoteAddr)); 1196 1197 if (res < 0) { 1198 err = -errno; 1199 } 1200 } 1201 1202 return err; 1203 } 1204 1205 status_t ANetworkSession::sendRequest( 1206 int32_t sessionID, const void *data, ssize_t size, 1207 bool timeValid, int64_t timeUs) { 1208 Mutex::Autolock autoLock(mLock); 1209 1210 ssize_t index = mSessions.indexOfKey(sessionID); 1211 1212 if (index < 0) { 1213 return -ENOENT; 1214 } 1215 1216 const sp<Session> session = mSessions.valueAt(index); 1217 1218 status_t err = session->sendRequest(data, size, timeValid, timeUs); 1219 1220 interrupt(); 1221 1222 return err; 1223 } 1224 1225 status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) { 1226 Mutex::Autolock autoLock(mLock); 1227 1228 ssize_t index = mSessions.indexOfKey(sessionID); 1229 1230 if (index < 0) { 1231 return -ENOENT; 1232 } 1233 1234 const sp<Session> session = mSessions.valueAt(index); 1235 return session->switchToWebSocketMode(); 1236 } 1237 1238 void ANetworkSession::interrupt() { 1239 static const char dummy = 0; 1240 1241 ssize_t n; 1242 do { 1243 n = write(mPipeFd[1], &dummy, 1); 1244 } while (n < 0 && errno == EINTR); 1245 1246 if (n < 0) { 1247 ALOGW("Error writing to pipe (%s)", strerror(errno)); 1248 } 1249 } 1250 1251 void ANetworkSession::threadLoop() { 1252 fd_set rs, ws; 1253 FD_ZERO(&rs); 1254 FD_ZERO(&ws); 1255 1256 FD_SET(mPipeFd[0], &rs); 1257 int maxFd = mPipeFd[0]; 1258 1259 { 1260 Mutex::Autolock autoLock(mLock); 1261 1262 for (size_t i = 0; i < mSessions.size(); ++i) { 1263 const sp<Session> &session = mSessions.valueAt(i); 1264 1265 int s = session->socket(); 1266 1267 if (s < 0) { 1268 continue; 1269 } 1270 1271 if (session->wantsToRead()) { 1272 FD_SET(s, &rs); 1273 if (s > maxFd) { 1274 maxFd = s; 1275 } 1276 } 1277 1278 if (session->wantsToWrite()) { 1279 FD_SET(s, &ws); 1280 if (s > maxFd) { 1281 maxFd = s; 1282 } 1283 } 1284 } 1285 } 1286 1287 int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */); 1288 1289 if (res == 0) { 1290 return; 1291 } 1292 1293 if (res < 0) { 1294 if (errno == EINTR) { 1295 return; 1296 } 1297 1298 ALOGE("select failed w/ error %d (%s)", errno, strerror(errno)); 1299 return; 1300 } 1301 1302 if (FD_ISSET(mPipeFd[0], &rs)) { 1303 char c; 1304 ssize_t n; 1305 do { 1306 n = read(mPipeFd[0], &c, 1); 1307 } while (n < 0 && errno == EINTR); 1308 1309 if (n < 0) { 1310 ALOGW("Error reading from pipe (%s)", strerror(errno)); 1311 } 1312 1313 --res; 1314 } 1315 1316 { 1317 Mutex::Autolock autoLock(mLock); 1318 1319 List<sp<Session> > sessionsToAdd; 1320 1321 for (size_t i = mSessions.size(); res > 0 && i-- > 0;) { 1322 const sp<Session> &session = mSessions.valueAt(i); 1323 1324 int s = session->socket(); 1325 1326 if (s < 0) { 1327 continue; 1328 } 1329 1330 if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) { 1331 --res; 1332 } 1333 1334 if (FD_ISSET(s, &rs)) { 1335 if (session->isRTSPServer() || session->isTCPDatagramServer()) { 1336 struct sockaddr_in remoteAddr; 1337 socklen_t remoteAddrLen = sizeof(remoteAddr); 1338 1339 int clientSocket = accept( 1340 s, (struct sockaddr *)&remoteAddr, &remoteAddrLen); 1341 1342 if (clientSocket >= 0) { 1343 status_t err = MakeSocketNonBlocking(clientSocket); 1344 1345 if (err != OK) { 1346 ALOGE("Unable to make client socket non blocking, " 1347 "failed w/ error %d (%s)", 1348 err, strerror(-err)); 1349 1350 close(clientSocket); 1351 clientSocket = -1; 1352 } else { 1353 in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr); 1354 1355 ALOGI("incoming connection from %d.%d.%d.%d:%d " 1356 "(socket %d)", 1357 (addr >> 24), 1358 (addr >> 16) & 0xff, 1359 (addr >> 8) & 0xff, 1360 addr & 0xff, 1361 ntohs(remoteAddr.sin_port), 1362 clientSocket); 1363 1364 sp<Session> clientSession = 1365 new Session( 1366 mNextSessionID++, 1367 Session::CONNECTED, 1368 clientSocket, 1369 session->getNotificationMessage()); 1370 1371 clientSession->setMode( 1372 session->isRTSPServer() 1373 ? Session::MODE_RTSP 1374 : Session::MODE_DATAGRAM); 1375 1376 sessionsToAdd.push_back(clientSession); 1377 } 1378 } else { 1379 ALOGE("accept returned error %d (%s)", 1380 errno, strerror(errno)); 1381 } 1382 } else { 1383 status_t err = session->readMore(); 1384 if (err != OK) { 1385 ALOGE("readMore on socket %d failed w/ error %d (%s)", 1386 s, err, strerror(-err)); 1387 } 1388 } 1389 } 1390 1391 if (FD_ISSET(s, &ws)) { 1392 status_t err = session->writeMore(); 1393 if (err != OK) { 1394 ALOGE("writeMore on socket %d failed w/ error %d (%s)", 1395 s, err, strerror(-err)); 1396 } 1397 } 1398 } 1399 1400 while (!sessionsToAdd.empty()) { 1401 sp<Session> session = *sessionsToAdd.begin(); 1402 sessionsToAdd.erase(sessionsToAdd.begin()); 1403 1404 mSessions.add(session->sessionID(), session); 1405 1406 ALOGI("added clientSession %d", session->sessionID()); 1407 } 1408 } 1409 } 1410 1411 } // namespace android 1412 1413