Home | History | Annotate | Download | only in audio
      1 #!/usr/bin/python
      2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 
      7 import logging
      8 import numpy
      9 import os
     10 import re
     11 import subprocess
     12 import tempfile
     13 import threading
     14 import time
     15 
     16 from glob import glob
     17 from autotest_lib.client.bin import test, utils
     18 from autotest_lib.client.bin.input.input_device import *
     19 from autotest_lib.client.common_lib import error
     20 from autotest_lib.client.cros.audio import audio_data
     21 from autotest_lib.client.cros.audio import cmd_utils
     22 from autotest_lib.client.cros.audio import cras_utils
     23 from autotest_lib.client.cros.audio import sox_utils
     24 
# Name of the dynamic-linker search path environment variable.
# NOTE(review): not referenced in the visible part of this file; presumably
# used by callers that build a command environment -- confirm.
LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'

# Executable run by get_audio_diagnostics().
_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'

# Defaults shared by the recording / sox helpers below.
_DEFAULT_NUM_CHANNELS = 2
# Default command used by record_sample(); the output file name is appended.
_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
# Default sox format options inserted before each file in sox commands.
_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
# Values applied by cras_rms_test_setup().
_DEFAULT_PLAYBACK_VOLUME = 100
# NOTE(review): unit is not visible here; presumably 1/100 dB (compare with
# '25dB' below) -- confirm against cras_utils.set_capture_gain.
_DEFAULT_CAPTURE_GAIN = 2500
# NOTE(review): the two ALSA defaults are not referenced in the visible part
# of this file.
_DEFAULT_ALSA_MAX_VOLUME = '100%'
_DEFAULT_ALSA_CAPTURE_GAIN = '25dB'

# Minimum RMS value to pass when checking recorded file.
_DEFAULT_SOX_RMS_THRESHOLD = 0.08

# Compiled patterns matched line by line against amixer output in
# get_mixer_jack_status(): the first detects a control whose value is "on",
# the other two capture the numid of the headphone / mic jack control.
_JACK_VALUE_ON_RE = re.compile(r'.*values=on')
_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack')
_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack')

# Compiled patterns matched line by line against sox stat output in
# get_audio_rms() and get_rough_freq().
_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)')

# Unlike the patterns above these are plain pattern strings, not compiled
# objects: loopback_latency_check() passes them to re.search() together
# with the re.I flag.
_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'

# Tools from platform/audiotest
# These are bare command names, not absolute paths.
AUDIOFUNTEST_PATH = 'audiofuntest'
AUDIOLOOP_PATH = 'looptest'
LOOPBACK_LATENCY_PATH = 'loopback_latency'
SOX_PATH = 'sox'
TEST_TONES_PATH = 'test_tones'

# Signals whose L2 norm is not above this value are rejected by
# get_max_cross_correlation().
_MINIMUM_NORM = 0.001
# NOTE(review): the three thresholds below are not referenced in the visible
# part of this file.
_CORRELATION_INDEX_THRESHOLD = 0.999
# The minimum difference of estimated frequencies between two sine waves.
_FREQUENCY_DIFF_THRESHOLD = 20
# The minimum RMS value of meaningful audio data.
_MEANINGFUL_RMS_THRESHOLD = 0.001
     64 
     65 def set_mixer_controls(mixer_settings={}, card='0'):
     66     """Sets all mixer controls listed in the mixer settings on card.
     67 
     68     @param mixer_settings: Mixer settings to set.
     69     @param card: Index of audio card to set mixer settings for.
     70     """
     71     logging.info('Setting mixer control values on %s', card)
     72     for item in mixer_settings:
     73         logging.info('Setting %s to %s on card %s',
     74                      item['name'], item['value'], card)
     75         cmd = 'amixer -c %s cset name=%s %s'
     76         cmd = cmd % (card, item['name'], item['value'])
     77         try:
     78             utils.system(cmd)
     79         except error.CmdError:
     80             # A card is allowed not to support all the controls, so don't
     81             # fail the test here if we get an error.
     82             logging.info('amixer command failed: %s', cmd)
     83 
     84 def set_volume_levels(volume, capture):
     85     """Sets the volume and capture gain through cras_test_client.
     86 
     87     @param volume: The playback volume to set.
     88     @param capture: The capture gain to set.
     89     """
     90     logging.info('Setting volume level to %d', volume)
     91     try:
     92         utils.system('/usr/bin/cras_test_client --volume %d' % volume)
     93         logging.info('Setting capture gain to %d', capture)
     94         utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture)
     95         utils.system('/usr/bin/cras_test_client --dump_server_info')
     96         utils.system('/usr/bin/cras_test_client --mute 0')
     97     except error.CmdError as e:
     98         raise error.TestError(
     99                 '*** Can not tune volume through CRAS. *** (' + str(e) + ')')
    100 
    101     try:
    102         utils.system('amixer -c 0 contents')
    103     except error.CmdError as e:
    104         logging.info('amixer command failed: %s', str(e))
    105 
    106 def loopback_latency_check(**args):
    107     """Checks loopback latency.
    108 
    109     @param args: additional arguments for loopback_latency.
    110 
    111     @return A tuple containing measured and reported latency in uS.
    112         Return None if no audio detected.
    113     """
    114     noise_threshold = str(args['n']) if args.has_key('n') else '400'
    115 
    116     cmd = '%s -n %s -c' % (LOOPBACK_LATENCY_PATH, noise_threshold)
    117 
    118     output = utils.system_output(cmd, retain_output=True)
    119 
    120     # Sleep for a short while to make sure device is not busy anymore
    121     # after called loopback_latency.
    122     time.sleep(.1)
    123 
    124     measured_latency = None
    125     reported_latency = None
    126     for line in output.split('\n'):
    127         match = re.search(_MEASURED_LATENCY_RE, line, re.I)
    128         if match:
    129             measured_latency = int(match.group(1))
    130             continue
    131         match = re.search(_REPORTED_LATENCY_RE, line, re.I)
    132         if match:
    133             reported_latency = int(match.group(1))
    134             continue
    135         if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
    136             return None
    137     if measured_latency and reported_latency:
    138         return (measured_latency, reported_latency)
    139     else:
    140         # Should not reach here, just in case.
    141         return None
    142 
    143 def get_mixer_jack_status(jack_reg_exp):
    144     """Gets the mixer jack status.
    145 
    146     @param jack_reg_exp: The regular expression to match jack control name.
    147 
    148     @return None if the control does not exist, return True if jack control
    149         is detected plugged, return False otherwise.
    150     """
    151     output = utils.system_output('amixer -c0 controls', retain_output=True)
    152     numid = None
    153     for line in output.split('\n'):
    154         m = jack_reg_exp.match(line)
    155         if m:
    156             numid = m.group(1)
    157             break
    158 
    159     # Proceed only when matched numid is not empty.
    160     if numid:
    161         output = utils.system_output('amixer -c0 cget numid=%s' % numid)
    162         for line in output.split('\n'):
    163             if _JACK_VALUE_ON_RE.match(line):
    164                 return True
    165         return False
    166     else:
    167         return None
    168 
    169 def get_hp_jack_status():
    170     """Gets the status of headphone jack."""
    171     status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
    172     if status is not None:
    173         return status
    174 
    175     # When headphone jack is not found in amixer, lookup input devices
    176     # instead.
    177     #
    178     # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
    179     # possibly replace the existing check using amixer.
    180     for evdev in glob('/dev/input/event*'):
    181         device = InputDevice(evdev)
    182         if device.is_hp_jack():
    183             return device.get_headphone_insert()
    184     else:
    185         return None
    186 
    187 def get_mic_jack_status():
    188     """Gets the status of mic jack."""
    189     status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
    190     if status is not None:
    191         return status
    192 
    193     # When mic jack is not found in amixer, lookup input devices instead.
    194     for evdev in glob('/dev/input/event*'):
    195         device = InputDevice(evdev)
    196         if device.is_mic_jack():
    197             return device.get_microphone_insert()
    198     else:
    199         return None
    200 
    201 def log_loopback_dongle_status():
    202     """Log the status of the loopback dongle to make sure it is equipped."""
    203     dongle_status_ok = True
    204 
    205     # Check Mic Jack
    206     mic_jack_status = get_mic_jack_status()
    207     logging.info('Mic jack status: %s', mic_jack_status)
    208     dongle_status_ok &= bool(mic_jack_status)
    209 
    210     # Check Headphone Jack
    211     hp_jack_status = get_hp_jack_status()
    212     logging.info('Headphone jack status: %s', hp_jack_status)
    213     dongle_status_ok &= bool(hp_jack_status)
    214 
    215     # Use latency check to test if audio can be captured through dongle.
    216     # We only want to know the basic function of dongle, so no need to
    217     # assert the latency accuracy here.
    218     latency = loopback_latency_check(n=4000)
    219     if latency:
    220         logging.info('Got latency measured %d, reported %d',
    221                 latency[0], latency[1])
    222     else:
    223         logging.info('Latency check fail.')
    224         dongle_status_ok = False
    225 
    226     logging.info('audio loopback dongle test: %s',
    227             'PASS' if dongle_status_ok else 'FAIL')
    228 
# Functions to test audio playback.
    230 def play_sound(duration_seconds=None, audio_file_path=None):
    231     """Plays a sound file found at |audio_file_path| for |duration_seconds|.
    232 
    233     If |audio_file_path|=None, plays a default audio file.
    234     If |duration_seconds|=None, plays audio file in its entirety.
    235 
    236     @param duration_seconds: Duration to play sound.
    237     @param audio_file_path: Path to the audio file.
    238     """
    239     if not audio_file_path:
    240         audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav'
    241     duration_arg = ('-d %d' % duration_seconds) if duration_seconds else ''
    242     utils.system('aplay %s %s' % (duration_arg, audio_file_path))
    243 
    244 def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
    245                        sample_size=16):
    246     """Gets the command args to generate a sine wav to play to odev.
    247 
    248     @param channel: 0 for left, 1 for right; otherwize, mono.
    249     @param odev: alsa output device.
    250     @param freq: frequency of the generated sine tone.
    251     @param duration: duration of the generated sine tone.
    252     @param sample_size: output audio sample size. Default to 16.
    253     """
    254     cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
    255                odev, 'synth', str(duration)]
    256     if channel == 0:
    257         cmdargs += ['sine', str(freq), 'sine', '0']
    258     elif channel == 1:
    259         cmdargs += ['sine', '0', 'sine', str(freq)]
    260     else:
    261         cmdargs += ['sine', str(freq)]
    262 
    263     return cmdargs
    264 
    265 def play_sine(channel, odev='default', freq=1000, duration=10,
    266               sample_size=16):
    267     """Generates a sine wave and plays to odev.
    268 
    269     @param channel: 0 for left, 1 for right; otherwize, mono.
    270     @param odev: alsa output device.
    271     @param freq: frequency of the generated sine tone.
    272     @param duration: duration of the generated sine tone.
    273     @param sample_size: output audio sample size. Default to 16.
    274     """
    275     cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size)
    276     utils.system(' '.join(cmdargs))
    277 
    278 # Functions to compose customized sox command, execute it and process the
    279 # output of sox command.
    280 def get_sox_mixer_cmd(infile, channel,
    281                       num_channels=_DEFAULT_NUM_CHANNELS,
    282                       sox_format=_DEFAULT_SOX_FORMAT):
    283     """Gets sox mixer command to reduce channel.
    284 
    285     @param infile: Input file name.
    286     @param channel: The selected channel to take effect.
    287     @param num_channels: The number of total channels to test.
    288     @param sox_format: Format to generate sox command.
    289     """
    290     # Build up a pan value string for the sox command.
    291     if channel == 0:
    292         pan_values = '1'
    293     else:
    294         pan_values = '0'
    295     for pan_index in range(1, num_channels):
    296         if channel == pan_index:
    297             pan_values = '%s%s' % (pan_values, ',1')
    298         else:
    299             pan_values = '%s%s' % (pan_values, ',0')
    300 
    301     return '%s -c 2 %s %s -c 1 %s - mixer %s' % (SOX_PATH,
    302             sox_format, infile, sox_format, pan_values)
    303 
    304 def sox_stat_output(infile, channel,
    305                     num_channels=_DEFAULT_NUM_CHANNELS,
    306                     sox_format=_DEFAULT_SOX_FORMAT):
    307     """Executes sox stat command.
    308 
    309     @param infile: Input file name.
    310     @param channel: The selected channel.
    311     @param num_channels: The number of total channels to test.
    312     @param sox_format: Format to generate sox command.
    313 
    314     @return The output of sox stat command
    315     """
    316     sox_mixer_cmd = get_sox_mixer_cmd(infile, channel,
    317                                       num_channels, sox_format)
    318     stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format)
    319     sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
    320     return utils.system_output(sox_cmd, retain_output=True)
    321 
    322 def get_audio_rms(sox_output):
    323     """Gets the audio RMS value from sox stat output
    324 
    325     @param sox_output: Output of sox stat command.
    326 
    327     @return The RMS value parsed from sox stat output.
    328     """
    329     for rms_line in sox_output.split('\n'):
    330         m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
    331         if m is not None:
    332             return float(m.group(1))
    333 
    334 def get_rough_freq(sox_output):
    335     """Gets the rough audio frequency from sox stat output
    336 
    337     @param sox_output: Output of sox stat command.
    338 
    339     @return The rough frequency value parsed from sox stat output.
    340     """
    341     for rms_line in sox_output.split('\n'):
    342         m = _SOX_ROUGH_FREQ_RE.match(rms_line)
    343         if m is not None:
    344             return int(m.group(1))
    345 
    346 def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
    347     """Checks if the calculated RMS value is expected.
    348 
    349     @param sox_output: The output from sox stat command.
    350     @param sox_threshold: The threshold to test RMS value against.
    351 
    352     @raises error.TestError if RMS amplitude can't be parsed.
    353     @raises error.TestFail if the RMS amplitude of the recording isn't above
    354             the threshold.
    355     """
    356     rms_val = get_audio_rms(sox_output)
    357 
    358     # In case we don't get a valid RMS value.
    359     if rms_val is None:
    360         raise error.TestError(
    361             'Failed to generate an audio RMS value from playback.')
    362 
    363     logging.info('Got audio RMS value of %f. Minimum pass is %f.',
    364                  rms_val, sox_threshold)
    365     if rms_val < sox_threshold:
    366         raise error.TestFail(
    367             'Audio RMS value %f too low. Minimum pass is %f.' %
    368             (rms_val, sox_threshold))
    369 
    370 def noise_reduce_file(in_file, noise_file, out_file,
    371                       sox_format=_DEFAULT_SOX_FORMAT):
    372     """Runs the sox command to reduce noise.
    373 
    374     Runs the sox command to noise-reduce in_file using the noise
    375     profile from noise_file.
    376 
    377     @param in_file: The file to noise reduce.
    378     @param noise_file: The file containing the noise profile.
    379         This can be created by recording silence.
    380     @param out_file: The file contains the noise reduced sound.
    381     @param sox_format: The  sox format to generate sox command.
    382     """
    383     prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH,
    384                sox_format, noise_file)
    385     reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
    386             (SOX_PATH, sox_format, in_file, sox_format, out_file))
    387     utils.system('%s | %s' % (prof_cmd, reduce_cmd))
    388 
    389 def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
    390     """Records a sample from the default input device.
    391 
    392     @param tmpfile: The file to record to.
    393     @param record_command: The command to record audio.
    394     """
    395     utils.system('%s %s' % (record_command, tmpfile))
    396 
    397 def create_wav_file(wav_dir, prefix=""):
    398     """Creates a unique name for wav file.
    399 
    400     The created file name will be preserved in autotest result directory
    401     for future analysis.
    402 
    403     @param wav_dir: The directory of created wav file.
    404     @param prefix: specified file name prefix.
    405     """
    406     filename = "%s-%s.wav" % (prefix, time.time())
    407     return os.path.join(wav_dir, filename)
    408 
    409 def run_in_parallel(*funs):
    410     """Runs methods in parallel.
    411 
    412     @param funs: methods to run.
    413     """
    414     threads = []
    415     for f in funs:
    416         t = threading.Thread(target=f)
    417         t.start()
    418         threads.append(t)
    419 
    420     for t in threads:
    421         t.join()
    422 
    423 def loopback_test_channels(noise_file_name, wav_dir,
    424                            playback_callback=None,
    425                            check_recorded_callback=check_audio_rms,
    426                            preserve_test_file=True,
    427                            num_channels = _DEFAULT_NUM_CHANNELS,
    428                            record_callback=record_sample,
    429                            mix_callback=None):
    430     """Tests loopback on all channels.
    431 
    432     @param noise_file_name: Name of the file contains pre-recorded noise.
    433     @param wav_dir: The directory of created wav file.
    434     @param playback_callback: The callback to do the playback for
    435         one channel.
    436     @param record_callback: The callback to do the recording.
    437     @param check_recorded_callback: The callback to check recorded file.
    438     @param preserve_test_file: Retain the recorded files for future debugging.
    439     @param num_channels: The number of total channels to test.
    440     @param mix_callback: The callback to do on the one-channel file.
    441     """
    442     for channel in xrange(num_channels):
    443         record_file_name = create_wav_file(wav_dir,
    444                                            "record-%d" % channel)
    445         functions = [lambda: record_callback(record_file_name)]
    446 
    447         if playback_callback:
    448             functions.append(lambda: playback_callback(channel))
    449 
    450         if mix_callback:
    451             mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel)
    452             functions.append(lambda: mix_callback(mix_file_name))
    453 
    454         run_in_parallel(*functions)
    455 
    456         if mix_callback:
    457             sox_output_mix = sox_stat_output(mix_file_name, channel)
    458             rms_val_mix = get_audio_rms(sox_output_mix)
    459             logging.info('Got mixed audio RMS value of %f.', rms_val_mix)
    460 
    461         sox_output_record = sox_stat_output(record_file_name, channel)
    462         rms_val_record = get_audio_rms(sox_output_record)
    463         logging.info('Got recorded audio RMS value of %f.', rms_val_record)
    464 
    465         reduced_file_name = create_wav_file(wav_dir,
    466                                             "reduced-%d" % channel)
    467         noise_reduce_file(record_file_name, noise_file_name,
    468                           reduced_file_name)
    469 
    470         sox_output_reduced = sox_stat_output(reduced_file_name, channel)
    471 
    472         if not preserve_test_file:
    473             os.unlink(reduced_file_name)
    474             os.unlink(record_file_name)
    475             if mix_callback:
    476                 os.unlink(mix_file_name)
    477 
    478         check_recorded_callback(sox_output_reduced)
    479 
    480 
    481 def get_channel_sox_stat(
    482         input_audio, channel_index, channels=2, bits=16, rate=48000):
    483     """Gets the sox stat info of the selected channel in the input audio file.
    484 
    485     @param input_audio: The input audio file to be analyzed.
    486     @param channel_index: The index of the channel to be analyzed.
    487                           (1 for the first channel).
    488     @param channels: The number of channels in the input audio.
    489     @param bits: The number of bits of each audio sample.
    490     @param rate: The sampling rate.
    491     """
    492     if channel_index <= 0 or channel_index > channels:
    493         raise ValueError('incorrect channel_indexi: %d' % channel_index)
    494 
    495     if channels == 1:
    496         return sox_utils.get_stat(
    497                 input_audio, channels=channels, bits=bits, rate=rate)
    498 
    499     p1 = cmd_utils.popen(
    500             sox_utils.extract_channel_cmd(
    501                     input_audio, '-', channel_index,
    502                     channels=channels, bits=bits, rate=rate),
    503             stdout=subprocess.PIPE)
    504     p2 = cmd_utils.popen(
    505             sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
    506             stdin=p1.stdout, stderr=subprocess.PIPE)
    507     stat_output = p2.stderr.read()
    508     cmd_utils.wait_and_check_returncode(p1, p2)
    509     return sox_utils.parse_stat_output(stat_output)
    510 
    511 
    512 def get_rms(input_audio, channels=1, bits=16, rate=48000):
    513     """Gets the RMS values of all channels of the input audio.
    514 
    515     @param input_audio: The input audio file to be checked.
    516     @param channels: The number of channels in the input audio.
    517     @param bits: The number of bits of each audio sample.
    518     @param rate: The sampling rate.
    519     """
    520     stats = [get_channel_sox_stat(
    521             input_audio, i + 1, channels=channels, bits=bits,
    522             rate=rate) for i in xrange(channels)]
    523 
    524     logging.info('sox stat: %s', [str(s) for s in stats])
    525     return [s.rms for s in stats]
    526 
    527 
    528 def reduce_noise_and_get_rms(
    529         input_audio, noise_file, channels=1, bits=16, rate=48000):
    530     """Reduces noise in the input audio by the given noise file and then gets
    531     the RMS values of all channels of the input audio.
    532 
    533     @param input_audio: The input audio file to be analyzed.
    534     @param noise_file: The noise file used to reduce noise in the input audio.
    535     @param channels: The number of channels in the input audio.
    536     @param bits: The number of bits of each audio sample.
    537     @param rate: The sampling rate.
    538     """
    539     with tempfile.NamedTemporaryFile() as reduced_file:
    540         p1 = cmd_utils.popen(
    541                 sox_utils.noise_profile_cmd(
    542                         noise_file, '-', channels=channels, bits=bits,
    543                         rate=rate),
    544                 stdout=subprocess.PIPE)
    545         p2 = cmd_utils.popen(
    546                 sox_utils.noise_reduce_cmd(
    547                         input_audio, reduced_file.name, '-',
    548                         channels=channels, bits=bits, rate=rate),
    549                 stdin=p1.stdout)
    550         cmd_utils.wait_and_check_returncode(p1, p2)
    551         return get_rms(reduced_file.name, channels, bits, rate)
    552 
    553 
    554 def cras_rms_test_setup():
    555     """Setups for the cras_rms_tests.
    556 
    557     To make sure the line_out-to-mic_in path is all green.
    558     """
    559     # TODO(owenlin): Now, the nodes are choosed by chrome.
    560     #                We should do it here.
    561     cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME)
    562     cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME)
    563 
    564     cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN)
    565 
    566     cras_utils.set_system_mute(False)
    567     cras_utils.set_capture_mute(False)
    568 
    569 
    570 def generate_rms_postmortem():
    571     """Generates postmortem for rms tests."""
    572     try:
    573         logging.info('audio postmortem report')
    574         log_loopback_dongle_status()
    575         logging.info(get_audio_diagnostics())
    576     except Exception:
    577         logging.exception('Error while generating postmortem report')
    578 
    579 
    580 def get_audio_diagnostics():
    581     """Gets audio diagnostic results.
    582 
    583     @returns: a string containing diagnostic results.
    584 
    585     """
    586     return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=subprocess.PIPE)
    587 
    588 
    589 def get_max_cross_correlation(signal_a, signal_b):
    590     """Gets max cross-correlation and best time delay of two signals.
    591 
    592     Computes cross-correlation function between two
    593     signals and gets the maximum value and time delay.
    594     The steps includes:
    595       1. Compute cross-correlation function of X and Y and get Cxy.
    596          The correlation function Cxy is an array where Cxy[k] is the
    597          cross product of X and Y when Y is delayed by k.
    598          Refer to manual of numpy.correlate for detail of correlation.
    599       2. Find the maximum value C_max and index C_index in Cxy.
    600       3. Compute L2 norm of X and Y to get norm(X) and norm(Y).
    601       4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.
    602 
    603     Max cross-correlation indicates the similarity of X and Y. The value
    604     is 1 if X equals Y multiplied by a positive scalar.
    605     The value is -1 if X equals Y multiplied by a negative scaler.
    606     Any constant level shift will be regarded as distortion and will make
    607     max cross-correlation value deviated from 1.
    608     C_index is the best time delay of Y that make Y looks similar to X.
    609     Refer to http://en.wikipedia.org/wiki/Cross-correlation.
    610 
    611     @param signal_a: A list of numbers which contains the first signal.
    612     @param signal_b: A list of numbers which contains the second signal.
    613 
    614     @raises: ValueError if any number in signal_a or signal_b is not a float.
    615              ValueError if norm of any array is less than _MINIMUM_NORM.
    616 
    617     @returns: A tuple (correlation index, best delay). If there are more than
    618               one best delay, just return the first one.
    619     """
    620     def check_list_contains_float(numbers):
    621         """Checks the elements in a list are all float.
    622 
    623         @param numbers: A list of numbers.
    624 
    625         @raises: ValueError if there is any element which is not a float
    626                  in the list.
    627         """
    628         if any(not isinstance(x, float) for x in numbers):
    629             raise ValueError('List contains number which is not a float')
    630 
    631     check_list_contains_float(signal_a)
    632     check_list_contains_float(signal_b)
    633 
    634     norm_a = numpy.linalg.norm(signal_a)
    635     norm_b = numpy.linalg.norm(signal_b)
    636     logging.debug('norm_a: %f', norm_a)
    637     logging.debug('norm_b: %f', norm_b)
    638     if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
    639         raise ValueError('No meaningful data as norm is too small.')
    640 
    641     correlation = numpy.correlate(signal_a, signal_b, 'full')
    642     max_correlation = max(correlation)
    643     best_delays = [i for i, j in enumerate(correlation) if j == max_correlation]
    644     if len(best_delays) > 1:
    645         logging.warning('There are more than one best delay: %r', best_delays)
    646     return max_correlation / (norm_a * norm_b), best_delays[0]
    647 
    648 
    649 def trim_data(data, threshold=0):
    650     """Trims a data by removing value that is too small in head and tail.
    651 
    652     Removes elements in head and tail whose absolute value is smaller than
    653     or equal to threshold.
    654     E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) =
    655     ([0.2, 0.3, 0.2], 2)
    656 
    657     @param data: A list of numbers.
    658     @param threshold: The threshold to compare against.
    659 
    660     @returns: A tuple (trimmed_data, end_trimmed_length), where
    661               end_trimmed_length is the length of original data being trimmed
    662               from the end.
    663               Returns ([], None) if there is no valid data.
    664     """
    665     indice_valid = [
    666             i for i, j in enumerate(data) if abs(j) > threshold]
    667     if not indice_valid:
    668         logging.warning(
    669                 'There is no element with absolute value greater '
    670                 'than threshold %f', threshold)
    671         return [], None
    672     logging.debug('Start and end of indice_valid: %d, %d',
    673                   indice_valid[0], indice_valid[-1])
    674     end_trimmed_length = len(data) - indice_valid[-1] - 1
    675     logging.debug('Trimmed length in the end: %d', end_trimmed_length)
    676     return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length)
    677 
    678 
    679 def get_one_channel_correlation(test_data, golden_data):
    680     """Gets max cross-correlation of test_data and golden_data.
    681 
    682     Trims test data and compute the max cross-correlation against golden_data.
    683     Signal can be trimmed because those zero values in the head and tail of
    684     a signal will not affect correlation computation.
    685 
    686     @param test_data: A list containing the data to compare against golden data.
    687     @param golden_data: A list containing the golden data.
    688 
    689     @returns: A tuple (max cross-correlation, best_delay) if data is valid.
    690               Otherwise returns (None, None). Refer to docstring of
    691               get_max_cross_correlation.
    692     """
    693     trimmed_test_data, end_trimmed_length = trim_data(test_data)
    694 
    695     def to_float(samples):
    696       """Casts elements in the list to float.
    697 
    698       @param samples: A list of numbers.
    699 
    700       @returns: A list of original numbers casted to float.
    701       """
    702       samples_float = [float(x) for x in samples]
    703       return samples_float
    704 
    705     max_cross_correlation, best_delay =  get_max_cross_correlation(
    706             to_float(golden_data),
    707             to_float(trimmed_test_data))
    708 
    709     # The reason to add back the trimmed length in the end.
    710     # E.g.:
    711     # golden data:
    712     #
    713     # |-----------vvvv----------------|  vvvv is the signal of interest.
    714     #       a                 b
    715     #
    716     # test data:
    717     #
    718     # |---x----vvvv--------x----------------|  x is the place to trim.
    719     #   c   d         e            f
    720     #
    721     # trimmed test data:
    722     #
    723     # |----vvvv--------|
    724     #   d         e
    725     #
    726     # The first output of cross correlation computation :
    727     #
    728     #                  |-----------vvvv----------------|
    729     #                       a                 b
    730     #
    731     # |----vvvv--------|
    732     #   d         e
    733     #
    734     # The largest output of cross correlation computation happens at
    735     # delay a + e.
    736     #
    737     #                  |-----------vvvv----------------|
    738     #                       a                 b
    739     #
    740     #                         |----vvvv--------|
    741     #                           d         e
    742     #
    743     # Cross correlation starts computing by aligning the last sample
    744     # of the trimmed test data to the first sample of golden data.
    745     # The best delay calculated from trimmed test data and golden data
    746     # cross correlation is e + a. But the real best delay that should be
    747     # identical on two channel should be e + a + f.
    748     # So we need to add back the length being trimmed in the end.
    749 
    750     if max_cross_correlation:
    751         return max_cross_correlation, best_delay + end_trimmed_length
    752     else:
    753         return None, None
    754 
    755 
    756 def compare_one_channel_correlation(test_data, golden_data, parameters):
    757     """Compares two one-channel data by correlation.
    758 
    759     @param test_data: A list containing the data to compare against golden data.
    760     @param golden_data: A list containing the golden data.
    761     @param parameters: A dict containing parameters for method.
    762 
    763     @returns: A dict containing:
    764               index: The index of similarity where 1 means they are different
    765                   only by a positive scale.
    766               best_delay: The best delay of test data in relative to golden
    767                   data.
    768               equal: A bool containing comparing result.
    769     """
    770     if 'correlation_threshold' in parameters:
    771         threshold = parameters['correlation_threshold']
    772     else:
    773         threshold = _CORRELATION_INDEX_THRESHOLD
    774 
    775     result_dict = dict()
    776     max_cross_correlation, best_delay = get_one_channel_correlation(
    777             test_data, golden_data)
    778     result_dict['index'] = max_cross_correlation
    779     result_dict['best_delay'] = best_delay
    780     result_dict['equal'] = True if (
    781         max_cross_correlation and
    782         max_cross_correlation > threshold) else False
    783     logging.debug('result_dict: %r', result_dict)
    784     return result_dict
    785 
    786 
    787 def compare_data_correlation(golden_data_binary, golden_data_format,
    788                              test_data_binary, test_data_format,
    789                              channel_map, parameters=None):
    790     """Compares two raw data using correlation.
    791 
    792     @param golden_data_binary: The binary containing golden data.
    793     @param golden_data_format: The data format of golden data.
    794     @param test_data_binary: The binary containing test data.
    795     @param test_data_format: The data format of test data.
    796     @param channel_map: A list containing channel mapping.
    797                         E.g. [1, 0, None, None, None, None, None, None] means
    798                         channel 0 of test data should map to channel 1 of
    799                         golden data. Channel 1 of test data should map to
    800                         channel 0 of golden data. Channel 2 to 7 of test data
    801                         should be skipped.
    802     @param parameters: A dict containing parameters for method, if needed.
    803 
    804     @raises: NotImplementedError if file type is not raw.
    805              NotImplementedError if sampling rates of two data are not the same.
    806              error.TestFail if golden data and test data are not equal.
    807     """
    808     if parameters is None:
    809         parameters = dict()
    810 
    811     if (golden_data_format['file_type'] != 'raw' or
    812         test_data_format['file_type'] != 'raw'):
    813         raise NotImplementedError('Only support raw data in compare_data.')
    814     if (golden_data_format['rate'] != test_data_format['rate']):
    815         raise NotImplementedError(
    816                 'Only support comparing data with the same sampling rate')
    817     golden_data = audio_data.AudioRawData(
    818             binary=golden_data_binary,
    819             channel=golden_data_format['channel'],
    820             sample_format=golden_data_format['sample_format'])
    821     test_data = audio_data.AudioRawData(
    822             binary=test_data_binary,
    823             channel=test_data_format['channel'],
    824             sample_format=test_data_format['sample_format'])
    825     compare_results = []
    826     for test_channel, golden_channel in enumerate(channel_map):
    827         if golden_channel is None:
    828             logging.info('Skipped channel %d', test_channel)
    829             continue
    830         test_data_one_channel = test_data.channel_data[test_channel]
    831         golden_data_one_channel = golden_data.channel_data[golden_channel]
    832         result_dict = dict(test_channel=test_channel,
    833                            golden_channel=golden_channel)
    834         result_dict.update(
    835                 compare_one_channel_correlation(
    836                         test_data_one_channel, golden_data_one_channel,
    837                         parameters))
    838         compare_results.append(result_dict)
    839     logging.info('compare_results: %r', compare_results)
    840     for result in compare_results:
    841         if not result['equal']:
    842             error_msg = ('Failed on test channel %d and golden channel %d with '
    843                          'index %f') % (
    844                                  result['test_channel'],
    845                                  result['golden_channel'],
    846                                  result['index'])
    847             logging.error(error_msg)
    848             raise error.TestFail(error_msg)
    849     # Also checks best delay are exactly the same.
    850     best_delays = set([result['best_delay'] for result in compare_results])
    851     if len(best_delays) > 1:
    852         error_msg = 'There are more than one best delay: %s' % best_delays
    853         logging.error(error_msg)
    854         raise error.TestFail(error_msg)
    855 
    856 
    857 class _base_rms_test(test.test):
    858     """Base class for all rms_test """
    859 
    860     def postprocess(self):
    861         super(_base_rms_test, self).postprocess()
    862 
    863         # Sum up the number of failed constraints in each iteration
    864         if sum(len(x) for x in self.failed_constraints):
    865             generate_rms_postmortem()
    866 
    867 
    868 class chrome_rms_test(_base_rms_test):
    869     """Base test class for audio RMS test with Chrome.
    870 
    871     The chrome instance can be accessed by self.chrome.
    872     """
    873     def warmup(self):
    874         super(chrome_rms_test, self).warmup()
    875 
    876         # Not all client of this file using telemetry.
    877         # Just do the import here for those who really need it.
    878         from autotest_lib.client.common_lib.cros import chrome
    879 
    880         self.chrome = chrome.Chrome(init_network_controller=True)
    881 
    882         # The audio configuration could be changed when we
    883         # restart chrome.
    884         try:
    885             cras_rms_test_setup()
    886         except Exception:
    887             self.chrome.browser.Close()
    888             raise
    889 
    890 
    891     def cleanup(self, *args):
    892         try:
    893             self.chrome.close()
    894         finally:
    895             super(chrome_rms_test, self).cleanup()
    896 
    897 class cras_rms_test(_base_rms_test):
    898     """Base test class for CRAS audio RMS test."""
    899 
    900     def warmup(self):
    901         super(cras_rms_test, self).warmup()
    902         cras_rms_test_setup()
    903 
    904 
    905 class alsa_rms_test(_base_rms_test):
    906     """Base test class for ALSA audio RMS test.
    907 
    908     Note the warmup will take 10 seconds and the device cannot be used before it
    909     returns.
    910     """
    911     def warmup(self):
    912         super(alsa_rms_test, self).warmup()
    913 
    914         cras_rms_test_setup()
    915         # We need CRAS to initialize the volume and gain.
    916         cras_utils.playback(playback_file="/dev/zero", duration=1)
    917         # CRAS will release the device after 10 seconds.
    918         time.sleep(11)
    919