Home | History | Annotate | Download | only in audio
      1 #!/usr/bin/python
      2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 
      7 import logging
      8 import numpy
      9 import os
     10 import re
     11 import tempfile
     12 import threading
     13 import time
     14 
     15 from glob import glob
     16 from autotest_lib.client.bin import test, utils
     17 from autotest_lib.client.bin.input.input_device import *
     18 from autotest_lib.client.common_lib import error
     19 from autotest_lib.client.cros.audio import alsa_utils
     20 from autotest_lib.client.cros.audio import audio_data
     21 from autotest_lib.client.cros.audio import cmd_utils
     22 from autotest_lib.client.cros.audio import cras_utils
     23 from autotest_lib.client.cros.audio import sox_utils
     24 
     25 LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'
     26 
     27 _AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'
     28 
     29 _DEFAULT_NUM_CHANNELS = 2
     30 _DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
     31 _DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
     32 _DEFAULT_PLAYBACK_VOLUME = 100
     33 _DEFAULT_CAPTURE_GAIN = 2500
     34 _DEFAULT_ALSA_MAX_VOLUME = '100%'
     35 _DEFAULT_ALSA_CAPTURE_GAIN = '25dB'
     36 
     37 # Minimum RMS value to pass when checking recorded file.
     38 _DEFAULT_SOX_RMS_THRESHOLD = 0.08
     39 
     40 _JACK_VALUE_ON_RE = re.compile('.*values=on')
     41 _HP_JACK_CONTROL_RE = re.compile('numid=(\d+).*Headphone\sJack')
     42 _MIC_JACK_CONTROL_RE = re.compile('numid=(\d+).*Mic\sJack')
     43 
     44 _SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)')
     45 _SOX_ROUGH_FREQ_RE = re.compile('Rough\s+frequency:\s+(.+)')
     46 
     47 _AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
     48 _MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
     49 _REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'
     50 
     51 # Tools from platform/audiotest
     52 AUDIOFUNTEST_PATH = 'audiofuntest'
     53 AUDIOLOOP_PATH = 'looptest'
     54 LOOPBACK_LATENCY_PATH = 'loopback_latency'
     55 SOX_PATH = 'sox'
     56 TEST_TONES_PATH = 'test_tones'
     57 
     58 _MINIMUM_NORM = 0.001
     59 _CORRELATION_INDEX_THRESHOLD = 0.999
     60 # The minimum difference of estimated frequencies between two sine waves.
     61 _FREQUENCY_DIFF_THRESHOLD = 20
     62 # The minimum RMS value of meaningful audio data.
     63 _MEANINGFUL_RMS_THRESHOLD = 0.001
     64 
     65 def set_mixer_controls(mixer_settings={}, card='0'):
     66     """Sets all mixer controls listed in the mixer settings on card.
     67 
     68     @param mixer_settings: Mixer settings to set.
     69     @param card: Index of audio card to set mixer settings for.
     70     """
     71     logging.info('Setting mixer control values on %s', card)
     72     for item in mixer_settings:
     73         logging.info('Setting %s to %s on card %s',
     74                      item['name'], item['value'], card)
     75         cmd = 'amixer -c %s cset name=%s %s'
     76         cmd = cmd % (card, item['name'], item['value'])
     77         try:
     78             utils.system(cmd)
     79         except error.CmdError:
     80             # A card is allowed not to support all the controls, so don't
     81             # fail the test here if we get an error.
     82             logging.info('amixer command failed: %s', cmd)
     83 
     84 def set_volume_levels(volume, capture):
     85     """Sets the volume and capture gain through cras_test_client.
     86 
     87     @param volume: The playback volume to set.
     88     @param capture: The capture gain to set.
     89     """
     90     logging.info('Setting volume level to %d', volume)
     91     utils.system('/usr/bin/cras_test_client --volume %d' % volume)
     92     logging.info('Setting capture gain to %d', capture)
     93     utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture)
     94     utils.system('/usr/bin/cras_test_client --dump_server_info')
     95     utils.system('/usr/bin/cras_test_client --mute 0')
     96     utils.system('amixer -c 0 contents')
     97 
     98 def loopback_latency_check(**args):
     99     """Checks loopback latency.
    100 
    101     @param args: additional arguments for loopback_latency.
    102 
    103     @return A tuple containing measured and reported latency in uS.
    104         Return None if no audio detected.
    105     """
    106     noise_threshold = str(args['n']) if args.has_key('n') else '400'
    107 
    108     cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold)
    109 
    110     output = utils.system_output(cmd, retain_output=True)
    111 
    112     # Sleep for a short while to make sure device is not busy anymore
    113     # after called loopback_latency.
    114     time.sleep(.1)
    115 
    116     measured_latency = None
    117     reported_latency = None
    118     for line in output.split('\n'):
    119         match = re.search(_MEASURED_LATENCY_RE, line, re.I)
    120         if match:
    121             measured_latency = int(match.group(1))
    122             continue
    123         match = re.search(_REPORTED_LATENCY_RE, line, re.I)
    124         if match:
    125             reported_latency = int(match.group(1))
    126             continue
    127         if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
    128             return None
    129     if measured_latency and reported_latency:
    130         return (measured_latency, reported_latency)
    131     else:
    132         # Should not reach here, just in case.
    133         return None
    134 
    135 def get_mixer_jack_status(jack_reg_exp):
    136     """Gets the mixer jack status.
    137 
    138     @param jack_reg_exp: The regular expression to match jack control name.
    139 
    140     @return None if the control does not exist, return True if jack control
    141         is detected plugged, return False otherwise.
    142     """
    143     output = utils.system_output('amixer -c0 controls', retain_output=True)
    144     numid = None
    145     for line in output.split('\n'):
    146         m = jack_reg_exp.match(line)
    147         if m:
    148             numid = m.group(1)
    149             break
    150 
    151     # Proceed only when matched numid is not empty.
    152     if numid:
    153         output = utils.system_output('amixer -c0 cget numid=%s' % numid)
    154         for line in output.split('\n'):
    155             if _JACK_VALUE_ON_RE.match(line):
    156                 return True
    157         return False
    158     else:
    159         return None
    160 
    161 def get_hp_jack_status():
    162     """Gets the status of headphone jack."""
    163     status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
    164     if status is not None:
    165         return status
    166 
    167     # When headphone jack is not found in amixer, lookup input devices
    168     # instead.
    169     #
    170     # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
    171     # possibly replace the existing check using amixer.
    172     for evdev in glob('/dev/input/event*'):
    173         device = InputDevice(evdev)
    174         if device.is_hp_jack():
    175             return device.get_headphone_insert()
    176     else:
    177         return None
    178 
    179 def get_mic_jack_status():
    180     """Gets the status of mic jack."""
    181     status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
    182     if status is not None:
    183         return status
    184 
    185     # When mic jack is not found in amixer, lookup input devices instead.
    186     for evdev in glob('/dev/input/event*'):
    187         device = InputDevice(evdev)
    188         if device.is_mic_jack():
    189             return device.get_microphone_insert()
    190     else:
    191         return None
    192 
    193 def log_loopback_dongle_status():
    194     """Log the status of the loopback dongle to make sure it is equipped."""
    195     dongle_status_ok = True
    196 
    197     # Check Mic Jack
    198     mic_jack_status = get_mic_jack_status()
    199     logging.info('Mic jack status: %s', mic_jack_status)
    200     dongle_status_ok &= bool(mic_jack_status)
    201 
    202     # Check Headphone Jack
    203     hp_jack_status = get_hp_jack_status()
    204     logging.info('Headphone jack status: %s', hp_jack_status)
    205     dongle_status_ok &= bool(hp_jack_status)
    206 
    207     # Use latency check to test if audio can be captured through dongle.
    208     # We only want to know the basic function of dongle, so no need to
    209     # assert the latency accuracy here.
    210     latency = loopback_latency_check(n=4000)
    211     if latency:
    212         logging.info('Got latency measured %d, reported %d',
    213                 latency[0], latency[1])
    214     else:
    215         logging.info('Latency check fail.')
    216         dongle_status_ok = False
    217 
    218     logging.info('audio loopback dongle test: %s',
    219             'PASS' if dongle_status_ok else 'FAIL')
    220 
    221 # Functions to test audio palyback.
    222 def play_sound(duration_seconds=None, audio_file_path=None):
    223     """Plays a sound file found at |audio_file_path| for |duration_seconds|.
    224 
    225     If |audio_file_path|=None, plays a default audio file.
    226     If |duration_seconds|=None, plays audio file in its entirety.
    227 
    228     @param duration_seconds: Duration to play sound.
    229     @param audio_file_path: Path to the audio file.
    230     """
    231     if not audio_file_path:
    232         audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav'
    233     duration_arg = ('-d %d' % duration_seconds) if duration_seconds else ''
    234     utils.system('aplay %s %s' % (duration_arg, audio_file_path))
    235 
    236 def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
    237                        sample_size=16):
    238     """Gets the command args to generate a sine wav to play to odev.
    239 
    240     @param channel: 0 for left, 1 for right; otherwize, mono.
    241     @param odev: alsa output device.
    242     @param freq: frequency of the generated sine tone.
    243     @param duration: duration of the generated sine tone.
    244     @param sample_size: output audio sample size. Default to 16.
    245     """
    246     cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
    247                odev, 'synth', str(duration)]
    248     if channel == 0:
    249         cmdargs += ['sine', str(freq), 'sine', '0']
    250     elif channel == 1:
    251         cmdargs += ['sine', '0', 'sine', str(freq)]
    252     else:
    253         cmdargs += ['sine', str(freq)]
    254 
    255     return cmdargs
    256 
    257 def play_sine(channel, odev='default', freq=1000, duration=10,
    258               sample_size=16):
    259     """Generates a sine wave and plays to odev.
    260 
    261     @param channel: 0 for left, 1 for right; otherwize, mono.
    262     @param odev: alsa output device.
    263     @param freq: frequency of the generated sine tone.
    264     @param duration: duration of the generated sine tone.
    265     @param sample_size: output audio sample size. Default to 16.
    266     """
    267     cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size)
    268     utils.system(' '.join(cmdargs))
    269 
    270 # Functions to compose customized sox command, execute it and process the
    271 # output of sox command.
    272 def get_sox_mixer_cmd(infile, channel,
    273                       num_channels=_DEFAULT_NUM_CHANNELS,
    274                       sox_format=_DEFAULT_SOX_FORMAT):
    275     """Gets sox mixer command to reduce channel.
    276 
    277     @param infile: Input file name.
    278     @param channel: The selected channel to take effect.
    279     @param num_channels: The number of total channels to test.
    280     @param sox_format: Format to generate sox command.
    281     """
    282     # Build up a pan value string for the sox command.
    283     if channel == 0:
    284         pan_values = '1'
    285     else:
    286         pan_values = '0'
    287     for pan_index in range(1, num_channels):
    288         if channel == pan_index:
    289             pan_values = '%s%s' % (pan_values, ',1')
    290         else:
    291             pan_values = '%s%s' % (pan_values, ',0')
    292 
    293     return '%s -c 2 %s %s -c 1 %s - mixer %s' % (SOX_PATH,
    294             sox_format, infile, sox_format, pan_values)
    295 
    296 def sox_stat_output(infile, channel,
    297                     num_channels=_DEFAULT_NUM_CHANNELS,
    298                     sox_format=_DEFAULT_SOX_FORMAT):
    299     """Executes sox stat command.
    300 
    301     @param infile: Input file name.
    302     @param channel: The selected channel.
    303     @param num_channels: The number of total channels to test.
    304     @param sox_format: Format to generate sox command.
    305 
    306     @return The output of sox stat command
    307     """
    308     sox_mixer_cmd = get_sox_mixer_cmd(infile, channel,
    309                                       num_channels, sox_format)
    310     stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format)
    311     sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
    312     return utils.system_output(sox_cmd, retain_output=True)
    313 
    314 def get_audio_rms(sox_output):
    315     """Gets the audio RMS value from sox stat output
    316 
    317     @param sox_output: Output of sox stat command.
    318 
    319     @return The RMS value parsed from sox stat output.
    320     """
    321     for rms_line in sox_output.split('\n'):
    322         m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
    323         if m is not None:
    324             return float(m.group(1))
    325 
    326 def get_rough_freq(sox_output):
    327     """Gets the rough audio frequency from sox stat output
    328 
    329     @param sox_output: Output of sox stat command.
    330 
    331     @return The rough frequency value parsed from sox stat output.
    332     """
    333     for rms_line in sox_output.split('\n'):
    334         m = _SOX_ROUGH_FREQ_RE.match(rms_line)
    335         if m is not None:
    336             return int(m.group(1))
    337 
    338 def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
    339     """Checks if the calculated RMS value is expected.
    340 
    341     @param sox_output: The output from sox stat command.
    342     @param sox_threshold: The threshold to test RMS value against.
    343 
    344     @raises error.TestError if RMS amplitude can't be parsed.
    345     @raises error.TestFail if the RMS amplitude of the recording isn't above
    346             the threshold.
    347     """
    348     rms_val = get_audio_rms(sox_output)
    349 
    350     # In case we don't get a valid RMS value.
    351     if rms_val is None:
    352         raise error.TestError(
    353             'Failed to generate an audio RMS value from playback.')
    354 
    355     logging.info('Got audio RMS value of %f. Minimum pass is %f.',
    356                  rms_val, sox_threshold)
    357     if rms_val < sox_threshold:
    358         raise error.TestFail(
    359             'Audio RMS value %f too low. Minimum pass is %f.' %
    360             (rms_val, sox_threshold))
    361 
    362 def noise_reduce_file(in_file, noise_file, out_file,
    363                       sox_format=_DEFAULT_SOX_FORMAT):
    364     """Runs the sox command to reduce noise.
    365 
    366     Runs the sox command to noise-reduce in_file using the noise
    367     profile from noise_file.
    368 
    369     @param in_file: The file to noise reduce.
    370     @param noise_file: The file containing the noise profile.
    371         This can be created by recording silence.
    372     @param out_file: The file contains the noise reduced sound.
    373     @param sox_format: The  sox format to generate sox command.
    374     """
    375     prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH,
    376                sox_format, noise_file)
    377     reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
    378             (SOX_PATH, sox_format, in_file, sox_format, out_file))
    379     utils.system('%s | %s' % (prof_cmd, reduce_cmd))
    380 
    381 def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
    382     """Records a sample from the default input device.
    383 
    384     @param tmpfile: The file to record to.
    385     @param record_command: The command to record audio.
    386     """
    387     utils.system('%s %s' % (record_command, tmpfile))
    388 
    389 def create_wav_file(wav_dir, prefix=""):
    390     """Creates a unique name for wav file.
    391 
    392     The created file name will be preserved in autotest result directory
    393     for future analysis.
    394 
    395     @param wav_dir: The directory of created wav file.
    396     @param prefix: specified file name prefix.
    397     """
    398     filename = "%s-%s.wav" % (prefix, time.time())
    399     return os.path.join(wav_dir, filename)
    400 
    401 def run_in_parallel(*funs):
    402     """Runs methods in parallel.
    403 
    404     @param funs: methods to run.
    405     """
    406     threads = []
    407     for f in funs:
    408         t = threading.Thread(target=f)
    409         t.start()
    410         threads.append(t)
    411 
    412     for t in threads:
    413         t.join()
    414 
    415 def loopback_test_channels(noise_file_name, wav_dir,
    416                            playback_callback=None,
    417                            check_recorded_callback=check_audio_rms,
    418                            preserve_test_file=True,
    419                            num_channels = _DEFAULT_NUM_CHANNELS,
    420                            record_callback=record_sample,
    421                            mix_callback=None):
    422     """Tests loopback on all channels.
    423 
    424     @param noise_file_name: Name of the file contains pre-recorded noise.
    425     @param wav_dir: The directory of created wav file.
    426     @param playback_callback: The callback to do the playback for
    427         one channel.
    428     @param record_callback: The callback to do the recording.
    429     @param check_recorded_callback: The callback to check recorded file.
    430     @param preserve_test_file: Retain the recorded files for future debugging.
    431     @param num_channels: The number of total channels to test.
    432     @param mix_callback: The callback to do on the one-channel file.
    433     """
    434     for channel in xrange(num_channels):
    435         record_file_name = create_wav_file(wav_dir,
    436                                            "record-%d" % channel)
    437         functions = [lambda: record_callback(record_file_name)]
    438 
    439         if playback_callback:
    440             functions.append(lambda: playback_callback(channel))
    441 
    442         if mix_callback:
    443             mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel)
    444             functions.append(lambda: mix_callback(mix_file_name))
    445 
    446         run_in_parallel(*functions)
    447 
    448         if mix_callback:
    449             sox_output_mix = sox_stat_output(mix_file_name, channel)
    450             rms_val_mix = get_audio_rms(sox_output_mix)
    451             logging.info('Got mixed audio RMS value of %f.', rms_val_mix)
    452 
    453         sox_output_record = sox_stat_output(record_file_name, channel)
    454         rms_val_record = get_audio_rms(sox_output_record)
    455         logging.info('Got recorded audio RMS value of %f.', rms_val_record)
    456 
    457         reduced_file_name = create_wav_file(wav_dir,
    458                                             "reduced-%d" % channel)
    459         noise_reduce_file(record_file_name, noise_file_name,
    460                           reduced_file_name)
    461 
    462         sox_output_reduced = sox_stat_output(reduced_file_name, channel)
    463 
    464         if not preserve_test_file:
    465             os.unlink(reduced_file_name)
    466             os.unlink(record_file_name)
    467             if mix_callback:
    468                 os.unlink(mix_file_name)
    469 
    470         check_recorded_callback(sox_output_reduced)
    471 
    472 
    473 def get_channel_sox_stat(
    474         input_audio, channel_index, channels=2, bits=16, rate=48000):
    475     """Gets the sox stat info of the selected channel in the input audio file.
    476 
    477     @param input_audio: The input audio file to be analyzed.
    478     @param channel_index: The index of the channel to be analyzed.
    479                           (1 for the first channel).
    480     @param channels: The number of channels in the input audio.
    481     @param bits: The number of bits of each audio sample.
    482     @param rate: The sampling rate.
    483     """
    484     if channel_index <= 0 or channel_index > channels:
    485         raise ValueError('incorrect channel_indexi: %d' % channel_index)
    486 
    487     if channels == 1:
    488         return sox_utils.get_stat(
    489                 input_audio, channels=channels, bits=bits, rate=rate)
    490 
    491     p1 = cmd_utils.popen(
    492             sox_utils.extract_channel_cmd(
    493                     input_audio, '-', channel_index,
    494                     channels=channels, bits=bits, rate=rate),
    495             stdout=cmd_utils.PIPE)
    496     p2 = cmd_utils.popen(
    497             sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
    498             stdin=p1.stdout, stderr=cmd_utils.PIPE)
    499     stat_output = p2.stderr.read()
    500     cmd_utils.wait_and_check_returncode(p1, p2)
    501     return sox_utils.parse_stat_output(stat_output)
    502 
    503 
    504 def get_rms(input_audio, channels=1, bits=16, rate=48000):
    505     """Gets the RMS values of all channels of the input audio.
    506 
    507     @param input_audio: The input audio file to be checked.
    508     @param channels: The number of channels in the input audio.
    509     @param bits: The number of bits of each audio sample.
    510     @param rate: The sampling rate.
    511     """
    512     stats = [get_channel_sox_stat(
    513             input_audio, i + 1, channels=channels, bits=bits,
    514             rate=rate) for i in xrange(channels)]
    515 
    516     logging.info('sox stat: %s', [str(s) for s in stats])
    517     return [s.rms for s in stats]
    518 
    519 
    520 def reduce_noise_and_get_rms(
    521         input_audio, noise_file, channels=1, bits=16, rate=48000):
    522     """Reduces noise in the input audio by the given noise file and then gets
    523     the RMS values of all channels of the input audio.
    524 
    525     @param input_audio: The input audio file to be analyzed.
    526     @param noise_file: The noise file used to reduce noise in the input audio.
    527     @param channels: The number of channels in the input audio.
    528     @param bits: The number of bits of each audio sample.
    529     @param rate: The sampling rate.
    530     """
    531     with tempfile.NamedTemporaryFile() as reduced_file:
    532         p1 = cmd_utils.popen(
    533                 sox_utils.noise_profile_cmd(
    534                         noise_file, '-', channels=channels, bits=bits,
    535                         rate=rate),
    536                 stdout=cmd_utils.PIPE)
    537         p2 = cmd_utils.popen(
    538                 sox_utils.noise_reduce_cmd(
    539                         input_audio, reduced_file.name, '-',
    540                         channels=channels, bits=bits, rate=rate),
    541                 stdin=p1.stdout)
    542         cmd_utils.wait_and_check_returncode(p1, p2)
    543         return get_rms(reduced_file.name, channels, bits, rate)
    544 
    545 
    546 def skip_devices_to_test(*boards):
    547     """Devices to skip due to hardware or test compatibility issues.
    548 
    549     @param boards: the boards to skip testing.
    550     """
    551     # TODO(scottz): Remove this when crbug.com/220147 is fixed.
    552     dut_board = utils.get_current_board()
    553     if dut_board in boards:
    554        raise error.TestNAError('This test is not available on %s' % dut_board)
    555 
    556 
    557 def cras_rms_test_setup():
    558     """Setups for the cras_rms_tests.
    559 
    560     To make sure the line_out-to-mic_in path is all green.
    561     """
    562     # TODO(owenlin): Now, the nodes are choosed by chrome.
    563     #                We should do it here.
    564     cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME)
    565     cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME)
    566 
    567     cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN)
    568 
    569     cras_utils.set_system_mute(False)
    570     cras_utils.set_capture_mute(False)
    571 
    572 
    573 def generate_rms_postmortem():
    574     """Generates postmortem for rms tests."""
    575     try:
    576         logging.info('audio postmortem report')
    577         log_loopback_dongle_status()
    578         logging.info(get_audio_diagnostics())
    579     except Exception:
    580         logging.exception('Error while generating postmortem report')
    581 
    582 
    583 def get_audio_diagnostics():
    584     """Gets audio diagnostic results.
    585 
    586     @returns: a string containing diagnostic results.
    587 
    588     """
    589     return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=cmd_utils.PIPE)
    590 
    591 
    592 def get_max_cross_correlation(signal_a, signal_b):
    593     """Gets max cross-correlation and best time delay of two signals.
    594 
    595     Computes cross-correlation function between two
    596     signals and gets the maximum value and time delay.
    597     The steps includes:
    598       1. Compute cross-correlation function of X and Y and get Cxy.
    599          The correlation function Cxy is an array where Cxy[k] is the
    600          cross product of X and Y when Y is delayed by k.
    601          Refer to manual of numpy.correlate for detail of correlation.
    602       2. Find the maximum value C_max and index C_index in Cxy.
    603       3. Compute L2 norm of X and Y to get norm(X) and norm(Y).
    604       4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.
    605 
    606     Max cross-correlation indicates the similarity of X and Y. The value
    607     is 1 if X equals Y multiplied by a positive scalar.
    608     The value is -1 if X equals Y multiplied by a negative scaler.
    609     Any constant level shift will be regarded as distortion and will make
    610     max cross-correlation value deviated from 1.
    611     C_index is the best time delay of Y that make Y looks similar to X.
    612     Refer to http://en.wikipedia.org/wiki/Cross-correlation.
    613 
    614     @param signal_a: A list of numbers which contains the first signal.
    615     @param signal_b: A list of numbers which contains the second signal.
    616 
    617     @raises: ValueError if any number in signal_a or signal_b is not a float.
    618              ValueError if norm of any array is less than _MINIMUM_NORM.
    619 
    620     @returns: A tuple (correlation index, best delay). If there are more than
    621               one best delay, just return the first one.
    622     """
    623     def check_list_contains_float(numbers):
    624         """Checks the elements in a list are all float.
    625 
    626         @param numbers: A list of numbers.
    627 
    628         @raises: ValueError if there is any element which is not a float
    629                  in the list.
    630         """
    631         if any(not isinstance(x, float) for x in numbers):
    632             raise ValueError('List contains number which is not a float')
    633 
    634     check_list_contains_float(signal_a)
    635     check_list_contains_float(signal_b)
    636 
    637     norm_a = numpy.linalg.norm(signal_a)
    638     norm_b = numpy.linalg.norm(signal_b)
    639     logging.debug('norm_a: %f', norm_a)
    640     logging.debug('norm_b: %f', norm_b)
    641     if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
    642         raise ValueError('No meaningful data as norm is too small.')
    643 
    644     correlation = numpy.correlate(signal_a, signal_b, 'full')
    645     max_correlation = max(correlation)
    646     best_delays = [i for i, j in enumerate(correlation) if j == max_correlation]
    647     if len(best_delays) > 1:
    648         logging.warning('There are more than one best delay: %r', best_delays)
    649     return max_correlation / (norm_a * norm_b), best_delays[0]
    650 
    651 
    652 def trim_data(data, threshold=0):
    653     """Trims a data by removing value that is too small in head and tail.
    654 
    655     Removes elements in head and tail whose absolute value is smaller than
    656     or equal to threshold.
    657     E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) =
    658     ([0.2, 0.3, 0.2], 2)
    659 
    660     @param data: A list of numbers.
    661     @param threshold: The threshold to compare against.
    662 
    663     @returns: A tuple (trimmed_data, end_trimmed_length), where
    664               end_trimmed_length is the length of original data being trimmed
    665               from the end.
    666               Returns ([], None) if there is no valid data.
    667     """
    668     indice_valid = [
    669             i for i, j in enumerate(data) if abs(j) > threshold]
    670     if not indice_valid:
    671         logging.warning(
    672                 'There is no element with absolute value greater '
    673                 'than threshold %f', threshold)
    674         return [], None
    675     logging.debug('Start and end of indice_valid: %d, %d',
    676                   indice_valid[0], indice_valid[-1])
    677     end_trimmed_length = len(data) - indice_valid[-1] - 1
    678     logging.debug('Trimmed length in the end: %d', end_trimmed_length)
    679     return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length)
    680 
    681 
    682 def get_one_channel_correlation(test_data, golden_data):
    683     """Gets max cross-correlation of test_data and golden_data.
    684 
    685     Trims test data and compute the max cross-correlation against golden_data.
    686     Signal can be trimmed because those zero values in the head and tail of
    687     a signal will not affect correlation computation.
    688 
    689     @param test_data: A list containing the data to compare against golden data.
    690     @param golden_data: A list containing the golden data.
    691 
    692     @returns: A tuple (max cross-correlation, best_delay) if data is valid.
    693               Otherwise returns (None, None). Refer to docstring of
    694               get_max_cross_correlation.
    695     """
    696     trimmed_test_data, end_trimmed_length = trim_data(test_data)
    697 
    698     def to_float(samples):
    699       """Casts elements in the list to float.
    700 
    701       @param samples: A list of numbers.
    702 
    703       @returns: A list of original numbers casted to float.
    704       """
    705       samples_float = [float(x) for x in samples]
    706       return samples_float
    707 
    708     max_cross_correlation, best_delay =  get_max_cross_correlation(
    709             to_float(golden_data),
    710             to_float(trimmed_test_data))
    711 
    712     # The reason to add back the trimmed length in the end.
    713     # E.g.:
    714     # golden data:
    715     #
    716     # |-----------vvvv----------------|  vvvv is the signal of interest.
    717     #       a                 b
    718     #
    719     # test data:
    720     #
    721     # |---x----vvvv--------x----------------|  x is the place to trim.
    722     #   c   d         e            f
    723     #
    724     # trimmed test data:
    725     #
    726     # |----vvvv--------|
    727     #   d         e
    728     #
    729     # The first output of cross correlation computation :
    730     #
    731     #                  |-----------vvvv----------------|
    732     #                       a                 b
    733     #
    734     # |----vvvv--------|
    735     #   d         e
    736     #
    737     # The largest output of cross correlation computation happens at
    738     # delay a + e.
    739     #
    740     #                  |-----------vvvv----------------|
    741     #                       a                 b
    742     #
    743     #                         |----vvvv--------|
    744     #                           d         e
    745     #
    746     # Cross correlation starts computing by aligning the last sample
    747     # of the trimmed test data to the first sample of golden data.
    748     # The best delay calculated from trimmed test data and golden data
    749     # cross correlation is e + a. But the real best delay that should be
    750     # identical on two channel should be e + a + f.
    751     # So we need to add back the length being trimmed in the end.
    752 
    753     if max_cross_correlation:
    754         return max_cross_correlation, best_delay + end_trimmed_length
    755     else:
    756         return None, None
    757 
    758 
    759 def compare_one_channel_correlation(test_data, golden_data, parameters):
    760     """Compares two one-channel data by correlation.
    761 
    762     @param test_data: A list containing the data to compare against golden data.
    763     @param golden_data: A list containing the golden data.
    764     @param parameters: A dict containing parameters for method.
    765 
    766     @returns: A dict containing:
    767               index: The index of similarity where 1 means they are different
    768                   only by a positive scale.
    769               best_delay: The best delay of test data in relative to golden
    770                   data.
    771               equal: A bool containing comparing result.
    772     """
    773     if 'correlation_threshold' in parameters:
    774         threshold = parameters['correlation_threshold']
    775     else:
    776         threshold = _CORRELATION_INDEX_THRESHOLD
    777 
    778     result_dict = dict()
    779     max_cross_correlation, best_delay = get_one_channel_correlation(
    780             test_data, golden_data)
    781     result_dict['index'] = max_cross_correlation
    782     result_dict['best_delay'] = best_delay
    783     result_dict['equal'] = True if (
    784         max_cross_correlation and
    785         max_cross_correlation > threshold) else False
    786     logging.debug('result_dict: %r', result_dict)
    787     return result_dict
    788 
    789 
    790 def get_one_channel_stat(data, data_format):
    791     """Gets statistic information of data.
    792 
    793     @param data: A list containing one channel data.
    794     @param data_format: A dict containing data format of data.
    795 
    796     @return: The sox stat parsed result. An object containing
    797              sameple_count: An int. Samples read.
    798              length: A float. Length in seconds.
    799              rms: A float. RMS amplitude.
    800              rough_frequency: A float. Rough frequency.
    801     """
    802     if not data:
    803         raise ValueError('Data is empty. Can not get stat')
    804     raw_data = audio_data.AudioRawData(
    805             binary=None, channel=1,
    806             sample_format=data_format['sample_format'])
    807     raw_data.copy_channel_data([data])
    808     with tempfile.NamedTemporaryFile() as raw_data_file:
    809         raw_data_path = raw_data_file.name
    810         raw_data.write_to_file(raw_data_path)
    811 
    812         bits = 8 * (audio_data.SAMPLE_FORMATS[
    813                     data_format['sample_format']]['size_bytes'])
    814         stat = sox_utils.get_stat(raw_data_path, channels=1, bits=bits,
    815                                   rate=data_format['rate'])
    816         return stat
    817 
    818 
    819 def compare_one_channel_frequency(test_data, test_data_format,
    820                                   golden_data, golden_data_format):
    821     """Compares two one-channel data by frequency.
    822 
    823     @param test_data: A list containing the data to compare against golden data.
    824     @param test_data_format: A dict containing data format of test data.
    825     @param golden_data: A list containing the golden data.
    826     @param golden_data_format: A dict containing data format of golden data.
    827 
    828     @returns: A dict containing:
    829               test_data_frequency: test data frequency.
    830               golden_data_frequency: golden data frequency.
    831               equal: A bool containing comparing result.
    832 
    833     @raises: ValueError if the test data RMS is too small to be meaningful.
    834 
    835     """
    836     result_dict = dict()
    837     golden_data_stat = get_one_channel_stat(golden_data, golden_data_format)
    838     logging.info('Get golden data one channel stat: %s', golden_data_stat)
    839     test_data_stat = get_one_channel_stat(test_data, test_data_format)
    840     logging.info('Get test data one channel stat: %s', test_data_stat)
    841 
    842     result_dict['golden_data_frequency'] = golden_data_stat.rough_frequency
    843     result_dict['test_data_frequency'] = test_data_stat.rough_frequency
    844     result_dict['equal'] = True if (
    845             abs(result_dict['test_data_frequency'] -
    846                 result_dict['golden_data_frequency']) < _FREQUENCY_DIFF_THRESHOLD
    847             ) else False
    848     logging.debug('result_dict: %r', result_dict)
    849     if test_data_stat.rms < _MEANINGFUL_RMS_THRESHOLD:
    850         raise ValueError('Recorded RMS %f is too small to be meaningful.',
    851                          test_data_stat.rms)
    852     return result_dict
    853 
    854 
    855 def compare_one_channel_data(test_data, test_data_format,
    856                              golden_data, golden_data_format, method,
    857                              parameters):
    858     """Compares two one-channel data.
    859 
    860     @param test_data: A list containing the data to compare against golden data.
    861     @param test_data_format: The data format of test data.
    862     @param golden_data: A list containing the golden data.
    863     @param golden_data_format: The data format of golden data.
    864     @param method: The comparing method. Currently only 'correlation' is
    865                    supported.
    866     @param parameters: A dict containing parameters for method.
    867 
    868     @returns: A dict containing:
    869               index: The index of similarity where 1 means they are different
    870                   only by a positive scale.
    871               best_delay: The best delay of test data in relative to golden
    872                   data.
    873               equal: A bool containing comparing result.
    874 
    875     @raises: NotImplementedError if method is not supported.
    876     """
    877     if method == 'correlation':
    878         return compare_one_channel_correlation(test_data, golden_data,
    879                 parameters)
    880     if method == 'frequency':
    881         return compare_one_channel_frequency(
    882                 test_data, test_data_format, golden_data, golden_data_format)
    883     raise NotImplementedError('method %s is not implemented' % method)
    884 
    885 
    886 def compare_data(golden_data_binary, golden_data_format,
    887                  test_data_binary, test_data_format,
    888                  channel_map, method, parameters=None):
    889     """Compares two raw data.
    890 
    891     @param golden_data_binary: The binary containing golden data.
    892     @param golden_data_format: The data format of golden data.
    893     @param test_data_binary: The binary containing test data.
    894     @param test_data_format: The data format of test data.
    895     @param channel_map: A list containing channel mapping.
    896                         E.g. [1, 0, None, None, None, None, None, None] means
    897                         channel 0 of test data should map to channel 1 of
    898                         golden data. Channel 1 of test data should map to
    899                         channel 0 of golden data. Channel 2 to 7 of test data
    900                         should be skipped.
    901     @param method: The method to compare data. Use 'correlation' to compare
    902                    general data. Use 'frequency' to compare data containing
    903                    sine wave.
    904 
    905     @param parameters: A dict containing parameters for method, if needed.
    906 
    907     @returns: A boolean for compare result.
    908 
    909     @raises: NotImplementedError if file type is not raw.
    910              NotImplementedError if sampling rates of two data are not the same.
    911     """
    912     if parameters is None:
    913         parameters = dict()
    914 
    915     if (golden_data_format['file_type'] != 'raw' or
    916         test_data_format['file_type'] != 'raw'):
    917         raise NotImplementedError('Only support raw data in compare_data.')
    918     if (golden_data_format['rate'] != test_data_format['rate']):
    919         raise NotImplementedError(
    920                 'Only support comparing data with the same sampling rate')
    921     golden_data = audio_data.AudioRawData(
    922             binary=golden_data_binary,
    923             channel=golden_data_format['channel'],
    924             sample_format=golden_data_format['sample_format'])
    925     test_data = audio_data.AudioRawData(
    926             binary=test_data_binary,
    927             channel=test_data_format['channel'],
    928             sample_format=test_data_format['sample_format'])
    929     compare_results = []
    930     for test_channel, golden_channel in enumerate(channel_map):
    931         if golden_channel is None:
    932             logging.info('Skipped channel %d', test_channel)
    933             continue
    934         test_data_one_channel = test_data.channel_data[test_channel]
    935         golden_data_one_channel = golden_data.channel_data[golden_channel]
    936         result_dict = dict(test_channel=test_channel,
    937                            golden_channel=golden_channel)
    938         result_dict.update(
    939                 compare_one_channel_data(
    940                         test_data_one_channel, test_data_format,
    941                         golden_data_one_channel, golden_data_format, method,
    942                         parameters))
    943         compare_results.append(result_dict)
    944     logging.info('compare_results: %r', compare_results)
    945     return_value = False if not compare_results else True
    946     for result in compare_results:
    947         if not result['equal']:
    948             logging.error(
    949                     'Failed on test channel %d and golden channel %d',
    950                     result['test_channel'], result['golden_channel'])
    951             return_value = False
    952     # Also checks best delay are exactly the same.
    953     if method == 'correlation':
    954         best_delays = set([result['best_delay'] for result in compare_results])
    955         if len(best_delays) > 1:
    956             logging.error('There are more than one best delay.')
    957             return_value = False
    958     return return_value
    959 
    960 
    961 class _base_rms_test(test.test):
    962     """Base class for all rms_test """
    963 
    964     def postprocess(self):
    965         super(_base_rms_test, self).postprocess()
    966 
    967         # Sum up the number of failed constraints in each iteration
    968         if sum(len(x) for x in self.failed_constraints):
    969             generate_rms_postmortem()
    970 
    971 
    972 class chrome_rms_test(_base_rms_test):
    973     """Base test class for audio RMS test with Chrome.
    974 
    975     The chrome instance can be accessed by self.chrome.
    976     """
    977     def warmup(self):
    978         skip_devices_to_test('x86-mario')
    979         super(chrome_rms_test, self).warmup()
    980 
    981         # Not all client of this file using telemetry.
    982         # Just do the import here for those who really need it.
    983         from autotest_lib.client.common_lib.cros import chrome
    984 
    985         self.chrome = chrome.Chrome()
    986 
    987         # The audio configuration could be changed when we
    988         # restart chrome.
    989         try:
    990             cras_rms_test_setup()
    991         except Exception:
    992             self.chrome.browser.Close()
    993             raise
    994 
    995 
    996     def cleanup(self, *args):
    997         try:
    998             self.chrome.browser.Close()
    999         finally:
   1000             super(chrome_rms_test, self).cleanup()
   1001 
   1002 class cras_rms_test(_base_rms_test):
   1003     """Base test class for CRAS audio RMS test."""
   1004 
   1005     def warmup(self):
   1006         skip_devices_to_test('x86-mario')
   1007         super(cras_rms_test, self).warmup()
   1008         cras_rms_test_setup()
   1009 
   1010 
   1011 def alsa_rms_test_setup():
   1012     """Setup for alsa_rms_test.
   1013 
   1014     Different boards/chipsets have different set of mixer controls.  Even
   1015     controls that have the same name on different boards might have different
   1016     capabilities.  The following is a general idea to setup a given class of
   1017     boards, and some specialized setup for certain boards.
   1018     """
   1019     card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic')
   1020     arch = utils.get_arch()
   1021     board = utils.get_board()
   1022     uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090')
   1023     if board in ['daisy_spring', 'daisy_skate']:
   1024         # The MIC controls of the boards do not support dB syntax.
   1025         alsa_utils.mixer_cmd(card_id,
   1026                              'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME)
   1027         alsa_utils.mixer_cmd(card_id, 'sset MIC1 ' + _DEFAULT_ALSA_MAX_VOLUME)
   1028         alsa_utils.mixer_cmd(card_id, 'sset MIC2 ' + _DEFAULT_ALSA_MAX_VOLUME)
   1029     elif arch in ['armv7l', 'aarch64'] or uses_max98090:
   1030         # ARM platforms or Intel platforms that uses max98090 codec driver.
   1031         alsa_utils.mixer_cmd(card_id,
   1032                              'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME)
   1033         alsa_utils.mixer_cmd(card_id, 'sset MIC1 ' + _DEFAULT_ALSA_CAPTURE_GAIN)
   1034         alsa_utils.mixer_cmd(card_id, 'sset MIC2 ' + _DEFAULT_ALSA_CAPTURE_GAIN)
   1035     else:
   1036         # The rest of Intel platforms.
   1037         alsa_utils.mixer_cmd(card_id, 'sset Master ' + _DEFAULT_ALSA_MAX_VOLUME)
   1038         alsa_utils.mixer_cmd(card_id,
   1039                              'sset Capture ' + _DEFAULT_ALSA_CAPTURE_GAIN)
   1040 
   1041 
   1042 class alsa_rms_test(_base_rms_test):
   1043     """Base test class for ALSA audio RMS test."""
   1044 
   1045     def warmup(self):
   1046         skip_devices_to_test('x86-mario')
   1047         super(alsa_rms_test, self).warmup()
   1048 
   1049         alsa_rms_test_setup()
   1050