Home | History | Annotate | Download | only in rtp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include <stdio.h>
     18 #include <stdint.h>
     19 #include <string.h>
     20 #include <errno.h>
     21 #include <fcntl.h>
     22 #include <sys/epoll.h>
     23 #include <sys/types.h>
     24 #include <sys/socket.h>
     25 #include <sys/stat.h>
     26 #include <sys/time.h>
     27 #include <time.h>
     28 #include <arpa/inet.h>
     29 #include <netinet/in.h>
     30 
     31 #define LOG_TAG "AudioGroup"
     32 #include <cutils/atomic.h>
     33 #include <utils/Log.h>
     34 #include <utils/Errors.h>
     35 #include <utils/RefBase.h>
     36 #include <utils/threads.h>
     37 #include <utils/SystemClock.h>
     38 #include <media/AudioSystem.h>
     39 #include <media/AudioRecord.h>
     40 #include <media/AudioTrack.h>
     41 #include <media/mediarecorder.h>
     42 
     43 #include "jni.h"
     44 #include "JNIHelp.h"
     45 
     46 #include "AudioCodec.h"
     47 #include "EchoSuppressor.h"
     48 
     49 extern int parse(JNIEnv *env, jstring jAddress, int port, sockaddr_storage *ss);
     50 
     51 namespace {
     52 
     53 using namespace android;
     54 
     55 int gRandom = -1;
     56 
// We use a circular array to implement the jitter buffer. The simplest way is
// to apply a modulo operation to the index when accessing the array. However,
// modulo can be expensive on some platforms, such as ARM. Thus we round the size
// of the array up to the nearest power of 2 and use bitwise-and instead.
// Currently we make it 512ms long and assume the packet interval is 40ms or less.
// The first 80ms is where samples get mixed. The remaining 432ms is the real
// jitter buffer. For a stream at 8000Hz it takes 8192 bytes. These numbers were
// chosen by experiment and each of them can be adjusted as needed.
     65 
     66 // Other notes:
     67 // + We use elapsedRealtime() to get the time. Since we use 32bit variables
     68 //   instead of 64bit ones, comparison must be done by subtraction.
     69 // + Sampling rate must be multiple of 1000Hz, and packet length must be in
     70 //   milliseconds. No floating points.
     71 // + If we cannot get enough CPU, we drop samples and simulate packet loss.
     72 // + Resampling is not done yet, so streams in one group must use the same rate.
     73 //   For the first release only 8000Hz is supported.
     74 
     75 #define BUFFER_SIZE     512
     76 #define HISTORY_SIZE    80
     77 #define MEASURE_PERIOD  2000
     78 
     79 class AudioStream
     80 {
     81 public:
     82     AudioStream();
     83     ~AudioStream();
     84     bool set(int mode, int socket, sockaddr_storage *remote,
     85         AudioCodec *codec, int sampleRate, int sampleCount,
     86         int codecType, int dtmfType);
     87 
     88     void sendDtmf(int event);
     89     bool mix(int32_t *output, int head, int tail, int sampleRate);
     90     void encode(int tick, AudioStream *chain);
     91     void decode(int tick);
     92 
     93     enum {
     94         NORMAL = 0,
     95         SEND_ONLY = 1,
     96         RECEIVE_ONLY = 2,
     97         LAST_MODE = 2,
     98     };
     99 
    100 private:
    101     int mMode;
    102     int mSocket;
    103     sockaddr_storage mRemote;
    104     AudioCodec *mCodec;
    105     uint32_t mCodecMagic;
    106     uint32_t mDtmfMagic;
    107     bool mFixRemote;
    108 
    109     int mTick;
    110     int mSampleRate;
    111     int mSampleCount;
    112     int mInterval;
    113     int mLogThrottle;
    114 
    115     int16_t *mBuffer;
    116     int mBufferMask;
    117     int mBufferHead;
    118     int mBufferTail;
    119     int mLatencyTimer;
    120     int mLatencyScore;
    121 
    122     uint16_t mSequence;
    123     uint32_t mTimestamp;
    124     uint32_t mSsrc;
    125 
    126     int mDtmfEvent;
    127     int mDtmfStart;
    128 
    129     AudioStream *mNext;
    130 
    131     friend class AudioGroup;
    132 };
    133 
    134 AudioStream::AudioStream()
    135 {
    136     mSocket = -1;
    137     mCodec = NULL;
    138     mBuffer = NULL;
    139     mNext = NULL;
    140 }
    141 
    142 AudioStream::~AudioStream()
    143 {
    144     close(mSocket);
    145     delete mCodec;
    146     delete [] mBuffer;
    147     LOGD("stream[%d] is dead", mSocket);
    148 }
    149 
    150 bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
    151     AudioCodec *codec, int sampleRate, int sampleCount,
    152     int codecType, int dtmfType)
    153 {
    154     if (mode < 0 || mode > LAST_MODE) {
    155         return false;
    156     }
    157     mMode = mode;
    158 
    159     mCodecMagic = (0x8000 | codecType) << 16;
    160     mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;
    161 
    162     mTick = elapsedRealtime();
    163     mSampleRate = sampleRate / 1000;
    164     mSampleCount = sampleCount;
    165     mInterval = mSampleCount / mSampleRate;
    166 
    167     // Allocate jitter buffer.
    168     for (mBufferMask = 8; mBufferMask < mSampleRate; mBufferMask <<= 1);
    169     mBufferMask *= BUFFER_SIZE;
    170     mBuffer = new int16_t[mBufferMask];
    171     --mBufferMask;
    172     mBufferHead = 0;
    173     mBufferTail = 0;
    174     mLatencyTimer = 0;
    175     mLatencyScore = 0;
    176 
    177     // Initialize random bits.
    178     read(gRandom, &mSequence, sizeof(mSequence));
    179     read(gRandom, &mTimestamp, sizeof(mTimestamp));
    180     read(gRandom, &mSsrc, sizeof(mSsrc));
    181 
    182     mDtmfEvent = -1;
    183     mDtmfStart = 0;
    184 
    185     // Only take over these things when succeeded.
    186     mSocket = socket;
    187     if (codec) {
    188         mRemote = *remote;
    189         mCodec = codec;
    190 
    191         // Here we should never get an private address, but some buggy proxy
    192         // servers do give us one. To solve this, we replace the address when
    193         // the first time we successfully decode an incoming packet.
    194         mFixRemote = false;
    195         if (remote->ss_family == AF_INET) {
    196             unsigned char *address =
    197                 (unsigned char *)&((sockaddr_in *)remote)->sin_addr;
    198             if (address[0] == 10 ||
    199                 (address[0] == 172 && (address[1] >> 4) == 1) ||
    200                 (address[0] == 192 && address[1] == 168)) {
    201                 mFixRemote = true;
    202             }
    203         }
    204     }
    205 
    206     LOGD("stream[%d] is configured as %s %dkHz %dms mode %d", mSocket,
    207         (codec ? codec->name : "RAW"), mSampleRate, mInterval, mMode);
    208     return true;
    209 }
    210 
    211 void AudioStream::sendDtmf(int event)
    212 {
    213     if (mDtmfMagic != 0) {
    214         mDtmfEvent = event << 24;
    215         mDtmfStart = mTimestamp + mSampleCount;
    216     }
    217 }
    218 
    219 bool AudioStream::mix(int32_t *output, int head, int tail, int sampleRate)
    220 {
    221     if (mMode == SEND_ONLY) {
    222         return false;
    223     }
    224 
    225     if (head - mBufferHead < 0) {
    226         head = mBufferHead;
    227     }
    228     if (tail - mBufferTail > 0) {
    229         tail = mBufferTail;
    230     }
    231     if (tail - head <= 0) {
    232         return false;
    233     }
    234 
    235     head *= mSampleRate;
    236     tail *= mSampleRate;
    237 
    238     if (sampleRate == mSampleRate) {
    239         for (int i = head; i - tail < 0; ++i) {
    240             output[i - head] += mBuffer[i & mBufferMask];
    241         }
    242     } else {
    243         // TODO: implement resampling.
    244         return false;
    245     }
    246     return true;
    247 }
    248 
    249 void AudioStream::encode(int tick, AudioStream *chain)
    250 {
    251     if (tick - mTick >= mInterval) {
    252         // We just missed the train. Pretend that packets in between are lost.
    253         int skipped = (tick - mTick) / mInterval;
    254         mTick += skipped * mInterval;
    255         mSequence += skipped;
    256         mTimestamp += skipped * mSampleCount;
    257         LOGV("stream[%d] skips %d packets", mSocket, skipped);
    258     }
    259 
    260     tick = mTick;
    261     mTick += mInterval;
    262     ++mSequence;
    263     mTimestamp += mSampleCount;
    264 
    265     if (mMode == RECEIVE_ONLY) {
    266         return;
    267     }
    268 
    269     // If there is an ongoing DTMF event, send it now.
    270     if (mDtmfEvent != -1) {
    271         int duration = mTimestamp - mDtmfStart;
    272         // Make sure duration is reasonable.
    273         if (duration >= 0 && duration < mSampleRate * 100) {
    274             duration += mSampleCount;
    275             int32_t buffer[4] = {
    276                 htonl(mDtmfMagic | mSequence),
    277                 htonl(mDtmfStart),
    278                 mSsrc,
    279                 htonl(mDtmfEvent | duration),
    280             };
    281             if (duration >= mSampleRate * 100) {
    282                 buffer[3] |= htonl(1 << 23);
    283                 mDtmfEvent = -1;
    284             }
    285             sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
    286                 (sockaddr *)&mRemote, sizeof(mRemote));
    287             return;
    288         }
    289         mDtmfEvent = -1;
    290     }
    291 
    292     // It is time to mix streams.
    293     bool mixed = false;
    294     int32_t buffer[mSampleCount + 3];
    295     memset(buffer, 0, sizeof(buffer));
    296     while (chain) {
    297         if (chain != this &&
    298             chain->mix(buffer, tick - mInterval, tick, mSampleRate)) {
    299             mixed = true;
    300         }
    301         chain = chain->mNext;
    302     }
    303     if (!mixed) {
    304         if ((mTick ^ mLogThrottle) >> 10) {
    305             mLogThrottle = mTick;
    306             LOGV("stream[%d] no data", mSocket);
    307         }
    308         return;
    309     }
    310 
    311     // Cook the packet and send it out.
    312     int16_t samples[mSampleCount];
    313     for (int i = 0; i < mSampleCount; ++i) {
    314         int32_t sample = buffer[i];
    315         if (sample < -32768) {
    316             sample = -32768;
    317         }
    318         if (sample > 32767) {
    319             sample = 32767;
    320         }
    321         samples[i] = sample;
    322     }
    323     if (!mCodec) {
    324         // Special case for device stream.
    325         send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);
    326         return;
    327     }
    328 
    329     buffer[0] = htonl(mCodecMagic | mSequence);
    330     buffer[1] = htonl(mTimestamp);
    331     buffer[2] = mSsrc;
    332     int length = mCodec->encode(&buffer[3], samples);
    333     if (length <= 0) {
    334         LOGV("stream[%d] encoder error", mSocket);
    335         return;
    336     }
    337     sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
    338         sizeof(mRemote));
    339 }
    340 
    341 void AudioStream::decode(int tick)
    342 {
    343     char c;
    344     if (mMode == SEND_ONLY) {
    345         recv(mSocket, &c, 1, MSG_DONTWAIT);
    346         return;
    347     }
    348 
    349     // Make sure mBufferHead and mBufferTail are reasonable.
    350     if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {
    351         mBufferHead = tick - HISTORY_SIZE;
    352         mBufferTail = mBufferHead;
    353     }
    354 
    355     if (tick - mBufferHead > HISTORY_SIZE) {
    356         // Throw away outdated samples.
    357         mBufferHead = tick - HISTORY_SIZE;
    358         if (mBufferTail - mBufferHead < 0) {
    359             mBufferTail = mBufferHead;
    360         }
    361     }
    362 
    363     // Adjust the jitter buffer if the latency keeps larger than two times of the
    364     // packet interval in the past two seconds.
    365     int score = mBufferTail - tick - mInterval * 2;
    366     if (mLatencyScore > score) {
    367         mLatencyScore = score;
    368     }
    369     if (mLatencyScore <= 0) {
    370         mLatencyTimer = tick;
    371         mLatencyScore = score;
    372     } else if (tick - mLatencyTimer >= MEASURE_PERIOD) {
    373         LOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);
    374         mBufferTail -= mLatencyScore;
    375         mLatencyTimer = tick;
    376     }
    377 
    378     if (mBufferTail - mBufferHead > BUFFER_SIZE - mInterval) {
    379         // Buffer overflow. Drop the packet.
    380         LOGV("stream[%d] buffer overflow", mSocket);
    381         recv(mSocket, &c, 1, MSG_DONTWAIT);
    382         return;
    383     }
    384 
    385     // Receive the packet and decode it.
    386     int16_t samples[mSampleCount];
    387     int length = 0;
    388     if (!mCodec) {
    389         // Special case for device stream.
    390         length = recv(mSocket, samples, sizeof(samples),
    391             MSG_TRUNC | MSG_DONTWAIT) >> 1;
    392     } else {
    393         __attribute__((aligned(4))) uint8_t buffer[2048];
    394         sockaddr_storage remote;
    395         socklen_t len = sizeof(remote);
    396 
    397         length = recvfrom(mSocket, buffer, sizeof(buffer),
    398             MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &len);
    399 
    400         // Do we need to check SSRC, sequence, and timestamp? They are not
    401         // reliable but at least they can be used to identify duplicates?
    402         if (length < 12 || length > (int)sizeof(buffer) ||
    403             (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
    404             LOGV("stream[%d] malformed packet", mSocket);
    405             return;
    406         }
    407         int offset = 12 + ((buffer[0] & 0x0F) << 2);
    408         if ((buffer[0] & 0x10) != 0) {
    409             offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
    410         }
    411         if ((buffer[0] & 0x20) != 0) {
    412             length -= buffer[length - 1];
    413         }
    414         length -= offset;
    415         if (length >= 0) {
    416             length = mCodec->decode(samples, &buffer[offset], length);
    417         }
    418         if (length > 0 && mFixRemote) {
    419             mRemote = remote;
    420             mFixRemote = false;
    421         }
    422     }
    423     if (length <= 0) {
    424         LOGV("stream[%d] decoder error", mSocket);
    425         return;
    426     }
    427 
    428     if (tick - mBufferTail > 0) {
    429         // Buffer underrun. Reset the jitter buffer.
    430         LOGV("stream[%d] buffer underrun", mSocket);
    431         if (mBufferTail - mBufferHead <= 0) {
    432             mBufferHead = tick + mInterval;
    433             mBufferTail = mBufferHead;
    434         } else {
    435             int tail = (tick + mInterval) * mSampleRate;
    436             for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {
    437                 mBuffer[i & mBufferMask] = 0;
    438             }
    439             mBufferTail = tick + mInterval;
    440         }
    441     }
    442 
    443     // Append to the jitter buffer.
    444     int tail = mBufferTail * mSampleRate;
    445     for (int i = 0; i < mSampleCount; ++i) {
    446         mBuffer[tail & mBufferMask] = samples[i];
    447         ++tail;
    448     }
    449     mBufferTail += mInterval;
    450 }
    451 
    452 //------------------------------------------------------------------------------
    453 
    454 class AudioGroup
    455 {
    456 public:
    457     AudioGroup();
    458     ~AudioGroup();
    459     bool set(int sampleRate, int sampleCount);
    460 
    461     bool setMode(int mode);
    462     bool sendDtmf(int event);
    463     bool add(AudioStream *stream);
    464     bool remove(int socket);
    465 
    466     enum {
    467         ON_HOLD = 0,
    468         MUTED = 1,
    469         NORMAL = 2,
    470         EC_ENABLED = 3,
    471         LAST_MODE = 3,
    472     };
    473 
    474 private:
    475     AudioStream *mChain;
    476     int mEventQueue;
    477     volatile int mDtmfEvent;
    478 
    479     int mMode;
    480     int mSampleRate;
    481     int mSampleCount;
    482     int mDeviceSocket;
    483 
    484     class NetworkThread : public Thread
    485     {
    486     public:
    487         NetworkThread(AudioGroup *group) : Thread(false), mGroup(group) {}
    488 
    489         bool start()
    490         {
    491             if (run("Network", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
    492                 LOGE("cannot start network thread");
    493                 return false;
    494             }
    495             return true;
    496         }
    497 
    498     private:
    499         AudioGroup *mGroup;
    500         bool threadLoop();
    501     };
    502     sp<NetworkThread> mNetworkThread;
    503 
    504     class DeviceThread : public Thread
    505     {
    506     public:
    507         DeviceThread(AudioGroup *group) : Thread(false), mGroup(group) {}
    508 
    509         bool start()
    510         {
    511             if (run("Device", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
    512                 LOGE("cannot start device thread");
    513                 return false;
    514             }
    515             return true;
    516         }
    517 
    518     private:
    519         AudioGroup *mGroup;
    520         bool threadLoop();
    521     };
    522     sp<DeviceThread> mDeviceThread;
    523 };
    524 
    525 AudioGroup::AudioGroup()
    526 {
    527     mMode = ON_HOLD;
    528     mChain = NULL;
    529     mEventQueue = -1;
    530     mDtmfEvent = -1;
    531     mDeviceSocket = -1;
    532     mNetworkThread = new NetworkThread(this);
    533     mDeviceThread = new DeviceThread(this);
    534 }
    535 
    536 AudioGroup::~AudioGroup()
    537 {
    538     mNetworkThread->requestExitAndWait();
    539     mDeviceThread->requestExitAndWait();
    540     close(mEventQueue);
    541     close(mDeviceSocket);
    542     while (mChain) {
    543         AudioStream *next = mChain->mNext;
    544         delete mChain;
    545         mChain = next;
    546     }
    547     LOGD("group[%d] is dead", mDeviceSocket);
    548 }
    549 
    550 bool AudioGroup::set(int sampleRate, int sampleCount)
    551 {
    552     mEventQueue = epoll_create(2);
    553     if (mEventQueue == -1) {
    554         LOGE("epoll_create: %s", strerror(errno));
    555         return false;
    556     }
    557 
    558     mSampleRate = sampleRate;
    559     mSampleCount = sampleCount;
    560 
    561     // Create device socket.
    562     int pair[2];
    563     if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
    564         LOGE("socketpair: %s", strerror(errno));
    565         return false;
    566     }
    567     mDeviceSocket = pair[0];
    568 
    569     // Create device stream.
    570     mChain = new AudioStream;
    571     if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
    572         sampleRate, sampleCount, -1, -1)) {
    573         close(pair[1]);
    574         LOGE("cannot initialize device stream");
    575         return false;
    576     }
    577 
    578     // Give device socket a reasonable timeout.
    579     timeval tv;
    580     tv.tv_sec = 0;
    581     tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
    582     if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
    583         LOGE("setsockopt: %s", strerror(errno));
    584         return false;
    585     }
    586 
    587     // Add device stream into event queue.
    588     epoll_event event;
    589     event.events = EPOLLIN;
    590     event.data.ptr = mChain;
    591     if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
    592         LOGE("epoll_ctl: %s", strerror(errno));
    593         return false;
    594     }
    595 
    596     // Anything else?
    597     LOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
    598     return true;
    599 }
    600 
    601 bool AudioGroup::setMode(int mode)
    602 {
    603     if (mode < 0 || mode > LAST_MODE) {
    604         return false;
    605     }
    606     if (mMode == mode) {
    607         return true;
    608     }
    609 
    610     mDeviceThread->requestExitAndWait();
    611     LOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);
    612     mMode = mode;
    613     return (mode == ON_HOLD) || mDeviceThread->start();
    614 }
    615 
    616 bool AudioGroup::sendDtmf(int event)
    617 {
    618     if (event < 0 || event > 15) {
    619         return false;
    620     }
    621 
    622     // DTMF is rarely used, so we try to make it as lightweight as possible.
    623     // Using volatile might be dodgy, but using a pipe or pthread primitives
    624     // or stop-set-restart threads seems too heavy. Will investigate later.
    625     timespec ts;
    626     ts.tv_sec = 0;
    627     ts.tv_nsec = 100000000;
    628     for (int i = 0; mDtmfEvent != -1 && i < 20; ++i) {
    629         nanosleep(&ts, NULL);
    630     }
    631     if (mDtmfEvent != -1) {
    632         return false;
    633     }
    634     mDtmfEvent = event;
    635     nanosleep(&ts, NULL);
    636     return true;
    637 }
    638 
    639 bool AudioGroup::add(AudioStream *stream)
    640 {
    641     mNetworkThread->requestExitAndWait();
    642 
    643     epoll_event event;
    644     event.events = EPOLLIN;
    645     event.data.ptr = stream;
    646     if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {
    647         LOGE("epoll_ctl: %s", strerror(errno));
    648         return false;
    649     }
    650 
    651     stream->mNext = mChain->mNext;
    652     mChain->mNext = stream;
    653     if (!mNetworkThread->start()) {
    654         // Only take over the stream when succeeded.
    655         mChain->mNext = stream->mNext;
    656         return false;
    657     }
    658 
    659     LOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);
    660     return true;
    661 }
    662 
    663 bool AudioGroup::remove(int socket)
    664 {
    665     mNetworkThread->requestExitAndWait();
    666 
    667     for (AudioStream *stream = mChain; stream->mNext; stream = stream->mNext) {
    668         AudioStream *target = stream->mNext;
    669         if (target->mSocket == socket) {
    670             if (epoll_ctl(mEventQueue, EPOLL_CTL_DEL, socket, NULL)) {
    671                 LOGE("epoll_ctl: %s", strerror(errno));
    672                 return false;
    673             }
    674             stream->mNext = target->mNext;
    675             LOGD("stream[%d] leaves group[%d]", socket, mDeviceSocket);
    676             delete target;
    677             break;
    678         }
    679     }
    680 
    681     // Do not start network thread if there is only one stream.
    682     if (!mChain->mNext || !mNetworkThread->start()) {
    683         return false;
    684     }
    685     return true;
    686 }
    687 
    688 bool AudioGroup::NetworkThread::threadLoop()
    689 {
    690     AudioStream *chain = mGroup->mChain;
    691     int tick = elapsedRealtime();
    692     int deadline = tick + 10;
    693     int count = 0;
    694 
    695     for (AudioStream *stream = chain; stream; stream = stream->mNext) {
    696         if (tick - stream->mTick >= 0) {
    697             stream->encode(tick, chain);
    698         }
    699         if (deadline - stream->mTick > 0) {
    700             deadline = stream->mTick;
    701         }
    702         ++count;
    703     }
    704 
    705     int event = mGroup->mDtmfEvent;
    706     if (event != -1) {
    707         for (AudioStream *stream = chain; stream; stream = stream->mNext) {
    708             stream->sendDtmf(event);
    709         }
    710         mGroup->mDtmfEvent = -1;
    711     }
    712 
    713     deadline -= tick;
    714     if (deadline < 1) {
    715         deadline = 1;
    716     }
    717 
    718     epoll_event events[count];
    719     count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
    720     if (count == -1) {
    721         LOGE("epoll_wait: %s", strerror(errno));
    722         return false;
    723     }
    724     for (int i = 0; i < count; ++i) {
    725         ((AudioStream *)events[i].data.ptr)->decode(tick);
    726     }
    727 
    728     return true;
    729 }
    730 
    731 bool AudioGroup::DeviceThread::threadLoop()
    732 {
    733     int mode = mGroup->mMode;
    734     int sampleRate = mGroup->mSampleRate;
    735     int sampleCount = mGroup->mSampleCount;
    736     int deviceSocket = mGroup->mDeviceSocket;
    737 
    738     // Find out the frame count for AudioTrack and AudioRecord.
    739     int output = 0;
    740     int input = 0;
    741     if (AudioTrack::getMinFrameCount(&output, AudioSystem::VOICE_CALL,
    742         sampleRate) != NO_ERROR || output <= 0 ||
    743         AudioRecord::getMinFrameCount(&input, sampleRate,
    744         AudioSystem::PCM_16_BIT, 1) != NO_ERROR || input <= 0) {
    745         LOGE("cannot compute frame count");
    746         return false;
    747     }
    748     LOGD("reported frame count: output %d, input %d", output, input);
    749 
    750     if (output < sampleCount * 2) {
    751         output = sampleCount * 2;
    752     }
    753     if (input < sampleCount * 2) {
    754         input = sampleCount * 2;
    755     }
    756     LOGD("adjusted frame count: output %d, input %d", output, input);
    757 
    758     // Initialize AudioTrack and AudioRecord.
    759     AudioTrack track;
    760     AudioRecord record;
    761     if (track.set(AudioSystem::VOICE_CALL, sampleRate, AudioSystem::PCM_16_BIT,
    762         AudioSystem::CHANNEL_OUT_MONO, output) != NO_ERROR ||
    763         record.set(AUDIO_SOURCE_MIC, sampleRate, AudioSystem::PCM_16_BIT,
    764         AudioSystem::CHANNEL_IN_MONO, input) != NO_ERROR) {
    765         LOGE("cannot initialize audio device");
    766         return false;
    767     }
    768     LOGD("latency: output %d, input %d", track.latency(), record.latency());
    769 
    770     // Initialize echo canceler.
    771     EchoSuppressor echo(sampleCount,
    772         (track.latency() + record.latency()) * sampleRate / 1000);
    773 
    774     // Give device socket a reasonable buffer size.
    775     setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
    776     setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));
    777 
    778     // Drain device socket.
    779     char c;
    780     while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
    781 
    782     // Start AudioRecord before AudioTrack. This prevents AudioTrack from being
    783     // disabled due to buffer underrun while waiting for AudioRecord.
    784     if (mode != MUTED) {
    785         record.start();
    786         int16_t one;
    787         record.read(&one, sizeof(one));
    788     }
    789     track.start();
    790 
    791     while (!exitPending()) {
    792         int16_t output[sampleCount];
    793         if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
    794             memset(output, 0, sizeof(output));
    795         }
    796 
    797         int16_t input[sampleCount];
    798         int toWrite = sampleCount;
    799         int toRead = (mode == MUTED) ? 0 : sampleCount;
    800         int chances = 100;
    801 
    802         while (--chances > 0 && (toWrite > 0 || toRead > 0)) {
    803             if (toWrite > 0) {
    804                 AudioTrack::Buffer buffer;
    805                 buffer.frameCount = toWrite;
    806 
    807                 status_t status = track.obtainBuffer(&buffer, 1);
    808                 if (status == NO_ERROR) {
    809                     int offset = sampleCount - toWrite;
    810                     memcpy(buffer.i8, &output[offset], buffer.size);
    811                     toWrite -= buffer.frameCount;
    812                     track.releaseBuffer(&buffer);
    813                 } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
    814                     LOGE("cannot write to AudioTrack");
    815                     return true;
    816                 }
    817             }
    818 
    819             if (toRead > 0) {
    820                 AudioRecord::Buffer buffer;
    821                 buffer.frameCount = toRead;
    822 
    823                 status_t status = record.obtainBuffer(&buffer, 1);
    824                 if (status == NO_ERROR) {
    825                     int offset = sampleCount - toRead;
    826                     memcpy(&input[offset], buffer.i8, buffer.size);
    827                     toRead -= buffer.frameCount;
    828                     record.releaseBuffer(&buffer);
    829                 } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
    830                     LOGE("cannot read from AudioRecord");
    831                     return true;
    832                 }
    833             }
    834         }
    835 
    836         if (chances <= 0) {
    837             LOGW("device loop timeout");
    838             while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
    839         }
    840 
    841         if (mode != MUTED) {
    842             if (mode == NORMAL) {
    843                 send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
    844             } else {
    845                 echo.run(output, input);
    846                 send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
    847             }
    848         }
    849     }
    850     return false;
    851 }
    852 
    853 //------------------------------------------------------------------------------
    854 
    855 static jfieldID gNative;
    856 static jfieldID gMode;
    857 
    858 void add(JNIEnv *env, jobject thiz, jint mode,
    859     jint socket, jstring jRemoteAddress, jint remotePort,
    860     jstring jCodecSpec, jint dtmfType)
    861 {
    862     AudioCodec *codec = NULL;
    863     AudioStream *stream = NULL;
    864     AudioGroup *group = NULL;
    865 
    866     // Sanity check.
    867     sockaddr_storage remote;
    868     if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {
    869         // Exception already thrown.
    870         return;
    871     }
    872     if (!jCodecSpec) {
    873         jniThrowNullPointerException(env, "codecSpec");
    874         return;
    875     }
    876     const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);
    877     if (!codecSpec) {
    878         // Exception already thrown.
    879         return;
    880     }
    881 
    882     // Create audio codec.
    883     int codecType = -1;
    884     char codecName[16];
    885     int sampleRate = -1;
    886     sscanf(codecSpec, "%d %[^/]%*c%d", &codecType, codecName, &sampleRate);
    887     codec = newAudioCodec(codecName);
    888     int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);
    889     env->ReleaseStringUTFChars(jCodecSpec, codecSpec);
    890     if (sampleCount <= 0) {
    891         jniThrowException(env, "java/lang/IllegalStateException",
    892             "cannot initialize audio codec");
    893         goto error;
    894     }
    895 
    896     // Create audio stream.
    897     stream = new AudioStream;
    898     if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,
    899         codecType, dtmfType)) {
    900         jniThrowException(env, "java/lang/IllegalStateException",
    901             "cannot initialize audio stream");
    902         goto error;
    903     }
    904     socket = -1;
    905     codec = NULL;
    906 
    907     // Create audio group.
    908     group = (AudioGroup *)env->GetIntField(thiz, gNative);
    909     if (!group) {
    910         int mode = env->GetIntField(thiz, gMode);
    911         group = new AudioGroup;
    912         if (!group->set(8000, 256) || !group->setMode(mode)) {
    913             jniThrowException(env, "java/lang/IllegalStateException",
    914                 "cannot initialize audio group");
    915             goto error;
    916         }
    917     }
    918 
    919     // Add audio stream into audio group.
    920     if (!group->add(stream)) {
    921         jniThrowException(env, "java/lang/IllegalStateException",
    922             "cannot add audio stream");
    923         goto error;
    924     }
    925 
    926     // Succeed.
    927     env->SetIntField(thiz, gNative, (int)group);
    928     return;
    929 
    930 error:
    931     delete group;
    932     delete stream;
    933     delete codec;
    934     close(socket);
    935     env->SetIntField(thiz, gNative, NULL);
    936 }
    937 
    938 void remove(JNIEnv *env, jobject thiz, jint socket)
    939 {
    940     AudioGroup *group = (AudioGroup *)env->GetIntField(thiz, gNative);
    941     if (group) {
    942         if (socket == -1 || !group->remove(socket)) {
    943             delete group;
    944             env->SetIntField(thiz, gNative, NULL);
    945         }
    946     }
    947 }
    948 
    949 void setMode(JNIEnv *env, jobject thiz, jint mode)
    950 {
    951     if (mode < 0 || mode > AudioGroup::LAST_MODE) {
    952         jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
    953         return;
    954     }
    955     AudioGroup *group = (AudioGroup *)env->GetIntField(thiz, gNative);
    956     if (group && !group->setMode(mode)) {
    957         jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
    958         return;
    959     }
    960     env->SetIntField(thiz, gMode, mode);
    961 }
    962 
    963 void sendDtmf(JNIEnv *env, jobject thiz, jint event)
    964 {
    965     AudioGroup *group = (AudioGroup *)env->GetIntField(thiz, gNative);
    966     if (group && !group->sendDtmf(event)) {
    967         jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
    968     }
    969 }
    970 
// Native method table passed to RegisterNatives() in registerAudioGroup().
// Each entry is {Java method name, JNI type signature, native function}.
JNINativeMethod gMethods[] = {
    // void add(int, int, String, int, String, int)
    {"add", "(IILjava/lang/String;ILjava/lang/String;I)V", (void *)add},
    // void remove(int)
    {"remove", "(I)V", (void *)remove},
    // void setMode(int)
    {"setMode", "(I)V", (void *)setMode},
    // void sendDtmf(int)
    {"sendDtmf", "(I)V", (void *)sendDtmf},
};
    977 
    978 } // namespace
    979 
    980 int registerAudioGroup(JNIEnv *env)
    981 {
    982     gRandom = open("/dev/urandom", O_RDONLY);
    983     if (gRandom == -1) {
    984         LOGE("urandom: %s", strerror(errno));
    985         return -1;
    986     }
    987 
    988     jclass clazz;
    989     if ((clazz = env->FindClass("android/net/rtp/AudioGroup")) == NULL ||
    990         (gNative = env->GetFieldID(clazz, "mNative", "I")) == NULL ||
    991         (gMode = env->GetFieldID(clazz, "mMode", "I")) == NULL ||
    992         env->RegisterNatives(clazz, gMethods, NELEM(gMethods)) < 0) {
    993         LOGE("JNI registration failed");
    994         return -1;
    995     }
    996     return 0;
    997 }
    998