      1 """Supporting definitions for the Python regression tests."""
      2 
      3 if __name__ != 'test.support':
      4     raise ImportError('support must be imported from the test package')
      5 
      6 import collections.abc
      7 import contextlib
      8 import datetime
      9 import errno
     10 import faulthandler
     11 import fnmatch
     12 import functools
     13 import gc
     14 import importlib
     15 import importlib.util
     16 import io
     17 import logging.handlers
     18 import nntplib
     19 import os
     20 import platform
     21 import re
     22 import shutil
     23 import socket
     24 import stat
     25 import struct
     26 import subprocess
     27 import sys
     28 import sysconfig
     29 import tempfile
     30 import _thread
     31 import threading
     32 import time
     33 import types
     34 import unittest
     35 import urllib.error
     36 import warnings
     37 
     38 from .testresult import get_test_runner
     39 
     40 try:
     41     import multiprocessing.process
     42 except ImportError:
     43     multiprocessing = None
     44 
     45 try:
     46     import zlib
     47 except ImportError:
     48     zlib = None
     49 
     50 try:
     51     import gzip
     52 except ImportError:
     53     gzip = None
     54 
     55 try:
     56     import bz2
     57 except ImportError:
     58     bz2 = None
     59 
     60 try:
     61     import lzma
     62 except ImportError:
     63     lzma = None
     64 
     65 try:
     66     import resource
     67 except ImportError:
     68     resource = None
     69 
     70 __all__ = [
     71     # globals
     72     "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
     73     # exceptions
     74     "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
     75     # imports
     76     "import_module", "import_fresh_module", "CleanImport",
     77     # modules
     78     "unload", "forget",
     79     # io
     80     "record_original_stdout", "get_original_stdout", "captured_stdout",
     81     "captured_stdin", "captured_stderr",
     82     # filesystem
     83     "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
     84     "create_empty_file", "can_symlink", "fs_is_case_insensitive",
     85     # unittest
     86     "is_resource_enabled", "requires", "requires_freebsd_version",
     87     "requires_linux_version", "requires_mac_ver", "check_syntax_error",
     88     "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
     89     "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
     90     "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
     91     "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
     92     "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
     93     "anticipate_failure", "load_package_tests", "detect_api_mismatch",
     94     "check__all__", "skip_unless_bind_unix_socket",
     95     # sys
     96     "is_jython", "is_android", "check_impl_detail", "unix_shell",
     97     "setswitchinterval",
     98     # network
     99     "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
    100     "bind_unix_socket",
    101     # processes
    102     'temp_umask', "reap_children",
    103     # logging
    104     "TestHandler",
    105     # threads
    106     "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
    107     # miscellaneous
    108     "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard",
    109     "run_with_locale", "swap_item",
    110     "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
    111     "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
    112     ]
    113 
    114 class Error(Exception):
    115     """Base class for regression test exceptions."""
    116 
    117 class TestFailed(Error):
    118     """Test failed."""
    119 
    120 class TestDidNotRun(Error):
    121     """Test did not run any subtests."""
    122 
    123 class ResourceDenied(unittest.SkipTest):
    124     """Test skipped because it requested a disallowed resource.
    125 
    126     This is raised when a test calls requires() for a resource that
    127     has not been enabled.  It is used to distinguish between expected
    128     and unexpected skips.
    129     """
    130 
    131 @contextlib.contextmanager
    132 def _ignore_deprecated_imports(ignore=True):
    133     """Context manager to suppress package and module deprecation
    134     warnings when importing them.
    135 
    136     If ignore is False, this context manager has no effect.
    137     """
    138     if ignore:
    139         with warnings.catch_warnings():
    140             warnings.filterwarnings("ignore", ".+ (module|package)",
    141                                     DeprecationWarning)
    142             yield
    143     else:
    144         yield
    145 
    146 
    147 def import_module(name, deprecated=False, *, required_on=()):
    148     """Import and return the module to be tested, raising SkipTest if
    149     it is not available.
    150 
    151     If deprecated is True, any module or package deprecation messages
    152     will be suppressed. If a module is required on a platform but optional for
    153     others, set required_on to an iterable of platform prefixes which will be
    154     compared against sys.platform.
    155     """
    156     with _ignore_deprecated_imports(deprecated):
    157         try:
    158             return importlib.import_module(name)
    159         except ImportError as msg:
    160             if sys.platform.startswith(tuple(required_on)):
    161                 raise
    162             raise unittest.SkipTest(str(msg))
    163 
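        # Illustrative usage (a sketch, not part of the original file): a test
        # for an optional module typically starts with
        #
        #     ctypes = import_module('ctypes')
        #
        # which skips the whole test with SkipTest when the module is missing.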
    164 
    165 def _save_and_remove_module(name, orig_modules):
    166     """Helper function to save and remove a module from sys.modules
    167 
    168     Raise ImportError if the module can't be imported.
    169     """
    170     # try to import the module and raise an error if it can't be imported
    171     if name not in sys.modules:
    172         __import__(name)
    173         del sys.modules[name]
    174     for modname in list(sys.modules):
    175         if modname == name or modname.startswith(name + '.'):
    176             orig_modules[modname] = sys.modules[modname]
    177             del sys.modules[modname]
    178 
    179 def _save_and_block_module(name, orig_modules):
    180     """Helper function to save and block a module in sys.modules
    181 
    182     Return True if the module was in sys.modules, False otherwise.
    183     """
    184     saved = True
    185     try:
    186         orig_modules[name] = sys.modules[name]
    187     except KeyError:
    188         saved = False
    189     sys.modules[name] = None
    190     return saved
    191 
    192 
    193 def anticipate_failure(condition):
    194     """Decorator to mark a test that is known to be broken in some cases
    195 
    196        Any use of this decorator should have a comment identifying the
    197        associated tracker issue.
    198     """
    199     if condition:
    200         return unittest.expectedFailure
    201     return lambda f: f
    202 
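        # Illustrative usage (hypothetical example): mark a test as an expected
        # failure only under some condition, with a comment naming the tracker
        # issue as the docstring above asks for:
        #
        #     @anticipate_failure(sys.platform == 'win32')  # bpo-NNNNN (placeholder)
        #     def test_something(self):
        #         ...
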
    203 def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    204     """Generic load_tests implementation for simple test packages.
    205 
    206     Most packages can implement load_tests using this function as follows:
    207 
    208        def load_tests(*args):
    209            return load_package_tests(os.path.dirname(__file__), *args)
    210     """
    211     if pattern is None:
    212         pattern = "test*"
    213     top_dir = os.path.dirname(              # Lib
    214                   os.path.dirname(              # test
    215                       os.path.dirname(__file__)))   # support
    216     package_tests = loader.discover(start_dir=pkg_dir,
    217                                     top_level_dir=top_dir,
    218                                     pattern=pattern)
    219     standard_tests.addTests(package_tests)
    220     return standard_tests
    221 
    222 
    223 def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    224     """Import and return a module, deliberately bypassing sys.modules.
    225 
    226     This function imports and returns a fresh copy of the named Python module
    227     by removing the named module from sys.modules before doing the import.
    228     Note that unlike reload, the original module is not affected by
    229     this operation.
    230 
    231     *fresh* is an iterable of additional module names that are also removed
    232     from the sys.modules cache before doing the import.
    233 
    234     *blocked* is an iterable of module names that are replaced with None
    235     in the module cache during the import to ensure that attempts to import
    236     them raise ImportError.
    237 
    238     The named module and any modules named in the *fresh* and *blocked*
    239     parameters are saved before starting the import and then reinserted into
    240     sys.modules when the fresh import is complete.
    241 
    242     Module and package deprecation messages are suppressed during this import
    243     if *deprecated* is True.
    244 
    245     This function will raise ImportError if the named module cannot be
    246     imported.
    247     """
    248     # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    249     # to make sure that this utility function is working as expected
    250     with _ignore_deprecated_imports(deprecated):
    251         # Keep track of modules saved for later restoration as well
    252         # as those which just need a blocking entry removed
    253         orig_modules = {}
    254         names_to_remove = []
    255         _save_and_remove_module(name, orig_modules)
    256         try:
    257             for fresh_name in fresh:
    258                 _save_and_remove_module(fresh_name, orig_modules)
    259             for blocked_name in blocked:
    260                 if not _save_and_block_module(blocked_name, orig_modules):
    261                     names_to_remove.append(blocked_name)
    262             fresh_module = importlib.import_module(name)
    263         except ImportError:
    264             fresh_module = None
    265         finally:
    266             for orig_name, module in orig_modules.items():
    267                 sys.modules[orig_name] = module
    268             for name_to_remove in names_to_remove:
    269                 del sys.modules[name_to_remove]
    270         return fresh_module
    271 
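        # Illustrative usage (a sketch based on the note above about test_heapq):
        # import the pure-Python implementation with the C accelerator blocked,
        # and a freshly accelerated copy alongside it:
        #
        #     py_heapq = import_fresh_module('heapq', blocked=['_heapq'])
        #     c_heapq = import_fresh_module('heapq', fresh=['_heapq'])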
    272 
    273 def get_attribute(obj, name):
    274     """Get an attribute, raising SkipTest if AttributeError is raised."""
    275     try:
    276         attribute = getattr(obj, name)
    277     except AttributeError:
    278         raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
    279     else:
    280         return attribute
    281 
    282 verbose = 1              # Flag set to 0 by regrtest.py
    283 use_resources = None     # Flag set to [] by regrtest.py
    284 max_memuse = 0           # Disable bigmem tests (they will still be run with
    285                          # small sizes, to make sure they work.)
    286 real_max_memuse = 0
    287 junit_xml_list = None    # list of testsuite XML elements
    288 failfast = False
    289 
    290 # _original_stdout is meant to hold stdout at the time regrtest began.
    291 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
    292 # The point is to have some flavor of stdout the user can actually see.
    293 _original_stdout = None
    294 def record_original_stdout(stdout):
    295     global _original_stdout
    296     _original_stdout = stdout
    297 
    298 def get_original_stdout():
    299     return _original_stdout or sys.stdout
    300 
    301 def unload(name):
    302     try:
    303         del sys.modules[name]
    304     except KeyError:
    305         pass
    306 
    307 def _force_run(path, func, *args):
    308     try:
    309         return func(*args)
    310     except OSError as err:
    311         if verbose >= 2:
    312             print('%s: %s' % (err.__class__.__name__, err))
    313             print('re-run %s%r' % (func.__name__, args))
    314         os.chmod(path, stat.S_IRWXU)
    315         return func(*args)
    316 
    317 if sys.platform.startswith("win"):
    318     def _waitfor(func, pathname, waitall=False):
    319         # Perform the operation
    320         func(pathname)
    321         # Now setup the wait loop
    322         if waitall:
    323             dirname = pathname
    324         else:
    325             dirname, name = os.path.split(pathname)
    326             dirname = dirname or '.'
    327         # Check for `pathname` to be removed from the filesystem.
    328         # The exponential backoff of the timeout amounts to a total
    329         # of ~1 second after which the deletion is probably an error
    330         # anyway.
    331         # Testing on an i7 @ 4.3GHz shows that usually only 1 iteration is
    332         # required when contention occurs.
    333         timeout = 0.001
    334         while timeout < 1.0:
    335             # Note we are only testing for the existence of the file(s) in
    336             # the contents of the directory regardless of any security or
    337             # access rights.  If we have made it this far, we have sufficient
    338             # permissions to do that much using Python's equivalent of the
    339             # Windows API FindFirstFile.
    340             # Other Windows APIs can fail or give incorrect results when
    341             # dealing with files that are pending deletion.
    342             L = os.listdir(dirname)
    343             if not (L if waitall else name in L):
    344                 return
    345             # Increase the timeout and try again
    346             time.sleep(timeout)
    347             timeout *= 2
    348         warnings.warn('tests may fail, delete still pending for ' + pathname,
    349                       RuntimeWarning, stacklevel=4)
    350 
    351     def _unlink(filename):
    352         _waitfor(os.unlink, filename)
    353 
    354     def _rmdir(dirname):
    355         _waitfor(os.rmdir, dirname)
    356 
    357     def _rmtree(path):
    358         def _rmtree_inner(path):
    359             for name in _force_run(path, os.listdir, path):
    360                 fullname = os.path.join(path, name)
    361                 try:
    362                     mode = os.lstat(fullname).st_mode
    363                 except OSError as exc:
    364                     print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
    365                           file=sys.__stderr__)
    366                     mode = 0
    367                 if stat.S_ISDIR(mode):
    368                     _waitfor(_rmtree_inner, fullname, waitall=True)
    369                     _force_run(fullname, os.rmdir, fullname)
    370                 else:
    371                     _force_run(fullname, os.unlink, fullname)
    372         _waitfor(_rmtree_inner, path, waitall=True)
    373         _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
    374 
    375     def _longpath(path):
    376         try:
    377             import ctypes
    378         except ImportError:
    379             # No ctypes means we can't expand paths.
    380             pass
    381         else:
    382             buffer = ctypes.create_unicode_buffer(len(path) * 2)
    383             length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
    384                                                              len(buffer))
    385             if length:
    386                 return buffer[:length]
    387         return path
    388 else:
    389     _unlink = os.unlink
    390     _rmdir = os.rmdir
    391 
    392     def _rmtree(path):
    393         try:
    394             shutil.rmtree(path)
    395             return
    396         except OSError:
    397             pass
    398 
    399         def _rmtree_inner(path):
    400             for name in _force_run(path, os.listdir, path):
    401                 fullname = os.path.join(path, name)
    402                 try:
    403                     mode = os.lstat(fullname).st_mode
    404                 except OSError:
    405                     mode = 0
    406                 if stat.S_ISDIR(mode):
    407                     _rmtree_inner(fullname)
    408                     _force_run(path, os.rmdir, fullname)
    409                 else:
    410                     _force_run(path, os.unlink, fullname)
    411         _rmtree_inner(path)
    412         os.rmdir(path)
    413 
    414     def _longpath(path):
    415         return path
    416 
    417 def unlink(filename):
    418     try:
    419         _unlink(filename)
    420     except (FileNotFoundError, NotADirectoryError):
    421         pass
    422 
    423 def rmdir(dirname):
    424     try:
    425         _rmdir(dirname)
    426     except FileNotFoundError:
    427         pass
    428 
    429 def rmtree(path):
    430     try:
    431         _rmtree(path)
    432     except FileNotFoundError:
    433         pass
    434 
    435 def make_legacy_pyc(source):
    436     """Move a PEP 3147/488 pyc file to its legacy pyc location.
    437 
    438     :param source: The file system path to the source file.  The source file
    439         does not need to exist; however, the PEP 3147/488 pyc file must exist.
    440     :return: The file system path to the legacy pyc file.
    441     """
    442     pyc_file = importlib.util.cache_from_source(source)
    443     up_one = os.path.dirname(os.path.abspath(source))
    444     legacy_pyc = os.path.join(up_one, source + 'c')
    445     os.rename(pyc_file, legacy_pyc)
    446     return legacy_pyc
    447 
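        # Illustrative usage (a sketch; 'mymod.py' is a hypothetical source file
        # that has already been byte-compiled):
        #
        #     legacy = make_legacy_pyc('mymod.py')
        #     # the __pycache__ pyc now lives next to the source as 'mymod.pyc'
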
    448 def forget(modname):
    449     """'Forget' a module was ever imported.
    450 
    451     This removes the module from sys.modules and deletes any PEP 3147/488 or
    452     legacy .pyc files.
    453     """
    454     unload(modname)
    455     for dirname in sys.path:
    456         source = os.path.join(dirname, modname + '.py')
    457         # It doesn't matter if they exist or not, unlink all possible
    458         # combinations of PEP 3147/488 and legacy pyc files.
    459         unlink(source + 'c')
    460         for opt in ('', 1, 2):
    461             unlink(importlib.util.cache_from_source(source, optimization=opt))
    462 
    463 # Check whether a gui is actually available
    464 def _is_gui_available():
    465     if hasattr(_is_gui_available, 'result'):
    466         return _is_gui_available.result
    467     reason = None
    468     if sys.platform.startswith('win'):
    469         # if Python is running as a service (such as the buildbot service),
    470         # gui interaction may be disallowed
    471         import ctypes
    472         import ctypes.wintypes
    473         UOI_FLAGS = 1
    474         WSF_VISIBLE = 0x0001
    475         class USEROBJECTFLAGS(ctypes.Structure):
    476             _fields_ = [("fInherit", ctypes.wintypes.BOOL),
    477                         ("fReserved", ctypes.wintypes.BOOL),
    478                         ("dwFlags", ctypes.wintypes.DWORD)]
    479         dll = ctypes.windll.user32
    480         h = dll.GetProcessWindowStation()
    481         if not h:
    482             raise ctypes.WinError()
    483         uof = USEROBJECTFLAGS()
    484         needed = ctypes.wintypes.DWORD()
    485         res = dll.GetUserObjectInformationW(h,
    486             UOI_FLAGS,
    487             ctypes.byref(uof),
    488             ctypes.sizeof(uof),
    489             ctypes.byref(needed))
    490         if not res:
    491             raise ctypes.WinError()
    492         if not bool(uof.dwFlags & WSF_VISIBLE):
    493             reason = "gui not available (WSF_VISIBLE flag not set)"
    494     elif sys.platform == 'darwin':
    495         # The Aqua Tk implementations on OS X can abort the process if
    496         # being called in an environment where a window server connection
    497         # cannot be made, for instance when invoked by a buildbot or ssh
    498         # process not running under the same user id as the current console
    499         # user.  To avoid that, raise an exception if the window manager
    500         # connection is not available.
    501         from ctypes import cdll, c_int, pointer, Structure
    502         from ctypes.util import find_library
    503 
    504         app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
    505 
    506         if app_services.CGMainDisplayID() == 0:
    507             reason = "gui tests cannot run without OS X window manager"
    508         else:
    509             class ProcessSerialNumber(Structure):
    510                 _fields_ = [("highLongOfPSN", c_int),
    511                             ("lowLongOfPSN", c_int)]
    512             psn = ProcessSerialNumber()
    513             psn_p = pointer(psn)
    514             if (  (app_services.GetCurrentProcess(psn_p) < 0) or
    515                   (app_services.SetFrontProcess(psn_p) < 0) ):
    516                 reason = "cannot run without OS X gui process"
    517 
    518     # check on every platform whether tkinter can actually do anything
    519     if not reason:
    520         try:
    521             from tkinter import Tk
    522             root = Tk()
    523             root.withdraw()
    524             root.update()
    525             root.destroy()
    526         except Exception as e:
    527             err_string = str(e)
    528             if len(err_string) > 50:
    529                 err_string = err_string[:50] + ' [...]'
    530             reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
    531                                                            err_string)
    532 
    533     _is_gui_available.reason = reason
    534     _is_gui_available.result = not reason
    535 
    536     return _is_gui_available.result
    537 
    538 def is_resource_enabled(resource):
    539     """Test whether a resource is enabled.
    540 
    541     Known resources are set by regrtest.py.  If not running under regrtest.py,
    542     all resources are assumed enabled unless use_resources has been set.
    543     """
    544     return use_resources is None or resource in use_resources
    545 
    546 def requires(resource, msg=None):
    547     """Raise ResourceDenied if the specified resource is not available."""
    548     if not is_resource_enabled(resource):
    549         if msg is None:
    550             msg = "Use of the %r resource not enabled" % resource
    551         raise ResourceDenied(msg)
    552     if resource == 'gui' and not _is_gui_available():
    553         raise ResourceDenied(_is_gui_available.reason)
    554 
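        # Illustrative usage (a sketch): a test needing network access would call
        #
        #     requires('network')
        #
        # and be skipped with ResourceDenied unless the 'network' resource has
        # been enabled (e.g. via regrtest's -u option).
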
    555 def _requires_unix_version(sysname, min_version):
    556     """Decorator raising SkipTest if the OS is `sysname` and the version is less
    557     than `min_version`.
    558 
    559     For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    560     the FreeBSD version is less than 7.2.
    561     """
    562     def decorator(func):
    563         @functools.wraps(func)
    564         def wrapper(*args, **kw):
    565             if platform.system() == sysname:
    566                 version_txt = platform.release().split('-', 1)[0]
    567                 try:
    568                     version = tuple(map(int, version_txt.split('.')))
    569                 except ValueError:
    570                     pass
    571                 else:
    572                     if version < min_version:
    573                         min_version_txt = '.'.join(map(str, min_version))
    574                         raise unittest.SkipTest(
    575                             "%s version %s or higher required, not %s"
    576                             % (sysname, min_version_txt, version_txt))
    577             return func(*args, **kw)
    578         wrapper.min_version = min_version
    579         return wrapper
    580     return decorator
    581 
    582 def requires_freebsd_version(*min_version):
    583     """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    584     less than `min_version`.
    585 
    586     For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    587     version is less than 7.2.
    588     """
    589     return _requires_unix_version('FreeBSD', min_version)
    590 
    591 def requires_linux_version(*min_version):
    592     """Decorator raising SkipTest if the OS is Linux and the Linux version is
    593     less than `min_version`.
    594 
    595     For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    596     version is less than 2.6.32.
    597     """
    598     return _requires_unix_version('Linux', min_version)
    599 
    600 def requires_mac_ver(*min_version):
    601     """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    602     version is less than min_version.
    603 
    604     For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    605     is less than 10.5.
    606     """
    607     def decorator(func):
    608         @functools.wraps(func)
    609         def wrapper(*args, **kw):
    610             if sys.platform == 'darwin':
    611                 version_txt = platform.mac_ver()[0]
    612                 try:
    613                     version = tuple(map(int, version_txt.split('.')))
    614                 except ValueError:
    615                     pass
    616                 else:
    617                     if version < min_version:
    618                         min_version_txt = '.'.join(map(str, min_version))
    619                         raise unittest.SkipTest(
    620                             "Mac OS X %s or higher required, not %s"
    621                             % (min_version_txt, version_txt))
    622             return func(*args, **kw)
    623         wrapper.min_version = min_version
    624         return wrapper
    625     return decorator
    626 
    627 
    628 HOST = "localhost"
    629 HOSTv4 = "127.0.0.1"
    630 HOSTv6 = "::1"
    631 
    632 
    633 def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    634     """Returns an unused port that should be suitable for binding.  This is
    635     achieved by creating a temporary socket with the given family and type
    636     (the defaults are AF_INET and SOCK_STREAM), and binding it to the host
    637     address HOST ("localhost") with the port set to 0,
    638     eliciting an unused ephemeral port from the OS.  The temporary socket is
    639     then closed and deleted, and the ephemeral port is returned.
    640 
    641     Either this method or bind_port() should be used for any tests where a
    642     server socket needs to be bound to a particular port for the duration of
    643     the test.  Which one to use depends on whether the calling code is creating
    644     a python socket, or if an unused port needs to be provided in a constructor
    645     or passed to an external program (i.e. the -accept argument to openssl's
    646     s_server mode).  Always prefer bind_port() over find_unused_port() where
    647     possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    648     socket is bound to a hard coded port, the ability to run multiple instances
    649     of the test simultaneously on the same host is compromised, which makes the
    650     test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    651     may simply manifest as a failed test, which can be recovered from without
    652     intervention in most cases, but on Windows, the entire python process can
    653     completely and utterly wedge, requiring someone to log in to the buildbot
    654     and manually kill the affected process.
    655 
    656     (This is easy to reproduce on Windows, unfortunately, and can be traced to
    657     the SO_REUSEADDR socket option having different semantics on Windows versus
    658     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    659     listen and then accept connections on identical host/ports.  An EADDRINUSE
    660     OSError will be raised at some point (depending on the platform and
    661     the order bind and listen were called on each socket).
    662 
    663     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    664     will ever be raised when attempting to bind two identical host/ports. When
    665     accept() is called on each socket, the second caller's process will steal
    666     the port from the first caller, leaving them both in an awkwardly wedged
    667     state where they'll no longer respond to any signals or graceful kills, and
    668     must be forcibly killed via OpenProcess()/TerminateProcess().
    669 
    670     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    671     instead of SO_REUSEADDR, which effectively affords the same semantics as
    672     SO_REUSEADDR on Unix.  Given the preponderance of Unix developers in the
    673     Open Source world compared to Windows ones, this is a common mistake.  A quick
    674     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    675     openssl.exe is called with the 's_server' option, for example. See
    676     http://bugs.python.org/issue2550 for more info.  The following site also
    677     has a very thorough description about the implications of both REUSEADDR
    678     and EXCLUSIVEADDRUSE on Windows:
    679     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
    680 
    681     XXX: although this approach is a vast improvement on previous attempts to
    682     elicit unused ports, it rests heavily on the assumption that the ephemeral
    683     port returned to us by the OS won't immediately be dished back out to some
    684     other process when we close and delete our temporary socket but before our
    685     calling code has a chance to bind the returned port.  We can deal with this
    686     issue if/when we come across it.
    687     """
    688 
    689     tempsock = socket.socket(family, socktype)
    690     port = bind_port(tempsock)
    691     tempsock.close()
    692     del tempsock
    693     return port
    694 
    695 def bind_port(sock, host=HOST):
    696     """Bind the socket to a free port and return the port number.  Relies on
    697     ephemeral ports in order to ensure we are using an unbound port.  This is
    698     important as many tests may be running simultaneously, especially in a
    699     buildbot environment.  This method raises an exception if the sock.family
    700     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    701     or SO_REUSEPORT set on it.  Tests should *never* set these socket options
    702     for TCP/IP sockets.  The only case for setting these options is testing
    703     multicasting via multiple UDP sockets.
    704 
    705     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    706     on Windows), it will be set on the socket.  This will prevent anyone else
    707     from bind()'ing to our host/port for the duration of the test.
    708     """
    709 
    710     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
    711         if hasattr(socket, 'SO_REUSEADDR'):
    712             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
    713                 raise TestFailed("tests should never set the SO_REUSEADDR "   \
    714                                  "socket option on TCP/IP sockets!")
    715         if hasattr(socket, 'SO_REUSEPORT'):
    716             try:
    717                 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
    718                     raise TestFailed("tests should never set the SO_REUSEPORT "   \
    719                                      "socket option on TCP/IP sockets!")
    720             except OSError:
    721                 # Python's socket module was compiled using modern headers
    722                 # thus defining SO_REUSEPORT but this process is running
    723                 # under an older kernel that does not support SO_REUSEPORT.
    724                 pass
    725         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
    726             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
    727 
    728     sock.bind((host, 0))
    729     port = sock.getsockname()[1]
    730     return port
    731 
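        # Illustrative usage (a sketch): the usual pattern for a test server
        # socket is to let bind_port() pick a free ephemeral port:
        #
        #     srv = socket.socket()
        #     port = bind_port(srv)    # bound to (HOST, port), port chosen by the OS
        #     srv.listen()
        #
        # find_unused_port() is reserved for the cases described above, where the
        # port number must be handed to a constructor or an external program.
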
    732 def bind_unix_socket(sock, addr):
    733     """Bind a unix socket, raising SkipTest if PermissionError is raised."""
    734     assert sock.family == socket.AF_UNIX
    735     try:
    736         sock.bind(addr)
    737     except PermissionError:
    738         sock.close()
    739         raise unittest.SkipTest('cannot bind AF_UNIX sockets')
    740 
    741 def _is_ipv6_enabled():
    742     """Check whether IPv6 is enabled on this host."""
    743     if socket.has_ipv6:
    744         sock = None
    745         try:
    746             sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    747             sock.bind((HOSTv6, 0))
    748             return True
    749         except OSError:
    750             pass
    751         finally:
    752             if sock:
    753                 sock.close()
    754     return False
    755 
    756 IPV6_ENABLED = _is_ipv6_enabled()
    757 
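        # Illustrative usage (a sketch): tests commonly gate IPv6-specific cases
        # on this flag, e.g.
        #
        #     @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
        #     def test_ipv6(self):
        #         ...
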
    758 def system_must_validate_cert(f):
    759     """Skip the test on TLS certificate validation failures."""
    760     @functools.wraps(f)
    761     def dec(*args, **kwargs):
    762         try:
    763             f(*args, **kwargs)
    764         except OSError as e:
    765             if "CERTIFICATE_VERIFY_FAILED" in str(e):
    766                 raise unittest.SkipTest("system does not contain "
    767                                         "necessary certificates")
    768             raise
    769     return dec
    770 
    771 # A constant likely larger than the underlying OS pipe buffer size, to
    772 # make writes blocking.
    773 # Windows limit seems to be around 512 B, and many Unix kernels have a
    774 # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
    775 # (see issue #17835 for a discussion of this number).
    776 PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
    777 
    778 # A constant likely larger than the underlying OS socket buffer size, to make
    779 # writes blocking.
    780 # The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
    781 # on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
    782 # for a discussion of this number).
    783 SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
    784 
    785 # decorator for skipping tests on non-IEEE 754 platforms
    786 requires_IEEE_754 = unittest.skipUnless(
    787     float.__getformat__("double").startswith("IEEE"),
    788     "test requires IEEE 754 doubles")
    789 
    790 requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
    791 
    792 requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
    793 
    794 requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
    795 
    796 requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
    797 
    798 is_jython = sys.platform.startswith('java')
    799 
    800 is_android = hasattr(sys, 'getandroidapilevel')
    801 
    802 if sys.platform != 'win32':
    803     unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
    804 else:
    805     unix_shell = None
    806 
    807 # Filename used for testing
    808 if os.name == 'java':
    809     # Jython disallows @ in module names
    810     TESTFN = '$test'
    811 else:
    812     TESTFN = '@test'
    813 
    814 # Disambiguate TESTFN for parallel testing, while letting it remain a valid
    815 # module name.
    816 TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
    817 
    818 # Define the URL of a dedicated HTTP server for the network tests.
    819 # The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
    820 TEST_HTTP_URL = "http://www.pythontest.net"
    821 
    822 # FS_NONASCII: non-ASCII character encodable by os.fsencode(),
    823 # or None if there is no such character.
    824 FS_NONASCII = None
    825 for character in (
    826     # First try printable and common characters to have a readable filename.
    827     # For each character, the encoding list is just an example of encodings able
    828     # to encode the character (the list is not exhaustive).
    829 
    830     # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    831     '\u00E6',
    832     # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    833     '\u0130',
    834     # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    835     '\u0141',
    836     # U+03C6 (Greek Small Letter Phi): cp1253
    837     '\u03C6',
    838     # U+041A (Cyrillic Capital Letter Ka): cp1251
    839     '\u041A',
    840     # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    841     '\u05D0',
    842     # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    843     '\u060C',
    844     # U+062A (Arabic Letter Teh): cp720
    845     '\u062A',
    846     # U+0E01 (Thai Character Ko Kai): cp874
    847     '\u0E01',
    848 
    849     # Then try more "special" characters. "special" because they may be
    850     # interpreted or displayed differently depending on the exact locale
    851     # encoding and the font.
    852 
    853     # U+00A0 (No-Break Space)
    854     '\u00A0',
    855     # U+20AC (Euro Sign)
    856     '\u20AC',
    857 ):
    858     try:
    859         # If Python is set up to use the legacy 'mbcs' in Windows,
    860         # 'replace' error mode is used, and encode() returns b'?'
    861         # for characters missing in the ANSI codepage
    862         if os.fsdecode(os.fsencode(character)) != character:
    863             raise UnicodeError
    864     except UnicodeError:
    865         pass
    866     else:
    867         FS_NONASCII = character
    868         break
    869 
    870 # TESTFN_UNICODE is a non-ascii filename
    871 TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
    872 if sys.platform == 'darwin':
    873     # In Mac OS X's VFS API file names are, by definition, canonically
    874     # decomposed Unicode, encoded using UTF-8. See QA1173:
    875     # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    876     import unicodedata
    877     TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
    878 TESTFN_ENCODING = sys.getfilesystemencoding()
    879 
    880 # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
    881 # encoded by the filesystem encoding (in strict mode). It can be None if we
    882 # cannot generate such filename.
    883 TESTFN_UNENCODABLE = None
    884 if os.name == 'nt':
    885     # skip win32s (0) or Windows 9x/ME (1)
    886     if sys.getwindowsversion().platform >= 2:
    887         # Different kinds of characters from various languages to minimize the
    888         # probability that the whole name is encodable to MBCS (issue #9819)
    889         TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
    890         try:
    891             TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
    892         except UnicodeEncodeError:
    893             pass
    894         else:
    895             print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
    896                   'Unicode filename tests may not be effective'
    897                   % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
    898             TESTFN_UNENCODABLE = None
    899 # Mac OS X denies unencodable filenames (invalid utf-8)
    900 elif sys.platform != 'darwin':
    901     try:
    902         # ascii and utf-8 cannot encode the byte 0xff
    903         b'\xff'.decode(TESTFN_ENCODING)
    904     except UnicodeDecodeError:
    905         # 0xff will be encoded using the surrogate character u+DCFF
    906         TESTFN_UNENCODABLE = TESTFN \
    907             + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    908     else:
    909         # File system encoding (eg. ISO-8859-* encodings) can encode
    910         # the byte 0xff. Skip some unicode filename tests.
    911         pass
    912 
    913 # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
    914 # decoded from the filesystem encoding (in strict mode). It can be None if we
    915 # cannot generate such filename (ex: the latin1 encoding can decode any byte
    916 # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
    917 # to the surrogateescape error handler (PEP 383), but not from the filesystem
    918 # encoding in strict mode.
    919 TESTFN_UNDECODABLE = None
    920 for name in (
    921     # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    922     # accepts it when creating a file or a directory, but may refuse to enter
    923     # such a directory (when the bytes name is used). So test b'\xe7' first: it is
    924     # not decodable from cp932.
    925     b'\xe7w\xf0',
    926     # undecodable from ASCII, UTF-8
    927     b'\xff',
    928     # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    929     # and cp857
    930     b'\xae\xd5',
    931     # undecodable from UTF-8 (UNIX and Mac OS X)
    932     b'\xed\xb2\x80', b'\xed\xb4\x80',
    933     # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    934     # cp1253, cp1254, cp1255, cp1257, cp1258
    935     b'\x81\x98',
    936 ):
    937     try:
    938         name.decode(TESTFN_ENCODING)
    939     except UnicodeDecodeError:
    940         TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
    941         break
    942 
    943 if FS_NONASCII:
    944     TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
    945 else:
    946     TESTFN_NONASCII = None
    947 
    948 # Save the initial cwd
    949 SAVEDCWD = os.getcwd()
    950 
    951 # Set by libregrtest/main.py so we can skip tests that are not
    952 # useful for PGO
    953 PGO = False
    954 
    955 @contextlib.contextmanager
    956 def temp_dir(path=None, quiet=False):
    957     """Return a context manager that creates a temporary directory.
    958 
    959     Arguments:
    960 
    961       path: the directory to create temporarily.  If omitted or None,
    962         defaults to creating a temporary directory using tempfile.mkdtemp.
    963 
    964       quiet: if False (the default), the context manager raises an exception
    965         on error.  Otherwise, if the path is specified and cannot be
    966         created, only a warning is issued.
    967 
    968     """
    969     dir_created = False
    970     if path is None:
    971         path = tempfile.mkdtemp()
    972         dir_created = True
    973         path = os.path.realpath(path)
    974     else:
    975         try:
    976             os.mkdir(path)
    977             dir_created = True
    978         except OSError as exc:
    979             if not quiet:
    980                 raise
    981             warnings.warn(f'tests may fail, unable to create '
    982                           f'temporary directory {path!r}: {exc}',
    983                           RuntimeWarning, stacklevel=3)
    984     if dir_created:
    985         pid = os.getpid()
    986     try:
    987         yield path
    988     finally:
    989         # In case the process forks, let only the parent remove the
    990         # directory. The child has a different process id. (bpo-30028)
    991         if dir_created and pid == os.getpid():
    992             rmtree(path)
    993 
    994 @contextlib.contextmanager
    995 def change_cwd(path, quiet=False):
    996     """Return a context manager that changes the current working directory.
    997 
    998     Arguments:
    999 
   1000       path: the directory to use as the temporary current working directory.
   1001 
   1002       quiet: if False (the default), the context manager raises an exception
   1003         on error.  Otherwise, it issues only a warning and keeps the current
   1004         working directory the same.
   1005 
   1006     """
   1007     saved_dir = os.getcwd()
   1008     try:
   1009         os.chdir(path)
   1010     except OSError as exc:
   1011         if not quiet:
   1012             raise
   1013         warnings.warn(f'tests may fail, unable to change the current working '
   1014                       f'directory to {path!r}: {exc}',
   1015                       RuntimeWarning, stacklevel=3)
   1016     try:
   1017         yield os.getcwd()
   1018     finally:
   1019         os.chdir(saved_dir)
   1020 
   1021 
   1022 @contextlib.contextmanager
   1023 def temp_cwd(name='tempcwd', quiet=False):
   1024     """
   1025     Context manager that temporarily creates and changes the CWD.
   1026 
   1027     The function temporarily changes the current working directory
   1028     after creating a temporary directory in the current directory with
   1029     name *name*.  If *name* is None, the temporary directory is
   1030     created using tempfile.mkdtemp.
   1031 
   1032     If *quiet* is False (default) and it is not possible to
   1033     create or change the CWD, an error is raised.  If *quiet* is True,
   1034     only a warning is raised and the original CWD is used.
   1035 
   1036     """
   1037     with temp_dir(path=name, quiet=quiet) as temp_path:
   1038         with change_cwd(temp_path, quiet=quiet) as cwd_dir:
   1039             yield cwd_dir
   1040 
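        # Illustrative usage (a sketch): run part of a test inside a scratch
        # working directory that is removed automatically:
        #
        #     with temp_cwd() as path:
        #         create_empty_file('scratch.txt')   # created under `path`
        #     # back in the original CWD here; `path` has been deleted
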
   1041 if hasattr(os, "umask"):
   1042     @contextlib.contextmanager
   1043     def temp_umask(umask):
   1044         """Context manager that temporarily sets the process umask."""
   1045         oldmask = os.umask(umask)
   1046         try:
   1047             yield
   1048         finally:
   1049             os.umask(oldmask)
   1050 
   1051 # TEST_HOME_DIR refers to the top level directory of the "test" package
   1052 # that contains Python's regression test suite
   1053 TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
   1054 TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
   1055 
   1056 # TEST_DATA_DIR is used as a target download location for remote resources
   1057 TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
   1058 
   1059 def findfile(filename, subdir=None):
   1060     """Try to find a file on sys.path or in the test directory.  If it is not
   1061     found the argument passed to the function is returned (this does not
   1062     necessarily signal failure; could still be the legitimate path).
   1063 
   1064     Setting *subdir* indicates a relative path to use to find the file
   1065     rather than looking directly in the path directories.
   1066     """
   1067     if os.path.isabs(filename):
   1068         return filename
   1069     if subdir is not None:
   1070         filename = os.path.join(subdir, filename)
   1071     path = [TEST_HOME_DIR] + sys.path
   1072     for dn in path:
   1073         fn = os.path.join(dn, filename)
   1074         if os.path.exists(fn): return fn
   1075     return filename
   1076 
   1077 def create_empty_file(filename):
   1078     """Create an empty file. If the file already exists, truncate it."""
   1079     fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
   1080     os.close(fd)
   1081 
   1082 def sortdict(dict):
   1083     "Like repr(dict), but in sorted order."
   1084     items = sorted(dict.items())
   1085     reprpairs = ["%r: %r" % pair for pair in items]
   1086     withcommas = ", ".join(reprpairs)
   1087     return "{%s}" % withcommas
   1088 
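        # Illustrative example of the helper above:
        #
        #     sortdict({'b': 2, 'a': 1})   ->   "{'a': 1, 'b': 2}"
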
   1089 def make_bad_fd():
   1090     """
   1091     Create an invalid file descriptor by opening and closing a file and return
   1092     its fd.
   1093     """
   1094     file = open(TESTFN, "wb")
   1095     try:
   1096         return file.fileno()
   1097     finally:
   1098         file.close()
   1099         unlink(TESTFN)
   1100 
   1101 def check_syntax_error(testcase, statement, *, lineno=None, offset=None):
   1102     with testcase.assertRaises(SyntaxError) as cm:
   1103         compile(statement, '<test string>', 'exec')
   1104     err = cm.exception
   1105     testcase.assertIsNotNone(err.lineno)
   1106     if lineno is not None:
   1107         testcase.assertEqual(err.lineno, lineno)
   1108     testcase.assertIsNotNone(err.offset)
   1109     if offset is not None:
   1110         testcase.assertEqual(err.offset, offset)
   1111 
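        # Illustrative usage (a sketch, inside a unittest.TestCase method):
        #
        #     check_syntax_error(self, "x = 1 +", lineno=1)
        #
        # The statement must fail to compile; lineno/offset are only compared
        # when they are passed explicitly.
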
   1112 def open_urlresource(url, *args, **kw):
   1113     import urllib.request, urllib.parse
   1114 
   1115     check = kw.pop('check', None)
   1116 
   1117     filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
   1118 
   1119     fn = os.path.join(TEST_DATA_DIR, filename)
   1120 
   1121     def check_valid_file(fn):
   1122         f = open(fn, *args, **kw)
   1123         if check is None:
   1124             return f
   1125         elif check(f):
   1126             f.seek(0)
   1127             return f
   1128         f.close()
   1129 
   1130     if os.path.exists(fn):
   1131         f = check_valid_file(fn)
   1132         if f is not None:
   1133             return f
   1134         unlink(fn)
   1135 
   1136     # Verify the requirement before downloading the file
   1137     requires('urlfetch')
   1138 
   1139     if verbose:
   1140         print('\tfetching %s ...' % url, file=get_original_stdout())
   1141     opener = urllib.request.build_opener()
   1142     if gzip:
   1143         opener.addheaders.append(('Accept-Encoding', 'gzip'))
   1144     f = opener.open(url, timeout=15)
   1145     if gzip and f.headers.get('Content-Encoding') == 'gzip':
   1146         f = gzip.GzipFile(fileobj=f)
   1147     try:
   1148         with open(fn, "wb") as out:
   1149             s = f.read()
   1150             while s:
   1151                 out.write(s)
   1152                 s = f.read()
   1153     finally:
   1154         f.close()
   1155 
   1156     f = check_valid_file(fn)
   1157     if f is not None:
   1158         return f
   1159     raise TestFailed('invalid resource %r' % fn)
   1160 
   1161 
   1162 class WarningsRecorder(object):
   1163     """Convenience wrapper for the warnings list returned on
   1164        entry to the warnings.catch_warnings() context manager.
   1165     """
   1166     def __init__(self, warnings_list):
   1167         self._warnings = warnings_list
   1168         self._last = 0
   1169 
   1170     def __getattr__(self, attr):
   1171         if len(self._warnings) > self._last:
   1172             return getattr(self._warnings[-1], attr)
   1173         elif attr in warnings.WarningMessage._WARNING_DETAILS:
   1174             return None
   1175         raise AttributeError("%r has no attribute %r" % (self, attr))
   1176 
   1177     @property
   1178     def warnings(self):
   1179         return self._warnings[self._last:]
   1180 
   1181     def reset(self):
   1182         self._last = len(self._warnings)
   1183 
   1184 
        @contextlib.contextmanager
   1185 def _filterwarnings(filters, quiet=False):
   1186     """Catch the warnings, then check if all the expected
   1187     warnings have been raised and re-raise unexpected warnings.
   1188     If 'quiet' is True, only re-raise the unexpected warnings.
   1189     """
   1190     # Clear the warning registry of the calling module
   1191     # in order to re-raise the warnings.
   1192     frame = sys._getframe(2)
   1193     registry = frame.f_globals.get('__warningregistry__')
   1194     if registry:
   1195         registry.clear()
   1196     with warnings.catch_warnings(record=True) as w:
   1197         # Set filter "always" to record all warnings.  Because
   1198         # test_warnings swaps the module, we need to look it up in
   1199         # the sys.modules dictionary.
   1200         sys.modules['warnings'].simplefilter("always")
   1201         yield WarningsRecorder(w)
   1202     # Filter the recorded warnings
   1203     reraise = list(w)
   1204     missing = []
   1205     for msg, cat in filters:
   1206         seen = False
   1207         for w in reraise[:]:
   1208             warning = w.message
   1209             # Filter out the matching messages
   1210             if (re.match(msg, str(warning), re.I) and
   1211                 issubclass(warning.__class__, cat)):
   1212                 seen = True
   1213                 reraise.remove(w)
   1214         if not seen and not quiet:
   1215             # This filter caught nothing
   1216             missing.append((msg, cat.__name__))
   1217     if reraise:
   1218         raise AssertionError("unhandled warning %s" % reraise[0])
   1219     if missing:
   1220         raise AssertionError("filter (%r, %s) did not catch any warning" %
   1221                              missing[0])
   1222 
   1223 
   1225 def check_warnings(*filters, **kwargs):
   1226     """Context manager to silence warnings.
   1227 
   1228     Accept 2-tuples as positional arguments:
   1229         ("message regexp", WarningCategory)
   1230 
   1231     Optional argument:
   1232      - if 'quiet' is True, it does not fail if a filter catches nothing
   1233         (default True without argument,
   1234          default False if some filters are defined)
   1235 
   1236     Without argument, it defaults to:
   1237         check_warnings(("", Warning), quiet=True)
   1238     """
   1239     quiet = kwargs.get('quiet')
   1240     if not filters:
   1241         filters = (("", Warning),)
   1242         # Preserve backward compatibility
   1243         if quiet is None:
   1244             quiet = True
   1245     return _filterwarnings(filters, quiet)
   1246 
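        # Illustrative usage (a sketch mirroring the docstring above;
        # trigger_deprecated_code is a placeholder for the code under test):
        #
        #     with check_warnings(('.*deprecated.*', DeprecationWarning)) as w:
        #         trigger_deprecated_code()
        #         assert w.category is DeprecationWarning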
   1247 
   1248 @contextlib.contextmanager
   1249 def check_no_resource_warning(testcase):
   1250     """Context manager to check that no ResourceWarning is emitted.
   1251 
   1252     Usage:
   1253 
   1254         with check_no_resource_warning(self):
   1255             f = open(...)
   1256             ...
   1257             del f
   1258 
   1259     You must remove the object which may emit ResourceWarning before
   1260     the end of the context manager.
   1261     """
   1262     with warnings.catch_warnings(record=True) as warns:
   1263         warnings.filterwarnings('always', category=ResourceWarning)
   1264         yield
   1265         gc_collect()
   1266     testcase.assertEqual(warns, [])
   1267 
   1268 
   1269 class CleanImport(object):
   1270     """Context manager to force import to return a new module reference.
   1271 
   1272     This is useful for testing module-level behaviours, such as
   1273     the emission of a DeprecationWarning on import.
   1274 
   1275     Use like this:
   1276 
   1277         with CleanImport("foo"):
   1278             importlib.import_module("foo") # new reference
   1279     """
   1280 
   1281     def __init__(self, *module_names):
   1282         self.original_modules = sys.modules.copy()
   1283         for module_name in module_names:
   1284             if module_name in sys.modules:
   1285                 module = sys.modules[module_name]
   1286                 # It is possible that module_name is just an alias for
   1287                 # another module (e.g. stub for modules renamed in 3.x).
   1288                 # In that case, we also need to delete the real module to clear
   1289                 # the import cache.
   1290                 if module.__name__ != module_name:
   1291                     del sys.modules[module.__name__]
   1292                 del sys.modules[module_name]
   1293 
   1294     def __enter__(self):
   1295         return self
   1296 
   1297     def __exit__(self, *ignore_exc):
   1298         sys.modules.update(self.original_modules)
   1299 
   1300 
   1301 class EnvironmentVarGuard(collections.abc.MutableMapping):
   1302 
    1303     """Class to help protect environment variables and restore their original
    1304     values on exit.  Can be used as a context manager."""
   1305 
   1306     def __init__(self):
   1307         self._environ = os.environ
   1308         self._changed = {}
   1309 
   1310     def __getitem__(self, envvar):
   1311         return self._environ[envvar]
   1312 
   1313     def __setitem__(self, envvar, value):
   1314         # Remember the initial value on the first access
   1315         if envvar not in self._changed:
   1316             self._changed[envvar] = self._environ.get(envvar)
   1317         self._environ[envvar] = value
   1318 
   1319     def __delitem__(self, envvar):
   1320         # Remember the initial value on the first access
   1321         if envvar not in self._changed:
   1322             self._changed[envvar] = self._environ.get(envvar)
   1323         if envvar in self._environ:
   1324             del self._environ[envvar]
   1325 
   1326     def keys(self):
   1327         return self._environ.keys()
   1328 
   1329     def __iter__(self):
   1330         return iter(self._environ)
   1331 
   1332     def __len__(self):
   1333         return len(self._environ)
   1334 
   1335     def set(self, envvar, value):
   1336         self[envvar] = value
   1337 
   1338     def unset(self, envvar):
   1339         del self[envvar]
   1340 
   1341     def __enter__(self):
   1342         return self
   1343 
   1344     def __exit__(self, *ignore_exc):
   1345         for (k, v) in self._changed.items():
   1346             if v is None:
   1347                 if k in self._environ:
   1348                     del self._environ[k]
   1349             else:
   1350                 self._environ[k] = v
   1351         os.environ = self._environ
   1352 
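# A minimal usage sketch; the variable names and values are arbitrary:
#
#     with EnvironmentVarGuard() as env:
#         env.set("LANG", "C")
#         env.unset("HOME")
#         ...  # code under test sees the modified environment
#     # the original values of LANG and HOME are restored here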
   1353 
   1354 class DirsOnSysPath(object):
   1355     """Context manager to temporarily add directories to sys.path.
   1356 
   1357     This makes a copy of sys.path, appends any directories given
   1358     as positional arguments, then reverts sys.path to the copied
   1359     settings when the context ends.
   1360 
   1361     Note that *all* sys.path modifications in the body of the
   1362     context manager, including replacement of the object,
   1363     will be reverted at the end of the block.
   1364     """
   1365 
   1366     def __init__(self, *paths):
   1367         self.original_value = sys.path[:]
   1368         self.original_object = sys.path
   1369         sys.path.extend(paths)
   1370 
   1371     def __enter__(self):
   1372         return self
   1373 
   1374     def __exit__(self, *ignore_exc):
   1375         sys.path = self.original_object
   1376         sys.path[:] = self.original_value
   1377 
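# A minimal usage sketch; the directory and module names are hypothetical:
#
#     with DirsOnSysPath("/tmp/example_pkg_dir"):
#         mod = importlib.import_module("example_module")
#     # sys.path (both the object and its contents) is restored here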
   1378 
   1379 class TransientResource(object):
   1380 
    1381     """Raise ResourceDenied if an exception matching the specified exception
    1382     type and attributes is raised while the context manager is in effect."""
   1383 
   1384     def __init__(self, exc, **kwargs):
   1385         self.exc = exc
   1386         self.attrs = kwargs
   1387 
   1388     def __enter__(self):
   1389         return self
   1390 
   1391     def __exit__(self, type_=None, value=None, traceback=None):
   1392         """If type_ is a subclass of self.exc and value has attributes matching
   1393         self.attrs, raise ResourceDenied.  Otherwise let the exception
   1394         propagate (if any)."""
   1395         if type_ is not None and issubclass(self.exc, type_):
   1396             for attr, attr_value in self.attrs.items():
   1397                 if not hasattr(value, attr):
   1398                     break
   1399                 if getattr(value, attr) != attr_value:
   1400                     break
   1401             else:
   1402                 raise ResourceDenied("an optional resource is not available")
   1403 
   1404 # Context managers that raise ResourceDenied when various issues
   1405 # with the Internet connection manifest themselves as exceptions.
   1406 # XXX deprecate these and use transient_internet() instead
   1407 time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
   1408 socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
   1409 ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
   1410 
   1411 
   1412 @contextlib.contextmanager
   1413 def transient_internet(resource_name, *, timeout=30.0, errnos=()):
   1414     """Return a context manager that raises ResourceDenied when various issues
   1415     with the Internet connection manifest themselves as exceptions."""
   1416     default_errnos = [
   1417         ('ECONNREFUSED', 111),
   1418         ('ECONNRESET', 104),
   1419         ('EHOSTUNREACH', 113),
   1420         ('ENETUNREACH', 101),
   1421         ('ETIMEDOUT', 110),
   1422         # socket.create_connection() fails randomly with
   1423         # EADDRNOTAVAIL on Travis CI.
   1424         ('EADDRNOTAVAIL', 99),
   1425     ]
   1426     default_gai_errnos = [
   1427         ('EAI_AGAIN', -3),
   1428         ('EAI_FAIL', -4),
   1429         ('EAI_NONAME', -2),
   1430         ('EAI_NODATA', -5),
   1431         # Encountered when trying to resolve IPv6-only hostnames
   1432         ('WSANO_DATA', 11004),
   1433     ]
   1434 
   1435     denied = ResourceDenied("Resource %r is not available" % resource_name)
   1436     captured_errnos = errnos
   1437     gai_errnos = []
   1438     if not captured_errnos:
   1439         captured_errnos = [getattr(errno, name, num)
   1440                            for (name, num) in default_errnos]
   1441         gai_errnos = [getattr(socket, name, num)
   1442                       for (name, num) in default_gai_errnos]
   1443 
   1444     def filter_error(err):
   1445         n = getattr(err, 'errno', None)
   1446         if (isinstance(err, socket.timeout) or
   1447             (isinstance(err, socket.gaierror) and n in gai_errnos) or
   1448             (isinstance(err, urllib.error.HTTPError) and
   1449              500 <= err.code <= 599) or
   1450             (isinstance(err, urllib.error.URLError) and
   1451                  (("ConnectionRefusedError" in err.reason) or
   1452                   ("TimeoutError" in err.reason) or
   1453                   ("EOFError" in err.reason))) or
   1454             n in captured_errnos):
   1455             if not verbose:
   1456                 sys.stderr.write(denied.args[0] + "\n")
   1457             raise denied from err
   1458 
   1459     old_timeout = socket.getdefaulttimeout()
   1460     try:
   1461         if timeout is not None:
   1462             socket.setdefaulttimeout(timeout)
   1463         yield
   1464     except nntplib.NNTPTemporaryError as err:
   1465         if verbose:
   1466             sys.stderr.write(denied.args[0] + "\n")
   1467         raise denied from err
   1468     except OSError as err:
   1469         # urllib can wrap original socket errors multiple times (!), we must
   1470         # unwrap to get at the original error.
   1471         while True:
   1472             a = err.args
   1473             if len(a) >= 1 and isinstance(a[0], OSError):
   1474                 err = a[0]
   1475             # The error can also be wrapped as args[1]:
   1476             #    except socket.error as msg:
   1477             #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
   1478             elif len(a) >= 2 and isinstance(a[1], OSError):
   1479                 err = a[1]
   1480             else:
   1481                 break
   1482         filter_error(err)
   1483         raise
   1484     # XXX should we catch generic exceptions and look for their
   1485     # __cause__ or __context__?
   1486     finally:
   1487         socket.setdefaulttimeout(old_timeout)
   1488 
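# A minimal usage sketch; the host name is only an example:
#
#     with transient_internet("www.example.com"):
#         with socket.create_connection(("www.example.com", 80), timeout=30):
#             ...  # transient network failures raise ResourceDenied here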
   1489 
   1490 @contextlib.contextmanager
   1491 def captured_output(stream_name):
   1492     """Return a context manager used by captured_stdout/stdin/stderr
   1493     that temporarily replaces the sys stream *stream_name* with a StringIO."""
   1494     import io
   1495     orig_stdout = getattr(sys, stream_name)
   1496     setattr(sys, stream_name, io.StringIO())
   1497     try:
   1498         yield getattr(sys, stream_name)
   1499     finally:
   1500         setattr(sys, stream_name, orig_stdout)
   1501 
   1502 def captured_stdout():
   1503     """Capture the output of sys.stdout:
   1504 
   1505        with captured_stdout() as stdout:
   1506            print("hello")
   1507        self.assertEqual(stdout.getvalue(), "hello\\n")
   1508     """
   1509     return captured_output("stdout")
   1510 
   1511 def captured_stderr():
   1512     """Capture the output of sys.stderr:
   1513 
   1514        with captured_stderr() as stderr:
   1515            print("hello", file=sys.stderr)
   1516        self.assertEqual(stderr.getvalue(), "hello\\n")
   1517     """
   1518     return captured_output("stderr")
   1519 
   1520 def captured_stdin():
   1521     """Capture the input to sys.stdin:
   1522 
   1523        with captured_stdin() as stdin:
   1524            stdin.write('hello\\n')
   1525            stdin.seek(0)
   1526            # call test code that consumes from sys.stdin
   1527            captured = input()
   1528        self.assertEqual(captured, "hello")
   1529     """
   1530     return captured_output("stdin")
   1531 
   1532 
   1533 def gc_collect():
   1534     """Force as many objects as possible to be collected.
   1535 
   1536     In non-CPython implementations of Python, this is needed because timely
   1537     deallocation is not guaranteed by the garbage collector.  (Even in CPython
    1538     this can happen when reference cycles are involved.)  This means that __del__
   1539     methods may be called later than expected and weakrefs may remain alive for
   1540     longer than expected.  This function tries its best to force all garbage
   1541     objects to disappear.
   1542     """
   1543     gc.collect()
   1544     if is_jython:
   1545         time.sleep(0.1)
   1546     gc.collect()
   1547     gc.collect()
   1548 
   1549 @contextlib.contextmanager
   1550 def disable_gc():
   1551     have_gc = gc.isenabled()
   1552     gc.disable()
   1553     try:
   1554         yield
   1555     finally:
   1556         if have_gc:
   1557             gc.enable()
   1558 
   1559 
   1560 def python_is_optimized():
   1561     """Find if Python was built with optimizations."""
   1562     cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
   1563     final_opt = ""
   1564     for opt in cflags.split():
   1565         if opt.startswith('-O'):
   1566             final_opt = opt
   1567     return final_opt not in ('', '-O0', '-Og')
   1568 
   1569 
   1570 _header = 'nP'
   1571 _align = '0n'
   1572 if hasattr(sys, "gettotalrefcount"):
   1573     _header = '2P' + _header
   1574     _align = '0P'
   1575 _vheader = _header + 'n'
   1576 
   1577 def calcobjsize(fmt):
   1578     return struct.calcsize(_header + fmt + _align)
   1579 
   1580 def calcvobjsize(fmt):
   1581     return struct.calcsize(_vheader + fmt + _align)
   1582 
   1583 
   1584 _TPFLAGS_HAVE_GC = 1<<14
   1585 _TPFLAGS_HEAPTYPE = 1<<9
   1586 
   1587 def check_sizeof(test, o, size):
   1588     import _testcapi
   1589     result = sys.getsizeof(o)
   1590     # add GC header size
   1591     if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
   1592         ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
   1593         size += _testcapi.SIZEOF_PYGC_HEAD
   1594     msg = 'wrong size for %s: got %d, expected %d' \
   1595             % (type(o), result, size)
   1596     test.assertEqual(result, size, msg)
   1597 
   1598 #=======================================================================
   1599 # Decorator for running a function in a different locale, correctly resetting
   1600 # it afterwards.
   1601 
   1602 def run_with_locale(catstr, *locales):
   1603     def decorator(func):
   1604         def inner(*args, **kwds):
   1605             try:
   1606                 import locale
   1607                 category = getattr(locale, catstr)
   1608                 orig_locale = locale.setlocale(category)
   1609             except AttributeError:
   1610                 # if the test author gives us an invalid category string
   1611                 raise
   1612             except:
   1613                 # cannot retrieve original locale, so do nothing
   1614                 locale = orig_locale = None
   1615             else:
   1616                 for loc in locales:
   1617                     try:
   1618                         locale.setlocale(category, loc)
   1619                         break
   1620                     except:
   1621                         pass
   1622 
   1623             # now run the function, resetting the locale on exceptions
   1624             try:
   1625                 return func(*args, **kwds)
   1626             finally:
   1627                 if locale and orig_locale:
   1628                     locale.setlocale(category, orig_locale)
   1629         inner.__name__ = func.__name__
   1630         inner.__doc__ = func.__doc__
   1631         return inner
   1632     return decorator
   1633 
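# A minimal usage sketch; the test method is hypothetical and the locale
# names are merely candidates tried in order until one can be set:
#
#     @run_with_locale('LC_NUMERIC', 'fr_FR.UTF-8', 'de_DE', '')
#     def test_locale_aware_formatting(self):
#         ...
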
   1634 #=======================================================================
   1635 # Decorator for running a function in a specific timezone, correctly
   1636 # resetting it afterwards.
   1637 
   1638 def run_with_tz(tz):
   1639     def decorator(func):
   1640         def inner(*args, **kwds):
   1641             try:
   1642                 tzset = time.tzset
   1643             except AttributeError:
   1644                 raise unittest.SkipTest("tzset required")
   1645             if 'TZ' in os.environ:
   1646                 orig_tz = os.environ['TZ']
   1647             else:
   1648                 orig_tz = None
   1649             os.environ['TZ'] = tz
   1650             tzset()
   1651 
   1652             # now run the function, resetting the tz on exceptions
   1653             try:
   1654                 return func(*args, **kwds)
   1655             finally:
   1656                 if orig_tz is None:
   1657                     del os.environ['TZ']
   1658                 else:
   1659                     os.environ['TZ'] = orig_tz
   1660                 time.tzset()
   1661 
   1662         inner.__name__ = func.__name__
   1663         inner.__doc__ = func.__doc__
   1664         return inner
   1665     return decorator
   1666 
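# A minimal usage sketch; the test method name is hypothetical:
#
#     @run_with_tz('UTC')
#     def test_utc_timestamps(self):
#         ...
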
   1667 #=======================================================================
   1668 # Big-memory-test support. Separate from 'resources' because memory use
   1669 # should be configurable.
   1670 
   1671 # Some handy shorthands. Note that these are used for byte-limits as well
   1672 # as size-limits, in the various bigmem tests
   1673 _1M = 1024*1024
   1674 _1G = 1024 * _1M
   1675 _2G = 2 * _1G
   1676 _4G = 4 * _1G
   1677 
   1678 MAX_Py_ssize_t = sys.maxsize
   1679 
   1680 def set_memlimit(limit):
   1681     global max_memuse
   1682     global real_max_memuse
   1683     sizes = {
   1684         'k': 1024,
   1685         'm': _1M,
   1686         'g': _1G,
   1687         't': 1024*_1G,
   1688     }
   1689     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
   1690                  re.IGNORECASE | re.VERBOSE)
   1691     if m is None:
   1692         raise ValueError('Invalid memory limit %r' % (limit,))
   1693     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
   1694     real_max_memuse = memlimit
   1695     if memlimit > MAX_Py_ssize_t:
   1696         memlimit = MAX_Py_ssize_t
   1697     if memlimit < _2G - 1:
   1698         raise ValueError('Memory limit %r too low to be useful' % (limit,))
   1699     max_memuse = memlimit
   1700 
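# A minimal usage sketch (typically this is driven by regrtest's -M option);
# the limit below is two and a half gigabytes:
#
#     set_memlimit('2.5Gb')
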
   1701 class _MemoryWatchdog:
   1702     """An object which periodically watches the process' memory consumption
   1703     and prints it out.
   1704     """
   1705 
   1706     def __init__(self):
   1707         self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
   1708         self.started = False
   1709 
   1710     def start(self):
   1711         try:
   1712             f = open(self.procfile, 'r')
   1713         except OSError as e:
   1714             warnings.warn('/proc not available for stats: {}'.format(e),
   1715                           RuntimeWarning)
   1716             sys.stderr.flush()
   1717             return
   1718 
   1719         watchdog_script = findfile("memory_watchdog.py")
   1720         self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
   1721                                              stdin=f, stderr=subprocess.DEVNULL)
   1722         f.close()
   1723         self.started = True
   1724 
   1725     def stop(self):
   1726         if self.started:
   1727             self.mem_watchdog.terminate()
   1728             self.mem_watchdog.wait()
   1729 
   1730 
   1731 def bigmemtest(size, memuse, dry_run=True):
   1732     """Decorator for bigmem tests.
   1733 
   1734     'size' is a requested size for the test (in arbitrary, test-interpreted
    1735     units). 'memuse' is the number of bytes per unit for the test, or a good
   1736     estimate of it. For example, a test that needs two byte buffers, of 4 GiB
   1737     each, could be decorated with @bigmemtest(size=_4G, memuse=2).
   1738 
   1739     The 'size' argument is normally passed to the decorated test method as an
   1740     extra argument. If 'dry_run' is true, the value passed to the test method
   1741     may be less than the requested value. If 'dry_run' is false, it means the
   1742     test doesn't support dummy runs when -M is not specified.
   1743     """
   1744     def decorator(f):
   1745         def wrapper(self):
   1746             size = wrapper.size
   1747             memuse = wrapper.memuse
   1748             if not real_max_memuse:
   1749                 maxsize = 5147
   1750             else:
   1751                 maxsize = size
   1752 
   1753             if ((real_max_memuse or not dry_run)
   1754                 and real_max_memuse < maxsize * memuse):
   1755                 raise unittest.SkipTest(
   1756                     "not enough memory: %.1fG minimum needed"
   1757                     % (size * memuse / (1024 ** 3)))
   1758 
   1759             if real_max_memuse and verbose:
   1760                 print()
   1761                 print(" ... expected peak memory use: {peak:.1f}G"
   1762                       .format(peak=size * memuse / (1024 ** 3)))
   1763                 watchdog = _MemoryWatchdog()
   1764                 watchdog.start()
   1765             else:
   1766                 watchdog = None
   1767 
   1768             try:
   1769                 return f(self, maxsize)
   1770             finally:
   1771                 if watchdog:
   1772                     watchdog.stop()
   1773 
   1774         wrapper.size = size
   1775         wrapper.memuse = memuse
   1776         return wrapper
   1777     return decorator
   1778 
   1779 def bigaddrspacetest(f):
   1780     """Decorator for tests that fill the address space."""
   1781     def wrapper(self):
   1782         if max_memuse < MAX_Py_ssize_t:
   1783             if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
   1784                 raise unittest.SkipTest(
   1785                     "not enough memory: try a 32-bit build instead")
   1786             else:
   1787                 raise unittest.SkipTest(
   1788                     "not enough memory: %.1fG minimum needed"
   1789                     % (MAX_Py_ssize_t / (1024 ** 3)))
   1790         else:
   1791             return f(self)
   1792     return wrapper
   1793 
   1794 #=======================================================================
   1795 # unittest integration.
   1796 
   1797 class BasicTestRunner:
   1798     def run(self, test):
   1799         result = unittest.TestResult()
   1800         test(result)
   1801         return result
   1802 
   1803 def _id(obj):
   1804     return obj
   1805 
   1806 def requires_resource(resource):
   1807     if resource == 'gui' and not _is_gui_available():
   1808         return unittest.skip(_is_gui_available.reason)
   1809     if is_resource_enabled(resource):
   1810         return _id
   1811     else:
   1812         return unittest.skip("resource {0!r} is not enabled".format(resource))
   1813 
   1814 def cpython_only(test):
   1815     """
   1816     Decorator for tests only applicable on CPython.
   1817     """
   1818     return impl_detail(cpython=True)(test)
   1819 
   1820 def impl_detail(msg=None, **guards):
   1821     if check_impl_detail(**guards):
   1822         return _id
   1823     if msg is None:
   1824         guardnames, default = _parse_guards(guards)
   1825         if default:
   1826             msg = "implementation detail not available on {0}"
   1827         else:
   1828             msg = "implementation detail specific to {0}"
   1829         guardnames = sorted(guardnames.keys())
   1830         msg = msg.format(' or '.join(guardnames))
   1831     return unittest.skip(msg)
   1832 
   1833 def _parse_guards(guards):
   1834     # Returns a tuple ({platform_name: run_me}, default_value)
   1835     if not guards:
   1836         return ({'cpython': True}, False)
   1837     is_true = list(guards.values())[0]
   1838     assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
   1839     return (guards, not is_true)
   1840 
   1841 # Use the following check to guard CPython's implementation-specific tests --
   1842 # or to run them only on the implementation(s) guarded by the arguments.
   1843 def check_impl_detail(**guards):
   1844     """This function returns True or False depending on the host platform.
   1845        Examples:
   1846           if check_impl_detail():               # only on CPython (default)
   1847           if check_impl_detail(jython=True):    # only on Jython
   1848           if check_impl_detail(cpython=False):  # everywhere except on CPython
   1849     """
   1850     guards, default = _parse_guards(guards)
   1851     return guards.get(platform.python_implementation().lower(), default)
   1852 
   1853 
   1854 def no_tracing(func):
   1855     """Decorator to temporarily turn off tracing for the duration of a test."""
   1856     if not hasattr(sys, 'gettrace'):
   1857         return func
   1858     else:
   1859         @functools.wraps(func)
   1860         def wrapper(*args, **kwargs):
   1861             original_trace = sys.gettrace()
   1862             try:
   1863                 sys.settrace(None)
   1864                 return func(*args, **kwargs)
   1865             finally:
   1866                 sys.settrace(original_trace)
   1867         return wrapper
   1868 
   1869 
   1870 def refcount_test(test):
   1871     """Decorator for tests which involve reference counting.
   1872 
    1873     To start, the decorator does not run the test if it is not run by CPython.
   1874     After that, any trace function is unset during the test to prevent
   1875     unexpected refcounts caused by the trace function.
   1876 
   1877     """
   1878     return no_tracing(cpython_only(test))
   1879 
   1880 
   1881 def _filter_suite(suite, pred):
   1882     """Recursively filter test cases in a suite based on a predicate."""
   1883     newtests = []
   1884     for test in suite._tests:
   1885         if isinstance(test, unittest.TestSuite):
   1886             _filter_suite(test, pred)
   1887             newtests.append(test)
   1888         else:
   1889             if pred(test):
   1890                 newtests.append(test)
   1891     suite._tests = newtests
   1892 
   1893 def _run_suite(suite):
   1894     """Run tests from a unittest.TestSuite-derived class."""
   1895     runner = get_test_runner(sys.stdout,
   1896                              verbosity=verbose,
   1897                              capture_output=(junit_xml_list is not None))
   1898 
   1899     result = runner.run(suite)
   1900 
   1901     if junit_xml_list is not None:
   1902         junit_xml_list.append(result.get_xml_element())
   1903 
   1904     if not result.testsRun and not result.skipped:
   1905         raise TestDidNotRun
   1906     if not result.wasSuccessful():
   1907         if len(result.errors) == 1 and not result.failures:
   1908             err = result.errors[0][1]
   1909         elif len(result.failures) == 1 and not result.errors:
   1910             err = result.failures[0][1]
   1911         else:
   1912             err = "multiple errors occurred"
   1913             if not verbose: err += "; run in verbose mode for details"
   1914         raise TestFailed(err)
   1915 
   1916 
   1917 # By default, don't filter tests
   1918 _match_test_func = None
   1919 _match_test_patterns = None
   1920 
   1921 
   1922 def match_test(test):
   1923     # Function used by support.run_unittest() and regrtest --list-cases
   1924     if _match_test_func is None:
   1925         return True
   1926     else:
   1927         return _match_test_func(test.id())
   1928 
   1929 
   1930 def _is_full_match_test(pattern):
   1931     # If a pattern contains at least one dot, it's considered
   1932     # as a full test identifier.
   1933     # Example: 'test.test_os.FileTests.test_access'.
   1934     #
   1935     # Reject patterns which contain fnmatch patterns: '*', '?', '[...]'
   1936     # or '[!...]'. For example, reject 'test_access*'.
   1937     return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
   1938 
   1939 
   1940 def set_match_tests(patterns):
   1941     global _match_test_func, _match_test_patterns
   1942 
   1943     if patterns == _match_test_patterns:
   1944         # No change: no need to recompile patterns.
   1945         return
   1946 
   1947     if not patterns:
   1948         func = None
   1949         # set_match_tests(None) behaves as set_match_tests(())
   1950         patterns = ()
   1951     elif all(map(_is_full_match_test, patterns)):
    1952         # Simple case: all patterns are full test identifiers.
   1953         # The test.bisect_cmd utility only uses such full test identifiers.
   1954         func = set(patterns).__contains__
   1955     else:
   1956         regex = '|'.join(map(fnmatch.translate, patterns))
   1957         # The search *is* case sensitive on purpose:
   1958         # don't use flags=re.IGNORECASE
   1959         regex_match = re.compile(regex).match
   1960 
   1961         def match_test_regex(test_id):
   1962             if regex_match(test_id):
    1963                 # The regex matches the whole identifier like
   1964                 # 'test.test_os.FileTests.test_access'
   1965                 return True
   1966             else:
   1967                 # Try to match parts of the test identifier.
   1968                 # For example, split 'test.test_os.FileTests.test_access'
   1969                 # into: 'test', 'test_os', 'FileTests' and 'test_access'.
   1970                 return any(map(regex_match, test_id.split(".")))
   1971 
   1972         func = match_test_regex
   1973 
    1974     # Create a copy since patterns can be mutable and so might be modified later
   1975     _match_test_patterns = tuple(patterns)
   1976     _match_test_func = func
   1977 
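# A minimal usage sketch: patterns may be full test identifiers or
# fnmatch-style patterns (the names below are only examples):
#
#     set_match_tests(['test.test_os.FileTests.test_access'])
#     set_match_tests(['test_access*'])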
   1978 
   1979 
   1980 def run_unittest(*classes):
   1981     """Run tests from unittest.TestCase-derived classes."""
   1982     valid_types = (unittest.TestSuite, unittest.TestCase)
   1983     suite = unittest.TestSuite()
   1984     for cls in classes:
   1985         if isinstance(cls, str):
   1986             if cls in sys.modules:
   1987                 suite.addTest(unittest.findTestCases(sys.modules[cls]))
   1988             else:
   1989                 raise ValueError("str arguments must be keys in sys.modules")
   1990         elif isinstance(cls, valid_types):
   1991             suite.addTest(cls)
   1992         else:
   1993             suite.addTest(unittest.makeSuite(cls))
   1994     _filter_suite(suite, match_test)
   1995     _run_suite(suite)
   1996 
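# A minimal usage sketch from a test module's test_main(); the TestCase
# classes are hypothetical:
#
#     def test_main():
#         run_unittest(FooTests, BarThreadedTests)
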
   1997 #=======================================================================
   1998 # Check for the presence of docstrings.
   1999 
   2000 # Rather than trying to enumerate all the cases where docstrings may be
   2001 # disabled, we just check for that directly
   2002 
   2003 def _check_docstrings():
   2004     """Just used to check if docstrings are enabled"""
   2005 
   2006 MISSING_C_DOCSTRINGS = (check_impl_detail() and
   2007                         sys.platform != 'win32' and
   2008                         not sysconfig.get_config_var('WITH_DOC_STRINGS'))
   2009 
   2010 HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
   2011                    not MISSING_C_DOCSTRINGS)
   2012 
   2013 requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
   2014                                           "test requires docstrings")
   2015 
   2016 
   2017 #=======================================================================
   2018 # doctest driver.
   2019 
   2020 def run_doctest(module, verbosity=None, optionflags=0):
   2021     """Run doctest on the given module.  Return (#failures, #tests).
   2022 
   2023     If optional argument verbosity is not specified (or is None), pass
   2024     support's belief about verbosity on to doctest.  Else doctest's
   2025     usual behavior is used (it searches sys.argv for -v).
   2026     """
   2027 
   2028     import doctest
   2029 
   2030     if verbosity is None:
   2031         verbosity = verbose
   2032     else:
   2033         verbosity = None
   2034 
   2035     f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
   2036     if f:
   2037         raise TestFailed("%d of %d doctests failed" % (f, t))
   2038     if verbose:
   2039         print('doctest (%s) ... %d tests with zero failures' %
   2040               (module.__name__, t))
   2041     return f, t
   2042 
   2043 
   2044 #=======================================================================
   2045 # Support for saving and restoring the imported modules.
   2046 
   2047 def modules_setup():
   2048     return sys.modules.copy(),
   2049 
   2050 def modules_cleanup(oldmodules):
   2051     # Encoders/decoders are registered permanently within the internal
   2052     # codec cache. If we destroy the corresponding modules their
   2053     # globals will be set to None which will trip up the cached functions.
   2054     encodings = [(k, v) for k, v in sys.modules.items()
   2055                  if k.startswith('encodings.')]
   2056     sys.modules.clear()
   2057     sys.modules.update(encodings)
   2058     # XXX: This kind of problem can affect more than just encodings. In particular
   2059     # extension modules (such as _ssl) don't cope with reloading properly.
   2060     # Really, test modules should be cleaning out the test specific modules they
   2061     # know they added (ala test_runpy) rather than relying on this function (as
   2062     # test_importhooks and test_pkg do currently).
   2063     # Implicitly imported *real* modules should be left alone (see issue 10556).
   2064     sys.modules.update(oldmodules)
   2065 
   2066 #=======================================================================
   2067 # Threading support to prevent reporting refleaks when running regrtest.py -R
   2068 
   2069 # Flag used by saved_test_environment of test.libregrtest.save_env,
   2070 # to check if a test modified the environment. The flag should be set to False
   2071 # before running a new test.
   2072 #
    2073 # For example, threading_cleanup() sets the flag if the function fails
    2074 # to clean up threads.
   2075 environment_altered = False
   2076 
   2077 # NOTE: we use thread._count() rather than threading.enumerate() (or the
   2078 # moral equivalent thereof) because a threading.Thread object is still alive
   2079 # until its __bootstrap() method has returned, even after it has been
   2080 # unregistered from the threading module.
   2081 # thread._count(), on the other hand, only gets decremented *after* the
   2082 # __bootstrap() method has returned, which gives us reliable reference counts
   2083 # at the end of a test run.
   2084 
   2085 def threading_setup():
   2086     return _thread._count(), threading._dangling.copy()
   2087 
   2088 def threading_cleanup(*original_values):
   2089     global environment_altered
   2090 
   2091     _MAX_COUNT = 100
   2092 
   2093     for count in range(_MAX_COUNT):
   2094         values = _thread._count(), threading._dangling
   2095         if values == original_values:
   2096             break
   2097 
   2098         if not count:
   2099             # Display a warning at the first iteration
   2100             environment_altered = True
   2101             dangling_threads = values[1]
   2102             print("Warning -- threading_cleanup() failed to cleanup "
   2103                   "%s threads (count: %s, dangling: %s)"
   2104                   % (values[0] - original_values[0],
   2105                      values[0], len(dangling_threads)),
   2106                   file=sys.stderr)
   2107             for thread in dangling_threads:
   2108                 print(f"Dangling thread: {thread!r}", file=sys.stderr)
   2109             sys.stderr.flush()
   2110 
   2111             # Don't hold references to threads
   2112             dangling_threads = None
   2113         values = None
   2114 
   2115         time.sleep(0.01)
   2116         gc_collect()
   2117 
   2118 
   2119 def reap_threads(func):
   2120     """Use this function when threads are being used.  This will
   2121     ensure that the threads are cleaned up even when the test fails.
   2122     """
   2123     @functools.wraps(func)
   2124     def decorator(*args):
   2125         key = threading_setup()
   2126         try:
   2127             return func(*args)
   2128         finally:
   2129             threading_cleanup(*key)
   2130     return decorator
   2131 
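# A minimal usage sketch; the test method and worker function are
# hypothetical:
#
#     @reap_threads
#     def test_spawns_threads(self):
#         t = threading.Thread(target=worker)
#         t.start()
#         t.join()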
   2132 
   2133 @contextlib.contextmanager
   2134 def wait_threads_exit(timeout=60.0):
   2135     """
   2136     bpo-31234: Context manager to wait until all threads created in the with
   2137     statement exit.
   2138 
    2139     Use _thread._count() to check if threads exited. Indirectly, wait until
   2140     threads exit the internal t_bootstrap() C function of the _thread module.
   2141 
   2142     threading_setup() and threading_cleanup() are designed to emit a warning
   2143     if a test leaves running threads in the background. This context manager
    2144     is designed to clean up threads started by _thread.start_new_thread(),
    2145     which doesn't allow waiting for thread exit, whereas threading.Thread has
    2146     a join() method.
   2147     """
   2148     old_count = _thread._count()
   2149     try:
   2150         yield
   2151     finally:
   2152         start_time = time.monotonic()
   2153         deadline = start_time + timeout
   2154         while True:
   2155             count = _thread._count()
   2156             if count <= old_count:
   2157                 break
   2158             if time.monotonic() > deadline:
   2159                 dt = time.monotonic() - start_time
   2160                 msg = (f"wait_threads() failed to cleanup {count - old_count} "
   2161                        f"threads after {dt:.1f} seconds "
   2162                        f"(count: {count}, old count: {old_count})")
   2163                 raise AssertionError(msg)
   2164             time.sleep(0.010)
   2165             gc_collect()
   2166 
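# A minimal usage sketch; worker() is a hypothetical thread function:
#
#     with wait_threads_exit():
#         _thread.start_new_thread(worker, ())
#         ...
#     # blocks here until the thread count drops back to its previous value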
   2167 
   2168 def join_thread(thread, timeout=30.0):
   2169     """Join a thread. Raise an AssertionError if the thread is still alive
   2170     after timeout seconds.
   2171     """
   2172     thread.join(timeout)
   2173     if thread.is_alive():
   2174         msg = f"failed to join the thread in {timeout:.1f} seconds"
   2175         raise AssertionError(msg)
   2176 
   2177 
   2178 def reap_children():
   2179     """Use this function at the end of test_main() whenever sub-processes
   2180     are started.  This will help ensure that no extra children (zombies)
   2181     stick around to hog resources and create problems when looking
   2182     for refleaks.
   2183     """
   2184     global environment_altered
   2185 
   2186     # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
   2187     if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
   2188         return
   2189 
   2190     # Reap all our dead child processes so we don't leave zombies around.
   2191     # These hog resources and might be causing some of the buildbots to die.
   2192     while True:
   2193         try:
   2194             # Read the exit status of any child process which already completed
   2195             pid, status = os.waitpid(-1, os.WNOHANG)
   2196         except OSError:
   2197             break
   2198 
   2199         if pid == 0:
   2200             break
   2201 
   2202         print("Warning -- reap_children() reaped child process %s"
   2203               % pid, file=sys.stderr)
   2204         environment_altered = True
   2205 
   2206 
   2207 @contextlib.contextmanager
   2208 def start_threads(threads, unlock=None):
   2209     threads = list(threads)
   2210     started = []
   2211     try:
   2212         try:
   2213             for t in threads:
   2214                 t.start()
   2215                 started.append(t)
   2216         except:
   2217             if verbose:
   2218                 print("Can't start %d threads, only %d threads started" %
   2219                       (len(threads), len(started)))
   2220             raise
   2221         yield
   2222     finally:
   2223         try:
   2224             if unlock:
   2225                 unlock()
   2226             endtime = starttime = time.monotonic()
   2227             for timeout in range(1, 16):
   2228                 endtime += 60
   2229                 for t in started:
   2230                     t.join(max(endtime - time.monotonic(), 0.01))
   2231                 started = [t for t in started if t.is_alive()]
   2232                 if not started:
   2233                     break
   2234                 if verbose:
   2235                     print('Unable to join %d threads during a period of '
   2236                           '%d minutes' % (len(started), timeout))
   2237         finally:
   2238             started = [t for t in started if t.is_alive()]
   2239             if started:
   2240                 faulthandler.dump_traceback(sys.stdout)
   2241                 raise AssertionError('Unable to join %d threads' % len(started))
   2242 
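# A minimal usage sketch; worker() is a hypothetical thread function:
#
#     threads = [threading.Thread(target=worker) for _ in range(5)]
#     with start_threads(threads):
#         ...  # all threads are running here; they are joined on exit
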
   2243 @contextlib.contextmanager
   2244 def swap_attr(obj, attr, new_val):
    2245     """Temporarily swap out an attribute with a new object.
   2246 
   2247     Usage:
   2248         with swap_attr(obj, "attr", 5):
   2249             ...
   2250 
   2251         This will set obj.attr to 5 for the duration of the with: block,
   2252         restoring the old value at the end of the block. If `attr` doesn't
   2253         exist on `obj`, it will be created and then deleted at the end of the
   2254         block.
   2255 
   2256         The old value (or None if it doesn't exist) will be assigned to the
   2257         target of the "as" clause, if there is one.
   2258     """
   2259     if hasattr(obj, attr):
   2260         real_val = getattr(obj, attr)
   2261         setattr(obj, attr, new_val)
   2262         try:
   2263             yield real_val
   2264         finally:
   2265             setattr(obj, attr, real_val)
   2266     else:
   2267         setattr(obj, attr, new_val)
   2268         try:
   2269             yield
   2270         finally:
   2271             if hasattr(obj, attr):
   2272                 delattr(obj, attr)
   2273 
   2274 @contextlib.contextmanager
   2275 def swap_item(obj, item, new_val):
    2276     """Temporarily swap out an item with a new object.
   2277 
   2278     Usage:
   2279         with swap_item(obj, "item", 5):
   2280             ...
   2281 
   2282         This will set obj["item"] to 5 for the duration of the with: block,
   2283         restoring the old value at the end of the block. If `item` doesn't
    2284         exist in `obj`, it will be created and then deleted at the end of the
   2285         block.
   2286 
   2287         The old value (or None if it doesn't exist) will be assigned to the
   2288         target of the "as" clause, if there is one.
   2289     """
   2290     if item in obj:
   2291         real_val = obj[item]
   2292         obj[item] = new_val
   2293         try:
   2294             yield real_val
   2295         finally:
   2296             obj[item] = real_val
   2297     else:
   2298         obj[item] = new_val
   2299         try:
   2300             yield
   2301         finally:
   2302             if item in obj:
   2303                 del obj[item]
   2304 
   2305 def strip_python_stderr(stderr):
   2306     """Strip the stderr of a Python process from potential debug output
   2307     emitted by the interpreter.
   2308 
   2309     This will typically be run on the result of the communicate() method
   2310     of a subprocess.Popen object.
   2311     """
   2312     stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
   2313     return stderr
   2314 
   2315 requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
   2316                         'types are immortal if COUNT_ALLOCS is defined')
   2317 
   2318 def args_from_interpreter_flags():
   2319     """Return a list of command-line arguments reproducing the current
   2320     settings in sys.flags and sys.warnoptions."""
   2321     return subprocess._args_from_interpreter_flags()
   2322 
   2323 def optim_args_from_interpreter_flags():
   2324     """Return a list of command-line arguments reproducing the current
   2325     optimization settings in sys.flags."""
   2326     return subprocess._optim_args_from_interpreter_flags()
   2327 
   2328 #============================================================
   2329 # Support for assertions about logging.
   2330 #============================================================
   2331 
   2332 class TestHandler(logging.handlers.BufferingHandler):
   2333     def __init__(self, matcher):
   2334         # BufferingHandler takes a "capacity" argument
   2335         # so as to know when to flush. As we're overriding
   2336         # shouldFlush anyway, we can set a capacity of zero.
   2337         # You can call flush() manually to clear out the
   2338         # buffer.
   2339         logging.handlers.BufferingHandler.__init__(self, 0)
   2340         self.matcher = matcher
   2341 
   2342     def shouldFlush(self):
   2343         return False
   2344 
   2345     def emit(self, record):
   2346         self.format(record)
   2347         self.buffer.append(record.__dict__)
   2348 
   2349     def matches(self, **kwargs):
   2350         """
   2351         Look for a saved dict whose keys/values match the supplied arguments.
   2352         """
   2353         result = False
   2354         for d in self.buffer:
   2355             if self.matcher.matches(d, **kwargs):
   2356                 result = True
   2357                 break
   2358         return result
   2359 
   2360 class Matcher(object):
   2361 
   2362     _partial_matches = ('msg', 'message')
   2363 
   2364     def matches(self, d, **kwargs):
   2365         """
   2366         Try to match a single dict with the supplied arguments.
   2367 
   2368         Keys whose values are strings and which are in self._partial_matches
   2369         will be checked for partial (i.e. substring) matches. You can extend
   2370         this scheme to (for example) do regular expression matching, etc.
   2371         """
   2372         result = True
   2373         for k in kwargs:
   2374             v = kwargs[k]
   2375             dv = d.get(k)
   2376             if not self.match_value(k, dv, v):
   2377                 result = False
   2378                 break
   2379         return result
   2380 
   2381     def match_value(self, k, dv, v):
   2382         """
   2383         Try to match a single stored value (dv) with a supplied value (v).
   2384         """
   2385         if type(v) != type(dv):
   2386             result = False
   2387         elif type(dv) is not str or k not in self._partial_matches:
   2388             result = (v == dv)
   2389         else:
   2390             result = dv.find(v) >= 0
   2391         return result
   2392 
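# A minimal usage sketch; the logger name and message are hypothetical:
#
#     handler = TestHandler(Matcher())
#     logger = logging.getLogger("example")
#     logger.addHandler(handler)
#     try:
#         logger.warning("disk space low on %s", "/var")
#         assert handler.matches(levelname="WARNING", message="disk space")
#     finally:
#         logger.removeHandler(handler)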
   2393 
   2394 _can_symlink = None
   2395 def can_symlink():
   2396     global _can_symlink
   2397     if _can_symlink is not None:
   2398         return _can_symlink
   2399     symlink_path = TESTFN + "can_symlink"
   2400     try:
   2401         os.symlink(TESTFN, symlink_path)
   2402         can = True
   2403     except (OSError, NotImplementedError, AttributeError):
   2404         can = False
   2405     else:
   2406         os.remove(symlink_path)
   2407     _can_symlink = can
   2408     return can
   2409 
   2410 def skip_unless_symlink(test):
   2411     """Skip decorator for tests that require functional symlink"""
   2412     ok = can_symlink()
   2413     msg = "Requires functional symlink implementation"
   2414     return test if ok else unittest.skip(msg)(test)
   2415 
   2416 _can_xattr = None
   2417 def can_xattr():
   2418     global _can_xattr
   2419     if _can_xattr is not None:
   2420         return _can_xattr
   2421     if not hasattr(os, "setxattr"):
   2422         can = False
   2423     else:
   2424         tmp_dir = tempfile.mkdtemp()
   2425         tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
   2426         try:
   2427             with open(TESTFN, "wb") as fp:
   2428                 try:
   2429                     # TESTFN & tempfile may use different file systems with
   2430                     # different capabilities
   2431                     os.setxattr(tmp_fp, b"user.test", b"")
   2432                     os.setxattr(tmp_name, b"trusted.foo", b"42")
   2433                     os.setxattr(fp.fileno(), b"user.test", b"")
   2434                     # Kernels < 2.6.39 don't respect setxattr flags.
   2435                     kernel_version = platform.release()
    2436                     m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
   2437                     can = m is None or int(m.group(1)) >= 39
   2438                 except OSError:
   2439                     can = False
   2440         finally:
   2441             unlink(TESTFN)
   2442             unlink(tmp_name)
   2443             rmdir(tmp_dir)
   2444     _can_xattr = can
   2445     return can
   2446 
   2447 def skip_unless_xattr(test):
   2448     """Skip decorator for tests that require functional extended attributes"""
   2449     ok = can_xattr()
   2450     msg = "no non-broken extended attribute support"
   2451     return test if ok else unittest.skip(msg)(test)
   2452 
   2453 _bind_nix_socket_error = None
   2454 def skip_unless_bind_unix_socket(test):
   2455     """Decorator for tests requiring a functional bind() for unix sockets."""
   2456     if not hasattr(socket, 'AF_UNIX'):
   2457         return unittest.skip('No UNIX Sockets')(test)
   2458     global _bind_nix_socket_error
   2459     if _bind_nix_socket_error is None:
   2460         path = TESTFN + "can_bind_unix_socket"
   2461         with socket.socket(socket.AF_UNIX) as sock:
   2462             try:
   2463                 sock.bind(path)
   2464                 _bind_nix_socket_error = False
   2465             except OSError as e:
   2466                 _bind_nix_socket_error = e
   2467             finally:
   2468                 unlink(path)
   2469     if _bind_nix_socket_error:
   2470         msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
   2471         return unittest.skip(msg)(test)
   2472     else:
   2473         return test
   2474 
   2475 
   2476 def fs_is_case_insensitive(directory):
   2477     """Detects if the file system for the specified directory is case-insensitive."""
   2478     with tempfile.NamedTemporaryFile(dir=directory) as base:
   2479         base_path = base.name
   2480         case_path = base_path.upper()
   2481         if case_path == base_path:
   2482             case_path = base_path.lower()
   2483         try:
   2484             return os.path.samefile(base_path, case_path)
   2485         except FileNotFoundError:
   2486             return False
   2487 
   2488 
   2489 def detect_api_mismatch(ref_api, other_api, *, ignore=()):
   2490     """Returns the set of items in ref_api not in other_api, except for a
   2491     defined list of items to be ignored in this check.
   2492 
   2493     By default this skips private attributes beginning with '_' but
   2494     includes all magic methods, i.e. those starting and ending in '__'.
   2495     """
   2496     missing_items = set(dir(ref_api)) - set(dir(other_api))
   2497     if ignore:
   2498         missing_items -= set(ignore)
   2499     missing_items = set(m for m in missing_items
   2500                         if not m.startswith('_') or m.endswith('__'))
   2501     return missing_items
   2502 
   2503 
   2504 def check__all__(test_case, module, name_of_module=None, extra=(),
   2505                  blacklist=()):
   2506     """Assert that the __all__ variable of 'module' contains all public names.
   2507 
   2508     The module's public names (its API) are detected automatically based on
   2509     whether they match the public name convention and were defined in
   2510     'module'.
   2511 
   2512     The 'name_of_module' argument can specify (as a string or tuple thereof)
    2513     which module(s) an API could be defined in for it to be detected as a
   2514     public API. One case for this is when 'module' imports part of its public
   2515     API from other modules, possibly a C backend (like 'csv' and its '_csv').
   2516 
   2517     The 'extra' argument can be a set of names that wouldn't otherwise be
   2518     automatically detected as "public", like objects without a proper
   2519     '__module__' attribute. If provided, it will be added to the
   2520     automatically detected ones.
   2521 
   2522     The 'blacklist' argument can be a set of names that must not be treated
   2523     as part of the public API even though their names indicate otherwise.
   2524 
   2525     Usage:
   2526         import bar
   2527         import foo
   2528         import unittest
   2529         from test import support
   2530 
   2531         class MiscTestCase(unittest.TestCase):
   2532             def test__all__(self):
   2533                 support.check__all__(self, foo)
   2534 
   2535         class OtherTestCase(unittest.TestCase):
   2536             def test__all__(self):
   2537                 extra = {'BAR_CONST', 'FOO_CONST'}
   2538                 blacklist = {'baz'}  # Undocumented name.
   2539                 # bar imports part of its API from _bar.
   2540                 support.check__all__(self, bar, ('bar', '_bar'),
   2541                                      extra=extra, blacklist=blacklist)
   2542 
   2543     """
   2544 
   2545     if name_of_module is None:
   2546         name_of_module = (module.__name__, )
   2547     elif isinstance(name_of_module, str):
   2548         name_of_module = (name_of_module, )
   2549 
   2550     expected = set(extra)
   2551 
   2552     for name in dir(module):
   2553         if name.startswith('_') or name in blacklist:
   2554             continue
   2555         obj = getattr(module, name)
   2556         if (getattr(obj, '__module__', None) in name_of_module or
   2557                 (not hasattr(obj, '__module__') and
   2558                  not isinstance(obj, types.ModuleType))):
   2559             expected.add(name)
   2560     test_case.assertCountEqual(module.__all__, expected)
   2561 
   2562 
   2563 class SuppressCrashReport:
   2564     """Try to prevent a crash report from popping up.
   2565 
   2566     On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    2567     disable the creation of a core dump file.
   2568     """
   2569     old_value = None
   2570     old_modes = None
   2571 
   2572     def __enter__(self):
   2573         """On Windows, disable Windows Error Reporting dialogs using
   2574         SetErrorMode.
   2575 
   2576         On UNIX, try to save the previous core file size limit, then set
   2577         soft limit to 0.
   2578         """
   2579         if sys.platform.startswith('win'):
   2580             # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
   2581             # GetErrorMode is not available on Windows XP and Windows Server 2003,
   2582             # but SetErrorMode returns the previous value, so we can use that
   2583             import ctypes
   2584             self._k32 = ctypes.windll.kernel32
   2585             SEM_NOGPFAULTERRORBOX = 0x02
   2586             self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
   2587             self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)
   2588 
   2589             # Suppress assert dialogs in debug builds
   2590             # (see http://bugs.python.org/issue23314)
   2591             try:
   2592                 import msvcrt
   2593                 msvcrt.CrtSetReportMode
   2594             except (AttributeError, ImportError):
   2595                 # no msvcrt or a release build
   2596                 pass
   2597             else:
   2598                 self.old_modes = {}
   2599                 for report_type in [msvcrt.CRT_WARN,
   2600                                     msvcrt.CRT_ERROR,
   2601                                     msvcrt.CRT_ASSERT]:
   2602                     old_mode = msvcrt.CrtSetReportMode(report_type,
   2603                             msvcrt.CRTDBG_MODE_FILE)
   2604                     old_file = msvcrt.CrtSetReportFile(report_type,
   2605                             msvcrt.CRTDBG_FILE_STDERR)
   2606                     self.old_modes[report_type] = old_mode, old_file
   2607 
   2608         else:
   2609             if resource is not None:
   2610                 try:
   2611                     self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
   2612                     resource.setrlimit(resource.RLIMIT_CORE,
   2613                                        (0, self.old_value[1]))
   2614                 except (ValueError, OSError):
   2615                     pass
   2616 
   2617             if sys.platform == 'darwin':
   2618                 # Check if the 'Crash Reporter' on OSX was configured
   2619                 # in 'Developer' mode and warn that it will get triggered
   2620                 # when it is.
   2621                 #
   2622                 # This assumes that this context manager is used in tests
    2623                 # that might trigger the Crash Reporter.
   2624                 cmd = ['/usr/bin/defaults', 'read',
   2625                        'com.apple.CrashReporter', 'DialogType']
   2626                 proc = subprocess.Popen(cmd,
   2627                                         stdout=subprocess.PIPE,
   2628                                         stderr=subprocess.PIPE)
   2629                 with proc:
   2630                     stdout = proc.communicate()[0]
   2631                 if stdout.strip() == b'developer':
   2632                     print("this test triggers the Crash Reporter, "
   2633                           "that is intentional", end='', flush=True)
   2634 
   2635         return self
   2636 
   2637     def __exit__(self, *ignore_exc):
   2638         """Restore Windows ErrorMode or core file behavior to initial value."""
   2639         if self.old_value is None:
   2640             return
   2641 
   2642         if sys.platform.startswith('win'):
   2643             self._k32.SetErrorMode(self.old_value)
   2644 
   2645             if self.old_modes:
   2646                 import msvcrt
   2647                 for report_type, (old_mode, old_file) in self.old_modes.items():
   2648                     msvcrt.CrtSetReportMode(report_type, old_mode)
   2649                     msvcrt.CrtSetReportFile(report_type, old_file)
   2650         else:
   2651             if resource is not None:
   2652                 try:
   2653                     resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
   2654                 except (ValueError, OSError):
   2655                     pass
   2656 
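# A minimal usage sketch for SuppressCrashReport (an editorial addition, not
# part of the original module): tests that deliberately provoke a crash
# typically wrap the crashing code so that no crash-reporting dialog or core
# file is produced, e.g.:
#
#     with SuppressCrashReport():
#         os.kill(os.getpid(), signal.SIGSEGV)   # hypothetical crashing call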
   2657 
   2658 def patch(test_instance, object_to_patch, attr_name, new_value):
   2659     """Override 'object_to_patch'.'attr_name' with 'new_value'.
   2660 
   2661     Also add a cleanup procedure to 'test_instance' to restore the
   2662     original value of 'attr_name' on 'object_to_patch'.
   2663     'attr_name' must be an existing attribute of 'object_to_patch'.
   2664 
   2665     """
   2666     # check that 'attr_name' is a real attribute for 'object_to_patch'
   2667     # will raise AttributeError if it does not exist
   2668     getattr(object_to_patch, attr_name)
   2669 
   2670     # keep a copy of the old value
   2671     attr_is_local = False
   2672     try:
   2673         old_value = object_to_patch.__dict__[attr_name]
   2674     except (AttributeError, KeyError):
   2675         old_value = getattr(object_to_patch, attr_name, None)
   2676     else:
   2677         attr_is_local = True
   2678 
   2679     # restore the value when the test is done
   2680     def cleanup():
   2681         if attr_is_local:
   2682             setattr(object_to_patch, attr_name, old_value)
   2683         else:
   2684             delattr(object_to_patch, attr_name)
   2685 
   2686     test_instance.addCleanup(cleanup)
   2687 
   2688     # actually override the attribute
   2689     setattr(object_to_patch, attr_name, new_value)
   2690 
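# A minimal usage sketch for patch() (an editorial addition; it assumes the
# call is made from inside a unittest.TestCase method so that addCleanup()
# is available):
#
#     def test_fake_home(self):
#         patch(self, os.path, 'expanduser', lambda path: '/tmp/home')
#         self.assertEqual(os.path.expanduser('~'), '/tmp/home')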
   2691 
   2692 def run_in_subinterp(code):
   2693     """
   2694     Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
   2695     module is tracing memory allocations.
   2696     """
   2697     # Issue #10915, #15751: PyGILState_*() functions don't work with
   2698     # sub-interpreters; the tracemalloc module uses these functions internally.
   2699     try:
   2700         import tracemalloc
   2701     except ImportError:
   2702         pass
   2703     else:
   2704         if tracemalloc.is_tracing():
   2705             raise unittest.SkipTest("run_in_subinterp() cannot be used "
   2706                                      "if tracemalloc module is tracing "
   2707                                      "memory allocations")
   2708     import _testcapi
   2709     return _testcapi.run_in_subinterp(code)
   2710 
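# A minimal usage sketch for run_in_subinterp() (an editorial addition; it
# assumes, as the CPython tests typically do, that a return value of 0 means
# the code ran successfully in the subinterpreter):
#
#     ret = run_in_subinterp("import sys; sys.stderr.write('hello\\n')")
#     self.assertEqual(ret, 0)   # inside a test method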
   2711 
   2712 def check_free_after_iterating(test, iter, cls, args=()):
   2713     class A(cls):
   2714         def __del__(self):
   2715             nonlocal done
   2716             done = True
   2717             try:
   2718                 next(it)
   2719             except StopIteration:
   2720                 pass
   2721 
   2722     done = False
   2723     it = iter(A(*args))
   2724     # Issue 26494: Shouldn't crash
   2725     test.assertRaises(StopIteration, next, it)
   2726     # The sequence should be deallocated just after the end of iterating
   2727     gc_collect()
   2728     test.assertTrue(done)
   2729 
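# A minimal usage sketch for check_free_after_iterating() (an editorial
# addition): a container test can verify that deallocating the container
# right after its iterator is exhausted does not crash, even when __del__
# touches the iterator again:
#
#     check_free_after_iterating(self, iter, list)
#     check_free_after_iterating(self, lambda d: iter(d.keys()), dict)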
   2730 
   2731 def missing_compiler_executable(cmd_names=[]):
   2732     """Check if the compiler components used to build the interpreter exist.
   2733 
   2734     Check for the existence of the compiler executables whose names are listed
   2735     in 'cmd_names', or of all the compiler executables when 'cmd_names' is
   2736     empty. Return the name of the first missing executable, or None when none
   2737     is missing.
   2738 
   2739     """
   2740     from distutils import ccompiler, sysconfig, spawn
   2741     compiler = ccompiler.new_compiler()
   2742     sysconfig.customize_compiler(compiler)
   2743     for name in compiler.executables:
   2744         if cmd_names and name not in cmd_names:
   2745             continue
   2746         cmd = getattr(compiler, name)
   2747         if cmd_names:
   2748             assert cmd is not None, \
   2749                     "the '%s' executable is not configured" % name
   2750         elif cmd is None:
   2751             continue
   2752         if spawn.find_executable(cmd[0]) is None:
   2753             return cmd[0]
   2754 
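# A minimal usage sketch for missing_compiler_executable() (an editorial
# addition): distutils-based tests usually skip themselves when the compiler
# that built Python is not installed:
#
#     cmd = missing_compiler_executable()
#     if cmd is not None:
#         self.skipTest('The %r command is not found' % cmd)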
   2755 
   2756 _is_android_emulator = None
   2757 def setswitchinterval(interval):
   2758     # Setting a very low GIL switch interval on the Android emulator causes
   2759     # Python to hang (issue #26939).
   2760     minimum_interval = 1e-5
   2761     if is_android and interval < minimum_interval:
   2762         global _is_android_emulator
   2763         if _is_android_emulator is None:
   2764             _is_android_emulator = (subprocess.check_output(
   2765                                ['getprop', 'ro.kernel.qemu']).strip() == b'1')
   2766         if _is_android_emulator:
   2767             interval = minimum_interval
   2768     return sys.setswitchinterval(interval)
   2769 
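# A minimal usage sketch for setswitchinterval() (an editorial addition):
# race-condition tests lower the switch interval to force frequent thread
# switches and restore the previous value afterwards:
#
#     old_interval = sys.getswitchinterval()
#     self.addCleanup(sys.setswitchinterval, old_interval)
#     setswitchinterval(1e-6)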
   2770 
   2771 @contextlib.contextmanager
   2772 def disable_faulthandler():
   2773     # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
   2774     # sys.stderr with a StringIO which has no file descriptor when a test
   2775     # is run with -W/--verbose3.
   2776     fd = sys.__stderr__.fileno()
   2777 
   2778     is_enabled = faulthandler.is_enabled()
   2779     try:
   2780         faulthandler.disable()
   2781         yield
   2782     finally:
   2783         if is_enabled:
   2784             faulthandler.enable(file=fd, all_threads=True)
   2785 
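# A minimal usage sketch for disable_faulthandler() (an editorial addition):
# a test that deliberately triggers a fatal error can disable faulthandler
# temporarily so that its traceback does not pollute stderr:
#
#     with disable_faulthandler():
#         run_crashing_child()   # hypothetical code expected to abort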
   2786 
   2787 def fd_count():
   2788     """Count the number of open file descriptors.
   2789     """
   2790     if sys.platform.startswith(('linux', 'freebsd')):
   2791         try:
   2792             names = os.listdir("/proc/self/fd")
   2793             # Subtract one because listdir() internally opens a file
   2794             # descriptor to list the content of the /proc/self/fd/ directory.
   2795             return len(names) - 1
   2796         except FileNotFoundError:
   2797             pass
   2798 
   2799     MAXFD = 256
   2800     if hasattr(os, 'sysconf'):
   2801         try:
   2802             MAXFD = os.sysconf("SC_OPEN_MAX")
   2803         except OSError:
   2804             pass
   2805 
   2806     old_modes = None
   2807     if sys.platform == 'win32':
   2808         # bpo-25306, bpo-31009: Call CrtSetReportMode() so that an invalid file
   2809         # descriptor does not kill the process if Python is compiled in debug mode.
   2810         try:
   2811             import msvcrt
   2812             msvcrt.CrtSetReportMode
   2813         except (AttributeError, ImportError):
   2814             # no msvcrt or a release build
   2815             pass
   2816         else:
   2817             old_modes = {}
   2818             for report_type in (msvcrt.CRT_WARN,
   2819                                 msvcrt.CRT_ERROR,
   2820                                 msvcrt.CRT_ASSERT):
   2821                 old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
   2822 
   2823     try:
   2824         count = 0
   2825         for fd in range(MAXFD):
   2826             try:
   2827                 # Prefer dup() over fstat(). fstat() can require input/output
   2828                 # whereas dup() doesn't.
   2829                 fd2 = os.dup(fd)
   2830             except OSError as e:
   2831                 if e.errno != errno.EBADF:
   2832                     raise
   2833             else:
   2834                 os.close(fd2)
   2835                 count += 1
   2836     finally:
   2837         if old_modes is not None:
   2838             for report_type in (msvcrt.CRT_WARN,
   2839                                 msvcrt.CRT_ERROR,
   2840                                 msvcrt.CRT_ASSERT):
   2841                 msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
   2842 
   2843     return count
   2844 
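# A minimal usage sketch for fd_count() (an editorial addition): comparing the
# result before and after the code under test is a simple way to detect file
# descriptor leaks:
#
#     before = fd_count()
#     do_something_with_files()   # hypothetical code under test
#     self.assertEqual(fd_count(), before)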
   2845 
   2846 class SaveSignals:
   2847     """
   2848     Save and restore signal handlers.
   2849 
   2850     This class is only able to save/restore signal handlers registered
   2851     by the Python signal module: see bpo-13285 for "external" signal
   2852     handlers.
   2853     """
   2854 
   2855     def __init__(self):
   2856         import signal
   2857         self.signal = signal
   2858         self.signals = list(range(1, signal.NSIG))
   2859         # SIGKILL and SIGSTOP signals cannot be ignored nor caught
   2860         for signame in ('SIGKILL', 'SIGSTOP'):
   2861             try:
   2862                 signum = getattr(signal, signame)
   2863             except AttributeError:
   2864                 continue
   2865             self.signals.remove(signum)
   2866         self.handlers = {}
   2867 
   2868     def save(self):
   2869         for signum in self.signals:
   2870             handler = self.signal.getsignal(signum)
   2871             if handler is None:
   2872                 # getsignal() returns None if a signal handler was not
   2873                 # registered by the Python signal module,
   2874                 # and the handler is not SIG_DFL nor SIG_IGN.
   2875                 #
   2876                 # Ignore the signal: we cannot restore the handler.
   2877                 continue
   2878             self.handlers[signum] = handler
   2879 
   2880     def restore(self):
   2881         for signum, handler in self.handlers.items():
   2882             self.signal.signal(signum, handler)
   2883 
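# A minimal usage sketch for SaveSignals (an editorial addition): call save()
# in setUp() and register restore() as a cleanup so that a test changing
# signal handlers cannot leak the changes into other tests:
#
#     def setUp(self):
#         self.save_signals = SaveSignals()
#         self.save_signals.save()
#         self.addCleanup(self.save_signals.restore)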
   2884 
   2885 def with_pymalloc():
   2886     import _testcapi
   2887     return _testcapi.WITH_PYMALLOC
   2888 
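# A minimal usage sketch for with_pymalloc() (an editorial addition): tests of
# pymalloc-specific behaviour can skip themselves when Python was built
# without pymalloc:
#
#     if not with_pymalloc():
#         self.skipTest('need pymalloc')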
   2889 
   2890 class FakePath:
   2891     """Simple implementation of the path protocol.
   2892     """
   2893     def __init__(self, path):
   2894         self.path = path
   2895 
   2896     def __repr__(self):
   2897         return f'<FakePath {self.path!r}>'
   2898 
   2899     def __fspath__(self):
   2900         if (isinstance(self.path, BaseException) or
   2901             isinstance(self.path, type) and
   2902                 issubclass(self.path, BaseException)):
   2903             raise self.path
   2904         else:
   2905             return self.path
   2906
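# A minimal usage sketch for FakePath (an editorial addition): it lets tests
# exercise os.fspath() handling, including error propagation when
# __fspath__() raises:
#
#     self.assertEqual(os.fspath(FakePath('/tmp/x')), '/tmp/x')
#     with self.assertRaises(ZeroDivisionError):
#         os.fspath(FakePath(ZeroDivisionError()))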