# Very rudimentary test of threading module

      2 
      3 import test.test_support
      4 from test.test_support import verbose
      5 import random
      6 import re
      7 import sys
      8 thread = test.test_support.import_module('thread')
      9 threading = test.test_support.import_module('threading')
     10 import time
     11 import unittest
     12 import weakref
     13 import os
     14 import subprocess
     15 
     16 from test import lock_tests
     17 
     18 # A trivial mutable counter.

     19 class Counter(object):
     20     def __init__(self):
     21         self.value = 0
     22     def inc(self):
     23         self.value += 1
     24     def dec(self):
     25         self.value -= 1
     26     def get(self):
     27         return self.value
     28 
     29 class TestThread(threading.Thread):
     30     def __init__(self, name, testcase, sema, mutex, nrunning):
     31         threading.Thread.__init__(self, name=name)
     32         self.testcase = testcase
     33         self.sema = sema
     34         self.mutex = mutex
     35         self.nrunning = nrunning
     36 
     37     def run(self):
     38         delay = random.random() / 10000.0
     39         if verbose:
     40             print 'task %s will run for %.1f usec' % (
     41                 self.name, delay * 1e6)
     42 
     43         with self.sema:
     44             with self.mutex:
     45                 self.nrunning.inc()
     46                 if verbose:
     47                     print self.nrunning.get(), 'tasks are running'
     48                 self.testcase.assertTrue(self.nrunning.get() <= 3)
     49 
     50             time.sleep(delay)
     51             if verbose:
     52                 print 'task', self.name, 'done'
     53 
     54             with self.mutex:
     55                 self.nrunning.dec()
     56                 self.testcase.assertTrue(self.nrunning.get() >= 0)
     57                 if verbose:
     58                     print '%s is finished. %d tasks are running' % (
     59                         self.name, self.nrunning.get())
     60 
     61 class BaseTestCase(unittest.TestCase):
     62     def setUp(self):
     63         self._threads = test.test_support.threading_setup()
     64 
     65     def tearDown(self):
     66         test.test_support.threading_cleanup(*self._threads)
     67         test.test_support.reap_children()
     68 
     69 
     70 class ThreadTests(BaseTestCase):
     71 
     72     # Create a bunch of threads, let each do some work, wait until all are

     73     # done.

     74     def test_various_ops(self):
     75         # This takes about n/3 seconds to run (about n/3 clumps of tasks,

     76         # times about 1 second per clump).

     77         NUMTASKS = 10
     78 
     79         # no more than 3 of the 10 can run at once

     80         sema = threading.BoundedSemaphore(value=3)
     81         mutex = threading.RLock()
     82         numrunning = Counter()
     83 
     84         threads = []
     85 
     86         for i in range(NUMTASKS):
     87             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
     88             threads.append(t)
     89             self.assertEqual(t.ident, None)
     90             self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
     91             t.start()
     92 
     93         if verbose:
     94             print 'waiting for all tasks to complete'
     95         for t in threads:
     96             t.join(NUMTASKS)
     97             self.assertTrue(not t.is_alive())
     98             self.assertNotEqual(t.ident, 0)
     99             self.assertFalse(t.ident is None)
    100             self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
    101         if verbose:
    102             print 'all tasks done'
    103         self.assertEqual(numrunning.get(), 0)
    104 
    def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        main_ident = threading.currentThread().ident
        self.assertFalse(main_ident is None)

        finished = threading.Event()
        seen = []

        def record_ident():
            # Runs in a thread created behind the threading module's back.
            seen.append(threading.currentThread().ident)
            finished.set()

        thread.start_new_thread(record_ident, ())
        finished.wait()
        foreign_ident = seen[0]
        self.assertFalse(foreign_ident is None)
        # Kill the "immortal" _DummyThread
        del threading._active[foreign_ident]
    118 
    119     # run with a small(ish) thread stack size (256kB)

    120     def test_various_ops_small_stack(self):
    121         if verbose:
    122             print 'with 256kB thread stack size...'
    123         try:
    124             threading.stack_size(262144)
    125         except thread.error:
    126             if verbose:
    127                 print 'platform does not support changing thread stack size'
    128             return
    129         self.test_various_ops()
    130         threading.stack_size(0)
    131 
    132     # run with a large thread stack size (1MB)

    133     def test_various_ops_large_stack(self):
    134         if verbose:
    135             print 'with 1MB thread stack size...'
    136         try:
    137             threading.stack_size(0x100000)
    138         except thread.error:
    139             if verbose:
    140                 print 'platform does not support changing thread stack size'
    141             return
    142         self.test_various_ops()
    143         threading.stack_size(0)
    144 
    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def register_then_signal(gate):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            gate.release()

        gate = threading.Lock()
        gate.acquire()
        tid = thread.start_new_thread(register_then_signal, (gate,))
        # Second acquire blocks until the foreign thread releases the lock.
        gate.acquire()
        self.assertIn(tid, threading._active)
        entry = threading._active[tid]
        self.assertIsInstance(entry, threading._DummyThread)
        # Remove the entry again so it does not outlive this test.
        del threading._active[tid]
    161 
    162     # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)

    163     # exposed at the Python level.  This test relies on ctypes to get at it.

    def test_PyThreadState_SetAsyncExc(self):
        # Exercise ctypes.pythonapi.PyThreadState_SetAsyncExc in three ways:
        #   1. against the calling thread itself,
        #   2. against a thread id of -1 (expects 0 as the result),
        #   3. against a running Worker thread (expects 1 as the result and
        #      the worker to catch the exception).
        # Returns silently, without failing, when ctypes cannot be imported.
        try:
            import ctypes
        except ImportError:
            if verbose:
                print "test_PyThreadState_SetAsyncExc can't import ctypes"
            return  # can't do anything

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        # Private exception type so nothing else can raise it by accident.
        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = thread.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1) # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            # (`result` is only bound if the assignment above completed.)
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                # Publish the low-level thread id for the main thread to use.
                self.id = thread.get_ident()
                self.finished = False

                try:
                    # Loop in short sleeps until AsyncExc is delivered.
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print "    started worker thread"

        # Try a thread id that doesn't make sense.
        if verbose:
            print "    trying nonsensical thread id"
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print "    waiting for worker thread to get started"
        # worker_started is set after t.id and t.finished have been assigned,
        # so both attributes exist once this wait returns.
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print "    verifying worker hasn't exited"
        self.assertTrue(not t.finished)
        if verbose:
            print "    attempting to raise asynch exception in worker"
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1) # one thread state modified
        if verbose:
            print "    waiting for worker to say it caught the exception"
        # Bounded wait: give up after 10 seconds instead of hanging the suite.
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print "    all OK -- joining worker"
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it

    253 
    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def always_fail(*args):
            raise thread.error()

        # Swap in the failing starter; the finally clause puts the real one
        # back no matter how the assertions turn out.
        real_starter = threading._start_new_thread
        threading._start_new_thread = always_fail
        try:
            victim = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, victim.start)
            left_behind = victim in threading._limbo
            self.assertFalse(
                left_behind,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = real_starter
    268 
    def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        #
        # The child script below exits with status 42 while a second thread
        # is still sleeping and holds an object whose __del__ calls
        # PyGILState_Ensure / PyGILState_Release.
        try:
            import ctypes
        except ImportError:
            if verbose:
                print("test_finalize_with_runnning_thread can't import ctypes")
            return  # can't do anything

        child_source = """if 1:
            import ctypes, sys, time, thread

            # This lock is used as a simple event variable.
            ready = thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """
        command = [sys.executable, "-c", child_source]
        exit_status = subprocess.call(command)
        self.assertEqual(exit_status, 42)
    306 
    def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        #
        # The child installs a trace function that calls
        # threading.current_thread() and then runs off the end of the script.
        # A daemon "killer" thread calls os._exit(2) after 2 seconds, so an
        # exit status of 2 means the interpreter did not finish in time.
        p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        rc = p.returncode
        # Message fixed from "interpreted" to match _run_and_join's wording.
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0,
                        "Unexpected error: " + repr(stderr))
    340 
    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        #
        # The child starts a non-daemon thread that sleeps for one second and
        # then prints, while the main thread raises SystemExit immediately.
        # The printed line must still appear and stderr must be empty apart
        # from "[N refs]" lines.
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print "Woke up, sleep function is:", sleep

                threading.Thread(target=child).start()
                raise SystemExit
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        self.assertEqual(stdout.strip(),
            "Woke up, sleep function is: <built-in function sleep>")
        # The fourth positional argument of re.sub() is `count`, so the flag
        # has to be passed by keyword for "^" to match at every line start.
        stderr = re.sub(r"^\[\d+ refs\]", "", stderr,
                        flags=re.MULTILINE).strip()
        self.assertEqual(stderr, "")
    366 
    def test_enumerate_after_join(self):
        # Try hard to trigger #1703448: a thread is still returned in
        # threading.enumerate() after it has been join()ed.
        list_threads = threading.enumerate
        saved_interval = sys.getcheckinterval()
        try:
            for trial in xrange(1, 100):
                # Try a couple times at each thread-switching interval
                # to get more interleavings.
                sys.setcheckinterval(trial // 5)
                worker = threading.Thread(target=lambda: None)
                worker.start()
                worker.join()
                alive = list_threads()
                self.assertNotIn(worker, alive,
                    "#1703448 triggered after %d trials: %s" % (trial, alive))
        finally:
            # Put the interpreter's check interval back for later tests.
            sys.setcheckinterval(saved_interval)
    385 
    def test_no_refcycle_through_target(self):
        # An object that starts a thread on one of its own bound methods, and
        # also passes itself as an argument, must become collectable once the
        # thread has been joined and the last outside name is deleted.  This
        # is checked both for a normal return and for a SystemExit raised in
        # the thread.
        class RunSelfFunction(object):
            def __init__(self, should_raise):
                # The links in this refcycle from Thread back to self
                # should be cleaned up when the thread completes.
                self.should_raise = should_raise
                self.thread = threading.Thread(target=self._run,
                                               args=(self,),
                                               kwargs={'yet_another':self})
                self.thread.start()

            def _run(self, other_ref, yet_another):
                if self.should_raise:
                    raise SystemExit

        for raising in (False, True):
            owner = RunSelfFunction(should_raise=raising)
            probe = weakref.ref(owner)
            owner.thread.join()
            # Drop the only strong reference held by this test.
            del owner
            self.assertEqual(None, probe(),
                             msg=('%d references still around' %
                                  sys.getrefcount(probe())))
    416 
    417 
    418 class ThreadJoinOnShutdown(BaseTestCase):
    419 
    def _run_and_join(self, script):
        # Run `script` in a child interpreter and check what it prints.
        #
        # The script is prefixed with a prologue that imports sys, os, time
        # and threading and defines joiningfunc(mainthread), which joins the
        # given thread and then prints 'end of thread'.  The child must print
        # exactly "end of main\nend of thread\n" and exit with status 0.
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
        # communicate() drains the pipe while waiting, so a chatty child
        # cannot block on a full pipe buffer, and it closes the pipe for us.
        out, _ = p.communicate()
        rc = p.returncode
        data = out.replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0, "Unexpected error")
    437 
    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        #
        # The child starts a thread that joins the main thread, sleeps
        # briefly, and prints 'end of main'; _run_and_join checks the output.
        self._run_and_join("""if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print 'end of main'
            """)
    449 
    450 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        #
        # The child script forks; its parent side waits for the forked
        # process and exits 0, while the forked side starts the joining
        # thread and prints 'end of main'.  Platforms without os.fork() are
        # reported as skipped, like the other fork tests in this class.
        script = """if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print 'end of main'
            """
        self._run_and_join(script)
    468 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.

        # Skip platforms with known problems forking from a worker thread.
        # See http://bugs.python.org/issue3863.
        # Raising SkipTest records the skip in the test result instead of
        # printing to stderr and reporting a pass.
        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                            'os2emx'):
            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print 'end of main'
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)
    500 
    def assertScriptHasOutput(self, script, expected_output):
        # Run `script` with `sys.executable -c` and assert that it exits with
        # status 0 and writes exactly `expected_output` to stdout ('\r'
        # characters are dropped before comparing).
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        # communicate() reads stdout while waiting for the child, so a child
        # that prints more than the pipe buffer holds cannot deadlock against
        # us, and the pipe is closed afterwards.
        out, _ = p.communicate()
        data = out.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0, "Unexpected error")
        self.assertEqual(data, expected_output)
    508 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_4_joining_across_fork_in_worker_thread(self):
        # There used to be a possible deadlock when forking from a child
        # thread.  See http://bugs.python.org/issue6643.

        # Skip platforms with known problems forking from a worker thread.
        # See http://bugs.python.org/issue3863.
        # 'netbsd5' is listed here as well, matching the skip list that
        # test_3_join_in_forked_from_thread uses for the same issue.
        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                            'os2emx'):
            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)

        # The script takes the following steps:
        # - The main thread in the parent process starts a new thread and then
        #   tries to join it.
        # - The join operation acquires the Lock inside the thread's _block
        #   Condition.  (See threading.py:Thread.join().)
        # - We stub out the acquire method on the condition to force it to wait
        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
        #   HERE)
        # - The main thread of the parent process enters Condition.wait(),
        #   which releases the lock on the child thread.
        # - The child process returns.  Without the necessary fix, when the
        #   main thread of the child process (which used to be the child thread
        #   in the parent process) attempts to exit, it will try to acquire the
        #   lock in the Thread._block Condition object and hang, because the
        #   lock was held across the fork.

        script = """if 1:
            import os, time, threading

            finish_join = False
            start_fork = False

            def worker():
                # Wait until this thread's lock is acquired before forking to
                # create the deadlock.
                global finish_join
                while not start_fork:
                    time.sleep(0.01)
                # LOCK HELD: Main thread holds lock across this call.
                childpid = os.fork()
                finish_join = True
                if childpid != 0:
                    # Parent process just waits for child.
                    os.waitpid(childpid, 0)
                # Child process should just return.

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's lock acquire method.
            # This acquires the lock and then waits until the child has forked
            # before returning, which will release the lock soon after.  If
            # someone else tries to fix this test case by acquiring this lock
            # before forking instead of resetting it, the test case will
            # deadlock when it shouldn't.
            condition = w._block
            orig_acquire = condition.acquire
            call_count_lock = threading.Lock()
            call_count = 0
            def my_acquire():
                global call_count
                global start_fork
                orig_acquire()  # LOCK ACQUIRED HERE
                start_fork = True
                if call_count == 0:
                    while not finish_join:
                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
                with call_count_lock:
                    call_count += 1
            condition.acquire = my_acquire

            w.start()
            w.join()
            print('end of main')
            """
        self.assertScriptHasOutput(script, "end of main\n")
    585 
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_5_clear_waiter_locks_to_avoid_crash(self):
        # Check that a spawned thread that forks doesn't segfault on certain
        # platforms, namely OS X.  This used to happen if there was a waiter
        # lock in the thread's condition variable's waiters list.  Even though
        # we know the lock will be held across the fork, it is not safe to
        # release locks held across forks on all platforms, so releasing the
        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
        # OS X are (as of this writing) implemented with a mutex + condition
        # variable instead of a semaphore, while we know that the Python-level
        # lock will be acquired, we can't know if the internal mutex will be
        # acquired at the time of the fork.

        # Skip platforms with known problems forking from a worker thread.
        # See http://bugs.python.org/issue3863.
        # 'netbsd5' is listed here as well, matching the skip list that
        # test_3_join_in_forked_from_thread uses for the same issue.
        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                            'os2emx'):
            raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
        script = """if True:
            import os, time, threading

            start_fork = False

            def worker():
                # Wait until the main thread has attempted to join this thread
                # before continuing.
                while not start_fork:
                    time.sleep(0.01)
                childpid = os.fork()
                if childpid != 0:
                    # Parent process just waits for child.
                    (cpid, rc) = os.waitpid(childpid, 0)
                    assert cpid == childpid
                    assert rc == 0
                    print('end of worker thread')
                else:
                    # Child process should just return.
                    pass

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's _release_save method.
            # This releases the condition's lock and flips the global that
            # causes the worker to fork.  At this point, the problematic waiter
            # lock has been acquired once by the waiter and has been put onto
            # the waiters list.
            condition = w._block
            orig_release_save = condition._release_save
            def my_release_save():
                global start_fork
                orig_release_save()
                # Waiter lock held here, condition lock released.
                start_fork = True
            condition._release_save = my_release_save

            w.start()
            w.join()
            print('end of main thread')
            """
        output = "end of worker thread\nend of main thread\n"
        self.assertScriptHasOutput(script, output)
    646 
    647 
    648 class ThreadingExceptionTests(BaseTestCase):
    649     # A RuntimeError should be raised if Thread.start() is called

    650     # multiple times.

    651     def test_start_thread_again(self):
    652         thread = threading.Thread()
    653         thread.start()
    654         self.assertRaises(RuntimeError, thread.start)
    655 
    656     def test_joining_current_thread(self):
    657         current_thread = threading.current_thread()
    658         self.assertRaises(RuntimeError, current_thread.join);
    659 
    660     def test_joining_inactive_thread(self):
    661         thread = threading.Thread()
    662         self.assertRaises(RuntimeError, thread.join)
    663 
    664     def test_daemonize_active_thread(self):
    665         thread = threading.Thread()
    666         thread.start()
    667         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
    668 
    669 
# Runs the inherited lock_tests.LockTests cases with threading.Lock as the
# `locktype` factory.
class LockTests(lock_tests.LockTests):
    # staticmethod() keeps the factory from being bound as a method when it
    # is looked up through a test instance.
    locktype = staticmethod(threading.Lock)
    672 
# Runs the inherited lock_tests.RLockTests cases with threading.RLock as the
# `locktype` factory.
class RLockTests(lock_tests.RLockTests):
    # staticmethod() keeps the factory from being bound as a method when it
    # is looked up through a test instance.
    locktype = staticmethod(threading.RLock)
    675 
# Runs the inherited lock_tests.EventTests cases with threading.Event as the
# `eventtype` factory.
class EventTests(lock_tests.EventTests):
    # staticmethod() keeps the factory from being bound as a method when it
    # is looked up through a test instance.
    eventtype = staticmethod(threading.Event)
    678 
# Runs the inherited lock_tests.RLockTests cases against threading.Condition
# objects instead of plain RLocks.
class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API.
    # staticmethod() keeps the factory from being bound as a method when it
    # is looked up through a test instance.
    locktype = staticmethod(threading.Condition)
    682 
# Runs the inherited lock_tests.ConditionTests cases with threading.Condition
# as the `condtype` factory.
class ConditionTests(lock_tests.ConditionTests):
    # staticmethod() keeps the factory from being bound as a method when it
    # is looked up through a test instance.
    condtype = staticmethod(threading.Condition)
    685 
# Runs the inherited lock_tests.SemaphoreTests cases with threading.Semaphore
# as the `semtype` factory.
class SemaphoreTests(lock_tests.SemaphoreTests):
    # staticmethod() keeps the factory from being bound as a method when it
    # is looked up through a test instance.
    semtype = staticmethod(threading.Semaphore)
    688 
    689 class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    690     semtype = staticmethod(threading.BoundedSemaphore)
    691 
    692     @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
    693     def test_recursion_limit(self):
    694         # Issue 9670

    695         # test that excessive recursion within a non-main thread causes

    696         # an exception rather than crashing the interpreter on platforms

    697         # like Mac OS X or FreeBSD which have small default stack sizes

    698         # for threads

    699         script = """if True:
    700             import threading
    701 
    702             def recurse():
    703                 return recurse()
    704 
    705             def outer():
    706                 try:
    707                     recurse()
    708                 except RuntimeError:
    709                     pass
    710 
    711             w = threading.Thread(target=outer)
    712             w.start()
    713             w.join()
    714             print('end of main thread')
    715             """
    716         expected_output = "end of main thread\n"
    717         p = subprocess.Popen([sys.executable, "-c", script],
    718                              stdout=subprocess.PIPE)
    719         stdout, stderr = p.communicate()
    720         data = stdout.decode().replace('\r', '')
    721         self.assertEqual(p.returncode, 0, "Unexpected error")
    722         self.assertEqual(data, expected_output)
    723 
    724 def test_main():
    725     test.test_support.run_unittest(LockTests, RLockTests, EventTests,
    726                                    ConditionAsRLockTests, ConditionTests,
    727                                    SemaphoreTests, BoundedSemaphoreTests,
    728                                    ThreadTests,
    729                                    ThreadJoinOnShutdown,
    730                                    ThreadingExceptionTests,
    731                                    )
    732 
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
    735