1 from test import support 2 import unittest 3 import dummy_threading as _threading 4 import time 5 6 class DummyThreadingTestCase(unittest.TestCase): 7 8 class TestThread(_threading.Thread): 9 10 def run(self): 11 global running 12 global sema 13 global mutex 14 # Uncomment if testing another module, such as the real 'threading' 15 # module. 16 #delay = random.random() * 2 17 delay = 0 18 if support.verbose: 19 print('task', self.name, 'will run for', delay, 'sec') 20 sema.acquire() 21 mutex.acquire() 22 running += 1 23 if support.verbose: 24 print(running, 'tasks are running') 25 mutex.release() 26 time.sleep(delay) 27 if support.verbose: 28 print('task', self.name, 'done') 29 mutex.acquire() 30 running -= 1 31 if support.verbose: 32 print(self.name, 'is finished.', running, 'tasks are running') 33 mutex.release() 34 sema.release() 35 36 def setUp(self): 37 self.numtasks = 10 38 global sema 39 sema = _threading.BoundedSemaphore(value=3) 40 global mutex 41 mutex = _threading.RLock() 42 global running 43 running = 0 44 self.threads = [] 45 46 def test_tasks(self): 47 for i in range(self.numtasks): 48 t = self.TestThread(name="<thread %d>"%i) 49 self.threads.append(t) 50 t.start() 51 52 if support.verbose: 53 print('waiting for all tasks to complete') 54 for t in self.threads: 55 t.join() 56 if support.verbose: 57 print('all tasks done') 58 59 if __name__ == '__main__': 60 unittest.main() 61