Home | History | Annotate | Download | only in tests
      1 #!/usr/bin/env python3
      2 #
      3 #   Copyright 2017 - The Android Open Source Project
      4 #
      5 #   Licensed under the Apache License, Version 2.0 (the "License");
      6 #   you may not use this file except in compliance with the License.
      7 #   You may obtain a copy of the License at
      8 #
      9 #       http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 #   Unless required by applicable law or agreed to in writing, software
     12 #   distributed under the License is distributed on an "AS IS" BASIS,
     13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 #   See the License for the specific language governing permissions and
     15 #   limitations under the License.
     16 
     17 import logging
     18 import numpy
     19 import os
     20 import unittest
     21 
     22 import acts.test_utils.audio_analysis_lib.audio_analysis as audio_analysis
     23 import acts.test_utils.audio_analysis_lib.audio_data as audio_data
     24 
     25 
     26 class SpectralAnalysisTest(unittest.TestCase):
     27     def setUp(self):
     28         """Uses the same seed to generate noise for each test."""
     29         numpy.random.seed(0)
     30 
     31     def dummy_peak_detection(self, array, window_size):
     32         """Detects peaks in an array in simple way.
     33 
     34         A point (i, array[i]) is a peak if array[i] is the maximum among
     35         array[i - half_window_size] to array[i + half_window_size].
     36         If array[i - half_window_size] to array[i + half_window_size] are all
     37         equal, then there is no peak in this window.
     38 
     39         Args:
     40             array: The input array to detect peaks in. Array is a list of
     41                 absolute values of the magnitude of transformed coefficient.
     42             window_size: The window to detect peaks.
     43 
     44         Returns:
     45             A list of tuples:
     46                 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
     47                 ...]
     48                 where the tuples are sorted by peak values.
     49 
     50         """
     51         half_window_size = window_size / 2
     52         length = len(array)
     53 
     54         def mid_is_peak(array, mid, left, right):
     55             """Checks if value at mid is the largest among left to right.
     56 
     57             Args:
     58                 array: A list of numbers.
     59                 mid: The mid index.
     60                 left: The left index.
     61                 rigth: The right index.
     62 
     63             Returns:
     64                 True if array[index] is the maximum among numbers in array
     65                     between index [left, right] inclusively.
     66 
     67             """
     68             value_mid = array[int(mid)]
     69             for index in range(int(left), int(right) + 1):
     70                 if index == mid:
     71                     continue
     72                 if array[index] >= value_mid:
     73                     return False
     74             return True
     75 
     76         results = []
     77         for mid in range(length):
     78             left = max(0, mid - half_window_size)
     79             right = min(length - 1, mid + half_window_size)
     80             if mid_is_peak(array, mid, left, right):
     81                 results.append((mid, array[int(mid)]))
     82 
     83         # Sort the peaks by values.
     84         return sorted(results, key=lambda x: x[1], reverse=True)
     85 
     86     def testPeakDetection(self):
     87         array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
     88         result = audio_analysis.peak_detection(array, 4)
     89         golden_answer = [(12, 5), (4, 4)]
     90         self.assertEqual(result, golden_answer)
     91 
     92     def testPeakDetectionLarge(self):
     93         array = numpy.random.uniform(0, 1, 1000000)
     94         window_size = 100
     95         logging.debug('Test large array using dummy peak detection')
     96         dummy_answer = self.dummy_peak_detection(array, window_size)
     97         logging.debug('Test large array using improved peak detection')
     98         improved_answer = audio_analysis.peak_detection(array, window_size)
     99         logging.debug('Compare the result')
    100         self.assertEqual(dummy_answer, improved_answer)
    101 
    102     def testSpectralAnalysis(self):
    103         rate = 48000
    104         length_in_secs = 0.5
    105         freq_1 = 490.0
    106         freq_2 = 60.0
    107         coeff_1 = 1
    108         coeff_2 = 0.3
    109         samples = length_in_secs * rate
    110         noise = numpy.random.standard_normal(int(samples)) * 0.005
    111         x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
    112         y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + coeff_2 *
    113              numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
    114         results = audio_analysis.spectral_analysis(y, rate)
    115         # Results should contains
    116         # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
    117         # frequency with coefficient 1, 60Hz is the second dominant frequency
    118         # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
    119         # around 0.1. The k constant is resulted from window function.
    120         logging.debug('Results: %s', results)
    121         self.assertTrue(abs(results[0][0] - freq_1) < 1)
    122         self.assertTrue(abs(results[1][0] - freq_2) < 1)
    123         self.assertTrue(
    124             abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
    125 
    126     def testSpectralAnalysisRealData(self):
    127         """This unittest checks the spectral analysis works on real data."""
    128         file_path = os.path.join(
    129             os.path.dirname(__file__), 'test_data', '1k_2k.raw')
    130         binary = open(file_path, 'rb').read()
    131         data = audio_data.AudioRawData(binary, 2, 'S32_LE')
    132         saturate_value = audio_data.get_maximum_value_from_sample_format(
    133             'S32_LE')
    134         golden_frequency = [1000, 2000]
    135         for channel in [0, 1]:
    136             normalized_signal = audio_analysis.normalize_signal(
    137                 data.channel_data[channel], saturate_value)
    138             spectral = audio_analysis.spectral_analysis(normalized_signal,
    139                                                         48000, 0.02)
    140             logging.debug('channel %s: %s', channel, spectral)
    141             self.assertTrue(
    142                 abs(spectral[0][0] - golden_frequency[channel]) < 5,
    143                 'Dominant frequency is not correct')
    144 
    145     def testNotMeaningfulData(self):
    146         """Checks that sepectral analysis handles un-meaningful data."""
    147         rate = 48000
    148         length_in_secs = 0.5
    149         samples = length_in_secs * rate
    150         noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
    151         noise = numpy.random.standard_normal(int(samples)) * noise_amplitude
    152         results = audio_analysis.spectral_analysis(noise, rate)
    153         self.assertEqual([(0, 0)], results)
    154 
    155     def testEmptyData(self):
    156         """Checks that sepectral analysis rejects empty data."""
    157         with self.assertRaises(audio_analysis.EmptyDataError):
    158             results = audio_analysis.spectral_analysis([], 100)
    159 
    160 
    161 class NormalizeTest(unittest.TestCase):
    162     def testNormalize(self):
    163         y = [1, 2, 3, 4, 5]
    164         normalized_y = audio_analysis.normalize_signal(y, 10)
    165         expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
    166         for i in range(len(y)):
    167             self.assertEqual(expected[i], normalized_y[i])
    168 
    169 
    170 class AnomalyTest(unittest.TestCase):
    171     def setUp(self):
    172         """Creates a test signal of sine wave."""
    173         # Use the same seed for each test case.
    174         numpy.random.seed(0)
    175 
    176         self.block_size = 120
    177         self.rate = 48000
    178         self.freq = 440
    179         length_in_secs = 0.25
    180         self.samples = length_in_secs * self.rate
    181         x = numpy.linspace(0.0, (self.samples - 1) * 1.0 / self.rate,
    182                            self.samples)
    183         self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
    184 
    185     def add_noise(self):
    186         """Add noise to the test signal."""
    187         noise_amplitude = 0.3
    188         noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
    189         self.y = self.y + noise
    190 
    191     def insert_anomaly(self):
    192         """Inserts an anomaly to the test signal.
    193 
    194         The anomaly self.anomaly_samples should be created before calling this
    195         method.
    196 
    197         """
    198         self.anomaly_start_secs = 0.1
    199         self.y = numpy.insert(self.y,
    200                               int(self.anomaly_start_secs * self.rate),
    201                               self.anomaly_samples)
    202 
    203     def generate_skip_anomaly(self):
    204         """Skips a section of test signal."""
    205         self.anomaly_start_secs = 0.1
    206         self.anomaly_duration_secs = 0.005
    207         anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
    208         anomaly_start_index = self.anomaly_start_secs * self.rate
    209         anomaly_append_index = anomaly_append_secs * self.rate
    210         self.y = numpy.append(self.y[:int(anomaly_start_index)],
    211                               self.y[int(anomaly_append_index):])
    212 
    213     def create_constant_anomaly(self, amplitude):
    214         """Creates an anomaly of constant samples.
    215 
    216         Args:
    217             amplitude: The amplitude of the constant samples.
    218 
    219         """
    220         self.anomaly_duration_secs = 0.005
    221         self.anomaly_samples = ([amplitude] *
    222                                 int(self.anomaly_duration_secs * self.rate))
    223 
    224     def run_analysis(self):
    225         """Runs the anomaly detection."""
    226         self.results = audio_analysis.anomaly_detection(
    227             self.y, self.rate, self.freq, self.block_size)
    228         logging.debug('Results: %s', self.results)
    229 
    230     def check_no_anomaly(self):
    231         """Verifies that there is no anomaly in detection result."""
    232         self.run_analysis()
    233         self.assertFalse(self.results)
    234 
    235     def check_anomaly(self):
    236         """Verifies that there is anomaly in detection result.
    237 
    238         The detection result should contain anomaly time stamps that are
    239         close to where anomaly was inserted. There can be multiple anomalies
    240         since the detection depends on the block size.
    241 
    242         """
    243         self.run_analysis()
    244         self.assertTrue(self.results)
    245         # Anomaly can be detected as long as the detection window of block size
    246         # overlaps with anomaly.
    247         expected_detected_range_secs = (
    248             self.anomaly_start_secs - float(self.block_size) / self.rate,
    249             self.anomaly_start_secs + self.anomaly_duration_secs)
    250         for detected_secs in self.results:
    251             self.assertTrue(detected_secs <= expected_detected_range_secs[1])
    252             self.assertTrue(detected_secs >= expected_detected_range_secs[0])
    253 
    254     def testGoodSignal(self):
    255         """Sine wave signal with no noise or anomaly."""
    256         self.check_no_anomaly()
    257 
    258     def testGoodSignalNoise(self):
    259         """Sine wave signal with noise."""
    260         self.add_noise()
    261         self.check_no_anomaly()
    262 
    263     def testZeroAnomaly(self):
    264         """Sine wave signal with no noise but with anomaly.
    265 
    266         This test case simulates underrun in digital data where there will be
    267         one block of samples with 0 amplitude.
    268 
    269         """
    270         self.create_constant_anomaly(0)
    271         self.insert_anomaly()
    272         self.check_anomaly()
    273 
    274     def testZeroAnomalyNoise(self):
    275         """Sine wave signal with noise and anomaly.
    276 
    277         This test case simulates underrun in analog data where there will be
    278         one block of samples with amplitudes close to 0.
    279 
    280         """
    281         self.create_constant_anomaly(0)
    282         self.insert_anomaly()
    283         self.add_noise()
    284         self.check_anomaly()
    285 
    286     def testLowConstantAnomaly(self):
    287         """Sine wave signal with low constant anomaly.
    288 
    289         The anomaly is one block of constant values.
    290 
    291         """
    292         self.create_constant_anomaly(0.05)
    293         self.insert_anomaly()
    294         self.check_anomaly()
    295 
    296     def testLowConstantAnomalyNoise(self):
    297         """Sine wave signal with low constant anomaly and noise.
    298 
    299         The anomaly is one block of constant values.
    300 
    301         """
    302         self.create_constant_anomaly(0.05)
    303         self.insert_anomaly()
    304         self.add_noise()
    305         self.check_anomaly()
    306 
    307     def testHighConstantAnomaly(self):
    308         """Sine wave signal with high constant anomaly.
    309 
    310         The anomaly is one block of constant values.
    311 
    312         """
    313         self.create_constant_anomaly(2)
    314         self.insert_anomaly()
    315         self.check_anomaly()
    316 
    317     def testHighConstantAnomalyNoise(self):
    318         """Sine wave signal with high constant anomaly and noise.
    319 
    320         The anomaly is one block of constant values.
    321 
    322         """
    323         self.create_constant_anomaly(2)
    324         self.insert_anomaly()
    325         self.add_noise()
    326         self.check_anomaly()
    327 
    328     def testSkippedAnomaly(self):
    329         """Sine wave signal with skipped anomaly.
    330 
    331         The anomaly simulates the symptom where a block is skipped.
    332 
    333         """
    334         self.generate_skip_anomaly()
    335         self.check_anomaly()
    336 
    337     def testSkippedAnomalyNoise(self):
    338         """Sine wave signal with skipped anomaly with noise.
    339 
    340         The anomaly simulates the symptom where a block is skipped.
    341 
    342         """
    343         self.generate_skip_anomaly()
    344         self.add_noise()
    345         self.check_anomaly()
    346 
    347     def testEmptyData(self):
    348         """Checks that anomaly detection rejects empty data."""
    349         self.y = []
    350         with self.assertRaises(audio_analysis.EmptyDataError):
    351             self.check_anomaly()
    352 
    353 
    354 if __name__ == '__main__':
    355     logging.basicConfig(
    356         level=logging.DEBUG,
    357         format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    358     unittest.main()
    359