Home | History | Annotate | Download | only in platform_CrosDisksSshfs
      1 # Copyright 2018 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import base64
      6 import logging
      7 import json
      8 import os
      9 import tempfile
     10 
     11 from autotest_lib.client.bin import test
     12 from autotest_lib.client.bin import utils
     13 from autotest_lib.client.common_lib import error
     14 from autotest_lib.client.cros.cros_disks import CrosDisksTester
     15 
     16 
     17 def try_remove(filename):
     18     try:
     19         os.remove(filename)
     20         return True
     21     except OSError:
     22         return False
     23 
     24 
     25 class CrosDisksFuseTester(CrosDisksTester):
     26     """Common steps for all FUSE-based tests.
     27     """
     28     def __init__(self, test, test_configs):
     29         super(CrosDisksFuseTester, self).__init__(test)
     30         self._test_configs = test_configs
     31 
     32     def setup_test_case(self, config):
     33         pass
     34 
     35     def teardown_test_case(self, config):
     36         pass
     37 
     38     def verify_test_case(self, config, mount_result):
     39         pass
     40 
     41     def _test_case(self, config):
     42         logging.info('Testing "%s"', config['description'])
     43         self.setup_test_case(config)
     44         try:
     45             source = config['test_mount_source_uri']
     46             fstype = config.get('test_mount_filesystem_type')
     47             options = config.get('test_mount_options', [])
     48             expected_mount_completion = {
     49                 'status': config['expected_mount_status'],
     50                 'source_path': source,
     51             }
     52             if 'expected_mount_path' in config:
     53                 expected_mount_completion['mount_path'] = \
     54                     config['expected_mount_path']
     55 
     56             self.cros_disks.mount(source, fstype, options)
     57             result = self.cros_disks.expect_mount_completion(
     58                     expected_mount_completion)
     59             try:
     60                 self.verify_test_case(config, result)
     61             finally:
     62                 self.cros_disks.unmount(source, ['lazy'])
     63         finally:
     64             self.teardown_test_case(config)
     65 
     66     def _run_all_test_cases(self):
     67         try:
     68             for config in self._test_configs:
     69                 self._test_case(config)
     70         except RuntimeError:
     71             cmd = 'ls -la %s' % tempfile.gettempdir()
     72             logging.debug(utils.run(cmd))
     73             raise
     74 
     75     def get_tests(self):
     76         return [self._run_all_test_cases]
     77 
     78 
     79 SSH_DIR_PATH = '/home/chronos/user/.ssh'
     80 AUTHORIZED_KEYS = os.path.join(SSH_DIR_PATH, 'authorized_keys')
     81 AUTHORIZED_KEYS_BACKUP = AUTHORIZED_KEYS + '.sshfsbak'
     82 
# Discardable RSA private key used only by this test. It is base64-encoded and
# handed to CrosDisks through the "IdentityBase64" mount option (see
# CrosDisksSshfsTester.setup_test_case). Because it is checked in here in
# plain text, it must never be trusted for anything outside this test.
SSH_PRIVATE_KEY = '''-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAvKhQn82O9F+SzDTYgpI+qnCD6E6cYroLvflLp8/onYdqD1xK
ES4wDTGC68DNbS9tIo1hEjwbD79UltQT9NTmJg8DERUrQbNayYXtwxqZ2tSo1Hg5
dpAKLd3GBhwK1Eob+bNgcqEu3iIZq+QRVtlM92Uj4vBFuy8qgvGs4x+n3lACsyk8
4GZGtiFpqTPlTZ6BOEdknZpB0K3HIZ7NjZ8uD9fXJYFuUgQhQvhp1N8aZaf7JtWr
GLQ8Pwq6UYEVb8veHgLVAJ9p/5ko/WNVWf1h78v95pEHSYrQ0opcSDizbquW/1Fs
Fk6elrQcKctJ1FsXMxlWYOzN31yNxcPqT6rzhQIDAQABAoIBAAcD50OZ/DfgGfBY
ArkQQR5LYsxPqAcPzgH5dDPASnEZKPt7PhHXetfywGCN4dWujstbIIHyFDuIrNeS
+U8AX7KIml+XPu2JgtW9kjLQGWqGv+RuuAxNnONJvORbRJfSTaoCXpLEpZ6C/Btl
NrPZDsCgVS5KKv2j6lvGKtyjP7XHiXIXLvlhOJkpWRk4a1IBISP8HPt2w/bG0raD
CW6e6XYYPI4ZPwMlPRympQPGo8mVpNkhFAMHKnaN+E7HplsWXvb0daAVUeCBDVId
QSat88e7PbK2FMsinZvsCZSrHdggS+4u1h6LjMI3GO1PYEjvrMorkHz2w1KS3n1S
n4Eas50CgYEA9JR6JCauiZqJAV4azOylZaeiClkAtsK1IG98XkHyIDfn634U7o5c
6w1Uf0zwxRKx12EPQhzKiYRtp+nPirMAZHmm+gJqExakDV7uJlHNo/6qY7m1Z8I7
Ww/my1Oi5ASQ6Emyrpecvo8xTTl52Kf+l3mQk/EqitZLNWgkX5HdwTMCgYEAxXdh
/DLRDrBz7b+lYahAvBCr+VqUxWdjiarpnC/NZmXIWsI7U0nFpf3H4JzEQVdu5gdV
DKLrU8uw1dGwytgH3zA8s1VMWVg1uvVFduQk+pZeRj9ekGEHvViUEkylg3CaNCyO
2Kl72VS8W/ls5uX74mFcx9fwc9jUue807+406GcCgYAjfpTHQFHeKG4vo5+SE9nh
CdXrWIVRAKrWnTdYWouv/00KEQ8qm8CCYDneC6V5hEAI+M4FEzaVhIGBd94ly9qH
ulvwNn98a7G9OwSmzQJiBWhm9qGMAFUq3wDoiye9nagF/gQPcHNP+Gn4Qhobxi2d
gAfqYHqDEZxykL2OnRWonwKBgBvcl1+9T9ARx5mxI8WettuSQqGhTUJ5Lws6qVGX
URT0oYtkwngi/ZdJMo2XsP1DN+uO90ocJrYhFGdm+dn1F08/gCERlP86OgKSHuYC
lNEirFSfFlmqxyvJNsNKO0RLfAaGjvU1HLtygE096Ua/BoZPlIbCCjReUM2XWdHM
u3xbAoGARqG0gGpNCY7pEjSQ33TdLEXV7O0S4hJiN+IKPS2q8q8k21X7ckkPxsEG
h4dIuHzdLGZsmIXel4Mx3rvyKbboj93K3ia5rbU05keVi9duMv1PlbdQzu9mq2qu
A5CmV2fYpStHZTHsv5BcYWxkhc4aAmvUJwyAzlWEhyijwFK5wSQ=
-----END RSA PRIVATE KEY-----
'''

# Public key line written verbatim into AUTHORIZED_KEYS by
# CrosDisksSshfsTester._register_key. Presumably the public half of
# SSH_PRIVATE_KEY above -- TODO confirm; the pairing is not checked in code.
SSH_PUBLIC_KEY = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC8qFCfzY' \
        '70X5LMNNiCkj6qcIPoTpxiugu9+Uunz+idh2oPXEoRLjANMYLrwM1tL' \
        '20ijWESPBsPv1SW1BP01OYmDwMRFStBs1rJhe3DGpna1KjUeDl2kAot' \
        '3cYGHArUShv5s2ByoS7eIhmr5BFW2Uz3ZSPi8EW7LyqC8azjH6feUAK' \
        'zKTzgZka2IWmpM+VNnoE4R2SdmkHQrcchns2Nny4P19clgW5SBCFC+G' \
        'nU3xplp/sm1asYtDw/CrpRgRVvy94eAtUAn2n/mSj9Y1VZ/WHvy/3mk' \
        'QdJitDSilxIOLNuq5b/UWwWTp6WtBwpy0nUWxczGVZg7M3fXI3Fw+pP' \
        'qvOF root@localhost'
    121 
    122 
    123 class CrosDisksSshfsTester(CrosDisksFuseTester):
    124     """A tester to verify sshfs support in CrosDisks.
    125     """
    126     def __init__(self, test, test_configs):
    127         super(CrosDisksSshfsTester, self).__init__(test, test_configs)
    128 
    129     def setup_test_case(self, config):
    130         if os.path.exists(AUTHORIZED_KEYS):
    131             # Make backup of the current authorized_keys
    132             utils.run('mv -f ' + AUTHORIZED_KEYS + ' ' + AUTHORIZED_KEYS_BACKUP,
    133                       ignore_status=True)
    134 
    135         self._register_key(SSH_PUBLIC_KEY)
    136 
    137         identity = base64.b64encode(SSH_PRIVATE_KEY)
    138         known_hosts = base64.b64encode(self._generate_known_hosts())
    139 
    140         options = config.get('test_mount_options', [])
    141         options.append('IdentityBase64=' + identity)
    142         options.append('UserKnownHostsBase64=' + known_hosts)
    143         config['test_mount_options'] = options
    144 
    145     def teardown_test_case(self, config):
    146         if os.path.exists(AUTHORIZED_KEYS_BACKUP):
    147             # Restore authorized_keys from backup.
    148             utils.run('mv -f ' + AUTHORIZED_KEYS_BACKUP + ' ' + AUTHORIZED_KEYS,
    149                       ignore_status=True)
    150 
    151     def verify_test_case(self, config, mount_result):
    152         if 'expected_file' in config:
    153             f = config['expected_file']
    154             if not os.path.exists(f):
    155                 raise error.TestFail('Expected file "' + f + '" not found')
    156 
    157     def _register_key(self, pubkey):
    158         utils.run('sudo -u chronos mkdir -p ' + SSH_DIR_PATH,
    159                   ignore_status=True)
    160         utils.run('sudo -u chronos touch ' + AUTHORIZED_KEYS)
    161         with open(AUTHORIZED_KEYS, 'wb') as f:
    162             f.write(pubkey)
    163         utils.run('sudo -u chronos chmod 0600 ' + AUTHORIZED_KEYS)
    164 
    165     def _generate_known_hosts(self):
    166         hostkey = '/mnt/stateful_partition/etc/ssh/ssh_host_ed25519_key.pub'
    167         with open(hostkey, 'rb') as f:
    168             keydata = f.readline().split()
    169         return 'localhost {} {}\n'.format(keydata[0], keydata[1])
    170 
    171 
    172 class platform_CrosDisksSshfs(test.test):
    173     version = 1
    174 
    175     def run_once(self, *args, **kwargs):
    176         test_configs = []
    177         config_file = '%s/%s' % (self.bindir, kwargs['config_file'])
    178         with open(config_file, 'rb') as f:
    179             test_configs.extend(json.load(f))
    180 
    181         tester = CrosDisksSshfsTester(self, test_configs)
    182         tester.run(*args, **kwargs)
    183