Home | History | Annotate | Download | only in test
      1 """Supporting definitions for the Python regression tests."""
      2 
# Import guard: this module is only usable under the name
# 'test.test_support'; loading it under any other name is rejected with
# ImportError.
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
      5 
      6 import contextlib
      7 import errno
      8 import functools
      9 import gc
     10 import socket
     11 import sys
     12 import os
     13 import platform
     14 import shutil
     15 import warnings
     16 import unittest
     17 import importlib
     18 import UserDict
     19 import re
     20 import time
     21 import struct
     22 import _testcapi
     23 import sysconfig
     24 try:
     25     import thread
     26 except ImportError:
     27     thread = None
     28 
     29 __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
     30            "verbose", "use_resources", "max_memuse", "record_original_stdout",
     31            "get_original_stdout", "unload", "unlink", "rmtree", "forget",
     32            "is_resource_enabled", "requires", "find_unused_port", "bind_port",
     33            "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
     34            "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
     35            "open_urlresource", "check_warnings", "check_py3k_warnings",
     36            "CleanImport", "EnvironmentVarGuard", "captured_output",
     37            "captured_stdout", "TransientResource", "transient_internet",
     38            "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
     39            "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
     40            "threading_cleanup", "reap_children", "cpython_only",
     41            "check_impl_detail", "get_attribute", "py3k_bytes",
     42            "import_fresh_module", "threading_cleanup", "reap_children",
     43            "strip_python_stderr"]
     44 
     45 class Error(Exception):
     46     """Base class for regression test exceptions."""
     47 
     48 class TestFailed(Error):
     49     """Test failed."""
     50 
     51 class ResourceDenied(unittest.SkipTest):
     52     """Test skipped because it requested a disallowed resource.
     53 
     54     This is raised when a test calls requires() for a resource that
     55     has not been enabled.  It is used to distinguish between expected
     56     and unexpected skips.
     57     """
     58 
     59 @contextlib.contextmanager
     60 def _ignore_deprecated_imports(ignore=True):
     61     """Context manager to suppress package and module deprecation
     62     warnings when importing them.
     63 
     64     If ignore is False, this context manager has no effect."""
     65     if ignore:
     66         with warnings.catch_warnings():
     67             warnings.filterwarnings("ignore", ".+ (module|package)",
     68                                     DeprecationWarning)
     69             yield
     70     else:
     71         yield
     72 
     73 
     74 def import_module(name, deprecated=False):
     75     """Import and return the module to be tested, raising SkipTest if
     76     it is not available.
     77 
     78     If deprecated is True, any module or package deprecation messages
     79     will be suppressed."""
     80     with _ignore_deprecated_imports(deprecated):
     81         try:
     82             return importlib.import_module(name)
     83         except ImportError, msg:
     84             raise unittest.SkipTest(str(msg))
     85 
     86 
     87 def _save_and_remove_module(name, orig_modules):
     88     """Helper function to save and remove a module from sys.modules
     89 
     90        Raise ImportError if the module can't be imported."""
     91     # try to import the module and raise an error if it can't be imported
     92     if name not in sys.modules:
     93         __import__(name)
     94         del sys.modules[name]
     95     for modname in list(sys.modules):
     96         if modname == name or modname.startswith(name + '.'):
     97             orig_modules[modname] = sys.modules[modname]
     98             del sys.modules[modname]
     99 
    100 def _save_and_block_module(name, orig_modules):
    101     """Helper function to save and block a module in sys.modules
    102 
    103        Return True if the module was in sys.modules, False otherwise."""
    104     saved = True
    105     try:
    106         orig_modules[name] = sys.modules[name]
    107     except KeyError:
    108         saved = False
    109     sys.modules[name] = None
    110     return saved
    111 
    112 
    113 def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    114     """Imports and returns a module, deliberately bypassing the sys.modules cache
    115     and importing a fresh copy of the module. Once the import is complete,
    116     the sys.modules cache is restored to its original state.
    117 
    118     Modules named in fresh are also imported anew if needed by the import.
    119     If one of these modules can't be imported, None is returned.
    120 
    121     Importing of modules named in blocked is prevented while the fresh import
    122     takes place.
    123 
    124     If deprecated is True, any module or package deprecation messages
    125     will be suppressed."""
    126     # NOTE: test_heapq, test_json, and test_warnings include extra sanity
    127     # checks to make sure that this utility function is working as expected
    128     with _ignore_deprecated_imports(deprecated):
    129         # Keep track of modules saved for later restoration as well
    130         # as those which just need a blocking entry removed
    131         orig_modules = {}
    132         names_to_remove = []
    133         _save_and_remove_module(name, orig_modules)
    134         try:
    135             for fresh_name in fresh:
    136                 _save_and_remove_module(fresh_name, orig_modules)
    137             for blocked_name in blocked:
    138                 if not _save_and_block_module(blocked_name, orig_modules):
    139                     names_to_remove.append(blocked_name)
    140             fresh_module = importlib.import_module(name)
    141         except ImportError:
    142             fresh_module = None
    143         finally:
    144             for orig_name, module in orig_modules.items():
    145                 sys.modules[orig_name] = module
    146             for name_to_remove in names_to_remove:
    147                 del sys.modules[name_to_remove]
    148         return fresh_module
    149 
    150 
    151 def get_attribute(obj, name):
    152     """Get an attribute, raising SkipTest if AttributeError is raised."""
    153     try:
    154         attribute = getattr(obj, name)
    155     except AttributeError:
    156         raise unittest.SkipTest("module %s has no attribute %s" % (
    157             obj.__name__, name))
    158     else:
    159         return attribute
    160 
    161 
# Module-level flags.  The defaults below apply until they are reassigned.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
# NOTE(review): presumably the limit actually requested, as opposed to the
# possibly adjusted max_memuse -- confirm against the code that sets it.
real_max_memuse = 0
    167 
    168 # _original_stdout is meant to hold stdout at the time regrtest began.
    169 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
    170 # The point is to have some flavor of stdout the user can actually see.
    171 _original_stdout = None
    172 def record_original_stdout(stdout):
    173     global _original_stdout
    174     _original_stdout = stdout
    175 
    176 def get_original_stdout():
    177     return _original_stdout or sys.stdout
    178 
    179 def unload(name):
    180     try:
    181         del sys.modules[name]
    182     except KeyError:
    183         pass
    184 
# On Windows a deleted file or directory can remain visible for a short
# while, so the deletion helpers poll until the entry is gone.  Everywhere
# else the plain os/shutil functions are used directly.
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then poll until the deletion is visible.

        With waitall false, wait until the last component of pathname is
        no longer listed in its parent directory.  With waitall true,
        pathname is itself listed and the wait lasts until that listing
        is empty.  If the condition is still unmet when the backoff is
        exhausted, a RuntimeWarning is issued.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7 @ 4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Delete filename with os.unlink and wait for it to disappear."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove dirname with os.rmdir and wait for it to disappear."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively delete path, waiting after each directory is emptied."""
        def _rmtree_inner(path):
            # Empty the directory `path`; subdirectories are emptied
            # recursively (waiting for each to list as empty) and then
            # removed.
            for name in os.listdir(path):
                fullname = os.path.join(path, name)
                if os.path.isdir(fullname):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    os.rmdir(fullname)
                else:
                    os.unlink(fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(os.rmdir, path)
else:
    _unlink = os.unlink
    _rmdir = os.rmdir
    _rmtree = shutil.rmtree
    240 
def unlink(filename):
    """Delete filename via _unlink(), silently ignoring any OSError."""
    try:
        _unlink(filename)
    except OSError:
        pass
    246 
def rmdir(dirname):
    """Remove the directory dirname via _rmdir().

    An OSError whose errno is ENOENT is ignored; any other OSError is
    re-raised."""
    try:
        _rmdir(dirname)
    except OSError as error:
        # The directory need not exist.
        if error.errno != errno.ENOENT:
            raise
    254 
    255 def rmtree(path):
    256     try:
    257         _rmtree(path)
    258     except OSError, e:
    259         # Unix returns ENOENT, Windows returns ESRCH.
    260         if e.errno not in (errno.ENOENT, errno.ESRCH):
    261             raise
    262 
    263 def forget(modname):
    264     '''"Forget" a module was ever imported by removing it from sys.modules and
    265     deleting any .pyc and .pyo files.'''
    266     unload(modname)
    267     for dirname in sys.path:
    268         unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
    269         # Deleting the .pyo file cannot be within the 'try' for the .pyc since
    270         # the chance exists that there is no .pyc (and thus the 'try' statement
    271         # is exited) but there is a .pyo file.
    272         unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
    273 
    274 def is_resource_enabled(resource):
    275     """Test whether a resource is enabled.  Known resources are set by
    276     regrtest.py."""
    277     return use_resources is not None and resource in use_resources
    278 
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ the check is skipped and the call
    returns without raising, i.e. the resource is treated as enabled.
    Otherwise ResourceDenied is raised when is_resource_enabled(resource)
    is false, using msg if given or a default message naming the resource.
    The function never returns a meaningful value."""
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe(1).f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)
    292 
# Default host that bind_port() binds sockets to.
HOST = 'localhost'
    294 
    295 def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    296     """Returns an unused port that should be suitable for binding.  This is
    297     achieved by creating a temporary socket with the same family and type as
    298     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
    299     the specified host address (defaults to 0.0.0.0) with the port set to 0,
    300     eliciting an unused ephemeral port from the OS.  The temporary socket is
    301     then closed and deleted, and the ephemeral port is returned.
    302 
    303     Either this method or bind_port() should be used for any tests where a
    304     server socket needs to be bound to a particular port for the duration of
    305     the test.  Which one to use depends on whether the calling code is creating
    306     a python socket, or if an unused port needs to be provided in a constructor
    307     or passed to an external program (i.e. the -accept argument to openssl's
    308     s_server mode).  Always prefer bind_port() over find_unused_port() where
    309     possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    310     socket is bound to a hard coded port, the ability to run multiple instances
    311     of the test simultaneously on the same host is compromised, which makes the
    312     test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    313     may simply manifest as a failed test, which can be recovered from without
    314     intervention in most cases, but on Windows, the entire python process can
    315     completely and utterly wedge, requiring someone to log in to the buildbot
    316     and manually kill the affected process.
    317 
    318     (This is easy to reproduce on Windows, unfortunately, and can be traced to
    319     the SO_REUSEADDR socket option having different semantics on Windows versus
    320     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    321     listen and then accept connections on identical host/ports.  An EADDRINUSE
    322     socket.error will be raised at some point (depending on the platform and
    323     the order bind and listen were called on each socket).
    324 
    325     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    326     will ever be raised when attempting to bind two identical host/ports. When
    327     accept() is called on each socket, the second caller's process will steal
    328     the port from the first caller, leaving them both in an awkwardly wedged
    329     state where they'll no longer respond to any signals or graceful kills, and
    330     must be forcibly killed via OpenProcess()/TerminateProcess().
    331 
    332     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    333     instead of SO_REUSEADDR, which effectively affords the same semantics as
    334     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    335     Source world compared to Windows ones, this is a common mistake.  A quick
    336     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    337     openssl.exe is called with the 's_server' option, for example. See
    338     http://bugs.python.org/issue2550 for more info.  The following site also
    339     has a very thorough description about the implications of both REUSEADDR
    340     and EXCLUSIVEADDRUSE on Windows:
    341     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
    342 
    343     XXX: although this approach is a vast improvement on previous attempts to
    344     elicit unused ports, it rests heavily on the assumption that the ephemeral
    345     port returned to us by the OS won't immediately be dished back out to some
    346     other process when we close and delete our temporary socket but before our
    347     calling code has a chance to bind the returned port.  We can deal with this
    348     issue if/when we come across it."""
    349     tempsock = socket.socket(family, socktype)
    350     port = bind_port(tempsock)
    351     tempsock.close()
    352     del tempsock
    353     return port
    354 
    355 def bind_port(sock, host=HOST):
    356     """Bind the socket to a free port and return the port number.  Relies on
    357     ephemeral ports in order to ensure we are using an unbound port.  This is
    358     important as many tests may be running simultaneously, especially in a
    359     buildbot environment.  This method raises an exception if the sock.family
    360     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    361     or SO_REUSEPORT set on it.  Tests should *never* set these socket options
    362     for TCP/IP sockets.  The only case for setting these options is testing
    363     multicasting via multiple UDP sockets.
    364 
    365     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    366     on Windows), it will be set on the socket.  This will prevent anyone else
    367     from bind()'ing to our host/port for the duration of the test.
    368     """
    369     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
    370         if hasattr(socket, 'SO_REUSEADDR'):
    371             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
    372                 raise TestFailed("tests should never set the SO_REUSEADDR "   \
    373                                  "socket option on TCP/IP sockets!")
    374         if hasattr(socket, 'SO_REUSEPORT'):
    375             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
    376                 raise TestFailed("tests should never set the SO_REUSEPORT "   \
    377                                  "socket option on TCP/IP sockets!")
    378         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
    379             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
    380 
    381     sock.bind((host, 0))
    382     port = sock.getsockname()[1]
    383     return port
    384 
    385 FUZZ = 1e-6
    386 
    387 def fcmp(x, y): # fuzzy comparison function
    388     if isinstance(x, float) or isinstance(y, float):
    389         try:
    390             fuzz = (abs(x) + abs(y)) * FUZZ
    391             if abs(x-y) <= fuzz:
    392                 return 0
    393         except:
    394             pass
    395     elif type(x) == type(y) and isinstance(x, (tuple, list)):
    396         for i in range(min(len(x), len(y))):
    397             outcome = fcmp(x[i], y[i])
    398             if outcome != 0:
    399                 return outcome
    400         return (len(x) > len(y)) - (len(x) < len(y))
    401     return (x > y) - (x < y)
    402 
    403 
    404 # A constant likely larger than the underlying OS pipe buffer size, to
    405 # make writes blocking.
    406 # Windows limit seems to be around 512 B, and many Unix kernels have a
    407 # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
    408 # (see issue #17835 for a discussion of this number).
    409 PIPE_MAX_SIZE = 4 *1024 * 1024 + 1
    410 
    411 
# Probe for the unicode builtin: a NameError means the name is not defined
# in this interpreter.
try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

# True when sys.platform starts with 'java'.
is_jython = sys.platform.startswith('java')
    419 
# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TESTFN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNENCODABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
            TESTFN_UNENCODABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # 'errors=ignore' mode - hence we get '?' characters rather
                # than the exception.  'Latin1' operates as we expect - ie,
                # fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNENCODABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                # The name turned out to be encodable after all, so warn
                # that tests relying on it may prove nothing.
                print \
                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
                'Unicode filename tests may not be effective' \
                % TESTFN_UNENCODABLE


# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())

# Save the initial cwd
SAVEDCWD = os.getcwd()
    473 
    474 @contextlib.contextmanager
    475 def temp_cwd(name='tempcwd', quiet=False):
    476     """
    477     Context manager that creates a temporary directory and set it as CWD.
    478 
    479     The new CWD is created in the current directory and it's named *name*.
    480     If *quiet* is False (default) and it's not possible to create or change
    481     the CWD, an error is raised.  If it's True, only a warning is raised
    482     and the original CWD is used.
    483     """
    484     if have_unicode and isinstance(name, unicode):
    485         try:
    486             name = name.encode(sys.getfilesystemencoding() or 'ascii')
    487         except UnicodeEncodeError:
    488             if not quiet:
    489                 raise unittest.SkipTest('unable to encode the cwd name with '
    490                                         'the filesystem encoding.')
    491     saved_dir = os.getcwd()
    492     is_temporary = False
    493     try:
    494         os.mkdir(name)
    495         os.chdir(name)
    496         is_temporary = True
    497     except OSError:
    498         if not quiet:
    499             raise
    500         warnings.warn('tests may fail, unable to change the CWD to ' + name,
    501                       RuntimeWarning, stacklevel=3)
    502     try:
    503         yield os.getcwd()
    504     finally:
    505         os.chdir(saved_dir)
    506         if is_temporary:
    507             rmtree(name)
    508 
    509 
    510 def findfile(file, here=__file__, subdir=None):
    511     """Try to find a file on sys.path and the working directory.  If it is not
    512     found the argument passed to the function is returned (this does not
    513     necessarily signal failure; could still be the legitimate path)."""
    514     if os.path.isabs(file):
    515         return file
    516     if subdir is not None:
    517         file = os.path.join(subdir, file)
    518     path = sys.path
    519     path = [os.path.dirname(here)] + path
    520     for dn in path:
    521         fn = os.path.join(dn, file)
    522         if os.path.exists(fn): return fn
    523     return file
    524 
    525 def sortdict(dict):
    526     "Like repr(dict), but in sorted order."
    527     items = dict.items()
    528     items.sort()
    529     reprpairs = ["%r: %r" % pair for pair in items]
    530     withcommas = ", ".join(reprpairs)
    531     return "{%s}" % withcommas
    532 
    533 def make_bad_fd():
    534     """
    535     Create an invalid file descriptor by opening and closing a file and return
    536     its fd.
    537     """
    538     file = open(TESTFN, "wb")
    539     try:
    540         return file.fileno()
    541     finally:
    542         file.close()
    543         unlink(TESTFN)
    544 
def check_syntax_error(testcase, statement):
    """Assert, through testcase.assertRaises, that compiling statement in
    'exec' mode raises SyntaxError."""
    testcase.assertRaises(SyntaxError, compile, statement,
                          '<test string>', 'exec')
    548 
def open_urlresource(url, check=None):
    """Return an open file for the resource at url, fetching it if needed.

    The resource is cached in the "data" directory next to this module,
    under the last path component of the URL.  If check is given, it is
    called with the open file and must return a true value for the file to
    be accepted; an accepted file is rewound to the start before it is
    returned.  A cached copy that fails the check is deleted and fetched
    again.

    Fetching calls requires('urlfetch') first.  TestFailed is raised if the
    freshly downloaded file fails the check.
    """
    import urlparse, urllib2

    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)

    def check_valid_file(fn):
        # Return the open file if it is acceptable; otherwise close it and
        # fall off the end, returning None.
        f = open(fn)
        if check is None:
            return f
        elif check(f):
            f.seek(0)
            return f
        f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        # The cached copy was rejected: discard it and download again.
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print >> get_original_stdout(), '\tfetching %s ...' % url
    f = urllib2.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource "%s"' % fn)
    589 
    590 
    591 class WarningsRecorder(object):
    592     """Convenience wrapper for the warnings list returned on
    593        entry to the warnings.catch_warnings() context manager.
    594     """
    595     def __init__(self, warnings_list):
    596         self._warnings = warnings_list
    597         self._last = 0
    598 
    599     def __getattr__(self, attr):
    600         if len(self._warnings) > self._last:
    601             return getattr(self._warnings[-1], attr)
    602         elif attr in warnings.WarningMessage._WARNING_DETAILS:
    603             return None
    604         raise AttributeError("%r has no attribute %r" % (self, attr))
    605 
    606     @property
    607     def warnings(self):
    608         return self._warnings[self._last:]
    609 
    610     def reset(self):
    611         self._last = len(self._warnings)
    612 
    613 
    614 def _filterwarnings(filters, quiet=False):
    615     """Catch the warnings, then check if all the expected
    616     warnings have been raised and re-raise unexpected warnings.
    617     If 'quiet' is True, only re-raise the unexpected warnings.
    618     """
    619     # Clear the warning registry of the calling module
    620     # in order to re-raise the warnings.
    621     frame = sys._getframe(2)
    622     registry = frame.f_globals.get('__warningregistry__')
    623     if registry:
    624         registry.clear()
    625     with warnings.catch_warnings(record=True) as w:
    626         # Set filter "always" to record all warnings.  Because
    627         # test_warnings swap the module, we need to look up in
    628         # the sys.modules dictionary.
    629         sys.modules['warnings'].simplefilter("always")
    630         yield WarningsRecorder(w)
    631     # Filter the recorded warnings
    632     reraise = [warning.message for warning in w]
    633     missing = []
    634     for msg, cat in filters:
    635         seen = False
    636         for exc in reraise[:]:
    637             message = str(exc)
    638             # Filter out the matching messages
    639             if (re.match(msg, message, re.I) and
    640                 issubclass(exc.__class__, cat)):
    641                 seen = True
    642                 reraise.remove(exc)
    643         if not seen and not quiet:
    644             # This filter caught nothing
    645             missing.append((msg, cat.__name__))
    646     if reraise:
    647         raise AssertionError("unhandled warning %r" % reraise[0])
    648     if missing:
    649         raise AssertionError("filter (%r, %s) did not catch any warning" %
    650                              missing[0])
    651 
    652 
    653 @contextlib.contextmanager
    654 def check_warnings(*filters, **kwargs):
    655     """Context manager to silence warnings.
    656 
    657     Accept 2-tuples as positional arguments:
    658         ("message regexp", WarningCategory)
    659 
    660     Optional argument:
    661      - if 'quiet' is True, it does not fail if a filter catches nothing
    662         (default True without argument,
    663          default False if some filters are defined)
    664 
    665     Without argument, it defaults to:
    666         check_warnings(("", Warning), quiet=True)
    667     """
    668     quiet = kwargs.get('quiet')
    669     if not filters:
    670         filters = (("", Warning),)
    671         # Preserve backward compatibility
    672         if quiet is None:
    673             quiet = True
    674     return _filterwarnings(filters, quiet)
    675 
    676 
    677 @contextlib.contextmanager
    678 def check_py3k_warnings(*filters, **kwargs):
    679     """Context manager to silence py3k warnings.
    680 
    681     Accept 2-tuples as positional arguments:
    682         ("message regexp", WarningCategory)
    683 
    684     Optional argument:
    685      - if 'quiet' is True, it does not fail if a filter catches nothing
    686         (default False)
    687 
    688     Without argument, it defaults to:
    689         check_py3k_warnings(("", DeprecationWarning), quiet=False)
    690     """
    691     if sys.py3kwarning:
    692         if not filters:
    693             filters = (("", DeprecationWarning),)
    694     else:
    695         # It should not raise any py3k warning
    696         filters = ()
    697     return _filterwarnings(filters, kwargs.get('quiet'))
    698 
    699 
    700 class CleanImport(object):
    701     """Context manager to force import to return a new module reference.
    702 
    703     This is useful for testing module-level behaviours, such as
    704     the emission of a DeprecationWarning on import.
    705 
    706     Use like this:
    707 
    708         with CleanImport("foo"):
    709             importlib.import_module("foo") # new reference
    710     """
    711 
    712     def __init__(self, *module_names):
    713         self.original_modules = sys.modules.copy()
    714         for module_name in module_names:
    715             if module_name in sys.modules:
    716                 module = sys.modules[module_name]
    717                 # It is possible that module_name is just an alias for
    718                 # another module (e.g. stub for modules renamed in 3.x).
    719                 # In that case, we also need delete the real module to clear
    720                 # the import cache.
    721                 if module.__name__ != module_name:
    722                     del sys.modules[module.__name__]
    723                 del sys.modules[module_name]
    724 
    725     def __enter__(self):
    726         return self
    727 
    728     def __exit__(self, *ignore_exc):
    729         sys.modules.update(self.original_modules)
    730 
    731 
    732 class EnvironmentVarGuard(UserDict.DictMixin):
    733 
    734     """Class to help protect the environment variable properly.  Can be used as
    735     a context manager."""
    736 
    737     def __init__(self):
    738         self._environ = os.environ
    739         self._changed = {}
    740 
    741     def __getitem__(self, envvar):
    742         return self._environ[envvar]
    743 
    744     def __setitem__(self, envvar, value):
    745         # Remember the initial value on the first access
    746         if envvar not in self._changed:
    747             self._changed[envvar] = self._environ.get(envvar)
    748         self._environ[envvar] = value
    749 
    750     def __delitem__(self, envvar):
    751         # Remember the initial value on the first access
    752         if envvar not in self._changed:
    753             self._changed[envvar] = self._environ.get(envvar)
    754         if envvar in self._environ:
    755             del self._environ[envvar]
    756 
    757     def keys(self):
    758         return self._environ.keys()
    759 
    760     def set(self, envvar, value):
    761         self[envvar] = value
    762 
    763     def unset(self, envvar):
    764         del self[envvar]
    765 
    766     def __enter__(self):
    767         return self
    768 
    769     def __exit__(self, *ignore_exc):
    770         for (k, v) in self._changed.items():
    771             if v is None:
    772                 if k in self._environ:
    773                     del self._environ[k]
    774             else:
    775                 self._environ[k] = v
    776         os.environ = self._environ
    777 
    778 
    779 class DirsOnSysPath(object):
    780     """Context manager to temporarily add directories to sys.path.
    781 
    782     This makes a copy of sys.path, appends any directories given
    783     as positional arguments, then reverts sys.path to the copied
    784     settings when the context ends.
    785 
    786     Note that *all* sys.path modifications in the body of the
    787     context manager, including replacement of the object,
    788     will be reverted at the end of the block.
    789     """
    790 
    791     def __init__(self, *paths):
    792         self.original_value = sys.path[:]
    793         self.original_object = sys.path
    794         sys.path.extend(paths)
    795 
    796     def __enter__(self):
    797         return self
    798 
    799     def __exit__(self, *ignore_exc):
    800         sys.path = self.original_object
    801         sys.path[:] = self.original_value
    802 
    803 
    804 class TransientResource(object):
    805 
    806     """Raise ResourceDenied if an exception is raised while the context manager
    807     is in effect that matches the specified exception and attributes."""
    808 
    809     def __init__(self, exc, **kwargs):
    810         self.exc = exc
    811         self.attrs = kwargs
    812 
    813     def __enter__(self):
    814         return self
    815 
    816     def __exit__(self, type_=None, value=None, traceback=None):
    817         """If type_ is a subclass of self.exc and value has attributes matching
    818         self.attrs, raise ResourceDenied.  Otherwise let the exception
    819         propagate (if any)."""
    820         if type_ is not None and issubclass(self.exc, type_):
    821             for attr, attr_value in self.attrs.iteritems():
    822                 if not hasattr(value, attr):
    823                     break
    824                 if getattr(value, attr) != attr_value:
    825                     break
    826             else:
    827                 raise ResourceDenied("an optional resource is not available")
    828 
    829 
@contextlib.contextmanager
def transient_internet(resource_name, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    'resource_name' is only used to build the ResourceDenied message.
    'timeout', unless None, is installed as the default socket timeout for
    the duration of the block; the previous default is always restored.
    'errnos', when non-empty, replaces the built-in list of errno values
    that are treated as transient; in that case no getaddrinfo() error
    code is treated as transient.

    Only IOError (and subclasses) raised in the block are inspected; an
    IOError that is not recognised as transient is re-raised unchanged.
    """
    # (symbolic name, fallback number) pairs: the number is only used when
    # the errno module does not define the name on this platform.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    # Same idea for getaddrinfo() failures, looked up on the socket module.
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Windows defines EAI_NODATA as 11001 but its getaddrinfo()
        # implementation actually returns WSANO_DATA i.e. 11004.
        ('WSANO_DATA', 11004),
    ]

    # Built once up front; this same instance is raised by filter_error().
    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        # No explicit list from the caller: fall back to the defaults above.
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise 'denied' if err is a socket timeout, a gaierror with a
        # listed code, or any error whose errno is listed; else return.
        n = getattr(err, 'errno', None)
        if (isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            n in captured_errnos):
            # Note: the message is echoed to stderr only when the
            # module-level 'verbose' flag is false.
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except IOError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], IOError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(a) >= 2 and isinstance(a[1], IOError):
                err = a[1]
            else:
                break
        # Either raises ResourceDenied or falls through to the bare raise,
        # which re-raises the exception that was originally caught.
        filter_error(err)
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        # Runs on success and on every error path.
        socket.setdefaulttimeout(old_timeout)
    894 
    895 
    896 @contextlib.contextmanager
    897 def captured_output(stream_name):
    898     """Return a context manager used by captured_stdout and captured_stdin
    899     that temporarily replaces the sys stream *stream_name* with a StringIO."""
    900     import StringIO
    901     orig_stdout = getattr(sys, stream_name)
    902     setattr(sys, stream_name, StringIO.StringIO())
    903     try:
    904         yield getattr(sys, stream_name)
    905     finally:
    906         setattr(sys, stream_name, orig_stdout)
    907 
    908 def captured_stdout():
    909     """Capture the output of sys.stdout:
    910 
    911        with captured_stdout() as s:
    912            print "hello"
    913        self.assertEqual(s.getvalue(), "hello")
    914     """
    915     return captured_output("stdout")
    916 
    917 def captured_stderr():
    918     return captured_output("stderr")
    919 
    920 def captured_stdin():
    921     return captured_output("stdin")
    922 
    923 def gc_collect():
    924     """Force as many objects as possible to be collected.
    925 
    926     In non-CPython implementations of Python, this is needed because timely
    927     deallocation is not guaranteed by the garbage collector.  (Even in CPython
    928     this can be the case in case of reference cycles.)  This means that __del__
    929     methods may be called later than expected and weakrefs may remain alive for
    930     longer than expected.  This function tries its best to force all garbage
    931     objects to disappear.
    932     """
    933     gc.collect()
    934     if is_jython:
    935         time.sleep(0.1)
    936     gc.collect()
    937     gc.collect()
    938 
    939 
    940 _header = '2P'
    941 if hasattr(sys, "gettotalrefcount"):
    942     _header = '2P' + _header
    943 _vheader = _header + 'P'
    944 
    945 def calcobjsize(fmt):
    946     return struct.calcsize(_header + fmt + '0P')
    947 
    948 def calcvobjsize(fmt):
    949     return struct.calcsize(_vheader + fmt + '0P')
    950 
    951 
    952 _TPFLAGS_HAVE_GC = 1<<14
    953 _TPFLAGS_HEAPTYPE = 1<<9
    954 
    955 def check_sizeof(test, o, size):
    956     result = sys.getsizeof(o)
    957     # add GC header size
    958     if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
    959         ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
    960         size += _testcapi.SIZEOF_PYGC_HEAD
    961     msg = 'wrong size for %s: got %d, expected %d' \
    962             % (type(o), result, size)
    963     test.assertEqual(result, size, msg)
    964 
    965 
    966 #=======================================================================
    967 # Decorator for running a function in a different locale, correctly resetting
    968 # it afterwards.
    969 
    970 def run_with_locale(catstr, *locales):
    971     def decorator(func):
    972         def inner(*args, **kwds):
    973             try:
    974                 import locale
    975                 category = getattr(locale, catstr)
    976                 orig_locale = locale.setlocale(category)
    977             except AttributeError:
    978                 # if the test author gives us an invalid category string
    979                 raise
    980             except:
    981                 # cannot retrieve original locale, so do nothing
    982                 locale = orig_locale = None
    983             else:
    984                 for loc in locales:
    985                     try:
    986                         locale.setlocale(category, loc)
    987                         break
    988                     except:
    989                         pass
    990 
    991             # now run the function, resetting the locale on exceptions
    992             try:
    993                 return func(*args, **kwds)
    994             finally:
    995                 if locale and orig_locale:
    996                     locale.setlocale(category, orig_locale)
    997         inner.func_name = func.func_name
    998         inner.__doc__ = func.__doc__
    999         return inner
   1000     return decorator
   1001 
   1002 #=======================================================================
   1003 # Big-memory-test support. Separate from 'resources' because memory use should be configurable.
   1004 
   1005 # Some handy shorthands. Note that these are used for byte-limits as well
   1006 # as size-limits, in the various bigmem tests
   1007 _1M = 1024*1024
   1008 _1G = 1024 * _1M
   1009 _2G = 2 * _1G
   1010 _4G = 4 * _1G
   1011 
   1012 MAX_Py_ssize_t = sys.maxsize
   1013 
   1014 def set_memlimit(limit):
   1015     global max_memuse
   1016     global real_max_memuse
   1017     sizes = {
   1018         'k': 1024,
   1019         'm': _1M,
   1020         'g': _1G,
   1021         't': 1024*_1G,
   1022     }
   1023     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
   1024                  re.IGNORECASE | re.VERBOSE)
   1025     if m is None:
   1026         raise ValueError('Invalid memory limit %r' % (limit,))
   1027     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
   1028     real_max_memuse = memlimit
   1029     if memlimit > MAX_Py_ssize_t:
   1030         memlimit = MAX_Py_ssize_t
   1031     if memlimit < _2G - 1:
   1032         raise ValueError('Memory limit %r too low to be useful' % (limit,))
   1033     max_memuse = memlimit
   1034 
   1035 def bigmemtest(minsize, memuse, overhead=5*_1M):
   1036     """Decorator for bigmem tests.
   1037 
   1038     'minsize' is the minimum useful size for the test (in arbitrary,
   1039     test-interpreted units.) 'memuse' is the number of 'bytes per size' for
   1040     the test, or a good estimate of it. 'overhead' specifies fixed overhead,
   1041     independent of the testsize, and defaults to 5Mb.
   1042 
   1043     The decorator tries to guess a good value for 'size' and passes it to
   1044     the decorated test function. If minsize * memuse is more than the
   1045     allowed memory use (as defined by max_memuse), the test is skipped.
   1046     Otherwise, minsize is adjusted upward to use up to max_memuse.
   1047     """
   1048     def decorator(f):
   1049         def wrapper(self):
   1050             if not max_memuse:
   1051                 # If max_memuse is 0 (the default),
   1052                 # we still want to run the tests with size set to a few kb,
   1053                 # to make sure they work. We still want to avoid using
   1054                 # too much memory, though, but we do that noisily.
   1055                 maxsize = 5147
   1056                 self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
   1057             else:
   1058                 maxsize = int((max_memuse - overhead) / memuse)
   1059                 if maxsize < minsize:
   1060                     # Really ought to print 'test skipped' or something
   1061                     if verbose:
   1062                         sys.stderr.write("Skipping %s because of memory "
   1063                                          "constraint\n" % (f.__name__,))
   1064                     return
   1065                 # Try to keep some breathing room in memory use
   1066                 maxsize = max(maxsize - 50 * _1M, minsize)
   1067             return f(self, maxsize)
   1068         wrapper.minsize = minsize
   1069         wrapper.memuse = memuse
   1070         wrapper.overhead = overhead
   1071         return wrapper
   1072     return decorator
   1073 
   1074 def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
   1075     def decorator(f):
   1076         def wrapper(self):
   1077             if not real_max_memuse:
   1078                 maxsize = 5147
   1079             else:
   1080                 maxsize = size
   1081 
   1082             if ((real_max_memuse or not dry_run)
   1083                 and real_max_memuse < maxsize * memuse):
   1084                 if verbose:
   1085                     sys.stderr.write("Skipping %s because of memory "
   1086                                      "constraint\n" % (f.__name__,))
   1087                 return
   1088 
   1089             return f(self, maxsize)
   1090         wrapper.size = size
   1091         wrapper.memuse = memuse
   1092         wrapper.overhead = overhead
   1093         return wrapper
   1094     return decorator
   1095 
   1096 def bigaddrspacetest(f):
   1097     """Decorator for tests that fill the address space."""
   1098     def wrapper(self):
   1099         if max_memuse < MAX_Py_ssize_t:
   1100             if verbose:
   1101                 sys.stderr.write("Skipping %s because of memory "
   1102                                  "constraint\n" % (f.__name__,))
   1103         else:
   1104             return f(self)
   1105     return wrapper
   1106 
   1107 #=======================================================================
   1108 # unittest integration.
   1109 
   1110 class BasicTestRunner:
   1111     def run(self, test):
   1112         result = unittest.TestResult()
   1113         test(result)
   1114         return result
   1115 
   1116 def _id(obj):
   1117     return obj
   1118 
   1119 def requires_resource(resource):
   1120     if is_resource_enabled(resource):
   1121         return _id
   1122     else:
   1123         return unittest.skip("resource {0!r} is not enabled".format(resource))
   1124 
   1125 def cpython_only(test):
   1126     """
   1127     Decorator for tests only applicable on CPython.
   1128     """
   1129     return impl_detail(cpython=True)(test)
   1130 
   1131 def impl_detail(msg=None, **guards):
   1132     if check_impl_detail(**guards):
   1133         return _id
   1134     if msg is None:
   1135         guardnames, default = _parse_guards(guards)
   1136         if default:
   1137             msg = "implementation detail not available on {0}"
   1138         else:
   1139             msg = "implementation detail specific to {0}"
   1140         guardnames = sorted(guardnames.keys())
   1141         msg = msg.format(' or '.join(guardnames))
   1142     return unittest.skip(msg)
   1143 
   1144 def _parse_guards(guards):
   1145     # Returns a tuple ({platform_name: run_me}, default_value)
   1146     if not guards:
   1147         return ({'cpython': True}, False)
   1148     is_true = guards.values()[0]
   1149     assert guards.values() == [is_true] * len(guards)   # all True or all False
   1150     return (guards, not is_true)
   1151 
   1152 # Use the following check to guard CPython's implementation-specific tests --
   1153 # or to run them only on the implementation(s) guarded by the arguments.
   1154 def check_impl_detail(**guards):
   1155     """This function returns True or False depending on the host platform.
   1156        Examples:
   1157           if check_impl_detail():               # only on CPython (default)
   1158           if check_impl_detail(jython=True):    # only on Jython
   1159           if check_impl_detail(cpython=False):  # everywhere except on CPython
   1160     """
   1161     guards, default = _parse_guards(guards)
   1162     return guards.get(platform.python_implementation().lower(), default)
   1163 
   1164 
   1165 
   1166 def _run_suite(suite):
   1167     """Run tests from a unittest.TestSuite-derived class."""
   1168     if verbose:
   1169         runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
   1170     else:
   1171         runner = BasicTestRunner()
   1172 
   1173     result = runner.run(suite)
   1174     if not result.wasSuccessful():
   1175         if len(result.errors) == 1 and not result.failures:
   1176             err = result.errors[0][1]
   1177         elif len(result.failures) == 1 and not result.errors:
   1178             err = result.failures[0][1]
   1179         else:
   1180             err = "multiple errors occurred"
   1181             if not verbose:
   1182                 err += "; run in verbose mode for details"
   1183         raise TestFailed(err)
   1184 
   1185 
   1186 def run_unittest(*classes):
   1187     """Run tests from unittest.TestCase-derived classes."""
   1188     valid_types = (unittest.TestSuite, unittest.TestCase)
   1189     suite = unittest.TestSuite()
   1190     for cls in classes:
   1191         if isinstance(cls, str):
   1192             if cls in sys.modules:
   1193                 suite.addTest(unittest.findTestCases(sys.modules[cls]))
   1194             else:
   1195                 raise ValueError("str arguments must be keys in sys.modules")
   1196         elif isinstance(cls, valid_types):
   1197             suite.addTest(cls)
   1198         else:
   1199             suite.addTest(unittest.makeSuite(cls))
   1200     _run_suite(suite)
   1201 
   1202 #=======================================================================
   1203 # Check for the presence of docstrings.
   1204 
# Truthy when running on a non-CPython implementation, on win32, or when
# the build's WITH_DOC_STRINGS config variable is set.  The 'or' chain
# yields the first truthy operand (or the last one), so this is not
# necessarily a bool.
HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
                   sys.platform == 'win32' or
                   sysconfig.get_config_var('WITH_DOC_STRINGS'))

# Decorator: skip the test when HAVE_DOCSTRINGS is false.
requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")
   1211 
   1212 
   1213 #=======================================================================
   1214 # doctest driver.
   1215 
   1216 def run_doctest(module, verbosity=None):
   1217     """Run doctest on the given module.  Return (#failures, #tests).
   1218 
   1219     If optional argument verbosity is not specified (or is None), pass
   1220     test_support's belief about verbosity on to doctest.  Else doctest's
   1221     usual behavior is used (it searches sys.argv for -v).
   1222     """
   1223 
   1224     import doctest
   1225 
   1226     if verbosity is None:
   1227         verbosity = verbose
   1228     else:
   1229         verbosity = None
   1230 
   1231     # Direct doctest output (normally just errors) to real stdout; doctest
   1232     # output shouldn't be compared by regrtest.
   1233     save_stdout = sys.stdout
   1234     sys.stdout = get_original_stdout()
   1235     try:
   1236         f, t = doctest.testmod(module, verbose=verbosity)
   1237         if f:
   1238             raise TestFailed("%d of %d doctests failed" % (f, t))
   1239     finally:
   1240         sys.stdout = save_stdout
   1241     if verbose:
   1242         print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
   1243     return f, t
   1244 
   1245 #=======================================================================
   1246 # Threading support to prevent reporting refleaks when running regrtest.py -R
   1247 
   1248 # NOTE: we use thread._count() rather than threading.enumerate() (or the
   1249 # moral equivalent thereof) because a threading.Thread object is still alive
   1250 # until its __bootstrap() method has returned, even after it has been
   1251 # unregistered from the threading module.
   1252 # thread._count(), on the other hand, only gets decremented *after* the
   1253 # __bootstrap() method has returned, which gives us reliable reference counts
   1254 # at the end of a test run.
   1255 
   1256 def threading_setup():
   1257     if thread:
   1258         return thread._count(),
   1259     else:
   1260         return 1,
   1261 
   1262 def threading_cleanup(nb_threads):
   1263     if not thread:
   1264         return
   1265 
   1266     _MAX_COUNT = 10
   1267     for count in range(_MAX_COUNT):
   1268         n = thread._count()
   1269         if n == nb_threads:
   1270             break
   1271         time.sleep(0.1)
   1272     # XXX print a warning in case of failure?
   1273 
   1274 def reap_threads(func):
   1275     """Use this function when threads are being used.  This will
   1276     ensure that the threads are cleaned up even when the test fails.
   1277     If threading is unavailable this function does nothing.
   1278     """
   1279     if not thread:
   1280         return func
   1281 
   1282     @functools.wraps(func)
   1283     def decorator(*args):
   1284         key = threading_setup()
   1285         try:
   1286             return func(*args)
   1287         finally:
   1288             threading_cleanup(*key)
   1289     return decorator
   1290 
   1291 def reap_children():
   1292     """Use this function at the end of test_main() whenever sub-processes
   1293     are started.  This will help ensure that no extra children (zombies)
   1294     stick around to hog resources and create problems when looking
   1295     for refleaks.
   1296     """
   1297 
   1298     # Reap all our dead child processes so we don't leave zombies around.
   1299     # These hog resources and might be causing some of the buildbots to die.
   1300     if hasattr(os, 'waitpid'):
   1301         any_process = -1
   1302         while True:
   1303             try:
   1304                 # This will raise an exception on Windows.  That's ok.
   1305                 pid, status = os.waitpid(any_process, os.WNOHANG)
   1306                 if pid == 0:
   1307                     break
   1308             except:
   1309                 break
   1310 
   1311 @contextlib.contextmanager
   1312 def swap_attr(obj, attr, new_val):
   1313     """Temporary swap out an attribute with a new object.
   1314 
   1315     Usage:
   1316         with swap_attr(obj, "attr", 5):
   1317             ...
   1318 
   1319         This will set obj.attr to 5 for the duration of the with: block,
   1320         restoring the old value at the end of the block. If `attr` doesn't
   1321         exist on `obj`, it will be created and then deleted at the end of the
   1322         block.
   1323     """
   1324     if hasattr(obj, attr):
   1325         real_val = getattr(obj, attr)
   1326         setattr(obj, attr, new_val)
   1327         try:
   1328             yield
   1329         finally:
   1330             setattr(obj, attr, real_val)
   1331     else:
   1332         setattr(obj, attr, new_val)
   1333         try:
   1334             yield
   1335         finally:
   1336             delattr(obj, attr)
   1337 
   1338 def py3k_bytes(b):
   1339     """Emulate the py3k bytes() constructor.
   1340 
   1341     NOTE: This is only a best effort function.
   1342     """
   1343     try:
   1344         # memoryview?
   1345         return b.tobytes()
   1346     except AttributeError:
   1347         try:
   1348             # iterable of ints?
   1349             return b"".join(chr(x) for x in b)
   1350         except TypeError:
   1351             return bytes(b)
   1352 
   1353 def args_from_interpreter_flags():
   1354     """Return a list of command-line arguments reproducing the current
   1355     settings in sys.flags."""
   1356     import subprocess
   1357     return subprocess._args_from_interpreter_flags()
   1358 
   1359 def strip_python_stderr(stderr):
   1360     """Strip the stderr of a Python process from potential debug output
   1361     emitted by the interpreter.
   1362 
   1363     This will typically be run on the result of the communicate() method
   1364     of a subprocess.Popen object.
   1365     """
   1366     stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
   1367     return stderr
   1368