1 /* 2 * Copyright 2013, The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "RTPSender" 19 #include <utils/Log.h> 20 21 #include "RTPSender.h" 22 23 #include <media/stagefright/foundation/ABuffer.h> 24 #include <media/stagefright/foundation/ADebug.h> 25 #include <media/stagefright/foundation/AMessage.h> 26 #include <media/stagefright/foundation/ANetworkSession.h> 27 #include <media/stagefright/foundation/hexdump.h> 28 #include <media/stagefright/MediaErrors.h> 29 #include <media/stagefright/Utils.h> 30 31 #include "include/avc_utils.h" 32 33 namespace android { 34 35 RTPSender::RTPSender( 36 const sp<ANetworkSession> &netSession, 37 const sp<AMessage> ¬ify) 38 : mNetSession(netSession), 39 mNotify(notify), 40 mRTPMode(TRANSPORT_UNDEFINED), 41 mRTCPMode(TRANSPORT_UNDEFINED), 42 mRTPSessionID(0), 43 mRTCPSessionID(0), 44 mRTPConnected(false), 45 mRTCPConnected(false), 46 mLastNTPTime(0), 47 mLastRTPTime(0), 48 mNumRTPSent(0), 49 mNumRTPOctetsSent(0), 50 mNumSRsSent(0), 51 mRTPSeqNo(0), 52 mHistorySize(0) { 53 } 54 55 RTPSender::~RTPSender() { 56 if (mRTCPSessionID != 0) { 57 mNetSession->destroySession(mRTCPSessionID); 58 mRTCPSessionID = 0; 59 } 60 61 if (mRTPSessionID != 0) { 62 mNetSession->destroySession(mRTPSessionID); 63 mRTPSessionID = 0; 64 } 65 } 66 67 // static 68 int32_t RTPBase::PickRandomRTPPort() { 69 // Pick an even integer in range [1024, 65534) 70 71 static const size_t kRange = (65534 - 1024) / 2; 72 73 return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024; 74 } 75 76 status_t RTPSender::initAsync( 77 const char *remoteHost, 78 int32_t remoteRTPPort, 79 TransportMode rtpMode, 80 int32_t remoteRTCPPort, 81 TransportMode rtcpMode, 82 int32_t *outLocalRTPPort) { 83 if (mRTPMode != TRANSPORT_UNDEFINED 84 || rtpMode == TRANSPORT_UNDEFINED 85 || rtpMode == TRANSPORT_NONE 86 || rtcpMode == TRANSPORT_UNDEFINED) { 87 return INVALID_OPERATION; 88 } 89 90 CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED); 91 CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED); 92 93 if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0) 94 || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) { 95 return INVALID_OPERATION; 96 } 97 98 sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id()); 99 100 sp<AMessage> rtcpNotify; 101 if (remoteRTCPPort >= 0) { 102 rtcpNotify = new AMessage(kWhatRTCPNotify, id()); 103 } 104 105 CHECK_EQ(mRTPSessionID, 0); 106 CHECK_EQ(mRTCPSessionID, 0); 107 108 int32_t localRTPPort; 109 110 for (;;) { 111 localRTPPort = PickRandomRTPPort(); 112 113 status_t err; 114 if (rtpMode == TRANSPORT_UDP) { 115 err = mNetSession->createUDPSession( 116 localRTPPort, 117 remoteHost, 118 remoteRTPPort, 119 rtpNotify, 120 &mRTPSessionID); 121 } else { 122 CHECK_EQ(rtpMode, TRANSPORT_TCP); 123 err = mNetSession->createTCPDatagramSession( 124 localRTPPort, 125 remoteHost, 126 remoteRTPPort, 127 rtpNotify, 128 &mRTPSessionID); 129 } 130 131 if (err != OK) { 132 continue; 133 } 134 135 if (remoteRTCPPort < 0) { 136 break; 137 } 138 139 if (rtcpMode == TRANSPORT_UDP) { 140 err = mNetSession->createUDPSession( 141 localRTPPort + 1, 142 remoteHost, 143 remoteRTCPPort, 144 rtcpNotify, 145 &mRTCPSessionID); 146 } else { 147 CHECK_EQ(rtcpMode, TRANSPORT_TCP); 148 err = mNetSession->createTCPDatagramSession( 149 localRTPPort + 1, 150 remoteHost, 151 remoteRTCPPort, 152 rtcpNotify, 153 &mRTCPSessionID); 154 } 155 156 if (err == OK) { 157 break; 158 } 159 160 mNetSession->destroySession(mRTPSessionID); 161 mRTPSessionID = 0; 162 } 163 164 if (rtpMode == TRANSPORT_UDP) { 165 mRTPConnected = true; 166 } 167 168 if (rtcpMode == TRANSPORT_UDP) { 169 mRTCPConnected = true; 170 } 171 172 mRTPMode = rtpMode; 173 mRTCPMode = rtcpMode; 174 *outLocalRTPPort = localRTPPort; 175 176 if (mRTPMode == TRANSPORT_UDP 177 && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) { 178 notifyInitDone(OK); 179 } 180 181 return OK; 182 } 183 184 status_t RTPSender::queueBuffer( 185 const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) { 186 status_t err; 187 188 switch (mode) { 189 case PACKETIZATION_NONE: 190 err = queueRawPacket(buffer, packetType); 191 break; 192 193 case PACKETIZATION_TRANSPORT_STREAM: 194 err = queueTSPackets(buffer, packetType); 195 break; 196 197 case PACKETIZATION_H264: 198 err = queueAVCBuffer(buffer, packetType); 199 break; 200 201 default: 202 TRESPASS(); 203 } 204 205 return err; 206 } 207 208 status_t RTPSender::queueRawPacket( 209 const sp<ABuffer> &packet, uint8_t packetType) { 210 CHECK_LE(packet->size(), kMaxUDPPacketSize - 12); 211 212 int64_t timeUs; 213 CHECK(packet->meta()->findInt64("timeUs", &timeUs)); 214 215 sp<ABuffer> udpPacket = new ABuffer(12 + packet->size()); 216 217 udpPacket->setInt32Data(mRTPSeqNo); 218 219 uint8_t *rtp = udpPacket->data(); 220 rtp[0] = 0x80; 221 rtp[1] = packetType; 222 223 rtp[2] = (mRTPSeqNo >> 8) & 0xff; 224 rtp[3] = mRTPSeqNo & 0xff; 225 ++mRTPSeqNo; 226 227 uint32_t rtpTime = (timeUs * 9) / 100ll; 228 229 rtp[4] = rtpTime >> 24; 230 rtp[5] = (rtpTime >> 16) & 0xff; 231 rtp[6] = (rtpTime >> 8) & 0xff; 232 rtp[7] = rtpTime & 0xff; 233 234 rtp[8] = kSourceID >> 24; 235 rtp[9] = (kSourceID >> 16) & 0xff; 236 rtp[10] = (kSourceID >> 8) & 0xff; 237 rtp[11] = kSourceID & 0xff; 238 239 memcpy(&rtp[12], packet->data(), packet->size()); 240 241 return sendRTPPacket( 242 udpPacket, 243 true /* storeInHistory */, 244 true /* timeValid */, 245 ALooper::GetNowUs()); 246 } 247 248 status_t RTPSender::queueTSPackets( 249 const sp<ABuffer> &tsPackets, uint8_t packetType) { 250 CHECK_EQ(0, tsPackets->size() % 188); 251 252 int64_t timeUs; 253 CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs)); 254 255 const size_t numTSPackets = tsPackets->size() / 188; 256 257 size_t srcOffset = 0; 258 while (srcOffset < tsPackets->size()) { 259 sp<ABuffer> udpPacket = 260 new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188); 261 262 udpPacket->setInt32Data(mRTPSeqNo); 263 264 uint8_t *rtp = udpPacket->data(); 265 rtp[0] = 0x80; 266 rtp[1] = packetType; 267 268 rtp[2] = (mRTPSeqNo >> 8) & 0xff; 269 rtp[3] = mRTPSeqNo & 0xff; 270 ++mRTPSeqNo; 271 272 int64_t nowUs = ALooper::GetNowUs(); 273 uint32_t rtpTime = (nowUs * 9) / 100ll; 274 275 rtp[4] = rtpTime >> 24; 276 rtp[5] = (rtpTime >> 16) & 0xff; 277 rtp[6] = (rtpTime >> 8) & 0xff; 278 rtp[7] = rtpTime & 0xff; 279 280 rtp[8] = kSourceID >> 24; 281 rtp[9] = (kSourceID >> 16) & 0xff; 282 rtp[10] = (kSourceID >> 8) & 0xff; 283 rtp[11] = kSourceID & 0xff; 284 285 size_t numTSPackets = (tsPackets->size() - srcOffset) / 188; 286 if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) { 287 numTSPackets = kMaxNumTSPacketsPerRTPPacket; 288 } 289 290 memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188); 291 292 udpPacket->setRange(0, 12 + numTSPackets * 188); 293 294 srcOffset += numTSPackets * 188; 295 bool isLastPacket = (srcOffset == tsPackets->size()); 296 297 status_t err = sendRTPPacket( 298 udpPacket, 299 true /* storeInHistory */, 300 isLastPacket /* timeValid */, 301 timeUs); 302 303 if (err != OK) { 304 return err; 305 } 306 } 307 308 return OK; 309 } 310 311 status_t RTPSender::queueAVCBuffer( 312 const sp<ABuffer> &accessUnit, uint8_t packetType) { 313 int64_t timeUs; 314 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 315 316 uint32_t rtpTime = (timeUs * 9 / 100ll); 317 318 List<sp<ABuffer> > packets; 319 320 sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize); 321 size_t outBytesUsed = 12; // Placeholder for RTP header. 322 323 const uint8_t *data = accessUnit->data(); 324 size_t size = accessUnit->size(); 325 const uint8_t *nalStart; 326 size_t nalSize; 327 while (getNextNALUnit( 328 &data, &size, &nalStart, &nalSize, 329 true /* startCodeFollows */) == OK) { 330 size_t bytesNeeded = nalSize + 2; 331 if (outBytesUsed == 12) { 332 ++bytesNeeded; 333 } 334 335 if (outBytesUsed + bytesNeeded > out->capacity()) { 336 bool emitSingleNALPacket = false; 337 338 if (outBytesUsed == 12 339 && outBytesUsed + nalSize <= out->capacity()) { 340 // We haven't emitted anything into the current packet yet and 341 // this NAL unit fits into a single-NAL-unit-packet while 342 // it wouldn't have fit as part of a STAP-A packet. 343 344 memcpy(out->data() + outBytesUsed, nalStart, nalSize); 345 outBytesUsed += nalSize; 346 347 emitSingleNALPacket = true; 348 } 349 350 if (outBytesUsed > 12) { 351 out->setRange(0, outBytesUsed); 352 packets.push_back(out); 353 out = new ABuffer(kMaxUDPPacketSize); 354 outBytesUsed = 12; // Placeholder for RTP header 355 } 356 357 if (emitSingleNALPacket) { 358 continue; 359 } 360 } 361 362 if (outBytesUsed + bytesNeeded <= out->capacity()) { 363 uint8_t *dst = out->data() + outBytesUsed; 364 365 if (outBytesUsed == 12) { 366 *dst++ = 24; // STAP-A header 367 } 368 369 *dst++ = (nalSize >> 8) & 0xff; 370 *dst++ = nalSize & 0xff; 371 memcpy(dst, nalStart, nalSize); 372 373 outBytesUsed += bytesNeeded; 374 continue; 375 } 376 377 // This single NAL unit does not fit into a single RTP packet, 378 // we need to emit an FU-A. 379 380 CHECK_EQ(outBytesUsed, 12u); 381 382 uint8_t nalType = nalStart[0] & 0x1f; 383 uint8_t nri = (nalStart[0] >> 5) & 3; 384 385 size_t srcOffset = 1; 386 while (srcOffset < nalSize) { 387 size_t copy = out->capacity() - outBytesUsed - 2; 388 if (copy > nalSize - srcOffset) { 389 copy = nalSize - srcOffset; 390 } 391 392 uint8_t *dst = out->data() + outBytesUsed; 393 dst[0] = (nri << 5) | 28; 394 395 dst[1] = nalType; 396 397 if (srcOffset == 1) { 398 dst[1] |= 0x80; 399 } 400 401 if (srcOffset + copy == nalSize) { 402 dst[1] |= 0x40; 403 } 404 405 memcpy(&dst[2], nalStart + srcOffset, copy); 406 srcOffset += copy; 407 408 out->setRange(0, outBytesUsed + copy + 2); 409 410 packets.push_back(out); 411 out = new ABuffer(kMaxUDPPacketSize); 412 outBytesUsed = 12; // Placeholder for RTP header 413 } 414 } 415 416 if (outBytesUsed > 12) { 417 out->setRange(0, outBytesUsed); 418 packets.push_back(out); 419 } 420 421 while (!packets.empty()) { 422 sp<ABuffer> out = *packets.begin(); 423 packets.erase(packets.begin()); 424 425 out->setInt32Data(mRTPSeqNo); 426 427 bool last = packets.empty(); 428 429 uint8_t *dst = out->data(); 430 431 dst[0] = 0x80; 432 433 dst[1] = packetType; 434 if (last) { 435 dst[1] |= 1 << 7; // M-bit 436 } 437 438 dst[2] = (mRTPSeqNo >> 8) & 0xff; 439 dst[3] = mRTPSeqNo & 0xff; 440 ++mRTPSeqNo; 441 442 dst[4] = rtpTime >> 24; 443 dst[5] = (rtpTime >> 16) & 0xff; 444 dst[6] = (rtpTime >> 8) & 0xff; 445 dst[7] = rtpTime & 0xff; 446 dst[8] = kSourceID >> 24; 447 dst[9] = (kSourceID >> 16) & 0xff; 448 dst[10] = (kSourceID >> 8) & 0xff; 449 dst[11] = kSourceID & 0xff; 450 451 status_t err = sendRTPPacket(out, true /* storeInHistory */); 452 453 if (err != OK) { 454 return err; 455 } 456 } 457 458 return OK; 459 } 460 461 status_t RTPSender::sendRTPPacket( 462 const sp<ABuffer> &buffer, bool storeInHistory, 463 bool timeValid, int64_t timeUs) { 464 CHECK(mRTPConnected); 465 466 status_t err = mNetSession->sendRequest( 467 mRTPSessionID, buffer->data(), buffer->size(), 468 timeValid, timeUs); 469 470 if (err != OK) { 471 return err; 472 } 473 474 mLastNTPTime = GetNowNTP(); 475 mLastRTPTime = U32_AT(buffer->data() + 4); 476 477 ++mNumRTPSent; 478 mNumRTPOctetsSent += buffer->size() - 12; 479 480 if (storeInHistory) { 481 if (mHistorySize == kMaxHistorySize) { 482 mHistory.erase(mHistory.begin()); 483 } else { 484 ++mHistorySize; 485 } 486 mHistory.push_back(buffer); 487 } 488 489 return OK; 490 } 491 492 // static 493 uint64_t RTPSender::GetNowNTP() { 494 struct timeval tv; 495 gettimeofday(&tv, NULL /* timezone */); 496 497 uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec; 498 499 nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll; 500 501 uint64_t hi = nowUs / 1000000ll; 502 uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll; 503 504 return (hi << 32) | lo; 505 } 506 507 void RTPSender::onMessageReceived(const sp<AMessage> &msg) { 508 switch (msg->what()) { 509 case kWhatRTPNotify: 510 case kWhatRTCPNotify: 511 onNetNotify(msg->what() == kWhatRTPNotify, msg); 512 break; 513 514 default: 515 TRESPASS(); 516 } 517 } 518 519 void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) { 520 int32_t reason; 521 CHECK(msg->findInt32("reason", &reason)); 522 523 switch (reason) { 524 case ANetworkSession::kWhatError: 525 { 526 int32_t sessionID; 527 CHECK(msg->findInt32("sessionID", &sessionID)); 528 529 int32_t err; 530 CHECK(msg->findInt32("err", &err)); 531 532 int32_t errorOccuredDuringSend; 533 CHECK(msg->findInt32("send", &errorOccuredDuringSend)); 534 535 AString detail; 536 CHECK(msg->findString("detail", &detail)); 537 538 ALOGE("An error occurred during %s in session %d " 539 "(%d, '%s' (%s)).", 540 errorOccuredDuringSend ? "send" : "receive", 541 sessionID, 542 err, 543 detail.c_str(), 544 strerror(-err)); 545 546 mNetSession->destroySession(sessionID); 547 548 if (sessionID == mRTPSessionID) { 549 mRTPSessionID = 0; 550 } else if (sessionID == mRTCPSessionID) { 551 mRTCPSessionID = 0; 552 } 553 554 if (!mRTPConnected 555 || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) { 556 // We haven't completed initialization, attach the error 557 // to the notification instead. 558 notifyInitDone(err); 559 break; 560 } 561 562 notifyError(err); 563 break; 564 } 565 566 case ANetworkSession::kWhatDatagram: 567 { 568 sp<ABuffer> data; 569 CHECK(msg->findBuffer("data", &data)); 570 571 if (isRTP) { 572 ALOGW("Huh? Received data on RTP connection..."); 573 } else { 574 onRTCPData(data); 575 } 576 break; 577 } 578 579 case ANetworkSession::kWhatConnected: 580 { 581 int32_t sessionID; 582 CHECK(msg->findInt32("sessionID", &sessionID)); 583 584 if (isRTP) { 585 CHECK_EQ(mRTPMode, TRANSPORT_TCP); 586 CHECK_EQ(sessionID, mRTPSessionID); 587 mRTPConnected = true; 588 } else { 589 CHECK_EQ(mRTCPMode, TRANSPORT_TCP); 590 CHECK_EQ(sessionID, mRTCPSessionID); 591 mRTCPConnected = true; 592 } 593 594 if (mRTPConnected 595 && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) { 596 notifyInitDone(OK); 597 } 598 break; 599 } 600 601 case ANetworkSession::kWhatNetworkStall: 602 { 603 size_t numBytesQueued; 604 CHECK(msg->findSize("numBytesQueued", &numBytesQueued)); 605 606 notifyNetworkStall(numBytesQueued); 607 break; 608 } 609 610 default: 611 TRESPASS(); 612 } 613 } 614 615 status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) { 616 const uint8_t *data = buffer->data(); 617 size_t size = buffer->size(); 618 619 while (size > 0) { 620 if (size < 8) { 621 // Too short to be a valid RTCP header 622 return ERROR_MALFORMED; 623 } 624 625 if ((data[0] >> 6) != 2) { 626 // Unsupported version. 627 return ERROR_UNSUPPORTED; 628 } 629 630 if (data[0] & 0x20) { 631 // Padding present. 632 633 size_t paddingLength = data[size - 1]; 634 635 if (paddingLength + 12 > size) { 636 // If we removed this much padding we'd end up with something 637 // that's too short to be a valid RTP header. 638 return ERROR_MALFORMED; 639 } 640 641 size -= paddingLength; 642 } 643 644 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4; 645 646 if (size < headerLength) { 647 // Only received a partial packet? 648 return ERROR_MALFORMED; 649 } 650 651 switch (data[1]) { 652 case 200: 653 case 201: // RR 654 parseReceiverReport(data, headerLength); 655 break; 656 657 case 202: // SDES 658 case 203: 659 break; 660 661 case 204: // APP 662 parseAPP(data, headerLength); 663 break; 664 665 case 205: // TSFB (transport layer specific feedback) 666 parseTSFB(data, headerLength); 667 break; 668 669 case 206: // PSFB (payload specific feedback) 670 // hexdump(data, headerLength); 671 break; 672 673 default: 674 { 675 ALOGW("Unknown RTCP packet type %u of size %d", 676 (unsigned)data[1], headerLength); 677 break; 678 } 679 } 680 681 data += headerLength; 682 size -= headerLength; 683 } 684 685 return OK; 686 } 687 688 status_t RTPSender::parseReceiverReport( 689 const uint8_t *data, size_t /* size */) { 690 float fractionLost = data[12] / 256.0f; 691 692 ALOGI("lost %.2f %% of packets during report interval.", 693 100.0f * fractionLost); 694 695 return OK; 696 } 697 698 status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) { 699 if ((data[0] & 0x1f) != 1) { 700 return ERROR_UNSUPPORTED; // We only support NACK for now. 701 } 702 703 uint32_t srcId = U32_AT(&data[8]); 704 if (srcId != kSourceID) { 705 return ERROR_MALFORMED; 706 } 707 708 for (size_t i = 12; i < size; i += 4) { 709 uint16_t seqNo = U16_AT(&data[i]); 710 uint16_t blp = U16_AT(&data[i + 2]); 711 712 List<sp<ABuffer> >::iterator it = mHistory.begin(); 713 bool foundSeqNo = false; 714 while (it != mHistory.end()) { 715 const sp<ABuffer> &buffer = *it; 716 717 uint16_t bufferSeqNo = buffer->int32Data() & 0xffff; 718 719 bool retransmit = false; 720 if (bufferSeqNo == seqNo) { 721 retransmit = true; 722 } else if (blp != 0) { 723 for (size_t i = 0; i < 16; ++i) { 724 if ((blp & (1 << i)) 725 && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) { 726 blp &= ~(1 << i); 727 retransmit = true; 728 } 729 } 730 } 731 732 if (retransmit) { 733 ALOGV("retransmitting seqNo %d", bufferSeqNo); 734 735 CHECK_EQ((status_t)OK, 736 sendRTPPacket(buffer, false /* storeInHistory */)); 737 738 if (bufferSeqNo == seqNo) { 739 foundSeqNo = true; 740 } 741 742 if (foundSeqNo && blp == 0) { 743 break; 744 } 745 } 746 747 ++it; 748 } 749 750 if (!foundSeqNo || blp != 0) { 751 ALOGI("Some sequence numbers were no longer available for " 752 "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)", 753 seqNo, foundSeqNo, blp); 754 755 if (!mHistory.empty()) { 756 int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff; 757 int32_t latest = (*--mHistory.end())->int32Data() & 0xffff; 758 759 ALOGI("have seq numbers from %d - %d", earliest, latest); 760 } 761 } 762 } 763 764 return OK; 765 } 766 767 status_t RTPSender::parseAPP(const uint8_t *data, size_t size) { 768 if (!memcmp("late", &data[8], 4)) { 769 int64_t avgLatencyUs = (int64_t)U64_AT(&data[12]); 770 int64_t maxLatencyUs = (int64_t)U64_AT(&data[20]); 771 772 sp<AMessage> notify = mNotify->dup(); 773 notify->setInt32("what", kWhatInformSender); 774 notify->setInt64("avgLatencyUs", avgLatencyUs); 775 notify->setInt64("maxLatencyUs", maxLatencyUs); 776 notify->post(); 777 } 778 779 return OK; 780 } 781 782 void RTPSender::notifyInitDone(status_t err) { 783 sp<AMessage> notify = mNotify->dup(); 784 notify->setInt32("what", kWhatInitDone); 785 notify->setInt32("err", err); 786 notify->post(); 787 } 788 789 void RTPSender::notifyError(status_t err) { 790 sp<AMessage> notify = mNotify->dup(); 791 notify->setInt32("what", kWhatError); 792 notify->setInt32("err", err); 793 notify->post(); 794 } 795 796 void RTPSender::notifyNetworkStall(size_t numBytesQueued) { 797 sp<AMessage> notify = mNotify->dup(); 798 notify->setInt32("what", kWhatNetworkStall); 799 notify->setSize("numBytesQueued", numBytesQueued); 800 notify->post(); 801 } 802 803 } // namespace android 804 805