Home | History | Annotate | Download | only in test
      1 import gc
      2 import pprint
      3 import sys
      4 import unittest
      5 
      6 
      7 class TestGetProfile(unittest.TestCase):
      8     def setUp(self):
      9         sys.setprofile(None)
     10 
     11     def tearDown(self):
     12         sys.setprofile(None)
     13 
     14     def test_empty(self):
     15         self.assertIsNone(sys.getprofile())
     16 
     17     def test_setget(self):
     18         def fn(*args):
     19             pass
     20 
     21         sys.setprofile(fn)
     22         self.assertIs(sys.getprofile(), fn)
     23 
     24 class HookWatcher:
     25     def __init__(self):
     26         self.frames = []
     27         self.events = []
     28 
     29     def callback(self, frame, event, arg):
     30         if (event == "call"
     31             or event == "return"
     32             or event == "exception"):
     33             self.add_event(event, frame)
     34 
     35     def add_event(self, event, frame=None):
     36         """Add an event to the log."""
     37         if frame is None:
     38             frame = sys._getframe(1)
     39 
     40         try:
     41             frameno = self.frames.index(frame)
     42         except ValueError:
     43             frameno = len(self.frames)
     44             self.frames.append(frame)
     45 
     46         self.events.append((frameno, event, ident(frame)))
     47 
     48     def get_events(self):
     49         """Remove calls to add_event()."""
     50         disallowed = [ident(self.add_event.__func__), ident(ident)]
     51         self.frames = None
     52 
     53         return [item for item in self.events if item[2] not in disallowed]
     54 
     55 
     56 class ProfileSimulator(HookWatcher):
     57     def __init__(self, testcase):
     58         self.testcase = testcase
     59         self.stack = []
     60         HookWatcher.__init__(self)
     61 
     62     def callback(self, frame, event, arg):
     63         # Callback registered with sys.setprofile()/sys.settrace()
     64         self.dispatch[event](self, frame)
     65 
     66     def trace_call(self, frame):
     67         self.add_event('call', frame)
     68         self.stack.append(frame)
     69 
     70     def trace_return(self, frame):
     71         self.add_event('return', frame)
     72         self.stack.pop()
     73 
     74     def trace_exception(self, frame):
     75         self.testcase.fail(
     76             "the profiler should never receive exception events")
     77 
     78     def trace_pass(self, frame):
     79         pass
     80 
     81     dispatch = {
     82         'call': trace_call,
     83         'exception': trace_exception,
     84         'return': trace_return,
     85         'c_call': trace_pass,
     86         'c_return': trace_pass,
     87         'c_exception': trace_pass,
     88         }
     89 
     90 
     91 class TestCaseBase(unittest.TestCase):
     92     def check_events(self, callable, expected):
     93         events = capture_events(callable, self.new_watcher())
     94         if events != expected:
     95             self.fail("Expected events:\n%s\nReceived events:\n%s"
     96                       % (pprint.pformat(expected), pprint.pformat(events)))
     97 
     98 
     99 class ProfileHookTestCase(TestCaseBase):
    100     def new_watcher(self):
    101         return HookWatcher()
    102 
    103     def test_simple(self):
    104         def f(p):
    105             pass
    106         f_ident = ident(f)
    107         self.check_events(f, [(1, 'call', f_ident),
    108                               (1, 'return', f_ident),
    109                               ])
    110 
    111     def test_exception(self):
    112         def f(p):
    113             1/0
    114         f_ident = ident(f)
    115         self.check_events(f, [(1, 'call', f_ident),
    116                               (1, 'return', f_ident),
    117                               ])
    118 
    119     def test_caught_exception(self):
    120         def f(p):
    121             try: 1/0
    122             except: pass
    123         f_ident = ident(f)
    124         self.check_events(f, [(1, 'call', f_ident),
    125                               (1, 'return', f_ident),
    126                               ])
    127 
    128     def test_caught_nested_exception(self):
    129         def f(p):
    130             try: 1/0
    131             except: pass
    132         f_ident = ident(f)
    133         self.check_events(f, [(1, 'call', f_ident),
    134                               (1, 'return', f_ident),
    135                               ])
    136 
    137     def test_nested_exception(self):
    138         def f(p):
    139             1/0
    140         f_ident = ident(f)
    141         self.check_events(f, [(1, 'call', f_ident),
    142                               # This isn't what I expected:
    143                               # (0, 'exception', protect_ident),
    144                               # I expected this again:
    145                               (1, 'return', f_ident),
    146                               ])
    147 
    148     def test_exception_in_except_clause(self):
    149         def f(p):
    150             1/0
    151         def g(p):
    152             try:
    153                 f(p)
    154             except:
    155                 try: f(p)
    156                 except: pass
    157         f_ident = ident(f)
    158         g_ident = ident(g)
    159         self.check_events(g, [(1, 'call', g_ident),
    160                               (2, 'call', f_ident),
    161                               (2, 'return', f_ident),
    162                               (3, 'call', f_ident),
    163                               (3, 'return', f_ident),
    164                               (1, 'return', g_ident),
    165                               ])
    166 
    167     def test_exception_propagation(self):
    168         def f(p):
    169             1/0
    170         def g(p):
    171             try: f(p)
    172             finally: p.add_event("falling through")
    173         f_ident = ident(f)
    174         g_ident = ident(g)
    175         self.check_events(g, [(1, 'call', g_ident),
    176                               (2, 'call', f_ident),
    177                               (2, 'return', f_ident),
    178                               (1, 'falling through', g_ident),
    179                               (1, 'return', g_ident),
    180                               ])
    181 
    182     def test_raise_twice(self):
    183         def f(p):
    184             try: 1/0
    185             except: 1/0
    186         f_ident = ident(f)
    187         self.check_events(f, [(1, 'call', f_ident),
    188                               (1, 'return', f_ident),
    189                               ])
    190 
    191     def test_raise_reraise(self):
    192         def f(p):
    193             try: 1/0
    194             except: raise
    195         f_ident = ident(f)
    196         self.check_events(f, [(1, 'call', f_ident),
    197                               (1, 'return', f_ident),
    198                               ])
    199 
    200     def test_raise(self):
    201         def f(p):
    202             raise Exception()
    203         f_ident = ident(f)
    204         self.check_events(f, [(1, 'call', f_ident),
    205                               (1, 'return', f_ident),
    206                               ])
    207 
    208     def test_distant_exception(self):
    209         def f():
    210             1/0
    211         def g():
    212             f()
    213         def h():
    214             g()
    215         def i():
    216             h()
    217         def j(p):
    218             i()
    219         f_ident = ident(f)
    220         g_ident = ident(g)
    221         h_ident = ident(h)
    222         i_ident = ident(i)
    223         j_ident = ident(j)
    224         self.check_events(j, [(1, 'call', j_ident),
    225                               (2, 'call', i_ident),
    226                               (3, 'call', h_ident),
    227                               (4, 'call', g_ident),
    228                               (5, 'call', f_ident),
    229                               (5, 'return', f_ident),
    230                               (4, 'return', g_ident),
    231                               (3, 'return', h_ident),
    232                               (2, 'return', i_ident),
    233                               (1, 'return', j_ident),
    234                               ])
    235 
    236     def test_generator(self):
    237         def f():
    238             for i in range(2):
    239                 yield i
    240         def g(p):
    241             for i in f():
    242                 pass
    243         f_ident = ident(f)
    244         g_ident = ident(g)
    245         self.check_events(g, [(1, 'call', g_ident),
    246                               # call the iterator twice to generate values
    247                               (2, 'call', f_ident),
    248                               (2, 'return', f_ident),
    249                               (2, 'call', f_ident),
    250                               (2, 'return', f_ident),
    251                               # once more; returns end-of-iteration with
    252                               # actually raising an exception
    253                               (2, 'call', f_ident),
    254                               (2, 'return', f_ident),
    255                               (1, 'return', g_ident),
    256                               ])
    257 
    258     def test_stop_iteration(self):
    259         def f():
    260             for i in range(2):
    261                 yield i
    262         def g(p):
    263             for i in f():
    264                 pass
    265         f_ident = ident(f)
    266         g_ident = ident(g)
    267         self.check_events(g, [(1, 'call', g_ident),
    268                               # call the iterator twice to generate values
    269                               (2, 'call', f_ident),
    270                               (2, 'return', f_ident),
    271                               (2, 'call', f_ident),
    272                               (2, 'return', f_ident),
    273                               # once more to hit the raise:
    274                               (2, 'call', f_ident),
    275                               (2, 'return', f_ident),
    276                               (1, 'return', g_ident),
    277                               ])
    278 
    279 
    280 class ProfileSimulatorTestCase(TestCaseBase):
    281     def new_watcher(self):
    282         return ProfileSimulator(self)
    283 
    284     def test_simple(self):
    285         def f(p):
    286             pass
    287         f_ident = ident(f)
    288         self.check_events(f, [(1, 'call', f_ident),
    289                               (1, 'return', f_ident),
    290                               ])
    291 
    292     def test_basic_exception(self):
    293         def f(p):
    294             1/0
    295         f_ident = ident(f)
    296         self.check_events(f, [(1, 'call', f_ident),
    297                               (1, 'return', f_ident),
    298                               ])
    299 
    300     def test_caught_exception(self):
    301         def f(p):
    302             try: 1/0
    303             except: pass
    304         f_ident = ident(f)
    305         self.check_events(f, [(1, 'call', f_ident),
    306                               (1, 'return', f_ident),
    307                               ])
    308 
    309     def test_distant_exception(self):
    310         def f():
    311             1/0
    312         def g():
    313             f()
    314         def h():
    315             g()
    316         def i():
    317             h()
    318         def j(p):
    319             i()
    320         f_ident = ident(f)
    321         g_ident = ident(g)
    322         h_ident = ident(h)
    323         i_ident = ident(i)
    324         j_ident = ident(j)
    325         self.check_events(j, [(1, 'call', j_ident),
    326                               (2, 'call', i_ident),
    327                               (3, 'call', h_ident),
    328                               (4, 'call', g_ident),
    329                               (5, 'call', f_ident),
    330                               (5, 'return', f_ident),
    331                               (4, 'return', g_ident),
    332                               (3, 'return', h_ident),
    333                               (2, 'return', i_ident),
    334                               (1, 'return', j_ident),
    335                               ])
    336 
    337 
    338 def ident(function):
    339     if hasattr(function, "f_code"):
    340         code = function.f_code
    341     else:
    342         code = function.__code__
    343     return code.co_firstlineno, code.co_name
    344 
    345 
    346 def protect(f, p):
    347     try: f(p)
    348     except: pass
    349 
    350 protect_ident = ident(protect)
    351 
    352 
    353 def capture_events(callable, p=None):
    354     if p is None:
    355         p = HookWatcher()
    356     # Disable the garbage collector. This prevents __del__s from showing up in
    357     # traces.
    358     old_gc = gc.isenabled()
    359     gc.disable()
    360     try:
    361         sys.setprofile(p.callback)
    362         protect(callable, p)
    363         sys.setprofile(None)
    364     finally:
    365         if old_gc:
    366             gc.enable()
    367     return p.get_events()[1:-1]
    368 
    369 
    370 def show_events(callable):
    371     import pprint
    372     pprint.pprint(capture_events(callable))
    373 
    374 
    375 if __name__ == "__main__":
    376     unittest.main()
    377