Home | History | Annotate | Download | only in test
      1 # This is a variant of the very old (early 90's) file
      2 # Demo/threads/bug.py.  It simply provokes a number of threads into
      3 # trying to import the same module "at the same time".
      4 # There are no pleasant failure modes -- most likely is that Python
      5 # complains several times about module random having no attribute
      6 # randrange, and then Python hangs.
      7 
      8 import _imp as imp
      9 import os
     10 import importlib
     11 import sys
     12 import time
     13 import shutil
     14 import unittest
     15 from test.support import (
     16     verbose, import_module, run_unittest, TESTFN, reap_threads,
     17     forget, unlink, rmtree, start_threads)
     18 threading = import_module('threading')
     19 
     20 def task(N, done, done_tasks, errors):
     21     try:
     22         # We don't use modulefinder but still import it in order to stress
     23         # importing of different modules from several threads.
     24         if len(done_tasks) % 2:
     25             import modulefinder
     26             import random
     27         else:
     28             import random
     29             import modulefinder
     30         # This will fail if random is not completely initialized
     31         x = random.randrange(1, 3)
     32     except Exception as e:
     33         errors.append(e.with_traceback(None))
     34     finally:
     35         done_tasks.append(threading.get_ident())
     36         finished = len(done_tasks) == N
     37         if finished:
     38             done.set()
     39 
     40 # Create a circular import structure: A -> C -> B -> D -> A
     41 # NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
     42 
     43 circular_imports_modules = {
     44     'A': """if 1:
     45         import time
     46         time.sleep(%(delay)s)
     47         x = 'a'
     48         import C
     49         """,
     50     'B': """if 1:
     51         import time
     52         time.sleep(%(delay)s)
     53         x = 'b'
     54         import D
     55         """,
     56     'C': """import B""",
     57     'D': """import A""",
     58 }
     59 
     60 class Finder:
     61     """A dummy finder to detect concurrent access to its find_spec()
     62     method."""
     63 
     64     def __init__(self):
     65         self.numcalls = 0
     66         self.x = 0
     67         self.lock = threading.Lock()
     68 
     69     def find_spec(self, name, path=None, target=None):
     70         # Simulate some thread-unsafe behaviour. If calls to find_spec()
     71         # are properly serialized, `x` will end up the same as `numcalls`.
     72         # Otherwise not.
     73         assert imp.lock_held()
     74         with self.lock:
     75             self.numcalls += 1
     76         x = self.x
     77         time.sleep(0.01)
     78         self.x = x + 1
     79 
     80 class FlushingFinder:
     81     """A dummy finder which flushes sys.path_importer_cache when it gets
     82     called."""
     83 
     84     def find_spec(self, name, path=None, target=None):
     85         sys.path_importer_cache.clear()
     86 
     87 
     88 class ThreadedImportTests(unittest.TestCase):
     89 
     90     def setUp(self):
     91         self.old_random = sys.modules.pop('random', None)
     92 
     93     def tearDown(self):
     94         # If the `random` module was already initialized, we restore the
     95         # old module at the end so that pickling tests don't fail.
     96         # See http://bugs.python.org/issue3657#msg110461
     97         if self.old_random is not None:
     98             sys.modules['random'] = self.old_random
     99 
    100     def check_parallel_module_init(self):
    101         if imp.lock_held():
    102             # This triggers on, e.g., from test import autotest.
    103             raise unittest.SkipTest("can't run when import lock is held")
    104 
    105         done = threading.Event()
    106         for N in (20, 50) * 3:
    107             if verbose:
    108                 print("Trying", N, "threads ...", end=' ')
    109             # Make sure that random and modulefinder get reimported freshly
    110             for modname in ['random', 'modulefinder']:
    111                 try:
    112                     del sys.modules[modname]
    113                 except KeyError:
    114                     pass
    115             errors = []
    116             done_tasks = []
    117             done.clear()
    118             t0 = time.monotonic()
    119             with start_threads(threading.Thread(target=task,
    120                                                 args=(N, done, done_tasks, errors,))
    121                                for i in range(N)):
    122                 pass
    123             completed = done.wait(10 * 60)
    124             dt = time.monotonic() - t0
    125             if verbose:
    126                 print("%.1f ms" % (dt*1e3), flush=True, end=" ")
    127             dbg_info = 'done: %s/%s' % (len(done_tasks), N)
    128             self.assertFalse(errors, dbg_info)
    129             self.assertTrue(completed, dbg_info)
    130             if verbose:
    131                 print("OK.")
    132 
    133     def test_parallel_module_init(self):
    134         self.check_parallel_module_init()
    135 
    136     def test_parallel_meta_path(self):
    137         finder = Finder()
    138         sys.meta_path.insert(0, finder)
    139         try:
    140             self.check_parallel_module_init()
    141             self.assertGreater(finder.numcalls, 0)
    142             self.assertEqual(finder.x, finder.numcalls)
    143         finally:
    144             sys.meta_path.remove(finder)
    145 
    146     def test_parallel_path_hooks(self):
    147         # Here the Finder instance is only used to check concurrent calls
    148         # to path_hook().
    149         finder = Finder()
    150         # In order for our path hook to be called at each import, we need
    151         # to flush the path_importer_cache, which we do by registering a
    152         # dedicated meta_path entry.
    153         flushing_finder = FlushingFinder()
    154         def path_hook(path):
    155             finder.find_spec('')
    156             raise ImportError
    157         sys.path_hooks.insert(0, path_hook)
    158         sys.meta_path.append(flushing_finder)
    159         try:
    160             # Flush the cache a first time
    161             flushing_finder.find_spec('')
    162             numtests = self.check_parallel_module_init()
    163             self.assertGreater(finder.numcalls, 0)
    164             self.assertEqual(finder.x, finder.numcalls)
    165         finally:
    166             sys.meta_path.remove(flushing_finder)
    167             sys.path_hooks.remove(path_hook)
    168 
    169     def test_import_hangers(self):
    170         # In case this test is run again, make sure the helper module
    171         # gets loaded from scratch again.
    172         try:
    173             del sys.modules['test.threaded_import_hangers']
    174         except KeyError:
    175             pass
    176         import test.threaded_import_hangers
    177         self.assertFalse(test.threaded_import_hangers.errors)
    178 
    179     def test_circular_imports(self):
    180         # The goal of this test is to exercise implementations of the import
    181         # lock which use a per-module lock, rather than a global lock.
    182         # In these implementations, there is a possible deadlock with
    183         # circular imports, for example:
    184         # - thread 1 imports A (grabbing the lock for A) which imports B
    185         # - thread 2 imports B (grabbing the lock for B) which imports A
    186         # Such implementations should be able to detect such situations and
    187         # resolve them one way or the other, without freezing.
    188         # NOTE: our test constructs a slightly less trivial import cycle,
    189         # in order to better stress the deadlock avoidance mechanism.
    190         delay = 0.5
    191         os.mkdir(TESTFN)
    192         self.addCleanup(shutil.rmtree, TESTFN)
    193         sys.path.insert(0, TESTFN)
    194         self.addCleanup(sys.path.remove, TESTFN)
    195         for name, contents in circular_imports_modules.items():
    196             contents = contents % {'delay': delay}
    197             with open(os.path.join(TESTFN, name + ".py"), "wb") as f:
    198                 f.write(contents.encode('utf-8'))
    199             self.addCleanup(forget, name)
    200 
    201         importlib.invalidate_caches()
    202         results = []
    203         def import_ab():
    204             import A
    205             results.append(getattr(A, 'x', None))
    206         def import_ba():
    207             import B
    208             results.append(getattr(B, 'x', None))
    209         t1 = threading.Thread(target=import_ab)
    210         t2 = threading.Thread(target=import_ba)
    211         t1.start()
    212         t2.start()
    213         t1.join()
    214         t2.join()
    215         self.assertEqual(set(results), {'a', 'b'})
    216 
    217     def test_side_effect_import(self):
    218         code = """if 1:
    219             import threading
    220             def target():
    221                 import random
    222             t = threading.Thread(target=target)
    223             t.start()
    224             t.join()"""
    225         sys.path.insert(0, os.curdir)
    226         self.addCleanup(sys.path.remove, os.curdir)
    227         filename = TESTFN + ".py"
    228         with open(filename, "wb") as f:
    229             f.write(code.encode('utf-8'))
    230         self.addCleanup(unlink, filename)
    231         self.addCleanup(forget, TESTFN)
    232         self.addCleanup(rmtree, '__pycache__')
    233         importlib.invalidate_caches()
    234         __import__(TESTFN)
    235 
    236 
    237 @reap_threads
    238 def test_main():
    239     old_switchinterval = None
    240     try:
    241         old_switchinterval = sys.getswitchinterval()
    242         sys.setswitchinterval(1e-5)
    243     except AttributeError:
    244         pass
    245     try:
    246         run_unittest(ThreadedImportTests)
    247     finally:
    248         if old_switchinterval is not None:
    249             sys.setswitchinterval(old_switchinterval)
    250 
    251 if __name__ == "__main__":
    252     test_main()
    253