1 # Copyright (c) 2015 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import logging 6 import subprocess 7 import tempfile 8 9 from autotest_lib.client.bin import utils 10 from autotest_lib.client.common_lib import error 11 from autotest_lib.client.common_lib.cros.network import iw_runner 12 from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes 13 from autotest_lib.server import test 14 from autotest_lib.server.cros.network import hostap_config 15 from autotest_lib.server.cros.network import wifi_test_context_manager 16 17 18 class network_WiFi_RegDomain(test.test): 19 """Verifies that a DUT connects, or fails to connect, on particular 20 channels, in particular regions, per expectations.""" 21 version = 1 22 23 24 MISSING_SSID = "MissingSsid" 25 # TODO(quiche): Shrink or remove the repeat count, once we've 26 # figured out why tcpdump sometimes misses data. crbug.com/477536 27 PASSIVE_SCAN_REPEAT_COUNT = 30 28 REBOOT_TIMEOUT_SECS = 60 29 # TODO(quiche): Migrate to the shiny new pyshark code from rpius. 30 TSHARK_COMMAND = 'tshark' 31 TSHARK_DISABLE_NAME_RESOLUTION = '-n' 32 TSHARK_READ_FILE = '-r' 33 TSHARK_SRC_FILTER = 'wlan.sa == %s' 34 VPD_CACHE_FILE = \ 35 '/mnt/stateful_partition/unencrypted/cache/vpd/full-v2.txt' 36 VPD_CLEAN_COMMAND ='dump_vpd_log --clean' 37 38 39 @staticmethod 40 def assert_equal(description, actual, expected): 41 """Verifies that |actual| equals |expected|. 42 43 @param description A string describing the data being checked. 44 @param actual The actual value encountered by the test. 45 @param expected The value we expected to encounter. 46 @raise error.TestFail If actual != expected. 47 48 """ 49 if actual != expected: 50 raise error.TestFail( 51 'Expected %s |%s|, but got |%s|.' % 52 (description, expected, actual)) 53 54 55 @staticmethod 56 def phy_list_to_channel_expectations(phy_list): 57 """Maps phy information to expected scanning/connection behavior. 58 59 Converts phy information from iw_runner.IwRunner.list_phys() 60 into a map from channel numbers to expected connection 61 behavior. This mapping is useful for comparison with the 62 expectations programmed into the control file. 63 64 @param phy_list The return value of iw_runner.IwRunner.list_phys() 65 @return A dict from channel numbers to expected behaviors. 66 67 """ 68 channel_to_expectation = {} 69 for phy in phy_list: 70 for band in phy.bands: 71 for frequency, flags in band.frequency_flags.iteritems(): 72 channel = ( 73 hostap_config.HostapConfig.get_channel_for_frequency( 74 frequency)) 75 # While we don't expect a channel to have both 76 # CHAN_FLAG_DISABLED, and (CHAN_FLAG_PASSIVE_SCAN 77 # or CHAN_FLAG_NO_IR), we still test the most 78 # restrictive flag first. 79 if iw_runner.CHAN_FLAG_DISABLED in flags: 80 channel_to_expectation[channel] = 'no-connect' 81 elif (iw_runner.CHAN_FLAG_PASSIVE_SCAN in flags or 82 iw_runner.CHAN_FLAG_NO_IR in flags): 83 channel_to_expectation[channel] = 'passive-scan' 84 else: 85 channel_to_expectation[channel] = 'connect' 86 return channel_to_expectation 87 88 89 @staticmethod 90 def test_connect(wifi_context, frequency, expect_connect, hide_ssid): 91 """Verifies that a DUT does/does not connect on a particular frequency. 92 93 @param wifi_context: A WiFiTestContextManager. 94 @param frequency: int frequency to test. 95 @param expect_connect: bool whether or not connection should succeed. 96 @param hide_ssid: bool whether or not the AP should hide its SSID. 97 @raise error.TestFail if behavior does not match expectation. 98 99 """ 100 try: 101 router_ssid = None 102 if hide_ssid: 103 pcap_name = '%d_connect_hidden.pcap' % frequency 104 test_description = 'hidden' 105 else: 106 pcap_name = '%d_connect_visible.pcap' % frequency 107 test_description = 'visible' 108 wifi_context.router.start_capture(frequency, filename=pcap_name) 109 wifi_context.router.hostap_configure( 110 hostap_config.HostapConfig( 111 frequency=frequency, 112 hide_ssid=hide_ssid, 113 mode=hostap_config.HostapConfig.MODE_11N_MIXED)) 114 router_ssid = wifi_context.router.get_ssid() 115 client_conf = xmlrpc_datatypes.AssociationParameters( 116 ssid=router_ssid, 117 is_hidden=hide_ssid, 118 expect_failure=not expect_connect 119 ) 120 wifi_context.assert_connect_wifi(client_conf, test_description) 121 finally: 122 if router_ssid: 123 wifi_context.client.shill.delete_entries_for_ssid(router_ssid) 124 wifi_context.router.stop_capture() 125 126 127 @classmethod 128 def count_mismatched_phy_configs(cls, dut_host, expected_channel_configs): 129 """Verifies that phys on the DUT place the expected restrictions on 130 channels. 131 132 Compares the restrictions reported by the running system to 133 the restrictions in |expected_channel_configs|. Returns a 134 count of the number of mismatches. 135 136 Note that this method deliberately ignores channels that are 137 reported by the running system, but not mentioned in 138 |expected_channel_configs|. This allows us to program the 139 control file with "spot checks", rather than an exhaustive 140 list of channels. 141 142 @param dut_host The host object for the DUT. 143 @param expected_channel_configs A channel_infos list. 144 @return int count of mismatches 145 146 """ 147 actual_channel_expectations = cls.phy_list_to_channel_expectations( 148 iw_runner.IwRunner(dut_host).list_phys()) 149 mismatches = 0 150 for expected_config in expected_channel_configs: 151 channel = expected_config['chnum'] 152 expected = expected_config['expect'] 153 actual = actual_channel_expectations[channel] 154 if actual != expected: 155 logging.error( 156 'Expected phy config for channel %d of |%s|, but got |%s|.', 157 channel, expected, actual) 158 mismatches += 1 159 return mismatches 160 161 162 @classmethod 163 def assert_scanning_is_passive(cls, client, router, scan_freq): 164 """Initiates single-channel scans, and verifies no probes are sent. 165 166 @param client The WiFiClient object for the DUT. 167 @param router The LinuxCrosRouter object for the router. 168 @param scan_freq The frequency (in MHz) on which to scan. 169 """ 170 try: 171 client.claim_wifi_if() # Stop shill/supplicant scans. 172 router.start_capture( 173 scan_freq, filename='%d_scan.pcap' % scan_freq) 174 for i in range(0, cls.PASSIVE_SCAN_REPEAT_COUNT): 175 # We pass in an SSID here, to check that even hidden 176 # SSIDs do not cause probe requests to be sent. 177 client.scan( 178 [scan_freq], [cls.MISSING_SSID], require_match=False) 179 pcap_path = router.stop_capture()[0].local_pcap_path 180 dut_frames = subprocess.check_output( 181 [cls.TSHARK_COMMAND, 182 cls.TSHARK_DISABLE_NAME_RESOLUTION, 183 cls.TSHARK_READ_FILE, pcap_path, 184 cls.TSHARK_SRC_FILTER % client.wifi_mac]) 185 if len(dut_frames): 186 raise error.TestFail('Saw unexpected frames from DUT.') 187 finally: 188 client.release_wifi_if() 189 router.stop_capture() 190 191 192 @classmethod 193 def assert_scanning_fails(cls, client, scan_freq): 194 """Initiates a single-channel scan, and verifies that it fails. 195 196 @param client The WiFiClient object for the DUT. 197 @param scan_freq The frequency (in MHz) on which to scan. 198 """ 199 client.claim_wifi_if() # Stop shill/supplicant scans. 200 try: 201 # We use IwRunner directly here, because WiFiClient.scan() 202 # wants a scan to succeed, while we want the scan to fail. 203 if iw_runner.IwRunner(client.host).timed_scan( 204 client.wifi_if, [scan_freq], [cls.MISSING_SSID]): 205 # We should have got None, to represent failure. 206 raise error.TestFail( 207 'Scan succeeded (and was expected to fail).') 208 finally: 209 client.release_wifi_if() 210 211 212 def fake_up_region(self, region): 213 """Modifies VPD cache to force a particular region, and reboots system 214 into to faked state. 215 216 @param region: The region we want to force the host into. 217 218 """ 219 self.host.run(self.VPD_CLEAN_COMMAND) 220 temp_vpd = tempfile.NamedTemporaryFile() 221 temp_vpd.write('"region"="%s"' % region) 222 temp_vpd.flush() 223 self.host.send_file(temp_vpd.name, self.VPD_CACHE_FILE) 224 self.host.reboot(timeout=self.REBOOT_TIMEOUT_SECS, wait=True) 225 226 227 def warmup(self, host, raw_cmdline_args, additional_params): 228 """Stashes away parameters for use by run_once(). 229 230 @param host Host object representing the client DUT. 231 @param raw_cmdline_args Raw input from autotest. 232 @param additional_params One item from CONFIGS in control file. 233 234 """ 235 self.host = host 236 self.cmdline_args = utils.args_to_dict(raw_cmdline_args) 237 self.channel_infos = additional_params['channel_infos'] 238 self.expected_country_code = additional_params['country_code'] 239 self.region_name = additional_params['region_name'] 240 241 242 def test_channel(self, wifi_context, channel_config): 243 """Verifies that a DUT's behavior on a channel is per expectations. 244 245 - Verifies that scanning behavior is per expectations. 246 - Verifies that connect behavior is per expectations. 247 - Verifies that connect behavior is the same for hidden networks, 248 as it is for visible networks. 249 250 @param wifi_context: A WiFiTestContextManager. 251 @param channel_config: A dict with 'chnum' and 'expect' keys. 252 253 """ 254 router_freq = hostap_config.HostapConfig.get_frequency_for_channel( 255 channel_config['chnum']) 256 257 # Test scanning behavior, as appropriate. To ensure that, 258 # e.g., AP beacons don't affect the DUT's behavior, this is 259 # done with no AP running. 260 if channel_config['expect'] == 'passive-scan': 261 self.assert_scanning_is_passive( 262 wifi_context.client, wifi_context.router, router_freq) 263 elif channel_config['expect'] == 'no-connect': 264 self.assert_scanning_fails(wifi_context.client, router_freq) 265 266 for hide_ssid in (False, True): # Simple case first. 267 self.test_connect( 268 wifi_context, 269 router_freq, 270 expect_connect=channel_config['expect'] in ( 271 'connect', 'passive-scan'), 272 hide_ssid=hide_ssid) 273 274 275 def run_once(self): 276 """Configures a DUT to behave as if it was manufactured for a 277 particular region. Then verifies that the DUT connects, or 278 fails to connect, per expectations. 279 280 """ 281 num_failures = 0 282 try: 283 self.fake_up_region(self.region_name) 284 self.assert_equal( 285 'country code', 286 iw_runner.IwRunner(self.host).get_regulatory_domain(), 287 self.expected_country_code) 288 num_mismatches = self.count_mismatched_phy_configs( 289 self.host, self.channel_infos) 290 if num_mismatches: 291 raise error.TestFail( 292 '%d phy configs were not as expected (see below)' % 293 num_mismatches) 294 wifi_context = wifi_test_context_manager.WiFiTestContextManager( 295 self.__class__.__name__, 296 self.host, 297 self.cmdline_args, 298 self.debugdir) 299 with wifi_context: 300 wifi_context.router.reboot(timeout=self.REBOOT_TIMEOUT_SECS) 301 for channel_config in self.channel_infos: 302 try: 303 self.test_channel(wifi_context, channel_config) 304 except error.TestFail as e: 305 # Log the error, but keep going. This way, we 306 # get a full report of channels where behavior 307 # differs from expectations. 308 logging.error('Verification failed for |%s|: %s', 309 self.region_name, channel_config) 310 logging.error(e) 311 num_failures += 1 312 finally: 313 if num_failures: 314 raise error.TestFail( 315 'Verification failed for %d channel configs (see below)' % 316 num_failures) 317 self.host.run(self.VPD_CLEAN_COMMAND) 318 self.host.reboot(timeout=self.REBOOT_TIMEOUT_SECS, wait=True) 319