Home | History | Annotate | Download | only in rtp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include <stdio.h>
     18 #include <stdint.h>
     19 #include <string.h>
     20 #include <errno.h>
     21 #include <fcntl.h>
     22 #include <sys/epoll.h>
     23 #include <sys/types.h>
     24 #include <sys/socket.h>
     25 #include <sys/stat.h>
     26 #include <sys/time.h>
     27 #include <time.h>
     28 #include <arpa/inet.h>
     29 #include <netinet/in.h>
     30 
     31 // #define LOG_NDEBUG 0
     32 #define LOG_TAG "AudioGroup"
     33 #include <cutils/atomic.h>
     34 #include <cutils/properties.h>
     35 #include <utils/Log.h>
     36 #include <utils/Errors.h>
     37 #include <utils/RefBase.h>
     38 #include <utils/threads.h>
     39 #include <utils/SystemClock.h>
     40 #include <media/AudioSystem.h>
     41 #include <media/AudioRecord.h>
     42 #include <media/AudioTrack.h>
     43 #include <media/mediarecorder.h>
     44 #include <media/AudioEffect.h>
     45 #include <audio_effects/effect_aec.h>
     46 #include <system/audio.h>
     47 
     48 #include "jni.h"
     49 #include "JNIHelp.h"
     50 
     51 #include "AudioCodec.h"
     52 #include "EchoSuppressor.h"
     53 
// Declared here but defined outside this section. Judging only by its
// signature, it turns a Java address string plus a port into *ss and returns
// an int status — NOTE(review): confirm the return convention against the
// definition before relying on it.
extern int parse(JNIEnv *env, jstring jAddress, int port, sockaddr_storage *ss);
     55 
     56 namespace {
     57 
using namespace android;

// File descriptor that AudioStream::set() reads random bytes from to seed the
// RTP sequence number, timestamp and SSRC of each stream. It stays -1 until it
// is assigned by code outside this section (presumably opened on a random
// device — TODO confirm); read() on -1 simply fails.
int gRandom = -1;
     61 
     62 // We use a circular array to implement jitter buffer. The simplest way is doing
     63 // a modulo operation on the index while accessing the array. However modulo can
     64 // be expensive on some platforms, such as ARM. Thus we round up the size of the
     65 // array to the nearest power of 2 and then use bitwise-and instead of modulo.
     66 // Currently we make it 2048ms long and assume packet interval is 50ms or less.
     67 // The first 100ms is the place where samples get mixed. The rest is the real
     68 // jitter buffer. For a stream at 8000Hz it takes 32 kilobytes. These numbers
     69 // are chosen by experiments and each of them can be adjusted as needed.
     70 
// Originally a stream did not send packets when it was receive-only or had
// nothing to mix. However, this causes problems with certain firewalls and
// proxies. A firewall might remove a port mapping when there is no outgoing
// packet for a period of time, and a proxy might wait for incoming packets from
// both sides before it starts forwarding. To solve these problems, we send a
// silence packet on the stream about once per second (once per 1024ms window).
// That is enough to keep the stream alive at a low resource cost.
     78 
     79 // Other notes:
     80 // + We use elapsedRealtime() to get the time. Since we use 32bit variables
     81 //   instead of 64bit ones, comparison must be done by subtraction.
// + The sampling rate must be a multiple of 1000Hz, and the packet length must
//   be a whole number of milliseconds. No floating point is used.
     84 // + If we cannot get enough CPU, we drop samples and simulate packet loss.
     85 // + Resampling is not done yet, so streams in one group must use the same rate.
     86 //   For the first release only 8000Hz is supported.
     87 
// Length of each stream's circular jitter buffer in milliseconds. set() scales
// it by the sample rate in kHz (rounded up to a power of two) to size mBuffer,
// and decode() uses it as the capacity when deciding whether a packet fits.
#define BUFFER_SIZE     2048
// Milliseconds of samples kept behind the current tick; decode() discards
// anything older by moving mBufferHead forward.
#define HISTORY_SIZE    100
// Latency threshold in milliseconds: decode() scores a stream by how far its
// buffer tail runs ahead of the current tick beyond this value.
#define MEASURE_BASE    100
// Observation window in milliseconds: when no lower latency score has been
// seen for this long, decode() trims the excess from the jitter buffer.
#define MEASURE_PERIOD  5000
// Duration of an outgoing DTMF event in milliseconds; encode() compares the
// event duration against mSampleRate * DTMF_PERIOD samples.
#define DTMF_PERIOD     200
     93 
     94 class AudioStream
     95 {
     96 public:
     97     AudioStream();
     98     ~AudioStream();
     99     bool set(int mode, int socket, sockaddr_storage *remote,
    100         AudioCodec *codec, int sampleRate, int sampleCount,
    101         int codecType, int dtmfType);
    102 
    103     void sendDtmf(int event);
    104     bool mix(int32_t *output, int head, int tail, int sampleRate);
    105     void encode(int tick, AudioStream *chain);
    106     void decode(int tick);
    107 
    108 private:
    109     enum {
    110         NORMAL = 0,
    111         SEND_ONLY = 1,
    112         RECEIVE_ONLY = 2,
    113         LAST_MODE = 2,
    114     };
    115 
    116     int mMode;
    117     int mSocket;
    118     sockaddr_storage mRemote;
    119     AudioCodec *mCodec;
    120     uint32_t mCodecMagic;
    121     uint32_t mDtmfMagic;
    122     bool mFixRemote;
    123 
    124     int mTick;
    125     int mSampleRate;
    126     int mSampleCount;
    127     int mInterval;
    128     int mKeepAlive;
    129 
    130     int16_t *mBuffer;
    131     int mBufferMask;
    132     int mBufferHead;
    133     int mBufferTail;
    134     int mLatencyTimer;
    135     int mLatencyScore;
    136 
    137     uint16_t mSequence;
    138     uint32_t mTimestamp;
    139     uint32_t mSsrc;
    140 
    141     int mDtmfEvent;
    142     int mDtmfStart;
    143 
    144     AudioStream *mNext;
    145 
    146     friend class AudioGroup;
    147 };
    148 
    149 AudioStream::AudioStream()
    150 {
    151     mSocket = -1;
    152     mCodec = NULL;
    153     mBuffer = NULL;
    154     mNext = NULL;
    155 }
    156 
    157 AudioStream::~AudioStream()
    158 {
    159     close(mSocket);
    160     delete mCodec;
    161     delete [] mBuffer;
    162     ALOGD("stream[%d] is dead", mSocket);
    163 }
    164 
    165 bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
    166     AudioCodec *codec, int sampleRate, int sampleCount,
    167     int codecType, int dtmfType)
    168 {
    169     if (mode < 0 || mode > LAST_MODE) {
    170         return false;
    171     }
    172     mMode = mode;
    173 
    174     mCodecMagic = (0x8000 | codecType) << 16;
    175     mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;
    176 
    177     mTick = elapsedRealtime();
    178     mSampleRate = sampleRate / 1000;
    179     mSampleCount = sampleCount;
    180     mInterval = mSampleCount / mSampleRate;
    181 
    182     // Allocate jitter buffer.
    183     for (mBufferMask = 8; mBufferMask < mSampleRate; mBufferMask <<= 1);
    184     mBufferMask *= BUFFER_SIZE;
    185     mBuffer = new int16_t[mBufferMask];
    186     --mBufferMask;
    187     mBufferHead = 0;
    188     mBufferTail = 0;
    189     mLatencyTimer = 0;
    190     mLatencyScore = 0;
    191 
    192     // Initialize random bits.
    193     read(gRandom, &mSequence, sizeof(mSequence));
    194     read(gRandom, &mTimestamp, sizeof(mTimestamp));
    195     read(gRandom, &mSsrc, sizeof(mSsrc));
    196 
    197     mDtmfEvent = -1;
    198     mDtmfStart = 0;
    199 
    200     // Only take over these things when succeeded.
    201     mSocket = socket;
    202     if (codec) {
    203         mRemote = *remote;
    204         mCodec = codec;
    205 
    206         // Here we should never get an private address, but some buggy proxy
    207         // servers do give us one. To solve this, we replace the address when
    208         // the first time we successfully decode an incoming packet.
    209         mFixRemote = false;
    210         if (remote->ss_family == AF_INET) {
    211             unsigned char *address =
    212                 (unsigned char *)&((sockaddr_in *)remote)->sin_addr;
    213             if (address[0] == 10 ||
    214                 (address[0] == 172 && (address[1] >> 4) == 1) ||
    215                 (address[0] == 192 && address[1] == 168)) {
    216                 mFixRemote = true;
    217             }
    218         }
    219     }
    220 
    221     ALOGD("stream[%d] is configured as %s %dkHz %dms mode %d", mSocket,
    222         (codec ? codec->name : "RAW"), mSampleRate, mInterval, mMode);
    223     return true;
    224 }
    225 
    226 void AudioStream::sendDtmf(int event)
    227 {
    228     if (mDtmfMagic != 0) {
    229         mDtmfEvent = event << 24;
    230         mDtmfStart = mTimestamp + mSampleCount;
    231     }
    232 }
    233 
    234 bool AudioStream::mix(int32_t *output, int head, int tail, int sampleRate)
    235 {
    236     if (mMode == SEND_ONLY) {
    237         return false;
    238     }
    239 
    240     if (head - mBufferHead < 0) {
    241         head = mBufferHead;
    242     }
    243     if (tail - mBufferTail > 0) {
    244         tail = mBufferTail;
    245     }
    246     if (tail - head <= 0) {
    247         return false;
    248     }
    249 
    250     head *= mSampleRate;
    251     tail *= mSampleRate;
    252 
    253     if (sampleRate == mSampleRate) {
    254         for (int i = head; i - tail < 0; ++i) {
    255             output[i - head] += mBuffer[i & mBufferMask];
    256         }
    257     } else {
    258         // TODO: implement resampling.
    259         return false;
    260     }
    261     return true;
    262 }
    263 
// Sends at most one packet for this stream.
//
// tick:  current time in ms (elapsedRealtime()) as passed by the network
//        thread, which only calls this once the stream's mTick is due.
// chain: head of the group's stream list; every stream other than this one
//        is mixed into the outgoing audio.
//
// A pending DTMF event takes precedence over audio. Otherwise the other
// streams are mixed for the interval ending at this packet's scheduled time,
// saturated to 16 bits and encoded. When there is nothing to mix, a silence
// packet is still sent once per 1024ms window as a keep-alive.
void AudioStream::encode(int tick, AudioStream *chain)
{
    if (tick - mTick >= mInterval) {
        // We just missed the train. Pretend that packets in between are lost.
        int skipped = (tick - mTick) / mInterval;
        mTick += skipped * mInterval;
        mSequence += skipped;
        mTimestamp += skipped * mSampleCount;
        ALOGV("stream[%d] skips %d packets", mSocket, skipped);
    }

    // From here on, tick is the scheduled time of this packet and mTick is
    // the time the next one is due.
    tick = mTick;
    mTick += mInterval;
    ++mSequence;
    mTimestamp += mSampleCount;

    // If there is an ongoing DTMF event, send it now.
    if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {
        int duration = mTimestamp - mDtmfStart;
        // Make sure duration is reasonable.
        if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {
            duration += mSampleCount;
            // 12-byte RTP header followed by one payload word:
            // event code in the top byte, end flag in bit 23, duration in
            // the low bits. mSsrc is random, so its byte order is irrelevant.
            int32_t buffer[4] = {
                static_cast<int32_t>(htonl(mDtmfMagic | mSequence)),
                static_cast<int32_t>(htonl(mDtmfStart)),
                static_cast<int32_t>(mSsrc),
                static_cast<int32_t>(htonl(mDtmfEvent | duration)),
            };
            if (duration >= mSampleRate * DTMF_PERIOD) {
                // Last packet of the event: set the end flag and clear it.
                buffer[3] |= htonl(1 << 23);
                mDtmfEvent = -1;
            }
            sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
                (sockaddr *)&mRemote, sizeof(mRemote));
            return;
        }
        mDtmfEvent = -1;
    }

    // Mixing accumulates 32-bit sums from buffer[0]. The three extra words
    // leave room for the 12-byte RTP header when buffer is reused below to
    // hold the outgoing packet (header in [0..2], payload from [3]).
    int32_t buffer[mSampleCount + 3];
    bool data = false;
    if (mMode != RECEIVE_ONLY) {
        // Mix all other streams.
        memset(buffer, 0, sizeof(buffer));
        while (chain) {
            if (chain != this) {
                data |= chain->mix(buffer, tick - mInterval, tick, mSampleRate);
            }
            chain = chain->mNext;
        }
    }

    int16_t samples[mSampleCount];
    if (data) {
        // Saturate into 16 bits.
        for (int i = 0; i < mSampleCount; ++i) {
            int32_t sample = buffer[i];
            if (sample < -32768) {
                sample = -32768;
            }
            if (sample > 32767) {
                sample = 32767;
            }
            samples[i] = sample;
        }
    } else {
        // Nothing to send: only emit a silence keep-alive when mTick has
        // moved into a different 1024ms window than the last keep-alive.
        if ((mTick ^ mKeepAlive) >> 10 == 0) {
            return;
        }
        mKeepAlive = mTick;
        memset(samples, 0, sizeof(samples));

        if (mMode != RECEIVE_ONLY) {
            ALOGV("stream[%d] no data", mSocket);
        }
    }

    if (!mCodec) {
        // Special case for device stream.
        send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);
        return;
    }

    // Cook the packet and send it out.
    buffer[0] = htonl(mCodecMagic | mSequence);
    buffer[1] = htonl(mTimestamp);
    buffer[2] = mSsrc;
    int length = mCodec->encode(&buffer[3], samples);
    if (length <= 0) {
        ALOGV("stream[%d] encoder error", mSocket);
        return;
    }
    // Encoded payload plus the 12-byte header.
    sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
        sizeof(mRemote));
}
    359 
    360 void AudioStream::decode(int tick)
    361 {
    362     char c;
    363     if (mMode == SEND_ONLY) {
    364         recv(mSocket, &c, 1, MSG_DONTWAIT);
    365         return;
    366     }
    367 
    368     // Make sure mBufferHead and mBufferTail are reasonable.
    369     if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {
    370         mBufferHead = tick - HISTORY_SIZE;
    371         mBufferTail = mBufferHead;
    372     }
    373 
    374     if (tick - mBufferHead > HISTORY_SIZE) {
    375         // Throw away outdated samples.
    376         mBufferHead = tick - HISTORY_SIZE;
    377         if (mBufferTail - mBufferHead < 0) {
    378             mBufferTail = mBufferHead;
    379         }
    380     }
    381 
    382     // Adjust the jitter buffer if the latency keeps larger than the threshold
    383     // in the measurement period.
    384     int score = mBufferTail - tick - MEASURE_BASE;
    385     if (mLatencyScore > score || mLatencyScore <= 0) {
    386         mLatencyScore = score;
    387         mLatencyTimer = tick;
    388     } else if (tick - mLatencyTimer >= MEASURE_PERIOD) {
    389         ALOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);
    390         mBufferTail -= mLatencyScore;
    391         mLatencyScore = -1;
    392     }
    393 
    394     int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;
    395     if (count < mSampleCount) {
    396         // Buffer overflow. Drop the packet.
    397         ALOGV("stream[%d] buffer overflow", mSocket);
    398         recv(mSocket, &c, 1, MSG_DONTWAIT);
    399         return;
    400     }
    401 
    402     // Receive the packet and decode it.
    403     int16_t samples[count];
    404     if (!mCodec) {
    405         // Special case for device stream.
    406         count = recv(mSocket, samples, sizeof(samples),
    407             MSG_TRUNC | MSG_DONTWAIT) >> 1;
    408     } else {
    409         __attribute__((aligned(4))) uint8_t buffer[2048];
    410         sockaddr_storage remote;
    411         socklen_t addrlen = sizeof(remote);
    412 
    413         int length = recvfrom(mSocket, buffer, sizeof(buffer),
    414             MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);
    415 
    416         // Do we need to check SSRC, sequence, and timestamp? They are not
    417         // reliable but at least they can be used to identify duplicates?
    418         if (length < 12 || length > (int)sizeof(buffer) ||
    419             (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
    420             ALOGV("stream[%d] malformed packet", mSocket);
    421             return;
    422         }
    423         int offset = 12 + ((buffer[0] & 0x0F) << 2);
    424         if ((buffer[0] & 0x10) != 0) {
    425             offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
    426         }
    427         if ((buffer[0] & 0x20) != 0) {
    428             length -= buffer[length - 1];
    429         }
    430         length -= offset;
    431         if (length >= 0) {
    432             length = mCodec->decode(samples, count, &buffer[offset], length);
    433         }
    434         if (length > 0 && mFixRemote) {
    435             mRemote = remote;
    436             mFixRemote = false;
    437         }
    438         count = length;
    439     }
    440     if (count <= 0) {
    441         ALOGV("stream[%d] decoder error", mSocket);
    442         return;
    443     }
    444 
    445     if (tick - mBufferTail > 0) {
    446         // Buffer underrun. Reset the jitter buffer.
    447         ALOGV("stream[%d] buffer underrun", mSocket);
    448         if (mBufferTail - mBufferHead <= 0) {
    449             mBufferHead = tick + mInterval;
    450             mBufferTail = mBufferHead;
    451         } else {
    452             int tail = (tick + mInterval) * mSampleRate;
    453             for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {
    454                 mBuffer[i & mBufferMask] = 0;
    455             }
    456             mBufferTail = tick + mInterval;
    457         }
    458     }
    459 
    460     // Append to the jitter buffer.
    461     int tail = mBufferTail * mSampleRate;
    462     for (int i = 0; i < count; ++i) {
    463         mBuffer[tail & mBufferMask] = samples[i];
    464         ++tail;
    465     }
    466     mBufferTail += mInterval;
    467 }
    468 
    469 //------------------------------------------------------------------------------
    470 
    471 class AudioGroup
    472 {
    473 public:
    474     AudioGroup();
    475     ~AudioGroup();
    476     bool set(int sampleRate, int sampleCount);
    477 
    478     bool setMode(int mode);
    479     bool sendDtmf(int event);
    480     bool add(AudioStream *stream);
    481     bool remove(AudioStream *stream);
    482     bool platformHasAec() { return mPlatformHasAec; }
    483 
    484 private:
    485     enum {
    486         ON_HOLD = 0,
    487         MUTED = 1,
    488         NORMAL = 2,
    489         ECHO_SUPPRESSION = 3,
    490         LAST_MODE = 3,
    491     };
    492 
    493     bool checkPlatformAec();
    494 
    495     AudioStream *mChain;
    496     int mEventQueue;
    497     volatile int mDtmfEvent;
    498 
    499     int mMode;
    500     int mSampleRate;
    501     int mSampleCount;
    502     int mDeviceSocket;
    503     bool mPlatformHasAec;
    504 
    505     class NetworkThread : public Thread
    506     {
    507     public:
    508         NetworkThread(AudioGroup *group) : Thread(false), mGroup(group) {}
    509 
    510         bool start()
    511         {
    512             if (run("Network", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
    513                 ALOGE("cannot start network thread");
    514                 return false;
    515             }
    516             return true;
    517         }
    518 
    519     private:
    520         AudioGroup *mGroup;
    521         bool threadLoop();
    522     };
    523     sp<NetworkThread> mNetworkThread;
    524 
    525     class DeviceThread : public Thread
    526     {
    527     public:
    528         DeviceThread(AudioGroup *group) : Thread(false), mGroup(group) {}
    529 
    530         bool start()
    531         {
    532             if (run("Device", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
    533                 ALOGE("cannot start device thread");
    534                 return false;
    535             }
    536             return true;
    537         }
    538 
    539     private:
    540         AudioGroup *mGroup;
    541         bool threadLoop();
    542     };
    543     sp<DeviceThread> mDeviceThread;
    544 };
    545 
    546 AudioGroup::AudioGroup()
    547 {
    548     mMode = ON_HOLD;
    549     mChain = NULL;
    550     mEventQueue = -1;
    551     mDtmfEvent = -1;
    552     mDeviceSocket = -1;
    553     mNetworkThread = new NetworkThread(this);
    554     mDeviceThread = new DeviceThread(this);
    555     mPlatformHasAec = checkPlatformAec();
    556 }
    557 
    558 AudioGroup::~AudioGroup()
    559 {
    560     mNetworkThread->requestExitAndWait();
    561     mDeviceThread->requestExitAndWait();
    562     close(mEventQueue);
    563     close(mDeviceSocket);
    564     while (mChain) {
    565         AudioStream *next = mChain->mNext;
    566         delete mChain;
    567         mChain = next;
    568     }
    569     ALOGD("group[%d] is dead", mDeviceSocket);
    570 }
    571 
    572 bool AudioGroup::set(int sampleRate, int sampleCount)
    573 {
    574     mEventQueue = epoll_create(2);
    575     if (mEventQueue == -1) {
    576         ALOGE("epoll_create: %s", strerror(errno));
    577         return false;
    578     }
    579 
    580     mSampleRate = sampleRate;
    581     mSampleCount = sampleCount;
    582 
    583     // Create device socket.
    584     int pair[2];
    585     if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
    586         ALOGE("socketpair: %s", strerror(errno));
    587         return false;
    588     }
    589     mDeviceSocket = pair[0];
    590 
    591     // Create device stream.
    592     mChain = new AudioStream;
    593     if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
    594         sampleRate, sampleCount, -1, -1)) {
    595         close(pair[1]);
    596         ALOGE("cannot initialize device stream");
    597         return false;
    598     }
    599 
    600     // Give device socket a reasonable timeout.
    601     timeval tv;
    602     tv.tv_sec = 0;
    603     tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
    604     if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
    605         ALOGE("setsockopt: %s", strerror(errno));
    606         return false;
    607     }
    608 
    609     // Add device stream into event queue.
    610     epoll_event event;
    611     event.events = EPOLLIN;
    612     event.data.ptr = mChain;
    613     if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
    614         ALOGE("epoll_ctl: %s", strerror(errno));
    615         return false;
    616     }
    617 
    618     // Anything else?
    619     ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
    620     return true;
    621 }
    622 
    623 bool AudioGroup::setMode(int mode)
    624 {
    625     if (mode < 0 || mode > LAST_MODE) {
    626         return false;
    627     }
    628     // FIXME: temporary code to overcome echo and mic gain issues on herring and tuna boards.
    629     // Must be modified/removed when the root cause of the issue is fixed in the hardware or
    630     // driver
    631     char value[PROPERTY_VALUE_MAX];
    632     property_get("ro.product.board", value, "");
    633     if (mode == NORMAL &&
    634             (!strcmp(value, "herring") || !strcmp(value, "tuna"))) {
    635         mode = ECHO_SUPPRESSION;
    636     }
    637     if (mMode == mode) {
    638         return true;
    639     }
    640 
    641     mDeviceThread->requestExitAndWait();
    642     ALOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);
    643     mMode = mode;
    644     return (mode == ON_HOLD) || mDeviceThread->start();
    645 }
    646 
    647 bool AudioGroup::sendDtmf(int event)
    648 {
    649     if (event < 0 || event > 15) {
    650         return false;
    651     }
    652 
    653     // DTMF is rarely used, so we try to make it as lightweight as possible.
    654     // Using volatile might be dodgy, but using a pipe or pthread primitives
    655     // or stop-set-restart threads seems too heavy. Will investigate later.
    656     timespec ts;
    657     ts.tv_sec = 0;
    658     ts.tv_nsec = 100000000;
    659     for (int i = 0; mDtmfEvent != -1 && i < 20; ++i) {
    660         nanosleep(&ts, NULL);
    661     }
    662     if (mDtmfEvent != -1) {
    663         return false;
    664     }
    665     mDtmfEvent = event;
    666     nanosleep(&ts, NULL);
    667     return true;
    668 }
    669 
    670 bool AudioGroup::add(AudioStream *stream)
    671 {
    672     mNetworkThread->requestExitAndWait();
    673 
    674     epoll_event event;
    675     event.events = EPOLLIN;
    676     event.data.ptr = stream;
    677     if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {
    678         ALOGE("epoll_ctl: %s", strerror(errno));
    679         return false;
    680     }
    681 
    682     stream->mNext = mChain->mNext;
    683     mChain->mNext = stream;
    684     if (!mNetworkThread->start()) {
    685         // Only take over the stream when succeeded.
    686         mChain->mNext = stream->mNext;
    687         return false;
    688     }
    689 
    690     ALOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);
    691     return true;
    692 }
    693 
    694 bool AudioGroup::remove(AudioStream *stream)
    695 {
    696     mNetworkThread->requestExitAndWait();
    697 
    698     for (AudioStream *chain = mChain; chain->mNext; chain = chain->mNext) {
    699         if (chain->mNext == stream) {
    700             if (epoll_ctl(mEventQueue, EPOLL_CTL_DEL, stream->mSocket, NULL)) {
    701                 ALOGE("epoll_ctl: %s", strerror(errno));
    702                 return false;
    703             }
    704             chain->mNext = stream->mNext;
    705             ALOGD("stream[%d] leaves group[%d]", stream->mSocket, mDeviceSocket);
    706             delete stream;
    707             break;
    708         }
    709     }
    710 
    711     // Do not start network thread if there is only one stream.
    712     if (!mChain->mNext || !mNetworkThread->start()) {
    713         return false;
    714     }
    715     return true;
    716 }
    717 
    718 bool AudioGroup::NetworkThread::threadLoop()
    719 {
    720     AudioStream *chain = mGroup->mChain;
    721     int tick = elapsedRealtime();
    722     int deadline = tick + 10;
    723     int count = 0;
    724 
    725     for (AudioStream *stream = chain; stream; stream = stream->mNext) {
    726         if (tick - stream->mTick >= 0) {
    727             stream->encode(tick, chain);
    728         }
    729         if (deadline - stream->mTick > 0) {
    730             deadline = stream->mTick;
    731         }
    732         ++count;
    733     }
    734 
    735     int event = mGroup->mDtmfEvent;
    736     if (event != -1) {
    737         for (AudioStream *stream = chain; stream; stream = stream->mNext) {
    738             stream->sendDtmf(event);
    739         }
    740         mGroup->mDtmfEvent = -1;
    741     }
    742 
    743     deadline -= tick;
    744     if (deadline < 1) {
    745         deadline = 1;
    746     }
    747 
    748     epoll_event events[count];
    749     count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
    750     if (count == -1) {
    751         ALOGE("epoll_wait: %s", strerror(errno));
    752         return false;
    753     }
    754     for (int i = 0; i < count; ++i) {
    755         ((AudioStream *)events[i].data.ptr)->decode(tick);
    756     }
    757 
    758     return true;
    759 }
    760 
    761 bool AudioGroup::checkPlatformAec()
    762 {
    763     effect_descriptor_t fxDesc;
    764     uint32_t numFx;
    765 
    766     if (AudioEffect::queryNumberEffects(&numFx) != NO_ERROR) {
    767         return false;
    768     }
    769     for (uint32_t i = 0; i < numFx; i++) {
    770         if (AudioEffect::queryEffect(i, &fxDesc) != NO_ERROR) {
    771             continue;
    772         }
    773         if (memcmp(&fxDesc.type, FX_IID_AEC, sizeof(effect_uuid_t)) == 0) {
    774             return true;
    775         }
    776     }
    777     return false;
    778 }
    779 
    780 bool AudioGroup::DeviceThread::threadLoop()
    781 {
    782     int mode = mGroup->mMode;
    783     int sampleRate = mGroup->mSampleRate;
    784     int sampleCount = mGroup->mSampleCount;
    785     int deviceSocket = mGroup->mDeviceSocket;
    786 
    787     // Find out the frame count for AudioTrack and AudioRecord.
    788     size_t output = 0;
    789     size_t input = 0;
    790     if (AudioTrack::getMinFrameCount(&output, AUDIO_STREAM_VOICE_CALL,
    791         sampleRate) != NO_ERROR || output <= 0 ||
    792         AudioRecord::getMinFrameCount(&input, sampleRate,
    793         AUDIO_FORMAT_PCM_16_BIT, AUDIO_CHANNEL_IN_MONO) != NO_ERROR || input <= 0) {
    794         ALOGE("cannot compute frame count");
    795         return false;
    796     }
    797     ALOGD("reported frame count: output %d, input %d", output, input);
    798 
    799     if (output < sampleCount * 2) {
    800         output = sampleCount * 2;
    801     }
    802     if (input < sampleCount * 2) {
    803         input = sampleCount * 2;
    804     }
    805     ALOGD("adjusted frame count: output %d, input %d", output, input);
    806 
    807     // Initialize AudioTrack and AudioRecord.
    808     sp<AudioTrack> track = new AudioTrack();
    809     sp<AudioRecord> record = new AudioRecord();
    810     if (track->set(AUDIO_STREAM_VOICE_CALL, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
    811                 AUDIO_CHANNEL_OUT_MONO, output, AUDIO_OUTPUT_FLAG_NONE, NULL /*callback_t*/,
    812                 NULL /*user*/, 0 /*notificationFrames*/, 0 /*sharedBuffer*/,
    813                 false /*threadCanCallJava*/, 0 /*sessionId*/,
    814                 AudioTrack::TRANSFER_OBTAIN) != NO_ERROR ||
    815             record->set(AUDIO_SOURCE_VOICE_COMMUNICATION, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
    816                 AUDIO_CHANNEL_IN_MONO, input, NULL /*callback_t*/, NULL /*user*/,
    817                 0 /*notificationFrames*/, false /*threadCanCallJava*/, 0 /*sessionId*/,
    818                 AudioRecord::TRANSFER_OBTAIN) != NO_ERROR) {
    819         ALOGE("cannot initialize audio device");
    820         return false;
    821     }
    822     ALOGD("latency: output %d, input %d", track->latency(), record->latency());
    823 
    824     // Give device socket a reasonable buffer size.
    825     setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
    826     setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));
    827 
    828     // Drain device socket.
    829     char c;
    830     while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
    831 
    832     // check if platform supports echo cancellation and do not active local echo suppression in
    833     // this case
    834     EchoSuppressor *echo = NULL;
    835     AudioEffect *aec = NULL;
    836     if (mode == ECHO_SUPPRESSION) {
    837         if (mGroup->platformHasAec()) {
    838             aec = new AudioEffect(FX_IID_AEC,
    839                                     NULL,
    840                                     0,
    841                                     0,
    842                                     0,
    843                                     record->getSessionId(),
    844                                     record->getInput());
    845             status_t status = aec->initCheck();
    846             if (status == NO_ERROR || status == ALREADY_EXISTS) {
    847                 aec->setEnabled(true);
    848             } else {
    849                 delete aec;
    850                 aec = NULL;
    851             }
    852         }
    853         // Create local echo suppressor if platform AEC cannot be used.
    854         if (aec == NULL) {
    855              echo = new EchoSuppressor(sampleCount,
    856                                        (track->latency() + record->latency()) * sampleRate / 1000);
    857         }
    858     }
    859     // Start AudioRecord before AudioTrack. This prevents AudioTrack from being
    860     // disabled due to buffer underrun while waiting for AudioRecord.
    861     if (mode != MUTED) {
    862         record->start();
    863         int16_t one;
    864         // FIXME this may not work any more
    865         record->read(&one, sizeof(one));
    866     }
    867     track->start();
    868 
    869     while (!exitPending()) {
    870         int16_t output[sampleCount];
    871         if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
    872             memset(output, 0, sizeof(output));
    873         }
    874 
    875         int16_t input[sampleCount];
    876         int toWrite = sampleCount;
    877         int toRead = (mode == MUTED) ? 0 : sampleCount;
    878         int chances = 100;
    879 
    880         while (--chances > 0 && (toWrite > 0 || toRead > 0)) {
    881             if (toWrite > 0) {
    882                 AudioTrack::Buffer buffer;
    883                 buffer.frameCount = toWrite;
    884 
    885                 status_t status = track->obtainBuffer(&buffer, 1);
    886                 if (status == NO_ERROR) {
    887                     int offset = sampleCount - toWrite;
    888                     memcpy(buffer.i8, &output[offset], buffer.size);
    889                     toWrite -= buffer.frameCount;
    890                     track->releaseBuffer(&buffer);
    891                 } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
    892                     ALOGE("cannot write to AudioTrack");
    893                     goto exit;
    894                 }
    895             }
    896 
    897             if (toRead > 0) {
    898                 AudioRecord::Buffer buffer;
    899                 buffer.frameCount = toRead;
    900 
    901                 status_t status = record->obtainBuffer(&buffer, 1);
    902                 if (status == NO_ERROR) {
    903                     int offset = sampleCount - toRead;
    904                     memcpy(&input[offset], buffer.i8, buffer.size);
    905                     toRead -= buffer.frameCount;
    906                     record->releaseBuffer(&buffer);
    907                 } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
    908                     ALOGE("cannot read from AudioRecord");
    909                     goto exit;
    910                 }
    911             }
    912         }
    913 
    914         if (chances <= 0) {
    915             ALOGW("device loop timeout");
    916             while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
    917         }
    918 
    919         if (mode != MUTED) {
    920             if (echo != NULL) {
    921                 ALOGV("echo->run()");
    922                 echo->run(output, input);
    923             }
    924             send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
    925         }
    926     }
    927 
    928 exit:
    929     delete echo;
    930     delete aec;
    931     return true;
    932 }
    933 
    934 //------------------------------------------------------------------------------
    935 
    936 static jfieldID gNative;
    937 static jfieldID gMode;
    938 
    939 jlong add(JNIEnv *env, jobject thiz, jint mode,
    940     jint socket, jstring jRemoteAddress, jint remotePort,
    941     jstring jCodecSpec, jint dtmfType)
    942 {
    943     AudioCodec *codec = NULL;
    944     AudioStream *stream = NULL;
    945     AudioGroup *group = NULL;
    946 
    947     // Sanity check.
    948     sockaddr_storage remote;
    949     if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {
    950         // Exception already thrown.
    951         return 0;
    952     }
    953     if (!jCodecSpec) {
    954         jniThrowNullPointerException(env, "codecSpec");
    955         return 0;
    956     }
    957     const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);
    958     if (!codecSpec) {
    959         // Exception already thrown.
    960         return 0;
    961     }
    962     socket = dup(socket);
    963     if (socket == -1) {
    964         jniThrowException(env, "java/lang/IllegalStateException",
    965             "cannot get stream socket");
    966         return 0;
    967     }
    968 
    969     // Create audio codec.
    970     int codecType = -1;
    971     char codecName[16];
    972     int sampleRate = -1;
    973     sscanf(codecSpec, "%d %15[^/]%*c%d", &codecType, codecName, &sampleRate);
    974     codec = newAudioCodec(codecName);
    975     int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);
    976     env->ReleaseStringUTFChars(jCodecSpec, codecSpec);
    977     if (sampleCount <= 0) {
    978         jniThrowException(env, "java/lang/IllegalStateException",
    979             "cannot initialize audio codec");
    980         goto error;
    981     }
    982 
    983     // Create audio stream.
    984     stream = new AudioStream;
    985     if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,
    986         codecType, dtmfType)) {
    987         jniThrowException(env, "java/lang/IllegalStateException",
    988             "cannot initialize audio stream");
    989         goto error;
    990     }
    991     socket = -1;
    992     codec = NULL;
    993 
    994     // Create audio group.
    995     group = (AudioGroup *)env->GetLongField(thiz, gNative);
    996     if (!group) {
    997         int mode = env->GetIntField(thiz, gMode);
    998         group = new AudioGroup;
    999         if (!group->set(8000, 256) || !group->setMode(mode)) {
   1000             jniThrowException(env, "java/lang/IllegalStateException",
   1001                 "cannot initialize audio group");
   1002             goto error;
   1003         }
   1004     }
   1005 
   1006     // Add audio stream into audio group.
   1007     if (!group->add(stream)) {
   1008         jniThrowException(env, "java/lang/IllegalStateException",
   1009             "cannot add audio stream");
   1010         goto error;
   1011     }
   1012 
   1013     // Succeed.
   1014     env->SetLongField(thiz, gNative, (jlong)group);
   1015     return (jlong)stream;
   1016 
   1017 error:
   1018     delete group;
   1019     delete stream;
   1020     delete codec;
   1021     close(socket);
   1022     env->SetLongField(thiz, gNative, 0);
   1023     return 0;
   1024 }
   1025 
   1026 void remove(JNIEnv *env, jobject thiz, jlong stream)
   1027 {
   1028     AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
   1029     if (group) {
   1030         if (!stream || !group->remove((AudioStream *)stream)) {
   1031             delete group;
   1032             env->SetLongField(thiz, gNative, 0);
   1033         }
   1034     }
   1035 }
   1036 
   1037 void setMode(JNIEnv *env, jobject thiz, jint mode)
   1038 {
   1039     AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
   1040     if (group && !group->setMode(mode)) {
   1041         jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
   1042     }
   1043 }
   1044 
   1045 void sendDtmf(JNIEnv *env, jobject thiz, jint event)
   1046 {
   1047     AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
   1048     if (group && !group->sendDtmf(event)) {
   1049         jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
   1050     }
   1051 }
   1052 
   1053 JNINativeMethod gMethods[] = {
   1054     {"nativeAdd", "(IILjava/lang/String;ILjava/lang/String;I)J", (void *)add},
   1055     {"nativeRemove", "(J)V", (void *)remove},
   1056     {"nativeSetMode", "(I)V", (void *)setMode},
   1057     {"nativeSendDtmf", "(I)V", (void *)sendDtmf},
   1058 };
   1059 
   1060 } // namespace
   1061 
   1062 int registerAudioGroup(JNIEnv *env)
   1063 {
   1064     gRandom = open("/dev/urandom", O_RDONLY);
   1065     if (gRandom == -1) {
   1066         ALOGE("urandom: %s", strerror(errno));
   1067         return -1;
   1068     }
   1069 
   1070     jclass clazz;
   1071     if ((clazz = env->FindClass("android/net/rtp/AudioGroup")) == NULL ||
   1072         (gNative = env->GetFieldID(clazz, "mNative", "J")) == NULL ||
   1073         (gMode = env->GetFieldID(clazz, "mMode", "I")) == NULL ||
   1074         env->RegisterNatives(clazz, gMethods, NELEM(gMethods)) < 0) {
   1075         ALOGE("JNI registration failed");
   1076         return -1;
   1077     }
   1078     return 0;
   1079 }
   1080