Home | History | Annotate | Download | only in rtp
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include <stdio.h>
     18 #include <stdint.h>
     19 #include <string.h>
     20 #include <errno.h>
     21 #include <fcntl.h>
     22 #include <sys/epoll.h>
     23 #include <sys/types.h>
     24 #include <sys/socket.h>
     25 #include <sys/stat.h>
     26 #include <sys/time.h>
     27 #include <time.h>
     28 #include <arpa/inet.h>
     29 #include <netinet/in.h>
     30 
     31 // #define LOG_NDEBUG 0
     32 #define LOG_TAG "AudioGroup"
     33 #include <cutils/atomic.h>
     34 #include <cutils/properties.h>
     35 #include <utils/Log.h>
     36 #include <utils/Errors.h>
     37 #include <utils/RefBase.h>
     38 #include <utils/threads.h>
     39 #include <utils/SystemClock.h>
     40 #include <media/AudioSystem.h>
     41 #include <media/AudioRecord.h>
     42 #include <media/AudioTrack.h>
     43 #include <media/mediarecorder.h>
     44 #include <media/AudioEffect.h>
     45 #include <system/audio_effects/effect_aec.h>
     46 #include <system/audio.h>
     47 
     48 #include <nativehelper/ScopedUtfChars.h>
     49 
     50 #include "jni.h"
     51 #include <nativehelper/JNIHelp.h>
     52 
     53 #include "AudioCodec.h"
     54 #include "EchoSuppressor.h"
     55 
     56 extern int parse(JNIEnv *env, jstring jAddress, int port, sockaddr_storage *ss);
     57 
     58 namespace {
     59 
using namespace android;

// File descriptor that AudioStream::set() reads random bits from to seed the
// initial RTP sequence number, timestamp and SSRC of every stream. -1 means
// "not opened"; presumably it is opened elsewhere in this file before any
// stream is configured -- TODO confirm.
int gRandom = -1;
     63 
// We use a circular array to implement the jitter buffer. The simplest way is
// to apply a modulo operation to the index on every access. However, modulo can
// be expensive on some platforms, such as ARM. Thus we round the size of the
// array up to the next power of 2 and use a bitwise AND instead of modulo.
// Currently we make it 2048ms long and assume the packet interval is 50ms or
// less. The first 100ms is where samples get mixed; the rest is the actual
// jitter buffer. For a stream at 8000Hz it takes 32 kilobytes. These numbers
// were chosen by experiment and each of them can be adjusted as needed.
     72 
// Originally a stream did not send packets when it was receive-only or had
// nothing to mix. However, this caused problems with certain firewalls and
// proxies: a firewall might remove a port mapping when there is no outgoing
// packet for a period of time, and a proxy might wait for incoming packets from
// both sides before it starts forwarding. To solve these problems, we send a
// silence packet on the stream about once every second. That is enough to keep
// the stream alive at a relatively low cost.
     80 
     81 // Other notes:
// + We use elapsedRealtime() to get the time. Since we keep it in 32-bit
//   variables instead of 64-bit ones, times must be compared by subtraction
//   (e.g. a - b < 0) so the comparison stays correct when the counter wraps.
// + The sampling rate must be a multiple of 1000Hz, and the packet length must
//   be a whole number of milliseconds. No floating point is used.
     86 // + If we cannot get enough CPU, we drop samples and simulate packet loss.
     87 // + Resampling is not done yet, so streams in one group must use the same rate.
     88 //   For the first release only 8000Hz is supported.
     89 
// Length of the jitter buffer in milliseconds.
#define BUFFER_SIZE     2048
// Milliseconds of audio before the current tick that decode() keeps for mixing;
// anything older is discarded.
#define HISTORY_SIZE    100
// Latency (ms of buffered audio beyond the current tick) that decode() accepts
// before it starts counting the excess.
#define MEASURE_BASE    100
// How long (ms) excess latency must persist before decode() trims it.
#define MEASURE_PERIOD  5000
// Length of an outgoing DTMF event in milliseconds.
#define DTMF_PERIOD     200
     95 
     96 class AudioStream
     97 {
     98 public:
     99     AudioStream();
    100     ~AudioStream();
    101     bool set(int mode, int socket, sockaddr_storage *remote,
    102         AudioCodec *codec, int sampleRate, int sampleCount,
    103         int codecType, int dtmfType);
    104 
    105     void sendDtmf(int event);
    106     bool mix(int32_t *output, int head, int tail, int sampleRate);
    107     void encode(int tick, AudioStream *chain);
    108     void decode(int tick);
    109 
    110 private:
    111     enum {
    112         NORMAL = 0,
    113         SEND_ONLY = 1,
    114         RECEIVE_ONLY = 2,
    115         LAST_MODE = 2,
    116     };
    117 
    118     int mMode;
    119     int mSocket;
    120     sockaddr_storage mRemote;
    121     AudioCodec *mCodec;
    122     uint32_t mCodecMagic;
    123     uint32_t mDtmfMagic;
    124     bool mFixRemote;
    125 
    126     int mTick;
    127     int mSampleRate;
    128     int mSampleCount;
    129     int mInterval;
    130     int mKeepAlive;
    131 
    132     int16_t *mBuffer;
    133     int mBufferMask;
    134     int mBufferHead;
    135     int mBufferTail;
    136     int mLatencyTimer;
    137     int mLatencyScore;
    138 
    139     uint16_t mSequence;
    140     uint32_t mTimestamp;
    141     uint32_t mSsrc;
    142 
    143     int mDtmfEvent;
    144     int mDtmfStart;
    145 
    146     AudioStream *mNext;
    147 
    148     friend class AudioGroup;
    149 };
    150 
    151 AudioStream::AudioStream()
    152 {
    153     mSocket = -1;
    154     mCodec = NULL;
    155     mBuffer = NULL;
    156     mNext = NULL;
    157 }
    158 
    159 AudioStream::~AudioStream()
    160 {
    161     close(mSocket);
    162     delete mCodec;
    163     delete [] mBuffer;
    164     ALOGD("stream[%d] is dead", mSocket);
    165 }
    166 
    167 bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
    168     AudioCodec *codec, int sampleRate, int sampleCount,
    169     int codecType, int dtmfType)
    170 {
    171     if (mode < 0 || mode > LAST_MODE) {
    172         return false;
    173     }
    174     mMode = mode;
    175 
    176     mCodecMagic = (0x8000 | codecType) << 16;
    177     mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;
    178 
    179     mTick = elapsedRealtime();
    180     mSampleRate = sampleRate / 1000;
    181     mSampleCount = sampleCount;
    182     mInterval = mSampleCount / mSampleRate;
    183 
    184     // Allocate jitter buffer.
    185     for (mBufferMask = 8; mBufferMask < mSampleRate; mBufferMask <<= 1);
    186     mBufferMask *= BUFFER_SIZE;
    187     mBuffer = new int16_t[mBufferMask];
    188     --mBufferMask;
    189     mBufferHead = 0;
    190     mBufferTail = 0;
    191     mLatencyTimer = 0;
    192     mLatencyScore = 0;
    193 
    194     // Initialize random bits.
    195     read(gRandom, &mSequence, sizeof(mSequence));
    196     read(gRandom, &mTimestamp, sizeof(mTimestamp));
    197     read(gRandom, &mSsrc, sizeof(mSsrc));
    198 
    199     mDtmfEvent = -1;
    200     mDtmfStart = 0;
    201 
    202     // Only take over these things when succeeded.
    203     mSocket = socket;
    204     if (codec) {
    205         mRemote = *remote;
    206         mCodec = codec;
    207 
    208         // Here we should never get an private address, but some buggy proxy
    209         // servers do give us one. To solve this, we replace the address when
    210         // the first time we successfully decode an incoming packet.
    211         mFixRemote = false;
    212         if (remote->ss_family == AF_INET) {
    213             unsigned char *address =
    214                 (unsigned char *)&((sockaddr_in *)remote)->sin_addr;
    215             if (address[0] == 10 ||
    216                 (address[0] == 172 && (address[1] >> 4) == 1) ||
    217                 (address[0] == 192 && address[1] == 168)) {
    218                 mFixRemote = true;
    219             }
    220         }
    221     }
    222 
    223     ALOGD("stream[%d] is configured as %s %dkHz %dms mode %d", mSocket,
    224         (codec ? codec->name : "RAW"), mSampleRate, mInterval, mMode);
    225     return true;
    226 }
    227 
    228 void AudioStream::sendDtmf(int event)
    229 {
    230     if (mDtmfMagic != 0) {
    231         mDtmfEvent = event << 24;
    232         mDtmfStart = mTimestamp + mSampleCount;
    233     }
    234 }
    235 
    236 bool AudioStream::mix(int32_t *output, int head, int tail, int sampleRate)
    237 {
    238     if (mMode == SEND_ONLY) {
    239         return false;
    240     }
    241 
    242     if (head - mBufferHead < 0) {
    243         head = mBufferHead;
    244     }
    245     if (tail - mBufferTail > 0) {
    246         tail = mBufferTail;
    247     }
    248     if (tail - head <= 0) {
    249         return false;
    250     }
    251 
    252     head *= mSampleRate;
    253     tail *= mSampleRate;
    254 
    255     if (sampleRate == mSampleRate) {
    256         for (int i = head; i - tail < 0; ++i) {
    257             output[i - head] += mBuffer[i & mBufferMask];
    258         }
    259     } else {
    260         // TODO: implement resampling.
    261         return false;
    262     }
    263     return true;
    264 }
    265 
// Sends the packet that is due for this stream.
//
// tick  - current time in ms. NetworkThread::threadLoop() calls this once tick
//         has reached mTick.
// chain - head of the group's stream list; every stream other than this one
//         is mixed into the outgoing audio.
//
// In order of precedence it sends: a pending DTMF event packet; a packet with
// the mixed audio of the other streams; or, at most once per 1024 ms window, a
// silence packet that keeps firewall/proxy mappings alive. A stream without a
// codec (the device stream) gets raw 16-bit samples via send() instead of RTP.
void AudioStream::encode(int tick, AudioStream *chain)
{
    if (tick - mTick >= mInterval) {
        // We just missed the train. Pretend that packets in between are lost.
        int skipped = (tick - mTick) / mInterval;
        mTick += skipped * mInterval;
        mSequence += skipped;
        mTimestamp += skipped * mSampleCount;
        ALOGV("stream[%d] skips %d packets", mSocket, skipped);
    }

    // From here on tick is the slot being sent; mTick advances to the next one.
    tick = mTick;
    mTick += mInterval;
    ++mSequence;
    mTimestamp += mSampleCount;

    // If there is an ongoing DTMF event, send it now.
    if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {
        int duration = mTimestamp - mDtmfStart;
        // Make sure duration is reasonable.
        if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {
            duration += mSampleCount;
            // Three RTP header words plus one event word: event code in the
            // top byte, duration (in samples) in the low bits.
            int32_t buffer[4] = {
                static_cast<int32_t>(htonl(mDtmfMagic | mSequence)),
                static_cast<int32_t>(htonl(mDtmfStart)),
                static_cast<int32_t>(mSsrc),
                static_cast<int32_t>(htonl(mDtmfEvent | duration)),
            };
            if (duration >= mSampleRate * DTMF_PERIOD) {
                // Last packet of the event: set bit 23 (end flag) and clear it.
                buffer[3] |= htonl(1 << 23);
                mDtmfEvent = -1;
            }
            sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
                (sockaddr *)&mRemote, sizeof(mRemote));
            return;
        }
        mDtmfEvent = -1;
    }

    // Used first as the 32-bit mix accumulator, then reused for the outgoing
    // packet: 3 header words followed by the encoded payload.
    int32_t buffer[mSampleCount + 3];
    bool data = false;
    if (mMode != RECEIVE_ONLY) {
        // Mix all other streams.
        memset(buffer, 0, sizeof(buffer));
        while (chain) {
            if (chain != this) {
                data |= chain->mix(buffer, tick - mInterval, tick, mSampleRate);
            }
            chain = chain->mNext;
        }
    }

    int16_t samples[mSampleCount];
    if (data) {
        // Saturate into 16 bits.
        for (int i = 0; i < mSampleCount; ++i) {
            int32_t sample = buffer[i];
            if (sample < -32768) {
                sample = -32768;
            }
            if (sample > 32767) {
                sample = 32767;
            }
            samples[i] = sample;
        }
    } else {
        // Nothing to send: emit silence only if mTick has entered a different
        // 1024 ms window than the last keep-alive packet.
        if ((mTick ^ mKeepAlive) >> 10 == 0) {
            return;
        }
        mKeepAlive = mTick;
        memset(samples, 0, sizeof(samples));

        if (mMode != RECEIVE_ONLY) {
            ALOGV("stream[%d] no data", mSocket);
        }
    }

    if (!mCodec) {
        // Special case for device stream.
        send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);
        return;
    }

    // Cook the packet and send it out.
    buffer[0] = htonl(mCodecMagic | mSequence);
    buffer[1] = htonl(mTimestamp);
    buffer[2] = mSsrc;
    int length = mCodec->encode(&buffer[3], samples);
    if (length <= 0) {
        ALOGV("stream[%d] encoder error", mSocket);
        return;
    }
    // 12 bytes of RTP header precede the encoded payload.
    sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
        sizeof(mRemote));
}
    361 
    362 void AudioStream::decode(int tick)
    363 {
    364     char c;
    365     if (mMode == SEND_ONLY) {
    366         recv(mSocket, &c, 1, MSG_DONTWAIT);
    367         return;
    368     }
    369 
    370     // Make sure mBufferHead and mBufferTail are reasonable.
    371     if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {
    372         mBufferHead = tick - HISTORY_SIZE;
    373         mBufferTail = mBufferHead;
    374     }
    375 
    376     if (tick - mBufferHead > HISTORY_SIZE) {
    377         // Throw away outdated samples.
    378         mBufferHead = tick - HISTORY_SIZE;
    379         if (mBufferTail - mBufferHead < 0) {
    380             mBufferTail = mBufferHead;
    381         }
    382     }
    383 
    384     // Adjust the jitter buffer if the latency keeps larger than the threshold
    385     // in the measurement period.
    386     int score = mBufferTail - tick - MEASURE_BASE;
    387     if (mLatencyScore > score || mLatencyScore <= 0) {
    388         mLatencyScore = score;
    389         mLatencyTimer = tick;
    390     } else if (tick - mLatencyTimer >= MEASURE_PERIOD) {
    391         ALOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);
    392         mBufferTail -= mLatencyScore;
    393         mLatencyScore = -1;
    394     }
    395 
    396     int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;
    397     if (count < mSampleCount) {
    398         // Buffer overflow. Drop the packet.
    399         ALOGV("stream[%d] buffer overflow", mSocket);
    400         recv(mSocket, &c, 1, MSG_DONTWAIT);
    401         return;
    402     }
    403 
    404     // Receive the packet and decode it.
    405     int16_t samples[count];
    406     if (!mCodec) {
    407         // Special case for device stream.
    408         count = recv(mSocket, samples, sizeof(samples),
    409             MSG_TRUNC | MSG_DONTWAIT) >> 1;
    410     } else {
    411         __attribute__((aligned(4))) uint8_t buffer[2048];
    412         sockaddr_storage remote;
    413         socklen_t addrlen = sizeof(remote);
    414 
    415         int length = recvfrom(mSocket, buffer, sizeof(buffer),
    416             MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);
    417 
    418         // Do we need to check SSRC, sequence, and timestamp? They are not
    419         // reliable but at least they can be used to identify duplicates?
    420         if (length < 12 || length > (int)sizeof(buffer) ||
    421             (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
    422             ALOGV("stream[%d] malformed packet", mSocket);
    423             return;
    424         }
    425         int offset = 12 + ((buffer[0] & 0x0F) << 2);
    426         if ((buffer[0] & 0x10) != 0) {
    427             offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
    428         }
    429         if ((buffer[0] & 0x20) != 0) {
    430             length -= buffer[length - 1];
    431         }
    432         length -= offset;
    433         if (length >= 0) {
    434             length = mCodec->decode(samples, count, &buffer[offset], length);
    435         }
    436         if (length > 0 && mFixRemote) {
    437             mRemote = remote;
    438             mFixRemote = false;
    439         }
    440         count = length;
    441     }
    442     if (count <= 0) {
    443         ALOGV("stream[%d] decoder error", mSocket);
    444         return;
    445     }
    446 
    447     if (tick - mBufferTail > 0) {
    448         // Buffer underrun. Reset the jitter buffer.
    449         ALOGV("stream[%d] buffer underrun", mSocket);
    450         if (mBufferTail - mBufferHead <= 0) {
    451             mBufferHead = tick + mInterval;
    452             mBufferTail = mBufferHead;
    453         } else {
    454             int tail = (tick + mInterval) * mSampleRate;
    455             for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {
    456                 mBuffer[i & mBufferMask] = 0;
    457             }
    458             mBufferTail = tick + mInterval;
    459         }
    460     }
    461 
    462     // Append to the jitter buffer.
    463     int tail = mBufferTail * mSampleRate;
    464     for (int i = 0; i < count; ++i) {
    465         mBuffer[tail & mBufferMask] = samples[i];
    466         ++tail;
    467     }
    468     mBufferTail += mInterval;
    469 }
    470 
    471 //------------------------------------------------------------------------------
    472 
    473 class AudioGroup
    474 {
    475 public:
    476     explicit AudioGroup(const String16 &opPackageName);
    477     ~AudioGroup();
    478     bool set(int sampleRate, int sampleCount);
    479 
    480     bool setMode(int mode);
    481     bool sendDtmf(int event);
    482     bool add(AudioStream *stream);
    483     bool remove(AudioStream *stream);
    484     bool platformHasAec() { return mPlatformHasAec; }
    485 
    486 private:
    487     enum {
    488         ON_HOLD = 0,
    489         MUTED = 1,
    490         NORMAL = 2,
    491         ECHO_SUPPRESSION = 3,
    492         LAST_MODE = 3,
    493     };
    494 
    495     bool checkPlatformAec();
    496 
    497     AudioStream *mChain;
    498     int mEventQueue;
    499     volatile int mDtmfEvent;
    500 
    501     String16 mOpPackageName;
    502 
    503     int mMode;
    504     int mSampleRate;
    505     size_t mSampleCount;
    506     int mDeviceSocket;
    507     bool mPlatformHasAec;
    508 
    509     class NetworkThread : public Thread
    510     {
    511     public:
    512         explicit NetworkThread(AudioGroup *group) : Thread(false), mGroup(group) {}
    513 
    514         bool start()
    515         {
    516             if (run("Network", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
    517                 ALOGE("cannot start network thread");
    518                 return false;
    519             }
    520             return true;
    521         }
    522 
    523     private:
    524         AudioGroup *mGroup;
    525         bool threadLoop();
    526     };
    527     sp<NetworkThread> mNetworkThread;
    528 
    529     class DeviceThread : public Thread
    530     {
    531     public:
    532         explicit DeviceThread(AudioGroup *group) : Thread(false), mGroup(group) {}
    533 
    534         bool start()
    535         {
    536             if (run("Device", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
    537                 ALOGE("cannot start device thread");
    538                 return false;
    539             }
    540             return true;
    541         }
    542 
    543     private:
    544         AudioGroup *mGroup;
    545         bool threadLoop();
    546     };
    547     sp<DeviceThread> mDeviceThread;
    548 };
    549 
    550 AudioGroup::AudioGroup(const String16 &opPackageName)
    551 {
    552     mOpPackageName = opPackageName;
    553     mMode = ON_HOLD;
    554     mChain = NULL;
    555     mEventQueue = -1;
    556     mDtmfEvent = -1;
    557     mDeviceSocket = -1;
    558     mNetworkThread = new NetworkThread(this);
    559     mDeviceThread = new DeviceThread(this);
    560     mPlatformHasAec = checkPlatformAec();
    561 }
    562 
    563 AudioGroup::~AudioGroup()
    564 {
    565     mNetworkThread->requestExitAndWait();
    566     mDeviceThread->requestExitAndWait();
    567     close(mEventQueue);
    568     close(mDeviceSocket);
    569     while (mChain) {
    570         AudioStream *next = mChain->mNext;
    571         delete mChain;
    572         mChain = next;
    573     }
    574     ALOGD("group[%d] is dead", mDeviceSocket);
    575 }
    576 
    577 bool AudioGroup::set(int sampleRate, int sampleCount)
    578 {
    579     mEventQueue = epoll_create(2);
    580     if (mEventQueue == -1) {
    581         ALOGE("epoll_create: %s", strerror(errno));
    582         return false;
    583     }
    584 
    585     mSampleRate = sampleRate;
    586     mSampleCount = sampleCount;
    587 
    588     // Create device socket.
    589     int pair[2];
    590     if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
    591         ALOGE("socketpair: %s", strerror(errno));
    592         return false;
    593     }
    594     mDeviceSocket = pair[0];
    595 
    596     // Create device stream.
    597     mChain = new AudioStream;
    598     if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
    599         sampleRate, sampleCount, -1, -1)) {
    600         close(pair[1]);
    601         ALOGE("cannot initialize device stream");
    602         return false;
    603     }
    604 
    605     // Give device socket a reasonable timeout.
    606     timeval tv;
    607     tv.tv_sec = 0;
    608     tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
    609     if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
    610         ALOGE("setsockopt: %s", strerror(errno));
    611         return false;
    612     }
    613 
    614     // Add device stream into event queue.
    615     epoll_event event;
    616     event.events = EPOLLIN;
    617     event.data.ptr = mChain;
    618     if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
    619         ALOGE("epoll_ctl: %s", strerror(errno));
    620         return false;
    621     }
    622 
    623     // Anything else?
    624     ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
    625     return true;
    626 }
    627 
    628 bool AudioGroup::setMode(int mode)
    629 {
    630     if (mode < 0 || mode > LAST_MODE) {
    631         return false;
    632     }
    633     // FIXME: temporary code to overcome echo and mic gain issues on herring and tuna boards.
    634     // Must be modified/removed when the root cause of the issue is fixed in the hardware or
    635     // driver
    636     char value[PROPERTY_VALUE_MAX];
    637     property_get("ro.product.board", value, "");
    638     if (mode == NORMAL &&
    639             (!strcmp(value, "herring") || !strcmp(value, "tuna"))) {
    640         mode = ECHO_SUPPRESSION;
    641     }
    642     if (mMode == mode) {
    643         return true;
    644     }
    645 
    646     mDeviceThread->requestExitAndWait();
    647     ALOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);
    648     mMode = mode;
    649     return (mode == ON_HOLD) || mDeviceThread->start();
    650 }
    651 
    652 bool AudioGroup::sendDtmf(int event)
    653 {
    654     if (event < 0 || event > 15) {
    655         return false;
    656     }
    657 
    658     // DTMF is rarely used, so we try to make it as lightweight as possible.
    659     // Using volatile might be dodgy, but using a pipe or pthread primitives
    660     // or stop-set-restart threads seems too heavy. Will investigate later.
    661     timespec ts;
    662     ts.tv_sec = 0;
    663     ts.tv_nsec = 100000000;
    664     for (int i = 0; mDtmfEvent != -1 && i < 20; ++i) {
    665         nanosleep(&ts, NULL);
    666     }
    667     if (mDtmfEvent != -1) {
    668         return false;
    669     }
    670     mDtmfEvent = event;
    671     nanosleep(&ts, NULL);
    672     return true;
    673 }
    674 
    675 bool AudioGroup::add(AudioStream *stream)
    676 {
    677     mNetworkThread->requestExitAndWait();
    678 
    679     epoll_event event;
    680     event.events = EPOLLIN;
    681     event.data.ptr = stream;
    682     if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {
    683         ALOGE("epoll_ctl: %s", strerror(errno));
    684         return false;
    685     }
    686 
    687     stream->mNext = mChain->mNext;
    688     mChain->mNext = stream;
    689     if (!mNetworkThread->start()) {
    690         // Only take over the stream when succeeded.
    691         mChain->mNext = stream->mNext;
    692         return false;
    693     }
    694 
    695     ALOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);
    696     return true;
    697 }
    698 
    699 bool AudioGroup::remove(AudioStream *stream)
    700 {
    701     mNetworkThread->requestExitAndWait();
    702 
    703     for (AudioStream *chain = mChain; chain->mNext; chain = chain->mNext) {
    704         if (chain->mNext == stream) {
    705             if (epoll_ctl(mEventQueue, EPOLL_CTL_DEL, stream->mSocket, NULL)) {
    706                 ALOGE("epoll_ctl: %s", strerror(errno));
    707                 return false;
    708             }
    709             chain->mNext = stream->mNext;
    710             ALOGD("stream[%d] leaves group[%d]", stream->mSocket, mDeviceSocket);
    711             delete stream;
    712             break;
    713         }
    714     }
    715 
    716     // Do not start network thread if there is only one stream.
    717     if (!mChain->mNext || !mNetworkThread->start()) {
    718         return false;
    719     }
    720     return true;
    721 }
    722 
    723 bool AudioGroup::NetworkThread::threadLoop()
    724 {
    725     AudioStream *chain = mGroup->mChain;
    726     int tick = elapsedRealtime();
    727     int deadline = tick + 10;
    728     int count = 0;
    729 
    730     for (AudioStream *stream = chain; stream; stream = stream->mNext) {
    731         if (tick - stream->mTick >= 0) {
    732             stream->encode(tick, chain);
    733         }
    734         if (deadline - stream->mTick > 0) {
    735             deadline = stream->mTick;
    736         }
    737         ++count;
    738     }
    739 
    740     int event = mGroup->mDtmfEvent;
    741     if (event != -1) {
    742         for (AudioStream *stream = chain; stream; stream = stream->mNext) {
    743             stream->sendDtmf(event);
    744         }
    745         mGroup->mDtmfEvent = -1;
    746     }
    747 
    748     deadline -= tick;
    749     if (deadline < 1) {
    750         deadline = 1;
    751     }
    752 
    753     epoll_event events[count];
    754     count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
    755     if (count == -1) {
    756         ALOGE("epoll_wait: %s", strerror(errno));
    757         return false;
    758     }
    759     for (int i = 0; i < count; ++i) {
    760         ((AudioStream *)events[i].data.ptr)->decode(tick);
    761     }
    762 
    763     return true;
    764 }
    765 
    766 bool AudioGroup::checkPlatformAec()
    767 {
    768     effect_descriptor_t fxDesc;
    769     uint32_t numFx;
    770 
    771     if (AudioEffect::queryNumberEffects(&numFx) != NO_ERROR) {
    772         return false;
    773     }
    774     for (uint32_t i = 0; i < numFx; i++) {
    775         if (AudioEffect::queryEffect(i, &fxDesc) != NO_ERROR) {
    776             continue;
    777         }
    778         if (memcmp(&fxDesc.type, FX_IID_AEC, sizeof(effect_uuid_t)) == 0) {
    779             return true;
    780         }
    781     }
    782     return false;
    783 }
    784 
    785 bool AudioGroup::DeviceThread::threadLoop()
    786 {
    787     int mode = mGroup->mMode;
    788     int sampleRate = mGroup->mSampleRate;
    789     size_t sampleCount = mGroup->mSampleCount;
    790     int deviceSocket = mGroup->mDeviceSocket;
    791 
    792     // Find out the frame count for AudioTrack and AudioRecord.
    793     size_t output = 0;
    794     size_t input = 0;
    795     if (AudioTrack::getMinFrameCount(&output, AUDIO_STREAM_VOICE_CALL,
    796         sampleRate) != NO_ERROR || output <= 0 ||
    797         AudioRecord::getMinFrameCount(&input, sampleRate,
    798         AUDIO_FORMAT_PCM_16_BIT, AUDIO_CHANNEL_IN_MONO) != NO_ERROR || input <= 0) {
    799         ALOGE("cannot compute frame count");
    800         return false;
    801     }
    802     ALOGD("reported frame count: output %zu, input %zu", output, input);
    803 
    804     if (output < sampleCount * 2) {
    805         output = sampleCount * 2;
    806     }
    807     if (input < sampleCount * 2) {
    808         input = sampleCount * 2;
    809     }
    810     ALOGD("adjusted frame count: output %zu, input %zu", output, input);
    811 
    812     // Initialize AudioTrack and AudioRecord.
    813     sp<AudioTrack> track = new AudioTrack();
    814     sp<AudioRecord> record = new AudioRecord(mGroup->mOpPackageName);
    815     if (track->set(AUDIO_STREAM_VOICE_CALL, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
    816                 AUDIO_CHANNEL_OUT_MONO, output, AUDIO_OUTPUT_FLAG_NONE, NULL /*callback_t*/,
    817                 NULL /*user*/, 0 /*notificationFrames*/, 0 /*sharedBuffer*/,
    818                 false /*threadCanCallJava*/, AUDIO_SESSION_ALLOCATE,
    819                 AudioTrack::TRANSFER_OBTAIN) != NO_ERROR ||
    820             record->set(AUDIO_SOURCE_VOICE_COMMUNICATION, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
    821                 AUDIO_CHANNEL_IN_MONO, input, NULL /*callback_t*/, NULL /*user*/,
    822                 0 /*notificationFrames*/, false /*threadCanCallJava*/, AUDIO_SESSION_ALLOCATE,
    823                 AudioRecord::TRANSFER_OBTAIN) != NO_ERROR) {
    824         ALOGE("cannot initialize audio device");
    825         return false;
    826     }
    827     ALOGD("latency: output %d, input %d", track->latency(), record->latency());
    828 
    829     // Give device socket a reasonable buffer size.
    830     setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
    831     setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));
    832 
    833     // Drain device socket.
    834     char c;
    835     while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
    836 
    837     // check if platform supports echo cancellation and do not active local echo suppression in
    838     // this case
    839     EchoSuppressor *echo = NULL;
    840     sp<AudioEffect> aec;
    841     if (mode == ECHO_SUPPRESSION) {
    842         if (mGroup->platformHasAec()) {
    843             aec = new AudioEffect(FX_IID_AEC,
    844                                     mGroup->mOpPackageName,
    845                                     NULL,
    846                                     0,
    847                                     0,
    848                                     0,
    849                                     record->getSessionId(),
    850                                     record->getInput());
    851             status_t status = aec->initCheck();
    852             if (status == NO_ERROR || status == ALREADY_EXISTS) {
    853                 aec->setEnabled(true);
    854             } else {
    855                 aec.clear();
    856             }
    857         }
    858         // Create local echo suppressor if platform AEC cannot be used.
    859         if (aec == 0) {
    860              echo = new EchoSuppressor(sampleCount,
    861                                        (track->latency() + record->latency()) * sampleRate / 1000);
    862         }
    863     }
    864     // Start AudioRecord before AudioTrack. This prevents AudioTrack from being
    865     // disabled due to buffer underrun while waiting for AudioRecord.
    866     if (mode != MUTED) {
    867         record->start();
    868         int16_t one;
    869         // FIXME this may not work any more
    870         record->read(&one, sizeof(one));
    871     }
    872     track->start();
    873 
    874     while (!exitPending()) {
    875         int16_t output[sampleCount];
    876         if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
    877             memset(output, 0, sizeof(output));
    878         }
    879 
    880         int16_t input[sampleCount];
    881         int toWrite = sampleCount;
    882         int toRead = (mode == MUTED) ? 0 : sampleCount;
    883         int chances = 100;
    884 
    885         while (--chances > 0 && (toWrite > 0 || toRead > 0)) {
    886             if (toWrite > 0) {
    887                 AudioTrack::Buffer buffer;
    888                 buffer.frameCount = toWrite;
    889 
    890                 status_t status = track->obtainBuffer(&buffer, 1);
    891                 if (status == NO_ERROR) {
    892                     int offset = sampleCount - toWrite;
    893                     memcpy(buffer.i8, &output[offset], buffer.size);
    894                     toWrite -= buffer.frameCount;
    895                     track->releaseBuffer(&buffer);
    896                 } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
    897                     ALOGE("cannot write to AudioTrack");
    898                     goto exit;
    899                 }
    900             }
    901 
    902             if (toRead > 0) {
    903                 AudioRecord::Buffer buffer;
    904                 buffer.frameCount = toRead;
    905 
    906                 status_t status = record->obtainBuffer(&buffer, 1);
    907                 if (status == NO_ERROR) {
    908                     int offset = sampleCount - toRead;
    909                     memcpy(&input[offset], buffer.i8, buffer.size);
    910                     toRead -= buffer.frameCount;
    911                     record->releaseBuffer(&buffer);
    912                 } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
    913                     ALOGE("cannot read from AudioRecord");
    914                     goto exit;
    915                 }
    916             }
    917         }
    918 
    919         if (chances <= 0) {
    920             ALOGW("device loop timeout");
    921             while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
    922         }
    923 
    924         if (mode != MUTED) {
    925             if (echo != NULL) {
    926                 ALOGV("echo->run()");
    927                 echo->run(output, input);
    928             }
    929             send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
    930         }
    931     }
    932 
    933 exit:
    934     delete echo;
    935     return true;
    936 }
    937 
    938 //------------------------------------------------------------------------------
    939 
    940 static jfieldID gNative;
    941 static jfieldID gMode;
    942 
    943 jlong add(JNIEnv *env, jobject thiz, jint mode,
    944     jint socket, jstring jRemoteAddress, jint remotePort,
    945     jstring jCodecSpec, jint dtmfType, jstring opPackageNameStr)
    946 {
    947     AudioCodec *codec = NULL;
    948     AudioStream *stream = NULL;
    949     AudioGroup *group = NULL;
    950 
    951     // Sanity check.
    952     sockaddr_storage remote;
    953     if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {
    954         // Exception already thrown.
    955         return 0;
    956     }
    957     if (!jCodecSpec) {
    958         jniThrowNullPointerException(env, "codecSpec");
    959         return 0;
    960     }
    961     const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);
    962     if (!codecSpec) {
    963         // Exception already thrown.
    964         return 0;
    965     }
    966     socket = dup(socket);
    967     if (socket == -1) {
    968         jniThrowException(env, "java/lang/IllegalStateException",
    969             "cannot get stream socket");
    970         return 0;
    971     }
    972 
    973     ScopedUtfChars opPackageName(env, opPackageNameStr);
    974 
    975     // Create audio codec.
    976     int codecType = -1;
    977     char codecName[16];
    978     int sampleRate = -1;
    979     sscanf(codecSpec, "%d %15[^/]%*c%d", &codecType, codecName, &sampleRate);
    980     codec = newAudioCodec(codecName);
    981     int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);
    982     env->ReleaseStringUTFChars(jCodecSpec, codecSpec);
    983     if (sampleCount <= 0) {
    984         jniThrowException(env, "java/lang/IllegalStateException",
    985             "cannot initialize audio codec");
    986         goto error;
    987     }
    988 
    989     // Create audio stream.
    990     stream = new AudioStream;
    991     if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,
    992         codecType, dtmfType)) {
    993         jniThrowException(env, "java/lang/IllegalStateException",
    994             "cannot initialize audio stream");
    995         goto error;
    996     }
    997     socket = -1;
    998     codec = NULL;
    999 
   1000     // Create audio group.
   1001     group = (AudioGroup *)env->GetLongField(thiz, gNative);
   1002     if (!group) {
   1003         int mode = env->GetIntField(thiz, gMode);
   1004         group = new AudioGroup(String16(opPackageName.c_str()));
   1005         if (!group->set(8000, 256) || !group->setMode(mode)) {
   1006             jniThrowException(env, "java/lang/IllegalStateException",
   1007                 "cannot initialize audio group");
   1008             goto error;
   1009         }
   1010     }
   1011 
   1012     // Add audio stream into audio group.
   1013     if (!group->add(stream)) {
   1014         jniThrowException(env, "java/lang/IllegalStateException",
   1015             "cannot add audio stream");
   1016         goto error;
   1017     }
   1018 
   1019     // Succeed.
   1020     env->SetLongField(thiz, gNative, (jlong)group);
   1021     return (jlong)stream;
   1022 
   1023 error:
   1024     delete group;
   1025     delete stream;
   1026     delete codec;
   1027     close(socket);
   1028     env->SetLongField(thiz, gNative, 0);
   1029     return 0;
   1030 }
   1031 
   1032 void remove(JNIEnv *env, jobject thiz, jlong stream)
   1033 {
   1034     AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
   1035     if (group) {
   1036         if (!stream || !group->remove((AudioStream *)stream)) {
   1037             delete group;
   1038             env->SetLongField(thiz, gNative, 0);
   1039         }
   1040     }
   1041 }
   1042 
   1043 void setMode(JNIEnv *env, jobject thiz, jint mode)
   1044 {
   1045     AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
   1046     if (group && !group->setMode(mode)) {
   1047         jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
   1048     }
   1049 }
   1050 
   1051 void sendDtmf(JNIEnv *env, jobject thiz, jint event)
   1052 {
   1053     AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
   1054     if (group && !group->sendDtmf(event)) {
   1055         jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
   1056     }
   1057 }
   1058 
   1059 JNINativeMethod gMethods[] = {
   1060     {"nativeAdd", "(IILjava/lang/String;ILjava/lang/String;ILjava/lang/String;)J", (void *)add},
   1061     {"nativeRemove", "(J)V", (void *)remove},
   1062     {"nativeSetMode", "(I)V", (void *)setMode},
   1063     {"nativeSendDtmf", "(I)V", (void *)sendDtmf},
   1064 };
   1065 
   1066 } // namespace
   1067 
   1068 int registerAudioGroup(JNIEnv *env)
   1069 {
   1070     gRandom = open("/dev/urandom", O_RDONLY);
   1071     if (gRandom == -1) {
   1072         ALOGE("urandom: %s", strerror(errno));
   1073         return -1;
   1074     }
   1075 
   1076     jclass clazz;
   1077     if ((clazz = env->FindClass("android/net/rtp/AudioGroup")) == NULL ||
   1078         (gNative = env->GetFieldID(clazz, "mNative", "J")) == NULL ||
   1079         (gMode = env->GetFieldID(clazz, "mMode", "I")) == NULL ||
   1080         env->RegisterNatives(clazz, gMethods, NELEM(gMethods)) < 0) {
   1081         ALOGE("JNI registration failed");
   1082         return -1;
   1083     }
   1084     return 0;
   1085 }
   1086