1 # Copyright 2016 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 """This class defines the CrosHost Label class.""" 6 7 import collections 8 import logging 9 import os 10 import re 11 12 import common 13 14 from autotest_lib.client.bin import utils 15 from autotest_lib.client.common_lib import global_config 16 from autotest_lib.client.cros.audio import cras_utils 17 from autotest_lib.client.cros.video import constants as video_test_constants 18 from autotest_lib.server.cros.dynamic_suite import constants as ds_constants 19 from autotest_lib.server.hosts import base_label 20 from autotest_lib.server.hosts import common_label 21 from autotest_lib.server.hosts import servo_host 22 from autotest_lib.site_utils import hwid_lib 23 24 # pylint: disable=missing-docstring 25 LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board']) 26 27 def _parse_lsb_output(host): 28 """Parses the LSB output and returns key data points for labeling. 29 30 @param host: Host that the command will be executed against 31 @returns: LsbOutput with the result of parsing the /etc/lsb-release output 32 """ 33 release_info = utils.parse_cmd_output('cat /etc/lsb-release', 34 run_method=host.run) 35 36 unibuild = release_info.get('CHROMEOS_RELEASE_UNIBUILD') == '1' 37 return LsbOutput(unibuild, release_info['CHROMEOS_RELEASE_BOARD']) 38 39 40 class BoardLabel(base_label.StringPrefixLabel): 41 """Determine the correct board label for the device.""" 42 43 _NAME = ds_constants.BOARD_PREFIX.rstrip(':') 44 45 def generate_labels(self, host): 46 # We only want to apply the board labels once, which is when they get 47 # added to the AFE. That way we don't have to worry about the board 48 # label switching on us if the wrong builds get put on the devices. 49 # crbug.com/624207 records one event of the board label switching 50 # unexpectedly on us. 51 for label in host._afe_host.labels: 52 if label.startswith(self._NAME + ':'): 53 return [label.split(':')[-1]] 54 55 return [_parse_lsb_output(host).board] 56 57 58 class ModelLabel(base_label.StringPrefixLabel): 59 """Determine the correct model label for the device.""" 60 61 _NAME = ds_constants.MODEL_LABEL 62 63 def generate_labels(self, host): 64 # Based on the issue explained in BoardLabel, return the existing 65 # label if it has already been set once. 66 for label in host._afe_host.labels: 67 if label.startswith(self._NAME + ':'): 68 return [label.split(':')[-1]] 69 70 lsb_output = _parse_lsb_output(host) 71 model = None 72 73 if lsb_output.unibuild: 74 test_label_cmd = 'cros_config / test-label' 75 result = host.run(command=test_label_cmd, ignore_status=True) 76 if result.exit_status == 0: 77 model = result.stdout.strip() 78 if not model: 79 mosys_cmd = 'mosys platform model' 80 result = host.run(command=mosys_cmd, ignore_status=True) 81 if result.exit_status == 0: 82 model = result.stdout.strip() 83 84 # We need some sort of backwards compatibility for boards that 85 # are not yet supported with mosys and unified builds. 86 # This is necessary so that we can begin changing cbuildbot to take 87 # advantage of the model/board label differentiations for 88 # scheduling, while still retaining backwards compatibility. 89 return [model or lsb_output.board] 90 91 92 class LightSensorLabel(base_label.BaseLabel): 93 """Label indicating if a light sensor is detected.""" 94 95 _NAME = 'lightsensor' 96 _LIGHTSENSOR_SEARCH_DIR = '/sys/bus/iio/devices' 97 _LIGHTSENSOR_FILES = [ 98 "in_illuminance0_input", 99 "in_illuminance_input", 100 "in_illuminance0_raw", 101 "in_illuminance_raw", 102 "illuminance0_input", 103 ] 104 105 def exists(self, host): 106 search_cmd = "find -L %s -maxdepth 4 | egrep '%s'" % ( 107 self._LIGHTSENSOR_SEARCH_DIR, '|'.join(self._LIGHTSENSOR_FILES)) 108 # Run the search cmd following the symlinks. Stderr_tee is set to 109 # None as there can be a symlink loop, but the command will still 110 # execute correctly with a few messages printed to stderr. 111 result = host.run(search_cmd, stdout_tee=None, stderr_tee=None, 112 ignore_status=True) 113 114 return result.exit_status == 0 115 116 117 class BluetoothLabel(base_label.BaseLabel): 118 """Label indicating if bluetooth is detected.""" 119 120 _NAME = 'bluetooth' 121 122 def exists(self, host): 123 result = host.run('test -d /sys/class/bluetooth/hci0', 124 ignore_status=True) 125 126 return result.exit_status == 0 127 128 129 class ECLabel(base_label.BaseLabel): 130 """Label to determine the type of EC on this host.""" 131 132 _NAME = 'ec:cros' 133 134 def exists(self, host): 135 cmd = 'mosys ec info' 136 # The output should look like these, so that the last field should 137 # match our EC version scheme: 138 # 139 # stm | stm32f100 | snow_v1.3.139-375eb9f 140 # ti | Unknown-10de | peppy_v1.5.114-5d52788 141 # 142 # Non-Chrome OS ECs will look like these: 143 # 144 # ENE | KB932 | 00BE107A00 145 # ite | it8518 | 3.08 146 # 147 # And some systems don't have ECs at all (Lumpy, for example). 148 regexp = r'^.*\|\s*(\S+_v\d+\.\d+\.\d+-[0-9a-f]+)\s*$' 149 150 ecinfo = host.run(command=cmd, ignore_status=True) 151 if ecinfo.exit_status == 0: 152 res = re.search(regexp, ecinfo.stdout) 153 if res: 154 logging.info("EC version is %s", res.groups()[0]) 155 return True 156 logging.info("%s got: %s", cmd, ecinfo.stdout) 157 # Has an EC, but it's not a Chrome OS EC 158 logging.info("%s exited with status %d", cmd, ecinfo.exit_status) 159 return False 160 161 162 class AccelsLabel(base_label.BaseLabel): 163 """Determine the type of accelerometers on this host.""" 164 165 _NAME = 'accel:cros-ec' 166 167 def exists(self, host): 168 # Check to make sure we have ectool 169 rv = host.run('which ectool', ignore_status=True) 170 if rv.exit_status: 171 logging.info("No ectool cmd found; assuming no EC accelerometers") 172 return False 173 174 # Check that the EC supports the motionsense command 175 rv = host.run('ectool motionsense', ignore_status=True) 176 if rv.exit_status: 177 logging.info("EC does not support motionsense command; " 178 "assuming no EC accelerometers") 179 return False 180 181 # Check that EC motion sensors are active 182 active = host.run('ectool motionsense active').stdout.split('\n') 183 if active[0] == "0": 184 logging.info("Motion sense inactive; assuming no EC accelerometers") 185 return False 186 187 logging.info("EC accelerometers found") 188 return True 189 190 191 class ChameleonLabel(base_label.BaseLabel): 192 """Determine if a Chameleon is connected to this host.""" 193 194 _NAME = 'chameleon' 195 196 def exists(self, host): 197 return host._chameleon_host is not None 198 199 200 class ChameleonConnectionLabel(base_label.StringPrefixLabel): 201 """Return the Chameleon connection label.""" 202 203 _NAME = 'chameleon' 204 205 def exists(self, host): 206 return host._chameleon_host is not None 207 208 209 def generate_labels(self, host): 210 return [host.chameleon.get_label()] 211 212 213 class ChameleonPeripheralsLabel(base_label.StringPrefixLabel): 214 """Return the Chameleon peripherals labels. 215 216 The 'chameleon:bt_hid' label is applied if the bluetooth 217 classic hid device, i.e, RN-42 emulation kit, is detected. 218 219 Any peripherals plugged into the chameleon board would be 220 detected and applied proper labels in this class. 221 """ 222 223 _NAME = 'chameleon' 224 225 def exists(self, host): 226 return host._chameleon_host is not None 227 228 229 def generate_labels(self, host): 230 bt_hid_device = host.chameleon.get_bluetooh_hid_mouse() 231 return ['bt_hid'] if bt_hid_device.CheckSerialConnection() else [] 232 233 234 class AudioLoopbackDongleLabel(base_label.BaseLabel): 235 """Return the label if an audio loopback dongle is plugged in.""" 236 237 _NAME = 'audio_loopback_dongle' 238 239 def exists(self, host): 240 nodes_info = host.run(command=cras_utils.get_cras_nodes_cmd(), 241 ignore_status=True).stdout 242 if (cras_utils.node_type_is_plugged('HEADPHONE', nodes_info) and 243 cras_utils.node_type_is_plugged('MIC', nodes_info)): 244 return True 245 return False 246 247 248 class PowerSupplyLabel(base_label.StringPrefixLabel): 249 """ 250 Return the label describing the power supply type. 251 252 Labels representing this host's power supply. 253 * `power:battery` when the device has a battery intended for 254 extended use 255 * `power:AC_primary` when the device has a battery not intended 256 for extended use (for moving the machine, etc) 257 * `power:AC_only` when the device has no battery at all. 258 """ 259 260 _NAME = 'power' 261 262 def __init__(self): 263 self.psu_cmd_result = None 264 265 266 def exists(self, host): 267 self.psu_cmd_result = host.run(command='mosys psu type', 268 ignore_status=True) 269 return self.psu_cmd_result.stdout.strip() != 'unknown' 270 271 272 def generate_labels(self, host): 273 if self.psu_cmd_result.exit_status: 274 # The psu command for mosys is not included for all platforms. The 275 # assumption is that the device will have a battery if the command 276 # is not found. 277 return ['battery'] 278 return [self.psu_cmd_result.stdout.strip()] 279 280 281 class StorageLabel(base_label.StringPrefixLabel): 282 """ 283 Return the label describing the storage type. 284 285 Determine if the internal device is SCSI or dw_mmc device. 286 Then check that it is SSD or HDD or eMMC or something else. 287 288 Labels representing this host's internal device type: 289 * `storage:ssd` when internal device is solid state drive 290 * `storage:hdd` when internal device is hard disk drive 291 * `storage:mmc` when internal device is mmc drive 292 * `storage:nvme` when internal device is NVMe drive 293 * None When internal device is something else or 294 when we are unable to determine the type 295 """ 296 297 _NAME = 'storage' 298 299 def __init__(self): 300 self.type_str = '' 301 302 303 def exists(self, host): 304 # The output should be /dev/mmcblk* for SD/eMMC or /dev/sd* for scsi 305 rootdev_cmd = ' '.join(['. /usr/sbin/write_gpt.sh;', 306 '. /usr/share/misc/chromeos-common.sh;', 307 'load_base_vars;', 308 'get_fixed_dst_drive']) 309 rootdev = host.run(command=rootdev_cmd, ignore_status=True) 310 if rootdev.exit_status: 311 logging.info("Fail to run %s", rootdev_cmd) 312 return False 313 rootdev_str = rootdev.stdout.strip() 314 315 if not rootdev_str: 316 return False 317 318 rootdev_base = os.path.basename(rootdev_str) 319 320 mmc_pattern = '/dev/mmcblk[0-9]' 321 if re.match(mmc_pattern, rootdev_str): 322 # Use type to determine if the internal device is eMMC or somthing 323 # else. We can assume that MMC is always an internal device. 324 type_cmd = 'cat /sys/block/%s/device/type' % rootdev_base 325 type = host.run(command=type_cmd, ignore_status=True) 326 if type.exit_status: 327 logging.info("Fail to run %s", type_cmd) 328 return False 329 type_str = type.stdout.strip() 330 331 if type_str == 'MMC': 332 self.type_str = 'mmc' 333 return True 334 335 scsi_pattern = '/dev/sd[a-z]+' 336 if re.match(scsi_pattern, rootdev.stdout): 337 # Read symlink for /sys/block/sd* to determine if the internal 338 # device is connected via ata or usb. 339 link_cmd = 'readlink /sys/block/%s' % rootdev_base 340 link = host.run(command=link_cmd, ignore_status=True) 341 if link.exit_status: 342 logging.info("Fail to run %s", link_cmd) 343 return False 344 link_str = link.stdout.strip() 345 if 'usb' in link_str: 346 return False 347 348 # Read rotation to determine if the internal device is ssd or hdd. 349 rotate_cmd = str('cat /sys/block/%s/queue/rotational' 350 % rootdev_base) 351 rotate = host.run(command=rotate_cmd, ignore_status=True) 352 if rotate.exit_status: 353 logging.info("Fail to run %s", rotate_cmd) 354 return False 355 rotate_str = rotate.stdout.strip() 356 357 rotate_dict = {'0':'ssd', '1':'hdd'} 358 self.type_str = rotate_dict.get(rotate_str) 359 return True 360 361 nvme_pattern = '/dev/nvme[0-9]+n[0-9]+' 362 if re.match(nvme_pattern, rootdev_str): 363 self.type_str = 'nvme' 364 return True 365 366 # All other internal device / error case will always fall here 367 return False 368 369 370 def generate_labels(self, host): 371 return [self.type_str] 372 373 374 class ServoLabel(base_label.BaseLabel): 375 """Label to apply if a servo is present.""" 376 377 _NAME = 'servo' 378 379 def exists(self, host): 380 """ 381 Check if the servo label should apply to the host or not. 382 383 @returns True if a servo host is detected, False otherwise. 384 """ 385 servo_host_hostname = None 386 servo_args, _ = servo_host._get_standard_servo_args(host) 387 if servo_args: 388 servo_host_hostname = servo_args.get(servo_host.SERVO_HOST_ATTR) 389 return (servo_host_hostname is not None 390 and servo_host.servo_host_is_up(servo_host_hostname)) 391 392 393 class VideoLabel(base_label.StringLabel): 394 """Labels detailing video capabilities.""" 395 396 # List gathered from 397 # https://chromium.googlesource.com/chromiumos/ 398 # platform2/+/master/avtest_label_detect/main.c#19 399 # TODO(hiroh): '4k_video' won't be used. It will be removed in the future. 400 _NAME = [ 401 'hw_jpeg_acc_dec', 402 'hw_video_acc_h264', 403 'hw_video_acc_vp8', 404 'hw_video_acc_vp9', 405 'hw_video_acc_enc_h264', 406 'hw_video_acc_enc_vp8', 407 'webcam', 408 '4k_video', 409 '4k_video_h264', 410 '4k_video_vp8', 411 '4k_video_vp9', 412 ] 413 414 def generate_labels(self, host): 415 result = host.run('/usr/local/bin/avtest_label_detect', 416 ignore_status=True).stdout 417 return re.findall('^Detected label: (\w+)$', result, re.M) 418 419 420 class ArcLabel(base_label.BaseLabel): 421 """Label indicates if host has ARC support.""" 422 423 _NAME = 'arc' 424 425 @base_label.forever_exists_decorate 426 def exists(self, host): 427 return 0 == host.run( 428 'grep CHROMEOS_ARC_VERSION /etc/lsb-release', 429 ignore_status=True).exit_status 430 431 432 class CtsArchLabel(base_label.StringLabel): 433 """Labels to determine the abi of the CTS bundle (arm or x86 only).""" 434 # TODO(ihf): create labels for ABIs supported by container like x86_64. 435 436 _NAME = ['cts_abi_arm', 'cts_abi_x86'] 437 438 def _get_cts_abis(self, host): 439 """Return supported CTS ABIs. 440 441 @return List of supported CTS bundle ABIs. 442 """ 443 cts_abis = {'x86_64': ['arm', 'x86'], 'arm': ['arm']} 444 return cts_abis.get(host.get_cpu_arch(), []) 445 446 def generate_labels(self, host): 447 return ['cts_abi_' + abi for abi in self._get_cts_abis(host)] 448 449 450 class SparseCoverageLabel(base_label.StringLabel): 451 """Label indicates if it is desirable to cover a test for this build.""" 452 453 # Prime numbers. We can easily construct 6, 10, 15 and 30 from these. 454 _NAME = ['sparse_coverage_2', 'sparse_coverage_3', 'sparse_coverage_5'] 455 456 def _should_cover(self, host, nth_build): 457 release_info = utils.parse_cmd_output( 458 'cat /etc/lsb-release', run_method=host.run) 459 build = release_info.get('CHROMEOS_RELEASE_BUILD_NUMBER') 460 branch = release_info.get('CHROMEOS_RELEASE_BRANCH_NUMBER') 461 patch = release_info.get('CHROMEOS_RELEASE_PATCH_NUMBER') 462 builder = release_info.get('CHROMEOS_RELEASE_BUILDER_PATH') 463 if not 'release' in builder: 464 # Sparse coverage only makes sense on release/canary builds. 465 return True 466 if patch != '0': 467 # We are on a paladin or pfq build. These are never sparse. 468 # Redundant with release check above but just in case. 469 return True 470 if branch != '0': 471 # We are on a branch. For now these are not sparse. 472 # TODO(ihf): Consider sparse coverage on beta. 473 return True 474 # Now we can be sure we are on master. 475 if int(build) % nth_build == 0: 476 # We only want to cover one in n builds on master. This is the 477 # lucky one. 478 return True 479 # We skip all other builds on master. 480 return False 481 482 def generate_labels(self, host): 483 labels = [] 484 for n in [2, 3, 5]: 485 if self._should_cover(host, n): 486 labels.append('sparse_coverage_%d' % n) 487 return labels 488 489 490 class VideoGlitchLabel(base_label.BaseLabel): 491 """Label indicates if host supports video glitch detection tests.""" 492 493 _NAME = 'video_glitch_detection' 494 495 def exists(self, host): 496 board = host.get_board().replace(ds_constants.BOARD_PREFIX, '') 497 498 return board in video_test_constants.SUPPORTED_BOARDS 499 500 501 class InternalDisplayLabel(base_label.StringLabel): 502 """Label that determines if the device has an internal display.""" 503 504 _NAME = 'internal_display' 505 506 def generate_labels(self, host): 507 from autotest_lib.client.cros.graphics import graphics_utils 508 from autotest_lib.client.common_lib import utils as common_utils 509 510 def __system_output(cmd): 511 return host.run(cmd).stdout 512 513 def __read_file(remote_path): 514 return host.run('cat %s' % remote_path).stdout 515 516 # Hijack the necessary client functions so that we can take advantage 517 # of the client lib here. 518 # FIXME: find a less hacky way than this 519 original_system_output = utils.system_output 520 original_read_file = common_utils.read_file 521 utils.system_output = __system_output 522 common_utils.read_file = __read_file 523 try: 524 return ([self._NAME] 525 if graphics_utils.has_internal_display() 526 else []) 527 finally: 528 utils.system_output = original_system_output 529 common_utils.read_file = original_read_file 530 531 532 class LucidSleepLabel(base_label.BaseLabel): 533 """Label that determines if device has support for lucid sleep.""" 534 535 # TODO(kevcheng): See if we can determine if this label is applicable a 536 # better way (crbug.com/592146). 537 _NAME = 'lucidsleep' 538 LUCID_SLEEP_BOARDS = ['samus', 'lulu'] 539 540 def exists(self, host): 541 board = host.get_board().replace(ds_constants.BOARD_PREFIX, '') 542 return board in self.LUCID_SLEEP_BOARDS 543 544 545 class HWIDLabel(base_label.StringLabel): 546 """Return all the labels generated from the hwid.""" 547 548 # We leave out _NAME because hwid_lib will generate everything for us. 549 550 def __init__(self): 551 # Grab the key file needed to access the hwid service. 552 self.key_file = global_config.global_config.get_config_value( 553 'CROS', 'HWID_KEY', type=str) 554 555 556 def generate_labels(self, host): 557 hwid_labels = [] 558 hwid = host.run_output('crossystem hwid').strip() 559 hwid_info_list = hwid_lib.get_hwid_info(hwid, hwid_lib.HWID_INFO_LABEL, 560 self.key_file).get('labels', []) 561 562 for hwid_info in hwid_info_list: 563 # If it's a prefix, we'll have: 564 # {'name': prefix_label, 'value': postfix_label} and create 565 # 'prefix_label:postfix_label'; otherwise it'll just be 566 # {'name': label} which should just be 'label'. 567 value = hwid_info.get('value', '') 568 name = hwid_info.get('name', '') 569 # There should always be a name but just in case there is not. 570 if name: 571 hwid_labels.append(name if not value else 572 '%s:%s' % (name, value)) 573 return hwid_labels 574 575 576 def get_all_labels(self): 577 """We need to try all labels as a prefix and as standalone. 578 579 We don't know for sure which labels are prefix labels and which are 580 standalone so we try all of them as both. 581 """ 582 all_hwid_labels = [] 583 try: 584 all_hwid_labels = hwid_lib.get_all_possible_dut_labels( 585 self.key_file) 586 except IOError: 587 logging.error('Can not open key file: %s', self.key_file) 588 except hwid_lib.HwIdException as e: 589 logging.error('hwid service: %s', e) 590 return all_hwid_labels, all_hwid_labels 591 592 593 class DetachableBaseLabel(base_label.BaseLabel): 594 """Label indicating if device has detachable keyboard.""" 595 596 _NAME = 'detachablebase' 597 598 def exists(self, host): 599 return host.run('which hammerd', ignore_status=True).exit_status == 0 600 601 602 CROS_LABELS = [ 603 AccelsLabel(), 604 ArcLabel(), 605 AudioLoopbackDongleLabel(), 606 BluetoothLabel(), 607 BoardLabel(), 608 ModelLabel(), 609 ChameleonConnectionLabel(), 610 ChameleonLabel(), 611 ChameleonPeripheralsLabel(), 612 common_label.OSLabel(), 613 CtsArchLabel(), 614 DetachableBaseLabel(), 615 ECLabel(), 616 HWIDLabel(), 617 InternalDisplayLabel(), 618 LightSensorLabel(), 619 LucidSleepLabel(), 620 PowerSupplyLabel(), 621 ServoLabel(), 622 SparseCoverageLabel(), 623 StorageLabel(), 624 VideoGlitchLabel(), 625 VideoLabel(), 626 ] 627