Home | History | Annotate | Download | only in servo
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 #
      5 # Expects to be run in an environment with sudo and no interactive password
      6 # prompt, such as within the Chromium OS development chroot.
      7 
      8 import ast
      9 import logging
     10 import os
     11 import re
     12 import time
     13 import xmlrpclib
     14 
     15 from autotest_lib.client.common_lib import error
     16 from autotest_lib.server import utils as server_utils
     17 from autotest_lib.server.cros.servo import firmware_programmer
     18 
# Time to wait when probing for a USB device; a full probe takes about
# 17 seconds on average.
     21 _USB_PROBE_TIMEOUT = 40
     22 
     23 
     24 def _extract_image_from_tarball(tarball, dest_dir, image_candidates):
     25     """Try extracting the image_candidates from the tarball.
     26 
     27     @param tarball: The path of the tarball.
     28     @param dest_path: The path of the destination.
     29     @param image_candidates: A tuple of the paths of image candidates.
     30 
     31     @return: The first path from the image candidates, which succeeds, or None
     32              if all the image candidates fail.
     33     """
     34     for image in image_candidates:
     35         status = server_utils.system(
     36                 ('tar xf %s -C %s %s' % (tarball, dest_dir, image)),
     37                 timeout=60, ignore_status=True)
     38         if status == 0:
     39             return image
     40     return None
     41 
     42 
     43 class _PowerStateController(object):
     44 
     45     """Class to provide board-specific power operations.
     46 
     47     This class is responsible for "power on" and "power off"
     48     operations that can operate without making assumptions in
     49     advance about board state.  It offers an interface that
     50     abstracts out the different sequences required for different
     51     board types.
     52 
     53     """
     54 
     55     # Constants acceptable to be passed for the `rec_mode` parameter
     56     # to power_on().
     57     #
     58     # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
     59     #   SD card.
     60     # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
     61 
     62     REC_ON = 'rec'
     63     REC_OFF = 'on'
     64     REC_ON_FORCE_MRC = 'rec_force_mrc'
     65 
     66     # Delay in seconds needed between asserting and de-asserting
     67     # warm reset.
     68     _RESET_HOLD_TIME = 0.5
     69 
     70     def __init__(self, servo):
     71         """Initialize the power state control.
     72 
     73         @param servo Servo object providing the underlying `set` and `get`
     74                      methods for the target controls.
     75 
     76         """
     77         self._servo = servo
     78 
     79     def reset(self):
     80         """Force the DUT to reset.
     81 
     82         The DUT is guaranteed to be on at the end of this call,
     83         regardless of its previous state, provided that there is
     84         working OS software. This also guarantees that the EC has
     85         been restarted.
     86 
     87         """
     88         self._servo.set_nocheck('power_state', 'reset')
     89 
     90     def warm_reset(self):
     91         """Apply warm reset to the DUT.
     92 
     93         This asserts, then de-asserts the 'warm_reset' signal.
     94         Generally, this causes the board to restart.
     95 
     96         """
     97         self._servo.set_get_all(['warm_reset:on',
     98                                  'sleep:%.4f' % self._RESET_HOLD_TIME,
     99                                  'warm_reset:off'])
    100 
    101     def power_off(self):
    102         """Force the DUT to power off.
    103 
    104         The DUT is guaranteed to be off at the end of this call,
    105         regardless of its previous state, provided that there is
    106         working EC and boot firmware.  There is no requirement for
    107         working OS software.
    108 
    109         """
    110         self._servo.set_nocheck('power_state', 'off')
    111 
    112     def power_on(self, rec_mode=REC_OFF):
    113         """Force the DUT to power on.
    114 
    115         Prior to calling this function, the DUT must be powered off,
    116         e.g. with a call to `power_off()`.
    117 
    118         At power on, recovery mode is set as specified by the
    119         corresponding argument.  When booting with recovery mode on, it
    120         is the caller's responsibility to unplug/plug in a bootable
    121         external storage device.
    122 
    123         If the DUT requires a delay after powering on but before
    124         processing inputs such as USB stick insertion, the delay is
    125         handled by this method; the caller is not responsible for such
    126         delays.
    127 
    128         @param rec_mode Setting of recovery mode to be applied at
    129                         power on. default: REC_OFF aka 'off'
    130 
    131         """
    132         self._servo.set_nocheck('power_state', rec_mode)
    133 
    134 
    135 class _Uart(object):
    136     """Class to capture CPU/EC UART streams."""
    137     def __init__(self, servo):
    138         self._servo = servo
    139         self._streams = []
    140         self._logs_dir = None
    141 
    142     def start_capture(self):
    143         """Start capturing Uart streams."""
    144         logging.debug('Start capturing CPU/EC UART.')
    145         self._servo.set('cpu_uart_capture', 'on')
    146         self._streams.append(('cpu_uart_stream', 'cpu_uart.log'))
    147         try:
    148             self._servo.set('ec_uart_capture', 'on')
    149             self._streams.append(('ec_uart_stream', 'ec_uart.log'))
    150         except error.TestFail as err:
    151             if 'No control named' in str(err):
    152                 logging.debug('The servod is too old that ec_uart_capture not '
    153                               'supported.')
    154 
    155     def dump(self):
    156         """Dump UART streams to log files accordingly."""
    157         if not self._logs_dir:
    158             return
    159 
    160         for stream, logfile in self._streams:
    161             logfile_fullname = os.path.join(self._logs_dir, logfile)
    162             try:
    163                 content = self._servo.get(stream)
    164             except Exception as err:
    165                 logging.warn('Failed to get UART log for %s: %s', stream, err)
    166                 continue
    167 
    168             # The UART stream may contain non-printable characters, and servo
    169             # returns it in string representation. We use `ast.leteral_eval`
    170             # to revert it back.
    171             with open(logfile_fullname, 'a') as fd:
    172                 fd.write(ast.literal_eval(content))
    173 
    174     def stop_capture(self):
    175         """Stop capturing UART streams."""
    176         logging.debug('Stop capturing CPU/EC UART.')
    177         for uart in ('cpu_uart_capture', 'ec_uart_capture'):
    178             try:
    179                 self._servo.set(uart, 'off')
    180             except error.TestFail as err:
    181                 if 'No control named' in str(err):
    182                     logging.debug('The servod is too old that %s not '
    183                                   'supported.', uart)
    184             except Exception as err:
    185                 logging.warn('Failed to stop UART logging for %s: %s', uart,
    186                              err)
    187 
    188     @property
    189     def logs_dir(self):
    190         """Return the directory to save UART logs."""
    191         return self._logs_dir
    192 
    193     @logs_dir.setter
    194     def logs_dir(self, a_dir):
    195         """Set directory to save UART logs.
    196 
    197         @param a_dir  String of logs directory name."""
    198         self._logs_dir = a_dir
    199 
    200 
    201 class Servo(object):
    202 
    203     """Manages control of a Servo board.
    204 
    205     Servo is a board developed by hardware group to aide in the debug and
    206     control of various partner devices. Servo's features include the simulation
    207     of pressing the power button, closing the lid, and pressing Ctrl-d. This
    208     class manages setting up and communicating with a servo demon (servod)
    209     process. It provides both high-level functions for common servo tasks and
    210     low-level functions for directly setting and reading gpios.
    211 
    212     """
    213 
    214     # Power button press delays in seconds.
    215     #
    216     # The EC specification says that 8.0 seconds should be enough
    217     # for the long power press.  However, some platforms need a bit
    218     # more time.  Empirical testing has found these requirements:
    219     #   Alex: 8.2 seconds
    220     #   ZGB:  8.5 seconds
    221     # The actual value is set to the largest known necessary value.
    222     #
    223     # TODO(jrbarnette) Being generous is the right thing to do for
    224     # existing platforms, but if this code is to be used for
    225     # qualification of new hardware, we should be less generous.
    226     SHORT_DELAY = 0.1
    227 
    228     # Maximum number of times to re-read power button on release.
    229     GET_RETRY_MAX = 10
    230 
    231     # Delays to deal with DUT state transitions.
    232     SLEEP_DELAY = 6
    233     BOOT_DELAY = 10
    234 
    235     # Default minimum time interval between 'press' and 'release'
    236     # keyboard events.
    237     SERVO_KEY_PRESS_DELAY = 0.1
    238 
    239     # Time to toggle recovery switch on and off.
    240     REC_TOGGLE_DELAY = 0.1
    241 
    242     # Time to toggle development switch on and off.
    243     DEV_TOGGLE_DELAY = 0.1
    244 
    245     # Time between an usb disk plugged-in and detected in the system.
    246     USB_DETECTION_DELAY = 10
    247     # Time to keep USB power off before and after USB mux direction is changed
    248     USB_POWEROFF_DELAY = 2
    249 
    250     # Time to wait before timing out on servo initialization.
    251     INIT_TIMEOUT_SECS = 10
    252 
    253 
    def __init__(self, servo_host, servo_serial=None):
        """Sets up the servo communication infrastructure.

        Obtains the servod proxy from `servo_host` and creates the power
        state controller and UART capture helpers bound to this object.

        @param servo_host: A ServoHost object representing
                           the host running servod.
        @param servo_serial: Serial number of the servo board.
        """
        # TODO(fdeng): crbug.com/298379
        # We should move servo_host object out of servo object
        # to minimize the dependencies on the rest of Autotest.
        self._servo_host = servo_host
        self._servo_serial = servo_serial
        # No USB mux state and no firmware programmer are known yet.
        self._usb_state = None
        self._programmer = None
        # Proxy through which every servod request of this object is made.
        self._server = servo_host.get_servod_server_proxy()
        # Helpers that call back into this object for control access.
        self._power_state = _PowerStateController(self)
        self._uart = _Uart(self)
    271 
    272 
    273     @property
    274     def servo_serial(self):
    275         """Returns the serial number of the servo board."""
    276         return self._servo_serial
    277 
    278 
    279     def get_power_state_controller(self):
    280         """Return the power state controller for this Servo.
    281 
    282         The power state controller provides board-independent
    283         interfaces for reset, power-on, power-off operations.
    284 
    285         """
    286         return self._power_state
    287 
    288 
    289     def initialize_dut(self, cold_reset=False):
    290         """Initializes a dut for testing purposes.
    291 
    292         This sets various servo signals back to default values
    293         appropriate for the target board.  By default, if the DUT
    294         is already on, it stays on.  If the DUT is powered off
    295         before initialization, its state afterward is unspecified.
    296 
    297         Rationale:  Basic initialization of servo sets the lid open,
    298         when there is a lid.  This operation won't affect powered on
    299         units; however, setting the lid open may power on a unit
    300         that's off, depending on the board type and previous state
    301         of the device.
    302 
    303         If `cold_reset` is a true value, the DUT and its EC will be
    304         reset, and the DUT rebooted in normal mode.
    305 
    306         @param cold_reset If True, cold reset the device after
    307                           initialization.
    308         """
    309         self._server.hwinit()
    310         self.set('usb_mux_oe1', 'on')
    311         self._usb_state = None
    312         self.switch_usbkey('off')
    313         self._uart.start_capture()
    314         if cold_reset:
    315             self._power_state.reset()
    316         logging.debug('Servo initialized, version is %s',
    317                       self._server.get_version())
    318 
    319 
    320     def is_localhost(self):
    321         """Is the servod hosted locally?
    322 
    323         Returns:
    324           True if local hosted; otherwise, False.
    325         """
    326         return self._servo_host.is_localhost()
    327 
    328 
    329     def power_long_press(self):
    330         """Simulate a long power button press."""
    331         # After a long power press, the EC may ignore the next power
    332         # button press (at least on Alex).  To guarantee that this
    333         # won't happen, we need to allow the EC one second to
    334         # collect itself.
    335         self._server.power_long_press()
    336 
    337 
    338     def power_normal_press(self):
    339         """Simulate a normal power button press."""
    340         self._server.power_normal_press()
    341 
    342 
    343     def power_short_press(self):
    344         """Simulate a short power button press."""
    345         self._server.power_short_press()
    346 
    347 
    348     def power_key(self, press_secs=''):
    349         """Simulate a power button press.
    350 
    351         @param press_secs : Str. Time to press key.
    352         """
    353         self._server.power_key(press_secs)
    354 
    355 
    356     def lid_open(self):
    357         """Simulate opening the lid and raise exception if all attempts fail"""
    358         self.set('lid_open', 'yes')
    359 
    360 
    361     def lid_close(self):
    362         """Simulate closing the lid and raise exception if all attempts fail
    363 
    364         Waits 6 seconds to ensure the device is fully asleep before returning.
    365         """
    366         self.set('lid_open', 'no')
    367         time.sleep(Servo.SLEEP_DELAY)
    368 
    369     def volume_up(self, timeout=300):
    370         """Simulate pushing the volume down button.
    371 
    372         @param timeout: Timeout for setting the volume.
    373         """
    374         self.set_get_all(['volume_up:yes',
    375                           'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
    376                           'volume_up:no'])
    377         # we need to wait for commands to take effect before moving on
    378         time_left = float(timeout)
    379         while time_left > 0.0:
    380             value = self.get('volume_up')
    381             if value == 'no':
    382                 return
    383             time.sleep(self.SHORT_DELAY)
    384             time_left = time_left - self.SHORT_DELAY
    385         raise error.TestFail("Failed setting volume_up to no")
    386 
    387     def volume_down(self, timeout=300):
    388         """Simulate pushing the volume down button.
    389 
    390         @param timeout: Timeout for setting the volume.
    391         """
    392         self.set_get_all(['volume_down:yes',
    393                           'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
    394                           'volume_down:no'])
    395         # we need to wait for commands to take effect before moving on
    396         time_left = float(timeout)
    397         while time_left > 0.0:
    398             value = self.get('volume_down')
    399             if value == 'no':
    400                 return
    401             time.sleep(self.SHORT_DELAY)
    402             time_left = time_left - self.SHORT_DELAY
    403         raise error.TestFail("Failed setting volume_down to no")
    404 
    405     def ctrl_d(self, press_secs=''):
    406         """Simulate Ctrl-d simultaneous button presses.
    407 
    408         @param press_secs : Str. Time to press key.
    409         """
    410         self._server.ctrl_d(press_secs)
    411 
    412 
    413     def ctrl_u(self):
    414         """Simulate Ctrl-u simultaneous button presses.
    415 
    416         @param press_secs : Str. Time to press key.
    417         """
    418         self._server.ctrl_u()
    419 
    420 
    421     def ctrl_enter(self, press_secs=''):
    422         """Simulate Ctrl-enter simultaneous button presses.
    423 
    424         @param press_secs : Str. Time to press key.
    425         """
    426         self._server.ctrl_enter(press_secs)
    427 
    428 
    429     def d_key(self, press_secs=''):
    430         """Simulate Enter key button press.
    431 
    432         @param press_secs : Str. Time to press key.
    433         """
    434         self._server.d_key(press_secs)
    435 
    436 
    437     def ctrl_key(self, press_secs=''):
    438         """Simulate Enter key button press.
    439 
    440         @param press_secs : Str. Time to press key.
    441         """
    442         self._server.ctrl_key(press_secs)
    443 
    444 
    445     def enter_key(self, press_secs=''):
    446         """Simulate Enter key button press.
    447 
    448         @param press_secs : Str. Time to press key.
    449         """
    450         self._server.enter_key(press_secs)
    451 
    452 
    453     def refresh_key(self, press_secs=''):
    454         """Simulate Refresh key (F3) button press.
    455 
    456         @param press_secs : Str. Time to press key.
    457         """
    458         self._server.refresh_key(press_secs)
    459 
    460 
    461     def ctrl_refresh_key(self, press_secs=''):
    462         """Simulate Ctrl and Refresh (F3) simultaneous press.
    463 
    464         This key combination is an alternative of Space key.
    465 
    466         @param press_secs : Str. Time to press key.
    467         """
    468         self._server.ctrl_refresh_key(press_secs)
    469 
    470 
    471     def imaginary_key(self, press_secs=''):
    472         """Simulate imaginary key button press.
    473 
    474         Maps to a key that doesn't physically exist.
    475 
    476         @param press_secs : Str. Time to press key.
    477         """
    478         self._server.imaginary_key(press_secs)
    479 
    480 
    481     def sysrq_x(self, press_secs=''):
    482         """Simulate Alt VolumeUp X simulataneous press.
    483 
    484         This key combination is the kernel system request (sysrq) X.
    485 
    486         @param press_secs : Str. Time to press key.
    487         """
    488         self._server.sysrq_x(press_secs)
    489 
    490 
    491     def toggle_recovery_switch(self):
    492         """Toggle recovery switch on and off."""
    493         self.enable_recovery_mode()
    494         time.sleep(self.REC_TOGGLE_DELAY)
    495         self.disable_recovery_mode()
    496 
    497 
    498     def enable_recovery_mode(self):
    499         """Enable recovery mode on device."""
    500         self.set('rec_mode', 'on')
    501 
    502 
    503     def disable_recovery_mode(self):
    504         """Disable recovery mode on device."""
    505         self.set('rec_mode', 'off')
    506 
    507 
    508     def toggle_development_switch(self):
    509         """Toggle development switch on and off."""
    510         self.enable_development_mode()
    511         time.sleep(self.DEV_TOGGLE_DELAY)
    512         self.disable_development_mode()
    513 
    514 
    515     def enable_development_mode(self):
    516         """Enable development mode on device."""
    517         self.set('dev_mode', 'on')
    518 
    519 
    520     def disable_development_mode(self):
    521         """Disable development mode on device."""
    522         self.set('dev_mode', 'off')
    523 
    524     def boot_devmode(self):
    525         """Boot a dev-mode device that is powered off."""
    526         self.power_short_press()
    527         self.pass_devmode()
    528 
    529 
    530     def pass_devmode(self):
    531         """Pass through boot screens in dev-mode."""
    532         time.sleep(Servo.BOOT_DELAY)
    533         self.ctrl_d()
    534         time.sleep(Servo.BOOT_DELAY)
    535 
    536 
    537     def get_board(self):
    538         """Get the board connected to servod."""
    539         return self._server.get_board()
    540 
    541 
    542     def get_base_board(self):
    543         """Get the board of the base connected to servod."""
    544         try:
    545             return self._server.get_base_board()
    546         except  xmlrpclib.Fault as e:
    547             # TODO(waihong): Remove the following compatibility check when
    548             # the new versions of hdctools are deployed.
    549             if 'not supported' in str(e):
    550                 logging.warning('The servod is too old that get_base_board '
    551                         'not supported.')
    552                 return ''
    553             raise
    554 
    555 
    556     def get_ec_active_copy(self):
    557         """Get the active copy of the EC image."""
    558         return self.get('ec_active_copy')
    559 
    560 
    561     def _get_xmlrpclib_exception(self, xmlexc):
    562         """Get meaningful exception string from xmlrpc.
    563 
    564         Args:
    565             xmlexc: xmlrpclib.Fault object
    566 
    567         xmlrpclib.Fault.faultString has the following format:
    568 
    569         <type 'exception type'>:'actual error message'
    570 
    571         Parse and return the real exception from the servod side instead of the
    572         less meaningful one like,
    573            <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
    574            attribute 'hw_driver'">
    575 
    576         Returns:
    577             string of underlying exception raised in servod.
    578         """
    579         return re.sub('^.*>:', '', xmlexc.faultString)
    580 
    581 
    582     def get(self, gpio_name):
    583         """Get the value of a gpio from Servod.
    584 
    585         @param gpio_name Name of the gpio.
    586         """
    587         assert gpio_name
    588         try:
    589             return self._server.get(gpio_name)
    590         except  xmlrpclib.Fault as e:
    591             err_msg = "Getting '%s' :: %s" % \
    592                 (gpio_name, self._get_xmlrpclib_exception(e))
    593             raise error.TestFail(err_msg)
    594 
    595 
    596     def set(self, gpio_name, gpio_value):
    597         """Set and check the value of a gpio using Servod.
    598 
    599         @param gpio_name Name of the gpio.
    600         @param gpio_value New setting for the gpio.
    601         """
    602         self.set_nocheck(gpio_name, gpio_value)
    603         retry_count = Servo.GET_RETRY_MAX
    604         while gpio_value != self.get(gpio_name) and retry_count:
    605             logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
    606                          retry_count)
    607             retry_count -= 1
    608             time.sleep(Servo.SHORT_DELAY)
    609         if not retry_count:
    610             assert gpio_value == self.get(gpio_name), \
    611                 'Servo failed to set %s to %s' % (gpio_name, gpio_value)
    612 
    613 
    614     def set_nocheck(self, gpio_name, gpio_value):
    615         """Set the value of a gpio using Servod.
    616 
    617         @param gpio_name Name of the gpio.
    618         @param gpio_value New setting for the gpio.
    619         """
    620         assert gpio_name and gpio_value
    621         logging.info('Setting %s to %r', gpio_name, gpio_value)
    622         try:
    623             self._server.set(gpio_name, gpio_value)
    624         except  xmlrpclib.Fault as e:
    625             err_msg = "Setting '%s' to %r :: %s" % \
    626                 (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
    627             raise error.TestFail(err_msg)
    628 
    629 
    630     def set_get_all(self, controls):
    631         """Set &| get one or more control values.
    632 
    633         @param controls: list of strings, controls to set &| get.
    634 
    635         @raise: error.TestError in case error occurs setting/getting values.
    636         """
    637         rv = []
    638         try:
    639             logging.info('Set/get all: %s', str(controls))
    640             rv = self._server.set_get_all(controls)
    641         except xmlrpclib.Fault as e:
    642             # TODO(waihong): Remove the following backward compatibility when
    643             # the new versions of hdctools are deployed.
    644             if 'not supported' in str(e):
    645                 logging.warning('The servod is too old that set_get_all '
    646                         'not supported. Use set and get instead.')
    647                 for control in controls:
    648                     if ':' in control:
    649                         (name, value) = control.split(':')
    650                         if name == 'sleep':
    651                             time.sleep(float(value))
    652                         else:
    653                             self.set_nocheck(name, value)
    654                         rv.append(True)
    655                     else:
    656                         rv.append(self.get(name))
    657             else:
    658                 err_msg = "Problem with '%s' :: %s" % \
    659                     (controls, self._get_xmlrpclib_exception(e))
    660                 raise error.TestFail(err_msg)
    661         return rv
    662 
    663 
    664     # TODO(waihong) It may fail if multiple servo's are connected to the same
    665     # host. Should look for a better way, like the USB serial name, to identify
    666     # the USB device.
    667     # TODO(sbasi) Remove this code from autoserv once firmware tests have been
    668     # updated.
    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
        """Probe the USB disk device plugged-in the servo from the host side.

        The servo's USB mux is switched to the host first; servod is then
        asked to discover whether a usb device is attached to it.

        @param timeout The timeout period when probing for the usb host device.

        @return: String of USB disk path (e.g. '/dev/sdb') or None.
        """
        # Set up Servo's usb mux.
        self.switch_usbkey('host')
        usb_dev = self._server.probe_host_usb_dev(timeout)
        # Any false result from servod is reported as None.
        return usb_dev or None
    681 
    682 
    683     def image_to_servo_usb(self, image_path=None,
    684                            make_image_noninteractive=False):
    685         """Install an image to the USB key plugged into the servo.
    686 
    687         This method may copy any image to the servo USB key including a
    688         recovery image or a test image.  These images are frequently used
    689         for test purposes such as restoring a corrupted image or conducting
    690         an upgrade of ec/fw/kernel as part of a test of a specific image part.
    691 
    692         @param image_path Path on the host to the recovery image.
    693         @param make_image_noninteractive Make the recovery image
    694                                    noninteractive, therefore the DUT
    695                                    will reboot automatically after
    696                                    installation.
    697         """
    698         # We're about to start plugging/unplugging the USB key.  We
    699         # don't know the state of the DUT, or what it might choose
    700         # to do to the device after hotplug.  To avoid surprises,
    701         # force the DUT to be off.
    702         self._server.hwinit()
    703         self._power_state.power_off()
    704 
    705         # Set up Servo's usb mux.
    706         self.switch_usbkey('host')
    707         if image_path:
    708             logging.info('Searching for usb device and copying image to it. '
    709                          'Please wait a few minutes...')
    710             if not self._server.download_image_to_usb(image_path):
    711                 logging.error('Failed to transfer requested image to USB. '
    712                               'Please take a look at Servo Logs.')
    713                 raise error.AutotestError('Download image to usb failed.')
    714             if make_image_noninteractive:
    715                 logging.info('Making image noninteractive')
    716                 if not self._server.make_image_noninteractive():
    717                     logging.error('Failed to make image noninteractive. '
    718                                   'Please take a look at Servo Logs.')
    719 
    720     def boot_in_recovery_mode(self):
    721         """Boot host DUT in recovery mode."""
    722         self._power_state.power_on(rec_mode=self._power_state.REC_ON)
    723         self.switch_usbkey('dut')
    724 
    725 
    726     def install_recovery_image(self, image_path=None,
    727                                make_image_noninteractive=False):
    728         """Install the recovery image specified by the path onto the DUT.
    729 
    730         This method uses google recovery mode to install a recovery image
    731         onto a DUT through the use of a USB stick that is mounted on a servo
    732         board specified by the usb_dev.  If no image path is specified
    733         we use the recovery image already on the usb image.
    734 
    735         @param image_path: Path on the host to the recovery image.
    736         @param make_image_noninteractive: Make the recovery image
    737                 noninteractive, therefore the DUT will reboot automatically
    738                 after installation.
    739         """
    740         self.image_to_servo_usb(image_path, make_image_noninteractive)
    741         self.boot_in_recovery_mode()
    742 
    743 
    def _scp_image(self, image_path):
        """Send an image file to /tmp on the servo host.

        The file keeps its base name; only the directory changes. This is
        used when the servo is controlled by a remote host, so that the
        image is available on the machine the servo is attached to.

        @param image_path: a string, name of the firmware image file to be
               transferred.
        @return: a string, full path name of the copied file on the remote.
        """
        file_name = os.path.basename(image_path)
        remote_path = os.path.join('/tmp', file_name)
        self._servo_host.send_file(image_path, remote_path)
        return remote_path
    760 
    761 
    def system(self, command, timeout=3600):
        """Run a command on the servod host, discarding its result.

        The command is logged at info level before it is run.

        @param command Command to be executed.
        @param timeout Maximum number of seconds of runtime allowed. Default to
                       1 hour.
        """
        logging.info('Will execute on servo host: %s', command)
        servo_host = self._servo_host
        servo_host.run(command, timeout=timeout)
    771 
    772 
    def system_output(self, command, timeout=3600,
                      ignore_status=False, args=()):
        """Run a command on the servod host and return its stripped stdout.

        @param command a string, the command to execute
        @param timeout an int, max number of seconds to wait til command
               execution completes. Default to 1 hour.
        @param ignore_status a Boolean, passed through to the servo host's
               run(); if true - ignore command's nonzero exit status,
               otherwise an exception will be thrown
        @param args a tuple of strings, each becoming a separate command line
               parameter for the command
        @return command's stdout as a string, with surrounding whitespace
                stripped.
        """
        result = self._servo_host.run(command,
                                      timeout=timeout,
                                      ignore_status=ignore_status,
                                      args=args)
        output = result.stdout
        return output.strip()
    789 
    790 
    def get_servo_version(self):
        """Ask the servo server for the servo version string.

        @return: The version of the servo, e.g., servo_v2 or servo_v3, as
                 reported by the server's get_version().

        """
        version = self._server.get_version()
        return version
    798 
    799 
    def _initialize_programmer(self, rw_only=False):
        """Create the firmware programmers on first use.

        Does nothing if self._programmer is already set. Otherwise both the
        full programmer (self._programmer) and the RW-only programmer
        (self._programmer_rw) are created, chosen by the servo version.

        @param rw_only: Accepted for interface compatibility; this method
                        does not read it and always creates both
                        programmers.

        @raise: TestError if there is no programmer for the servo version.
        """
        if self._programmer:
            return

        version = self.get_servo_version()
        if version.startswith('servo_v2'):
            self._programmer = firmware_programmer.ProgrammerV2(self)
            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
        # Both servo v3 and v4 use the same programming methods so just leverage
        # ProgrammerV3 for servo v4 as well.
        elif version.startswith(('servo_v3', 'servo_v4')):
            self._programmer = firmware_programmer.ProgrammerV3(self)
            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
        else:
            message = 'No firmware programmer for servo version: %s' % version
            raise error.TestError(message)
    823 
    824 
    def program_bios(self, image, rw_only=False):
        """Flash the given BIOS image onto the DUT.

        The programmers are initialized if needed. When the servo host is
        not localhost, the image is first copied there and the copied path
        is programmed instead.

        @param image: a string, file name of the BIOS image to program
                      on the DUT.
        @param rw_only: True to only program the RW portion of BIOS.

        """
        self._initialize_programmer()
        if not self.is_localhost():
            image = self._scp_image(image)
        # Pick the RW-only programmer when requested, the full one otherwise.
        programmer = self._programmer_rw if rw_only else self._programmer
        programmer.program_bios(image)
    840 
    841 
    def program_ec(self, image, rw_only=False):
        """Flash the given EC image onto the DUT.

        The programmers are initialized if needed. When the servo host is
        not localhost, the image is first copied there and the copied path
        is programmed instead.

        @param image: a string, file name of the EC image to program
                      on the DUT.
        @param rw_only: True to only program the RW portion of EC.

        """
        self._initialize_programmer()
        if not self.is_localhost():
            image = self._scp_image(image)
        # Pick the RW-only programmer when requested, the full one otherwise.
        programmer = self._programmer_rw if rw_only else self._programmer
        programmer.program_ec(image)
    857 
    858 
    def _reprogram(self, tarball_path, firmware_name, image_candidates,
                   rw_only):
        """Extract one firmware image from a build tarball and program it.

        The image is extracted into the directory holding the tarball. It
        is then programmed with program_ec() when firmware_name is 'EC' and
        with program_bios() otherwise. If no EC image can be extracted the
        EC is skipped with a log message.

        @param tarball_path: The path of the downloaded build tarball.
        @param firmware_name: either 'EC' or 'BIOS'.
        @param image_candidates: A tuple of the paths of image candidates.
        @param rw_only: True to only install firmware to its RW portions. Keep
                the RO portions unchanged.

        @raise: TestError if cannot extract firmware from the tarball and
                firmware_name is not 'EC'.
        """
        is_ec = firmware_name == 'EC'
        extract_dir = os.path.dirname(tarball_path)
        extracted = _extract_image_from_tarball(tarball_path, extract_dir,
                                                image_candidates)
        if not extracted:
            if not is_ec:
                raise error.TestError('Failed to extract the %s image from '
                                      'tarball' % firmware_name)
            logging.info('Not a Chrome EC, ignore re-programming it')
            return

        rw_tag = 'RW ' if rw_only else ''
        logging.info('Will re-program %s %snow', firmware_name, rw_tag)

        image_path = os.path.join(extract_dir, extracted)
        program = self.program_ec if is_ec else self.program_bios
        program(image_path, rw_only)
    889 
    890 
    def program_firmware(self, model, tarball_path, rw_only=False):
        """Program firmware (EC, if applied, and BIOS) of the DUT.

        The EC is reprogrammed first, then the BIOS. Afterwards the power
        state controller's reset() is called and this sleeps for
        Servo.BOOT_DELAY seconds.

        @param model: The DUT model name, used to build the model-specific
                image candidate paths.
        @param tarball_path: The path of the downloaded build tarball.
        @param rw_only: True to only install firmware to its RW portions. Keep
                the RO portions unchanged.
        """
        # Ordered (firmware name, candidate image paths) pairs; EC goes first.
        firmware_candidates = (
                ('EC', ('ec.bin', '%s/ec.bin' % model)),
                ('BIOS', ('image.bin', 'image-%s.bin' % model)),
        )
        for firmware_name, image_candidates in firmware_candidates:
            self._reprogram(tarball_path, firmware_name, image_candidates,
                            rw_only)

        power_controller = self.get_power_state_controller()
        power_controller.reset()
        time.sleep(Servo.BOOT_DELAY)
    907 
    908 
    def _switch_usbkey_power(self, power_state, detection_delay=False):
        """Switch usbkey power.

        The servod 'safe_switch_usbkey_power' RPC is tried first. If that
        call raises for any reason, the failure is logged and the
        'prtctl4_pwren' control is set directly instead.

        After the switch this sleeps for |USB_POWEROFF_DELAY| when powering
        off; for any other power state it sleeps for |USB_DETECTION_DELAY|
        only if |detection_delay| is set.

        @param power_state: A string, 'on' or 'off'.
        @param detection_delay: A boolean value, if True, sleep
                                for |USB_DETECTION_DELAY| after switching
                                the power on.
        """
        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
        # handle beaglebones that haven't yet updated and have the
        # safe_switch_usbkey_power RPC.  I'll remove this once all beaglebones
        # have been updated and also think about a better way to handle
        # situations like this.
        try:
            self._server.safe_switch_usbkey_power(power_state)
        except Exception as e:
            # Keep the best-effort fallback, but leave a trace of why it was
            # taken so that real RPC failures are not silently hidden.
            logging.warning('safe_switch_usbkey_power(%s) failed (%s); '
                            'falling back to setting prtctl4_pwren directly.',
                            power_state, e)
            self.set('prtctl4_pwren', power_state)
        if power_state == 'off':
            time.sleep(self.USB_POWEROFF_DELAY)
        elif detection_delay:
            time.sleep(self.USB_DETECTION_DELAY)
    934 
    935 
    def switch_usbkey(self, usb_state):
        """Connect USB flash stick to either host or DUT, or turn USB port off.

        This function switches the servo multiplexer to provide electrical
        connection between the USB port J3 and either host or DUT side. It
        can also be used to turn the USB port off.

        Switching to 'dut' or 'host' is accompanied by powercycling
        of the USB stick, because it sometimes gets wedged if the mux
        is switched while the stick power is on.

        Nothing is done if get_usbkey_direction() already reports the
        requested state. The mux is switched through the servod
        'safe_switch_usbkey' RPC; if that call raises for any reason, the
        failure is logged and the 'usb_mux_sel1' control is set directly
        instead.

        @param usb_state: A string, one of 'dut', 'host', or 'off'.
                          'dut' and 'host' indicate which side the
                          USB flash device is required to be connected to.
                          'off' indicates turning the USB port off.

        @raise: error.TestError in case the parameter is not 'dut'
                'host', or 'off'.
        """
        if self.get_usbkey_direction() == usb_state:
            return

        if usb_state == 'off':
            self._switch_usbkey_power('off')
            self._usb_state = usb_state
            return
        elif usb_state == 'host':
            mux_direction = 'servo_sees_usbkey'
        elif usb_state == 'dut':
            mux_direction = 'dut_sees_usbkey'
        else:
            raise error.TestError('Unknown USB state request: %s' % usb_state)

        # Power the stick down before touching the mux; see docstring.
        self._switch_usbkey_power('off')
        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
        # handle beaglebones that haven't yet updated and have the
        # safe_switch_usbkey RPC.  I'll remove this once all beaglebones have
        # been updated and also think about a better way to handle situations
        # like this.
        try:
            self._server.safe_switch_usbkey(mux_direction)
        except Exception as e:
            # Keep the best-effort fallback, but leave a trace of why it was
            # taken so that real RPC failures are not silently hidden.
            logging.warning('safe_switch_usbkey(%s) failed (%s); '
                            'falling back to setting usb_mux_sel1 directly.',
                            mux_direction, e)
            self.set('usb_mux_sel1', mux_direction)
        time.sleep(self.USB_POWEROFF_DELAY)
        # Only wait for USB detection when the stick is handed to the host.
        self._switch_usbkey_power('on', usb_state == 'host')
        self._usb_state = usb_state
    982 
    983 
    def get_usbkey_direction(self):
        """Report which side the USB key is on, or 'off' if it is unpowered.

        A previously recorded state in self._usb_state is returned as is.
        Otherwise the state is derived from the 'prtctl4_pwren' and
        'usb_mux_sel1' controls and recorded for later calls.

        @return: A string, one of 'dut', 'host', or 'off'.
        """
        if self._usb_state:
            return self._usb_state

        if self.get('prtctl4_pwren') == 'off':
            direction = 'off'
        elif self.get('usb_mux_sel1').startswith('dut'):
            direction = 'dut'
        else:
            direction = 'host'
        self._usb_state = direction
        return direction
    997 
    998 
    def set_servo_v4_role(self, role):
        """Set the power role of servo v4, either 'src' or 'snk'.

        Only a debug message is logged if the servo version does not start
        with 'servo_v4', or if the 'servo_v4_role' control already reports
        the requested role.

        @param role: Power role for DUT port on servo v4, either 'src' or 'snk'.
        """
        if not self.get_servo_version().startswith('servo_v4'):
            logging.debug('Not a servo v4, unable to set role to %s.', role)
            return

        current_role = self.get('servo_v4_role')
        if current_role != role:
            self.set_nocheck('servo_v4_role', role)
            return
        logging.debug('Already in the role: %s.', role)
   1015 
   1016 
   1017     @property
   1018     def uart_logs_dir(self):
   1019         """Return the directory to save UART logs."""
   1020         return self._uart.logs_dir if self._uart else ""
   1021 
   1022 
   1023     @uart_logs_dir.setter
   1024     def uart_logs_dir(self, logs_dir):
   1025         """Set directory to save UART logs.
   1026 
   1027         @param logs_dir  String of directory name."""
   1028         if self._uart:
   1029             self._uart.logs_dir = logs_dir
   1030 
   1031 
   1032     def close(self):
   1033         """Close the servo object."""
   1034         if self._uart:
   1035             self._uart.stop_capture()
   1036             self._uart.dump()
   1037             self._uart = None
   1038