Home | History | Annotate | Download | only in audio
      1 #!/usr/bin/python
      2 import logging
      3 import numpy
      4 import os
      5 import unittest
      6 
      7 import common
      8 from autotest_lib.client.cros.audio import audio_analysis
      9 from autotest_lib.client.cros.audio import audio_data
     10 
     11 class SpectralAnalysisTest(unittest.TestCase):
     12     def setUp(self):
     13         """Uses the same seed to generate noise for each test."""
     14         numpy.random.seed(0)
     15 
     16 
     17     def dummy_peak_detection(self, array, window_size):
     18         """Detects peaks in an array in simple way.
     19 
     20         A point (i, array[i]) is a peak if array[i] is the maximum among
     21         array[i - half_window_size] to array[i + half_window_size].
     22         If array[i - half_window_size] to array[i + half_window_size] are all
     23         equal, then there is no peak in this window.
     24 
     25         @param window_size: The window to detect peaks.
     26 
     27         @returns: A list of tuples:
     28                   [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
     29                    ...]
     30                   where the tuples are sorted by peak values.
     31 
     32         """
     33         half_window_size = window_size / 2
     34         length = len(array)
     35 
     36         def mid_is_peak(array, mid, left, right):
     37             """Checks if value at mid is the largest among left to right.
     38 
     39             @param array: A list of numbers.
     40             @param mid: The mid index.
     41             @param left: The left index.
     42             @param rigth: The right index.
     43 
     44             @returns: True if array[index] is the maximum among numbers in array
     45                       between index [left, right] inclusively.
     46 
     47             """
     48             value_mid = array[mid]
     49             for index in xrange(left, right + 1):
     50                 if index == mid:
     51                     continue
     52                 if array[index] >= value_mid:
     53                     return False
     54             return True
     55 
     56         results = []
     57         for mid in xrange(length):
     58             left = max(0, mid - half_window_size)
     59             right = min(length - 1, mid + half_window_size)
     60             if mid_is_peak(array, mid, left, right):
     61                 results.append((mid, array[mid]))
     62 
     63         # Sort the peaks by values.
     64         return sorted(results, key=lambda x: x[1], reverse=True)
     65 
     66 
     67     def testPeakDetection(self):
     68         array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
     69         result = audio_analysis.peak_detection(array, 4)
     70         golden_answer = [(12, 5), (4, 4)]
     71         self.assertEqual(result, golden_answer)
     72 
     73 
     74     def testPeakDetectionLarge(self):
     75         array = numpy.random.uniform(0, 1, 1000000)
     76         window_size = 100
     77         logging.debug('Test large array using dummy peak detection')
     78         dummy_answer = self.dummy_peak_detection(array, window_size)
     79         logging.debug('Test large array using improved peak detection')
     80         improved_answer = audio_analysis.peak_detection(array, window_size)
     81         logging.debug('Compare the result')
     82         self.assertEqual(dummy_answer, improved_answer)
     83 
     84 
     85     def testSpectralAnalysis(self):
     86         rate = 48000
     87         length_in_secs = 0.5
     88         freq_1 = 490.0
     89         freq_2 = 60.0
     90         coeff_1 = 1
     91         coeff_2 = 0.3
     92         samples = length_in_secs * rate
     93         noise = numpy.random.standard_normal(samples) * 0.005
     94         x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
     95         y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
     96              coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
     97         results = audio_analysis.spectral_analysis(y, rate)
     98         # Results should contains
     99         # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
    100         # frequency with coefficient 1, 60Hz is the second dominant frequency
    101         # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
    102         # around 0.1. The k constant is resulted from window function.
    103         logging.debug('Results: %s', results)
    104         self.assertTrue(abs(results[0][0]-freq_1) < 1)
    105         self.assertTrue(abs(results[1][0]-freq_2) < 1)
    106         self.assertTrue(
    107                 abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
    108 
    109 
    110     def testSpectralAnalysisRealData(self):
    111         """This unittest checks the spectral analysis works on real data."""
    112         file_path = os.path.join(
    113                 os.path.dirname(__file__), 'test_data', '1k_2k.raw')
    114         binary = open(file_path, 'r').read()
    115         data = audio_data.AudioRawData(binary, 2, 'S32_LE')
    116         saturate_value = audio_data.get_maximum_value_from_sample_format(
    117                 'S32_LE')
    118         golden_frequency = [1000, 2000]
    119         for channel in [0, 1]:
    120             normalized_signal = audio_analysis.normalize_signal(
    121                     data.channel_data[channel],saturate_value)
    122             spectral = audio_analysis.spectral_analysis(
    123                     normalized_signal, 48000, 0.02)
    124             logging.debug('channel %s: %s', channel, spectral)
    125             self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5,
    126                             'Dominant frequency is not correct')
    127 
    128 
    129     def testNotMeaningfulData(self):
    130         """Checks that sepectral analysis handles un-meaningful data."""
    131         rate = 48000
    132         length_in_secs = 0.5
    133         samples = length_in_secs * rate
    134         noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
    135         noise = numpy.random.standard_normal(samples) * noise_amplitude
    136         results = audio_analysis.spectral_analysis(noise, rate)
    137         self.assertEqual([(0, 0)], results)
    138 
    139 
    140     def testEmptyData(self):
    141         """Checks that sepectral analysis rejects empty data."""
    142         with self.assertRaises(audio_analysis.EmptyDataError):
    143             results = audio_analysis.spectral_analysis([], 100)
    144 
    145 
    146 class NormalizeTest(unittest.TestCase):
    147     def testNormalize(self):
    148         y = [1, 2, 3, 4, 5]
    149         normalized_y = audio_analysis.normalize_signal(y, 10)
    150         expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
    151         for i in xrange(len(y)):
    152             self.assertEqual(expected[i], normalized_y[i])
    153 
    154 
    155 class AnomalyTest(unittest.TestCase):
    156     def setUp(self):
    157         """Creates a test signal of sine wave."""
    158         # Use the same seed for each test case.
    159         numpy.random.seed(0)
    160 
    161         self.block_size = 120
    162         self.rate = 48000
    163         self.freq = 440
    164         length_in_secs = 0.25
    165         self.samples = length_in_secs * self.rate
    166         x = numpy.linspace(
    167                 0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
    168         self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
    169 
    170 
    171     def add_noise(self):
    172         """Add noise to the test signal."""
    173         noise_amplitude = 0.3
    174         noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
    175         self.y = self.y + noise
    176 
    177 
    178     def insert_anomaly(self):
    179         """Inserts an anomaly to the test signal.
    180 
    181         The anomaly self.anomaly_samples should be created before calling this
    182         method.
    183 
    184         """
    185         self.anomaly_start_secs = 0.1
    186         self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
    187                               self.anomaly_samples)
    188 
    189 
    190     def generate_skip_anomaly(self):
    191         """Skips a section of test signal."""
    192         self.anomaly_start_secs = 0.1
    193         self.anomaly_duration_secs = 0.005
    194         anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
    195         anomaly_start_index = self.anomaly_start_secs * self.rate
    196         anomaly_append_index = anomaly_append_secs * self.rate
    197         self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:])
    198 
    199 
    200     def create_constant_anomaly(self, amplitude):
    201         """Creates an anomaly of constant samples.
    202 
    203         @param amplitude: The amplitude of the constant samples.
    204 
    205         """
    206         self.anomaly_duration_secs = 0.005
    207         self.anomaly_samples = (
    208                 [amplitude] * int(self.anomaly_duration_secs * self.rate))
    209 
    210 
    211     def run_analysis(self):
    212         """Runs the anomaly detection."""
    213         self.results = audio_analysis.anomaly_detection(
    214                 self.y, self.rate, self.freq, self.block_size)
    215         logging.debug('Results: %s', self.results)
    216 
    217 
    218     def check_no_anomaly(self):
    219         """Verifies that there is no anomaly in detection result."""
    220         self.run_analysis()
    221         self.assertFalse(self.results)
    222 
    223 
    224     def check_anomaly(self):
    225         """Verifies that there is anomaly in detection result.
    226 
    227         The detection result should contain anomaly time stamps that are
    228         close to where anomaly was inserted. There can be multiple anomalies
    229         since the detection depends on the block size.
    230 
    231         """
    232         self.run_analysis()
    233         self.assertTrue(self.results)
    234         # Anomaly can be detected as long as the detection window of block size
    235         # overlaps with anomaly.
    236         expected_detected_range_secs = (
    237                 self.anomaly_start_secs - float(self.block_size) / self.rate,
    238                 self.anomaly_start_secs + self.anomaly_duration_secs)
    239         for detected_secs in self.results:
    240             self.assertTrue(detected_secs <= expected_detected_range_secs[1])
    241             self.assertTrue(detected_secs >= expected_detected_range_secs[0] )
    242 
    243 
    244     def testGoodSignal(self):
    245         """Sine wave signal with no noise or anomaly."""
    246         self.check_no_anomaly()
    247 
    248 
    249     def testGoodSignalNoise(self):
    250         """Sine wave signal with noise."""
    251         self.add_noise()
    252         self.check_no_anomaly()
    253 
    254 
    255     def testZeroAnomaly(self):
    256         """Sine wave signal with no noise but with anomaly.
    257 
    258         This test case simulates underrun in digital data where there will be
    259         one block of samples with 0 amplitude.
    260 
    261         """
    262         self.create_constant_anomaly(0)
    263         self.insert_anomaly()
    264         self.check_anomaly()
    265 
    266 
    267     def testZeroAnomalyNoise(self):
    268         """Sine wave signal with noise and anomaly.
    269 
    270         This test case simulates underrun in analog data where there will be
    271         one block of samples with amplitudes close to 0.
    272 
    273         """
    274         self.create_constant_anomaly(0)
    275         self.insert_anomaly()
    276         self.add_noise()
    277         self.check_anomaly()
    278 
    279 
    280     def testLowConstantAnomaly(self):
    281         """Sine wave signal with low constant anomaly.
    282 
    283         The anomaly is one block of constant values.
    284 
    285         """
    286         self.create_constant_anomaly(0.05)
    287         self.insert_anomaly()
    288         self.check_anomaly()
    289 
    290 
    291     def testLowConstantAnomalyNoise(self):
    292         """Sine wave signal with low constant anomaly and noise.
    293 
    294         The anomaly is one block of constant values.
    295 
    296         """
    297         self.create_constant_anomaly(0.05)
    298         self.insert_anomaly()
    299         self.add_noise()
    300         self.check_anomaly()
    301 
    302 
    303     def testHighConstantAnomaly(self):
    304         """Sine wave signal with high constant anomaly.
    305 
    306         The anomaly is one block of constant values.
    307 
    308         """
    309         self.create_constant_anomaly(2)
    310         self.insert_anomaly()
    311         self.check_anomaly()
    312 
    313 
    314     def testHighConstantAnomalyNoise(self):
    315         """Sine wave signal with high constant anomaly and noise.
    316 
    317         The anomaly is one block of constant values.
    318 
    319         """
    320         self.create_constant_anomaly(2)
    321         self.insert_anomaly()
    322         self.add_noise()
    323         self.check_anomaly()
    324 
    325 
    326     def testSkippedAnomaly(self):
    327         """Sine wave signal with skipped anomaly.
    328 
    329         The anomaly simulates the symptom where a block is skipped.
    330 
    331         """
    332         self.generate_skip_anomaly()
    333         self.check_anomaly()
    334 
    335 
    336     def testSkippedAnomalyNoise(self):
    337         """Sine wave signal with skipped anomaly with noise.
    338 
    339         The anomaly simulates the symptom where a block is skipped.
    340 
    341         """
    342         self.generate_skip_anomaly()
    343         self.add_noise()
    344         self.check_anomaly()
    345 
    346 
    347     def testEmptyData(self):
    348         """Checks that anomaly detection rejects empty data."""
    349         self.y = []
    350         with self.assertRaises(audio_analysis.EmptyDataError):
    351             self.check_anomaly()
    352 
    353 
    354 if __name__ == '__main__':
    355     logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    356     unittest.main()
    357