Home | History | Annotate | Download | only in audio
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """This module provides utilities to do audio data analysis."""
      6 
      7 import logging
      8 import numpy
      9 import operator
     10 
     11 # Only peaks with coefficient greater than 0.01 of the first peak should be
     12 # considered. Note that this correspond to -40dB in the spectrum.
     13 DEFAULT_MIN_PEAK_RATIO = 0.01
     14 
     15 PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
     16 
     17 # The minimum RMS value of meaningful audio data.
     18 MEANINGFUL_RMS_THRESHOLD = 0.001
     19 
     20 class RMSTooSmallError(Exception):
     21     """Error when signal RMS is too small."""
     22     pass
     23 
     24 
     25 class EmptyDataError(Exception):
     26     """Error when signal is empty."""
     27 
     28 
     29 def normalize_signal(signal, saturate_value):
     30     """Normalizes the signal with respect to the saturate value.
     31 
     32     @param signal: A list for one-channel PCM data.
     33     @param saturate_value: The maximum value that the PCM data might be.
     34 
     35     @returns: A numpy array containing normalized signal. The normalized signal
     36               has value -1 and 1 when it saturates.
     37 
     38     """
     39     signal = numpy.array(signal)
     40     return signal / float(saturate_value)
     41 
     42 
     43 def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
     44                       peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
     45     """Gets the dominant frequencies by spectral analysis.
     46 
     47     @param signal: A list of numbers for one-channel PCM data. This should be
     48                    normalized to [-1, 1] so the function can check if signal RMS
     49                    is too small to be meaningful.
     50     @param rate: Sampling rate.
     51     @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
     52                            peaks other than the greatest one should be
     53                            considered.
     54                            This is to ignore peaks that are too small compared
     55                            to the first peak peak_0.
     56     @param peak_window_size_hz: The window size in Hz to find the peaks.
     57                                 The minimum differences between found peaks will
     58                                 be half of this value.
     59 
     60     @returns: A list of tuples:
     61               [(peak_frequency_0, peak_coefficient_0),
     62                (peak_frequency_1, peak_coefficient_1),
     63                (peak_frequency_2, peak_coefficient_2), ...]
     64               where the tuples are sorted by coefficients.
     65               The last peak_coefficient will be no less than
     66               peak_coefficient * min_peak_ratio.
     67               If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)].
     68 
     69     """
     70     # Checks the signal is meaningful.
     71     if len(signal) == 0:
     72         raise EmptyDataError('Signal data is empty')
     73 
     74     signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
     75     logging.debug('signal RMS = %s', signal_rms)
     76 
     77     # If RMS is too small, set dominant frequency and coefficient to 0.
     78     if signal_rms < MEANINGFUL_RMS_THRESHOLD:
     79         logging.warning(
     80                 'RMS %s is too small to be meaningful. Set frequency to 0.',
     81                 signal_rms)
     82         return [(0, 0)]
     83 
     84     logging.debug('Doing spectral analysis ...')
     85 
     86     # First, pass signal through a window function to mitigate spectral leakage.
     87     y_conv_w = signal * numpy.hanning(len(signal))
     88 
     89     length = len(y_conv_w)
     90 
     91     # x_f is the frequency in Hz, y_f is the transformed coefficient.
     92     x_f = _rfft_freq(length, rate)
     93     y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
     94 
     95     # y_f is complex so consider its absolute value for magnitude.
     96     abs_y_f = numpy.abs(y_f)
     97     threshold = max(abs_y_f) * min_peak_ratio
     98 
     99     # Suppresses all coefficients that are below threshold.
    100     for i in xrange(len(abs_y_f)):
    101         if abs_y_f[i] < threshold:
    102             abs_y_f[i] = 0
    103 
    104     # Gets the peak detection window size in indice.
    105     # x_f[1] is the frequency difference per index.
    106     peak_window_size = int(peak_window_size_hz / x_f[1])
    107 
    108     # Detects peaks.
    109     peaks = peak_detection(abs_y_f, peak_window_size)
    110 
    111     # Transform back the peak location from index to frequency.
    112     results = []
    113     for index, value in peaks:
    114         results.append((x_f[index], value))
    115     return results
    116 
    117 
    118 def _rfft_freq(length, rate):
    119     """Gets the frequency at each index of real FFT.
    120 
    121     @param length: The window length of FFT.
    122     @param rate: Sampling rate.
    123 
    124     @returns: A numpy array containing frequency corresponding to
    125               numpy.fft.rfft result at each index.
    126 
    127     """
    128     # The difference in Hz between each index.
    129     val = rate / float(length)
    130     # Only care half of frequencies for FFT on real signal.
    131     result_length = length // 2 + 1
    132     return numpy.linspace(0, (result_length - 1) * val, result_length)
    133 
    134 
    135 def peak_detection(array, window_size):
    136     """Detects peaks in an array.
    137 
    138     A point (i, array[i]) is a peak if array[i] is the maximum among
    139     array[i - half_window_size] to array[i + half_window_size].
    140     If array[i - half_window_size] to array[i + half_window_size] are all equal,
    141     then there is no peak in this window.
    142     Note that we only consider peak with value greater than 0.
    143 
    144     @param window_size: The window to detect peaks.
    145 
    146     @returns: A list of tuples:
    147               [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
    148               where the tuples are sorted by peak values.
    149 
    150     """
    151     half_window_size = window_size / 2
    152     length = len(array)
    153 
    154     def mid_is_peak(array, mid, left, right):
    155         """Checks if value at mid is the largest among left to right in array.
    156 
    157         @param array: A list of numbers.
    158         @param mid: The mid index.
    159         @param left: The left index.
    160         @param rigth: The right index.
    161 
    162         @returns: A tuple (is_peak, next_candidate)
    163                   is_peak is True if array[index] is the maximum among numbers
    164                   in array between index [left, right] inclusively.
    165                   next_candidate is the index of next candidate for peak if
    166                   is_peak is False. It is the index of maximum value in
    167                   [mid + 1, right]. If is_peak is True, next_candidate is
    168                   right + 1.
    169 
    170         """
    171         value_mid = array[mid]
    172         is_peak = True
    173         next_peak_candidate_index = None
    174 
    175         # Check the left half window.
    176         for index in xrange(left, mid):
    177             if array[index] >= value_mid:
    178                 is_peak = False
    179                 break
    180 
    181         # Mid is at the end of array.
    182         if mid == right:
    183             return is_peak, right + 1
    184 
    185         # Check the right half window and also record next candidate.
    186         # Favor the larger index for next_peak_candidate_index.
    187         for index in xrange(right, mid, -1):
    188             if (next_peak_candidate_index is None or
    189                 array[index] > array[next_peak_candidate_index]):
    190                 next_peak_candidate_index = index
    191 
    192         if array[next_peak_candidate_index] >= value_mid:
    193             is_peak = False
    194 
    195         if is_peak:
    196             next_peak_candidate_index = right + 1
    197 
    198         return is_peak, next_peak_candidate_index
    199 
    200     results = []
    201     mid = 0
    202     next_candidate_idx = None
    203     while mid < length:
    204         left = max(0, mid - half_window_size)
    205         right = min(length - 1, mid + half_window_size)
    206 
    207         # Only consider value greater than 0.
    208         if array[mid] == 0:
    209             mid = mid + 1
    210             continue;
    211 
    212         is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
    213 
    214         if is_peak:
    215             results.append((mid, array[mid]))
    216 
    217         # Use the next candidate found in [mid + 1, right], or right + 1.
    218         mid = next_candidate_idx
    219 
    220     # Sort the peaks by values.
    221     return sorted(results, key=lambda x: x[1], reverse=True)
    222 
    223 
    224 # The default pattern mathing threshold. By experiment, this threshold
    225 # can tolerate normal noise of 0.3 amplitude when sine wave signal
    226 # amplitude is 1.
    227 PATTERN_MATCHING_THRESHOLD = 0.85
    228 
    229 # The default block size of pattern matching.
    230 ANOMALY_DETECTION_BLOCK_SIZE = 120
    231 
    232 def anomaly_detection(signal, rate, freq,
    233                       block_size=ANOMALY_DETECTION_BLOCK_SIZE,
    234                       threshold=PATTERN_MATCHING_THRESHOLD):
    235     """Detects anomaly in a sine wave signal.
    236 
    237     This method detects anomaly in a sine wave signal by matching
    238     patterns of each block.
    239     For each moving window of block in the test signal, checks if there
    240     is any block in golden signal that is similar to this block of test signal.
    241     If there is such a block in golden signal, then this block of test
    242     signal is matched and there is no anomaly in this block of test signal.
    243     If there is any block in test signal that is not matched, then this block
    244     covers an anomaly.
    245     The block of test signal starts from index 0, and proceeds in steps of
    246     half block size. The overlapping of test signal blocks makes sure there must
    247     be at least one block covering the transition from sine wave to anomaly.
    248 
    249     @param signal: A 1-D array-like object for 1-channel PCM data.
    250     @param rate: The sampling rate.
    251     @param freq: The expected frequency of signal.
    252     @param block_size: The block size in samples to detect anomaly.
    253     @param threshold: The threshold of correlation index to be judge as matched.
    254 
    255     @returns: A list containing detected anomaly time in seconds.
    256 
    257     """
    258     if len(signal) == 0:
    259         raise EmptyDataError('Signal data is empty')
    260 
    261     golden_y = _generate_golden_pattern(rate, freq, block_size)
    262 
    263     results = []
    264 
    265     for start in xrange(0, len(signal), block_size / 2):
    266         end = start + block_size
    267         test_signal = signal[start:end]
    268         matched = _moving_pattern_matching(golden_y, test_signal, threshold)
    269         if not matched:
    270             results.append(start)
    271 
    272     results = [float(x) / rate for x in results]
    273 
    274     return results
    275 
    276 
    277 def _generate_golden_pattern(rate, freq, block_size):
    278     """Generates a golden pattern of certain frequency.
    279 
    280     The golden pattern must cover all the possibilities of waveforms in a
    281     block. So, we need a golden pattern covering 1 period + 1 block size,
    282     such that the test block can start anywhere in a period, and extends
    283     a block size.
    284 
    285     |period |1 bk|
    286     |       |    |
    287      . .     . .
    288     .   .   .   .
    289          . .     .
    290 
    291     @param rate: The sampling rate.
    292     @param freq: The frequency of golden pattern.
    293     @param block_size: The block size in samples to detect anomaly.
    294 
    295     @returns: A 1-D array for golden pattern.
    296 
    297     """
    298     samples_in_a_period = int(rate / freq) + 1
    299     samples_in_golden_pattern = samples_in_a_period + block_size
    300     golden_x = numpy.linspace(
    301             0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
    302             samples_in_golden_pattern)
    303     golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
    304     return golden_y
    305 
    306 
    307 def _moving_pattern_matching(golden_signal, test_signal, threshold):
    308     """Checks if test_signal is similar to any block of golden_signal.
    309 
    310     Compares test signal with each block of golden signal by correlation
    311     index. If there is any block of golden signal that is similar to
    312     test signal, then it is matched.
    313 
    314     @param golden_signal: A 1-D array for golden signal.
    315     @param test_signal: A 1-D array for test signal.
    316     @param threshold: The threshold of correlation index to be judge as matched.
    317 
    318     @returns: True if there is a match. False otherwise.
    319 
    320     @raises: ValueError: if test signal is longer than golden signal.
    321 
    322     """
    323     if len(golden_signal) < len(test_signal):
    324         raise ValueError('Test signal is longer than golden signal')
    325 
    326     block_length = len(test_signal)
    327     number_of_movings = len(golden_signal) - block_length + 1
    328     correlation_indices = []
    329     for moving_index in xrange(number_of_movings):
    330         # Cuts one block of golden signal from start index.
    331         # The block length is the same as test signal.
    332         start = moving_index
    333         end = start + block_length
    334         golden_signal_block = golden_signal[start:end]
    335         try:
    336             correlation_index = _get_correlation_index(
    337                     golden_signal_block, test_signal)
    338         except TestSignalNormTooSmallError:
    339             logging.info('Caught one block of test signal that has no meaningful norm')
    340             return False
    341         correlation_indices.append(correlation_index)
    342 
    343     # Checks if the maximum correlation index is high enough.
    344     max_corr = max(correlation_indices)
    345     if max_corr < threshold:
    346         logging.debug('Got one unmatched block with max_corr: %s', max_corr)
    347         return False
    348     return True
    349 
    350 
    351 class GoldenSignalNormTooSmallError(Exception):
    352     """Exception when golden signal norm is too small."""
    353     pass
    354 
    355 
    356 class TestSignalNormTooSmallError(Exception):
    357     """Exception when test signal norm is too small."""
    358     pass
    359 
    360 
    361 _MINIMUM_SIGNAL_NORM = 0.001
    362 
    363 def _get_correlation_index(golden_signal, test_signal):
    364     """Computes correlation index of two signal of same length.
    365 
    366     @param golden_signal: An 1-D array-like object.
    367     @param test_signal: An 1-D array-like object.
    368 
    369     @raises: ValueError: if two signal have different lengths.
    370     @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
    371     @raises: TestSignalNormTooSmallError: if test signal norm is too small.
    372 
    373     @returns: The correlation index.
    374     """
    375     if len(golden_signal) != len(test_signal):
    376         raise ValueError(
    377                 'Only accepts signal of same length: %s, %s' % (
    378                         len(golden_signal), len(test_signal)))
    379 
    380     norm_golden = numpy.linalg.norm(golden_signal)
    381     norm_test = numpy.linalg.norm(test_signal)
    382     if norm_golden <= _MINIMUM_SIGNAL_NORM:
    383         raise GoldenSignalNormTooSmallError(
    384                 'No meaningful data as norm is too small.')
    385     if norm_test <= _MINIMUM_SIGNAL_NORM:
    386         raise TestSignalNormTooSmallError(
    387                 'No meaningful data as norm is too small.')
    388 
    389     # The 'valid' cross correlation result of two signals of same length will
    390     # contain only one number.
    391     correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
    392     return correlation / (norm_golden * norm_test)
    393