Home | History | Annotate | Download | only in test
      1 import os
      2 import pprint
      3 import unittest
      4 import tempfile
      5 import _hotshot
      6 import gc
      7 
      8 from test import test_support
      9 
     10 # Silence Py3k warning
     11 hotshot = test_support.import_module('hotshot', deprecated=True)
     12 from hotshot.log import ENTER, EXIT, LINE
     13 from hotshot import stats
     14 
     15 
     16 def shortfilename(fn):
     17     # We use a really shortened filename since an exact match is made,
     18     # and the source may be either a Python source file or a
     19     # pre-compiled bytecode file.
     20     if fn:
     21         return os.path.splitext(os.path.basename(fn))[0]
     22     else:
     23         return fn
     24 
     25 
     26 class UnlinkingLogReader(hotshot.log.LogReader):
     27     """Extend the LogReader so the log file is unlinked when we're
     28     done with it."""
     29 
     30     def __init__(self, logfn):
     31         self.__logfn = logfn
     32         hotshot.log.LogReader.__init__(self, logfn)
     33 
     34     def next(self, index=None):
     35         try:
     36             return hotshot.log.LogReader.next(self)
     37         except StopIteration:
     38             self.close()
     39             os.unlink(self.__logfn)
     40             raise
     41 
     42 
     43 class HotShotTestCase(unittest.TestCase):
     44     def new_profiler(self, lineevents=0, linetimings=1):
     45         self.logfn = test_support.TESTFN
     46         return hotshot.Profile(self.logfn, lineevents, linetimings)
     47 
     48     def get_logreader(self):
     49         return UnlinkingLogReader(self.logfn)
     50 
     51     def get_events_wotime(self):
     52         L = []
     53         for event in self.get_logreader():
     54             what, (filename, lineno, funcname), tdelta = event
     55             L.append((what, (shortfilename(filename), lineno, funcname)))
     56         return L
     57 
     58     def check_events(self, expected):
     59         events = self.get_events_wotime()
     60         if events != expected:
     61             self.fail(
     62                 "events did not match expectation; got:\n%s\nexpected:\n%s"
     63                 % (pprint.pformat(events), pprint.pformat(expected)))
     64 
     65     def run_test(self, callable, events, profiler=None):
     66         if profiler is None:
     67             profiler = self.new_profiler()
     68         self.assertTrue(not profiler._prof.closed)
     69         profiler.runcall(callable)
     70         self.assertTrue(not profiler._prof.closed)
     71         profiler.close()
     72         self.assertTrue(profiler._prof.closed)
     73         self.check_events(events)
     74 
     75     def test_addinfo(self):
     76         def f(p):
     77             p.addinfo("test-key", "test-value")
     78         profiler = self.new_profiler()
     79         profiler.runcall(f, profiler)
     80         profiler.close()
     81         log = self.get_logreader()
     82         info = log._info
     83         list(log)
     84         self.assertTrue(info["test-key"] == ["test-value"])
     85 
     86     def test_line_numbers(self):
     87         def f():
     88             y = 2
     89             x = 1
     90         def g():
     91             f()
     92         f_lineno = f.func_code.co_firstlineno
     93         g_lineno = g.func_code.co_firstlineno
     94         events = [(ENTER, ("test_hotshot", g_lineno, "g")),
     95                   (LINE,  ("test_hotshot", g_lineno+1, "g")),
     96                   (ENTER, ("test_hotshot", f_lineno, "f")),
     97                   (LINE,  ("test_hotshot", f_lineno+1, "f")),
     98                   (LINE,  ("test_hotshot", f_lineno+2, "f")),
     99                   (EXIT,  ("test_hotshot", f_lineno, "f")),
    100                   (EXIT,  ("test_hotshot", g_lineno, "g")),
    101                   ]
    102         self.run_test(g, events, self.new_profiler(lineevents=1))
    103 
    104     def test_start_stop(self):
    105         # Make sure we don't return NULL in the start() and stop()
    106         # methods when there isn't an error.  Bug in 2.2 noted by
    107         # Anthony Baxter.
    108         profiler = self.new_profiler()
    109         profiler.start()
    110         profiler.stop()
    111         profiler.close()
    112         os.unlink(self.logfn)
    113 
    114     def test_bad_sys_path(self):
    115         import sys
    116         import os
    117         orig_path = sys.path
    118         coverage = hotshot._hotshot.coverage
    119         try:
    120             # verify we require a list for sys.path
    121             sys.path = 'abc'
    122             self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
    123             # verify that we require sys.path exists
    124             del sys.path
    125             self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
    126         finally:
    127             sys.path = orig_path
    128             if os.path.exists(test_support.TESTFN):
    129                 os.remove(test_support.TESTFN)
    130 
    131     def test_logreader_eof_error(self):
    132         emptyfile = tempfile.NamedTemporaryFile()
    133         try:
    134             self.assertRaises((IOError, EOFError), _hotshot.logreader,
    135                               emptyfile.name)
    136         finally:
    137             emptyfile.close()
    138         gc.collect()
    139 
    140     def test_load_stats(self):
    141         def start(prof):
    142             prof.start()
    143         # Make sure stats can be loaded when start and stop of profiler
    144         # are not executed in the same stack frame.
    145         profiler = self.new_profiler()
    146         start(profiler)
    147         profiler.stop()
    148         profiler.close()
    149         stats.load(self.logfn)
    150         os.unlink(self.logfn)
    151 
    152     def test_large_info(self):
    153         p = self.new_profiler()
    154         self.assertRaises(ValueError, p.addinfo, "A", "A" * 0xfceb)
    155 
    156 
    157 def test_main():
    158     test_support.run_unittest(HotShotTestCase)
    159 
    160 
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    test_main()
    163