Home | History | Annotate | Download | only in servo
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import functools
      6 import logging
      7 import pprint
      8 import re
      9 import time
     10 
     11 from autotest_lib.client.bin import utils
     12 from autotest_lib.client.common_lib import error
     13 from autotest_lib.client.common_lib.cros import cr50_utils
     14 from autotest_lib.server.cros.servo import chrome_ec
     15 
     16 
     17 def servo_v4_command(func):
     18     """Decorator for methods only relevant to tests running with servo v4."""
     19     @functools.wraps(func)
     20     def wrapper(instance, *args, **kwargs):
     21         """Ignore servo v4 functions it's not being used."""
     22         if instance.using_servo_v4():
     23             return func(instance, *args, **kwargs)
     24         logging.info("not using servo v4. ignoring %s", func.func_name)
     25     return wrapper
     26 
     27 
     28 class ChromeCr50(chrome_ec.ChromeConsole):
     29     """Manages control of a Chrome Cr50.
     30 
     31     We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
     32     provides many interfaces to set and get its behavior via console commands.
     33     This class is to abstract these interfaces.
     34     """
     35     OPEN = 'open'
     36     UNLOCK = 'unlock'
     37     LOCK = 'lock'
     38     # The amount of time you need to show physical presence.
     39     PP_SHORT = 15
     40     PP_LONG = 300
     41     CCD_PASSWORD_RATE_LIMIT = 3
     42     IDLE_COUNT = 'count: (\d+)\s'
     43     SHORT_WAIT = 3
     44     # The version has four groups: the partition, the header version, debug
     45     # descriptor and then version string.
     46     # There are two partitions A and B. The active partition is marked with a
     47     # '*'. If it is a debug image '/DBG' is added to the version string. If the
     48     # image has been corrupted, the version information will be replaced with
     49     # 'Error'.
     50     # So the output may look something like this.
     51     #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
     52     #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
     53     # Or like this if the region was corrupted.
     54     #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
     55     #   RW_B:    Error
     56     VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s'
     57     INACTIVE_VERSION = VERSION_FORMAT % ''
     58     ACTIVE_VERSION = VERSION_FORMAT % '\*'
     59     # Following lines of the version output may print the image board id
     60     # information. eg.
     61     # BID A:   5a5a4146:ffffffff:00007f00 Yes
     62     # BID B:   00000000:00000000:00000000 Yes
     63     # Use the first group from ACTIVE_VERSION to match the active board id
     64     # partition.
     65     BID_ERROR = 'read_board_id: failed'
     66     BID_FORMAT = ':\s+[a-f0-9:]+ '
     67     ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
     68             BID_ERROR)
     69     WAKE_CHAR = '\n\n'
     70     WAKE_RESPONSE = ['(>|Console is enabled)']
     71     START_UNLOCK_TIMEOUT = 20
     72     GETTIME = ['= (\S+)']
     73     FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
     74     FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
     75     MAX_RETRY_COUNT = 5
     76     START_STR = ['(.*Console is enabled;)']
     77     REBOOT_DELAY_WITH_CCD = 60
     78     REBOOT_DELAY_WITH_FLEX = 3
     79     ON_STRINGS = ['enable', 'enabled', 'on']
     80     CONSERVATIVE_CCD_WAIT = 10
     81     CCD_SHORT_PRESSES = 5
     82     CAP_IS_ACCESSIBLE = 0
     83     CAP_SETTING = 1
     84     CAP_REQ = 2
     85     GET_CAP_TRIES = 3
     86 
     87 
     88     def __init__(self, servo):
     89         super(ChromeCr50, self).__init__(servo, 'cr50_uart')
     90 
     91 
     92     def wake_cr50(self):
     93         """Wake up cr50 by sending some linebreaks and wait for the response"""
     94         logging.debug(super(ChromeCr50, self).send_command_get_output(
     95                 self.WAKE_CHAR, self.WAKE_RESPONSE))
     96 
     97 
     98     def send_command(self, commands):
     99         """Send command through UART.
    100 
    101         Cr50 will drop characters input to the UART when it resumes from sleep.
    102         If servo is not using ccd, send some dummy characters before sending the
    103         real command to make sure cr50 is awake.
    104         """
    105         if not self.using_ccd():
    106             self.wake_cr50()
    107         super(ChromeCr50, self).send_command(commands)
    108 
    109 
    def set_cap(self, cap, setting):
        """Set a single capability to setting

        Args:
            cap: the capability name
            setting: the desired setting for the capability
        """
        single_cap = {cap: setting}
        self.set_caps(single_cap)
    113 
    114 
    def set_caps(self, cap_dict):
        """Use cap_dict to set all the cap values

        Send 'ccd set' for every capability in cap_dict, then read the
        capability settings back and verify each one matches the request. The
        comparison ignores case.

        Args:
            cap_dict: A dictionary with the capability as key and the desired
                setting as values

        Raises:
            TestFail if a capability does not have the requested setting after
            all of the commands have been sent.
        """
        # items() works in python 2 and python 3. iteritems() is python 2 only.
        for cap, config in cap_dict.items():
            self.send_command('ccd set %s %s' % (cap, config))
        # Only the setting of each capability is needed for the comparison.
        current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING)
        for cap, config in cap_dict.items():
            if current_cap_settings[cap].lower() != config.lower():
                raise error.TestFail('Failed to set %s to %s' % (cap, config))
    131 
    132 
    def get_cap_overview(self, cap_dict):
        """Returns a tuple (in factory mode, is reset)

        If all capabilities are set to Default, ccd has been reset to default.
        If all capabilities are set to Always, ccd is in factory mode.
        An empty cap_dict gives (True, True).

        Args:
            cap_dict: A dictionary with the capability as key and the
                capability info list as value. The setting is read from index
                CAP_SETTING of each list.

        Returns:
            (True if every setting is 'Always',
             True if every setting is 'Default')
        """
        # Only the values are needed. values() works in python 2 and 3.
        settings = [cap_info[self.CAP_SETTING]
                    for cap_info in cap_dict.values()]
        in_factory_mode = all(setting == 'Always' for setting in settings)
        is_reset = all(setting == 'Default' for setting in settings)
        return in_factory_mode, is_reset
    148 
    149 
    def wp_is_reset(self):
        """Returns True if wp is reset to follow batt pres at all times

        Write protect counts as reset when both the current setting and the
        atboot setting from get_wp_state() follow battery presence.
        """
        follows_now, _enabled, follows_atboot, _enabled_atboot = (
                self.get_wp_state())
        if not follows_now:
            return follows_now
        return follows_atboot
    154 
    155 
    def get_wp_state(self):
        """Returns a tuple of the current write protect state.

        Runs the 'wp' console command and parses the 'Flash WP' output. The
        current state is considered to follow battery presence when it is not
        reported as forced.

        The atboot setting cannot really be determined now if it is set to
        follow battery presence. It is likely to remain the same after reboot,
        but that is not guaranteed. If the third element of the tuple is True,
        the last element will not be that useful.

        Returns:
            (True if current state is to follow batt presence,
             True if write protect is enabled,
             True if current state is to follow batt presence atboot,
             True if write protect is enabled atboot)
        """
        wp_regex = ('Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?'
                    '(follow|enabled|disabled)')
        match = self.send_command_get_output('wp', [wp_regex])[0]
        # The match holds the full text followed by the four regex groups.
        _full_text, forced, enabled, _atboot_forced, atboot = match
        logging.debug(match)
        follow_batt_pres = not forced
        wp_enabled = enabled == 'enabled'
        follow_batt_pres_atboot = atboot == 'follow'
        wp_enabled_atboot = atboot == 'enabled'
        return (follow_batt_pres, wp_enabled, follow_batt_pres_atboot,
                wp_enabled_atboot)
    177 
    178 
    def in_dev_mode(self):
        """Return True if cr50 thinks the device is in dev mode

        The check looks for 'dev_mode' in the 'TPM' entry of the ccd info.
        """
        tpm_state = self.get_ccd_info()['TPM']
        return 'dev_mode' in tpm_state
    182 
    183 
    def get_ccd_info(self):
        """Get the current ccd state.

        Take the output of 'ccd' and convert it to a dictionary. The console
        timeout is raised to CONSERVATIVE_CCD_WAIT while the command runs and
        is always restored afterwards.

        Lines without a ':' and lines that contain '[' are ignored.

        Returns:
            A dictionary with the ccd state name as the key and setting as
            value.
        """
        original_timeout = float(self._servo.get('cr50_uart_timeout'))
        # Change the console timeout to CONSERVATIVE_CCD_WAIT, it may take
        # longer than the original timeout to read ccd info.
        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
        try:
            rv = self.send_command_get_output('ccd', ["ccd.*>"])[0]
        finally:
            self._servo.set_nocheck('cr50_uart_timeout', original_timeout)

        info = {}
        for line in rv.splitlines():
            # CCD information is separated with an :
            #   State: Opened
            # Extract the state name and the value.
            line = line.strip()
            if ':' not in line or '[' in line:
                continue
            # Only split on the first ':'. The value may contain ':' too, and
            # an unbounded split would make the unpacking raise ValueError.
            key, value = line.split(':', 1)
            info[key.strip()] = value.strip()
        logging.info('Current CCD settings:\n%s', pprint.pformat(info))
        return info
    213 
    214 
    def get_cap(self, cap):
        """Returns the capability info from the capability dictionary

        Args:
            cap: the capability name to look up in get_cap_dict()
        """
        all_caps = self.get_cap_dict()
        return all_caps[cap]
    218 
    219 
    220     def _get_ccd_cap_string(self):
    221         """Return a string with the current capability settings.
    222 
    223         The ccd information is pretty long. Servo micro sometimes drops
    224         characters. Run the command a couple of times. Return the capapability
    225         string that matches a previous run.
    226 
    227         Raises:
    228             TestError if the test could not retrieve consistent capability
    229             information.
    230         """
    231         past_results = []
    232         for i in range(self.GET_CAP_TRIES):
    233             rv = self.send_safe_command_get_output('ccd',
    234                     ["Capabilities:\s+[\da-f]+\s(.*)TPM:"])[0][1]
    235             logging.debug(rv)
    236             if rv in past_results:
    237                 return rv
    238             past_results.append(rv)
    239         logging.debug(past_results)
    240         raise error.TestError('Could not get consistent capability information')
    241 
    242 
    def get_cap_dict(self, info=None):
        """Get the current ccd capability settings.

        The capability may be using the 'Default' setting. That doesn't say much
        about the ccd state required to use the capability, so every
        capability is described with
        [is accessible, setting, requirement]

        Args:
            info: Only fill the cap_dict with the requested information:
                  CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ. If it is None,
                  the whole list is stored for each capability.

        Returns:
            A dictionary with the capability as the key and a list of the
            current settings as the value [is_accessible, setting, requirement]
            or just the requested element of that list if info is given.
        """
        # There are two capability formats. Extract the setting and the
        # requirement from both formats
        #  UartGscRxECTx   Y 3=IfOpened
        #  or
        #  UartGscRxECTx   Y 0=Default (Always)
        cap_pattern = '(\S+) +(Y|-).*=(\w+)( \((\S+)\))?'
        want_everything = info is None
        caps = {}
        for parsed in re.findall(cap_pattern, self._get_ccd_cap_string()):
            name, accessible, setting, _paren_text, required = parsed
            entry = [accessible == 'Y', setting, required]
            # If there's only 1 value after =, then the setting is the
            # requirement.
            if not required:
                entry[self.CAP_REQ] = setting
            caps[name] = entry if want_everything else entry[info]
        logging.debug(pprint.pformat(caps))
        return caps
    280 
    281 
    def send_command_get_output(self, command, regexp_list):
        """Send command through UART and wait for response.

        Cr50 drops characters input to the UART when it resumes from sleep.
        When servo is not using ccd, wake cr50 before sending the real
        command.

        Args:
            command: the console command to send
            regexp_list: the regular expressions to wait for. It may not
                         contain the bare '.*>' pattern.

        Returns:
            The result of the parent class send_command_get_output.

        Raises:
            TestError if '.*>' is one of the entries in regexp_list.
        """
        if not self.using_ccd():
            self.wake_cr50()

        # A '\n' is prepended to the command to separate cr50 console junk
        # from the real command. A search for just .*> would only return the
        # output from that first '\n'. Refuse the pattern, so the test gets
        # changed to look for something more specific ex command.*>. cr50
        # prints the command in the output, so that is an easy way to make
        # '.*>' match the real command output.
        too_generic = '.*>' in regexp_list
        if too_generic:
            raise error.TestError('Send more specific regexp %r %r' % (command,
                    regexp_list))

        # Separate the command from any junk that may have been sent to the
        # cr50 uart.
        separated_command = '\n' + command
        parent_console = super(ChromeCr50, self)
        return parent_console.send_command_get_output(separated_command,
                                                      regexp_list)
    307 
    308 
    def send_safe_command_get_output(self, command, regexp_list):
        """Restrict the console channels while sending console commands

        Sends 'chan save' and 'chan 0' before the command and always sends
        'chan restore' afterwards, even if the command raises an error.

        Args:
            command: the console command to send
            regexp_list: the regular expressions to wait for

        Returns:
            The result of send_command_get_output
        """
        for chan_command in ('chan save', 'chan 0'):
            self.send_command(chan_command)
        try:
            return self.send_command_get_output(command, regexp_list)
        finally:
            self.send_command('chan restore')
    318 
    319 
    def send_command_retry_get_output(self, command, regexp_list, tries=3):
        """Retry sending a command if we can't find the output.

        Cr50 may print irrelevant output while printing command output. It may
        prevent the regex from matching. Send command and get the output. If it
        fails try again.

        If it fails every time, raise an error.

        Don't use this to set something that should only be set once.

        Args:
            command: the console command to send
            regexp_list: the regular expressions to wait for
            tries: the number of times to send the command before giving up

        Returns:
            The output of the first attempt that succeeds.

        Raises:
            TestFail from the last attempt if none of the attempts succeed.
        """
        # TODO(b/80319784): once chan is unrestricted, use it to restrict what
        # output cr50 prints while we are sending commands.
        for attempt in range(tries):
            try:
                return self.send_command_get_output(command, regexp_list)
            # 'except ... as' is valid in python 2.6+ and python 3.
            except error.TestFail as e:
                logging.info('Failed to get %r output: %r', command, str(e))
                if attempt == tries - 1:
                    # Raise the last error, if we never successfully returned
                    # the command output
                    logging.info('Could not get %r output after %d tries',
                                 command, tries)
                    raise
    344 
    345 
    def get_deep_sleep_count(self):
        """Get the deep sleep count from the idle task

        Returns:
            The count printed by the 'idle' command as an integer.
        """
        idle_output = self.send_command_retry_get_output('idle',
                                                         [self.IDLE_COUNT])
        count_str = idle_output[0][1]
        return int(count_str)
    350 
    351 
    def clear_deep_sleep_count(self):
        """Clear the deep sleep count

        Runs 'idle c' and checks the count that gets printed.

        Raises:
            TestFail if the count is not 0 after running the command.
        """
        idle_output = self.send_command_retry_get_output('idle c',
                                                         [self.IDLE_COUNT])
        remaining_count = int(idle_output[0][1])
        if remaining_count != 0:
            raise error.TestFail("Could not clear deep sleep count")
    357 
    358 
    def get_board_properties(self):
        """Get the board properties from the brdprop command

        Returns:
            The properties value as an integer. The console output is parsed
            as a hexadecimal number.
        """
        prop_regex = 'properties = (\S+)\s'
        match = self.send_command_retry_get_output('brdprop', [prop_regex])[0]
        return int(match[1], 16)
    364 
    365 
    def has_command(self, cmd):
        """Returns 1 if cr50 has the command 0 if it doesn't

        The command is looked up in the output of the 'help' console command.
        Any error while getting the help output is treated as the command
        being unavailable.

        Args:
            cmd: the command name. It is used as the regular expression to
                 search for in the help output.
        """
        try:
            self.send_command_retry_get_output('help', [cmd])
        # Don't use a bare except. It would also swallow KeyboardInterrupt
        # and SystemExit.
        except Exception:
            logging.info("Image does not include '%s' command", cmd)
            return 0
        return 1
    374 
    375 
    def erase_nvmem(self):
        """Use flasherase to erase both nvmem sections

        Raises:
            TestError if the running image does not have the 'flasherase'
            command.
        """
        flasherase_available = self.has_command('flasherase')
        if not flasherase_available:
            raise error.TestError("need image with 'flasherase'")

        erase_commands = ('flasherase 0x7d000 0x3000',
                          'flasherase 0x3d000 0x3000')
        for erase_command in erase_commands:
            self.send_command(erase_command)
    383 
    384 
    def reboot(self):
        """Reboot Cr50 and wait for cr50 to reset

        Runs the 'reboot' console command through wait_for_reboot().
        """
        reset_command = 'reboot'
        self.wait_for_reboot(cmd=reset_command)
    388 
    389 
    390     def _uart_wait_for_reboot(self, cmd='\n', timeout=60):
    391         """Use uart to wait for cr50 to reboot.
    392 
    393         If a command is given run it and wait for cr50 to reboot. Monitor
    394         the cr50 uart to detect the reset. Wait up to timeout seconds
    395         for the reset.
    396 
    397         Args:
    398             cmd: the command to run to reset cr50.
    399             timeout: seconds to wait to detect the reboot.
    400         """
    401         original_timeout = float(self._servo.get('cr50_uart_timeout'))
    402         # Change the console timeout to timeout, so we wait at least that long
    403         # for cr50 to print the start string.
    404         self._servo.set_nocheck('cr50_uart_timeout', timeout)
    405         try:
    406             self.send_command_get_output(cmd, self.START_STR)
    407             logging.debug('Detected cr50 reboot')
    408         except error.TestFail, e:
    409             logging.debug('Failed to detect cr50 reboot')
    410         # Reset the timeout.
    411         self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
    412 
    413 
    def wait_for_reboot(self, cmd='\n', timeout=60):
        """Wait for cr50 to reboot

        Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if
        necessary.

        Args:
            cmd: the command to run to reset cr50.
            timeout: seconds to wait to detect the reboot.
        """
        if not self.using_ccd():
            # Without ccd the reboot can be detected on the uart.
            self._uart_wait_for_reboot(cmd, timeout)
            return

        self.send_command(cmd)
        # Cr50 USB is reset when it reboots. Wait for the CCD connection to
        # go down to detect the reboot.
        self.wait_for_ccd_disable(timeout, raise_error=False)
        self.ccd_enable()
    432 
    433 
    def rollback(self, eraseflashinfo=True, chip_bid=None, chip_flags=None):
        """Set the reset counter high enough to force a rollback then reboot

        Set the new board id before rolling back if one is given. The board id
        is only set if both chip_bid and chip_flags evaluate to true, so None
        and 0 are both treated as not given.

        Args:
            eraseflashinfo: True if eraseflashinfo should be run before rollback
            chip_bid: the integer representation of chip board id or None if the
                      board id should be erased during rollback
            chip_flags: the integer representation of chip board id flags or
                        None if the board id should be erased during rollback

        Raises:
            TestError if the image does not have the 'rollback' and
            'eraseflashinfo' commands or if cr50 is not running the previously
            inactive partition after the rollback.
        """
        required_commands = ('rollback', 'eraseflashinfo')
        if not all(self.has_command(name) for name in required_commands):
            raise error.TestError("need image with 'rollback' and "
                "'eraseflashinfo'")

        # The partition cr50 is expected to run after the rollback.
        expected_partition = self.get_inactive_version_info()[0]
        # Set the board id if both the board id and flags have been given.
        set_bid = chip_bid and chip_flags

        # Erase the infomap
        if eraseflashinfo or set_bid:
            self.send_command('eraseflashinfo')

        # Update the board id after it has been erased
        if set_bid:
            self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))

        self.wait_for_reboot(cmd='rollback')

        if self.get_active_version_info()[0] != expected_partition:
            raise error.TestError("Failed to rollback to inactive image")
    468 
    469 
    def rolledback(self):
        """Returns true if cr50 just rolled back

        Looks for 'Rollback detected' in the output of the 'sysinfo' command.
        """
        sysinfo_output = self.send_safe_command_get_output('sysinfo',
                                                           ['sysinfo.*>'])[0]
        return 'Rollback detected' in sysinfo_output
    474 
    475 
    def get_version_info(self, regexp):
        """Get information from the version command

        Args:
            regexp: the regular expression to search for in the 'ver' output

        Returns:
            The groups of the match without the full matched text, which is
            the first element of the match.
        """
        match = self.send_command_retry_get_output('ver', [regexp])[0]
        return match[1:]
    479 
    480 
    def get_inactive_version_info(self):
        """Get the inactive partition, version, and hash

        Uses INACTIVE_VERSION to find the partition in the version output that
        is not marked with a '*'.
        """
        inactive_regex = self.INACTIVE_VERSION
        return self.get_version_info(inactive_regex)
    484 
    485 
    def get_active_version_info(self):
        """Get the active partition, version, and hash

        Uses ACTIVE_VERSION to find the partition in the version output that
        is marked with a '*'.
        """
        active_regex = self.ACTIVE_VERSION
        return self.get_version_info(active_regex)
    489 
    490 
    def using_prod_rw_keys(self):
        """Returns True if the RW keyid is prod

        The key type is taken from the text in parentheses on the 'RW keyid'
        line of the 'sysinfo' output.
        """
        keyid_regex = 'RW keyid:.*\(([a-z]+)\)'
        # NOTE(review): send_safe_command_retry_get_output is not defined in
        # the part of the class that was reviewed. Presumably it is defined
        # further down or in the parent class. Confirm.
        rv = self.send_safe_command_retry_get_output('sysinfo', [keyid_regex])
        logging.info(rv)
        key_type = rv[0][1]
        return key_type == 'prod'
    497 
    498 
    def get_active_board_id_str(self):
        """Get the running image board id.

        Returns:
            The board id string or None if the image does not support board id
            or the image is not board id locked.

        Raises:
            TestError if the version output contains BID_ERROR instead of the
            board id.
        """
        # Getting the board id from the version console command is only
        # supported in board id locked images .22 and above. Any image that is
        # board id locked will have support for getting the image board id.
        #
        # If board id is not supported on the device, return None. This is
        # still expected on all current non board id locked release images.
        try:
            version_info = self.get_version_info(self.ACTIVE_BID)
        # 'except ... as' is valid in python 2.6+ and python 3.
        except error.TestFail as e:
            logging.info(str(e))
            logging.info('Cannot use the version to get the board id')
            return None

        # The last group of ACTIVE_BID holds either the board id line of the
        # active partition or the board id error.
        bid_info = version_info[4]
        if self.BID_ERROR in bid_info:
            raise error.TestError(version_info)
        # The board id is the second whitespace separated field.
        bid = bid_info.split()[1]
        return cr50_utils.GetBoardIdInfoString(bid)
    523 
    524 
    def get_version(self):
        """Get the RW version

        Returns:
            The version field of the active partition with the surrounding
            whitespace removed.
        """
        active_info = self.get_active_version_info()
        return active_info[1].strip()
    528 
    529 
    def using_servo_v4(self):
        """Returns true if the console is being served using servo v4

        The check looks for 'servo_v4' in the servo version string.
        """
        servo_version = self._servo.get_servo_version()
        return 'servo_v4' in servo_version
    533 
    534 
    def using_ccd(self):
        """Returns true if the console is being served using CCD

        The check looks for 'ccd_cr50' in the servo version string.
        """
        servo_version = self._servo.get_servo_version()
        return 'ccd_cr50' in servo_version
    538 
    539 
    def ccd_is_enabled(self):
        """Return True if ccd is enabled.

        If the test is running through ccd, use the servo ccd_state value. If
        a flex cable is being used, use the CCD_MODE_L gpio setting to determine
        if Cr50 has ccd enabled.

        Returns:
            True if ccd_state is 'on' when running through ccd. Otherwise True
            if the CCD_MODE_L gpio reads 0.
        """
        if self.using_ccd():
            return self._servo.get('ccd_state') == 'on'

        gpio_output = self.send_command_retry_get_output('gpioget',
                ['(0|1)[ \S]*CCD_MODE_L'])
        ccd_mode_l = int(gpio_output[0][1])
        # CCD_MODE_L is low when ccd is enabled.
        return not ccd_mode_l
    556 
    557 
    @servo_v4_command
    def wait_for_stable_ccd_state(self, state, timeout, raise_error):
        """Wait up to timeout seconds for CCD to be 'on' or 'off'

        Verify ccd is off or on and remains in that state for SHORT_WAIT
        seconds.

        Args:
            state: a string either 'on' or 'off'. Anything other than 'on' is
                   handled like 'off'.
            timeout: time in seconds to wait
            raise_error: Raise TestFail if the state is not reached or does not
                         stay stable. If it is False, the problem is only
                         logged.

        Raises:
            TestFail if raise_error is True and ccd never reaches the specified
            state or leaves it again during the stability check.
        """
        want_on = state == 'on'
        want_str = 'on' if want_on else 'off'
        logging.info("Wait until ccd is %s", want_str)
        enabled = utils.wait_for_value(self.ccd_is_enabled, want_on,
                                       timeout_sec=timeout)
        if enabled == want_on:
            # Make sure the state doesn't change. Wait for the opposite value
            # and check that it never shows up.
            enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled,
                                           timeout_sec=self.SHORT_WAIT)
            if enabled != want_on:
                unstable_msg = ("CCD switched %r after briefly being %r" %
                                ('on' if enabled else 'off', state))
                if raise_error:
                    raise error.TestFail(unstable_msg)
                logging.info(unstable_msg)
        else:
            timeout_msg = "timed out before detecting ccd '%s'" % want_str
            if raise_error:
                raise error.TestFail(timeout_msg)
            logging.warning(timeout_msg)
        logging.info("ccd is %r", 'on' if enabled else 'off')
    593 
    594 
    @servo_v4_command
    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
        """Wait for the cr50 console to stop working

        Args:
            timeout: time in seconds to wait for ccd to be 'off'
            raise_error: passed on to wait_for_stable_ccd_state
        """
        target_state = 'off'
        self.wait_for_stable_ccd_state(target_state, timeout, raise_error)
    599 
    600 
    @servo_v4_command
    def wait_for_ccd_enable(self, timeout=60, raise_error=False):
        """Wait for the cr50 console to start working

        Args:
            timeout: time in seconds to wait for ccd to be 'on'
            raise_error: passed on to wait_for_stable_ccd_state
        """
        target_state = 'on'
        self.wait_for_stable_ccd_state(target_state, timeout, raise_error)
    605 
    606 
    @servo_v4_command
    def ccd_disable(self, raise_error=True):
        """Change the values of the CC lines to disable CCD

        Turns servo_v4_dts_mode off and waits for ccd to be disabled.

        Args:
            raise_error: passed on to wait_for_ccd_disable
        """
        logging.info("disable ccd")
        dts_control, dts_value = 'servo_v4_dts_mode', 'off'
        self._servo.set_nocheck(dts_control, dts_value)
        self.wait_for_ccd_disable(raise_error=raise_error)
    613 
    614 
    @servo_v4_command
    def ccd_enable(self, raise_error=False):
        """Reenable CCD and reset servo interfaces

        Turns servo_v4_dts_mode on and waits for ccd to be enabled.

        Args:
            raise_error: passed on to wait_for_ccd_enable
        """
        logging.info("reenable ccd")
        dts_control, dts_value = 'servo_v4_dts_mode', 'on'
        self._servo.set_nocheck(dts_control, dts_value)
        # If the test is actually running with ccd, wait for USB communication
        # to come up after reset.
        running_over_ccd = self.using_ccd()
        if running_over_ccd:
            time.sleep(self._servo.USB_DETECTION_DELAY)
        self.wait_for_ccd_enable(raise_error=raise_error)
    625 
    626 
    627     def _level_change_req_pp(self, level):
    628         """Returns True if setting the level will require physical presence"""
    629         testlab_pp = level != 'testlab open' and 'testlab' in level
    630         # If the level is open and the ccd capabilities say physical presence
    631         # is required, then physical presence will be required.
    632         open_pp = (level == 'open' and
    633                    not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE])
    634         return testlab_pp or open_pp
    635 
    636 
    637     def _state_to_bool(self, state):
    638         """Converts the state string to True or False"""
    639         # TODO(mruthven): compare to 'on' once servo is up to date in the lab
    640         return state.lower() in self.ON_STRINGS
    641 
    642 
    643     def testlab_is_on(self):
    644         """Returns True of testlab mode is on"""
    645         return self._state_to_bool(self._servo.get('cr50_testlab'))
    646 
    647 
    def set_ccd_testlab(self, state):
        """Set the testlab mode

        Nothing is done if testlab mode is already in the requested state.
        Otherwise ccd is opened, the testlab mode is changed using physical
        presence, and the original ccd level is restored.

        Args:
            state: the desired testlab mode string: 'on' or 'off'

        Raises:
            TestError if the test is running with CCD
            TestFail if testlab mode was not changed
        """
        if self.using_ccd():
            raise error.TestError('Cannot set testlab mode with CCD. Use flex '
                    'cable instead.')

        want_on = self._state_to_bool(state)
        currently_on = self.testlab_is_on()
        want_str = 'on' if want_on else 'off'

        if currently_on == want_on:
            logging.info('ccd testlab already set to %s', want_str)
            return

        starting_level = self.get_ccd_level()

        # We can only change the testlab mode when the device is open. If
        # testlab mode is already enabled, we can go directly to open using 'ccd
        # testlab open'. This will save 5 minutes, because we can skip the
        # physical presence check.
        if currently_on:
            self.send_command('ccd testlab open')
        else:
            self.set_ccd_level('open')

        # Set testlab mode
        output = self.send_command_get_output('ccd testlab %s' % want_str,
                ['ccd.*>'])[0]
        if 'Access Denied' in output:
            raise error.TestFail("'ccd %s' %s" % (want_str, output))

        # Press the power button once a second for PP_SHORT seconds.
        self.run_pp(self.PP_SHORT)

        # Go back to the level ccd was at before the testlab change.
        self.set_ccd_level(starting_level)

        if want_on != self.testlab_is_on():
            raise error.TestFail('Failed to set ccd testlab to %s' % state)
    693 
    694 
    def get_ccd_level(self):
        """Returns the current ccd privilege level

        The level is read from the servo 'cr50_ccd_level' control and returned
        in lowercase.
        """
        raw_level = self._servo.get('cr50_ccd_level')
        return raw_level.lower()
    698 
    699 
    def set_ccd_level(self, level, password=''):
        """Set the Cr50 CCD privilege level.

        Does nothing if cr50 already reports the requested level.

        Args:
            level: a string of the ccd privilege level: 'open', 'lock', or
                   'unlock'. It is lowercased before use.
            password: send the ccd command with password. This will still
                    require the same physical presence.

        Raises:
            TestError if level refers to testlab mode, if the level change
                    requires physical presence that is not available, or if
                    neither physical presence nor testlab mode is available.
            TestFail if cr50 rejects the command (fwmp, access denied, busy)
                    or the level couldn't be set.
        """
        # TODO(mruthven): add support for CCD password
        level = level.lower()

        if level == self.get_ccd_level():
            logging.info('CCD privilege level is already %s', level)
            return

        if 'testlab' in level:
            raise error.TestError("Can't change testlab mode using "
                "ccd_set_level")

        testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
        req_pp = self._level_change_req_pp(level)
        # Physical presence (power button presses) is unavailable through ccd.
        has_pp = not self.using_ccd()
        dbg_en = 'DBG' in self._servo.get('cr50_version')

        if req_pp and not has_pp:
            raise error.TestError("Can't change privilege level to '%s' "
                "without physical presence." % level)

        if not testlab_on and not has_pp:
            raise error.TestError("Wont change privilege level without "
                "physical presence or testlab mode enabled")

        # Honor the password rate limit before touching the console timeout,
        # so an interrupted wait can't leave the timeout modified.
        if level in ('open', 'unlock'):
            logging.info('waiting %d seconds, the minimum time between'
                         ' ccd password attempts',
                         self.CCD_PASSWORD_RATE_LIMIT)
            time.sleep(self.CCD_PASSWORD_RATE_LIMIT)

        cmd = 'ccd %s%s' % (level, (' ' + password) if password else '')
        # The command is used to build the expected-output regex. Escape it so
        # a password containing regex metacharacters is matched literally.
        cmd_regex = re.escape(cmd) + '(.*)>'

        original_timeout = float(self._servo.get('cr50_uart_timeout'))
        # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may
        # take more than 3 seconds.
        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
        # Start the unlock process.
        try:
            rv = self.send_command_get_output(cmd, [cmd_regex])[0][1]
        finally:
            # Always put the original console timeout back.
            self._servo.set('cr50_uart_timeout', original_timeout)
        logging.info(rv)
        if 'ccd_open denied: fwmp' in rv:
            raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv))
        if 'Access Denied' in rv:
            raise error.TestFail("%r %s" % (cmd, rv))
        if 'Busy' in rv:
            raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv))

        # Press the power button once a second, if we need physical presence.
        if req_pp:
            # DBG images have shorter unlock processes
            self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG)

        if level != self.get_ccd_level():
            raise error.TestFail('Could not set privilege level to %s' % level)

        logging.info('Successfully set CCD privelege level to %s', level)
    770 
    771 
    def run_pp(self, unlock_timeout):
        """Press the power button once a second for unlock_timeout seconds.

        The button is pressed far more often than strictly required. Cr50
        doesn't mind extra presses; it only needs at least one press inside
        each detect interval.

        Privilege level changes need 5 presses in the short interval followed
        by 4 presses in the long interval.
        Short Interval
        100msec < power button press < 5 seconds
        Long Interval
        60s < power button press < 300s

        Enabling or disabling testlab mode needs 5 presses spaced between
        100msec and 5 seconds apart.

        Args:
            unlock_timeout: how long, in seconds, to keep pressing the button.
        """
        deadline = time.time() + unlock_timeout

        logging.info('Pressing power button for %ds to unlock the console.',
                     unlock_timeout)
        logging.info('The process should end at %s', time.ctime(deadline))

        # One short press, then a one second pause, until the deadline passes.
        while time.time() < deadline:
            self._servo.power_short_press()
            time.sleep(1)
    799 
    800 
    def gettime(self):
        """Return the cr50 system time reported by 'gettime' as a float."""
        matches = self.send_command_retry_get_output('gettime', [' = (.*) s'])
        # The first match holds (full text, captured time value).
        reported = matches[0][1]
        return float(reported)
    805 
    806 
    def servo_v4_supports_dts_mode(self):
        """Returns True if cr50 registers changes in servo v4 dts mode.

        The check disables and then re-enables ccd. Afterwards the original
        'servo_v4_dts_mode' setting is written back and the method waits for
        ccd to return to the state it started in.

        Returns:
            True if both the ccd disable and enable succeeded. False if either
            raised, or if the servo type is not 'servo_v4_with_servo_micro'.
        """
        # This is to test that Cr50 actually recognizes the change in ccd state
        # We cant do that with tests using ccd, because the cr50 communication
        # goes down once ccd is enabled.
        if 'servo_v4_with_servo_micro' != self._servo.get_servo_version():
            return False

        ccd_start = 'on' if self.ccd_is_enabled() else 'off'
        dts_start = self._servo.get('servo_v4_dts_mode')
        try:
            # Verify both ccd enable and disable
            self.ccd_disable(raise_error=True)
            self.ccd_enable(raise_error=True)
            rv = True
        except Exception as e:
            # Any failure here just means the setup can't do dts mode changes.
            logging.info(e)
            rv = False
        # Put the setup back the way it was found.
        self._servo.set_nocheck('servo_v4_dts_mode', dts_start)
        self.wait_for_stable_ccd_state(ccd_start, 60, True)
        logging.info('Test setup does%s support servo DTS mode',
                '' if rv else 'n\'t')
        return rv
    830 
    831 
    def wait_until_update_is_allowed(self):
        """Block until cr50 is willing to accept an update.

        Cr50 refuses updates during the first 60 seconds after it resumes from
        deep sleep or comes up from a reboot. If 'gettime' reports less than 60
        seconds, sleep until it would report 61. DBG images are exempt from
        the wait, so nothing is done for them.
        """
        running_dbg = self.get_active_version_info()[2]
        if running_dbg:
            logging.info("Running DBG image. Don't need to wait for update.")
            return

        uptime = self.gettime()
        if not uptime < 60:
            # Cr50 has been up long enough already.
            return

        remaining = 61 - uptime
        logging.info('Cr50 has been up for %ds waiting %ds before update',
                     uptime, remaining)
        time.sleep(remaining)
    848 
    def tpm_is_enabled(self):
        """Query the current TPM mode from the 'sysinfo' console output.

        Returns  True if the reported TPM mode is 'enabled' (any case),
                 False otherwise.
        """
        matches = self.send_command_get_output('sysinfo',
                ['(?i)TPM\s+MODE:\s+(enabled|disabled)'])
        # The first match holds (full text, captured mode).
        tpm_mode = matches[0][1]
        logging.debug(tpm_mode)

        return 'enabled' == tpm_mode.lower()
    860 
    def keyladder_is_enabled(self):
        """Query the H1 Key Ladder status from the 'sysinfo' console output.

        Returns True if the reported Key Ladder state is 'enabled' (any case),
                False otherwise.
        """
        matches = self.send_command_get_output('sysinfo',
                ['(?i)Key\s+Ladder:\s+(enabled|disabled)'])
        # The first match holds (full text, captured state).
        ladder_state = matches[0][1]
        logging.debug(ladder_state)

        return 'enabled' == ladder_state.lower()
    872