"""Regression tests for the low-level ``thread`` module."""

import os
import unittest
import random
from test import test_support
thread = test_support.import_module('thread')
import time
import sys
import weakref

from test import lock_tests

# Number of worker threads started by each multi-threaded test.
NUMTASKS = 10
# Number of times each worker passes through the barrier in BarrierTest.
NUMTRIPS = 3


# Serializes verbose output so lines from different threads do not interleave.
_print_mutex = thread.allocate_lock()

def verbose_print(arg):
    """Helper function for printing out debugging output."""
    if test_support.verbose:
        with _print_mutex:
            # Parenthesized single-argument form: same output as the bare
            # print statement, and it also parses as a function call.
            print(arg)


class BasicThreadTest(unittest.TestCase):
    """Common fixture: the locks and counters shared with worker threads."""

    def setUp(self):
        # done_mutex starts out held; the last worker to finish releases it,
        # which unblocks the main thread's second acquire().
        self.done_mutex = thread.allocate_lock()
        self.done_mutex.acquire()
        # Guards the created/running/next_ident counters.
        self.running_mutex = thread.allocate_lock()
        # Guards calls into the random module made by the workers.
        self.random_mutex = thread.allocate_lock()
        self.created = 0
        self.running = 0
        self.next_ident = 0


class ThreadRunningTests(BasicThreadTest):
    """Tests for starting threads, stack sizes and thread._count()."""

    def newtask(self):
        """Start one worker thread running task() and account for it."""
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Worker body: sleep for a short random time, then check out.

        The worker that brings ``running`` down to 0 once all NUMTASKS
        workers have been created releases ``done_mutex``.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            finished = self.created == NUMTASKS and self.running == 0
        # Release done_mutex only after running_mutex has been dropped (same
        # pattern as BarrierTest.task2), so the main thread is never woken
        # while this worker still holds running_mutex.
        if finished:
            self.done_mutex.release()

    def test_starting_threads(self):
        # Basic test for thread creation.
        for i in range(NUMTASKS):
            self.newtask()
        verbose_print("waiting for tasks to complete...")
        self.done_mutex.acquire()
        verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

    @unittest.skipIf(os.name not in ("nt", "os2", "posix"),
                     'test meant for nt, os2, and posix')
    def test_nt_and_posix_stack_size(self):
        # Setting explicit stack sizes must succeed, and threads must be able
        # to run with each of them.
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                            "stack_size(4096)")
        except thread.error:
            self.skipTest("platform does not support changing thread stack "
                          "size")

        # The stack size is a process-wide setting: always restore the
        # default, even when an assertion below fails, so that later tests
        # (e.g. test_stack_size) still see the initial value of 0.
        try:
            fail_msg = "stack_size(%d) failed - should succeed"
            for tss in (262144, 0x100000, 0):
                thread.stack_size(tss)
                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                verbose_print("successfully set stack_size(%d)" % tss)

            for tss in (262144, 0x100000):
                verbose_print("trying stack_size = (%d)" % tss)
                # Actually apply the size being tried; the loop above leaves
                # the setting at 0 (the default).
                thread.stack_size(tss)
                self.next_ident = 0
                self.created = 0
                for i in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")
        finally:
            thread.stack_size(0)

    def test__count(self):
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []
        def task():
            started.append(None)
            # Block until the main thread releases mut.
            mut.acquire()
            mut.release()
        thread.start_new_thread(task, ())
        while not started:
            time.sleep(0.01)
        self.assertEqual(thread._count(), orig + 1)
        # Allow the task to finish.
        mut.release()
        # The only reliable way to be sure that the thread ended from the
        # interpreter's point of view is to wait for the function object to be
        # destroyed.
        done = []
        # wr is never read, but the weak reference object must stay alive for
        # its callback to fire.
        wr = weakref.ref(task, lambda _: done.append(None))
        del task
        while not done:
            time.sleep(0.01)
        self.assertEqual(thread._count(), orig)

    def test_save_exception_state_on_error(self):
        # See issue #14474
        def task():
            started.release()
            raise SyntaxError
        def mywrite(self, *args):
            # Raise and handle an unrelated exception while the thread's
            # traceback is being written out.
            try:
                raise ValueError
            except ValueError:
                pass
            real_write(self, *args)
        c = thread._count()
        started = thread.allocate_lock()
        with test_support.captured_output("stderr") as stderr:
            real_write = stderr.write
            stderr.write = mywrite
            started.acquire()
            thread.start_new_thread(task, ())
            # Blocks until task() has started and released the lock.
            started.acquire()
            # Wait for the thread to go away before inspecting stderr.
            while thread._count() > c:
                time.sleep(0.01)
        self.assertIn("Traceback", stderr.getvalue())


class Barrier:
    """Reusable barrier for ``num_threads`` threads, built from two locks.

    checkin_mutex serializes arrivals; checkout_mutex (initially held) is
    handed from one departing thread to the next.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until ``num_threads`` threads have called enter()."""
        self.checkin_mutex.acquire()
        self.waiting = self.waiting + 1
        if self.waiting == self.num_threads:
            # Last arrival: keep checkin_mutex held so nobody can re-enter
            # yet, and let the first waiter out.
            self.waiting = self.num_threads - 1
            self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        self.checkout_mutex.acquire()
        self.waiting = self.waiting - 1
        if self.waiting == 0:
            # Last one out: keep checkout_mutex held for the next round and
            # reopen check-in.
            self.checkin_mutex.release()
            return
        self.checkout_mutex.release()


class BarrierTest(BasicThreadTest):
    """Runs NUMTASKS threads through a Barrier NUMTRIPS times."""

    def test_barrier(self):
        self.bar = Barrier(NUMTASKS)
        self.running = NUMTASKS
        for i in range(NUMTASKS):
            thread.start_new_thread(self.task2, (i,))
        verbose_print("waiting for tasks to end")
        self.done_mutex.acquire()
        verbose_print("tasks done")

    def task2(self, ident):
        """Worker body: make NUMTRIPS trips through the barrier, then check out."""
        for i in range(NUMTRIPS):
            if ident == 0:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            else:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, i))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)
        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread can
            # exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()


class LockTests(lock_tests.LockTests):
    """Runs the shared lock test suite against thread.allocate_lock."""
    locktype = thread.allocate_lock


class TestForkInThread(unittest.TestCase):
    """Checks that os.fork() can be called from a non-main thread."""

    def setUp(self):
        self.read_fd, self.write_fd = os.pipe()

    # Skip based on the capability the test needs rather than on the platform
    # name.
    @unittest.skipUnless(hasattr(os, 'fork'),
                         "This test is only appropriate for POSIX-like systems.")
    @test_support.reap_threads
    def test_forkinthread(self):
        def thread1():
            try:
                pid = os.fork() # fork in a thread
            except RuntimeError:
                sys.exit(0) # exit the child

            if pid == 0: # child
                try:
                    os.close(self.read_fd)
                    os.write(self.write_fd, "OK")
                finally:
                    # Exiting the thread normally in the child process can
                    # leave any additional threads (such as the one started
                    # by importing _tkinter) still running, and this can
                    # prevent the half-zombie child process from being
                    # cleaned up. See Issue #26456.
                    # The finally clause makes sure the child goes away even
                    # if the close/write above raises.
                    os._exit(0)
            else: # parent
                os.close(self.write_fd)
                # Reap the child so it does not linger as a zombie.
                os.waitpid(pid, 0)

        thread.start_new_thread(thread1, ())
        self.assertEqual(os.read(self.read_fd, 2), "OK",
                         "Unable to fork() in thread")

    def tearDown(self):
        # Best-effort cleanup: either descriptor may already have been closed
        # by the test itself.
        try:
            os.close(self.read_fd)
        except OSError:
            pass

        try:
            os.close(self.write_fd)
        except OSError:
            pass


def test_main():
    """Entry point used by the regression test runner."""
    test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
                              TestForkInThread)

if __name__ == "__main__":
    test_main()