1 #!/usr/bin/python 2 3 import logging, os, select, StringIO, subprocess, sys, unittest 4 import common 5 from autotest_lib.client.common_lib import logging_manager, logging_config 6 7 8 class PipedStringIO(object): 9 """ 10 Like StringIO, but all I/O passes through a pipe. This means a 11 PipedStringIO is backed by a file descriptor is thus can do things like 12 pass down to a subprocess. However, this means the creating process must 13 call read_pipe() (or the classmethod read_all_pipes()) periodically to read 14 the pipe, and must call close() (or the classmethod cleanup()) to close the 15 pipe. 16 """ 17 _instances = set() 18 19 def __init__(self): 20 self._string_io = StringIO.StringIO() 21 self._read_end, self._write_end = os.pipe() 22 PipedStringIO._instances.add(self) 23 24 25 def close(self): 26 self._string_io.close() 27 os.close(self._read_end) 28 os.close(self._write_end) 29 PipedStringIO._instances.remove(self) 30 31 32 def write(self, data): 33 os.write(self._write_end, data) 34 35 36 def flush(self): 37 pass 38 39 40 def fileno(self): 41 return self._write_end 42 43 44 def getvalue(self): 45 self.read_pipe() 46 return self._string_io.getvalue() 47 48 49 def read_pipe(self): 50 while True: 51 read_list, _, _ = select.select([self._read_end], [], [], 0) 52 if not read_list: 53 return 54 self._string_io.write(os.read(self._read_end, 1024)) 55 56 57 @classmethod 58 def read_all_pipes(cls): 59 for instance in cls._instances: 60 instance.read_pipe() 61 62 63 @classmethod 64 def cleanup_all_instances(cls): 65 for instance in list(cls._instances): 66 instance.close() 67 68 69 LOGGING_FORMAT = '%(levelname)s: %(message)s' 70 71 _EXPECTED_STDOUT = """\ 72 print 1 73 system 1 74 INFO: logging 1 75 INFO: print 2 76 INFO: system 2 77 INFO: logging 2 78 INFO: print 6 79 INFO: system 6 80 INFO: logging 6 81 print 7 82 system 7 83 INFO: logging 7 84 """ 85 86 _EXPECTED_LOG1 = """\ 87 INFO: print 3 88 INFO: system 3 89 INFO: logging 3 90 INFO: print 4 91 INFO: system 4 92 INFO: logging 4 93 INFO: print 5 94 INFO: system 5 95 INFO: logging 5 96 """ 97 98 _EXPECTED_LOG2 = """\ 99 INFO: print 4 100 INFO: system 4 101 INFO: logging 4 102 """ 103 104 105 class DummyLoggingConfig(logging_config.LoggingConfig): 106 console_formatter = logging.Formatter(LOGGING_FORMAT) 107 108 def __init__(self): 109 super(DummyLoggingConfig, self).__init__() 110 self.log = PipedStringIO() 111 112 113 def add_debug_file_handlers(self, log_dir, log_name=None): 114 self.logger.addHandler(logging.StreamHandler(self.log)) 115 116 117 # this isn't really a unit test since it creates subprocesses and pipes and all 118 # that. i just use the unittest framework because it's convenient. 119 class LoggingManagerTest(unittest.TestCase): 120 def setUp(self): 121 self.stdout = PipedStringIO() 122 self._log1 = PipedStringIO() 123 self._log2 = PipedStringIO() 124 125 self._real_system_calls = False 126 127 # the LoggingManager will change self.stdout (by design), so keep a 128 # copy around 129 self._original_stdout = self.stdout 130 131 # clear out import-time logging config and reconfigure 132 root_logger = logging.getLogger() 133 for handler in list(root_logger.handlers): 134 root_logger.removeHandler(handler) 135 # use INFO to ignore debug output from logging_manager itself 136 logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT, 137 stream=self.stdout) 138 139 self._config_object = DummyLoggingConfig() 140 logging_manager.LoggingManager.logging_config_object = ( 141 self._config_object) 142 143 144 def tearDown(self): 145 PipedStringIO.cleanup_all_instances() 146 147 148 def _say(self, suffix): 149 print >>self.stdout, 'print %s' % suffix 150 if self._real_system_calls: 151 os.system('echo system %s >&%s' % (suffix, 152 self._original_stdout.fileno())) 153 else: 154 print >>self.stdout, 'system %s' % suffix 155 logging.info('logging %s', suffix) 156 PipedStringIO.read_all_pipes() 157 158 159 def _setup_manager(self, manager_class=logging_manager.LoggingManager): 160 def set_stdout(file_object): 161 self.stdout = file_object 162 manager = manager_class() 163 manager.manage_stream(self.stdout, logging.INFO, set_stdout) 164 return manager 165 166 167 def _run_test(self, manager_class): 168 manager = self._setup_manager(manager_class) 169 170 self._say(1) 171 172 manager.start_logging() 173 self._say(2) 174 175 manager.redirect_to_stream(self._log1) 176 self._say(3) 177 178 manager.tee_redirect_to_stream(self._log2) 179 self._say(4) 180 181 manager.undo_redirect() 182 self._say(5) 183 184 manager.undo_redirect() 185 self._say(6) 186 187 manager.stop_logging() 188 self._say(7) 189 190 191 def _grab_fd_info(self): 192 command = 'ls -l /proc/%s/fd' % os.getpid() 193 proc = subprocess.Popen(command.split(), shell=True, 194 stdout=subprocess.PIPE) 195 return proc.communicate()[0] 196 197 198 def _compare_logs(self, log_buffer, expected_text): 199 actual_lines = log_buffer.getvalue().splitlines() 200 expected_lines = expected_text.splitlines() 201 if self._real_system_calls: 202 # because of the many interacting processes, we can't ensure perfect 203 # interleaving. so compare sets of lines rather than ordered lines. 204 actual_lines = set(actual_lines) 205 expected_lines = set(expected_lines) 206 self.assertEquals(actual_lines, expected_lines) 207 208 209 def _check_results(self): 210 # ensure our stdout was restored 211 self.assertEquals(self.stdout, self._original_stdout) 212 213 if self._real_system_calls: 214 # ensure FDs were left in their original state 215 self.assertEquals(self._grab_fd_info(), self._fd_info) 216 217 self._compare_logs(self.stdout, _EXPECTED_STDOUT) 218 self._compare_logs(self._log1, _EXPECTED_LOG1) 219 self._compare_logs(self._log2, _EXPECTED_LOG2) 220 221 222 def test_logging_manager(self): 223 self._run_test(logging_manager.LoggingManager) 224 self._check_results() 225 226 227 def test_fd_redirection_logging_manager(self): 228 self._real_system_calls = True 229 self._fd_info = self._grab_fd_info() 230 self._run_test(logging_manager.FdRedirectionLoggingManager) 231 self._check_results() 232 233 234 def test_tee_redirect_debug_dir(self): 235 manager = self._setup_manager() 236 manager.start_logging() 237 238 manager.tee_redirect_debug_dir('/fake/dir', tag='mytag') 239 print >>self.stdout, 'hello' 240 241 manager.undo_redirect() 242 print >>self.stdout, 'goodbye' 243 244 manager.stop_logging() 245 246 self._compare_logs(self.stdout, 247 'INFO: mytag : hello\nINFO: goodbye') 248 self._compare_logs(self._config_object.log, 'hello\n') 249 250 251 class MonkeyPatchTestCase(unittest.TestCase): 252 def setUp(self): 253 filename = os.path.split(__file__)[1] 254 if filename.endswith('.pyc'): 255 filename = filename[:-1] 256 self.expected_filename = filename 257 258 259 def check_filename(self, filename, expected=None): 260 if expected is None: 261 expected = [self.expected_filename] 262 self.assertIn(os.path.split(filename)[1], expected) 263 264 265 def _0_test_find_caller(self): 266 finder = logging_manager._logging_manager_aware_logger__find_caller 267 filename, lineno, caller_name = finder(logging_manager.logger) 268 self.check_filename(filename) 269 self.assertEquals('test_find_caller', caller_name) 270 271 272 def _1_test_find_caller(self): 273 self._0_test_find_caller() 274 275 276 def test_find_caller(self): 277 self._1_test_find_caller() 278 279 280 def _0_test_non_reported_find_caller(self): 281 finder = logging_manager._logging_manager_aware_logger__find_caller 282 filename, lineno, caller_name = finder(logging_manager.logger) 283 # Python 2.4 unittest implementation will call the unittest method in 284 # file 'unittest.py', and Python >= 2.6 does the same in 'case.py' 285 self.check_filename(filename, expected=['unittest.py', 'case.py']) 286 287 288 def _1_test_non_reported_find_caller(self): 289 self._0_test_non_reported_find_caller() 290 291 292 @logging_manager.do_not_report_as_logging_caller 293 def test_non_reported_find_caller(self): 294 self._1_test_non_reported_find_caller() 295 296 297 298 if __name__ == '__main__': 299 unittest.main() 300