Home | History | Annotate | Download | only in test
      1 import os
      2 import unittest
      3 import random
      4 from test import test_support
      5 thread = test_support.import_module('thread')
      6 import time
      7 import sys
      8 import weakref
      9 
     10 from test import lock_tests
     11 
# Number of worker threads started by each threaded test in this module.
NUMTASKS = 10
# Number of times every BarrierTest task passes through the barrier.
NUMTRIPS = 3


# Held by verbose_print() while printing so that only one thread prints at a
# time.
_print_mutex = thread.allocate_lock()
     17 
     18 def verbose_print(arg):
     19     """Helper function for printing out debugging output."""
     20     if test_support.verbose:
     21         with _print_mutex:
     22             print arg
     23 
     24 
     25 class BasicThreadTest(unittest.TestCase):
     26 
     27     def setUp(self):
     28         self.done_mutex = thread.allocate_lock()
     29         self.done_mutex.acquire()
     30         self.running_mutex = thread.allocate_lock()
     31         self.random_mutex = thread.allocate_lock()
     32         self.created = 0
     33         self.running = 0
     34         self.next_ident = 0
     35 
     36 
     37 class ThreadRunningTests(BasicThreadTest):
     38 
     39     def newtask(self):
     40         with self.running_mutex:
     41             self.next_ident += 1
     42             verbose_print("creating task %s" % self.next_ident)
     43             thread.start_new_thread(self.task, (self.next_ident,))
     44             self.created += 1
     45             self.running += 1
     46 
     47     def task(self, ident):
     48         with self.random_mutex:
     49             delay = random.random() / 10000.0
     50         verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
     51         time.sleep(delay)
     52         verbose_print("task %s done" % ident)
     53         with self.running_mutex:
     54             self.running -= 1
     55             if self.created == NUMTASKS and self.running == 0:
     56                 self.done_mutex.release()
     57 
     58     def test_starting_threads(self):
     59         # Basic test for thread creation.
     60         for i in range(NUMTASKS):
     61             self.newtask()
     62         verbose_print("waiting for tasks to complete...")
     63         self.done_mutex.acquire()
     64         verbose_print("all tasks done")
     65 
     66     def test_stack_size(self):
     67         # Various stack size tests.
     68         self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
     69 
     70         thread.stack_size(0)
     71         self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
     72 
     73     @unittest.skipIf(os.name not in ("nt", "os2", "posix"), 'test meant for nt, os2, and posix')
     74     def test_nt_and_posix_stack_size(self):
     75         try:
     76             thread.stack_size(4096)
     77         except ValueError:
     78             verbose_print("caught expected ValueError setting "
     79                             "stack_size(4096)")
     80         except thread.error:
     81             self.skipTest("platform does not support changing thread stack "
     82                           "size")
     83 
     84         fail_msg = "stack_size(%d) failed - should succeed"
     85         for tss in (262144, 0x100000, 0):
     86             thread.stack_size(tss)
     87             self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
     88             verbose_print("successfully set stack_size(%d)" % tss)
     89 
     90         for tss in (262144, 0x100000):
     91             verbose_print("trying stack_size = (%d)" % tss)
     92             self.next_ident = 0
     93             self.created = 0
     94             for i in range(NUMTASKS):
     95                 self.newtask()
     96 
     97             verbose_print("waiting for all tasks to complete")
     98             self.done_mutex.acquire()
     99             verbose_print("all tasks done")
    100 
    101         thread.stack_size(0)
    102 
    103     def test__count(self):
    104         # Test the _count() function.
    105         orig = thread._count()
    106         mut = thread.allocate_lock()
    107         mut.acquire()
    108         started = []
    109         def task():
    110             started.append(None)
    111             mut.acquire()
    112             mut.release()
    113         thread.start_new_thread(task, ())
    114         while not started:
    115             time.sleep(0.01)
    116         self.assertEqual(thread._count(), orig + 1)
    117         # Allow the task to finish.
    118         mut.release()
    119         # The only reliable way to be sure that the thread ended from the
    120         # interpreter's point of view is to wait for the function object to be
    121         # destroyed.
    122         done = []
    123         wr = weakref.ref(task, lambda _: done.append(None))
    124         del task
    125         while not done:
    126             time.sleep(0.01)
    127         self.assertEqual(thread._count(), orig)
    128 
    129     def test_save_exception_state_on_error(self):
    130         # See issue #14474
    131         def task():
    132             started.release()
    133             raise SyntaxError
    134         def mywrite(self, *args):
    135             try:
    136                 raise ValueError
    137             except ValueError:
    138                 pass
    139             real_write(self, *args)
    140         c = thread._count()
    141         started = thread.allocate_lock()
    142         with test_support.captured_output("stderr") as stderr:
    143             real_write = stderr.write
    144             stderr.write = mywrite
    145             started.acquire()
    146             thread.start_new_thread(task, ())
    147             started.acquire()
    148             while thread._count() > c:
    149                 time.sleep(0.01)
    150         self.assertIn("Traceback", stderr.getvalue())
    151 
    152 
class Barrier:
    """Reusable barrier for a fixed number of threads, built from two locks.

    checkin_mutex guards arrivals; checkout_mutex (locked at construction)
    holds arrived threads back until the last one shows up, and is then
    handed from thread to thread as they leave.

    NOTE(review): with num_threads == 1 the single caller returns while
    still holding checkin_mutex and nobody is left to release it, so a
    second enter() would block forever.  The tests only use NUMTASKS.
    """

    def __init__(self, num_threads):
        """Create a barrier that releases once *num_threads* have entered."""
        self.num_threads = num_threads
        # Threads checked in during the check-in phase, then threads still
        # to be let out during the check-out phase.
        self.waiting = 0
        self.checkin_mutex  = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Locked up front so early arrivals block in enter().
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until num_threads threads have called enter(), then return."""
        # --- check-in phase ---
        self.checkin_mutex.acquire()
        self.waiting = self.waiting + 1
        if self.waiting == self.num_threads:
            # Last arrival: everyone else is blocked on checkout_mutex.
            self.waiting = self.num_threads - 1
            # Let the first waiter out.  checkin_mutex is deliberately NOT
            # released here, so nobody can check in for the next round until
            # the last waiter of this round has left.
            self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        # --- check-out phase ---
        # Wait for the last arrival (or the previous leaver) to pass the
        # checkout lock on.
        self.checkout_mutex.acquire()
        self.waiting = self.waiting - 1
        if self.waiting == 0:
            # Final leaver: reopen check-in and keep checkout_mutex locked,
            # which restores the initial state for the next round.
            self.checkin_mutex.release()
            return
        # Hand the checkout lock to the next waiting thread.
        self.checkout_mutex.release()
    176 
    177 
    178 class BarrierTest(BasicThreadTest):
    179 
    180     def test_barrier(self):
    181         self.bar = Barrier(NUMTASKS)
    182         self.running = NUMTASKS
    183         for i in range(NUMTASKS):
    184             thread.start_new_thread(self.task2, (i,))
    185         verbose_print("waiting for tasks to end")
    186         self.done_mutex.acquire()
    187         verbose_print("tasks done")
    188 
    189     def task2(self, ident):
    190         for i in range(NUMTRIPS):
    191             if ident == 0:
    192                 # give it a good chance to enter the next
    193                 # barrier before the others are all out
    194                 # of the current one
    195                 delay = 0
    196             else:
    197                 with self.random_mutex:
    198                     delay = random.random() / 10000.0
    199             verbose_print("task %s will run for %sus" %
    200                           (ident, round(delay * 1e6)))
    201             time.sleep(delay)
    202             verbose_print("task %s entering %s" % (ident, i))
    203             self.bar.enter()
    204             verbose_print("task %s leaving barrier" % ident)
    205         with self.running_mutex:
    206             self.running -= 1
    207             # Must release mutex before releasing done, else the main thread can
    208             # exit and set mutex to None as part of global teardown; then
    209             # mutex.release() raises AttributeError.
    210             finished = self.running == 0
    211         if finished:
    212             self.done_mutex.release()
    213 
    214 
class LockTests(lock_tests.LockTests):
    # Inherits every test from lock_tests.LockTests and points locktype at
    # the low-level lock factory of the thread module (presumably the base
    # class calls self.locktype() to create the locks it exercises).
    locktype = thread.allocate_lock
    217 
    218 
    219 class TestForkInThread(unittest.TestCase):
    220     def setUp(self):
    221         self.read_fd, self.write_fd = os.pipe()
    222 
    223     @unittest.skipIf(sys.platform.startswith('win'),
    224                      "This test is only appropriate for POSIX-like systems.")
    225     @test_support.reap_threads
    226     def test_forkinthread(self):
    227         def thread1():
    228             try:
    229                 pid = os.fork() # fork in a thread
    230             except RuntimeError:
    231                 sys.exit(0) # exit the child
    232 
    233             if pid == 0: # child
    234                 os.close(self.read_fd)
    235                 os.write(self.write_fd, "OK")
    236                 # Exiting the thread normally in the child process can leave
    237                 # any additional threads (such as the one started by
    238                 # importing _tkinter) still running, and this can prevent
    239                 # the half-zombie child process from being cleaned up. See
    240                 # Issue #26456.
    241                 os._exit(0)
    242             else: # parent
    243                 os.close(self.write_fd)
    244 
    245         thread.start_new_thread(thread1, ())
    246         self.assertEqual(os.read(self.read_fd, 2), "OK",
    247                          "Unable to fork() in thread")
    248 
    249     def tearDown(self):
    250         try:
    251             os.close(self.read_fd)
    252         except OSError:
    253             pass
    254 
    255         try:
    256             os.close(self.write_fd)
    257         except OSError:
    258             pass
    259 
    260 
    261 def test_main():
    262     test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
    263                               TestForkInThread)
    264 
    265 if __name__ == "__main__":
    266     test_main()
    267