Home | History | Annotate | Download | only in servo
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import functools
      6 import logging
      7 import time
      8 
      9 from autotest_lib.client.bin import utils
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.client.common_lib.cros import cr50_utils
     12 from autotest_lib.server.cros.servo import chrome_ec
     13 
     14 
     15 def servo_v4_command(func):
     16     """Decorator for methods only relevant to tests running with servo v4."""
     17     @functools.wraps(func)
     18     def wrapper(instance, *args, **kwargs):
     19         """Ignore servo v4 functions it's not being used."""
     20         if instance.using_servo_v4():
     21             return func(instance, *args, **kwargs)
     22         logging.info("not using servo v4. ignoring %s", func.func_name)
     23     return wrapper
     24 
     25 
     26 class ChromeCr50(chrome_ec.ChromeConsole):
     27     """Manages control of a Chrome Cr50.
     28 
     29     We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
     30     provides many interfaces to set and get its behavior via console commands.
     31     This class is to abstract these interfaces.
     32     """
     33     # The amount of time you need to show physical presence.
     34     PP_SHORT = 15
     35     PP_LONG = 300
     36     IDLE_COUNT = 'count: (\d+)'
     37     # The version has four groups: the partition, the header version, debug
     38     # descriptor and then version string.
     39     # There are two partitions A and B. The active partition is marked with a
     40     # '*'. If it is a debug image '/DBG' is added to the version string. If the
     41     # image has been corrupted, the version information will be replaced with
     42     # 'Error'.
     43     # So the output may look something like this.
     44     #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
     45     #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
     46     # Or like this if the region was corrupted.
     47     #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
     48     #   RW_B:    Error
     49     VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s'
     50     INACTIVE_VERSION = VERSION_FORMAT % ''
     51     ACTIVE_VERSION = VERSION_FORMAT % '\*'
     52     # Following lines of the version output may print the image board id
     53     # information. eg.
     54     # BID A:   5a5a4146:ffffffff:00007f00 Yes
     55     # BID B:   00000000:00000000:00000000 Yes
     56     # Use the first group from ACTIVE_VERSION to match the active board id
     57     # partition.
     58     BID_ERROR = 'read_board_id: failed'
     59     BID_FORMAT = ':\s+[a-f0-9:]+ '
     60     ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
     61             BID_ERROR)
     62     WAKE_CHAR = '\n'
     63     START_UNLOCK_TIMEOUT = 20
     64     GETTIME = ['= (\S+)']
     65     FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
     66     FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
     67     MAX_RETRY_COUNT = 5
     68     START_STR = ['(.*Console is enabled;)']
     69     REBOOT_DELAY_WITH_CCD = 60
     70     REBOOT_DELAY_WITH_FLEX = 3
     71     ON_STRINGS = ['enable', 'enabled', 'on']
     72 
     73 
     74     def __init__(self, servo):
     75         super(ChromeCr50, self).__init__(servo, 'cr50_uart')
     76 
     77 
     78     def send_command(self, commands):
     79         """Send command through UART.
     80 
     81         Cr50 will drop characters input to the UART when it resumes from sleep.
     82         If servo is not using ccd, send some dummy characters before sending the
     83         real command to make sure cr50 is awake.
     84         """
     85         if not self.using_ccd():
     86             super(ChromeCr50, self).send_command(self.WAKE_CHAR)
     87         super(ChromeCr50, self).send_command(commands)
     88 
     89 
     90     def send_command_get_output(self, command, regexp_list):
     91         """Send command through UART and wait for response.
     92 
     93         Cr50 will drop characters input to the UART when it resumes from sleep.
     94         If servo is not using ccd, send some dummy characters before sending the
     95         real command to make sure cr50 is awake.
     96         """
     97         if not self.using_ccd():
     98             super(ChromeCr50, self).send_command(self.WAKE_CHAR)
     99         return super(ChromeCr50, self).send_command_get_output(command,
    100                                                                regexp_list)
    101 
    102 
    103     def get_deep_sleep_count(self):
    104         """Get the deep sleep count from the idle task"""
    105         result = self.send_command_get_output('idle', [self.IDLE_COUNT])
    106         return int(result[0][1])
    107 
    108 
    109     def clear_deep_sleep_count(self):
    110         """Clear the deep sleep count"""
    111         result = self.send_command_get_output('idle c', [self.IDLE_COUNT])
    112         if int(result[0][1]):
    113             raise error.TestFail("Could not clear deep sleep count")
    114 
    115 
    116     def has_command(self, cmd):
    117         """Returns 1 if cr50 has the command 0 if it doesn't"""
    118         try:
    119             self.send_command_get_output('help', [cmd])
    120         except:
    121             logging.info("Image does not include '%s' command", cmd)
    122             return 0
    123         return 1
    124 
    125 
    126     def erase_nvmem(self):
    127         """Use flasherase to erase both nvmem sections"""
    128         if not self.has_command('flasherase'):
    129             raise error.TestError("need image with 'flasherase'")
    130 
    131         self.send_command('flasherase 0x7d000 0x3000')
    132         self.send_command('flasherase 0x3d000 0x3000')
    133 
    134 
    135     def reboot(self):
    136         """Reboot Cr50 and wait for cr50 to reset"""
    137         response = [] if self.using_ccd() else self.START_STR
    138         self.send_command_get_output('reboot', response)
    139 
    140         # ccd will stop working after the reboot. Wait until that happens and
    141         # reenable it.
    142         if self.using_ccd():
    143             self.wait_for_reboot()
    144 
    145 
    146     def _uart_wait_for_reboot(self, timeout=60):
    147         """Wait for the cr50 to reboot and enable the console.
    148 
    149         This will wait up to timeout seconds for cr50 to print the start string.
    150 
    151         Args:
    152             timeout: seconds to wait to detect the reboot.
    153         """
    154         original_timeout = float(self._servo.get('cr50_uart_timeout'))
    155         # Change the console timeout to timeout, so we wait at least that long
    156         # for cr50 to print the start string.
    157         self._servo.set_nocheck('cr50_uart_timeout', timeout)
    158         try:
    159             self.send_command_get_output('\n', self.START_STR)
    160             logging.debug('Detected cr50 reboot')
    161         except error.TestFail, e:
    162             logging.debug('Failed to detect cr50 reboot')
    163         # Reset the timeout.
    164         self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
    165 
    166 
    167     def wait_for_reboot(self, timeout=60):
    168         """Wait for cr50 to reboot"""
    169         if self.using_ccd():
    170             # Cr50 USB is reset when it reboots. Wait for the CCD connection to
    171             # go down to detect the reboot.
    172             self.wait_for_ccd_disable(timeout, raise_error=False)
    173             self.ccd_enable()
    174         else:
    175             self._uart_wait_for_reboot(timeout)
    176 
    177 
    178     def rollback(self, eraseflashinfo=True, chip_bid=None, chip_flags=None):
    179         """Set the reset counter high enough to force a rollback then reboot
    180 
    181         Set the new board id before rolling back if one is given.
    182 
    183         Args:
    184             eraseflashinfo: True if eraseflashinfo should be run before rollback
    185             chip_bid: the integer representation of chip board id or None if the
    186                       board id should be erased during rollback
    187             chip_flags: the integer representation of chip board id flags or
    188                         None if the board id should be erased during rollback
    189         """
    190         if (not self.has_command('rollback') or not
    191             self.has_command('eraseflashinfo')):
    192             raise error.TestError("need image with 'rollback' and "
    193                 "'eraseflashinfo'")
    194 
    195         inactive_partition = self.get_inactive_version_info()[0]
    196         # Set the board id if both the board id and flags have been given.
    197         set_bid = chip_bid and chip_flags
    198 
    199         # Erase the infomap
    200         if eraseflashinfo or set_bid:
    201             self.send_command('eraseflashinfo')
    202 
    203         # Update the board id after it has been erased
    204         if set_bid:
    205             self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))
    206 
    207         self.send_command('rollback')
    208 
    209         # If we aren't using ccd, we should be able to detect the reboot
    210         # almost immediately
    211         self.wait_for_reboot(self.REBOOT_DELAY_WITH_CCD if self.using_ccd() else
    212                 self.REBOOT_DELAY_WITH_FLEX)
    213 
    214         running_partition = self.get_active_version_info()[0]
    215         if inactive_partition != running_partition:
    216             raise error.TestError("Failed to rollback to inactive image")
    217 
    218 
    219     def rolledback(self):
    220         """Returns true if cr50 just rolled back"""
    221         return int(self._servo.get('cr50_reset_count')) > self.MAX_RETRY_COUNT
    222 
    223 
    224     def get_version_info(self, regexp):
    225         """Get information from the version command"""
    226         return self.send_command_get_output('ver', [regexp])[0][1::]
    227 
    228 
    229     def get_inactive_version_info(self):
    230         """Get the active partition, version, and hash"""
    231         return self.get_version_info(self.INACTIVE_VERSION)
    232 
    233 
    234     def get_active_version_info(self):
    235         """Get the active partition, version, and hash"""
    236         return self.get_version_info(self.ACTIVE_VERSION)
    237 
    238 
    def get_active_board_id_str(self):
        """Get the running image board id.

        Returns:
            The board id string or None if the image does not support board id
            or the image is not board id locked.

        Raises:
            TestError if the version output contains the BID_ERROR string.
        """
        # Getting the board id from the version console command is only
        # supported in board id locked images .22 and above. Any image that is
        # board id locked will have support for getting the image board id.
        #
        # If board id is not supported on the device, return None. This is
        # still expected on all current non board id locked release images.
        try:
            version_info = self.get_version_info(self.ACTIVE_BID)
        except error.TestFail as e:
            # Let logging format the exception. The message attribute is not
            # available on python 3 exceptions.
            logging.info('%s', e)
            logging.info('Cannot use the version to get the board id')
            return None

        # The last ACTIVE_BID group holds either the board id text of the
        # active partition or the BID_ERROR output.
        bid_info = version_info[4]
        if self.BID_ERROR in bid_info:
            raise error.TestError(version_info)
        # The board id is the second whitespace separated field.
        bid = bid_info.split()[1]
        return bid if bid != cr50_utils.EMPTY_IMAGE_BID else None
    263 
    264 
    265     def get_version(self):
    266         """Get the RW version"""
    267         return self.get_active_version_info()[1].strip()
    268 
    269 
    270     def using_servo_v4(self):
    271         """Returns true if the console is being served using servo v4"""
    272         return 'servo_v4' in self._servo.get_servo_version()
    273 
    274 
    275     def using_ccd(self):
    276         """Returns true if the console is being served using CCD"""
    277         return 'ccd_cr50' in self._servo.get_servo_version()
    278 
    279 
    280     def ccd_is_enabled(self):
    281         """Return True if ccd is enabled.
    282 
    283         If the test is running through ccd, return the ccd_state value. If
    284         a flex cable is being used, use the CCD_MODE_L gpio setting to determine
    285         if Cr50 has ccd enabled.
    286 
    287         Returns:
    288             'off' or 'on' based on whether the cr50 console is working.
    289         """
    290         if self.using_ccd():
    291             return self._servo.get('ccd_state') == 'on'
    292         else:
    293             result = self.send_command_get_output('gpioget',
    294                     ['(0|1)..CCD_MODE_L'])
    295             return not bool(int(result[0][1]))
    296 
    297 
    @servo_v4_command
    def wait_for_ccd_state(self, state, timeout, raise_error):
        """Wait up to timeout seconds for CCD to be 'on' or 'off'

        Args:
            state: a string either 'on' or 'off'. Any value other than 'on' is
                   handled as a wait for ccd to be disabled.
            timeout: time in seconds to wait
            raise_error: Raise TestFail if the state is not reached. If it is
                         False, a timeout is only logged as a warning.

        Raises:
            TestFail if ccd never reaches the specified state and raise_error
            is True
        """
        wait_for_enable = state == 'on'
        logging.info("Wait until ccd is '%s'", state)
        value = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable,
                                     timeout_sec=timeout)
        if value != wait_for_enable:
            error_msg = "timed out before detecting ccd '%s'" % state
            if raise_error:
                raise error.TestFail(error_msg)
            logging.warning(error_msg)
            # The state was not reached. Do not log the success message.
            return
        logging.info("ccd is '%s'", state)
    319 
    320 
    321     @servo_v4_command
    322     def wait_for_ccd_disable(self, timeout=60, raise_error=True):
    323         """Wait for the cr50 console to stop working"""
    324         self.wait_for_ccd_state('off', timeout, raise_error)
    325 
    326 
    327     @servo_v4_command
    328     def wait_for_ccd_enable(self, timeout=60, raise_error=False):
    329         """Wait for the cr50 console to start working"""
    330         self.wait_for_ccd_state('on', timeout, raise_error)
    331 
    332 
    333     @servo_v4_command
    334     def ccd_disable(self, raise_error=True):
    335         """Change the values of the CC lines to disable CCD"""
    336         logging.info("disable ccd")
    337         self._servo.set_nocheck('servo_v4_dts_mode', 'off')
    338         self.wait_for_ccd_disable(raise_error=raise_error)
    339 
    340 
    341     @servo_v4_command
    342     def ccd_enable(self, raise_error=False):
    343         """Reenable CCD and reset servo interfaces"""
    344         logging.info("reenable ccd")
    345         self._servo.set_nocheck('servo_v4_dts_mode', 'on')
    346         # If the test is actually running with ccd, reset usb and wait for
    347         # communication to come up.
    348         if self.using_ccd():
    349             self._servo.set_nocheck('power_state', 'ccd_reset')
    350         self.wait_for_ccd_enable(raise_error=raise_error)
    351 
    352 
    353     def _level_change_req_pp(self, level):
    354         """Returns True if setting the level will require physical presence"""
    355         testlab_pp = level != 'testlab open' and 'testlab' in level
    356         open_pp = level == 'open'
    357         return testlab_pp or open_pp
    358 
    359 
    360     def _state_to_bool(self, state):
    361         """Converts the state string to True or False"""
    362         # TODO(mruthven): compare to 'on' once servo is up to date in the lab
    363         return state.lower() in self.ON_STRINGS
    364 
    365 
    366     def testlab_is_on(self):
    367         """Returns True of testlab mode is on"""
    368         return self._state_to_bool(self._servo.get('cr50_testlab'))
    369 
    370 
    371     def set_ccd_testlab(self, state):
    372         """Set the testlab mode
    373 
    374         Args:
    375             state: the desired testlab mode string: 'on' or 'off'
    376 
    377         Raises:
    378             TestFail if testlab mode was not changed
    379         """
    380         if self.using_ccd():
    381             raise error.TestError('Cannot set testlab mode with CCD. Use flex '
    382                     'cable instead.')
    383 
    384         request_on = self._state_to_bool(state)
    385         testlab_on = self.testlab_is_on()
    386         request_str = 'on' if request_on else 'off'
    387 
    388         if testlab_on == request_on:
    389             logging.info('ccd testlab already set to %s', request_str)
    390             return
    391 
    392         original_level = self.get_ccd_level()
    393 
    394         # We can only change the testlab mode when the device is open. If
    395         # testlab mode is already enabled, we can go directly to open using 'ccd
    396         # testlab open'. This will save 5 minutes, because we can skip the
    397         # physical presence check.
    398         if testlab_on:
    399             self.send_command('ccd testlab open')
    400         else:
    401             self.set_ccd_level('open')
    402 
    403         # Set testlab mode
    404         rv = self.send_command_get_output('ccd testlab %s' % request_str,
    405                 ['.*>'])[0]
    406         if 'Access Denied' in rv:
    407             raise error.TestFail("'ccd %s' %s" % (request_str, rv))
    408 
    409         # Press the power button once a second for 15 seconds.
    410         self.run_pp(self.PP_SHORT)
    411 
    412         self.set_ccd_level(original_level)
    413 
    414         if request_on != self.testlab_is_on():
    415             raise error.TestFail('Failed to set ccd testlab to %s' % state)
    416 
    417 
    418     def get_ccd_level(self):
    419         """Returns the current ccd privilege level"""
    420         # TODO(mruthven): delete the part removing the trailing 'ed' once
    421         # servo is up to date in the lab
    422         return self._servo.get('cr50_ccd_level').lower().rstrip('ed')
    423 
    424 
    425     def set_ccd_level(self, level):
    426         """Set the Cr50 CCD privilege level.
    427 
    428         Args:
    429             level: a string of the ccd privilege level: 'open', 'lock', or
    430                    'unlock'.
    431 
    432         Raises:
    433             TestFail if the level couldn't be set
    434         ."""
    435         # TODO(mruthven): add support for CCD password
    436         level = level.lower()
    437 
    438         if level == self.get_ccd_level():
    439             logging.info('CCD privilege level is already %s', level)
    440             return
    441 
    442         if 'testlab' in level:
    443             raise error.TestError("Can't change testlab mode using "
    444                 "ccd_set_level")
    445 
    446         testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
    447         req_pp = self._level_change_req_pp(level)
    448         has_pp = not self.using_ccd()
    449         dbg_en = 'DBG' in self._servo.get('cr50_version')
    450 
    451         if req_pp and not has_pp:
    452             raise error.TestError("Can't change privilege level to '%s' "
    453                 "without physical presence." % level)
    454 
    455         if not testlab_on and not has_pp:
    456             raise error.TestError("Wont change privilege level without "
    457                 "physical presence or testlab mode enabled")
    458 
    459         # Start the unlock process.
    460         rv = self.send_command_get_output('ccd %s' % level, ['.*>'])[0]
    461         logging.info(rv)
    462         if 'Access Denied' in rv:
    463             raise error.TestFail("'ccd %s' %s" % (level, rv))
    464 
    465         # Press the power button once a second, if we need physical presence.
    466         if req_pp:
    467             # DBG images have shorter unlock processes
    468             self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG)
    469 
    470         if level != self.get_ccd_level():
    471             raise error.TestFail('Could not set privilege level to %s' % level)
    472 
    473         logging.info('Successfully set CCD privelege level to %s', level)
    474 
    475 
    476     def run_pp(self, unlock_timeout):
    477         """Press the power button a for unlock_timeout seconds.
    478 
    479         This will press the power button many more times than it needs to be
    480         pressed. Cr50 doesn't care if you press it too often. It just cares that
    481         you press the power button at least once within the detect interval.
    482 
    483         For privilege level changes you need to press the power button 5 times
    484         in the short interval and then 4 times within the long interval.
    485         Short Interval
    486         100msec < power button press < 5 seconds
    487         Long Interval
    488         60s < power button press < 300s
    489 
    490         For testlab enable/disable you must press the power button 5 times
    491         spaced between 100msec and 5 seconds apart.
    492         """
    493         end_time = time.time() + unlock_timeout
    494 
    495         logging.info('Pressing power button for %ds to unlock the console.',
    496                      unlock_timeout)
    497         logging.info('The process should end at %s', time.ctime(end_time))
    498 
    499         # Press the power button once a second to unlock the console.
    500         while time.time() < end_time:
    501             self._servo.power_short_press()
    502             time.sleep(1)
    503 
    504 
    505     def gettime(self):
    506         """Get the current cr50 system time"""
    507         result = self.send_command_get_output('gettime', [' = (.*) s'])
    508         return float(result[0][1])
    509 
    510 
    511     def servo_v4_supports_dts_mode(self):
    512         """Returns True if cr50 registers changes in servo v4 dts mode."""
    513         # This is to test that Cr50 actually recognizes the change in ccd state
    514         # We cant do that with tests using ccd, because the cr50 communication
    515         # goes down once ccd is enabled.
    516         if 'servo_v4_with_servo_micro' != self._servo.get_servo_version():
    517             return False
    518 
    519         start_val = self._servo.get('servo_v4_dts_mode')
    520         try:
    521             # Verify both ccd enable and disable
    522             self.ccd_disable(raise_error=True)
    523             self.ccd_enable(raise_error=True)
    524             rv = True
    525         except Exception, e:
    526             logging.info(e)
    527             rv = False
    528         self._servo.set_nocheck('servo_v4_dts_mode', start_val)
    529         self.wait_for_ccd_state(start_val, 60, True)
    530         logging.info('Test setup does%s support servo DTS mode',
    531                 '' if rv else 'n\'t')
    532         return rv
    533 
    534 
    535     def wait_until_update_is_allowed(self):
    536         """Wait until cr50 will be able to accept an update.
    537 
    538         Cr50 rejects any attempt to update if it has been less than 60 seconds
    539         since it last recovered from deep sleep or came up from reboot. This
    540         will wait until cr50 gettime shows a time greater than 60.
    541         """
    542         if self.get_active_version_info()[2]:
    543             logging.info("Running DBG image. Don't need to wait for update.")
    544             return
    545         cr50_time = self.gettime()
    546         if cr50_time < 60:
    547             sleep_time = 61 - cr50_time
    548             logging.info('Cr50 has been up for %ds waiting %ds before update',
    549                          cr50_time, sleep_time)
    550             time.sleep(sleep_time)
    551