Home | History | Annotate | Download | only in test
      1 import os
      2 import unittest
      3 import random
      4 from test import test_support
      5 thread = test_support.import_module('thread')
      6 import time
      7 import sys
      8 import weakref
      9 
     10 from test import lock_tests
     11 
# Number of worker threads started per round by the tests below; also the
# number of participants in the Barrier used by BarrierTest.
NUMTASKS = 10
# Number of times each BarrierTest worker passes through the barrier.
NUMTRIPS = 3


# Held while printing so that verbose output from different threads is not
# interleaved.
_print_mutex = thread.allocate_lock()
     17 
     18 def verbose_print(arg):
     19     """Helper function for printing out debugging output."""
     20     if test_support.verbose:
     21         with _print_mutex:
     22             print arg
     23 
     24 
     25 class BasicThreadTest(unittest.TestCase):
     26 
     27     def setUp(self):
     28         self.done_mutex = thread.allocate_lock()
     29         self.done_mutex.acquire()
     30         self.running_mutex = thread.allocate_lock()
     31         self.random_mutex = thread.allocate_lock()
     32         self.created = 0
     33         self.running = 0
     34         self.next_ident = 0
     35 
     36 
     37 class ThreadRunningTests(BasicThreadTest):
     38 
     39     def newtask(self):
     40         with self.running_mutex:
     41             self.next_ident += 1
     42             verbose_print("creating task %s" % self.next_ident)
     43             thread.start_new_thread(self.task, (self.next_ident,))
     44             self.created += 1
     45             self.running += 1
     46 
     47     def task(self, ident):
     48         with self.random_mutex:
     49             delay = random.random() / 10000.0
     50         verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
     51         time.sleep(delay)
     52         verbose_print("task %s done" % ident)
     53         with self.running_mutex:
     54             self.running -= 1
     55             if self.created == NUMTASKS and self.running == 0:
     56                 self.done_mutex.release()
     57 
     58     def test_starting_threads(self):
     59         # Basic test for thread creation.
     60         for i in range(NUMTASKS):
     61             self.newtask()
     62         verbose_print("waiting for tasks to complete...")
     63         self.done_mutex.acquire()
     64         verbose_print("all tasks done")
     65 
     66     def test_stack_size(self):
     67         # Various stack size tests.
     68         self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
     69 
     70         thread.stack_size(0)
     71         self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
     72 
     73         if os.name not in ("nt", "os2", "posix"):
     74             return
     75 
     76         tss_supported = True
     77         try:
     78             thread.stack_size(4096)
     79         except ValueError:
     80             verbose_print("caught expected ValueError setting "
     81                             "stack_size(4096)")
     82         except thread.error:
     83             tss_supported = False
     84             verbose_print("platform does not support changing thread stack "
     85                             "size")
     86 
     87         if tss_supported:
     88             fail_msg = "stack_size(%d) failed - should succeed"
     89             for tss in (262144, 0x100000, 0):
     90                 thread.stack_size(tss)
     91                 self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
     92                 verbose_print("successfully set stack_size(%d)" % tss)
     93 
     94             for tss in (262144, 0x100000):
     95                 verbose_print("trying stack_size = (%d)" % tss)
     96                 self.next_ident = 0
     97                 self.created = 0
     98                 for i in range(NUMTASKS):
     99                     self.newtask()
    100 
    101                 verbose_print("waiting for all tasks to complete")
    102                 self.done_mutex.acquire()
    103                 verbose_print("all tasks done")
    104 
    105             thread.stack_size(0)
    106 
    107     def test__count(self):
    108         # Test the _count() function.
    109         orig = thread._count()
    110         mut = thread.allocate_lock()
    111         mut.acquire()
    112         started = []
    113         def task():
    114             started.append(None)
    115             mut.acquire()
    116             mut.release()
    117         thread.start_new_thread(task, ())
    118         while not started:
    119             time.sleep(0.01)
    120         self.assertEqual(thread._count(), orig + 1)
    121         # Allow the task to finish.
    122         mut.release()
    123         # The only reliable way to be sure that the thread ended from the
    124         # interpreter's point of view is to wait for the function object to be
    125         # destroyed.
    126         done = []
    127         wr = weakref.ref(task, lambda _: done.append(None))
    128         del task
    129         while not done:
    130             time.sleep(0.01)
    131         self.assertEqual(thread._count(), orig)
    132 
    133     def test_save_exception_state_on_error(self):
    134         # See issue #14474
    135         def task():
    136             started.release()
    137             raise SyntaxError
    138         def mywrite(self, *args):
    139             try:
    140                 raise ValueError
    141             except ValueError:
    142                 pass
    143             real_write(self, *args)
    144         c = thread._count()
    145         started = thread.allocate_lock()
    146         with test_support.captured_output("stderr") as stderr:
    147             real_write = stderr.write
    148             stderr.write = mywrite
    149             started.acquire()
    150             thread.start_new_thread(task, ())
    151             started.acquire()
    152             while thread._count() > c:
    153                 time.sleep(0.01)
    154         self.assertIn("Traceback", stderr.getvalue())
    155 
    156 
class Barrier:
    """Reusable barrier for a fixed number of threads, built from two locks.

    checkin_mutex guards arrival; checkout_mutex guards departure.  A lock
    taken by one thread is released by a different one, which the code
    below relies on.
    """
    def __init__(self, num_threads):
        # Number of enter() calls needed before anyone is let through.
        self.num_threads = num_threads
        # Threads that have arrived and not yet left the barrier.
        self.waiting = 0
        self.checkin_mutex  = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Held from the start so that early arrivals block in enter().
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until num_threads threads have called enter()."""
        self.checkin_mutex.acquire()
        self.waiting = self.waiting + 1
        if self.waiting == self.num_threads:
            # Last thread to arrive: it leaves at once, so only the other
            # num_threads - 1 threads remain to be let out.  checkin_mutex
            # deliberately stays held, keeping threads from entering the
            # next round until everybody has left this one.
            self.waiting = self.num_threads - 1
            self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        # Wait for the last arrival (or the previous leaver) to let us out.
        self.checkout_mutex.acquire()
        self.waiting = self.waiting - 1
        if self.waiting == 0:
            # Last thread to leave: reopen the barrier for the next round.
            # checkout_mutex stays held, as it was after __init__.
            self.checkin_mutex.release()
            return
        # Pass the exit permission on to the next waiting thread.
        self.checkout_mutex.release()
    180 
    181 
    182 class BarrierTest(BasicThreadTest):
    183 
    184     def test_barrier(self):
    185         self.bar = Barrier(NUMTASKS)
    186         self.running = NUMTASKS
    187         for i in range(NUMTASKS):
    188             thread.start_new_thread(self.task2, (i,))
    189         verbose_print("waiting for tasks to end")
    190         self.done_mutex.acquire()
    191         verbose_print("tasks done")
    192 
    193     def task2(self, ident):
    194         for i in range(NUMTRIPS):
    195             if ident == 0:
    196                 # give it a good chance to enter the next
    197                 # barrier before the others are all out
    198                 # of the current one
    199                 delay = 0
    200             else:
    201                 with self.random_mutex:
    202                     delay = random.random() / 10000.0
    203             verbose_print("task %s will run for %sus" %
    204                           (ident, round(delay * 1e6)))
    205             time.sleep(delay)
    206             verbose_print("task %s entering %s" % (ident, i))
    207             self.bar.enter()
    208             verbose_print("task %s leaving barrier" % ident)
    209         with self.running_mutex:
    210             self.running -= 1
    211             # Must release mutex before releasing done, else the main thread can
    212             # exit and set mutex to None as part of global teardown; then
    213             # mutex.release() raises AttributeError.
    214             finished = self.running == 0
    215         if finished:
    216             self.done_mutex.release()
    217 
    218 
class LockTests(lock_tests.LockTests):
    # Run the lock test-suite inherited from lock_tests against the lock
    # type created by thread.allocate_lock().
    locktype = thread.allocate_lock
    221 
    222 
    223 class TestForkInThread(unittest.TestCase):
    224     def setUp(self):
    225         self.read_fd, self.write_fd = os.pipe()
    226 
    227     @unittest.skipIf(sys.platform.startswith('win'),
    228                      "This test is only appropriate for POSIX-like systems.")
    229     @test_support.reap_threads
    230     def test_forkinthread(self):
    231         def thread1():
    232             try:
    233                 pid = os.fork() # fork in a thread
    234             except RuntimeError:
    235                 sys.exit(0) # exit the child
    236 
    237             if pid == 0: # child
    238                 os.close(self.read_fd)
    239                 os.write(self.write_fd, "OK")
    240                 sys.exit(0)
    241             else: # parent
    242                 os.close(self.write_fd)
    243 
    244         thread.start_new_thread(thread1, ())
    245         self.assertEqual(os.read(self.read_fd, 2), "OK",
    246                          "Unable to fork() in thread")
    247 
    248     def tearDown(self):
    249         try:
    250             os.close(self.read_fd)
    251         except OSError:
    252             pass
    253 
    254         try:
    255             os.close(self.write_fd)
    256         except OSError:
    257             pass
    258 
    259 
    260 def test_main():
    261     test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
    262                               TestForkInThread)
    263 
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()
    266