Home | History | Annotate | Download | only in hosts
      1 #!/usr/bin/python
      2 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import itertools
      7 import mock
      8 import unittest
      9 
     10 import common
     11 from autotest_lib.client.common_lib import error
     12 from autotest_lib.client.common_lib import hosts
     13 from autotest_lib.client.common_lib.cros import retry
     14 from autotest_lib.server.hosts import cros_firmware
     15 from autotest_lib.server.hosts import cros_repair
     16 from autotest_lib.server.hosts import repair_utils
     17 
     18 
     19 CROS_VERIFY_DAG = (
     20     (repair_utils.SshVerifier, 'ssh', ()),
     21     (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
     22     (cros_repair.HWIDVerifier,    'hwid',    ('ssh',)),
     23     (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
     24     (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
     25     (cros_repair.WritableVerifier, 'writable', ('ssh',)),
     26     (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
     27     (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
     28     (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
     29     (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
     30     (cros_repair.PythonVerifier, 'python', ('ssh',)),
     31     (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
     32     (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
     33 )
     34 
     35 CROS_REPAIR_ACTIONS = (
     36     (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
     37     (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
     38     (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
     39     (cros_firmware.FirmwareRepair,
     40      'firmware', (), ('ssh', 'fwstatus', 'good_au')),
     41     (cros_repair.CrosRebootRepair,
     42      'reboot', ('ssh',), ('devmode', 'writable',)),
     43     (cros_repair.ColdRebootRepair,
     44      'coldboot', ('ssh',), ('ec_reset',)),
     45     (cros_repair.AutoUpdateRepair,
     46      'au',
     47      ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
     48      ('power', 'rwfw', 'python', 'cros')),
     49     (cros_repair.PowerWashRepair,
     50      'powerwash',
     51      ('ssh', 'writable'),
     52      ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros')),
     53     (cros_repair.ServoInstallRepair,
     54      'usb',
     55      (),
     56      ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw',
     57       'python', 'cros')),
     58 )
     59 
     60 MOBLAB_VERIFY_DAG = (
     61     (repair_utils.SshVerifier, 'ssh', ()),
     62     (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
     63     (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
     64     (cros_repair.PythonVerifier, 'python', ('ssh',)),
     65     (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
     66 )
     67 
     68 MOBLAB_REPAIR_ACTIONS = (
     69     (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
     70     (cros_repair.AutoUpdateRepair,
     71      'au', ('ssh',), ('power', 'rwfw', 'python', 'cros',)),
     72 )
     73 
     74 JETSTREAM_VERIFY_DAG = (
     75     (repair_utils.SshVerifier, 'ssh', ()),
     76     (cros_repair.DevModeVerifier, 'devmode', ('ssh',)),
     77     (cros_repair.HWIDVerifier,    'hwid',    ('ssh',)),
     78     (cros_repair.ACPowerVerifier, 'power', ('ssh',)),
     79     (cros_repair.EXT4fsErrorVerifier, 'ext4', ('ssh',)),
     80     (cros_repair.WritableVerifier, 'writable', ('ssh',)),
     81     (cros_repair.TPMStatusVerifier, 'tpm', ('ssh',)),
     82     (cros_repair.UpdateSuccessVerifier, 'good_au', ('ssh',)),
     83     (cros_firmware.FirmwareStatusVerifier, 'fwstatus', ('ssh',)),
     84     (cros_firmware.FirmwareVersionVerifier, 'rwfw', ('ssh',)),
     85     (cros_repair.PythonVerifier, 'python', ('ssh',)),
     86     (repair_utils.LegacyHostVerifier, 'cros', ('ssh',)),
     87     (cros_repair.KvmExistsVerifier, 'ec_reset', ('ssh',)),
     88     (cros_repair.JetstreamTpmVerifier, 'jetstream_tpm', ('ssh',)),
     89     (cros_repair.JetstreamAttestationVerifier, 'jetstream_attestation',
     90      ('ssh',)),
     91     (cros_repair.JetstreamServicesVerifier, 'jetstream_services', ('ssh',)),
     92 )
     93 
     94 JETSTREAM_REPAIR_ACTIONS = (
     95     (repair_utils.RPMCycleRepair, 'rpm', (), ('ssh', 'power',)),
     96     (cros_repair.ServoSysRqRepair, 'sysrq', (), ('ssh',)),
     97     (cros_repair.ServoResetRepair, 'servoreset', (), ('ssh',)),
     98     (cros_firmware.FirmwareRepair,
     99      'firmware', (), ('ssh', 'fwstatus', 'good_au')),
    100     (cros_repair.CrosRebootRepair,
    101      'reboot', ('ssh',), ('devmode', 'writable',)),
    102     (cros_repair.ColdRebootRepair,
    103      'coldboot', ('ssh',), ('ec_reset',)),
    104     (cros_repair.JetstreamTpmRepair,
    105      'jetstream_tpm_repair',
    106      ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
    107      ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
    108       'jetstream_attestation')),
    109     (cros_repair.JetstreamServiceRepair,
    110      'jetstream_service_repair',
    111      ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'jetstream_tpm',
    112       'jetstream_attestation'),
    113      ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
    114       'jetstream_attestation', 'jetstream_services')),
    115     (cros_repair.AutoUpdateRepair,
    116      'au',
    117      ('ssh', 'writable', 'tpm', 'good_au', 'ext4'),
    118      ('power', 'rwfw', 'python', 'cros', 'jetstream_tpm',
    119       'jetstream_attestation', 'jetstream_services')),
    120     (cros_repair.PowerWashRepair,
    121      'powerwash',
    122      ('ssh', 'writable'),
    123      ('tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python', 'cros',
    124       'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
    125     (cros_repair.ServoInstallRepair,
    126      'usb',
    127      (),
    128      ('ssh', 'writable', 'tpm', 'good_au', 'ext4', 'power', 'rwfw', 'python',
    129       'cros', 'jetstream_tpm', 'jetstream_attestation', 'jetstream_services')),
    130 )
    131 
    132 CRYPTOHOME_STATUS_OWNED = """{
    133    "installattrs": {
    134       "first_install": true,
    135       "initialized": true,
    136       "invalid": false,
    137       "lockbox_index": 536870916,
    138       "lockbox_nvram_version": 2,
    139       "secure": true,
    140       "size": 0,
    141       "version": 1
    142    },
    143    "mounts": [  ],
    144    "tpm": {
    145       "being_owned": false,
    146       "can_connect": true,
    147       "can_decrypt": false,
    148       "can_encrypt": false,
    149       "can_load_srk": true,
    150       "can_load_srk_pubkey": true,
    151       "enabled": true,
    152       "has_context": true,
    153       "has_cryptohome_key": false,
    154       "has_key_handle": false,
    155       "last_error": 0,
    156       "owned": true
    157    }
    158 }
    159 """
    160 
    161 CRYPTOHOME_STATUS_NOT_OWNED = """{
    162    "installattrs": {
    163       "first_install": true,
    164       "initialized": true,
    165       "invalid": false,
    166       "lockbox_index": 536870916,
    167       "lockbox_nvram_version": 2,
    168       "secure": true,
    169       "size": 0,
    170       "version": 1
    171    },
    172    "mounts": [  ],
    173    "tpm": {
    174       "being_owned": false,
    175       "can_connect": true,
    176       "can_decrypt": false,
    177       "can_encrypt": false,
    178       "can_load_srk": false,
    179       "can_load_srk_pubkey": false,
    180       "enabled": true,
    181       "has_context": true,
    182       "has_cryptohome_key": false,
    183       "has_key_handle": false,
    184       "last_error": 0,
    185       "owned": false
    186    }
    187 }
    188 """
    189 
    190 CRYPTOHOME_STATUS_CANNOT_LOAD_SRK = """{
    191    "installattrs": {
    192       "first_install": true,
    193       "initialized": true,
    194       "invalid": false,
    195       "lockbox_index": 536870916,
    196       "lockbox_nvram_version": 2,
    197       "secure": true,
    198       "size": 0,
    199       "version": 1
    200    },
    201    "mounts": [  ],
    202    "tpm": {
    203       "being_owned": false,
    204       "can_connect": true,
    205       "can_decrypt": false,
    206       "can_encrypt": false,
    207       "can_load_srk": false,
    208       "can_load_srk_pubkey": false,
    209       "enabled": true,
    210       "has_context": true,
    211       "has_cryptohome_key": false,
    212       "has_key_handle": false,
    213       "last_error": 0,
    214       "owned": true
    215    }
    216 }
    217 """
    218 
    219 TPM_STATUS_READY = """
    220 TPM Enabled: true
    221 TPM Owned: true
    222 TPM Being Owned: false
    223 TPM Ready: true
    224 TPM Password: 9eaee4da8b4c
    225 """
    226 
    227 TPM_STATUS_NOT_READY = """
    228 TPM Enabled: true
    229 TPM Owned: false
    230 TPM Being Owned: true
    231 TPM Ready: false
    232 TPM Password:
    233 """
    234 
    235 
    236 class CrosRepairUnittests(unittest.TestCase):
    237     # pylint: disable=missing-docstring
    238 
    239     maxDiff = None
    240 
    241     def test_cros_repair(self):
    242         verify_dag = cros_repair._cros_verify_dag()
    243         self.assertTupleEqual(verify_dag, CROS_VERIFY_DAG)
    244         self.check_verify_dag(verify_dag)
    245         repair_actions = cros_repair._cros_repair_actions()
    246         self.assertTupleEqual(repair_actions, CROS_REPAIR_ACTIONS)
    247         self.check_repair_actions(verify_dag, repair_actions)
    248 
    249     def test_moblab_repair(self):
    250         verify_dag = cros_repair._moblab_verify_dag()
    251         self.assertTupleEqual(verify_dag, MOBLAB_VERIFY_DAG)
    252         self.check_verify_dag(verify_dag)
    253         repair_actions = cros_repair._moblab_repair_actions()
    254         self.assertTupleEqual(repair_actions, MOBLAB_REPAIR_ACTIONS)
    255         self.check_repair_actions(verify_dag, repair_actions)
    256 
    257     def test_jetstream_repair(self):
    258         verify_dag = cros_repair._jetstream_verify_dag()
    259         self.assertTupleEqual(verify_dag, JETSTREAM_VERIFY_DAG)
    260         self.check_verify_dag(verify_dag)
    261         repair_actions = cros_repair._jetstream_repair_actions()
    262         self.assertTupleEqual(repair_actions, JETSTREAM_REPAIR_ACTIONS)
    263         self.check_repair_actions(verify_dag, repair_actions)
    264 
    265     def check_verify_dag(self, verify_dag):
    266         """Checks that dependency labels are defined."""
    267         labels = [n[1] for n in verify_dag]
    268         for node in verify_dag:
    269             for dep in node[2]:
    270                 self.assertIn(dep, labels)
    271 
    272     def check_repair_actions(self, verify_dag, repair_actions):
    273         """Checks that dependency and trigger labels are defined."""
    274         verify_labels = [n[1] for n in verify_dag]
    275         for action in repair_actions:
    276             deps = action[2]
    277             triggers = action[3]
    278             for label in deps + triggers:
    279                 self.assertIn(label, verify_labels)
    280 
    281     def test_get_cryptohome_status_owned(self):
    282         mock_host = mock.Mock()
    283         mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
    284         status = cros_repair.CryptohomeStatus(mock_host)
    285         self.assertDictEqual({
    286             'being_owned': False,
    287             'can_connect': True,
    288             'can_decrypt': False,
    289             'can_encrypt': False,
    290             'can_load_srk': True,
    291             'can_load_srk_pubkey': True,
    292             'enabled': True,
    293             'has_context': True,
    294             'has_cryptohome_key': False,
    295             'has_key_handle': False,
    296             'last_error': 0,
    297             'owned': True,
    298             }, status['tpm'])
    299         self.assertTrue(status.tpm_enabled)
    300         self.assertTrue(status.tpm_owned)
    301         self.assertTrue(status.tpm_can_load_srk)
    302         self.assertTrue(status.tpm_can_load_srk_pubkey)
    303 
    304     def test_get_cryptohome_status_not_owned(self):
    305         mock_host = mock.Mock()
    306         mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
    307         status = cros_repair.CryptohomeStatus(mock_host)
    308         self.assertDictEqual({
    309             'being_owned': False,
    310             'can_connect': True,
    311             'can_decrypt': False,
    312             'can_encrypt': False,
    313             'can_load_srk': False,
    314             'can_load_srk_pubkey': False,
    315             'enabled': True,
    316             'has_context': True,
    317             'has_cryptohome_key': False,
    318             'has_key_handle': False,
    319             'last_error': 0,
    320             'owned': False,
    321         }, status['tpm'])
    322         self.assertTrue(status.tpm_enabled)
    323         self.assertFalse(status.tpm_owned)
    324         self.assertFalse(status.tpm_can_load_srk)
    325         self.assertFalse(status.tpm_can_load_srk_pubkey)
    326 
    327     @mock.patch.object(cros_repair, '_is_virtual_machine')
    328     def test_tpm_status_verifier_owned(self, mock_is_virt):
    329         mock_is_virt.return_value = False
    330         mock_host = mock.Mock()
    331         mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_OWNED
    332         tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
    333         tpm_verifier.verify(mock_host)
    334 
    335     @mock.patch.object(cros_repair, '_is_virtual_machine')
    336     def test_tpm_status_verifier_not_owned(self, mock_is_virt):
    337         mock_is_virt.return_value = False
    338         mock_host = mock.Mock()
    339         mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
    340         tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
    341         tpm_verifier.verify(mock_host)
    342 
    343     @mock.patch.object(cros_repair, '_is_virtual_machine')
    344     def test_tpm_status_verifier_cannot_load_srk_pubkey(self, mock_is_virt):
    345         mock_is_virt.return_value = False
    346         mock_host = mock.Mock()
    347         mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_CANNOT_LOAD_SRK
    348         tpm_verifier = cros_repair.TPMStatusVerifier('test', [])
    349         with self.assertRaises(hosts.AutoservVerifyError) as ctx:
    350             tpm_verifier.verify(mock_host)
    351         self.assertEqual('Cannot load the TPM SRK',
    352                          ctx.exception.message)
    353 
    354     def test_jetstream_tpm_owned(self):
    355         mock_host = mock.Mock()
    356         mock_host.run.side_effect = [
    357             mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
    358             mock.Mock(stdout=TPM_STATUS_READY),
    359         ]
    360         tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
    361         tpm_verifier.verify(mock_host)
    362 
    363     @mock.patch.object(retry.logging, 'warning')
    364     @mock.patch.object(retry.time, 'time')
    365     @mock.patch.object(retry.time, 'sleep')
    366     def test_jetstream_tpm_not_owned(self, mock_sleep, mock_time, mock_logging):
    367         mock_time.side_effect = itertools.count(0, 20)
    368         mock_host = mock.Mock()
    369         mock_host.run.return_value.stdout = CRYPTOHOME_STATUS_NOT_OWNED
    370         tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
    371         with self.assertRaises(hosts.AutoservVerifyError) as ctx:
    372             tpm_verifier.verify(mock_host)
    373         self.assertEqual('TPM is not owned', ctx.exception.message)
    374 
    375     @mock.patch.object(retry.logging, 'warning')
    376     @mock.patch.object(retry.time, 'time')
    377     @mock.patch.object(retry.time, 'sleep')
    378     def test_jetstream_tpm_not_ready(self, mock_sleep, mock_time, mock_logging):
    379         mock_time.side_effect = itertools.count(0, 20)
    380         mock_host = mock.Mock()
    381         mock_host.run.side_effect = itertools.cycle([
    382             mock.Mock(stdout=CRYPTOHOME_STATUS_OWNED),
    383             mock.Mock(stdout=TPM_STATUS_NOT_READY),
    384         ])
    385         tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
    386         with self.assertRaises(hosts.AutoservVerifyError) as ctx:
    387             tpm_verifier.verify(mock_host)
    388         self.assertEqual('TPM is not ready', ctx.exception.message)
    389 
    390     @mock.patch.object(retry.logging, 'warning')
    391     @mock.patch.object(retry.time, 'time')
    392     @mock.patch.object(retry.time, 'sleep')
    393     def test_jetstream_cryptohome_missing(self, mock_sleep, mock_time,
    394                                           mock_logging):
    395         mock_time.side_effect = itertools.count(0, 20)
    396         mock_host = mock.Mock()
    397         mock_host.run.side_effect = error.AutoservRunError('test', None)
    398         tpm_verifier = cros_repair.JetstreamTpmVerifier('test', [])
    399         with self.assertRaises(hosts.AutoservVerifyError) as ctx:
    400             tpm_verifier.verify(mock_host)
    401         self.assertEqual('Could not determine TPM status',
    402                          ctx.exception.message)
    403 
    404 
    405 if __name__ == '__main__':
    406     unittest.main()
    407