Home | History | Annotate | Download | only in test
      1 import os
      2 import unittest
      3 import random
      4 from test import support
      5 import _thread as thread
      6 import time
      7 import sys
      8 import weakref
      9 
     10 from test import lock_tests
     11 
     12 NUMTASKS = 10
     13 NUMTRIPS = 3
     14 POLL_SLEEP = 0.010 # seconds = 10 ms
     15 
     16 _print_mutex = thread.allocate_lock()
     17 
     18 def verbose_print(arg):
     19     """Helper function for printing out debugging output."""
     20     if support.verbose:
     21         with _print_mutex:
     22             print(arg)
     23 
     24 
     25 class BasicThreadTest(unittest.TestCase):
     26 
     27     def setUp(self):
     28         self.done_mutex = thread.allocate_lock()
     29         self.done_mutex.acquire()
     30         self.running_mutex = thread.allocate_lock()
     31         self.random_mutex = thread.allocate_lock()
     32         self.created = 0
     33         self.running = 0
     34         self.next_ident = 0
     35 
     36         key = support.threading_setup()
     37         self.addCleanup(support.threading_cleanup, *key)
     38 
     39 
     40 class ThreadRunningTests(BasicThreadTest):
     41 
     42     def newtask(self):
     43         with self.running_mutex:
     44             self.next_ident += 1
     45             verbose_print("creating task %s" % self.next_ident)
     46             thread.start_new_thread(self.task, (self.next_ident,))
     47             self.created += 1
     48             self.running += 1
     49 
     50     def task(self, ident):
     51         with self.random_mutex:
     52             delay = random.random() / 10000.0
     53         verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
     54         time.sleep(delay)
     55         verbose_print("task %s done" % ident)
     56         with self.running_mutex:
     57             self.running -= 1
     58             if self.created == NUMTASKS and self.running == 0:
     59                 self.done_mutex.release()
     60 
     61     def test_starting_threads(self):
     62         with support.wait_threads_exit():
     63             # Basic test for thread creation.
     64             for i in range(NUMTASKS):
     65                 self.newtask()
     66             verbose_print("waiting for tasks to complete...")
     67             self.done_mutex.acquire()
     68             verbose_print("all tasks done")
     69 
     70     def test_stack_size(self):
     71         # Various stack size tests.
     72         self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
     73 
     74         thread.stack_size(0)
     75         self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
     76 
     77     @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
     78     def test_nt_and_posix_stack_size(self):
     79         try:
     80             thread.stack_size(4096)
     81         except ValueError:
     82             verbose_print("caught expected ValueError setting "
     83                             "stack_size(4096)")
     84         except thread.error:
     85             self.skipTest("platform does not support changing thread stack "
     86                           "size")
     87 
     88         fail_msg = "stack_size(%d) failed - should succeed"
     89         for tss in (262144, 0x100000, 0):
     90             thread.stack_size(tss)
     91             self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
     92             verbose_print("successfully set stack_size(%d)" % tss)
     93 
     94         for tss in (262144, 0x100000):
     95             verbose_print("trying stack_size = (%d)" % tss)
     96             self.next_ident = 0
     97             self.created = 0
     98             with support.wait_threads_exit():
     99                 for i in range(NUMTASKS):
    100                     self.newtask()
    101 
    102                 verbose_print("waiting for all tasks to complete")
    103                 self.done_mutex.acquire()
    104                 verbose_print("all tasks done")
    105 
    106         thread.stack_size(0)
    107 
    108     def test__count(self):
    109         # Test the _count() function.
    110         orig = thread._count()
    111         mut = thread.allocate_lock()
    112         mut.acquire()
    113         started = []
    114 
    115         def task():
    116             started.append(None)
    117             mut.acquire()
    118             mut.release()
    119 
    120         with support.wait_threads_exit():
    121             thread.start_new_thread(task, ())
    122             while not started:
    123                 time.sleep(POLL_SLEEP)
    124             self.assertEqual(thread._count(), orig + 1)
    125             # Allow the task to finish.
    126             mut.release()
    127             # The only reliable way to be sure that the thread ended from the
    128             # interpreter's point of view is to wait for the function object to be
    129             # destroyed.
    130             done = []
    131             wr = weakref.ref(task, lambda _: done.append(None))
    132             del task
    133             while not done:
    134                 time.sleep(POLL_SLEEP)
    135             self.assertEqual(thread._count(), orig)
    136 
    137     def test_save_exception_state_on_error(self):
    138         # See issue #14474
    139         def task():
    140             started.release()
    141             raise SyntaxError
    142         def mywrite(self, *args):
    143             try:
    144                 raise ValueError
    145             except ValueError:
    146                 pass
    147             real_write(self, *args)
    148         started = thread.allocate_lock()
    149         with support.captured_output("stderr") as stderr:
    150             real_write = stderr.write
    151             stderr.write = mywrite
    152             started.acquire()
    153             with support.wait_threads_exit():
    154                 thread.start_new_thread(task, ())
    155                 started.acquire()
    156         self.assertIn("Traceback", stderr.getvalue())
    157 
    158 
    159 class Barrier:
    160     def __init__(self, num_threads):
    161         self.num_threads = num_threads
    162         self.waiting = 0
    163         self.checkin_mutex  = thread.allocate_lock()
    164         self.checkout_mutex = thread.allocate_lock()
    165         self.checkout_mutex.acquire()
    166 
    167     def enter(self):
    168         self.checkin_mutex.acquire()
    169         self.waiting = self.waiting + 1
    170         if self.waiting == self.num_threads:
    171             self.waiting = self.num_threads - 1
    172             self.checkout_mutex.release()
    173             return
    174         self.checkin_mutex.release()
    175 
    176         self.checkout_mutex.acquire()
    177         self.waiting = self.waiting - 1
    178         if self.waiting == 0:
    179             self.checkin_mutex.release()
    180             return
    181         self.checkout_mutex.release()
    182 
    183 
    184 class BarrierTest(BasicThreadTest):
    185 
    186     def test_barrier(self):
    187         with support.wait_threads_exit():
    188             self.bar = Barrier(NUMTASKS)
    189             self.running = NUMTASKS
    190             for i in range(NUMTASKS):
    191                 thread.start_new_thread(self.task2, (i,))
    192             verbose_print("waiting for tasks to end")
    193             self.done_mutex.acquire()
    194             verbose_print("tasks done")
    195 
    196     def task2(self, ident):
    197         for i in range(NUMTRIPS):
    198             if ident == 0:
    199                 # give it a good chance to enter the next
    200                 # barrier before the others are all out
    201                 # of the current one
    202                 delay = 0
    203             else:
    204                 with self.random_mutex:
    205                     delay = random.random() / 10000.0
    206             verbose_print("task %s will run for %sus" %
    207                           (ident, round(delay * 1e6)))
    208             time.sleep(delay)
    209             verbose_print("task %s entering %s" % (ident, i))
    210             self.bar.enter()
    211             verbose_print("task %s leaving barrier" % ident)
    212         with self.running_mutex:
    213             self.running -= 1
    214             # Must release mutex before releasing done, else the main thread can
    215             # exit and set mutex to None as part of global teardown; then
    216             # mutex.release() raises AttributeError.
    217             finished = self.running == 0
    218         if finished:
    219             self.done_mutex.release()
    220 
    221 class LockTests(lock_tests.LockTests):
    222     locktype = thread.allocate_lock
    223 
    224 
    225 class TestForkInThread(unittest.TestCase):
    226     def setUp(self):
    227         self.read_fd, self.write_fd = os.pipe()
    228 
    229     @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
    230     @support.reap_threads
    231     def test_forkinthread(self):
    232         status = "not set"
    233 
    234         def thread1():
    235             nonlocal status
    236 
    237             # fork in a thread
    238             pid = os.fork()
    239             if pid == 0:
    240                 # child
    241                 try:
    242                     os.close(self.read_fd)
    243                     os.write(self.write_fd, b"OK")
    244                 finally:
    245                     os._exit(0)
    246             else:
    247                 # parent
    248                 os.close(self.write_fd)
    249                 pid, status = os.waitpid(pid, 0)
    250 
    251         with support.wait_threads_exit():
    252             thread.start_new_thread(thread1, ())
    253             self.assertEqual(os.read(self.read_fd, 2), b"OK",
    254                              "Unable to fork() in thread")
    255         self.assertEqual(status, 0)
    256 
    257     def tearDown(self):
    258         try:
    259             os.close(self.read_fd)
    260         except OSError:
    261             pass
    262 
    263         try:
    264             os.close(self.write_fd)
    265         except OSError:
    266             pass
    267 
    268 
    269 if __name__ == "__main__":
    270     unittest.main()
    271