Home | History | Annotate | Download | only in test
      1 import contextlib
      2 import os
      3 import sys
      4 import tracemalloc
      5 import unittest
      6 from unittest.mock import patch
      7 from test.support.script_helper import (assert_python_ok, assert_python_failure,
      8                                         interpreter_requires_environment)
      9 from test import support
     10 try:
     11     import threading
     12 except ImportError:
     13     threading = None
     14 try:
     15     import _testcapi
     16 except ImportError:
     17     _testcapi = None
     18 
     19 
     20 EMPTY_STRING_SIZE = sys.getsizeof(b'')
     21 
     22 
     23 def get_frames(nframe, lineno_delta):
     24     frames = []
     25     frame = sys._getframe(1)
     26     for index in range(nframe):
     27         code = frame.f_code
     28         lineno = frame.f_lineno + lineno_delta
     29         frames.append((code.co_filename, lineno))
     30         lineno_delta = 0
     31         frame = frame.f_back
     32         if frame is None:
     33             break
     34     return tuple(frames)
     35 
     36 def allocate_bytes(size):
     37     nframe = tracemalloc.get_traceback_limit()
     38     bytes_len = (size - EMPTY_STRING_SIZE)
     39     frames = get_frames(nframe, 1)
     40     data = b'x' * bytes_len
     41     return data, tracemalloc.Traceback(frames)
     42 
     43 def create_snapshots():
     44     traceback_limit = 2
     45 
     46     # _tracemalloc._get_traces() returns a list of (domain, size,
     47     # traceback_frames) tuples. traceback_frames is a tuple of (filename,
     48     # line_number) tuples.
     49     raw_traces = [
     50         (0, 10, (('a.py', 2), ('b.py', 4))),
     51         (0, 10, (('a.py', 2), ('b.py', 4))),
     52         (0, 10, (('a.py', 2), ('b.py', 4))),
     53 
     54         (1, 2, (('a.py', 5), ('b.py', 4))),
     55 
     56         (2, 66, (('b.py', 1),)),
     57 
     58         (3, 7, (('<unknown>', 0),)),
     59     ]
     60     snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
     61 
     62     raw_traces2 = [
     63         (0, 10, (('a.py', 2), ('b.py', 4))),
     64         (0, 10, (('a.py', 2), ('b.py', 4))),
     65         (0, 10, (('a.py', 2), ('b.py', 4))),
     66 
     67         (2, 2, (('a.py', 5), ('b.py', 4))),
     68         (2, 5000, (('a.py', 5), ('b.py', 4))),
     69 
     70         (4, 400, (('c.py', 578),)),
     71     ]
     72     snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
     73 
     74     return (snapshot, snapshot2)
     75 
     76 def frame(filename, lineno):
     77     return tracemalloc._Frame((filename, lineno))
     78 
     79 def traceback(*frames):
     80     return tracemalloc.Traceback(frames)
     81 
     82 def traceback_lineno(filename, lineno):
     83     return traceback((filename, lineno))
     84 
     85 def traceback_filename(filename):
     86     return traceback_lineno(filename, 0)
     87 
     88 
     89 class TestTracemallocEnabled(unittest.TestCase):
     90     def setUp(self):
     91         if tracemalloc.is_tracing():
     92             self.skipTest("tracemalloc must be stopped before the test")
     93 
     94         tracemalloc.start(1)
     95 
     96     def tearDown(self):
     97         tracemalloc.stop()
     98 
     99     def test_get_tracemalloc_memory(self):
    100         data = [allocate_bytes(123) for count in range(1000)]
    101         size = tracemalloc.get_tracemalloc_memory()
    102         self.assertGreaterEqual(size, 0)
    103 
    104         tracemalloc.clear_traces()
    105         size2 = tracemalloc.get_tracemalloc_memory()
    106         self.assertGreaterEqual(size2, 0)
    107         self.assertLessEqual(size2, size)
    108 
    109     def test_get_object_traceback(self):
    110         tracemalloc.clear_traces()
    111         obj_size = 12345
    112         obj, obj_traceback = allocate_bytes(obj_size)
    113         traceback = tracemalloc.get_object_traceback(obj)
    114         self.assertEqual(traceback, obj_traceback)
    115 
    116     def test_set_traceback_limit(self):
    117         obj_size = 10
    118 
    119         tracemalloc.stop()
    120         self.assertRaises(ValueError, tracemalloc.start, -1)
    121 
    122         tracemalloc.stop()
    123         tracemalloc.start(10)
    124         obj2, obj2_traceback = allocate_bytes(obj_size)
    125         traceback = tracemalloc.get_object_traceback(obj2)
    126         self.assertEqual(len(traceback), 10)
    127         self.assertEqual(traceback, obj2_traceback)
    128 
    129         tracemalloc.stop()
    130         tracemalloc.start(1)
    131         obj, obj_traceback = allocate_bytes(obj_size)
    132         traceback = tracemalloc.get_object_traceback(obj)
    133         self.assertEqual(len(traceback), 1)
    134         self.assertEqual(traceback, obj_traceback)
    135 
    136     def find_trace(self, traces, traceback):
    137         for trace in traces:
    138             if trace[2] == traceback._frames:
    139                 return trace
    140 
    141         self.fail("trace not found")
    142 
    143     def test_get_traces(self):
    144         tracemalloc.clear_traces()
    145         obj_size = 12345
    146         obj, obj_traceback = allocate_bytes(obj_size)
    147 
    148         traces = tracemalloc._get_traces()
    149         trace = self.find_trace(traces, obj_traceback)
    150 
    151         self.assertIsInstance(trace, tuple)
    152         domain, size, traceback = trace
    153         self.assertEqual(size, obj_size)
    154         self.assertEqual(traceback, obj_traceback._frames)
    155 
    156         tracemalloc.stop()
    157         self.assertEqual(tracemalloc._get_traces(), [])
    158 
    159     def test_get_traces_intern_traceback(self):
    160         # dummy wrappers to get more useful and identical frames in the traceback
    161         def allocate_bytes2(size):
    162             return allocate_bytes(size)
    163         def allocate_bytes3(size):
    164             return allocate_bytes2(size)
    165         def allocate_bytes4(size):
    166             return allocate_bytes3(size)
    167 
    168         # Ensure that two identical tracebacks are not duplicated
    169         tracemalloc.stop()
    170         tracemalloc.start(4)
    171         obj_size = 123
    172         obj1, obj1_traceback = allocate_bytes4(obj_size)
    173         obj2, obj2_traceback = allocate_bytes4(obj_size)
    174 
    175         traces = tracemalloc._get_traces()
    176 
    177         trace1 = self.find_trace(traces, obj1_traceback)
    178         trace2 = self.find_trace(traces, obj2_traceback)
    179         domain1, size1, traceback1 = trace1
    180         domain2, size2, traceback2 = trace2
    181         self.assertIs(traceback2, traceback1)
    182 
    183     def test_get_traced_memory(self):
    184         # Python allocates some internals objects, so the test must tolerate
    185         # a small difference between the expected size and the real usage
    186         max_error = 2048
    187 
    188         # allocate one object
    189         obj_size = 1024 * 1024
    190         tracemalloc.clear_traces()
    191         obj, obj_traceback = allocate_bytes(obj_size)
    192         size, peak_size = tracemalloc.get_traced_memory()
    193         self.assertGreaterEqual(size, obj_size)
    194         self.assertGreaterEqual(peak_size, size)
    195 
    196         self.assertLessEqual(size - obj_size, max_error)
    197         self.assertLessEqual(peak_size - size, max_error)
    198 
    199         # destroy the object
    200         obj = None
    201         size2, peak_size2 = tracemalloc.get_traced_memory()
    202         self.assertLess(size2, size)
    203         self.assertGreaterEqual(size - size2, obj_size - max_error)
    204         self.assertGreaterEqual(peak_size2, peak_size)
    205 
    206         # clear_traces() must reset traced memory counters
    207         tracemalloc.clear_traces()
    208         self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
    209 
    210         # allocate another object
    211         obj, obj_traceback = allocate_bytes(obj_size)
    212         size, peak_size = tracemalloc.get_traced_memory()
    213         self.assertGreaterEqual(size, obj_size)
    214 
    215         # stop() also resets traced memory counters
    216         tracemalloc.stop()
    217         self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
    218 
    219     def test_clear_traces(self):
    220         obj, obj_traceback = allocate_bytes(123)
    221         traceback = tracemalloc.get_object_traceback(obj)
    222         self.assertIsNotNone(traceback)
    223 
    224         tracemalloc.clear_traces()
    225         traceback2 = tracemalloc.get_object_traceback(obj)
    226         self.assertIsNone(traceback2)
    227 
    228     def test_is_tracing(self):
    229         tracemalloc.stop()
    230         self.assertFalse(tracemalloc.is_tracing())
    231 
    232         tracemalloc.start()
    233         self.assertTrue(tracemalloc.is_tracing())
    234 
    235     def test_snapshot(self):
    236         obj, source = allocate_bytes(123)
    237 
    238         # take a snapshot
    239         snapshot = tracemalloc.take_snapshot()
    240 
    241         # write on disk
    242         snapshot.dump(support.TESTFN)
    243         self.addCleanup(support.unlink, support.TESTFN)
    244 
    245         # load from disk
    246         snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
    247         self.assertEqual(snapshot2.traces, snapshot.traces)
    248 
    249         # tracemalloc must be tracing memory allocations to take a snapshot
    250         tracemalloc.stop()
    251         with self.assertRaises(RuntimeError) as cm:
    252             tracemalloc.take_snapshot()
    253         self.assertEqual(str(cm.exception),
    254                          "the tracemalloc module must be tracing memory "
    255                          "allocations to take a snapshot")
    256 
    257     def test_snapshot_save_attr(self):
    258         # take a snapshot with a new attribute
    259         snapshot = tracemalloc.take_snapshot()
    260         snapshot.test_attr = "new"
    261         snapshot.dump(support.TESTFN)
    262         self.addCleanup(support.unlink, support.TESTFN)
    263 
    264         # load() should recreate the attribute
    265         snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
    266         self.assertEqual(snapshot2.test_attr, "new")
    267 
    268     def fork_child(self):
    269         if not tracemalloc.is_tracing():
    270             return 2
    271 
    272         obj_size = 12345
    273         obj, obj_traceback = allocate_bytes(obj_size)
    274         traceback = tracemalloc.get_object_traceback(obj)
    275         if traceback is None:
    276             return 3
    277 
    278         # everything is fine
    279         return 0
    280 
    281     @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
    282     def test_fork(self):
    283         # check that tracemalloc is still working after fork
    284         pid = os.fork()
    285         if not pid:
    286             # child
    287             exitcode = 1
    288             try:
    289                 exitcode = self.fork_child()
    290             finally:
    291                 os._exit(exitcode)
    292         else:
    293             pid2, status = os.waitpid(pid, 0)
    294             self.assertTrue(os.WIFEXITED(status))
    295             exitcode = os.WEXITSTATUS(status)
    296             self.assertEqual(exitcode, 0)
    297 
    298 
    299 class TestSnapshot(unittest.TestCase):
    300     maxDiff = 4000
    301 
    302     def test_create_snapshot(self):
    303         raw_traces = [(0, 5, (('a.py', 2),))]
    304 
    305         with contextlib.ExitStack() as stack:
    306             stack.enter_context(patch.object(tracemalloc, 'is_tracing',
    307                                              return_value=True))
    308             stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
    309                                              return_value=5))
    310             stack.enter_context(patch.object(tracemalloc, '_get_traces',
    311                                              return_value=raw_traces))
    312 
    313             snapshot = tracemalloc.take_snapshot()
    314             self.assertEqual(snapshot.traceback_limit, 5)
    315             self.assertEqual(len(snapshot.traces), 1)
    316             trace = snapshot.traces[0]
    317             self.assertEqual(trace.size, 5)
    318             self.assertEqual(len(trace.traceback), 1)
    319             self.assertEqual(trace.traceback[0].filename, 'a.py')
    320             self.assertEqual(trace.traceback[0].lineno, 2)
    321 
    322     def test_filter_traces(self):
    323         snapshot, snapshot2 = create_snapshots()
    324         filter1 = tracemalloc.Filter(False, "b.py")
    325         filter2 = tracemalloc.Filter(True, "a.py", 2)
    326         filter3 = tracemalloc.Filter(True, "a.py", 5)
    327 
    328         original_traces = list(snapshot.traces._traces)
    329 
    330         # exclude b.py
    331         snapshot3 = snapshot.filter_traces((filter1,))
    332         self.assertEqual(snapshot3.traces._traces, [
    333             (0, 10, (('a.py', 2), ('b.py', 4))),
    334             (0, 10, (('a.py', 2), ('b.py', 4))),
    335             (0, 10, (('a.py', 2), ('b.py', 4))),
    336             (1, 2, (('a.py', 5), ('b.py', 4))),
    337             (3, 7, (('<unknown>', 0),)),
    338         ])
    339 
    340         # filter_traces() must not touch the original snapshot
    341         self.assertEqual(snapshot.traces._traces, original_traces)
    342 
    343         # only include two lines of a.py
    344         snapshot4 = snapshot3.filter_traces((filter2, filter3))
    345         self.assertEqual(snapshot4.traces._traces, [
    346             (0, 10, (('a.py', 2), ('b.py', 4))),
    347             (0, 10, (('a.py', 2), ('b.py', 4))),
    348             (0, 10, (('a.py', 2), ('b.py', 4))),
    349             (1, 2, (('a.py', 5), ('b.py', 4))),
    350         ])
    351 
    352         # No filter: just duplicate the snapshot
    353         snapshot5 = snapshot.filter_traces(())
    354         self.assertIsNot(snapshot5, snapshot)
    355         self.assertIsNot(snapshot5.traces, snapshot.traces)
    356         self.assertEqual(snapshot5.traces, snapshot.traces)
    357 
    358         self.assertRaises(TypeError, snapshot.filter_traces, filter1)
    359 
    360     def test_filter_traces_domain(self):
    361         snapshot, snapshot2 = create_snapshots()
    362         filter1 = tracemalloc.Filter(False, "a.py", domain=1)
    363         filter2 = tracemalloc.Filter(True, "a.py", domain=1)
    364 
    365         original_traces = list(snapshot.traces._traces)
    366 
    367         # exclude a.py of domain 1
    368         snapshot3 = snapshot.filter_traces((filter1,))
    369         self.assertEqual(snapshot3.traces._traces, [
    370             (0, 10, (('a.py', 2), ('b.py', 4))),
    371             (0, 10, (('a.py', 2), ('b.py', 4))),
    372             (0, 10, (('a.py', 2), ('b.py', 4))),
    373             (2, 66, (('b.py', 1),)),
    374             (3, 7, (('<unknown>', 0),)),
    375         ])
    376 
    377         # include domain 1
    378         snapshot3 = snapshot.filter_traces((filter1,))
    379         self.assertEqual(snapshot3.traces._traces, [
    380             (0, 10, (('a.py', 2), ('b.py', 4))),
    381             (0, 10, (('a.py', 2), ('b.py', 4))),
    382             (0, 10, (('a.py', 2), ('b.py', 4))),
    383             (2, 66, (('b.py', 1),)),
    384             (3, 7, (('<unknown>', 0),)),
    385         ])
    386 
    387     def test_filter_traces_domain_filter(self):
    388         snapshot, snapshot2 = create_snapshots()
    389         filter1 = tracemalloc.DomainFilter(False, domain=3)
    390         filter2 = tracemalloc.DomainFilter(True, domain=3)
    391 
    392         # exclude domain 2
    393         snapshot3 = snapshot.filter_traces((filter1,))
    394         self.assertEqual(snapshot3.traces._traces, [
    395             (0, 10, (('a.py', 2), ('b.py', 4))),
    396             (0, 10, (('a.py', 2), ('b.py', 4))),
    397             (0, 10, (('a.py', 2), ('b.py', 4))),
    398             (1, 2, (('a.py', 5), ('b.py', 4))),
    399             (2, 66, (('b.py', 1),)),
    400         ])
    401 
    402         # include domain 2
    403         snapshot3 = snapshot.filter_traces((filter2,))
    404         self.assertEqual(snapshot3.traces._traces, [
    405             (3, 7, (('<unknown>', 0),)),
    406         ])
    407 
    408     def test_snapshot_group_by_line(self):
    409         snapshot, snapshot2 = create_snapshots()
    410         tb_0 = traceback_lineno('<unknown>', 0)
    411         tb_a_2 = traceback_lineno('a.py', 2)
    412         tb_a_5 = traceback_lineno('a.py', 5)
    413         tb_b_1 = traceback_lineno('b.py', 1)
    414         tb_c_578 = traceback_lineno('c.py', 578)
    415 
    416         # stats per file and line
    417         stats1 = snapshot.statistics('lineno')
    418         self.assertEqual(stats1, [
    419             tracemalloc.Statistic(tb_b_1, 66, 1),
    420             tracemalloc.Statistic(tb_a_2, 30, 3),
    421             tracemalloc.Statistic(tb_0, 7, 1),
    422             tracemalloc.Statistic(tb_a_5, 2, 1),
    423         ])
    424 
    425         # stats per file and line (2)
    426         stats2 = snapshot2.statistics('lineno')
    427         self.assertEqual(stats2, [
    428             tracemalloc.Statistic(tb_a_5, 5002, 2),
    429             tracemalloc.Statistic(tb_c_578, 400, 1),
    430             tracemalloc.Statistic(tb_a_2, 30, 3),
    431         ])
    432 
    433         # stats diff per file and line
    434         statistics = snapshot2.compare_to(snapshot, 'lineno')
    435         self.assertEqual(statistics, [
    436             tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
    437             tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
    438             tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
    439             tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
    440             tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
    441         ])
    442 
    443     def test_snapshot_group_by_file(self):
    444         snapshot, snapshot2 = create_snapshots()
    445         tb_0 = traceback_filename('<unknown>')
    446         tb_a = traceback_filename('a.py')
    447         tb_b = traceback_filename('b.py')
    448         tb_c = traceback_filename('c.py')
    449 
    450         # stats per file
    451         stats1 = snapshot.statistics('filename')
    452         self.assertEqual(stats1, [
    453             tracemalloc.Statistic(tb_b, 66, 1),
    454             tracemalloc.Statistic(tb_a, 32, 4),
    455             tracemalloc.Statistic(tb_0, 7, 1),
    456         ])
    457 
    458         # stats per file (2)
    459         stats2 = snapshot2.statistics('filename')
    460         self.assertEqual(stats2, [
    461             tracemalloc.Statistic(tb_a, 5032, 5),
    462             tracemalloc.Statistic(tb_c, 400, 1),
    463         ])
    464 
    465         # stats diff per file
    466         diff = snapshot2.compare_to(snapshot, 'filename')
    467         self.assertEqual(diff, [
    468             tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
    469             tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
    470             tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
    471             tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
    472         ])
    473 
    474     def test_snapshot_group_by_traceback(self):
    475         snapshot, snapshot2 = create_snapshots()
    476 
    477         # stats per file
    478         tb1 = traceback(('a.py', 2), ('b.py', 4))
    479         tb2 = traceback(('a.py', 5), ('b.py', 4))
    480         tb3 = traceback(('b.py', 1))
    481         tb4 = traceback(('<unknown>', 0))
    482         stats1 = snapshot.statistics('traceback')
    483         self.assertEqual(stats1, [
    484             tracemalloc.Statistic(tb3, 66, 1),
    485             tracemalloc.Statistic(tb1, 30, 3),
    486             tracemalloc.Statistic(tb4, 7, 1),
    487             tracemalloc.Statistic(tb2, 2, 1),
    488         ])
    489 
    490         # stats per file (2)
    491         tb5 = traceback(('c.py', 578))
    492         stats2 = snapshot2.statistics('traceback')
    493         self.assertEqual(stats2, [
    494             tracemalloc.Statistic(tb2, 5002, 2),
    495             tracemalloc.Statistic(tb5, 400, 1),
    496             tracemalloc.Statistic(tb1, 30, 3),
    497         ])
    498 
    499         # stats diff per file
    500         diff = snapshot2.compare_to(snapshot, 'traceback')
    501         self.assertEqual(diff, [
    502             tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
    503             tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
    504             tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
    505             tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
    506             tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
    507         ])
    508 
    509         self.assertRaises(ValueError,
    510                           snapshot.statistics, 'traceback', cumulative=True)
    511 
    512     def test_snapshot_group_by_cumulative(self):
    513         snapshot, snapshot2 = create_snapshots()
    514         tb_0 = traceback_filename('<unknown>')
    515         tb_a = traceback_filename('a.py')
    516         tb_b = traceback_filename('b.py')
    517         tb_a_2 = traceback_lineno('a.py', 2)
    518         tb_a_5 = traceback_lineno('a.py', 5)
    519         tb_b_1 = traceback_lineno('b.py', 1)
    520         tb_b_4 = traceback_lineno('b.py', 4)
    521 
    522         # per file
    523         stats = snapshot.statistics('filename', True)
    524         self.assertEqual(stats, [
    525             tracemalloc.Statistic(tb_b, 98, 5),
    526             tracemalloc.Statistic(tb_a, 32, 4),
    527             tracemalloc.Statistic(tb_0, 7, 1),
    528         ])
    529 
    530         # per line
    531         stats = snapshot.statistics('lineno', True)
    532         self.assertEqual(stats, [
    533             tracemalloc.Statistic(tb_b_1, 66, 1),
    534             tracemalloc.Statistic(tb_b_4, 32, 4),
    535             tracemalloc.Statistic(tb_a_2, 30, 3),
    536             tracemalloc.Statistic(tb_0, 7, 1),
    537             tracemalloc.Statistic(tb_a_5, 2, 1),
    538         ])
    539 
    540     def test_trace_format(self):
    541         snapshot, snapshot2 = create_snapshots()
    542         trace = snapshot.traces[0]
    543         self.assertEqual(str(trace), 'a.py:2: 10 B')
    544         traceback = trace.traceback
    545         self.assertEqual(str(traceback), 'a.py:2')
    546         frame = traceback[0]
    547         self.assertEqual(str(frame), 'a.py:2')
    548 
    549     def test_statistic_format(self):
    550         snapshot, snapshot2 = create_snapshots()
    551         stats = snapshot.statistics('lineno')
    552         stat = stats[0]
    553         self.assertEqual(str(stat),
    554                          'b.py:1: size=66 B, count=1, average=66 B')
    555 
    556     def test_statistic_diff_format(self):
    557         snapshot, snapshot2 = create_snapshots()
    558         stats = snapshot2.compare_to(snapshot, 'lineno')
    559         stat = stats[0]
    560         self.assertEqual(str(stat),
    561                          'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
    562 
    563     def test_slices(self):
    564         snapshot, snapshot2 = create_snapshots()
    565         self.assertEqual(snapshot.traces[:2],
    566                          (snapshot.traces[0], snapshot.traces[1]))
    567 
    568         traceback = snapshot.traces[0].traceback
    569         self.assertEqual(traceback[:2],
    570                          (traceback[0], traceback[1]))
    571 
    572     def test_format_traceback(self):
    573         snapshot, snapshot2 = create_snapshots()
    574         def getline(filename, lineno):
    575             return '  <%s, %s>' % (filename, lineno)
    576         with unittest.mock.patch('tracemalloc.linecache.getline',
    577                                  side_effect=getline):
    578             tb = snapshot.traces[0].traceback
    579             self.assertEqual(tb.format(),
    580                              ['  File "a.py", line 2',
    581                               '    <a.py, 2>',
    582                               '  File "b.py", line 4',
    583                               '    <b.py, 4>'])
    584 
    585             self.assertEqual(tb.format(limit=1),
    586                              ['  File "a.py", line 2',
    587                               '    <a.py, 2>'])
    588 
    589             self.assertEqual(tb.format(limit=-1),
    590                              [])
    591 
    592 
    593 class TestFilters(unittest.TestCase):
    594     maxDiff = 2048
    595 
    596     def test_filter_attributes(self):
    597         # test default values
    598         f = tracemalloc.Filter(True, "abc")
    599         self.assertEqual(f.inclusive, True)
    600         self.assertEqual(f.filename_pattern, "abc")
    601         self.assertIsNone(f.lineno)
    602         self.assertEqual(f.all_frames, False)
    603 
    604         # test custom values
    605         f = tracemalloc.Filter(False, "test.py", 123, True)
    606         self.assertEqual(f.inclusive, False)
    607         self.assertEqual(f.filename_pattern, "test.py")
    608         self.assertEqual(f.lineno, 123)
    609         self.assertEqual(f.all_frames, True)
    610 
    611         # parameters passed by keyword
    612         f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
    613         self.assertEqual(f.inclusive, False)
    614         self.assertEqual(f.filename_pattern, "test.py")
    615         self.assertEqual(f.lineno, 123)
    616         self.assertEqual(f.all_frames, True)
    617 
    618         # read-only attribute
    619         self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
    620 
    621     def test_filter_match(self):
    622         # filter without line number
    623         f = tracemalloc.Filter(True, "abc")
    624         self.assertTrue(f._match_frame("abc", 0))
    625         self.assertTrue(f._match_frame("abc", 5))
    626         self.assertTrue(f._match_frame("abc", 10))
    627         self.assertFalse(f._match_frame("12356", 0))
    628         self.assertFalse(f._match_frame("12356", 5))
    629         self.assertFalse(f._match_frame("12356", 10))
    630 
    631         f = tracemalloc.Filter(False, "abc")
    632         self.assertFalse(f._match_frame("abc", 0))
    633         self.assertFalse(f._match_frame("abc", 5))
    634         self.assertFalse(f._match_frame("abc", 10))
    635         self.assertTrue(f._match_frame("12356", 0))
    636         self.assertTrue(f._match_frame("12356", 5))
    637         self.assertTrue(f._match_frame("12356", 10))
    638 
    639         # filter with line number > 0
    640         f = tracemalloc.Filter(True, "abc", 5)
    641         self.assertFalse(f._match_frame("abc", 0))
    642         self.assertTrue(f._match_frame("abc", 5))
    643         self.assertFalse(f._match_frame("abc", 10))
    644         self.assertFalse(f._match_frame("12356", 0))
    645         self.assertFalse(f._match_frame("12356", 5))
    646         self.assertFalse(f._match_frame("12356", 10))
    647 
    648         f = tracemalloc.Filter(False, "abc", 5)
    649         self.assertTrue(f._match_frame("abc", 0))
    650         self.assertFalse(f._match_frame("abc", 5))
    651         self.assertTrue(f._match_frame("abc", 10))
    652         self.assertTrue(f._match_frame("12356", 0))
    653         self.assertTrue(f._match_frame("12356", 5))
    654         self.assertTrue(f._match_frame("12356", 10))
    655 
    656         # filter with line number 0
    657         f = tracemalloc.Filter(True, "abc", 0)
    658         self.assertTrue(f._match_frame("abc", 0))
    659         self.assertFalse(f._match_frame("abc", 5))
    660         self.assertFalse(f._match_frame("abc", 10))
    661         self.assertFalse(f._match_frame("12356", 0))
    662         self.assertFalse(f._match_frame("12356", 5))
    663         self.assertFalse(f._match_frame("12356", 10))
    664 
    665         f = tracemalloc.Filter(False, "abc", 0)
    666         self.assertFalse(f._match_frame("abc", 0))
    667         self.assertTrue(f._match_frame("abc", 5))
    668         self.assertTrue(f._match_frame("abc", 10))
    669         self.assertTrue(f._match_frame("12356", 0))
    670         self.assertTrue(f._match_frame("12356", 5))
    671         self.assertTrue(f._match_frame("12356", 10))
    672 
    673     def test_filter_match_filename(self):
    674         def fnmatch(inclusive, filename, pattern):
    675             f = tracemalloc.Filter(inclusive, pattern)
    676             return f._match_frame(filename, 0)
    677 
    678         self.assertTrue(fnmatch(True, "abc", "abc"))
    679         self.assertFalse(fnmatch(True, "12356", "abc"))
    680         self.assertFalse(fnmatch(True, "<unknown>", "abc"))
    681 
    682         self.assertFalse(fnmatch(False, "abc", "abc"))
    683         self.assertTrue(fnmatch(False, "12356", "abc"))
    684         self.assertTrue(fnmatch(False, "<unknown>", "abc"))
    685 
    686     def test_filter_match_filename_joker(self):
    687         def fnmatch(filename, pattern):
    688             filter = tracemalloc.Filter(True, pattern)
    689             return filter._match_frame(filename, 0)
    690 
    691         # empty string
    692         self.assertFalse(fnmatch('abc', ''))
    693         self.assertFalse(fnmatch('', 'abc'))
    694         self.assertTrue(fnmatch('', ''))
    695         self.assertTrue(fnmatch('', '*'))
    696 
    697         # no *
    698         self.assertTrue(fnmatch('abc', 'abc'))
    699         self.assertFalse(fnmatch('abc', 'abcd'))
    700         self.assertFalse(fnmatch('abc', 'def'))
    701 
    702         # a*
    703         self.assertTrue(fnmatch('abc', 'a*'))
    704         self.assertTrue(fnmatch('abc', 'abc*'))
    705         self.assertFalse(fnmatch('abc', 'b*'))
    706         self.assertFalse(fnmatch('abc', 'abcd*'))
    707 
    708         # a*b
    709         self.assertTrue(fnmatch('abc', 'a*c'))
    710         self.assertTrue(fnmatch('abcdcx', 'a*cx'))
    711         self.assertFalse(fnmatch('abb', 'a*c'))
    712         self.assertFalse(fnmatch('abcdce', 'a*cx'))
    713 
    714         # a*b*c
    715         self.assertTrue(fnmatch('abcde', 'a*c*e'))
    716         self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
    717         self.assertFalse(fnmatch('abcdd', 'a*c*e'))
    718         self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
    719 
    720         # replace .pyc suffix with .py
    721         self.assertTrue(fnmatch('a.pyc', 'a.py'))
    722         self.assertTrue(fnmatch('a.py', 'a.pyc'))
    723 
    724         if os.name == 'nt':
    725             # case insensitive
    726             self.assertTrue(fnmatch('aBC', 'ABc'))
    727             self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
    728 
    729             self.assertTrue(fnmatch('a.pyc', 'a.PY'))
    730             self.assertTrue(fnmatch('a.py', 'a.PYC'))
    731         else:
    732             # case sensitive
    733             self.assertFalse(fnmatch('aBC', 'ABc'))
    734             self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
    735 
    736             self.assertFalse(fnmatch('a.pyc', 'a.PY'))
    737             self.assertFalse(fnmatch('a.py', 'a.PYC'))
    738 
    739         if os.name == 'nt':
    740             # normalize alternate separator "/" to the standard separator "\"
    741             self.assertTrue(fnmatch(r'a/b', r'a\b'))
    742             self.assertTrue(fnmatch(r'a\b', r'a/b'))
    743             self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
    744             self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
    745         else:
    746             # there is no alternate separator
    747             self.assertFalse(fnmatch(r'a/b', r'a\b'))
    748             self.assertFalse(fnmatch(r'a\b', r'a/b'))
    749             self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
    750             self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
    751 
    752         # as of 3.5, .pyo is no longer munged to .py
    753         self.assertFalse(fnmatch('a.pyo', 'a.py'))
    754 
    755     def test_filter_match_trace(self):
    756         t1 = (("a.py", 2), ("b.py", 3))
    757         t2 = (("b.py", 4), ("b.py", 5))
    758         t3 = (("c.py", 5), ('<unknown>', 0))
    759         unknown = (('<unknown>', 0),)
    760 
    761         f = tracemalloc.Filter(True, "b.py", all_frames=True)
    762         self.assertTrue(f._match_traceback(t1))
    763         self.assertTrue(f._match_traceback(t2))
    764         self.assertFalse(f._match_traceback(t3))
    765         self.assertFalse(f._match_traceback(unknown))
    766 
    767         f = tracemalloc.Filter(True, "b.py", all_frames=False)
    768         self.assertFalse(f._match_traceback(t1))
    769         self.assertTrue(f._match_traceback(t2))
    770         self.assertFalse(f._match_traceback(t3))
    771         self.assertFalse(f._match_traceback(unknown))
    772 
    773         f = tracemalloc.Filter(False, "b.py", all_frames=True)
    774         self.assertFalse(f._match_traceback(t1))
    775         self.assertFalse(f._match_traceback(t2))
    776         self.assertTrue(f._match_traceback(t3))
    777         self.assertTrue(f._match_traceback(unknown))
    778 
    779         f = tracemalloc.Filter(False, "b.py", all_frames=False)
    780         self.assertTrue(f._match_traceback(t1))
    781         self.assertFalse(f._match_traceback(t2))
    782         self.assertTrue(f._match_traceback(t3))
    783         self.assertTrue(f._match_traceback(unknown))
    784 
    785         f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
    786         self.assertTrue(f._match_traceback(t1))
    787         self.assertTrue(f._match_traceback(t2))
    788         self.assertTrue(f._match_traceback(t3))
    789         self.assertFalse(f._match_traceback(unknown))
    790 
    791         f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
    792         self.assertFalse(f._match_traceback(t1))
    793         self.assertFalse(f._match_traceback(t2))
    794         self.assertTrue(f._match_traceback(t3))
    795         self.assertTrue(f._match_traceback(unknown))
    796 
    797         f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
    798         self.assertTrue(f._match_traceback(t1))
    799         self.assertTrue(f._match_traceback(t2))
    800         self.assertFalse(f._match_traceback(t3))
    801         self.assertFalse(f._match_traceback(unknown))
    802 
    803 
    804 class TestCommandLine(unittest.TestCase):
    805     def test_env_var_disabled_by_default(self):
    806         # not tracing by default
    807         code = 'import tracemalloc; print(tracemalloc.is_tracing())'
    808         ok, stdout, stderr = assert_python_ok('-c', code)
    809         stdout = stdout.rstrip()
    810         self.assertEqual(stdout, b'False')
    811 
    812     @unittest.skipIf(interpreter_requires_environment(),
    813                      'Cannot run -E tests when PYTHON env vars are required.')
    814     def test_env_var_ignored_with_E(self):
    815         """PYTHON* environment variables must be ignored when -E is present."""
    816         code = 'import tracemalloc; print(tracemalloc.is_tracing())'
    817         ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
    818         stdout = stdout.rstrip()
    819         self.assertEqual(stdout, b'False')
    820 
    821     def test_env_var_enabled_at_startup(self):
    822         # tracing at startup
    823         code = 'import tracemalloc; print(tracemalloc.is_tracing())'
    824         ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
    825         stdout = stdout.rstrip()
    826         self.assertEqual(stdout, b'True')
    827 
    828     def test_env_limit(self):
    829         # start and set the number of frames
    830         code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
    831         ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
    832         stdout = stdout.rstrip()
    833         self.assertEqual(stdout, b'10')
    834 
    835     def test_env_var_invalid(self):
    836         for nframe in (-1, 0, 2**30):
    837             with self.subTest(nframe=nframe):
    838                 with support.SuppressCrashReport():
    839                     ok, stdout, stderr = assert_python_failure(
    840                         '-c', 'pass',
    841                         PYTHONTRACEMALLOC=str(nframe))
    842                     self.assertIn(b'PYTHONTRACEMALLOC: invalid '
    843                                   b'number of frames',
    844                                   stderr)
    845 
    846     def test_sys_xoptions(self):
    847         for xoptions, nframe in (
    848             ('tracemalloc', 1),
    849             ('tracemalloc=1', 1),
    850             ('tracemalloc=15', 15),
    851         ):
    852             with self.subTest(xoptions=xoptions, nframe=nframe):
    853                 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
    854                 ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
    855                 stdout = stdout.rstrip()
    856                 self.assertEqual(stdout, str(nframe).encode('ascii'))
    857 
    858     def test_sys_xoptions_invalid(self):
    859         for nframe in (-1, 0, 2**30):
    860             with self.subTest(nframe=nframe):
    861                 with support.SuppressCrashReport():
    862                     args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
    863                     ok, stdout, stderr = assert_python_failure(*args)
    864                     self.assertIn(b'-X tracemalloc=NFRAME: invalid '
    865                                   b'number of frames',
    866                                   stderr)
    867 
    868     def test_pymem_alloc0(self):
    869         # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
    870         # does not crash.
    871         code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
    872         assert_python_ok('-X', 'tracemalloc', '-c', code)
    873 
    874 
    875 @unittest.skipIf(_testcapi is None, 'need _testcapi')
    876 class TestCAPI(unittest.TestCase):
    877     maxDiff = 80 * 20
    878 
    879     def setUp(self):
    880         if tracemalloc.is_tracing():
    881             self.skipTest("tracemalloc must be stopped before the test")
    882 
    883         self.domain = 5
    884         self.size = 123
    885         self.obj = allocate_bytes(self.size)[0]
    886 
    887         # for the type "object", id(obj) is the address of its memory block.
    888         # This type is not tracked by the garbage collector
    889         self.ptr = id(self.obj)
    890 
    891     def tearDown(self):
    892         tracemalloc.stop()
    893 
    894     def get_traceback(self):
    895         frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
    896         if frames is not None:
    897             return tracemalloc.Traceback(frames)
    898         else:
    899             return None
    900 
    901     def track(self, release_gil=False, nframe=1):
    902         frames = get_frames(nframe, 2)
    903         _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
    904                                     release_gil)
    905         return frames
    906 
    907     def untrack(self):
    908         _testcapi.tracemalloc_untrack(self.domain, self.ptr)
    909 
    910     def get_traced_memory(self):
    911         # Get the traced size in the domain
    912         snapshot = tracemalloc.take_snapshot()
    913         domain_filter = tracemalloc.DomainFilter(True, self.domain)
    914         snapshot = snapshot.filter_traces([domain_filter])
    915         return sum(trace.size for trace in snapshot.traces)
    916 
    917     def check_track(self, release_gil):
    918         nframe = 5
    919         tracemalloc.start(nframe)
    920 
    921         size = tracemalloc.get_traced_memory()[0]
    922 
    923         frames = self.track(release_gil, nframe)
    924         self.assertEqual(self.get_traceback(),
    925                          tracemalloc.Traceback(frames))
    926 
    927         self.assertEqual(self.get_traced_memory(), self.size)
    928 
    929     def test_track(self):
    930         self.check_track(False)
    931 
    932     def test_track_without_gil(self):
    933         # check that calling _PyTraceMalloc_Track() without holding the GIL
    934         # works too
    935         self.check_track(True)
    936 
    937     def test_track_already_tracked(self):
    938         nframe = 5
    939         tracemalloc.start(nframe)
    940 
    941         # track a first time
    942         self.track()
    943 
    944         # calling _PyTraceMalloc_Track() must remove the old trace and add
    945         # a new trace with the new traceback
    946         frames = self.track(nframe=nframe)
    947         self.assertEqual(self.get_traceback(),
    948                          tracemalloc.Traceback(frames))
    949 
    950     def test_untrack(self):
    951         tracemalloc.start()
    952 
    953         self.track()
    954         self.assertIsNotNone(self.get_traceback())
    955         self.assertEqual(self.get_traced_memory(), self.size)
    956 
    957         # untrack must remove the trace
    958         self.untrack()
    959         self.assertIsNone(self.get_traceback())
    960         self.assertEqual(self.get_traced_memory(), 0)
    961 
    962         # calling _PyTraceMalloc_Untrack() multiple times must not crash
    963         self.untrack()
    964         self.untrack()
    965 
    966     def test_stop_track(self):
    967         tracemalloc.start()
    968         tracemalloc.stop()
    969 
    970         with self.assertRaises(RuntimeError):
    971             self.track()
    972         self.assertIsNone(self.get_traceback())
    973 
    974     def test_stop_untrack(self):
    975         tracemalloc.start()
    976         self.track()
    977 
    978         tracemalloc.stop()
    979         with self.assertRaises(RuntimeError):
    980             self.untrack()
    981 
    982 
    983 def test_main():
    984     support.run_unittest(
    985         TestTracemallocEnabled,
    986         TestSnapshot,
    987         TestFilters,
    988         TestCommandLine,
    989         TestCAPI,
    990     )
    991 
# Run the test classes listed in test_main() when this file is executed as a
# script.
if __name__ == "__main__":
    test_main()
    994