Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python

      2 
      3 #

      4 # Unit tests for the multiprocessing package

      5 #

      6 
      7 import unittest
      8 import Queue
      9 import time
     10 import sys
     11 import os
     12 import gc
     13 import signal
     14 import array
     15 import socket
     16 import random
     17 import logging
     18 from test import test_support
     19 from StringIO import StringIO
     20 _multiprocessing = test_support.import_module('_multiprocessing')
     21 # import threading after _multiprocessing to raise a more relevant error

     22 # message: "No module named _multiprocessing". _multiprocessing is not compiled

     23 # without thread support.

     24 import threading
     25 
     26 # Work around broken sem_open implementations

     27 test_support.import_module('multiprocessing.synchronize')
     28 
     29 import multiprocessing.dummy
     30 import multiprocessing.connection
     31 import multiprocessing.managers
     32 import multiprocessing.heap
     33 import multiprocessing.pool
     34 
     35 from multiprocessing import util
     36 
     37 try:
     38     from multiprocessing.sharedctypes import Value, copy
     39     HAS_SHAREDCTYPES = True
     40 except ImportError:
     41     HAS_SHAREDCTYPES = False
     42 
     43 #

     44 #

     45 #

     46 
     47 latin = str
     48 
     49 #

     50 # Constants

     51 #

     52 
     53 LOG_LEVEL = util.SUBWARNING
     54 #LOG_LEVEL = logging.DEBUG

     55 
     56 DELTA = 0.1
     57 CHECK_TIMINGS = False     # making true makes tests take a lot longer

     58                           # and can sometimes cause some non-serious

     59                           # failures because some calls block a bit

     60                           # longer than expected

     61 if CHECK_TIMINGS:
     62     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
     63 else:
     64     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
     65 
     66 HAVE_GETVALUE = not getattr(_multiprocessing,
     67                             'HAVE_BROKEN_SEM_GETVALUE', False)
     68 
     69 WIN32 = (sys.platform == "win32")
     70 
     71 #

     72 # Some tests require ctypes

     73 #

     74 
     75 try:
     76     from ctypes import Structure, c_int, c_double
     77 except ImportError:
     78     Structure = object
     79     c_int = c_double = None
     80 
     81 #

     82 # Creates a wrapper for a function which records the time it takes to finish

     83 #

     84 
     85 class TimingWrapper(object):
     86 
     87     def __init__(self, func):
     88         self.func = func
     89         self.elapsed = None
     90 
     91     def __call__(self, *args, **kwds):
     92         t = time.time()
     93         try:
     94             return self.func(*args, **kwds)
     95         finally:
     96             self.elapsed = time.time() - t
     97 
     98 #

     99 # Base class for test cases

    100 #

    101 
    102 class BaseTestCase(object):
    103 
    104     ALLOWED_TYPES = ('processes', 'manager', 'threads')
    105 
    106     def assertTimingAlmostEqual(self, a, b):
    107         if CHECK_TIMINGS:
    108             self.assertAlmostEqual(a, b, 1)
    109 
    110     def assertReturnsIfImplemented(self, value, func, *args):
    111         try:
    112             res = func(*args)
    113         except NotImplementedError:
    114             pass
    115         else:
    116             return self.assertEqual(value, res)
    117 
    118     # For the sanity of Windows users, rather than crashing or freezing in

    119     # multiple ways.

    120     def __reduce__(self, *args):
    121         raise NotImplementedError("shouldn't try to pickle a test case")
    122 
    123     __reduce_ex__ = __reduce__
    124 
    125 #

    126 # Return the value of a semaphore

    127 #

    128 
    129 def get_value(self):
    130     try:
    131         return self.get_value()
    132     except AttributeError:
    133         try:
    134             return self._Semaphore__value
    135         except AttributeError:
    136             try:
    137                 return self._value
    138             except AttributeError:
    139                 raise NotImplementedError
    140 
    141 #

    142 # Testcases

    143 #

    144 
    145 class _TestProcess(BaseTestCase):
    146 
    147     ALLOWED_TYPES = ('processes', 'threads')
    148 
    149     def test_current(self):
    150         if self.TYPE == 'threads':
    151             return
    152 
    153         current = self.current_process()
    154         authkey = current.authkey
    155 
    156         self.assertTrue(current.is_alive())
    157         self.assertTrue(not current.daemon)
    158         self.assertIsInstance(authkey, bytes)
    159         self.assertTrue(len(authkey) > 0)
    160         self.assertEqual(current.ident, os.getpid())
    161         self.assertEqual(current.exitcode, None)
    162 
    163     @classmethod
    164     def _test(cls, q, *args, **kwds):
    165         current = cls.current_process()
    166         q.put(args)
    167         q.put(kwds)
    168         q.put(current.name)
    169         if cls.TYPE != 'threads':
    170             q.put(bytes(current.authkey))
    171             q.put(current.pid)
    172 
    173     def test_process(self):
    174         q = self.Queue(1)
    175         e = self.Event()
    176         args = (q, 1, 2)
    177         kwargs = {'hello':23, 'bye':2.54}
    178         name = 'SomeProcess'
    179         p = self.Process(
    180             target=self._test, args=args, kwargs=kwargs, name=name
    181             )
    182         p.daemon = True
    183         current = self.current_process()
    184 
    185         if self.TYPE != 'threads':
    186             self.assertEqual(p.authkey, current.authkey)
    187         self.assertEqual(p.is_alive(), False)
    188         self.assertEqual(p.daemon, True)
    189         self.assertNotIn(p, self.active_children())
    190         self.assertTrue(type(self.active_children()) is list)
    191         self.assertEqual(p.exitcode, None)
    192 
    193         p.start()
    194 
    195         self.assertEqual(p.exitcode, None)
    196         self.assertEqual(p.is_alive(), True)
    197         self.assertIn(p, self.active_children())
    198 
    199         self.assertEqual(q.get(), args[1:])
    200         self.assertEqual(q.get(), kwargs)
    201         self.assertEqual(q.get(), p.name)
    202         if self.TYPE != 'threads':
    203             self.assertEqual(q.get(), current.authkey)
    204             self.assertEqual(q.get(), p.pid)
    205 
    206         p.join()
    207 
    208         self.assertEqual(p.exitcode, 0)
    209         self.assertEqual(p.is_alive(), False)
    210         self.assertNotIn(p, self.active_children())
    211 
    212     @classmethod
    213     def _test_terminate(cls):
    214         time.sleep(1000)
    215 
    216     def test_terminate(self):
    217         if self.TYPE == 'threads':
    218             return
    219 
    220         p = self.Process(target=self._test_terminate)
    221         p.daemon = True
    222         p.start()
    223 
    224         self.assertEqual(p.is_alive(), True)
    225         self.assertIn(p, self.active_children())
    226         self.assertEqual(p.exitcode, None)
    227 
    228         p.terminate()
    229 
    230         join = TimingWrapper(p.join)
    231         self.assertEqual(join(), None)
    232         self.assertTimingAlmostEqual(join.elapsed, 0.0)
    233 
    234         self.assertEqual(p.is_alive(), False)
    235         self.assertNotIn(p, self.active_children())
    236 
    237         p.join()
    238 
    239         # XXX sometimes get p.exitcode == 0 on Windows ...

    240         #self.assertEqual(p.exitcode, -signal.SIGTERM)

    241 
    242     def test_cpu_count(self):
    243         try:
    244             cpus = multiprocessing.cpu_count()
    245         except NotImplementedError:
    246             cpus = 1
    247         self.assertTrue(type(cpus) is int)
    248         self.assertTrue(cpus >= 1)
    249 
    250     def test_active_children(self):
    251         self.assertEqual(type(self.active_children()), list)
    252 
    253         p = self.Process(target=time.sleep, args=(DELTA,))
    254         self.assertNotIn(p, self.active_children())
    255 
    256         p.start()
    257         self.assertIn(p, self.active_children())
    258 
    259         p.join()
    260         self.assertNotIn(p, self.active_children())
    261 
    262     @classmethod
    263     def _test_recursion(cls, wconn, id):
    264         from multiprocessing import forking
    265         wconn.send(id)
    266         if len(id) < 2:
    267             for i in range(2):
    268                 p = cls.Process(
    269                     target=cls._test_recursion, args=(wconn, id+[i])
    270                     )
    271                 p.start()
    272                 p.join()
    273 
    274     def test_recursion(self):
    275         rconn, wconn = self.Pipe(duplex=False)
    276         self._test_recursion(wconn, [])
    277 
    278         time.sleep(DELTA)
    279         result = []
    280         while rconn.poll():
    281             result.append(rconn.recv())
    282 
    283         expected = [
    284             [],
    285               [0],
    286                 [0, 0],
    287                 [0, 1],
    288               [1],
    289                 [1, 0],
    290                 [1, 1]
    291             ]
    292         self.assertEqual(result, expected)
    293 
    294 #

    295 #

    296 #

    297 
    298 class _UpperCaser(multiprocessing.Process):
    299 
    300     def __init__(self):
    301         multiprocessing.Process.__init__(self)
    302         self.child_conn, self.parent_conn = multiprocessing.Pipe()
    303 
    304     def run(self):
    305         self.parent_conn.close()
    306         for s in iter(self.child_conn.recv, None):
    307             self.child_conn.send(s.upper())
    308         self.child_conn.close()
    309 
    310     def submit(self, s):
    311         assert type(s) is str
    312         self.parent_conn.send(s)
    313         return self.parent_conn.recv()
    314 
    315     def stop(self):
    316         self.parent_conn.send(None)
    317         self.parent_conn.close()
    318         self.child_conn.close()
    319 
    320 class _TestSubclassingProcess(BaseTestCase):
    321 
    322     ALLOWED_TYPES = ('processes',)
    323 
    324     def test_subclassing(self):
    325         uppercaser = _UpperCaser()
    326         uppercaser.start()
    327         self.assertEqual(uppercaser.submit('hello'), 'HELLO')
    328         self.assertEqual(uppercaser.submit('world'), 'WORLD')
    329         uppercaser.stop()
    330         uppercaser.join()
    331 
    332 #

    333 #

    334 #

    335 
    336 def queue_empty(q):
    337     if hasattr(q, 'empty'):
    338         return q.empty()
    339     else:
    340         return q.qsize() == 0
    341 
    342 def queue_full(q, maxsize):
    343     if hasattr(q, 'full'):
    344         return q.full()
    345     else:
    346         return q.qsize() == maxsize
    347 
    348 
    349 class _TestQueue(BaseTestCase):
    350 
    351 
    352     @classmethod
    353     def _test_put(cls, queue, child_can_start, parent_can_continue):
    354         child_can_start.wait()
    355         for i in range(6):
    356             queue.get()
    357         parent_can_continue.set()
    358 
    359     def test_put(self):
    360         MAXSIZE = 6
    361         queue = self.Queue(maxsize=MAXSIZE)
    362         child_can_start = self.Event()
    363         parent_can_continue = self.Event()
    364 
    365         proc = self.Process(
    366             target=self._test_put,
    367             args=(queue, child_can_start, parent_can_continue)
    368             )
    369         proc.daemon = True
    370         proc.start()
    371 
    372         self.assertEqual(queue_empty(queue), True)
    373         self.assertEqual(queue_full(queue, MAXSIZE), False)
    374 
    375         queue.put(1)
    376         queue.put(2, True)
    377         queue.put(3, True, None)
    378         queue.put(4, False)
    379         queue.put(5, False, None)
    380         queue.put_nowait(6)
    381 
    382         # the values may be in buffer but not yet in pipe so sleep a bit

    383         time.sleep(DELTA)
    384 
    385         self.assertEqual(queue_empty(queue), False)
    386         self.assertEqual(queue_full(queue, MAXSIZE), True)
    387 
    388         put = TimingWrapper(queue.put)
    389         put_nowait = TimingWrapper(queue.put_nowait)
    390 
    391         self.assertRaises(Queue.Full, put, 7, False)
    392         self.assertTimingAlmostEqual(put.elapsed, 0)
    393 
    394         self.assertRaises(Queue.Full, put, 7, False, None)
    395         self.assertTimingAlmostEqual(put.elapsed, 0)
    396 
    397         self.assertRaises(Queue.Full, put_nowait, 7)
    398         self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
    399 
    400         self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
    401         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
    402 
    403         self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
    404         self.assertTimingAlmostEqual(put.elapsed, 0)
    405 
    406         self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
    407         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
    408 
    409         child_can_start.set()
    410         parent_can_continue.wait()
    411 
    412         self.assertEqual(queue_empty(queue), True)
    413         self.assertEqual(queue_full(queue, MAXSIZE), False)
    414 
    415         proc.join()
    416 
    417     @classmethod
    418     def _test_get(cls, queue, child_can_start, parent_can_continue):
    419         child_can_start.wait()
    420         #queue.put(1)

    421         queue.put(2)
    422         queue.put(3)
    423         queue.put(4)
    424         queue.put(5)
    425         parent_can_continue.set()
    426 
    427     def test_get(self):
    428         queue = self.Queue()
    429         child_can_start = self.Event()
    430         parent_can_continue = self.Event()
    431 
    432         proc = self.Process(
    433             target=self._test_get,
    434             args=(queue, child_can_start, parent_can_continue)
    435             )
    436         proc.daemon = True
    437         proc.start()
    438 
    439         self.assertEqual(queue_empty(queue), True)
    440 
    441         child_can_start.set()
    442         parent_can_continue.wait()
    443 
    444         time.sleep(DELTA)
    445         self.assertEqual(queue_empty(queue), False)
    446 
    447         # Hangs unexpectedly, remove for now

    448         #self.assertEqual(queue.get(), 1)

    449         self.assertEqual(queue.get(True, None), 2)
    450         self.assertEqual(queue.get(True), 3)
    451         self.assertEqual(queue.get(timeout=1), 4)
    452         self.assertEqual(queue.get_nowait(), 5)
    453 
    454         self.assertEqual(queue_empty(queue), True)
    455 
    456         get = TimingWrapper(queue.get)
    457         get_nowait = TimingWrapper(queue.get_nowait)
    458 
    459         self.assertRaises(Queue.Empty, get, False)
    460         self.assertTimingAlmostEqual(get.elapsed, 0)
    461 
    462         self.assertRaises(Queue.Empty, get, False, None)
    463         self.assertTimingAlmostEqual(get.elapsed, 0)
    464 
    465         self.assertRaises(Queue.Empty, get_nowait)
    466         self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
    467 
    468         self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
    469         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
    470 
    471         self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
    472         self.assertTimingAlmostEqual(get.elapsed, 0)
    473 
    474         self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
    475         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
    476 
    477         proc.join()
    478 
    479     @classmethod
    480     def _test_fork(cls, queue):
    481         for i in range(10, 20):
    482             queue.put(i)
    483         # note that at this point the items may only be buffered, so the

    484         # process cannot shutdown until the feeder thread has finished

    485         # pushing items onto the pipe.

    486 
    487     def test_fork(self):
    488         # Old versions of Queue would fail to create a new feeder

    489         # thread for a forked process if the original process had its

    490         # own feeder thread.  This test checks that this no longer

    491         # happens.

    492 
    493         queue = self.Queue()
    494 
    495         # put items on queue so that main process starts a feeder thread

    496         for i in range(10):
    497             queue.put(i)
    498 
    499         # wait to make sure thread starts before we fork a new process

    500         time.sleep(DELTA)
    501 
    502         # fork process

    503         p = self.Process(target=self._test_fork, args=(queue,))
    504         p.start()
    505 
    506         # check that all expected items are in the queue

    507         for i in range(20):
    508             self.assertEqual(queue.get(), i)
    509         self.assertRaises(Queue.Empty, queue.get, False)
    510 
    511         p.join()
    512 
    513     def test_qsize(self):
    514         q = self.Queue()
    515         try:
    516             self.assertEqual(q.qsize(), 0)
    517         except NotImplementedError:
    518             return
    519         q.put(1)
    520         self.assertEqual(q.qsize(), 1)
    521         q.put(5)
    522         self.assertEqual(q.qsize(), 2)
    523         q.get()
    524         self.assertEqual(q.qsize(), 1)
    525         q.get()
    526         self.assertEqual(q.qsize(), 0)
    527 
    528     @classmethod
    529     def _test_task_done(cls, q):
    530         for obj in iter(q.get, None):
    531             time.sleep(DELTA)
    532             q.task_done()
    533 
    534     def test_task_done(self):
    535         queue = self.JoinableQueue()
    536 
    537         if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
    538             self.skipTest("requires 'queue.task_done()' method")
    539 
    540         workers = [self.Process(target=self._test_task_done, args=(queue,))
    541                    for i in xrange(4)]
    542 
    543         for p in workers:
    544             p.start()
    545 
    546         for i in xrange(10):
    547             queue.put(i)
    548 
    549         queue.join()
    550 
    551         for p in workers:
    552             queue.put(None)
    553 
    554         for p in workers:
    555             p.join()
    556 
    557 #

    558 #

    559 #

    560 
    561 class _TestLock(BaseTestCase):
    562 
    563     def test_lock(self):
    564         lock = self.Lock()
    565         self.assertEqual(lock.acquire(), True)
    566         self.assertEqual(lock.acquire(False), False)
    567         self.assertEqual(lock.release(), None)
    568         self.assertRaises((ValueError, threading.ThreadError), lock.release)
    569 
    570     def test_rlock(self):
    571         lock = self.RLock()
    572         self.assertEqual(lock.acquire(), True)
    573         self.assertEqual(lock.acquire(), True)
    574         self.assertEqual(lock.acquire(), True)
    575         self.assertEqual(lock.release(), None)
    576         self.assertEqual(lock.release(), None)
    577         self.assertEqual(lock.release(), None)
    578         self.assertRaises((AssertionError, RuntimeError), lock.release)
    579 
    580     def test_lock_context(self):
    581         with self.Lock():
    582             pass
    583 
    584 
    585 class _TestSemaphore(BaseTestCase):
    586 
    587     def _test_semaphore(self, sem):
    588         self.assertReturnsIfImplemented(2, get_value, sem)
    589         self.assertEqual(sem.acquire(), True)
    590         self.assertReturnsIfImplemented(1, get_value, sem)
    591         self.assertEqual(sem.acquire(), True)
    592         self.assertReturnsIfImplemented(0, get_value, sem)
    593         self.assertEqual(sem.acquire(False), False)
    594         self.assertReturnsIfImplemented(0, get_value, sem)
    595         self.assertEqual(sem.release(), None)
    596         self.assertReturnsIfImplemented(1, get_value, sem)
    597         self.assertEqual(sem.release(), None)
    598         self.assertReturnsIfImplemented(2, get_value, sem)
    599 
    600     def test_semaphore(self):
    601         sem = self.Semaphore(2)
    602         self._test_semaphore(sem)
    603         self.assertEqual(sem.release(), None)
    604         self.assertReturnsIfImplemented(3, get_value, sem)
    605         self.assertEqual(sem.release(), None)
    606         self.assertReturnsIfImplemented(4, get_value, sem)
    607 
    608     def test_bounded_semaphore(self):
    609         sem = self.BoundedSemaphore(2)
    610         self._test_semaphore(sem)
    611         # Currently fails on OS/X

    612         #if HAVE_GETVALUE:

    613         #    self.assertRaises(ValueError, sem.release)

    614         #    self.assertReturnsIfImplemented(2, get_value, sem)

    615 
    616     def test_timeout(self):
    617         if self.TYPE != 'processes':
    618             return
    619 
    620         sem = self.Semaphore(0)
    621         acquire = TimingWrapper(sem.acquire)
    622 
    623         self.assertEqual(acquire(False), False)
    624         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    625 
    626         self.assertEqual(acquire(False, None), False)
    627         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    628 
    629         self.assertEqual(acquire(False, TIMEOUT1), False)
    630         self.assertTimingAlmostEqual(acquire.elapsed, 0)
    631 
    632         self.assertEqual(acquire(True, TIMEOUT2), False)
    633         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
    634 
    635         self.assertEqual(acquire(timeout=TIMEOUT3), False)
    636         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
    637 
    638 
    639 class _TestCondition(BaseTestCase):
    640 
    641     @classmethod
    642     def f(cls, cond, sleeping, woken, timeout=None):
    643         cond.acquire()
    644         sleeping.release()
    645         cond.wait(timeout)
    646         woken.release()
    647         cond.release()
    648 
    649     def check_invariant(self, cond):
    650         # this is only supposed to succeed when there are no sleepers

    651         if self.TYPE == 'processes':
    652             try:
    653                 sleepers = (cond._sleeping_count.get_value() -
    654                             cond._woken_count.get_value())
    655                 self.assertEqual(sleepers, 0)
    656                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
    657             except NotImplementedError:
    658                 pass
    659 
    660     def test_notify(self):
    661         cond = self.Condition()
    662         sleeping = self.Semaphore(0)
    663         woken = self.Semaphore(0)
    664 
    665         p = self.Process(target=self.f, args=(cond, sleeping, woken))
    666         p.daemon = True
    667         p.start()
    668 
    669         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    670         p.daemon = True
    671         p.start()
    672 
    673         # wait for both children to start sleeping

    674         sleeping.acquire()
    675         sleeping.acquire()
    676 
    677         # check no process/thread has woken up

    678         time.sleep(DELTA)
    679         self.assertReturnsIfImplemented(0, get_value, woken)
    680 
    681         # wake up one process/thread

    682         cond.acquire()
    683         cond.notify()
    684         cond.release()
    685 
    686         # check one process/thread has woken up

    687         time.sleep(DELTA)
    688         self.assertReturnsIfImplemented(1, get_value, woken)
    689 
    690         # wake up another

    691         cond.acquire()
    692         cond.notify()
    693         cond.release()
    694 
    695         # check other has woken up

    696         time.sleep(DELTA)
    697         self.assertReturnsIfImplemented(2, get_value, woken)
    698 
    699         # check state is not mucked up

    700         self.check_invariant(cond)
    701         p.join()
    702 
    703     def test_notify_all(self):
    704         cond = self.Condition()
    705         sleeping = self.Semaphore(0)
    706         woken = self.Semaphore(0)
    707 
    708         # start some threads/processes which will timeout

    709         for i in range(3):
    710             p = self.Process(target=self.f,
    711                              args=(cond, sleeping, woken, TIMEOUT1))
    712             p.daemon = True
    713             p.start()
    714 
    715             t = threading.Thread(target=self.f,
    716                                  args=(cond, sleeping, woken, TIMEOUT1))
    717             t.daemon = True
    718             t.start()
    719 
    720         # wait for them all to sleep

    721         for i in xrange(6):
    722             sleeping.acquire()
    723 
    724         # check they have all timed out

    725         for i in xrange(6):
    726             woken.acquire()
    727         self.assertReturnsIfImplemented(0, get_value, woken)
    728 
    729         # check state is not mucked up

    730         self.check_invariant(cond)
    731 
    732         # start some more threads/processes

    733         for i in range(3):
    734             p = self.Process(target=self.f, args=(cond, sleeping, woken))
    735             p.daemon = True
    736             p.start()
    737 
    738             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    739             t.daemon = True
    740             t.start()
    741 
    742         # wait for them to all sleep

    743         for i in xrange(6):
    744             sleeping.acquire()
    745 
    746         # check no process/thread has woken up

    747         time.sleep(DELTA)
    748         self.assertReturnsIfImplemented(0, get_value, woken)
    749 
    750         # wake them all up

    751         cond.acquire()
    752         cond.notify_all()
    753         cond.release()
    754 
    755         # check they have all woken

    756         time.sleep(DELTA)
    757         self.assertReturnsIfImplemented(6, get_value, woken)
    758 
    759         # check state is not mucked up

    760         self.check_invariant(cond)
    761 
    762     def test_timeout(self):
    763         cond = self.Condition()
    764         wait = TimingWrapper(cond.wait)
    765         cond.acquire()
    766         res = wait(TIMEOUT1)
    767         cond.release()
    768         self.assertEqual(res, None)
    769         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    770 
    771 
    772 class _TestEvent(BaseTestCase):
    773 
    774     @classmethod
    775     def _test_event(cls, event):
    776         time.sleep(TIMEOUT2)
    777         event.set()
    778 
    779     def test_event(self):
    780         event = self.Event()
    781         wait = TimingWrapper(event.wait)
    782 
    783         # Removed temporarily, due to API shear, this does not

    784         # work with threading._Event objects. is_set == isSet

    785         self.assertEqual(event.is_set(), False)
    786 
    787         # Removed, threading.Event.wait() will return the value of the __flag

    788         # instead of None. API Shear with the semaphore backed mp.Event

    789         self.assertEqual(wait(0.0), False)
    790         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    791         self.assertEqual(wait(TIMEOUT1), False)
    792         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    793 
    794         event.set()
    795 
    796         # See note above on the API differences

    797         self.assertEqual(event.is_set(), True)
    798         self.assertEqual(wait(), True)
    799         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    800         self.assertEqual(wait(TIMEOUT1), True)
    801         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    802         # self.assertEqual(event.is_set(), True)

    803 
    804         event.clear()
    805 
    806         #self.assertEqual(event.is_set(), False)

    807 
    808         self.Process(target=self._test_event, args=(event,)).start()
    809         self.assertEqual(wait(), True)
    810 
    811 #

    812 #

    813 #

    814 
    815 class _TestValue(BaseTestCase):
    816 
    817     ALLOWED_TYPES = ('processes',)
    818 
    819     codes_values = [
    820         ('i', 4343, 24234),
    821         ('d', 3.625, -4.25),
    822         ('h', -232, 234),
    823         ('c', latin('x'), latin('y'))
    824         ]
    825 
    826     def setUp(self):
    827         if not HAS_SHAREDCTYPES:
    828             self.skipTest("requires multiprocessing.sharedctypes")
    829 
    830     @classmethod
    831     def _test(cls, values):
    832         for sv, cv in zip(values, cls.codes_values):
    833             sv.value = cv[2]
    834 
    835 
    836     def test_value(self, raw=False):
    837         if raw:
    838             values = [self.RawValue(code, value)
    839                       for code, value, _ in self.codes_values]
    840         else:
    841             values = [self.Value(code, value)
    842                       for code, value, _ in self.codes_values]
    843 
    844         for sv, cv in zip(values, self.codes_values):
    845             self.assertEqual(sv.value, cv[1])
    846 
    847         proc = self.Process(target=self._test, args=(values,))
    848         proc.start()
    849         proc.join()
    850 
    851         for sv, cv in zip(values, self.codes_values):
    852             self.assertEqual(sv.value, cv[2])
    853 
    854     def test_rawvalue(self):
    855         self.test_value(raw=True)
    856 
    857     def test_getobj_getlock(self):
    858         val1 = self.Value('i', 5)
    859         lock1 = val1.get_lock()
    860         obj1 = val1.get_obj()
    861 
    862         val2 = self.Value('i', 5, lock=None)
    863         lock2 = val2.get_lock()
    864         obj2 = val2.get_obj()
    865 
    866         lock = self.Lock()
    867         val3 = self.Value('i', 5, lock=lock)
    868         lock3 = val3.get_lock()
    869         obj3 = val3.get_obj()
    870         self.assertEqual(lock, lock3)
    871 
    872         arr4 = self.Value('i', 5, lock=False)
    873         self.assertFalse(hasattr(arr4, 'get_lock'))
    874         self.assertFalse(hasattr(arr4, 'get_obj'))
    875 
    876         self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
    877 
    878         arr5 = self.RawValue('i', 5)
    879         self.assertFalse(hasattr(arr5, 'get_lock'))
    880         self.assertFalse(hasattr(arr5, 'get_obj'))
    881 
    882 
    883 class _TestArray(BaseTestCase):
    884 
    885     ALLOWED_TYPES = ('processes',)
    886 
    887     @classmethod
    888     def f(cls, seq):
    889         for i in range(1, len(seq)):
    890             seq[i] += seq[i-1]
    891 
    892     @unittest.skipIf(c_int is None, "requires _ctypes")
    893     def test_array(self, raw=False):
    894         seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
    895         if raw:
    896             arr = self.RawArray('i', seq)
    897         else:
    898             arr = self.Array('i', seq)
    899 
    900         self.assertEqual(len(arr), len(seq))
    901         self.assertEqual(arr[3], seq[3])
    902         self.assertEqual(list(arr[2:7]), list(seq[2:7]))
    903 
    904         arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
    905 
    906         self.assertEqual(list(arr[:]), seq)
    907 
    908         self.f(seq)
    909 
    910         p = self.Process(target=self.f, args=(arr,))
    911         p.start()
    912         p.join()
    913 
    914         self.assertEqual(list(arr[:]), seq)
    915 
    916     @unittest.skipIf(c_int is None, "requires _ctypes")
    917     def test_array_from_size(self):
    918         size = 10
    919         # Test for zeroing (see issue #11675).

    920         # The repetition below strengthens the test by increasing the chances

    921         # of previously allocated non-zero memory being used for the new array

    922         # on the 2nd and 3rd loops.

    923         for _ in range(3):
    924             arr = self.Array('i', size)
    925             self.assertEqual(len(arr), size)
    926             self.assertEqual(list(arr), [0] * size)
    927             arr[:] = range(10)
    928             self.assertEqual(list(arr), range(10))
    929             del arr
    930 
    931     @unittest.skipIf(c_int is None, "requires _ctypes")
    932     def test_rawarray(self):
    933         self.test_array(raw=True)
    934 
    935     @unittest.skipIf(c_int is None, "requires _ctypes")
    936     def test_array_accepts_long(self):
    937         arr = self.Array('i', 10L)
    938         self.assertEqual(len(arr), 10)
    939         raw_arr = self.RawArray('i', 10L)
    940         self.assertEqual(len(raw_arr), 10)
    941 
    942     @unittest.skipIf(c_int is None, "requires _ctypes")
    943     def test_getobj_getlock_obj(self):
    944         arr1 = self.Array('i', range(10))
    945         lock1 = arr1.get_lock()
    946         obj1 = arr1.get_obj()
    947 
    948         arr2 = self.Array('i', range(10), lock=None)
    949         lock2 = arr2.get_lock()
    950         obj2 = arr2.get_obj()
    951 
    952         lock = self.Lock()
    953         arr3 = self.Array('i', range(10), lock=lock)
    954         lock3 = arr3.get_lock()
    955         obj3 = arr3.get_obj()
    956         self.assertEqual(lock, lock3)
    957 
    958         arr4 = self.Array('i', range(10), lock=False)
    959         self.assertFalse(hasattr(arr4, 'get_lock'))
    960         self.assertFalse(hasattr(arr4, 'get_obj'))
    961         self.assertRaises(AttributeError,
    962                           self.Array, 'i', range(10), lock='notalock')
    963 
    964         arr5 = self.RawArray('i', range(10))
    965         self.assertFalse(hasattr(arr5, 'get_lock'))
    966         self.assertFalse(hasattr(arr5, 'get_obj'))
    967 
    968 #

    969 #

    970 #

    971 
    972 class _TestContainers(BaseTestCase):
    973 
    974     ALLOWED_TYPES = ('manager',)
    975 
    976     def test_list(self):
    977         a = self.list(range(10))
    978         self.assertEqual(a[:], range(10))
    979 
    980         b = self.list()
    981         self.assertEqual(b[:], [])
    982 
    983         b.extend(range(5))
    984         self.assertEqual(b[:], range(5))
    985 
    986         self.assertEqual(b[2], 2)
    987         self.assertEqual(b[2:10], [2,3,4])
    988 
    989         b *= 2
    990         self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
    991 
    992         self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
    993 
    994         self.assertEqual(a[:], range(10))
    995 
    996         d = [a, b]
    997         e = self.list(d)
    998         self.assertEqual(
    999             e[:],
   1000             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
   1001             )
   1002 
   1003         f = self.list([a])
   1004         a.append('hello')
   1005         self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
   1006 
   1007     def test_dict(self):
   1008         d = self.dict()
   1009         indices = range(65, 70)
   1010         for i in indices:
   1011             d[i] = chr(i)
   1012         self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
   1013         self.assertEqual(sorted(d.keys()), indices)
   1014         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
   1015         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
   1016 
   1017     def test_namespace(self):
   1018         n = self.Namespace()
   1019         n.name = 'Bob'
   1020         n.job = 'Builder'
   1021         n._hidden = 'hidden'
   1022         self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
   1023         del n.job
   1024         self.assertEqual(str(n), "Namespace(name='Bob')")
   1025         self.assertTrue(hasattr(n, 'name'))
   1026         self.assertTrue(not hasattr(n, 'job'))
   1027 
   1028 #

   1029 #

   1030 #

   1031 
   1032 def sqr(x, wait=0.0):
   1033     time.sleep(wait)
   1034     return x*x
   1035 class _TestPool(BaseTestCase):
   1036 
   1037     def test_apply(self):
   1038         papply = self.pool.apply
   1039         self.assertEqual(papply(sqr, (5,)), sqr(5))
   1040         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
   1041 
   1042     def test_map(self):
   1043         pmap = self.pool.map
   1044         self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
   1045         self.assertEqual(pmap(sqr, range(100), chunksize=20),
   1046                          map(sqr, range(100)))
   1047 
   1048     def test_map_chunksize(self):
   1049         try:
   1050             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
   1051         except multiprocessing.TimeoutError:
   1052             self.fail("pool.map_async with chunksize stalled on null list")
   1053 
   1054     def test_async(self):
   1055         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
   1056         get = TimingWrapper(res.get)
   1057         self.assertEqual(get(), 49)
   1058         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
   1059 
   1060     def test_async_timeout(self):
   1061         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
   1062         get = TimingWrapper(res.get)
   1063         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
   1064         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
   1065 
   1066     def test_imap(self):
   1067         it = self.pool.imap(sqr, range(10))
   1068         self.assertEqual(list(it), map(sqr, range(10)))
   1069 
   1070         it = self.pool.imap(sqr, range(10))
   1071         for i in range(10):
   1072             self.assertEqual(it.next(), i*i)
   1073         self.assertRaises(StopIteration, it.next)
   1074 
   1075         it = self.pool.imap(sqr, range(1000), chunksize=100)
   1076         for i in range(1000):
   1077             self.assertEqual(it.next(), i*i)
   1078         self.assertRaises(StopIteration, it.next)
   1079 
   1080     def test_imap_unordered(self):
   1081         it = self.pool.imap_unordered(sqr, range(1000))
   1082         self.assertEqual(sorted(it), map(sqr, range(1000)))
   1083 
   1084         it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
   1085         self.assertEqual(sorted(it), map(sqr, range(1000)))
   1086 
   1087     def test_make_pool(self):
   1088         p = multiprocessing.Pool(3)
   1089         self.assertEqual(3, len(p._pool))
   1090         p.close()
   1091         p.join()
   1092 
   1093     def test_terminate(self):
   1094         if self.TYPE == 'manager':
   1095             # On Unix a forked process increfs each shared object to

   1096             # which its parent process held a reference.  If the

   1097             # forked process gets terminated then there is likely to

   1098             # be a reference leak.  So to prevent

   1099             # _TestZZZNumberOfObjects from failing we skip this test

   1100             # when using a manager.

   1101             return
   1102 
   1103         result = self.pool.map_async(
   1104             time.sleep, [0.1 for i in range(10000)], chunksize=1
   1105             )
   1106         self.pool.terminate()
   1107         join = TimingWrapper(self.pool.join)
   1108         join()
   1109         self.assertTrue(join.elapsed < 0.2)
   1110 
   1111 class _TestPoolWorkerLifetime(BaseTestCase):
   1112 
   1113     ALLOWED_TYPES = ('processes', )
   1114     def test_pool_worker_lifetime(self):
   1115         p = multiprocessing.Pool(3, maxtasksperchild=10)
   1116         self.assertEqual(3, len(p._pool))
   1117         origworkerpids = [w.pid for w in p._pool]
   1118         # Run many tasks so each worker gets replaced (hopefully)

   1119         results = []
   1120         for i in range(100):
   1121             results.append(p.apply_async(sqr, (i, )))
   1122         # Fetch the results and verify we got the right answers,

   1123         # also ensuring all the tasks have completed.

   1124         for (j, res) in enumerate(results):
   1125             self.assertEqual(res.get(), sqr(j))
   1126         # Refill the pool

   1127         p._repopulate_pool()
   1128         # Wait until all workers are alive

   1129         # (countdown * DELTA = 5 seconds max startup process time)

   1130         countdown = 50
   1131         while countdown and not all(w.is_alive() for w in p._pool):
   1132             countdown -= 1
   1133             time.sleep(DELTA)
   1134         finalworkerpids = [w.pid for w in p._pool]
   1135         # All pids should be assigned.  See issue #7805.

   1136         self.assertNotIn(None, origworkerpids)
   1137         self.assertNotIn(None, finalworkerpids)
   1138         # Finally, check that the worker pids have changed

   1139         self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
   1140         p.close()
   1141         p.join()
   1142 
   1143 #

   1144 # Test that manager has expected number of shared objects left

   1145 #

   1146 
   1147 class _TestZZZNumberOfObjects(BaseTestCase):
   1148     # Because test cases are sorted alphabetically, this one will get

   1149     # run after all the other tests for the manager.  It tests that

   1150     # there have been no "reference leaks" for the manager's shared

   1151     # objects.  Note the comment in _TestPool.test_terminate().

   1152     ALLOWED_TYPES = ('manager',)
   1153 
   1154     def test_number_of_objects(self):
   1155         EXPECTED_NUMBER = 1                # the pool object is still alive

   1156         multiprocessing.active_children()  # discard dead process objs

   1157         gc.collect()                       # do garbage collection

   1158         refs = self.manager._number_of_objects()
   1159         debug_info = self.manager._debug_info()
   1160         if refs != EXPECTED_NUMBER:
   1161             print self.manager._debug_info()
   1162             print debug_info
   1163 
   1164         self.assertEqual(refs, EXPECTED_NUMBER)
   1165 
   1166 #

   1167 # Test of creating a customized manager class

   1168 #

   1169 
   1170 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
   1171 
   1172 class FooBar(object):
   1173     def f(self):
   1174         return 'f()'
   1175     def g(self):
   1176         raise ValueError
   1177     def _h(self):
   1178         return '_h()'
   1179 
   1180 def baz():
   1181     for i in xrange(10):
   1182         yield i*i
   1183 
   1184 class IteratorProxy(BaseProxy):
   1185     _exposed_ = ('next', '__next__')
   1186     def __iter__(self):
   1187         return self
   1188     def next(self):
   1189         return self._callmethod('next')
   1190     def __next__(self):
   1191         return self._callmethod('__next__')
   1192 
   1193 class MyManager(BaseManager):
   1194     pass
   1195 
   1196 MyManager.register('Foo', callable=FooBar)
   1197 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
   1198 MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
   1199 
   1200 
   1201 class _TestMyManager(BaseTestCase):
   1202 
   1203     ALLOWED_TYPES = ('manager',)
   1204 
   1205     def test_mymanager(self):
   1206         manager = MyManager()
   1207         manager.start()
   1208 
   1209         foo = manager.Foo()
   1210         bar = manager.Bar()
   1211         baz = manager.baz()
   1212 
   1213         foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
   1214         bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
   1215 
   1216         self.assertEqual(foo_methods, ['f', 'g'])
   1217         self.assertEqual(bar_methods, ['f', '_h'])
   1218 
   1219         self.assertEqual(foo.f(), 'f()')
   1220         self.assertRaises(ValueError, foo.g)
   1221         self.assertEqual(foo._callmethod('f'), 'f()')
   1222         self.assertRaises(RemoteError, foo._callmethod, '_h')
   1223 
   1224         self.assertEqual(bar.f(), 'f()')
   1225         self.assertEqual(bar._h(), '_h()')
   1226         self.assertEqual(bar._callmethod('f'), 'f()')
   1227         self.assertEqual(bar._callmethod('_h'), '_h()')
   1228 
   1229         self.assertEqual(list(baz), [i*i for i in range(10)])
   1230 
   1231         manager.shutdown()
   1232 
   1233 #

   1234 # Test of connecting to a remote server and using xmlrpclib for serialization

   1235 #

   1236 
   1237 _queue = Queue.Queue()
   1238 def get_queue():
   1239     return _queue
   1240 
   1241 class QueueManager(BaseManager):
   1242     '''manager class used by server process'''
   1243 QueueManager.register('get_queue', callable=get_queue)
   1244 
   1245 class QueueManager2(BaseManager):
   1246     '''manager class which specifies the same interface as QueueManager'''
   1247 QueueManager2.register('get_queue')
   1248 
   1249 
   1250 SERIALIZER = 'xmlrpclib'
   1251 
   1252 class _TestRemoteManager(BaseTestCase):
   1253 
   1254     ALLOWED_TYPES = ('manager',)
   1255 
   1256     @classmethod
   1257     def _putter(cls, address, authkey):
   1258         manager = QueueManager2(
   1259             address=address, authkey=authkey, serializer=SERIALIZER
   1260             )
   1261         manager.connect()
   1262         queue = manager.get_queue()
   1263         queue.put(('hello world', None, True, 2.25))
   1264 
   1265     def test_remote(self):
   1266         authkey = os.urandom(32)
   1267 
   1268         manager = QueueManager(
   1269             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
   1270             )
   1271         manager.start()
   1272 
   1273         p = self.Process(target=self._putter, args=(manager.address, authkey))
   1274         p.start()
   1275 
   1276         manager2 = QueueManager2(
   1277             address=manager.address, authkey=authkey, serializer=SERIALIZER
   1278             )
   1279         manager2.connect()
   1280         queue = manager2.get_queue()
   1281 
   1282         # Note that xmlrpclib will deserialize object as a list not a tuple

   1283         self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
   1284 
   1285         # Because we are using xmlrpclib for serialization instead of

   1286         # pickle this will cause a serialization error.

   1287         self.assertRaises(Exception, queue.put, time.sleep)
   1288 
   1289         # Make queue finalizer run before the server is stopped

   1290         del queue
   1291         manager.shutdown()
   1292 
   1293 class _TestManagerRestart(BaseTestCase):
   1294 
   1295     @classmethod
   1296     def _putter(cls, address, authkey):
   1297         manager = QueueManager(
   1298             address=address, authkey=authkey, serializer=SERIALIZER)
   1299         manager.connect()
   1300         queue = manager.get_queue()
   1301         queue.put('hello world')
   1302 
   1303     def test_rapid_restart(self):
   1304         authkey = os.urandom(32)
   1305         manager = QueueManager(
   1306             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
   1307         srvr = manager.get_server()
   1308         addr = srvr.address
   1309         # Close the connection.Listener socket which gets opened as a part

   1310         # of manager.get_server(). It's not needed for the test.

   1311         srvr.listener.close()
   1312         manager.start()
   1313 
   1314         p = self.Process(target=self._putter, args=(manager.address, authkey))
   1315         p.start()
   1316         queue = manager.get_queue()
   1317         self.assertEqual(queue.get(), 'hello world')
   1318         del queue
   1319         manager.shutdown()
   1320         manager = QueueManager(
   1321             address=addr, authkey=authkey, serializer=SERIALIZER)
   1322         manager.start()
   1323         manager.shutdown()
   1324 
   1325 #

   1326 #

   1327 #

   1328 
   1329 SENTINEL = latin('')
   1330 
   1331 class _TestConnection(BaseTestCase):
   1332 
   1333     ALLOWED_TYPES = ('processes', 'threads')
   1334 
   1335     @classmethod
   1336     def _echo(cls, conn):
   1337         for msg in iter(conn.recv_bytes, SENTINEL):
   1338             conn.send_bytes(msg)
   1339         conn.close()
   1340 
   1341     def test_connection(self):
   1342         conn, child_conn = self.Pipe()
   1343 
   1344         p = self.Process(target=self._echo, args=(child_conn,))
   1345         p.daemon = True
   1346         p.start()
   1347 
   1348         seq = [1, 2.25, None]
   1349         msg = latin('hello world')
   1350         longmsg = msg * 10
   1351         arr = array.array('i', range(4))
   1352 
   1353         if self.TYPE == 'processes':
   1354             self.assertEqual(type(conn.fileno()), int)
   1355 
   1356         self.assertEqual(conn.send(seq), None)
   1357         self.assertEqual(conn.recv(), seq)
   1358 
   1359         self.assertEqual(conn.send_bytes(msg), None)
   1360         self.assertEqual(conn.recv_bytes(), msg)
   1361 
   1362         if self.TYPE == 'processes':
   1363             buffer = array.array('i', [0]*10)
   1364             expected = list(arr) + [0] * (10 - len(arr))
   1365             self.assertEqual(conn.send_bytes(arr), None)
   1366             self.assertEqual(conn.recv_bytes_into(buffer),
   1367                              len(arr) * buffer.itemsize)
   1368             self.assertEqual(list(buffer), expected)
   1369 
   1370             buffer = array.array('i', [0]*10)
   1371             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
   1372             self.assertEqual(conn.send_bytes(arr), None)
   1373             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
   1374                              len(arr) * buffer.itemsize)
   1375             self.assertEqual(list(buffer), expected)
   1376 
   1377             buffer = bytearray(latin(' ' * 40))
   1378             self.assertEqual(conn.send_bytes(longmsg), None)
   1379             try:
   1380                 res = conn.recv_bytes_into(buffer)
   1381             except multiprocessing.BufferTooShort, e:
   1382                 self.assertEqual(e.args, (longmsg,))
   1383             else:
   1384                 self.fail('expected BufferTooShort, got %s' % res)
   1385 
   1386         poll = TimingWrapper(conn.poll)
   1387 
   1388         self.assertEqual(poll(), False)
   1389         self.assertTimingAlmostEqual(poll.elapsed, 0)
   1390 
   1391         self.assertEqual(poll(TIMEOUT1), False)
   1392         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
   1393 
   1394         conn.send(None)
   1395 
   1396         self.assertEqual(poll(TIMEOUT1), True)
   1397         self.assertTimingAlmostEqual(poll.elapsed, 0)
   1398 
   1399         self.assertEqual(conn.recv(), None)
   1400 
   1401         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb

   1402         conn.send_bytes(really_big_msg)
   1403         self.assertEqual(conn.recv_bytes(), really_big_msg)
   1404 
   1405         conn.send_bytes(SENTINEL)                          # tell child to quit

   1406         child_conn.close()
   1407 
   1408         if self.TYPE == 'processes':
   1409             self.assertEqual(conn.readable, True)
   1410             self.assertEqual(conn.writable, True)
   1411             self.assertRaises(EOFError, conn.recv)
   1412             self.assertRaises(EOFError, conn.recv_bytes)
   1413 
   1414         p.join()
   1415 
   1416     def test_duplex_false(self):
   1417         reader, writer = self.Pipe(duplex=False)
   1418         self.assertEqual(writer.send(1), None)
   1419         self.assertEqual(reader.recv(), 1)
   1420         if self.TYPE == 'processes':
   1421             self.assertEqual(reader.readable, True)
   1422             self.assertEqual(reader.writable, False)
   1423             self.assertEqual(writer.readable, False)
   1424             self.assertEqual(writer.writable, True)
   1425             self.assertRaises(IOError, reader.send, 2)
   1426             self.assertRaises(IOError, writer.recv)
   1427             self.assertRaises(IOError, writer.poll)
   1428 
   1429     def test_spawn_close(self):
   1430         # We test that a pipe connection can be closed by parent

   1431         # process immediately after child is spawned.  On Windows this

   1432         # would have sometimes failed on old versions because

   1433         # child_conn would be closed before the child got a chance to

   1434         # duplicate it.

   1435         conn, child_conn = self.Pipe()
   1436 
   1437         p = self.Process(target=self._echo, args=(child_conn,))
   1438         p.start()
   1439         child_conn.close()    # this might complete before child initializes

   1440 
   1441         msg = latin('hello')
   1442         conn.send_bytes(msg)
   1443         self.assertEqual(conn.recv_bytes(), msg)
   1444 
   1445         conn.send_bytes(SENTINEL)
   1446         conn.close()
   1447         p.join()
   1448 
   1449     def test_sendbytes(self):
   1450         if self.TYPE != 'processes':
   1451             return
   1452 
   1453         msg = latin('abcdefghijklmnopqrstuvwxyz')
   1454         a, b = self.Pipe()
   1455 
   1456         a.send_bytes(msg)
   1457         self.assertEqual(b.recv_bytes(), msg)
   1458 
   1459         a.send_bytes(msg, 5)
   1460         self.assertEqual(b.recv_bytes(), msg[5:])
   1461 
   1462         a.send_bytes(msg, 7, 8)
   1463         self.assertEqual(b.recv_bytes(), msg[7:7+8])
   1464 
   1465         a.send_bytes(msg, 26)
   1466         self.assertEqual(b.recv_bytes(), latin(''))
   1467 
   1468         a.send_bytes(msg, 26, 0)
   1469         self.assertEqual(b.recv_bytes(), latin(''))
   1470 
   1471         self.assertRaises(ValueError, a.send_bytes, msg, 27)
   1472 
   1473         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
   1474 
   1475         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
   1476 
   1477         self.assertRaises(ValueError, a.send_bytes, msg, -1)
   1478 
   1479         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
   1480 
   1481 class _TestListenerClient(BaseTestCase):
   1482 
   1483     ALLOWED_TYPES = ('processes', 'threads')
   1484 
   1485     @classmethod
   1486     def _test(cls, address):
   1487         conn = cls.connection.Client(address)
   1488         conn.send('hello')
   1489         conn.close()
   1490 
   1491     def test_listener_client(self):
   1492         for family in self.connection.families:
   1493             l = self.connection.Listener(family=family)
   1494             p = self.Process(target=self._test, args=(l.address,))
   1495             p.daemon = True
   1496             p.start()
   1497             conn = l.accept()
   1498             self.assertEqual(conn.recv(), 'hello')
   1499             p.join()
   1500             l.close()
   1501 #

   1502 # Test of sending connection and socket objects between processes

   1503 #

   1504 """
   1505 class _TestPicklingConnections(BaseTestCase):
   1506 
   1507     ALLOWED_TYPES = ('processes',)
   1508 
   1509     def _listener(self, conn, families):
   1510         for fam in families:
   1511             l = self.connection.Listener(family=fam)
   1512             conn.send(l.address)
   1513             new_conn = l.accept()
   1514             conn.send(new_conn)
   1515 
   1516         if self.TYPE == 'processes':
   1517             l = socket.socket()
   1518             l.bind(('localhost', 0))
   1519             conn.send(l.getsockname())
   1520             l.listen(1)
   1521             new_conn, addr = l.accept()
   1522             conn.send(new_conn)
   1523 
   1524         conn.recv()
   1525 
   1526     def _remote(self, conn):
   1527         for (address, msg) in iter(conn.recv, None):
   1528             client = self.connection.Client(address)
   1529             client.send(msg.upper())
   1530             client.close()
   1531 
   1532         if self.TYPE == 'processes':
   1533             address, msg = conn.recv()
   1534             client = socket.socket()
   1535             client.connect(address)
   1536             client.sendall(msg.upper())
   1537             client.close()
   1538 
   1539         conn.close()
   1540 
   1541     def test_pickling(self):
   1542         try:
   1543             multiprocessing.allow_connection_pickling()
   1544         except ImportError:
   1545             return
   1546 
   1547         families = self.connection.families
   1548 
   1549         lconn, lconn0 = self.Pipe()
   1550         lp = self.Process(target=self._listener, args=(lconn0, families))
   1551         lp.start()
   1552         lconn0.close()
   1553 
   1554         rconn, rconn0 = self.Pipe()
   1555         rp = self.Process(target=self._remote, args=(rconn0,))
   1556         rp.start()
   1557         rconn0.close()
   1558 
   1559         for fam in families:
   1560             msg = ('This connection uses family %s' % fam).encode('ascii')
   1561             address = lconn.recv()
   1562             rconn.send((address, msg))
   1563             new_conn = lconn.recv()
   1564             self.assertEqual(new_conn.recv(), msg.upper())
   1565 
   1566         rconn.send(None)
   1567 
   1568         if self.TYPE == 'processes':
   1569             msg = latin('This connection uses a normal socket')
   1570             address = lconn.recv()
   1571             rconn.send((address, msg))
   1572             if hasattr(socket, 'fromfd'):
   1573                 new_conn = lconn.recv()
   1574                 self.assertEqual(new_conn.recv(100), msg.upper())
   1575             else:
   1576                 # XXX On Windows with Py2.6 need to backport fromfd()
   1577                 discard = lconn.recv_bytes()
   1578 
   1579         lconn.send(None)
   1580 
   1581         rconn.close()
   1582         lconn.close()
   1583 
   1584         lp.join()
   1585         rp.join()
   1586 """
   1587 #

   1588 #

   1589 #

   1590 
   1591 class _TestHeap(BaseTestCase):
   1592 
   1593     ALLOWED_TYPES = ('processes',)
   1594 
   1595     def test_heap(self):
   1596         iterations = 5000
   1597         maxblocks = 50
   1598         blocks = []
   1599 
   1600         # create and destroy lots of blocks of different sizes

   1601         for i in xrange(iterations):
   1602             size = int(random.lognormvariate(0, 1) * 1000)
   1603             b = multiprocessing.heap.BufferWrapper(size)
   1604             blocks.append(b)
   1605             if len(blocks) > maxblocks:
   1606                 i = random.randrange(maxblocks)
   1607                 del blocks[i]
   1608 
   1609         # get the heap object

   1610         heap = multiprocessing.heap.BufferWrapper._heap
   1611 
   1612         # verify the state of the heap

   1613         all = []
   1614         occupied = 0
   1615         for L in heap._len_to_seq.values():
   1616             for arena, start, stop in L:
   1617                 all.append((heap._arenas.index(arena), start, stop,
   1618                             stop-start, 'free'))
   1619         for arena, start, stop in heap._allocated_blocks:
   1620             all.append((heap._arenas.index(arena), start, stop,
   1621                         stop-start, 'occupied'))
   1622             occupied += (stop-start)
   1623 
   1624         all.sort()
   1625 
   1626         for i in range(len(all)-1):
   1627             (arena, start, stop) = all[i][:3]
   1628             (narena, nstart, nstop) = all[i+1][:3]
   1629             self.assertTrue((arena != narena and nstart == 0) or
   1630                             (stop == nstart))
   1631 
   1632 #

   1633 #

   1634 #

   1635 
   1636 class _Foo(Structure):
   1637     _fields_ = [
   1638         ('x', c_int),
   1639         ('y', c_double)
   1640         ]
   1641 
   1642 class _TestSharedCTypes(BaseTestCase):
   1643 
   1644     ALLOWED_TYPES = ('processes',)
   1645 
   1646     def setUp(self):
   1647         if not HAS_SHAREDCTYPES:
   1648             self.skipTest("requires multiprocessing.sharedctypes")
   1649 
   1650     @classmethod
   1651     def _double(cls, x, y, foo, arr, string):
   1652         x.value *= 2
   1653         y.value *= 2
   1654         foo.x *= 2
   1655         foo.y *= 2
   1656         string.value *= 2
   1657         for i in range(len(arr)):
   1658             arr[i] *= 2
   1659 
   1660     def test_sharedctypes(self, lock=False):
   1661         x = Value('i', 7, lock=lock)
   1662         y = Value(c_double, 1.0/3.0, lock=lock)
   1663         foo = Value(_Foo, 3, 2, lock=lock)
   1664         arr = self.Array('d', range(10), lock=lock)
   1665         string = self.Array('c', 20, lock=lock)
   1666         string.value = latin('hello')
   1667 
   1668         p = self.Process(target=self._double, args=(x, y, foo, arr, string))
   1669         p.start()
   1670         p.join()
   1671 
   1672         self.assertEqual(x.value, 14)
   1673         self.assertAlmostEqual(y.value, 2.0/3.0)
   1674         self.assertEqual(foo.x, 6)
   1675         self.assertAlmostEqual(foo.y, 4.0)
   1676         for i in range(10):
   1677             self.assertAlmostEqual(arr[i], i*2)
   1678         self.assertEqual(string.value, latin('hellohello'))
   1679 
   1680     def test_synchronize(self):
   1681         self.test_sharedctypes(lock=True)
   1682 
   1683     def test_copy(self):
   1684         foo = _Foo(2, 5.0)
   1685         bar = copy(foo)
   1686         foo.x = 0
   1687         foo.y = 0
   1688         self.assertEqual(bar.x, 2)
   1689         self.assertAlmostEqual(bar.y, 5.0)
   1690 
   1691 #

   1692 #

   1693 #

   1694 
   1695 class _TestFinalize(BaseTestCase):
   1696 
   1697     ALLOWED_TYPES = ('processes',)
   1698 
   1699     @classmethod
   1700     def _test_finalize(cls, conn):
   1701         class Foo(object):
   1702             pass
   1703 
   1704         a = Foo()
   1705         util.Finalize(a, conn.send, args=('a',))
   1706         del a           # triggers callback for a

   1707 
   1708         b = Foo()
   1709         close_b = util.Finalize(b, conn.send, args=('b',))
   1710         close_b()       # triggers callback for b

   1711         close_b()       # does nothing because callback has already been called

   1712         del b           # does nothing because callback has already been called

   1713 
   1714         c = Foo()
   1715         util.Finalize(c, conn.send, args=('c',))
   1716 
   1717         d10 = Foo()
   1718         util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
   1719 
   1720         d01 = Foo()
   1721         util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
   1722         d02 = Foo()
   1723         util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
   1724         d03 = Foo()
   1725         util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
   1726 
   1727         util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
   1728 
   1729         util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
   1730 
   1731         # call multiprocessing's cleanup function then exit process without

   1732         # garbage collecting locals

   1733         util._exit_function()
   1734         conn.close()
   1735         os._exit(0)
   1736 
   1737     def test_finalize(self):
   1738         conn, child_conn = self.Pipe()
   1739 
   1740         p = self.Process(target=self._test_finalize, args=(child_conn,))
   1741         p.start()
   1742         p.join()
   1743 
   1744         result = [obj for obj in iter(conn.recv, 'STOP')]
   1745         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
   1746 
   1747 #

   1748 # Test that from ... import * works for each module

   1749 #

   1750 
   1751 class _TestImportStar(BaseTestCase):
   1752 
   1753     ALLOWED_TYPES = ('processes',)
   1754 
   1755     def test_import(self):
   1756         modules = [
   1757             'multiprocessing', 'multiprocessing.connection',
   1758             'multiprocessing.heap', 'multiprocessing.managers',
   1759             'multiprocessing.pool', 'multiprocessing.process',
   1760             'multiprocessing.reduction',
   1761             'multiprocessing.synchronize', 'multiprocessing.util'
   1762             ]
   1763 
   1764         if c_int is not None:
   1765             # This module requires _ctypes

   1766             modules.append('multiprocessing.sharedctypes')
   1767 
   1768         for name in modules:
   1769             __import__(name)
   1770             mod = sys.modules[name]
   1771 
   1772             for attr in getattr(mod, '__all__', ()):
   1773                 self.assertTrue(
   1774                     hasattr(mod, attr),
   1775                     '%r does not have attribute %r' % (mod, attr)
   1776                     )
   1777 
   1778 #

   1779 # Quick test that logging works -- does not test logging output

   1780 #

   1781 
   1782 class _TestLogging(BaseTestCase):
   1783 
   1784     ALLOWED_TYPES = ('processes',)
   1785 
   1786     def test_enable_logging(self):
   1787         logger = multiprocessing.get_logger()
   1788         logger.setLevel(util.SUBWARNING)
   1789         self.assertTrue(logger is not None)
   1790         logger.debug('this will not be printed')
   1791         logger.info('nor will this')
   1792         logger.setLevel(LOG_LEVEL)
   1793 
   1794     @classmethod
   1795     def _test_level(cls, conn):
   1796         logger = multiprocessing.get_logger()
   1797         conn.send(logger.getEffectiveLevel())
   1798 
   1799     def test_level(self):
   1800         LEVEL1 = 32
   1801         LEVEL2 = 37
   1802 
   1803         logger = multiprocessing.get_logger()
   1804         root_logger = logging.getLogger()
   1805         root_level = root_logger.level
   1806 
   1807         reader, writer = multiprocessing.Pipe(duplex=False)
   1808 
   1809         logger.setLevel(LEVEL1)
   1810         self.Process(target=self._test_level, args=(writer,)).start()
   1811         self.assertEqual(LEVEL1, reader.recv())
   1812 
   1813         logger.setLevel(logging.NOTSET)
   1814         root_logger.setLevel(LEVEL2)
   1815         self.Process(target=self._test_level, args=(writer,)).start()
   1816         self.assertEqual(LEVEL2, reader.recv())
   1817 
   1818         root_logger.setLevel(root_level)
   1819         logger.setLevel(level=LOG_LEVEL)
   1820 
   1821 
   1822 # class _TestLoggingProcessName(BaseTestCase):

   1823 #

   1824 #     def handle(self, record):

   1825 #         assert record.processName == multiprocessing.current_process().name

   1826 #         self.__handled = True

   1827 #

   1828 #     def test_logging(self):

   1829 #         handler = logging.Handler()

   1830 #         handler.handle = self.handle

   1831 #         self.__handled = False

   1832 #         # Bypass getLogger() and side-effects

   1833 #         logger = logging.getLoggerClass()(

   1834 #                 'multiprocessing.test.TestLoggingProcessName')

   1835 #         logger.addHandler(handler)

   1836 #         logger.propagate = False

   1837 #

   1838 #         logger.warn('foo')

   1839 #         assert self.__handled

   1840 
   1841 #

   1842 # Test to verify handle verification, see issue 3321

   1843 #

   1844 
   1845 class TestInvalidHandle(unittest.TestCase):
   1846 
   1847     @unittest.skipIf(WIN32, "skipped on Windows")
   1848     def test_invalid_handles(self):
   1849         conn = _multiprocessing.Connection(44977608)
   1850         self.assertRaises(IOError, conn.poll)
   1851         self.assertRaises(IOError, _multiprocessing.Connection, -1)
   1852 
   1853 #

   1854 # Functions used to create test cases from the base ones in this module

   1855 #

   1856 
   1857 def get_attributes(Source, names):
   1858     d = {}
   1859     for name in names:
   1860         obj = getattr(Source, name)
   1861         if type(obj) == type(get_attributes):
   1862             obj = staticmethod(obj)
   1863         d[name] = obj
   1864     return d
   1865 
   1866 def create_test_cases(Mixin, type):
   1867     result = {}
   1868     glob = globals()
   1869     Type = type.capitalize()
   1870 
   1871     for name in glob.keys():
   1872         if name.startswith('_Test'):
   1873             base = glob[name]
   1874             if type in base.ALLOWED_TYPES:
   1875                 newname = 'With' + Type + name[1:]
   1876                 class Temp(base, unittest.TestCase, Mixin):
   1877                     pass
   1878                 result[newname] = Temp
   1879                 Temp.__name__ = newname
   1880                 Temp.__module__ = Mixin.__module__
   1881     return result
   1882 
   1883 #

   1884 # Create test cases

   1885 #

   1886 
   1887 class ProcessesMixin(object):
   1888     TYPE = 'processes'
   1889     Process = multiprocessing.Process
   1890     locals().update(get_attributes(multiprocessing, (
   1891         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   1892         'Condition', 'Event', 'Value', 'Array', 'RawValue',
   1893         'RawArray', 'current_process', 'active_children', 'Pipe',
   1894         'connection', 'JoinableQueue'
   1895         )))
   1896 
   1897 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
   1898 globals().update(testcases_processes)
   1899 
   1900 
   1901 class ManagerMixin(object):
   1902     TYPE = 'manager'
   1903     Process = multiprocessing.Process
   1904     manager = object.__new__(multiprocessing.managers.SyncManager)
   1905     locals().update(get_attributes(manager, (
   1906         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   1907        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
   1908         'Namespace', 'JoinableQueue'
   1909         )))
   1910 
   1911 testcases_manager = create_test_cases(ManagerMixin, type='manager')
   1912 globals().update(testcases_manager)
   1913 
   1914 
   1915 class ThreadsMixin(object):
   1916     TYPE = 'threads'
   1917     Process = multiprocessing.dummy.Process
   1918     locals().update(get_attributes(multiprocessing.dummy, (
   1919         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   1920         'Condition', 'Event', 'Value', 'Array', 'current_process',
   1921         'active_children', 'Pipe', 'connection', 'dict', 'list',
   1922         'Namespace', 'JoinableQueue'
   1923         )))
   1924 
   1925 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
   1926 globals().update(testcases_threads)
   1927 
   1928 class OtherTest(unittest.TestCase):
   1929     # TODO: add more tests for deliver/answer challenge.

   1930     def test_deliver_challenge_auth_failure(self):
   1931         class _FakeConnection(object):
   1932             def recv_bytes(self, size):
   1933                 return b'something bogus'
   1934             def send_bytes(self, data):
   1935                 pass
   1936         self.assertRaises(multiprocessing.AuthenticationError,
   1937                           multiprocessing.connection.deliver_challenge,
   1938                           _FakeConnection(), b'abc')
   1939 
   1940     def test_answer_challenge_auth_failure(self):
   1941         class _FakeConnection(object):
   1942             def __init__(self):
   1943                 self.count = 0
   1944             def recv_bytes(self, size):
   1945                 self.count += 1
   1946                 if self.count == 1:
   1947                     return multiprocessing.connection.CHALLENGE
   1948                 elif self.count == 2:
   1949                     return b'something bogus'
   1950                 return b''
   1951             def send_bytes(self, data):
   1952                 pass
   1953         self.assertRaises(multiprocessing.AuthenticationError,
   1954                           multiprocessing.connection.answer_challenge,
   1955                           _FakeConnection(), b'abc')
   1956 
   1957 #

   1958 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585

   1959 #

   1960 
   1961 def initializer(ns):
   1962     ns.test += 1
   1963 
   1964 class TestInitializers(unittest.TestCase):
   1965     def setUp(self):
   1966         self.mgr = multiprocessing.Manager()
   1967         self.ns = self.mgr.Namespace()
   1968         self.ns.test = 0
   1969 
   1970     def tearDown(self):
   1971         self.mgr.shutdown()
   1972 
   1973     def test_manager_initializer(self):
   1974         m = multiprocessing.managers.SyncManager()
   1975         self.assertRaises(TypeError, m.start, 1)
   1976         m.start(initializer, (self.ns,))
   1977         self.assertEqual(self.ns.test, 1)
   1978         m.shutdown()
   1979 
   1980     def test_pool_initializer(self):
   1981         self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
   1982         p = multiprocessing.Pool(1, initializer, (self.ns,))
   1983         p.close()
   1984         p.join()
   1985         self.assertEqual(self.ns.test, 1)
   1986 
   1987 #

   1988 # Issue 5155, 5313, 5331: Test process in processes

   1989 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior

   1990 #

   1991 
   1992 def _ThisSubProcess(q):
   1993     try:
   1994         item = q.get(block=False)
   1995     except Queue.Empty:
   1996         pass
   1997 
   1998 def _TestProcess(q):
   1999     queue = multiprocessing.Queue()
   2000     subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
   2001     subProc.start()
   2002     subProc.join()
   2003 
   2004 def _afunc(x):
   2005     return x*x
   2006 
   2007 def pool_in_process():
   2008     pool = multiprocessing.Pool(processes=4)
   2009     x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
   2010 
   2011 class _file_like(object):
   2012     def __init__(self, delegate):
   2013         self._delegate = delegate
   2014         self._pid = None
   2015 
   2016     @property
   2017     def cache(self):
   2018         pid = os.getpid()
   2019         # There are no race conditions since fork keeps only the running thread

   2020         if pid != self._pid:
   2021             self._pid = pid
   2022             self._cache = []
   2023         return self._cache
   2024 
   2025     def write(self, data):
   2026         self.cache.append(data)
   2027 
   2028     def flush(self):
   2029         self._delegate.write(''.join(self.cache))
   2030         self._cache = []
   2031 
   2032 class TestStdinBadfiledescriptor(unittest.TestCase):
   2033 
   2034     def test_queue_in_process(self):
   2035         queue = multiprocessing.Queue()
   2036         proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
   2037         proc.start()
   2038         proc.join()
   2039 
   2040     def test_pool_in_process(self):
   2041         p = multiprocessing.Process(target=pool_in_process)
   2042         p.start()
   2043         p.join()
   2044 
   2045     def test_flushing(self):
   2046         sio = StringIO()
   2047         flike = _file_like(sio)
   2048         flike.write('foo')
   2049         proc = multiprocessing.Process(target=lambda: flike.flush())
   2050         flike.flush()
   2051         assert sio.getvalue() == 'foo'
   2052 
   2053 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
   2054                    TestStdinBadfiledescriptor]
   2055 
   2056 #

   2057 #

   2058 #

   2059 
   2060 def test_main(run=None):
   2061     if sys.platform.startswith("linux"):
   2062         try:
   2063             lock = multiprocessing.RLock()
   2064         except OSError:
   2065             raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
   2066 
   2067     if run is None:
   2068         from test.test_support import run_unittest as run
   2069 
   2070     util.get_temp_dir()     # creates temp directory for use by all processes

   2071 
   2072     multiprocessing.get_logger().setLevel(LOG_LEVEL)
   2073 
   2074     ProcessesMixin.pool = multiprocessing.Pool(4)
   2075     ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
   2076     ManagerMixin.manager.__init__()
   2077     ManagerMixin.manager.start()
   2078     ManagerMixin.pool = ManagerMixin.manager.Pool(4)
   2079 
   2080     testcases = (
   2081         sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
   2082         sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
   2083         sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
   2084         testcases_other
   2085         )
   2086 
   2087     loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
   2088     suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
   2089     # (ncoghlan): Whether or not sys.exc_clear is executed by the threading

   2090     # module during these tests is at least platform dependent and possibly

   2091     # non-deterministic on any given platform. So we don't mind if the listed

   2092     # warnings aren't actually raised.

   2093     with test_support.check_py3k_warnings(
   2094             (".+__(get|set)slice__ has been removed", DeprecationWarning),
   2095             (r"sys.exc_clear\(\) not supported", DeprecationWarning),
   2096             quiet=True):
   2097         run(suite)
   2098 
   2099     ThreadsMixin.pool.terminate()
   2100     ProcessesMixin.pool.terminate()
   2101     ManagerMixin.pool.terminate()
   2102     ManagerMixin.manager.shutdown()
   2103 
   2104     del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
   2105 
   2106 def main():
   2107     test_main(unittest.TextTestRunner(verbosity=2).run)
   2108 
   2109 if __name__ == '__main__':
   2110     main()
   2111