1 /* 2 * Copyright 2013, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "RTPSender" 19 #include <utils/Log.h> 20 21 #include "RTPSender.h" 22 23 #include <media/stagefright/foundation/ABuffer.h> 24 #include <media/stagefright/foundation/ADebug.h> 25 #include <media/stagefright/foundation/AMessage.h> 26 #include <media/stagefright/foundation/ANetworkSession.h> 27 #include <media/stagefright/foundation/hexdump.h> 28 #include <media/stagefright/MediaErrors.h> 29 #include <media/stagefright/Utils.h> 30 31 #include "include/avc_utils.h" 32 33 namespace android { 34 35 RTPSender::RTPSender( 36 const sp<ANetworkSession> &netSession, 37 const sp<AMessage> ¬ify) 38 : mNetSession(netSession), 39 mNotify(notify), 40 mRTPMode(TRANSPORT_UNDEFINED), 41 mRTCPMode(TRANSPORT_UNDEFINED), 42 mRTPSessionID(0), 43 mRTCPSessionID(0), 44 mRTPConnected(false), 45 mRTCPConnected(false), 46 mLastNTPTime(0), 47 mLastRTPTime(0), 48 mNumRTPSent(0), 49 mNumRTPOctetsSent(0), 50 mNumSRsSent(0), 51 mRTPSeqNo(0), 52 mHistorySize(0) { 53 } 54 55 RTPSender::~RTPSender() { 56 if (mRTCPSessionID != 0) { 57 mNetSession->destroySession(mRTCPSessionID); 58 mRTCPSessionID = 0; 59 } 60 61 if (mRTPSessionID != 0) { 62 mNetSession->destroySession(mRTPSessionID); 63 mRTPSessionID = 0; 64 } 65 } 66 67 // static 68 int32_t RTPBase::PickRandomRTPPort() { 69 // Pick an even integer in range [1024, 65534) 70 71 static const size_t kRange = (65534 - 1024) / 2; 72 73 return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024; 74 } 75 76 status_t RTPSender::initAsync( 77 const char *remoteHost, 78 int32_t remoteRTPPort, 79 TransportMode rtpMode, 80 int32_t remoteRTCPPort, 81 TransportMode rtcpMode, 82 int32_t *outLocalRTPPort) { 83 if (mRTPMode != TRANSPORT_UNDEFINED 84 || rtpMode == TRANSPORT_UNDEFINED 85 || rtpMode == TRANSPORT_NONE 86 || rtcpMode == TRANSPORT_UNDEFINED) { 87 return INVALID_OPERATION; 88 } 89 90 CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED); 91 CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED); 92 93 if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0) 94 || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) { 95 return INVALID_OPERATION; 96 } 97 98 sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, this); 99 100 sp<AMessage> rtcpNotify; 101 if (remoteRTCPPort >= 0) { 102 rtcpNotify = new AMessage(kWhatRTCPNotify, this); 103 } 104 105 CHECK_EQ(mRTPSessionID, 0); 106 CHECK_EQ(mRTCPSessionID, 0); 107 108 int32_t localRTPPort; 109 110 for (;;) { 111 localRTPPort = PickRandomRTPPort(); 112 113 status_t err; 114 if (rtpMode == TRANSPORT_UDP) { 115 err = mNetSession->createUDPSession( 116 localRTPPort, 117 remoteHost, 118 remoteRTPPort, 119 rtpNotify, 120 &mRTPSessionID); 121 } else { 122 CHECK_EQ(rtpMode, TRANSPORT_TCP); 123 err = mNetSession->createTCPDatagramSession( 124 localRTPPort, 125 remoteHost, 126 remoteRTPPort, 127 rtpNotify, 128 &mRTPSessionID); 129 } 130 131 if (err != OK) { 132 continue; 133 } 134 135 if (remoteRTCPPort < 0) { 136 break; 137 } 138 139 if (rtcpMode == TRANSPORT_UDP) { 140 err = mNetSession->createUDPSession( 141 localRTPPort + 1, 142 remoteHost, 143 remoteRTCPPort, 144 rtcpNotify, 145 &mRTCPSessionID); 146 } else { 147 CHECK_EQ(rtcpMode, TRANSPORT_TCP); 148 err = mNetSession->createTCPDatagramSession( 149 localRTPPort + 1, 150 remoteHost, 151 remoteRTCPPort, 152 rtcpNotify, 153 &mRTCPSessionID); 154 } 155 156 if (err == OK) { 157 break; 158 } 159 160 mNetSession->destroySession(mRTPSessionID); 161 mRTPSessionID = 0; 162 } 163 164 if (rtpMode == TRANSPORT_UDP) { 165 mRTPConnected = true; 166 } 167 168 if (rtcpMode == TRANSPORT_UDP) { 169 mRTCPConnected = true; 170 } 171 172 mRTPMode = rtpMode; 173 mRTCPMode = rtcpMode; 174 *outLocalRTPPort = localRTPPort; 175 176 if (mRTPMode == TRANSPORT_UDP 177 && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) { 178 notifyInitDone(OK); 179 } 180 181 return OK; 182 } 183 184 status_t RTPSender::queueBuffer( 185 const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) { 186 status_t err; 187 188 switch (mode) { 189 case PACKETIZATION_NONE: 190 err = queueRawPacket(buffer, packetType); 191 break; 192 193 case PACKETIZATION_TRANSPORT_STREAM: 194 err = queueTSPackets(buffer, packetType); 195 break; 196 197 case PACKETIZATION_H264: 198 err = queueAVCBuffer(buffer, packetType); 199 break; 200 201 default: 202 TRESPASS(); 203 } 204 205 return err; 206 } 207 208 status_t RTPSender::queueRawPacket( 209 const sp<ABuffer> &packet, uint8_t packetType) { 210 CHECK_LE(packet->size(), kMaxUDPPacketSize - 12); 211 212 int64_t timeUs; 213 CHECK(packet->meta()->findInt64("timeUs", &timeUs)); 214 215 sp<ABuffer> udpPacket = new ABuffer(12 + packet->size()); 216 217 udpPacket->setInt32Data(mRTPSeqNo); 218 219 uint8_t *rtp = udpPacket->data(); 220 rtp[0] = 0x80; 221 rtp[1] = packetType; 222 223 rtp[2] = (mRTPSeqNo >> 8) & 0xff; 224 rtp[3] = mRTPSeqNo & 0xff; 225 ++mRTPSeqNo; 226 227 uint32_t rtpTime = (timeUs * 9) / 100ll; 228 229 rtp[4] = rtpTime >> 24; 230 rtp[5] = (rtpTime >> 16) & 0xff; 231 rtp[6] = (rtpTime >> 8) & 0xff; 232 rtp[7] = rtpTime & 0xff; 233 234 rtp[8] = kSourceID >> 24; 235 rtp[9] = (kSourceID >> 16) & 0xff; 236 rtp[10] = (kSourceID >> 8) & 0xff; 237 rtp[11] = kSourceID & 0xff; 238 239 memcpy(&rtp[12], packet->data(), packet->size()); 240 241 return sendRTPPacket( 242 udpPacket, 243 true /* storeInHistory */, 244 true /* timeValid */, 245 ALooper::GetNowUs()); 246 } 247 248 status_t RTPSender::queueTSPackets( 249 const sp<ABuffer> &tsPackets, uint8_t packetType) { 250 CHECK_EQ(0, tsPackets->size() % 188); 251 252 int64_t timeUs; 253 CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs)); 254 255 size_t srcOffset = 0; 256 while (srcOffset < tsPackets->size()) { 257 sp<ABuffer> udpPacket = 258 new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188); 259 260 udpPacket->setInt32Data(mRTPSeqNo); 261 262 uint8_t *rtp = udpPacket->data(); 263 rtp[0] = 0x80; 264 rtp[1] = packetType; 265 266 rtp[2] = (mRTPSeqNo >> 8) & 0xff; 267 rtp[3] = mRTPSeqNo & 0xff; 268 ++mRTPSeqNo; 269 270 int64_t nowUs = ALooper::GetNowUs(); 271 uint32_t rtpTime = (nowUs * 9) / 100ll; 272 273 rtp[4] = rtpTime >> 24; 274 rtp[5] = (rtpTime >> 16) & 0xff; 275 rtp[6] = (rtpTime >> 8) & 0xff; 276 rtp[7] = rtpTime & 0xff; 277 278 rtp[8] = kSourceID >> 24; 279 rtp[9] = (kSourceID >> 16) & 0xff; 280 rtp[10] = (kSourceID >> 8) & 0xff; 281 rtp[11] = kSourceID & 0xff; 282 283 size_t numTSPackets = (tsPackets->size() - srcOffset) / 188; 284 if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) { 285 numTSPackets = kMaxNumTSPacketsPerRTPPacket; 286 } 287 288 memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188); 289 290 udpPacket->setRange(0, 12 + numTSPackets * 188); 291 292 srcOffset += numTSPackets * 188; 293 bool isLastPacket = (srcOffset == tsPackets->size()); 294 295 status_t err = sendRTPPacket( 296 udpPacket, 297 true /* storeInHistory */, 298 isLastPacket /* timeValid */, 299 timeUs); 300 301 if (err != OK) { 302 return err; 303 } 304 } 305 306 return OK; 307 } 308 309 status_t RTPSender::queueAVCBuffer( 310 const sp<ABuffer> &accessUnit, uint8_t packetType) { 311 int64_t timeUs; 312 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 313 314 uint32_t rtpTime = (timeUs * 9 / 100ll); 315 316 List<sp<ABuffer> > packets; 317 318 sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize); 319 size_t outBytesUsed = 12; // Placeholder for RTP header. 320 321 const uint8_t *data = accessUnit->data(); 322 size_t size = accessUnit->size(); 323 const uint8_t *nalStart; 324 size_t nalSize; 325 while (getNextNALUnit( 326 &data, &size, &nalStart, &nalSize, 327 true /* startCodeFollows */) == OK) { 328 size_t bytesNeeded = nalSize + 2; 329 if (outBytesUsed == 12) { 330 ++bytesNeeded; 331 } 332 333 if (outBytesUsed + bytesNeeded > out->capacity()) { 334 bool emitSingleNALPacket = false; 335 336 if (outBytesUsed == 12 337 && outBytesUsed + nalSize <= out->capacity()) { 338 // We haven't emitted anything into the current packet yet and 339 // this NAL unit fits into a single-NAL-unit-packet while 340 // it wouldn't have fit as part of a STAP-A packet. 341 342 memcpy(out->data() + outBytesUsed, nalStart, nalSize); 343 outBytesUsed += nalSize; 344 345 emitSingleNALPacket = true; 346 } 347 348 if (outBytesUsed > 12) { 349 out->setRange(0, outBytesUsed); 350 packets.push_back(out); 351 out = new ABuffer(kMaxUDPPacketSize); 352 outBytesUsed = 12; // Placeholder for RTP header 353 } 354 355 if (emitSingleNALPacket) { 356 continue; 357 } 358 } 359 360 if (outBytesUsed + bytesNeeded <= out->capacity()) { 361 uint8_t *dst = out->data() + outBytesUsed; 362 363 if (outBytesUsed == 12) { 364 *dst++ = 24; // STAP-A header 365 } 366 367 *dst++ = (nalSize >> 8) & 0xff; 368 *dst++ = nalSize & 0xff; 369 memcpy(dst, nalStart, nalSize); 370 371 outBytesUsed += bytesNeeded; 372 continue; 373 } 374 375 // This single NAL unit does not fit into a single RTP packet, 376 // we need to emit an FU-A. 377 378 CHECK_EQ(outBytesUsed, 12u); 379 380 uint8_t nalType = nalStart[0] & 0x1f; 381 uint8_t nri = (nalStart[0] >> 5) & 3; 382 383 size_t srcOffset = 1; 384 while (srcOffset < nalSize) { 385 size_t copy = out->capacity() - outBytesUsed - 2; 386 if (copy > nalSize - srcOffset) { 387 copy = nalSize - srcOffset; 388 } 389 390 uint8_t *dst = out->data() + outBytesUsed; 391 dst[0] = (nri << 5) | 28; 392 393 dst[1] = nalType; 394 395 if (srcOffset == 1) { 396 dst[1] |= 0x80; 397 } 398 399 if (srcOffset + copy == nalSize) { 400 dst[1] |= 0x40; 401 } 402 403 memcpy(&dst[2], nalStart + srcOffset, copy); 404 srcOffset += copy; 405 406 out->setRange(0, outBytesUsed + copy + 2); 407 408 packets.push_back(out); 409 out = new ABuffer(kMaxUDPPacketSize); 410 outBytesUsed = 12; // Placeholder for RTP header 411 } 412 } 413 414 if (outBytesUsed > 12) { 415 out->setRange(0, outBytesUsed); 416 packets.push_back(out); 417 } 418 419 while (!packets.empty()) { 420 sp<ABuffer> out = *packets.begin(); 421 packets.erase(packets.begin()); 422 423 out->setInt32Data(mRTPSeqNo); 424 425 bool last = packets.empty(); 426 427 uint8_t *dst = out->data(); 428 429 dst[0] = 0x80; 430 431 dst[1] = packetType; 432 if (last) { 433 dst[1] |= 1 << 7; // M-bit 434 } 435 436 dst[2] = (mRTPSeqNo >> 8) & 0xff; 437 dst[3] = mRTPSeqNo & 0xff; 438 ++mRTPSeqNo; 439 440 dst[4] = rtpTime >> 24; 441 dst[5] = (rtpTime >> 16) & 0xff; 442 dst[6] = (rtpTime >> 8) & 0xff; 443 dst[7] = rtpTime & 0xff; 444 dst[8] = kSourceID >> 24; 445 dst[9] = (kSourceID >> 16) & 0xff; 446 dst[10] = (kSourceID >> 8) & 0xff; 447 dst[11] = kSourceID & 0xff; 448 449 status_t err = sendRTPPacket(out, true /* storeInHistory */); 450 451 if (err != OK) { 452 return err; 453 } 454 } 455 456 return OK; 457 } 458 459 status_t RTPSender::sendRTPPacket( 460 const sp<ABuffer> &buffer, bool storeInHistory, 461 bool timeValid, int64_t timeUs) { 462 CHECK(mRTPConnected); 463 464 status_t err = mNetSession->sendRequest( 465 mRTPSessionID, buffer->data(), buffer->size(), 466 timeValid, timeUs); 467 468 if (err != OK) { 469 return err; 470 } 471 472 mLastNTPTime = GetNowNTP(); 473 mLastRTPTime = U32_AT(buffer->data() + 4); 474 475 ++mNumRTPSent; 476 mNumRTPOctetsSent += buffer->size() - 12; 477 478 if (storeInHistory) { 479 if (mHistorySize == kMaxHistorySize) { 480 mHistory.erase(mHistory.begin()); 481 } else { 482 ++mHistorySize; 483 } 484 mHistory.push_back(buffer); 485 } 486 487 return OK; 488 } 489 490 // static 491 uint64_t RTPSender::GetNowNTP() { 492 struct timeval tv; 493 gettimeofday(&tv, NULL /* timezone */); 494 495 uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec; 496 497 nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; 498 499 uint64_t hi = nowUs / 1000000ll; 500 uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; 501 502 return (hi << 32) | lo; 503 } 504 505 void RTPSender::onMessageReceived(const sp<AMessage> &msg) { 506 switch (msg->what()) { 507 case kWhatRTPNotify: 508 case kWhatRTCPNotify: 509 onNetNotify(msg->what() == kWhatRTPNotify, msg); 510 break; 511 512 default: 513 TRESPASS(); 514 } 515 } 516 517 void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) { 518 int32_t reason; 519 CHECK(msg->findInt32("reason", &reason)); 520 521 switch (reason) { 522 case ANetworkSession::kWhatError: 523 { 524 int32_t sessionID; 525 CHECK(msg->findInt32("sessionID", &sessionID)); 526 527 int32_t err; 528 CHECK(msg->findInt32("err", &err)); 529 530 int32_t errorOccuredDuringSend; 531 CHECK(msg->findInt32("send", &errorOccuredDuringSend)); 532 533 AString detail; 534 CHECK(msg->findString("detail", &detail)); 535 536 ALOGE("An error occurred during %s in session %d " 537 "(%d, '%s' (%s)).", 538 errorOccuredDuringSend ? "send" : "receive", 539 sessionID, 540 err, 541 detail.c_str(), 542 strerror(-err)); 543 544 mNetSession->destroySession(sessionID); 545 546 if (sessionID == mRTPSessionID) { 547 mRTPSessionID = 0; 548 } else if (sessionID == mRTCPSessionID) { 549 mRTCPSessionID = 0; 550 } 551 552 if (!mRTPConnected 553 || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) { 554 // We haven't completed initialization, attach the error 555 // to the notification instead. 556 notifyInitDone(err); 557 break; 558 } 559 560 notifyError(err); 561 break; 562 } 563 564 case ANetworkSession::kWhatDatagram: 565 { 566 sp<ABuffer> data; 567 CHECK(msg->findBuffer("data", &data)); 568 569 if (isRTP) { 570 ALOGW("Huh? Received data on RTP connection..."); 571 } else { 572 onRTCPData(data); 573 } 574 break; 575 } 576 577 case ANetworkSession::kWhatConnected: 578 { 579 int32_t sessionID; 580 CHECK(msg->findInt32("sessionID", &sessionID)); 581 582 if (isRTP) { 583 CHECK_EQ(mRTPMode, TRANSPORT_TCP); 584 CHECK_EQ(sessionID, mRTPSessionID); 585 mRTPConnected = true; 586 } else { 587 CHECK_EQ(mRTCPMode, TRANSPORT_TCP); 588 CHECK_EQ(sessionID, mRTCPSessionID); 589 mRTCPConnected = true; 590 } 591 592 if (mRTPConnected 593 && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) { 594 notifyInitDone(OK); 595 } 596 break; 597 } 598 599 case ANetworkSession::kWhatNetworkStall: 600 { 601 size_t numBytesQueued; 602 CHECK(msg->findSize("numBytesQueued", &numBytesQueued)); 603 604 notifyNetworkStall(numBytesQueued); 605 break; 606 } 607 608 default: 609 TRESPASS(); 610 } 611 } 612 613 status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) { 614 const uint8_t *data = buffer->data(); 615 size_t size = buffer->size(); 616 617 while (size > 0) { 618 if (size < 8) { 619 // Too short to be a valid RTCP header 620 return ERROR_MALFORMED; 621 } 622 623 if ((data[0] >> 6) != 2) { 624 // Unsupported version. 625 return ERROR_UNSUPPORTED; 626 } 627 628 if (data[0] & 0x20) { 629 // Padding present. 630 631 size_t paddingLength = data[size - 1]; 632 633 if (paddingLength + 12 > size) { 634 // If we removed this much padding we'd end up with something 635 // that's too short to be a valid RTP header. 636 return ERROR_MALFORMED; 637 } 638 639 size -= paddingLength; 640 } 641 642 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 643 644 if (size < headerLength) { 645 // Only received a partial packet? 646 return ERROR_MALFORMED; 647 } 648 649 switch (data[1]) { 650 case 200: 651 case 201: // RR 652 parseReceiverReport(data, headerLength); 653 break; 654 655 case 202: // SDES 656 case 203: 657 break; 658 659 case 204: // APP 660 parseAPP(data, headerLength); 661 break; 662 663 case 205: // TSFB (transport layer specific feedback) 664 parseTSFB(data, headerLength); 665 break; 666 667 case 206: // PSFB (payload specific feedback) 668 // hexdump(data, headerLength); 669 break; 670 671 default: 672 { 673 ALOGW("Unknown RTCP packet type %u of size %zu", 674 (unsigned)data[1], headerLength); 675 break; 676 } 677 } 678 679 data += headerLength; 680 size -= headerLength; 681 } 682 683 return OK; 684 } 685 686 status_t RTPSender::parseReceiverReport( 687 const uint8_t *data, size_t /* size */) { 688 float fractionLost = data[12] / 256.0f; 689 690 ALOGI("lost %.2f %% of packets during report interval.", 691 100.0f * fractionLost); 692 693 return OK; 694 } 695 696 status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) { 697 if ((data[0] & 0x1f) != 1) { 698 return ERROR_UNSUPPORTED; // We only support NACK for now. 699 } 700 701 uint32_t srcId = U32_AT(&data[8]); 702 if (srcId != kSourceID) { 703 return ERROR_MALFORMED; 704 } 705 706 for (size_t i = 12; i < size; i += 4) { 707 uint16_t seqNo = U16_AT(&data[i]); 708 uint16_t blp = U16_AT(&data[i + 2]); 709 710 List<sp<ABuffer> >::iterator it = mHistory.begin(); 711 bool foundSeqNo = false; 712 while (it != mHistory.end()) { 713 const sp<ABuffer> &buffer = *it; 714 715 uint16_t bufferSeqNo = buffer->int32Data() & 0xffff; 716 717 bool retransmit = false; 718 if (bufferSeqNo == seqNo) { 719 retransmit = true; 720 } else if (blp != 0) { 721 for (size_t i = 0; i < 16; ++i) { 722 if ((blp & (1 << i)) 723 && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) { 724 blp &= ~(1 << i); 725 retransmit = true; 726 } 727 } 728 } 729 730 if (retransmit) { 731 ALOGV("retransmitting seqNo %d", bufferSeqNo); 732 733 CHECK_EQ((status_t)OK, 734 sendRTPPacket(buffer, false /* storeInHistory */)); 735 736 if (bufferSeqNo == seqNo) { 737 foundSeqNo = true; 738 } 739 740 if (foundSeqNo && blp == 0) { 741 break; 742 } 743 } 744 745 ++it; 746 } 747 748 if (!foundSeqNo || blp != 0) { 749 ALOGI("Some sequence numbers were no longer available for " 750 "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)", 751 seqNo, foundSeqNo, blp); 752 753 if (!mHistory.empty()) { 754 int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff; 755 int32_t latest = (*--mHistory.end())->int32Data() & 0xffff; 756 757 ALOGI("have seq numbers from %d - %d", earliest, latest); 758 } 759 } 760 } 761 762 return OK; 763 } 764 765 status_t RTPSender::parseAPP(const uint8_t *data, size_t size __unused) { 766 if (!memcmp("late", &data[8], 4)) { 767 int64_t avgLatencyUs = (int64_t)U64_AT(&data[12]); 768 int64_t maxLatencyUs = (int64_t)U64_AT(&data[20]); 769 770 sp<AMessage> notify = mNotify->dup(); 771 notify->setInt32("what", kWhatInformSender); 772 notify->setInt64("avgLatencyUs", avgLatencyUs); 773 notify->setInt64("maxLatencyUs", maxLatencyUs); 774 notify->post(); 775 } 776 777 return OK; 778 } 779 780 void RTPSender::notifyInitDone(status_t err) { 781 sp<AMessage> notify = mNotify->dup(); 782 notify->setInt32("what", kWhatInitDone); 783 notify->setInt32("err", err); 784 notify->post(); 785 } 786 787 void RTPSender::notifyError(status_t err) { 788 sp<AMessage> notify = mNotify->dup(); 789 notify->setInt32("what", kWhatError); 790 notify->setInt32("err", err); 791 notify->post(); 792 } 793 794 void RTPSender::notifyNetworkStall(size_t numBytesQueued) { 795 sp<AMessage> notify = mNotify->dup(); 796 notify->setInt32("what", kWhatNetworkStall); 797 notify->setSize("numBytesQueued", numBytesQueued); 798 notify->post(); 799 } 800 801 } // namespace android 802 803