Home | History | Annotate | Download | only in audio
      1 # Copyright (c) 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """This module provides cras audio utilities."""
      6 
      7 import logging
      8 import re
      9 
     10 from autotest_lib.client.cros.audio import cmd_utils
     11 
     12 _CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'
     13 
     14 
     15 class CrasUtilsError(Exception):
     16     pass
     17 
     18 
     19 def playback(blocking=True, *args, **kargs):
     20     """A helper function to execute the playback_cmd.
     21 
     22     @param blocking: Blocks this call until playback finishes.
     23     @param args: args passed to playback_cmd.
     24     @param kargs: kargs passed to playback_cmd.
     25 
     26     @returns: The process running the playback command. Note that if the
     27               blocking parameter is true, this will return a finished process.
     28     """
     29     process = cmd_utils.popen(playback_cmd(*args, **kargs))
     30     if blocking:
     31         cmd_utils.wait_and_check_returncode(process)
     32     return process
     33 
     34 
     35 def capture(*args, **kargs):
     36     """A helper function to execute the capture_cmd.
     37 
     38     @param args: args passed to capture_cmd.
     39     @param kargs: kargs passed to capture_cmd.
     40 
     41     """
     42     cmd_utils.execute(capture_cmd(*args, **kargs))
     43 
     44 
     45 def playback_cmd(playback_file, block_size=None, duration=None,
     46                  channels=2, rate=48000):
     47     """Gets a command to playback a file with given settings.
     48 
     49     @param playback_file: the name of the file to play. '-' indicates to
     50                           playback raw audio from the stdin.
     51     @param block_size: the number of frames per callback(dictates latency).
     52     @param duration: seconds to playback
     53     @param channels: number of channels.
     54     @param rate: the sampling rate
     55 
     56     @returns: The command args put in a list of strings.
     57 
     58     """
     59     args = [_CRAS_TEST_CLIENT]
     60     args += ['--playback_file', playback_file]
     61     if block_size is not None:
     62         args += ['--block_size', str(block_size)]
     63     if duration is not None:
     64         args += ['--duration', str(duration)]
     65     args += ['--num_channels', str(channels)]
     66     args += ['--rate', str(rate)]
     67     return args
     68 
     69 
     70 def capture_cmd(
     71         capture_file, block_size=None, duration=10, channels=1, rate=48000):
     72     """Gets a command to capture the audio into the file with given settings.
     73 
     74     @param capture_file: the name of file the audio to be stored in.
     75     @param block_size: the number of frames per callback(dictates latency).
     76     @param duration: seconds to record. If it is None, duration is not set,
     77                      and command will keep capturing audio until it is
     78                      terminated.
     79     @param channels: number of channels.
     80     @param rate: the sampling rate.
     81 
     82     @returns: The command args put in a list of strings.
     83 
     84     """
     85     args = [_CRAS_TEST_CLIENT]
     86     args += ['--capture_file', capture_file]
     87     if block_size is not None:
     88         args += ['--block_size', str(block_size)]
     89     if duration is not None:
     90         args += ['--duration', str(duration)]
     91     args += ['--num_channels', str(channels)]
     92     args += ['--rate', str(rate)]
     93     return args
     94 
     95 
     96 def listen_cmd(
     97         capture_file, block_size=None, duration=10, channels=1, rate=48000):
     98     """Gets a command to listen on hotword and record audio into the file with
     99        given settings.
    100 
    101     @param capture_file: the name of file the audio to be stored in.
    102     @param block_size: the number of frames per callback(dictates latency).
    103     @param duration: seconds to record. If it is None, duration is not set,
    104                      and command will keep capturing audio until it is
    105                      terminated.
    106     @param channels: number of channels.
    107     @param rate: the sampling rate.
    108 
    109     @returns: The command args put in a list of strings.
    110 
    111     """
    112     args = [_CRAS_TEST_CLIENT]
    113     args += ['--listen_for_hotword', capture_file]
    114     if block_size is not None:
    115         args += ['--block_size', str(block_size)]
    116     if duration is not None:
    117         args += ['--duration', str(duration)]
    118     args += ['--num_channels', str(channels)]
    119     args += ['--rate', str(rate)]
    120     return args
    121 
    122 
    123 def loopback(*args, **kargs):
    124     """A helper function to execute loopback_cmd.
    125 
    126     @param args: args passed to loopback_cmd.
    127     @param kargs: kargs passed to loopback_cmd.
    128 
    129     """
    130 
    131     cmd_utils.execute(loopback_cmd(*args, **kargs))
    132 
    133 
    134 def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
    135     """Gets a command to record the loopback.
    136 
    137     @param output_file: The name of the file the loopback to be stored in.
    138     @param channels: The number of channels of the recorded audio.
    139     @param duration: seconds to record.
    140     @param rate: the sampling rate.
    141 
    142     @returns: The command args put in a list of strings.
    143 
    144     """
    145     args = [_CRAS_TEST_CLIENT]
    146     args += ['--loopback_file', output_file]
    147     args += ['--duration_seconds', str(duration)]
    148     args += ['--num_channels', str(channels)]
    149     args += ['--rate', str(rate)]
    150     return args
    151 
    152 
    153 def get_cras_nodes_cmd():
    154     """Gets a command to query the nodes from Cras.
    155 
    156     @returns: The command to query nodes information from Cras using dbus-send.
    157 
    158     """
    159     return ('dbus-send --system --type=method_call --print-reply '
    160             '--dest=org.chromium.cras /org/chromium/cras '
    161             'org.chromium.cras.Control.GetNodes')
    162 
    163 
    164 def set_system_volume(volume):
    165     """Set the system volume.
    166 
    167     @param volume: the system output vlume to be set(0 - 100).
    168 
    169     """
    170     get_cras_control_interface().SetOutputVolume(volume)
    171 
    172 
    173 def set_node_volume(node_id, volume):
    174     """Set the volume of the given output node.
    175 
    176     @param node_id: the id of the output node to be set the volume.
    177     @param volume: the volume to be set(0-100).
    178 
    179     """
    180     get_cras_control_interface().SetOutputNodeVolume(node_id, volume)
    181 
    182 
    183 def set_capture_gain(gain):
    184     """Set the system capture gain.
    185 
    186     @param gain the capture gain in db*100 (100 = 1dB)
    187 
    188     """
    189     get_cras_control_interface().SetInputGain(gain)
    190 
    191 
    192 def get_cras_control_interface(private=False):
    193     """Gets Cras DBus control interface.
    194 
    195     @param private: Set to True to use a new instance for dbus.SystemBus
    196                     instead of the shared instance.
    197 
    198     @returns: A dBus.Interface object with Cras Control interface.
    199 
    200     @raises: ImportError if this is not called on Cros device.
    201 
    202     """
    203     try:
    204         import dbus
    205     except ImportError, e:
    206         logging.exception(
    207                 'Can not import dbus: %s. This method should only be '
    208                 'called on Cros device.', e)
    209         raise
    210     bus = dbus.SystemBus(private=private)
    211     cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
    212     return dbus.Interface(cras_object, 'org.chromium.cras.Control')
    213 
    214 
    215 def get_cras_nodes():
    216     """Gets nodes information from Cras.
    217 
    218     @returns: A dict containing information of each node.
    219 
    220     """
    221     return get_cras_control_interface().GetNodes()
    222 
    223 
    224 def get_selected_nodes():
    225     """Gets selected output nodes and input nodes.
    226 
    227     @returns: A tuple (output_nodes, input_nodes) where each
    228               field is a list of selected node IDs returned from Cras DBus API.
    229               Note that there may be multiple output/input nodes being selected
    230               at the same time.
    231 
    232     """
    233     output_nodes = []
    234     input_nodes = []
    235     nodes = get_cras_nodes()
    236     for node in nodes:
    237         if node['Active']:
    238             if node['IsInput']:
    239                 input_nodes.append(node['Id'])
    240             else:
    241                 output_nodes.append(node['Id'])
    242     return (output_nodes, input_nodes)
    243 
    244 
    245 def set_selected_output_node_volume(volume):
    246     """Sets the selected output node volume.
    247 
    248     @param volume: the volume to be set (0-100).
    249 
    250     """
    251     selected_output_node_ids, _ = get_selected_nodes()
    252     for node_id in selected_output_node_ids:
    253         set_node_volume(node_id, volume)
    254 
    255 
    256 def get_active_stream_count():
    257     """Gets the number of active streams.
    258 
    259     @returns: The number of active streams.
    260 
    261     """
    262     return int(get_cras_control_interface().GetNumberOfActiveStreams())
    263 
    264 
    265 def set_system_mute(is_mute):
    266     """Sets the system mute switch.
    267 
    268     @param is_mute: Set True to mute the system playback.
    269 
    270     """
    271     get_cras_control_interface().SetOutputMute(is_mute)
    272 
    273 
    274 def set_capture_mute(is_mute):
    275     """Sets the capture mute switch.
    276 
    277     @param is_mute: Set True to mute the capture.
    278 
    279     """
    280     get_cras_control_interface().SetInputMute(is_mute)
    281 
    282 
    283 def node_type_is_plugged(node_type, nodes_info):
    284     """Determine if there is any node of node_type plugged.
    285 
    286     This method is used in the AudioLoopbackDongleLabel class, where the
    287     call is executed on autotest server. Use get_cras_nodes instead if
    288     the call can be executed on Cros device.
    289 
    290     Since Cras only reports the plugged node in GetNodes, we can
    291     parse the return value to see if there is any node with the given type.
    292     For example, if INTERNAL_MIC is of intereset, the pattern we are
    293     looking for is:
    294 
    295     dict entry(
    296        string "Type"
    297        variant             string "INTERNAL_MIC"
    298     )
    299 
    300     @param node_type: A str representing node type defined in CRAS_NODE_TYPES.
    301     @param nodes_info: A str containing output of command get_nodes_cmd.
    302 
    303     @returns: True if there is any node of node_type plugged. False otherwise.
    304 
    305     """
    306     match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type,
    307                       nodes_info)
    308     return True if match else False
    309 
    310 
    311 # Cras node types reported from Cras DBus control API.
    312 CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB',
    313                           'BLUETOOTH', 'LINEOUT', 'UNKNOWN']
    314 CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH',
    315                          'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN',
    316                          'KEYBOARD_MIC', 'HOTWORD', 'FRONT_MIC', 'REAR_MIC']
    317 CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES
    318 
    319 
    320 def get_filtered_node_types(callback):
    321     """Returns the pair of filtered output node types and input node types.
    322 
    323     @param callback: A callback function which takes a node as input parameter
    324                      and filter the node based on its return value.
    325 
    326     @returns: A tuple (output_node_types, input_node_types) where each
    327               field is a list of node types defined in CRAS_NODE_TYPES,
    328               and their 'attribute_name' is True.
    329 
    330     """
    331     output_node_types = []
    332     input_node_types = []
    333     nodes = get_cras_nodes()
    334     for node in nodes:
    335         if callback(node):
    336             node_type = str(node['Type'])
    337             if node_type not in CRAS_NODE_TYPES:
    338                 raise RuntimeError(
    339                         'node type %s is not valid' % node_type)
    340             if node['IsInput']:
    341                 input_node_types.append(node_type)
    342             else:
    343                 output_node_types.append(node_type)
    344     return (output_node_types, input_node_types)
    345 
    346 
    347 def get_selected_node_types():
    348     """Returns the pair of active output node types and input node types.
    349 
    350     @returns: A tuple (output_node_types, input_node_types) where each
    351               field is a list of selected node types defined in CRAS_NODE_TYPES.
    352 
    353     """
    354     def is_selected(node):
    355         """Checks if a node is selected.
    356 
    357         A node is selected if its Active attribute is True.
    358 
    359         @returns: True is a node is selected, False otherwise.
    360 
    361         """
    362         return node['Active']
    363 
    364     return get_filtered_node_types(is_selected)
    365 
    366 
    367 def get_selected_input_device_name():
    368     """Returns the device name of the active input node.
    369 
    370     @returns: device name string. E.g. kbl_r5514_5663_max: :0,1
    371     """
    372     nodes = get_cras_nodes()
    373     for node in nodes:
    374         if node['Active'] and node['IsInput']:
    375             return node['DeviceName']
    376     return None
    377 
    378 
    379 def get_selected_output_device_name():
    380     """Returns the device name of the active output node.
    381 
    382     @returns: device name string. E.g. mtk-rt5650: :0,0
    383     """
    384     nodes = get_cras_nodes()
    385     for node in nodes:
    386         if node['Active'] and not node['IsInput']:
    387             return node['DeviceName']
    388     return None
    389 
    390 
    391 def get_selected_output_device_type():
    392     """Returns the device type of the active output node.
    393 
    394     @returns: device type string. E.g. INTERNAL_SPEAKER
    395     """
    396     nodes = get_cras_nodes()
    397     for node in nodes:
    398         if node['Active'] and not node['IsInput']:
    399             return node['Type']
    400     return None
    401 
    402 
    403 def get_plugged_node_types():
    404     """Returns the pair of plugged output node types and input node types.
    405 
    406     @returns: A tuple (output_node_types, input_node_types) where each
    407               field is a list of plugged node types defined in CRAS_NODE_TYPES.
    408 
    409     """
    410     def is_plugged(node):
    411         """Checks if a node is plugged and is not unknown node.
    412 
    413         Cras DBus API only reports plugged node, so every node reported by Cras
    414         DBus API is plugged. However, we filter out UNKNOWN node here because
    415         the existence of unknown node depends on the number of redundant
    416         playback/record audio device created on audio card. Also, the user of
    417         Cras will ignore unknown nodes.
    418 
    419         @returns: True if a node is plugged and is not an UNKNOWN node.
    420 
    421         """
    422         return node['Type'] != 'UNKNOWN'
    423 
    424     return get_filtered_node_types(is_plugged)
    425 
    426 
    427 def set_selected_node_types(output_node_types, input_node_types):
    428     """Sets selected node types.
    429 
    430     @param output_node_types: A list of output node types. None to skip setting.
    431     @param input_node_types: A list of input node types. None to skip setting.
    432 
    433     """
    434     if len(output_node_types) == 1:
    435         set_single_selected_output_node(output_node_types[0])
    436     elif output_node_types:
    437         set_selected_output_nodes(output_node_types)
    438     if len(input_node_types) == 1:
    439         set_single_selected_input_node(input_node_types[0])
    440     elif input_node_types:
    441         set_selected_input_nodes(input_node_types)
    442 
    443 
    444 def set_single_selected_output_node(node_type):
    445     """Sets one selected output node.
    446 
    447     Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    448     to select one output node.
    449 
    450     @param node_type: A node type.
    451 
    452     """
    453     nodes = get_cras_nodes()
    454     for node in nodes:
    455         if node['IsInput']:
    456             continue
    457         if node['Type'] == node_type:
    458             set_active_output_node(node['Id'])
    459 
    460 
    461 def set_single_selected_input_node(node_type):
    462     """Sets one selected input node.
    463 
    464     Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    465     to select one input node.
    466 
    467     @param node_type: A node type.
    468 
    469     """
    470     nodes = get_cras_nodes()
    471     for node in nodes:
    472         if not node['IsInput']:
    473             continue
    474         if node['Type'] == node_type:
    475             set_active_input_node(node['Id'])
    476 
    477 
    478 def set_selected_output_nodes(types):
    479     """Sets selected output node types.
    480 
    481     Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    482     to select one output node. Here we use add/remove active output node
    483     to support multiple nodes.
    484 
    485     @param types: A list of output node types.
    486 
    487     """
    488     nodes = get_cras_nodes()
    489     for node in nodes:
    490         if node['IsInput']:
    491             continue
    492         if node['Type'] in types:
    493             add_active_output_node(node['Id'])
    494         elif node['Active']:
    495             remove_active_output_node(node['Id'])
    496 
    497 
    498 def set_selected_input_nodes(types):
    499     """Sets selected input node types.
    500 
    501     Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    502     to select one input node. Here we use add/remove active input node
    503     to support multiple nodes.
    504 
    505     @param types: A list of input node types.
    506 
    507     """
    508     nodes = get_cras_nodes()
    509     for node in nodes:
    510         if not node['IsInput']:
    511             continue
    512         if node['Type'] in types:
    513             add_active_input_node(node['Id'])
    514         elif node['Active']:
    515             remove_active_input_node(node['Id'])
    516 
    517 
    518 def set_active_input_node(node_id):
    519     """Sets one active input node.
    520 
    521     @param node_id: node id.
    522 
    523     """
    524     get_cras_control_interface().SetActiveInputNode(node_id)
    525 
    526 
    527 def set_active_output_node(node_id):
    528     """Sets one active output node.
    529 
    530     @param node_id: node id.
    531 
    532     """
    533     get_cras_control_interface().SetActiveOutputNode(node_id)
    534 
    535 
    536 def add_active_output_node(node_id):
    537     """Adds an active output node.
    538 
    539     @param node_id: node id.
    540 
    541     """
    542     get_cras_control_interface().AddActiveOutputNode(node_id)
    543 
    544 
    545 def add_active_input_node(node_id):
    546     """Adds an active input node.
    547 
    548     @param node_id: node id.
    549 
    550     """
    551     get_cras_control_interface().AddActiveInputNode(node_id)
    552 
    553 
    554 def remove_active_output_node(node_id):
    555     """Removes an active output node.
    556 
    557     @param node_id: node id.
    558 
    559     """
    560     get_cras_control_interface().RemoveActiveOutputNode(node_id)
    561 
    562 
    563 def remove_active_input_node(node_id):
    564     """Removes an active input node.
    565 
    566     @param node_id: node id.
    567 
    568     """
    569     get_cras_control_interface().RemoveActiveInputNode(node_id)
    570 
    571 
    572 def get_node_id_from_node_type(node_type, is_input):
    573     """Gets node id from node type.
    574 
    575     @param types: A node type defined in CRAS_NODE_TYPES.
    576     @param is_input: True if the node is input. False otherwise.
    577 
    578     @returns: A string for node id.
    579 
    580     @raises: CrasUtilsError: if unique node id can not be found.
    581 
    582     """
    583     nodes = get_cras_nodes()
    584     find_ids = []
    585     for node in nodes:
    586         if node['Type'] == node_type and node['IsInput'] == is_input:
    587             find_ids.append(node['Id'])
    588     if len(find_ids) != 1:
    589         raise CrasUtilsError(
    590                 'Can not find unique node id from node type %s' % node_type)
    591     return find_ids[0]
    592 
    593 def get_active_node_volume():
    594     """Returns volume from active node.
    595 
    596     @returns: int for volume
    597 
    598     @raises: CrasUtilsError: if node volume cannot be found.
    599     """
    600     nodes = get_cras_nodes()
    601     for node in nodes:
    602         if node['Active'] == 1 and node['IsInput'] == 0:
    603             return int(node['NodeVolume'])
    604     raise CrasUtilsError('Cannot find active node volume from nodes.')
    605