1 from test import test_support 2 import unittest 3 import dummy_threading as _threading 4 import time 5 6 class DummyThreadingTestCase(unittest.TestCase): 7 8 class TestThread(_threading.Thread): 9 10 def run(self): 11 global running 12 global sema 13 global mutex 14 # Uncomment if testing another module, such as the real 'threading' 15 # module. 16 #delay = random.random() * 2 17 delay = 0 18 if test_support.verbose: 19 print 'task', self.name, 'will run for', delay, 'sec' 20 sema.acquire() 21 mutex.acquire() 22 running += 1 23 if test_support.verbose: 24 print running, 'tasks are running' 25 mutex.release() 26 time.sleep(delay) 27 if test_support.verbose: 28 print 'task', self.name, 'done' 29 mutex.acquire() 30 running -= 1 31 if test_support.verbose: 32 print self.name, 'is finished.', running, 'tasks are running' 33 mutex.release() 34 sema.release() 35 36 def setUp(self): 37 self.numtasks = 10 38 global sema 39 sema = _threading.BoundedSemaphore(value=3) 40 global mutex 41 mutex = _threading.RLock() 42 global running 43 running = 0 44 self.threads = [] 45 46 def test_tasks(self): 47 for i in range(self.numtasks): 48 t = self.TestThread(name="<thread %d>"%i) 49 self.threads.append(t) 50 t.start() 51 52 if test_support.verbose: 53 print 'waiting for all tasks to complete' 54 for t in self.threads: 55 t.join() 56 if test_support.verbose: 57 print 'all tasks done' 58 59 def test_main(): 60 test_support.run_unittest(DummyThreadingTestCase) 61 62 if __name__ == '__main__': 63 test_main() 64