Home | History | Annotate | Download | only in cros
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 import logging
      5 import re
      6 import time
      7 
      8 from autotest_lib.client.bin import utils
      9 from autotest_lib.client.common_lib import error
     10 
     11 # en-US key matrix (from "kb membrane pin matrix.pdf")
     12 KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
     13              '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
     14              '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
     15              'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
     16              'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
     17              '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
     18              'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
     19              ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
     20              'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
     21              '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
     22              '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
     23              '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
     24              '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
     25              '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
     26              '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
     27              '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
     28              '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
     29              '<left>': (7, 12)}
     30 
     31 
     32 def has_ectool():
     33     """Determine if ectool shell command is present.
     34 
     35     Returns:
     36         boolean true if avail, false otherwise.
     37     """
     38     cmd = 'which ectool'
     39     return (utils.system(cmd, ignore_status=True) == 0)
     40 
     41 
     42 class ECError(Exception):
     43     """Base class for a failure when communicating with EC."""
     44     pass
     45 
     46 
     47 class EC_Common(object):
     48     """Class for EC common.
     49 
     50     This incredibly brief base class is intended to encapsulate common elements
     51     across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
     52     that includes only the use of ectool.
     53     """
     54 
     55     def __init__(self, target='cros_ec'):
     56         """Constructor.
     57 
     58         @param target: target name of ec to communicate with.
     59         """
     60         if not has_ectool():
     61             ec_info = utils.system_output("mosys ec info",
     62                                           ignore_status=True)
     63             logging.warning("Ectool absent on this platform ( %s )",
     64                          ec_info)
     65             raise error.TestNAError("Platform doesn't support ectool")
     66         self._target = target
     67 
     68     def ec_command(self, cmd, **kwargs):
     69         """Executes ec command and returns results.
     70 
     71         @param cmd: string of command to execute.
     72         @param kwargs: optional params passed to utils.system_output
     73 
     74         @returns: string of results from ec command.
     75         """
     76         full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
     77         logging.debug('Command: %s', full_cmd)
     78         result = utils.system_output(full_cmd, **kwargs)
     79         logging.debug('Result: %s', result)
     80         return result
     81 
     82 
     83 class EC(EC_Common):
     84     """Class for CrOS embedded controller (EC)."""
     85     HELLO_RE = "EC says hello"
     86     GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
     87     SET_FANSPEED_RE = "Fan target RPM set."
     88     TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)"
     89     # <sensor idx>: <sensor type> <sensor name>
     90     TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)"
     91     TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
     92     # For battery, check we can see a non-zero capacity value.
     93     BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
     94     LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
     95 
     96     def __init__(self):
     97         """Constructor."""
     98         super(EC, self).__init__()
     99         self._temperature_dict = None
    100 
    101     def hello(self, **kwargs):
    102         """Test EC hello command.
    103 
    104         @param kwargs: optional params passed to utils.system_output
    105 
    106         @returns True if success False otherwise.
    107         """
    108         response = self.ec_command('hello', **kwargs)
    109         return (re.search(self.HELLO_RE, response) is not None)
    110 
    111     def auto_fan_ctrl(self):
    112         """Turns auto fan ctrl on.
    113 
    114         @returns True if success False otherwise.
    115         """
    116         response = self.ec_command('autofanctrl')
    117         logging.info('Turned on auto fan control.')
    118         return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
    119 
    120     def get_fanspeed(self):
    121         """Gets fanspeed.
    122 
    123         @raises error.TestError if regexp fails to match.
    124 
    125         @returns integer of fan speed RPM.
    126         """
    127         response = self.ec_command('pwmgetfanrpm')
    128         match = re.search(self.GET_FANSPEED_RE, response)
    129         if not match:
    130             raise error.TestError('Unable to read fan speed')
    131 
    132         rpm = int(match.group(1))
    133         logging.info('Fan speed: %d', rpm)
    134         return rpm
    135 
    136     def set_fanspeed(self, rpm):
    137         """Sets fan speed.
    138 
    139         @param rpm: integer of fan speed RPM to set
    140 
    141         @returns True if success False otherwise.
    142         """
    143         response = self.ec_command('pwmsetfanrpm %d' % rpm)
    144         logging.info('Set fan speed: %d', rpm)
    145         return (re.search(self.SET_FANSPEED_RE, response) is not None)
    146 
    147     def _get_temperature_dict(self):
    148         """Read EC temperature name and idx into a dict.
    149 
    150         @returns dict where key=<sensor name>, value =<sensor idx>
    151         """
    152         # The sensor (name, idx) mapping does not change.
    153         if self._temperature_dict:
    154             return self._temperature_dict
    155 
    156         temperature_dict = {}
    157         response = self.ec_command('tempsinfo all')
    158         for rline in response.split('\n'):
    159             match = re.search(self.TEMP_SENSOR_INFO_RE, rline)
    160             if match:
    161                 temperature_dict[match.group(3)] = int(match.group(1))
    162 
    163         self._temperature_dict = temperature_dict
    164         return temperature_dict
    165 
    166     def get_temperature(self, idx=None, name=None):
    167         """Gets temperature from idx sensor.
    168 
    169         Reads temperature either directly if idx is provided or by discovering
    170         idx using name.
    171 
    172         @param idx:  integer of temp sensor to read.  Default=None
    173         @param name: string of temp sensor to read.  Default=None.
    174             For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro
    175 
    176         @raises ECError if fails to find idx of name.
    177         @raises error.TestError if fails to read sensor or fails to identify
    178         sensor to read from idx & name param.
    179 
    180         @returns integer of temperature reading in degrees Kelvin.
    181         """
    182         if idx is None:
    183             temperature_dict = self._get_temperature_dict()
    184             if name in temperature_dict:
    185                 idx = temperature_dict[name]
    186             else:
    187                 raise ECError('Finding temp idx for name %s' % name)
    188 
    189         response = self.ec_command('temps %d' % idx)
    190         match = re.search(self.TEMP_SENSOR_TEMP_RE, response)
    191         if not match:
    192             raise error.TestError('Reading temperature idx %d' % idx)
    193 
    194         return int(match.group(1))
    195 
    196     def get_battery(self):
    197         """Get battery presence (design capacity found).
    198 
    199         @returns True if success False otherwise.
    200         """
    201         try:
    202             response = self.ec_command('battery')
    203         except error.CmdError:
    204             raise ECError('calling EC battery command')
    205 
    206         return (re.search(self.BATTERY_RE, response) is not None)
    207 
    208     def get_lightbar(self):
    209         """Test lightbar.
    210 
    211         @returns True if success False otherwise.
    212         """
    213         self.ec_command('lightbar on')
    214         self.ec_command('lightbar init')
    215         self.ec_command('lightbar 4 255 255 255')
    216         response = self.ec_command('lightbar')
    217         self.ec_command('lightbar off')
    218         return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
    219 
    220     def key_press(self, key):
    221         """Emit key down and up signal of the keyboard.
    222 
    223         @param key: name of a key defined in KEYMATRIX.
    224         """
    225         self.key_down(key)
    226         self.key_up(key)
    227 
    228     def _key_action(self, key, action_type):
    229         if not key in KEYMATRIX:
    230             raise error.TestError('Unknown key: ' + key)
    231         row, col = KEYMATRIX[key]
    232         self.ec_command('kbpress %d %d %d' % (row, col, action_type))
    233 
    234     def key_down(self, key):
    235         """Emit key down signal of the keyboard.
    236 
    237         @param key: name of a key defined in KEYMATRIX.
    238         """
    239         self._key_action(key, 1)
    240 
    241     def key_up(self, key):
    242         """Emit key up signal of the keyboard.
    243 
    244         @param key: name of a key defined in KEYMATRIX.
    245         """
    246         self._key_action(key, 0)
    247 
    248 
    249 class EC_USBPD_Port(EC_Common):
    250     """Class for CrOS embedded controller for USB-PD Port.
    251 
    252     Public attributes:
    253         index: integer of USB type-C port index.
    254 
    255     Public Methods:
    256         is_dfp: Determine if data role is Downstream Facing Port (DFP).
    257         is_amode_supported: Check if alternate mode is supported by port.
    258         is_amode_entered: Check if alternate mode is entered.
    259         set_amode: Set an alternate mode.
    260 
    261     Private attributes:
    262         _port: integer of USB type-C port id.
    263         _port_info: holds usbpd protocol info.
    264         _amodes: holds alternate mode info.
    265 
    266     Private methods:
    267         _invalidate_port_data: Remove port data to force re-eval.
    268         _get_port_info: Get USB-PD port info.
    269         _get_amodes: parse and return port's svid info.
    270     """
    271     def __init__(self, index):
    272         """Constructor.
    273 
    274         @param index: integer of USB type-C port index.
    275         """
    276         self.index = index
    277         # TODO(crosbug.com/p/38133) target= only works for samus
    278         super(EC_USBPD_Port, self).__init__(target='cros_pd')
    279 
    280         # Interrogate port at instantiation.  Use invalidate to force re-eval.
    281         self._port_info = self._get_port_info()
    282         self._amodes = self._get_amodes()
    283 
    284     def _invalidate_port_data(self):
    285         """Remove port data to force re-eval."""
    286         self._port_info = None
    287         self._amodes = None
    288 
    289     def _get_port_info(self):
    290         """Get USB-PD port info.
    291 
    292         ectool command usbpd provides the following information about the port:
    293           - Enabled/Disabled
    294           - Power & Data Role
    295           - Polarity
    296           - Protocol State
    297 
    298         At time of authoring it looks like:
    299           Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
    300 
    301         @raises error.TestError if ...
    302           port info not parseable.
    303 
    304         @returns dictionary for <port> with keyval pairs:
    305           enabled: True | False | None
    306           power_role: sink | source | None
    307           data_role: UFP | DFP | None
    308           is_reversed: True | False | None
    309           state: various strings | None
    310         """
    311         PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
    312                        'Polarity:CC(\d+)\s+State:(\w+)'
    313 
    314         match = re.search(PORT_INFO_RE,
    315                           self.ec_command("usbpd %s" % (self.index)))
    316         if not match or int(match.group(1)) != self.index:
    317             error.TestError('Unable to determine port %d info' % self.index)
    318 
    319         pinfo = dict(enabled=None, power_role=None, data_role=None,
    320                     is_reversed=None, state=None)
    321         pinfo['enabled'] = match.group(2) == 'enabled'
    322         pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
    323         pinfo['data_role'] = match.group(4)
    324         pinfo['is_reversed'] = True if match.group(5) == '2' else False
    325         pinfo['state'] = match.group(6)
    326         logging.debug('port_info = %s', pinfo)
    327         return pinfo
    328 
    329     def _get_amodes(self):
    330         """Parse alternate modes from pdgetmode.
    331 
    332         Looks like ...
    333           *SVID:0xff01 *0x00000485  0x00000000 ...
    334           SVID:0x18d1   0x00000001  0x00000000 ...
    335 
    336         @returns dictionary of format:
    337           <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
    338             where:
    339               <svid>        : USB-IF Standard or vendor id as
    340                               hex string (i.e. 0xff01)
    341               <config_list> : list of uint32_t configs
    342               <opos>        : integer of active object position.
    343                               Note, this is the config list index + 1
    344         """
    345         SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
    346         svids = dict()
    347         cmd = 'pdgetmode %d' % self.index
    348         for line in self.ec_command(cmd, ignore_status=True).split('\n'):
    349             if line.strip() == '':
    350                 continue
    351             logging.debug('pdgetmode line: %s', line)
    352             match = re.search(SVID_RE, line)
    353             if not match:
    354                 logging.warning("Unable to parse SVID line %s", line)
    355                 continue
    356             active = match.group(1) == '*'
    357             svid = match.group(2)
    358             configs_str = match.group(3)
    359             configs = list()
    360             opos = None
    361             for i,config in enumerate(configs_str.split(), 1):
    362                 if config.startswith('*'):
    363                     opos = i
    364                     config = config[1:]
    365                 config = int(config, 16)
    366                 # ignore unpopulated configs
    367                 if config == 0:
    368                     continue
    369                 configs.append(config)
    370             svids[svid] = dict(active=active, configs=configs, opos=opos)
    371 
    372         logging.debug("Port %d svids = %s", self.index, svids)
    373         return svids
    374 
    375     def is_dfp(self):
    376         """Determine if data role is Downstream Facing Port (DFP).
    377 
    378         @returns True if DFP False otherwise.
    379         """
    380         if self._port_info is None:
    381             self._port_info = self._get_port_info()
    382 
    383         return self._port_info['data_role'] == 'DFP'
    384 
    385     def is_amode_supported(self, svid):
    386         """Check if alternate mode is supported by port partner.
    387 
    388         @param svid: alternate mode SVID hexstring (i.e. 0xff01)
    389         """
    390         if self._amodes is None:
    391             self._amodes = self._get_amodes()
    392 
    393         if svid in self._amodes.keys():
    394             return True
    395         return False
    396 
    397     def is_amode_entered(self, svid, opos):
    398         """Check if alternate mode is entered.
    399 
    400         @param svid: alternate mode SVID hexstring (i.e. 0xff01).
    401         @param opos: object position of config to act on.
    402 
    403         @returns True if entered False otherwise
    404         """
    405         if self._amodes is None:
    406             self._amodes = self._get_amodes()
    407 
    408         if not self.is_amode_supported(svid):
    409             return False
    410 
    411         if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
    412             return True
    413 
    414         return False
    415 
    416     def set_amode(self, svid, opos, enter, delay_secs=2):
    417         """Set alternate mode.
    418 
    419         @param svid: alternate mode SVID hexstring (i.e. 0xff01).
    420         @param opos: object position of config to act on.
    421         @param enter: Boolean of whether to enter mode.
    422 
    423         @raises error.TestError if ...
    424            mode not supported.
    425            opos is > number of configs.
    426 
    427         @returns True if successful False otherwise
    428         """
    429         if self._amodes is None:
    430             self._amodes = self._get_amodes()
    431 
    432         if svid not in self._amodes.keys():
    433             raise error.TestError("SVID %s not supported", svid)
    434 
    435         if opos > len(self._amodes[svid]['configs']):
    436             raise error.TestError("opos > available configs")
    437 
    438         cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
    439                                          1 if enter else 0)
    440         self.ec_command(cmd, ignore_status=True)
    441         self._invalidate_port_data()
    442 
    443         # allow some time for mode entry/exit
    444         time.sleep(delay_secs)
    445         return self.is_amode_entered(svid, opos) == enter
    446 
    447     def get_flash_info(self):
    448         mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
    449         mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
    450         flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
    451                                     'dev_minor', 'rw_hash', 'image_status'])
    452 
    453         cmd = 'infopddev %d' % self.index
    454 
    455         tries = 3
    456         while (tries):
    457             res = self.ec_command(cmd, ignore_status=True)
    458             if not 'has no discovered device' in res:
    459                 break
    460 
    461             tries -= 1
    462             time.sleep(1)
    463 
    464         for ln in res.split('\n'):
    465             mat1 = re.match(mat1_re, ln)
    466             if mat1:
    467                 flash_dict['ptype'] = int(mat1.group(1))
    468                 flash_dict['vid'] = mat1.group(2)
    469                 flash_dict['pid'] = mat1.group(3)
    470                 continue
    471 
    472             mat2 = re.match(mat2_re, ln)
    473             if mat2:
    474                 flash_dict['dev_major'] = int(mat2.group(1))
    475                 flash_dict['dev_minor'] = int(mat2.group(2))
    476                 flash_dict['rw_hash'] = mat2.group(3)
    477                 flash_dict['image_status'] = mat2.group(4)
    478                 break
    479 
    480         return flash_dict
    481 
    482 
    483 class EC_USBPD(EC_Common):
    484     """Class for CrOS embedded controller for USB-PD.
    485 
    486     Public attributes:
    487         ports: list EC_USBPD_Port instances
    488 
    489     Public Methods:
    490         get_num_ports: get number of USB-PD ports device has.
    491 
    492     Private attributes:
    493         _num_ports: integer number of USB-PD ports device has.
    494     """
    495     def __init__(self, num_ports=None):
    496         """Constructor.
    497 
    498         @param num_ports: total number of USB-PD ports on device.  This is an
    499           override.  If left 'None' will try to determine.
    500         """
    501         self._num_ports = num_ports
    502         self.ports = list()
    503 
    504         # TODO(crosbug.com/p/38133) target= only works for samus
    505         super(EC_USBPD, self).__init__(target='cros_pd')
    506 
    507         if (self.get_num_ports() == 0):
    508             raise error.TestNAError("Device has no USB-PD ports")
    509 
    510         for i in xrange(self._num_ports):
    511             self.ports.append(EC_USBPD_Port(i))
    512 
    513     def get_num_ports(self):
    514         """Determine the number of ports for device.
    515 
    516         Uses ectool's usbpdpower command which in turn makes host command call
    517         to EC_CMD_USB_PD_PORTS to determine the number of ports.
    518 
    519         TODO(tbroch) May want to consider adding separate ectool command to
    520         surface the number of ports directly instead of via usbpdpower
    521 
    522         @returns number of ports.
    523         """
    524         if (self._num_ports is not None):
    525             return self._num_ports
    526 
    527         self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
    528         return self._num_ports
    529