Home | History | Annotate | Download | only in servo
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import ast
      6 import functools
      7 import logging
      8 import re
      9 import time
     10 
     11 from autotest_lib.client.bin import utils
     12 from autotest_lib.client.common_lib import error
     13 from autotest_lib.client.cros import ec
     14 
     15 # Hostevent codes, copied from:
     16 #     ec/include/ec_commands.h
     17 HOSTEVENT_LID_CLOSED        = 0x00000001
     18 HOSTEVENT_LID_OPEN          = 0x00000002
     19 HOSTEVENT_POWER_BUTTON      = 0x00000004
     20 HOSTEVENT_AC_CONNECTED      = 0x00000008
     21 HOSTEVENT_AC_DISCONNECTED   = 0x00000010
     22 HOSTEVENT_BATTERY_LOW       = 0x00000020
     23 HOSTEVENT_BATTERY_CRITICAL  = 0x00000040
     24 HOSTEVENT_BATTERY           = 0x00000080
     25 HOSTEVENT_THERMAL_THRESHOLD = 0x00000100
     26 HOSTEVENT_THERMAL_OVERLOAD  = 0x00000200
     27 HOSTEVENT_THERMAL           = 0x00000400
     28 HOSTEVENT_USB_CHARGER       = 0x00000800
     29 HOSTEVENT_KEY_PRESSED       = 0x00001000
     30 HOSTEVENT_INTERFACE_READY   = 0x00002000
     31 # Keyboard recovery combo has been pressed
     32 HOSTEVENT_KEYBOARD_RECOVERY = 0x00004000
     33 # Shutdown due to thermal overload
     34 HOSTEVENT_THERMAL_SHUTDOWN  = 0x00008000
     35 # Shutdown due to battery level too low
     36 HOSTEVENT_BATTERY_SHUTDOWN  = 0x00010000
     37 HOSTEVENT_INVALID           = 0x80000000
     38 
     39 # Time to wait after sending keypress commands.
     40 KEYPRESS_RECOVERY_TIME = 0.5
     41 
     42 
     43 class ChromeConsole(object):
     44     """Manages control of a Chrome console.
     45 
     46     We control the Chrome console via the UART of a Servo board. Chrome console
     47     provides many interfaces to set and get its behavior via console commands.
     48     This class is to abstract these interfaces.
     49     """
     50 
     51     CMD = "_cmd"
     52     REGEXP = "_regexp"
     53     MULTICMD = "_multicmd"
     54 
     55     def __init__(self, servo, name):
     56         """Initialize and keep the servo object.
     57 
     58         Args:
     59           servo: A Servo object.
     60           name: The console name.
     61         """
     62         self.name = name
     63         self.uart_cmd = self.name + self.CMD
     64         self.uart_regexp = self.name + self.REGEXP
     65         self.uart_multicmd = self.name + self.MULTICMD
     66 
     67         self._servo = servo
     68         self._cached_uart_regexp = None
     69 
     70 
     71     def set_uart_regexp(self, regexp):
     72         if self._cached_uart_regexp == regexp:
     73             return
     74         self._cached_uart_regexp = regexp
     75         self._servo.set(self.uart_regexp, regexp)
     76 
     77 
     78     def send_command(self, commands):
     79         """Send command through UART.
     80 
     81         This function opens UART pty when called, and then command is sent
     82         through UART.
     83 
     84         Args:
     85           commands: The commands to send, either a list or a string.
     86         """
     87         self.set_uart_regexp('None')
     88         if isinstance(commands, list):
     89             try:
     90                 self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands))
     91             except error.TestFail as e:
     92                 if 'No control named' in str(e):
     93                     logging.warning(
     94                             'The servod is too old that uart_multicmd '
     95                             'not supported. Use uart_cmd instead.')
     96                     for command in commands:
     97                         self._servo.set_nocheck(self.uart_cmd, command)
     98                 else:
     99                     raise
    100         else:
    101             self._servo.set_nocheck(self.uart_cmd, commands)
    102 
    103 
    104     def send_command_get_output(self, command, regexp_list):
    105         """Send command through UART and wait for response.
    106 
    107         This function waits for response message matching regular expressions.
    108 
    109         Args:
    110           command: The command sent.
    111           regexp_list: List of regular expressions used to match response
    112             message. Note, list must be ordered.
    113 
    114         Returns:
    115           List of tuples, each of which contains the entire matched string and
    116           all the subgroups of the match. None if not matched.
    117           For example:
    118             response of the given command:
    119               High temp: 37.2
    120               Low temp: 36.4
    121             regexp_list:
    122               ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
    123             returns:
    124               [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
    125 
    126         Raises:
    127           error.TestError: An error when the given regexp_list is not valid.
    128         """
    129         if not isinstance(regexp_list, list):
    130             raise error.TestError('Arugment regexp_list is not a list: %s' %
    131                                   str(regexp_list))
    132 
    133         self.set_uart_regexp(str(regexp_list))
    134         self._servo.set_nocheck(self.uart_cmd, command)
    135         return ast.literal_eval(self._servo.get(self.uart_cmd))
    136 
    137 
    138 class ChromeEC(ChromeConsole):
    139     """Manages control of a Chrome EC.
    140 
    141     We control the Chrome EC via the UART of a Servo board. Chrome EC
    142     provides many interfaces to set and get its behavior via console commands.
    143     This class is to abstract these interfaces.
    144     """
    145 
    146     def __init__(self, servo, name="ec_uart"):
    147         super(ChromeEC, self).__init__(servo, name)
    148 
    149 
    150     def key_down(self, keyname):
    151         """Simulate pressing a key.
    152 
    153         Args:
    154           keyname: Key name, one of the keys of KEYMATRIX.
    155         """
    156         self.send_command('kbpress %d %d 1' %
    157                 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]))
    158 
    159 
    160     def key_up(self, keyname):
    161         """Simulate releasing a key.
    162 
    163         Args:
    164           keyname: Key name, one of the keys of KEYMATRIX.
    165         """
    166         self.send_command('kbpress %d %d 0' %
    167                 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]))
    168 
    169 
    170     def key_press(self, keyname):
    171         """Press and then release a key.
    172 
    173         Args:
    174           keyname: Key name, one of the keys of KEYMATRIX.
    175         """
    176         self.send_command([
    177                 'kbpress %d %d 1' %
    178                     (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]),
    179                 'kbpress %d %d 0' %
    180                     (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]),
    181                 ])
    182         # Don't spam the EC console as fast as we can; leave some recovery time
    183         # in between commands.
    184         time.sleep(KEYPRESS_RECOVERY_TIME)
    185 
    186 
    187     def send_key_string_raw(self, string):
    188         """Send key strokes consisting of only characters.
    189 
    190         Args:
    191           string: Raw string.
    192         """
    193         for c in string:
    194             self.key_press(c)
    195 
    196 
    197     def send_key_string(self, string):
    198         """Send key strokes including special keys.
    199 
    200         Args:
    201           string: Character string including special keys. An example
    202             is "this is an<tab>example<enter>".
    203         """
    204         for m in re.finditer("(<[^>]+>)|([^<>]+)", string):
    205             sp, raw = m.groups()
    206             if raw is not None:
    207                 self.send_key_string_raw(raw)
    208             else:
    209                 self.key_press(sp)
    210 
    211 
    212     def reboot(self, flags=''):
    213         """Reboot EC with given flags.
    214 
    215         Args:
    216           flags: Optional, a space-separated string of flags passed to the
    217                  reboot command, including:
    218                    default: EC soft reboot;
    219                    'hard': EC hard/cold reboot;
    220                    'ap-off': Leave AP off after EC reboot (by default, EC turns
    221                              AP on after reboot if lid is open).
    222 
    223         Raises:
    224           error.TestError: If the string of flags is invalid.
    225         """
    226         for flag in flags.split():
    227             if flag not in ('hard', 'ap-off'):
    228                 raise error.TestError(
    229                         'The flag %s of EC reboot command is invalid.' % flag)
    230         self.send_command("reboot %s" % flags)
    231 
    232 
    233     def set_flash_write_protect(self, enable):
    234         """Set the software write protect of EC flash.
    235 
    236         Args:
    237           enable: True to enable write protect, False to disable.
    238         """
    239         if enable:
    240             self.send_command("flashwp enable")
    241         else:
    242             self.send_command("flashwp disable")
    243 
    244 
    245     def set_hostevent(self, codes):
    246         """Set the EC hostevent codes.
    247 
    248         Args:
    249           codes: Hostevent codes, HOSTEVENT_*
    250         """
    251         self.send_command("hostevent set %#x" % codes)
    252         # Allow enough time for EC to process input and set flag.
    253         # See chromium:371631 for details.
    254         # FIXME: Stop importing time module if this hack becomes obsolete.
    255         time.sleep(1)
    256 
    257 
    258     def enable_console_channel(self, channel):
    259         """Find console channel mask and enable that channel only
    260 
    261         @param channel: console channel name
    262         """
    263         # The 'chan' command returns a list of console channels,
    264         # their channel masks and channel numbers
    265         regexp = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
    266         l = self.send_command_get_output('chan', [regexp])
    267         # Use channel mask and append the 0x for proper hex input value
    268         cmd = 'chan 0x' + l[0][2]
    269         # Set console to only output the desired channel
    270         self.send_command(cmd)
    271 
    272 
    273 class ChromeUSBPD(ChromeEC):
    274     """Manages control of a Chrome USBPD.
    275 
    276     We control the Chrome EC via the UART of a Servo board. Chrome USBPD
    277     provides many interfaces to set and get its behavior via console commands.
    278     This class is to abstract these interfaces.
    279     """
    280 
    281     def __init__(self, servo):
    282         super(ChromeUSBPD, self).__init__(servo, "usbpd_uart")
    283