Home | History | Annotate | Download | only in common_lib
      1 #!/usr/bin/python
      2 
      3 import logging, os, select, StringIO, subprocess, sys, unittest
      4 import common
      5 from autotest_lib.client.common_lib import logging_manager, logging_config
      6 
      7 
      8 class PipedStringIO(object):
      9     """
     10     Like StringIO, but all I/O passes through a pipe.  This means a
     11     PipedStringIO is backed by a file descriptor is thus can do things like
     12     pass down to a subprocess.  However, this means the creating process must
     13     call read_pipe() (or the classmethod read_all_pipes()) periodically to read
     14     the pipe, and must call close() (or the classmethod cleanup()) to close the
     15     pipe.
     16     """
     17     _instances = set()
     18 
     19     def __init__(self):
     20         self._string_io = StringIO.StringIO()
     21         self._read_end, self._write_end = os.pipe()
     22         PipedStringIO._instances.add(self)
     23 
     24 
     25     def close(self):
     26         self._string_io.close()
     27         os.close(self._read_end)
     28         os.close(self._write_end)
     29         PipedStringIO._instances.remove(self)
     30 
     31 
     32     def write(self, data):
     33         os.write(self._write_end, data)
     34 
     35 
     36     def flush(self):
     37         pass
     38 
     39 
     40     def fileno(self):
     41         return self._write_end
     42 
     43 
     44     def getvalue(self):
     45         self.read_pipe()
     46         return self._string_io.getvalue()
     47 
     48 
     49     def read_pipe(self):
     50         while True:
     51             read_list, _, _ = select.select([self._read_end], [], [], 0)
     52             if not read_list:
     53                 return
     54             self._string_io.write(os.read(self._read_end, 1024))
     55 
     56 
     57     @classmethod
     58     def read_all_pipes(cls):
     59         for instance in cls._instances:
     60             instance.read_pipe()
     61 
     62 
     63     @classmethod
     64     def cleanup_all_instances(cls):
     65         for instance in list(cls._instances):
     66             instance.close()
     67 
     68 
# record format used both for logging.basicConfig() in
# LoggingManagerTest.setUp() and for DummyLoggingConfig.console_formatter
LOGGING_FORMAT = '%(levelname)s: %(message)s'

# expected contents of the managed stdout stream after
# LoggingManagerTest._run_test().  steps 1 and 7 run before start_logging()
# and after stop_logging(), so their print/system lines are expected without
# the "INFO: " prefix.  steps 3-5 run while output is redirected and are
# expected to be absent here.
_EXPECTED_STDOUT = """\
print 1
system 1
INFO: logging 1
INFO: print 2
INFO: system 2
INFO: logging 2
INFO: print 6
INFO: system 6
INFO: logging 6
print 7
system 7
INFO: logging 7
"""

# expected contents of the stream passed to redirect_to_stream() at step 3.
# it should also see step 4 (tee on top of the redirect) and step 5 (after
# the tee has been undone).
_EXPECTED_LOG1 = """\
INFO: print 3
INFO: system 3
INFO: logging 3
INFO: print 4
INFO: system 4
INFO: logging 4
INFO: print 5
INFO: system 5
INFO: logging 5
"""

# expected contents of the stream passed to tee_redirect_to_stream() at
# step 4; the tee is undone again before step 5.
_EXPECTED_LOG2 = """\
INFO: print 4
INFO: system 4
INFO: logging 4
"""
    103 
    104 
    105 class DummyLoggingConfig(logging_config.LoggingConfig):
    106     console_formatter = logging.Formatter(LOGGING_FORMAT)
    107 
    108     def __init__(self):
    109         super(DummyLoggingConfig, self).__init__()
    110         self.log = PipedStringIO()
    111 
    112 
    113     def add_debug_file_handlers(self, log_dir, log_name=None):
    114         self.logger.addHandler(logging.StreamHandler(self.log))
    115 
    116 
# This isn't really a unit test, since it creates subprocesses and pipes and
# all that.  The unittest framework is used only because it's convenient.
    119 class LoggingManagerTest(unittest.TestCase):
    120     def setUp(self):
    121         self.stdout = PipedStringIO()
    122         self._log1 = PipedStringIO()
    123         self._log2 = PipedStringIO()
    124 
    125         self._real_system_calls = False
    126 
    127         # the LoggingManager will change self.stdout (by design), so keep a
    128         # copy around
    129         self._original_stdout = self.stdout
    130 
    131         # clear out import-time logging config and reconfigure
    132         root_logger = logging.getLogger()
    133         for handler in list(root_logger.handlers):
    134             root_logger.removeHandler(handler)
    135         # use INFO to ignore debug output from logging_manager itself
    136         logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT,
    137                             stream=self.stdout)
    138 
    139         self._config_object = DummyLoggingConfig()
    140         logging_manager.LoggingManager.logging_config_object = (
    141                 self._config_object)
    142 
    143 
    144     def tearDown(self):
    145         PipedStringIO.cleanup_all_instances()
    146 
    147 
    148     def _say(self, suffix):
    149         print >>self.stdout, 'print %s' % suffix
    150         if self._real_system_calls:
    151             os.system('echo system %s >&%s' % (suffix,
    152                                                self._original_stdout.fileno()))
    153         else:
    154             print >>self.stdout, 'system %s' % suffix
    155         logging.info('logging %s', suffix)
    156         PipedStringIO.read_all_pipes()
    157 
    158 
    159     def _setup_manager(self, manager_class=logging_manager.LoggingManager):
    160         def set_stdout(file_object):
    161             self.stdout = file_object
    162         manager = manager_class()
    163         manager.manage_stream(self.stdout, logging.INFO, set_stdout)
    164         return manager
    165 
    166 
    167     def _run_test(self, manager_class):
    168         manager = self._setup_manager(manager_class)
    169 
    170         self._say(1)
    171 
    172         manager.start_logging()
    173         self._say(2)
    174 
    175         manager.redirect_to_stream(self._log1)
    176         self._say(3)
    177 
    178         manager.tee_redirect_to_stream(self._log2)
    179         self._say(4)
    180 
    181         manager.undo_redirect()
    182         self._say(5)
    183 
    184         manager.undo_redirect()
    185         self._say(6)
    186 
    187         manager.stop_logging()
    188         self._say(7)
    189 
    190 
    191     def _grab_fd_info(self):
    192         command = 'ls -l /proc/%s/fd' % os.getpid()
    193         proc = subprocess.Popen(command.split(), shell=True,
    194                                 stdout=subprocess.PIPE)
    195         return proc.communicate()[0]
    196 
    197 
    198     def _compare_logs(self, log_buffer, expected_text):
    199         actual_lines = log_buffer.getvalue().splitlines()
    200         expected_lines = expected_text.splitlines()
    201         if self._real_system_calls:
    202             # because of the many interacting processes, we can't ensure perfect
    203             # interleaving.  so compare sets of lines rather than ordered lines.
    204             actual_lines = set(actual_lines)
    205             expected_lines = set(expected_lines)
    206         self.assertEquals(actual_lines, expected_lines)
    207 
    208 
    209     def _check_results(self):
    210         # ensure our stdout was restored
    211         self.assertEquals(self.stdout, self._original_stdout)
    212 
    213         if self._real_system_calls:
    214             # ensure FDs were left in their original state
    215             self.assertEquals(self._grab_fd_info(), self._fd_info)
    216 
    217         self._compare_logs(self.stdout, _EXPECTED_STDOUT)
    218         self._compare_logs(self._log1, _EXPECTED_LOG1)
    219         self._compare_logs(self._log2, _EXPECTED_LOG2)
    220 
    221 
    222     def test_logging_manager(self):
    223         self._run_test(logging_manager.LoggingManager)
    224         self._check_results()
    225 
    226 
    227     def test_fd_redirection_logging_manager(self):
    228         self._real_system_calls = True
    229         self._fd_info = self._grab_fd_info()
    230         self._run_test(logging_manager.FdRedirectionLoggingManager)
    231         self._check_results()
    232 
    233 
    234     def test_tee_redirect_debug_dir(self):
    235         manager = self._setup_manager()
    236         manager.start_logging()
    237 
    238         manager.tee_redirect_debug_dir('/fake/dir', tag='mytag')
    239         print >>self.stdout, 'hello'
    240 
    241         manager.undo_redirect()
    242         print >>self.stdout, 'goodbye'
    243 
    244         manager.stop_logging()
    245 
    246         self._compare_logs(self.stdout,
    247                            'INFO: mytag : hello\nINFO: goodbye')
    248         self._compare_logs(self._config_object.log, 'hello\n')
    249 
    250 
class MonkeyPatchTestCase(unittest.TestCase):
    """
    Tests for logging_manager._logging_manager_aware_logger__find_caller.

    NOTE: the _0_*/_1_* helpers exist only to put extra frames between the
    test method and the finder call.  The assertions depend on that exact
    call depth, so do not inline or re-nest them.
    """
    def setUp(self):
        # base name of this file; a trailing 'c' is dropped so that running
        # from a compiled .pyc still compares against the .py name
        filename = os.path.split(__file__)[1]
        if filename.endswith('.pyc'):
            filename = filename[:-1]
        self.expected_filename = filename


    def check_filename(self, filename, expected=None):
        """
        Assert that the base name of filename is one of the expected names.

        @param filename: path reported by the caller finder.
        @param expected: list of acceptable base names; defaults to the name
                of this test file.
        """
        if expected is None:
            expected = [self.expected_filename]
        self.assertIn(os.path.split(filename)[1], expected)


    def _0_test_find_caller(self):
        # innermost frame: this is where the finder is actually invoked
        finder = logging_manager._logging_manager_aware_logger__find_caller
        filename, lineno, caller_name = finder(logging_manager.logger)
        self.check_filename(filename)
        # the finder is expected to look past this helper and _1_ and report
        # the test method itself.  presumably the two helper frames stand in
        # for the frames the logging module normally adds -- confirm against
        # logging_manager.
        self.assertEquals('test_find_caller', caller_name)


    def _1_test_find_caller(self):
        # intermediate frame, see the class docstring
        self._0_test_find_caller()


    def test_find_caller(self):
        self._1_test_find_caller()


    def _0_test_non_reported_find_caller(self):
        # innermost frame: this is where the finder is actually invoked
        finder = logging_manager._logging_manager_aware_logger__find_caller
        filename, lineno, caller_name = finder(logging_manager.logger)
        # Python 2.4 unittest implementation will call the unittest method in
        # file 'unittest.py', and Python >= 2.6 does the same in 'case.py'
        self.check_filename(filename, expected=['unittest.py', 'case.py'])


    def _1_test_non_reported_find_caller(self):
        # intermediate frame, see the class docstring
        self._0_test_non_reported_find_caller()


    # the decorated test method is expected to be skipped by the finder, so
    # the frame reported is the unittest code that called it
    @logging_manager.do_not_report_as_logging_caller
    def test_non_reported_find_caller(self):
        self._1_test_non_reported_find_caller()
    295 
    296 
    297 
# run every TestCase defined in this module when executed as a script
if __name__ == '__main__':
    unittest.main()
    300