Home | History | Annotate | Download | only in platform_CrosDisksArchive
      1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import tarfile
      8 import zipfile
      9 
     10 from autotest_lib.client.bin import test
     11 from autotest_lib.client.common_lib import autotemp, error
     12 from autotest_lib.client.cros.cros_disks import CrosDisksTester
     13 from autotest_lib.client.cros.cros_disks import VirtualFilesystemImage
     14 from autotest_lib.client.cros.cros_disks import DefaultFilesystemTestContent
     15 from collections import deque
     16 
     17 
     18 class CrosDisksArchiveTester(CrosDisksTester):
     19     """A tester to verify archive support in CrosDisks.
     20     """
     21     def __init__(self, test, archive_types):
     22         super(CrosDisksArchiveTester, self).__init__(test)
     23         self._archive_types = archive_types
     24 
     25     def _find_all_files(self, root_dir):
     26         """Returns all files under a directory and its sub-directories.
     27 
     28            This is a generator that performs a breadth-first-search of
     29            all files under a specified directory and its sub-directories.
     30 
     31         Args:
     32             root_dir: The root directory where the search starts from.
     33         Yields:
     34             Path of any found file relative to the root directory.
     35         """
     36         dirs_to_explore = deque([''])
     37         while len(dirs_to_explore) > 0:
     38             current_dir = dirs_to_explore.popleft()
     39             for path in os.listdir(os.path.join(root_dir, current_dir)):
     40                 expanded_path = os.path.join(root_dir, current_dir, path)
     41                 relative_path = os.path.join(current_dir, path)
     42                 if os.path.isdir(expanded_path):
     43                     dirs_to_explore.append(relative_path)
     44                 else:
     45                     yield relative_path
     46 
     47     def _make_tar_archive(self, archive_path, root_dir, compression=None):
     48         """Archives a specified directory into a tar file.
     49 
     50            The created tar file contains all files and sub-directories
     51            under the specified root directory, but not the root directory
     52            itself.
     53 
     54         Args:
     55             archive_path: Path of the output archive.
     56             root_dir: The root directory to archive.
     57             compression: The compression method: None, 'gz', 'bz2'
     58         """
     59         mode = 'w:' + compression if compression else 'w'
     60         # TarFile in Python 2.6 does not work with the 'with' statement.
     61         archive = tarfile.open(archive_path, mode)
     62         for path in self._find_all_files(root_dir):
     63             archive.add(os.path.join(root_dir, path), path)
     64         archive.close()
     65 
     66     def _make_zip_archive(self, archive_path, root_dir,
     67                          compression=zipfile.ZIP_DEFLATED):
     68         """Archives a specified directory into a ZIP file.
     69 
     70            The created ZIP file contains all files and sub-directories
     71            under the specified root directory, but not the root directory
     72            itself.
     73 
     74         Args:
     75             archive_path: Path of the output archive.
     76             root_dir: The root directory to archive.
     77             compression: The ZIP compression method.
     78         """
     79         # ZipFile in Python 2.6 does not work with the 'with' statement.
     80         archive = zipfile.ZipFile(archive_path, 'w', compression)
     81         for path in self._find_all_files(root_dir):
     82             archive.write(os.path.join(root_dir, path), path)
     83         archive.close()
     84 
     85     def _make_archive(self, archive_type, archive_path, root_dir):
     86         """Archives a specified directory into an archive of specified type.
     87 
     88            The created archive file contains all files and sub-directories
     89            under the specified root directory, but not the root directory
     90            itself.
     91 
     92         Args:
     93             archive_type: Type of the output archive.
     94             archive_path: Path of the output archive.
     95             root_dir: The root directory to archive.
     96         """
     97         if archive_type in ['zip']:
     98             self._make_zip_archive(archive_path, root_dir)
     99         elif archive_type in ['tar']:
    100             self._make_tar_archive(archive_path, root_dir)
    101         elif archive_type in ['tar.gz', 'tgz']:
    102             self._make_tar_archive(archive_path, root_dir, 'gz')
    103         elif archive_type in ['tar.bz2', 'tbz', 'tbz2']:
    104             self._make_tar_archive(archive_path, root_dir, 'bz2')
    105         else:
    106             raise error.TestFail("Unsupported archive type " + archive_type)
    107 
    108     def _test_archive(self, archive_type):
    109         # Create the archive file content in a temporary directory.
    110         archive_dir = autotemp.tempdir(unique_id='CrosDisks')
    111         test_content = DefaultFilesystemTestContent()
    112         if not test_content.create(archive_dir.name):
    113             raise error.TestFail("Failed to create archive test content")
    114 
    115         # Create a FAT-formatted virtual filesystem image containing an
    116         # archive file to help stimulate mounting an archive file on a
    117         # removable drive.
    118         with VirtualFilesystemImage(
    119                 block_size=1024,
    120                 block_count=65536,
    121                 filesystem_type='vfat',
    122                 mkfs_options=[ '-F', '32', '-n', 'ARCHIVE' ]) as image:
    123             image.format()
    124             image.mount(options=['sync'])
    125             # Create the archive file on the virtual filesystem image.
    126             archive_name = 'test.' + archive_type
    127             archive_path = os.path.join(image.mount_dir, archive_name)
    128             self._make_archive(archive_type, archive_path, archive_dir.name)
    129             image.unmount()
    130 
    131             # Mount the virtual filesystem image via CrosDisks.
    132             device_file = image.loop_device
    133             self.cros_disks.mount(device_file, '',
    134                                   [ "ro", "nodev", "noexec", "nosuid" ])
    135             result = self.cros_disks.expect_mount_completion({
    136                 'status': 0,
    137                 'source_path': device_file
    138             })
    139 
    140             # Mount the archive file on the mounted filesystem via CrosDisks.
    141             archive_path = os.path.join(result['mount_path'], archive_name)
    142             expected_mount_path = os.path.join('/media/archive', archive_name)
    143             self.cros_disks.mount(archive_path)
    144             result = self.cros_disks.expect_mount_completion({
    145                 'status': 0,
    146                 'source_path': archive_path,
    147                 'mount_path': expected_mount_path
    148             })
    149 
    150             # Verify the content of the mounted archive file.
    151             if not test_content.verify(expected_mount_path):
    152                 raise error.TestFail("Failed to verify filesystem test content")
    153 
    154             self.cros_disks.unmount(expected_mount_path, ['force'])
    155             self.cros_disks.unmount(device_file, ['force'])
    156 
    157     def test_archives(self):
    158         for archive_type in self._archive_types:
    159             self._test_archive(archive_type)
    160 
    161     def get_tests(self):
    162         return [self.test_archives]
    163 
    164 
    165 class platform_CrosDisksArchive(test.test):
    166     version = 1
    167 
    168     def run_once(self, *args, **kwargs):
    169         tester = CrosDisksArchiveTester(self, kwargs['archive_types'])
    170         tester.run(*args, **kwargs)
    171