Home | History | Annotate | Download | only in support
      1 """Supporting definitions for the Python regression tests."""
      2 
      3 if __name__ != 'test.support':
      4     raise ImportError('support must be imported from the test package')
      5 
      6 import collections.abc
      7 import contextlib
      8 import errno
      9 import faulthandler
     10 import fnmatch
     11 import functools
     12 import gc
     13 import importlib
     14 import importlib.util
     15 import logging.handlers
     16 import nntplib
     17 import os
     18 import platform
     19 import re
     20 import shutil
     21 import socket
     22 import stat
     23 import struct
     24 import subprocess
     25 import sys
     26 import sysconfig
     27 import tempfile
     28 import time
     29 import types
     30 import unittest
     31 import urllib.error
     32 import warnings
     33 
     34 try:
     35     import _thread, threading
     36 except ImportError:
     37     _thread = None
     38     threading = None
     39 try:
     40     import multiprocessing.process
     41 except ImportError:
     42     multiprocessing = None
     43 
     44 try:
     45     import zlib
     46 except ImportError:
     47     zlib = None
     48 
     49 try:
     50     import gzip
     51 except ImportError:
     52     gzip = None
     53 
     54 try:
     55     import bz2
     56 except ImportError:
     57     bz2 = None
     58 
     59 try:
     60     import lzma
     61 except ImportError:
     62     lzma = None
     63 
     64 try:
     65     import resource
     66 except ImportError:
     67     resource = None
     68 
     69 __all__ = [
     70     # globals
     71     "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
     72     # exceptions
     73     "Error", "TestFailed", "ResourceDenied",
     74     # imports
     75     "import_module", "import_fresh_module", "CleanImport",
     76     # modules
     77     "unload", "forget",
     78     # io
     79     "record_original_stdout", "get_original_stdout", "captured_stdout",
     80     "captured_stdin", "captured_stderr",
     81     # filesystem
     82     "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
     83     "create_empty_file", "can_symlink", "fs_is_case_insensitive",
     84     # unittest
     85     "is_resource_enabled", "requires", "requires_freebsd_version",
     86     "requires_linux_version", "requires_mac_ver", "check_syntax_error",
     87     "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
     88     "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
     89     "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
     90     "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
     91     "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
     92     "anticipate_failure", "load_package_tests", "detect_api_mismatch",
     93     "check__all__", "requires_android_level", "requires_multiprocessing_queue",
     94     # sys
     95     "is_jython", "is_android", "check_impl_detail", "unix_shell",
     96     "setswitchinterval", "android_not_root",
     97     # network
     98     "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
     99     "bind_unix_socket",
    100     # processes
    101     'temp_umask', "reap_children",
    102     # logging
    103     "TestHandler",
    104     # threads
    105     "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
    106     # miscellaneous
    107     "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard",
    108     "run_with_locale", "swap_item",
    109     "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
    110     "run_with_tz", "PGO", "missing_compiler_executable",
    111     ]
    112 
    113 class Error(Exception):
    114     """Base class for regression test exceptions."""
    115 
    116 class TestFailed(Error):
    117     """Test failed."""
    118 
    119 class ResourceDenied(unittest.SkipTest):
    120     """Test skipped because it requested a disallowed resource.
    121 
    122     This is raised when a test calls requires() for a resource that
    123     has not be enabled.  It is used to distinguish between expected
    124     and unexpected skips.
    125     """
    126 
    127 @contextlib.contextmanager
    128 def _ignore_deprecated_imports(ignore=True):
    129     """Context manager to suppress package and module deprecation
    130     warnings when importing them.
    131 
    132     If ignore is False, this context manager has no effect.
    133     """
    134     if ignore:
    135         with warnings.catch_warnings():
    136             warnings.filterwarnings("ignore", ".+ (module|package)",
    137                                     DeprecationWarning)
    138             yield
    139     else:
    140         yield
    141 
    142 
    143 def import_module(name, deprecated=False, *, required_on=()):
    144     """Import and return the module to be tested, raising SkipTest if
    145     it is not available.
    146 
    147     If deprecated is True, any module or package deprecation messages
    148     will be suppressed. If a module is required on a platform but optional for
    149     others, set required_on to an iterable of platform prefixes which will be
    150     compared against sys.platform.
    151     """
    152     with _ignore_deprecated_imports(deprecated):
    153         try:
    154             return importlib.import_module(name)
    155         except ImportError as msg:
    156             if sys.platform.startswith(tuple(required_on)):
    157                 raise
    158             raise unittest.SkipTest(str(msg))
    159 
    160 
    161 def _save_and_remove_module(name, orig_modules):
    162     """Helper function to save and remove a module from sys.modules
    163 
    164     Raise ImportError if the module can't be imported.
    165     """
    166     # try to import the module and raise an error if it can't be imported
    167     if name not in sys.modules:
    168         __import__(name)
    169         del sys.modules[name]
    170     for modname in list(sys.modules):
    171         if modname == name or modname.startswith(name + '.'):
    172             orig_modules[modname] = sys.modules[modname]
    173             del sys.modules[modname]
    174 
    175 def _save_and_block_module(name, orig_modules):
    176     """Helper function to save and block a module in sys.modules
    177 
    178     Return True if the module was in sys.modules, False otherwise.
    179     """
    180     saved = True
    181     try:
    182         orig_modules[name] = sys.modules[name]
    183     except KeyError:
    184         saved = False
    185     sys.modules[name] = None
    186     return saved
    187 
    188 
    189 def anticipate_failure(condition):
    190     """Decorator to mark a test that is known to be broken in some cases
    191 
    192        Any use of this decorator should have a comment identifying the
    193        associated tracker issue.
    194     """
    195     if condition:
    196         return unittest.expectedFailure
    197     return lambda f: f
    198 
    199 def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    200     """Generic load_tests implementation for simple test packages.
    201 
    202     Most packages can implement load_tests using this function as follows:
    203 
    204        def load_tests(*args):
    205            return load_package_tests(os.path.dirname(__file__), *args)
    206     """
    207     if pattern is None:
    208         pattern = "test*"
    209     top_dir = os.path.dirname(              # Lib
    210                   os.path.dirname(              # test
    211                       os.path.dirname(__file__)))   # support
    212     package_tests = loader.discover(start_dir=pkg_dir,
    213                                     top_level_dir=top_dir,
    214                                     pattern=pattern)
    215     standard_tests.addTests(package_tests)
    216     return standard_tests
    217 
    218 
    219 def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    220     """Import and return a module, deliberately bypassing sys.modules.
    221 
    222     This function imports and returns a fresh copy of the named Python module
    223     by removing the named module from sys.modules before doing the import.
    224     Note that unlike reload, the original module is not affected by
    225     this operation.
    226 
    227     *fresh* is an iterable of additional module names that are also removed
    228     from the sys.modules cache before doing the import.
    229 
    230     *blocked* is an iterable of module names that are replaced with None
    231     in the module cache during the import to ensure that attempts to import
    232     them raise ImportError.
    233 
    234     The named module and any modules named in the *fresh* and *blocked*
    235     parameters are saved before starting the import and then reinserted into
    236     sys.modules when the fresh import is complete.
    237 
    238     Module and package deprecation messages are suppressed during this import
    239     if *deprecated* is True.
    240 
    241     This function will raise ImportError if the named module cannot be
    242     imported.
    243     """
    244     # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    245     # to make sure that this utility function is working as expected
    246     with _ignore_deprecated_imports(deprecated):
    247         # Keep track of modules saved for later restoration as well
    248         # as those which just need a blocking entry removed
    249         orig_modules = {}
    250         names_to_remove = []
    251         _save_and_remove_module(name, orig_modules)
    252         try:
    253             for fresh_name in fresh:
    254                 _save_and_remove_module(fresh_name, orig_modules)
    255             for blocked_name in blocked:
    256                 if not _save_and_block_module(blocked_name, orig_modules):
    257                     names_to_remove.append(blocked_name)
    258             fresh_module = importlib.import_module(name)
    259         except ImportError:
    260             fresh_module = None
    261         finally:
    262             for orig_name, module in orig_modules.items():
    263                 sys.modules[orig_name] = module
    264             for name_to_remove in names_to_remove:
    265                 del sys.modules[name_to_remove]
    266         return fresh_module
    267 
    268 
    269 def get_attribute(obj, name):
    270     """Get an attribute, raising SkipTest if AttributeError is raised."""
    271     try:
    272         attribute = getattr(obj, name)
    273     except AttributeError:
    274         raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
    275     else:
    276         return attribute
    277 
    278 verbose = 1              # Flag set to 0 by regrtest.py
    279 use_resources = None     # Flag set to [] by regrtest.py
    280 max_memuse = 0           # Disable bigmem tests (they will still be run with
    281                          # small sizes, to make sure they work.)
    282 real_max_memuse = 0
    283 failfast = False
    284 match_tests = None
    285 
    286 # _original_stdout is meant to hold stdout at the time regrtest began.
    287 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
    288 # The point is to have some flavor of stdout the user can actually see.
    289 _original_stdout = None
    290 def record_original_stdout(stdout):
    291     global _original_stdout
    292     _original_stdout = stdout
    293 
    294 def get_original_stdout():
    295     return _original_stdout or sys.stdout
    296 
    297 def unload(name):
    298     try:
    299         del sys.modules[name]
    300     except KeyError:
    301         pass
    302 
    303 def _force_run(path, func, *args):
    304     try:
    305         return func(*args)
    306     except OSError as err:
    307         if verbose >= 2:
    308             print('%s: %s' % (err.__class__.__name__, err))
    309             print('re-run %s%r' % (func.__name__, args))
    310         os.chmod(path, stat.S_IRWXU)
    311         return func(*args)
    312 
    313 if sys.platform.startswith("win"):
    314     def _waitfor(func, pathname, waitall=False):
    315         # Perform the operation
    316         func(pathname)
    317         # Now setup the wait loop
    318         if waitall:
    319             dirname = pathname
    320         else:
    321             dirname, name = os.path.split(pathname)
    322             dirname = dirname or '.'
    323         # Check for `pathname` to be removed from the filesystem.
    324         # The exponential backoff of the timeout amounts to a total
    325         # of ~1 second after which the deletion is probably an error
    326         # anyway.
    327         # Testing on an i7 (at] 4.3GHz shows that usually only 1 iteration is
    328         # required when contention occurs.
    329         timeout = 0.001
    330         while timeout < 1.0:
    331             # Note we are only testing for the existence of the file(s) in
    332             # the contents of the directory regardless of any security or
    333             # access rights.  If we have made it this far, we have sufficient
    334             # permissions to do that much using Python's equivalent of the
    335             # Windows API FindFirstFile.
    336             # Other Windows APIs can fail or give incorrect results when
    337             # dealing with files that are pending deletion.
    338             L = os.listdir(dirname)
    339             if not (L if waitall else name in L):
    340                 return
    341             # Increase the timeout and try again
    342             time.sleep(timeout)
    343             timeout *= 2
    344         warnings.warn('tests may fail, delete still pending for ' + pathname,
    345                       RuntimeWarning, stacklevel=4)
    346 
    347     def _unlink(filename):
    348         _waitfor(os.unlink, filename)
    349 
    350     def _rmdir(dirname):
    351         _waitfor(os.rmdir, dirname)
    352 
    353     def _rmtree(path):
    354         def _rmtree_inner(path):
    355             for name in _force_run(path, os.listdir, path):
    356                 fullname = os.path.join(path, name)
    357                 try:
    358                     mode = os.lstat(fullname).st_mode
    359                 except OSError as exc:
    360                     print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
    361                           file=sys.__stderr__)
    362                     mode = 0
    363                 if stat.S_ISDIR(mode):
    364                     _waitfor(_rmtree_inner, fullname, waitall=True)
    365                     _force_run(fullname, os.rmdir, fullname)
    366                 else:
    367                     _force_run(fullname, os.unlink, fullname)
    368         _waitfor(_rmtree_inner, path, waitall=True)
    369         _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
    370 else:
    371     _unlink = os.unlink
    372     _rmdir = os.rmdir
    373 
    374     def _rmtree(path):
    375         try:
    376             shutil.rmtree(path)
    377             return
    378         except OSError:
    379             pass
    380 
    381         def _rmtree_inner(path):
    382             for name in _force_run(path, os.listdir, path):
    383                 fullname = os.path.join(path, name)
    384                 try:
    385                     mode = os.lstat(fullname).st_mode
    386                 except OSError:
    387                     mode = 0
    388                 if stat.S_ISDIR(mode):
    389                     _rmtree_inner(fullname)
    390                     _force_run(path, os.rmdir, fullname)
    391                 else:
    392                     _force_run(path, os.unlink, fullname)
    393         _rmtree_inner(path)
    394         os.rmdir(path)
    395 
    396 def unlink(filename):
    397     try:
    398         _unlink(filename)
    399     except (FileNotFoundError, NotADirectoryError):
    400         pass
    401 
    402 def rmdir(dirname):
    403     try:
    404         _rmdir(dirname)
    405     except FileNotFoundError:
    406         pass
    407 
    408 def rmtree(path):
    409     try:
    410         _rmtree(path)
    411     except FileNotFoundError:
    412         pass
    413 
    414 def make_legacy_pyc(source):
    415     """Move a PEP 3147/488 pyc file to its legacy pyc location.
    416 
    417     :param source: The file system path to the source file.  The source file
    418         does not need to exist, however the PEP 3147/488 pyc file must exist.
    419     :return: The file system path to the legacy pyc file.
    420     """
    421     pyc_file = importlib.util.cache_from_source(source)
    422     up_one = os.path.dirname(os.path.abspath(source))
    423     legacy_pyc = os.path.join(up_one, source + 'c')
    424     os.rename(pyc_file, legacy_pyc)
    425     return legacy_pyc
    426 
    427 def forget(modname):
    428     """'Forget' a module was ever imported.
    429 
    430     This removes the module from sys.modules and deletes any PEP 3147/488 or
    431     legacy .pyc files.
    432     """
    433     unload(modname)
    434     for dirname in sys.path:
    435         source = os.path.join(dirname, modname + '.py')
    436         # It doesn't matter if they exist or not, unlink all possible
    437         # combinations of PEP 3147/488 and legacy pyc files.
    438         unlink(source + 'c')
    439         for opt in ('', 1, 2):
    440             unlink(importlib.util.cache_from_source(source, optimization=opt))
    441 
    442 # Check whether a gui is actually available
    443 def _is_gui_available():
    444     if hasattr(_is_gui_available, 'result'):
    445         return _is_gui_available.result
    446     reason = None
    447     if sys.platform.startswith('win'):
    448         # if Python is running as a service (such as the buildbot service),
    449         # gui interaction may be disallowed
    450         import ctypes
    451         import ctypes.wintypes
    452         UOI_FLAGS = 1
    453         WSF_VISIBLE = 0x0001
    454         class USEROBJECTFLAGS(ctypes.Structure):
    455             _fields_ = [("fInherit", ctypes.wintypes.BOOL),
    456                         ("fReserved", ctypes.wintypes.BOOL),
    457                         ("dwFlags", ctypes.wintypes.DWORD)]
    458         dll = ctypes.windll.user32
    459         h = dll.GetProcessWindowStation()
    460         if not h:
    461             raise ctypes.WinError()
    462         uof = USEROBJECTFLAGS()
    463         needed = ctypes.wintypes.DWORD()
    464         res = dll.GetUserObjectInformationW(h,
    465             UOI_FLAGS,
    466             ctypes.byref(uof),
    467             ctypes.sizeof(uof),
    468             ctypes.byref(needed))
    469         if not res:
    470             raise ctypes.WinError()
    471         if not bool(uof.dwFlags & WSF_VISIBLE):
    472             reason = "gui not available (WSF_VISIBLE flag not set)"
    473     elif sys.platform == 'darwin':
    474         # The Aqua Tk implementations on OS X can abort the process if
    475         # being called in an environment where a window server connection
    476         # cannot be made, for instance when invoked by a buildbot or ssh
    477         # process not running under the same user id as the current console
    478         # user.  To avoid that, raise an exception if the window manager
    479         # connection is not available.
    480         from ctypes import cdll, c_int, pointer, Structure
    481         from ctypes.util import find_library
    482 
    483         app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
    484 
    485         if app_services.CGMainDisplayID() == 0:
    486             reason = "gui tests cannot run without OS X window manager"
    487         else:
    488             class ProcessSerialNumber(Structure):
    489                 _fields_ = [("highLongOfPSN", c_int),
    490                             ("lowLongOfPSN", c_int)]
    491             psn = ProcessSerialNumber()
    492             psn_p = pointer(psn)
    493             if (  (app_services.GetCurrentProcess(psn_p) < 0) or
    494                   (app_services.SetFrontProcess(psn_p) < 0) ):
    495                 reason = "cannot run without OS X gui process"
    496 
    497     # check on every platform whether tkinter can actually do anything
    498     if not reason:
    499         try:
    500             from tkinter import Tk
    501             root = Tk()
    502             root.withdraw()
    503             root.update()
    504             root.destroy()
    505         except Exception as e:
    506             err_string = str(e)
    507             if len(err_string) > 50:
    508                 err_string = err_string[:50] + ' [...]'
    509             reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
    510                                                            err_string)
    511 
    512     _is_gui_available.reason = reason
    513     _is_gui_available.result = not reason
    514 
    515     return _is_gui_available.result
    516 
    517 def is_resource_enabled(resource):
    518     """Test whether a resource is enabled.
    519 
    520     Known resources are set by regrtest.py.  If not running under regrtest.py,
    521     all resources are assumed enabled unless use_resources has been set.
    522     """
    523     return use_resources is None or resource in use_resources
    524 
    525 def requires(resource, msg=None):
    526     """Raise ResourceDenied if the specified resource is not available."""
    527     if not is_resource_enabled(resource):
    528         if msg is None:
    529             msg = "Use of the %r resource not enabled" % resource
    530         raise ResourceDenied(msg)
    531     if resource == 'gui' and not _is_gui_available():
    532         raise ResourceDenied(_is_gui_available.reason)
    533 
    534 def _requires_unix_version(sysname, min_version):
    535     """Decorator raising SkipTest if the OS is `sysname` and the version is less
    536     than `min_version`.
    537 
    538     For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    539     the FreeBSD version is less than 7.2.
    540     """
    541     def decorator(func):
    542         @functools.wraps(func)
    543         def wrapper(*args, **kw):
    544             if platform.system() == sysname:
    545                 version_txt = platform.release().split('-', 1)[0]
    546                 try:
    547                     version = tuple(map(int, version_txt.split('.')))
    548                 except ValueError:
    549                     pass
    550                 else:
    551                     if version < min_version:
    552                         min_version_txt = '.'.join(map(str, min_version))
    553                         raise unittest.SkipTest(
    554                             "%s version %s or higher required, not %s"
    555                             % (sysname, min_version_txt, version_txt))
    556             return func(*args, **kw)
    557         wrapper.min_version = min_version
    558         return wrapper
    559     return decorator
    560 
    561 def requires_freebsd_version(*min_version):
    562     """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    563     less than `min_version`.
    564 
    565     For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    566     version is less than 7.2.
    567     """
    568     return _requires_unix_version('FreeBSD', min_version)
    569 
    570 def requires_linux_version(*min_version):
    571     """Decorator raising SkipTest if the OS is Linux and the Linux version is
    572     less than `min_version`.
    573 
    574     For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    575     version is less than 2.6.32.
    576     """
    577     return _requires_unix_version('Linux', min_version)
    578 
    579 def requires_mac_ver(*min_version):
    580     """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    581     version if less than min_version.
    582 
    583     For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    584     is lesser than 10.5.
    585     """
    586     def decorator(func):
    587         @functools.wraps(func)
    588         def wrapper(*args, **kw):
    589             if sys.platform == 'darwin':
    590                 version_txt = platform.mac_ver()[0]
    591                 try:
    592                     version = tuple(map(int, version_txt.split('.')))
    593                 except ValueError:
    594                     pass
    595                 else:
    596                     if version < min_version:
    597                         min_version_txt = '.'.join(map(str, min_version))
    598                         raise unittest.SkipTest(
    599                             "Mac OS X %s or higher required, not %s"
    600                             % (min_version_txt, version_txt))
    601             return func(*args, **kw)
    602         wrapper.min_version = min_version
    603         return wrapper
    604     return decorator
    605 
    606 
    607 # Don't use "localhost", since resolving it uses the DNS under recent
    608 # Windows versions (see issue #18792).
    609 HOST = "127.0.0.1"
    610 HOSTv6 = "::1"
    611 
    612 
    613 def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    614     """Returns an unused port that should be suitable for binding.  This is
    615     achieved by creating a temporary socket with the same family and type as
    616     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
    617     the specified host address (defaults to 0.0.0.0) with the port set to 0,
    618     eliciting an unused ephemeral port from the OS.  The temporary socket is
    619     then closed and deleted, and the ephemeral port is returned.
    620 
    621     Either this method or bind_port() should be used for any tests where a
    622     server socket needs to be bound to a particular port for the duration of
    623     the test.  Which one to use depends on whether the calling code is creating
    624     a python socket, or if an unused port needs to be provided in a constructor
    625     or passed to an external program (i.e. the -accept argument to openssl's
    626     s_server mode).  Always prefer bind_port() over find_unused_port() where
    627     possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    628     socket is bound to a hard coded port, the ability to run multiple instances
    629     of the test simultaneously on the same host is compromised, which makes the
    630     test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    631     may simply manifest as a failed test, which can be recovered from without
    632     intervention in most cases, but on Windows, the entire python process can
    633     completely and utterly wedge, requiring someone to log in to the buildbot
    634     and manually kill the affected process.
    635 
    636     (This is easy to reproduce on Windows, unfortunately, and can be traced to
    637     the SO_REUSEADDR socket option having different semantics on Windows versus
    638     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    639     listen and then accept connections on identical host/ports.  An EADDRINUSE
    640     OSError will be raised at some point (depending on the platform and
    641     the order bind and listen were called on each socket).
    642 
    643     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    644     will ever be raised when attempting to bind two identical host/ports. When
    645     accept() is called on each socket, the second caller's process will steal
    646     the port from the first caller, leaving them both in an awkwardly wedged
    647     state where they'll no longer respond to any signals or graceful kills, and
    648     must be forcibly killed via OpenProcess()/TerminateProcess().
    649 
    650     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    651     instead of SO_REUSEADDR, which effectively affords the same semantics as
    652     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    653     Source world compared to Windows ones, this is a common mistake.  A quick
    654     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    655     openssl.exe is called with the 's_server' option, for example. See
    656     http://bugs.python.org/issue2550 for more info.  The following site also
    657     has a very thorough description about the implications of both REUSEADDR
    658     and EXCLUSIVEADDRUSE on Windows:
    659     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
    660 
    661     XXX: although this approach is a vast improvement on previous attempts to
    662     elicit unused ports, it rests heavily on the assumption that the ephemeral
    663     port returned to us by the OS won't immediately be dished back out to some
    664     other process when we close and delete our temporary socket but before our
    665     calling code has a chance to bind the returned port.  We can deal with this
    666     issue if/when we come across it.
    667     """
    668 
    669     tempsock = socket.socket(family, socktype)
    670     port = bind_port(tempsock)
    671     tempsock.close()
    672     del tempsock
    673     return port
    674 
    675 def bind_port(sock, host=HOST):
    676     """Bind the socket to a free port and return the port number.  Relies on
    677     ephemeral ports in order to ensure we are using an unbound port.  This is
    678     important as many tests may be running simultaneously, especially in a
    679     buildbot environment.  This method raises an exception if the sock.family
    680     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    681     or SO_REUSEPORT set on it.  Tests should *never* set these socket options
    682     for TCP/IP sockets.  The only case for setting these options is testing
    683     multicasting via multiple UDP sockets.
    684 
    685     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    686     on Windows), it will be set on the socket.  This will prevent anyone else
    687     from bind()'ing to our host/port for the duration of the test.
    688     """
    689 
    690     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
    691         if hasattr(socket, 'SO_REUSEADDR'):
    692             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
    693                 raise TestFailed("tests should never set the SO_REUSEADDR "   \
    694                                  "socket option on TCP/IP sockets!")
    695         if hasattr(socket, 'SO_REUSEPORT'):
    696             try:
    697                 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
    698                     raise TestFailed("tests should never set the SO_REUSEPORT "   \
    699                                      "socket option on TCP/IP sockets!")
    700             except OSError:
    701                 # Python's socket module was compiled using modern headers
    702                 # thus defining SO_REUSEPORT but this process is running
    703                 # under an older kernel that does not support SO_REUSEPORT.
    704                 pass
    705         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
    706             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
    707 
    708     sock.bind((host, 0))
    709     port = sock.getsockname()[1]
    710     return port
    711 
    712 def bind_unix_socket(sock, addr):
    713     """Bind a unix socket, raising SkipTest if PermissionError is raised."""
    714     assert sock.family == socket.AF_UNIX
    715     try:
    716         sock.bind(addr)
    717     except PermissionError:
    718         sock.close()
    719         raise unittest.SkipTest('cannot bind AF_UNIX sockets')
    720 
    721 def _is_ipv6_enabled():
    722     """Check whether IPv6 is enabled on this host."""
    723     if socket.has_ipv6:
    724         sock = None
    725         try:
    726             sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    727             sock.bind((HOSTv6, 0))
    728             return True
    729         except OSError:
    730             pass
    731         finally:
    732             if sock:
    733                 sock.close()
    734     return False
    735 
    736 IPV6_ENABLED = _is_ipv6_enabled()
    737 
    738 def system_must_validate_cert(f):
    739     """Skip the test on TLS certificate validation failures."""
    740     @functools.wraps(f)
    741     def dec(*args, **kwargs):
    742         try:
    743             f(*args, **kwargs)
    744         except IOError as e:
    745             if "CERTIFICATE_VERIFY_FAILED" in str(e):
    746                 raise unittest.SkipTest("system does not contain "
    747                                         "necessary certificates")
    748             raise
    749     return dec
    750 
    751 # A constant likely larger than the underlying OS pipe buffer size, to
    752 # make writes blocking.
    753 # Windows limit seems to be around 512 B, and many Unix kernels have a
    754 # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
    755 # (see issue #17835 for a discussion of this number).
    756 PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
    757 
    758 # A constant likely larger than the underlying OS socket buffer size, to make
    759 # writes blocking.
    760 # The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
    761 # on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
    762 # for a discussion of this number).
    763 SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
    764 
    765 # decorator for skipping tests on non-IEEE 754 platforms
    766 requires_IEEE_754 = unittest.skipUnless(
    767     float.__getformat__("double").startswith("IEEE"),
    768     "test requires IEEE 754 doubles")
    769 
    770 requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
    771 
    772 requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
    773 
    774 requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
    775 
    776 requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
    777 
    778 is_jython = sys.platform.startswith('java')
    779 
    780 _ANDROID_API_LEVEL = sysconfig.get_config_var('ANDROID_API_LEVEL')
    781 is_android = (_ANDROID_API_LEVEL is not None and _ANDROID_API_LEVEL > 0)
    782 android_not_root = (is_android and os.geteuid() != 0)
    783 
    784 if sys.platform != 'win32':
    785     unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
    786 else:
    787     unix_shell = None
    788 
    789 # Filename used for testing
    790 if os.name == 'java':
    791     # Jython disallows @ in module names
    792     TESTFN = '$test'
    793 else:
    794     TESTFN = '@test'
    795 
    796 # Disambiguate TESTFN for parallel testing, while letting it remain a valid
    797 # module name.
    798 TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
    799 
    800 # FS_NONASCII: non-ASCII character encodable by os.fsencode(),
    801 # or None if there is no such character.
    802 FS_NONASCII = None
    803 for character in (
    804     # First try printable and common characters to have a readable filename.
    805     # For each character, the encoding list are just example of encodings able
    806     # to encode the character (the list is not exhaustive).
    807 
    808     # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    809     '\u00E6',
    810     # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    811     '\u0130',
    812     # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    813     '\u0141',
    814     # U+03C6 (Greek Small Letter Phi): cp1253
    815     '\u03C6',
    816     # U+041A (Cyrillic Capital Letter Ka): cp1251
    817     '\u041A',
    818     # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    819     '\u05D0',
    820     # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    821     '\u060C',
    822     # U+062A (Arabic Letter Teh): cp720
    823     '\u062A',
    824     # U+0E01 (Thai Character Ko Kai): cp874
    825     '\u0E01',
    826 
    827     # Then try more "special" characters. "special" because they may be
    828     # interpreted or displayed differently depending on the exact locale
    829     # encoding and the font.
    830 
    831     # U+00A0 (No-Break Space)
    832     '\u00A0',
    833     # U+20AC (Euro Sign)
    834     '\u20AC',
    835 ):
    836     try:
    837         os.fsdecode(os.fsencode(character))
    838     except UnicodeError:
    839         pass
    840     else:
    841         FS_NONASCII = character
    842         break
    843 
    844 # TESTFN_UNICODE is a non-ascii filename
    845 TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
    846 if sys.platform == 'darwin':
    847     # In Mac OS X's VFS API file names are, by definition, canonically
    848     # decomposed Unicode, encoded using UTF-8. See QA1173:
    849     # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    850     import unicodedata
    851     TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
    852 TESTFN_ENCODING = sys.getfilesystemencoding()
    853 
    854 # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
    855 # encoded by the filesystem encoding (in strict mode). It can be None if we
    856 # cannot generate such filename.
    857 TESTFN_UNENCODABLE = None
    858 if os.name == 'nt':
    859     # skip win32s (0) or Windows 9x/ME (1)
    860     if sys.getwindowsversion().platform >= 2:
    861         # Different kinds of characters from various languages to minimize the
    862         # probability that the whole name is encodable to MBCS (issue #9819)
    863         TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
    864         try:
    865             TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
    866         except UnicodeEncodeError:
    867             pass
    868         else:
    869             print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
    870                   'Unicode filename tests may not be effective'
    871                   % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
    872             TESTFN_UNENCODABLE = None
    873 # Mac OS X denies unencodable filenames (invalid utf-8)
    874 elif sys.platform != 'darwin':
    875     try:
    876         # ascii and utf-8 cannot encode the byte 0xff
    877         b'\xff'.decode(TESTFN_ENCODING)
    878     except UnicodeDecodeError:
    879         # 0xff will be encoded using the surrogate character u+DCFF
    880         TESTFN_UNENCODABLE = TESTFN \
    881             + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    882     else:
    883         # File system encoding (eg. ISO-8859-* encodings) can encode
    884         # the byte 0xff. Skip some unicode filename tests.
    885         pass
    886 
    887 # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
    888 # decoded from the filesystem encoding (in strict mode). It can be None if we
    889 # cannot generate such filename (ex: the latin1 encoding can decode any byte
    890 # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
    891 # to the surrogateescape error handler (PEP 383), but not from the filesystem
    892 # encoding in strict mode.
    893 TESTFN_UNDECODABLE = None
    894 for name in (
    895     # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    896     # accepts it to create a file or a directory, or don't accept to enter to
    897     # such directory (when the bytes name is used). So test b'\xe7' first: it is
    898     # not decodable from cp932.
    899     b'\xe7w\xf0',
    900     # undecodable from ASCII, UTF-8
    901     b'\xff',
    902     # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    903     # and cp857
    904     b'\xae\xd5'
    905     # undecodable from UTF-8 (UNIX and Mac OS X)
    906     b'\xed\xb2\x80', b'\xed\xb4\x80',
    907     # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    908     # cp1253, cp1254, cp1255, cp1257, cp1258
    909     b'\x81\x98',
    910 ):
    911     try:
    912         name.decode(TESTFN_ENCODING)
    913     except UnicodeDecodeError:
    914         TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
    915         break
    916 
    917 if FS_NONASCII:
    918     TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
    919 else:
    920     TESTFN_NONASCII = None
    921 
    922 # Save the initial cwd
    923 SAVEDCWD = os.getcwd()
    924 
    925 # Set by libregrtest/main.py so we can skip tests that are not
    926 # useful for PGO
    927 PGO = False
    928 
    929 @contextlib.contextmanager
    930 def temp_dir(path=None, quiet=False):
    931     """Return a context manager that creates a temporary directory.
    932 
    933     Arguments:
    934 
    935       path: the directory to create temporarily.  If omitted or None,
    936         defaults to creating a temporary directory using tempfile.mkdtemp.
    937 
    938       quiet: if False (the default), the context manager raises an exception
    939         on error.  Otherwise, if the path is specified and cannot be
    940         created, only a warning is issued.
    941 
    942     """
    943     dir_created = False
    944     if path is None:
    945         path = tempfile.mkdtemp()
    946         dir_created = True
    947         path = os.path.realpath(path)
    948     else:
    949         try:
    950             os.mkdir(path)
    951             dir_created = True
    952         except OSError:
    953             if not quiet:
    954                 raise
    955             warnings.warn('tests may fail, unable to create temp dir: ' + path,
    956                           RuntimeWarning, stacklevel=3)
    957     try:
    958         yield path
    959     finally:
    960         if dir_created:
    961             rmtree(path)
    962 
    963 @contextlib.contextmanager
    964 def change_cwd(path, quiet=False):
    965     """Return a context manager that changes the current working directory.
    966 
    967     Arguments:
    968 
    969       path: the directory to use as the temporary current working directory.
    970 
    971       quiet: if False (the default), the context manager raises an exception
    972         on error.  Otherwise, it issues only a warning and keeps the current
    973         working directory the same.
    974 
    975     """
    976     saved_dir = os.getcwd()
    977     try:
    978         os.chdir(path)
    979     except OSError:
    980         if not quiet:
    981             raise
    982         warnings.warn('tests may fail, unable to change CWD to: ' + path,
    983                       RuntimeWarning, stacklevel=3)
    984     try:
    985         yield os.getcwd()
    986     finally:
    987         os.chdir(saved_dir)
    988 
    989 
    990 @contextlib.contextmanager
    991 def temp_cwd(name='tempcwd', quiet=False):
    992     """
    993     Context manager that temporarily creates and changes the CWD.
    994 
    995     The function temporarily changes the current working directory
    996     after creating a temporary directory in the current directory with
    997     name *name*.  If *name* is None, the temporary directory is
    998     created using tempfile.mkdtemp.
    999 
   1000     If *quiet* is False (default) and it is not possible to
   1001     create or change the CWD, an error is raised.  If *quiet* is True,
   1002     only a warning is raised and the original CWD is used.
   1003 
   1004     """
   1005     with temp_dir(path=name, quiet=quiet) as temp_path:
   1006         with change_cwd(temp_path, quiet=quiet) as cwd_dir:
   1007             yield cwd_dir
   1008 
   1009 if hasattr(os, "umask"):
   1010     @contextlib.contextmanager
   1011     def temp_umask(umask):
   1012         """Context manager that temporarily sets the process umask."""
   1013         oldmask = os.umask(umask)
   1014         try:
   1015             yield
   1016         finally:
   1017             os.umask(oldmask)
   1018 
   1019 # TEST_HOME_DIR refers to the top level directory of the "test" package
   1020 # that contains Python's regression test suite
   1021 TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
   1022 TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
   1023 
   1024 # TEST_DATA_DIR is used as a target download location for remote resources
   1025 TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
   1026 
   1027 def findfile(filename, subdir=None):
   1028     """Try to find a file on sys.path or in the test directory.  If it is not
   1029     found the argument passed to the function is returned (this does not
   1030     necessarily signal failure; could still be the legitimate path).
   1031 
   1032     Setting *subdir* indicates a relative path to use to find the file
   1033     rather than looking directly in the path directories.
   1034     """
   1035     if os.path.isabs(filename):
   1036         return filename
   1037     if subdir is not None:
   1038         filename = os.path.join(subdir, filename)
   1039     path = [TEST_HOME_DIR] + sys.path
   1040     for dn in path:
   1041         fn = os.path.join(dn, filename)
   1042         if os.path.exists(fn): return fn
   1043     return filename
   1044 
   1045 def create_empty_file(filename):
   1046     """Create an empty file. If the file already exists, truncate it."""
   1047     fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
   1048     os.close(fd)
   1049 
   1050 def sortdict(dict):
   1051     "Like repr(dict), but in sorted order."
   1052     items = sorted(dict.items())
   1053     reprpairs = ["%r: %r" % pair for pair in items]
   1054     withcommas = ", ".join(reprpairs)
   1055     return "{%s}" % withcommas
   1056 
   1057 def make_bad_fd():
   1058     """
   1059     Create an invalid file descriptor by opening and closing a file and return
   1060     its fd.
   1061     """
   1062     file = open(TESTFN, "wb")
   1063     try:
   1064         return file.fileno()
   1065     finally:
   1066         file.close()
   1067         unlink(TESTFN)
   1068 
   1069 def check_syntax_error(testcase, statement, *, lineno=None, offset=None):
   1070     with testcase.assertRaises(SyntaxError) as cm:
   1071         compile(statement, '<test string>', 'exec')
   1072     err = cm.exception
   1073     testcase.assertIsNotNone(err.lineno)
   1074     if lineno is not None:
   1075         testcase.assertEqual(err.lineno, lineno)
   1076     testcase.assertIsNotNone(err.offset)
   1077     if offset is not None:
   1078         testcase.assertEqual(err.offset, offset)
   1079 
   1080 def open_urlresource(url, *args, **kw):
   1081     import urllib.request, urllib.parse
   1082 
   1083     check = kw.pop('check', None)
   1084 
   1085     filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
   1086 
   1087     fn = os.path.join(TEST_DATA_DIR, filename)
   1088 
   1089     def check_valid_file(fn):
   1090         f = open(fn, *args, **kw)
   1091         if check is None:
   1092             return f
   1093         elif check(f):
   1094             f.seek(0)
   1095             return f
   1096         f.close()
   1097 
   1098     if os.path.exists(fn):
   1099         f = check_valid_file(fn)
   1100         if f is not None:
   1101             return f
   1102         unlink(fn)
   1103 
   1104     # Verify the requirement before downloading the file
   1105     requires('urlfetch')
   1106 
   1107     if verbose:
   1108         print('\tfetching %s ...' % url, file=get_original_stdout())
   1109     opener = urllib.request.build_opener()
   1110     if gzip:
   1111         opener.addheaders.append(('Accept-Encoding', 'gzip'))
   1112     f = opener.open(url, timeout=15)
   1113     if gzip and f.headers.get('Content-Encoding') == 'gzip':
   1114         f = gzip.GzipFile(fileobj=f)
   1115     try:
   1116         with open(fn, "wb") as out:
   1117             s = f.read()
   1118             while s:
   1119                 out.write(s)
   1120                 s = f.read()
   1121     finally:
   1122         f.close()
   1123 
   1124     f = check_valid_file(fn)
   1125     if f is not None:
   1126         return f
   1127     raise TestFailed('invalid resource %r' % fn)
   1128 
   1129 
   1130 class WarningsRecorder(object):
   1131     """Convenience wrapper for the warnings list returned on
   1132        entry to the warnings.catch_warnings() context manager.
   1133     """
   1134     def __init__(self, warnings_list):
   1135         self._warnings = warnings_list
   1136         self._last = 0
   1137 
   1138     def __getattr__(self, attr):
   1139         if len(self._warnings) > self._last:
   1140             return getattr(self._warnings[-1], attr)
   1141         elif attr in warnings.WarningMessage._WARNING_DETAILS:
   1142             return None
   1143         raise AttributeError("%r has no attribute %r" % (self, attr))
   1144 
   1145     @property
   1146     def warnings(self):
   1147         return self._warnings[self._last:]
   1148 
   1149     def reset(self):
   1150         self._last = len(self._warnings)
   1151 
   1152 
   1153 def _filterwarnings(filters, quiet=False):
   1154     """Catch the warnings, then check if all the expected
   1155     warnings have been raised and re-raise unexpected warnings.
   1156     If 'quiet' is True, only re-raise the unexpected warnings.
   1157     """
   1158     # Clear the warning registry of the calling module
   1159     # in order to re-raise the warnings.
   1160     frame = sys._getframe(2)
   1161     registry = frame.f_globals.get('__warningregistry__')
   1162     if registry:
   1163         registry.clear()
   1164     with warnings.catch_warnings(record=True) as w:
   1165         # Set filter "always" to record all warnings.  Because
   1166         # test_warnings swap the module, we need to look up in
   1167         # the sys.modules dictionary.
   1168         sys.modules['warnings'].simplefilter("always")
   1169         yield WarningsRecorder(w)
   1170     # Filter the recorded warnings
   1171     reraise = list(w)
   1172     missing = []
   1173     for msg, cat in filters:
   1174         seen = False
   1175         for w in reraise[:]:
   1176             warning = w.message
   1177             # Filter out the matching messages
   1178             if (re.match(msg, str(warning), re.I) and
   1179                 issubclass(warning.__class__, cat)):
   1180                 seen = True
   1181                 reraise.remove(w)
   1182         if not seen and not quiet:
   1183             # This filter caught nothing
   1184             missing.append((msg, cat.__name__))
   1185     if reraise:
   1186         raise AssertionError("unhandled warning %s" % reraise[0])
   1187     if missing:
   1188         raise AssertionError("filter (%r, %s) did not catch any warning" %
   1189                              missing[0])
   1190 
   1191 
   1192 @contextlib.contextmanager
   1193 def check_warnings(*filters, **kwargs):
   1194     """Context manager to silence warnings.
   1195 
   1196     Accept 2-tuples as positional arguments:
   1197         ("message regexp", WarningCategory)
   1198 
   1199     Optional argument:
   1200      - if 'quiet' is True, it does not fail if a filter catches nothing
   1201         (default True without argument,
   1202          default False if some filters are defined)
   1203 
   1204     Without argument, it defaults to:
   1205         check_warnings(("", Warning), quiet=True)
   1206     """
   1207     quiet = kwargs.get('quiet')
   1208     if not filters:
   1209         filters = (("", Warning),)
   1210         # Preserve backward compatibility
   1211         if quiet is None:
   1212             quiet = True
   1213     return _filterwarnings(filters, quiet)
   1214 
   1215 
   1216 @contextlib.contextmanager
   1217 def check_no_resource_warning(testcase):
   1218     """Context manager to check that no ResourceWarning is emitted.
   1219 
   1220     Usage:
   1221 
   1222         with check_no_resource_warning(self):
   1223             f = open(...)
   1224             ...
   1225             del f
   1226 
   1227     You must remove the object which may emit ResourceWarning before
   1228     the end of the context manager.
   1229     """
   1230     with warnings.catch_warnings(record=True) as warns:
   1231         warnings.filterwarnings('always', category=ResourceWarning)
   1232         yield
   1233         gc_collect()
   1234     testcase.assertEqual(warns, [])
   1235 
   1236 
   1237 class CleanImport(object):
   1238     """Context manager to force import to return a new module reference.
   1239 
   1240     This is useful for testing module-level behaviours, such as
   1241     the emission of a DeprecationWarning on import.
   1242 
   1243     Use like this:
   1244 
   1245         with CleanImport("foo"):
   1246             importlib.import_module("foo") # new reference
   1247     """
   1248 
   1249     def __init__(self, *module_names):
   1250         self.original_modules = sys.modules.copy()
   1251         for module_name in module_names:
   1252             if module_name in sys.modules:
   1253                 module = sys.modules[module_name]
   1254                 # It is possible that module_name is just an alias for
   1255                 # another module (e.g. stub for modules renamed in 3.x).
   1256                 # In that case, we also need delete the real module to clear
   1257                 # the import cache.
   1258                 if module.__name__ != module_name:
   1259                     del sys.modules[module.__name__]
   1260                 del sys.modules[module_name]
   1261 
   1262     def __enter__(self):
   1263         return self
   1264 
   1265     def __exit__(self, *ignore_exc):
   1266         sys.modules.update(self.original_modules)
   1267 
   1268 
   1269 class EnvironmentVarGuard(collections.abc.MutableMapping):
   1270 
   1271     """Class to help protect the environment variable properly.  Can be used as
   1272     a context manager."""
   1273 
   1274     def __init__(self):
   1275         self._environ = os.environ
   1276         self._changed = {}
   1277 
   1278     def __getitem__(self, envvar):
   1279         return self._environ[envvar]
   1280 
   1281     def __setitem__(self, envvar, value):
   1282         # Remember the initial value on the first access
   1283         if envvar not in self._changed:
   1284             self._changed[envvar] = self._environ.get(envvar)
   1285         self._environ[envvar] = value
   1286 
   1287     def __delitem__(self, envvar):
   1288         # Remember the initial value on the first access
   1289         if envvar not in self._changed:
   1290             self._changed[envvar] = self._environ.get(envvar)
   1291         if envvar in self._environ:
   1292             del self._environ[envvar]
   1293 
   1294     def keys(self):
   1295         return self._environ.keys()
   1296 
   1297     def __iter__(self):
   1298         return iter(self._environ)
   1299 
   1300     def __len__(self):
   1301         return len(self._environ)
   1302 
   1303     def set(self, envvar, value):
   1304         self[envvar] = value
   1305 
   1306     def unset(self, envvar):
   1307         del self[envvar]
   1308 
   1309     def __enter__(self):
   1310         return self
   1311 
   1312     def __exit__(self, *ignore_exc):
   1313         for (k, v) in self._changed.items():
   1314             if v is None:
   1315                 if k in self._environ:
   1316                     del self._environ[k]
   1317             else:
   1318                 self._environ[k] = v
   1319         os.environ = self._environ
   1320 
   1321 
   1322 class DirsOnSysPath(object):
   1323     """Context manager to temporarily add directories to sys.path.
   1324 
   1325     This makes a copy of sys.path, appends any directories given
   1326     as positional arguments, then reverts sys.path to the copied
   1327     settings when the context ends.
   1328 
   1329     Note that *all* sys.path modifications in the body of the
   1330     context manager, including replacement of the object,
   1331     will be reverted at the end of the block.
   1332     """
   1333 
   1334     def __init__(self, *paths):
   1335         self.original_value = sys.path[:]
   1336         self.original_object = sys.path
   1337         sys.path.extend(paths)
   1338 
   1339     def __enter__(self):
   1340         return self
   1341 
   1342     def __exit__(self, *ignore_exc):
   1343         sys.path = self.original_object
   1344         sys.path[:] = self.original_value
   1345 
   1346 
   1347 class TransientResource(object):
   1348 
   1349     """Raise ResourceDenied if an exception is raised while the context manager
   1350     is in effect that matches the specified exception and attributes."""
   1351 
   1352     def __init__(self, exc, **kwargs):
   1353         self.exc = exc
   1354         self.attrs = kwargs
   1355 
   1356     def __enter__(self):
   1357         return self
   1358 
   1359     def __exit__(self, type_=None, value=None, traceback=None):
   1360         """If type_ is a subclass of self.exc and value has attributes matching
   1361         self.attrs, raise ResourceDenied.  Otherwise let the exception
   1362         propagate (if any)."""
   1363         if type_ is not None and issubclass(self.exc, type_):
   1364             for attr, attr_value in self.attrs.items():
   1365                 if not hasattr(value, attr):
   1366                     break
   1367                 if getattr(value, attr) != attr_value:
   1368                     break
   1369             else:
   1370                 raise ResourceDenied("an optional resource is not available")
   1371 
   1372 # Context managers that raise ResourceDenied when various issues
   1373 # with the Internet connection manifest themselves as exceptions.
   1374 # XXX deprecate these and use transient_internet() instead
   1375 time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
   1376 socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
   1377 ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
   1378 
   1379 
   1380 @contextlib.contextmanager
   1381 def transient_internet(resource_name, *, timeout=30.0, errnos=()):
   1382     """Return a context manager that raises ResourceDenied when various issues
   1383     with the Internet connection manifest themselves as exceptions."""
   1384     default_errnos = [
   1385         ('ECONNREFUSED', 111),
   1386         ('ECONNRESET', 104),
   1387         ('EHOSTUNREACH', 113),
   1388         ('ENETUNREACH', 101),
   1389         ('ETIMEDOUT', 110),
   1390     ]
   1391     default_gai_errnos = [
   1392         ('EAI_AGAIN', -3),
   1393         ('EAI_FAIL', -4),
   1394         ('EAI_NONAME', -2),
   1395         ('EAI_NODATA', -5),
   1396         # Encountered when trying to resolve IPv6-only hostnames
   1397         ('WSANO_DATA', 11004),
   1398     ]
   1399 
   1400     denied = ResourceDenied("Resource %r is not available" % resource_name)
   1401     captured_errnos = errnos
   1402     gai_errnos = []
   1403     if not captured_errnos:
   1404         captured_errnos = [getattr(errno, name, num)
   1405                            for (name, num) in default_errnos]
   1406         gai_errnos = [getattr(socket, name, num)
   1407                       for (name, num) in default_gai_errnos]
   1408 
   1409     def filter_error(err):
   1410         n = getattr(err, 'errno', None)
   1411         if (isinstance(err, socket.timeout) or
   1412             (isinstance(err, socket.gaierror) and n in gai_errnos) or
   1413             (isinstance(err, urllib.error.HTTPError) and
   1414              500 <= err.code <= 599) or
   1415             (isinstance(err, urllib.error.URLError) and
   1416                  (("ConnectionRefusedError" in err.reason) or
   1417                   ("TimeoutError" in err.reason) or
   1418                   ("EOFError" in err.reason))) or
   1419             n in captured_errnos):
   1420             if not verbose:
   1421                 sys.stderr.write(denied.args[0] + "\n")
   1422             raise denied from err
   1423 
   1424     old_timeout = socket.getdefaulttimeout()
   1425     try:
   1426         if timeout is not None:
   1427             socket.setdefaulttimeout(timeout)
   1428         yield
   1429     except nntplib.NNTPTemporaryError as err:
   1430         if verbose:
   1431             sys.stderr.write(denied.args[0] + "\n")
   1432         raise denied from err
   1433     except OSError as err:
   1434         # urllib can wrap original socket errors multiple times (!), we must
   1435         # unwrap to get at the original error.
   1436         while True:
   1437             a = err.args
   1438             if len(a) >= 1 and isinstance(a[0], OSError):
   1439                 err = a[0]
   1440             # The error can also be wrapped as args[1]:
   1441             #    except socket.error as msg:
   1442             #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
   1443             elif len(a) >= 2 and isinstance(a[1], OSError):
   1444                 err = a[1]
   1445             else:
   1446                 break
   1447         filter_error(err)
   1448         raise
   1449     # XXX should we catch generic exceptions and look for their
   1450     # __cause__ or __context__?
   1451     finally:
   1452         socket.setdefaulttimeout(old_timeout)
   1453 
   1454 
   1455 @contextlib.contextmanager
   1456 def captured_output(stream_name):
   1457     """Return a context manager used by captured_stdout/stdin/stderr
   1458     that temporarily replaces the sys stream *stream_name* with a StringIO."""
   1459     import io
   1460     orig_stdout = getattr(sys, stream_name)
   1461     setattr(sys, stream_name, io.StringIO())
   1462     try:
   1463         yield getattr(sys, stream_name)
   1464     finally:
   1465         setattr(sys, stream_name, orig_stdout)
   1466 
   1467 def captured_stdout():
   1468     """Capture the output of sys.stdout:
   1469 
   1470        with captured_stdout() as stdout:
   1471            print("hello")
   1472        self.assertEqual(stdout.getvalue(), "hello\\n")
   1473     """
   1474     return captured_output("stdout")
   1475 
   1476 def captured_stderr():
   1477     """Capture the output of sys.stderr:
   1478 
   1479        with captured_stderr() as stderr:
   1480            print("hello", file=sys.stderr)
   1481        self.assertEqual(stderr.getvalue(), "hello\\n")
   1482     """
   1483     return captured_output("stderr")
   1484 
   1485 def captured_stdin():
   1486     """Capture the input to sys.stdin:
   1487 
   1488        with captured_stdin() as stdin:
   1489            stdin.write('hello\\n')
   1490            stdin.seek(0)
   1491            # call test code that consumes from sys.stdin
   1492            captured = input()
   1493        self.assertEqual(captured, "hello")
   1494     """
   1495     return captured_output("stdin")
   1496 
   1497 
   1498 def gc_collect():
   1499     """Force as many objects as possible to be collected.
   1500 
   1501     In non-CPython implementations of Python, this is needed because timely
   1502     deallocation is not guaranteed by the garbage collector.  (Even in CPython
   1503     this can be the case in case of reference cycles.)  This means that __del__
   1504     methods may be called later than expected and weakrefs may remain alive for
   1505     longer than expected.  This function tries its best to force all garbage
   1506     objects to disappear.
   1507     """
   1508     gc.collect()
   1509     if is_jython:
   1510         time.sleep(0.1)
   1511     gc.collect()
   1512     gc.collect()
   1513 
   1514 @contextlib.contextmanager
   1515 def disable_gc():
   1516     have_gc = gc.isenabled()
   1517     gc.disable()
   1518     try:
   1519         yield
   1520     finally:
   1521         if have_gc:
   1522             gc.enable()
   1523 
   1524 
   1525 def python_is_optimized():
   1526     """Find if Python was built with optimizations."""
   1527     cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
   1528     final_opt = ""
   1529     for opt in cflags.split():
   1530         if opt.startswith('-O'):
   1531             final_opt = opt
   1532     return final_opt not in ('', '-O0', '-Og')
   1533 
   1534 
   1535 _header = 'nP'
   1536 _align = '0n'
   1537 if hasattr(sys, "gettotalrefcount"):
   1538     _header = '2P' + _header
   1539     _align = '0P'
   1540 _vheader = _header + 'n'
   1541 
   1542 def calcobjsize(fmt):
   1543     return struct.calcsize(_header + fmt + _align)
   1544 
   1545 def calcvobjsize(fmt):
   1546     return struct.calcsize(_vheader + fmt + _align)
   1547 
   1548 
   1549 _TPFLAGS_HAVE_GC = 1<<14
   1550 _TPFLAGS_HEAPTYPE = 1<<9
   1551 
   1552 def check_sizeof(test, o, size):
   1553     import _testcapi
   1554     result = sys.getsizeof(o)
   1555     # add GC header size
   1556     if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
   1557         ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
   1558         size += _testcapi.SIZEOF_PYGC_HEAD
   1559     msg = 'wrong size for %s: got %d, expected %d' \
   1560             % (type(o), result, size)
   1561     test.assertEqual(result, size, msg)
   1562 
   1563 #=======================================================================
   1564 # Decorator for running a function in a different locale, correctly resetting
   1565 # it afterwards.
   1566 
   1567 def run_with_locale(catstr, *locales):
   1568     def decorator(func):
   1569         def inner(*args, **kwds):
   1570             try:
   1571                 import locale
   1572                 category = getattr(locale, catstr)
   1573                 orig_locale = locale.setlocale(category)
   1574             except AttributeError:
   1575                 # if the test author gives us an invalid category string
   1576                 raise
   1577             except:
   1578                 # cannot retrieve original locale, so do nothing
   1579                 locale = orig_locale = None
   1580             else:
   1581                 for loc in locales:
   1582                     try:
   1583                         locale.setlocale(category, loc)
   1584                         break
   1585                     except:
   1586                         pass
   1587 
   1588             # now run the function, resetting the locale on exceptions
   1589             try:
   1590                 return func(*args, **kwds)
   1591             finally:
   1592                 if locale and orig_locale:
   1593                     locale.setlocale(category, orig_locale)
   1594         inner.__name__ = func.__name__
   1595         inner.__doc__ = func.__doc__
   1596         return inner
   1597     return decorator
   1598 
   1599 #=======================================================================
   1600 # Decorator for running a function in a specific timezone, correctly
   1601 # resetting it afterwards.
   1602 
   1603 def run_with_tz(tz):
   1604     def decorator(func):
   1605         def inner(*args, **kwds):
   1606             try:
   1607                 tzset = time.tzset
   1608             except AttributeError:
   1609                 raise unittest.SkipTest("tzset required")
   1610             if 'TZ' in os.environ:
   1611                 orig_tz = os.environ['TZ']
   1612             else:
   1613                 orig_tz = None
   1614             os.environ['TZ'] = tz
   1615             tzset()
   1616 
   1617             # now run the function, resetting the tz on exceptions
   1618             try:
   1619                 return func(*args, **kwds)
   1620             finally:
   1621                 if orig_tz is None:
   1622                     del os.environ['TZ']
   1623                 else:
   1624                     os.environ['TZ'] = orig_tz
   1625                 time.tzset()
   1626 
   1627         inner.__name__ = func.__name__
   1628         inner.__doc__ = func.__doc__
   1629         return inner
   1630     return decorator
   1631 
   1632 #=======================================================================
   1633 # Big-memory-test support. Separate from 'resources' because memory use
   1634 # should be configurable.
   1635 
   1636 # Some handy shorthands. Note that these are used for byte-limits as well
   1637 # as size-limits, in the various bigmem tests
   1638 _1M = 1024*1024
   1639 _1G = 1024 * _1M
   1640 _2G = 2 * _1G
   1641 _4G = 4 * _1G
   1642 
   1643 MAX_Py_ssize_t = sys.maxsize
   1644 
   1645 def set_memlimit(limit):
   1646     global max_memuse
   1647     global real_max_memuse
   1648     sizes = {
   1649         'k': 1024,
   1650         'm': _1M,
   1651         'g': _1G,
   1652         't': 1024*_1G,
   1653     }
   1654     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
   1655                  re.IGNORECASE | re.VERBOSE)
   1656     if m is None:
   1657         raise ValueError('Invalid memory limit %r' % (limit,))
   1658     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
   1659     real_max_memuse = memlimit
   1660     if memlimit > MAX_Py_ssize_t:
   1661         memlimit = MAX_Py_ssize_t
   1662     if memlimit < _2G - 1:
   1663         raise ValueError('Memory limit %r too low to be useful' % (limit,))
   1664     max_memuse = memlimit
   1665 
   1666 class _MemoryWatchdog:
   1667     """An object which periodically watches the process' memory consumption
   1668     and prints it out.
   1669     """
   1670 
   1671     def __init__(self):
   1672         self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
   1673         self.started = False
   1674 
   1675     def start(self):
   1676         try:
   1677             f = open(self.procfile, 'r')
   1678         except OSError as e:
   1679             warnings.warn('/proc not available for stats: {}'.format(e),
   1680                           RuntimeWarning)
   1681             sys.stderr.flush()
   1682             return
   1683 
   1684         watchdog_script = findfile("memory_watchdog.py")
   1685         self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
   1686                                              stdin=f, stderr=subprocess.DEVNULL)
   1687         f.close()
   1688         self.started = True
   1689 
   1690     def stop(self):
   1691         if self.started:
   1692             self.mem_watchdog.terminate()
   1693             self.mem_watchdog.wait()
   1694 
   1695 
   1696 def bigmemtest(size, memuse, dry_run=True):
   1697     """Decorator for bigmem tests.
   1698 
   1699     'size' is a requested size for the test (in arbitrary, test-interpreted
   1700     units.) 'memuse' is the number of bytes per unit for the test, or a good
   1701     estimate of it. For example, a test that needs two byte buffers, of 4 GiB
   1702     each, could be decorated with @bigmemtest(size=_4G, memuse=2).
   1703 
   1704     The 'size' argument is normally passed to the decorated test method as an
   1705     extra argument. If 'dry_run' is true, the value passed to the test method
   1706     may be less than the requested value. If 'dry_run' is false, it means the
   1707     test doesn't support dummy runs when -M is not specified.
   1708     """
   1709     def decorator(f):
   1710         def wrapper(self):
   1711             size = wrapper.size
   1712             memuse = wrapper.memuse
   1713             if not real_max_memuse:
   1714                 maxsize = 5147
   1715             else:
   1716                 maxsize = size
   1717 
   1718             if ((real_max_memuse or not dry_run)
   1719                 and real_max_memuse < maxsize * memuse):
   1720                 raise unittest.SkipTest(
   1721                     "not enough memory: %.1fG minimum needed"
   1722                     % (size * memuse / (1024 ** 3)))
   1723 
   1724             if real_max_memuse and verbose:
   1725                 print()
   1726                 print(" ... expected peak memory use: {peak:.1f}G"
   1727                       .format(peak=size * memuse / (1024 ** 3)))
   1728                 watchdog = _MemoryWatchdog()
   1729                 watchdog.start()
   1730             else:
   1731                 watchdog = None
   1732 
   1733             try:
   1734                 return f(self, maxsize)
   1735             finally:
   1736                 if watchdog:
   1737                     watchdog.stop()
   1738 
   1739         wrapper.size = size
   1740         wrapper.memuse = memuse
   1741         return wrapper
   1742     return decorator
   1743 
   1744 def bigaddrspacetest(f):
   1745     """Decorator for tests that fill the address space."""
   1746     def wrapper(self):
   1747         if max_memuse < MAX_Py_ssize_t:
   1748             if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
   1749                 raise unittest.SkipTest(
   1750                     "not enough memory: try a 32-bit build instead")
   1751             else:
   1752                 raise unittest.SkipTest(
   1753                     "not enough memory: %.1fG minimum needed"
   1754                     % (MAX_Py_ssize_t / (1024 ** 3)))
   1755         else:
   1756             return f(self)
   1757     return wrapper
   1758 
   1759 #=======================================================================
   1760 # unittest integration.
   1761 
   1762 class BasicTestRunner:
   1763     def run(self, test):
   1764         result = unittest.TestResult()
   1765         test(result)
   1766         return result
   1767 
   1768 def _id(obj):
   1769     return obj
   1770 
   1771 def requires_resource(resource):
   1772     if resource == 'gui' and not _is_gui_available():
   1773         return unittest.skip(_is_gui_available.reason)
   1774     if is_resource_enabled(resource):
   1775         return _id
   1776     else:
   1777         return unittest.skip("resource {0!r} is not enabled".format(resource))
   1778 
   1779 def requires_android_level(level, reason):
   1780     if is_android and _ANDROID_API_LEVEL < level:
   1781         return unittest.skip('%s at Android API level %d' %
   1782                              (reason, _ANDROID_API_LEVEL))
   1783     else:
   1784         return _id
   1785 
   1786 def cpython_only(test):
   1787     """
   1788     Decorator for tests only applicable on CPython.
   1789     """
   1790     return impl_detail(cpython=True)(test)
   1791 
   1792 def impl_detail(msg=None, **guards):
   1793     if check_impl_detail(**guards):
   1794         return _id
   1795     if msg is None:
   1796         guardnames, default = _parse_guards(guards)
   1797         if default:
   1798             msg = "implementation detail not available on {0}"
   1799         else:
   1800             msg = "implementation detail specific to {0}"
   1801         guardnames = sorted(guardnames.keys())
   1802         msg = msg.format(' or '.join(guardnames))
   1803     return unittest.skip(msg)
   1804 
   1805 _have_mp_queue = None
   1806 def requires_multiprocessing_queue(test):
   1807     """Skip decorator for tests that use multiprocessing.Queue."""
   1808     global _have_mp_queue
   1809     if _have_mp_queue is None:
   1810         import multiprocessing
   1811         # Without a functioning shared semaphore implementation attempts to
   1812         # instantiate a Queue will result in an ImportError (issue #3770).
   1813         try:
   1814             multiprocessing.Queue()
   1815             _have_mp_queue = True
   1816         except ImportError:
   1817             _have_mp_queue = False
   1818     msg = "requires a functioning shared semaphore implementation"
   1819     return test if _have_mp_queue else unittest.skip(msg)(test)
   1820 
   1821 def _parse_guards(guards):
   1822     # Returns a tuple ({platform_name: run_me}, default_value)
   1823     if not guards:
   1824         return ({'cpython': True}, False)
   1825     is_true = list(guards.values())[0]
   1826     assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
   1827     return (guards, not is_true)
   1828 
   1829 # Use the following check to guard CPython's implementation-specific tests --
   1830 # or to run them only on the implementation(s) guarded by the arguments.
   1831 def check_impl_detail(**guards):
   1832     """This function returns True or False depending on the host platform.
   1833        Examples:
   1834           if check_impl_detail():               # only on CPython (default)
   1835           if check_impl_detail(jython=True):    # only on Jython
   1836           if check_impl_detail(cpython=False):  # everywhere except on CPython
   1837     """
   1838     guards, default = _parse_guards(guards)
   1839     return guards.get(platform.python_implementation().lower(), default)
   1840 
   1841 
   1842 def no_tracing(func):
   1843     """Decorator to temporarily turn off tracing for the duration of a test."""
   1844     if not hasattr(sys, 'gettrace'):
   1845         return func
   1846     else:
   1847         @functools.wraps(func)
   1848         def wrapper(*args, **kwargs):
   1849             original_trace = sys.gettrace()
   1850             try:
   1851                 sys.settrace(None)
   1852                 return func(*args, **kwargs)
   1853             finally:
   1854                 sys.settrace(original_trace)
   1855         return wrapper
   1856 
   1857 
   1858 def refcount_test(test):
   1859     """Decorator for tests which involve reference counting.
   1860 
   1861     To start, the decorator does not run the test if is not run by CPython.
   1862     After that, any trace function is unset during the test to prevent
   1863     unexpected refcounts caused by the trace function.
   1864 
   1865     """
   1866     return no_tracing(cpython_only(test))
   1867 
   1868 
   1869 def _filter_suite(suite, pred):
   1870     """Recursively filter test cases in a suite based on a predicate."""
   1871     newtests = []
   1872     for test in suite._tests:
   1873         if isinstance(test, unittest.TestSuite):
   1874             _filter_suite(test, pred)
   1875             newtests.append(test)
   1876         else:
   1877             if pred(test):
   1878                 newtests.append(test)
   1879     suite._tests = newtests
   1880 
   1881 def _run_suite(suite):
   1882     """Run tests from a unittest.TestSuite-derived class."""
   1883     if verbose:
   1884         runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
   1885                                          failfast=failfast)
   1886     else:
   1887         runner = BasicTestRunner()
   1888 
   1889     result = runner.run(suite)
   1890     if not result.wasSuccessful():
   1891         if len(result.errors) == 1 and not result.failures:
   1892             err = result.errors[0][1]
   1893         elif len(result.failures) == 1 and not result.errors:
   1894             err = result.failures[0][1]
   1895         else:
   1896             err = "multiple errors occurred"
   1897             if not verbose: err += "; run in verbose mode for details"
   1898         raise TestFailed(err)
   1899 
   1900 
   1901 def run_unittest(*classes):
   1902     """Run tests from unittest.TestCase-derived classes."""
   1903     valid_types = (unittest.TestSuite, unittest.TestCase)
   1904     suite = unittest.TestSuite()
   1905     for cls in classes:
   1906         if isinstance(cls, str):
   1907             if cls in sys.modules:
   1908                 suite.addTest(unittest.findTestCases(sys.modules[cls]))
   1909             else:
   1910                 raise ValueError("str arguments must be keys in sys.modules")
   1911         elif isinstance(cls, valid_types):
   1912             suite.addTest(cls)
   1913         else:
   1914             suite.addTest(unittest.makeSuite(cls))
   1915     def case_pred(test):
   1916         if match_tests is None:
   1917             return True
   1918         for name in test.id().split("."):
   1919             if fnmatch.fnmatchcase(name, match_tests):
   1920                 return True
   1921         return False
   1922     _filter_suite(suite, case_pred)
   1923     _run_suite(suite)
   1924 
   1925 #=======================================================================
   1926 # Check for the presence of docstrings.
   1927 
   1928 # Rather than trying to enumerate all the cases where docstrings may be
   1929 # disabled, we just check for that directly
   1930 
   1931 def _check_docstrings():
   1932     """Just used to check if docstrings are enabled"""
   1933 
   1934 MISSING_C_DOCSTRINGS = (check_impl_detail() and
   1935                         sys.platform != 'win32' and
   1936                         not sysconfig.get_config_var('WITH_DOC_STRINGS'))
   1937 
   1938 HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
   1939                    not MISSING_C_DOCSTRINGS)
   1940 
   1941 requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
   1942                                           "test requires docstrings")
   1943 
   1944 
   1945 #=======================================================================
   1946 # doctest driver.
   1947 
   1948 def run_doctest(module, verbosity=None, optionflags=0):
   1949     """Run doctest on the given module.  Return (#failures, #tests).
   1950 
   1951     If optional argument verbosity is not specified (or is None), pass
   1952     support's belief about verbosity on to doctest.  Else doctest's
   1953     usual behavior is used (it searches sys.argv for -v).
   1954     """
   1955 
   1956     import doctest
   1957 
   1958     if verbosity is None:
   1959         verbosity = verbose
   1960     else:
   1961         verbosity = None
   1962 
   1963     f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
   1964     if f:
   1965         raise TestFailed("%d of %d doctests failed" % (f, t))
   1966     if verbose:
   1967         print('doctest (%s) ... %d tests with zero failures' %
   1968               (module.__name__, t))
   1969     return f, t
   1970 
   1971 
   1972 #=======================================================================
   1973 # Support for saving and restoring the imported modules.
   1974 
   1975 def modules_setup():
   1976     return sys.modules.copy(),
   1977 
   1978 def modules_cleanup(oldmodules):
   1979     # Encoders/decoders are registered permanently within the internal
   1980     # codec cache. If we destroy the corresponding modules their
   1981     # globals will be set to None which will trip up the cached functions.
   1982     encodings = [(k, v) for k, v in sys.modules.items()
   1983                  if k.startswith('encodings.')]
   1984     sys.modules.clear()
   1985     sys.modules.update(encodings)
   1986     # XXX: This kind of problem can affect more than just encodings. In particular
   1987     # extension modules (such as _ssl) don't cope with reloading properly.
   1988     # Really, test modules should be cleaning out the test specific modules they
   1989     # know they added (ala test_runpy) rather than relying on this function (as
   1990     # test_importhooks and test_pkg do currently).
   1991     # Implicitly imported *real* modules should be left alone (see issue 10556).
   1992     sys.modules.update(oldmodules)
   1993 
   1994 #=======================================================================
   1995 # Threading support to prevent reporting refleaks when running regrtest.py -R
   1996 
   1997 # NOTE: we use thread._count() rather than threading.enumerate() (or the
   1998 # moral equivalent thereof) because a threading.Thread object is still alive
   1999 # until its __bootstrap() method has returned, even after it has been
   2000 # unregistered from the threading module.
   2001 # thread._count(), on the other hand, only gets decremented *after* the
   2002 # __bootstrap() method has returned, which gives us reliable reference counts
   2003 # at the end of a test run.
   2004 
   2005 def threading_setup():
   2006     if _thread:
   2007         return _thread._count(), threading._dangling.copy()
   2008     else:
   2009         return 1, ()
   2010 
   2011 def threading_cleanup(*original_values):
   2012     if not _thread:
   2013         return
   2014     _MAX_COUNT = 100
   2015     for count in range(_MAX_COUNT):
   2016         values = _thread._count(), threading._dangling
   2017         if values == original_values:
   2018             break
   2019         time.sleep(0.01)
   2020         gc_collect()
   2021     # XXX print a warning in case of failure?
   2022 
   2023 def reap_threads(func):
   2024     """Use this function when threads are being used.  This will
   2025     ensure that the threads are cleaned up even when the test fails.
   2026     If threading is unavailable this function does nothing.
   2027     """
   2028     if not _thread:
   2029         return func
   2030 
   2031     @functools.wraps(func)
   2032     def decorator(*args):
   2033         key = threading_setup()
   2034         try:
   2035             return func(*args)
   2036         finally:
   2037             threading_cleanup(*key)
   2038     return decorator
   2039 
   2040 def reap_children():
   2041     """Use this function at the end of test_main() whenever sub-processes
   2042     are started.  This will help ensure that no extra children (zombies)
   2043     stick around to hog resources and create problems when looking
   2044     for refleaks.
   2045     """
   2046 
   2047     # Reap all our dead child processes so we don't leave zombies around.
   2048     # These hog resources and might be causing some of the buildbots to die.
   2049     if hasattr(os, 'waitpid'):
   2050         any_process = -1
   2051         while True:
   2052             try:
   2053                 # This will raise an exception on Windows.  That's ok.
   2054                 pid, status = os.waitpid(any_process, os.WNOHANG)
   2055                 if pid == 0:
   2056                     break
   2057             except:
   2058                 break
   2059 
   2060 @contextlib.contextmanager
   2061 def start_threads(threads, unlock=None):
   2062     threads = list(threads)
   2063     started = []
   2064     try:
   2065         try:
   2066             for t in threads:
   2067                 t.start()
   2068                 started.append(t)
   2069         except:
   2070             if verbose:
   2071                 print("Can't start %d threads, only %d threads started" %
   2072                       (len(threads), len(started)))
   2073             raise
   2074         yield
   2075     finally:
   2076         try:
   2077             if unlock:
   2078                 unlock()
   2079             endtime = starttime = time.time()
   2080             for timeout in range(1, 16):
   2081                 endtime += 60
   2082                 for t in started:
   2083                     t.join(max(endtime - time.time(), 0.01))
   2084                 started = [t for t in started if t.isAlive()]
   2085                 if not started:
   2086                     break
   2087                 if verbose:
   2088                     print('Unable to join %d threads during a period of '
   2089                           '%d minutes' % (len(started), timeout))
   2090         finally:
   2091             started = [t for t in started if t.isAlive()]
   2092             if started:
   2093                 faulthandler.dump_traceback(sys.stdout)
   2094                 raise AssertionError('Unable to join %d threads' % len(started))
   2095 
   2096 @contextlib.contextmanager
   2097 def swap_attr(obj, attr, new_val):
   2098     """Temporary swap out an attribute with a new object.
   2099 
   2100     Usage:
   2101         with swap_attr(obj, "attr", 5):
   2102             ...
   2103 
   2104         This will set obj.attr to 5 for the duration of the with: block,
   2105         restoring the old value at the end of the block. If `attr` doesn't
   2106         exist on `obj`, it will be created and then deleted at the end of the
   2107         block.
   2108     """
   2109     if hasattr(obj, attr):
   2110         real_val = getattr(obj, attr)
   2111         setattr(obj, attr, new_val)
   2112         try:
   2113             yield
   2114         finally:
   2115             setattr(obj, attr, real_val)
   2116     else:
   2117         setattr(obj, attr, new_val)
   2118         try:
   2119             yield
   2120         finally:
   2121             delattr(obj, attr)
   2122 
   2123 @contextlib.contextmanager
   2124 def swap_item(obj, item, new_val):
   2125     """Temporary swap out an item with a new object.
   2126 
   2127     Usage:
   2128         with swap_item(obj, "item", 5):
   2129             ...
   2130 
   2131         This will set obj["item"] to 5 for the duration of the with: block,
   2132         restoring the old value at the end of the block. If `item` doesn't
   2133         exist on `obj`, it will be created and then deleted at the end of the
   2134         block.
   2135     """
   2136     if item in obj:
   2137         real_val = obj[item]
   2138         obj[item] = new_val
   2139         try:
   2140             yield
   2141         finally:
   2142             obj[item] = real_val
   2143     else:
   2144         obj[item] = new_val
   2145         try:
   2146             yield
   2147         finally:
   2148             del obj[item]
   2149 
   2150 def strip_python_stderr(stderr):
   2151     """Strip the stderr of a Python process from potential debug output
   2152     emitted by the interpreter.
   2153 
   2154     This will typically be run on the result of the communicate() method
   2155     of a subprocess.Popen object.
   2156     """
   2157     stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
   2158     return stderr
   2159 
   2160 requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
   2161                         'types are immortal if COUNT_ALLOCS is defined')
   2162 
   2163 def args_from_interpreter_flags():
   2164     """Return a list of command-line arguments reproducing the current
   2165     settings in sys.flags and sys.warnoptions."""
   2166     return subprocess._args_from_interpreter_flags()
   2167 
   2168 def optim_args_from_interpreter_flags():
   2169     """Return a list of command-line arguments reproducing the current
   2170     optimization settings in sys.flags."""
   2171     return subprocess._optim_args_from_interpreter_flags()
   2172 
   2173 #============================================================
   2174 # Support for assertions about logging.
   2175 #============================================================
   2176 
   2177 class TestHandler(logging.handlers.BufferingHandler):
   2178     def __init__(self, matcher):
   2179         # BufferingHandler takes a "capacity" argument
   2180         # so as to know when to flush. As we're overriding
   2181         # shouldFlush anyway, we can set a capacity of zero.
   2182         # You can call flush() manually to clear out the
   2183         # buffer.
   2184         logging.handlers.BufferingHandler.__init__(self, 0)
   2185         self.matcher = matcher
   2186 
   2187     def shouldFlush(self):
   2188         return False
   2189 
   2190     def emit(self, record):
   2191         self.format(record)
   2192         self.buffer.append(record.__dict__)
   2193 
   2194     def matches(self, **kwargs):
   2195         """
   2196         Look for a saved dict whose keys/values match the supplied arguments.
   2197         """
   2198         result = False
   2199         for d in self.buffer:
   2200             if self.matcher.matches(d, **kwargs):
   2201                 result = True
   2202                 break
   2203         return result
   2204 
   2205 class Matcher(object):
   2206 
   2207     _partial_matches = ('msg', 'message')
   2208 
   2209     def matches(self, d, **kwargs):
   2210         """
   2211         Try to match a single dict with the supplied arguments.
   2212 
   2213         Keys whose values are strings and which are in self._partial_matches
   2214         will be checked for partial (i.e. substring) matches. You can extend
   2215         this scheme to (for example) do regular expression matching, etc.
   2216         """
   2217         result = True
   2218         for k in kwargs:
   2219             v = kwargs[k]
   2220             dv = d.get(k)
   2221             if not self.match_value(k, dv, v):
   2222                 result = False
   2223                 break
   2224         return result
   2225 
   2226     def match_value(self, k, dv, v):
   2227         """
   2228         Try to match a single stored value (dv) with a supplied value (v).
   2229         """
   2230         if type(v) != type(dv):
   2231             result = False
   2232         elif type(dv) is not str or k not in self._partial_matches:
   2233             result = (v == dv)
   2234         else:
   2235             result = dv.find(v) >= 0
   2236         return result
   2237 
   2238 
   2239 _can_symlink = None
   2240 def can_symlink():
   2241     global _can_symlink
   2242     if _can_symlink is not None:
   2243         return _can_symlink
   2244     symlink_path = TESTFN + "can_symlink"
   2245     try:
   2246         os.symlink(TESTFN, symlink_path)
   2247         can = True
   2248     except (OSError, NotImplementedError, AttributeError):
   2249         can = False
   2250     else:
   2251         os.remove(symlink_path)
   2252     _can_symlink = can
   2253     return can
   2254 
   2255 def skip_unless_symlink(test):
   2256     """Skip decorator for tests that require functional symlink"""
   2257     ok = can_symlink()
   2258     msg = "Requires functional symlink implementation"
   2259     return test if ok else unittest.skip(msg)(test)
   2260 
   2261 _can_xattr = None
   2262 def can_xattr():
   2263     global _can_xattr
   2264     if _can_xattr is not None:
   2265         return _can_xattr
   2266     if not hasattr(os, "setxattr"):
   2267         can = False
   2268     else:
   2269         tmp_fp, tmp_name = tempfile.mkstemp()
   2270         try:
   2271             with open(TESTFN, "wb") as fp:
   2272                 try:
   2273                     # TESTFN & tempfile may use different file systems with
   2274                     # different capabilities
   2275                     os.setxattr(tmp_fp, b"user.test", b"")
   2276                     os.setxattr(fp.fileno(), b"user.test", b"")
   2277                     # Kernels < 2.6.39 don't respect setxattr flags.
   2278                     kernel_version = platform.release()
   2279                     m = re.match(r"2.6.(\d{1,2})", kernel_version)
   2280                     can = m is None or int(m.group(1)) >= 39
   2281                 except OSError:
   2282                     can = False
   2283         finally:
   2284             unlink(TESTFN)
   2285             unlink(tmp_name)
   2286     _can_xattr = can
   2287     return can
   2288 
   2289 def skip_unless_xattr(test):
   2290     """Skip decorator for tests that require functional extended attributes"""
   2291     ok = can_xattr()
   2292     msg = "no non-broken extended attribute support"
   2293     return test if ok else unittest.skip(msg)(test)
   2294 
   2295 
   2296 def fs_is_case_insensitive(directory):
   2297     """Detects if the file system for the specified directory is case-insensitive."""
   2298     with tempfile.NamedTemporaryFile(dir=directory) as base:
   2299         base_path = base.name
   2300         case_path = base_path.upper()
   2301         if case_path == base_path:
   2302             case_path = base_path.lower()
   2303         try:
   2304             return os.path.samefile(base_path, case_path)
   2305         except FileNotFoundError:
   2306             return False
   2307 
   2308 
   2309 def detect_api_mismatch(ref_api, other_api, *, ignore=()):
   2310     """Returns the set of items in ref_api not in other_api, except for a
   2311     defined list of items to be ignored in this check.
   2312 
   2313     By default this skips private attributes beginning with '_' but
   2314     includes all magic methods, i.e. those starting and ending in '__'.
   2315     """
   2316     missing_items = set(dir(ref_api)) - set(dir(other_api))
   2317     if ignore:
   2318         missing_items -= set(ignore)
   2319     missing_items = set(m for m in missing_items
   2320                         if not m.startswith('_') or m.endswith('__'))
   2321     return missing_items
   2322 
   2323 
   2324 def check__all__(test_case, module, name_of_module=None, extra=(),
   2325                  blacklist=()):
   2326     """Assert that the __all__ variable of 'module' contains all public names.
   2327 
   2328     The module's public names (its API) are detected automatically based on
   2329     whether they match the public name convention and were defined in
   2330     'module'.
   2331 
   2332     The 'name_of_module' argument can specify (as a string or tuple thereof)
   2333     what module(s) an API could be defined in in order to be detected as a
   2334     public API. One case for this is when 'module' imports part of its public
   2335     API from other modules, possibly a C backend (like 'csv' and its '_csv').
   2336 
   2337     The 'extra' argument can be a set of names that wouldn't otherwise be
   2338     automatically detected as "public", like objects without a proper
   2339     '__module__' attriubute. If provided, it will be added to the
   2340     automatically detected ones.
   2341 
   2342     The 'blacklist' argument can be a set of names that must not be treated
   2343     as part of the public API even though their names indicate otherwise.
   2344 
   2345     Usage:
   2346         import bar
   2347         import foo
   2348         import unittest
   2349         from test import support
   2350 
   2351         class MiscTestCase(unittest.TestCase):
   2352             def test__all__(self):
   2353                 support.check__all__(self, foo)
   2354 
   2355         class OtherTestCase(unittest.TestCase):
   2356             def test__all__(self):
   2357                 extra = {'BAR_CONST', 'FOO_CONST'}
   2358                 blacklist = {'baz'}  # Undocumented name.
   2359                 # bar imports part of its API from _bar.
   2360                 support.check__all__(self, bar, ('bar', '_bar'),
   2361                                      extra=extra, blacklist=blacklist)
   2362 
   2363     """
   2364 
   2365     if name_of_module is None:
   2366         name_of_module = (module.__name__, )
   2367     elif isinstance(name_of_module, str):
   2368         name_of_module = (name_of_module, )
   2369 
   2370     expected = set(extra)
   2371 
   2372     for name in dir(module):
   2373         if name.startswith('_') or name in blacklist:
   2374             continue
   2375         obj = getattr(module, name)
   2376         if (getattr(obj, '__module__', None) in name_of_module or
   2377                 (not hasattr(obj, '__module__') and
   2378                  not isinstance(obj, types.ModuleType))):
   2379             expected.add(name)
   2380     test_case.assertCountEqual(module.__all__, expected)
   2381 
   2382 
   2383 class SuppressCrashReport:
   2384     """Try to prevent a crash report from popping up.
   2385 
   2386     On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
   2387     disable the creation of coredump file.
   2388     """
   2389     old_value = None
   2390     old_modes = None
   2391 
   2392     def __enter__(self):
   2393         """On Windows, disable Windows Error Reporting dialogs using
   2394         SetErrorMode.
   2395 
   2396         On UNIX, try to save the previous core file size limit, then set
   2397         soft limit to 0.
   2398         """
   2399         if sys.platform.startswith('win'):
   2400             # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
   2401             # GetErrorMode is not available on Windows XP and Windows Server 2003,
   2402             # but SetErrorMode returns the previous value, so we can use that
   2403             import ctypes
   2404             self._k32 = ctypes.windll.kernel32
   2405             SEM_NOGPFAULTERRORBOX = 0x02
   2406             self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
   2407             self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)
   2408 
   2409             # Suppress assert dialogs in debug builds
   2410             # (see http://bugs.python.org/issue23314)
   2411             try:
   2412                 import msvcrt
   2413                 msvcrt.CrtSetReportMode
   2414             except (AttributeError, ImportError):
   2415                 # no msvcrt or a release build
   2416                 pass
   2417             else:
   2418                 self.old_modes = {}
   2419                 for report_type in [msvcrt.CRT_WARN,
   2420                                     msvcrt.CRT_ERROR,
   2421                                     msvcrt.CRT_ASSERT]:
   2422                     old_mode = msvcrt.CrtSetReportMode(report_type,
   2423                             msvcrt.CRTDBG_MODE_FILE)
   2424                     old_file = msvcrt.CrtSetReportFile(report_type,
   2425                             msvcrt.CRTDBG_FILE_STDERR)
   2426                     self.old_modes[report_type] = old_mode, old_file
   2427 
   2428         else:
   2429             if resource is not None:
   2430                 try:
   2431                     self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
   2432                     resource.setrlimit(resource.RLIMIT_CORE,
   2433                                        (0, self.old_value[1]))
   2434                 except (ValueError, OSError):
   2435                     pass
   2436             if sys.platform == 'darwin':
   2437                 # Check if the 'Crash Reporter' on OSX was configured
   2438                 # in 'Developer' mode and warn that it will get triggered
   2439                 # when it is.
   2440                 #
   2441                 # This assumes that this context manager is used in tests
   2442                 # that might trigger the next manager.
   2443                 value = subprocess.Popen(['/usr/bin/defaults', 'read',
   2444                         'com.apple.CrashReporter', 'DialogType'],
   2445                         stdout=subprocess.PIPE).communicate()[0]
   2446                 if value.strip() == b'developer':
   2447                     print("this test triggers the Crash Reporter, "
   2448                           "that is intentional", end='', flush=True)
   2449 
   2450         return self
   2451 
   2452     def __exit__(self, *ignore_exc):
   2453         """Restore Windows ErrorMode or core file behavior to initial value."""
   2454         if self.old_value is None:
   2455             return
   2456 
   2457         if sys.platform.startswith('win'):
   2458             self._k32.SetErrorMode(self.old_value)
   2459 
   2460             if self.old_modes:
   2461                 import msvcrt
   2462                 for report_type, (old_mode, old_file) in self.old_modes.items():
   2463                     msvcrt.CrtSetReportMode(report_type, old_mode)
   2464                     msvcrt.CrtSetReportFile(report_type, old_file)
   2465         else:
   2466             if resource is not None:
   2467                 try:
   2468                     resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
   2469                 except (ValueError, OSError):
   2470                     pass
   2471 
   2472 
   2473 def patch(test_instance, object_to_patch, attr_name, new_value):
   2474     """Override 'object_to_patch'.'attr_name' with 'new_value'.
   2475 
   2476     Also, add a cleanup procedure to 'test_instance' to restore
   2477     'object_to_patch' value for 'attr_name'.
   2478     The 'attr_name' should be a valid attribute for 'object_to_patch'.
   2479 
   2480     """
   2481     # check that 'attr_name' is a real attribute for 'object_to_patch'
   2482     # will raise AttributeError if it does not exist
   2483     getattr(object_to_patch, attr_name)
   2484 
   2485     # keep a copy of the old value
   2486     attr_is_local = False
   2487     try:
   2488         old_value = object_to_patch.__dict__[attr_name]
   2489     except (AttributeError, KeyError):
   2490         old_value = getattr(object_to_patch, attr_name, None)
   2491     else:
   2492         attr_is_local = True
   2493 
   2494     # restore the value when the test is done
   2495     def cleanup():
   2496         if attr_is_local:
   2497             setattr(object_to_patch, attr_name, old_value)
   2498         else:
   2499             delattr(object_to_patch, attr_name)
   2500 
   2501     test_instance.addCleanup(cleanup)
   2502 
   2503     # actually override the attribute
   2504     setattr(object_to_patch, attr_name, new_value)
   2505 
   2506 
   2507 def run_in_subinterp(code):
   2508     """
   2509     Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
   2510     module is enabled.
   2511     """
   2512     # Issue #10915, #15751: PyGILState_*() functions don't work with
   2513     # sub-interpreters, the tracemalloc module uses these functions internally
   2514     try:
   2515         import tracemalloc
   2516     except ImportError:
   2517         pass
   2518     else:
   2519         if tracemalloc.is_tracing():
   2520             raise unittest.SkipTest("run_in_subinterp() cannot be used "
   2521                                      "if tracemalloc module is tracing "
   2522                                      "memory allocations")
   2523     import _testcapi
   2524     return _testcapi.run_in_subinterp(code)
   2525 
   2526 
   2527 def check_free_after_iterating(test, iter, cls, args=()):
   2528     class A(cls):
   2529         def __del__(self):
   2530             nonlocal done
   2531             done = True
   2532             try:
   2533                 next(it)
   2534             except StopIteration:
   2535                 pass
   2536 
   2537     done = False
   2538     it = iter(A(*args))
   2539     # Issue 26494: Shouldn't crash
   2540     test.assertRaises(StopIteration, next, it)
   2541     # The sequence should be deallocated just after the end of iterating
   2542     gc_collect()
   2543     test.assertTrue(done)
   2544 
   2545 
   2546 def missing_compiler_executable(cmd_names=[]):
   2547     """Check if the compiler components used to build the interpreter exist.
   2548 
   2549     Check for the existence of the compiler executables whose names are listed
   2550     in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
   2551     and return the first missing executable or None when none is found
   2552     missing.
   2553 
   2554     """
   2555     from distutils import ccompiler, sysconfig, spawn
   2556     compiler = ccompiler.new_compiler()
   2557     sysconfig.customize_compiler(compiler)
   2558     for name in compiler.executables:
   2559         if cmd_names and name not in cmd_names:
   2560             continue
   2561         cmd = getattr(compiler, name)
   2562         if cmd_names:
   2563             assert cmd is not None, \
   2564                     "the '%s' executable is not configured" % name
   2565         elif cmd is None:
   2566             continue
   2567         if spawn.find_executable(cmd[0]) is None:
   2568             return cmd[0]
   2569 
   2570 
   2571 _is_android_emulator = None
   2572 def setswitchinterval(interval):
   2573     # Setting a very low gil interval on the Android emulator causes python
   2574     # to hang (issue #26939).
   2575     minimum_interval = 1e-5
   2576     if is_android and interval < minimum_interval:
   2577         global _is_android_emulator
   2578         if _is_android_emulator is None:
   2579             _is_android_emulator = (subprocess.check_output(
   2580                                ['getprop', 'ro.kernel.qemu']).strip() == b'1')
   2581         if _is_android_emulator:
   2582             interval = minimum_interval
   2583     return sys.setswitchinterval(interval)
   2584