1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define LOG_TAG "LibAAH_RTP" 18 //#define LOG_NDEBUG 0 19 #include <utils/Log.h> 20 21 #include <fcntl.h> 22 #include <poll.h> 23 #include <sys/socket.h> 24 #include <time.h> 25 #include <utils/misc.h> 26 27 #include <media/stagefright/Utils.h> 28 29 #include "aah_rx_player.h" 30 #include "aah_tx_packet.h" 31 32 namespace android { 33 34 const uint32_t AAH_RXPlayer::kRetransRequestMagic = 35 FOURCC('T','r','e','q'); 36 const uint32_t AAH_RXPlayer::kRetransNAKMagic = 37 FOURCC('T','n','a','k'); 38 const uint32_t AAH_RXPlayer::kFastStartRequestMagic = 39 FOURCC('T','f','s','t'); 40 const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000; 41 const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000; 42 const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000; 43 44 static inline int16_t fetchInt16(uint8_t* data) { 45 return static_cast<int16_t>(U16_AT(data)); 46 } 47 48 static inline int32_t fetchInt32(uint8_t* data) { 49 return static_cast<int32_t>(U32_AT(data)); 50 } 51 52 static inline int64_t fetchInt64(uint8_t* data) { 53 return static_cast<int64_t>(U64_AT(data)); 54 } 55 56 uint64_t AAH_RXPlayer::monotonicUSecNow() { 57 struct timespec now; 58 int res = clock_gettime(CLOCK_MONOTONIC, &now); 59 CHECK(res >= 0); 60 61 uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000; 62 ret += now.tv_nsec / 1000; 63 64 return ret; 65 } 66 67 status_t AAH_RXPlayer::startWorkThread() { 68 status_t res; 69 stopWorkThread(); 70 res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO); 71 72 if (res != OK) { 73 ALOGE("Failed to start work thread (res = %d)", res); 74 } 75 76 return res; 77 } 78 79 void AAH_RXPlayer::stopWorkThread() { 80 thread_wrapper_->requestExit(); // set the exit pending flag 81 wakeup_work_thread_evt_.setEvent(); 82 83 status_t res; 84 res = thread_wrapper_->requestExitAndWait(); // block until thread exit. 85 if (res != OK) { 86 ALOGE("Failed to stop work thread (res = %d)", res); 87 } 88 89 wakeup_work_thread_evt_.clearPendingEvents(); 90 } 91 92 void AAH_RXPlayer::cleanupSocket() { 93 if (sock_fd_ >= 0) { 94 if (multicast_joined_) { 95 int res; 96 struct ip_mreq mreq; 97 mreq.imr_multiaddr = listen_addr_.sin_addr; 98 mreq.imr_interface.s_addr = htonl(INADDR_ANY); 99 res = setsockopt(sock_fd_, 100 IPPROTO_IP, 101 IP_DROP_MEMBERSHIP, 102 &mreq, sizeof(mreq)); 103 if (res < 0) { 104 ALOGW("Failed to leave multicast group. (%d, %d)", res, errno); 105 } 106 multicast_joined_ = false; 107 } 108 109 close(sock_fd_); 110 sock_fd_ = -1; 111 } 112 113 resetPipeline(); 114 } 115 116 void AAH_RXPlayer::resetPipeline() { 117 ring_buffer_.reset(); 118 119 // Explicitly shudown all of the active substreams, then call clear out the 120 // collection. Failure to clear out a substream can result in its decoder 121 // holding a reference to itself and therefor not going away when the 122 // collection is cleared. 123 for (size_t i = 0; i < substreams_.size(); ++i) 124 substreams_.valueAt(i)->shutdown(); 125 126 substreams_.clear(); 127 128 current_gap_status_ = kGS_NoGap; 129 } 130 131 bool AAH_RXPlayer::setupSocket() { 132 long flags; 133 int res, buf_size; 134 socklen_t opt_size; 135 136 cleanupSocket(); 137 CHECK(sock_fd_ < 0); 138 139 // Make the socket 140 sock_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 141 if (sock_fd_ < 0) { 142 ALOGE("Failed to create listen socket (errno %d)", errno); 143 goto bailout; 144 } 145 146 // Set non-blocking operation 147 flags = fcntl(sock_fd_, F_GETFL); 148 res = fcntl(sock_fd_, F_SETFL, flags | O_NONBLOCK); 149 if (res < 0) { 150 ALOGE("Failed to set socket (%d) to non-blocking mode (errno %d)", 151 sock_fd_, errno); 152 goto bailout; 153 } 154 155 // Bind to our port 156 struct sockaddr_in bind_addr; 157 memset(&bind_addr, 0, sizeof(bind_addr)); 158 bind_addr.sin_family = AF_INET; 159 bind_addr.sin_addr.s_addr = INADDR_ANY; 160 bind_addr.sin_port = listen_addr_.sin_port; 161 res = bind(sock_fd_, 162 reinterpret_cast<const sockaddr*>(&bind_addr), 163 sizeof(bind_addr)); 164 if (res < 0) { 165 uint32_t a = ntohl(bind_addr.sin_addr.s_addr); 166 uint16_t p = ntohs(bind_addr.sin_port); 167 ALOGE("Failed to bind socket (%d) to %d.%d.%d.%d:%hd. (errno %d)", 168 sock_fd_, 169 (a >> 24) & 0xFF, 170 (a >> 16) & 0xFF, 171 (a >> 8) & 0xFF, 172 (a ) & 0xFF, 173 p, 174 errno); 175 176 goto bailout; 177 } 178 179 buf_size = 1 << 16; // 64k 180 res = setsockopt(sock_fd_, 181 SOL_SOCKET, SO_RCVBUF, 182 &buf_size, sizeof(buf_size)); 183 if (res < 0) { 184 ALOGW("Failed to increase socket buffer size to %d. (errno %d)", 185 buf_size, errno); 186 } 187 188 buf_size = 0; 189 opt_size = sizeof(buf_size); 190 res = getsockopt(sock_fd_, 191 SOL_SOCKET, SO_RCVBUF, 192 &buf_size, &opt_size); 193 if (res < 0) { 194 ALOGW("Failed to fetch socket buffer size. (errno %d)", errno); 195 } else { 196 ALOGI("RX socket buffer size is now %d bytes", buf_size); 197 } 198 199 if (listen_addr_.sin_addr.s_addr) { 200 // Join the multicast group and we should be good to go. 201 struct ip_mreq mreq; 202 mreq.imr_multiaddr = listen_addr_.sin_addr; 203 mreq.imr_interface.s_addr = htonl(INADDR_ANY); 204 res = setsockopt(sock_fd_, 205 IPPROTO_IP, 206 IP_ADD_MEMBERSHIP, 207 &mreq, sizeof(mreq)); 208 if (res < 0) { 209 ALOGE("Failed to join multicast group. (errno %d)", errno); 210 goto bailout; 211 } 212 multicast_joined_ = true; 213 } 214 215 return true; 216 217 bailout: 218 cleanupSocket(); 219 return false; 220 } 221 222 bool AAH_RXPlayer::threadLoop() { 223 struct pollfd poll_fds[2]; 224 bool process_more_right_now = false; 225 226 if (!setupSocket()) { 227 sendEvent(MEDIA_ERROR); 228 goto bailout; 229 } 230 231 while (!thread_wrapper_->exitPending()) { 232 // Step 1: Wait until there is something to do. 233 int gap_timeout = computeNextGapRetransmitTimeout(); 234 int ring_timeout = ring_buffer_.computeInactivityTimeout(); 235 int timeout = -1; 236 237 if (!ring_timeout) { 238 ALOGW("RTP inactivity timeout reached, resetting pipeline."); 239 resetPipeline(); 240 timeout = gap_timeout; 241 } else { 242 if (gap_timeout < 0) { 243 timeout = ring_timeout; 244 } else if (ring_timeout < 0) { 245 timeout = gap_timeout; 246 } else { 247 timeout = (gap_timeout < ring_timeout) ? gap_timeout 248 : ring_timeout; 249 } 250 } 251 252 if ((0 != timeout) && (!process_more_right_now)) { 253 // Set up the events to wait on. Start with the wakeup pipe. 254 memset(&poll_fds, 0, sizeof(poll_fds)); 255 poll_fds[0].fd = wakeup_work_thread_evt_.getWakeupHandle(); 256 poll_fds[0].events = POLLIN; 257 258 // Add the RX socket. 259 poll_fds[1].fd = sock_fd_; 260 poll_fds[1].events = POLLIN; 261 262 // Wait for something interesing to happen. 263 int poll_res = poll(poll_fds, NELEM(poll_fds), timeout); 264 if (poll_res < 0) { 265 ALOGE("Fatal error (%d,%d) while waiting on events", 266 poll_res, errno); 267 sendEvent(MEDIA_ERROR); 268 goto bailout; 269 } 270 } 271 272 if (thread_wrapper_->exitPending()) { 273 break; 274 } 275 276 wakeup_work_thread_evt_.clearPendingEvents(); 277 process_more_right_now = false; 278 279 // Step 2: Do we have data waiting in the socket? If so, drain the 280 // socket moving valid RTP information into the ring buffer to be 281 // processed. 282 if (poll_fds[1].revents) { 283 struct sockaddr_in from; 284 socklen_t from_len; 285 286 ssize_t res = 0; 287 while (!thread_wrapper_->exitPending()) { 288 // Check the size of any pending packet. 289 res = recv(sock_fd_, NULL, 0, MSG_PEEK | MSG_TRUNC); 290 291 // Error? 292 if (res < 0) { 293 // If the error is anything other than would block, 294 // something has gone very wrong. 295 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 296 ALOGE("Fatal socket error during recvfrom (%d, %d)", 297 (int)res, errno); 298 goto bailout; 299 } 300 301 // Socket is out of data, just break out of processing and 302 // wait for more. 303 break; 304 } 305 306 // Allocate a payload. 307 PacketBuffer* pb = PacketBuffer::allocate(res); 308 if (NULL == pb) { 309 ALOGE("Fatal error, failed to allocate packet buffer of" 310 " length %u", static_cast<uint32_t>(res)); 311 goto bailout; 312 } 313 314 // Fetch the data. 315 from_len = sizeof(from); 316 res = recvfrom(sock_fd_, pb->data_, pb->length_, 0, 317 reinterpret_cast<struct sockaddr*>(&from), 318 &from_len); 319 if (res != pb->length_) { 320 ALOGE("Fatal error, fetched packet length (%d) does not" 321 " match peeked packet length (%u). This should never" 322 " happen. (errno = %d)", 323 static_cast<int>(res), 324 static_cast<uint32_t>(pb->length_), 325 errno); 326 } 327 328 bool drop_packet = false; 329 if (transmitter_known_) { 330 if (from.sin_addr.s_addr != 331 transmitter_addr_.sin_addr.s_addr) { 332 uint32_t a = ntohl(from.sin_addr.s_addr); 333 uint16_t p = ntohs(from.sin_port); 334 ALOGV("Dropping packet from unknown transmitter" 335 " %u.%u.%u.%u:%hu", 336 ((a >> 24) & 0xFF), 337 ((a >> 16) & 0xFF), 338 ((a >> 8) & 0xFF), 339 ( a & 0xFF), 340 p); 341 342 drop_packet = true; 343 } else { 344 transmitter_addr_.sin_port = from.sin_port; 345 } 346 } else { 347 memcpy(&transmitter_addr_, &from, sizeof(from)); 348 transmitter_known_ = true; 349 } 350 351 if (!drop_packet) { 352 bool serious_error = !processRX(pb); 353 354 if (serious_error) { 355 // Something went "seriously wrong". Currently, the 356 // only trigger for this should be a ring buffer 357 // overflow. The current failsafe behavior for when 358 // something goes seriously wrong is to just reset the 359 // pipeline. The system should behave as if this 360 // AAH_RXPlayer was just set up for the first time. 361 ALOGE("Something just went seriously wrong with the" 362 " pipeline. Resetting."); 363 resetPipeline(); 364 } 365 } else { 366 PacketBuffer::destroy(pb); 367 } 368 } 369 } 370 371 // Step 3: Process any data we mave have accumulated in the ring buffer 372 // so far. 373 if (!thread_wrapper_->exitPending()) { 374 processRingBuffer(); 375 } 376 377 // Step 4: At this point in time, the ring buffer should either be 378 // empty, or stalled in front of a gap caused by some dropped packets. 379 // Check on the current gap situation and deal with it in an appropriate 380 // fashion. If processGaps returns true, it means that it has given up 381 // on a gap and that we should try to process some more data 382 // immediately. 383 if (!thread_wrapper_->exitPending()) { 384 process_more_right_now = processGaps(); 385 } 386 387 // Step 5: Check for fatal errors. If any of our substreams has 388 // encountered a fatal, unrecoverable, error, then propagate the error 389 // up to user level and shut down. 390 for (size_t i = 0; i < substreams_.size(); ++i) { 391 status_t status; 392 CHECK(substreams_.valueAt(i) != NULL); 393 394 status = substreams_.valueAt(i)->getStatus(); 395 if (OK != status) { 396 ALOGE("Substream index %d has encountered an unrecoverable" 397 " error (%d). Signalling application level and shutting" 398 " down.", i, status); 399 sendEvent(MEDIA_ERROR); 400 goto bailout; 401 } 402 } 403 } 404 405 bailout: 406 cleanupSocket(); 407 return false; 408 } 409 410 bool AAH_RXPlayer::processRX(PacketBuffer* pb) { 411 CHECK(NULL != pb); 412 413 uint8_t* data = pb->data_; 414 ssize_t amt = pb->length_; 415 uint32_t nak_magic; 416 uint16_t seq_no; 417 uint32_t epoch; 418 419 // Every packet either starts with an RTP header which is at least 12 bytes 420 // long or is a retry NAK which is 14 bytes long. If there are fewer than 421 // 12 bytes here, this cannot be a proper RTP packet. 422 if (amt < 12) { 423 ALOGV("Dropping packet, too short to contain RTP header (%u bytes)", 424 static_cast<uint32_t>(amt)); 425 goto drop_packet; 426 } 427 428 // Check to see if this is the special case of a NAK packet. 429 nak_magic = ntohl(*(reinterpret_cast<uint32_t*>(data))); 430 if (nak_magic == kRetransNAKMagic) { 431 // Looks like a NAK packet; make sure its long enough. 432 433 if (amt < static_cast<ssize_t>(sizeof(RetransRequest))) { 434 ALOGV("Dropping packet, too short to contain NAK payload" 435 " (%u bytes)", static_cast<uint32_t>(amt)); 436 goto drop_packet; 437 } 438 439 SeqNoGap gap; 440 RetransRequest* rtr = reinterpret_cast<RetransRequest*>(data); 441 gap.start_seq_ = ntohs(rtr->start_seq_); 442 gap.end_seq_ = ntohs(rtr->end_seq_); 443 444 ALOGV("Process NAK for gap at [%hu, %hu]", 445 gap.start_seq_, gap.end_seq_); 446 ring_buffer_.processNAK(&gap); 447 448 return true; 449 } 450 451 // According to the TRTP spec, version should be 2, padding should be 0, 452 // extension should be 0 and CSRCCnt should be 0. If any of these tests 453 // fail, we chuck the packet. 454 if (data[0] != 0x80) { 455 ALOGV("Dropping packet, bad V/P/X/CSRCCnt field (0x%02x)", 456 data[0]); 457 goto drop_packet; 458 } 459 460 // Check the payload type. For TRTP, it should always be 100. 461 if ((data[1] & 0x7F) != 100) { 462 ALOGV("Dropping packet, bad payload type. (%u)", 463 data[1] & 0x7F); 464 goto drop_packet; 465 } 466 467 // Check whether the transmitter has begun a new epoch. 468 epoch = (U32_AT(data + 8) >> 10) & 0x3FFFFF; 469 if (current_epoch_known_) { 470 if (epoch != current_epoch_) { 471 ALOGV("%s: new epoch %u", __PRETTY_FUNCTION__, epoch); 472 current_epoch_ = epoch; 473 resetPipeline(); 474 } 475 } else { 476 current_epoch_ = epoch; 477 current_epoch_known_ = true; 478 } 479 480 // Extract the sequence number and hand the packet off to the ring buffer 481 // for dropped packet detection and later processing. 482 seq_no = U16_AT(data + 2); 483 return ring_buffer_.pushBuffer(pb, seq_no); 484 485 drop_packet: 486 PacketBuffer::destroy(pb); 487 return true; 488 } 489 490 void AAH_RXPlayer::processRingBuffer() { 491 PacketBuffer* pb; 492 bool is_discon; 493 sp<Substream> substream; 494 LinearTransform trans; 495 bool foundTrans = false; 496 497 while (NULL != (pb = ring_buffer_.fetchBuffer(&is_discon))) { 498 if (is_discon) { 499 // Abort all partially assembled payloads. 500 for (size_t i = 0; i < substreams_.size(); ++i) { 501 CHECK(substreams_.valueAt(i) != NULL); 502 substreams_.valueAt(i)->cleanupBufferInProgress(); 503 } 504 } 505 506 uint8_t* data = pb->data_; 507 ssize_t amt = pb->length_; 508 509 // Should not have any non-RTP packets in the ring buffer. RTP packets 510 // must be at least 12 bytes long. 511 CHECK(amt >= 12); 512 513 // Extract the marker bit and the SSRC field. 514 bool marker = (data[1] & 0x80) != 0; 515 uint32_t ssrc = U32_AT(data + 8); 516 517 // Is this the start of a new TRTP payload? If so, the marker bit 518 // should be set and there are some things we should be checking for. 519 if (marker) { 520 // TRTP headers need to have at least a byte for version, a byte for 521 // payload type and flags, and 4 bytes for length. 522 if (amt < 18) { 523 ALOGV("Dropping packet, too short to contain TRTP header" 524 " (%u bytes)", static_cast<uint32_t>(amt)); 525 goto process_next_packet; 526 } 527 528 // Check the TRTP version and extract the payload type/flags. 529 uint8_t trtp_version = data[12]; 530 uint8_t payload_type = (data[13] >> 4) & 0xF; 531 uint8_t trtp_flags = data[13] & 0xF; 532 533 if (1 != trtp_version) { 534 ALOGV("Dropping packet, bad trtp version %hhu", trtp_version); 535 goto process_next_packet; 536 } 537 538 // Is there a timestamp transformation present on this packet? If 539 // so, extract it and pass it to the appropriate substreams. 540 if (trtp_flags & 0x02) { 541 ssize_t offset = 18 + ((trtp_flags & 0x01) ? 4 : 0); 542 if (amt < (offset + 24)) { 543 ALOGV("Dropping packet, too short to contain TRTP Timestamp" 544 " Transformation (%u bytes)", 545 static_cast<uint32_t>(amt)); 546 goto process_next_packet; 547 } 548 549 trans.a_zero = fetchInt64(data + offset); 550 trans.b_zero = fetchInt64(data + offset + 16); 551 trans.a_to_b_numer = static_cast<int32_t>( 552 fetchInt32 (data + offset + 8)); 553 trans.a_to_b_denom = U32_AT(data + offset + 12); 554 foundTrans = true; 555 556 uint32_t program_id = (ssrc >> 5) & 0x1F; 557 for (size_t i = 0; i < substreams_.size(); ++i) { 558 sp<Substream> iter = substreams_.valueAt(i); 559 CHECK(iter != NULL); 560 561 if (iter->getProgramID() == program_id) { 562 iter->processTSTransform(trans); 563 } 564 } 565 } 566 567 // Is this a command packet? If so, its not necessarily associate 568 // with one particular substream. Just give it to the command 569 // packet handler and then move on. 570 if (4 == payload_type) { 571 processCommandPacket(pb); 572 goto process_next_packet; 573 } 574 } 575 576 // If we got to here, then we are a normal packet. Find (or allocate) 577 // the substream we belong to and send the packet off to be processed. 578 substream = substreams_.valueFor(ssrc); 579 if (substream == NULL) { 580 substream = new Substream(ssrc, omx_); 581 if (substream == NULL) { 582 ALOGE("Failed to allocate substream for SSRC 0x%08x", ssrc); 583 goto process_next_packet; 584 } 585 substreams_.add(ssrc, substream); 586 587 if (foundTrans) { 588 substream->processTSTransform(trans); 589 } 590 } 591 592 CHECK(substream != NULL); 593 594 if (marker) { 595 // Start of a new TRTP payload for this substream. Extract the 596 // lower 32 bits of the timestamp and hand the buffer to the 597 // substream for processing. 598 uint32_t ts_lower = U32_AT(data + 4); 599 substream->processPayloadStart(data + 12, amt - 12, ts_lower); 600 } else { 601 // Continuation of an existing TRTP payload. Just hand it off to 602 // the substream for processing. 603 substream->processPayloadCont(data + 12, amt - 12); 604 } 605 606 process_next_packet: 607 PacketBuffer::destroy(pb); 608 } // end of main processing while loop. 609 } 610 611 void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) { 612 CHECK(NULL != pb); 613 614 uint8_t* data = pb->data_; 615 ssize_t amt = pb->length_; 616 617 // verify that this packet meets the minimum length of a command packet 618 if (amt < 20) { 619 return; 620 } 621 622 uint8_t trtp_version = data[12]; 623 uint8_t trtp_flags = data[13] & 0xF; 624 625 if (1 != trtp_version) { 626 ALOGV("Dropping packet, bad trtp version %hhu", trtp_version); 627 return; 628 } 629 630 // calculate the start of the command payload 631 ssize_t offset = 18; 632 if (trtp_flags & 0x01) { 633 // timestamp is present (4 bytes) 634 offset += 4; 635 } 636 if (trtp_flags & 0x02) { 637 // transform is present (24 bytes) 638 offset += 24; 639 } 640 641 // the packet must contain 2 bytes of command payload beyond the TRTP header 642 if (amt < offset + 2) { 643 return; 644 } 645 646 uint16_t command_id = U16_AT(data + offset); 647 648 switch (command_id) { 649 case TRTPControlPacket::kCommandNop: 650 break; 651 652 case TRTPControlPacket::kCommandEOS: 653 case TRTPControlPacket::kCommandFlush: { 654 uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; 655 ALOGI("*** %s flushing program_id=%d", 656 __PRETTY_FUNCTION__, program_id); 657 658 Vector<uint32_t> substreams_to_remove; 659 for (size_t i = 0; i < substreams_.size(); ++i) { 660 sp<Substream> iter = substreams_.valueAt(i); 661 if (iter->getProgramID() == program_id) { 662 iter->shutdown(); 663 substreams_to_remove.add(iter->getSSRC()); 664 } 665 } 666 667 for (size_t i = 0; i < substreams_to_remove.size(); ++i) { 668 substreams_.removeItem(substreams_to_remove[i]); 669 } 670 } break; 671 } 672 } 673 674 bool AAH_RXPlayer::processGaps() { 675 // Deal with the current gap situation. Specifically... 676 // 677 // 1) If a new gap has shown up, send a retransmit request to the 678 // transmitter. 679 // 2) If a gap we were working on has had a packet in the middle or at 680 // the end filled in, send another retransmit request for the begining 681 // portion of the gap. TRTP was designed for LANs where packet 682 // re-ordering is very unlikely; so see the middle or end of a gap 683 // filled in before the begining is an almost certain indication that 684 // a retransmission packet was also dropped. 685 // 3) If we have been working on a gap for a while and it still has not 686 // been filled in, send another retransmit request. 687 // 4) If the are no more gaps in the ring, clear the current_gap_status_ 688 // flag to indicate that all is well again. 689 690 // Start by fetching the active gap status. 691 SeqNoGap gap; 692 bool send_retransmit_request = false; 693 bool ret_val = false; 694 GapStatus gap_status; 695 if (kGS_NoGap != (gap_status = ring_buffer_.fetchCurrentGap(&gap))) { 696 // Note: checking for a change in the end sequence number should cover 697 // moving on to an entirely new gap for case #1 as well as resending the 698 // begining of a gap range for case #2. 699 send_retransmit_request = (kGS_NoGap == current_gap_status_) || 700 (current_gap_.end_seq_ != gap.end_seq_); 701 702 // If this is the same gap we have been working on, and it has timed 703 // out, then check to see if our substreams are about to underflow. If 704 // so, instead of sending another retransmit request, just give up on 705 // this gap and move on. 706 if (!send_retransmit_request && 707 (kGS_NoGap != current_gap_status_) && 708 (0 == computeNextGapRetransmitTimeout())) { 709 710 // If out current gap is the fast-start gap, don't bother to skip it 711 // because substreams look like the are about to underflow. 712 if ((kGS_FastStartGap != gap_status) || 713 (current_gap_.end_seq_ != gap.end_seq_)) { 714 for (size_t i = 0; i < substreams_.size(); ++i) { 715 if (substreams_.valueAt(i)->isAboutToUnderflow()) { 716 ALOGV("About to underflow, giving up on gap [%hu, %hu]", 717 gap.start_seq_, gap.end_seq_); 718 ring_buffer_.processNAK(); 719 current_gap_status_ = kGS_NoGap; 720 return true; 721 } 722 } 723 } 724 725 // Looks like no one is about to underflow. Just go ahead and send 726 // the request. 727 send_retransmit_request = true; 728 } 729 } else { 730 current_gap_status_ = kGS_NoGap; 731 } 732 733 if (send_retransmit_request) { 734 // If we have been working on a fast start, and it is still not filled 735 // in, even after the extended retransmit time out, give up and skip it. 736 // The system should fall back into its normal slow-start behavior. 737 if ((kGS_FastStartGap == current_gap_status_) && 738 (current_gap_.end_seq_ == gap.end_seq_)) { 739 ALOGV("Fast start is taking forever; giving up."); 740 ring_buffer_.processNAK(); 741 current_gap_status_ = kGS_NoGap; 742 return true; 743 } 744 745 // Send the request. 746 RetransRequest req; 747 uint32_t magic = (kGS_FastStartGap == gap_status) 748 ? kFastStartRequestMagic 749 : kRetransRequestMagic; 750 req.magic_ = htonl(magic); 751 req.mcast_ip_ = listen_addr_.sin_addr.s_addr; 752 req.mcast_port_ = listen_addr_.sin_port; 753 req.start_seq_ = htons(gap.start_seq_); 754 req.end_seq_ = htons(gap.end_seq_); 755 756 { 757 uint32_t a = ntohl(transmitter_addr_.sin_addr.s_addr); 758 uint16_t p = ntohs(transmitter_addr_.sin_port); 759 ALOGV("Sending to transmitter %u.%u.%u.%u:%hu", 760 ((a >> 24) & 0xFF), 761 ((a >> 16) & 0xFF), 762 ((a >> 8) & 0xFF), 763 ( a & 0xFF), 764 p); 765 } 766 767 int res = sendto(sock_fd_, &req, sizeof(req), 0, 768 reinterpret_cast<struct sockaddr*>(&transmitter_addr_), 769 sizeof(transmitter_addr_)); 770 if (res < 0) { 771 ALOGE("Error when sending retransmit request (%d)", errno); 772 } else { 773 ALOGV("%s request for range [%hu, %hu] sent", 774 (kGS_FastStartGap == gap_status) ? "Fast Start" 775 : "Retransmit", 776 gap.start_seq_, gap.end_seq_); 777 } 778 779 // Update the current gap info. 780 current_gap_ = gap; 781 current_gap_status_ = gap_status; 782 next_retrans_req_time_ = monotonicUSecNow() + 783 ((kGS_FastStartGap == current_gap_status_) 784 ? kFastStartTimeoutUSec 785 : kGapRerequestTimeoutUSec); 786 } 787 788 return false; 789 } 790 791 // Compute when its time to send the next gap retransmission in milliseconds. 792 // Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit 793 // right now. 794 int AAH_RXPlayer::computeNextGapRetransmitTimeout() { 795 if (kGS_NoGap == current_gap_status_) { 796 return -1; 797 } 798 799 int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow(); 800 801 timeout_delta /= 1000; 802 if (timeout_delta <= 0) { 803 return 0; 804 } 805 806 return static_cast<uint32_t>(timeout_delta); 807 } 808 809 } // namespace android 810