Home | History | Annotate | Download | only in cros
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import collections
      6 import dbus
      7 import logging
      8 import pipes
      9 import re
     10 import shlex
     11 
     12 from autotest_lib.client.bin import utils
     13 from autotest_lib.client.common_lib import error
     14 
     15 
     16 # Represents the result of a dbus-send call.  |sender| refers to the temporary
     17 # bus name of dbus-send, |responder| to the remote process, and |response|
     18 # contains the parsed response.
     19 DBusSendResult = collections.namedtuple('DBusSendResult', ['sender',
     20                                                            'responder',
     21                                                            'response'])
     22 # Used internally.
     23 DictEntry = collections.namedtuple('DictEntry', ['key', 'value'])
     24 
     25 
     26 def _build_token_stream(headerless_dbus_send_output):
     27     """A tokenizer for dbus-send output.
     28 
     29     The output is basically just like splitting on whitespace, except that
     30     strings are kept together by " characters.
     31 
     32     @param headerless_dbus_send_output: list of lines of dbus-send output
     33             without the meta-information prefix.
     34     @return list of tokens in dbus-send output.
     35     """
     36     return shlex.split(' '.join(headerless_dbus_send_output))
     37 
     38 
     39 def _parse_value(token_stream):
     40     """Turn a stream of tokens from dbus-send output into native python types.
     41 
     42     @param token_stream: output from _build_token_stream() above.
     43 
     44     """
     45     # Assumes properly tokenized output (strings with spaces handled).
     46     # Assumes tokens are pre-stripped
     47     token_type = token_stream.pop(0)
     48     if token_type == 'variant':
     49         token_type = token_stream.pop(0)
     50     if token_type == 'object':
     51         token_type = token_stream.pop(0)  # Should be 'path'
     52     token_value = token_stream.pop(0)
     53     INT_TYPES = ('int16', 'uint16', 'int32', 'uint32',
     54                  'int64', 'uint64', 'byte')
     55     if token_type in INT_TYPES:
     56         return int(token_value)
     57     if token_type == 'string' or token_type == 'path':
     58         return token_value  # shlex removed surrounding " chars.
     59     if token_type == 'boolean':
     60         return token_value == 'true'
     61     if token_type == 'double':
     62         return float(token_value)
     63     if token_type == 'array':
     64         values = []
     65         while token_stream[0] != ']':
     66             values.append(_parse_value(token_stream))
     67         token_stream.pop(0)
     68         if values and all([isinstance(x, DictEntry) for x in values]):
     69             values = dict(values)
     70         return values
     71     if token_type == 'dict':
     72         assert token_value == 'entry('
     73         key = _parse_value(token_stream)
     74         value = _parse_value(token_stream)
     75         assert token_stream.pop(0) == ')'
     76         return DictEntry(key=key, value=value)
     77     raise error.TestError('Unhandled DBus type found: %s' % token_type)
     78 
     79 
     80 def _parse_dbus_send_output(dbus_send_stdout):
     81     """Turn dbus-send output into usable Python types.
     82 
     83     This looks like:
     84 
     85     localhost ~ # dbus-send --system --dest=org.chromium.flimflam \
     86             --print-reply --reply-timeout=2000 / \
     87             org.chromium.flimflam.Manager.GetProperties
     88     method return sender=:1.12 -> dest=:1.37 reply_serial=2
     89        array [
     90           dict entry(
     91              string "ActiveProfile"
     92              variant             string "/profile/default"
     93           )
     94           dict entry(
     95              string "ArpGateway"
     96              variant             boolean true
     97           )
     98           ...
     99        ]
    100 
    101     @param dbus_send_output: string stdout from dbus-send
    102     @return a DBusSendResult.
    103 
    104     """
    105     lines = dbus_send_stdout.strip().splitlines()
    106     # The first line contains meta-information about the response
    107     header = lines[0]
    108     lines = lines[1:]
    109     dbus_address_pattern = '[:\d\\.]+'
    110     match = re.match('method return sender=(%s) -> dest=(%s) reply_serial=\d+' %
    111                      (dbus_address_pattern, dbus_address_pattern), header)
    112     if match is None:
    113         raise error.TestError('Could not parse dbus-send header: %s' % header)
    114 
    115     sender = match.group(1)
    116     responder = match.group(2)
    117     token_stream = _build_token_stream(lines)
    118     ret_val = _parse_value(token_stream)
    119     # Note that DBus permits multiple response values, and this is not handled.
    120     logging.debug('Got DBus response: %r', ret_val)
    121     return DBusSendResult(sender=sender, responder=responder, response=ret_val)
    122 
    123 
    124 def _build_arg_string(raw_args):
    125     """Construct a string of arguments to a DBus method as dbus-send expects.
    126 
    127     @param raw_args list of dbus.* type objects to seriallize.
    128     @return string suitable for dbus-send.
    129 
    130     """
    131     dbus.Boolean
    132     int_map = {
    133             dbus.Int16: 'int16:',
    134             dbus.Int32: 'int32:',
    135             dbus.Int64: 'int64:',
    136             dbus.UInt16: 'uint16:',
    137             dbus.UInt32: 'uint32:',
    138             dbus.UInt64: 'uint64:',
    139             dbus.Double: 'double:',
    140             dbus.Byte: 'byte:',
    141     }
    142     arg_list = []
    143     for arg in raw_args:
    144         if isinstance(arg, dbus.String):
    145             arg_list.append(pipes.quote('string:%s' %
    146                                         arg.replace('"', r'\"')))
    147             continue
    148         if isinstance(arg, dbus.Boolean):
    149             if arg:
    150                 arg_list.append('boolean:true')
    151             else:
    152                 arg_list.append('boolean:false')
    153             continue
    154         for prim_type, prefix in int_map.iteritems():
    155             if isinstance(arg, prim_type):
    156                 arg_list.append(prefix + str(arg))
    157                 continue
    158 
    159         raise error.TestError('No support for serializing %r' % arg)
    160     return ' '.join(arg_list)
    161 
    162 
    163 def dbus_send(bus_name, interface, object_path, method_name, args=None,
    164               host=None, timeout_seconds=2, tolerate_failures=False):
    165     """Call dbus-send without arguments.
    166 
    167     @param bus_name: string identifier of DBus connection to send a message to.
    168     @param interface: string DBus interface of object to call method on.
    169     @param object_path: string DBus path of remote object to call method on.
    170     @param method_name: string name of method to call.
    171     @param args: optional list of arguments.  Arguments must be of types
    172             from the python dbus module.
    173     @param host: An optional host object if running against a remote host.
    174     @param timeout_seconds: number of seconds to wait for a response.
    175     @param tolerate_failures: boolean True to ignore problems receiving a
    176             response.
    177 
    178     """
    179     run = utils.run if host is None else host.run
    180     cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s '
    181            '%s %s.%s' % (int(timeout_seconds * 1000), bus_name,
    182                          object_path, interface, method_name))
    183     if args is not None:
    184         cmd = cmd + ' ' + _build_arg_string(args)
    185     result = run(cmd, ignore_status=tolerate_failures)
    186     if result.exit_status != 0:
    187         logging.debug('%r', result.stdout)
    188         return None
    189     return _parse_dbus_send_output(result.stdout)
    190 
    191 
    192 def get_property(bus_name, interface, object_path, property_name, host=None):
    193     """A helpful wrapper that extracts the value of a DBus property.
    194 
    195     @param bus_name: string identifier of DBus connection to send a message to.
    196     @param interface: string DBus interface exposing the property.
    197     @param object_path: string DBus path of remote object to call method on.
    198     @param property_name: string name of property to get.
    199     @param host: An optional host object if running against a remote host.
    200 
    201     """
    202     return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get',
    203                      args=[dbus.String(interface), dbus.String(property_name)],
    204                      host=host)
    205