Home | History | Annotate | Download | only in chameleon
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """This module provides the test utilities for audio tests using chameleon."""
      6 
      7 # TODO (cychiang) Move test utilities from chameleon_audio_helpers
      8 # to this module.
      9 
     10 import logging
     11 import multiprocessing
     12 import os
     13 import time
     14 from contextlib import contextmanager
     15 
     16 from autotest_lib.client.common_lib import error
     17 from autotest_lib.client.cros import constants
     18 from autotest_lib.client.cros.audio import audio_analysis
     19 from autotest_lib.client.cros.audio import audio_data
     20 
     21 def check_audio_nodes(audio_facade, audio_nodes):
     22     """Checks the node selected by Cros device is correct.
     23 
     24     @param audio_facade: A RemoteAudioFacade to access audio functions on
     25                          Cros device.
     26 
     27     @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
     28                         expected selected output and input nodes.
     29 
     30     @raises: error.TestFail if the nodes selected by Cros device are not expected.
     31 
     32     """
     33     curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types()
     34     out_audio_nodes, in_audio_nodes = audio_nodes
     35     if (in_audio_nodes != None and
     36         sorted(curr_in_nodes) != sorted(in_audio_nodes)):
     37         raise error.TestFail('Wrong input node(s) selected %s '
     38                 'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
     39     if (out_audio_nodes != None and
     40         sorted(curr_out_nodes) != sorted(out_audio_nodes)):
     41         raise error.TestFail('Wrong output node(s) selected %s '
     42                 'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
     43 
     44 
     45 def check_plugged_nodes(audio_facade, audio_nodes):
     46     """Checks the nodes that are currently plugged on Cros device are correct.
     47 
     48     @param audio_facade: A RemoteAudioFacade to access audio functions on
     49                          Cros device.
     50 
     51     @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
     52                         expected plugged output and input nodes.
     53 
     54     @raises: error.TestFail if the plugged nodes on Cros device are not expected.
     55 
     56     """
     57     curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
     58     out_audio_nodes, in_audio_nodes = audio_nodes
     59     if (in_audio_nodes != None and
     60         sorted(curr_in_nodes) != sorted(in_audio_nodes)):
     61         raise error.TestFail('Wrong input node(s) plugged %s '
     62                 'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
     63     if (out_audio_nodes != None and
     64         sorted(curr_out_nodes) != sorted(out_audio_nodes)):
     65         raise error.TestFail('Wrong output node(s) plugged %s '
     66                 'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
     67 
     68 
     69 def bluetooth_nodes_plugged(audio_facade):
     70     """Checks bluetooth nodes are plugged.
     71 
     72     @param audio_facade: A RemoteAudioFacade to access audio functions on
     73                          Cros device.
     74 
     75     @raises: error.TestFail if either input or output bluetooth node is
     76              not plugged.
     77 
     78     """
     79     curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
     80     return 'BLUETOOTH' in curr_out_nodes and 'BLUETOOTH' in curr_in_nodes
     81 
     82 
     83 def _get_board_name(host):
     84     """Gets the board name.
     85 
     86     @param host: The CrosHost object.
     87 
     88     @returns: The board name.
     89 
     90     """
     91     return host.get_board().split(':')[1]
     92 
     93 
     94 def correction_plug_unplug_for_audio(host, port):
     95     """Plugs/unplugs several times for Cros device to detect audio.
     96 
     97     For issue crbug.com/450101, Exynos HDMI driver has problem recognizing
     98     HDMI audio, while display can be detected. Do several plug/unplug and wait
     99     as a workaround. Note that port will be in unplugged state in the end if
    100     extra plug/unplug is needed.
    101 
    102     @param host: A CrosHost object.
    103     @param port: A ChameleonVideoInput object.
    104 
    105     """
    106     board = _get_board_name(host)
    107     if board in ['peach_pit', 'peach_pi', 'daisy', 'daisy_spring',
    108                  'daisy_skate']:
    109         logging.info('Need extra plug/unplug on board %s', board)
    110         for _ in xrange(3):
    111             port.plug()
    112             time.sleep(3)
    113             port.unplug()
    114             time.sleep(3)
    115 
    116 
    117 def has_internal_speaker(host):
    118     """Checks if the Cros device has speaker.
    119 
    120     @param host: The CrosHost object.
    121 
    122     @returns: True if Cros device has internal speaker. False otherwise.
    123 
    124     """
    125     board_name = _get_board_name(host)
    126     if host.get_board_type() == 'CHROMEBOX' and board_name != 'stumpy':
    127         logging.info('Board %s does not have speaker.', board_name)
    128         return False
    129     return True
    130 
    131 
    132 def has_internal_microphone(host):
    133     """Checks if the Cros device has internal microphone.
    134 
    135     @param host: The CrosHost object.
    136 
    137     @returns: True if Cros device has internal microphone. False otherwise.
    138 
    139     """
    140     board_name = _get_board_name(host)
    141     if host.get_board_type() == 'CHROMEBOX':
    142         logging.info('Board %s does not have internal microphone.', board_name)
    143         return False
    144     return True
    145 
    146 
    147 def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50):
    148     """Performs the suspend/resume on Cros device.
    149 
    150     @param suspend_time_secs: Time in seconds to let Cros device suspend.
    151     @resume_network_timeout_secs: Time in seconds to let Cros device resume and
    152                                   obtain network.
    153     """
    154     def action_suspend():
    155         """Calls the host method suspend."""
    156         host.suspend(suspend_time=suspend_time_secs)
    157 
    158     boot_id = host.get_boot_id()
    159     proc = multiprocessing.Process(target=action_suspend)
    160     logging.info("Suspending...")
    161     proc.daemon = True
    162     proc.start()
    163     host.test_wait_for_sleep(suspend_time_secs / 3)
    164     logging.info("DUT suspended! Waiting to resume...")
    165     host.test_wait_for_resume(
    166             boot_id, suspend_time_secs + resume_network_timeout_secs)
    167     logging.info("DUT resumed!")
    168 
    169 
    170 def dump_cros_audio_logs(host, audio_facade, directory, suffix=''):
    171     """Dumps logs for audio debugging from Cros device.
    172 
    173     @param host: The CrosHost object.
    174     @param audio_facade: A RemoteAudioFacade to access audio functions on
    175                          Cros device.
    176     @directory: The directory to dump logs.
    177 
    178     """
    179     def get_file_path(name):
    180         """Gets file path to dump logs.
    181 
    182         @param name: The file name.
    183 
    184         @returns: The file path with an optional suffix.
    185 
    186         """
    187         file_name = '%s.%s' % (name, suffix) if suffix else name
    188         file_path = os.path.join(directory, file_name)
    189         return file_path
    190 
    191     audio_facade.dump_diagnostics(get_file_path('audio_diagnostics.txt'))
    192 
    193     host.get_file('/var/log/messages', get_file_path('messages'))
    194 
    195     host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE,
    196                   get_file_path('multimedia_xmlrpc_server.log'))
    197 
    198 
    199 @contextmanager
    200 def monitor_no_nodes_changed(audio_facade, callback=None):
    201     """Context manager to monitor nodes changed signal on Cros device.
    202 
    203     Starts the counter in the beginning. Stops the counter in the end to make
    204     sure there is no NodesChanged signal during the try block.
    205 
    206     E.g. with monitor_no_nodes_changed(audio_facade):
    207              do something on playback/recording
    208 
    209     @param audio_facade: A RemoteAudioFacade to access audio functions on
    210                          Cros device.
    211     @param fail_callback: The callback to call before raising TestFail
    212                           when there is unexpected NodesChanged signals.
    213 
    214     @raises: error.TestFail if there is NodesChanged signal on
    215              Cros device during the context.
    216 
    217     """
    218     try:
    219         audio_facade.start_counting_signal('NodesChanged')
    220         yield
    221     finally:
    222         count = audio_facade.stop_counting_signal()
    223         if count:
    224             message = 'Got %d unexpected NodesChanged signal' % count
    225             logging.error(message)
    226             if callback:
    227                 callback()
    228             raise error.TestFail(message)
    229 
    230 
    231 # The second dominant frequency should have energy less than -26dB of the
    232 # first dominant frequency in the spectrum.
    233 DEFAULT_SECOND_PEAK_RATIO = 0.05
    234 
    235 # Tolerate more for bluetooth audio using HSP.
    236 HSP_SECOND_PEAK_RATIO = 0.2
    237 
    238 # The deviation of estimated dominant frequency from golden frequency.
    239 DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5
    240 
    241 def check_recorded_frequency(
    242         golden_file, recorder,
    243         second_peak_ratio=DEFAULT_SECOND_PEAK_RATIO,
    244         frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD,
    245         ignore_frequencies=None, check_anomaly=False):
    246     """Checks if the recorded data contains sine tone of golden frequency.
    247 
    248     @param golden_file: An AudioTestData object that serves as golden data.
    249     @param recorder: An AudioWidget used in the test to record data.
    250     @param second_peak_ratio: The test fails when the second dominant
    251                               frequency has coefficient larger than this
    252                               ratio of the coefficient of first dominant
    253                               frequency.
    254     @param frequency_diff_threshold: The maximum difference between estimated
    255                                      frequency of test signal and golden
    256                                      frequency. This value should be small for
    257                                      signal passed through line.
    258     @param ignore_frequencies: A list of frequencies to be ignored. The
    259                                component in the spectral with frequency too
    260                                close to the frequency in the list will be
    261                                ignored. The comparison of frequencies uses
    262                                frequency_diff_threshold as well.
    263     @param check_anomaly: True to check anomaly in the signal.
    264 
    265     @raises error.TestFail if the recorded data does not contain sine tone of
    266             golden frequency.
    267 
    268     """
    269     data_format = recorder.data_format
    270     recorded_data = audio_data.AudioRawData(
    271             binary=recorder.get_binary(),
    272             channel=data_format['channel'],
    273             sample_format=data_format['sample_format'])
    274 
    275     errors = []
    276 
    277     for test_channel, golden_channel in enumerate(recorder.channel_map):
    278         if golden_channel is None:
    279             logging.info('Skipped channel %d', test_channel)
    280             continue
    281 
    282         signal = recorded_data.channel_data[test_channel]
    283         saturate_value = audio_data.get_maximum_value_from_sample_format(
    284                 data_format['sample_format'])
    285         normalized_signal = audio_analysis.normalize_signal(
    286                 signal, saturate_value)
    287         spectral = audio_analysis.spectral_analysis(
    288                 normalized_signal, data_format['rate'])
    289 
    290         if not spectral:
    291             errors.append(
    292                     'Channel %d: Can not find dominant frequency.' %
    293                             test_channel)
    294 
    295         golden_frequency = golden_file.frequencies[golden_channel]
    296         logging.debug('Checking channel %s spectral %s against frequency %s',
    297                 test_channel, spectral, golden_frequency)
    298 
    299         dominant_frequency = spectral[0][0]
    300 
    301         if (abs(dominant_frequency - golden_frequency) >
    302             frequency_diff_threshold):
    303             errors.append(
    304                     'Channel %d: Dominant frequency %s is away from golden %s' %
    305                     (test_channel, dominant_frequency, golden_frequency))
    306 
    307         if check_anomaly:
    308             detected_anomaly = audio_analysis.anomaly_detection(
    309                     signal=normalized_signal,
    310                     rate=data_format['rate'],
    311                     freq=golden_frequency)
    312             if detected_anomaly:
    313                 errors.append(
    314                         'Channel %d: Detect anomaly near these time: %s' %
    315                         (test_channel, detected_anomaly))
    316             else:
    317                 logging.info(
    318                         'Channel %d: Quality is good as there is no anomaly',
    319                         test_channel)
    320 
    321 
    322         def should_be_ignored(frequency):
    323             """Checks if frequency is close to any frequency in ignore list.
    324 
    325             @param frequency: The frequency to be tested.
    326 
    327             @returns: True if the frequency should be ignored. False otherwise.
    328 
    329             """
    330             for ignore_frequency in ignore_frequencies:
    331                 if (abs(frequency - ignore_frequency) <
    332                     frequency_diff_threshold):
    333                     logging.debug('Ignore frequency: %s', frequency)
    334                     return True
    335 
    336         # Filter out the frequencies to be ignored.
    337         if ignore_frequencies:
    338             spectral = [x for x in spectral if not should_be_ignored(x[0])]
    339 
    340         if len(spectral) > 1:
    341             first_coeff = spectral[0][1]
    342             second_coeff = spectral[1][1]
    343             if second_coeff > first_coeff * second_peak_ratio:
    344                 errors.append(
    345                         'Channel %d: Found large second dominant frequencies: '
    346                         '%s' % (test_channel, spectral))
    347 
    348     if errors:
    349         raise error.TestFail(', '.join(errors))
    350