Home | History | Annotate | Download | only in test
      1 """
      2 Tests for the threading module.
      3 """
      4 
      5 import test.support
      6 from test.support import (verbose, import_module, cpython_only,
      7                           requires_type_collecting)
      8 from test.support.script_helper import assert_python_ok, assert_python_failure
      9 
     10 import random
     11 import sys
     12 _thread = import_module('_thread')
     13 threading = import_module('threading')
     14 import time
     15 import unittest
     16 import weakref
     17 import os
     18 import subprocess
     19 
     20 from test import lock_tests
     21 from test import support
     22 
     23 
     24 # Between fork() and exec(), only async-safe functions are allowed (issues
     25 # #12316 and #11870), and fork() from a worker thread is known to trigger
     26 # problems with some operating systems (issue #3863): skip problematic tests
     27 # on platforms known to behave badly.
     28 platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
     29                      'hp-ux11')
     30 
     31 
     32 # A trivial mutable counter.
     33 class Counter(object):
     34     def __init__(self):
     35         self.value = 0
     36     def inc(self):
     37         self.value += 1
     38     def dec(self):
     39         self.value -= 1
     40     def get(self):
     41         return self.value
     42 
     43 class TestThread(threading.Thread):
     44     def __init__(self, name, testcase, sema, mutex, nrunning):
     45         threading.Thread.__init__(self, name=name)
     46         self.testcase = testcase
     47         self.sema = sema
     48         self.mutex = mutex
     49         self.nrunning = nrunning
     50 
     51     def run(self):
     52         delay = random.random() / 10000.0
     53         if verbose:
     54             print('task %s will run for %.1f usec' %
     55                   (self.name, delay * 1e6))
     56 
     57         with self.sema:
     58             with self.mutex:
     59                 self.nrunning.inc()
     60                 if verbose:
     61                     print(self.nrunning.get(), 'tasks are running')
     62                 self.testcase.assertLessEqual(self.nrunning.get(), 3)
     63 
     64             time.sleep(delay)
     65             if verbose:
     66                 print('task', self.name, 'done')
     67 
     68             with self.mutex:
     69                 self.nrunning.dec()
     70                 self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
     71                 if verbose:
     72                     print('%s is finished. %d tasks are running' %
     73                           (self.name, self.nrunning.get()))
     74 
     75 
     76 class BaseTestCase(unittest.TestCase):
     77     def setUp(self):
     78         self._threads = test.support.threading_setup()
     79 
     80     def tearDown(self):
     81         test.support.threading_cleanup(*self._threads)
     82         test.support.reap_children()
     83 
     84 
     85 class ThreadTests(BaseTestCase):
     86 
     87     # Create a bunch of threads, let each do some work, wait until all are
     88     # done.
     89     def test_various_ops(self):
     90         # This takes about n/3 seconds to run (about n/3 clumps of tasks,
     91         # times about 1 second per clump).
     92         NUMTASKS = 10
     93 
     94         # no more than 3 of the 10 can run at once
     95         sema = threading.BoundedSemaphore(value=3)
     96         mutex = threading.RLock()
     97         numrunning = Counter()
     98 
     99         threads = []
    100 
    101         for i in range(NUMTASKS):
    102             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
    103             threads.append(t)
    104             self.assertIsNone(t.ident)
    105             self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
    106             t.start()
    107 
    108         if verbose:
    109             print('waiting for all tasks to complete')
    110         for t in threads:
    111             t.join()
    112             self.assertFalse(t.is_alive())
    113             self.assertNotEqual(t.ident, 0)
    114             self.assertIsNotNone(t.ident)
    115             self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
    116         if verbose:
    117             print('all tasks done')
    118         self.assertEqual(numrunning.get(), 0)
    119 
    120     def test_ident_of_no_threading_threads(self):
    121         # The ident still must work for the main thread and dummy threads.
    122         self.assertIsNotNone(threading.currentThread().ident)
    123         def f():
    124             ident.append(threading.currentThread().ident)
    125             done.set()
    126         done = threading.Event()
    127         ident = []
    128         _thread.start_new_thread(f, ())
    129         done.wait()
    130         self.assertIsNotNone(ident[0])
    131         # Kill the "immortal" _DummyThread
    132         del threading._active[ident[0]]
    133 
    134     # run with a small(ish) thread stack size (256kB)
    135     def test_various_ops_small_stack(self):
    136         if verbose:
    137             print('with 256kB thread stack size...')
    138         try:
    139             threading.stack_size(262144)
    140         except _thread.error:
    141             raise unittest.SkipTest(
    142                 'platform does not support changing thread stack size')
    143         self.test_various_ops()
    144         threading.stack_size(0)
    145 
    146     # run with a large thread stack size (1MB)
    147     def test_various_ops_large_stack(self):
    148         if verbose:
    149             print('with 1MB thread stack size...')
    150         try:
    151             threading.stack_size(0x100000)
    152         except _thread.error:
    153             raise unittest.SkipTest(
    154                 'platform does not support changing thread stack size')
    155         self.test_various_ops()
    156         threading.stack_size(0)
    157 
    158     def test_foreign_thread(self):
    159         # Check that a "foreign" thread can use the threading module.
    160         def f(mutex):
    161             # Calling current_thread() forces an entry for the foreign
    162             # thread to get made in the threading._active map.
    163             threading.current_thread()
    164             mutex.release()
    165 
    166         mutex = threading.Lock()
    167         mutex.acquire()
    168         tid = _thread.start_new_thread(f, (mutex,))
    169         # Wait for the thread to finish.
    170         mutex.acquire()
    171         self.assertIn(tid, threading._active)
    172         self.assertIsInstance(threading._active[tid], threading._DummyThread)
    173         #Issue 29376
    174         self.assertTrue(threading._active[tid].is_alive())
    175         self.assertRegex(repr(threading._active[tid]), '_DummyThread')
    176         del threading._active[tid]
    177 
    178     # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    179     # exposed at the Python level.  This test relies on ctypes to get at it.
    180     def test_PyThreadState_SetAsyncExc(self):
    181         ctypes = import_module("ctypes")
    182 
    183         set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    184 
    185         class AsyncExc(Exception):
    186             pass
    187 
    188         exception = ctypes.py_object(AsyncExc)
    189 
    190         # First check it works when setting the exception from the same thread.
    191         tid = threading.get_ident()
    192 
    193         try:
    194             result = set_async_exc(ctypes.c_long(tid), exception)
    195             # The exception is async, so we might have to keep the VM busy until
    196             # it notices.
    197             while True:
    198                 pass
    199         except AsyncExc:
    200             pass
    201         else:
    202             # This code is unreachable but it reflects the intent. If we wanted
    203             # to be smarter the above loop wouldn't be infinite.
    204             self.fail("AsyncExc not raised")
    205         try:
    206             self.assertEqual(result, 1) # one thread state modified
    207         except UnboundLocalError:
    208             # The exception was raised too quickly for us to get the result.
    209             pass
    210 
    211         # `worker_started` is set by the thread when it's inside a try/except
    212         # block waiting to catch the asynchronously set AsyncExc exception.
    213         # `worker_saw_exception` is set by the thread upon catching that
    214         # exception.
    215         worker_started = threading.Event()
    216         worker_saw_exception = threading.Event()
    217 
    218         class Worker(threading.Thread):
    219             def run(self):
    220                 self.id = threading.get_ident()
    221                 self.finished = False
    222 
    223                 try:
    224                     while True:
    225                         worker_started.set()
    226                         time.sleep(0.1)
    227                 except AsyncExc:
    228                     self.finished = True
    229                     worker_saw_exception.set()
    230 
    231         t = Worker()
    232         t.daemon = True # so if this fails, we don't hang Python at shutdown
    233         t.start()
    234         if verbose:
    235             print("    started worker thread")
    236 
    237         # Try a thread id that doesn't make sense.
    238         if verbose:
    239             print("    trying nonsensical thread id")
    240         result = set_async_exc(ctypes.c_long(-1), exception)
    241         self.assertEqual(result, 0)  # no thread states modified
    242 
    243         # Now raise an exception in the worker thread.
    244         if verbose:
    245             print("    waiting for worker thread to get started")
    246         ret = worker_started.wait()
    247         self.assertTrue(ret)
    248         if verbose:
    249             print("    verifying worker hasn't exited")
    250         self.assertFalse(t.finished)
    251         if verbose:
    252             print("    attempting to raise asynch exception in worker")
    253         result = set_async_exc(ctypes.c_long(t.id), exception)
    254         self.assertEqual(result, 1) # one thread state modified
    255         if verbose:
    256             print("    waiting for worker to say it caught the exception")
    257         worker_saw_exception.wait(timeout=10)
    258         self.assertTrue(t.finished)
    259         if verbose:
    260             print("    all OK -- joining worker")
    261         if t.finished:
    262             t.join()
    263         # else the thread is still running, and we have no way to kill it
    264 
    265     def test_limbo_cleanup(self):
    266         # Issue 7481: Failure to start thread should cleanup the limbo map.
    267         def fail_new_thread(*args):
    268             raise threading.ThreadError()
    269         _start_new_thread = threading._start_new_thread
    270         threading._start_new_thread = fail_new_thread
    271         try:
    272             t = threading.Thread(target=lambda: None)
    273             self.assertRaises(threading.ThreadError, t.start)
    274             self.assertFalse(
    275                 t in threading._limbo,
    276                 "Failed to cleanup _limbo map on failure of Thread.start().")
    277         finally:
    278             threading._start_new_thread = _start_new_thread
    279 
    280     def test_finalize_runnning_thread(self):
    281         # Issue 1402: the PyGILState_Ensure / _Release functions may be called
    282         # very late on python exit: on deallocation of a running thread for
    283         # example.
    284         import_module("ctypes")
    285 
    286         rc, out, err = assert_python_failure("-c", """if 1:
    287             import ctypes, sys, time, _thread
    288 
    289             # This lock is used as a simple event variable.
    290             ready = _thread.allocate_lock()
    291             ready.acquire()
    292 
    293             # Module globals are cleared before __del__ is run
    294             # So we save the functions in class dict
    295             class C:
    296                 ensure = ctypes.pythonapi.PyGILState_Ensure
    297                 release = ctypes.pythonapi.PyGILState_Release
    298                 def __del__(self):
    299                     state = self.ensure()
    300                     self.release(state)
    301 
    302             def waitingThread():
    303                 x = C()
    304                 ready.release()
    305                 time.sleep(100)
    306 
    307             _thread.start_new_thread(waitingThread, ())
    308             ready.acquire()  # Be sure the other thread is waiting.
    309             sys.exit(42)
    310             """)
    311         self.assertEqual(rc, 42)
    312 
    313     def test_finalize_with_trace(self):
    314         # Issue1733757
    315         # Avoid a deadlock when sys.settrace steps into threading._shutdown
    316         assert_python_ok("-c", """if 1:
    317             import sys, threading
    318 
    319             # A deadlock-killer, to prevent the
    320             # testsuite to hang forever
    321             def killer():
    322                 import os, time
    323                 time.sleep(2)
    324                 print('program blocked; aborting')
    325                 os._exit(2)
    326             t = threading.Thread(target=killer)
    327             t.daemon = True
    328             t.start()
    329 
    330             # This is the trace function
    331             def func(frame, event, arg):
    332                 threading.current_thread()
    333                 return func
    334 
    335             sys.settrace(func)
    336             """)
    337 
    338     def test_join_nondaemon_on_shutdown(self):
    339         # Issue 1722344
    340         # Raising SystemExit skipped threading._shutdown
    341         rc, out, err = assert_python_ok("-c", """if 1:
    342                 import threading
    343                 from time import sleep
    344 
    345                 def child():
    346                     sleep(1)
    347                     # As a non-daemon thread we SHOULD wake up and nothing
    348                     # should be torn down yet
    349                     print("Woke up, sleep function is:", sleep)
    350 
    351                 threading.Thread(target=child).start()
    352                 raise SystemExit
    353             """)
    354         self.assertEqual(out.strip(),
    355             b"Woke up, sleep function is: <built-in function sleep>")
    356         self.assertEqual(err, b"")
    357 
    358     def test_enumerate_after_join(self):
    359         # Try hard to trigger #1703448: a thread is still returned in
    360         # threading.enumerate() after it has been join()ed.
    361         enum = threading.enumerate
    362         old_interval = sys.getswitchinterval()
    363         try:
    364             for i in range(1, 100):
    365                 sys.setswitchinterval(i * 0.0002)
    366                 t = threading.Thread(target=lambda: None)
    367                 t.start()
    368                 t.join()
    369                 l = enum()
    370                 self.assertNotIn(t, l,
    371                     "#1703448 triggered after %d trials: %s" % (i, l))
    372         finally:
    373             sys.setswitchinterval(old_interval)
    374 
    375     def test_no_refcycle_through_target(self):
    376         class RunSelfFunction(object):
    377             def __init__(self, should_raise):
    378                 # The links in this refcycle from Thread back to self
    379                 # should be cleaned up when the thread completes.
    380                 self.should_raise = should_raise
    381                 self.thread = threading.Thread(target=self._run,
    382                                                args=(self,),
    383                                                kwargs={'yet_another':self})
    384                 self.thread.start()
    385 
    386             def _run(self, other_ref, yet_another):
    387                 if self.should_raise:
    388                     raise SystemExit
    389 
    390         cyclic_object = RunSelfFunction(should_raise=False)
    391         weak_cyclic_object = weakref.ref(cyclic_object)
    392         cyclic_object.thread.join()
    393         del cyclic_object
    394         self.assertIsNone(weak_cyclic_object(),
    395                          msg=('%d references still around' %
    396                               sys.getrefcount(weak_cyclic_object())))
    397 
    398         raising_cyclic_object = RunSelfFunction(should_raise=True)
    399         weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
    400         raising_cyclic_object.thread.join()
    401         del raising_cyclic_object
    402         self.assertIsNone(weak_raising_cyclic_object(),
    403                          msg=('%d references still around' %
    404                               sys.getrefcount(weak_raising_cyclic_object())))
    405 
    406     def test_old_threading_api(self):
    407         # Just a quick sanity check to make sure the old method names are
    408         # still present
    409         t = threading.Thread()
    410         t.isDaemon()
    411         t.setDaemon(True)
    412         t.getName()
    413         t.setName("name")
    414         t.isAlive()
    415         e = threading.Event()
    416         e.isSet()
    417         threading.activeCount()
    418 
    419     def test_repr_daemon(self):
    420         t = threading.Thread()
    421         self.assertNotIn('daemon', repr(t))
    422         t.daemon = True
    423         self.assertIn('daemon', repr(t))
    424 
    425     def test_deamon_param(self):
    426         t = threading.Thread()
    427         self.assertFalse(t.daemon)
    428         t = threading.Thread(daemon=False)
    429         self.assertFalse(t.daemon)
    430         t = threading.Thread(daemon=True)
    431         self.assertTrue(t.daemon)
    432 
    433     @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
    434     def test_dummy_thread_after_fork(self):
    435         # Issue #14308: a dummy thread in the active list doesn't mess up
    436         # the after-fork mechanism.
    437         code = """if 1:
    438             import _thread, threading, os, time
    439 
    440             def background_thread(evt):
    441                 # Creates and registers the _DummyThread instance
    442                 threading.current_thread()
    443                 evt.set()
    444                 time.sleep(10)
    445 
    446             evt = threading.Event()
    447             _thread.start_new_thread(background_thread, (evt,))
    448             evt.wait()
    449             assert threading.active_count() == 2, threading.active_count()
    450             if os.fork() == 0:
    451                 assert threading.active_count() == 1, threading.active_count()
    452                 os._exit(0)
    453             else:
    454                 os.wait()
    455         """
    456         _, out, err = assert_python_ok("-c", code)
    457         self.assertEqual(out, b'')
    458         self.assertEqual(err, b'')
    459 
    460     @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    461     def test_is_alive_after_fork(self):
    462         # Try hard to trigger #18418: is_alive() could sometimes be True on
    463         # threads that vanished after a fork.
    464         old_interval = sys.getswitchinterval()
    465         self.addCleanup(sys.setswitchinterval, old_interval)
    466 
    467         # Make the bug more likely to manifest.
    468         test.support.setswitchinterval(1e-6)
    469 
    470         for i in range(20):
    471             t = threading.Thread(target=lambda: None)
    472             t.start()
    473             self.addCleanup(t.join)
    474             pid = os.fork()
    475             if pid == 0:
    476                 os._exit(1 if t.is_alive() else 0)
    477             else:
    478                 pid, status = os.waitpid(pid, 0)
    479                 self.assertEqual(0, status)
    480 
    481     def test_main_thread(self):
    482         main = threading.main_thread()
    483         self.assertEqual(main.name, 'MainThread')
    484         self.assertEqual(main.ident, threading.current_thread().ident)
    485         self.assertEqual(main.ident, threading.get_ident())
    486 
    487         def f():
    488             self.assertNotEqual(threading.main_thread().ident,
    489                                 threading.current_thread().ident)
    490         th = threading.Thread(target=f)
    491         th.start()
    492         th.join()
    493 
    494     @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    495     @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
    496     def test_main_thread_after_fork(self):
    497         code = """if 1:
    498             import os, threading
    499 
    500             pid = os.fork()
    501             if pid == 0:
    502                 main = threading.main_thread()
    503                 print(main.name)
    504                 print(main.ident == threading.current_thread().ident)
    505                 print(main.ident == threading.get_ident())
    506             else:
    507                 os.waitpid(pid, 0)
    508         """
    509         _, out, err = assert_python_ok("-c", code)
    510         data = out.decode().replace('\r', '')
    511         self.assertEqual(err, b"")
    512         self.assertEqual(data, "MainThread\nTrue\nTrue\n")
    513 
    514     @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    515     @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    516     @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
    517     def test_main_thread_after_fork_from_nonmain_thread(self):
    518         code = """if 1:
    519             import os, threading, sys
    520 
    521             def f():
    522                 pid = os.fork()
    523                 if pid == 0:
    524                     main = threading.main_thread()
    525                     print(main.name)
    526                     print(main.ident == threading.current_thread().ident)
    527                     print(main.ident == threading.get_ident())
    528                     # stdout is fully buffered because not a tty,
    529                     # we have to flush before exit.
    530                     sys.stdout.flush()
    531                 else:
    532                     os.waitpid(pid, 0)
    533 
    534             th = threading.Thread(target=f)
    535             th.start()
    536             th.join()
    537         """
    538         _, out, err = assert_python_ok("-c", code)
    539         data = out.decode().replace('\r', '')
    540         self.assertEqual(err, b"")
    541         self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
    542 
    543     def test_tstate_lock(self):
    544         # Test an implementation detail of Thread objects.
    545         started = _thread.allocate_lock()
    546         finish = _thread.allocate_lock()
    547         started.acquire()
    548         finish.acquire()
    549         def f():
    550             started.release()
    551             finish.acquire()
    552             time.sleep(0.01)
    553         # The tstate lock is None until the thread is started
    554         t = threading.Thread(target=f)
    555         self.assertIs(t._tstate_lock, None)
    556         t.start()
    557         started.acquire()
    558         self.assertTrue(t.is_alive())
    559         # The tstate lock can't be acquired when the thread is running
    560         # (or suspended).
    561         tstate_lock = t._tstate_lock
    562         self.assertFalse(tstate_lock.acquire(timeout=0), False)
    563         finish.release()
    564         # When the thread ends, the state_lock can be successfully
    565         # acquired.
    566         self.assertTrue(tstate_lock.acquire(timeout=5), False)
    567         # But is_alive() is still True:  we hold _tstate_lock now, which
    568         # prevents is_alive() from knowing the thread's end-of-life C code
    569         # is done.
    570         self.assertTrue(t.is_alive())
    571         # Let is_alive() find out the C code is done.
    572         tstate_lock.release()
    573         self.assertFalse(t.is_alive())
    574         # And verify the thread disposed of _tstate_lock.
    575         self.assertIsNone(t._tstate_lock)
    576 
    577     def test_repr_stopped(self):
    578         # Verify that "stopped" shows up in repr(Thread) appropriately.
    579         started = _thread.allocate_lock()
    580         finish = _thread.allocate_lock()
    581         started.acquire()
    582         finish.acquire()
    583         def f():
    584             started.release()
    585             finish.acquire()
    586         t = threading.Thread(target=f)
    587         t.start()
    588         started.acquire()
    589         self.assertIn("started", repr(t))
    590         finish.release()
    591         # "stopped" should appear in the repr in a reasonable amount of time.
    592         # Implementation detail:  as of this writing, that's trivially true
    593         # if .join() is called, and almost trivially true if .is_alive() is
    594         # called.  The detail we're testing here is that "stopped" shows up
    595         # "all on its own".
    596         LOOKING_FOR = "stopped"
    597         for i in range(500):
    598             if LOOKING_FOR in repr(t):
    599                 break
    600             time.sleep(0.01)
    601         self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
    602 
    603     def test_BoundedSemaphore_limit(self):
    604         # BoundedSemaphore should raise ValueError if released too often.
    605         for limit in range(1, 10):
    606             bs = threading.BoundedSemaphore(limit)
    607             threads = [threading.Thread(target=bs.acquire)
    608                        for _ in range(limit)]
    609             for t in threads:
    610                 t.start()
    611             for t in threads:
    612                 t.join()
    613             threads = [threading.Thread(target=bs.release)
    614                        for _ in range(limit)]
    615             for t in threads:
    616                 t.start()
    617             for t in threads:
    618                 t.join()
    619             self.assertRaises(ValueError, bs.release)
    620 
    621     @cpython_only
    622     def test_frame_tstate_tracing(self):
    623         # Issue #14432: Crash when a generator is created in a C thread that is
    624         # destroyed while the generator is still used. The issue was that a
    625         # generator contains a frame, and the frame kept a reference to the
    626         # Python state of the destroyed C thread. The crash occurs when a trace
    627         # function is setup.
    628 
    629         def noop_trace(frame, event, arg):
    630             # no operation
    631             return noop_trace
    632 
    633         def generator():
    634             while 1:
    635                 yield "generator"
    636 
    637         def callback():
    638             if callback.gen is None:
    639                 callback.gen = generator()
    640             return next(callback.gen)
    641         callback.gen = None
    642 
    643         old_trace = sys.gettrace()
    644         sys.settrace(noop_trace)
    645         try:
    646             # Install a trace function
    647             threading.settrace(noop_trace)
    648 
    649             # Create a generator in a C thread which exits after the call
    650             import _testcapi
    651             _testcapi.call_in_temporary_c_thread(callback)
    652 
    653             # Call the generator in a different Python thread, check that the
    654             # generator didn't keep a reference to the destroyed thread state
    655             for test in range(3):
    656                 # The trace function is still called here
    657                 callback()
    658         finally:
    659             sys.settrace(old_trace)
    660 
    661 
    662 class ThreadJoinOnShutdown(BaseTestCase):
    663 
    664     def _run_and_join(self, script):
    665         script = """if 1:
    666             import sys, os, time, threading
    667 
    668             # a thread, which waits for the main program to terminate
    669             def joiningfunc(mainthread):
    670                 mainthread.join()
    671                 print('end of thread')
    672                 # stdout is fully buffered because not a tty, we have to flush
    673                 # before exit.
    674                 sys.stdout.flush()
    675         \n""" + script
    676 
    677         rc, out, err = assert_python_ok("-c", script)
    678         data = out.decode().replace('\r', '')
    679         self.assertEqual(data, "end of main\nend of thread\n")
    680 
    681     def test_1_join_on_shutdown(self):
    682         # The usual case: on exit, wait for a non-daemon thread
    683         script = """if 1:
    684             import os
    685             t = threading.Thread(target=joiningfunc,
    686                                  args=(threading.current_thread(),))
    687             t.start()
    688             time.sleep(0.1)
    689             print('end of main')
    690             """
    691         self._run_and_join(script)
    692 
    693     @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    694     @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    695     def test_2_join_in_forked_process(self):
    696         # Like the test above, but from a forked interpreter
    697         script = """if 1:
    698             childpid = os.fork()
    699             if childpid != 0:
    700                 os.waitpid(childpid, 0)
    701                 sys.exit(0)
    702 
    703             t = threading.Thread(target=joiningfunc,
    704                                  args=(threading.current_thread(),))
    705             t.start()
    706             print('end of main')
    707             """
    708         self._run_and_join(script)
    709 
    710     @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    711     @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    712     def test_3_join_in_forked_from_thread(self):
    713         # Like the test above, but fork() was called from a worker thread
    714         # In the forked process, the main Thread object must be marked as stopped.
    715 
    716         script = """if 1:
    717             main_thread = threading.current_thread()
    718             def worker():
    719                 childpid = os.fork()
    720                 if childpid != 0:
    721                     os.waitpid(childpid, 0)
    722                     sys.exit(0)
    723 
    724                 t = threading.Thread(target=joiningfunc,
    725                                      args=(main_thread,))
    726                 print('end of main')
    727                 t.start()
    728                 t.join() # Should not block: main_thread is already stopped
    729 
    730             w = threading.Thread(target=worker)
    731             w.start()
    732             """
    733         self._run_and_join(script)
    734 
    735     @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    736     def test_4_daemon_threads(self):
    737         # Check that a daemon thread cannot crash the interpreter on shutdown
    738         # by manipulating internal structures that are being disposed of in
    739         # the main thread.
    740         script = """if True:
    741             import os
    742             import random
    743             import sys
    744             import time
    745             import threading
    746 
    747             thread_has_run = set()
    748 
    749             def random_io():
    750                 '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
    751                 while True:
    752                     in_f = open(os.__file__, 'rb')
    753                     stuff = in_f.read(200)
    754                     null_f = open(os.devnull, 'wb')
    755                     null_f.write(stuff)
    756                     time.sleep(random.random() / 1995)
    757                     null_f.close()
    758                     in_f.close()
    759                     thread_has_run.add(threading.current_thread())
    760 
    761             def main():
    762                 count = 0
    763                 for _ in range(40):
    764                     new_thread = threading.Thread(target=random_io)
    765                     new_thread.daemon = True
    766                     new_thread.start()
    767                     count += 1
    768                 while len(thread_has_run) < count:
    769                     time.sleep(0.001)
    770                 # Trigger process shutdown
    771                 sys.exit(0)
    772 
    773             main()
    774             """
    775         rc, out, err = assert_python_ok('-c', script)
    776         self.assertFalse(err)
    777 
    778     @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    779     @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    780     def test_reinit_tls_after_fork(self):
    781         # Issue #13817: fork() would deadlock in a multithreaded program with
    782         # the ad-hoc TLS implementation.
    783 
    784         def do_fork_and_wait():
    785             # just fork a child process and wait it
    786             pid = os.fork()
    787             if pid > 0:
    788                 os.waitpid(pid, 0)
    789             else:
    790                 os._exit(0)
    791 
    792         # start a bunch of threads that will fork() child processes
    793         threads = []
    794         for i in range(16):
    795             t = threading.Thread(target=do_fork_and_wait)
    796             threads.append(t)
    797             t.start()
    798 
    799         for t in threads:
    800             t.join()
    801 
    802     @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    803     def test_clear_threads_states_after_fork(self):
    804         # Issue #17094: check that threads states are cleared after fork()
    805 
    806         # start a bunch of threads
    807         threads = []
    808         for i in range(16):
    809             t = threading.Thread(target=lambda : time.sleep(0.3))
    810             threads.append(t)
    811             t.start()
    812 
    813         pid = os.fork()
    814         if pid == 0:
    815             # check that threads states have been cleared
    816             if len(sys._current_frames()) == 1:
    817                 os._exit(0)
    818             else:
    819                 os._exit(1)
    820         else:
    821             _, status = os.waitpid(pid, 0)
    822             self.assertEqual(0, status)
    823 
    824         for t in threads:
    825             t.join()
    826 
    827 
    828 class SubinterpThreadingTests(BaseTestCase):
    829 
    830     def test_threads_join(self):
    831         # Non-daemon threads should be joined at subinterpreter shutdown
    832         # (issue #18808)
    833         r, w = os.pipe()
    834         self.addCleanup(os.close, r)
    835         self.addCleanup(os.close, w)
    836         code = r"""if 1:
    837             import os
    838             import threading
    839             import time
    840 
    841             def f():
    842                 # Sleep a bit so that the thread is still running when
    843                 # Py_EndInterpreter is called.
    844                 time.sleep(0.05)
    845                 os.write(%d, b"x")
    846             threading.Thread(target=f).start()
    847             """ % (w,)
    848         ret = test.support.run_in_subinterp(code)
    849         self.assertEqual(ret, 0)
    850         # The thread was joined properly.
    851         self.assertEqual(os.read(r, 1), b"x")
    852 
    853     def test_threads_join_2(self):
    854         # Same as above, but a delay gets introduced after the thread's
    855         # Python code returned but before the thread state is deleted.
    856         # To achieve this, we register a thread-local object which sleeps
    857         # a bit when deallocated.
    858         r, w = os.pipe()
    859         self.addCleanup(os.close, r)
    860         self.addCleanup(os.close, w)
    861         code = r"""if 1:
    862             import os
    863             import threading
    864             import time
    865 
    866             class Sleeper:
    867                 def __del__(self):
    868                     time.sleep(0.05)
    869 
    870             tls = threading.local()
    871 
    872             def f():
    873                 # Sleep a bit so that the thread is still running when
    874                 # Py_EndInterpreter is called.
    875                 time.sleep(0.05)
    876                 tls.x = Sleeper()
    877                 os.write(%d, b"x")
    878             threading.Thread(target=f).start()
    879             """ % (w,)
    880         ret = test.support.run_in_subinterp(code)
    881         self.assertEqual(ret, 0)
    882         # The thread was joined properly.
    883         self.assertEqual(os.read(r, 1), b"x")
    884 
    885     @cpython_only
    886     def test_daemon_threads_fatal_error(self):
    887         subinterp_code = r"""if 1:
    888             import os
    889             import threading
    890             import time
    891 
    892             def f():
    893                 # Make sure the daemon thread is still running when
    894                 # Py_EndInterpreter is called.
    895                 time.sleep(10)
    896             threading.Thread(target=f, daemon=True).start()
    897             """
    898         script = r"""if 1:
    899             import _testcapi
    900 
    901             _testcapi.run_in_subinterp(%r)
    902             """ % (subinterp_code,)
    903         with test.support.SuppressCrashReport():
    904             rc, out, err = assert_python_failure("-c", script)
    905         self.assertIn("Fatal Python error: Py_EndInterpreter: "
    906                       "not the last thread", err.decode())
    907 
    908 
    909 class ThreadingExceptionTests(BaseTestCase):
    910     # A RuntimeError should be raised if Thread.start() is called
    911     # multiple times.
    912     def test_start_thread_again(self):
    913         thread = threading.Thread()
    914         thread.start()
    915         self.assertRaises(RuntimeError, thread.start)
    916 
    917     def test_joining_current_thread(self):
    918         current_thread = threading.current_thread()
    919         self.assertRaises(RuntimeError, current_thread.join);
    920 
    921     def test_joining_inactive_thread(self):
    922         thread = threading.Thread()
    923         self.assertRaises(RuntimeError, thread.join)
    924 
    925     def test_daemonize_active_thread(self):
    926         thread = threading.Thread()
    927         thread.start()
    928         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
    929 
    930     def test_releasing_unacquired_lock(self):
    931         lock = threading.Lock()
    932         self.assertRaises(RuntimeError, lock.release)
    933 
    934     @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(),
    935                          'test macosx problem')
    936     def test_recursion_limit(self):
    937         # Issue 9670
    938         # test that excessive recursion within a non-main thread causes
    939         # an exception rather than crashing the interpreter on platforms
    940         # like Mac OS X or FreeBSD which have small default stack sizes
    941         # for threads
    942         script = """if True:
    943             import threading
    944 
    945             def recurse():
    946                 return recurse()
    947 
    948             def outer():
    949                 try:
    950                     recurse()
    951                 except RecursionError:
    952                     pass
    953 
    954             w = threading.Thread(target=outer)
    955             w.start()
    956             w.join()
    957             print('end of main thread')
    958             """
    959         expected_output = "end of main thread\n"
    960         p = subprocess.Popen([sys.executable, "-c", script],
    961                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    962         stdout, stderr = p.communicate()
    963         data = stdout.decode().replace('\r', '')
    964         self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
    965         self.assertEqual(data, expected_output)
    966 
    967     def test_print_exception(self):
    968         script = r"""if True:
    969             import threading
    970             import time
    971 
    972             running = False
    973             def run():
    974                 global running
    975                 running = True
    976                 while running:
    977                     time.sleep(0.01)
    978                 1/0
    979             t = threading.Thread(target=run)
    980             t.start()
    981             while not running:
    982                 time.sleep(0.01)
    983             running = False
    984             t.join()
    985             """
    986         rc, out, err = assert_python_ok("-c", script)
    987         self.assertEqual(out, b'')
    988         err = err.decode()
    989         self.assertIn("Exception in thread", err)
    990         self.assertIn("Traceback (most recent call last):", err)
    991         self.assertIn("ZeroDivisionError", err)
    992         self.assertNotIn("Unhandled exception", err)
    993 
    994     @requires_type_collecting
    995     def test_print_exception_stderr_is_none_1(self):
    996         script = r"""if True:
    997             import sys
    998             import threading
    999             import time
   1000 
   1001             running = False
   1002             def run():
   1003                 global running
   1004                 running = True
   1005                 while running:
   1006                     time.sleep(0.01)
   1007                 1/0
   1008             t = threading.Thread(target=run)
   1009             t.start()
   1010             while not running:
   1011                 time.sleep(0.01)
   1012             sys.stderr = None
   1013             running = False
   1014             t.join()
   1015             """
   1016         rc, out, err = assert_python_ok("-c", script)
   1017         self.assertEqual(out, b'')
   1018         err = err.decode()
   1019         self.assertIn("Exception in thread", err)
   1020         self.assertIn("Traceback (most recent call last):", err)
   1021         self.assertIn("ZeroDivisionError", err)
   1022         self.assertNotIn("Unhandled exception", err)
   1023 
   1024     def test_print_exception_stderr_is_none_2(self):
   1025         script = r"""if True:
   1026             import sys
   1027             import threading
   1028             import time
   1029 
   1030             running = False
   1031             def run():
   1032                 global running
   1033                 running = True
   1034                 while running:
   1035                     time.sleep(0.01)
   1036                 1/0
   1037             sys.stderr = None
   1038             t = threading.Thread(target=run)
   1039             t.start()
   1040             while not running:
   1041                 time.sleep(0.01)
   1042             running = False
   1043             t.join()
   1044             """
   1045         rc, out, err = assert_python_ok("-c", script)
   1046         self.assertEqual(out, b'')
   1047         self.assertNotIn("Unhandled exception", err.decode())
   1048 
   1049     def test_bare_raise_in_brand_new_thread(self):
   1050         def bare_raise():
   1051             raise
   1052 
   1053         class Issue27558(threading.Thread):
   1054             exc = None
   1055 
   1056             def run(self):
   1057                 try:
   1058                     bare_raise()
   1059                 except Exception as exc:
   1060                     self.exc = exc
   1061 
   1062         thread = Issue27558()
   1063         thread.start()
   1064         thread.join()
   1065         self.assertIsNotNone(thread.exc)
   1066         self.assertIsInstance(thread.exc, RuntimeError)
   1067 
   1068 class TimerTests(BaseTestCase):
   1069 
   1070     def setUp(self):
   1071         BaseTestCase.setUp(self)
   1072         self.callback_args = []
   1073         self.callback_event = threading.Event()
   1074 
   1075     def test_init_immutable_default_args(self):
   1076         # Issue 17435: constructor defaults were mutable objects, they could be
   1077         # mutated via the object attributes and affect other Timer objects.
   1078         timer1 = threading.Timer(0.01, self._callback_spy)
   1079         timer1.start()
   1080         self.callback_event.wait()
   1081         timer1.args.append("blah")
   1082         timer1.kwargs["foo"] = "bar"
   1083         self.callback_event.clear()
   1084         timer2 = threading.Timer(0.01, self._callback_spy)
   1085         timer2.start()
   1086         self.callback_event.wait()
   1087         self.assertEqual(len(self.callback_args), 2)
   1088         self.assertEqual(self.callback_args, [((), {}), ((), {})])
   1089 
   1090     def _callback_spy(self, *args, **kwargs):
   1091         self.callback_args.append((args[:], kwargs.copy()))
   1092         self.callback_event.set()
   1093 
   1094 class LockTests(lock_tests.LockTests):
   1095     locktype = staticmethod(threading.Lock)
   1096 
   1097 class PyRLockTests(lock_tests.RLockTests):
   1098     locktype = staticmethod(threading._PyRLock)
   1099 
   1100 @unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
   1101 class CRLockTests(lock_tests.RLockTests):
   1102     locktype = staticmethod(threading._CRLock)
   1103 
   1104 class EventTests(lock_tests.EventTests):
   1105     eventtype = staticmethod(threading.Event)
   1106 
   1107 class ConditionAsRLockTests(lock_tests.RLockTests):
   1108     # Condition uses an RLock by default and exports its API.
   1109     locktype = staticmethod(threading.Condition)
   1110 
   1111 class ConditionTests(lock_tests.ConditionTests):
   1112     condtype = staticmethod(threading.Condition)
   1113 
   1114 class SemaphoreTests(lock_tests.SemaphoreTests):
   1115     semtype = staticmethod(threading.Semaphore)
   1116 
   1117 class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
   1118     semtype = staticmethod(threading.BoundedSemaphore)
   1119 
   1120 class BarrierTests(lock_tests.BarrierTests):
   1121     barriertype = staticmethod(threading.Barrier)
   1122 
   1123 class MiscTestCase(unittest.TestCase):
   1124     def test__all__(self):
   1125         extra = {"ThreadError"}
   1126         blacklist = {'currentThread', 'activeCount'}
   1127         support.check__all__(self, threading, ('threading', '_thread'),
   1128                              extra=extra, blacklist=blacklist)
   1129 
   1130 if __name__ == "__main__":
   1131     unittest.main()
   1132