Home | History | Annotate | Download | only in test
      1 import os
      2 import unittest
      3 import random
      4 from test import test_support
      5 thread = test_support.import_module('thread')
      6 import time
      7 import sys
      8 import weakref
      9 
     10 from test import lock_tests
     11 
# Number of worker threads started per batch by the tests in this file.
NUMTASKS = 10
# Number of passes each BarrierTest worker makes through the barrier.
NUMTRIPS = 3


# Lock held by verbose_print() while it prints.
_print_mutex = thread.allocate_lock()
     17 
     18 def verbose_print(arg):
     19     """Helper function for printing out debugging output."""
     20     if test_support.verbose:
     21         with _print_mutex:
     22             print arg
     23 
     24 
     25 class BasicThreadTest(unittest.TestCase):
     26 
     27     def setUp(self):
     28         self.done_mutex = thread.allocate_lock()
     29         self.done_mutex.acquire()
     30         self.running_mutex = thread.allocate_lock()
     31         self.random_mutex = thread.allocate_lock()
     32         self.created = 0
     33         self.running = 0
     34         self.next_ident = 0
     35 
     36 
     37 class ThreadRunningTests(BasicThreadTest):
     38 
     39     def newtask(self):
     40         with self.running_mutex:
     41             self.next_ident += 1
     42             verbose_print("creating task %s" % self.next_ident)
     43             thread.start_new_thread(self.task, (self.next_ident,))
     44             self.created += 1
     45             self.running += 1
     46 
     47     def task(self, ident):
     48         with self.random_mutex:
     49             delay = random.random() / 10000.0
     50         verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
     51         time.sleep(delay)
     52         verbose_print("task %s done" % ident)
     53         with self.running_mutex:
     54             self.running -= 1
     55             if self.created == NUMTASKS and self.running == 0:
     56                 self.done_mutex.release()
     57 
     58     def test_starting_threads(self):
     59         # Basic test for thread creation.

     60         for i in range(NUMTASKS):
     61             self.newtask()
     62         verbose_print("waiting for tasks to complete...")
     63         self.done_mutex.acquire()
     64         verbose_print("all tasks done")
     65 
     66     def test_stack_size(self):
     67         # Various stack size tests.

     68         self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
     69 
     70         thread.stack_size(0)
     71         self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
     72 
     73         if os.name not in ("nt", "os2", "posix"):
     74             return
     75 
     76         tss_supported = True
     77         try:
     78             thread.stack_size(4096)
     79         except ValueError:
     80             verbose_print("caught expected ValueError setting "
     81                             "stack_size(4096)")
     82         except thread.error:
     83             tss_supported = False
     84             verbose_print("platform does not support changing thread stack "
     85                             "size")
     86 
     87         if tss_supported:
     88             fail_msg = "stack_size(%d) failed - should succeed"
     89             for tss in (262144, 0x100000, 0):
     90                 thread.stack_size(tss)
     91                 self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
     92                 verbose_print("successfully set stack_size(%d)" % tss)
     93 
     94             for tss in (262144, 0x100000):
     95                 verbose_print("trying stack_size = (%d)" % tss)
     96                 self.next_ident = 0
     97                 self.created = 0
     98                 for i in range(NUMTASKS):
     99                     self.newtask()
    100 
    101                 verbose_print("waiting for all tasks to complete")
    102                 self.done_mutex.acquire()
    103                 verbose_print("all tasks done")
    104 
    105             thread.stack_size(0)
    106 
    107     def test__count(self):
    108         # Test the _count() function.

    109         orig = thread._count()
    110         mut = thread.allocate_lock()
    111         mut.acquire()
    112         started = []
    113         def task():
    114             started.append(None)
    115             mut.acquire()
    116             mut.release()
    117         thread.start_new_thread(task, ())
    118         while not started:
    119             time.sleep(0.01)
    120         self.assertEqual(thread._count(), orig + 1)
    121         # Allow the task to finish.

    122         mut.release()
    123         # The only reliable way to be sure that the thread ended from the

    124         # interpreter's point of view is to wait for the function object to be

    125         # destroyed.

    126         done = []
    127         wr = weakref.ref(task, lambda _: done.append(None))
    128         del task
    129         while not done:
    130             time.sleep(0.01)
    131         self.assertEqual(thread._count(), orig)
    132 
    133 
    134 class Barrier:
    135     def __init__(self, num_threads):
    136         self.num_threads = num_threads
    137         self.waiting = 0
    138         self.checkin_mutex  = thread.allocate_lock()
    139         self.checkout_mutex = thread.allocate_lock()
    140         self.checkout_mutex.acquire()
    141 
    142     def enter(self):
    143         self.checkin_mutex.acquire()
    144         self.waiting = self.waiting + 1
    145         if self.waiting == self.num_threads:
    146             self.waiting = self.num_threads - 1
    147             self.checkout_mutex.release()
    148             return
    149         self.checkin_mutex.release()
    150 
    151         self.checkout_mutex.acquire()
    152         self.waiting = self.waiting - 1
    153         if self.waiting == 0:
    154             self.checkin_mutex.release()
    155             return
    156         self.checkout_mutex.release()
    157 
    158 
    159 class BarrierTest(BasicThreadTest):
    160 
    161     def test_barrier(self):
    162         self.bar = Barrier(NUMTASKS)
    163         self.running = NUMTASKS
    164         for i in range(NUMTASKS):
    165             thread.start_new_thread(self.task2, (i,))
    166         verbose_print("waiting for tasks to end")
    167         self.done_mutex.acquire()
    168         verbose_print("tasks done")
    169 
    170     def task2(self, ident):
    171         for i in range(NUMTRIPS):
    172             if ident == 0:
    173                 # give it a good chance to enter the next

    174                 # barrier before the others are all out

    175                 # of the current one

    176                 delay = 0
    177             else:
    178                 with self.random_mutex:
    179                     delay = random.random() / 10000.0
    180             verbose_print("task %s will run for %sus" %
    181                           (ident, round(delay * 1e6)))
    182             time.sleep(delay)
    183             verbose_print("task %s entering %s" % (ident, i))
    184             self.bar.enter()
    185             verbose_print("task %s leaving barrier" % ident)
    186         with self.running_mutex:
    187             self.running -= 1
    188             # Must release mutex before releasing done, else the main thread can

    189             # exit and set mutex to None as part of global teardown; then

    190             # mutex.release() raises AttributeError.

    191             finished = self.running == 0
    192         if finished:
    193             self.done_mutex.release()
    194 
    195 
# Reuses the tests defined in lock_tests.LockTests with
# thread.allocate_lock bound as locktype (presumably the factory the base
# class calls to build each lock it tests -- confirm in lock_tests).
class LockTests(lock_tests.LockTests):
    locktype = thread.allocate_lock
    198 
    199 
    200 class TestForkInThread(unittest.TestCase):
    201     def setUp(self):
    202         self.read_fd, self.write_fd = os.pipe()
    203 
    204     @unittest.skipIf(sys.platform.startswith('win'),
    205                      "This test is only appropriate for POSIX-like systems.")
    206     @test_support.reap_threads
    207     def test_forkinthread(self):
    208         def thread1():
    209             try:
    210                 pid = os.fork() # fork in a thread

    211             except RuntimeError:
    212                 sys.exit(0) # exit the child

    213 
    214             if pid == 0: # child

    215                 os.close(self.read_fd)
    216                 os.write(self.write_fd, "OK")
    217                 sys.exit(0)
    218             else: # parent

    219                 os.close(self.write_fd)
    220 
    221         thread.start_new_thread(thread1, ())
    222         self.assertEqual(os.read(self.read_fd, 2), "OK",
    223                          "Unable to fork() in thread")
    224 
    225     def tearDown(self):
    226         try:
    227             os.close(self.read_fd)
    228         except OSError:
    229             pass
    230 
    231         try:
    232             os.close(self.write_fd)
    233         except OSError:
    234             pass
    235 
    236 
    237 def test_main():
    238     test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
    239                               TestForkInThread)
    240 
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
    243