# (Code-browser navigation residue: Home | History | Annotate | Download | only in audio)
      1 #!/usr/bin/python
      2 import logging
      3 import numpy
      4 import unittest
      5 
      6 import common
      7 from autotest_lib.client.cros.audio import audio_analysis
      8 from autotest_lib.client.cros.audio import audio_data
      9 
     10 class SpectralAnalysisTest(unittest.TestCase):
     11     def setUp(self):
     12         """Uses the same seed to generate noise for each test."""
     13         numpy.random.seed(0)
     14 
     15 
     16     def testSpectralAnalysis(self):
     17         rate = 48000
     18         length_in_secs = 0.5
     19         freq_1 = 490.0
     20         freq_2 = 60.0
     21         coeff_1 = 1
     22         coeff_2 = 0.3
     23         samples = length_in_secs * rate
     24         noise = numpy.random.standard_normal(samples) * 0.005
     25         x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
     26         y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
     27              coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
     28         results = audio_analysis.spectral_analysis(y, rate)
     29         # Results should contains
     30         # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
     31         # frequency with coefficient 1, 60Hz is the second dominant frequency
     32         # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
     33         # around 0.1. The k constant is resulted from window function.
     34         logging.debug('Results: %s', results)
     35         self.assertTrue(abs(results[0][0]-freq_1) < 1)
     36         self.assertTrue(abs(results[1][0]-freq_2) < 1)
     37         self.assertTrue(
     38                 abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
     39 
     40 
     41     def testSpectralAnalysisRealData(self):
     42         """This unittest checks the spectral analysis works on real data."""
     43         binary = open('client/cros/audio/test_data/1k_2k.raw', 'r').read()
     44         data = audio_data.AudioRawData(binary, 2, 'S32_LE')
     45         saturate_value = audio_data.get_maximum_value_from_sample_format(
     46                 'S32_LE')
     47         golden_frequency = [1000, 2000]
     48         for channel in [0, 1]:
     49             normalized_signal = audio_analysis.normalize_signal(
     50                     data.channel_data[channel],saturate_value)
     51             spectral = audio_analysis.spectral_analysis(
     52                     normalized_signal, 48000, 0.02)
     53             logging.debug('channel %s: %s', channel, spectral)
     54             self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5,
     55                             'Dominant frequency is not correct')
     56 
     57 
     58     def testNotMeaningfulData(self):
     59         """Checks that sepectral analysis rejects not meaningful data."""
     60         rate = 48000
     61         length_in_secs = 0.5
     62         samples = length_in_secs * rate
     63         noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
     64         noise = numpy.random.standard_normal(samples) * noise_amplitude
     65         with self.assertRaises(audio_analysis.RMSTooSmallError):
     66             results = audio_analysis.spectral_analysis(noise, rate)
     67 
     68 
     69     def testEmptyData(self):
     70         """Checks that sepectral analysis rejects empty data."""
     71         with self.assertRaises(audio_analysis.EmptyDataError):
     72             results = audio_analysis.spectral_analysis([], 100)
     73 
     74 
     75 class NormalizeTest(unittest.TestCase):
     76     def testNormalize(self):
     77         y = [1, 2, 3, 4, 5]
     78         normalized_y = audio_analysis.normalize_signal(y, 10)
     79         expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
     80         for i in xrange(len(y)):
     81             self.assertEqual(expected[i], normalized_y[i])
     82 
     83 
     84 class AnomalyTest(unittest.TestCase):
     85     def setUp(self):
     86         """Creates a test signal of sine wave."""
     87         # Use the same seed for each test case.
     88         numpy.random.seed(0)
     89 
     90         self.block_size = 120
     91         self.rate = 48000
     92         self.freq = 440
     93         length_in_secs = 0.25
     94         self.samples = length_in_secs * self.rate
     95         x = numpy.linspace(
     96                 0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
     97         self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
     98 
     99 
    100     def add_noise(self):
    101         """Add noise to the test signal."""
    102         noise_amplitude = 0.3
    103         noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
    104         self.y = self.y + noise
    105 
    106 
    107     def insert_anomaly(self):
    108         """Inserts an anomaly to the test signal.
    109 
    110         The anomaly self.anomaly_samples should be created before calling this
    111         method.
    112 
    113         """
    114         self.anomaly_start_secs = 0.1
    115         self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
    116                               self.anomaly_samples)
    117 
    118 
    119     def generate_skip_anomaly(self):
    120         """Skips a section of test signal."""
    121         self.anomaly_start_secs = 0.1
    122         self.anomaly_duration_secs = 0.005
    123         anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
    124         anomaly_start_index = self.anomaly_start_secs * self.rate
    125         anomaly_append_index = anomaly_append_secs * self.rate
    126         self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:])
    127 
    128 
    129     def create_constant_anomaly(self, amplitude):
    130         """Creates an anomaly of constant samples.
    131 
    132         @param amplitude: The amplitude of the constant samples.
    133 
    134         """
    135         self.anomaly_duration_secs = 0.005
    136         self.anomaly_samples = (
    137                 [amplitude] * int(self.anomaly_duration_secs * self.rate))
    138 
    139 
    140     def run_analysis(self):
    141         """Runs the anomaly detection."""
    142         self.results = audio_analysis.anomaly_detection(
    143                 self.y, self.rate, self.freq, self.block_size)
    144         logging.debug('Results: %s', self.results)
    145 
    146 
    147     def check_no_anomaly(self):
    148         """Verifies that there is no anomaly in detection result."""
    149         self.run_analysis()
    150         self.assertFalse(self.results)
    151 
    152 
    153     def check_anomaly(self):
    154         """Verifies that there is anomaly in detection result.
    155 
    156         The detection result should contain anomaly time stamps that are
    157         close to where anomaly was inserted. There can be multiple anomalies
    158         since the detection depends on the block size.
    159 
    160         """
    161         self.run_analysis()
    162         self.assertTrue(self.results)
    163         # Anomaly can be detected as long as the detection window of block size
    164         # overlaps with anomaly.
    165         expected_detected_range_secs = (
    166                 self.anomaly_start_secs - float(self.block_size) / self.rate,
    167                 self.anomaly_start_secs + self.anomaly_duration_secs)
    168         for detected_secs in self.results:
    169             self.assertTrue(detected_secs <= expected_detected_range_secs[1])
    170             self.assertTrue(detected_secs >= expected_detected_range_secs[0] )
    171 
    172 
    173     def testGoodSignal(self):
    174         """Sine wave signal with no noise or anomaly."""
    175         self.check_no_anomaly()
    176 
    177 
    178     def testGoodSignalNoise(self):
    179         """Sine wave signal with noise."""
    180         self.add_noise()
    181         self.check_no_anomaly()
    182 
    183 
    184     def testZeroAnomaly(self):
    185         """Sine wave signal with no noise but with anomaly.
    186 
    187         This test case simulates underrun in digital data where there will be
    188         one block of samples with 0 amplitude.
    189 
    190         """
    191         self.create_constant_anomaly(0)
    192         self.insert_anomaly()
    193         self.check_anomaly()
    194 
    195 
    196     def testZeroAnomalyNoise(self):
    197         """Sine wave signal with noise and anomaly.
    198 
    199         This test case simulates underrun in analog data where there will be
    200         one block of samples with amplitudes close to 0.
    201 
    202         """
    203         self.create_constant_anomaly(0)
    204         self.insert_anomaly()
    205         self.add_noise()
    206         self.check_anomaly()
    207 
    208 
    209     def testLowConstantAnomaly(self):
    210         """Sine wave signal with low constant anomaly.
    211 
    212         The anomaly is one block of constant values.
    213 
    214         """
    215         self.create_constant_anomaly(0.05)
    216         self.insert_anomaly()
    217         self.check_anomaly()
    218 
    219 
    220     def testLowConstantAnomalyNoise(self):
    221         """Sine wave signal with low constant anomaly and noise.
    222 
    223         The anomaly is one block of constant values.
    224 
    225         """
    226         self.create_constant_anomaly(0.05)
    227         self.insert_anomaly()
    228         self.add_noise()
    229         self.check_anomaly()
    230 
    231 
    232     def testHighConstantAnomaly(self):
    233         """Sine wave signal with high constant anomaly.
    234 
    235         The anomaly is one block of constant values.
    236 
    237         """
    238         self.create_constant_anomaly(2)
    239         self.insert_anomaly()
    240         self.check_anomaly()
    241 
    242 
    243     def testHighConstantAnomalyNoise(self):
    244         """Sine wave signal with high constant anomaly and noise.
    245 
    246         The anomaly is one block of constant values.
    247 
    248         """
    249         self.create_constant_anomaly(2)
    250         self.insert_anomaly()
    251         self.add_noise()
    252         self.check_anomaly()
    253 
    254 
    255     def testSkippedAnomaly(self):
    256         """Sine wave signal with skipped anomaly.
    257 
    258         The anomaly simulates the symptom where a block is skipped.
    259 
    260         """
    261         self.generate_skip_anomaly()
    262         self.check_anomaly()
    263 
    264 
    265     def testSkippedAnomalyNoise(self):
    266         """Sine wave signal with skipped anomaly with noise.
    267 
    268         The anomaly simulates the symptom where a block is skipped.
    269 
    270         """
    271         self.generate_skip_anomaly()
    272         self.add_noise()
    273         self.check_anomaly()
    274 
    275 
    276     def testEmptyData(self):
    277         """Checks that anomaly detection rejects empty data."""
    278         self.y = []
    279         with self.assertRaises(audio_analysis.EmptyDataError):
    280             self.check_anomaly()
    281 
    282 
# Run every test case in this module when the file is executed directly.
if __name__ == '__main__':
    # Show the logging.debug() output emitted by the tests.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()
    286