Home | History | Annotate | Download | only in servo
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import ast, logging, re, time
      6 
      7 from autotest_lib.client.common_lib import error
      8 from autotest_lib.client.cros import ec
      9 
     10 # Hostevent codes, copied from:
     11 #     ec/include/ec_commands.h
     12 HOSTEVENT_LID_CLOSED        = 0x00000001
     13 HOSTEVENT_LID_OPEN          = 0x00000002
     14 HOSTEVENT_POWER_BUTTON      = 0x00000004
     15 HOSTEVENT_AC_CONNECTED      = 0x00000008
     16 HOSTEVENT_AC_DISCONNECTED   = 0x00000010
     17 HOSTEVENT_BATTERY_LOW       = 0x00000020
     18 HOSTEVENT_BATTERY_CRITICAL  = 0x00000040
     19 HOSTEVENT_BATTERY           = 0x00000080
     20 HOSTEVENT_THERMAL_THRESHOLD = 0x00000100
     21 HOSTEVENT_THERMAL_OVERLOAD  = 0x00000200
     22 HOSTEVENT_THERMAL           = 0x00000400
     23 HOSTEVENT_USB_CHARGER       = 0x00000800
     24 HOSTEVENT_KEY_PRESSED       = 0x00001000
     25 HOSTEVENT_INTERFACE_READY   = 0x00002000
     26 # Keyboard recovery combo has been pressed
     27 HOSTEVENT_KEYBOARD_RECOVERY = 0x00004000
     28 # Shutdown due to thermal overload
     29 HOSTEVENT_THERMAL_SHUTDOWN  = 0x00008000
     30 # Shutdown due to battery level too low
     31 HOSTEVENT_BATTERY_SHUTDOWN  = 0x00010000
     32 HOSTEVENT_INVALID           = 0x80000000
     33 
     34 # Time to wait after sending keypress commands.
     35 KEYPRESS_RECOVERY_TIME = 0.5
     36 
     37 
     38 class ChromeConsole(object):
     39     """Manages control of a Chrome console.
     40 
     41     We control the Chrome console via the UART of a Servo board. Chrome console
     42     provides many interfaces to set and get its behavior via console commands.
     43     This class is to abstract these interfaces.
     44     """
     45 
     46     CMD = "_cmd"
     47     REGEXP = "_regexp"
     48     MULTICMD = "_multicmd"
     49 
     50     def __init__(self, servo, name):
     51         """Initialize and keep the servo object.
     52 
     53         Args:
     54           servo: A Servo object.
     55           name: The console name.
     56         """
     57         self.name = name
     58         self.uart_cmd = self.name + self.CMD
     59         self.uart_regexp = self.name + self.REGEXP
     60         self.uart_multicmd = self.name + self.MULTICMD
     61 
     62         self._servo = servo
     63         self._cached_uart_regexp = None
     64 
     65 
     66     def set_uart_regexp(self, regexp):
     67         if self._cached_uart_regexp == regexp:
     68             return
     69         self._cached_uart_regexp = regexp
     70         self._servo.set(self.uart_regexp, regexp)
     71 
     72 
     73     def send_command(self, commands):
     74         """Send command through UART.
     75 
     76         This function opens UART pty when called, and then command is sent
     77         through UART.
     78 
     79         Args:
     80           commands: The commands to send, either a list or a string.
     81         """
     82         self.set_uart_regexp('None')
     83         if isinstance(commands, list):
     84             try:
     85                 self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands))
     86             except error.TestFail as e:
     87                 if 'No control named' in str(e):
     88                     logging.warning(
     89                             'The servod is too old that uart_multicmd '
     90                             'not supported. Use uart_cmd instead.')
     91                     for command in commands:
     92                         self._servo.set_nocheck(self.uart_cmd, command)
     93                 else:
     94                     raise
     95         else:
     96             self._servo.set_nocheck(self.uart_cmd, commands)
     97 
     98 
     99     def send_command_get_output(self, command, regexp_list):
    100         """Send command through UART and wait for response.
    101 
    102         This function waits for response message matching regular expressions.
    103 
    104         Args:
    105           command: The command sent.
    106           regexp_list: List of regular expressions used to match response
    107             message. Note, list must be ordered.
    108 
    109         Returns:
    110           List of tuples, each of which contains the entire matched string and
    111           all the subgroups of the match. None if not matched.
    112           For example:
    113             response of the given command:
    114               High temp: 37.2
    115               Low temp: 36.4
    116             regexp_list:
    117               ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
    118             returns:
    119               [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
    120 
    121         Raises:
    122           error.TestError: An error when the given regexp_list is not valid.
    123         """
    124         if not isinstance(regexp_list, list):
    125             raise error.TestError('Arugment regexp_list is not a list: %s' %
    126                                   str(regexp_list))
    127 
    128         self.set_uart_regexp(str(regexp_list))
    129         self._servo.set_nocheck(self.uart_cmd, command)
    130         return ast.literal_eval(self._servo.get(self.uart_cmd))
    131 
    132 
    133 class ChromeEC(ChromeConsole):
    134     """Manages control of a Chrome EC.
    135 
    136     We control the Chrome EC via the UART of a Servo board. Chrome EC
    137     provides many interfaces to set and get its behavior via console commands.
    138     This class is to abstract these interfaces.
    139     """
    140 
    141     def __init__(self, servo, name="ec_uart"):
    142         super(ChromeEC, self).__init__(servo, name)
    143 
    144 
    145     def key_down(self, keyname):
    146         """Simulate pressing a key.
    147 
    148         Args:
    149           keyname: Key name, one of the keys of KEYMATRIX.
    150         """
    151         self.send_command('kbpress %d %d 1' %
    152                 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]))
    153 
    154 
    155     def key_up(self, keyname):
    156         """Simulate releasing a key.
    157 
    158         Args:
    159           keyname: Key name, one of the keys of KEYMATRIX.
    160         """
    161         self.send_command('kbpress %d %d 0' %
    162                 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]))
    163 
    164 
    165     def key_press(self, keyname):
    166         """Press and then release a key.
    167 
    168         Args:
    169           keyname: Key name, one of the keys of KEYMATRIX.
    170         """
    171         self.send_command([
    172                 'kbpress %d %d 1' %
    173                     (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]),
    174                 'kbpress %d %d 0' %
    175                     (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]),
    176                 ])
    177         # Don't spam the EC console as fast as we can; leave some recovery time
    178         # in between commands.
    179         time.sleep(KEYPRESS_RECOVERY_TIME)
    180 
    181 
    182     def send_key_string_raw(self, string):
    183         """Send key strokes consisting of only characters.
    184 
    185         Args:
    186           string: Raw string.
    187         """
    188         for c in string:
    189             self.key_press(c)
    190 
    191 
    192     def send_key_string(self, string):
    193         """Send key strokes including special keys.
    194 
    195         Args:
    196           string: Character string including special keys. An example
    197             is "this is an<tab>example<enter>".
    198         """
    199         for m in re.finditer("(<[^>]+>)|([^<>]+)", string):
    200             sp, raw = m.groups()
    201             if raw is not None:
    202                 self.send_key_string_raw(raw)
    203             else:
    204                 self.key_press(sp)
    205 
    206 
    207     def reboot(self, flags=''):
    208         """Reboot EC with given flags.
    209 
    210         Args:
    211           flags: Optional, a space-separated string of flags passed to the
    212                  reboot command, including:
    213                    default: EC soft reboot;
    214                    'hard': EC hard/cold reboot;
    215                    'ap-off': Leave AP off after EC reboot (by default, EC turns
    216                              AP on after reboot if lid is open).
    217 
    218         Raises:
    219           error.TestError: If the string of flags is invalid.
    220         """
    221         for flag in flags.split():
    222             if flag not in ('hard', 'ap-off'):
    223                 raise error.TestError(
    224                         'The flag %s of EC reboot command is invalid.' % flag)
    225         self.send_command("reboot %s" % flags)
    226 
    227 
    228     def set_flash_write_protect(self, enable):
    229         """Set the software write protect of EC flash.
    230 
    231         Args:
    232           enable: True to enable write protect, False to disable.
    233         """
    234         if enable:
    235             self.send_command("flashwp enable")
    236         else:
    237             self.send_command("flashwp disable")
    238 
    239 
    240     def set_hostevent(self, codes):
    241         """Set the EC hostevent codes.
    242 
    243         Args:
    244           codes: Hostevent codes, HOSTEVENT_*
    245         """
    246         self.send_command("hostevent set %#x" % codes)
    247         # Allow enough time for EC to process input and set flag.
    248         # See chromium:371631 for details.
    249         # FIXME: Stop importing time module if this hack becomes obsolete.
    250         time.sleep(1)
    251 
    252 
    253     def enable_console_channel(self, channel):
    254         """Find console channel mask and enable that channel only
    255 
    256         @param channel: console channel name
    257         """
    258         # The 'chan' command returns a list of console channels,
    259         # their channel masks and channel numbers
    260         regexp = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
    261         l = self.send_command_get_output('chan', [regexp])
    262         # Use channel mask and append the 0x for proper hex input value
    263         cmd = 'chan 0x' + l[0][2]
    264         # Set console to only output the desired channel
    265         self.send_command(cmd)
    266 
    267 
    268 class ChromeUSBPD(ChromeEC):
    269     """Manages control of a Chrome USBPD.
    270 
    271     We control the Chrome EC via the UART of a Servo board. Chrome USBPD
    272     provides many interfaces to set and get its behavior via console commands.
    273     This class is to abstract these interfaces.
    274     """
    275 
    276     def __init__(self, servo):
    277         super(ChromeUSBPD, self).__init__(servo, "usbpd_uart")
    278 
    279 
    280 class ChromeCr50(ChromeConsole):
    281     """Manages control of a Chrome Cr50.
    282 
    283     We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
    284     provides many interfaces to set and get its behavior via console commands.
    285     This class is to abstract these interfaces.
    286     """
    287     IDLE_COUNT = 'count: (\d+)'
    288 
    289     def __init__(self, servo):
    290         super(ChromeCr50, self).__init__(servo, "cr50_console")
    291 
    292 
    293     def get_deep_sleep_count(self):
    294         """Get the deep sleep count from the idle task"""
    295         result = self.send_command_get_output('idle', [self.IDLE_COUNT])
    296         return int(result[0][1])
    297 
    298 
    299     def clear_deep_sleep_count(self):
    300         """Clear the deep sleep count"""
    301         result = self.send_command_get_output('idle c', [self.IDLE_COUNT])
    302         if int(result[0][1]):
    303             raise error.TestFail("Could not clear deep sleep count")
    304 
    305 
    306     def ccd_disable(self):
    307         """Change the values of the CC lines to disable CCD"""
    308         self._servo.set_nocheck('servo_v4_ccd_mode', 'disconnect')
    309         # TODO: Add a better way to wait until usb is disconnected
    310         time.sleep(3)
    311 
    312 
    313     def ccd_enable(self):
    314         """Reenable CCD and reset servo interfaces"""
    315         self._servo.set_nocheck('servo_v4_ccd_mode', 'ccd')
    316         self._servo.set('sbu_mux_enable', 'on')
    317         self._servo.set_nocheck('power_state', 'ccd_reset')
    318         time.sleep(2)
    319