# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import dbus
import logging
import pipes
import re
import shlex

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error


# Represents the result of a dbus-send call.  |sender| refers to the temporary
# bus name of dbus-send, |responder| to the remote process, and |response|
# contains the parsed response.
# NOTE(review): _parse_dbus_send_output() stores the header's "sender=" value
# in |sender| and its "dest=" value in |responder|.  For a method return the
# header's sender is presumably the remote process, which would be the reverse
# of the description above -- confirm against callers before relying on it.
DBusSendResult = collections.namedtuple('DBusSendResult', ['sender',
                                                           'responder',
                                                           'response'])
# Used internally.
DictEntry = collections.namedtuple('DictEntry', ['key', 'value'])

# dbus-send type names whose values are parsed with int().
_INT_TYPES = ('int16', 'uint16', 'int32', 'uint32',
              'int64', 'uint64', 'byte')


def _build_token_stream(headerless_dbus_send_output):
    """A tokenizer for dbus-send output.

    The output is basically just like splitting on whitespace, except that
    strings are kept together by " characters.

    @param headerless_dbus_send_output: list of lines of dbus-send output
            without the meta-information prefix.
    @return list of tokens in dbus-send output.

    """
    return shlex.split(' '.join(headerless_dbus_send_output))


def _parse_value(token_stream):
    """Turn a stream of tokens from dbus-send output into native python types.

    Tokens are consumed from the front of |token_stream| (the list is mutated
    in place), so nested containers are parsed by recursive calls that pick up
    where the previous one stopped.

    @param token_stream: output from _build_token_stream() above.
    @return the parsed value: an int, string, bool, float, list, dict, or a
            DictEntry when the tokens describe a single 'dict entry('.
    @raises error.TestError if an unsupported DBus type name is found.
    @raises AssertionError if a dict entry is not delimited as expected.

    """
    # Assumes properly tokenized output (strings with spaces handled).
    # Assumes tokens are pre-stripped
    token_type = token_stream.pop(0)
    if token_type == 'variant':
        # A variant is just a wrapper; the real type name follows it.
        token_type = token_stream.pop(0)
    if token_type == 'object':
        token_type = token_stream.pop(0)  # Should be 'path'
    token_value = token_stream.pop(0)
    if token_type in _INT_TYPES:
        return int(token_value)
    if token_type == 'string' or token_type == 'path':
        return token_value  # shlex removed surrounding " chars.
    if token_type == 'boolean':
        return token_value == 'true'
    if token_type == 'double':
        return float(token_value)
    if token_type == 'array':
        # |token_value| was the opening '['; parse elements until the ']'.
        values = []
        while token_stream[0] != ']':
            values.append(_parse_value(token_stream))
        token_stream.pop(0)  # Discard the closing ']'.
        # An array made up solely of dict entries is really a dictionary.
        if values and all(isinstance(x, DictEntry) for x in values):
            values = dict(values)
        return values
    if token_type == 'dict':
        assert token_value == 'entry(', (
                'Expected "entry(" after "dict", got %r' % token_value)
        key = _parse_value(token_stream)
        value = _parse_value(token_stream)
        # Consume the closing token outside of the assert so that the stream
        # stays in sync even when asserts are stripped (python -O).
        closing_token = token_stream.pop(0)
        assert closing_token == ')', (
                'Expected ")" to close dict entry, got %r' % closing_token)
        return DictEntry(key=key, value=value)
    raise error.TestError('Unhandled DBus type found: %s' % token_type)


def _parse_dbus_send_output(dbus_send_stdout):
    """Turn dbus-send output into usable Python types.

    This looks like:

    localhost ~ # dbus-send --system --dest=org.chromium.flimflam \
            --print-reply --reply-timeout=2000 / \
            org.chromium.flimflam.Manager.GetProperties
    method return sender=:1.12 -> dest=:1.37 reply_serial=2
       array [
          dict entry(
             string "ActiveProfile"
             variant             string "/profile/default"
          )
          dict entry(
             string "ArpGateway"
             variant             boolean true
          )
          ...
       ]

    @param dbus_send_stdout: string stdout from dbus-send
    @return a DBusSendResult.
    @raises error.TestError if the output is empty or its first line is not
            a recognizable 'method return' header.

    """
    lines = dbus_send_stdout.strip().splitlines()
    if not lines:
        # Report empty output the same way as an unparseable header rather
        # than crashing with an IndexError below.
        raise error.TestError('Could not parse dbus-send header: %s' %
                              dbus_send_stdout)

    # The first line contains meta-information about the response
    header = lines[0]
    lines = lines[1:]
    # Raw strings keep the regex escapes away from the string parser.
    dbus_address_pattern = r'[:\d.]+'
    match = re.match(r'method return sender=(%s) -> dest=(%s) reply_serial=\d+'
                     % (dbus_address_pattern, dbus_address_pattern), header)
    if match is None:
        raise error.TestError('Could not parse dbus-send header: %s' % header)

    sender = match.group(1)
    responder = match.group(2)
    token_stream = _build_token_stream(lines)
    ret_val = _parse_value(token_stream)
    # Note that DBus permits multiple response values, and this is not handled.
    logging.debug('Got DBus response: %r', ret_val)
    return DBusSendResult(sender=sender, responder=responder, response=ret_val)


def _build_arg_string(raw_args):
    """Construct a string of arguments to a DBus method as dbus-send expects.

    @param raw_args list of dbus.* type objects to serialize.
    @return string suitable for dbus-send.
    @raises error.TestError if an argument has a type that is not handled.

    """
    # Maps numeric dbus types to the type prefix dbus-send expects.
    prefix_map = {
            dbus.Int16: 'int16:',
            dbus.Int32: 'int32:',
            dbus.Int64: 'int64:',
            dbus.UInt16: 'uint16:',
            dbus.UInt32: 'uint32:',
            dbus.UInt64: 'uint64:',
            dbus.Double: 'double:',
            dbus.Byte: 'byte:',
    }
    arg_list = []
    for arg in raw_args:
        if isinstance(arg, dbus.String):
            arg_list.append(pipes.quote('string:%s' %
                                        arg.replace('"', r'\"')))
            continue
        if isinstance(arg, dbus.Boolean):
            if arg:
                arg_list.append('boolean:true')
            else:
                arg_list.append('boolean:false')
            continue
        for prim_type, prefix in prefix_map.items():
            if isinstance(arg, prim_type):
                arg_list.append(prefix + str(arg))
                break
        else:
            # Only reached when no entry in |prefix_map| matched.  (A plain
            # 'continue' inside the inner loop would not skip this.)
            raise error.TestError('No support for serializing %r' % arg)
    return ' '.join(arg_list)


def dbus_send(bus_name, interface, object_path, method_name, args=None,
              host=None, timeout_seconds=2, tolerate_failures=False):
    """Call a DBus method with dbus-send and parse the reply.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface of object to call method on.
    @param object_path: string DBus path of remote object to call method on.
    @param method_name: string name of method to call.
    @param args: optional list of arguments.  Arguments must be of types
            from the python dbus module.
    @param host: An optional host object if running against a remote host.
    @param timeout_seconds: number of seconds to wait for a response.
    @param tolerate_failures: boolean True to ignore problems receiving a
            response.
    @return a DBusSendResult, or None if dbus-send exited with a non-zero
            status and that status was tolerated.

    """
    run = utils.run if host is None else host.run
    # dbus-send takes its reply timeout in milliseconds.
    cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s '
           '%s %s.%s' % (int(timeout_seconds * 1000), bus_name,
                         object_path, interface, method_name))
    if args is not None:
        cmd = cmd + ' ' + _build_arg_string(args)
    result = run(cmd, ignore_status=tolerate_failures)
    if result.exit_status != 0:
        logging.debug('%r', result.stdout)
        return None
    return _parse_dbus_send_output(result.stdout)


def get_property(bus_name, interface, object_path, property_name, host=None):
    """A helpful wrapper that extracts the value of a DBus property.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface exposing the property.
    @param object_path: string DBus path of remote object to call method on.
    @param property_name: string name of property to get.
    @param host: An optional host object if running against a remote host.
    @return whatever dbus_send() returns for the properties 'Get' call.

    """
    return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get',
                     args=[dbus.String(interface), dbus.String(property_name)],
                     host=host)