Home | History | Annotate | Download | only in test_importlib
      1 import abc
      2 import builtins
      3 import contextlib
      4 import errno
      5 import functools
      6 import importlib
      7 from importlib import machinery, util, invalidate_caches
      8 from importlib.abc import ResourceReader
      9 import io
     10 import os
     11 import os.path
     12 from pathlib import Path, PurePath
     13 from test import support
     14 import unittest
     15 import sys
     16 import tempfile
     17 import types
     18 
     19 from . import data01
     20 from . import zipdata01
     21 
     22 
     23 BUILTINS = types.SimpleNamespace()
     24 BUILTINS.good_name = None
     25 BUILTINS.bad_name = None
     26 if 'errno' in sys.builtin_module_names:
     27     BUILTINS.good_name = 'errno'
     28 if 'importlib' not in sys.builtin_module_names:
     29     BUILTINS.bad_name = 'importlib'
     30 
     31 EXTENSIONS = types.SimpleNamespace()
     32 EXTENSIONS.path = None
     33 EXTENSIONS.ext = None
     34 EXTENSIONS.filename = None
     35 EXTENSIONS.file_path = None
     36 EXTENSIONS.name = '_testcapi'
     37 
     38 def _extension_details():
     39     global EXTENSIONS
     40     for path in sys.path:
     41         for ext in machinery.EXTENSION_SUFFIXES:
     42             filename = EXTENSIONS.name + ext
     43             file_path = os.path.join(path, filename)
     44             if os.path.exists(file_path):
     45                 EXTENSIONS.path = path
     46                 EXTENSIONS.ext = ext
     47                 EXTENSIONS.filename = filename
     48                 EXTENSIONS.file_path = file_path
     49                 return
     50 
     51 _extension_details()
     52 
     53 
     54 def import_importlib(module_name):
     55     """Import a module from importlib both w/ and w/o _frozen_importlib."""
     56     fresh = ('importlib',) if '.' in module_name else ()
     57     frozen = support.import_fresh_module(module_name)
     58     source = support.import_fresh_module(module_name, fresh=fresh,
     59                                          blocked=('_frozen_importlib', '_frozen_importlib_external'))
     60     return {'Frozen': frozen, 'Source': source}
     61 
     62 
     63 def specialize_class(cls, kind, base=None, **kwargs):
     64     # XXX Support passing in submodule names--load (and cache) them?
     65     # That would clean up the test modules a bit more.
     66     if base is None:
     67         base = unittest.TestCase
     68     elif not isinstance(base, type):
     69         base = base[kind]
     70     name = '{}_{}'.format(kind, cls.__name__)
     71     bases = (cls, base)
     72     specialized = types.new_class(name, bases)
     73     specialized.__module__ = cls.__module__
     74     specialized._NAME = cls.__name__
     75     specialized._KIND = kind
     76     for attr, values in kwargs.items():
     77         value = values[kind]
     78         setattr(specialized, attr, value)
     79     return specialized
     80 
     81 
     82 def split_frozen(cls, base=None, **kwargs):
     83     frozen = specialize_class(cls, 'Frozen', base, **kwargs)
     84     source = specialize_class(cls, 'Source', base, **kwargs)
     85     return frozen, source
     86 
     87 
     88 def test_both(test_class, base=None, **kwargs):
     89     return split_frozen(test_class, base, **kwargs)
     90 
     91 
     92 CASE_INSENSITIVE_FS = True
     93 # Windows is the only OS that is *always* case-insensitive
     94 # (OS X *can* be case-sensitive).
     95 if sys.platform not in ('win32', 'cygwin'):
     96     changed_name = __file__.upper()
     97     if changed_name == __file__:
     98         changed_name = __file__.lower()
     99     if not os.path.exists(changed_name):
    100         CASE_INSENSITIVE_FS = False
    101 
    102 source_importlib = import_importlib('importlib')['Source']
    103 __import__ = {'Frozen': staticmethod(builtins.__import__),
    104               'Source': staticmethod(source_importlib.__import__)}
    105 
    106 
    107 def case_insensitive_tests(test):
    108     """Class decorator that nullifies tests requiring a case-insensitive
    109     file system."""
    110     return unittest.skipIf(not CASE_INSENSITIVE_FS,
    111                             "requires a case-insensitive filesystem")(test)
    112 
    113 
    114 def submodule(parent, name, pkg_dir, content=''):
    115     path = os.path.join(pkg_dir, name + '.py')
    116     with open(path, 'w') as subfile:
    117         subfile.write(content)
    118     return '{}.{}'.format(parent, name), path
    119 
    120 
    121 @contextlib.contextmanager
    122 def uncache(*names):
    123     """Uncache a module from sys.modules.
    124 
    125     A basic sanity check is performed to prevent uncaching modules that either
    126     cannot/shouldn't be uncached.
    127 
    128     """
    129     for name in names:
    130         if name in ('sys', 'marshal', 'imp'):
    131             raise ValueError(
    132                 "cannot uncache {0}".format(name))
    133         try:
    134             del sys.modules[name]
    135         except KeyError:
    136             pass
    137     try:
    138         yield
    139     finally:
    140         for name in names:
    141             try:
    142                 del sys.modules[name]
    143             except KeyError:
    144                 pass
    145 
    146 
    147 @contextlib.contextmanager
    148 def temp_module(name, content='', *, pkg=False):
    149     conflicts = [n for n in sys.modules if n.partition('.')[0] == name]
    150     with support.temp_cwd(None) as cwd:
    151         with uncache(name, *conflicts):
    152             with support.DirsOnSysPath(cwd):
    153                 invalidate_caches()
    154 
    155                 location = os.path.join(cwd, name)
    156                 if pkg:
    157                     modpath = os.path.join(location, '__init__.py')
    158                     os.mkdir(name)
    159                 else:
    160                     modpath = location + '.py'
    161                     if content is None:
    162                         # Make sure the module file gets created.
    163                         content = ''
    164                 if content is not None:
    165                     # not a namespace package
    166                     with open(modpath, 'w') as modfile:
    167                         modfile.write(content)
    168                 yield location
    169 
    170 
    171 @contextlib.contextmanager
    172 def import_state(**kwargs):
    173     """Context manager to manage the various importers and stored state in the
    174     sys module.
    175 
    176     The 'modules' attribute is not supported as the interpreter state stores a
    177     pointer to the dict that the interpreter uses internally;
    178     reassigning to sys.modules does not have the desired effect.
    179 
    180     """
    181     originals = {}
    182     try:
    183         for attr, default in (('meta_path', []), ('path', []),
    184                               ('path_hooks', []),
    185                               ('path_importer_cache', {})):
    186             originals[attr] = getattr(sys, attr)
    187             if attr in kwargs:
    188                 new_value = kwargs[attr]
    189                 del kwargs[attr]
    190             else:
    191                 new_value = default
    192             setattr(sys, attr, new_value)
    193         if len(kwargs):
    194             raise ValueError(
    195                     'unrecognized arguments: {0}'.format(kwargs.keys()))
    196         yield
    197     finally:
    198         for attr, value in originals.items():
    199             setattr(sys, attr, value)
    200 
    201 
    202 class _ImporterMock:
    203 
    204     """Base class to help with creating importer mocks."""
    205 
    206     def __init__(self, *names, module_code={}):
    207         self.modules = {}
    208         self.module_code = {}
    209         for name in names:
    210             if not name.endswith('.__init__'):
    211                 import_name = name
    212             else:
    213                 import_name = name[:-len('.__init__')]
    214             if '.' not in name:
    215                 package = None
    216             elif import_name == name:
    217                 package = name.rsplit('.', 1)[0]
    218             else:
    219                 package = import_name
    220             module = types.ModuleType(import_name)
    221             module.__loader__ = self
    222             module.__file__ = '<mock __file__>'
    223             module.__package__ = package
    224             module.attr = name
    225             if import_name != name:
    226                 module.__path__ = ['<mock __path__>']
    227             self.modules[import_name] = module
    228             if import_name in module_code:
    229                 self.module_code[import_name] = module_code[import_name]
    230 
    231     def __getitem__(self, name):
    232         return self.modules[name]
    233 
    234     def __enter__(self):
    235         self._uncache = uncache(*self.modules.keys())
    236         self._uncache.__enter__()
    237         return self
    238 
    239     def __exit__(self, *exc_info):
    240         self._uncache.__exit__(None, None, None)
    241 
    242 
    243 class mock_modules(_ImporterMock):
    244 
    245     """Importer mock using PEP 302 APIs."""
    246 
    247     def find_module(self, fullname, path=None):
    248         if fullname not in self.modules:
    249             return None
    250         else:
    251             return self
    252 
    253     def load_module(self, fullname):
    254         if fullname not in self.modules:
    255             raise ImportError
    256         else:
    257             sys.modules[fullname] = self.modules[fullname]
    258             if fullname in self.module_code:
    259                 try:
    260                     self.module_code[fullname]()
    261                 except Exception:
    262                     del sys.modules[fullname]
    263                     raise
    264             return self.modules[fullname]
    265 
    266 
    267 class mock_spec(_ImporterMock):
    268 
    269     """Importer mock using PEP 451 APIs."""
    270 
    271     def find_spec(self, fullname, path=None, parent=None):
    272         try:
    273             module = self.modules[fullname]
    274         except KeyError:
    275             return None
    276         spec = util.spec_from_file_location(
    277                 fullname, module.__file__, loader=self,
    278                 submodule_search_locations=getattr(module, '__path__', None))
    279         return spec
    280 
    281     def create_module(self, spec):
    282         if spec.name not in self.modules:
    283             raise ImportError
    284         return self.modules[spec.name]
    285 
    286     def exec_module(self, module):
    287         try:
    288             self.module_code[module.__spec__.name]()
    289         except KeyError:
    290             pass
    291 
    292 
    293 def writes_bytecode_files(fxn):
    294     """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    295     tests that require it to be set to False."""
    296     if sys.dont_write_bytecode:
    297         return lambda *args, **kwargs: None
    298     @functools.wraps(fxn)
    299     def wrapper(*args, **kwargs):
    300         original = sys.dont_write_bytecode
    301         sys.dont_write_bytecode = False
    302         try:
    303             to_return = fxn(*args, **kwargs)
    304         finally:
    305             sys.dont_write_bytecode = original
    306         return to_return
    307     return wrapper
    308 
    309 
    310 def ensure_bytecode_path(bytecode_path):
    311     """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.
    312 
    313     :param bytecode_path: File system path to PEP 3147 pyc file.
    314     """
    315     try:
    316         os.mkdir(os.path.dirname(bytecode_path))
    317     except OSError as error:
    318         if error.errno != errno.EEXIST:
    319             raise
    320 
    321 
    322 @contextlib.contextmanager
    323 def create_modules(*names):
    324     """Temporarily create each named module with an attribute (named 'attr')
    325     that contains the name passed into the context manager that caused the
    326     creation of the module.
    327 
    328     All files are created in a temporary directory returned by
    329     tempfile.mkdtemp(). This directory is inserted at the beginning of
    330     sys.path. When the context manager exits all created files (source and
    331     bytecode) are explicitly deleted.
    332 
    333     No magic is performed when creating packages! This means that if you create
    334     a module within a package you must also create the package's __init__ as
    335     well.
    336 
    337     """
    338     source = 'attr = {0!r}'
    339     created_paths = []
    340     mapping = {}
    341     state_manager = None
    342     uncache_manager = None
    343     try:
    344         temp_dir = tempfile.mkdtemp()
    345         mapping['.root'] = temp_dir
    346         import_names = set()
    347         for name in names:
    348             if not name.endswith('__init__'):
    349                 import_name = name
    350             else:
    351                 import_name = name[:-len('.__init__')]
    352             import_names.add(import_name)
    353             if import_name in sys.modules:
    354                 del sys.modules[import_name]
    355             name_parts = name.split('.')
    356             file_path = temp_dir
    357             for directory in name_parts[:-1]:
    358                 file_path = os.path.join(file_path, directory)
    359                 if not os.path.exists(file_path):
    360                     os.mkdir(file_path)
    361                     created_paths.append(file_path)
    362             file_path = os.path.join(file_path, name_parts[-1] + '.py')
    363             with open(file_path, 'w') as file:
    364                 file.write(source.format(name))
    365             created_paths.append(file_path)
    366             mapping[name] = file_path
    367         uncache_manager = uncache(*import_names)
    368         uncache_manager.__enter__()
    369         state_manager = import_state(path=[temp_dir])
    370         state_manager.__enter__()
    371         yield mapping
    372     finally:
    373         if state_manager is not None:
    374             state_manager.__exit__(None, None, None)
    375         if uncache_manager is not None:
    376             uncache_manager.__exit__(None, None, None)
    377         support.rmtree(temp_dir)
    378 
    379 
    380 def mock_path_hook(*entries, importer):
    381     """A mock sys.path_hooks entry."""
    382     def hook(entry):
    383         if entry not in entries:
    384             raise ImportError
    385         return importer
    386     return hook
    387 
    388 
    389 class CASEOKTestBase:
    390 
    391     def caseok_env_changed(self, *, should_exist):
    392         possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK'
    393         if any(x in self.importlib._bootstrap_external._os.environ
    394                     for x in possibilities) != should_exist:
    395             self.skipTest('os.environ changes not reflected in _os.environ')
    396 
    397 
    398 def create_package(file, path, is_package=True, contents=()):
    399     class Reader(ResourceReader):
    400         def get_resource_reader(self, package):
    401             return self
    402 
    403         def open_resource(self, path):
    404             self._path = path
    405             if isinstance(file, Exception):
    406                 raise file
    407             else:
    408                 return file
    409 
    410         def resource_path(self, path_):
    411             self._path = path_
    412             if isinstance(path, Exception):
    413                 raise path
    414             else:
    415                 return path
    416 
    417         def is_resource(self, path_):
    418             self._path = path_
    419             if isinstance(path, Exception):
    420                 raise path
    421             for entry in contents:
    422                 parts = entry.split('/')
    423                 if len(parts) == 1 and parts[0] == path_:
    424                     return True
    425             return False
    426 
    427         def contents(self):
    428             if isinstance(path, Exception):
    429                 raise path
    430             # There's no yield from in baseball, er, Python 2.
    431             for entry in contents:
    432                 yield entry
    433 
    434     name = 'testingpackage'
    435     # Unforunately importlib.util.module_from_spec() was not introduced until
    436     # Python 3.5.
    437     module = types.ModuleType(name)
    438     loader = Reader()
    439     spec = machinery.ModuleSpec(
    440         name, loader,
    441         origin='does-not-exist',
    442         is_package=is_package)
    443     module.__spec__ = spec
    444     module.__loader__ = loader
    445     return module
    446 
    447 
    448 class CommonResourceTests(abc.ABC):
    449     @abc.abstractmethod
    450     def execute(self, package, path):
    451         raise NotImplementedError
    452 
    453     def test_package_name(self):
    454         # Passing in the package name should succeed.
    455         self.execute(data01.__name__, 'utf-8.file')
    456 
    457     def test_package_object(self):
    458         # Passing in the package itself should succeed.
    459         self.execute(data01, 'utf-8.file')
    460 
    461     def test_string_path(self):
    462         # Passing in a string for the path should succeed.
    463         path = 'utf-8.file'
    464         self.execute(data01, path)
    465 
    466     @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
    467     def test_pathlib_path(self):
    468         # Passing in a pathlib.PurePath object for the path should succeed.
    469         path = PurePath('utf-8.file')
    470         self.execute(data01, path)
    471 
    472     def test_absolute_path(self):
    473         # An absolute path is a ValueError.
    474         path = Path(__file__)
    475         full_path = path.parent/'utf-8.file'
    476         with self.assertRaises(ValueError):
    477             self.execute(data01, full_path)
    478 
    479     def test_relative_path(self):
    480         # A reative path is a ValueError.
    481         with self.assertRaises(ValueError):
    482             self.execute(data01, '../data01/utf-8.file')
    483 
    484     def test_importing_module_as_side_effect(self):
    485         # The anchor package can already be imported.
    486         del sys.modules[data01.__name__]
    487         self.execute(data01.__name__, 'utf-8.file')
    488 
    489     def test_non_package_by_name(self):
    490         # The anchor package cannot be a module.
    491         with self.assertRaises(TypeError):
    492             self.execute(__name__, 'utf-8.file')
    493 
    494     def test_non_package_by_package(self):
    495         # The anchor package cannot be a module.
    496         with self.assertRaises(TypeError):
    497             module = sys.modules['test.test_importlib.util']
    498             self.execute(module, 'utf-8.file')
    499 
    500     @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    501     def test_resource_opener(self):
    502         bytes_data = io.BytesIO(b'Hello, world!')
    503         package = create_package(file=bytes_data, path=FileNotFoundError())
    504         self.execute(package, 'utf-8.file')
    505         self.assertEqual(package.__loader__._path, 'utf-8.file')
    506 
    507     @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    508     def test_resource_path(self):
    509         bytes_data = io.BytesIO(b'Hello, world!')
    510         path = __file__
    511         package = create_package(file=bytes_data, path=path)
    512         self.execute(package, 'utf-8.file')
    513         self.assertEqual(package.__loader__._path, 'utf-8.file')
    514 
    515     def test_useless_loader(self):
    516         package = create_package(file=FileNotFoundError(),
    517                                  path=FileNotFoundError())
    518         with self.assertRaises(FileNotFoundError):
    519             self.execute(package, 'utf-8.file')
    520 
    521 
    522 class ZipSetupBase:
    523     ZIP_MODULE = None
    524 
    525     @classmethod
    526     def setUpClass(cls):
    527         data_path = Path(cls.ZIP_MODULE.__file__)
    528         data_dir = data_path.parent
    529         cls._zip_path = str(data_dir / 'ziptestdata.zip')
    530         sys.path.append(cls._zip_path)
    531         cls.data = importlib.import_module('ziptestdata')
    532 
    533     @classmethod
    534     def tearDownClass(cls):
    535         try:
    536             sys.path.remove(cls._zip_path)
    537         except ValueError:
    538             pass
    539 
    540         try:
    541             del sys.path_importer_cache[cls._zip_path]
    542             del sys.modules[cls.data.__name__]
    543         except KeyError:
    544             pass
    545 
    546         try:
    547             del cls.data
    548             del cls._zip_path
    549         except AttributeError:
    550             pass
    551 
    552     def setUp(self):
    553         modules = support.modules_setup()
    554         self.addCleanup(support.modules_cleanup, *modules)
    555 
    556 
class ZipSetup(ZipSetupBase):
    # Concrete fixture: use the zipdata01 package as ZIP_MODULE.
    ZIP_MODULE = zipdata01                          # type: ignore
    559