# Very rudimentary test of threading module
      2 
      3 import test.test_support
      4 from test.test_support import verbose
      5 from test.script_helper import assert_python_ok
      6 
      7 import random
      8 import re
      9 import sys
     10 thread = test.test_support.import_module('thread')
     11 threading = test.test_support.import_module('threading')
     12 import time
     13 import unittest
     14 import weakref
     15 import os
     16 import subprocess
     17 
     18 from test import lock_tests
     19 
     20 # A trivial mutable counter.
     21 class Counter(object):
     22     def __init__(self):
     23         self.value = 0
     24     def inc(self):
     25         self.value += 1
     26     def dec(self):
     27         self.value -= 1
     28     def get(self):
     29         return self.value
     30 
     31 class TestThread(threading.Thread):
     32     def __init__(self, name, testcase, sema, mutex, nrunning):
     33         threading.Thread.__init__(self, name=name)
     34         self.testcase = testcase
     35         self.sema = sema
     36         self.mutex = mutex
     37         self.nrunning = nrunning
     38 
     39     def run(self):
     40         delay = random.random() / 10000.0
     41         if verbose:
     42             print 'task %s will run for %.1f usec' % (
     43                 self.name, delay * 1e6)
     44 
     45         with self.sema:
     46             with self.mutex:
     47                 self.nrunning.inc()
     48                 if verbose:
     49                     print self.nrunning.get(), 'tasks are running'
     50                 self.testcase.assertTrue(self.nrunning.get() <= 3)
     51 
     52             time.sleep(delay)
     53             if verbose:
     54                 print 'task', self.name, 'done'
     55 
     56             with self.mutex:
     57                 self.nrunning.dec()
     58                 self.testcase.assertTrue(self.nrunning.get() >= 0)
     59                 if verbose:
     60                     print '%s is finished. %d tasks are running' % (
     61                         self.name, self.nrunning.get())
     62 
     63 class BaseTestCase(unittest.TestCase):
     64     def setUp(self):
     65         self._threads = test.test_support.threading_setup()
     66 
     67     def tearDown(self):
     68         test.test_support.threading_cleanup(*self._threads)
     69         test.test_support.reap_children()
     70 
     71 
     72 class ThreadTests(BaseTestCase):
     73 
     74     # Create a bunch of threads, let each do some work, wait until all are
     75     # done.
     76     def test_various_ops(self):
     77         # This takes about n/3 seconds to run (about n/3 clumps of tasks,
     78         # times about 1 second per clump).
     79         NUMTASKS = 10
     80 
     81         # no more than 3 of the 10 can run at once
     82         sema = threading.BoundedSemaphore(value=3)
     83         mutex = threading.RLock()
     84         numrunning = Counter()
     85 
     86         threads = []
     87 
     88         for i in range(NUMTASKS):
     89             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
     90             threads.append(t)
     91             self.assertEqual(t.ident, None)
     92             self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
     93             t.start()
     94 
     95         if verbose:
     96             print 'waiting for all tasks to complete'
     97         for t in threads:
     98             t.join(NUMTASKS)
     99             self.assertTrue(not t.is_alive())
    100             self.assertNotEqual(t.ident, 0)
    101             self.assertFalse(t.ident is None)
    102             self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
    103         if verbose:
    104             print 'all tasks done'
    105         self.assertEqual(numrunning.get(), 0)
    106 
    def test_ident_of_no_threading_threads(self):
        # The main thread and dummy threads (threads not created through
        # the threading module) must still report an ident.
        self.assertFalse(threading.currentThread().ident is None)

        finished = threading.Event()
        seen = []

        def record_ident():
            seen.append(threading.currentThread().ident)
            finished.set()

        thread.start_new_thread(record_ident, ())
        finished.wait()
        recorded = seen[0]
        self.assertFalse(recorded is None)
        # Drop the "immortal" _DummyThread entry from threading._active.
        del threading._active[recorded]
    120 
    121     # run with a small(ish) thread stack size (256kB)
    122     def test_various_ops_small_stack(self):
    123         if verbose:
    124             print 'with 256kB thread stack size...'
    125         try:
    126             threading.stack_size(262144)
    127         except thread.error:
    128             if verbose:
    129                 print 'platform does not support changing thread stack size'
    130             return
    131         self.test_various_ops()
    132         threading.stack_size(0)
    133 
    134     # run with a large thread stack size (1MB)
    135     def test_various_ops_large_stack(self):
    136         if verbose:
    137             print 'with 1MB thread stack size...'
    138         try:
    139             threading.stack_size(0x100000)
    140         except thread.error:
    141             if verbose:
    142                 print 'platform does not support changing thread stack size'
    143             return
    144         self.test_various_ops()
    145         threading.stack_size(0)
    146 
    def test_foreign_thread(self):
        # A "foreign" thread (one started through the low-level thread
        # module) must be able to use the threading module.
        def touch_threading(gate):
            # Asking for the current thread forces an entry for this
            # foreign thread to be made in the threading._active map.
            threading.current_thread()
            gate.release()

        gate = threading.Lock()
        gate.acquire()
        tid = thread.start_new_thread(touch_threading, (gate,))
        # Block until the foreign thread has released the lock.
        gate.acquire()
        active = threading._active
        self.assertIn(tid, active)
        self.assertIsInstance(active[tid], threading._DummyThread)
        del active[tid]
    163 
    164     # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    165     # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
        # Exercise PyThreadState_SetAsyncExc() through ctypes in three
        # steps: against the calling thread, against a nonsensical thread
        # id, and finally against a separate worker thread.
        try:
            import ctypes
        except ImportError:
            if verbose:
                print "test_PyThreadState_SetAsyncExc can't import ctypes"
            return  # can't do anything

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        # Private exception type, so nothing else can raise it by accident.
        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = thread.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1) # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                # `id` and `finished` are read by the main thread below.
                self.id = thread.get_ident()
                self.finished = False

                try:
                    # Loop until the asynchronous exception arrives.
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print "    started worker thread"

        # Try a thread id that doesn't make sense.
        if verbose:
            print "    trying nonsensical thread id"
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print "    waiting for worker thread to get started"
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print "    verifying worker hasn't exited"
        self.assertTrue(not t.finished)
        if verbose:
            print "    attempting to raise asynch exception in worker"
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1) # one thread state modified
        if verbose:
            print "    waiting for worker to say it caught the exception"
        # Bounded wait: give up after 10 seconds rather than hang the suite.
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print "    all OK -- joining worker"
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it
    255 
    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        # Temporarily replace the low-level thread starter with one that
        # always fails.
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertNotIn(
                t, threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            # Always put the real thread starter back.
            threading._start_new_thread = _start_new_thread
    270 
    def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.

        # The child script needs ctypes; import_module() reports a skip when
        # it is unavailable instead of letting the test pass silently.
        test.test_support.import_module('ctypes')

        rc = subprocess.call([sys.executable, "-c", """if 1:
            import ctypes, sys, time, thread

            # This lock is used as a simple event variable.
            ready = thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """])
        # 42 is the exit status the child script requests via sys.exit().
        self.assertEqual(rc, 42)
    308 
    def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        rc = p.returncode
        # Exit status 2 is what the child's killer thread uses when the
        # interpreter failed to shut down within two seconds.
        self.assertNotEqual(rc, 2, "interpreter was blocked")
        self.assertEqual(rc, 0, "Unexpected error: " + repr(stderr))
    342 
    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print "Woke up, sleep function is:", sleep

                threading.Thread(target=child).start()
                raise SystemExit
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        self.assertEqual(stdout.strip(),
            "Woke up, sleep function is: <built-in function sleep>")
        # Strip any "[NNN refs]" line from stderr.  The flag must be passed
        # by keyword: re.sub's fourth positional parameter is `count`, so
        # passing re.MULTILINE positionally does not enable multiline mode.
        stderr = re.sub(r"^\[\d+ refs\]", "", stderr,
                        flags=re.MULTILINE).strip()
        self.assertEqual(stderr, "")
    368 
    def test_enumerate_after_join(self):
        # Try hard to trigger #1703448: a thread is still returned in
        # threading.enumerate() after it has been join()ed.
        saved_interval = sys.getcheckinterval()
        try:
            for trial in xrange(1, 100):
                # Several consecutive trials share each thread-switching
                # interval, to get more interleavings.
                sys.setcheckinterval(trial // 5)
                worker = threading.Thread(target=lambda: None)
                worker.start()
                worker.join()
                alive = threading.enumerate()
                failure = ("#1703448 triggered after %d trials: %s"
                           % (trial, alive))
                self.assertNotIn(worker, alive, failure)
        finally:
            # Put the interpreter's check interval back as it was.
            sys.setcheckinterval(saved_interval)
    387 
    def test_no_refcycle_through_target(self):
        # Check that an object referenced by a Thread's target, args and
        # kwargs is released once the thread has finished, both when the
        # target returns normally and when it raises SystemExit.
        class RunSelfFunction(object):
            def __init__(self, should_raise):
                # The links in this refcycle from Thread back to self
                # should be cleaned up when the thread completes.
                self.should_raise = should_raise
                # The thread receives three references to this object: the
                # bound-method target, the positional arg and the keyword arg.
                self.thread = threading.Thread(target=self._run,
                                               args=(self,),
                                               kwargs={'yet_another':self})
                self.thread.start()

            def _run(self, other_ref, yet_another):
                if self.should_raise:
                    raise SystemExit

        # Case 1: the target returns normally.
        cyclic_object = RunSelfFunction(should_raise=False)
        weak_cyclic_object = weakref.ref(cyclic_object)
        cyclic_object.thread.join()
        del cyclic_object
        # The weak reference must be dead once the local name is deleted.
        self.assertEqual(None, weak_cyclic_object(),
                         msg=('%d references still around' %
                              sys.getrefcount(weak_cyclic_object())))

        # Case 2: the target raises SystemExit.
        raising_cyclic_object = RunSelfFunction(should_raise=True)
        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
        raising_cyclic_object.thread.join()
        del raising_cyclic_object
        self.assertEqual(None, weak_raising_cyclic_object(),
                         msg=('%d references still around' %
                              sys.getrefcount(weak_raising_cyclic_object())))
    418 
    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
    def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        child_source = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        result = assert_python_ok("-c", child_source)
        out, err = result[1], result[2]
        # The child script is expected to produce no output at all.
        self.assertEqual(out, '')
        self.assertEqual(err, '')
    445 
    446 
    447 class ThreadJoinOnShutdown(BaseTestCase):
    448 
    # Between fork() and exec(), only async-safe functions are allowed (issues
    # #12316 and #11870), and fork() from a worker thread is known to trigger
    # problems with some operating systems (issue #3863): skip problematic tests
    # on platforms known to behave badly.
    # The fork-based tests in this class compare sys.platform against this
    # tuple in their skipIf decorators.
    platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                         'os2emx')
    455 
    def _run_and_join(self, script):
        """Run ``script`` in a child interpreter and check how it shuts down.

        A prelude that imports sys, os, time and threading and defines
        ``joiningfunc(mainthread)`` is prepended to ``script``.  The child
        must print exactly "end of main" followed by "end of thread" and
        exit with status 0.
        """
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        # communicate() reads the pipe while waiting for the child, which
        # avoids the wait()-before-read deadlock, and closes the pipe.
        stdout, _ = p.communicate()
        rc = p.returncode
        data = stdout.replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertNotEqual(rc, 2, "interpreter was blocked")
        self.assertEqual(rc, 0, "Unexpected error")
    473 
    def test_1_join_on_shutdown(self):
        # The usual case: at exit, the interpreter waits for a non-daemon
        # thread.
        main_body = """if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print 'end of main'
            """
        self._run_and_join(main_body)
    485 
    486 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_2_join_in_forked_process(self):
        # Same as test_1_join_on_shutdown, except that the joining happens
        # in a forked interpreter.
        forked_body = """if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print 'end of main'
            """
        self._run_and_join(forked_body)
    503 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_3_join_in_forked_from_thread(self):
        # Same as test_2_join_in_forked_process, except that fork() is
        # called from a worker thread.  In the forked process, the main
        # Thread object must be marked as stopped.
        worker_fork_body = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print 'end of main'
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(worker_fork_body)
    527 
    def assertScriptHasOutput(self, script, expected_output):
        """Run ``script`` in a child interpreter and check its stdout.

        The child must exit with status 0 and its standard output, with
        carriage returns removed, must equal ``expected_output``.
        """
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        # communicate() reads the pipe while waiting for the child and
        # closes it afterwards; the previous wait()-then-read() sequence
        # could block on a full pipe and never closed p.stdout.
        stdout, _ = p.communicate()
        data = stdout.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0, "Unexpected error")
        self.assertEqual(data, expected_output)
    535 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_4_joining_across_fork_in_worker_thread(self):
        # There used to be a possible deadlock when forking from a child
        # thread.  See http://bugs.python.org/issue6643.

        # The script takes the following steps:
        # - The main thread in the parent process starts a new thread and then
        #   tries to join it.
        # - The join operation acquires the Lock inside the thread's _block
        #   Condition.  (See threading.py:Thread.join().)
        # - We stub out the acquire method on the condition to force it to wait
        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
        #   HERE)
        # - The main thread of the parent process enters Condition.wait(),
        #   which releases the lock on the child thread.
        # - The child process returns.  Without the necessary fix, when the
        #   main thread of the child process (which used to be the child thread
        #   in the parent process) attempts to exit, it will try to acquire the
        #   lock in the Thread._block Condition object and hang, because the
        #   lock was held across the fork.

        script = """if 1:
            import os, time, threading

            finish_join = False
            start_fork = False

            def worker():
                # Wait until this thread's lock is acquired before forking to
                # create the deadlock.
                global finish_join
                while not start_fork:
                    time.sleep(0.01)
                # LOCK HELD: Main thread holds lock across this call.
                childpid = os.fork()
                finish_join = True
                if childpid != 0:
                    # Parent process just waits for child.
                    os.waitpid(childpid, 0)
                # Child process should just return.

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's lock acquire method.
            # This acquires the lock and then waits until the child has forked
            # before returning, which will release the lock soon after.  If
            # someone else tries to fix this test case by acquiring this lock
            # before forking instead of resetting it, the test case will
            # deadlock when it shouldn't.
            condition = w._block
            orig_acquire = condition.acquire
            call_count_lock = threading.Lock()
            call_count = 0
            def my_acquire():
                global call_count
                global start_fork
                orig_acquire()  # LOCK ACQUIRED HERE
                start_fork = True
                if call_count == 0:
                    while not finish_join:
                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
                with call_count_lock:
                    call_count += 1
            condition.acquire = my_acquire

            w.start()
            w.join()
            print('end of main')
            """
        # The script must exit with status 0 and print exactly this text.
        self.assertScriptHasOutput(script, "end of main\n")
    608 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_5_clear_waiter_locks_to_avoid_crash(self):
        # Check that a spawned thread that forks doesn't segfault on certain
        # platforms, namely OS X.  This used to happen if there was a waiter
        # lock in the thread's condition variable's waiters list.  Even though
        # we know the lock will be held across the fork, it is not safe to
        # release locks held across forks on all platforms, so releasing the
        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
        # OS X are (as of this writing) implemented with a mutex + condition
        # variable instead of a semaphore, while we know that the Python-level
        # lock will be acquired, we can't know if the internal mutex will be
        # acquired at the time of the fork.

        script = """if True:
            import os, time, threading

            start_fork = False

            def worker():
                # Wait until the main thread has attempted to join this thread
                # before continuing.
                while not start_fork:
                    time.sleep(0.01)
                childpid = os.fork()
                if childpid != 0:
                    # Parent process just waits for child.
                    (cpid, rc) = os.waitpid(childpid, 0)
                    assert cpid == childpid
                    assert rc == 0
                    print('end of worker thread')
                else:
                    # Child process should just return.
                    pass

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's _release_save method.
            # This releases the condition's lock and flips the global that
            # causes the worker to fork.  At this point, the problematic waiter
            # lock has been acquired once by the waiter and has been put onto
            # the waiters list.
            condition = w._block
            orig_release_save = condition._release_save
            def my_release_save():
                global start_fork
                orig_release_save()
                # Waiter lock held here, condition lock released.
                start_fork = True
            condition._release_save = my_release_save

            w.start()
            w.join()
            print('end of main thread')
            """
        # The script must exit with status 0 and print exactly these two
        # lines, in this order.
        output = "end of worker thread\nend of main thread\n"
        self.assertScriptHasOutput(script, output)
    666 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_reinit_tls_after_fork(self):
        # Issue #13817: fork() would deadlock in a multithreaded program with
        # the ad-hoc TLS implementation.

        def do_fork_and_wait():
            # Fork once: the child exits at once, the parent waits for it.
            pid = os.fork()
            if pid <= 0:
                os._exit(0)
            os.waitpid(pid, 0)

        # Launch 16 threads that each fork a child process, then wait for
        # all of them to finish.
        forkers = []
        for _ in range(16):
            forker = threading.Thread(target=do_fork_and_wait)
            forkers.append(forker)
            forker.start()

        for forker in forkers:
            forker.join()
    690 
    691 
    692 class ThreadingExceptionTests(BaseTestCase):
    693     # A RuntimeError should be raised if Thread.start() is called
    694     # multiple times.
    695     def test_start_thread_again(self):
    696         thread = threading.Thread()
    697         thread.start()
    698         self.assertRaises(RuntimeError, thread.start)
    699 
    700     def test_joining_current_thread(self):
    701         current_thread = threading.current_thread()
    702         self.assertRaises(RuntimeError, current_thread.join);
    703 
    704     def test_joining_inactive_thread(self):
    705         thread = threading.Thread()
    706         self.assertRaises(RuntimeError, thread.join)
    707 
    708     def test_daemonize_active_thread(self):
    709         thread = threading.Thread()
    710         thread.start()
    711         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
    712 
    713 
# Each class below binds one of the generic test suites from lock_tests to
# the corresponding factory in the threading module.  staticmethod() keeps
# the factory from being turned into a bound method on attribute access.
class LockTests(lock_tests.LockTests):
    locktype = staticmethod(threading.Lock)

class RLockTests(lock_tests.RLockTests):
    locktype = staticmethod(threading.RLock)

class EventTests(lock_tests.EventTests):
    eventtype = staticmethod(threading.Event)

class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API.
    locktype = staticmethod(threading.Condition)

class ConditionTests(lock_tests.ConditionTests):
    condtype = staticmethod(threading.Condition)

class SemaphoreTests(lock_tests.SemaphoreTests):
    semtype = staticmethod(threading.Semaphore)
    732 
    733 class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    734     semtype = staticmethod(threading.BoundedSemaphore)
    735 
    736     @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
    737     def test_recursion_limit(self):
    738         # Issue 9670
    739         # test that excessive recursion within a non-main thread causes
    740         # an exception rather than crashing the interpreter on platforms
    741         # like Mac OS X or FreeBSD which have small default stack sizes
    742         # for threads
    743         script = """if True:
    744             import threading
    745 
    746             def recurse():
    747                 return recurse()
    748 
    749             def outer():
    750                 try:
    751                     recurse()
    752                 except RuntimeError:
    753                     pass
    754 
    755             w = threading.Thread(target=outer)
    756             w.start()
    757             w.join()
    758             print('end of main thread')
    759             """
    760         expected_output = "end of main thread\n"
    761         p = subprocess.Popen([sys.executable, "-c", script],
    762                              stdout=subprocess.PIPE)
    763         stdout, stderr = p.communicate()
    764         data = stdout.decode().replace('\r', '')
    765         self.assertEqual(p.returncode, 0, "Unexpected error")
    766         self.assertEqual(data, expected_output)
    767 
    768 def test_main():
    769     test.test_support.run_unittest(LockTests, RLockTests, EventTests,
    770                                    ConditionAsRLockTests, ConditionTests,
    771                                    SemaphoreTests, BoundedSemaphoreTests,
    772                                    ThreadTests,
    773                                    ThreadJoinOnShutdown,
    774                                    ThreadingExceptionTests,
    775                                    )
    776 
    777 if __name__ == "__main__":
    778     test_main()
    779