# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import re
import logging
import time

from autotest_lib.client.common_lib import error


class PDConsoleUtils(object):
    """Provides a set of methods common to USB PD FAFT tests

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The servo object used for UART operations is
    passed in and stored when this object is created.

    """

    # PD state strings reported by the 'pd <port> state' console command
    SRC_CONNECT = 'SRC_READY'
    SNK_CONNECT = 'SNK_READY'
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'
    PD_MAX_PORTS = 2
    # Seconds to wait for a PD connection to be (re)established
    CONNECT_TIME = 4

    # dualrole input/output values
    # Seconds to wait between setting dualrole and reading it back
    DUALROLE_QUERY_DELAY = 0.25
    # dual_index maps the caller-facing option name to an index that is
    # shared by dualrole_cmd (console argument) and dualrole_resp
    # (string reported back by the console).
    dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
    dualrole_cmd = ['on', 'off', 'sink', 'source']
    dualrole_resp = ['on', 'off', 'force sink', 'force source']

    # Dictionary for 'pd 0/1 state' parsing
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'State:\s+([\w]+_[\w]+)',
        'flags': r'Flags:\s+([\w]+)',
        'polarity': r'(CC\d)'
    }

    # Dictionary for PD control message types
    # The mask selects the message type bits of the received header word
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13
    }

    # Dictionary for PD firmware state flags
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12
    }

    def __init__(self, console):
        """Console can be either usbpd, ec, or plankton_ec UART

        This object will then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART

        @param console: UART console object used for all commands
        """
        # save console for UART access functions
        self.console = console

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp):
        """Send command to PD console, wait for response

        @param cmd: pd command string
        @param regexp: regular expression for desired output

        @returns: whatever the console's send_command_get_output returns;
                  callers in this class index it as result[0][group]
        """
        return self.console.send_command_get_output(cmd, regexp)

    def send_pd_command_get_reply_msg(self, cmd):
        """Send PD protocol msg, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd protocol message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting, including when the command
        or the parsing of its reply raises.

        @param cmd: pd command to issue to the UART console

        @returns: PD control header message
        """
        # Enable PD console debug mode to show control messages
        self.enable_pd_console_debug()
        try:
            m = self.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)'])
            # Header is printed in hex; keep only the message type bits
            ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK
        finally:
            # Never leave the console in debug mode, even on failure
            self.disable_pd_console_debug()
        return ctrl_msg

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console
        @returns: True if 'pd' is found, False if not
        """
        help_match = self.console.send_command_get_output(
                'help', [r'(pd)\s+([\w]+)'])
        return help_match[0][1] == 'pd'

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        The 'pd 0/1 state' command produces 5 fields. The full response
        line is captured and then parsed to extract each field to fill
        the dict containing port, polarity, role, pd_state, and flags.

        @param port: Type C PD port 0 or 1

        @returns: A dict with the 5 fields listed above
        @raises error.TestFail: if any of the fields cannot be found
        """
        pd_cmd = 'pd %s state' % str(port)
        # Two FW versions for this command, get full line.
        m = self.send_pd_command_get_output(pd_cmd,
                                            ['(Port.*) - (Role:.*)\n'])
        # Element 0 of the match is the complete matched line
        response = m[0][0]

        # Extract desired values from result string
        state_result = {}
        for key, regexp in self.PD_STATE_DICT.items():
            value = re.search(regexp, response)
            if not value:
                raise error.TestFail('pd 0/1 state: %r value not found' % (key))
            state_result[key] = value.group(1)

        return state_result

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        return self.execute_pd_state_cmd(port)['pd_state']

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1
        @returns: port field reported by the pd state command
        """
        return self.execute_pd_state_cmd(port)['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1
        @returns: role field reported by the pd state command
        """
        return self.execute_pd_state_cmd(port)['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1
        @returns: flags field reported by the pd state command (string)
        """
        return self.execute_pd_state_cmd(port)['flags']

    def get_pd_dualrole(self):
        """Get the current PD dualrole setting

        @returns: current PD dualrole setting
        """
        cmd = 'pd dualrole'
        dual_list = self.send_pd_command_get_output(
                cmd, [r'dual-role toggling:\s+([\w ]+)'])
        return dual_list[0][1]

    def set_pd_dualrole(self, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param value: One of the 4 options listed

        @raises KeyError: if value is not one of the 4 options
        @raises error.TestFail: if the setting read back does not match
        """
        # Get index shared by the command and response string tables
        dual_index = self.dual_index[value]
        expected = self.dualrole_resp[dual_index]
        # Create console command
        cmd = 'pd dualrole ' + self.dualrole_cmd[dual_index]
        self.console.send_command(cmd)
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole()
        # If it doesn't match, then raise error
        if dual != expected:
            raise error.TestFail("dualrole error: " + expected + " != " + dual)

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd <port> state' command on each port in turn and see
        if it's in either expected state of a connection. Record the
        port number that has an active connection.

        @returns: dict with keys 'port' and 'connect', plus 'role' (set to
                  the PD state string) only when a connection was found.
                  If no port is connected, 'port' is the last port checked.
        """
        status = {'connect': False, 'port': 0}
        for port in range(self.PD_MAX_PORTS):
            status['port'] = port
            state = self.get_pd_state(port)
            if state in (self.SRC_CONNECT, self.SNK_CONNECT):
                status['connect'] = True
                status['role'] = state
                # Stop at the first connected port
                break

        return status

    def swap_power_role(self, port):
        """Attempt a power role swap

        This method attempts to execute a power role swap. A check
        is made to ensure that dualrole mode is enabled and that
        a PD contract is currently established. If both checks pass,
        then the power role swap command is issued. After a delay,
        if a PD contract is established and the current state does
        not equal the starting state, then it was successful.

        @param port: pd port number

        @returns: True if power swap is successful, False otherwise.
        """
        # Preconditions: dualrole enabled and a PD contract in place
        if not self.is_pd_dual_role_enabled():
            logging.info('Dualrole Mode not enabled!')
            return False
        if not self.is_pd_connected(port):
            logging.info('PD contract not established!')
            return False
        # Get starting state
        current_pr = self.get_pd_state(port)
        swap_cmd = 'pd %d swap power' % port
        self.send_pd_command(swap_cmd)
        # Give the ports time to renegotiate before checking the result
        time.sleep(self.CONNECT_TIME)
        new_pr = self.get_pd_state(port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_pd_connected(port):
            return False
        return current_pr != new_pr

    def disable_pd_console_debug(self):
        """Turn off PD console debug

        """
        cmd = 'pd dump 0'
        self.send_pd_command(cmd)

    def enable_pd_console_debug(self):
        """Enable PD console debug level 1

        """
        cmd = 'pd dump 1'
        self.send_pd_command(cmd)

    def is_pd_flag_set(self, port, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param port: Port which has the active PD connection
        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set
        """
        pd_flags = self.get_pd_flags(port)
        # The flags field is parsed as a hex string
        return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))

    def is_pd_connected(self, port):
        """Check if a PD connection is active

        @param port: port to be used for pd console commands

        @returns True if port is in connected state
        """
        state = self.get_pd_state(port)
        return state in (self.SRC_CONNECT, self.SNK_CONNECT)

    def is_pd_dual_role_enabled(self):
        """Check if a PD device is in dualrole mode

        @returns True if dualrole mode is active, False otherwise
        """
        drp = self.get_pd_dualrole()
        return drp == self.dualrole_resp[self.dual_index['on']]


class PDConnectionUtils(PDConsoleUtils):
    """Provides a set of methods common to USB PD FAFT tests

    This Class is used for PD utility methods that require access
    to both Plankton and DUT PD consoles.

    """

    def __init__(self, dut_console, plankton_console):
        """
        @param dut_console: PD console object for DUT
        @param plankton_console: PD console object for Plankton
        """
        # save console for DUT PD UART access functions
        self.dut_console = dut_console
        # save console for Plankton UART access functions
        self.plankton_console = plankton_console
        super(PDConnectionUtils, self).__init__(dut_console)

    def _verify_plankton_connection(self, port):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to Plankton pd connection is verified
        """
        DISCONNECT_CHECK_TIME = 0.5
        DISCONNECT_TIME_SEC = 2
        # plankton console command to force PD disconnect
        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        # Disconnect timer plus time for the port to reconnect
        recovery_time = self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC

        # Only check for Plankton if DUT has active PD connection
        if not self.dut_console.is_pd_connected(port):
            return False

        # Attempt to force PD disconnection
        self.plankton_console.send_pd_command(disc_cmd)
        time.sleep(DISCONNECT_CHECK_TIME)

        # Verify that DUT PD port is no longer connected
        if self.dut_console.is_pd_connected(port):
            # Could have disconnected other port, allow it to reconnect
            # before exiting.
            time.sleep(recovery_time)
            return False

        # Wait for disconnect timer and give time to reconnect
        time.sleep(recovery_time)
        if self.dut_console.is_pd_connected(port):
            logging.info('Plankton connection verified on port %d', port)
            return True
        return False

    def find_dut_to_plankton_connection(self):
        """Find the PD port which is connected to Plankton

        @returns DUT pd port number if found, None otherwise
        """
        for port in range(self.dut_console.PD_MAX_PORTS):
            # Check for DUT to Plankton connection on port
            if self._verify_plankton_connection(port):
                # Plankton PD connection found so exit
                return port
        return None