1 # Copyright (c) 2013 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import collections 6 import copy 7 import logging 8 import re 9 import time 10 11 from autotest_lib.client.common_lib import error 12 from autotest_lib.client.common_lib import utils 13 from autotest_lib.client.common_lib.cros.network import iw_event_logger 14 15 # These must mirror the values in 'iw list' output. 16 CHAN_FLAG_DISABLED = 'disabled' 17 CHAN_FLAG_NO_IR = 'no IR' 18 CHAN_FLAG_PASSIVE_SCAN = 'passive scan' 19 CHAN_FLAG_RADAR_DETECT = 'radar detection' 20 DEV_MODE_AP = 'AP' 21 DEV_MODE_IBSS = 'IBSS' 22 DEV_MODE_MONITOR = 'monitor' 23 24 HT20 = 'HT20' 25 HT40_ABOVE = 'HT40+' 26 HT40_BELOW = 'HT40-' 27 28 SECURITY_OPEN = 'open' 29 SECURITY_WEP = 'wep' 30 SECURITY_WPA = 'wpa' 31 SECURITY_WPA2 = 'wpa2' 32 # Mixed mode security is WPA2/WPA 33 SECURITY_MIXED = 'mixed' 34 35 # Table of lookups between the output of item 'secondary channel offset:' from 36 # iw <device> scan to constants. 37 38 HT_TABLE = {'no secondary': HT20, 39 'above': HT40_ABOVE, 40 'below': HT40_BELOW} 41 42 IwBand = collections.namedtuple( 43 'Band', ['num', 'frequencies', 'frequency_flags', 'mcs_indices']) 44 IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security', 45 'ht', 'signal']) 46 IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type']) 47 IwTimedScan = collections.namedtuple('IwTimedScan', ['time', 'bss_list']) 48 49 # The fields for IwPhy are as follows: 50 # name: string name of the phy, such as "phy0" 51 # bands: list of IwBand objects. 52 # modes: List of strings containing interface modes supported, such as "AP". 53 # commands: List of strings containing nl80211 commands supported, such as 54 # "authenticate". 55 # features: List of strings containing nl80211 features supported, such as 56 # "T-DLS". 57 # max_scan_ssids: Maximum number of SSIDs which can be scanned at once. 58 IwPhy = collections.namedtuple( 59 'Phy', ['name', 'bands', 'modes', 'commands', 'features', 60 'max_scan_ssids', 'avail_tx_antennas', 'avail_rx_antennas', 61 'supports_setting_antenna_mask', 'support_vht']) 62 63 DEFAULT_COMMAND_IW = 'iw' 64 65 # Redirect stderr to stdout on Cros since adb commands cannot distinguish them 66 # on Brillo. 67 IW_TIME_COMMAND_FORMAT = '(time -p %s) 2>&1' 68 IW_TIME_COMMAND_OUTPUT_START = 'real' 69 70 IW_LINK_KEY_BEACON_INTERVAL = 'beacon int' 71 IW_LINK_KEY_DTIM_PERIOD = 'dtim period' 72 IW_LINK_KEY_FREQUENCY = 'freq' 73 IW_LOCAL_EVENT_LOG_FILE = './debug/iw_event_%d.log' 74 75 76 class IwRunner(object): 77 """Defines an interface to the 'iw' command.""" 78 79 80 def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW): 81 self._run = utils.run 82 self._host = remote_host 83 if remote_host: 84 self._run = remote_host.run 85 self._command_iw = command_iw 86 self._log_id = 0 87 88 89 def _parse_scan_results(self, output): 90 """Parse the output of the 'scan' and 'scan dump' commands. 91 92 Here is an example of what a single network would look like for 93 the input parameter. Some fields have been removed in this example: 94 BSS 00:11:22:33:44:55(on wlan0) 95 freq: 2447 96 beacon interval: 100 TUs 97 signal: -46.00 dBm 98 Information elements from Probe Response frame: 99 SSID: my_open_network 100 Extended supported rates: 24.0 36.0 48.0 54.0 101 HT capabilities: 102 Capabilities: 0x0c 103 HT20 104 HT operation: 105 * primary channel: 8 106 * secondary channel offset: no secondary 107 * STA channel width: 20 MHz 108 RSN: * Version: 1 109 * Group cipher: CCMP 110 * Pairwise ciphers: CCMP 111 * Authentication suites: PSK 112 * Capabilities: 1-PTKSA-RC 1-GTKSA-RC (0x0000) 113 114 @param output: string command output. 115 116 @returns a list of IwBss namedtuples; None if the scan fails 117 118 """ 119 bss = None 120 frequency = None 121 ssid = None 122 ht = None 123 signal = None 124 security = None 125 supported_securities = [] 126 bss_list = [] 127 for line in output.splitlines(): 128 line = line.strip() 129 bss_match = re.match('BSS ([0-9a-f:]+)', line) 130 if bss_match: 131 if bss != None: 132 security = self.determine_security(supported_securities) 133 iwbss = IwBss(bss, frequency, ssid, security, ht, signal) 134 bss_list.append(iwbss) 135 bss = frequency = ssid = security = ht = None 136 supported_securities = [] 137 bss = bss_match.group(1) 138 if line.startswith('freq:'): 139 frequency = int(line.split()[1]) 140 if line.startswith('signal:'): 141 signal = float(line.split()[1]) 142 if line.startswith('SSID: '): 143 _, ssid = line.split(': ', 1) 144 if line.startswith('* secondary channel offset'): 145 ht = HT_TABLE[line.split(':')[1].strip()] 146 if line.startswith('WPA'): 147 supported_securities.append(SECURITY_WPA) 148 if line.startswith('RSN'): 149 supported_securities.append(SECURITY_WPA2) 150 security = self.determine_security(supported_securities) 151 bss_list.append(IwBss(bss, frequency, ssid, security, ht, signal)) 152 return bss_list 153 154 155 def _parse_scan_time(self, output): 156 """ 157 Parse the scan time in seconds from the output of the 'time -p "scan"' 158 command. 159 160 'time -p' Command output format is below: 161 real 0.01 162 user 0.01 163 sys 0.00 164 165 @param output: string command output. 166 167 @returns float time in seconds. 168 169 """ 170 output_lines = output.splitlines() 171 for line_num, line in enumerate(output_lines): 172 line = line.strip() 173 if (line.startswith(IW_TIME_COMMAND_OUTPUT_START) and 174 output_lines[line_num + 1].startswith('user') and 175 output_lines[line_num + 2].startswith('sys')): 176 return float(line.split()[1]) 177 raise error.TestFail('Could not parse scan time.') 178 179 180 def add_interface(self, phy, interface, interface_type): 181 """ 182 Add an interface to a WiFi PHY. 183 184 @param phy: string name of PHY to add an interface to. 185 @param interface: string name of interface to add. 186 @param interface_type: string type of interface to add (e.g. 'monitor'). 187 188 """ 189 self._run('%s phy %s interface add %s type %s' % 190 (self._command_iw, phy, interface, interface_type)) 191 192 193 def disconnect_station(self, interface): 194 """ 195 Disconnect a STA from a network. 196 197 @param interface: string name of interface to disconnect. 198 199 """ 200 self._run('%s dev %s disconnect' % (self._command_iw, interface)) 201 202 203 def get_current_bssid(self, interface_name): 204 """Get the BSSID that |interface_name| is associated with. 205 206 @param interface_name: string name of interface (e.g. 'wlan0'). 207 @return string bssid of our current association, or None. 208 209 """ 210 result = self._run('%s dev %s link' % 211 (self._command_iw, interface_name), 212 ignore_status=True) 213 if result.exit_status: 214 # See comment in get_link_value. 215 return None 216 217 # We're looking for a line like: 218 # Connected to 04:f0:21:03:7d:bb (on wlan0) 219 match = re.search( 220 'Connected to ([0-9a-fA-F:]{17}) \\(on %s\\)' % interface_name, 221 result.stdout) 222 if match is None: 223 return None 224 return match.group(1) 225 226 227 def get_interface(self, interface_name): 228 """Get full information about an interface given an interface name. 229 230 @param interface_name: string name of interface (e.g. 'wlan0'). 231 @return IwNetDev tuple. 232 233 """ 234 matching_interfaces = [iw_if for iw_if in self.list_interfaces() 235 if iw_if.if_name == interface_name] 236 if len(matching_interfaces) != 1: 237 raise error.TestFail('Could not find interface named %s' % 238 interface_name) 239 240 return matching_interfaces[0] 241 242 243 def get_link_value(self, interface, iw_link_key): 244 """Get the value of a link property for |interface|. 245 246 This command parses fields of iw link: 247 248 #> iw dev wlan0 link 249 Connected to 74:e5:43:10:4f:c0 (on wlan0) 250 SSID: PMKSACaching_4m9p5_ch1 251 freq: 5220 252 RX: 5370 bytes (37 packets) 253 TX: 3604 bytes (15 packets) 254 signal: -59 dBm 255 tx bitrate: 13.0 MBit/s MCS 1 256 257 bss flags: short-slot-time 258 dtim period: 5 259 beacon int: 100 260 261 @param iw_link_key: string one of IW_LINK_KEY_* defined above. 262 @param interface: string desired value of iw link property. 263 264 """ 265 result = self._run('%s dev %s link' % (self._command_iw, interface), 266 ignore_status=True) 267 if result.exit_status: 268 # When roaming, there is a period of time for mac80211 based drivers 269 # when the driver is 'associated' with an SSID but not a particular 270 # BSS. This causes iw to return an error code (-2) when attempting 271 # to retrieve information specific to the BSS. This does not happen 272 # in mwifiex drivers. 273 return None 274 275 find_re = re.compile('\s*%s:\s*(.*\S)\s*$' % iw_link_key) 276 find_results = filter(bool, 277 map(find_re.match, result.stdout.splitlines())) 278 if not find_results: 279 return None 280 281 actual_value = find_results[0].group(1) 282 logging.info('Found iw link key %s with value %s.', 283 iw_link_key, actual_value) 284 return actual_value 285 286 287 def ibss_join(self, interface, ssid, frequency): 288 """ 289 Join a WiFi interface to an IBSS. 290 291 @param interface: string name of interface to join to the IBSS. 292 @param ssid: string SSID of IBSS to join. 293 @param frequency: int frequency of IBSS in Mhz. 294 295 """ 296 self._run('%s dev %s ibss join %s %d' % 297 (self._command_iw, interface, ssid, frequency)) 298 299 300 def ibss_leave(self, interface): 301 """ 302 Leave an IBSS. 303 304 @param interface: string name of interface to remove from the IBSS. 305 306 """ 307 self._run('%s dev %s ibss leave' % (self._command_iw, interface)) 308 309 310 def list_interfaces(self, desired_if_type=None): 311 """List WiFi related interfaces on this system. 312 313 @param desired_if_type: string type of interface to filter 314 our returned list of interfaces for (e.g. 'managed'). 315 316 @return list of IwNetDev tuples. 317 318 """ 319 320 # Parse output in the following format: 321 # 322 # $ adb shell iw dev 323 # phy#0 324 # Unnamed/non-netdev interface 325 # wdev 0x2 326 # addr aa:bb:cc:dd:ee:ff 327 # type P2P-device 328 # Interface wlan0 329 # ifindex 4 330 # wdev 0x1 331 # addr aa:bb:cc:dd:ee:ff 332 # ssid Whatever 333 # type managed 334 335 output = self._run('%s dev' % self._command_iw).stdout 336 interfaces = [] 337 phy = None 338 if_name = None 339 if_type = None 340 for line in output.splitlines(): 341 m = re.match('phy#([0-9]+)', line) 342 if m: 343 phy = 'phy%d' % int(m.group(1)) 344 if_name = None 345 if_type = None 346 continue 347 if not phy: 348 continue 349 m = re.match('[\s]*Interface (.*)', line) 350 if m: 351 if_name = m.group(1) 352 continue 353 if not if_name: 354 continue 355 # Common values for type are 'managed', 'monitor', and 'IBSS'. 356 m = re.match('[\s]*type ([a-zA-Z]+)', line) 357 if m: 358 if_type = m.group(1) 359 interfaces.append(IwNetDev(phy=phy, if_name=if_name, 360 if_type=if_type)) 361 # One phy may have many interfaces, so don't reset it. 362 if_name = None 363 364 if desired_if_type: 365 interfaces = [interface for interface in interfaces 366 if interface.if_type == desired_if_type] 367 return interfaces 368 369 370 def list_phys(self): 371 """ 372 List WiFi PHYs on the given host. 373 374 @return list of IwPhy tuples. 375 376 """ 377 output = self._run('%s list' % self._command_iw).stdout 378 379 pending_phy_name = None 380 current_band = None 381 current_section = None 382 all_phys = [] 383 384 def add_pending_phy(): 385 """Add the pending phy into |all_phys|.""" 386 bands = tuple(IwBand(band.num, 387 tuple(band.frequencies), 388 dict(band.frequency_flags), 389 tuple(band.mcs_indices)) 390 for band in pending_phy_bands) 391 new_phy = IwPhy(pending_phy_name, 392 bands, 393 tuple(pending_phy_modes), 394 tuple(pending_phy_commands), 395 tuple(pending_phy_features), 396 pending_phy_max_scan_ssids, 397 pending_phy_tx_antennas, 398 pending_phy_rx_antennas, 399 pending_phy_tx_antennas and pending_phy_rx_antennas, 400 pending_phy_support_vht) 401 all_phys.append(new_phy) 402 403 for line in output.splitlines(): 404 match_phy = re.search('Wiphy (.*)', line) 405 if match_phy: 406 if pending_phy_name: 407 add_pending_phy() 408 pending_phy_name = match_phy.group(1) 409 pending_phy_bands = [] 410 pending_phy_modes = [] 411 pending_phy_commands = [] 412 pending_phy_features = [] 413 pending_phy_max_scan_ssids = None 414 pending_phy_tx_antennas = 0 415 pending_phy_rx_antennas = 0 416 pending_phy_support_vht = False 417 continue 418 419 match_section = re.match('\s*(\w.*):\s*$', line) 420 if match_section: 421 current_section = match_section.group(1) 422 match_band = re.match('Band (\d+)', current_section) 423 if match_band: 424 current_band = IwBand(num=int(match_band.group(1)), 425 frequencies=[], 426 frequency_flags={}, 427 mcs_indices=[]) 428 pending_phy_bands.append(current_band) 429 continue 430 431 # Check for max_scan_ssids. This isn't a section, but it 432 # also isn't within a section. 433 match_max_scan_ssids = re.match('\s*max # scan SSIDs: (\d+)', 434 line) 435 if match_max_scan_ssids and pending_phy_name: 436 pending_phy_max_scan_ssids = int( 437 match_max_scan_ssids.group(1)) 438 continue 439 440 if (current_section == 'Supported interface modes' and 441 pending_phy_name): 442 mode_match = re.search('\* (\w+)', line) 443 if mode_match: 444 pending_phy_modes.append(mode_match.group(1)) 445 continue 446 447 if current_section == 'Supported commands' and pending_phy_name: 448 command_match = re.search('\* (\w+)', line) 449 if command_match: 450 pending_phy_commands.append(command_match.group(1)) 451 continue 452 453 if (current_section is not None and 454 current_section.startswith('VHT Capabilities') and 455 pending_phy_name): 456 pending_phy_support_vht = True 457 continue 458 459 match_avail_antennas = re.match('\s*Available Antennas: TX (\S+)' 460 ' RX (\S+)', line) 461 if match_avail_antennas and pending_phy_name: 462 pending_phy_tx_antennas = int( 463 match_avail_antennas.group(1), 16) 464 pending_phy_rx_antennas = int( 465 match_avail_antennas.group(2), 16) 466 continue 467 468 match_device_support = re.match('\s*Device supports (.*)\.', line) 469 if match_device_support and pending_phy_name: 470 pending_phy_features.append(match_device_support.group(1)) 471 continue 472 473 if not all([current_band, pending_phy_name, 474 line.startswith('\t')]): 475 continue 476 477 # E.g. 478 # * 2412 MHz [1] (20.0 dBm) 479 # * 2467 MHz [12] (20.0 dBm) (passive scan) 480 # * 2472 MHz [13] (disabled) 481 # * 5260 MHz [52] (19.0 dBm) (no IR, radar detection) 482 match_chan_info = re.search( 483 r'(?P<frequency>\d+) MHz' 484 r' (?P<chan_num>\[\d+\])' 485 r'(?: \((?P<tx_power_limit>[0-9.]+ dBm)\))?' 486 r'(?: \((?P<flags>[a-zA-Z, ]+)\))?', line) 487 if match_chan_info: 488 frequency = int(match_chan_info.group('frequency')) 489 current_band.frequencies.append(frequency) 490 flags_string = match_chan_info.group('flags') 491 if flags_string: 492 current_band.frequency_flags[frequency] = frozenset( 493 flags_string.split(',')) 494 else: 495 # Populate the dict with an empty set, to make 496 # things uniform for client code. 497 current_band.frequency_flags[frequency] = frozenset() 498 continue 499 500 # re_mcs needs to match something like: 501 # HT TX/RX MCS rate indexes supported: 0-15, 32 502 if re.search('HT TX/RX MCS rate indexes supported: ', line): 503 rate_string = line.split(':')[1].strip() 504 for piece in rate_string.split(','): 505 if piece.find('-') > 0: 506 # Must be a range like ' 0-15' 507 begin, end = piece.split('-') 508 for index in range(int(begin), int(end) + 1): 509 current_band.mcs_indices.append(index) 510 else: 511 # Must be a single rate like '32 ' 512 current_band.mcs_indices.append(int(piece)) 513 if pending_phy_name: 514 add_pending_phy() 515 return all_phys 516 517 518 def remove_interface(self, interface, ignore_status=False): 519 """ 520 Remove a WiFi interface from a PHY. 521 522 @param interface: string name of interface (e.g. mon0) 523 @param ignore_status: boolean True iff we should ignore failures 524 to remove the interface. 525 526 """ 527 self._run('%s dev %s del' % (self._command_iw, interface), 528 ignore_status=ignore_status) 529 530 531 def determine_security(self, supported_securities): 532 """Determines security from the given list of supported securities. 533 534 @param supported_securities: list of supported securities from scan 535 536 """ 537 if not supported_securities: 538 security = SECURITY_OPEN 539 elif len(supported_securities) == 1: 540 security = supported_securities[0] 541 else: 542 security = SECURITY_MIXED 543 return security 544 545 546 def scan(self, interface, frequencies=(), ssids=()): 547 """Performs a scan. 548 549 @param interface: the interface to run the iw command against 550 @param frequencies: list of int frequencies in Mhz to scan. 551 @param ssids: list of string SSIDs to send probe requests for. 552 553 @returns a list of IwBss namedtuples; None if the scan fails 554 555 """ 556 scan_result = self.timed_scan(interface, frequencies, ssids) 557 if scan_result is None: 558 return None 559 return scan_result.bss_list 560 561 562 def timed_scan(self, interface, frequencies=(), ssids=()): 563 """Performs a timed scan. 564 565 @param interface: the interface to run the iw command against 566 @param frequencies: list of int frequencies in Mhz to scan. 567 @param ssids: list of string SSIDs to send probe requests for. 568 569 @returns a IwTimedScan namedtuple; None if the scan fails 570 571 """ 572 freq_param = '' 573 if frequencies: 574 freq_param = ' freq %s' % ' '.join(map(str, frequencies)) 575 ssid_param = '' 576 if ssids: 577 ssid_param = ' ssid "%s"' % '" "'.join(ssids) 578 579 iw_command = '%s dev %s scan%s%s' % (self._command_iw, 580 interface, freq_param, ssid_param) 581 command = IW_TIME_COMMAND_FORMAT % iw_command 582 scan = self._run(command, ignore_status=True) 583 if scan.exit_status != 0: 584 # The device was busy 585 logging.debug('scan exit_status: %d', scan.exit_status) 586 return None 587 if not scan.stdout: 588 raise error.TestFail('Missing scan parse time') 589 590 if scan.stdout.startswith(IW_TIME_COMMAND_OUTPUT_START): 591 logging.debug('Empty scan result') 592 bss_list = [] 593 else: 594 bss_list = self._parse_scan_results(scan.stdout) 595 scan_time = self._parse_scan_time(scan.stdout) 596 return IwTimedScan(scan_time, bss_list) 597 598 599 def scan_dump(self, interface): 600 """Dump the contents of the scan cache. 601 602 Note that this does not trigger a scan. Instead, it returns 603 the kernel's idea of what BSS's are currently visible. 604 605 @param interface: the interface to run the iw command against 606 607 @returns a list of IwBss namedtuples; None if the scan fails 608 609 """ 610 result = self._run('%s dev %s scan dump' % (self._command_iw, 611 interface)) 612 return self._parse_scan_results(result.stdout) 613 614 615 def set_tx_power(self, interface, power): 616 """ 617 Set the transmission power for an interface. 618 619 @param interface: string name of interface to set Tx power on. 620 @param power: string power parameter. (e.g. 'auto'). 621 622 """ 623 self._run('%s dev %s set txpower %s' % 624 (self._command_iw, interface, power)) 625 626 627 def set_freq(self, interface, freq): 628 """ 629 Set the frequency for an interface. 630 631 @param interface: string name of interface to set frequency on. 632 @param freq: int frequency 633 634 """ 635 self._run('%s dev %s set freq %d' % 636 (self._command_iw, interface, freq)) 637 638 639 def set_regulatory_domain(self, domain_string): 640 """ 641 Set the regulatory domain of the current machine. Note that 642 the regulatory change happens asynchronously to the exit of 643 this function. 644 645 @param domain_string: string regulatory domain name (e.g. 'US'). 646 647 """ 648 self._run('%s reg set %s' % (self._command_iw, domain_string)) 649 650 651 def get_regulatory_domain(self): 652 """ 653 Get the regulatory domain of the current machine. 654 655 @returns a string containing the 2-letter regulatory domain name 656 (e.g. 'US'). 657 658 """ 659 output = self._run('%s reg get' % self._command_iw).stdout 660 m = re.match('^country (..):', output) 661 if not m: 662 return None 663 return m.group(1) 664 665 666 def wait_for_scan_result(self, interface, bsses=(), ssids=(), 667 timeout_seconds=30, wait_for_all=False): 668 """Returns a list of IWBSS objects for given list of bsses or ssids. 669 670 This method will scan for a given timeout and return all of the networks 671 that have a matching ssid or bss. If wait_for_all is true and all 672 networks are not found within the given timeout an empty list will 673 be returned. 674 675 @param interface: which interface to run iw against 676 @param bsses: a list of BSS strings 677 @param ssids: a list of ssid strings 678 @param timeout_seconds: the amount of time to wait in seconds 679 @param wait_for_all: True to wait for all listed bsses or ssids; False 680 to return if any of the networks were found 681 682 @returns a list of IwBss collections that contain the given bss or ssid; 683 if the scan is empty or returns an error code None is returned. 684 685 """ 686 start_time = time.time() 687 scan_failure_attempts = 0 688 logging.info('Performing a scan with a max timeout of %d seconds.', 689 timeout_seconds) 690 remaining_bsses = copy.copy(bsses) 691 remaining_ssids = copy.copy(ssids) 692 while time.time() - start_time < timeout_seconds: 693 scan_results = self.scan(interface) 694 if scan_results is None or len(scan_results) == 0: 695 scan_failure_attempts += 1 696 # Allow in-progress scan to complete 697 time.sleep(5) 698 # If the in-progress scan takes more than 30 seconds to 699 # complete it will most likely never complete; abort. 700 # See crbug.com/309148. 701 if scan_failure_attempts > 5: 702 logging.error('Scan failed to run, see debug log for ' 703 'error code.') 704 return None 705 continue 706 scan_failure_attempts = 0 707 matching_iwbsses = set() 708 for iwbss in scan_results: 709 if iwbss.bss in bsses and len(remaining_bsses) > 0: 710 remaining_bsses.remove(iwbss.bss) 711 matching_iwbsses.add(iwbss) 712 if iwbss.ssid in ssids and len(remaining_ssids) > 0: 713 remaining_ssids.remove(iwbss.ssid) 714 matching_iwbsses.add(iwbss) 715 if wait_for_all: 716 if len(remaining_bsses) == 0 and len(remaining_ssids) == 0: 717 return list(matching_iwbsses) 718 else: 719 if len(matching_iwbsses) > 0: 720 return list(matching_iwbsses) 721 722 723 if scan_failure_attempts > 0: 724 return None 725 # The SSID wasn't found, but the device is fine. 726 return list() 727 728 729 def wait_for_link(self, interface, timeout_seconds=10): 730 """Waits until a link completes on |interface|. 731 732 @param interface: which interface to run iw against. 733 @param timeout_seconds: the amount of time to wait in seconds. 734 735 @returns True if link was established before the timeout. 736 737 """ 738 start_time = time.time() 739 while time.time() - start_time < timeout_seconds: 740 link_results = self._run('%s dev %s link' % 741 (self._command_iw, interface)) 742 if 'Not connected' not in link_results.stdout: 743 return True 744 time.sleep(1) 745 return False 746 747 748 def set_antenna_bitmap(self, phy, tx_bitmap, rx_bitmap): 749 """Set antenna chain mask on given phy (radio). 750 751 This function will set the antennas allowed to use for TX and 752 RX on the |phy| based on the |tx_bitmap| and |rx_bitmap|. 753 This command is only allowed when the interfaces on the phy are down. 754 755 @param phy: phy name 756 @param tx_bitmap: bitmap of allowed antennas to use for TX 757 @param rx_bitmap: bitmap of allowed antennas to use for RX 758 759 """ 760 command = '%s phy %s set antenna %d %d' % (self._command_iw, phy, 761 tx_bitmap, rx_bitmap) 762 self._run(command) 763 764 765 def get_event_logger(self): 766 """Create and return a IwEventLogger object. 767 768 @returns a IwEventLogger object. 769 770 """ 771 local_file = IW_LOCAL_EVENT_LOG_FILE % (self._log_id) 772 self._log_id += 1 773 return iw_event_logger.IwEventLogger(self._host, self._command_iw, 774 local_file) 775 776 777 def vht_supported(self): 778 """Returns True if VHT is supported; False otherwise.""" 779 result = self._run('%s list' % self._command_iw).stdout 780 if 'VHT Capabilities' in result: 781 return True 782 return False 783 784 785 def frequency_supported(self, frequency): 786 """Returns True if the given frequency is supported; False otherwise. 787 788 @param frequency: int Wifi frequency to check if it is supported by 789 DUT. 790 """ 791 phys = self.list_phys() 792 for phy in phys: 793 for band in phy.bands: 794 if frequency in band.frequencies: 795 return True 796 return False 797