Home | History | Annotate | Download | only in test
      1 import os
      2 import unittest
      3 import random
      4 from test import support
      5 thread = support.import_module('_thread')
      6 import time
      7 import sys
      8 import weakref
     10 from test import lock_tests
# Number of worker threads each threaded test starts; also the number of
# participants in the Barrier that BarrierTest creates.
NUMTASKS = 10
# Number of times each BarrierTest worker passes through the barrier.
NUMTRIPS = 3
# Held by verbose_print() around print() so that output written by
# different threads does not interleave.
_print_mutex = thread.allocate_lock()
     17 def verbose_print(arg):
     18     """Helper function for printing out debugging output."""
     19     if support.verbose:
     20         with _print_mutex:
     21             print(arg)
     23 class BasicThreadTest(unittest.TestCase):
     25     def setUp(self):
     26         self.done_mutex = thread.allocate_lock()
     27         self.done_mutex.acquire()
     28         self.running_mutex = thread.allocate_lock()
     29         self.random_mutex = thread.allocate_lock()
     30         self.created = 0
     31         self.running = 0
     32         self.next_ident = 0
     35 class ThreadRunningTests(BasicThreadTest):
     37     def newtask(self):
     38         with self.running_mutex:
     39             self.next_ident += 1
     40             verbose_print("creating task %s" % self.next_ident)
     41             thread.start_new_thread(self.task, (self.next_ident,))
     42             self.created += 1
     43             self.running += 1
     45     def task(self, ident):
     46         with self.random_mutex:
     47             delay = random.random() / 10000.0
     48         verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
     49         time.sleep(delay)
     50         verbose_print("task %s done" % ident)
     51         with self.running_mutex:
     52             self.running -= 1
     53             if self.created == NUMTASKS and self.running == 0:
     54                 self.done_mutex.release()
     56     def test_starting_threads(self):
     57         # Basic test for thread creation.
     58         for i in range(NUMTASKS):
     59             self.newtask()
     60         verbose_print("waiting for tasks to complete...")
     61         self.done_mutex.acquire()
     62         verbose_print("all tasks done")
     64     def test_stack_size(self):
     65         # Various stack size tests.
     66         self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
     68         thread.stack_size(0)
     69         self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
     71     @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
     72     def test_nt_and_posix_stack_size(self):
     73         try:
     74             thread.stack_size(4096)
     75         except ValueError:
     76             verbose_print("caught expected ValueError setting "
     77                             "stack_size(4096)")
     78         except thread.error:
     79             self.skipTest("platform does not support changing thread stack "
     80                           "size")
     82         fail_msg = "stack_size(%d) failed - should succeed"
     83         for tss in (262144, 0x100000, 0):
     84             thread.stack_size(tss)
     85             self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
     86             verbose_print("successfully set stack_size(%d)" % tss)
     88         for tss in (262144, 0x100000):
     89             verbose_print("trying stack_size = (%d)" % tss)
     90             self.next_ident = 0
     91             self.created = 0
     92             for i in range(NUMTASKS):
     93                 self.newtask()
     95             verbose_print("waiting for all tasks to complete")
     96             self.done_mutex.acquire()
     97             verbose_print("all tasks done")
     99         thread.stack_size(0)
    101     def test__count(self):
    102         # Test the _count() function.
    103         orig = thread._count()
    104         mut = thread.allocate_lock()
    105         mut.acquire()
    106         started = []
    107         def task():
    108             started.append(None)
    109             mut.acquire()
    110             mut.release()
    111         thread.start_new_thread(task, ())
    112         while not started:
    113             time.sleep(0.01)
    114         self.assertEqual(thread._count(), orig + 1)
    115         # Allow the task to finish.
    116         mut.release()
    117         # The only reliable way to be sure that the thread ended from the
    118         # interpreter's point of view is to wait for the function object to be
    119         # destroyed.
    120         done = []
    121         wr = weakref.ref(task, lambda _: done.append(None))
    122         del task
    123         while not done:
    124             time.sleep(0.01)
    125         self.assertEqual(thread._count(), orig)
    127     def test_save_exception_state_on_error(self):
    128         # See issue #14474
    129         def task():
    130             started.release()
    131             raise SyntaxError
    132         def mywrite(self, *args):
    133             try:
    134                 raise ValueError
    135             except ValueError:
    136                 pass
    137             real_write(self, *args)
    138         c = thread._count()
    139         started = thread.allocate_lock()
    140         with support.captured_output("stderr") as stderr:
    141             real_write = stderr.write
    142             stderr.write = mywrite
    143             started.acquire()
    144             thread.start_new_thread(task, ())
    145             started.acquire()
    146             while thread._count() > c:
    147                 time.sleep(0.01)
    148         self.assertIn("Traceback", stderr.getvalue())
    151 class Barrier:
    152     def __init__(self, num_threads):
    153         self.num_threads = num_threads
    154         self.waiting = 0
    155         self.checkin_mutex  = thread.allocate_lock()
    156         self.checkout_mutex = thread.allocate_lock()
    157         self.checkout_mutex.acquire()
    159     def enter(self):
    160         self.checkin_mutex.acquire()
    161         self.waiting = self.waiting + 1
    162         if self.waiting == self.num_threads:
    163             self.waiting = self.num_threads - 1
    164             self.checkout_mutex.release()
    165             return
    166         self.checkin_mutex.release()
    168         self.checkout_mutex.acquire()
    169         self.waiting = self.waiting - 1
    170         if self.waiting == 0:
    171             self.checkin_mutex.release()
    172             return
    173         self.checkout_mutex.release()
    176 class BarrierTest(BasicThreadTest):
    178     def test_barrier(self):
    179         self.bar = Barrier(NUMTASKS)
    180         self.running = NUMTASKS
    181         for i in range(NUMTASKS):
    182             thread.start_new_thread(self.task2, (i,))
    183         verbose_print("waiting for tasks to end")
    184         self.done_mutex.acquire()
    185         verbose_print("tasks done")
    187     def task2(self, ident):
    188         for i in range(NUMTRIPS):
    189             if ident == 0:
    190                 # give it a good chance to enter the next
    191                 # barrier before the others are all out
    192                 # of the current one
    193                 delay = 0
    194             else:
    195                 with self.random_mutex:
    196                     delay = random.random() / 10000.0
    197             verbose_print("task %s will run for %sus" %
    198                           (ident, round(delay * 1e6)))
    199             time.sleep(delay)
    200             verbose_print("task %s entering %s" % (ident, i))
    201             self.bar.enter()
    202             verbose_print("task %s leaving barrier" % ident)
    203         with self.running_mutex:
    204             self.running -= 1
    205             # Must release mutex before releasing done, else the main thread can
    206             # exit and set mutex to None as part of global teardown; then
    207             # mutex.release() raises AttributeError.
    208             finished = self.running == 0
    209         if finished:
    210             self.done_mutex.release()
class LockTests(lock_tests.LockTests):
    """Run the inherited lock_tests.LockTests cases for this module's lock type."""
    # Presumably the factory the inherited tests call to create the locks
    # under test -- confirm against test.lock_tests.
    locktype = thread.allocate_lock
    216 class TestForkInThread(unittest.TestCase):
    217     def setUp(self):
    218         self.read_fd, self.write_fd = os.pipe()
    220     @unittest.skipIf(sys.platform.startswith('win'),
    221                      "This test is only appropriate for POSIX-like systems.")
    222     @support.reap_threads
    223     def test_forkinthread(self):
    224         def thread1():
    225             try:
    226                 pid = os.fork() # fork in a thread
    227             except RuntimeError:
    228                 os._exit(1) # exit the child
    230             if pid == 0: # child
    231                 try:
    232                     os.close(self.read_fd)
    233                     os.write(self.write_fd, b"OK")
    234                 finally:
    235                     os._exit(0)
    236             else: # parent
    237                 os.close(self.write_fd)
    239         thread.start_new_thread(thread1, ())
    240         self.assertEqual(os.read(self.read_fd, 2), b"OK",
    241                          "Unable to fork() in thread")
    243     def tearDown(self):
    244         try:
    245             os.close(self.read_fd)
    246         except OSError:
    247             pass
    249         try:
    250             os.close(self.write_fd)
    251         except OSError:
    252             pass
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()