Home | History | Annotate | Download | only in processing
      1 #!/usr/bin/python
      2 
      3 # Copyright (C) 2012 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #       http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 from consts import *
     18 import numpy as np
     19 import scipy as sp
     20 import scipy.fftpack as fft
     21 import matplotlib.pyplot as plt
     22 import sys
     23 sys.path.append(sys.path[0])
     24 import calc_delay
     25 
# check if amplitude ratio of DUT / Host signal
#  lies in the given error boundary
# input: host record,
#        device record,
#        sampling rate,
#        low frequency in Hz,
#        high frequency in Hz,
#        allowed error in negative side for pass in %,
#        allowed error in positive side for pass in %
# output: pass / fail flag,
#         min value in negative side, normalized to 1.0,
#         max value in positive side, normalized to 1.0,
#         calculated amplitude ratio in magnitude (DUT / Host),
#         normalized to its mean and stored as fixed point (x 1024)
     38 
     39 def do_check_spectrum(hostData, DUTData, samplingRate, fLow, fHigh, margainLow, margainHigh):
     40     # reduce FFT resolution to have averaging effects
     41     N = 512 if (len(hostData) > 512) else len(hostData)
     42     iLow = N * fLow / samplingRate + 1 # 1 for DC
     43     if iLow > (N / 2 - 1):
     44         iLow = (N / 2 - 1)
     45     iHigh = N * fHigh / samplingRate + 1 # 1 for DC
     46     if iHigh > (N / 2 + 1):
     47         iHigh = N / 2 + 1
     48     print fLow, iLow, fHigh, iHigh, samplingRate
     49 
     50     Phh, freqs = plt.psd(hostData, NFFT=N, Fs=samplingRate, Fc=0, detrend=plt.mlab.detrend_none,\
     51         window=plt.mlab.window_hanning, noverlap=0, pad_to=None, sides='onesided',\
     52         scale_by_freq=False)
     53     Pdd, freqs = plt.psd(DUTData, NFFT=N, Fs=samplingRate, Fc=0, detrend=plt.mlab.detrend_none,\
     54         window=plt.mlab.window_hanning, noverlap=0, pad_to=None, sides='onesided',\
     55         scale_by_freq=False)
     56     print len(Phh), len(Pdd)
     57     print "Phh", abs(Phh[iLow:iHigh])
     58     print "Pdd", abs(Pdd[iLow:iHigh])
     59     amplitudeRatio = np.sqrt(abs(Pdd[iLow:iHigh]/Phh[iLow:iHigh]))
     60     ratioMean = np.mean(amplitudeRatio)
     61     amplitudeRatio = amplitudeRatio / ratioMean
     62     print "Normialized ratio", amplitudeRatio
     63     print "ratio mean for normalization", ratioMean
     64     positiveMax = abs(max(amplitudeRatio))
     65     negativeMin = abs(min(amplitudeRatio))
     66     passFail = True if (positiveMax < (margainHigh / 100.0 + 1.0)) and\
     67         ((1.0 - negativeMin) < margainLow / 100.0) else False
     68     RatioResult = np.zeros(len(amplitudeRatio), dtype=np.int16)
     69     for i in range(len(amplitudeRatio)):
     70         RatioResult[i] = amplitudeRatio[i] * 1024 # make fixed point
     71     print "positiveMax", positiveMax, "negativeMin", negativeMin
     72     return (passFail, negativeMin, positiveMax, RatioResult)
     73 
     74 def toMono(stereoData):
     75     n = len(stereoData)/2
     76     monoData = np.zeros(n)
     77     for i in range(n):
     78         monoData[i] = stereoData[2 * i]
     79     return monoData
     80 
     81 def check_spectrum(inputData, inputTypes):
     82     output = []
     83     outputData = []
     84     outputTypes = []
     85     # basic sanity check
     86     inputError = False
     87     if (inputTypes[0] != TYPE_MONO) and (inputTypes[0] != TYPE_STEREO):
     88         inputError = True
     89     if (inputTypes[1] != TYPE_MONO) and (inputTypes[1] != TYPE_STEREO):
     90         inputError = True
     91     if (inputTypes[2] != TYPE_I64):
     92         inputError = True
     93     if (inputTypes[3] != TYPE_I64):
     94         inputError = True
     95     if (inputTypes[4] != TYPE_I64):
     96         inputError = True
     97     if (inputTypes[5] != TYPE_DOUBLE):
     98         inputError = True
     99     if (inputTypes[6] != TYPE_DOUBLE):
    100         inputError = True
    101     if inputError:
    102         print "input error"
    103         output.append(RESULT_ERROR)
    104         output.append(outputData)
    105         output.append(outputTypes)
    106         return output
    107     hostData = inputData[0]
    108     if inputTypes[0] == TYPE_STEREO:
    109         hostData = toMono(hostData)
    110     dutData = inputData[1]
    111     if inputTypes[1] == TYPE_STEREO:
    112         dutData = toMono(dutData)
    113     samplingRate = inputData[2]
    114     fLow = inputData[3]
    115     fHigh = inputData[4]
    116     margainLow = inputData[5]
    117     margainHigh = inputData[6]
    118     delay = 0
    119     N = 0
    120     hostData_ = hostData
    121     dutData_ = dutData
    122     if len(hostData) > len(dutData):
    123         delay = calc_delay.calc_delay(hostData, dutData)
    124         N = len(dutData)
    125         hostData_ = hostData[delay:delay+N]
    126     if len(hostData) < len(dutData):
    127         delay = calc_delay.calc_delay(dutData, hostData)
    128         N = len(hostData)
    129         dutData_ = dutData[delay:delay+N]
    130 
    131     print "delay ", delay, "deviceRecording samples ", N
    132     (passFail, minError, maxError, TF) = do_check_spectrum(hostData_, dutData_,\
    133         samplingRate, fLow, fHigh, margainLow, margainHigh)
    134 
    135     if passFail:
    136         output.append(RESULT_PASS)
    137     else:
    138         output.append(RESULT_OK)
    139     outputData.append(minError)
    140     outputTypes.append(TYPE_DOUBLE)
    141     outputData.append(maxError)
    142     outputTypes.append(TYPE_DOUBLE)
    143     outputData.append(TF)
    144     outputTypes.append(TYPE_MONO)
    145     output.append(outputData)
    146     output.append(outputTypes)
    147     return output
    148 
    149 # test code
    150 if __name__=="__main__":
    151     sys.path.append(sys.path[0])
    152     mod = __import__("gen_random")
    153     peakAmpl = 10000
    154     durationInMSec = 1000
    155     samplingRate = 44100
    156     fLow = 500
    157     fHigh = 15000
    158     data = getattr(mod, "do_gen_random")(peakAmpl, durationInMSec, samplingRate, fHigh,\
    159         stereo=False)
    160     print len(data)
    161     (passFail, minVal, maxVal, ampRatio) = do_check_spectrum(data, data, samplingRate, fLow, fHigh,\
    162         1.0, 1.0)
    163     plt.plot(ampRatio)
    164     plt.show()
    165