1 # Run the _testcapi module tests (tests for the Python/C API): by defn, 2 # these are all functions _testcapi exports whose name begins with 'test_'. 3 4 from __future__ import with_statement 5 import sys 6 import time 7 import random 8 import unittest 9 from test import test_support 10 try: 11 import thread 12 import threading 13 except ImportError: 14 thread = None 15 threading = None 16 import _testcapi 17 18 @unittest.skipUnless(threading, 'Threading required for this test.') 19 class TestPendingCalls(unittest.TestCase): 20 21 def pendingcalls_submit(self, l, n): 22 def callback(): 23 #this function can be interrupted by thread switching so let's 24 #use an atomic operation 25 l.append(None) 26 27 for i in range(n): 28 time.sleep(random.random()*0.02) #0.01 secs on average 29 #try submitting callback until successful. 30 #rely on regular interrupt to flush queue if we are 31 #unsuccessful. 32 while True: 33 if _testcapi._pending_threadfunc(callback): 34 break; 35 36 def pendingcalls_wait(self, l, n, context = None): 37 #now, stick around until l[0] has grown to 10 38 count = 0; 39 while len(l) != n: 40 #this busy loop is where we expect to be interrupted to 41 #run our callbacks. Note that callbacks are only run on the 42 #main thread 43 if False and test_support.verbose: 44 print "(%i)"%(len(l),), 45 for i in xrange(1000): 46 a = i*i 47 if context and not context.event.is_set(): 48 continue 49 count += 1 50 self.assertTrue(count < 10000, 51 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 52 if False and test_support.verbose: 53 print "(%i)"%(len(l),) 54 55 def test_pendingcalls_threaded(self): 56 #do every callback on a separate thread 57 n = 32 #total callbacks 58 threads = [] 59 class foo(object):pass 60 context = foo() 61 context.l = [] 62 context.n = 2 #submits per thread 63 context.nThreads = n // context.n 64 context.nFinished = 0 65 context.lock = threading.Lock() 66 context.event = threading.Event() 67 68 for i in range(context.nThreads): 69 t = threading.Thread(target=self.pendingcalls_thread, args = (context,)) 70 t.start() 71 threads.append(t) 72 73 self.pendingcalls_wait(context.l, n, context) 74 75 for t in threads: 76 t.join() 77 78 def pendingcalls_thread(self, context): 79 try: 80 self.pendingcalls_submit(context.l, context.n) 81 finally: 82 with context.lock: 83 context.nFinished += 1 84 nFinished = context.nFinished 85 if False and test_support.verbose: 86 print "finished threads: ", nFinished 87 if nFinished == context.nThreads: 88 context.event.set() 89 90 def test_pendingcalls_non_threaded(self): 91 #again, just using the main thread, likely they will all be dispatched at 92 #once. It is ok to ask for too many, because we loop until we find a slot. 93 #the loop can be interrupted to dispatch. 94 #there are only 32 dispatch slots, so we go for twice that! 95 l = [] 96 n = 64 97 self.pendingcalls_submit(l, n) 98 self.pendingcalls_wait(l, n) 99 100 101 @unittest.skipUnless(threading and thread, 'Threading required for this test.') 102 class TestThreadState(unittest.TestCase): 103 104 @test_support.reap_threads 105 def test_thread_state(self): 106 # some extra thread-state tests driven via _testcapi 107 def target(): 108 idents = [] 109 110 def callback(): 111 idents.append(thread.get_ident()) 112 113 _testcapi._test_thread_state(callback) 114 a = b = callback 115 time.sleep(1) 116 # Check our main thread is in the list exactly 3 times. 117 self.assertEqual(idents.count(thread.get_ident()), 3, 118 "Couldn't find main thread correctly in the list") 119 120 target() 121 t = threading.Thread(target=target) 122 t.start() 123 t.join() 124 125 126 def test_main(): 127 for name in dir(_testcapi): 128 if name.startswith('test_'): 129 test = getattr(_testcapi, name) 130 if test_support.verbose: 131 print "internal", name 132 try: 133 test() 134 except _testcapi.error: 135 raise test_support.TestFailed, sys.exc_info()[1] 136 137 test_support.run_unittest(TestPendingCalls, TestThreadState) 138 139 if __name__ == "__main__": 140 test_main() 141