Home | History | Annotate | Download | only in firmware_PDDataSwap
      1 # Copyright 2016 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import re
      7 import time
      8 
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
     11 from autotest_lib.server.cros.servo import pd_console
     12 
     13 
     14 class firmware_PDDataSwap(FirmwareTest):
     15     """
     16     Servo based USB PD data role swap test
     17 
     18     Pass critera is all data role swaps complete, or
     19     a reject control message is received from the DUT in the
     20     cases where the swap does not complete.
     21 
     22     """
     23     version = 1
     24 
     25     PD_ROLE_DELAY = 0.5
     26     PD_CONNECT_DELAY = 4
     27     PLANKTON_PORT = 0
     28     DATA_SWAP_ITERATIONS = 10
     29     # Upward facing port data role
     30     UFP = 'UFP'
     31     # Downward facing port data role
     32     DFP = 'DFP'
     33     # Plankton initiated data swap request
     34     PLANKTON_SWAP_REQ = 'pd %d swap data' % PLANKTON_PORT
     35     # Swap Result Tables
     36     swap_attempt = {
     37         ('rx', DFP): 0,
     38         ('rx', UFP): 0,
     39         ('tx', DFP): 0,
     40         ('tx', UFP): 0
     41     }
     42     swap_failure = {
     43         ('rx', DFP): 0,
     44         ('rx', UFP): 0,
     45         ('tx', DFP): 0,
     46         ('tx', UFP): 0
     47     }
     48 
     49     def _verify_plankton_connection(self, port):
     50         """Verify if DUT to Plankton PD connection
     51 
     52         This method checks for a Plankton PD connection for the
     53         given port by first verifying if a PD connection is present.
     54         If found, then it uses a Plankton feature to force a PD disconnect.
     55         If the port is no longer in the connected state, and following
     56         a delay, is found to be back in the connected state, then
     57         a DUT pd to Plankton connection is verified.
     58 
     59         @param port: DUT pd port to test
     60 
     61         @returns True if DUT to Plankton pd connection is verified
     62         """
     63         DISCONNECT_TIME_SEC = 2
     64         # plankton console command to force PD disconnect
     65         disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC*1000)
     66         # Only check for Plankton if DUT has active PD connection
     67         if self.dut_pd_utils.is_pd_connected(port):
     68             # Attempt to force PD disconnection
     69             self.plankton_pd_utils.send_pd_command(disc_cmd)
     70             time.sleep(self.PD_ROLE_DELAY)
     71             # Verify that DUT PD port is no longer connected
     72             if self.dut_pd_utils.is_pd_connected(port) == False:
     73                 # Wait for disconnect timer and give time to reconnect
     74                 time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC)
     75                 if self.dut_pd_utils.is_pd_connected(port):
     76                     logging.info('Plankton connection verfied on port %d', port)
     77                     return True
     78             else:
     79                 # Could have disconnected other port, allow it to reconnect
     80                 # before exiting.
     81                 time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC)
     82         return False
     83 
     84     def _find_dut_to_plankton_connection(self):
     85         """Find the PD port which is connected to Plankton
     86 
     87         @returns DUT pd port number if found, None otherwise
     88         """
     89         for port in xrange(self.dut_pd_utils.PD_MAX_PORTS):
     90             # Check for DUT to Plankton connection on port
     91             if self._verify_plankton_connection(port):
     92                 # Plankton PD connection found so exit
     93                 return port
     94         return None
     95 
     96     def _get_data_role(self, console, port):
     97         """Get data role of PD connection
     98 
     99         @param console: pd console object for uart access
    100         @param port: 0/1 pd port of current connection
    101 
    102         @returns: 'DFP' or 'UFP'
    103         """
    104         role = console.get_pd_role(port)
    105         m = re.search('[\w]+-([\w]+)', role)
    106         return m.group(1)
    107 
    108     def _get_remote_role(self, local_role):
    109         """Invert data role
    110 
    111         @param local_role: data role to be flipped
    112 
    113         @returns: flipped data role value
    114         """
    115         if local_role == self.DFP:
    116             return self.UFP
    117         else:
    118             return self.DFP
    119 
    120     def _change_dut_power_role(self, port):
    121         """Force power role change via Plankton
    122 
    123         @param port: port of DUT PD connection
    124 
    125         @returns True is power role change is successful
    126         """
    127         PLANKTON_SRC_VOLTAGE = 5
    128         PLANKTON_SNK_VOLTAGE = 0
    129         pd_state = self.dut_pd_utils.get_pd_state(port)
    130         if pd_state == self.dut_pd_utils.SRC_CONNECT:
    131             # DUT is currently a SRC, so change to SNK
    132             # Use Plankton method to ensure power role change
    133             self.plankton.charge(PLANKTON_SRC_VOLTAGE)
    134         else:
    135             # DUT is currently a SNK, so change it to a SRC.
    136             self.plankton.charge(PLANKTON_SNK_VOLTAGE)
    137         # Wait for change to take place
    138         time.sleep(self.PD_CONNECT_DELAY)
    139         plankton_state = self.plankton_pd_utils.get_pd_state(0)
    140         # Current Plankton state should equal DUT state when called
    141         return bool(pd_state == plankton_state)
    142 
    143     def _send_data_swap_get_reply(self, console, port):
    144         """Send data swap request, get PD control msg reply
    145 
    146         The PD console debug mode is enabled prior to sending
    147         a pd data role swap request message. This allows the
    148         control message reply to be extracted. The debug mode
    149         is disabled prior to exiting.
    150 
    151         @param console: pd console object for uart access
    152 
    153         @ returns: PD control header message
    154         """
    155         # Enable PD console debug mode to show control messages
    156         console.enable_pd_console_debug()
    157         cmd = 'pd %d swap data' % port
    158         m = console.send_pd_command_get_output(cmd, ['RECV\s([\w]+)'])
    159         ctrl_msg = int(m[0][1], 16) & console.PD_CONTROL_MSG_MASK
    160         console.disable_pd_console_debug()
    161         return ctrl_msg
    162 
    163     def _attempt_data_swap(self, pd_port, direction):
    164         """Perform a data role swap request
    165 
    166         Data swap requests can be either initiated by the DUT or received
    167         by the DUT. This direction determines which PD console is used
    168         to initiate the swap command. The data role before and after
    169         the swap command are compared to determine if it took place.
    170 
    171         Even if data swap capability is advertised, a PD device is allowed
    172         to reject the request. Therefore, not swapping isn't itself a
    173         failure. When Plankton is used to initate the request, the debug
    174         mode is enabled which allows the control message from the DUT to
    175         be analyzed. If the swap does not occur, but the request is rejected
    176         by the DUT then that is not counted as a failure.
    177 
    178         @param pd_port: DUT pd port value 0/1
    179         @param direction: rx or tx from the DUT perspective
    180 
    181         @returns PD control reply message for tx swaps, 0 otherwise
    182         """
    183         # Get starting DUT data role
    184         dut_dr = self._get_data_role(self.dut_pd_utils, pd_port)
    185         self.swap_attempt[(direction, dut_dr)] += 1
    186         if direction == 'tx':
    187             # Initiate swap request from the DUT
    188             console = self.dut_pd_utils
    189             cmd = 'pd %d swap data' % pd_port
    190             # Send the 'swap data' command
    191             self.dut_pd_utils.send_pd_command(cmd)
    192             # Not using debug mode, so there is no reply message
    193             ctrl = 0
    194         else:
    195             # Initiate swap request from Plankton
    196             console = self.plankton_pd_utils
    197             ctrl  = self._send_data_swap_get_reply(console, self.PLANKTON_PORT)
    198 
    199         time.sleep(self.PD_ROLE_DELAY)
    200         # Get DUT current data role
    201         swap_dr = self._get_data_role(self.dut_pd_utils, pd_port)
    202         logging.info('%s swap attempt: prev = %s, new = %s, msg = %s',
    203                       direction, dut_dr, swap_dr, ctrl)
    204         if (dut_dr == swap_dr and
    205                 ctrl != self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']):
    206             self.swap_failure[(direction, dut_dr)] += 1
    207         return ctrl
    208 
    209     def _execute_data_role_swap_test(self, pd_port):
    210         """Execute a series of data role swaps
    211 
    212         Attempt both rx and tx data swaps, from perspective of DUT.
    213         Even if the DUT advertises support, it can
    214         reject swap requests when already in the desired data role. For
    215         example many devices will not swap if already in DFP mode.
    216         However, Plankton should always accept a request. Therefore,
    217         when a swap failed on a rx swap, then that is followed by
    218         a tx swap attempt.
    219 
    220         @param pd_port: port number of DUT PD connection
    221         """
    222         for attempt in xrange(self.DATA_SWAP_ITERATIONS):
    223             # Use the same direction for every 2 loop iterations
    224             if attempt & 2:
    225                 direction = 'tx'
    226             else:
    227                 direction = 'rx'
    228             ctrl_msg = self._attempt_data_swap(pd_port, direction)
    229             if (direction == 'rx' and
    230                     ctrl_msg ==
    231                     self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']):
    232                 # Use plankton initated swap to change roles
    233                 self._attempt_data_swap(pd_port, 'tx')
    234 
    235     def _test_data_swap_reject(self, pd_port):
    236         """Verify that data swap request is rejected
    237 
    238         This tests the case where the DUT doesn't advertise support
    239         for data swaps. A data request is sent by Plankton, and then
    240         the control message checked to ensure the request was rejected.
    241         In addition, the data role and connection state are verified
    242         to remain unchanged.
    243 
    244         @param pd_port: port for DUT pd connection
    245         """
    246         # Get current DUT data role
    247         dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
    248         dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
    249         # Send swap command from Plankton and get reply
    250         ctrl_msg = self._send_data_swap_get_reply(self.plankton_pd_utils,
    251                                                   self.PLANKTON_PORT)
    252         if ctrl_msg != self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']:
    253             raise error.TestFail('Data Swap Req not rejected, returned %r' %
    254                                  ctrl_msg)
    255         # Get DUT current state
    256         pd_state = self.dut_pd_utils.get_pd_state(pd_port)
    257         if pd_state != dut_connect_state:
    258             raise error.TestFail('PD not connected! pd_state = %r' %
    259                                  pd_state)
    260         # Since reject message was received, verify data role didn't change
    261         curr_dr = self._get_data_role(self.dut_pd_utils, pd_port)
    262         if curr_dr != dut_data_role:
    263             raise error.TestFail('Unexpected PD data role change')
    264 
    265     def initialize(self, host, cmdline_args):
    266         super(firmware_PDDataSwap, self).initialize(host, cmdline_args)
    267         # Only run in normal mode
    268         self.switcher.setup_mode('normal')
    269         self.usbpd.send_command('chan 0')
    270 
    271     def cleanup(self):
    272         self.usbpd.send_command('chan 0xffffffff')
    273         super(firmware_PDDataSwap, self).cleanup()
    274 
    275     def run_once(self):
    276         """Exectue Data Role swap test.
    277 
    278         1. Verify that pd console is accessible
    279         2. Verify that DUT has a valid PD contract
    280         3. Determine if DUT advertises support for data swaps
    281         4. Test DUT initiated and received data swaps
    282         5. Swap power roles if supported
    283         6. Repeat DUT received data swap requests
    284 
    285         """
    286         # create objects for pd utilities
    287         self.dut_pd_utils = pd_console.PDConsoleUtils(self.usbpd)
    288         self.plankton_pd_utils = pd_console.PDConsoleUtils(self.plankton)
    289 
    290         # Make sure PD support exists in the UART console
    291         if self.dut_pd_utils.verify_pd_console() == False:
    292             raise error.TestFail("pd command not present on console!")
    293 
    294         # Type C connection (PD contract) should exist at this point
    295         # For this test, the DUT must be connected to a Plankton.
    296         pd_port = self._find_dut_to_plankton_connection()
    297         if pd_port == None:
    298             raise error.TestFail("DUT to Plankton PD connection not found")
    299         dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
    300         logging.info('Initial DUT connect state = %s', dut_connect_state)
    301 
    302         # Determine if DUT supports data role swaps
    303         dr_swap_allowed = self.plankton_pd_utils.is_pd_flag_set(
    304                 self.PLANKTON_PORT, 'data_swap')
    305         # Get current DUT data role
    306         dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
    307         logging.info('Starting DUT Data Role = %r', dut_data_role)
    308 
    309         # If data swaps are not allowed on the DUT, then still
    310         # attempt a data swap and verify that the request is
    311         # rejected by the DUT and that it remains connected and
    312         # in the same role.
    313         if dr_swap_allowed == False:
    314             logging.info('Data Swap support not advertised by DUT')
    315             self._test_data_swap_reject(pd_port)
    316             logging.info('Data Swap request rejected by DUT as expected')
    317         else:
    318             # Data role swap support advertised, test this feature.
    319             self._execute_data_role_swap_test(pd_port)
    320 
    321             # If DUT supports Power Role swap then attempt to change roles.
    322             # This way, data role swaps will be tested in both configurations.
    323             if self.plankton_pd_utils.is_pd_flag_set(
    324                      self.PLANKTON_PORT, 'power_swap'):
    325                 logging.info('\nDUT advertises Power Swap Support')
    326                 # Attempt to swap power roles
    327                 power_swap = self._change_dut_power_role(pd_port)
    328                 if power_swap:
    329                     self._execute_data_role_swap_test(pd_port)
    330                 else:
    331                     logging.warn('Power swap not successful!')
    332                     logging.warn('Only tested with DUT in %s state',
    333                                  dut_connect_state)
    334             else:
    335                 logging.info('DUT does not advertise power swap support')
    336 
    337             logging.info('***************** Swap Results ********************')
    338             total_attempts = 0
    339             total_failures = 0
    340             for direction, role in self.swap_attempt.iterkeys():
    341                 logging.info('%s %s swap attempts = %d, failures = %d',
    342                              direction, role,
    343                              self.swap_attempt[(direction, role)],
    344                              self.swap_failure[(direction, role)])
    345                 total_attempts += self.swap_attempt[(direction, role)]
    346                 total_failures += self.swap_failure[(direction, role)]
    347 
    348             # If any swap attempts were not successful, flag test as failure
    349             if total_failures:
    350                 raise error.TestFail('Data Swap Fail: Attempt = %d, Failure = %d' %
    351                                  (total_attempts, total_failures))
    352