1 # Run the _testcapi module tests (tests for the Python/C API): by defn, 2 # these are all functions _testcapi exports whose name begins with 'test_'. 3 4 from __future__ import with_statement 5 import string 6 import sys 7 import time 8 import random 9 import unittest 10 from test import test_support as support 11 try: 12 import thread 13 import threading 14 except ImportError: 15 thread = None 16 threading = None 17 # Skip this test if the _testcapi module isn't available. 18 _testcapi = support.import_module('_testcapi') 19 20 class CAPITest(unittest.TestCase): 21 22 def test_buildvalue_N(self): 23 _testcapi.test_buildvalue_N() 24 25 26 @unittest.skipUnless(threading, 'Threading required for this test.') 27 class TestPendingCalls(unittest.TestCase): 28 29 def pendingcalls_submit(self, l, n): 30 def callback(): 31 #this function can be interrupted by thread switching so let's 32 #use an atomic operation 33 l.append(None) 34 35 for i in range(n): 36 time.sleep(random.random()*0.02) #0.01 secs on average 37 #try submitting callback until successful. 38 #rely on regular interrupt to flush queue if we are 39 #unsuccessful. 40 while True: 41 if _testcapi._pending_threadfunc(callback): 42 break; 43 44 def pendingcalls_wait(self, l, n, context = None): 45 #now, stick around until l[0] has grown to 10 46 count = 0; 47 while len(l) != n: 48 #this busy loop is where we expect to be interrupted to 49 #run our callbacks. Note that callbacks are only run on the 50 #main thread 51 if False and support.verbose: 52 print "(%i)"%(len(l),), 53 for i in xrange(1000): 54 a = i*i 55 if context and not context.event.is_set(): 56 continue 57 count += 1 58 self.assertTrue(count < 10000, 59 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 60 if False and support.verbose: 61 print "(%i)"%(len(l),) 62 63 def test_pendingcalls_threaded(self): 64 #do every callback on a separate thread 65 n = 32 #total callbacks 66 threads = [] 67 class foo(object):pass 68 context = foo() 69 context.l = [] 70 context.n = 2 #submits per thread 71 context.nThreads = n // context.n 72 context.nFinished = 0 73 context.lock = threading.Lock() 74 context.event = threading.Event() 75 76 threads = [threading.Thread(target=self.pendingcalls_thread, 77 args=(context,)) 78 for i in range(context.nThreads)] 79 with support.start_threads(threads): 80 self.pendingcalls_wait(context.l, n, context) 81 82 def pendingcalls_thread(self, context): 83 try: 84 self.pendingcalls_submit(context.l, context.n) 85 finally: 86 with context.lock: 87 context.nFinished += 1 88 nFinished = context.nFinished 89 if False and support.verbose: 90 print "finished threads: ", nFinished 91 if nFinished == context.nThreads: 92 context.event.set() 93 94 def test_pendingcalls_non_threaded(self): 95 #again, just using the main thread, likely they will all be dispatched at 96 #once. It is ok to ask for too many, because we loop until we find a slot. 97 #the loop can be interrupted to dispatch. 98 #there are only 32 dispatch slots, so we go for twice that! 99 l = [] 100 n = 64 101 self.pendingcalls_submit(l, n) 102 self.pendingcalls_wait(l, n) 103 104 105 class TestGetIndices(unittest.TestCase): 106 107 def test_get_indices(self): 108 self.assertEqual(_testcapi.get_indices(slice(10L, 20, 1), 100), (0, 10, 20, 1)) 109 self.assertEqual(_testcapi.get_indices(slice(10.1, 20, 1), 100), None) 110 self.assertEqual(_testcapi.get_indices(slice(10, 20L, 1), 100), (0, 10, 20, 1)) 111 self.assertEqual(_testcapi.get_indices(slice(10, 20.1, 1), 100), None) 112 113 self.assertEqual(_testcapi.get_indices(slice(10L, 20, 1L), 100), (0, 10, 20, 1)) 114 self.assertEqual(_testcapi.get_indices(slice(10.1, 20, 1L), 100), None) 115 self.assertEqual(_testcapi.get_indices(slice(10, 20L, 1L), 100), (0, 10, 20, 1)) 116 self.assertEqual(_testcapi.get_indices(slice(10, 20.1, 1L), 100), None) 117 118 119 @unittest.skipUnless(threading and thread, 'Threading required for this test.') 120 class TestThreadState(unittest.TestCase): 121 122 @support.reap_threads 123 def test_thread_state(self): 124 # some extra thread-state tests driven via _testcapi 125 def target(): 126 idents = [] 127 128 def callback(): 129 idents.append(thread.get_ident()) 130 131 _testcapi._test_thread_state(callback) 132 a = b = callback 133 time.sleep(1) 134 # Check our main thread is in the list exactly 3 times. 135 self.assertEqual(idents.count(thread.get_ident()), 3, 136 "Couldn't find main thread correctly in the list") 137 138 target() 139 t = threading.Thread(target=target) 140 t.start() 141 t.join() 142 143 144 class Test_testcapi(unittest.TestCase): 145 locals().update((name, getattr(_testcapi, name)) 146 for name in dir(_testcapi) 147 if name.startswith('test_') and not name.endswith('_code')) 148 149 150 def test_main(): 151 support.run_unittest(CAPITest, TestPendingCalls, 152 TestThreadState, TestGetIndices, Test_testcapi) 153 154 if __name__ == "__main__": 155 test_main() 156