Home | History | Annotate | Download | only in test
      1 # tests __main__ module handling in multiprocessing
      2 from test import support
      3 # Skip tests if _thread or _multiprocessing wasn't built.
      4 support.import_module('_thread')
      5 support.import_module('_multiprocessing')
      6 
      7 import importlib
      8 import importlib.machinery
      9 import unittest
     10 import sys
     11 import os
     12 import os.path
     13 import py_compile
     14 
     15 from test.support.script_helper import (
     16     make_pkg, make_script, make_zip_pkg, make_zip_script,
     17     assert_python_ok)
     18 
     19 if support.PGO:
     20     raise unittest.SkipTest("test is not helpful for PGO")
     21 
     22 # Look up which start methods are available to test
     23 import multiprocessing
     24 AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods())
     25 
     26 # Issue #22332: Skip tests if sem_open implementation is broken.
     27 support.import_module('multiprocessing.synchronize')
     28 
     29 verbose = support.verbose
     30 
     31 test_source = """\
     32 # multiprocessing includes all sorts of shenanigans to make __main__
     33 # attributes accessible in the subprocess in a pickle compatible way.
     34 
     35 # We run the "doesn't work in the interactive interpreter" example from
     36 # the docs to make sure it *does* work from an executed __main__,
     37 # regardless of the invocation mechanism
     38 
     39 import sys
     40 import time
     41 from multiprocessing import Pool, set_start_method
     42 
     43 # We use this __main__ defined function in the map call below in order to
     44 # check that multiprocessing in correctly running the unguarded
     45 # code in child processes and then making it available as __main__
     46 def f(x):
     47     return x*x
     48 
     49 # Check explicit relative imports
     50 if "check_sibling" in __file__:
     51     # We're inside a package and not in a __main__.py file
     52     # so make sure explicit relative imports work correctly
     53     from . import sibling
     54 
     55 if __name__ == '__main__':
     56     start_method = sys.argv[1]
     57     set_start_method(start_method)
     58     p = Pool(5)
     59     results = []
     60     p.map_async(f, [1, 2, 3], callback=results.extend)
     61     deadline = time.time() + 10 # up to 10 s to report the results
     62     while not results:
     63         time.sleep(0.05)
     64         if time.time() > deadline:
     65             raise RuntimeError("Timed out waiting for results")
     66     results.sort()
     67     print(start_method, "->", results)
     68 """
     69 
     70 test_source_main_skipped_in_children = """\
     71 # __main__.py files have an implied "if __name__ == '__main__'" so
     72 # multiprocessing should always skip running them in child processes
     73 
     74 # This means we can't use __main__ defined functions in child processes,
     75 # so we just use "int" as a passthrough operation below
     76 
     77 if __name__ != "__main__":
     78     raise RuntimeError("Should only be called as __main__!")
     79 
     80 import sys
     81 import time
     82 from multiprocessing import Pool, set_start_method
     83 
     84 start_method = sys.argv[1]
     85 set_start_method(start_method)
     86 p = Pool(5)
     87 results = []
     88 p.map_async(int, [1, 4, 9], callback=results.extend)
     89 deadline = time.time() + 10 # up to 10 s to report the results
     90 while not results:
     91     time.sleep(0.05)
     92     if time.time() > deadline:
     93         raise RuntimeError("Timed out waiting for results")
     94 results.sort()
     95 print(start_method, "->", results)
     96 """
     97 
     98 # These helpers were copied from test_cmd_line_script & tweaked a bit...
     99 
    100 def _make_test_script(script_dir, script_basename,
    101                       source=test_source, omit_suffix=False):
    102     to_return = make_script(script_dir, script_basename,
    103                             source, omit_suffix)
    104     # Hack to check explicit relative imports
    105     if script_basename == "check_sibling":
    106         make_script(script_dir, "sibling", "")
    107     importlib.invalidate_caches()
    108     return to_return
    109 
    110 def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
    111                        source=test_source, depth=1):
    112     to_return = make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
    113                              source, depth)
    114     importlib.invalidate_caches()
    115     return to_return
    116 
    117 # There's no easy way to pass the script directory in to get
    118 # -m to work (avoiding that is the whole point of making
    119 # directories and zipfiles executable!)
    120 # So we fake it for testing purposes with a custom launch script
    121 launch_source = """\
    122 import sys, os.path, runpy
    123 sys.path.insert(0, %s)
    124 runpy._run_module_as_main(%r)
    125 """
    126 
    127 def _make_launch_script(script_dir, script_basename, module_name, path=None):
    128     if path is None:
    129         path = "os.path.dirname(__file__)"
    130     else:
    131         path = repr(path)
    132     source = launch_source % (path, module_name)
    133     to_return = make_script(script_dir, script_basename, source)
    134     importlib.invalidate_caches()
    135     return to_return
    136 
    137 class MultiProcessingCmdLineMixin():
    138     maxDiff = None # Show full tracebacks on subprocess failure
    139 
    140     def setUp(self):
    141         if self.start_method not in AVAILABLE_START_METHODS:
    142             self.skipTest("%r start method not available" % self.start_method)
    143 
    144     def _check_output(self, script_name, exit_code, out, err):
    145         if verbose > 1:
    146             print("Output from test script %r:" % script_name)
    147             print(repr(out))
    148         self.assertEqual(exit_code, 0)
    149         self.assertEqual(err.decode('utf-8'), '')
    150         expected_results = "%s -> [1, 4, 9]" % self.start_method
    151         self.assertEqual(out.decode('utf-8').strip(), expected_results)
    152 
    153     def _check_script(self, script_name, *cmd_line_switches):
    154         if not __debug__:
    155             cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
    156         run_args = cmd_line_switches + (script_name, self.start_method)
    157         rc, out, err = assert_python_ok(*run_args, __isolated=False)
    158         self._check_output(script_name, rc, out, err)
    159 
    160     def test_basic_script(self):
    161         with support.temp_dir() as script_dir:
    162             script_name = _make_test_script(script_dir, 'script')
    163             self._check_script(script_name)
    164 
    165     def test_basic_script_no_suffix(self):
    166         with support.temp_dir() as script_dir:
    167             script_name = _make_test_script(script_dir, 'script',
    168                                             omit_suffix=True)
    169             self._check_script(script_name)
    170 
    171     def test_ipython_workaround(self):
    172         # Some versions of the IPython launch script are missing the
    173         # __name__ = "__main__" guard, and multiprocessing has long had
    174         # a workaround for that case
    175         # See https://github.com/ipython/ipython/issues/4698
    176         source = test_source_main_skipped_in_children
    177         with support.temp_dir() as script_dir:
    178             script_name = _make_test_script(script_dir, 'ipython',
    179                                             source=source)
    180             self._check_script(script_name)
    181             script_no_suffix = _make_test_script(script_dir, 'ipython',
    182                                                  source=source,
    183                                                  omit_suffix=True)
    184             self._check_script(script_no_suffix)
    185 
    186     def test_script_compiled(self):
    187         with support.temp_dir() as script_dir:
    188             script_name = _make_test_script(script_dir, 'script')
    189             py_compile.compile(script_name, doraise=True)
    190             os.remove(script_name)
    191             pyc_file = support.make_legacy_pyc(script_name)
    192             self._check_script(pyc_file)
    193 
    194     def test_directory(self):
    195         source = self.main_in_children_source
    196         with support.temp_dir() as script_dir:
    197             script_name = _make_test_script(script_dir, '__main__',
    198                                             source=source)
    199             self._check_script(script_dir)
    200 
    201     def test_directory_compiled(self):
    202         source = self.main_in_children_source
    203         with support.temp_dir() as script_dir:
    204             script_name = _make_test_script(script_dir, '__main__',
    205                                             source=source)
    206             py_compile.compile(script_name, doraise=True)
    207             os.remove(script_name)
    208             pyc_file = support.make_legacy_pyc(script_name)
    209             self._check_script(script_dir)
    210 
    211     def test_zipfile(self):
    212         source = self.main_in_children_source
    213         with support.temp_dir() as script_dir:
    214             script_name = _make_test_script(script_dir, '__main__',
    215                                             source=source)
    216             zip_name, run_name = make_zip_script(script_dir, 'test_zip', script_name)
    217             self._check_script(zip_name)
    218 
    219     def test_zipfile_compiled(self):
    220         source = self.main_in_children_source
    221         with support.temp_dir() as script_dir:
    222             script_name = _make_test_script(script_dir, '__main__',
    223                                             source=source)
    224             compiled_name = py_compile.compile(script_name, doraise=True)
    225             zip_name, run_name = make_zip_script(script_dir, 'test_zip', compiled_name)
    226             self._check_script(zip_name)
    227 
    228     def test_module_in_package(self):
    229         with support.temp_dir() as script_dir:
    230             pkg_dir = os.path.join(script_dir, 'test_pkg')
    231             make_pkg(pkg_dir)
    232             script_name = _make_test_script(pkg_dir, 'check_sibling')
    233             launch_name = _make_launch_script(script_dir, 'launch',
    234                                               'test_pkg.check_sibling')
    235             self._check_script(launch_name)
    236 
    237     def test_module_in_package_in_zipfile(self):
    238         with support.temp_dir() as script_dir:
    239             zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script')
    240             launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.script', zip_name)
    241             self._check_script(launch_name)
    242 
    243     def test_module_in_subpackage_in_zipfile(self):
    244         with support.temp_dir() as script_dir:
    245             zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script', depth=2)
    246             launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.test_pkg.script', zip_name)
    247             self._check_script(launch_name)
    248 
    249     def test_package(self):
    250         source = self.main_in_children_source
    251         with support.temp_dir() as script_dir:
    252             pkg_dir = os.path.join(script_dir, 'test_pkg')
    253             make_pkg(pkg_dir)
    254             script_name = _make_test_script(pkg_dir, '__main__',
    255                                             source=source)
    256             launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
    257             self._check_script(launch_name)
    258 
    259     def test_package_compiled(self):
    260         source = self.main_in_children_source
    261         with support.temp_dir() as script_dir:
    262             pkg_dir = os.path.join(script_dir, 'test_pkg')
    263             make_pkg(pkg_dir)
    264             script_name = _make_test_script(pkg_dir, '__main__',
    265                                             source=source)
    266             compiled_name = py_compile.compile(script_name, doraise=True)
    267             os.remove(script_name)
    268             pyc_file = support.make_legacy_pyc(script_name)
    269             launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
    270             self._check_script(launch_name)
    271 
    272 # Test all supported start methods (setupClass skips as appropriate)
    273 
    274 class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    275     start_method = 'spawn'
    276     main_in_children_source = test_source_main_skipped_in_children
    277 
    278 class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    279     start_method = 'fork'
    280     main_in_children_source = test_source
    281 
    282 class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    283     start_method = 'forkserver'
    284     main_in_children_source = test_source_main_skipped_in_children
    285 
    286 def tearDownModule():
    287     support.reap_children()
    288 
    289 if __name__ == '__main__':
    290     unittest.main()
    291