Home | History | Annotate | Download | only in includes
      1 #
      2 # A test file for the `multiprocessing` package
      3 #
      4 # Copyright (c) 2006-2008, R Oudkerk
      5 # All rights reserved.
      6 #
      7 
      8 import time, sys, random
      9 from Queue import Empty
     10 
     11 import multiprocessing               # may get overwritten
     12 
     13 
     14 #### TEST_VALUE
     15 
     16 def value_func(running, mutex):
     17     random.seed()
     18     time.sleep(random.random()*4)
     19 
     20     mutex.acquire()
     21     print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
     22     running.value -= 1
     23     mutex.release()
     24 
     25 def test_value():
     26     TASKS = 10
     27     running = multiprocessing.Value('i', TASKS)
     28     mutex = multiprocessing.Lock()
     29 
     30     for i in range(TASKS):
     31         p = multiprocessing.Process(target=value_func, args=(running, mutex))
     32         p.start()
     33 
     34     while running.value > 0:
     35         time.sleep(0.08)
     36         mutex.acquire()
     37         print running.value,
     38         sys.stdout.flush()
     39         mutex.release()
     40 
     41     print
     42     print 'No more running processes'
     43 
     44 
     45 #### TEST_QUEUE
     46 
     47 def queue_func(queue):
     48     for i in range(30):
     49         time.sleep(0.5 * random.random())
     50         queue.put(i*i)
     51     queue.put('STOP')
     52 
     53 def test_queue():
     54     q = multiprocessing.Queue()
     55 
     56     p = multiprocessing.Process(target=queue_func, args=(q,))
     57     p.start()
     58 
     59     o = None
     60     while o != 'STOP':
     61         try:
     62             o = q.get(timeout=0.3)
     63             print o,
     64             sys.stdout.flush()
     65         except Empty:
     66             print 'TIMEOUT'
     67 
     68     print
     69 
     70 
     71 #### TEST_CONDITION
     72 
     73 def condition_func(cond):
     74     cond.acquire()
     75     print '\t' + str(cond)
     76     time.sleep(2)
     77     print '\tchild is notifying'
     78     print '\t' + str(cond)
     79     cond.notify()
     80     cond.release()
     81 
def test_condition():
    """Exercise a condition variable shared with one child process.

    The parent acquires the condition twice, starts a child running
    ``condition_func``, waits for the child's notification and then
    releases the condition twice.  `cond` is printed after each step so
    that its state can be followed on the console.
    """
    cond = multiprocessing.Condition()

    # The child is only created here; it is started further down, after
    # the parent has acquired the condition.
    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    # Two acquisitions in a row by the same caller (nested acquisition).
    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    # NOTE(review): the child has to acquire `cond` before it can notify,
    # so wait() presumably gives up both acquisitions while blocked and
    # restores them on wake-up -- confirm against the Condition docs.
    cond.wait()
    print 'main has woken up'

    # One release for each of the two acquire() calls above.
    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond
    106 
    107 
    108 #### TEST_SEMAPHORE
    109 
    110 def semaphore_func(sema, mutex, running):
    111     sema.acquire()
    112 
    113     mutex.acquire()
    114     running.value += 1
    115     print running.value, 'tasks are running'
    116     mutex.release()
    117 
    118     random.seed()
    119     time.sleep(random.random()*2)
    120 
    121     mutex.acquire()
    122     running.value -= 1
    123     print '%s has finished' % multiprocessing.current_process()
    124     mutex.release()
    125 
    126     sema.release()
    127 
    128 def test_semaphore():
    129     sema = multiprocessing.Semaphore(3)
    130     mutex = multiprocessing.RLock()
    131     running = multiprocessing.Value('i', 0)
    132 
    133     processes = [
    134         multiprocessing.Process(target=semaphore_func,
    135                                 args=(sema, mutex, running))
    136         for i in range(10)
    137         ]
    138 
    139     for p in processes:
    140         p.start()
    141 
    142     for p in processes:
    143         p.join()
    144 
    145 
    146 #### TEST_JOIN_TIMEOUT
    147 
    148 def join_timeout_func():
    149     print '\tchild sleeping'
    150     time.sleep(5.5)
    151     print '\n\tchild terminating'
    152 
    153 def test_join_timeout():
    154     p = multiprocessing.Process(target=join_timeout_func)
    155     p.start()
    156 
    157     print 'waiting for process to finish'
    158 
    159     while 1:
    160         p.join(timeout=1)
    161         if not p.is_alive():
    162             break
    163         print '.',
    164         sys.stdout.flush()
    165 
    166 
    167 #### TEST_EVENT
    168 
    169 def event_func(event):
    170     print '\t%r is waiting' % multiprocessing.current_process()
    171     event.wait()
    172     print '\t%r has woken up' % multiprocessing.current_process()
    173 
    174 def test_event():
    175     event = multiprocessing.Event()
    176 
    177     processes = [multiprocessing.Process(target=event_func, args=(event,))
    178                  for i in range(5)]
    179 
    180     for p in processes:
    181         p.start()
    182 
    183     print 'main is sleeping'
    184     time.sleep(2)
    185 
    186     print 'main is setting event'
    187     event.set()
    188 
    189     for p in processes:
    190         p.join()
    191 
    192 
    193 #### TEST_SHAREDVALUES
    194 
    195 def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    196     for i in range(len(values)):
    197         v = values[i][1]
    198         sv = shared_values[i].value
    199         assert v == sv
    200 
    201     for i in range(len(values)):
    202         a = arrays[i][1]
    203         sa = list(shared_arrays[i][:])
    204         assert a == sa
    205 
    206     print 'Tests passed'
    207 
    208 def test_sharedvalues():
    209     values = [
    210         ('i', 10),
    211         ('h', -2),
    212         ('d', 1.25)
    213         ]
    214     arrays = [
    215         ('i', range(100)),
    216         ('d', [0.25 * i for i in range(100)]),
    217         ('H', range(1000))
    218         ]
    219 
    220     shared_values = [multiprocessing.Value(id, v) for id, v in values]
    221     shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
    222 
    223     p = multiprocessing.Process(
    224         target=sharedvalues_func,
    225         args=(values, arrays, shared_values, shared_arrays)
    226         )
    227     p.start()
    228     p.join()
    229 
    230     assert p.exitcode == 0
    231 
    232 
    233 ####
    234 
    235 def test(namespace=multiprocessing):
    236     global multiprocessing
    237 
    238     multiprocessing = namespace
    239 
    240     for func in [ test_value, test_queue, test_condition,
    241                   test_semaphore, test_join_timeout, test_event,
    242                   test_sharedvalues ]:
    243 
    244         print '\n\t######## %s\n' % func.__name__
    245         func()
    246 
    247     ignore = multiprocessing.active_children()      # cleanup any old processes
    248     if hasattr(multiprocessing, '_debug_info'):
    249         info = multiprocessing._debug_info()
    250         if info:
    251             print info
    252             raise ValueError('there should be no positive refcounts left')
    253 
    254 
    255 if __name__ == '__main__':
    256     multiprocessing.freeze_support()
    257 
    258     assert len(sys.argv) in (1, 2)
    259 
    260     if len(sys.argv) == 1 or sys.argv[1] == 'processes':
    261         print ' Using processes '.center(79, '-')
    262         namespace = multiprocessing
    263     elif sys.argv[1] == 'manager':
    264         print ' Using processes and a manager '.center(79, '-')
    265         namespace = multiprocessing.Manager()
    266         namespace.Process = multiprocessing.Process
    267         namespace.current_process = multiprocessing.current_process
    268         namespace.active_children = multiprocessing.active_children
    269     elif sys.argv[1] == 'threads':
    270         print ' Using threads '.center(79, '-')
    271         import multiprocessing.dummy as namespace
    272     else:
    273         print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
    274         raise SystemExit(2)
    275 
    276     test(namespace)
    277