Home | History | Annotate | Download | only in servo
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 #
      5 # Expects to be run in an environment with sudo and no interactive password
      6 # prompt, such as within the Chromium OS development chroot.
      7 
      8 import os
      9 
     10 import logging, re, time, xmlrpclib
     11 
     12 from autotest_lib.client.common_lib import error
     13 from autotest_lib.server.cros.servo import firmware_programmer
     14 
     15 # Time to wait when probing for a usb device, it takes on avg 17 seconds
     16 # to do a full probe.
     17 _USB_PROBE_TIMEOUT = 40
     18 
     19 
     20 class _PowerStateController(object):
     21 
     22     """Class to provide board-specific power operations.
     23 
     24     This class is responsible for "power on" and "power off"
     25     operations that can operate without making assumptions in
     26     advance about board state.  It offers an interface that
     27     abstracts out the different sequences required for different
     28     board types.
     29 
     30     """
     31 
     32     # Constants acceptable to be passed for the `rec_mode` parameter
     33     # to power_on().
     34     #
     35     # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
     36     #   SD card.
     37     # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
     38 
     39     REC_ON = 'rec'
     40     REC_OFF = 'on'
     41 
     42     # Delay in seconds needed between asserting and de-asserting
     43     # warm reset.
     44     _RESET_HOLD_TIME = 0.5
     45 
     46     def __init__(self, servo):
     47         """Initialize the power state control.
     48 
     49         @param servo Servo object providing the underlying `set` and `get`
     50                      methods for the target controls.
     51 
     52         """
     53         self._servo = servo
     54 
     55     def reset(self):
     56         """Force the DUT to reset.
     57 
     58         The DUT is guaranteed to be on at the end of this call,
     59         regardless of its previous state, provided that there is
     60         working OS software. This also guarantees that the EC has
     61         been restarted.
     62 
     63         """
     64         self._servo.set_nocheck('power_state', 'reset')
     65 
     66     def warm_reset(self):
     67         """Apply warm reset to the DUT.
     68 
     69         This asserts, then de-asserts the 'warm_reset' signal.
     70         Generally, this causes the board to restart.
     71 
     72         """
     73         self._servo.set_get_all(['warm_reset:on',
     74                                  'sleep:%.4f' % self._RESET_HOLD_TIME,
     75                                  'warm_reset:off'])
     76 
     77     def power_off(self):
     78         """Force the DUT to power off.
     79 
     80         The DUT is guaranteed to be off at the end of this call,
     81         regardless of its previous state, provided that there is
     82         working EC and boot firmware.  There is no requirement for
     83         working OS software.
     84 
     85         """
     86         self._servo.set_nocheck('power_state', 'off')
     87 
     88     def power_on(self, rec_mode=REC_OFF):
     89         """Force the DUT to power on.
     90 
     91         Prior to calling this function, the DUT must be powered off,
     92         e.g. with a call to `power_off()`.
     93 
     94         At power on, recovery mode is set as specified by the
     95         corresponding argument.  When booting with recovery mode on, it
     96         is the caller's responsibility to unplug/plug in a bootable
     97         external storage device.
     98 
     99         If the DUT requires a delay after powering on but before
    100         processing inputs such as USB stick insertion, the delay is
    101         handled by this method; the caller is not responsible for such
    102         delays.
    103 
    104         @param rec_mode Setting of recovery mode to be applied at
    105                         power on. default: REC_OFF aka 'off'
    106 
    107         """
    108         self._servo.set_nocheck('power_state', rec_mode)
    109 
    110 
    111 class Servo(object):
    112 
    113     """Manages control of a Servo board.
    114 
    115     Servo is a board developed by hardware group to aide in the debug and
    116     control of various partner devices. Servo's features include the simulation
    117     of pressing the power button, closing the lid, and pressing Ctrl-d. This
    118     class manages setting up and communicating with a servo demon (servod)
    119     process. It provides both high-level functions for common servo tasks and
    120     low-level functions for directly setting and reading gpios.
    121 
    122     """
    123 
    124     # Power button press delays in seconds.
    125     #
    126     # The EC specification says that 8.0 seconds should be enough
    127     # for the long power press.  However, some platforms need a bit
    128     # more time.  Empirical testing has found these requirements:
    129     #   Alex: 8.2 seconds
    130     #   ZGB:  8.5 seconds
    131     # The actual value is set to the largest known necessary value.
    132     #
    133     # TODO(jrbarnette) Being generous is the right thing to do for
    134     # existing platforms, but if this code is to be used for
    135     # qualification of new hardware, we should be less generous.
    136     SHORT_DELAY = 0.1
    137 
    138     # Maximum number of times to re-read power button on release.
    139     GET_RETRY_MAX = 10
    140 
    141     # Delays to deal with DUT state transitions.
    142     SLEEP_DELAY = 6
    143     BOOT_DELAY = 10
    144 
    145     # Default minimum time interval between 'press' and 'release'
    146     # keyboard events.
    147     SERVO_KEY_PRESS_DELAY = 0.1
    148 
    149     # Time to toggle recovery switch on and off.
    150     REC_TOGGLE_DELAY = 0.1
    151 
    152     # Time to toggle development switch on and off.
    153     DEV_TOGGLE_DELAY = 0.1
    154 
    155     # Time between an usb disk plugged-in and detected in the system.
    156     USB_DETECTION_DELAY = 10
    157     # Time to keep USB power off before and after USB mux direction is changed
    158     USB_POWEROFF_DELAY = 2
    159 
    160     # Time to wait before timing out on servo initialization.
    161     INIT_TIMEOUT_SECS = 10
    162 
    163 
    164     def __init__(self, servo_host, servo_serial=None):
    165         """Sets up the servo communication infrastructure.
    166 
    167         @param servo_host: A ServoHost object representing
    168                            the host running servod.
    169         @param servo_serial: Serial number of the servo board.
    170         """
    171         # TODO(fdeng): crbug.com/298379
    172         # We should move servo_host object out of servo object
    173         # to minimize the dependencies on the rest of Autotest.
    174         self._servo_host = servo_host
    175         self._servo_serial = servo_serial
    176         self._server = servo_host.get_servod_server_proxy()
    177         self._power_state = _PowerStateController(self)
    178         self._usb_state = None
    179         self._programmer = None
    180 
    181 
    182     @property
    183     def servo_serial(self):
    184         """Returns the serial number of the servo board."""
    185         return self._servo_serial
    186 
    187 
    188     def get_power_state_controller(self):
    189         """Return the power state controller for this Servo.
    190 
    191         The power state controller provides board-independent
    192         interfaces for reset, power-on, power-off operations.
    193 
    194         """
    195         return self._power_state
    196 
    197 
    198     def initialize_dut(self, cold_reset=False):
    199         """Initializes a dut for testing purposes.
    200 
    201         This sets various servo signals back to default values
    202         appropriate for the target board.  By default, if the DUT
    203         is already on, it stays on.  If the DUT is powered off
    204         before initialization, its state afterward is unspecified.
    205 
    206         Rationale:  Basic initialization of servo sets the lid open,
    207         when there is a lid.  This operation won't affect powered on
    208         units; however, setting the lid open may power on a unit
    209         that's off, depending on the board type and previous state
    210         of the device.
    211 
    212         If `cold_reset` is a true value, the DUT and its EC will be
    213         reset, and the DUT rebooted in normal mode.
    214 
    215         @param cold_reset If True, cold reset the device after
    216                           initialization.
    217         """
    218         self._server.hwinit()
    219         self.set('usb_mux_oe1', 'on')
    220         self._usb_state = None
    221         self.switch_usbkey('off')
    222         if cold_reset:
    223             self._power_state.reset()
    224         logging.debug('Servo initialized, version is %s',
    225                       self._server.get_version())
    226 
    227 
    228     def is_localhost(self):
    229         """Is the servod hosted locally?
    230 
    231         Returns:
    232           True if local hosted; otherwise, False.
    233         """
    234         return self._servo_host.is_localhost()
    235 
    236 
    237     def power_long_press(self):
    238         """Simulate a long power button press."""
    239         # After a long power press, the EC may ignore the next power
    240         # button press (at least on Alex).  To guarantee that this
    241         # won't happen, we need to allow the EC one second to
    242         # collect itself.
    243         self._server.power_long_press()
    244 
    245 
    246     def power_normal_press(self):
    247         """Simulate a normal power button press."""
    248         self._server.power_normal_press()
    249 
    250 
    251     def power_short_press(self):
    252         """Simulate a short power button press."""
    253         self._server.power_short_press()
    254 
    255 
    256     def power_key(self, press_secs=''):
    257         """Simulate a power button press.
    258 
    259         @param press_secs : Str. Time to press key.
    260         """
    261         self._server.power_key(press_secs)
    262 
    263 
    264     def lid_open(self):
    265         """Simulate opening the lid and raise exception if all attempts fail"""
    266         self.set('lid_open', 'yes')
    267 
    268 
    269     def lid_close(self):
    270         """Simulate closing the lid and raise exception if all attempts fail
    271 
    272         Waits 6 seconds to ensure the device is fully asleep before returning.
    273         """
    274         self.set('lid_open', 'no')
    275         time.sleep(Servo.SLEEP_DELAY)
    276 
    277     def volume_up(self, timeout=300):
    278         """Simulate pushing the volume down button.
    279 
    280         @param timeout: Timeout for setting the volume.
    281         """
    282         self.set_get_all(['volume_up:yes',
    283                           'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
    284                           'volume_up:no'])
    285         # we need to wait for commands to take effect before moving on
    286         time_left = float(timeout)
    287         while time_left > 0.0:
    288             value = self.get('volume_up')
    289             if value == 'no':
    290                 return
    291             time.sleep(self.SHORT_DELAY)
    292             time_left = time_left - self.SHORT_DELAY
    293         raise error.TestFail("Failed setting volume_up to no")
    294 
    295     def volume_down(self, timeout=300):
    296         """Simulate pushing the volume down button.
    297 
    298         @param timeout: Timeout for setting the volume.
    299         """
    300         self.set_get_all(['volume_down:yes',
    301                           'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
    302                           'volume_down:no'])
    303         # we need to wait for commands to take effect before moving on
    304         time_left = float(timeout)
    305         while time_left > 0.0:
    306             value = self.get('volume_down')
    307             if value == 'no':
    308                 return
    309             time.sleep(self.SHORT_DELAY)
    310             time_left = time_left - self.SHORT_DELAY
    311         raise error.TestFail("Failed setting volume_down to no")
    312 
    313     def ctrl_d(self, press_secs=''):
    314         """Simulate Ctrl-d simultaneous button presses.
    315 
    316         @param press_secs : Str. Time to press key.
    317         """
    318         self._server.ctrl_d(press_secs)
    319 
    320 
    321     def ctrl_u(self):
    322         """Simulate Ctrl-u simultaneous button presses.
    323 
    324         @param press_secs : Str. Time to press key.
    325         """
    326         self._server.ctrl_u()
    327 
    328 
    329     def ctrl_enter(self, press_secs=''):
    330         """Simulate Ctrl-enter simultaneous button presses.
    331 
    332         @param press_secs : Str. Time to press key.
    333         """
    334         self._server.ctrl_enter(press_secs)
    335 
    336 
    337     def d_key(self, press_secs=''):
    338         """Simulate Enter key button press.
    339 
    340         @param press_secs : Str. Time to press key.
    341         """
    342         self._server.d_key(press_secs)
    343 
    344 
    345     def ctrl_key(self, press_secs=''):
    346         """Simulate Enter key button press.
    347 
    348         @param press_secs : Str. Time to press key.
    349         """
    350         self._server.ctrl_key(press_secs)
    351 
    352 
    353     def enter_key(self, press_secs=''):
    354         """Simulate Enter key button press.
    355 
    356         @param press_secs : Str. Time to press key.
    357         """
    358         self._server.enter_key(press_secs)
    359 
    360 
    361     def refresh_key(self, press_secs=''):
    362         """Simulate Refresh key (F3) button press.
    363 
    364         @param press_secs : Str. Time to press key.
    365         """
    366         self._server.refresh_key(press_secs)
    367 
    368 
    369     def ctrl_refresh_key(self, press_secs=''):
    370         """Simulate Ctrl and Refresh (F3) simultaneous press.
    371 
    372         This key combination is an alternative of Space key.
    373 
    374         @param press_secs : Str. Time to press key.
    375         """
    376         self._server.ctrl_refresh_key(press_secs)
    377 
    378 
    379     def imaginary_key(self, press_secs=''):
    380         """Simulate imaginary key button press.
    381 
    382         Maps to a key that doesn't physically exist.
    383 
    384         @param press_secs : Str. Time to press key.
    385         """
    386         self._server.imaginary_key(press_secs)
    387 
    388 
    389     def sysrq_x(self, press_secs=''):
    390         """Simulate Alt VolumeUp X simulataneous press.
    391 
    392         This key combination is the kernel system request (sysrq) X.
    393 
    394         @param press_secs : Str. Time to press key.
    395         """
    396         self._server.sysrq_x(press_secs)
    397 
    398 
    399     def toggle_recovery_switch(self):
    400         """Toggle recovery switch on and off."""
    401         self.enable_recovery_mode()
    402         time.sleep(self.REC_TOGGLE_DELAY)
    403         self.disable_recovery_mode()
    404 
    405 
    406     def enable_recovery_mode(self):
    407         """Enable recovery mode on device."""
    408         self.set('rec_mode', 'on')
    409 
    410 
    411     def disable_recovery_mode(self):
    412         """Disable recovery mode on device."""
    413         self.set('rec_mode', 'off')
    414 
    415 
    416     def toggle_development_switch(self):
    417         """Toggle development switch on and off."""
    418         self.enable_development_mode()
    419         time.sleep(self.DEV_TOGGLE_DELAY)
    420         self.disable_development_mode()
    421 
    422 
    423     def enable_development_mode(self):
    424         """Enable development mode on device."""
    425         self.set('dev_mode', 'on')
    426 
    427 
    428     def disable_development_mode(self):
    429         """Disable development mode on device."""
    430         self.set('dev_mode', 'off')
    431 
    432     def boot_devmode(self):
    433         """Boot a dev-mode device that is powered off."""
    434         self.power_short_press()
    435         self.pass_devmode()
    436 
    437 
    438     def pass_devmode(self):
    439         """Pass through boot screens in dev-mode."""
    440         time.sleep(Servo.BOOT_DELAY)
    441         self.ctrl_d()
    442         time.sleep(Servo.BOOT_DELAY)
    443 
    444 
    445     def get_board(self):
    446         """Get the board connected to servod."""
    447         return self._server.get_board()
    448 
    449 
    450     def get_base_board(self):
    451         """Get the board of the base connected to servod."""
    452         try:
    453             return self._server.get_base_board()
    454         except  xmlrpclib.Fault as e:
    455             # TODO(waihong): Remove the following compatibility check when
    456             # the new versions of hdctools are deployed.
    457             if 'not supported' in str(e):
    458                 logging.warning('The servod is too old that get_base_board '
    459                         'not supported.')
    460                 return ''
    461             raise
    462 
    463 
    464     def get_ec_active_copy(self):
    465         """Get the active copy of the EC image."""
    466         return self.get('ec_active_copy')
    467 
    468 
    469     def _get_xmlrpclib_exception(self, xmlexc):
    470         """Get meaningful exception string from xmlrpc.
    471 
    472         Args:
    473             xmlexc: xmlrpclib.Fault object
    474 
    475         xmlrpclib.Fault.faultString has the following format:
    476 
    477         <type 'exception type'>:'actual error message'
    478 
    479         Parse and return the real exception from the servod side instead of the
    480         less meaningful one like,
    481            <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
    482            attribute 'hw_driver'">
    483 
    484         Returns:
    485             string of underlying exception raised in servod.
    486         """
    487         return re.sub('^.*>:', '', xmlexc.faultString)
    488 
    489 
    490     def get(self, gpio_name):
    491         """Get the value of a gpio from Servod.
    492 
    493         @param gpio_name Name of the gpio.
    494         """
    495         assert gpio_name
    496         try:
    497             return self._server.get(gpio_name)
    498         except  xmlrpclib.Fault as e:
    499             err_msg = "Getting '%s' :: %s" % \
    500                 (gpio_name, self._get_xmlrpclib_exception(e))
    501             raise error.TestFail(err_msg)
    502 
    503 
    504     def set(self, gpio_name, gpio_value):
    505         """Set and check the value of a gpio using Servod.
    506 
    507         @param gpio_name Name of the gpio.
    508         @param gpio_value New setting for the gpio.
    509         """
    510         self.set_nocheck(gpio_name, gpio_value)
    511         retry_count = Servo.GET_RETRY_MAX
    512         while gpio_value != self.get(gpio_name) and retry_count:
    513             logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
    514                          retry_count)
    515             retry_count -= 1
    516             time.sleep(Servo.SHORT_DELAY)
    517         if not retry_count:
    518             assert gpio_value == self.get(gpio_name), \
    519                 'Servo failed to set %s to %s' % (gpio_name, gpio_value)
    520 
    521 
    522     def set_nocheck(self, gpio_name, gpio_value):
    523         """Set the value of a gpio using Servod.
    524 
    525         @param gpio_name Name of the gpio.
    526         @param gpio_value New setting for the gpio.
    527         """
    528         assert gpio_name and gpio_value
    529         logging.info('Setting %s to %s', gpio_name, gpio_value)
    530         try:
    531             self._server.set(gpio_name, gpio_value)
    532         except  xmlrpclib.Fault as e:
    533             err_msg = "Setting '%s' to '%s' :: %s" % \
    534                 (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
    535             raise error.TestFail(err_msg)
    536 
    537 
    538     def set_get_all(self, controls):
    539         """Set &| get one or more control values.
    540 
    541         @param controls: list of strings, controls to set &| get.
    542 
    543         @raise: error.TestError in case error occurs setting/getting values.
    544         """
    545         rv = []
    546         try:
    547             logging.info('Set/get all: %s', str(controls))
    548             rv = self._server.set_get_all(controls)
    549         except xmlrpclib.Fault as e:
    550             # TODO(waihong): Remove the following backward compatibility when
    551             # the new versions of hdctools are deployed.
    552             if 'not supported' in str(e):
    553                 logging.warning('The servod is too old that set_get_all '
    554                         'not supported. Use set and get instead.')
    555                 for control in controls:
    556                     if ':' in control:
    557                         (name, value) = control.split(':')
    558                         if name == 'sleep':
    559                             time.sleep(float(value))
    560                         else:
    561                             self.set_nocheck(name, value)
    562                         rv.append(True)
    563                     else:
    564                         rv.append(self.get(name))
    565             else:
    566                 err_msg = "Problem with '%s' :: %s" % \
    567                     (controls, self._get_xmlrpclib_exception(e))
    568                 raise error.TestFail(err_msg)
    569         return rv
    570 
    571 
    572     # TODO(waihong) It may fail if multiple servo's are connected to the same
    573     # host. Should look for a better way, like the USB serial name, to identify
    574     # the USB device.
    575     # TODO(sbasi) Remove this code from autoserv once firmware tests have been
    576     # updated.
    577     def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
    578         """Probe the USB disk device plugged-in the servo from the host side.
    579 
    580         It uses servod to discover if there is a usb device attached to it.
    581 
    582         @param timeout The timeout period when probing for the usb host device.
    583 
    584         @return: String of USB disk path (e.g. '/dev/sdb') or None.
    585         """
    586         return self._server.probe_host_usb_dev(timeout) or None
    587 
    588 
    589     def image_to_servo_usb(self, image_path=None,
    590                            make_image_noninteractive=False):
    591         """Install an image to the USB key plugged into the servo.
    592 
    593         This method may copy any image to the servo USB key including a
    594         recovery image or a test image.  These images are frequently used
    595         for test purposes such as restoring a corrupted image or conducting
    596         an upgrade of ec/fw/kernel as part of a test of a specific image part.
    597 
    598         @param image_path Path on the host to the recovery image.
    599         @param make_image_noninteractive Make the recovery image
    600                                    noninteractive, therefore the DUT
    601                                    will reboot automatically after
    602                                    installation.
    603         """
    604         # We're about to start plugging/unplugging the USB key.  We
    605         # don't know the state of the DUT, or what it might choose
    606         # to do to the device after hotplug.  To avoid surprises,
    607         # force the DUT to be off.
    608         self._server.hwinit()
    609         self._power_state.power_off()
    610 
    611         # Set up Servo's usb mux.
    612         self.switch_usbkey('host')
    613         if image_path:
    614             logging.info('Searching for usb device and copying image to it. '
    615                          'Please wait a few minutes...')
    616             if not self._server.download_image_to_usb(image_path):
    617                 logging.error('Failed to transfer requested image to USB. '
    618                               'Please take a look at Servo Logs.')
    619                 raise error.AutotestError('Download image to usb failed.')
    620             if make_image_noninteractive:
    621                 logging.info('Making image noninteractive')
    622                 if not self._server.make_image_noninteractive():
    623                     logging.error('Failed to make image noninteractive. '
    624                                   'Please take a look at Servo Logs.')
    625 
    626 
    627     def install_recovery_image(self, image_path=None,
    628                                make_image_noninteractive=False):
    629         """Install the recovery image specified by the path onto the DUT.
    630 
    631         This method uses google recovery mode to install a recovery image
    632         onto a DUT through the use of a USB stick that is mounted on a servo
    633         board specified by the usb_dev.  If no image path is specified
    634         we use the recovery image already on the usb image.
    635 
    636         @param image_path: Path on the host to the recovery image.
    637         @param make_image_noninteractive: Make the recovery image
    638                 noninteractive, therefore the DUT will reboot automatically
    639                 after installation.
    640         """
    641         self.image_to_servo_usb(image_path, make_image_noninteractive)
    642         self._power_state.power_on(rec_mode=self._power_state.REC_ON)
    643         self.switch_usbkey('dut')
    644 
    645 
    646     def _scp_image(self, image_path):
    647         """Copy image to the servo host.
    648 
    649         When programming a firmware image on the DUT, the image must be
    650         located on the host to which the servo device is connected. Sometimes
    651         servo is controlled by a remote host, in this case the image needs to
    652         be transferred to the remote host.
    653 
    654         @param image_path: a string, name of the firmware image file to be
    655                transferred.
    656         @return: a string, full path name of the copied file on the remote.
    657         """
    658 
    659         dest_path = os.path.join('/tmp', os.path.basename(image_path))
    660         self._servo_host.send_file(image_path, dest_path)
    661         return dest_path
    662 
    663 
    664     def system(self, command, timeout=3600):
    665         """Execute the passed in command on the servod host.
    666 
    667         @param command Command to be executed.
    668         @param timeout Maximum number of seconds of runtime allowed. Default to
    669                        1 hour.
    670         """
    671         logging.info('Will execute on servo host: %s', command)
    672         self._servo_host.run(command, timeout=timeout)
    673 
    674 
    675     def system_output(self, command, timeout=3600,
    676                       ignore_status=False, args=()):
    677         """Execute the passed in command on the servod host, return stdout.
    678 
    679         @param command a string, the command to execute
    680         @param timeout an int, max number of seconds to wait til command
    681                execution completes. Default to 1 hour.
    682         @param ignore_status a Boolean, if true - ignore command's nonzero exit
    683                status, otherwise an exception will be thrown
    684         @param args a tuple of strings, each becoming a separate command line
    685                parameter for the command
    686         @return command's stdout as a string.
    687         """
    688         return self._servo_host.run(command, timeout=timeout,
    689                                     ignore_status=ignore_status,
    690                                     args=args).stdout.strip()
    691 
    692 
    693     def get_servo_version(self):
    694         """Get the version of the servo, e.g., servo_v2 or servo_v3.
    695 
    696         @return: The version of the servo.
    697 
    698         """
    699         return self._server.get_version()
    700 
    701 
    702     def _initialize_programmer(self, rw_only=False):
    703         """Initialize the firmware programmer.
    704 
    705         @param rw_only: True to initialize a programmer which only
    706                         programs the RW portions.
    707         """
    708         if self._programmer:
    709             return
    710         # Initialize firmware programmer
    711         servo_version = self.get_servo_version()
    712         if servo_version.startswith('servo_v2'):
    713             self._programmer = firmware_programmer.ProgrammerV2(self)
    714             self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
    715         # Both servo v3 and v4 use the same programming methods so just leverage
    716         # ProgrammerV3 for servo v4 as well.
    717         elif (servo_version.startswith('servo_v3') or
    718               servo_version.startswith('servo_v4')):
    719             self._programmer = firmware_programmer.ProgrammerV3(self)
    720             self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
    721         else:
    722             raise error.TestError(
    723                     'No firmware programmer for servo version: %s' %
    724                          servo_version)
    725 
    726 
    727     def program_bios(self, image, rw_only=False):
    728         """Program bios on DUT with given image.
    729 
    730         @param image: a string, file name of the BIOS image to program
    731                       on the DUT.
    732         @param rw_only: True to only program the RW portion of BIOS.
    733 
    734         """
    735         self._initialize_programmer()
    736         if not self.is_localhost():
    737             image = self._scp_image(image)
    738         if rw_only:
    739             self._programmer_rw.program_bios(image)
    740         else:
    741             self._programmer.program_bios(image)
    742 
    743 
    744     def program_ec(self, image, rw_only=False):
    745         """Program ec on DUT with given image.
    746 
    747         @param image: a string, file name of the EC image to program
    748                       on the DUT.
    749         @param rw_only: True to only program the RW portion of EC.
    750 
    751         """
    752         self._initialize_programmer()
    753         if not self.is_localhost():
    754             image = self._scp_image(image)
    755         if rw_only:
    756            self._programmer_rw.program_ec(image)
    757         else:
    758            self._programmer.program_ec(image)
    759 
    760 
    761     def _switch_usbkey_power(self, power_state, detection_delay=False):
    762         """Switch usbkey power.
    763 
    764         This function switches usbkey power by setting the value of
    765         'prtctl4_pwren'. If the power is already in the
    766         requested state, this function simply returns.
    767 
    768         @param power_state: A string, 'on' or 'off'.
    769         @param detection_delay: A boolean value, if True, sleep
    770                                 for |USB_DETECTION_DELAY| after switching
    771                                 the power on.
    772         """
    773         # TODO(kevcheng): Forgive me for this terrible hack. This is just to
    774         # handle beaglebones that haven't yet updated and have the
    775         # safe_switch_usbkey_power RPC.  I'll remove this once all beaglebones
    776         # have been updated and also think about a better way to handle
    777         # situations like this.
    778         try:
    779             self._server.safe_switch_usbkey_power(power_state)
    780         except Exception:
    781             self.set('prtctl4_pwren', power_state)
    782         if power_state == 'off':
    783             time.sleep(self.USB_POWEROFF_DELAY)
    784         elif detection_delay:
    785             time.sleep(self.USB_DETECTION_DELAY)
    786 
    787 
    788     def switch_usbkey(self, usb_state):
    789         """Connect USB flash stick to either host or DUT, or turn USB port off.
    790 
    791         This function switches the servo multiplexer to provide electrical
    792         connection between the USB port J3 and either host or DUT side. It
    793         can also be used to turn the USB port off.
    794 
    795         Switching to 'dut' or 'host' is accompanied by powercycling
    796         of the USB stick, because it sometimes gets wedged if the mux
    797         is switched while the stick power is on.
    798 
    799         @param usb_state: A string, one of 'dut', 'host', or 'off'.
    800                           'dut' and 'host' indicate which side the
    801                           USB flash device is required to be connected to.
    802                           'off' indicates turning the USB port off.
    803 
    804         @raise: error.TestError in case the parameter is not 'dut'
    805                 'host', or 'off'.
    806         """
    807         if self.get_usbkey_direction() == usb_state:
    808             return
    809 
    810         if usb_state == 'off':
    811             self._switch_usbkey_power('off')
    812             self._usb_state = usb_state
    813             return
    814         elif usb_state == 'host':
    815             mux_direction = 'servo_sees_usbkey'
    816         elif usb_state == 'dut':
    817             mux_direction = 'dut_sees_usbkey'
    818         else:
    819             raise error.TestError('Unknown USB state request: %s' % usb_state)
    820 
    821         self._switch_usbkey_power('off')
    822         # TODO(kevcheng): Forgive me for this terrible hack. This is just to
    823         # handle beaglebones that haven't yet updated and have the
    824         # safe_switch_usbkey RPC.  I'll remove this once all beaglebones have
    825         # been updated and also think about a better way to handle situations
    826         # like this.
    827         try:
    828             self._server.safe_switch_usbkey(mux_direction)
    829         except Exception:
    830             self.set('usb_mux_sel1', mux_direction)
    831         time.sleep(self.USB_POWEROFF_DELAY)
    832         self._switch_usbkey_power('on', usb_state == 'host')
    833         self._usb_state = usb_state
    834 
    835 
    836     def get_usbkey_direction(self):
    837         """Get which side USB is connected to or 'off' if usb power is off.
    838 
    839         @return: A string, one of 'dut', 'host', or 'off'.
    840         """
    841         if not self._usb_state:
    842             if self.get('prtctl4_pwren') == 'off':
    843                 self._usb_state = 'off'
    844             elif self.get('usb_mux_sel1').startswith('dut'):
    845                 self._usb_state = 'dut'
    846             else:
    847                 self._usb_state = 'host'
    848         return self._usb_state
    849