Home | History | Annotate | Download | only in bin
      1 #!/usr/bin/python
      2 
      3 """Tests for site_sysinfo."""
      4 
      5 __author__ = 'dshi (at] google.com (Dan Shi)'
      6 
      7 import cPickle as pickle
      8 import filecmp
      9 import os
     10 import random
     11 import shutil
     12 import tempfile
     13 import unittest
     14 
     15 import common
     16 from autotest_lib.client.bin import site_sysinfo
     17 from autotest_lib.client.common_lib import autotemp
     18 
     19 
     20 class diffable_logdir_test(unittest.TestCase):
     21     """Tests for methods in class diffable_logdir."""
     22 
     23 
     24     def setUp(self):
     25         """Initialize a temp direcotry with test files."""
     26         self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
     27         self.src_dir = os.path.join(self.tempdir.name, 'src')
     28         self.dest_dir = os.path.join(self.tempdir.name, 'dest')
     29 
     30         self.existing_files = ['existing_file_'+str(i) for i in range(3)]
     31         self.existing_files_folder = ['', 'sub', 'sub/sub2']
     32         self.existing_files_path = [os.path.join(self.src_dir, folder, f)
     33                                     for f,folder in zip(self.existing_files,
     34                                                 self.existing_files_folder)]
     35         self.new_files = ['new_file_'+str(i) for i in range(2)]
     36         self.new_files_folder = ['sub', 'sub/sub3']
     37         self.new_files_path = [os.path.join(self.src_dir, folder, f)
     38                                     for f,folder in zip(self.new_files,
     39                                                 self.new_files_folder)]
     40 
     41         # Create some file with random data in source directory.
     42         for p in self.existing_files_path:
     43             self.append_text_to_file(str(random.random()), p)
     44 
     45         self.existing_fifo_path = os.path.join(
     46             self.src_dir,'sub/sub2/existing_fifo')
     47         os.mkfifo(self.existing_fifo_path)
     48 
     49 
     50     def tearDown(self):
     51         """Clearn up."""
     52         self.tempdir.clean()
     53 
     54 
     55     def append_text_to_file(self, text, file_path):
     56         """Append text to the end of a file, create the file if not existed.
     57 
     58         @param text: text to be appended to a file.
     59         @param file_path: path to the file.
     60 
     61         """
     62         dir_name = os.path.dirname(file_path)
     63         if not os.path.exists(dir_name):
     64             os.makedirs(dir_name)
     65         with open(file_path, 'a') as f:
     66             f.write(text)
     67 
     68 
     69     def assert_trees_equal(self, dir1, dir2, ignore=None):
     70         """Assert two directory trees contain the same files.
     71 
     72         @param dir1: the left comparison directory.
     73         @param dir2: the right comparison directory.
     74         @param ignore: filenames to ignore (in any directory).
     75 
     76         """
     77         dircmps = []
     78         dircmps.append(filecmp.dircmp(dir1, dir2, ignore))
     79         while dircmps:
     80             dcmp = dircmps.pop()
     81             self.assertEqual(dcmp.left_list, dcmp.right_list)
     82             self.assertEqual([], dcmp.diff_files)
     83             dircmps.extend(dcmp.subdirs.values())
     84 
     85 
     86     def test_diffable_logdir_success(self):
     87         """Test the diff function to save new data from a directory."""
     88         info = site_sysinfo.diffable_logdir(self.src_dir,
     89                                             keep_file_hierarchy=False,
     90                                             append_diff_in_name=False)
     91         # Run the first time to collect file status.
     92         info.run(log_dir=None, collect_init_status=True)
     93 
     94         # Add new files to the test directory.
     95         for file_name, file_path in zip(self.new_files,
     96                                          self.new_files_path):
     97             self.append_text_to_file(file_name, file_path)
     98 
     99         # Temp file for existing_file_2, used to hold on the inode. If the
    100         # file is deleted and recreated, its inode might not change.
    101         existing_file_2 = self.existing_files_path[2]
    102         existing_file_2_tmp =  existing_file_2 + '_tmp'
    103         os.rename(existing_file_2, existing_file_2_tmp)
    104 
    105         # Append data to existing file.
    106         for file_name, file_path in zip(self.existing_files,
    107                                          self.existing_files_path):
    108             self.append_text_to_file(file_name, file_path)
    109 
    110         # Remove the tmp file.
    111         os.remove(existing_file_2_tmp)
    112 
    113         # Add a new FIFO
    114         new_fifo_path = os.path.join(self.src_dir, 'sub/sub2/new_fifo')
    115         os.mkfifo(new_fifo_path)
    116 
    117         # Run the second time to do diff.
    118         info.run(self.dest_dir, collect_init_status=False, collect_all=True)
    119 
    120         # Validate files in dest_dir.
    121         for file_name, file_path in zip(self.existing_files+self.new_files,
    122                                 self.existing_files_path+self.new_files_path):
    123             file_path = file_path.replace('src', 'dest')
    124             with open(file_path, 'r') as f:
    125                 self.assertEqual(file_name, f.read())
    126 
    127         # Assert that FIFOs are not in the diff.
    128         self.assertFalse(
    129                 os.path.exists(self.existing_fifo_path.replace('src', 'dest')),
    130                 msg='Existing FIFO present in diff sysinfo')
    131         self.assertFalse(
    132                 os.path.exists(new_fifo_path.replace('src', 'dest')),
    133                 msg='New FIFO present in diff sysinfo')
    134 
    135         # With collect_all=True, full sysinfo should also be present.
    136         full_sysinfo_path = self.dest_dir + self.src_dir
    137         self.assertTrue(os.path.exists(full_sysinfo_path),
    138                         msg='Full sysinfo not present')
    139 
    140         # Assert that the full sysinfo is present.
    141         self.assertNotEqual(self.src_dir, full_sysinfo_path)
    142         self.assert_trees_equal(self.src_dir, full_sysinfo_path)
    143 
    144 
    145 class LogdirTestCase(unittest.TestCase):
    146     """Tests logdir.run"""
    147 
    148     def setUp(self):
    149         self.tempdir = tempfile.mkdtemp()
    150         self.addCleanup(shutil.rmtree, self.tempdir)
    151 
    152         self.from_dir = os.path.join(self.tempdir, 'from')
    153         self.to_dir = os.path.join(self.tempdir, 'to')
    154         os.mkdir(self.from_dir)
    155         os.mkdir(self.to_dir)
    156 
    157     def _destination_path(self, relative_path, from_dir=None):
    158         """The expected destination path for a subdir of the source directory"""
    159         if from_dir is None:
    160             from_dir = self.from_dir
    161         return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))
    162 
    163     def test_run_recreates_absolute_source_path(self):
    164         """When copying files, the absolute path from the source is recreated
    165         in the destination folder.
    166         """
    167         os.mkdir(os.path.join(self.from_dir, 'fubar'))
    168         logdir = site_sysinfo.logdir(self.from_dir)
    169         logdir.run(self.to_dir)
    170         destination_path= self._destination_path('fubar')
    171         self.assertTrue(os.path.exists(destination_path),
    172                         msg='Failed to copy to %s' % destination_path)
    173 
    174     def test_run_skips_symlinks(self):
    175         os.mkdir(os.path.join(self.from_dir, 'real'))
    176         os.symlink(os.path.join(self.from_dir, 'real'),
    177                    os.path.join(self.from_dir, 'symlink'))
    178 
    179         logdir = site_sysinfo.logdir(self.from_dir)
    180         logdir.run(self.to_dir)
    181 
    182         destination_path_real = self._destination_path('real')
    183         self.assertTrue(os.path.exists(destination_path_real),
    184                         msg='real directory was not copied to %s' %
    185                         destination_path_real)
    186         destination_path_link = self._destination_path('symlink')
    187         self.assertFalse(
    188                 os.path.exists(destination_path_link),
    189                 msg='symlink was copied to %s' % destination_path_link)
    190 
    191     def test_run_resolves_symlinks_to_source_root(self):
    192         """run tries hard to get to the source directory before copying.
    193 
    194         Within the source folder, we skip any symlinks, but we first try to
    195         resolve symlinks to the source root itself.
    196         """
    197         os.mkdir(os.path.join(self.from_dir, 'fubar'))
    198         from_symlink = os.path.join(self.tempdir, 'from_symlink')
    199         os.symlink(self.from_dir, from_symlink)
    200 
    201         logdir = site_sysinfo.logdir(from_symlink)
    202         logdir.run(self.to_dir)
    203 
    204         destination_path = self._destination_path('fubar')
    205         self.assertTrue(os.path.exists(destination_path),
    206                         msg='Failed to copy to %s' % destination_path)
    207 
    208     def test_run_excludes_common_patterns(self):
    209         os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
    210         # Create empty file.
    211         open(os.path.join(self.from_dir, 'system.journal'), 'w').close()
    212         deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
    213         os.makedirs(os.path.join(self.from_dir, deeper_subdir))
    214 
    215         logdir = site_sysinfo.logdir(self.from_dir)
    216         logdir.run(self.to_dir)
    217 
    218         destination_path = self._destination_path('autoserv2344')
    219         self.assertFalse(os.path.exists(destination_path),
    220                          msg='Copied banned file to %s' % destination_path)
    221         destination_path = self._destination_path(deeper_subdir)
    222         self.assertFalse(os.path.exists(destination_path),
    223                          msg='Copied banned file to %s' % destination_path)
    224         destination_path = self._destination_path('system.journal')
    225         self.assertFalse(os.path.exists(destination_path),
    226                          msg='Copied banned file to %s' % destination_path)
    227 
    228     def test_run_ignores_exclude_patterns_in_leading_dirs(self):
    229         """Exclude patterns should only be applied to path suffixes within
    230         from_dir, not to the root directory itself.
    231         """
    232         exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
    233         os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
    234         logdir = site_sysinfo.logdir(exclude_pattern_dir)
    235         logdir.run(self.to_dir)
    236         destination_path = self._destination_path('fubar',
    237                                                   from_dir=exclude_pattern_dir)
    238         self.assertTrue(os.path.exists(destination_path),
    239                         msg='Failed to copy to %s' % destination_path)
    240 
    241     def test_pickle_unpickle_equal(self):
    242         """Sanity check pickle-unpickle round-trip."""
    243         logdir = site_sysinfo.logdir(
    244                 self.from_dir,
    245                 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
    246         # base_job uses protocol 2 to pickle. We follow suit.
    247         logdir_pickle = pickle.dumps(logdir, protocol=2)
    248         unpickled_logdir = pickle.loads(logdir_pickle)
    249 
    250         self.assertEqual(unpickled_logdir, logdir)
    251 
    252     def test_pickle_enforce_required_attributes(self):
    253         """Some attributes from this object should never be dropped.
    254 
    255         When running a client test against an older build, we pickle the objects
    256         of this class from newer version of the class and unpicle to older
    257         versions of the class. The older versions require some attributes to be
    258         present.
    259         """
    260         logdir = site_sysinfo.logdir(
    261                 self.from_dir,
    262                 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
    263         # base_job uses protocol 2 to pickle. We follow suit.
    264         logdir_pickle = pickle.dumps(logdir, protocol=2)
    265         logdir = pickle.loads(logdir_pickle)
    266 
    267         self.assertEqual(logdir.additional_exclude, 'a')
    268 
    269     def test_pickle_enforce_required_attributes_default(self):
    270         """Some attributes from this object should never be dropped.
    271 
    272         When running a client test against an older build, we pickle the objects
    273         of this class from newer version of the class and unpicle to older
    274         versions of the class. The older versions require some attributes to be
    275         present.
    276         """
    277         logdir = site_sysinfo.logdir(self.from_dir)
    278         # base_job uses protocol 2 to pickle. We follow suit.
    279         logdir_pickle = pickle.dumps(logdir, protocol=2)
    280         logdir = pickle.loads(logdir_pickle)
    281 
    282         self.assertEqual(logdir.additional_exclude, None)
    283 
    284     def test_unpickle_handle_missing__excludes(self):
    285         """Sanely handle missing _excludes attribute from pickles
    286 
    287         This can happen when running brand new version of this class that
    288         introduced this attribute from older server side code in prod.
    289         """
    290         logdir = site_sysinfo.logdir(self.from_dir)
    291         delattr(logdir, '_excludes')
    292         # base_job uses protocol 2 to pickle. We follow suit.
    293         logdir_pickle = pickle.dumps(logdir, protocol=2)
    294         logdir = pickle.loads(logdir_pickle)
    295 
    296         self.assertEqual(logdir._excludes,
    297                          site_sysinfo.logdir.DEFAULT_EXCLUDES)
    298 
    299     def test_unpickle_handle_missing__excludes_default(self):
    300         """Sanely handle missing _excludes attribute from pickles
    301 
    302         This can happen when running brand new version of this class that
    303         introduced this attribute from older server side code in prod.
    304         """
    305         logdir = site_sysinfo.logdir(
    306                 self.from_dir,
    307                 excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
    308         delattr(logdir, '_excludes')
    309         # base_job uses protocol 2 to pickle. We follow suit.
    310         logdir_pickle = pickle.dumps(logdir, protocol=2)
    311         logdir = pickle.loads(logdir_pickle)
    312 
    313         self.assertEqual(
    314                 logdir._excludes,
    315                 (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
    316 
    317 
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
    320