Home | History | Annotate | Download | only in test
      1 import sys
      2 import unittest
      3 import io
      4 import atexit
      5 import os
      6 from test import support
      7 from test.support import script_helper
      8 
      9 ### helpers
     10 def h1():
     11     print("h1")
     12 
     13 def h2():
     14     print("h2")
     15 
     16 def h3():
     17     print("h3")
     18 
     19 def h4(*args, **kwargs):
     20     print("h4", args, kwargs)
     21 
     22 def raise1():
     23     raise TypeError
     24 
     25 def raise2():
     26     raise SystemError
     27 
     28 def exit():
     29     raise SystemExit
     30 
     31 
     32 class GeneralTest(unittest.TestCase):
     33 
     34     def setUp(self):
     35         self.save_stdout = sys.stdout
     36         self.save_stderr = sys.stderr
     37         self.stream = io.StringIO()
     38         sys.stdout = sys.stderr = self.stream
     39         atexit._clear()
     40 
     41     def tearDown(self):
     42         sys.stdout = self.save_stdout
     43         sys.stderr = self.save_stderr
     44         atexit._clear()
     45 
     46     def test_args(self):
     47         # be sure args are handled properly
     48         atexit.register(h1)
     49         atexit.register(h4)
     50         atexit.register(h4, 4, kw="abc")
     51         atexit._run_exitfuncs()
     52 
     53         self.assertEqual(self.stream.getvalue(),
     54                             "h4 (4,) {'kw': 'abc'}\nh4 () {}\nh1\n")
     55 
     56     def test_badargs(self):
     57         atexit.register(lambda: 1, 0, 0, (x for x in (1,2)), 0, 0)
     58         self.assertRaises(TypeError, atexit._run_exitfuncs)
     59 
     60     def test_order(self):
     61         # be sure handlers are executed in reverse order
     62         atexit.register(h1)
     63         atexit.register(h2)
     64         atexit.register(h3)
     65         atexit._run_exitfuncs()
     66 
     67         self.assertEqual(self.stream.getvalue(), "h3\nh2\nh1\n")
     68 
     69     def test_raise(self):
     70         # be sure raises are handled properly
     71         atexit.register(raise1)
     72         atexit.register(raise2)
     73 
     74         self.assertRaises(TypeError, atexit._run_exitfuncs)
     75 
     76     def test_raise_unnormalized(self):
     77         # Issue #10756: Make sure that an unnormalized exception is
     78         # handled properly
     79         atexit.register(lambda: 1 / 0)
     80 
     81         self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs)
     82         self.assertIn("ZeroDivisionError", self.stream.getvalue())
     83 
     84     def test_exit(self):
     85         # be sure a SystemExit is handled properly
     86         atexit.register(exit)
     87 
     88         self.assertRaises(SystemExit, atexit._run_exitfuncs)
     89         self.assertEqual(self.stream.getvalue(), '')
     90 
     91     def test_print_tracebacks(self):
     92         # Issue #18776: the tracebacks should be printed when errors occur.
     93         def f():
     94             1/0  # one
     95         def g():
     96             1/0  # two
     97         def h():
     98             1/0  # three
     99         atexit.register(f)
    100         atexit.register(g)
    101         atexit.register(h)
    102 
    103         self.assertRaises(ZeroDivisionError, atexit._run_exitfuncs)
    104         stderr = self.stream.getvalue()
    105         self.assertEqual(stderr.count("ZeroDivisionError"), 3)
    106         self.assertIn("# one", stderr)
    107         self.assertIn("# two", stderr)
    108         self.assertIn("# three", stderr)
    109 
    110     def test_stress(self):
    111         a = [0]
    112         def inc():
    113             a[0] += 1
    114 
    115         for i in range(128):
    116             atexit.register(inc)
    117         atexit._run_exitfuncs()
    118 
    119         self.assertEqual(a[0], 128)
    120 
    121     def test_clear(self):
    122         a = [0]
    123         def inc():
    124             a[0] += 1
    125 
    126         atexit.register(inc)
    127         atexit._clear()
    128         atexit._run_exitfuncs()
    129 
    130         self.assertEqual(a[0], 0)
    131 
    132     def test_unregister(self):
    133         a = [0]
    134         def inc():
    135             a[0] += 1
    136         def dec():
    137             a[0] -= 1
    138 
    139         for i in range(4):
    140             atexit.register(inc)
    141         atexit.register(dec)
    142         atexit.unregister(inc)
    143         atexit._run_exitfuncs()
    144 
    145         self.assertEqual(a[0], -1)
    146 
    147     def test_bound_methods(self):
    148         l = []
    149         atexit.register(l.append, 5)
    150         atexit._run_exitfuncs()
    151         self.assertEqual(l, [5])
    152 
    153         atexit.unregister(l.append)
    154         atexit._run_exitfuncs()
    155         self.assertEqual(l, [5])
    156 
    157     def test_shutdown(self):
    158         # Actually test the shutdown mechanism in a subprocess
    159         code = """if 1:
    160             import atexit
    161 
    162             def f(msg):
    163                 print(msg)
    164 
    165             atexit.register(f, "one")
    166             atexit.register(f, "two")
    167             """
    168         res = script_helper.assert_python_ok("-c", code)
    169         self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
    170         self.assertFalse(res.err)
    171 
    172 
    173 @support.cpython_only
    174 class SubinterpreterTest(unittest.TestCase):
    175 
    176     def test_callbacks_leak(self):
    177         # This test shows a leak in refleak mode if atexit doesn't
    178         # take care to free callbacks in its per-subinterpreter module
    179         # state.
    180         n = atexit._ncallbacks()
    181         code = r"""if 1:
    182             import atexit
    183             def f():
    184                 pass
    185             atexit.register(f)
    186             del atexit
    187             """
    188         ret = support.run_in_subinterp(code)
    189         self.assertEqual(ret, 0)
    190         self.assertEqual(atexit._ncallbacks(), n)
    191 
    192     def test_callbacks_leak_refcycle(self):
    193         # Similar to the above, but with a refcycle through the atexit
    194         # module.
    195         n = atexit._ncallbacks()
    196         code = r"""if 1:
    197             import atexit
    198             def f():
    199                 pass
    200             atexit.register(f)
    201             atexit.__atexit = atexit
    202             """
    203         ret = support.run_in_subinterp(code)
    204         self.assertEqual(ret, 0)
    205         self.assertEqual(atexit._ncallbacks(), n)
    206 
    207     def test_callback_on_subinterpreter_teardown(self):
    208         # This tests if a callback is called on
    209         # subinterpreter teardown.
    210         expected = b"The test has passed!"
    211         r, w = os.pipe()
    212 
    213         code = r"""if 1:
    214             import os
    215             import atexit
    216             def callback():
    217                 os.write({:d}, b"The test has passed!")
    218             atexit.register(callback)
    219         """.format(w)
    220         ret = support.run_in_subinterp(code)
    221         os.close(w)
    222         self.assertEqual(os.read(r, len(expected)), expected)
    223         os.close(r)
    224 
    225 
    226 if __name__ == "__main__":
    227     unittest.main()
    228