Home | History | Annotate | Download | only in cros
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 import logging
      5 import re
      6 import time
      7 
      8 from autotest_lib.client.bin import utils
      9 from autotest_lib.client.common_lib import error
     10 
     11 # en-US key matrix (from "kb membrane pin matrix.pdf")
     12 KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
     13              '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
     14              '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
     15              'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
     16              'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
     17              '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
     18              'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
     19              ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
     20              'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
     21              '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
     22              '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
     23              '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
     24              '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
     25              '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
     26              '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
     27              '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
     28              '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
     29              '<left>': (7, 12)}
     30 
     31 
     32 def has_ectool():
     33     """Determine if ectool shell command is present.
     34 
     35     Returns:
     36         boolean true if avail, false otherwise.
     37     """
     38     cmd = 'which ectool'
     39     return (utils.system(cmd, ignore_status=True) == 0)
     40 
     41 
     42 class EC_Common(object):
     43     """Class for EC common.
     44 
     45     This incredibly brief base class is intended to encapsulate common elements
     46     across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
     47     that includes only the use of ectool.
     48     """
     49 
     50     def __init__(self, target='cros_ec'):
     51         """Constructor.
     52 
     53         @param target: target name of ec to communicate with.
     54         """
     55         if not has_ectool():
     56             ec_info = utils.system_output("mosys ec info",
     57                                           ignore_status=True)
     58             logging.warning("Ectool absent on this platform ( %s )",
     59                          ec_info)
     60             raise error.TestNAError("Platform doesn't support ectool")
     61         self._target = target
     62 
     63     def ec_command(self, cmd, **kwargs):
     64         """Executes ec command and returns results.
     65 
     66         @param cmd: string of command to execute.
     67         @param kwargs: optional params passed to utils.system_output
     68 
     69         @returns: string of results from ec command.
     70         """
     71         full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
     72         result = utils.system_output(full_cmd, **kwargs)
     73         logging.debug('Command: %s', full_cmd)
     74         logging.debug('Result: %s', result)
     75         return result
     76 
     77 
     78 class EC(EC_Common):
     79     """Class for CrOS embedded controller (EC)."""
     80     HELLO_RE = "EC says hello"
     81     GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
     82     SET_FANSPEED_RE = "Fan target RPM set."
     83     TEMP_SENSOR_RE = "Reading temperature...([0-9]*)"
     84     TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
     85     # For battery, check we can see a non-zero capacity value.
     86     BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
     87     LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
     88 
     89 
     90     def hello(self):
     91         """Test EC hello command.
     92 
     93         @returns True if success False otherwise.
     94         """
     95         response = self.ec_command('hello')
     96         return (re.search(self.HELLO_RE, response) is not None)
     97 
     98     def auto_fan_ctrl(self):
     99         """Turns auto fan ctrl on.
    100 
    101         @returns True if success False otherwise.
    102         """
    103         response = self.ec_command('autofanctrl')
    104         logging.info('Turned on auto fan control.')
    105         return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
    106 
    107     def get_fanspeed(self):
    108         """Gets fanspeed.
    109 
    110         @raises error.TestError if regexp fails to match.
    111 
    112         @returns integer of fan speed RPM.
    113         """
    114         response = self.ec_command('pwmgetfanrpm')
    115         match = re.search(self.GET_FANSPEED_RE, response)
    116         if not match:
    117             raise error.TestError('Unable to read fan speed')
    118 
    119         rpm = int(match.group(1))
    120         logging.info('Fan speed: %d', rpm)
    121         return rpm
    122 
    123     def set_fanspeed(self, rpm):
    124         """Sets fan speed.
    125 
    126         @param rpm: integer of fan speed RPM to set
    127 
    128         @returns True if success False otherwise.
    129         """
    130         response = self.ec_command('pwmsetfanrpm %d' % rpm)
    131         logging.info('Set fan speed: %d', rpm)
    132         return (re.search(self.SET_FANSPEED_RE, response) is not None)
    133 
    134     def get_temperature(self, idx):
    135         """Gets temperature from idx sensor.
    136 
    137         @param idx: integer of temp sensor to read.
    138 
    139         @raises error.TestError if fails to read sensor.
    140 
    141         @returns integer of temperature reading in degrees Kelvin.
    142         """
    143         response = self.ec_command('temps %d' % idx)
    144         match = re.search(self.TEMP_SENSOR_RE, response)
    145         if not match:
    146             raise error.TestError('Unable to read temperature sensor %d' % idx)
    147 
    148         return int(match.group(1))
    149 
    150     def get_battery(self):
    151         """Get battery presence (design capacity found).
    152 
    153         @returns True if success False otherwise.
    154         """
    155         response = self.ec_command('battery')
    156         return (re.search(self.BATTERY_RE, response) is not None)
    157 
    158     def get_lightbar(self):
    159         """Test lightbar.
    160 
    161         @returns True if success False otherwise.
    162         """
    163         self.ec_command('lightbar on')
    164         self.ec_command('lightbar init')
    165         self.ec_command('lightbar 4 255 255 255')
    166         response = self.ec_command('lightbar')
    167         self.ec_command('lightbar off')
    168         return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
    169 
    170     def key_press(self, key):
    171         """Emit key down and up signal of the keyboard.
    172 
    173         @param key: name of a key defined in KEYMATRIX.
    174         """
    175         self.key_down(key)
    176         self.key_up(key)
    177 
    178     def _key_action(self, key, action_type):
    179         if not key in KEYMATRIX:
    180             raise error.TestError('Unknown key: ' + key)
    181         row, col = KEYMATRIX[key]
    182         self.ec_command('kbpress %d %d %d' % (row, col, action_type))
    183 
    184     def key_down(self, key):
    185         """Emit key down signal of the keyboard.
    186 
    187         @param key: name of a key defined in KEYMATRIX.
    188         """
    189         self._key_action(key, 1)
    190 
    191     def key_up(self, key):
    192         """Emit key up signal of the keyboard.
    193 
    194         @param key: name of a key defined in KEYMATRIX.
    195         """
    196         self._key_action(key, 0)
    197 
    198 
    199 class EC_USBPD_Port(EC_Common):
    200     """Class for CrOS embedded controller for USB-PD Port.
    201 
    202     Public attributes:
    203         index: integer of USB type-C port index.
    204 
    205     Public Methods:
    206         is_dfp: Determine if data role is Downstream Facing Port (DFP).
    207         is_amode_supported: Check if alternate mode is supported by port.
    208         is_amode_entered: Check if alternate mode is entered.
    209         set_amode: Set an alternate mode.
    210 
    211     Private attributes:
    212         _port: integer of USB type-C port id.
    213         _port_info: holds usbpd protocol info.
    214         _amodes: holds alternate mode info.
    215 
    216     Private methods:
    217         _invalidate_port_data: Remove port data to force re-eval.
    218         _get_port_info: Get USB-PD port info.
    219         _get_amodes: parse and return port's svid info.
    220     """
    221     def __init__(self, index):
    222         """Constructor.
    223 
    224         @param index: integer of USB type-C port index.
    225         """
    226         self.index = index
    227         # TODO(crosbug.com/p/38133) target= only works for samus
    228         super(EC_USBPD_Port, self).__init__(target='cros_pd')
    229 
    230         # Interrogate port at instantiation.  Use invalidate to force re-eval.
    231         self._port_info = self._get_port_info()
    232         self._amodes = self._get_amodes()
    233 
    234     def _invalidate_port_data(self):
    235         """Remove port data to force re-eval."""
    236         self._port_info = None
    237         self._amodes = None
    238 
    239     def _get_port_info(self):
    240         """Get USB-PD port info.
    241 
    242         ectool command usbpd provides the following information about the port:
    243           - Enabled/Disabled
    244           - Power & Data Role
    245           - Polarity
    246           - Protocol State
    247 
    248         At time of authoring it looks like:
    249           Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
    250 
    251         @raises error.TestError if ...
    252           port info not parseable.
    253 
    254         @returns dictionary for <port> with keyval pairs:
    255           enabled: True | False | None
    256           power_role: sink | source | None
    257           data_role: UFP | DFP | None
    258           is_reversed: True | False | None
    259           state: various strings | None
    260         """
    261         PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
    262                        'Polarity:CC(\d+)\s+State:(\w+)'
    263 
    264         match = re.search(PORT_INFO_RE,
    265                           self.ec_command("usbpd %s" % (self.index)))
    266         if not match or int(match.group(1)) != self.index:
    267             error.TestError('Unable to determine port %d info' % self.index)
    268 
    269         pinfo = dict(enabled=None, power_role=None, data_role=None,
    270                     is_reversed=None, state=None)
    271         pinfo['enabled'] = match.group(2) == 'enabled'
    272         pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
    273         pinfo['data_role'] = match.group(4)
    274         pinfo['is_reversed'] = True if match.group(5) == '2' else False
    275         pinfo['state'] = match.group(6)
    276         logging.debug('port_info = %s', pinfo)
    277         return pinfo
    278 
    279     def _get_amodes(self):
    280         """Parse alternate modes from pdgetmode.
    281 
    282         Looks like ...
    283           *SVID:0xff01 *0x00000485  0x00000000 ...
    284           SVID:0x18d1   0x00000001  0x00000000 ...
    285 
    286         @returns dictionary of format:
    287           <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
    288             where:
    289               <svid>        : USB-IF Standard or vendor id as
    290                               hex string (i.e. 0xff01)
    291               <config_list> : list of uint32_t configs
    292               <opos>        : integer of active object position.
    293                               Note, this is the config list index + 1
    294         """
    295         SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
    296         svids = dict()
    297         cmd = 'pdgetmode %d' % self.index
    298         for line in self.ec_command(cmd, ignore_status=True).split('\n'):
    299             if line.strip() == '':
    300                 continue
    301             logging.debug('pdgetmode line: %s', line)
    302             match = re.search(SVID_RE, line)
    303             if not match:
    304                 logging.warning("Unable to parse SVID line %s", line)
    305                 continue
    306             active = match.group(1) == '*'
    307             svid = match.group(2)
    308             configs_str = match.group(3)
    309             configs = list()
    310             opos = None
    311             for i,config in enumerate(configs_str.split(), 1):
    312                 if config.startswith('*'):
    313                     opos = i
    314                     config = config[1:]
    315                 config = int(config, 16)
    316                 # ignore unpopulated configs
    317                 if config == 0:
    318                     continue
    319                 configs.append(config)
    320             svids[svid] = dict(active=active, configs=configs, opos=opos)
    321 
    322         logging.debug("Port %d svids = %s", self.index, svids)
    323         return svids
    324 
    325     def is_dfp(self):
    326         """Determine if data role is Downstream Facing Port (DFP).
    327 
    328         @returns True if DFP False otherwise.
    329         """
    330         if self._port_info is None:
    331             self._port_info = self._get_port_info()
    332 
    333         return self._port_info['data_role'] == 'DFP'
    334 
    335     def is_amode_supported(self, svid):
    336         """Check if alternate mode is supported by port partner.
    337 
    338         @param svid: alternate mode SVID hexstring (i.e. 0xff01)
    339         """
    340         if self._amodes is None:
    341             self._amodes = self._get_amodes()
    342 
    343         if svid in self._amodes.keys():
    344             return True
    345         return False
    346 
    347     def is_amode_entered(self, svid, opos):
    348         """Check if alternate mode is entered.
    349 
    350         @param svid: alternate mode SVID hexstring (i.e. 0xff01).
    351         @param opos: object position of config to act on.
    352 
    353         @returns True if entered False otherwise
    354         """
    355         if self._amodes is None:
    356             self._amodes = self._get_amodes()
    357 
    358         if not self.is_amode_supported(svid):
    359             return False
    360 
    361         if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
    362             return True
    363 
    364         return False
    365 
    366     def set_amode(self, svid, opos, enter, delay_secs=2):
    367         """Set alternate mode.
    368 
    369         @param svid: alternate mode SVID hexstring (i.e. 0xff01).
    370         @param opos: object position of config to act on.
    371         @param enter: Boolean of whether to enter mode.
    372 
    373         @raises error.TestError if ...
    374            mode not supported.
    375            opos is > number of configs.
    376 
    377         @returns True if successful False otherwise
    378         """
    379         if self._amodes is None:
    380             self._amodes = self._get_amodes()
    381 
    382         if svid not in self._amodes.keys():
    383             raise error.TestError("SVID %s not supported", svid)
    384 
    385         if opos > len(self._amodes[svid]['configs']):
    386             raise error.TestError("opos > available configs")
    387 
    388         cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
    389                                          1 if enter else 0)
    390         self.ec_command(cmd, ignore_status=True)
    391         self._invalidate_port_data()
    392 
    393         # allow some time for mode entry/exit
    394         time.sleep(delay_secs)
    395         return self.is_amode_entered(svid, opos) == enter
    396 
    397     def get_flash_info(self):
    398         mat0_re = r'has no discovered device'
    399         mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
    400         mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
    401         flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
    402                                     'dev_minor', 'rw_hash', 'image_status'])
    403 
    404         cmd = 'infopddev %d' % self.index
    405 
    406         tries = 3
    407         while (tries):
    408             res = self.ec_command(cmd, ignore_status=True)
    409             if not 'has no discovered device' in res:
    410                 break
    411 
    412             tries -= 1
    413             time.sleep(1)
    414 
    415         for ln in res.split('\n'):
    416             mat1 = re.match(mat1_re, ln)
    417             if mat1:
    418                 flash_dict['ptype'] = int(mat1.group(1))
    419                 flash_dict['vid'] = mat1.group(2)
    420                 flash_dict['pid'] = mat1.group(3)
    421                 continue
    422 
    423             mat2 = re.match(mat2_re, ln)
    424             if mat2:
    425                 flash_dict['dev_major'] = int(mat2.group(1))
    426                 flash_dict['dev_minor'] = int(mat2.group(2))
    427                 flash_dict['rw_hash'] = mat2.group(3)
    428                 flash_dict['image_status'] = mat2.group(4)
    429                 break
    430 
    431         return flash_dict
    432 
    433 
    434 class EC_USBPD(EC_Common):
    435     """Class for CrOS embedded controller for USB-PD.
    436 
    437     Public attributes:
    438         ports: list EC_USBPD_Port instances
    439 
    440     Public Methods:
    441         get_num_ports: get number of USB-PD ports device has.
    442 
    443     Private attributes:
    444         _num_ports: integer number of USB-PD ports device has.
    445     """
    446     def __init__(self, num_ports=None):
    447         """Constructor.
    448 
    449         @param num_ports: total number of USB-PD ports on device.  This is an
    450           override.  If left 'None' will try to determine.
    451         """
    452         self._num_ports = num_ports
    453         self.ports = list()
    454 
    455         # TODO(crosbug.com/p/38133) target= only works for samus
    456         super(EC_USBPD, self).__init__(target='cros_pd')
    457 
    458         if (self.get_num_ports() == 0):
    459             raise error.TestNAError("Device has no USB-PD ports")
    460 
    461         for i in xrange(self._num_ports):
    462             self.ports.append(EC_USBPD_Port(i))
    463 
    464     def get_num_ports(self):
    465         """Determine the number of ports for device.
    466 
    467         Uses ectool's usbpdpower command which in turn makes host command call
    468         to EC_CMD_USB_PD_PORTS to determine the number of ports.
    469 
    470         TODO(tbroch) May want to consider adding separate ectool command to
    471         surface the number of ports directly instead of via usbpdpower
    472 
    473         @returns number of ports.
    474         """
    475         if (self._num_ports is not None):
    476             return self._num_ports
    477 
    478         self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
    479         return self._num_ports
    480