# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus

from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import dbus_send
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices

# USB device spec used to look up Atrus devices on the host.
ATRUS = cfm_usb_devices.ATRUS


def check_dbus_available(host):
    """
    Checks if Atrusd with dbus support is available.

    Asks the D-Bus daemon for its list of registered names and looks for
    the Atrusctl service name in the reply.

    @param host: Host object to run a particular host.
    @returns True if dbus is supported in atrusd. False otherwise.
    """
    result = dbus_send.dbus_send(
        'org.freedesktop.DBus',
        'org.freedesktop.DBus',
        '/org/freedesktop/DBus',
        'ListNames',
        args=None,
        host=host,
        timeout_seconds=2,
        tolerate_failures=False)

    return 'org.chromium.Atrusctl' in result.response


def force_upgrade_atrus(host):
    """
    Executes a forced upgrade of the Atrus device.

    Calls the ForceFirmwareUpgrade method on the Atrusctl D-Bus service,
    passing the path of the latest firmware bundle.

    @param host: Host object to run a particular host.
    @returns True if the upgrade succeeded. False otherwise.
    """
    args = [dbus.String('/lib/firmware/google/atrus-fw-bundle-latest.bin')]
    result = dbus_send.dbus_send(
        'org.chromium.Atrusctl',
        'org.chromium.Atrusctl',
        '/org/chromium/Atrusctl',
        'ForceFirmwareUpgrade',
        args=args,
        host=host,
        timeout_seconds=90,
        tolerate_failures=True)

    # Failures are tolerated above, so the call may not produce a result
    # object at all. NOTE(review): dbus_send appears to return None in that
    # case -- confirm against dbus_send.dbus_send. Report the failure as
    # False instead of raising AttributeError on `None.response`.
    if result is None:
        return False

    return result.response


def wait_for_atrus_enumeration(host, timeout_seconds=30):
    """
    Wait for an Atrus device to enumerate.

    Polls the host's USB devices once per second until at least one device
    matching the ATRUS spec is found. What happens when the timeout expires
    is determined by utils.poll_for_condition.

    @param host: Host object to run a particular host.
    @param timeout_seconds: Maximum number of seconds to wait.
    """
    device_manager = usb_device_collector.UsbDeviceCollector(host)

    utils.poll_for_condition(
        lambda: len(device_manager.get_devices_by_spec(ATRUS)) >= 1,
        timeout=timeout_seconds,
        sleep_interval=1.0,
        desc='No atrus device enumerated')