Home | History | Annotate | Download | only in test
      1 """Unit tests for contextlib.py, and other context managers."""
      2 
      3 import io
      4 import sys
      5 import tempfile
      6 import unittest
      7 from contextlib import *  # Tests __all__
      8 from test import support
      9 try:
     10     import threading
     11 except ImportError:
     12     threading = None
     13 
     14 
     15 class TestAbstractContextManager(unittest.TestCase):
     16 
     17     def test_enter(self):
     18         class DefaultEnter(AbstractContextManager):
     19             def __exit__(self, *args):
     20                 super().__exit__(*args)
     21 
     22         manager = DefaultEnter()
     23         self.assertIs(manager.__enter__(), manager)
     24 
     25     def test_exit_is_abstract(self):
     26         class MissingExit(AbstractContextManager):
     27             pass
     28 
     29         with self.assertRaises(TypeError):
     30             MissingExit()
     31 
     32     def test_structural_subclassing(self):
     33         class ManagerFromScratch:
     34             def __enter__(self):
     35                 return self
     36             def __exit__(self, exc_type, exc_value, traceback):
     37                 return None
     38 
     39         self.assertTrue(issubclass(ManagerFromScratch, AbstractContextManager))
     40 
     41         class DefaultEnter(AbstractContextManager):
     42             def __exit__(self, *args):
     43                 super().__exit__(*args)
     44 
     45         self.assertTrue(issubclass(DefaultEnter, AbstractContextManager))
     46 
     47 
     48 class ContextManagerTestCase(unittest.TestCase):
     49 
     50     def test_contextmanager_plain(self):
     51         state = []
     52         @contextmanager
     53         def woohoo():
     54             state.append(1)
     55             yield 42
     56             state.append(999)
     57         with woohoo() as x:
     58             self.assertEqual(state, [1])
     59             self.assertEqual(x, 42)
     60             state.append(x)
     61         self.assertEqual(state, [1, 42, 999])
     62 
     63     def test_contextmanager_finally(self):
     64         state = []
     65         @contextmanager
     66         def woohoo():
     67             state.append(1)
     68             try:
     69                 yield 42
     70             finally:
     71                 state.append(999)
     72         with self.assertRaises(ZeroDivisionError):
     73             with woohoo() as x:
     74                 self.assertEqual(state, [1])
     75                 self.assertEqual(x, 42)
     76                 state.append(x)
     77                 raise ZeroDivisionError()
     78         self.assertEqual(state, [1, 42, 999])
     79 
     80     def test_contextmanager_no_reraise(self):
     81         @contextmanager
     82         def whee():
     83             yield
     84         ctx = whee()
     85         ctx.__enter__()
     86         # Calling __exit__ should not result in an exception
     87         self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))
     88 
     89     def test_contextmanager_trap_yield_after_throw(self):
     90         @contextmanager
     91         def whoo():
     92             try:
     93                 yield
     94             except:
     95                 yield
     96         ctx = whoo()
     97         ctx.__enter__()
     98         self.assertRaises(
     99             RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
    100         )
    101 
    102     def test_contextmanager_except(self):
    103         state = []
    104         @contextmanager
    105         def woohoo():
    106             state.append(1)
    107             try:
    108                 yield 42
    109             except ZeroDivisionError as e:
    110                 state.append(e.args[0])
    111                 self.assertEqual(state, [1, 42, 999])
    112         with woohoo() as x:
    113             self.assertEqual(state, [1])
    114             self.assertEqual(x, 42)
    115             state.append(x)
    116             raise ZeroDivisionError(999)
    117         self.assertEqual(state, [1, 42, 999])
    118 
    119     def test_contextmanager_except_stopiter(self):
    120         stop_exc = StopIteration('spam')
    121         @contextmanager
    122         def woohoo():
    123             yield
    124         try:
    125             with self.assertWarnsRegex(DeprecationWarning,
    126                                        "StopIteration"):
    127                 with woohoo():
    128                     raise stop_exc
    129         except Exception as ex:
    130             self.assertIs(ex, stop_exc)
    131         else:
    132             self.fail('StopIteration was suppressed')
    133 
    134     def test_contextmanager_except_pep479(self):
    135         code = """\
    136 from __future__ import generator_stop
    137 from contextlib import contextmanager
    138 @contextmanager
    139 def woohoo():
    140     yield
    141 """
    142         locals = {}
    143         exec(code, locals, locals)
    144         woohoo = locals['woohoo']
    145 
    146         stop_exc = StopIteration('spam')
    147         try:
    148             with woohoo():
    149                 raise stop_exc
    150         except Exception as ex:
    151             self.assertIs(ex, stop_exc)
    152         else:
    153             self.fail('StopIteration was suppressed')
    154 
    155     def _create_contextmanager_attribs(self):
    156         def attribs(**kw):
    157             def decorate(func):
    158                 for k,v in kw.items():
    159                     setattr(func,k,v)
    160                 return func
    161             return decorate
    162         @contextmanager
    163         @attribs(foo='bar')
    164         def baz(spam):
    165             """Whee!"""
    166         return baz
    167 
    168     def test_contextmanager_attribs(self):
    169         baz = self._create_contextmanager_attribs()
    170         self.assertEqual(baz.__name__,'baz')
    171         self.assertEqual(baz.foo, 'bar')
    172 
    173     @support.requires_docstrings
    174     def test_contextmanager_doc_attrib(self):
    175         baz = self._create_contextmanager_attribs()
    176         self.assertEqual(baz.__doc__, "Whee!")
    177 
    178     @support.requires_docstrings
    179     def test_instance_docstring_given_cm_docstring(self):
    180         baz = self._create_contextmanager_attribs()(None)
    181         self.assertEqual(baz.__doc__, "Whee!")
    182 
    183     def test_keywords(self):
    184         # Ensure no keyword arguments are inhibited
    185         @contextmanager
    186         def woohoo(self, func, args, kwds):
    187             yield (self, func, args, kwds)
    188         with woohoo(self=11, func=22, args=33, kwds=44) as target:
    189             self.assertEqual(target, (11, 22, 33, 44))
    190 
    191 
    192 class ClosingTestCase(unittest.TestCase):
    193 
    194     @support.requires_docstrings
    195     def test_instance_docs(self):
    196         # Issue 19330: ensure context manager instances have good docstrings
    197         cm_docstring = closing.__doc__
    198         obj = closing(None)
    199         self.assertEqual(obj.__doc__, cm_docstring)
    200 
    201     def test_closing(self):
    202         state = []
    203         class C:
    204             def close(self):
    205                 state.append(1)
    206         x = C()
    207         self.assertEqual(state, [])
    208         with closing(x) as y:
    209             self.assertEqual(x, y)
    210         self.assertEqual(state, [1])
    211 
    212     def test_closing_error(self):
    213         state = []
    214         class C:
    215             def close(self):
    216                 state.append(1)
    217         x = C()
    218         self.assertEqual(state, [])
    219         with self.assertRaises(ZeroDivisionError):
    220             with closing(x) as y:
    221                 self.assertEqual(x, y)
    222                 1 / 0
    223         self.assertEqual(state, [1])
    224 
    225 class FileContextTestCase(unittest.TestCase):
    226 
    227     def testWithOpen(self):
    228         tfn = tempfile.mktemp()
    229         try:
    230             f = None
    231             with open(tfn, "w") as f:
    232                 self.assertFalse(f.closed)
    233                 f.write("Booh\n")
    234             self.assertTrue(f.closed)
    235             f = None
    236             with self.assertRaises(ZeroDivisionError):
    237                 with open(tfn, "r") as f:
    238                     self.assertFalse(f.closed)
    239                     self.assertEqual(f.read(), "Booh\n")
    240                     1 / 0
    241             self.assertTrue(f.closed)
    242         finally:
    243             support.unlink(tfn)
    244 
    245 @unittest.skipUnless(threading, 'Threading required for this test.')
    246 class LockContextTestCase(unittest.TestCase):
    247 
    248     def boilerPlate(self, lock, locked):
    249         self.assertFalse(locked())
    250         with lock:
    251             self.assertTrue(locked())
    252         self.assertFalse(locked())
    253         with self.assertRaises(ZeroDivisionError):
    254             with lock:
    255                 self.assertTrue(locked())
    256                 1 / 0
    257         self.assertFalse(locked())
    258 
    259     def testWithLock(self):
    260         lock = threading.Lock()
    261         self.boilerPlate(lock, lock.locked)
    262 
    263     def testWithRLock(self):
    264         lock = threading.RLock()
    265         self.boilerPlate(lock, lock._is_owned)
    266 
    267     def testWithCondition(self):
    268         lock = threading.Condition()
    269         def locked():
    270             return lock._is_owned()
    271         self.boilerPlate(lock, locked)
    272 
    273     def testWithSemaphore(self):
    274         lock = threading.Semaphore()
    275         def locked():
    276             if lock.acquire(False):
    277                 lock.release()
    278                 return False
    279             else:
    280                 return True
    281         self.boilerPlate(lock, locked)
    282 
    283     def testWithBoundedSemaphore(self):
    284         lock = threading.BoundedSemaphore()
    285         def locked():
    286             if lock.acquire(False):
    287                 lock.release()
    288                 return False
    289             else:
    290                 return True
    291         self.boilerPlate(lock, locked)
    292 
    293 
    294 class mycontext(ContextDecorator):
    295     """Example decoration-compatible context manager for testing"""
    296     started = False
    297     exc = None
    298     catch = False
    299 
    300     def __enter__(self):
    301         self.started = True
    302         return self
    303 
    304     def __exit__(self, *exc):
    305         self.exc = exc
    306         return self.catch
    307 
    308 
    309 class TestContextDecorator(unittest.TestCase):
    310 
    311     @support.requires_docstrings
    312     def test_instance_docs(self):
    313         # Issue 19330: ensure context manager instances have good docstrings
    314         cm_docstring = mycontext.__doc__
    315         obj = mycontext()
    316         self.assertEqual(obj.__doc__, cm_docstring)
    317 
    318     def test_contextdecorator(self):
    319         context = mycontext()
    320         with context as result:
    321             self.assertIs(result, context)
    322             self.assertTrue(context.started)
    323 
    324         self.assertEqual(context.exc, (None, None, None))
    325 
    326 
    327     def test_contextdecorator_with_exception(self):
    328         context = mycontext()
    329 
    330         with self.assertRaisesRegex(NameError, 'foo'):
    331             with context:
    332                 raise NameError('foo')
    333         self.assertIsNotNone(context.exc)
    334         self.assertIs(context.exc[0], NameError)
    335 
    336         context = mycontext()
    337         context.catch = True
    338         with context:
    339             raise NameError('foo')
    340         self.assertIsNotNone(context.exc)
    341         self.assertIs(context.exc[0], NameError)
    342 
    343 
    344     def test_decorator(self):
    345         context = mycontext()
    346 
    347         @context
    348         def test():
    349             self.assertIsNone(context.exc)
    350             self.assertTrue(context.started)
    351         test()
    352         self.assertEqual(context.exc, (None, None, None))
    353 
    354 
    355     def test_decorator_with_exception(self):
    356         context = mycontext()
    357 
    358         @context
    359         def test():
    360             self.assertIsNone(context.exc)
    361             self.assertTrue(context.started)
    362             raise NameError('foo')
    363 
    364         with self.assertRaisesRegex(NameError, 'foo'):
    365             test()
    366         self.assertIsNotNone(context.exc)
    367         self.assertIs(context.exc[0], NameError)
    368 
    369 
    370     def test_decorating_method(self):
    371         context = mycontext()
    372 
    373         class Test(object):
    374 
    375             @context
    376             def method(self, a, b, c=None):
    377                 self.a = a
    378                 self.b = b
    379                 self.c = c
    380 
    381         # these tests are for argument passing when used as a decorator
    382         test = Test()
    383         test.method(1, 2)
    384         self.assertEqual(test.a, 1)
    385         self.assertEqual(test.b, 2)
    386         self.assertEqual(test.c, None)
    387 
    388         test = Test()
    389         test.method('a', 'b', 'c')
    390         self.assertEqual(test.a, 'a')
    391         self.assertEqual(test.b, 'b')
    392         self.assertEqual(test.c, 'c')
    393 
    394         test = Test()
    395         test.method(a=1, b=2)
    396         self.assertEqual(test.a, 1)
    397         self.assertEqual(test.b, 2)
    398 
    399 
    400     def test_typo_enter(self):
    401         class mycontext(ContextDecorator):
    402             def __unter__(self):
    403                 pass
    404             def __exit__(self, *exc):
    405                 pass
    406 
    407         with self.assertRaises(AttributeError):
    408             with mycontext():
    409                 pass
    410 
    411 
    412     def test_typo_exit(self):
    413         class mycontext(ContextDecorator):
    414             def __enter__(self):
    415                 pass
    416             def __uxit__(self, *exc):
    417                 pass
    418 
    419         with self.assertRaises(AttributeError):
    420             with mycontext():
    421                 pass
    422 
    423 
    424     def test_contextdecorator_as_mixin(self):
    425         class somecontext(object):
    426             started = False
    427             exc = None
    428 
    429             def __enter__(self):
    430                 self.started = True
    431                 return self
    432 
    433             def __exit__(self, *exc):
    434                 self.exc = exc
    435 
    436         class mycontext(somecontext, ContextDecorator):
    437             pass
    438 
    439         context = mycontext()
    440         @context
    441         def test():
    442             self.assertIsNone(context.exc)
    443             self.assertTrue(context.started)
    444         test()
    445         self.assertEqual(context.exc, (None, None, None))
    446 
    447 
    448     def test_contextmanager_as_decorator(self):
    449         @contextmanager
    450         def woohoo(y):
    451             state.append(y)
    452             yield
    453             state.append(999)
    454 
    455         state = []
    456         @woohoo(1)
    457         def test(x):
    458             self.assertEqual(state, [1])
    459             state.append(x)
    460         test('something')
    461         self.assertEqual(state, [1, 'something', 999])
    462 
    463         # Issue #11647: Ensure the decorated function is 'reusable'
    464         state = []
    465         test('something else')
    466         self.assertEqual(state, [1, 'something else', 999])
    467 
    468 
    469 class TestExitStack(unittest.TestCase):
    470 
    471     @support.requires_docstrings
    472     def test_instance_docs(self):
    473         # Issue 19330: ensure context manager instances have good docstrings
    474         cm_docstring = ExitStack.__doc__
    475         obj = ExitStack()
    476         self.assertEqual(obj.__doc__, cm_docstring)
    477 
    478     def test_no_resources(self):
    479         with ExitStack():
    480             pass
    481 
    482     def test_callback(self):
    483         expected = [
    484             ((), {}),
    485             ((1,), {}),
    486             ((1,2), {}),
    487             ((), dict(example=1)),
    488             ((1,), dict(example=1)),
    489             ((1,2), dict(example=1)),
    490         ]
    491         result = []
    492         def _exit(*args, **kwds):
    493             """Test metadata propagation"""
    494             result.append((args, kwds))
    495         with ExitStack() as stack:
    496             for args, kwds in reversed(expected):
    497                 if args and kwds:
    498                     f = stack.callback(_exit, *args, **kwds)
    499                 elif args:
    500                     f = stack.callback(_exit, *args)
    501                 elif kwds:
    502                     f = stack.callback(_exit, **kwds)
    503                 else:
    504                     f = stack.callback(_exit)
    505                 self.assertIs(f, _exit)
    506             for wrapper in stack._exit_callbacks:
    507                 self.assertIs(wrapper.__wrapped__, _exit)
    508                 self.assertNotEqual(wrapper.__name__, _exit.__name__)
    509                 self.assertIsNone(wrapper.__doc__, _exit.__doc__)
    510         self.assertEqual(result, expected)
    511 
    512     def test_push(self):
    513         exc_raised = ZeroDivisionError
    514         def _expect_exc(exc_type, exc, exc_tb):
    515             self.assertIs(exc_type, exc_raised)
    516         def _suppress_exc(*exc_details):
    517             return True
    518         def _expect_ok(exc_type, exc, exc_tb):
    519             self.assertIsNone(exc_type)
    520             self.assertIsNone(exc)
    521             self.assertIsNone(exc_tb)
    522         class ExitCM(object):
    523             def __init__(self, check_exc):
    524                 self.check_exc = check_exc
    525             def __enter__(self):
    526                 self.fail("Should not be called!")
    527             def __exit__(self, *exc_details):
    528                 self.check_exc(*exc_details)
    529         with ExitStack() as stack:
    530             stack.push(_expect_ok)
    531             self.assertIs(stack._exit_callbacks[-1], _expect_ok)
    532             cm = ExitCM(_expect_ok)
    533             stack.push(cm)
    534             self.assertIs(stack._exit_callbacks[-1].__self__, cm)
    535             stack.push(_suppress_exc)
    536             self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
    537             cm = ExitCM(_expect_exc)
    538             stack.push(cm)
    539             self.assertIs(stack._exit_callbacks[-1].__self__, cm)
    540             stack.push(_expect_exc)
    541             self.assertIs(stack._exit_callbacks[-1], _expect_exc)
    542             stack.push(_expect_exc)
    543             self.assertIs(stack._exit_callbacks[-1], _expect_exc)
    544             1/0
    545 
    546     def test_enter_context(self):
    547         class TestCM(object):
    548             def __enter__(self):
    549                 result.append(1)
    550             def __exit__(self, *exc_details):
    551                 result.append(3)
    552 
    553         result = []
    554         cm = TestCM()
    555         with ExitStack() as stack:
    556             @stack.callback  # Registered first => cleaned up last
    557             def _exit():
    558                 result.append(4)
    559             self.assertIsNotNone(_exit)
    560             stack.enter_context(cm)
    561             self.assertIs(stack._exit_callbacks[-1].__self__, cm)
    562             result.append(2)
    563         self.assertEqual(result, [1, 2, 3, 4])
    564 
    565     def test_close(self):
    566         result = []
    567         with ExitStack() as stack:
    568             @stack.callback
    569             def _exit():
    570                 result.append(1)
    571             self.assertIsNotNone(_exit)
    572             stack.close()
    573             result.append(2)
    574         self.assertEqual(result, [1, 2])
    575 
    576     def test_pop_all(self):
    577         result = []
    578         with ExitStack() as stack:
    579             @stack.callback
    580             def _exit():
    581                 result.append(3)
    582             self.assertIsNotNone(_exit)
    583             new_stack = stack.pop_all()
    584             result.append(1)
    585         result.append(2)
    586         new_stack.close()
    587         self.assertEqual(result, [1, 2, 3])
    588 
    589     def test_exit_raise(self):
    590         with self.assertRaises(ZeroDivisionError):
    591             with ExitStack() as stack:
    592                 stack.push(lambda *exc: False)
    593                 1/0
    594 
    595     def test_exit_suppress(self):
    596         with ExitStack() as stack:
    597             stack.push(lambda *exc: True)
    598             1/0
    599 
    600     def test_exit_exception_chaining_reference(self):
    601         # Sanity check to make sure that ExitStack chaining matches
    602         # actual nested with statements
    603         class RaiseExc:
    604             def __init__(self, exc):
    605                 self.exc = exc
    606             def __enter__(self):
    607                 return self
    608             def __exit__(self, *exc_details):
    609                 raise self.exc
    610 
    611         class RaiseExcWithContext:
    612             def __init__(self, outer, inner):
    613                 self.outer = outer
    614                 self.inner = inner
    615             def __enter__(self):
    616                 return self
    617             def __exit__(self, *exc_details):
    618                 try:
    619                     raise self.inner
    620                 except:
    621                     raise self.outer
    622 
    623         class SuppressExc:
    624             def __enter__(self):
    625                 return self
    626             def __exit__(self, *exc_details):
    627                 type(self).saved_details = exc_details
    628                 return True
    629 
    630         try:
    631             with RaiseExc(IndexError):
    632                 with RaiseExcWithContext(KeyError, AttributeError):
    633                     with SuppressExc():
    634                         with RaiseExc(ValueError):
    635                             1 / 0
    636         except IndexError as exc:
    637             self.assertIsInstance(exc.__context__, KeyError)
    638             self.assertIsInstance(exc.__context__.__context__, AttributeError)
    639             # Inner exceptions were suppressed
    640             self.assertIsNone(exc.__context__.__context__.__context__)
    641         else:
    642             self.fail("Expected IndexError, but no exception was raised")
    643         # Check the inner exceptions
    644         inner_exc = SuppressExc.saved_details[1]
    645         self.assertIsInstance(inner_exc, ValueError)
    646         self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
    647 
    648     def test_exit_exception_chaining(self):
    649         # Ensure exception chaining matches the reference behaviour
    650         def raise_exc(exc):
    651             raise exc
    652 
    653         saved_details = None
    654         def suppress_exc(*exc_details):
    655             nonlocal saved_details
    656             saved_details = exc_details
    657             return True
    658 
    659         try:
    660             with ExitStack() as stack:
    661                 stack.callback(raise_exc, IndexError)
    662                 stack.callback(raise_exc, KeyError)
    663                 stack.callback(raise_exc, AttributeError)
    664                 stack.push(suppress_exc)
    665                 stack.callback(raise_exc, ValueError)
    666                 1 / 0
    667         except IndexError as exc:
    668             self.assertIsInstance(exc.__context__, KeyError)
    669             self.assertIsInstance(exc.__context__.__context__, AttributeError)
    670             # Inner exceptions were suppressed
    671             self.assertIsNone(exc.__context__.__context__.__context__)
    672         else:
    673             self.fail("Expected IndexError, but no exception was raised")
    674         # Check the inner exceptions
    675         inner_exc = saved_details[1]
    676         self.assertIsInstance(inner_exc, ValueError)
    677         self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
    678 
    679     def test_exit_exception_non_suppressing(self):
    680         # http://bugs.python.org/issue19092
    681         def raise_exc(exc):
    682             raise exc
    683 
    684         def suppress_exc(*exc_details):
    685             return True
    686 
    687         try:
    688             with ExitStack() as stack:
    689                 stack.callback(lambda: None)
    690                 stack.callback(raise_exc, IndexError)
    691         except Exception as exc:
    692             self.assertIsInstance(exc, IndexError)
    693         else:
    694             self.fail("Expected IndexError, but no exception was raised")
    695 
    696         try:
    697             with ExitStack() as stack:
    698                 stack.callback(raise_exc, KeyError)
    699                 stack.push(suppress_exc)
    700                 stack.callback(raise_exc, IndexError)
    701         except Exception as exc:
    702             self.assertIsInstance(exc, KeyError)
    703         else:
    704             self.fail("Expected KeyError, but no exception was raised")
    705 
    706     def test_exit_exception_with_correct_context(self):
    707         # http://bugs.python.org/issue20317
    708         @contextmanager
    709         def gets_the_context_right(exc):
    710             try:
    711                 yield
    712             finally:
    713                 raise exc
    714 
    715         exc1 = Exception(1)
    716         exc2 = Exception(2)
    717         exc3 = Exception(3)
    718         exc4 = Exception(4)
    719 
    720         # The contextmanager already fixes the context, so prior to the
    721         # fix, ExitStack would try to fix it *again* and get into an
    722         # infinite self-referential loop
    723         try:
    724             with ExitStack() as stack:
    725                 stack.enter_context(gets_the_context_right(exc4))
    726                 stack.enter_context(gets_the_context_right(exc3))
    727                 stack.enter_context(gets_the_context_right(exc2))
    728                 raise exc1
    729         except Exception as exc:
    730             self.assertIs(exc, exc4)
    731             self.assertIs(exc.__context__, exc3)
    732             self.assertIs(exc.__context__.__context__, exc2)
    733             self.assertIs(exc.__context__.__context__.__context__, exc1)
    734             self.assertIsNone(
    735                        exc.__context__.__context__.__context__.__context__)
    736 
    737     def test_exit_exception_with_existing_context(self):
    738         # Addresses a lack of test coverage discovered after checking in a
    739         # fix for issue 20317 that still contained debugging code.
    740         def raise_nested(inner_exc, outer_exc):
    741             try:
    742                 raise inner_exc
    743             finally:
    744                 raise outer_exc
    745         exc1 = Exception(1)
    746         exc2 = Exception(2)
    747         exc3 = Exception(3)
    748         exc4 = Exception(4)
    749         exc5 = Exception(5)
    750         try:
    751             with ExitStack() as stack:
    752                 stack.callback(raise_nested, exc4, exc5)
    753                 stack.callback(raise_nested, exc2, exc3)
    754                 raise exc1
    755         except Exception as exc:
    756             self.assertIs(exc, exc5)
    757             self.assertIs(exc.__context__, exc4)
    758             self.assertIs(exc.__context__.__context__, exc3)
    759             self.assertIs(exc.__context__.__context__.__context__, exc2)
    760             self.assertIs(
    761                  exc.__context__.__context__.__context__.__context__, exc1)
    762             self.assertIsNone(
    763                 exc.__context__.__context__.__context__.__context__.__context__)
    764 
    765 
    766 
    767     def test_body_exception_suppress(self):
    768         def suppress_exc(*exc_details):
    769             return True
    770         try:
    771             with ExitStack() as stack:
    772                 stack.push(suppress_exc)
    773                 1/0
    774         except IndexError as exc:
    775             self.fail("Expected no exception, got IndexError")
    776 
    777     def test_exit_exception_chaining_suppress(self):
    778         with ExitStack() as stack:
    779             stack.push(lambda *exc: True)
    780             stack.push(lambda *exc: 1/0)
    781             stack.push(lambda *exc: {}[1])
    782 
    783     def test_excessive_nesting(self):
    784         # The original implementation would die with RecursionError here
    785         with ExitStack() as stack:
    786             for i in range(10000):
    787                 stack.callback(int)
    788 
    789     def test_instance_bypass(self):
    790         class Example(object): pass
    791         cm = Example()
    792         cm.__exit__ = object()
    793         stack = ExitStack()
    794         self.assertRaises(AttributeError, stack.enter_context, cm)
    795         stack.push(cm)
    796         self.assertIs(stack._exit_callbacks[-1], cm)
    797 
    798     def test_dont_reraise_RuntimeError(self):
    799         # https://bugs.python.org/issue27122
    800         class UniqueException(Exception): pass
    801         class UniqueRuntimeError(RuntimeError): pass
    802 
    803         @contextmanager
    804         def second():
    805             try:
    806                 yield 1
    807             except Exception as exc:
    808                 raise UniqueException("new exception") from exc
    809 
    810         @contextmanager
    811         def first():
    812             try:
    813                 yield 1
    814             except Exception as exc:
    815                 raise exc
    816 
    817         # The UniqueRuntimeError should be caught by second()'s exception
    818         # handler which chain raised a new UniqueException.
    819         with self.assertRaises(UniqueException) as err_ctx:
    820             with ExitStack() as es_ctx:
    821                 es_ctx.enter_context(second())
    822                 es_ctx.enter_context(first())
    823                 raise UniqueRuntimeError("please no infinite loop.")
    824 
    825         exc = err_ctx.exception
    826         self.assertIsInstance(exc, UniqueException)
    827         self.assertIsInstance(exc.__context__, UniqueRuntimeError)
    828         self.assertIsNone(exc.__context__.__context__)
    829         self.assertIsNone(exc.__context__.__cause__)
    830         self.assertIs(exc.__cause__, exc.__context__)
    831 
    832 
    833 class TestRedirectStream:
    834 
    835     redirect_stream = None
    836     orig_stream = None
    837 
    838     @support.requires_docstrings
    839     def test_instance_docs(self):
    840         # Issue 19330: ensure context manager instances have good docstrings
    841         cm_docstring = self.redirect_stream.__doc__
    842         obj = self.redirect_stream(None)
    843         self.assertEqual(obj.__doc__, cm_docstring)
    844 
    845     def test_no_redirect_in_init(self):
    846         orig_stdout = getattr(sys, self.orig_stream)
    847         self.redirect_stream(None)
    848         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    849 
    850     def test_redirect_to_string_io(self):
    851         f = io.StringIO()
    852         msg = "Consider an API like help(), which prints directly to stdout"
    853         orig_stdout = getattr(sys, self.orig_stream)
    854         with self.redirect_stream(f):
    855             print(msg, file=getattr(sys, self.orig_stream))
    856         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    857         s = f.getvalue().strip()
    858         self.assertEqual(s, msg)
    859 
    860     def test_enter_result_is_target(self):
    861         f = io.StringIO()
    862         with self.redirect_stream(f) as enter_result:
    863             self.assertIs(enter_result, f)
    864 
    865     def test_cm_is_reusable(self):
    866         f = io.StringIO()
    867         write_to_f = self.redirect_stream(f)
    868         orig_stdout = getattr(sys, self.orig_stream)
    869         with write_to_f:
    870             print("Hello", end=" ", file=getattr(sys, self.orig_stream))
    871         with write_to_f:
    872             print("World!", file=getattr(sys, self.orig_stream))
    873         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    874         s = f.getvalue()
    875         self.assertEqual(s, "Hello World!\n")
    876 
    877     def test_cm_is_reentrant(self):
    878         f = io.StringIO()
    879         write_to_f = self.redirect_stream(f)
    880         orig_stdout = getattr(sys, self.orig_stream)
    881         with write_to_f:
    882             print("Hello", end=" ", file=getattr(sys, self.orig_stream))
    883             with write_to_f:
    884                 print("World!", file=getattr(sys, self.orig_stream))
    885         self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
    886         s = f.getvalue()
    887         self.assertEqual(s, "Hello World!\n")
    888 
    889 
    890 class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
    891 
    892     redirect_stream = redirect_stdout
    893     orig_stream = "stdout"
    894 
    895 
    896 class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
    897 
    898     redirect_stream = redirect_stderr
    899     orig_stream = "stderr"
    900 
    901 
    902 class TestSuppress(unittest.TestCase):
    903 
    904     @support.requires_docstrings
    905     def test_instance_docs(self):
    906         # Issue 19330: ensure context manager instances have good docstrings
    907         cm_docstring = suppress.__doc__
    908         obj = suppress()
    909         self.assertEqual(obj.__doc__, cm_docstring)
    910 
    911     def test_no_result_from_enter(self):
    912         with suppress(ValueError) as enter_result:
    913             self.assertIsNone(enter_result)
    914 
    915     def test_no_exception(self):
    916         with suppress(ValueError):
    917             self.assertEqual(pow(2, 5), 32)
    918 
    919     def test_exact_exception(self):
    920         with suppress(TypeError):
    921             len(5)
    922 
    923     def test_exception_hierarchy(self):
    924         with suppress(LookupError):
    925             'Hello'[50]
    926 
    927     def test_other_exception(self):
    928         with self.assertRaises(ZeroDivisionError):
    929             with suppress(TypeError):
    930                 1/0
    931 
    932     def test_no_args(self):
    933         with self.assertRaises(ZeroDivisionError):
    934             with suppress():
    935                 1/0
    936 
    937     def test_multiple_exception_args(self):
    938         with suppress(ZeroDivisionError, TypeError):
    939             1/0
    940         with suppress(ZeroDivisionError, TypeError):
    941             len(5)
    942 
    943     def test_cm_is_reentrant(self):
    944         ignore_exceptions = suppress(Exception)
    945         with ignore_exceptions:
    946             pass
    947         with ignore_exceptions:
    948             len(5)
    949         with ignore_exceptions:
    950             with ignore_exceptions: # Check nested usage
    951                 len(5)
    952             outer_continued = True
    953             1/0
    954         self.assertTrue(outer_continued)
    955 
    956 if __name__ == "__main__":
    957     unittest.main()
    958