Home | History | Annotate | Download | only in test
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all the functions _testcapi exports whose names begin with 'test_'.
      3 
      4 from __future__ import with_statement
      5 import sys
      6 import time
      7 import random
      8 import unittest
      9 from test import test_support
     10 try:
     11     import thread
     12     import threading
     13 except ImportError:
     14     thread = None
     15     threading = None
     16 import _testcapi
     17 
     18 @unittest.skipUnless(threading, 'Threading required for this test.')
     19 class TestPendingCalls(unittest.TestCase):
     20 
     21     def pendingcalls_submit(self, l, n):
     22         def callback():
     23             #this function can be interrupted by thread switching so let's
     24             #use an atomic operation
     25             l.append(None)
     26 
     27         for i in range(n):
     28             time.sleep(random.random()*0.02) #0.01 secs on average
     29             #try submitting callback until successful.
     30             #rely on regular interrupt to flush queue if we are
     31             #unsuccessful.
     32             while True:
     33                 if _testcapi._pending_threadfunc(callback):
     34                     break;
     35 
     36     def pendingcalls_wait(self, l, n, context = None):
     37         #now, stick around until l[0] has grown to 10
     38         count = 0;
     39         while len(l) != n:
     40             #this busy loop is where we expect to be interrupted to
     41             #run our callbacks.  Note that callbacks are only run on the
     42             #main thread
     43             if False and test_support.verbose:
     44                 print "(%i)"%(len(l),),
     45             for i in xrange(1000):
     46                 a = i*i
     47             if context and not context.event.is_set():
     48                 continue
     49             count += 1
     50             self.assertTrue(count < 10000,
     51                 "timeout waiting for %i callbacks, got %i"%(n, len(l)))
     52         if False and test_support.verbose:
     53             print "(%i)"%(len(l),)
     54 
     55     def test_pendingcalls_threaded(self):
     56         #do every callback on a separate thread
     57         n = 32 #total callbacks
     58         threads = []
     59         class foo(object):pass
     60         context = foo()
     61         context.l = []
     62         context.n = 2 #submits per thread
     63         context.nThreads = n // context.n
     64         context.nFinished = 0
     65         context.lock = threading.Lock()
     66         context.event = threading.Event()
     67 
     68         for i in range(context.nThreads):
     69             t = threading.Thread(target=self.pendingcalls_thread, args = (context,))
     70             t.start()
     71             threads.append(t)
     72 
     73         self.pendingcalls_wait(context.l, n, context)
     74 
     75         for t in threads:
     76             t.join()
     77 
     78     def pendingcalls_thread(self, context):
     79         try:
     80             self.pendingcalls_submit(context.l, context.n)
     81         finally:
     82             with context.lock:
     83                 context.nFinished += 1
     84                 nFinished = context.nFinished
     85                 if False and test_support.verbose:
     86                     print "finished threads: ", nFinished
     87             if nFinished == context.nThreads:
     88                 context.event.set()
     89 
     90     def test_pendingcalls_non_threaded(self):
     91         #again, just using the main thread, likely they will all be dispatched at
     92         #once.  It is ok to ask for too many, because we loop until we find a slot.
     93         #the loop can be interrupted to dispatch.
     94         #there are only 32 dispatch slots, so we go for twice that!
     95         l = []
     96         n = 64
     97         self.pendingcalls_submit(l, n)
     98         self.pendingcalls_wait(l, n)
     99 
    100 
@unittest.skipUnless(threading and thread, 'Threading required for this test.')
class TestThreadState(unittest.TestCase):
    """Thread-state checks driven through _testcapi._test_thread_state."""

    @test_support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            # Thread idents recorded by callback, one entry per invocation.
            idents = []

            def callback():
                idents.append(thread.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): a and b are never read; presumably they only hold
            # extra references to callback -- confirm before removing.
            a = b = callback
            # NOTE(review): presumably gives callbacks still in flight time
            # to finish before idents is inspected -- confirm.
            time.sleep(1)
            # Check that the thread running target() is in the list exactly
            # 3 times.
            self.assertEqual(idents.count(thread.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the check on the current thread first, then on a new thread.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()
    124 
    125 
    126 def test_main():
    127     for name in dir(_testcapi):
    128         if name.startswith('test_'):
    129             test = getattr(_testcapi, name)
    130             if test_support.verbose:
    131                 print "internal", name
    132             try:
    133                 test()
    134             except _testcapi.error:
    135                 raise test_support.TestFailed, sys.exc_info()[1]
    136 
    137     test_support.run_unittest(TestPendingCalls, TestThreadState)
    138 
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
    141