Home | History | Annotate | Download | only in servo
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 #
      5 # Expects to be run in an environment with sudo and no interactive password
      6 # prompt, such as within the Chromium OS development chroot.
      7 
      8 import os
      9 
     10 import logging, re, time, xmlrpclib
     11 
     12 from autotest_lib.client.common_lib import error
     13 from autotest_lib.server.cros.servo import firmware_programmer
     14 
     15 # Time to wait when probing for a usb device, it takes on avg 17 seconds
     16 # to do a full probe.
     17 _USB_PROBE_TIMEOUT = 40
     18 
     19 
     20 class _PowerStateController(object):
     21 
     22     """Class to provide board-specific power operations.
     23 
     24     This class is responsible for "power on" and "power off"
     25     operations that can operate without making assumptions in
     26     advance about board state.  It offers an interface that
     27     abstracts out the different sequences required for different
     28     board types.
     29 
     30     """
     31 
     32     # Constants acceptable to be passed for the `rec_mode` parameter
     33     # to power_on().
     34     #
     35     # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
     36     #   SD card.
     37     # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
     38 
     39     REC_ON = 'rec'
     40     REC_OFF = 'on'
     41 
     42     # Delay in seconds needed between asserting and de-asserting
     43     # warm reset.
     44     _RESET_HOLD_TIME = 0.5
     45 
     46     def __init__(self, servo):
     47         """Initialize the power state control.
     48 
     49         @param servo Servo object providing the underlying `set` and `get`
     50                      methods for the target controls.
     51 
     52         """
     53         self._servo = servo
     54 
     55     def reset(self):
     56         """Force the DUT to reset.
     57 
     58         The DUT is guaranteed to be on at the end of this call,
     59         regardless of its previous state, provided that there is
     60         working OS software. This also guarantees that the EC has
     61         been restarted.
     62 
     63         """
     64         self._servo.set_nocheck('power_state', 'reset')
     65 
     66     def warm_reset(self):
     67         """Apply warm reset to the DUT.
     68 
     69         This asserts, then de-asserts the 'warm_reset' signal.
     70         Generally, this causes the board to restart.
     71 
     72         """
     73         self._servo.set_get_all(['warm_reset:on',
     74                                  'sleep:%.4f' % self._RESET_HOLD_TIME,
     75                                  'warm_reset:off'])
     76 
     77     def power_off(self):
     78         """Force the DUT to power off.
     79 
     80         The DUT is guaranteed to be off at the end of this call,
     81         regardless of its previous state, provided that there is
     82         working EC and boot firmware.  There is no requirement for
     83         working OS software.
     84 
     85         """
     86         self._servo.set_nocheck('power_state', 'off')
     87 
     88     def power_on(self, rec_mode=REC_OFF):
     89         """Force the DUT to power on.
     90 
     91         Prior to calling this function, the DUT must be powered off,
     92         e.g. with a call to `power_off()`.
     93 
     94         At power on, recovery mode is set as specified by the
     95         corresponding argument.  When booting with recovery mode on, it
     96         is the caller's responsibility to unplug/plug in a bootable
     97         external storage device.
     98 
     99         If the DUT requires a delay after powering on but before
    100         processing inputs such as USB stick insertion, the delay is
    101         handled by this method; the caller is not responsible for such
    102         delays.
    103 
    104         @param rec_mode Setting of recovery mode to be applied at
    105                         power on. default: REC_OFF aka 'off'
    106 
    107         """
    108         self._servo.set_nocheck('power_state', rec_mode)
    109 
    110 
    111 class Servo(object):
    112 
    113     """Manages control of a Servo board.
    114 
    Servo is a board developed by the hardware group to aid in the debugging
    and control of various partner devices. Servo's features include the
    simulation of pressing the power button, closing the lid, and pressing
    Ctrl-d. This class manages setting up and communicating with a servo
    daemon (servod) process. It provides both high-level functions for common
    servo tasks and low-level functions for directly setting and reading gpios.
    121 
    122     """
    123 
    # Power button press delays in seconds.
    #
    # The EC specification says that 8.0 seconds should be enough
    # for the long power press.  However, some platforms need a bit
    # more time.  Empirical testing has found these requirements:
    #   Alex: 8.2 seconds
    #   ZGB:  8.5 seconds
    # The actual value is set to the largest known necessary value.
    #
    # TODO(jrbarnette) Being generous is the right thing to do for
    # existing platforms, but if this code is to be used for
    # qualification of new hardware, we should be less generous.
    #
    # NOTE(review): the long-press discussion above does not describe
    # SHORT_DELAY below, which is the polling/retry interval used by
    # set() and the volume button helpers.  No long-press constant is
    # visible in this part of the class -- confirm whether the
    # discussion is stale.
    SHORT_DELAY = 0.1
    136     SHORT_DELAY = 0.1
    137 
    138     # Maximum number of times to re-read power button on release.
    139     GET_RETRY_MAX = 10
    140 
    141     # Delays to deal with DUT state transitions.
    142     SLEEP_DELAY = 6
    143     BOOT_DELAY = 10
    144 
    145     # Default minimum time interval between 'press' and 'release'
    146     # keyboard events.
    147     SERVO_KEY_PRESS_DELAY = 0.1
    148 
    149     # Time to toggle recovery switch on and off.
    150     REC_TOGGLE_DELAY = 0.1
    151 
    152     # Time to toggle development switch on and off.
    153     DEV_TOGGLE_DELAY = 0.1
    154 
    # Time between a USB disk being plugged in and being detected by the
    # system.
    USB_DETECTION_DELAY = 10
    # Time to keep USB power off before and after the USB mux direction is
    # changed.
    158     USB_POWEROFF_DELAY = 2
    159 
    160     # Time to wait before timing out on servo initialization.
    161     INIT_TIMEOUT_SECS = 10
    162 
    163 
    def __init__(self, servo_host, servo_serial=None):
        """Wire this object up to the servod instance on `servo_host`.

        Obtains the servod proxy from the host and creates the power
        state controller; the USB mux state and firmware programmer
        start out unset.

        @param servo_host: A ServoHost object representing
                           the host running servod.
        @param servo_serial: Serial number of the servo board.
        """
        # TODO(fdeng): crbug.com/298379
        # We should move servo_host object out of servo object
        # to minimize the dependencies on the rest of Autotest.
        self._servo_host = servo_host
        self._servo_serial = servo_serial
        # Nothing is known yet about the USB mux, and no firmware
        # programmer has been created.
        self._usb_state = None
        self._programmer = None
        self._server = servo_host.get_servod_server_proxy()
        self._power_state = _PowerStateController(self)
    180 
    181 
    182     @property
    183     def servo_serial(self):
    184         """Returns the serial number of the servo board."""
    185         return self._servo_serial
    186 
    187 
    188     def get_power_state_controller(self):
    189         """Return the power state controller for this Servo.
    190 
    191         The power state controller provides board-independent
    192         interfaces for reset, power-on, power-off operations.
    193 
    194         """
    195         return self._power_state
    196 
    197 
    198     def initialize_dut(self, cold_reset=False):
    199         """Initializes a dut for testing purposes.
    200 
    201         This sets various servo signals back to default values
    202         appropriate for the target board.  By default, if the DUT
    203         is already on, it stays on.  If the DUT is powered off
    204         before initialization, its state afterward is unspecified.
    205 
    206         Rationale:  Basic initialization of servo sets the lid open,
    207         when there is a lid.  This operation won't affect powered on
    208         units; however, setting the lid open may power on a unit
    209         that's off, depending on the board type and previous state
    210         of the device.
    211 
    212         If `cold_reset` is a true value, the DUT and its EC will be
    213         reset, and the DUT rebooted in normal mode.
    214 
    215         @param cold_reset If True, cold reset the device after
    216                           initialization.
    217         """
    218         self._server.hwinit()
    219         self.set('usb_mux_oe1', 'on')
    220         self._usb_state = None
    221         self.switch_usbkey('off')
    222         if cold_reset:
    223             self._power_state.reset()
    224         logging.debug('Servo initialized, version is %s',
    225                       self._server.get_version())
    226 
    227 
    228     def is_localhost(self):
    229         """Is the servod hosted locally?
    230 
    231         Returns:
    232           True if local hosted; otherwise, False.
    233         """
    234         return self._servo_host.is_localhost()
    235 
    236 
    237     def power_long_press(self):
    238         """Simulate a long power button press."""
    239         # After a long power press, the EC may ignore the next power
    240         # button press (at least on Alex).  To guarantee that this
    241         # won't happen, we need to allow the EC one second to
    242         # collect itself.
    243         self._server.power_long_press()
    244 
    245 
    246     def power_normal_press(self):
    247         """Simulate a normal power button press."""
    248         self._server.power_normal_press()
    249 
    250 
    251     def power_short_press(self):
    252         """Simulate a short power button press."""
    253         self._server.power_short_press()
    254 
    255 
    256     def power_key(self, press_secs=''):
    257         """Simulate a power button press.
    258 
    259         @param press_secs : Str. Time to press key.
    260         """
    261         self._server.power_key(press_secs)
    262 
    263 
    264     def lid_open(self):
    265         """Simulate opening the lid and raise exception if all attempts fail"""
    266         self.set('lid_open', 'yes')
    267 
    268 
    269     def lid_close(self):
    270         """Simulate closing the lid and raise exception if all attempts fail
    271 
    272         Waits 6 seconds to ensure the device is fully asleep before returning.
    273         """
    274         self.set('lid_open', 'no')
    275         time.sleep(Servo.SLEEP_DELAY)
    276 
    277     def volume_up(self, timeout=300):
    278         """Simulate pushing the volume down button.
    279 
    280         @param timeout: Timeout for setting the volume.
    281         """
    282         self.set_get_all(['volume_up:yes',
    283                           'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
    284                           'volume_up:no'])
    285         # we need to wait for commands to take effect before moving on
    286         time_left = float(timeout)
    287         while time_left > 0.0:
    288             value = self.get('volume_up')
    289             if value == 'no':
    290                 return
    291             time.sleep(self.SHORT_DELAY)
    292             time_left = time_left - self.SHORT_DELAY
    293         raise error.TestFail("Failed setting volume_up to no")
    294 
    295     def volume_down(self, timeout=300):
    296         """Simulate pushing the volume down button.
    297 
    298         @param timeout: Timeout for setting the volume.
    299         """
    300         self.set_get_all(['volume_down:yes',
    301                           'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
    302                           'volume_down:no'])
    303         # we need to wait for commands to take effect before moving on
    304         time_left = float(timeout)
    305         while time_left > 0.0:
    306             value = self.get('volume_down')
    307             if value == 'no':
    308                 return
    309             time.sleep(self.SHORT_DELAY)
    310             time_left = time_left - self.SHORT_DELAY
    311         raise error.TestFail("Failed setting volume_down to no")
    312 
    313     def ctrl_d(self, press_secs=''):
    314         """Simulate Ctrl-d simultaneous button presses.
    315 
    316         @param press_secs : Str. Time to press key.
    317         """
    318         self._server.ctrl_d(press_secs)
    319 
    320 
    321     def ctrl_u(self):
    322         """Simulate Ctrl-u simultaneous button presses.
    323 
    324         @param press_secs : Str. Time to press key.
    325         """
    326         self._server.ctrl_u()
    327 
    328 
    329     def ctrl_enter(self, press_secs=''):
    330         """Simulate Ctrl-enter simultaneous button presses.
    331 
    332         @param press_secs : Str. Time to press key.
    333         """
    334         self._server.ctrl_enter(press_secs)
    335 
    336 
    337     def d_key(self, press_secs=''):
    338         """Simulate Enter key button press.
    339 
    340         @param press_secs : Str. Time to press key.
    341         """
    342         self._server.d_key(press_secs)
    343 
    344 
    345     def ctrl_key(self, press_secs=''):
    346         """Simulate Enter key button press.
    347 
    348         @param press_secs : Str. Time to press key.
    349         """
    350         self._server.ctrl_key(press_secs)
    351 
    352 
    353     def enter_key(self, press_secs=''):
    354         """Simulate Enter key button press.
    355 
    356         @param press_secs : Str. Time to press key.
    357         """
    358         self._server.enter_key(press_secs)
    359 
    360 
    361     def refresh_key(self, press_secs=''):
    362         """Simulate Refresh key (F3) button press.
    363 
    364         @param press_secs : Str. Time to press key.
    365         """
    366         self._server.refresh_key(press_secs)
    367 
    368 
    369     def ctrl_refresh_key(self, press_secs=''):
    370         """Simulate Ctrl and Refresh (F3) simultaneous press.
    371 
    372         This key combination is an alternative of Space key.
    373 
    374         @param press_secs : Str. Time to press key.
    375         """
    376         self._server.ctrl_refresh_key(press_secs)
    377 
    378 
    379     def imaginary_key(self, press_secs=''):
    380         """Simulate imaginary key button press.
    381 
    382         Maps to a key that doesn't physically exist.
    383 
    384         @param press_secs : Str. Time to press key.
    385         """
    386         self._server.imaginary_key(press_secs)
    387 
    388 
    389     def sysrq_x(self, press_secs=''):
    390         """Simulate Alt VolumeUp X simulataneous press.
    391 
    392         This key combination is the kernel system request (sysrq) X.
    393 
    394         @param press_secs : Str. Time to press key.
    395         """
    396         self._server.sysrq_x(press_secs)
    397 
    398 
    399     def toggle_recovery_switch(self):
    400         """Toggle recovery switch on and off."""
    401         self.enable_recovery_mode()
    402         time.sleep(self.REC_TOGGLE_DELAY)
    403         self.disable_recovery_mode()
    404 
    405 
    406     def enable_recovery_mode(self):
    407         """Enable recovery mode on device."""
    408         self.set('rec_mode', 'on')
    409 
    410 
    411     def disable_recovery_mode(self):
    412         """Disable recovery mode on device."""
    413         self.set('rec_mode', 'off')
    414 
    415 
    416     def toggle_development_switch(self):
    417         """Toggle development switch on and off."""
    418         self.enable_development_mode()
    419         time.sleep(self.DEV_TOGGLE_DELAY)
    420         self.disable_development_mode()
    421 
    422 
    423     def enable_development_mode(self):
    424         """Enable development mode on device."""
    425         self.set('dev_mode', 'on')
    426 
    427 
    428     def disable_development_mode(self):
    429         """Disable development mode on device."""
    430         self.set('dev_mode', 'off')
    431 
    432     def boot_devmode(self):
    433         """Boot a dev-mode device that is powered off."""
    434         self.power_short_press()
    435         self.pass_devmode()
    436 
    437 
    438     def pass_devmode(self):
    439         """Pass through boot screens in dev-mode."""
    440         time.sleep(Servo.BOOT_DELAY)
    441         self.ctrl_d()
    442         time.sleep(Servo.BOOT_DELAY)
    443 
    444 
    445     def get_board(self):
    446         """Get the board connected to servod.
    447 
    448         """
    449         return self._server.get_board()
    450 
    451 
    452     def _get_xmlrpclib_exception(self, xmlexc):
    453         """Get meaningful exception string from xmlrpc.
    454 
    455         Args:
    456             xmlexc: xmlrpclib.Fault object
    457 
    458         xmlrpclib.Fault.faultString has the following format:
    459 
    460         <type 'exception type'>:'actual error message'
    461 
    462         Parse and return the real exception from the servod side instead of the
    463         less meaningful one like,
    464            <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
    465            attribute 'hw_driver'">
    466 
    467         Returns:
    468             string of underlying exception raised in servod.
    469         """
    470         return re.sub('^.*>:', '', xmlexc.faultString)
    471 
    472 
    473     def get(self, gpio_name):
    474         """Get the value of a gpio from Servod.
    475 
    476         @param gpio_name Name of the gpio.
    477         """
    478         assert gpio_name
    479         try:
    480             return self._server.get(gpio_name)
    481         except  xmlrpclib.Fault as e:
    482             err_msg = "Getting '%s' :: %s" % \
    483                 (gpio_name, self._get_xmlrpclib_exception(e))
    484             raise error.TestFail(err_msg)
    485 
    486 
    487     def set(self, gpio_name, gpio_value):
    488         """Set and check the value of a gpio using Servod.
    489 
    490         @param gpio_name Name of the gpio.
    491         @param gpio_value New setting for the gpio.
    492         """
    493         self.set_nocheck(gpio_name, gpio_value)
    494         retry_count = Servo.GET_RETRY_MAX
    495         while gpio_value != self.get(gpio_name) and retry_count:
    496             logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
    497                          retry_count)
    498             retry_count -= 1
    499             time.sleep(Servo.SHORT_DELAY)
    500         if not retry_count:
    501             assert gpio_value == self.get(gpio_name), \
    502                 'Servo failed to set %s to %s' % (gpio_name, gpio_value)
    503 
    504 
    505     def set_nocheck(self, gpio_name, gpio_value):
    506         """Set the value of a gpio using Servod.
    507 
    508         @param gpio_name Name of the gpio.
    509         @param gpio_value New setting for the gpio.
    510         """
    511         assert gpio_name and gpio_value
    512         logging.info('Setting %s to %s', gpio_name, gpio_value)
    513         try:
    514             self._server.set(gpio_name, gpio_value)
    515         except  xmlrpclib.Fault as e:
    516             err_msg = "Setting '%s' to '%s' :: %s" % \
    517                 (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
    518             raise error.TestFail(err_msg)
    519 
    520 
    521     def set_get_all(self, controls):
    522         """Set &| get one or more control values.
    523 
    524         @param controls: list of strings, controls to set &| get.
    525 
    526         @raise: error.TestError in case error occurs setting/getting values.
    527         """
    528         rv = []
    529         try:
    530             logging.info('Set/get all: %s', str(controls))
    531             rv = self._server.set_get_all(controls)
    532         except xmlrpclib.Fault as e:
    533             # TODO(waihong): Remove the following backward compatibility when
    534             # the new versions of hdctools are deployed.
    535             if 'not supported' in str(e):
    536                 logging.warning('The servod is too old that set_get_all '
    537                         'not supported. Use set and get instead.')
    538                 for control in controls:
    539                     if ':' in control:
    540                         (name, value) = control.split(':')
    541                         if name == 'sleep':
    542                             time.sleep(float(value))
    543                         else:
    544                             self.set_nocheck(name, value)
    545                         rv.append(True)
    546                     else:
    547                         rv.append(self.get(name))
    548             else:
    549                 err_msg = "Problem with '%s' :: %s" % \
    550                     (controls, self._get_xmlrpclib_exception(e))
    551                 raise error.TestFail(err_msg)
    552         return rv
    553 
    554 
    555     # TODO(waihong) It may fail if multiple servo's are connected to the same
    556     # host. Should look for a better way, like the USB serial name, to identify
    557     # the USB device.
    558     # TODO(sbasi) Remove this code from autoserv once firmware tests have been
    559     # updated.
    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
        """Find the servo's USB disk as seen from the servo host.

        The probing itself is done by servod; any falsy answer from
        servod is reported as None.

        @param timeout The timeout period when probing for the usb host device.

        @return: String of USB disk path (e.g. '/dev/sdb') or None.
        """
        usb_dev = self._server.probe_host_usb_dev(timeout)
        return usb_dev or None
    570 
    571 
    572     def image_to_servo_usb(self, image_path=None,
    573                            make_image_noninteractive=False):
    574         """Install an image to the USB key plugged into the servo.
    575 
    576         This method may copy any image to the servo USB key including a
    577         recovery image or a test image.  These images are frequently used
    578         for test purposes such as restoring a corrupted image or conducting
    579         an upgrade of ec/fw/kernel as part of a test of a specific image part.
    580 
    581         @param image_path Path on the host to the recovery image.
    582         @param make_image_noninteractive Make the recovery image
    583                                    noninteractive, therefore the DUT
    584                                    will reboot automatically after
    585                                    installation.
    586         """
    587         # We're about to start plugging/unplugging the USB key.  We
    588         # don't know the state of the DUT, or what it might choose
    589         # to do to the device after hotplug.  To avoid surprises,
    590         # force the DUT to be off.
    591         self._server.hwinit()
    592         self._power_state.power_off()
    593 
    594         # Set up Servo's usb mux.
    595         self.switch_usbkey('host')
    596         if image_path:
    597             logging.info('Searching for usb device and copying image to it. '
    598                          'Please wait a few minutes...')
    599             if not self._server.download_image_to_usb(image_path):
    600                 logging.error('Failed to transfer requested image to USB. '
    601                               'Please take a look at Servo Logs.')
    602                 raise error.AutotestError('Download image to usb failed.')
    603             if make_image_noninteractive:
    604                 logging.info('Making image noninteractive')
    605                 if not self._server.make_image_noninteractive():
    606                     logging.error('Failed to make image noninteractive. '
    607                                   'Please take a look at Servo Logs.')
    608 
    609 
    610     def install_recovery_image(self, image_path=None,
    611                                make_image_noninteractive=False):
    612         """Install the recovery image specified by the path onto the DUT.
    613 
    614         This method uses google recovery mode to install a recovery image
    615         onto a DUT through the use of a USB stick that is mounted on a servo
    616         board specified by the usb_dev.  If no image path is specified
    617         we use the recovery image already on the usb image.
    618 
    619         @param image_path: Path on the host to the recovery image.
    620         @param make_image_noninteractive: Make the recovery image
    621                 noninteractive, therefore the DUT will reboot automatically
    622                 after installation.
    623         """
    624         self.image_to_servo_usb(image_path, make_image_noninteractive)
    625         self._power_state.power_on(rec_mode=self._power_state.REC_ON)
    626         self.switch_usbkey('dut')
    627 
    628 
    629     def _scp_image(self, image_path):
    630         """Copy image to the servo host.
    631 
    632         When programming a firmware image on the DUT, the image must be
    633         located on the host to which the servo device is connected. Sometimes
    634         servo is controlled by a remote host, in this case the image needs to
    635         be transferred to the remote host.
    636 
    637         @param image_path: a string, name of the firmware image file to be
    638                transferred.
    639         @return: a string, full path name of the copied file on the remote.
    640         """
    641 
    642         dest_path = os.path.join('/tmp', os.path.basename(image_path))
    643         self._servo_host.send_file(image_path, dest_path)
    644         return dest_path
    645 
    646 
    647     def system(self, command, timeout=3600):
    648         """Execute the passed in command on the servod host.
    649 
    650         @param command Command to be executed.
    651         @param timeout Maximum number of seconds of runtime allowed. Default to
    652                        1 hour.
    653         """
    654         logging.info('Will execute on servo host: %s', command)
    655         self._servo_host.run(command, timeout=timeout)
    656 
    657 
    658     def system_output(self, command, timeout=3600,
    659                       ignore_status=False, args=()):
    660         """Execute the passed in command on the servod host, return stdout.
    661 
    662         @param command a string, the command to execute
    663         @param timeout an int, max number of seconds to wait til command
    664                execution completes. Default to 1 hour.
    665         @param ignore_status a Boolean, if true - ignore command's nonzero exit
    666                status, otherwise an exception will be thrown
    667         @param args a tuple of strings, each becoming a separate command line
    668                parameter for the command
    669         @return command's stdout as a string.
    670         """
    671         return self._servo_host.run(command, timeout=timeout,
    672                                     ignore_status=ignore_status,
    673                                     args=args).stdout.strip()
    674 
    675 
    676     def get_servo_version(self):
    677         """Get the version of the servo, e.g., servo_v2 or servo_v3.
    678 
    679         @return: The version of the servo.
    680 
    681         """
    682         return self._server.get_version()
    683 
    684 
    685     def _initialize_programmer(self, rw_only=False):
    686         """Initialize the firmware programmer.
    687 
    688         @param rw_only: True to initialize a programmer which only
    689                         programs the RW portions.
    690         """
    691         if self._programmer:
    692             return
    693         # Initialize firmware programmer
    694         servo_version = self.get_servo_version()
    695         if servo_version.startswith('servo_v2'):
    696             self._programmer = firmware_programmer.ProgrammerV2(self)
    697             self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
    698         # Both servo v3 and v4 use the same programming methods so just leverage
    699         # ProgrammerV3 for servo v4 as well.
    700         elif (servo_version.startswith('servo_v3') or
    701               servo_version.startswith('servo_v4')):
    702             self._programmer = firmware_programmer.ProgrammerV3(self)
    703             self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
    704         else:
    705             raise error.TestError(
    706                     'No firmware programmer for servo version: %s' %
    707                          servo_version)
    708 
    709 
    710     def program_bios(self, image, rw_only=False):
    711         """Program bios on DUT with given image.
    712 
    713         @param image: a string, file name of the BIOS image to program
    714                       on the DUT.
    715         @param rw_only: True to only program the RW portion of BIOS.
    716 
    717         """
    718         self._initialize_programmer()
    719         if not self.is_localhost():
    720             image = self._scp_image(image)
    721         if rw_only:
    722             self._programmer_rw.program_bios(image)
    723         else:
    724             self._programmer.program_bios(image)
    725 
    726 
    727     def program_ec(self, image, rw_only=False):
    728         """Program ec on DUT with given image.
    729 
    730         @param image: a string, file name of the EC image to program
    731                       on the DUT.
    732         @param rw_only: True to only program the RW portion of EC.
    733 
    734         """
    735         self._initialize_programmer()
    736         if not self.is_localhost():
    737             image = self._scp_image(image)
    738         if rw_only:
    739            self._programmer_rw.program_ec(image)
    740         else:
    741            self._programmer.program_ec(image)
    742 
    743 
    744     def _switch_usbkey_power(self, power_state, detection_delay=False):
    745         """Switch usbkey power.
    746 
    747         This function switches usbkey power by setting the value of
    748         'prtctl4_pwren'. If the power is already in the
    749         requested state, this function simply returns.
    750 
    751         @param power_state: A string, 'on' or 'off'.
    752         @param detection_delay: A boolean value, if True, sleep
    753                                 for |USB_DETECTION_DELAY| after switching
    754                                 the power on.
    755         """
    756         # TODO(kevcheng): Forgive me for this terrible hack. This is just to
    757         # handle beaglebones that haven't yet updated and have the
    758         # safe_switch_usbkey_power RPC.  I'll remove this once all beaglebones
    759         # have been updated and also think about a better way to handle
    760         # situations like this.
    761         try:
    762             self._server.safe_switch_usbkey_power(power_state)
    763         except Exception:
    764             self.set('prtctl4_pwren', power_state)
    765         if power_state == 'off':
    766             time.sleep(self.USB_POWEROFF_DELAY)
    767         elif detection_delay:
    768             time.sleep(self.USB_DETECTION_DELAY)
    769 
    770 
    def switch_usbkey(self, usb_state):
        """Route the USB flash stick to the host or the DUT, or power it off.

        Nothing is done when `get_usbkey_direction()` already reports
        the requested state.  For 'off', only the usbkey power is
        switched off.  For 'host' or 'dut', the stick is power-cycled
        around the mux change: power off, switch the mux, wait
        |USB_POWEROFF_DELAY|, power back on.  The power cycle is there
        because the stick sometimes gets wedged if the mux is switched
        while the stick power is on.  When switching to 'host', the
        power-on step is asked to apply the detection delay.

        On success the requested state is remembered in
        `self._usb_state`.

        @param usb_state: A string, one of 'dut', 'host', or 'off'.
                          'dut' and 'host' indicate which side the
                          USB flash device is required to be connected to.
                          'off' indicates turning the USB port off.

        @raise: error.TestError in case the parameter is not 'dut'
                'host', or 'off'.
        """
        if self.get_usbkey_direction() == usb_state:
            # Already where the caller wants it.
            return

        if usb_state == 'off':
            self._switch_usbkey_power('off')
            self._usb_state = usb_state
            return

        if usb_state not in ('host', 'dut'):
            raise error.TestError('Unknown USB state request: %s' % usb_state)

        to_host = (usb_state == 'host')
        mux_direction = 'servo_sees_usbkey' if to_host else 'dut_sees_usbkey'

        # Never flip the mux while the stick is powered.
        self._switch_usbkey_power('off')
        # TODO(kevcheng): Forgive me for this terrible hack. It only exists
        # to cope with beaglebones that have not been updated yet and so
        # lack the safe_switch_usbkey RPC.  To be removed once every
        # beaglebone is updated; a better way of handling this kind of
        # situation is also still to be worked out.
        try:
            self._server.safe_switch_usbkey(mux_direction)
        except Exception:
            # Fallback path: drive the mux select control ourselves.
            self.set('usb_mux_sel1', mux_direction)
        time.sleep(self.USB_POWEROFF_DELAY)
        self._switch_usbkey_power('on', to_host)
        self._usb_state = usb_state
    817 
    818 
    def get_usbkey_direction(self):
        """Report where the USB stick is connected, or 'off' if unpowered.

        A previously recorded (truthy) `self._usb_state` is returned
        as-is.  Otherwise the state is derived from the servo controls
        and cached: 'off' when 'prtctl4_pwren' reads 'off', 'dut' when
        'usb_mux_sel1' starts with 'dut', and 'host' in every other
        case.

        @return: A string, one of 'dut', 'host', or 'off'.
        """
        if self._usb_state:
            return self._usb_state

        # No cached value yet: probe the power control first, and only
        # consult the mux when the port is powered.
        if self.get('prtctl4_pwren') == 'off':
            direction = 'off'
        elif self.get('usb_mux_sel1').startswith('dut'):
            direction = 'dut'
        else:
            direction = 'host'
        self._usb_state = direction
        return direction
    832