Home | History | Annotate | Download | only in test
      1 # Very rudimentary test of threading module
      2 
      3 import test.test_support
      4 from test.test_support import verbose, cpython_only
      5 from test.script_helper import assert_python_ok
      6 
      7 import random
      8 import re
      9 import sys
     10 thread = test.test_support.import_module('thread')
     11 threading = test.test_support.import_module('threading')
     12 import time
     13 import unittest
     14 import weakref
     15 import os
     16 import subprocess
     17 try:
     18     import _testcapi
     19 except ImportError:
     20     _testcapi = None
     21 
     22 from test import lock_tests
     23 
     24 # A trivial mutable counter.
     25 class Counter(object):
     26     def __init__(self):
     27         self.value = 0
     28     def inc(self):
     29         self.value += 1
     30     def dec(self):
     31         self.value -= 1
     32     def get(self):
     33         return self.value
     34 
     35 class TestThread(threading.Thread):
     36     def __init__(self, name, testcase, sema, mutex, nrunning):
     37         threading.Thread.__init__(self, name=name)
     38         self.testcase = testcase
     39         self.sema = sema
     40         self.mutex = mutex
     41         self.nrunning = nrunning
     42 
     43     def run(self):
     44         delay = random.random() / 10000.0
     45         if verbose:
     46             print 'task %s will run for %.1f usec' % (
     47                 self.name, delay * 1e6)
     48 
     49         with self.sema:
     50             with self.mutex:
     51                 self.nrunning.inc()
     52                 if verbose:
     53                     print self.nrunning.get(), 'tasks are running'
     54                 self.testcase.assertLessEqual(self.nrunning.get(), 3)
     55 
     56             time.sleep(delay)
     57             if verbose:
     58                 print 'task', self.name, 'done'
     59 
     60             with self.mutex:
     61                 self.nrunning.dec()
     62                 self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
     63                 if verbose:
     64                     print '%s is finished. %d tasks are running' % (
     65                         self.name, self.nrunning.get())
     66 
     67 class BaseTestCase(unittest.TestCase):
     68     def setUp(self):
     69         self._threads = test.test_support.threading_setup()
     70 
     71     def tearDown(self):
     72         test.test_support.threading_cleanup(*self._threads)
     73         test.test_support.reap_children()
     74 
     75 
     76 class ThreadTests(BaseTestCase):
     77 
     78     # Create a bunch of threads, let each do some work, wait until all are
     79     # done.
     80     def test_various_ops(self):
     81         # This takes about n/3 seconds to run (about n/3 clumps of tasks,
     82         # times about 1 second per clump).
     83         NUMTASKS = 10
     84 
     85         # no more than 3 of the 10 can run at once
     86         sema = threading.BoundedSemaphore(value=3)
     87         mutex = threading.RLock()
     88         numrunning = Counter()
     89 
     90         threads = []
     91 
     92         for i in range(NUMTASKS):
     93             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
     94             threads.append(t)
     95             self.assertIsNone(t.ident)
     96             self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$')
     97             t.start()
     98 
     99         if verbose:
    100             print 'waiting for all tasks to complete'
    101         for t in threads:
    102             t.join(NUMTASKS)
    103             self.assertFalse(t.is_alive())
    104             self.assertNotEqual(t.ident, 0)
    105             self.assertIsNotNone(t.ident)
    106             self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$')
    107         if verbose:
    108             print 'all tasks done'
    109         self.assertEqual(numrunning.get(), 0)
    110 
    def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertIsNotNone(threading.currentThread().ident)

        seen = []
        finished = threading.Event()

        def record_ident():
            # Runs in a thread created through the low-level thread module.
            seen.append(threading.currentThread().ident)
            finished.set()

        thread.start_new_thread(record_ident, ())
        finished.wait()
        self.assertIsNotNone(seen[0])
        # Kill the "immortal" _DummyThread
        del threading._active[seen[0]]
    124 
    125     # run with a small(ish) thread stack size (256kB)
    126     def test_various_ops_small_stack(self):
    127         if verbose:
    128             print 'with 256kB thread stack size...'
    129         try:
    130             threading.stack_size(262144)
    131         except thread.error:
    132             self.skipTest('platform does not support changing thread stack size')
    133         self.test_various_ops()
    134         threading.stack_size(0)
    135 
    136     # run with a large thread stack size (1MB)
    137     def test_various_ops_large_stack(self):
    138         if verbose:
    139             print 'with 1MB thread stack size...'
    140         try:
    141             threading.stack_size(0x100000)
    142         except thread.error:
    143             self.skipTest('platform does not support changing thread stack size')
    144         self.test_various_ops()
    145         threading.stack_size(0)
    146 
    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def register_and_signal(gate):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            gate.release()

        # The lock starts out held; the foreign thread releases it when done.
        gate = threading.Lock()
        gate.acquire()
        tid = thread.start_new_thread(register_and_signal, (gate,))
        # Blocks until the foreign thread has released the lock.
        gate.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        # Remove the dummy entry again so it does not outlive this test.
        del threading._active[tid]
    163 
    164     # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    165     # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
        # Calls PyThreadState_SetAsyncExc through ctypes three times: on the
        # calling thread, on the thread id -1, and on a running worker thread.
        try:
            import ctypes
        except ImportError:
            self.skipTest('requires ctypes')

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        # Private exception type, so only the exception injected by this
        # test can satisfy the except clauses below.
        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = thread.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1) # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                # Published so the main thread can target this thread.
                self.id = thread.get_ident()
                self.finished = False

                try:
                    # Loop until the injected exception arrives.
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print "    started worker thread"

        # Try a thread id that doesn't make sense.
        if verbose:
            print "    trying nonsensical thread id"
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print "    waiting for worker thread to get started"
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print "    verifying worker hasn't exited"
        self.assertFalse(t.finished)
        if verbose:
            print "    attempting to raise asynch exception in worker"
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1) # one thread state modified
        if verbose:
            print "    waiting for worker to say it caught the exception"
        # Bounded wait: if the exception never arrives, the assertion below
        # fails instead of the test hanging.
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print "    all OK -- joining worker"
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it
    253 
    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def always_fail(*args):
            raise thread.error()

        # Swap in the failing starter; the finally clause puts the real one
        # back whatever happens.
        saved_starter = threading._start_new_thread
        threading._start_new_thread = always_fail
        try:
            victim = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, victim.start)
            self.assertFalse(
                victim in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = saved_starter
    268 
    def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        try:
            import ctypes
        except ImportError:
            self.skipTest('requires ctypes')

        # The scenario runs in a child interpreter; only its exit status is
        # inspected here.
        rc = subprocess.call([sys.executable, "-c", """if 1:
            import ctypes, sys, time, thread

            # This lock is used as a simple event variable.
            ready = thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """])
        # The child must exit with the status it passed to sys.exit().
        self.assertEqual(rc, 42)
    304 
    def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        rc = p.returncode
        # Exit status 2 is what the script's killer thread uses when the
        # interpreter is still running after two seconds.
        self.assertNotEqual(rc, 2, "interpreter was blocked")
        # Any other non-zero status is an unrelated failure; show stderr.
        self.assertEqual(rc, 0,
                         "Unexpected error: " + repr(stderr))
    338 
    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print "Woke up, sleep function is:", sleep

                threading.Thread(target=child).start()
                raise SystemExit
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        self.assertEqual(stdout.strip(),
            "Woke up, sleep function is: <built-in function sleep>")
        # Strip any "[N refs]" lines before requiring stderr to be empty.
        # The flag must be passed by keyword: the fourth positional
        # argument of re.sub() is `count`, not `flags`.
        stderr = re.sub(r"^\[\d+ refs\]", "", stderr,
                        flags=re.MULTILINE).strip()
        self.assertEqual(stderr, "")
    364 
    def test_enumerate_after_join(self):
        # Try hard to trigger #1703448: a thread is still returned in
        # threading.enumerate() after it has been join()ed.
        list_threads = threading.enumerate
        saved_interval = sys.getcheckinterval()
        try:
            for trial in xrange(1, 100):
                # Try a couple times at each thread-switching interval
                # to get more interleavings.
                sys.setcheckinterval(trial // 5)
                worker = threading.Thread(target=lambda: None)
                worker.start()
                worker.join()
                listed = list_threads()
                self.assertNotIn(worker, listed,
                    "#1703448 triggered after %d trials: %s" % (trial, listed))
        finally:
            # Put the interpreter's check interval back as it was.
            sys.setcheckinterval(saved_interval)
    383 
    def test_no_refcycle_through_target(self):
        class RunSelfFunction(object):
            def __init__(self, should_raise):
                # The links in this refcycle from Thread back to self
                # should be cleaned up when the thread completes.
                self.should_raise = should_raise
                self.thread = threading.Thread(target=self._run,
                                               args=(self,),
                                               kwargs={'yet_another':self})
                self.thread.start()

            def _run(self, other_ref, yet_another):
                if self.should_raise:
                    raise SystemExit

        # Same check twice: first with a target that returns normally, then
        # with one that raises SystemExit.
        for should_raise in (False, True):
            cyclic = RunSelfFunction(should_raise=should_raise)
            weak = weakref.ref(cyclic)
            cyclic.thread.join()
            del cyclic
            # With the last strong reference gone, the weakref must be dead.
            self.assertEqual(None, weak(),
                             msg=('%d references still around' %
                                  sys.getrefcount(weak())))
    414 
    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
    def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        # The script's own assert statements check active_count() before the
        # fork (2) and inside the forked child (1).
        code = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # The second and third items returned by assert_python_ok (bound to
        # `out` and `err`) must both be empty strings.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, '')
        self.assertEqual(err, '')
    441 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_is_alive_after_fork(self):
        # Try hard to trigger #18418: is_alive() could sometimes be True on
        # threads that vanished after a fork.
        saved_interval = sys.getcheckinterval()

        # Make the bug more likely to manifest.
        sys.setcheckinterval(10)

        try:
            for _ in range(20):
                worker = threading.Thread(target=lambda: None)
                worker.start()
                child = os.fork()
                if child == 0:
                    # Forked child: report through the exit status whether
                    # the thread still looks alive, and never return.
                    os._exit(1 if worker.is_alive() else 0)
                # Parent: finish the thread, then collect the child.
                worker.join()
                child, status = os.waitpid(child, 0)
                self.assertEqual(0, status)
        finally:
            sys.setcheckinterval(saved_interval)
    464 
    def test_BoundedSemaphore_limit(self):
        # BoundedSemaphore should raise ValueError if released too often.
        def run_in_threads(func, count):
            # Call func once in each of `count` threads and wait for all.
            pool = [threading.Thread(target=func) for _ in range(count)]
            for member in pool:
                member.start()
            for member in pool:
                member.join()

        for limit in range(1, 10):
            bs = threading.BoundedSemaphore(limit)
            run_in_threads(bs.acquire, limit)
            run_in_threads(bs.release, limit)
            # Every acquire has been matched; one more release is too many.
            self.assertRaises(ValueError, bs.release)
    482 
    483 class ThreadJoinOnShutdown(BaseTestCase):
    484 
    485     # Between fork() and exec(), only async-safe functions are allowed (issues
    486     # #12316 and #11870), and fork() from a worker thread is known to trigger
    487     # problems with some operating systems (issue #3863): skip problematic tests
    488     # on platforms known to behave badly.
    489     platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
    490                          'os2emx')
    491 
    def _run_and_join(self, script):
        # Run `script` in a child interpreter, after a prologue that imports
        # sys, os, time and threading and defines joiningfunc().  The child
        # must print "end of main" then "end of thread" and exit with
        # status 0.
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
        # communicate() drains the pipe while waiting and closes it after;
        # wait() followed by read() can deadlock on a full pipe.
        stdout, _ = p.communicate()
        rc = p.returncode
        data = stdout.replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0, "Unexpected error")
    509 
    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        # The child script is passed straight to the shared runner.
        self._run_and_join("""if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print 'end of main'
            """)
    521 
    522 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        # The parent branch of the script only waits for the forked child
        # and exits; the child branch does the actual work.
        self._run_and_join("""if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print 'end of main'
            """)
    539 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.
        # The script prints 'end of main' before starting the joining thread,
        # which fixes the order of the two lines _run_and_join checks for.
        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print 'end of main'
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        # _run_and_join prepends the imports and the joiningfunc definition
        # and checks the child's output and exit status.
        self._run_and_join(script)
    563 
    def assertScriptHasOutput(self, script, expected_output):
        # Run `script` with `python -c` in a child interpreter and require
        # exit status 0 and stdout equal to `expected_output` (with
        # carriage returns removed).
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        # communicate() drains the pipe while waiting and closes it after;
        # wait() followed by read() can deadlock on a full pipe and left
        # the pipe open.
        stdout, _ = p.communicate()
        rc = p.returncode
        data = stdout.decode().replace('\r', '')
        self.assertEqual(rc, 0, "Unexpected error")
        self.assertEqual(data, expected_output)
    571 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_4_joining_across_fork_in_worker_thread(self):
        # There used to be a possible deadlock when forking from a child
        # thread.  See http://bugs.python.org/issue6643.

        # The script takes the following steps:
        # - The main thread in the parent process starts a new thread and then
        #   tries to join it.
        # - The join operation acquires the Lock inside the thread's _block
        #   Condition.  (See threading.py:Thread.join().)
        # - We stub out the acquire method on the condition to force it to wait
        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
        #   HERE)
        # - The main thread of the parent process enters Condition.wait(),
        #   which releases the lock on the child thread.
        # - The child process returns.  Without the necessary fix, when the
        #   main thread of the child process (which used to be the child thread
        #   in the parent process) attempts to exit, it will try to acquire the
        #   lock in the Thread._block Condition object and hang, because the
        #   lock was held across the fork.

        # The two module-level flags in the script (start_fork, finish_join)
        # are how the main thread and the worker hand control to each other.
        script = """if 1:
            import os, time, threading

            finish_join = False
            start_fork = False

            def worker():
                # Wait until this thread's lock is acquired before forking to
                # create the deadlock.
                global finish_join
                while not start_fork:
                    time.sleep(0.01)
                # LOCK HELD: Main thread holds lock across this call.
                childpid = os.fork()
                finish_join = True
                if childpid != 0:
                    # Parent process just waits for child.
                    os.waitpid(childpid, 0)
                # Child process should just return.

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's lock acquire method.
            # This acquires the lock and then waits until the child has forked
            # before returning, which will release the lock soon after.  If
            # someone else tries to fix this test case by acquiring this lock
            # before forking instead of resetting it, the test case will
            # deadlock when it shouldn't.
            condition = w._block
            orig_acquire = condition.acquire
            call_count_lock = threading.Lock()
            call_count = 0
            def my_acquire():
                global call_count
                global start_fork
                orig_acquire()  # LOCK ACQUIRED HERE
                start_fork = True
                if call_count == 0:
                    while not finish_join:
                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
                with call_count_lock:
                    call_count += 1
            condition.acquire = my_acquire

            w.start()
            w.join()
            print('end of main')
            """
        # The child interpreter must exit with status 0 and print exactly
        # this one line.
        self.assertScriptHasOutput(script, "end of main\n")
    644 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_5_clear_waiter_locks_to_avoid_crash(self):
        # Check that a spawned thread that forks doesn't segfault on certain
        # platforms, namely OS X.  This used to happen if there was a waiter
        # lock in the thread's condition variable's waiters list.  Even though
        # we know the lock will be held across the fork, it is not safe to
        # release locks held across forks on all platforms, so releasing the
        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
        # OS X are (as of this writing) implemented with a mutex + condition
        # variable instead of a semaphore, while we know that the Python-level
        # lock will be acquired, we can't know if the internal mutex will be
        # acquired at the time of the fork.

        # The worker polls the module-level start_fork flag, which the
        # stubbed _release_save sets once the condition lock is released.
        script = """if True:
            import os, time, threading

            start_fork = False

            def worker():
                # Wait until the main thread has attempted to join this thread
                # before continuing.
                while not start_fork:
                    time.sleep(0.01)
                childpid = os.fork()
                if childpid != 0:
                    # Parent process just waits for child.
                    (cpid, rc) = os.waitpid(childpid, 0)
                    assert cpid == childpid
                    assert rc == 0
                    print('end of worker thread')
                else:
                    # Child process should just return.
                    pass

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's _release_save method.
            # This releases the condition's lock and flips the global that
            # causes the worker to fork.  At this point, the problematic waiter
            # lock has been acquired once by the waiter and has been put onto
            # the waiters list.
            condition = w._block
            orig_release_save = condition._release_save
            def my_release_save():
                global start_fork
                orig_release_save()
                # Waiter lock held here, condition lock released.
                start_fork = True
            condition._release_save = my_release_save

            w.start()
            w.join()
            print('end of main thread')
            """
        # The child interpreter must exit with status 0 and print exactly
        # these two lines, in this order.
        output = "end of worker thread\nend of main thread\n"
        self.assertScriptHasOutput(script, output)
    702 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_reinit_tls_after_fork(self):
        # Regression test for issue #13817: with the ad-hoc TLS
        # implementation, fork() could deadlock in a multithreaded program.
        # The test passes when every forking thread can be joined.

        def do_fork_and_wait():
            # Fork once; the child exits immediately and the parent reaps it.
            child_pid = os.fork()
            if child_pid <= 0:
                # Child side: leave right away without running any cleanup.
                os._exit(0)
            os.waitpid(child_pid, 0)

        # Launch 16 threads, each of which forks a child process.  Every
        # thread is started as soon as it is created.
        forkers = []
        for _ in range(16):
            forker = threading.Thread(target=do_fork_and_wait)
            forker.start()
            forkers.append(forker)

        # Wait for all of them; a deadlock would make a join() hang.
        for forker in forkers:
            forker.join()
    726 
    @cpython_only
    @unittest.skipIf(_testcapi is None, "need _testcapi module")
    def test_frame_tstate_tracing(self):
        # Issue #14432: Crash when a generator is created in a C thread that is
        # destroyed while the generator is still used. The issue was that a
        # generator contains a frame, and the frame kept a reference to the
        # Python state of the destroyed C thread. The crash occurs when a trace
        # function is setup.

        def noop_trace(frame, event, arg):
            # Trace function that does nothing except keep itself installed
            # as the local trace function.
            return noop_trace

        def generator():
            # Endless generator; only its frame matters for this test.
            while 1:
                yield "generator"

        def callback():
            # The generator is created lazily on the first call (made from
            # the temporary C thread) and reused by every later call.
            if callback.gen is None:
                callback.gen = generator()
            return next(callback.gen)
        callback.gen = None

        old_trace = sys.gettrace()
        # The threading module keeps its own hook, separate from
        # sys.settrace().  It has no public getter in this Python version, so
        # read the private module attribute and fall back to None.
        old_threading_trace = getattr(threading, '_trace_hook', None)
        sys.settrace(noop_trace)
        try:
            # Install a trace function
            threading.settrace(noop_trace)

            # Create a generator in a C thread which exits after the call
            _testcapi.call_in_temporary_c_thread(callback)

            # Call the generator in a different Python thread, check that the
            # generator didn't keep a reference to the destroyed thread state
            for _ in range(3):
                # The trace function is still called here
                callback()
        finally:
            # Restore both hooks so the no-op tracer does not leak into
            # threads started by later tests.
            sys.settrace(old_trace)
            threading.settrace(old_threading_trace)
    766 
    767 
    768 class ThreadingExceptionTests(BaseTestCase):
    769     # A RuntimeError should be raised if Thread.start() is called
    770     # multiple times.
    771     def test_start_thread_again(self):
    772         thread = threading.Thread()
    773         thread.start()
    774         self.assertRaises(RuntimeError, thread.start)
    775 
    776     def test_joining_current_thread(self):
    777         current_thread = threading.current_thread()
    778         self.assertRaises(RuntimeError, current_thread.join);
    779 
    780     def test_joining_inactive_thread(self):
    781         thread = threading.Thread()
    782         self.assertRaises(RuntimeError, thread.join)
    783 
    784     def test_daemonize_active_thread(self):
    785         thread = threading.Thread()
    786         thread.start()
    787         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
    788 
    789     def test_print_exception(self):
    790         script = r"""if 1:
    791             import threading
    792             import time
    793 
    794             running = False
    795             def run():
    796                 global running
    797                 running = True
    798                 while running:
    799                     time.sleep(0.01)
    800                 1.0/0.0
    801             t = threading.Thread(target=run)
    802             t.start()
    803             while not running:
    804                 time.sleep(0.01)
    805             running = False
    806             t.join()
    807             """
    808         rc, out, err = assert_python_ok("-c", script)
    809         self.assertEqual(out, '')
    810         self.assertIn("Exception in thread", err)
    811         self.assertIn("Traceback (most recent call last):", err)
    812         self.assertIn("ZeroDivisionError", err)
    813         self.assertNotIn("Unhandled exception", err)
    814 
    815     def test_print_exception_stderr_is_none_1(self):
    816         script = r"""if 1:
    817             import sys
    818             import threading
    819             import time
    820 
    821             running = False
    822             def run():
    823                 global running
    824                 running = True
    825                 while running:
    826                     time.sleep(0.01)
    827                 1.0/0.0
    828             t = threading.Thread(target=run)
    829             t.start()
    830             while not running:
    831                 time.sleep(0.01)
    832             sys.stderr = None
    833             running = False
    834             t.join()
    835             """
    836         rc, out, err = assert_python_ok("-c", script)
    837         self.assertEqual(out, '')
    838         self.assertIn("Exception in thread", err)
    839         self.assertIn("Traceback (most recent call last):", err)
    840         self.assertIn("ZeroDivisionError", err)
    841         self.assertNotIn("Unhandled exception", err)
    842 
    843     def test_print_exception_stderr_is_none_2(self):
    844         script = r"""if 1:
    845             import sys
    846             import threading
    847             import time
    848 
    849             running = False
    850             def run():
    851                 global running
    852                 running = True
    853                 while running:
    854                     time.sleep(0.01)
    855                 1.0/0.0
    856             sys.stderr = None
    857             t = threading.Thread(target=run)
    858             t.start()
    859             while not running:
    860                 time.sleep(0.01)
    861             running = False
    862             t.join()
    863             """
    864         rc, out, err = assert_python_ok("-c", script)
    865         self.assertEqual(out, '')
    866         self.assertNotIn("Unhandled exception", err)
    867 
    868 
    869 class LockTests(lock_tests.LockTests):
    870     locktype = staticmethod(threading.Lock)
    871 
    872 class RLockTests(lock_tests.RLockTests):
    873     locktype = staticmethod(threading.RLock)
    874 
    875 class EventTests(lock_tests.EventTests):
    876     eventtype = staticmethod(threading.Event)
    877 
    878 class ConditionAsRLockTests(lock_tests.RLockTests):
    879     # Condition uses an RLock by default and exports its API.
    880     locktype = staticmethod(threading.Condition)
    881 
    882 class ConditionTests(lock_tests.ConditionTests):
    883     condtype = staticmethod(threading.Condition)
    884 
    885 class SemaphoreTests(lock_tests.SemaphoreTests):
    886     semtype = staticmethod(threading.Semaphore)
    887 
    888 class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    889     semtype = staticmethod(threading.BoundedSemaphore)
    890 
    891     @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
    892     def test_recursion_limit(self):
    893         # Issue 9670
    894         # test that excessive recursion within a non-main thread causes
    895         # an exception rather than crashing the interpreter on platforms
    896         # like Mac OS X or FreeBSD which have small default stack sizes
    897         # for threads
    898         script = """if True:
    899             import threading
    900 
    901             def recurse():
    902                 return recurse()
    903 
    904             def outer():
    905                 try:
    906                     recurse()
    907                 except RuntimeError:
    908                     pass
    909 
    910             w = threading.Thread(target=outer)
    911             w.start()
    912             w.join()
    913             print('end of main thread')
    914             """
    915         expected_output = "end of main thread\n"
    916         p = subprocess.Popen([sys.executable, "-c", script],
    917                              stdout=subprocess.PIPE)
    918         stdout, stderr = p.communicate()
    919         data = stdout.decode().replace('\r', '')
    920         self.assertEqual(p.returncode, 0, "Unexpected error")
    921         self.assertEqual(data, expected_output)
    922 
    923 def test_main():
    924     test.test_support.run_unittest(LockTests, RLockTests, EventTests,
    925                                    ConditionAsRLockTests, ConditionTests,
    926                                    SemaphoreTests, BoundedSemaphoreTests,
    927                                    ThreadTests,
    928                                    ThreadJoinOnShutdown,
    929                                    ThreadingExceptionTests,
    930                                    )
    931 
    932 if __name__ == "__main__":
    933     test_main()
    934