1 #!/usr/bin/python 2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3 # Use of this source code is governed by a BSD-style license that can be 4 # found in the LICENSE file. 5 6 7 import logging 8 import numpy 9 import os 10 import re 11 import tempfile 12 import threading 13 import time 14 15 from glob import glob 16 from autotest_lib.client.bin import test, utils 17 from autotest_lib.client.bin.input.input_device import * 18 from autotest_lib.client.common_lib import error 19 from autotest_lib.client.cros.audio import alsa_utils 20 from autotest_lib.client.cros.audio import audio_data 21 from autotest_lib.client.cros.audio import cmd_utils 22 from autotest_lib.client.cros.audio import cras_utils 23 from autotest_lib.client.cros.audio import sox_utils 24 25 LD_LIBRARY_PATH = 'LD_LIBRARY_PATH' 26 27 _AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics' 28 29 _DEFAULT_NUM_CHANNELS = 2 30 _DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat' 31 _DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L' 32 _DEFAULT_PLAYBACK_VOLUME = 100 33 _DEFAULT_CAPTURE_GAIN = 2500 34 _DEFAULT_ALSA_MAX_VOLUME = '100%' 35 _DEFAULT_ALSA_CAPTURE_GAIN = '25dB' 36 37 # Minimum RMS value to pass when checking recorded file. 
38 _DEFAULT_SOX_RMS_THRESHOLD = 0.08 39 40 _JACK_VALUE_ON_RE = re.compile('.*values=on') 41 _HP_JACK_CONTROL_RE = re.compile('numid=(\d+).*Headphone\sJack') 42 _MIC_JACK_CONTROL_RE = re.compile('numid=(\d+).*Mic\sJack') 43 44 _SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)') 45 _SOX_ROUGH_FREQ_RE = re.compile('Rough\s+frequency:\s+(.+)') 46 47 _AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected' 48 _MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS' 49 _REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS' 50 51 # Tools from platform/audiotest 52 AUDIOFUNTEST_PATH = 'audiofuntest' 53 AUDIOLOOP_PATH = 'looptest' 54 LOOPBACK_LATENCY_PATH = 'loopback_latency' 55 SOX_PATH = 'sox' 56 TEST_TONES_PATH = 'test_tones' 57 58 _MINIMUM_NORM = 0.001 59 _CORRELATION_INDEX_THRESHOLD = 0.999 60 # The minimum difference of estimated frequencies between two sine waves. 61 _FREQUENCY_DIFF_THRESHOLD = 20 62 # The minimum RMS value of meaningful audio data. 63 _MEANINGFUL_RMS_THRESHOLD = 0.001 64 65 def set_mixer_controls(mixer_settings={}, card='0'): 66 """Sets all mixer controls listed in the mixer settings on card. 67 68 @param mixer_settings: Mixer settings to set. 69 @param card: Index of audio card to set mixer settings for. 70 """ 71 logging.info('Setting mixer control values on %s', card) 72 for item in mixer_settings: 73 logging.info('Setting %s to %s on card %s', 74 item['name'], item['value'], card) 75 cmd = 'amixer -c %s cset name=%s %s' 76 cmd = cmd % (card, item['name'], item['value']) 77 try: 78 utils.system(cmd) 79 except error.CmdError: 80 # A card is allowed not to support all the controls, so don't 81 # fail the test here if we get an error. 82 logging.info('amixer command failed: %s', cmd) 83 84 def set_volume_levels(volume, capture): 85 """Sets the volume and capture gain through cras_test_client. 86 87 @param volume: The playback volume to set. 88 @param capture: The capture gain to set. 
89 """ 90 logging.info('Setting volume level to %d', volume) 91 utils.system('/usr/bin/cras_test_client --volume %d' % volume) 92 logging.info('Setting capture gain to %d', capture) 93 utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture) 94 utils.system('/usr/bin/cras_test_client --dump_server_info') 95 utils.system('/usr/bin/cras_test_client --mute 0') 96 utils.system('amixer -c 0 contents') 97 98 def loopback_latency_check(**args): 99 """Checks loopback latency. 100 101 @param args: additional arguments for loopback_latency. 102 103 @return A tuple containing measured and reported latency in uS. 104 Return None if no audio detected. 105 """ 106 noise_threshold = str(args['n']) if args.has_key('n') else '400' 107 108 cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold) 109 110 output = utils.system_output(cmd, retain_output=True) 111 112 # Sleep for a short while to make sure device is not busy anymore 113 # after called loopback_latency. 114 time.sleep(.1) 115 116 measured_latency = None 117 reported_latency = None 118 for line in output.split('\n'): 119 match = re.search(_MEASURED_LATENCY_RE, line, re.I) 120 if match: 121 measured_latency = int(match.group(1)) 122 continue 123 match = re.search(_REPORTED_LATENCY_RE, line, re.I) 124 if match: 125 reported_latency = int(match.group(1)) 126 continue 127 if re.search(_AUDIO_NOT_FOUND_RE, line, re.I): 128 return None 129 if measured_latency and reported_latency: 130 return (measured_latency, reported_latency) 131 else: 132 # Should not reach here, just in case. 133 return None 134 135 def get_mixer_jack_status(jack_reg_exp): 136 """Gets the mixer jack status. 137 138 @param jack_reg_exp: The regular expression to match jack control name. 139 140 @return None if the control does not exist, return True if jack control 141 is detected plugged, return False otherwise. 
142 """ 143 output = utils.system_output('amixer -c0 controls', retain_output=True) 144 numid = None 145 for line in output.split('\n'): 146 m = jack_reg_exp.match(line) 147 if m: 148 numid = m.group(1) 149 break 150 151 # Proceed only when matched numid is not empty. 152 if numid: 153 output = utils.system_output('amixer -c0 cget numid=%s' % numid) 154 for line in output.split('\n'): 155 if _JACK_VALUE_ON_RE.match(line): 156 return True 157 return False 158 else: 159 return None 160 161 def get_hp_jack_status(): 162 """Gets the status of headphone jack.""" 163 status = get_mixer_jack_status(_HP_JACK_CONTROL_RE) 164 if status is not None: 165 return status 166 167 # When headphone jack is not found in amixer, lookup input devices 168 # instead. 169 # 170 # TODO(hychao): Check hp/mic jack status dynamically from evdev. And 171 # possibly replace the existing check using amixer. 172 for evdev in glob('/dev/input/event*'): 173 device = InputDevice(evdev) 174 if device.is_hp_jack(): 175 return device.get_headphone_insert() 176 else: 177 return None 178 179 def get_mic_jack_status(): 180 """Gets the status of mic jack.""" 181 status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE) 182 if status is not None: 183 return status 184 185 # When mic jack is not found in amixer, lookup input devices instead. 
186 for evdev in glob('/dev/input/event*'): 187 device = InputDevice(evdev) 188 if device.is_mic_jack(): 189 return device.get_microphone_insert() 190 else: 191 return None 192 193 def log_loopback_dongle_status(): 194 """Log the status of the loopback dongle to make sure it is equipped.""" 195 dongle_status_ok = True 196 197 # Check Mic Jack 198 mic_jack_status = get_mic_jack_status() 199 logging.info('Mic jack status: %s', mic_jack_status) 200 dongle_status_ok &= bool(mic_jack_status) 201 202 # Check Headphone Jack 203 hp_jack_status = get_hp_jack_status() 204 logging.info('Headphone jack status: %s', hp_jack_status) 205 dongle_status_ok &= bool(hp_jack_status) 206 207 # Use latency check to test if audio can be captured through dongle. 208 # We only want to know the basic function of dongle, so no need to 209 # assert the latency accuracy here. 210 latency = loopback_latency_check(n=4000) 211 if latency: 212 logging.info('Got latency measured %d, reported %d', 213 latency[0], latency[1]) 214 else: 215 logging.info('Latency check fail.') 216 dongle_status_ok = False 217 218 logging.info('audio loopback dongle test: %s', 219 'PASS' if dongle_status_ok else 'FAIL') 220 221 # Functions to test audio palyback. 222 def play_sound(duration_seconds=None, audio_file_path=None): 223 """Plays a sound file found at |audio_file_path| for |duration_seconds|. 224 225 If |audio_file_path|=None, plays a default audio file. 226 If |duration_seconds|=None, plays audio file in its entirety. 227 228 @param duration_seconds: Duration to play sound. 229 @param audio_file_path: Path to the audio file. 
230 """ 231 if not audio_file_path: 232 audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav' 233 duration_arg = ('-d %d' % duration_seconds) if duration_seconds else '' 234 utils.system('aplay %s %s' % (duration_arg, audio_file_path)) 235 236 def get_play_sine_args(channel, odev='default', freq=1000, duration=10, 237 sample_size=16): 238 """Gets the command args to generate a sine wav to play to odev. 239 240 @param channel: 0 for left, 1 for right; otherwize, mono. 241 @param odev: alsa output device. 242 @param freq: frequency of the generated sine tone. 243 @param duration: duration of the generated sine tone. 244 @param sample_size: output audio sample size. Default to 16. 245 """ 246 cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa', 247 odev, 'synth', str(duration)] 248 if channel == 0: 249 cmdargs += ['sine', str(freq), 'sine', '0'] 250 elif channel == 1: 251 cmdargs += ['sine', '0', 'sine', str(freq)] 252 else: 253 cmdargs += ['sine', str(freq)] 254 255 return cmdargs 256 257 def play_sine(channel, odev='default', freq=1000, duration=10, 258 sample_size=16): 259 """Generates a sine wave and plays to odev. 260 261 @param channel: 0 for left, 1 for right; otherwize, mono. 262 @param odev: alsa output device. 263 @param freq: frequency of the generated sine tone. 264 @param duration: duration of the generated sine tone. 265 @param sample_size: output audio sample size. Default to 16. 266 """ 267 cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size) 268 utils.system(' '.join(cmdargs)) 269 270 # Functions to compose customized sox command, execute it and process the 271 # output of sox command. 272 def get_sox_mixer_cmd(infile, channel, 273 num_channels=_DEFAULT_NUM_CHANNELS, 274 sox_format=_DEFAULT_SOX_FORMAT): 275 """Gets sox mixer command to reduce channel. 276 277 @param infile: Input file name. 278 @param channel: The selected channel to take effect. 279 @param num_channels: The number of total channels to test. 
280 @param sox_format: Format to generate sox command. 281 """ 282 # Build up a pan value string for the sox command. 283 if channel == 0: 284 pan_values = '1' 285 else: 286 pan_values = '0' 287 for pan_index in range(1, num_channels): 288 if channel == pan_index: 289 pan_values = '%s%s' % (pan_values, ',1') 290 else: 291 pan_values = '%s%s' % (pan_values, ',0') 292 293 return '%s -c 2 %s %s -c 1 %s - mixer %s' % (SOX_PATH, 294 sox_format, infile, sox_format, pan_values) 295 296 def sox_stat_output(infile, channel, 297 num_channels=_DEFAULT_NUM_CHANNELS, 298 sox_format=_DEFAULT_SOX_FORMAT): 299 """Executes sox stat command. 300 301 @param infile: Input file name. 302 @param channel: The selected channel. 303 @param num_channels: The number of total channels to test. 304 @param sox_format: Format to generate sox command. 305 306 @return The output of sox stat command 307 """ 308 sox_mixer_cmd = get_sox_mixer_cmd(infile, channel, 309 num_channels, sox_format) 310 stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format) 311 sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd) 312 return utils.system_output(sox_cmd, retain_output=True) 313 314 def get_audio_rms(sox_output): 315 """Gets the audio RMS value from sox stat output 316 317 @param sox_output: Output of sox stat command. 318 319 @return The RMS value parsed from sox stat output. 320 """ 321 for rms_line in sox_output.split('\n'): 322 m = _SOX_RMS_AMPLITUDE_RE.match(rms_line) 323 if m is not None: 324 return float(m.group(1)) 325 326 def get_rough_freq(sox_output): 327 """Gets the rough audio frequency from sox stat output 328 329 @param sox_output: Output of sox stat command. 330 331 @return The rough frequency value parsed from sox stat output. 
332 """ 333 for rms_line in sox_output.split('\n'): 334 m = _SOX_ROUGH_FREQ_RE.match(rms_line) 335 if m is not None: 336 return int(m.group(1)) 337 338 def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD): 339 """Checks if the calculated RMS value is expected. 340 341 @param sox_output: The output from sox stat command. 342 @param sox_threshold: The threshold to test RMS value against. 343 344 @raises error.TestError if RMS amplitude can't be parsed. 345 @raises error.TestFail if the RMS amplitude of the recording isn't above 346 the threshold. 347 """ 348 rms_val = get_audio_rms(sox_output) 349 350 # In case we don't get a valid RMS value. 351 if rms_val is None: 352 raise error.TestError( 353 'Failed to generate an audio RMS value from playback.') 354 355 logging.info('Got audio RMS value of %f. Minimum pass is %f.', 356 rms_val, sox_threshold) 357 if rms_val < sox_threshold: 358 raise error.TestFail( 359 'Audio RMS value %f too low. Minimum pass is %f.' % 360 (rms_val, sox_threshold)) 361 362 def noise_reduce_file(in_file, noise_file, out_file, 363 sox_format=_DEFAULT_SOX_FORMAT): 364 """Runs the sox command to reduce noise. 365 366 Runs the sox command to noise-reduce in_file using the noise 367 profile from noise_file. 368 369 @param in_file: The file to noise reduce. 370 @param noise_file: The file containing the noise profile. 371 This can be created by recording silence. 372 @param out_file: The file contains the noise reduced sound. 373 @param sox_format: The sox format to generate sox command. 374 """ 375 prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH, 376 sox_format, noise_file) 377 reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' % 378 (SOX_PATH, sox_format, in_file, sox_format, out_file)) 379 utils.system('%s | %s' % (prof_cmd, reduce_cmd)) 380 381 def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND): 382 """Records a sample from the default input device. 383 384 @param tmpfile: The file to record to. 
385 @param record_command: The command to record audio. 386 """ 387 utils.system('%s %s' % (record_command, tmpfile)) 388 389 def create_wav_file(wav_dir, prefix=""): 390 """Creates a unique name for wav file. 391 392 The created file name will be preserved in autotest result directory 393 for future analysis. 394 395 @param wav_dir: The directory of created wav file. 396 @param prefix: specified file name prefix. 397 """ 398 filename = "%s-%s.wav" % (prefix, time.time()) 399 return os.path.join(wav_dir, filename) 400 401 def run_in_parallel(*funs): 402 """Runs methods in parallel. 403 404 @param funs: methods to run. 405 """ 406 threads = [] 407 for f in funs: 408 t = threading.Thread(target=f) 409 t.start() 410 threads.append(t) 411 412 for t in threads: 413 t.join() 414 415 def loopback_test_channels(noise_file_name, wav_dir, 416 playback_callback=None, 417 check_recorded_callback=check_audio_rms, 418 preserve_test_file=True, 419 num_channels = _DEFAULT_NUM_CHANNELS, 420 record_callback=record_sample, 421 mix_callback=None): 422 """Tests loopback on all channels. 423 424 @param noise_file_name: Name of the file contains pre-recorded noise. 425 @param wav_dir: The directory of created wav file. 426 @param playback_callback: The callback to do the playback for 427 one channel. 428 @param record_callback: The callback to do the recording. 429 @param check_recorded_callback: The callback to check recorded file. 430 @param preserve_test_file: Retain the recorded files for future debugging. 431 @param num_channels: The number of total channels to test. 432 @param mix_callback: The callback to do on the one-channel file. 
433 """ 434 for channel in xrange(num_channels): 435 record_file_name = create_wav_file(wav_dir, 436 "record-%d" % channel) 437 functions = [lambda: record_callback(record_file_name)] 438 439 if playback_callback: 440 functions.append(lambda: playback_callback(channel)) 441 442 if mix_callback: 443 mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel) 444 functions.append(lambda: mix_callback(mix_file_name)) 445 446 run_in_parallel(*functions) 447 448 if mix_callback: 449 sox_output_mix = sox_stat_output(mix_file_name, channel) 450 rms_val_mix = get_audio_rms(sox_output_mix) 451 logging.info('Got mixed audio RMS value of %f.', rms_val_mix) 452 453 sox_output_record = sox_stat_output(record_file_name, channel) 454 rms_val_record = get_audio_rms(sox_output_record) 455 logging.info('Got recorded audio RMS value of %f.', rms_val_record) 456 457 reduced_file_name = create_wav_file(wav_dir, 458 "reduced-%d" % channel) 459 noise_reduce_file(record_file_name, noise_file_name, 460 reduced_file_name) 461 462 sox_output_reduced = sox_stat_output(reduced_file_name, channel) 463 464 if not preserve_test_file: 465 os.unlink(reduced_file_name) 466 os.unlink(record_file_name) 467 if mix_callback: 468 os.unlink(mix_file_name) 469 470 check_recorded_callback(sox_output_reduced) 471 472 473 def get_channel_sox_stat( 474 input_audio, channel_index, channels=2, bits=16, rate=48000): 475 """Gets the sox stat info of the selected channel in the input audio file. 476 477 @param input_audio: The input audio file to be analyzed. 478 @param channel_index: The index of the channel to be analyzed. 479 (1 for the first channel). 480 @param channels: The number of channels in the input audio. 481 @param bits: The number of bits of each audio sample. 482 @param rate: The sampling rate. 
483 """ 484 if channel_index <= 0 or channel_index > channels: 485 raise ValueError('incorrect channel_indexi: %d' % channel_index) 486 487 if channels == 1: 488 return sox_utils.get_stat( 489 input_audio, channels=channels, bits=bits, rate=rate) 490 491 p1 = cmd_utils.popen( 492 sox_utils.extract_channel_cmd( 493 input_audio, '-', channel_index, 494 channels=channels, bits=bits, rate=rate), 495 stdout=cmd_utils.PIPE) 496 p2 = cmd_utils.popen( 497 sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate), 498 stdin=p1.stdout, stderr=cmd_utils.PIPE) 499 stat_output = p2.stderr.read() 500 cmd_utils.wait_and_check_returncode(p1, p2) 501 return sox_utils.parse_stat_output(stat_output) 502 503 504 def get_rms(input_audio, channels=1, bits=16, rate=48000): 505 """Gets the RMS values of all channels of the input audio. 506 507 @param input_audio: The input audio file to be checked. 508 @param channels: The number of channels in the input audio. 509 @param bits: The number of bits of each audio sample. 510 @param rate: The sampling rate. 511 """ 512 stats = [get_channel_sox_stat( 513 input_audio, i + 1, channels=channels, bits=bits, 514 rate=rate) for i in xrange(channels)] 515 516 logging.info('sox stat: %s', [str(s) for s in stats]) 517 return [s.rms for s in stats] 518 519 520 def reduce_noise_and_get_rms( 521 input_audio, noise_file, channels=1, bits=16, rate=48000): 522 """Reduces noise in the input audio by the given noise file and then gets 523 the RMS values of all channels of the input audio. 524 525 @param input_audio: The input audio file to be analyzed. 526 @param noise_file: The noise file used to reduce noise in the input audio. 527 @param channels: The number of channels in the input audio. 528 @param bits: The number of bits of each audio sample. 529 @param rate: The sampling rate. 
530 """ 531 with tempfile.NamedTemporaryFile() as reduced_file: 532 p1 = cmd_utils.popen( 533 sox_utils.noise_profile_cmd( 534 noise_file, '-', channels=channels, bits=bits, 535 rate=rate), 536 stdout=cmd_utils.PIPE) 537 p2 = cmd_utils.popen( 538 sox_utils.noise_reduce_cmd( 539 input_audio, reduced_file.name, '-', 540 channels=channels, bits=bits, rate=rate), 541 stdin=p1.stdout) 542 cmd_utils.wait_and_check_returncode(p1, p2) 543 return get_rms(reduced_file.name, channels, bits, rate) 544 545 546 def skip_devices_to_test(*boards): 547 """Devices to skip due to hardware or test compatibility issues. 548 549 @param boards: the boards to skip testing. 550 """ 551 # TODO(scottz): Remove this when crbug.com/220147 is fixed. 552 dut_board = utils.get_current_board() 553 if dut_board in boards: 554 raise error.TestNAError('This test is not available on %s' % dut_board) 555 556 557 def cras_rms_test_setup(): 558 """Setups for the cras_rms_tests. 559 560 To make sure the line_out-to-mic_in path is all green. 561 """ 562 # TODO(owenlin): Now, the nodes are choosed by chrome. 563 # We should do it here. 564 cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME) 565 cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME) 566 567 cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN) 568 569 cras_utils.set_system_mute(False) 570 cras_utils.set_capture_mute(False) 571 572 573 def generate_rms_postmortem(): 574 """Generates postmortem for rms tests.""" 575 try: 576 logging.info('audio postmortem report') 577 log_loopback_dongle_status() 578 logging.info(get_audio_diagnostics()) 579 except Exception: 580 logging.exception('Error while generating postmortem report') 581 582 583 def get_audio_diagnostics(): 584 """Gets audio diagnostic results. 585 586 @returns: a string containing diagnostic results. 
587 588 """ 589 return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=cmd_utils.PIPE) 590 591 592 def get_max_cross_correlation(signal_a, signal_b): 593 """Gets max cross-correlation and best time delay of two signals. 594 595 Computes cross-correlation function between two 596 signals and gets the maximum value and time delay. 597 The steps includes: 598 1. Compute cross-correlation function of X and Y and get Cxy. 599 The correlation function Cxy is an array where Cxy[k] is the 600 cross product of X and Y when Y is delayed by k. 601 Refer to manual of numpy.correlate for detail of correlation. 602 2. Find the maximum value C_max and index C_index in Cxy. 603 3. Compute L2 norm of X and Y to get norm(X) and norm(Y). 604 4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation. 605 606 Max cross-correlation indicates the similarity of X and Y. The value 607 is 1 if X equals Y multiplied by a positive scalar. 608 The value is -1 if X equals Y multiplied by a negative scaler. 609 Any constant level shift will be regarded as distortion and will make 610 max cross-correlation value deviated from 1. 611 C_index is the best time delay of Y that make Y looks similar to X. 612 Refer to http://en.wikipedia.org/wiki/Cross-correlation. 613 614 @param signal_a: A list of numbers which contains the first signal. 615 @param signal_b: A list of numbers which contains the second signal. 616 617 @raises: ValueError if any number in signal_a or signal_b is not a float. 618 ValueError if norm of any array is less than _MINIMUM_NORM. 619 620 @returns: A tuple (correlation index, best delay). If there are more than 621 one best delay, just return the first one. 622 """ 623 def check_list_contains_float(numbers): 624 """Checks the elements in a list are all float. 625 626 @param numbers: A list of numbers. 627 628 @raises: ValueError if there is any element which is not a float 629 in the list. 
630 """ 631 if any(not isinstance(x, float) for x in numbers): 632 raise ValueError('List contains number which is not a float') 633 634 check_list_contains_float(signal_a) 635 check_list_contains_float(signal_b) 636 637 norm_a = numpy.linalg.norm(signal_a) 638 norm_b = numpy.linalg.norm(signal_b) 639 logging.debug('norm_a: %f', norm_a) 640 logging.debug('norm_b: %f', norm_b) 641 if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM: 642 raise ValueError('No meaningful data as norm is too small.') 643 644 correlation = numpy.correlate(signal_a, signal_b, 'full') 645 max_correlation = max(correlation) 646 best_delays = [i for i, j in enumerate(correlation) if j == max_correlation] 647 if len(best_delays) > 1: 648 logging.warning('There are more than one best delay: %r', best_delays) 649 return max_correlation / (norm_a * norm_b), best_delays[0] 650 651 652 def trim_data(data, threshold=0): 653 """Trims a data by removing value that is too small in head and tail. 654 655 Removes elements in head and tail whose absolute value is smaller than 656 or equal to threshold. 657 E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) = 658 ([0.2, 0.3, 0.2], 2) 659 660 @param data: A list of numbers. 661 @param threshold: The threshold to compare against. 662 663 @returns: A tuple (trimmed_data, end_trimmed_length), where 664 end_trimmed_length is the length of original data being trimmed 665 from the end. 666 Returns ([], None) if there is no valid data. 
667 """ 668 indice_valid = [ 669 i for i, j in enumerate(data) if abs(j) > threshold] 670 if not indice_valid: 671 logging.warning( 672 'There is no element with absolute value greater ' 673 'than threshold %f', threshold) 674 return [], None 675 logging.debug('Start and end of indice_valid: %d, %d', 676 indice_valid[0], indice_valid[-1]) 677 end_trimmed_length = len(data) - indice_valid[-1] - 1 678 logging.debug('Trimmed length in the end: %d', end_trimmed_length) 679 return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length) 680 681 682 def get_one_channel_correlation(test_data, golden_data): 683 """Gets max cross-correlation of test_data and golden_data. 684 685 Trims test data and compute the max cross-correlation against golden_data. 686 Signal can be trimmed because those zero values in the head and tail of 687 a signal will not affect correlation computation. 688 689 @param test_data: A list containing the data to compare against golden data. 690 @param golden_data: A list containing the golden data. 691 692 @returns: A tuple (max cross-correlation, best_delay) if data is valid. 693 Otherwise returns (None, None). Refer to docstring of 694 get_max_cross_correlation. 695 """ 696 trimmed_test_data, end_trimmed_length = trim_data(test_data) 697 698 def to_float(samples): 699 """Casts elements in the list to float. 700 701 @param samples: A list of numbers. 702 703 @returns: A list of original numbers casted to float. 704 """ 705 samples_float = [float(x) for x in samples] 706 return samples_float 707 708 max_cross_correlation, best_delay = get_max_cross_correlation( 709 to_float(golden_data), 710 to_float(trimmed_test_data)) 711 712 # The reason to add back the trimmed length in the end. 713 # E.g.: 714 # golden data: 715 # 716 # |-----------vvvv----------------| vvvv is the signal of interest. 717 # a b 718 # 719 # test data: 720 # 721 # |---x----vvvv--------x----------------| x is the place to trim. 
722 # c d e f 723 # 724 # trimmed test data: 725 # 726 # |----vvvv--------| 727 # d e 728 # 729 # The first output of cross correlation computation : 730 # 731 # |-----------vvvv----------------| 732 # a b 733 # 734 # |----vvvv--------| 735 # d e 736 # 737 # The largest output of cross correlation computation happens at 738 # delay a + e. 739 # 740 # |-----------vvvv----------------| 741 # a b 742 # 743 # |----vvvv--------| 744 # d e 745 # 746 # Cross correlation starts computing by aligning the last sample 747 # of the trimmed test data to the first sample of golden data. 748 # The best delay calculated from trimmed test data and golden data 749 # cross correlation is e + a. But the real best delay that should be 750 # identical on two channel should be e + a + f. 751 # So we need to add back the length being trimmed in the end. 752 753 if max_cross_correlation: 754 return max_cross_correlation, best_delay + end_trimmed_length 755 else: 756 return None, None 757 758 759 def compare_one_channel_correlation(test_data, golden_data, parameters): 760 """Compares two one-channel data by correlation. 761 762 @param test_data: A list containing the data to compare against golden data. 763 @param golden_data: A list containing the golden data. 764 @param parameters: A dict containing parameters for method. 765 766 @returns: A dict containing: 767 index: The index of similarity where 1 means they are different 768 only by a positive scale. 769 best_delay: The best delay of test data in relative to golden 770 data. 771 equal: A bool containing comparing result. 
772 """ 773 if 'correlation_threshold' in parameters: 774 threshold = parameters['correlation_threshold'] 775 else: 776 threshold = _CORRELATION_INDEX_THRESHOLD 777 778 result_dict = dict() 779 max_cross_correlation, best_delay = get_one_channel_correlation( 780 test_data, golden_data) 781 result_dict['index'] = max_cross_correlation 782 result_dict['best_delay'] = best_delay 783 result_dict['equal'] = True if ( 784 max_cross_correlation and 785 max_cross_correlation > threshold) else False 786 logging.debug('result_dict: %r', result_dict) 787 return result_dict 788 789 790 def get_one_channel_stat(data, data_format): 791 """Gets statistic information of data. 792 793 @param data: A list containing one channel data. 794 @param data_format: A dict containing data format of data. 795 796 @return: The sox stat parsed result. An object containing 797 sameple_count: An int. Samples read. 798 length: A float. Length in seconds. 799 rms: A float. RMS amplitude. 800 rough_frequency: A float. Rough frequency. 801 """ 802 if not data: 803 raise ValueError('Data is empty. Can not get stat') 804 raw_data = audio_data.AudioRawData( 805 binary=None, channel=1, 806 sample_format=data_format['sample_format']) 807 raw_data.copy_channel_data([data]) 808 with tempfile.NamedTemporaryFile() as raw_data_file: 809 raw_data_path = raw_data_file.name 810 raw_data.write_to_file(raw_data_path) 811 812 bits = 8 * (audio_data.SAMPLE_FORMATS[ 813 data_format['sample_format']]['size_bytes']) 814 stat = sox_utils.get_stat(raw_data_path, channels=1, bits=bits, 815 rate=data_format['rate']) 816 return stat 817 818 819 def compare_one_channel_frequency(test_data, test_data_format, 820 golden_data, golden_data_format): 821 """Compares two one-channel data by frequency. 822 823 @param test_data: A list containing the data to compare against golden data. 824 @param test_data_format: A dict containing data format of test data. 825 @param golden_data: A list containing the golden data. 
def compare_one_channel_data(test_data, test_data_format,
                             golden_data, golden_data_format, method,
                             parameters):
    """Compares two one-channel data with the requested method.

    This is a thin dispatcher: 'correlation' is handled by
    compare_one_channel_correlation and 'frequency' is handled by
    compare_one_channel_frequency.

    @param test_data: A list containing the data to compare against golden
                      data.
    @param test_data_format: The data format of test data. Only forwarded
                             to the 'frequency' method.
    @param golden_data: A list containing the golden data.
    @param golden_data_format: The data format of golden data. Only
                               forwarded to the 'frequency' method.
    @param method: The comparing method, either 'correlation' or
                   'frequency'.
    @param parameters: A dict containing parameters for method. Only
                       forwarded to the 'correlation' method.

    @returns: The dict produced by the selected method. It always contains:
                equal: A bool containing comparing result.
              For 'correlation' it also contains:
                index: The index of similarity where 1 means they are
                       different only by a positive scale.
                best_delay: The best delay of test data in relative to
                            golden data.
              For 'frequency' it also contains:
                test_data_frequency: test data frequency.
                golden_data_frequency: golden data frequency.

    @raises: NotImplementedError if method is not supported.
             ValueError from the 'frequency' method if the test data RMS is
             too small to be meaningful.
    """
    if method == 'correlation':
        return compare_one_channel_correlation(test_data, golden_data,
                                               parameters)
    if method == 'frequency':
        return compare_one_channel_frequency(
                test_data, test_data_format, golden_data, golden_data_format)
    raise NotImplementedError('method %s is not implemented' % method)


def compare_data(golden_data_binary, golden_data_format,
                 test_data_binary, test_data_format,
                 channel_map, method, parameters=None):
    """Compares two raw data channel by channel.

    @param golden_data_binary: The binary containing golden data.
    @param golden_data_format: The data format of golden data.
    @param test_data_binary: The binary containing test data.
    @param test_data_format: The data format of test data.
    @param channel_map: A list containing channel mapping.
                        E.g. [1, 0, None, None, None, None, None, None] means
                        channel 0 of test data should map to channel 1 of
                        golden data. Channel 1 of test data should map to
                        channel 0 of golden data. Channel 2 to 7 of test data
                        should be skipped.
    @param method: The method to compare data. Use 'correlation' to compare
                   general data. Use 'frequency' to compare data containing
                   sine wave.
    @param parameters: A dict containing parameters for method, if needed.
                       An empty dict is used when it is None.

    @returns: A boolean for compare result. True only if at least one
              channel was compared, every compared channel is reported
              equal, and, for 'correlation', all compared channels share
              one best delay.

    @raises: NotImplementedError if file type is not raw.
             NotImplementedError if sampling rates of two data are not the
             same.
    """
    if parameters is None:
        parameters = dict()

    if (golden_data_format['file_type'] != 'raw' or
        test_data_format['file_type'] != 'raw'):
        raise NotImplementedError('Only support raw data in compare_data.')
    if golden_data_format['rate'] != test_data_format['rate']:
        raise NotImplementedError(
                'Only support comparing data with the same sampling rate')

    golden_data = audio_data.AudioRawData(
            binary=golden_data_binary,
            channel=golden_data_format['channel'],
            sample_format=golden_data_format['sample_format'])
    test_data = audio_data.AudioRawData(
            binary=test_data_binary,
            channel=test_data_format['channel'],
            sample_format=test_data_format['sample_format'])

    compare_results = []
    for test_channel, golden_channel in enumerate(channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue
        result_dict = dict(test_channel=test_channel,
                           golden_channel=golden_channel)
        result_dict.update(
                compare_one_channel_data(
                        test_data.channel_data[test_channel],
                        test_data_format,
                        golden_data.channel_data[golden_channel],
                        golden_data_format, method, parameters))
        compare_results.append(result_dict)
    logging.info('compare_results: %r', compare_results)

    # Nothing compared (every channel skipped) is treated as a failure.
    return_value = bool(compare_results)
    # Keep scanning after the first mismatch so every failing channel pair
    # gets logged.
    for result in compare_results:
        if not result['equal']:
            logging.error(
                    'Failed on test channel %d and golden channel %d',
                    result['test_channel'], result['golden_channel'])
            return_value = False

    # Also checks best delay are exactly the same.
    if method == 'correlation':
        best_delays = set(result['best_delay'] for result in compare_results)
        if len(best_delays) > 1:
            logging.error('There are more than one best delay.')
            return_value = False
    return return_value


class _base_rms_test(test.test):
    """Base class for all rms_test.

    Generates an RMS postmortem after postprocessing when any iteration
    recorded a failed constraint.
    """

    def postprocess(self):
        """Runs the parent postprocess, then reports on failed constraints."""
        super(_base_rms_test, self).postprocess()

        # failed_constraints holds one collection per iteration; a single
        # non-empty entry is enough to trigger the postmortem.
        if any(len(x) for x in self.failed_constraints):
            generate_rms_postmortem()


class chrome_rms_test(_base_rms_test):
    """Base test class for audio RMS test with Chrome.

    The chrome instance can be accessed by self.chrome.
    """

    def warmup(self):
        """Starts Chrome and then applies the CRAS RMS test setup.

        The browser is closed again before re-raising if the setup fails.
        """
        skip_devices_to_test('x86-mario')
        super(chrome_rms_test, self).warmup()

        # Not all clients of this file use telemetry.
        # Just do the import here for those who really need it.
        from autotest_lib.client.common_lib.cros import chrome

        self.chrome = chrome.Chrome()

        # The audio configuration could be changed when we
        # restart chrome.
        try:
            cras_rms_test_setup()
        except Exception:
            self.chrome.browser.Close()
            raise

    def cleanup(self, *args):
        """Closes the browser, if one was started, then runs parent cleanup.

        @param args: Ignored; accepted so callers may pass positional
                     arguments.
        """
        try:
            # warmup() can raise before self.chrome is assigned; if cleanup
            # is still invoked in that case there is no browser to close.
            chrome_instance = getattr(self, 'chrome', None)
            if chrome_instance is not None:
                chrome_instance.browser.Close()
        finally:
            super(chrome_rms_test, self).cleanup()


class cras_rms_test(_base_rms_test):
    """Base test class for CRAS audio RMS test."""

    def warmup(self):
        """Runs the parent warmup and then the CRAS RMS test setup."""
        skip_devices_to_test('x86-mario')
        super(cras_rms_test, self).warmup()
        cras_rms_test_setup()


def alsa_rms_test_setup():
    """Setup for alsa_rms_test.

    Different boards/chipsets have different set of mixer controls. Even
    controls that have the same name on different boards might have different
    capabilities. The following is a general idea to setup a given class of
    boards, and some specialized setup for certain boards.

    The sound card is the one returned by
    alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic'); every
    setting is applied to it through alsa_utils.mixer_cmd.
    """
    card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic')
    arch = utils.get_arch()
    board = utils.get_board()
    uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090')

    if board in ['daisy_spring', 'daisy_skate']:
        # The MIC controls of the boards do not support dB syntax.
        mixer_commands = [
                'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME,
                'sset MIC1 ' + _DEFAULT_ALSA_MAX_VOLUME,
                'sset MIC2 ' + _DEFAULT_ALSA_MAX_VOLUME,
        ]
    elif arch in ['armv7l', 'aarch64'] or uses_max98090:
        # ARM platforms or Intel platforms that uses max98090 codec driver.
        mixer_commands = [
                'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME,
                'sset MIC1 ' + _DEFAULT_ALSA_CAPTURE_GAIN,
                'sset MIC2 ' + _DEFAULT_ALSA_CAPTURE_GAIN,
        ]
    else:
        # The rest of Intel platforms.
        mixer_commands = [
                'sset Master ' + _DEFAULT_ALSA_MAX_VOLUME,
                'sset Capture ' + _DEFAULT_ALSA_CAPTURE_GAIN,
        ]

    # Apply the settings in the listed order.
    for mixer_command in mixer_commands:
        alsa_utils.mixer_cmd(card_id, mixer_command)


class alsa_rms_test(_base_rms_test):
    """Base test class for ALSA audio RMS test."""

    def warmup(self):
        """Runs the parent warmup and then the ALSA RMS test setup."""
        skip_devices_to_test('x86-mario')
        super(alsa_rms_test, self).warmup()

        alsa_rms_test_setup()