Home | History | Annotate | Download | only in test
      1 """
      2 Tests for object finalization semantics, as outlined in PEP 442.
      3 """
      4 
      5 import contextlib
      6 import gc
      7 import unittest
      8 import weakref
      9 
     10 try:
     11     from _testcapi import with_tp_del
     12 except ImportError:
     13     def with_tp_del(cls):
     14         class C(object):
     15             def __new__(cls, *args, **kwargs):
     16                 raise TypeError('requires _testcapi.with_tp_del')
     17         return C
     18 
     19 from test import support
     20 
     21 
     22 class NonGCSimpleBase:
     23     """
     24     The base class for all the objects under test, equipped with various
     25     testing features.
     26     """
     27 
     28     survivors = []
     29     del_calls = []
     30     tp_del_calls = []
     31     errors = []
     32 
     33     _cleaning = False
     34 
     35     __slots__ = ()
     36 
     37     @classmethod
     38     def _cleanup(cls):
     39         cls.survivors.clear()
     40         cls.errors.clear()
     41         gc.garbage.clear()
     42         gc.collect()
     43         cls.del_calls.clear()
     44         cls.tp_del_calls.clear()
     45 
     46     @classmethod
     47     @contextlib.contextmanager
     48     def test(cls):
     49         """
     50         A context manager to use around all finalization tests.
     51         """
     52         with support.disable_gc():
     53             cls.del_calls.clear()
     54             cls.tp_del_calls.clear()
     55             NonGCSimpleBase._cleaning = False
     56             try:
     57                 yield
     58                 if cls.errors:
     59                     raise cls.errors[0]
     60             finally:
     61                 NonGCSimpleBase._cleaning = True
     62                 cls._cleanup()
     63 
     64     def check_sanity(self):
     65         """
     66         Check the object is sane (non-broken).
     67         """
     68 
     69     def __del__(self):
     70         """
     71         PEP 442 finalizer.  Record that this was called, check the
     72         object is in a sane state, and invoke a side effect.
     73         """
     74         try:
     75             if not self._cleaning:
     76                 self.del_calls.append(id(self))
     77                 self.check_sanity()
     78                 self.side_effect()
     79         except Exception as e:
     80             self.errors.append(e)
     81 
     82     def side_effect(self):
     83         """
     84         A side effect called on destruction.
     85         """
     86 
     87 
     88 class SimpleBase(NonGCSimpleBase):
     89 
     90     def __init__(self):
     91         self.id_ = id(self)
     92 
     93     def check_sanity(self):
     94         assert self.id_ == id(self)
     95 
     96 
     97 class NonGC(NonGCSimpleBase):
     98     __slots__ = ()
     99 
    100 class NonGCResurrector(NonGCSimpleBase):
    101     __slots__ = ()
    102 
    103     def side_effect(self):
    104         """
    105         Resurrect self by storing self in a class-wide list.
    106         """
    107         self.survivors.append(self)
    108 
    109 class Simple(SimpleBase):
    110     pass
    111 
    112 class SimpleResurrector(NonGCResurrector, SimpleBase):
    113     pass
    114 
    115 
    116 class TestBase:
    117 
    118     def setUp(self):
    119         self.old_garbage = gc.garbage[:]
    120         gc.garbage[:] = []
    121 
    122     def tearDown(self):
    123         # None of the tests here should put anything in gc.garbage
    124         try:
    125             self.assertEqual(gc.garbage, [])
    126         finally:
    127             del self.old_garbage
    128             gc.collect()
    129 
    130     def assert_del_calls(self, ids):
    131         self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))
    132 
    133     def assert_tp_del_calls(self, ids):
    134         self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))
    135 
    136     def assert_survivors(self, ids):
    137         self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids))
    138 
    139     def assert_garbage(self, ids):
    140         self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))
    141 
    142     def clear_survivors(self):
    143         SimpleBase.survivors.clear()
    144 
    145 
    146 class SimpleFinalizationTest(TestBase, unittest.TestCase):
    147     """
    148     Test finalization without refcycles.
    149     """
    150 
    151     def test_simple(self):
    152         with SimpleBase.test():
    153             s = Simple()
    154             ids = [id(s)]
    155             wr = weakref.ref(s)
    156             del s
    157             gc.collect()
    158             self.assert_del_calls(ids)
    159             self.assert_survivors([])
    160             self.assertIs(wr(), None)
    161             gc.collect()
    162             self.assert_del_calls(ids)
    163             self.assert_survivors([])
    164 
    165     def test_simple_resurrect(self):
    166         with SimpleBase.test():
    167             s = SimpleResurrector()
    168             ids = [id(s)]
    169             wr = weakref.ref(s)
    170             del s
    171             gc.collect()
    172             self.assert_del_calls(ids)
    173             self.assert_survivors(ids)
    174             self.assertIsNot(wr(), None)
    175             self.clear_survivors()
    176             gc.collect()
    177             self.assert_del_calls(ids)
    178             self.assert_survivors([])
    179         self.assertIs(wr(), None)
    180 
    181     def test_non_gc(self):
    182         with SimpleBase.test():
    183             s = NonGC()
    184             self.assertFalse(gc.is_tracked(s))
    185             ids = [id(s)]
    186             del s
    187             gc.collect()
    188             self.assert_del_calls(ids)
    189             self.assert_survivors([])
    190             gc.collect()
    191             self.assert_del_calls(ids)
    192             self.assert_survivors([])
    193 
    194     def test_non_gc_resurrect(self):
    195         with SimpleBase.test():
    196             s = NonGCResurrector()
    197             self.assertFalse(gc.is_tracked(s))
    198             ids = [id(s)]
    199             del s
    200             gc.collect()
    201             self.assert_del_calls(ids)
    202             self.assert_survivors(ids)
    203             self.clear_survivors()
    204             gc.collect()
    205             self.assert_del_calls(ids * 2)
    206             self.assert_survivors(ids)
    207 
    208 
    209 class SelfCycleBase:
    210 
    211     def __init__(self):
    212         super().__init__()
    213         self.ref = self
    214 
    215     def check_sanity(self):
    216         super().check_sanity()
    217         assert self.ref is self
    218 
    219 class SimpleSelfCycle(SelfCycleBase, Simple):
    220     pass
    221 
    222 class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
    223     pass
    224 
    225 class SuicidalSelfCycle(SelfCycleBase, Simple):
    226 
    227     def side_effect(self):
    228         """
    229         Explicitly break the reference cycle.
    230         """
    231         self.ref = None
    232 
    233 
    234 class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
    235     """
    236     Test finalization of an object having a single cyclic reference to
    237     itself.
    238     """
    239 
    240     def test_simple(self):
    241         with SimpleBase.test():
    242             s = SimpleSelfCycle()
    243             ids = [id(s)]
    244             wr = weakref.ref(s)
    245             del s
    246             gc.collect()
    247             self.assert_del_calls(ids)
    248             self.assert_survivors([])
    249             self.assertIs(wr(), None)
    250             gc.collect()
    251             self.assert_del_calls(ids)
    252             self.assert_survivors([])
    253 
    254     def test_simple_resurrect(self):
    255         # Test that __del__ can resurrect the object being finalized.
    256         with SimpleBase.test():
    257             s = SelfCycleResurrector()
    258             ids = [id(s)]
    259             wr = weakref.ref(s)
    260             del s
    261             gc.collect()
    262             self.assert_del_calls(ids)
    263             self.assert_survivors(ids)
    264             # XXX is this desirable?
    265             self.assertIs(wr(), None)
    266             # When trying to destroy the object a second time, __del__
    267             # isn't called anymore (and the object isn't resurrected).
    268             self.clear_survivors()
    269             gc.collect()
    270             self.assert_del_calls(ids)
    271             self.assert_survivors([])
    272             self.assertIs(wr(), None)
    273 
    274     def test_simple_suicide(self):
    275         # Test the GC is able to deal with an object that kills its last
    276         # reference during __del__.
    277         with SimpleBase.test():
    278             s = SuicidalSelfCycle()
    279             ids = [id(s)]
    280             wr = weakref.ref(s)
    281             del s
    282             gc.collect()
    283             self.assert_del_calls(ids)
    284             self.assert_survivors([])
    285             self.assertIs(wr(), None)
    286             gc.collect()
    287             self.assert_del_calls(ids)
    288             self.assert_survivors([])
    289             self.assertIs(wr(), None)
    290 
    291 
    292 class ChainedBase:
    293 
    294     def chain(self, left):
    295         self.suicided = False
    296         self.left = left
    297         left.right = self
    298 
    299     def check_sanity(self):
    300         super().check_sanity()
    301         if self.suicided:
    302             assert self.left is None
    303             assert self.right is None
    304         else:
    305             left = self.left
    306             if left.suicided:
    307                 assert left.right is None
    308             else:
    309                 assert left.right is self
    310             right = self.right
    311             if right.suicided:
    312                 assert right.left is None
    313             else:
    314                 assert right.left is self
    315 
    316 class SimpleChained(ChainedBase, Simple):
    317     pass
    318 
    319 class ChainedResurrector(ChainedBase, SimpleResurrector):
    320     pass
    321 
    322 class SuicidalChained(ChainedBase, Simple):
    323 
    324     def side_effect(self):
    325         """
    326         Explicitly break the reference cycle.
    327         """
    328         self.suicided = True
    329         self.left = None
    330         self.right = None
    331 
    332 
    333 class CycleChainFinalizationTest(TestBase, unittest.TestCase):
    334     """
    335     Test finalization of a cyclic chain.  These tests are similar in
    336     spirit to the self-cycle tests above, but the collectable object
    337     graph isn't trivial anymore.
    338     """
    339 
    340     def build_chain(self, classes):
    341         nodes = [cls() for cls in classes]
    342         for i in range(len(nodes)):
    343             nodes[i].chain(nodes[i-1])
    344         return nodes
    345 
    346     def check_non_resurrecting_chain(self, classes):
    347         N = len(classes)
    348         with SimpleBase.test():
    349             nodes = self.build_chain(classes)
    350             ids = [id(s) for s in nodes]
    351             wrs = [weakref.ref(s) for s in nodes]
    352             del nodes
    353             gc.collect()
    354             self.assert_del_calls(ids)
    355             self.assert_survivors([])
    356             self.assertEqual([wr() for wr in wrs], [None] * N)
    357             gc.collect()
    358             self.assert_del_calls(ids)
    359 
    360     def check_resurrecting_chain(self, classes):
    361         N = len(classes)
    362         with SimpleBase.test():
    363             nodes = self.build_chain(classes)
    364             N = len(nodes)
    365             ids = [id(s) for s in nodes]
    366             survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
    367             wrs = [weakref.ref(s) for s in nodes]
    368             del nodes
    369             gc.collect()
    370             self.assert_del_calls(ids)
    371             self.assert_survivors(survivor_ids)
    372             # XXX desirable?
    373             self.assertEqual([wr() for wr in wrs], [None] * N)
    374             self.clear_survivors()
    375             gc.collect()
    376             self.assert_del_calls(ids)
    377             self.assert_survivors([])
    378 
    379     def test_homogenous(self):
    380         self.check_non_resurrecting_chain([SimpleChained] * 3)
    381 
    382     def test_homogenous_resurrect(self):
    383         self.check_resurrecting_chain([ChainedResurrector] * 3)
    384 
    385     def test_homogenous_suicidal(self):
    386         self.check_non_resurrecting_chain([SuicidalChained] * 3)
    387 
    388     def test_heterogenous_suicidal_one(self):
    389         self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)
    390 
    391     def test_heterogenous_suicidal_two(self):
    392         self.check_non_resurrecting_chain(
    393             [SuicidalChained] * 2 + [SimpleChained] * 2)
    394 
    395     def test_heterogenous_resurrect_one(self):
    396         self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)
    397 
    398     def test_heterogenous_resurrect_two(self):
    399         self.check_resurrecting_chain(
    400             [ChainedResurrector, SimpleChained, SuicidalChained] * 2)
    401 
    402     def test_heterogenous_resurrect_three(self):
    403         self.check_resurrecting_chain(
    404             [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
    405 
    406 
    407 # NOTE: the tp_del slot isn't automatically inherited, so we have to call
    408 # with_tp_del() for each instantiated class.
    409 
    410 class LegacyBase(SimpleBase):
    411 
    412     def __del__(self):
    413         try:
    414             # Do not invoke side_effect here, since we are now exercising
    415             # the tp_del slot.
    416             if not self._cleaning:
    417                 self.del_calls.append(id(self))
    418                 self.check_sanity()
    419         except Exception as e:
    420             self.errors.append(e)
    421 
    422     def __tp_del__(self):
    423         """
    424         Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
    425         """
    426         try:
    427             if not self._cleaning:
    428                 self.tp_del_calls.append(id(self))
    429                 self.check_sanity()
    430                 self.side_effect()
    431         except Exception as e:
    432             self.errors.append(e)
    433 
    434 @with_tp_del
    435 class Legacy(LegacyBase):
    436     pass
    437 
    438 @with_tp_del
    439 class LegacyResurrector(LegacyBase):
    440 
    441     def side_effect(self):
    442         """
    443         Resurrect self by storing self in a class-wide list.
    444         """
    445         self.survivors.append(self)
    446 
    447 @with_tp_del
    448 class LegacySelfCycle(SelfCycleBase, LegacyBase):
    449     pass
    450 
    451 
    452 @support.cpython_only
    453 class LegacyFinalizationTest(TestBase, unittest.TestCase):
    454     """
    455     Test finalization of objects with a tp_del.
    456     """
    457 
    458     def tearDown(self):
    459         # These tests need to clean up a bit more, since they create
    460         # uncollectable objects.
    461         gc.garbage.clear()
    462         gc.collect()
    463         super().tearDown()
    464 
    465     def test_legacy(self):
    466         with SimpleBase.test():
    467             s = Legacy()
    468             ids = [id(s)]
    469             wr = weakref.ref(s)
    470             del s
    471             gc.collect()
    472             self.assert_del_calls(ids)
    473             self.assert_tp_del_calls(ids)
    474             self.assert_survivors([])
    475             self.assertIs(wr(), None)
    476             gc.collect()
    477             self.assert_del_calls(ids)
    478             self.assert_tp_del_calls(ids)
    479 
    480     def test_legacy_resurrect(self):
    481         with SimpleBase.test():
    482             s = LegacyResurrector()
    483             ids = [id(s)]
    484             wr = weakref.ref(s)
    485             del s
    486             gc.collect()
    487             self.assert_del_calls(ids)
    488             self.assert_tp_del_calls(ids)
    489             self.assert_survivors(ids)
    490             # weakrefs are cleared before tp_del is called.
    491             self.assertIs(wr(), None)
    492             self.clear_survivors()
    493             gc.collect()
    494             self.assert_del_calls(ids)
    495             self.assert_tp_del_calls(ids * 2)
    496             self.assert_survivors(ids)
    497         self.assertIs(wr(), None)
    498 
    499     def test_legacy_self_cycle(self):
    500         # Self-cycles with legacy finalizers end up in gc.garbage.
    501         with SimpleBase.test():
    502             s = LegacySelfCycle()
    503             ids = [id(s)]
    504             wr = weakref.ref(s)
    505             del s
    506             gc.collect()
    507             self.assert_del_calls([])
    508             self.assert_tp_del_calls([])
    509             self.assert_survivors([])
    510             self.assert_garbage(ids)
    511             self.assertIsNot(wr(), None)
    512             # Break the cycle to allow collection
    513             gc.garbage[0].ref = None
    514         self.assert_garbage([])
    515         self.assertIs(wr(), None)
    516 
    517 
    518 if __name__ == "__main__":
    519     unittest.main()
    520