Home | History | Annotate | Download | only in test
      1 """Supporting definitions for the Python regression tests."""
      2 
      3 if __name__ != 'test.test_support':
      4     raise ImportError('test_support must be imported from the test package')
      5 
      6 import contextlib
      7 import errno
      8 import functools
      9 import gc
     10 import socket
     11 import stat
     12 import sys
     13 import os
     14 import platform
     15 import shutil
     16 import warnings
     17 import unittest
     18 import importlib
     19 import UserDict
     20 import re
     21 import time
     22 import struct
     23 import sysconfig
     24 try:
     25     import thread
     26 except ImportError:
     27     thread = None
     28 
     29 __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
     30            "verbose", "use_resources", "max_memuse", "record_original_stdout",
     31            "get_original_stdout", "unload", "unlink", "rmtree", "forget",
     32            "is_resource_enabled", "requires", "requires_mac_ver",
     33            "find_unused_port", "bind_port",
     34            "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
     35            "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
     36            "open_urlresource", "check_warnings", "check_py3k_warnings",
     37            "CleanImport", "EnvironmentVarGuard", "captured_output",
     38            "captured_stdout", "TransientResource", "transient_internet",
     39            "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
     40            "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
     41            "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
     42            "check_impl_detail", "get_attribute", "py3k_bytes",
     43            "import_fresh_module", "threading_cleanup", "reap_children",
     44            "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]
     45 
     46 class Error(Exception):
     47     """Base class for regression test exceptions."""
     48 
     49 class TestFailed(Error):
     50     """Test failed."""
     51 
     52 class ResourceDenied(unittest.SkipTest):
     53     """Test skipped because it requested a disallowed resource.
     54 
     55     This is raised when a test calls requires() for a resource that
     56     has not been enabled.  It is used to distinguish between expected
     57     and unexpected skips.
     58     """
     59 
     60 @contextlib.contextmanager
     61 def _ignore_deprecated_imports(ignore=True):
     62     """Context manager to suppress package and module deprecation
     63     warnings when importing them.
     64 
     65     If ignore is False, this context manager has no effect."""
     66     if ignore:
     67         with warnings.catch_warnings():
     68             warnings.filterwarnings("ignore", ".+ (module|package)",
     69                                     DeprecationWarning)
     70             yield
     71     else:
     72         yield
     73 
     74 
     75 def import_module(name, deprecated=False):
     76     """Import and return the module to be tested, raising SkipTest if
     77     it is not available.
     78 
     79     If deprecated is True, any module or package deprecation messages
     80     will be suppressed."""
     81     with _ignore_deprecated_imports(deprecated):
     82         try:
     83             return importlib.import_module(name)
     84         except ImportError, msg:
     85             raise unittest.SkipTest(str(msg))
     86 
     87 
     88 def _save_and_remove_module(name, orig_modules):
     89     """Helper function to save and remove a module from sys.modules
     90 
     91        Raise ImportError if the module can't be imported."""
     92     # try to import the module and raise an error if it can't be imported
     93     if name not in sys.modules:
     94         __import__(name)
     95         del sys.modules[name]
     96     for modname in list(sys.modules):
     97         if modname == name or modname.startswith(name + '.'):
     98             orig_modules[modname] = sys.modules[modname]
     99             del sys.modules[modname]
    100 
    101 def _save_and_block_module(name, orig_modules):
    102     """Helper function to save and block a module in sys.modules
    103 
    104        Return True if the module was in sys.modules, False otherwise."""
    105     saved = True
    106     try:
    107         orig_modules[name] = sys.modules[name]
    108     except KeyError:
    109         saved = False
    110     sys.modules[name] = None
    111     return saved
    112 
    113 
    114 def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    115     """Imports and returns a module, deliberately bypassing the sys.modules cache
    116     and importing a fresh copy of the module. Once the import is complete,
    117     the sys.modules cache is restored to its original state.
    118 
    119     Modules named in fresh are also imported anew if needed by the import.
    120     If one of these modules can't be imported, None is returned.
    121 
    122     Importing of modules named in blocked is prevented while the fresh import
    123     takes place.
    124 
    125     If deprecated is True, any module or package deprecation messages
    126     will be suppressed."""
    127     # NOTE: test_heapq, test_json, and test_warnings include extra sanity
    128     # checks to make sure that this utility function is working as expected
    129     with _ignore_deprecated_imports(deprecated):
    130         # Keep track of modules saved for later restoration as well
    131         # as those which just need a blocking entry removed
    132         orig_modules = {}
    133         names_to_remove = []
    134         _save_and_remove_module(name, orig_modules)
    135         try:
    136             for fresh_name in fresh:
    137                 _save_and_remove_module(fresh_name, orig_modules)
    138             for blocked_name in blocked:
    139                 if not _save_and_block_module(blocked_name, orig_modules):
    140                     names_to_remove.append(blocked_name)
    141             fresh_module = importlib.import_module(name)
    142         except ImportError:
    143             fresh_module = None
    144         finally:
    145             for orig_name, module in orig_modules.items():
    146                 sys.modules[orig_name] = module
    147             for name_to_remove in names_to_remove:
    148                 del sys.modules[name_to_remove]
    149         return fresh_module
    150 
    151 
    152 def get_attribute(obj, name):
    153     """Get an attribute, raising SkipTest if AttributeError is raised."""
    154     try:
    155         attribute = getattr(obj, name)
    156     except AttributeError:
    157         raise unittest.SkipTest("module %s has no attribute %s" % (
    158             obj.__name__, name))
    159     else:
    160         return attribute
    161 
    162 
    163 verbose = 1              # Flag set to 0 by regrtest.py
    164 use_resources = None     # Flag set to [] by regrtest.py
    165 max_memuse = 0           # Disable bigmem tests (they will still be run with
    166                          # small sizes, to make sure they work.)
    167 real_max_memuse = 0
    168 
    169 # _original_stdout is meant to hold stdout at the time regrtest began.
    170 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
    171 # The point is to have some flavor of stdout the user can actually see.
    172 _original_stdout = None
    173 def record_original_stdout(stdout):
    174     global _original_stdout
    175     _original_stdout = stdout
    176 
    177 def get_original_stdout():
    178     return _original_stdout or sys.stdout
    179 
    180 def unload(name):
    181     try:
    182         del sys.modules[name]
    183     except KeyError:
    184         pass
    185 
    186 def _force_run(path, func, *args):
    187     try:
    188         return func(*args)
    189     except EnvironmentError as err:
    190         if verbose >= 2:
    191             print('%s: %s' % (err.__class__.__name__, err))
    192             print('re-run %s%r' % (func.__name__, args))
    193         os.chmod(path, stat.S_IRWXU)
    194         return func(*args)
    195 
    196 if sys.platform.startswith("win"):
    197     def _waitfor(func, pathname, waitall=False):
    198         # Perform the operation
    199         func(pathname)
    200         # Now setup the wait loop
    201         if waitall:
    202             dirname = pathname
    203         else:
    204             dirname, name = os.path.split(pathname)
    205             dirname = dirname or '.'
    206         # Check for `pathname` to be removed from the filesystem.
    207         # The exponential backoff of the timeout amounts to a total
    208         # of ~1 second after which the deletion is probably an error
    209         # anyway.
    210         # Testing on an i7 (at] 4.3GHz shows that usually only 1 iteration is
    211         # required when contention occurs.
    212         timeout = 0.001
    213         while timeout < 1.0:
    214             # Note we are only testing for the existence of the file(s) in
    215             # the contents of the directory regardless of any security or
    216             # access rights.  If we have made it this far, we have sufficient
    217             # permissions to do that much using Python's equivalent of the
    218             # Windows API FindFirstFile.
    219             # Other Windows APIs can fail or give incorrect results when
    220             # dealing with files that are pending deletion.
    221             L = os.listdir(dirname)
    222             if not (L if waitall else name in L):
    223                 return
    224             # Increase the timeout and try again
    225             time.sleep(timeout)
    226             timeout *= 2
    227         warnings.warn('tests may fail, delete still pending for ' + pathname,
    228                       RuntimeWarning, stacklevel=4)
    229 
    230     def _unlink(filename):
    231         _waitfor(os.unlink, filename)
    232 
    233     def _rmdir(dirname):
    234         _waitfor(os.rmdir, dirname)
    235 
    236     def _rmtree(path):
    237         def _rmtree_inner(path):
    238             for name in _force_run(path, os.listdir, path):
    239                 fullname = os.path.join(path, name)
    240                 if os.path.isdir(fullname):
    241                     _waitfor(_rmtree_inner, fullname, waitall=True)
    242                     _force_run(fullname, os.rmdir, fullname)
    243                 else:
    244                     _force_run(fullname, os.unlink, fullname)
    245         _waitfor(_rmtree_inner, path, waitall=True)
    246         _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
    247 else:
    248     _unlink = os.unlink
    249     _rmdir = os.rmdir
    250 
    251     def _rmtree(path):
    252         try:
    253             shutil.rmtree(path)
    254             return
    255         except EnvironmentError:
    256             pass
    257 
    258         def _rmtree_inner(path):
    259             for name in _force_run(path, os.listdir, path):
    260                 fullname = os.path.join(path, name)
    261                 try:
    262                     mode = os.lstat(fullname).st_mode
    263                 except EnvironmentError:
    264                     mode = 0
    265                 if stat.S_ISDIR(mode):
    266                     _rmtree_inner(fullname)
    267                     _force_run(path, os.rmdir, fullname)
    268                 else:
    269                     _force_run(path, os.unlink, fullname)
    270         _rmtree_inner(path)
    271         os.rmdir(path)
    272 
    273 def unlink(filename):
    274     try:
    275         _unlink(filename)
    276     except OSError:
    277         pass
    278 
    279 def rmdir(dirname):
    280     try:
    281         _rmdir(dirname)
    282     except OSError as error:
    283         # The directory need not exist.
    284         if error.errno != errno.ENOENT:
    285             raise
    286 
    287 def rmtree(path):
    288     try:
    289         _rmtree(path)
    290     except OSError, e:
    291         # Unix returns ENOENT, Windows returns ESRCH.
    292         if e.errno not in (errno.ENOENT, errno.ESRCH):
    293             raise
    294 
    295 def forget(modname):
    296     '''"Forget" a module was ever imported by removing it from sys.modules and
    297     deleting any .pyc and .pyo files.'''
    298     unload(modname)
    299     for dirname in sys.path:
    300         unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
    301         # Deleting the .pyo file cannot be within the 'try' for the .pyc since
    302         # the chance exists that there is no .pyc (and thus the 'try' statement
    303         # is exited) but there is a .pyo file.
    304         unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
    305 
    306 # Check whether a gui is actually available
    307 def _is_gui_available():
    308     if hasattr(_is_gui_available, 'result'):
    309         return _is_gui_available.result
    310     reason = None
    311     if sys.platform.startswith('win'):
    312         # if Python is running as a service (such as the buildbot service),
    313         # gui interaction may be disallowed
    314         import ctypes
    315         import ctypes.wintypes
    316         UOI_FLAGS = 1
    317         WSF_VISIBLE = 0x0001
    318         class USEROBJECTFLAGS(ctypes.Structure):
    319             _fields_ = [("fInherit", ctypes.wintypes.BOOL),
    320                         ("fReserved", ctypes.wintypes.BOOL),
    321                         ("dwFlags", ctypes.wintypes.DWORD)]
    322         dll = ctypes.windll.user32
    323         h = dll.GetProcessWindowStation()
    324         if not h:
    325             raise ctypes.WinError()
    326         uof = USEROBJECTFLAGS()
    327         needed = ctypes.wintypes.DWORD()
    328         res = dll.GetUserObjectInformationW(h,
    329             UOI_FLAGS,
    330             ctypes.byref(uof),
    331             ctypes.sizeof(uof),
    332             ctypes.byref(needed))
    333         if not res:
    334             raise ctypes.WinError()
    335         if not bool(uof.dwFlags & WSF_VISIBLE):
    336             reason = "gui not available (WSF_VISIBLE flag not set)"
    337     elif sys.platform == 'darwin':
    338         # The Aqua Tk implementations on OS X can abort the process if
    339         # being called in an environment where a window server connection
    340         # cannot be made, for instance when invoked by a buildbot or ssh
    341         # process not running under the same user id as the current console
    342         # user.  To avoid that, raise an exception if the window manager
    343         # connection is not available.
    344         from ctypes import cdll, c_int, pointer, Structure
    345         from ctypes.util import find_library
    346 
    347         app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
    348 
    349         if app_services.CGMainDisplayID() == 0:
    350             reason = "gui tests cannot run without OS X window manager"
    351         else:
    352             class ProcessSerialNumber(Structure):
    353                 _fields_ = [("highLongOfPSN", c_int),
    354                             ("lowLongOfPSN", c_int)]
    355             psn = ProcessSerialNumber()
    356             psn_p = pointer(psn)
    357             if (  (app_services.GetCurrentProcess(psn_p) < 0) or
    358                   (app_services.SetFrontProcess(psn_p) < 0) ):
    359                 reason = "cannot run without OS X gui process"
    360 
    361     # check on every platform whether tkinter can actually do anything
    362     if not reason:
    363         try:
    364             from Tkinter import Tk
    365             root = Tk()
    366             root.withdraw()
    367             root.update()
    368             root.destroy()
    369         except Exception as e:
    370             err_string = str(e)
    371             if len(err_string) > 50:
    372                 err_string = err_string[:50] + ' [...]'
    373             reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
    374                                                            err_string)
    375 
    376     _is_gui_available.reason = reason
    377     _is_gui_available.result = not reason
    378 
    379     return _is_gui_available.result
    380 
    381 def is_resource_enabled(resource):
    382     """Test whether a resource is enabled.
    383 
    384     Known resources are set by regrtest.py.  If not running under regrtest.py,
    385     all resources are assumed enabled unless use_resources has been set.
    386     """
    387     return use_resources is None or resource in use_resources
    388 
    389 def requires(resource, msg=None):
    390     """Raise ResourceDenied if the specified resource is not available."""
    391     if not is_resource_enabled(resource):
    392         if msg is None:
    393             msg = "Use of the `%s' resource not enabled" % resource
    394         raise ResourceDenied(msg)
    395     if resource == 'gui' and not _is_gui_available():
    396         raise ResourceDenied(_is_gui_available.reason)
    397 
    398 def requires_mac_ver(*min_version):
    399     """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    400     version if less than min_version.
    401 
    402     For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    403     is lesser than 10.5.
    404     """
    405     def decorator(func):
    406         @functools.wraps(func)
    407         def wrapper(*args, **kw):
    408             if sys.platform == 'darwin':
    409                 version_txt = platform.mac_ver()[0]
    410                 try:
    411                     version = tuple(map(int, version_txt.split('.')))
    412                 except ValueError:
    413                     pass
    414                 else:
    415                     if version < min_version:
    416                         min_version_txt = '.'.join(map(str, min_version))
    417                         raise unittest.SkipTest(
    418                             "Mac OS X %s or higher required, not %s"
    419                             % (min_version_txt, version_txt))
    420             return func(*args, **kw)
    421         wrapper.min_version = min_version
    422         return wrapper
    423     return decorator
    424 
    425 
    426 # Don't use "localhost", since resolving it uses the DNS under recent
    427 # Windows versions (see issue #18792).
    428 HOST = "127.0.0.1"
    429 HOSTv6 = "::1"
    430 
    431 
    432 def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    433     """Returns an unused port that should be suitable for binding.  This is
    434     achieved by creating a temporary socket with the same family and type as
    435     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
    436     the specified host address (defaults to 0.0.0.0) with the port set to 0,
    437     eliciting an unused ephemeral port from the OS.  The temporary socket is
    438     then closed and deleted, and the ephemeral port is returned.
    439 
    440     Either this method or bind_port() should be used for any tests where a
    441     server socket needs to be bound to a particular port for the duration of
    442     the test.  Which one to use depends on whether the calling code is creating
    443     a python socket, or if an unused port needs to be provided in a constructor
    444     or passed to an external program (i.e. the -accept argument to openssl's
    445     s_server mode).  Always prefer bind_port() over find_unused_port() where
    446     possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    447     socket is bound to a hard coded port, the ability to run multiple instances
    448     of the test simultaneously on the same host is compromised, which makes the
    449     test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    450     may simply manifest as a failed test, which can be recovered from without
    451     intervention in most cases, but on Windows, the entire python process can
    452     completely and utterly wedge, requiring someone to log in to the buildbot
    453     and manually kill the affected process.
    454 
    455     (This is easy to reproduce on Windows, unfortunately, and can be traced to
    456     the SO_REUSEADDR socket option having different semantics on Windows versus
    457     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    458     listen and then accept connections on identical host/ports.  An EADDRINUSE
    459     socket.error will be raised at some point (depending on the platform and
    460     the order bind and listen were called on each socket).
    461 
    462     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    463     will ever be raised when attempting to bind two identical host/ports. When
    464     accept() is called on each socket, the second caller's process will steal
    465     the port from the first caller, leaving them both in an awkwardly wedged
    466     state where they'll no longer respond to any signals or graceful kills, and
    467     must be forcibly killed via OpenProcess()/TerminateProcess().
    468 
    469     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    470     instead of SO_REUSEADDR, which effectively affords the same semantics as
    471     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    472     Source world compared to Windows ones, this is a common mistake.  A quick
    473     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    474     openssl.exe is called with the 's_server' option, for example. See
    475     http://bugs.python.org/issue2550 for more info.  The following site also
    476     has a very thorough description about the implications of both REUSEADDR
    477     and EXCLUSIVEADDRUSE on Windows:
    478     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
    479 
    480     XXX: although this approach is a vast improvement on previous attempts to
    481     elicit unused ports, it rests heavily on the assumption that the ephemeral
    482     port returned to us by the OS won't immediately be dished back out to some
    483     other process when we close and delete our temporary socket but before our
    484     calling code has a chance to bind the returned port.  We can deal with this
    485     issue if/when we come across it."""
    486     tempsock = socket.socket(family, socktype)
    487     port = bind_port(tempsock)
    488     tempsock.close()
    489     del tempsock
    490     return port
    491 
    492 def bind_port(sock, host=HOST):
    493     """Bind the socket to a free port and return the port number.  Relies on
    494     ephemeral ports in order to ensure we are using an unbound port.  This is
    495     important as many tests may be running simultaneously, especially in a
    496     buildbot environment.  This method raises an exception if the sock.family
    497     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    498     or SO_REUSEPORT set on it.  Tests should *never* set these socket options
    499     for TCP/IP sockets.  The only case for setting these options is testing
    500     multicasting via multiple UDP sockets.
    501 
    502     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    503     on Windows), it will be set on the socket.  This will prevent anyone else
    504     from bind()'ing to our host/port for the duration of the test.
    505     """
    506     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
    507         if hasattr(socket, 'SO_REUSEADDR'):
    508             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
    509                 raise TestFailed("tests should never set the SO_REUSEADDR "   \
    510                                  "socket option on TCP/IP sockets!")
    511         if hasattr(socket, 'SO_REUSEPORT'):
    512             try:
    513                 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
    514                     raise TestFailed("tests should never set the SO_REUSEPORT "   \
    515                                      "socket option on TCP/IP sockets!")
    516             except EnvironmentError:
    517                 # Python's socket module was compiled using modern headers
    518                 # thus defining SO_REUSEPORT but this process is running
    519                 # under an older kernel that does not support SO_REUSEPORT.
    520                 pass
    521         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
    522             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
    523 
    524     sock.bind((host, 0))
    525     port = sock.getsockname()[1]
    526     return port
    527 
    528 def _is_ipv6_enabled():
    529     """Check whether IPv6 is enabled on this host."""
    530     if socket.has_ipv6:
    531         sock = None
    532         try:
    533             sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    534             sock.bind((HOSTv6, 0))
    535             return True
    536         except socket.error:
    537             pass
    538         finally:
    539             if sock:
    540                 sock.close()
    541     return False
    542 
    543 IPV6_ENABLED = _is_ipv6_enabled()
    544 
    545 def system_must_validate_cert(f):
    546     """Skip the test on TLS certificate validation failures."""
    547     @functools.wraps(f)
    548     def dec(*args, **kwargs):
    549         try:
    550             f(*args, **kwargs)
    551         except IOError as e:
    552             if "CERTIFICATE_VERIFY_FAILED" in str(e):
    553                 raise unittest.SkipTest("system does not contain "
    554                                         "necessary certificates")
    555             raise
    556     return dec
    557 
    558 FUZZ = 1e-6
    559 
    560 def fcmp(x, y): # fuzzy comparison function
    561     if isinstance(x, float) or isinstance(y, float):
    562         try:
    563             fuzz = (abs(x) + abs(y)) * FUZZ
    564             if abs(x-y) <= fuzz:
    565                 return 0
    566         except:
    567             pass
    568     elif type(x) == type(y) and isinstance(x, (tuple, list)):
    569         for i in range(min(len(x), len(y))):
    570             outcome = fcmp(x[i], y[i])
    571             if outcome != 0:
    572                 return outcome
    573         return (len(x) > len(y)) - (len(x) < len(y))
    574     return (x > y) - (x < y)
    575 
    576 
    577 # A constant likely larger than the underlying OS pipe buffer size, to
    578 # make writes blocking.
    579 # Windows limit seems to be around 512 B, and many Unix kernels have a
    580 # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
    581 # (see issue #17835 for a discussion of this number).
    582 PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
    583 
    584 # A constant likely larger than the underlying OS socket buffer size, to make
    585 # writes blocking.
    586 # The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
    587 # on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
    588 # for a discussion of this number).
    589 SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
    590 
    591 is_jython = sys.platform.startswith('java')
    592 
    593 try:
    594     unicode
    595     have_unicode = True
    596 except NameError:
    597     have_unicode = False
    598 
    599 requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
    600 
    601 def u(s):
    602     return unicode(s, 'unicode-escape')
    603 
    604 # FS_NONASCII: non-ASCII Unicode character encodable by
    605 # sys.getfilesystemencoding(), or None if there is no such character.
    606 FS_NONASCII = None
    607 if have_unicode:
    608     for character in (
    609         # First try printable and common characters to have a readable filename.
    610         # For each character, the encoding list are just example of encodings able
    611         # to encode the character (the list is not exhaustive).
    612 
    613         # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    614         unichr(0x00E6),
    615         # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    616         unichr(0x0130),
    617         # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    618         unichr(0x0141),
    619         # U+03C6 (Greek Small Letter Phi): cp1253
    620         unichr(0x03C6),
    621         # U+041A (Cyrillic Capital Letter Ka): cp1251
    622         unichr(0x041A),
    623         # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    624         unichr(0x05D0),
    625         # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    626         unichr(0x060C),
    627         # U+062A (Arabic Letter Teh): cp720
    628         unichr(0x062A),
    629         # U+0E01 (Thai Character Ko Kai): cp874
    630         unichr(0x0E01),
    631 
    632         # Then try more "special" characters. "special" because they may be
    633         # interpreted or displayed differently depending on the exact locale
    634         # encoding and the font.
    635 
    636         # U+00A0 (No-Break Space)
    637         unichr(0x00A0),
    638         # U+20AC (Euro Sign)
    639         unichr(0x20AC),
    640     ):
    641         try:
    642             character.encode(sys.getfilesystemencoding())\
    643                      .decode(sys.getfilesystemencoding())
    644         except UnicodeError:
    645             pass
    646         else:
    647             FS_NONASCII = character
    648             break
    649 
    650 # Filename used for testing
    651 if os.name == 'java':
    652     # Jython disallows @ in module names
    653     TESTFN = '$test'
    654 elif os.name == 'riscos':
    655     TESTFN = 'testfile'
    656 else:
    657     TESTFN = '@test'
    658     # Unicode name only used if TEST_FN_ENCODING exists for the platform.
    659     if have_unicode:
    660         # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
    661         # TESTFN_UNICODE is a filename that can be encoded using the
    662         # file system encoding, but *not* with the default (ascii) encoding
    663         if isinstance('', unicode):
    664             # python -U
    665             # XXX perhaps unicode() should accept Unicode strings?
    666             TESTFN_UNICODE = "@test-\xe0\xf2"
    667         else:
    668             # 2 latin characters.
    669             TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
    670         TESTFN_ENCODING = sys.getfilesystemencoding()
    671         # TESTFN_UNENCODABLE is a filename that should *not* be
    672         # able to be encoded by *either* the default or filesystem encoding.
    673         # This test really only makes sense on Windows NT platforms
    674         # which have special Unicode support in posixmodule.
    675         if (not hasattr(sys, "getwindowsversion") or
    676                 sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
    677             TESTFN_UNENCODABLE = None
    678         else:
    679             # Japanese characters (I think - from bug 846133)
    680             TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
    681             try:
    682                 # XXX - Note - should be using TESTFN_ENCODING here - but for
    683                 # Windows, "mbcs" currently always operates as if in
    684                 # errors=ignore' mode - hence we get '?' characters rather than
    685                 # the exception.  'Latin1' operates as we expect - ie, fails.
    686                 # See [ 850997 ] mbcs encoding ignores errors
    687                 TESTFN_UNENCODABLE.encode("Latin1")
    688             except UnicodeEncodeError:
    689                 pass
    690             else:
    691                 print \
    692                 'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
    693                 'Unicode filename tests may not be effective' \
    694                 % TESTFN_UNENCODABLE
    695 
    696 
    697 # Disambiguate TESTFN for parallel testing, while letting it remain a valid
    698 # module name.
    699 TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
    700 
    701 # Save the initial cwd
    702 SAVEDCWD = os.getcwd()
    703 
    704 @contextlib.contextmanager
    705 def change_cwd(path, quiet=False):
    706     """Return a context manager that changes the current working directory.
    707 
    708     Arguments:
    709 
    710       path: the directory to use as the temporary current working directory.
    711 
    712       quiet: if False (the default), the context manager raises an exception
    713         on error.  Otherwise, it issues only a warning and keeps the current
    714         working directory the same.
    715 
    716     """
    717     saved_dir = os.getcwd()
    718     try:
    719         os.chdir(path)
    720     except OSError:
    721         if not quiet:
    722             raise
    723         warnings.warn('tests may fail, unable to change CWD to: ' + path,
    724                       RuntimeWarning, stacklevel=3)
    725     try:
    726         yield os.getcwd()
    727     finally:
    728         os.chdir(saved_dir)
    729 
    730 
    731 @contextlib.contextmanager
    732 def temp_cwd(name='tempcwd', quiet=False):
    733     """
    734     Context manager that creates a temporary directory and set it as CWD.
    735 
    736     The new CWD is created in the current directory and it's named *name*.
    737     If *quiet* is False (default) and it's not possible to create or change
    738     the CWD, an error is raised.  If it's True, only a warning is raised
    739     and the original CWD is used.
    740     """
    741     if (have_unicode and isinstance(name, unicode) and
    742         not os.path.supports_unicode_filenames):
    743         try:
    744             name = name.encode(sys.getfilesystemencoding() or 'ascii')
    745         except UnicodeEncodeError:
    746             if not quiet:
    747                 raise unittest.SkipTest('unable to encode the cwd name with '
    748                                         'the filesystem encoding.')
    749     saved_dir = os.getcwd()
    750     is_temporary = False
    751     try:
    752         os.mkdir(name)
    753         os.chdir(name)
    754         is_temporary = True
    755     except OSError:
    756         if not quiet:
    757             raise
    758         warnings.warn('tests may fail, unable to change the CWD to ' + name,
    759                       RuntimeWarning, stacklevel=3)
    760     try:
    761         yield os.getcwd()
    762     finally:
    763         os.chdir(saved_dir)
    764         if is_temporary:
    765             rmtree(name)
    766 
    767 
    768 def findfile(file, here=__file__, subdir=None):
    769     """Try to find a file on sys.path and the working directory.  If it is not
    770     found the argument passed to the function is returned (this does not
    771     necessarily signal failure; could still be the legitimate path)."""
    772     if os.path.isabs(file):
    773         return file
    774     if subdir is not None:
    775         file = os.path.join(subdir, file)
    776     path = sys.path
    777     path = [os.path.dirname(here)] + path
    778     for dn in path:
    779         fn = os.path.join(dn, file)
    780         if os.path.exists(fn): return fn
    781     return file
    782 
    783 def sortdict(dict):
    784     "Like repr(dict), but in sorted order."
    785     items = dict.items()
    786     items.sort()
    787     reprpairs = ["%r: %r" % pair for pair in items]
    788     withcommas = ", ".join(reprpairs)
    789     return "{%s}" % withcommas
    790 
    791 def make_bad_fd():
    792     """
    793     Create an invalid file descriptor by opening and closing a file and return
    794     its fd.
    795     """
    796     file = open(TESTFN, "wb")
    797     try:
    798         return file.fileno()
    799     finally:
    800         file.close()
    801         unlink(TESTFN)
    802 
    803 def check_syntax_error(testcase, statement):
    804     testcase.assertRaises(SyntaxError, compile, statement,
    805                           '<test string>', 'exec')
    806 
    807 def open_urlresource(url, check=None):
    808     import urlparse, urllib2
    809 
    810     filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
    811 
    812     fn = os.path.join(os.path.dirname(__file__), "data", filename)
    813 
    814     def check_valid_file(fn):
    815         f = open(fn)
    816         if check is None:
    817             return f
    818         elif check(f):
    819             f.seek(0)
    820             return f
    821         f.close()
    822 
    823     if os.path.exists(fn):
    824         f = check_valid_file(fn)
    825         if f is not None:
    826             return f
    827         unlink(fn)
    828 
    829     # Verify the requirement before downloading the file
    830     requires('urlfetch')
    831 
    832     print >> get_original_stdout(), '\tfetching %s ...' % url
    833     f = urllib2.urlopen(url, timeout=15)
    834     try:
    835         with open(fn, "wb") as out:
    836             s = f.read()
    837             while s:
    838                 out.write(s)
    839                 s = f.read()
    840     finally:
    841         f.close()
    842 
    843     f = check_valid_file(fn)
    844     if f is not None:
    845         return f
    846     raise TestFailed('invalid resource "%s"' % fn)
    847 
    848 
    849 class WarningsRecorder(object):
    850     """Convenience wrapper for the warnings list returned on
    851        entry to the warnings.catch_warnings() context manager.
    852     """
    853     def __init__(self, warnings_list):
    854         self._warnings = warnings_list
    855         self._last = 0
    856 
    857     def __getattr__(self, attr):
    858         if len(self._warnings) > self._last:
    859             return getattr(self._warnings[-1], attr)
    860         elif attr in warnings.WarningMessage._WARNING_DETAILS:
    861             return None
    862         raise AttributeError("%r has no attribute %r" % (self, attr))
    863 
    864     @property
    865     def warnings(self):
    866         return self._warnings[self._last:]
    867 
    868     def reset(self):
    869         self._last = len(self._warnings)
    870 
    871 
    872 def _filterwarnings(filters, quiet=False):
    873     """Catch the warnings, then check if all the expected
    874     warnings have been raised and re-raise unexpected warnings.
    875     If 'quiet' is True, only re-raise the unexpected warnings.
    876     """
    877     # Clear the warning registry of the calling module
    878     # in order to re-raise the warnings.
    879     frame = sys._getframe(2)
    880     registry = frame.f_globals.get('__warningregistry__')
    881     if registry:
    882         registry.clear()
    883     with warnings.catch_warnings(record=True) as w:
    884         # Set filter "always" to record all warnings.  Because
    885         # test_warnings swap the module, we need to look up in
    886         # the sys.modules dictionary.
    887         sys.modules['warnings'].simplefilter("always")
    888         yield WarningsRecorder(w)
    889     # Filter the recorded warnings
    890     reraise = [warning.message for warning in w]
    891     missing = []
    892     for msg, cat in filters:
    893         seen = False
    894         for exc in reraise[:]:
    895             message = str(exc)
    896             # Filter out the matching messages
    897             if (re.match(msg, message, re.I) and
    898                 issubclass(exc.__class__, cat)):
    899                 seen = True
    900                 reraise.remove(exc)
    901         if not seen and not quiet:
    902             # This filter caught nothing
    903             missing.append((msg, cat.__name__))
    904     if reraise:
    905         raise AssertionError("unhandled warning %r" % reraise[0])
    906     if missing:
    907         raise AssertionError("filter (%r, %s) did not catch any warning" %
    908                              missing[0])
    909 
    910 
    911 @contextlib.contextmanager
    912 def check_warnings(*filters, **kwargs):
    913     """Context manager to silence warnings.
    914 
    915     Accept 2-tuples as positional arguments:
    916         ("message regexp", WarningCategory)
    917 
    918     Optional argument:
    919      - if 'quiet' is True, it does not fail if a filter catches nothing
    920         (default True without argument,
    921          default False if some filters are defined)
    922 
    923     Without argument, it defaults to:
    924         check_warnings(("", Warning), quiet=True)
    925     """
    926     quiet = kwargs.get('quiet')
    927     if not filters:
    928         filters = (("", Warning),)
    929         # Preserve backward compatibility
    930         if quiet is None:
    931             quiet = True
    932     return _filterwarnings(filters, quiet)
    933 
    934 
    935 @contextlib.contextmanager
    936 def check_py3k_warnings(*filters, **kwargs):
    937     """Context manager to silence py3k warnings.
    938 
    939     Accept 2-tuples as positional arguments:
    940         ("message regexp", WarningCategory)
    941 
    942     Optional argument:
    943      - if 'quiet' is True, it does not fail if a filter catches nothing
    944         (default False)
    945 
    946     Without argument, it defaults to:
    947         check_py3k_warnings(("", DeprecationWarning), quiet=False)
    948     """
    949     if sys.py3kwarning:
    950         if not filters:
    951             filters = (("", DeprecationWarning),)
    952     else:
    953         # It should not raise any py3k warning
    954         filters = ()
    955     return _filterwarnings(filters, kwargs.get('quiet'))
    956 
    957 
    958 class CleanImport(object):
    959     """Context manager to force import to return a new module reference.
    960 
    961     This is useful for testing module-level behaviours, such as
    962     the emission of a DeprecationWarning on import.
    963 
    964     Use like this:
    965 
    966         with CleanImport("foo"):
    967             importlib.import_module("foo") # new reference
    968     """
    969 
    970     def __init__(self, *module_names):
    971         self.original_modules = sys.modules.copy()
    972         for module_name in module_names:
    973             if module_name in sys.modules:
    974                 module = sys.modules[module_name]
    975                 # It is possible that module_name is just an alias for
    976                 # another module (e.g. stub for modules renamed in 3.x).
    977                 # In that case, we also need delete the real module to clear
    978                 # the import cache.
    979                 if module.__name__ != module_name:
    980                     del sys.modules[module.__name__]
    981                 del sys.modules[module_name]
    982 
    983     def __enter__(self):
    984         return self
    985 
    986     def __exit__(self, *ignore_exc):
    987         sys.modules.update(self.original_modules)
    988 
    989 
    990 class EnvironmentVarGuard(UserDict.DictMixin):
    991 
    992     """Class to help protect the environment variable properly.  Can be used as
    993     a context manager."""
    994 
    995     def __init__(self):
    996         self._environ = os.environ
    997         self._changed = {}
    998 
    999     def __getitem__(self, envvar):
   1000         return self._environ[envvar]
   1001 
   1002     def __setitem__(self, envvar, value):
   1003         # Remember the initial value on the first access
   1004         if envvar not in self._changed:
   1005             self._changed[envvar] = self._environ.get(envvar)
   1006         self._environ[envvar] = value
   1007 
   1008     def __delitem__(self, envvar):
   1009         # Remember the initial value on the first access
   1010         if envvar not in self._changed:
   1011             self._changed[envvar] = self._environ.get(envvar)
   1012         if envvar in self._environ:
   1013             del self._environ[envvar]
   1014 
   1015     def keys(self):
   1016         return self._environ.keys()
   1017 
   1018     def set(self, envvar, value):
   1019         self[envvar] = value
   1020 
   1021     def unset(self, envvar):
   1022         del self[envvar]
   1023 
   1024     def __enter__(self):
   1025         return self
   1026 
   1027     def __exit__(self, *ignore_exc):
   1028         for (k, v) in self._changed.items():
   1029             if v is None:
   1030                 if k in self._environ:
   1031                     del self._environ[k]
   1032             else:
   1033                 self._environ[k] = v
   1034         os.environ = self._environ
   1035 
   1036 
   1037 class DirsOnSysPath(object):
   1038     """Context manager to temporarily add directories to sys.path.
   1039 
   1040     This makes a copy of sys.path, appends any directories given
   1041     as positional arguments, then reverts sys.path to the copied
   1042     settings when the context ends.
   1043 
   1044     Note that *all* sys.path modifications in the body of the
   1045     context manager, including replacement of the object,
   1046     will be reverted at the end of the block.
   1047     """
   1048 
   1049     def __init__(self, *paths):
   1050         self.original_value = sys.path[:]
   1051         self.original_object = sys.path
   1052         sys.path.extend(paths)
   1053 
   1054     def __enter__(self):
   1055         return self
   1056 
   1057     def __exit__(self, *ignore_exc):
   1058         sys.path = self.original_object
   1059         sys.path[:] = self.original_value
   1060 
   1061 
   1062 class TransientResource(object):
   1063 
   1064     """Raise ResourceDenied if an exception is raised while the context manager
   1065     is in effect that matches the specified exception and attributes."""
   1066 
   1067     def __init__(self, exc, **kwargs):
   1068         self.exc = exc
   1069         self.attrs = kwargs
   1070 
   1071     def __enter__(self):
   1072         return self
   1073 
   1074     def __exit__(self, type_=None, value=None, traceback=None):
   1075         """If type_ is a subclass of self.exc and value has attributes matching
   1076         self.attrs, raise ResourceDenied.  Otherwise let the exception
   1077         propagate (if any)."""
   1078         if type_ is not None and issubclass(self.exc, type_):
   1079             for attr, attr_value in self.attrs.iteritems():
   1080                 if not hasattr(value, attr):
   1081                     break
   1082                 if getattr(value, attr) != attr_value:
   1083                     break
   1084             else:
   1085                 raise ResourceDenied("an optional resource is not available")
   1086 
   1087 
   1088 @contextlib.contextmanager
   1089 def transient_internet(resource_name, timeout=30.0, errnos=()):
   1090     """Return a context manager that raises ResourceDenied when various issues
   1091     with the Internet connection manifest themselves as exceptions."""
   1092     default_errnos = [
   1093         ('ECONNREFUSED', 111),
   1094         ('ECONNRESET', 104),
   1095         ('EHOSTUNREACH', 113),
   1096         ('ENETUNREACH', 101),
   1097         ('ETIMEDOUT', 110),
   1098     ]
   1099     default_gai_errnos = [
   1100         ('EAI_AGAIN', -3),
   1101         ('EAI_FAIL', -4),
   1102         ('EAI_NONAME', -2),
   1103         ('EAI_NODATA', -5),
   1104         # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
   1105         # implementation actually returns WSANO_DATA i.e. 11004.
   1106         ('WSANO_DATA', 11004),
   1107     ]
   1108 
   1109     denied = ResourceDenied("Resource '%s' is not available" % resource_name)
   1110     captured_errnos = errnos
   1111     gai_errnos = []
   1112     if not captured_errnos:
   1113         captured_errnos = [getattr(errno, name, num)
   1114                            for (name, num) in default_errnos]
   1115         gai_errnos = [getattr(socket, name, num)
   1116                       for (name, num) in default_gai_errnos]
   1117 
   1118     def filter_error(err):
   1119         n = getattr(err, 'errno', None)
   1120         if (isinstance(err, socket.timeout) or
   1121             (isinstance(err, socket.gaierror) and n in gai_errnos) or
   1122             n in captured_errnos):
   1123             if not verbose:
   1124                 sys.stderr.write(denied.args[0] + "\n")
   1125             raise denied
   1126 
   1127     old_timeout = socket.getdefaulttimeout()
   1128     try:
   1129         if timeout is not None:
   1130             socket.setdefaulttimeout(timeout)
   1131         yield
   1132     except IOError as err:
   1133         # urllib can wrap original socket errors multiple times (!), we must
   1134         # unwrap to get at the original error.
   1135         while True:
   1136             a = err.args
   1137             if len(a) >= 1 and isinstance(a[0], IOError):
   1138                 err = a[0]
   1139             # The error can also be wrapped as args[1]:
   1140             #    except socket.error as msg:
   1141             #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
   1142             elif len(a) >= 2 and isinstance(a[1], IOError):
   1143                 err = a[1]
   1144             else:
   1145                 break
   1146         filter_error(err)
   1147         raise
   1148     # XXX should we catch generic exceptions and look for their
   1149     # __cause__ or __context__?
   1150     finally:
   1151         socket.setdefaulttimeout(old_timeout)
   1152 
   1153 
   1154 @contextlib.contextmanager
   1155 def captured_output(stream_name):
   1156     """Return a context manager used by captured_stdout and captured_stdin
   1157     that temporarily replaces the sys stream *stream_name* with a StringIO."""
   1158     import StringIO
   1159     orig_stdout = getattr(sys, stream_name)
   1160     setattr(sys, stream_name, StringIO.StringIO())
   1161     try:
   1162         yield getattr(sys, stream_name)
   1163     finally:
   1164         setattr(sys, stream_name, orig_stdout)
   1165 
   1166 def captured_stdout():
   1167     """Capture the output of sys.stdout:
   1168 
   1169        with captured_stdout() as s:
   1170            print "hello"
   1171        self.assertEqual(s.getvalue(), "hello")
   1172     """
   1173     return captured_output("stdout")
   1174 
   1175 def captured_stderr():
   1176     return captured_output("stderr")
   1177 
   1178 def captured_stdin():
   1179     return captured_output("stdin")
   1180 
   1181 def gc_collect():
   1182     """Force as many objects as possible to be collected.
   1183 
   1184     In non-CPython implementations of Python, this is needed because timely
   1185     deallocation is not guaranteed by the garbage collector.  (Even in CPython
   1186     this can be the case in case of reference cycles.)  This means that __del__
   1187     methods may be called later than expected and weakrefs may remain alive for
   1188     longer than expected.  This function tries its best to force all garbage
   1189     objects to disappear.
   1190     """
   1191     gc.collect()
   1192     if is_jython:
   1193         time.sleep(0.1)
   1194     gc.collect()
   1195     gc.collect()
   1196 
   1197 
   1198 _header = '2P'
   1199 if hasattr(sys, "gettotalrefcount"):
   1200     _header = '2P' + _header
   1201 _vheader = _header + 'P'
   1202 
   1203 def calcobjsize(fmt):
   1204     return struct.calcsize(_header + fmt + '0P')
   1205 
   1206 def calcvobjsize(fmt):
   1207     return struct.calcsize(_vheader + fmt + '0P')
   1208 
   1209 
   1210 _TPFLAGS_HAVE_GC = 1<<14
   1211 _TPFLAGS_HEAPTYPE = 1<<9
   1212 
   1213 def check_sizeof(test, o, size):
   1214     import _testcapi
   1215     result = sys.getsizeof(o)
   1216     # add GC header size
   1217     if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
   1218         ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
   1219         size += _testcapi.SIZEOF_PYGC_HEAD
   1220     msg = 'wrong size for %s: got %d, expected %d' \
   1221             % (type(o), result, size)
   1222     test.assertEqual(result, size, msg)
   1223 
   1224 
   1225 #=======================================================================
   1226 # Decorator for running a function in a different locale, correctly resetting
   1227 # it afterwards.
   1228 
   1229 def run_with_locale(catstr, *locales):
   1230     def decorator(func):
   1231         def inner(*args, **kwds):
   1232             try:
   1233                 import locale
   1234                 category = getattr(locale, catstr)
   1235                 orig_locale = locale.setlocale(category)
   1236             except AttributeError:
   1237                 # if the test author gives us an invalid category string
   1238                 raise
   1239             except:
   1240                 # cannot retrieve original locale, so do nothing
   1241                 locale = orig_locale = None
   1242             else:
   1243                 for loc in locales:
   1244                     try:
   1245                         locale.setlocale(category, loc)
   1246                         break
   1247                     except:
   1248                         pass
   1249 
   1250             # now run the function, resetting the locale on exceptions
   1251             try:
   1252                 return func(*args, **kwds)
   1253             finally:
   1254                 if locale and orig_locale:
   1255                     locale.setlocale(category, orig_locale)
   1256         inner.func_name = func.func_name
   1257         inner.__doc__ = func.__doc__
   1258         return inner
   1259     return decorator
   1260 
   1261 #=======================================================================
   1262 # Decorator for running a function in a specific timezone, correctly
   1263 # resetting it afterwards.
   1264 
   1265 def run_with_tz(tz):
   1266     def decorator(func):
   1267         def inner(*args, **kwds):
   1268             try:
   1269                 tzset = time.tzset
   1270             except AttributeError:
   1271                 raise unittest.SkipTest("tzset required")
   1272             if 'TZ' in os.environ:
   1273                 orig_tz = os.environ['TZ']
   1274             else:
   1275                 orig_tz = None
   1276             os.environ['TZ'] = tz
   1277             tzset()
   1278 
   1279             # now run the function, resetting the tz on exceptions
   1280             try:
   1281                 return func(*args, **kwds)
   1282             finally:
   1283                 if orig_tz is None:
   1284                     del os.environ['TZ']
   1285                 else:
   1286                     os.environ['TZ'] = orig_tz
   1287                 time.tzset()
   1288 
   1289         inner.__name__ = func.__name__
   1290         inner.__doc__ = func.__doc__
   1291         return inner
   1292     return decorator
   1293 
   1294 #=======================================================================
   1295 # Big-memory-test support. Separate from 'resources' because memory use should be configurable.
   1296 
   1297 # Some handy shorthands. Note that these are used for byte-limits as well
   1298 # as size-limits, in the various bigmem tests
   1299 _1M = 1024*1024
   1300 _1G = 1024 * _1M
   1301 _2G = 2 * _1G
   1302 _4G = 4 * _1G
   1303 
   1304 MAX_Py_ssize_t = sys.maxsize
   1305 
   1306 def set_memlimit(limit):
   1307     global max_memuse
   1308     global real_max_memuse
   1309     sizes = {
   1310         'k': 1024,
   1311         'm': _1M,
   1312         'g': _1G,
   1313         't': 1024*_1G,
   1314     }
   1315     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
   1316                  re.IGNORECASE | re.VERBOSE)
   1317     if m is None:
   1318         raise ValueError('Invalid memory limit %r' % (limit,))
   1319     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
   1320     real_max_memuse = memlimit
   1321     if memlimit > MAX_Py_ssize_t:
   1322         memlimit = MAX_Py_ssize_t
   1323     if memlimit < _2G - 1:
   1324         raise ValueError('Memory limit %r too low to be useful' % (limit,))
   1325     max_memuse = memlimit
   1326 
   1327 def bigmemtest(minsize, memuse, overhead=5*_1M):
   1328     """Decorator for bigmem tests.
   1329 
   1330     'minsize' is the minimum useful size for the test (in arbitrary,
   1331     test-interpreted units.) 'memuse' is the number of 'bytes per size' for
   1332     the test, or a good estimate of it. 'overhead' specifies fixed overhead,
   1333     independent of the testsize, and defaults to 5Mb.
   1334 
   1335     The decorator tries to guess a good value for 'size' and passes it to
   1336     the decorated test function. If minsize * memuse is more than the
   1337     allowed memory use (as defined by max_memuse), the test is skipped.
   1338     Otherwise, minsize is adjusted upward to use up to max_memuse.
   1339     """
   1340     def decorator(f):
   1341         def wrapper(self):
   1342             if not max_memuse:
   1343                 # If max_memuse is 0 (the default),
   1344                 # we still want to run the tests with size set to a few kb,
   1345                 # to make sure they work. We still want to avoid using
   1346                 # too much memory, though, but we do that noisily.
   1347                 maxsize = 5147
   1348                 self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
   1349             else:
   1350                 maxsize = int((max_memuse - overhead) / memuse)
   1351                 if maxsize < minsize:
   1352                     # Really ought to print 'test skipped' or something
   1353                     if verbose:
   1354                         sys.stderr.write("Skipping %s because of memory "
   1355                                          "constraint\n" % (f.__name__,))
   1356                     return
   1357                 # Try to keep some breathing room in memory use
   1358                 maxsize = max(maxsize - 50 * _1M, minsize)
   1359             return f(self, maxsize)
   1360         wrapper.minsize = minsize
   1361         wrapper.memuse = memuse
   1362         wrapper.overhead = overhead
   1363         return wrapper
   1364     return decorator
   1365 
   1366 def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
   1367     def decorator(f):
   1368         def wrapper(self):
   1369             if not real_max_memuse:
   1370                 maxsize = 5147
   1371             else:
   1372                 maxsize = size
   1373 
   1374             if ((real_max_memuse or not dry_run)
   1375                 and real_max_memuse < maxsize * memuse):
   1376                 if verbose:
   1377                     sys.stderr.write("Skipping %s because of memory "
   1378                                      "constraint\n" % (f.__name__,))
   1379                 return
   1380 
   1381             return f(self, maxsize)
   1382         wrapper.size = size
   1383         wrapper.memuse = memuse
   1384         wrapper.overhead = overhead
   1385         return wrapper
   1386     return decorator
   1387 
   1388 def bigaddrspacetest(f):
   1389     """Decorator for tests that fill the address space."""
   1390     def wrapper(self):
   1391         if max_memuse < MAX_Py_ssize_t:
   1392             if verbose:
   1393                 sys.stderr.write("Skipping %s because of memory "
   1394                                  "constraint\n" % (f.__name__,))
   1395         else:
   1396             return f(self)
   1397     return wrapper
   1398 
   1399 #=======================================================================
   1400 # unittest integration.
   1401 
   1402 class BasicTestRunner:
   1403     def run(self, test):
   1404         result = unittest.TestResult()
   1405         test(result)
   1406         return result
   1407 
   1408 def _id(obj):
   1409     return obj
   1410 
   1411 def requires_resource(resource):
   1412     if resource == 'gui' and not _is_gui_available():
   1413         return unittest.skip(_is_gui_available.reason)
   1414     if is_resource_enabled(resource):
   1415         return _id
   1416     else:
   1417         return unittest.skip("resource {0!r} is not enabled".format(resource))
   1418 
   1419 def cpython_only(test):
   1420     """
   1421     Decorator for tests only applicable on CPython.
   1422     """
   1423     return impl_detail(cpython=True)(test)
   1424 
   1425 def impl_detail(msg=None, **guards):
   1426     if check_impl_detail(**guards):
   1427         return _id
   1428     if msg is None:
   1429         guardnames, default = _parse_guards(guards)
   1430         if default:
   1431             msg = "implementation detail not available on {0}"
   1432         else:
   1433             msg = "implementation detail specific to {0}"
   1434         guardnames = sorted(guardnames.keys())
   1435         msg = msg.format(' or '.join(guardnames))
   1436     return unittest.skip(msg)
   1437 
   1438 def _parse_guards(guards):
   1439     # Returns a tuple ({platform_name: run_me}, default_value)
   1440     if not guards:
   1441         return ({'cpython': True}, False)
   1442     is_true = guards.values()[0]
   1443     assert guards.values() == [is_true] * len(guards)   # all True or all False
   1444     return (guards, not is_true)
   1445 
   1446 # Use the following check to guard CPython's implementation-specific tests --
   1447 # or to run them only on the implementation(s) guarded by the arguments.
   1448 def check_impl_detail(**guards):
   1449     """This function returns True or False depending on the host platform.
   1450        Examples:
   1451           if check_impl_detail():               # only on CPython (default)
   1452           if check_impl_detail(jython=True):    # only on Jython
   1453           if check_impl_detail(cpython=False):  # everywhere except on CPython
   1454     """
   1455     guards, default = _parse_guards(guards)
   1456     return guards.get(platform.python_implementation().lower(), default)
   1457 
   1458 
   1459 
   1460 def _run_suite(suite):
   1461     """Run tests from a unittest.TestSuite-derived class."""
   1462     if verbose:
   1463         runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
   1464     else:
   1465         runner = BasicTestRunner()
   1466 
   1467     result = runner.run(suite)
   1468     if not result.wasSuccessful():
   1469         if len(result.errors) == 1 and not result.failures:
   1470             err = result.errors[0][1]
   1471         elif len(result.failures) == 1 and not result.errors:
   1472             err = result.failures[0][1]
   1473         else:
   1474             err = "multiple errors occurred"
   1475             if not verbose:
   1476                 err += "; run in verbose mode for details"
   1477         raise TestFailed(err)
   1478 
   1479 
   1480 def run_unittest(*classes):
   1481     """Run tests from unittest.TestCase-derived classes."""
   1482     valid_types = (unittest.TestSuite, unittest.TestCase)
   1483     suite = unittest.TestSuite()
   1484     for cls in classes:
   1485         if isinstance(cls, str):
   1486             if cls in sys.modules:
   1487                 suite.addTest(unittest.findTestCases(sys.modules[cls]))
   1488             else:
   1489                 raise ValueError("str arguments must be keys in sys.modules")
   1490         elif isinstance(cls, valid_types):
   1491             suite.addTest(cls)
   1492         else:
   1493             suite.addTest(unittest.makeSuite(cls))
   1494     _run_suite(suite)
   1495 
   1496 #=======================================================================
   1497 # Check for the presence of docstrings.
   1498 
   1499 HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
   1500                    sys.platform == 'win32' or
   1501                    sysconfig.get_config_var('WITH_DOC_STRINGS'))
   1502 
   1503 requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
   1504                                           "test requires docstrings")
   1505 
   1506 
   1507 #=======================================================================
   1508 # doctest driver.
   1509 
   1510 def run_doctest(module, verbosity=None):
   1511     """Run doctest on the given module.  Return (#failures, #tests).
   1512 
   1513     If optional argument verbosity is not specified (or is None), pass
   1514     test_support's belief about verbosity on to doctest.  Else doctest's
   1515     usual behavior is used (it searches sys.argv for -v).
   1516     """
   1517 
   1518     import doctest
   1519 
   1520     if verbosity is None:
   1521         verbosity = verbose
   1522     else:
   1523         verbosity = None
   1524 
   1525     # Direct doctest output (normally just errors) to real stdout; doctest
   1526     # output shouldn't be compared by regrtest.
   1527     save_stdout = sys.stdout
   1528     sys.stdout = get_original_stdout()
   1529     try:
   1530         f, t = doctest.testmod(module, verbose=verbosity)
   1531         if f:
   1532             raise TestFailed("%d of %d doctests failed" % (f, t))
   1533     finally:
   1534         sys.stdout = save_stdout
   1535     if verbose:
   1536         print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
   1537     return f, t
   1538 
   1539 #=======================================================================
   1540 # Threading support to prevent reporting refleaks when running regrtest.py -R
   1541 
   1542 # NOTE: we use thread._count() rather than threading.enumerate() (or the
   1543 # moral equivalent thereof) because a threading.Thread object is still alive
   1544 # until its __bootstrap() method has returned, even after it has been
   1545 # unregistered from the threading module.
   1546 # thread._count(), on the other hand, only gets decremented *after* the
   1547 # __bootstrap() method has returned, which gives us reliable reference counts
   1548 # at the end of a test run.
   1549 
   1550 def threading_setup():
   1551     if thread:
   1552         return thread._count(),
   1553     else:
   1554         return 1,
   1555 
   1556 def threading_cleanup(nb_threads):
   1557     if not thread:
   1558         return
   1559 
   1560     _MAX_COUNT = 10
   1561     for count in range(_MAX_COUNT):
   1562         n = thread._count()
   1563         if n == nb_threads:
   1564             break
   1565         time.sleep(0.1)
   1566     # XXX print a warning in case of failure?
   1567 
   1568 def reap_threads(func):
   1569     """Use this function when threads are being used.  This will
   1570     ensure that the threads are cleaned up even when the test fails.
   1571     If threading is unavailable this function does nothing.
   1572     """
   1573     if not thread:
   1574         return func
   1575 
   1576     @functools.wraps(func)
   1577     def decorator(*args):
   1578         key = threading_setup()
   1579         try:
   1580             return func(*args)
   1581         finally:
   1582             threading_cleanup(*key)
   1583     return decorator
   1584 
   1585 def reap_children():
   1586     """Use this function at the end of test_main() whenever sub-processes
   1587     are started.  This will help ensure that no extra children (zombies)
   1588     stick around to hog resources and create problems when looking
   1589     for refleaks.
   1590     """
   1591 
   1592     # Reap all our dead child processes so we don't leave zombies around.
   1593     # These hog resources and might be causing some of the buildbots to die.
   1594     if hasattr(os, 'waitpid'):
   1595         any_process = -1
   1596         while True:
   1597             try:
   1598                 # This will raise an exception on Windows.  That's ok.
   1599                 pid, status = os.waitpid(any_process, os.WNOHANG)
   1600                 if pid == 0:
   1601                     break
   1602             except:
   1603                 break
   1604 
   1605 @contextlib.contextmanager
   1606 def start_threads(threads, unlock=None):
   1607     threads = list(threads)
   1608     started = []
   1609     try:
   1610         try:
   1611             for t in threads:
   1612                 t.start()
   1613                 started.append(t)
   1614         except:
   1615             if verbose:
   1616                 print("Can't start %d threads, only %d threads started" %
   1617                       (len(threads), len(started)))
   1618             raise
   1619         yield
   1620     finally:
   1621         if unlock:
   1622             unlock()
   1623         endtime = starttime = time.time()
   1624         for timeout in range(1, 16):
   1625             endtime += 60
   1626             for t in started:
   1627                 t.join(max(endtime - time.time(), 0.01))
   1628             started = [t for t in started if t.isAlive()]
   1629             if not started:
   1630                 break
   1631             if verbose:
   1632                 print('Unable to join %d threads during a period of '
   1633                       '%d minutes' % (len(started), timeout))
   1634     started = [t for t in started if t.isAlive()]
   1635     if started:
   1636         raise AssertionError('Unable to join %d threads' % len(started))
   1637 
   1638 @contextlib.contextmanager
   1639 def swap_attr(obj, attr, new_val):
   1640     """Temporary swap out an attribute with a new object.
   1641 
   1642     Usage:
   1643         with swap_attr(obj, "attr", 5):
   1644             ...
   1645 
   1646         This will set obj.attr to 5 for the duration of the with: block,
   1647         restoring the old value at the end of the block. If `attr` doesn't
   1648         exist on `obj`, it will be created and then deleted at the end of the
   1649         block.
   1650     """
   1651     if hasattr(obj, attr):
   1652         real_val = getattr(obj, attr)
   1653         setattr(obj, attr, new_val)
   1654         try:
   1655             yield
   1656         finally:
   1657             setattr(obj, attr, real_val)
   1658     else:
   1659         setattr(obj, attr, new_val)
   1660         try:
   1661             yield
   1662         finally:
   1663             delattr(obj, attr)
   1664 
   1665 def py3k_bytes(b):
   1666     """Emulate the py3k bytes() constructor.
   1667 
   1668     NOTE: This is only a best effort function.
   1669     """
   1670     try:
   1671         # memoryview?
   1672         return b.tobytes()
   1673     except AttributeError:
   1674         try:
   1675             # iterable of ints?
   1676             return b"".join(chr(x) for x in b)
   1677         except TypeError:
   1678             return bytes(b)
   1679 
   1680 def args_from_interpreter_flags():
   1681     """Return a list of command-line arguments reproducing the current
   1682     settings in sys.flags."""
   1683     import subprocess
   1684     return subprocess._args_from_interpreter_flags()
   1685 
   1686 def strip_python_stderr(stderr):
   1687     """Strip the stderr of a Python process from potential debug output
   1688     emitted by the interpreter.
   1689 
   1690     This will typically be run on the result of the communicate() method
   1691     of a subprocess.Popen object.
   1692     """
   1693     stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
   1694     return stderr
   1695 
   1696 
   1697 def check_free_after_iterating(test, iter, cls, args=()):
   1698     class A(cls):
   1699         def __del__(self):
   1700             done[0] = True
   1701             try:
   1702                 next(it)
   1703             except StopIteration:
   1704                 pass
   1705 
   1706     done = [False]
   1707     it = iter(A(*args))
   1708     # Issue 26494: Shouldn't crash
   1709     test.assertRaises(StopIteration, next, it)
   1710     # The sequence should be deallocated just after the end of iterating
   1711     gc_collect()
   1712     test.assertTrue(done[0])
   1713