Home | History | Annotate | Download | only in platform_CrosDisksArchive
      1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import shutil
      8 import zipfile
      9 
     10 from autotest_lib.client.bin import test
     11 from autotest_lib.client.common_lib import autotemp, error
     12 from autotest_lib.client.cros.cros_disks import CrosDisksTester
     13 from autotest_lib.client.cros.cros_disks import VirtualFilesystemImage
     14 from autotest_lib.client.cros.cros_disks import DefaultFilesystemTestContent
     15 from collections import deque
     16 
     17 
     18 class CrosDisksArchiveTester(CrosDisksTester):
     19     """A tester to verify archive support in CrosDisks.
     20     """
     21     def __init__(self, test, archive_types):
     22         super(CrosDisksArchiveTester, self).__init__(test)
     23         self._data_dir = os.path.join(test.bindir, 'data')
     24         self._archive_types = archive_types
     25 
     26     def _find_all_files(self, root_dir):
     27         """Returns all files under a directory and its sub-directories.
     28 
     29            This is a generator that performs a breadth-first-search of
     30            all files under a specified directory and its sub-directories.
     31 
     32         Args:
     33             root_dir: The root directory where the search starts from.
     34         Yields:
     35             Path of any found file relative to the root directory.
     36         """
     37         dirs_to_explore = deque([''])
     38         while len(dirs_to_explore) > 0:
     39             current_dir = dirs_to_explore.popleft()
     40             for path in os.listdir(os.path.join(root_dir, current_dir)):
     41                 expanded_path = os.path.join(root_dir, current_dir, path)
     42                 relative_path = os.path.join(current_dir, path)
     43                 if os.path.isdir(expanded_path):
     44                     dirs_to_explore.append(relative_path)
     45                 else:
     46                     yield relative_path
     47 
     48     def _make_zip_archive(self, archive_path, root_dir,
     49                          compression=zipfile.ZIP_DEFLATED):
     50         """Archives a specified directory into a ZIP file.
     51 
     52            The created ZIP file contains all files and sub-directories
     53            under the specified root directory, but not the root directory
     54            itself.
     55 
     56         Args:
     57             archive_path: Path of the output archive.
     58             root_dir: The root directory to archive.
     59             compression: The ZIP compression method.
     60         """
     61         # ZipFile in Python 2.6 does not work with the 'with' statement.
     62         archive = zipfile.ZipFile(archive_path, 'w', compression)
     63         for path in self._find_all_files(root_dir):
     64             archive.write(os.path.join(root_dir, path), path)
     65         archive.close()
     66 
     67     def _make_rar_archive(self, archive_path, root_dir):
     68         """Archives a specified directory into a RAR file.
     69 
     70            The created RAR file contains all files and sub-directories
     71            under the specified root directory, but not the root directory
     72            itself.
     73 
     74         Args:
     75             archive_path: Path of the output archive.
     76             root_dir: The root directory to archive.
     77         """
     78         # DESPICABLE HACK: app-arch/rar provides only pre-compiled rar binaries
     79         # for x86/amd64. As a workaround, we pretend the RAR creation here
     80         # using a precanned RAR file.
     81         shutil.copyfile(os.path.join(self._data_dir, 'test.rar'), archive_path)
     82 
     83     def _make_archive(self, archive_type, archive_path, root_dir):
     84         """Archives a specified directory into an archive of specified type.
     85 
     86            The created archive file contains all files and sub-directories
     87            under the specified root directory, but not the root directory
     88            itself.
     89 
     90         Args:
     91             archive_type: Type of the output archive.
     92             archive_path: Path of the output archive.
     93             root_dir: The root directory to archive.
     94         """
     95         if archive_type in ['zip']:
     96             self._make_zip_archive(archive_path, root_dir)
     97         elif archive_type in ['rar']:
     98             self._make_rar_archive(archive_path, root_dir)
     99         else:
    100             raise error.TestFail("Unsupported archive type " + archive_type)
    101 
    102     def _test_archive(self, archive_type):
    103         # Create the archive file content in a temporary directory.
    104         archive_dir = autotemp.tempdir(unique_id='CrosDisks')
    105         test_content = DefaultFilesystemTestContent()
    106         if not test_content.create(archive_dir.name):
    107             raise error.TestFail("Failed to create archive test content")
    108 
    109         # Create a FAT-formatted virtual filesystem image containing an
    110         # archive file to help stimulate mounting an archive file on a
    111         # removable drive.
    112         with VirtualFilesystemImage(
    113                 block_size=1024,
    114                 block_count=65536,
    115                 filesystem_type='vfat',
    116                 mkfs_options=[ '-F', '32', '-n', 'ARCHIVE' ]) as image:
    117             image.format()
    118             image.mount(options=['sync'])
    119             # Create the archive file on the virtual filesystem image.
    120             archive_name = 'test.' + archive_type
    121             archive_path = os.path.join(image.mount_dir, archive_name)
    122             self._make_archive(archive_type, archive_path, archive_dir.name)
    123             image.unmount()
    124 
    125             # Mount the virtual filesystem image via CrosDisks.
    126             device_file = image.loop_device
    127             self.cros_disks.mount(device_file, '',
    128                                   [ "ro", "nodev", "noexec", "nosuid" ])
    129             result = self.cros_disks.expect_mount_completion({
    130                 'status': 0,
    131                 'source_path': device_file
    132             })
    133 
    134             # Mount the archive file on the mounted filesystem via CrosDisks.
    135             archive_path = os.path.join(result['mount_path'], archive_name)
    136             expected_mount_path = os.path.join('/media/archive', archive_name)
    137             self.cros_disks.mount(archive_path)
    138             result = self.cros_disks.expect_mount_completion({
    139                 'status': 0,
    140                 'source_path': archive_path,
    141                 'mount_path': expected_mount_path
    142             })
    143 
    144             # Verify the content of the mounted archive file.
    145             if not test_content.verify(expected_mount_path):
    146                 raise error.TestFail("Failed to verify filesystem test content")
    147 
    148             self.cros_disks.unmount(expected_mount_path, ['lazy'])
    149             self.cros_disks.unmount(device_file, ['lazy'])
    150 
    151     def test_archives(self):
    152         for archive_type in self._archive_types:
    153             self._test_archive(archive_type)
    154 
    155     def get_tests(self):
    156         return [self.test_archives]
    157 
    158 
    159 class platform_CrosDisksArchive(test.test):
    160     version = 1
    161 
    162     def run_once(self, *args, **kwargs):
    163         tester = CrosDisksArchiveTester(self, kwargs['archive_types'])
    164         tester.run(*args, **kwargs)
    165