Home | History | Annotate | Download | only in servo
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import re
      6 import logging
      7 import time
      8 
      9 from autotest_lib.client.common_lib import error
     10 
     11 class PDConsoleUtils(object):
     12     """ Provides a set of methods common to USB PD FAFT tests
     13 
     14     Each instance of this class is associated with a particular
     15     servo UART console. USB PD tests will typically use the console
     16     command 'pd' and its subcommands to control/monitor Type C PD
     17     connections. The servo object used for UART operations is
     18     passed in and stored when this object is created.
     19 
     20     """
     21 
     22     SRC_CONNECT = 'SRC_READY'
     23     SNK_CONNECT = 'SNK_READY'
     24     SRC_DISC = 'SRC_DISCONNECTED'
     25     SNK_DISC = 'SNK_DISCONNECTED'
     26     PD_MAX_PORTS = 2
     27     CONNECT_TIME = 4
     28 
     29     # dualrole input/ouput values
     30     DUALROLE_QUERY_DELAY = 0.25
     31     dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
     32     dualrole_cmd = ['on', 'off', 'sink', 'source']
     33     dualrole_resp = ['on', 'off', 'force sink', 'force source']
     34 
     35     # Dictionary for 'pd 0/1 state' parsing
     36     PD_STATE_DICT = {
     37         'port': 'Port\s+([\w]+)',
     38         'role': 'Role:\s+([\w]+-[\w]+)',
     39         'pd_state': 'State:\s+([\w]+_[\w]+)',
     40         'flags': 'Flags:\s+([\w]+)',
     41         'polarity': '(CC\d)'
     42     }
     43 
     44     # Dictionary for PD control message types
     45     PD_CONTROL_MSG_MASK = 0x1f
     46     PD_CONTROL_MSG_DICT = {
     47         'GoodCRC': 1,
     48         'GotoMin': 2,
     49         'Accept': 3,
     50         'Reject': 4,
     51         'Ping': 5,
     52         'PS_RDY': 6,
     53         'Get_Source_Cap': 7,
     54         'Get_Sink_Cap': 8,
     55         'DR_Swap': 9,
     56         'PR_Swap': 10,
     57         'VCONN_Swap': 11,
     58         'Wait': 12,
     59         'Soft_Reset': 13
     60     }
     61 
     62     # Dictionary for PD firmware state flags
     63     PD_STATE_FLAGS_DICT = {
     64         'power_swap': 1 << 1,
     65         'data_swap': 1 << 2,
     66         'data_swap_active': 1 << 3,
     67         'vconn_on': 1 << 12
     68     }
     69 
     70     def __init__(self, console):
     71         """ Console can be either usbpd, ec, or plankton_ec UART
     72         This object with then be used by the class which creates
     73         the PDConsoleUtils class to send/receive commands to UART
     74         """
     75         # save console for UART access functions
     76         self.console = console
     77 
     78     def send_pd_command(self, cmd):
     79         """Send command to PD console UART
     80 
     81         @param cmd: pd command string
     82         """
     83         self.console.send_command(cmd)
     84 
     85     def send_pd_command_get_output(self, cmd, regexp):
     86         """Send command to PD console, wait for response
     87 
     88         @param cmd: pd command string
     89         @param regexp: regular expression for desired output
     90         """
     91         return self.console.send_command_get_output(cmd, regexp)
     92 
     93     def send_pd_command_get_reply_msg(self, cmd):
     94         """Send PD protocol msg, get PD control msg reply
     95 
     96         The PD console debug mode is enabled prior to sending
     97         a pd protocol message. This allows the
     98         control message reply to be extracted. The debug mode
     99         is disabled prior to exiting.
    100 
    101         @param cmd: pd command to issue to the UART console
    102 
    103         @returns: PD control header message
    104         """
    105         # Enable PD console debug mode to show control messages
    106         self.enable_pd_console_debug()
    107         m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)'])
    108         ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK
    109         self.disable_pd_console_debug()
    110         return ctrl_msg
    111 
    112     def verify_pd_console(self):
    113         """Verify that PD commands exist on UART console
    114 
    115         Send 'help' command to UART console
    116         @returns: True if 'pd' is found, False if not
    117         """
    118 
    119         l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)'])
    120         if l[0][1] == 'pd':
    121             return True
    122         else:
    123             return False
    124 
    125     def execute_pd_state_cmd(self, port):
    126         """Get PD state for specified channel
    127 
    128         pd 0/1 state command gives produces 5 fields. The full response
    129         line is captured and then parsed to extract each field to fill
    130         the dict containing port, polarity, role, pd_state, and flags.
    131 
    132         @param port: Type C PD port 0 or 1
    133 
    134         @returns: A dict with the 5 fields listed above
    135         """
    136         cmd = 'pd'
    137         subcmd = 'state'
    138         pd_cmd = cmd +" " + str(port) + " " + subcmd
    139         # Two FW versions for this command, get full line.
    140         m = self.send_pd_command_get_output(pd_cmd,
    141                                             ['(Port.*) - (Role:.*)\n'])
    142 
    143         # Extract desired values from result string
    144         state_result = {}
    145         for key, regexp in self.PD_STATE_DICT.iteritems():
    146             value = re.search(regexp, m[0][0])
    147             if value:
    148                 state_result[key] = value.group(1)
    149             else:
    150                 raise error.TestFail('pd 0/1 state: %r value not found' % (key))
    151 
    152         return state_result
    153 
    154     def get_pd_state(self, port):
    155         """Get the current PD state
    156 
    157         @param port: Type C PD port 0/1
    158         @returns: current pd state
    159         """
    160 
    161         pd_dict = self.execute_pd_state_cmd(port)
    162         return pd_dict['pd_state']
    163 
    164     def get_pd_port(self, port):
    165         """Get the current PD port
    166 
    167         @param port: Type C PD port 0/1
    168         @returns: current pd state
    169         """
    170         pd_dict = self.execute_pd_state_cmd(port)
    171         return pd_dict['port']
    172 
    173     def get_pd_role(self, port):
    174         """Get the current PD power role (source or sink)
    175 
    176         @param port: Type C PD port 0/1
    177         @returns: current pd state
    178         """
    179         pd_dict = self.execute_pd_state_cmd(port)
    180         return pd_dict['role']
    181 
    182     def get_pd_flags(self, port):
    183         """Get the current PD flags
    184 
    185         @param port: Type C PD port 0/1
    186         @returns: current pd state
    187         """
    188         pd_dict = self.execute_pd_state_cmd(port)
    189         return pd_dict['flags']
    190 
    191     def get_pd_dualrole(self):
    192         """Get the current PD dualrole setting
    193 
    194         @returns: current PD dualrole setting
    195         """
    196         cmd = 'pd dualrole'
    197         dual_list = self.send_pd_command_get_output(cmd,
    198                                 ['dual-role toggling:\s+([\w ]+)'])
    199         return dual_list[0][1]
    200 
    201     def set_pd_dualrole(self, value):
    202         """Set pd dualrole
    203 
    204         It can be set to either:
    205         1. on
    206         2. off
    207         3. snk (force sink mode)
    208         4. src (force source mode)
    209         After setting, the current value is read to confirm that it
    210         was set properly.
    211 
    212         @param value: One of the 4 options listed
    213         """
    214         # Get string required for console command
    215         dual_index = self.dual_index[value]
    216         # Create console command
    217         cmd = 'pd dualrole ' + self.dualrole_cmd[dual_index]
    218         self.console.send_command(cmd)
    219         time.sleep(self.DUALROLE_QUERY_DELAY)
    220         # Get current setting to verify that command was successful
    221         dual = self.get_pd_dualrole()
    222         # If it doesn't match, then raise error
    223         if dual != self.dualrole_resp[dual_index]:
    224             raise error.TestFail("dualrole error: " +
    225                                  self.dualrole_resp[dual_index] + " != "+dual)
    226 
    227     def query_pd_connection(self):
    228         """Determine if PD connection is present
    229 
    230         Try the 'pd 0/1 state' command and see if it's in either
    231         expected state of a conneciton. Record the port number
    232         that has an active connection
    233 
    234         @returns: dict with params port, connect, and state
    235         """
    236         status = {}
    237         port = 0;
    238         status['connect'] = False
    239         status['port'] = port
    240         state = self.get_pd_state(port)
    241         # Check port 0 first
    242         if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
    243             status['connect'] = True
    244             status['role'] = state
    245         else:
    246             port = 1
    247             status['port'] = port
    248             state = self.get_pd_state(port)
    249             # Check port 1
    250             if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
    251                 status['connect'] = True
    252                 status['role'] = state
    253 
    254         return status
    255 
    256     def swap_power_role(self, port):
    257         """Attempt a power role swap
    258 
    259         This method attempts to execute a power role swap. A check
    260         is made to ensure that dualrole mode is enabled and that
    261         a PD contract is currently established. If both checks pass,
    262         then the power role swap command is issued. After a delay,
    263         if a PD contract is established and the current state does
    264         not equal the starting state, then it was successful.
    265 
    266         @param port: pd port number
    267 
    268         @returns: True if power swap is successful, False otherwise.
    269         """
    270         # Get starting state
    271         if self.is_pd_dual_role_enabled() == False:
    272             logging.info('Dualrole Mode not enabled!')
    273             return False
    274         if self.is_pd_connected(port) == False:
    275             logging.info('PD contract not established!')
    276             return False
    277         current_pr = self.get_pd_state(port)
    278         swap_cmd = 'pd %d swap power' % port
    279         self.send_pd_command(swap_cmd)
    280         time.sleep(self.CONNECT_TIME)
    281         new_pr = self.get_pd_state(port)
    282         logging.info('Power swap: %s -> %s', current_pr, new_pr)
    283         if self.is_pd_connected(port) == False:
    284             return False
    285         return bool(current_pr != new_pr)
    286 
    287     def disable_pd_console_debug(self):
    288         """Turn off PD console debug
    289 
    290         """
    291         cmd = 'pd dump 0'
    292         self.send_pd_command(cmd)
    293 
    294     def enable_pd_console_debug(self):
    295         """Enable PD console debug level 1
    296 
    297         """
    298         cmd = 'pd dump 1'
    299         self.send_pd_command(cmd)
    300 
    301     def is_pd_flag_set(self, port, key):
    302         """Test a bit in PD protocol state flags
    303 
    304         The flag word contains various PD protocol state information.
    305         This method allows for a specific flag to be tested.
    306 
    307         @param port: Port which has the active PD connection
    308         @param key: dict key to retrieve the flag bit mapping
    309 
    310         @returns True if the bit to be tested is set
    311         """
    312         pd_flags = self.get_pd_flags(port)
    313         return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))
    314 
    315     def is_pd_connected(self, port):
    316         """Check if a PD connection is active
    317 
    318         @param port: port to be used for pd console commands
    319 
    320         @returns True if port is in connected state
    321         """
    322         state = self.get_pd_state(port)
    323         return bool(state == self.SRC_CONNECT or state == self.SNK_CONNECT)
    324 
    325     def is_pd_dual_role_enabled(self):
    326         """Check if a PD device is in dualrole mode
    327 
    328         @returns True is dualrole mode is active, false otherwise
    329         """
    330         drp = self.get_pd_dualrole()
    331         return bool(drp == self.dualrole_resp[self.dual_index['on']])
    332 
    333 
    334 class PDConnectionUtils(PDConsoleUtils):
    335     """Provides a set of methods common to USB PD FAFT tests
    336 
    337     This Class is used for PD utility methods that require access
    338     to both Plankton and DUT PD consoles.
    339 
    340     """
    341 
    342     def __init__(self, dut_console, plankton_console):
    343         """
    344         @param dut_console: PD console object for DUT
    345         @param plankton_console: PD console object for Plankton
    346         """
    347         # save console for DUT PD UART access functions
    348         self.dut_console = dut_console
    349         # save console for Plankton UART access functions
    350         self.plankton_console = plankton_console
    351         super(PDConnectionUtils, self).__init__(dut_console)
    352 
    353     def _verify_plankton_connection(self, port):
    354         """Verify DUT to Plankton PD connection
    355 
    356         This method checks for a Plankton PD connection for the
    357         given port by first verifying if a PD connection is present.
    358         If found, then it uses a Plankton feature to force a PD disconnect.
    359         If the port is no longer in the connected state, and following
    360         a delay, is found to be back in the connected state, then
    361         a DUT pd to Plankton connection is verified.
    362 
    363         @param port: DUT pd port to test
    364 
    365         @returns True if DUT to Plankton pd connection is verified
    366         """
    367         DISCONNECT_CHECK_TIME = 0.5
    368         DISCONNECT_TIME_SEC = 2
    369         # plankton console command to force PD disconnect
    370         disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
    371         # Only check for Plankton if DUT has active PD connection
    372         if self.dut_console.is_pd_connected(port):
    373             # Attempt to force PD disconnection
    374             self.plankton_console.send_pd_command(disc_cmd)
    375             time.sleep(DISCONNECT_CHECK_TIME)
    376             # Verify that DUT PD port is no longer connected
    377             if self.dut_console.is_pd_connected(port) == False:
    378                 # Wait for disconnect timer and give time to reconnect
    379                 time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
    380                 if self.dut_console.is_pd_connected(port):
    381                     logging.info('Plankton connection verified on port %d',
    382                                  port)
    383                     return True
    384             else:
    385                 # Could have disconnected other port, allow it to reconnect
    386                 # before exiting.
    387                 time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
    388         return False
    389 
    390     def find_dut_to_plankton_connection(self):
    391         """Find the PD port which is connected to Plankton
    392 
    393         @returns DUT pd port number if found, None otherwise
    394         """
    395         for port in xrange(self.dut_console.PD_MAX_PORTS):
    396             # Check for DUT to Plankton connection on port
    397             if self._verify_plankton_connection(port):
    398                 # Plankton PD connection found so exit
    399                 return port
    400         return None
    401