Home | History | Annotate | Download | only in test
      1 #
      2 # Unit tests for the multiprocessing package
      3 #
      4 
      5 import unittest
      6 import queue as pyqueue
      7 import time
      8 import io
      9 import itertools
     10 import sys
     11 import os
     12 import gc
     13 import errno
     14 import signal
     15 import array
     16 import socket
     17 import random
     18 import logging
     19 import struct
     20 import operator
     21 import test.support
     22 import test.support.script_helper
     23 
     24 
     25 # Skip tests if _multiprocessing wasn't built.
     26 _multiprocessing = test.support.import_module('_multiprocessing')
     27 # Skip tests if sem_open implementation is broken.
     28 test.support.import_module('multiprocessing.synchronize')
     29 # import threading after _multiprocessing to raise a more relevant error
     30 # message: "No module named _multiprocessing". _multiprocessing is not compiled
     31 # without thread support.
     32 import threading
     33 
     34 import multiprocessing.dummy
     35 import multiprocessing.connection
     36 import multiprocessing.managers
     37 import multiprocessing.heap
     38 import multiprocessing.pool
     39 
     40 from multiprocessing import util
     41 
     42 try:
     43     from multiprocessing import reduction
     44     HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
     45 except ImportError:
     46     HAS_REDUCTION = False
     47 
     48 try:
     49     from multiprocessing.sharedctypes import Value, copy
     50     HAS_SHAREDCTYPES = True
     51 except ImportError:
     52     HAS_SHAREDCTYPES = False
     53 
     54 try:
     55     import msvcrt
     56 except ImportError:
     57     msvcrt = None
     58 
     59 #
     60 #
     61 #
     62 
     63 def latin(s):
     64     return s.encode('latin')
     65 
     66 #
     67 # Constants
     68 #
     69 
     70 LOG_LEVEL = util.SUBWARNING
     71 #LOG_LEVEL = logging.DEBUG
     72 
     73 DELTA = 0.1
     74 CHECK_TIMINGS = False     # making true makes tests take a lot longer
     75                           # and can sometimes cause some non-serious
     76                           # failures because some calls block a bit
     77                           # longer than expected
     78 if CHECK_TIMINGS:
     79     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
     80 else:
     81     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
     82 
     83 HAVE_GETVALUE = not getattr(_multiprocessing,
     84                             'HAVE_BROKEN_SEM_GETVALUE', False)
     85 
     86 WIN32 = (sys.platform == "win32")
     87 
     88 from multiprocessing.connection import wait
     89 
     90 def wait_for_handle(handle, timeout):
     91     if timeout is not None and timeout < 0.0:
     92         timeout = None
     93     return wait([handle], timeout)
     94 
     95 try:
     96     MAXFD = os.sysconf("SC_OPEN_MAX")
     97 except:
     98     MAXFD = 256
     99 
    100 # To speed up tests when using the forkserver, we can preload these:
    101 PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
    102 
    103 #
    104 # Some tests require ctypes
    105 #
    106 
    107 try:
    108     from ctypes import Structure, c_int, c_double
    109 except ImportError:
    110     Structure = object
    111     c_int = c_double = None
    112 
    113 
    114 def check_enough_semaphores():
    115     """Check that the system supports enough semaphores to run the test."""
    116     # minimum number of semaphores available according to POSIX
    117     nsems_min = 256
    118     try:
    119         nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    120     except (AttributeError, ValueError):
    121         # sysconf not available or setting not available
    122         return
    123     if nsems == -1 or nsems >= nsems_min:
    124         return
    125     raise unittest.SkipTest("The OS doesn't support enough semaphores "
    126                             "to run the test (required: %d)." % nsems_min)
    127 
    128 
    129 #
    130 # Creates a wrapper for a function which records the time it takes to finish
    131 #
    132 
    133 class TimingWrapper(object):
    134 
    135     def __init__(self, func):
    136         self.func = func
    137         self.elapsed = None
    138 
    139     def __call__(self, *args, **kwds):
    140         t = time.time()
    141         try:
    142             return self.func(*args, **kwds)
    143         finally:
    144             self.elapsed = time.time() - t
    145 
    146 #
    147 # Base class for test cases
    148 #
    149 
    150 class BaseTestCase(object):
    151 
    152     ALLOWED_TYPES = ('processes', 'manager', 'threads')
    153 
    154     def assertTimingAlmostEqual(self, a, b):
    155         if CHECK_TIMINGS:
    156             self.assertAlmostEqual(a, b, 1)
    157 
    158     def assertReturnsIfImplemented(self, value, func, *args):
    159         try:
    160             res = func(*args)
    161         except NotImplementedError:
    162             pass
    163         else:
    164             return self.assertEqual(value, res)
    165 
    166     # For the sanity of Windows users, rather than crashing or freezing in
    167     # multiple ways.
    168     def __reduce__(self, *args):
    169         raise NotImplementedError("shouldn't try to pickle a test case")
    170 
    171     __reduce_ex__ = __reduce__
    172 
    173 #
    174 # Return the value of a semaphore
    175 #
    176 
    177 def get_value(self):
    178     try:
    179         return self.get_value()
    180     except AttributeError:
    181         try:
    182             return self._Semaphore__value
    183         except AttributeError:
    184             try:
    185                 return self._value
    186             except AttributeError:
    187                 raise NotImplementedError
    188 
    189 #
    190 # Testcases
    191 #
    192 
    193 class _TestProcess(BaseTestCase):
    194 
    195     ALLOWED_TYPES = ('processes', 'threads')
    196 
    197     def test_current(self):
    198         if self.TYPE == 'threads':
    199             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    200 
    201         current = self.current_process()
    202         authkey = current.authkey
    203 
    204         self.assertTrue(current.is_alive())
    205         self.assertTrue(not current.daemon)
    206         self.assertIsInstance(authkey, bytes)
    207         self.assertTrue(len(authkey) > 0)
    208         self.assertEqual(current.ident, os.getpid())
    209         self.assertEqual(current.exitcode, None)
    210 
    211     def test_daemon_argument(self):
    212         if self.TYPE == "threads":
    213             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    214 
    215         # By default uses the current process's daemon flag.
    216         proc0 = self.Process(target=self._test)
    217         self.assertEqual(proc0.daemon, self.current_process().daemon)
    218         proc1 = self.Process(target=self._test, daemon=True)
    219         self.assertTrue(proc1.daemon)
    220         proc2 = self.Process(target=self._test, daemon=False)
    221         self.assertFalse(proc2.daemon)
    222 
    223     @classmethod
    224     def _test(cls, q, *args, **kwds):
    225         current = cls.current_process()
    226         q.put(args)
    227         q.put(kwds)
    228         q.put(current.name)
    229         if cls.TYPE != 'threads':
    230             q.put(bytes(current.authkey))
    231             q.put(current.pid)
    232 
    233     def test_process(self):
    234         q = self.Queue(1)
    235         e = self.Event()
    236         args = (q, 1, 2)
    237         kwargs = {'hello':23, 'bye':2.54}
    238         name = 'SomeProcess'
    239         p = self.Process(
    240             target=self._test, args=args, kwargs=kwargs, name=name
    241             )
    242         p.daemon = True
    243         current = self.current_process()
    244 
    245         if self.TYPE != 'threads':
    246             self.assertEqual(p.authkey, current.authkey)
    247         self.assertEqual(p.is_alive(), False)
    248         self.assertEqual(p.daemon, True)
    249         self.assertNotIn(p, self.active_children())
    250         self.assertTrue(type(self.active_children()) is list)
    251         self.assertEqual(p.exitcode, None)
    252 
    253         p.start()
    254 
    255         self.assertEqual(p.exitcode, None)
    256         self.assertEqual(p.is_alive(), True)
    257         self.assertIn(p, self.active_children())
    258 
    259         self.assertEqual(q.get(), args[1:])
    260         self.assertEqual(q.get(), kwargs)
    261         self.assertEqual(q.get(), p.name)
    262         if self.TYPE != 'threads':
    263             self.assertEqual(q.get(), current.authkey)
    264             self.assertEqual(q.get(), p.pid)
    265 
    266         p.join()
    267 
    268         self.assertEqual(p.exitcode, 0)
    269         self.assertEqual(p.is_alive(), False)
    270         self.assertNotIn(p, self.active_children())
    271 
    272     @classmethod
    273     def _test_terminate(cls):
    274         time.sleep(100)
    275 
    276     def test_terminate(self):
    277         if self.TYPE == 'threads':
    278             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    279 
    280         p = self.Process(target=self._test_terminate)
    281         p.daemon = True
    282         p.start()
    283 
    284         self.assertEqual(p.is_alive(), True)
    285         self.assertIn(p, self.active_children())
    286         self.assertEqual(p.exitcode, None)
    287 
    288         join = TimingWrapper(p.join)
    289 
    290         self.assertEqual(join(0), None)
    291         self.assertTimingAlmostEqual(join.elapsed, 0.0)
    292         self.assertEqual(p.is_alive(), True)
    293 
    294         self.assertEqual(join(-1), None)
    295         self.assertTimingAlmostEqual(join.elapsed, 0.0)
    296         self.assertEqual(p.is_alive(), True)
    297 
    298         # XXX maybe terminating too soon causes the problems on Gentoo...
    299         time.sleep(1)
    300 
    301         p.terminate()
    302 
    303         if hasattr(signal, 'alarm'):
    304             # On the Gentoo buildbot waitpid() often seems to block forever.
    305             # We use alarm() to interrupt it if it blocks for too long.
    306             def handler(*args):
    307                 raise RuntimeError('join took too long: %s' % p)
    308             old_handler = signal.signal(signal.SIGALRM, handler)
    309             try:
    310                 signal.alarm(10)
    311                 self.assertEqual(join(), None)
    312             finally:
    313                 signal.alarm(0)
    314                 signal.signal(signal.SIGALRM, old_handler)
    315         else:
    316             self.assertEqual(join(), None)
    317 
    318         self.assertTimingAlmostEqual(join.elapsed, 0.0)
    319 
    320         self.assertEqual(p.is_alive(), False)
    321         self.assertNotIn(p, self.active_children())
    322 
    323         p.join()
    324 
    325         # XXX sometimes get p.exitcode == 0 on Windows ...
    326         #self.assertEqual(p.exitcode, -signal.SIGTERM)
    327 
    328     def test_cpu_count(self):
    329         try:
    330             cpus = multiprocessing.cpu_count()
    331         except NotImplementedError:
    332             cpus = 1
    333         self.assertTrue(type(cpus) is int)
    334         self.assertTrue(cpus >= 1)
    335 
    336     def test_active_children(self):
    337         self.assertEqual(type(self.active_children()), list)
    338 
    339         p = self.Process(target=time.sleep, args=(DELTA,))
    340         self.assertNotIn(p, self.active_children())
    341 
    342         p.daemon = True
    343         p.start()
    344         self.assertIn(p, self.active_children())
    345 
    346         p.join()
    347         self.assertNotIn(p, self.active_children())
    348 
    349     @classmethod
    350     def _test_recursion(cls, wconn, id):
    351         wconn.send(id)
    352         if len(id) < 2:
    353             for i in range(2):
    354                 p = cls.Process(
    355                     target=cls._test_recursion, args=(wconn, id+[i])
    356                     )
    357                 p.start()
    358                 p.join()
    359 
    360     def test_recursion(self):
    361         rconn, wconn = self.Pipe(duplex=False)
    362         self._test_recursion(wconn, [])
    363 
    364         time.sleep(DELTA)
    365         result = []
    366         while rconn.poll():
    367             result.append(rconn.recv())
    368 
    369         expected = [
    370             [],
    371               [0],
    372                 [0, 0],
    373                 [0, 1],
    374               [1],
    375                 [1, 0],
    376                 [1, 1]
    377             ]
    378         self.assertEqual(result, expected)
    379 
    380     @classmethod
    381     def _test_sentinel(cls, event):
    382         event.wait(10.0)
    383 
    384     def test_sentinel(self):
    385         if self.TYPE == "threads":
    386             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    387         event = self.Event()
    388         p = self.Process(target=self._test_sentinel, args=(event,))
    389         with self.assertRaises(ValueError):
    390             p.sentinel
    391         p.start()
    392         self.addCleanup(p.join)
    393         sentinel = p.sentinel
    394         self.assertIsInstance(sentinel, int)
    395         self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
    396         event.set()
    397         p.join()
    398         self.assertTrue(wait_for_handle(sentinel, timeout=1))
    399 
    400 #
    401 #
    402 #
    403 
    404 class _UpperCaser(multiprocessing.Process):
    405 
    406     def __init__(self):
    407         multiprocessing.Process.__init__(self)
    408         self.child_conn, self.parent_conn = multiprocessing.Pipe()
    409 
    410     def run(self):
    411         self.parent_conn.close()
    412         for s in iter(self.child_conn.recv, None):
    413             self.child_conn.send(s.upper())
    414         self.child_conn.close()
    415 
    416     def submit(self, s):
    417         assert type(s) is str
    418         self.parent_conn.send(s)
    419         return self.parent_conn.recv()
    420 
    421     def stop(self):
    422         self.parent_conn.send(None)
    423         self.parent_conn.close()
    424         self.child_conn.close()
    425 
    426 class _TestSubclassingProcess(BaseTestCase):
    427 
    428     ALLOWED_TYPES = ('processes',)
    429 
    430     def test_subclassing(self):
    431         uppercaser = _UpperCaser()
    432         uppercaser.daemon = True
    433         uppercaser.start()
    434         self.assertEqual(uppercaser.submit('hello'), 'HELLO')
    435         self.assertEqual(uppercaser.submit('world'), 'WORLD')
    436         uppercaser.stop()
    437         uppercaser.join()
    438 
    439     def test_stderr_flush(self):
    440         # sys.stderr is flushed at process shutdown (issue #13812)
    441         if self.TYPE == "threads":
    442             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    443 
    444         testfn = test.support.TESTFN
    445         self.addCleanup(test.support.unlink, testfn)
    446         proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
    447         proc.start()
    448         proc.join()
    449         with open(testfn, 'r') as f:
    450             err = f.read()
    451             # The whole traceback was printed
    452             self.assertIn("ZeroDivisionError", err)
    453             self.assertIn("test_multiprocessing.py", err)
    454             self.assertIn("1/0 # MARKER", err)
    455 
    456     @classmethod
    457     def _test_stderr_flush(cls, testfn):
    458         fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    459         sys.stderr = open(fd, 'w', closefd=False)
    460         1/0 # MARKER
    461 
    462 
    463     @classmethod
    464     def _test_sys_exit(cls, reason, testfn):
    465         fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    466         sys.stderr = open(fd, 'w', closefd=False)
    467         sys.exit(reason)
    468 
    469     def test_sys_exit(self):
    470         # See Issue 13854
    471         if self.TYPE == 'threads':
    472             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    473 
    474         testfn = test.support.TESTFN
    475         self.addCleanup(test.support.unlink, testfn)
    476 
    477         for reason in (
    478             [1, 2, 3],
    479             'ignore this',
    480         ):
    481             p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
    482             p.daemon = True
    483             p.start()
    484             p.join(5)
    485             self.assertEqual(p.exitcode, 1)
    486 
    487             with open(testfn, 'r') as f:
    488                 content = f.read()
    489             self.assertEqual(content.rstrip(), str(reason))
    490 
    491             os.unlink(testfn)
    492 
    493         for reason in (True, False, 8):
    494             p = self.Process(target=sys.exit, args=(reason,))
    495             p.daemon = True
    496             p.start()
    497             p.join(5)
    498             self.assertEqual(p.exitcode, reason)
    499 
    500 #
    501 #
    502 #
    503 
    504 def queue_empty(q):
    505     if hasattr(q, 'empty'):
    506         return q.empty()
    507     else:
    508         return q.qsize() == 0
    509 
    510 def queue_full(q, maxsize):
    511     if hasattr(q, 'full'):
    512         return q.full()
    513     else:
    514         return q.qsize() == maxsize
    515 
    516 
    517 class _TestQueue(BaseTestCase):
    518 
    519 
    520     @classmethod
    521     def _test_put(cls, queue, child_can_start, parent_can_continue):
    522         child_can_start.wait()
    523         for i in range(6):
    524             queue.get()
    525         parent_can_continue.set()
    526 
    527     def test_put(self):
    528         MAXSIZE = 6
    529         queue = self.Queue(maxsize=MAXSIZE)
    530         child_can_start = self.Event()
    531         parent_can_continue = self.Event()
    532 
    533         proc = self.Process(
    534             target=self._test_put,
    535             args=(queue, child_can_start, parent_can_continue)
    536             )
    537         proc.daemon = True
    538         proc.start()
    539 
    540         self.assertEqual(queue_empty(queue), True)
    541         self.assertEqual(queue_full(queue, MAXSIZE), False)
    542 
    543         queue.put(1)
    544         queue.put(2, True)
    545         queue.put(3, True, None)
    546         queue.put(4, False)
    547         queue.put(5, False, None)
    548         queue.put_nowait(6)
    549 
    550         # the values may be in buffer but not yet in pipe so sleep a bit
    551         time.sleep(DELTA)
    552 
    553         self.assertEqual(queue_empty(queue), False)
    554         self.assertEqual(queue_full(queue, MAXSIZE), True)
    555 
    556         put = TimingWrapper(queue.put)
    557         put_nowait = TimingWrapper(queue.put_nowait)
    558 
    559         self.assertRaises(pyqueue.Full, put, 7, False)
    560         self.assertTimingAlmostEqual(put.elapsed, 0)
    561 
    562         self.assertRaises(pyqueue.Full, put, 7, False, None)
    563         self.assertTimingAlmostEqual(put.elapsed, 0)
    564 
    565         self.assertRaises(pyqueue.Full, put_nowait, 7)
    566         self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
    567 
    568         self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
    569         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
    570 
    571         self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
    572         self.assertTimingAlmostEqual(put.elapsed, 0)
    573 
    574         self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
    575         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
    576 
    577         child_can_start.set()
    578         parent_can_continue.wait()
    579 
    580         self.assertEqual(queue_empty(queue), True)
    581         self.assertEqual(queue_full(queue, MAXSIZE), False)
    582 
    583         proc.join()
    584 
    585     @classmethod
    586     def _test_get(cls, queue, child_can_start, parent_can_continue):
    587         child_can_start.wait()
    588         #queue.put(1)
    589         queue.put(2)
    590         queue.put(3)
    591         queue.put(4)
    592         queue.put(5)
    593         parent_can_continue.set()
    594 
    595     def test_get(self):
    596         queue = self.Queue()
    597         child_can_start = self.Event()
    598         parent_can_continue = self.Event()
    599 
    600         proc = self.Process(
    601             target=self._test_get,
    602             args=(queue, child_can_start, parent_can_continue)
    603             )
    604         proc.daemon = True
    605         proc.start()
    606 
    607         self.assertEqual(queue_empty(queue), True)
    608 
    609         child_can_start.set()
    610         parent_can_continue.wait()
    611 
    612         time.sleep(DELTA)
    613         self.assertEqual(queue_empty(queue), False)
    614 
    615         # Hangs unexpectedly, remove for now
    616         #self.assertEqual(queue.get(), 1)
    617         self.assertEqual(queue.get(True, None), 2)
    618         self.assertEqual(queue.get(True), 3)
    619         self.assertEqual(queue.get(timeout=1), 4)
    620         self.assertEqual(queue.get_nowait(), 5)
    621 
    622         self.assertEqual(queue_empty(queue), True)
    623 
    624         get = TimingWrapper(queue.get)
    625         get_nowait = TimingWrapper(queue.get_nowait)
    626 
    627         self.assertRaises(pyqueue.Empty, get, False)
    628         self.assertTimingAlmostEqual(get.elapsed, 0)
    629 
    630         self.assertRaises(pyqueue.Empty, get, False, None)
    631         self.assertTimingAlmostEqual(get.elapsed, 0)
    632 
    633         self.assertRaises(pyqueue.Empty, get_nowait)
    634         self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
    635 
    636         self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
    637         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
    638 
    639         self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
    640         self.assertTimingAlmostEqual(get.elapsed, 0)
    641 
    642         self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
    643         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
    644 
    645         proc.join()
    646 
    647     @classmethod
    648     def _test_fork(cls, queue):
    649         for i in range(10, 20):
    650             queue.put(i)
    651         # note that at this point the items may only be buffered, so the
    652         # process cannot shutdown until the feeder thread has finished
    653         # pushing items onto the pipe.
    654 
    655     def test_fork(self):
    656         # Old versions of Queue would fail to create a new feeder
    657         # thread for a forked process if the original process had its
    658         # own feeder thread.  This test checks that this no longer
    659         # happens.
    660 
    661         queue = self.Queue()
    662 
    663         # put items on queue so that main process starts a feeder thread
    664         for i in range(10):
    665             queue.put(i)
    666 
    667         # wait to make sure thread starts before we fork a new process
    668         time.sleep(DELTA)
    669 
    670         # fork process
    671         p = self.Process(target=self._test_fork, args=(queue,))
    672         p.daemon = True
    673         p.start()
    674 
    675         # check that all expected items are in the queue
    676         for i in range(20):
    677             self.assertEqual(queue.get(), i)
    678         self.assertRaises(pyqueue.Empty, queue.get, False)
    679 
    680         p.join()
    681 
    682     def test_qsize(self):
    683         q = self.Queue()
    684         try:
    685             self.assertEqual(q.qsize(), 0)
    686         except NotImplementedError:
    687             self.skipTest('qsize method not implemented')
    688         q.put(1)
    689         self.assertEqual(q.qsize(), 1)
    690         q.put(5)
    691         self.assertEqual(q.qsize(), 2)
    692         q.get()
    693         self.assertEqual(q.qsize(), 1)
    694         q.get()
    695         self.assertEqual(q.qsize(), 0)
    696 
    697     @classmethod
    698     def _test_task_done(cls, q):
    699         for obj in iter(q.get, None):
    700             time.sleep(DELTA)
    701             q.task_done()
    702 
    703     def test_task_done(self):
    704         queue = self.JoinableQueue()
    705 
    706         workers = [self.Process(target=self._test_task_done, args=(queue,))
    707                    for i in range(4)]
    708 
    709         for p in workers:
    710             p.daemon = True
    711             p.start()
    712 
    713         for i in range(10):
    714             queue.put(i)
    715 
    716         queue.join()
    717 
    718         for p in workers:
    719             queue.put(None)
    720 
    721         for p in workers:
    722             p.join()
    723 
    724     def test_no_import_lock_contention(self):
    725         with test.support.temp_cwd():
    726             module_name = 'imported_by_an_imported_module'
    727             with open(module_name + '.py', 'w') as f:
    728                 f.write("""if 1:
    729                     import multiprocessing
    730 
    731                     q = multiprocessing.Queue()
    732                     q.put('knock knock')
    733                     q.get(timeout=3)
    734                     q.close()
    735                     del q
    736                 """)
    737 
    738             with test.support.DirsOnSysPath(os.getcwd()):
    739                 try:
    740                     __import__(module_name)
    741                 except pyqueue.Empty:
    742                     self.fail("Probable regression on import lock contention;"
    743                               " see Issue #22853")
    744 
    745     def test_timeout(self):
    746         q = multiprocessing.Queue()
    747         start = time.time()
    748         self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
    749         delta = time.time() - start
    750         # Tolerate a delta of 30 ms because of the bad clock resolution on
    751         # Windows (usually 15.6 ms)
    752         self.assertGreaterEqual(delta, 0.170)
    753 
    754 #
    755 #
    756 #
    757 
    758 class _TestLock(BaseTestCase):
    759 
    760     def test_lock(self):
    761         lock = self.Lock()
    762         self.assertEqual(lock.acquire(), True)
    763         self.assertEqual(lock.acquire(False), False)
    764         self.assertEqual(lock.release(), None)
    765         self.assertRaises((ValueError, threading.ThreadError), lock.release)
    766 
    767     def test_rlock(self):
    768         lock = self.RLock()
    769         self.assertEqual(lock.acquire(), True)
    770         self.assertEqual(lock.acquire(), True)
    771         self.assertEqual(lock.acquire(), True)
    772         self.assertEqual(lock.release(), None)
    773         self.assertEqual(lock.release(), None)
    774         self.assertEqual(lock.release(), None)
    775         self.assertRaises((AssertionError, RuntimeError), lock.release)
    776 
    777     def test_lock_context(self):
    778         with self.Lock():
    779             pass
    780 
    781 
    782 class _TestSemaphore(BaseTestCase):
    783 
    784     def _test_semaphore(self, sem):
    785         self.assertReturnsIfImplemented(2, get_value, sem)
    786         self.assertEqual(sem.acquire(), True)
    787         self.assertReturnsIfImplemented(1, get_value, sem)
    788         self.assertEqual(sem.acquire(), True)
    789         self.assertReturnsIfImplemented(0, get_value, sem)
    790         self.assertEqual(sem.acquire(False), False)
    791         self.assertReturnsIfImplemented(0, get_value, sem)
    792         self.assertEqual(sem.release(), None)
    793         self.assertReturnsIfImplemented(1, get_value, sem)
    794         self.assertEqual(sem.release(), None)
    795         self.assertReturnsIfImplemented(2, get_value, sem)
    796 
    797     def test_semaphore(self):
    798         sem = self.Semaphore(2)
    799         self._test_semaphore(sem)
    800         self.assertEqual(sem.release(), None)
    801         self.assertReturnsIfImplemented(3, get_value, sem)
    802         self.assertEqual(sem.release(), None)
    803         self.assertReturnsIfImplemented(4, get_value, sem)
    804 
    805     def test_bounded_semaphore(self):
    806         sem = self.BoundedSemaphore(2)
    807         self._test_semaphore(sem)
    808         # Currently fails on OS/X
    809         #if HAVE_GETVALUE:
    810         #    self.assertRaises(ValueError, sem.release)
    811         #    self.assertReturnsIfImplemented(2, get_value, sem)
    812 
    813     def test_timeout(self):
    814         if self.TYPE != 'processes':
    815             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    816 
    817         sem = self.Semaphore(0)
    818         acquire = TimingWrapper(sem.acquire)
    819 
    820         self.assertEqual(acquire(False), False)
    821         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    822 
    823         self.assertEqual(acquire(False, None), False)
    824         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    825 
    826         self.assertEqual(acquire(False, TIMEOUT1), False)
    827         self.assertTimingAlmostEqual(acquire.elapsed, 0)
    828 
    829         self.assertEqual(acquire(True, TIMEOUT2), False)
    830         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
    831 
    832         self.assertEqual(acquire(timeout=TIMEOUT3), False)
    833         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
    834 
    835 
    836 class _TestCondition(BaseTestCase):
    837 
    838     @classmethod
    839     def f(cls, cond, sleeping, woken, timeout=None):
    840         cond.acquire()
    841         sleeping.release()
    842         cond.wait(timeout)
    843         woken.release()
    844         cond.release()
    845 
    846     def check_invariant(self, cond):
    847         # this is only supposed to succeed when there are no sleepers
    848         if self.TYPE == 'processes':
    849             try:
    850                 sleepers = (cond._sleeping_count.get_value() -
    851                             cond._woken_count.get_value())
    852                 self.assertEqual(sleepers, 0)
    853                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
    854             except NotImplementedError:
    855                 pass
    856 
    857     def test_notify(self):
    858         cond = self.Condition()
    859         sleeping = self.Semaphore(0)
    860         woken = self.Semaphore(0)
    861 
    862         p = self.Process(target=self.f, args=(cond, sleeping, woken))
    863         p.daemon = True
    864         p.start()
    865 
    866         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    867         p.daemon = True
    868         p.start()
    869 
    870         # wait for both children to start sleeping
    871         sleeping.acquire()
    872         sleeping.acquire()
    873 
    874         # check no process/thread has woken up
    875         time.sleep(DELTA)
    876         self.assertReturnsIfImplemented(0, get_value, woken)
    877 
    878         # wake up one process/thread
    879         cond.acquire()
    880         cond.notify()
    881         cond.release()
    882 
    883         # check one process/thread has woken up
    884         time.sleep(DELTA)
    885         self.assertReturnsIfImplemented(1, get_value, woken)
    886 
    887         # wake up another
    888         cond.acquire()
    889         cond.notify()
    890         cond.release()
    891 
    892         # check other has woken up
    893         time.sleep(DELTA)
    894         self.assertReturnsIfImplemented(2, get_value, woken)
    895 
    896         # check state is not mucked up
    897         self.check_invariant(cond)
    898         p.join()
    899 
    900     def test_notify_all(self):
    901         cond = self.Condition()
    902         sleeping = self.Semaphore(0)
    903         woken = self.Semaphore(0)
    904 
    905         # start some threads/processes which will timeout
    906         for i in range(3):
    907             p = self.Process(target=self.f,
    908                              args=(cond, sleeping, woken, TIMEOUT1))
    909             p.daemon = True
    910             p.start()
    911 
    912             t = threading.Thread(target=self.f,
    913                                  args=(cond, sleeping, woken, TIMEOUT1))
    914             t.daemon = True
    915             t.start()
    916 
    917         # wait for them all to sleep
    918         for i in range(6):
    919             sleeping.acquire()
    920 
    921         # check they have all timed out
    922         for i in range(6):
    923             woken.acquire()
    924         self.assertReturnsIfImplemented(0, get_value, woken)
    925 
    926         # check state is not mucked up
    927         self.check_invariant(cond)
    928 
    929         # start some more threads/processes
    930         for i in range(3):
    931             p = self.Process(target=self.f, args=(cond, sleeping, woken))
    932             p.daemon = True
    933             p.start()
    934 
    935             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    936             t.daemon = True
    937             t.start()
    938 
    939         # wait for them to all sleep
    940         for i in range(6):
    941             sleeping.acquire()
    942 
    943         # check no process/thread has woken up
    944         time.sleep(DELTA)
    945         self.assertReturnsIfImplemented(0, get_value, woken)
    946 
    947         # wake them all up
    948         cond.acquire()
    949         cond.notify_all()
    950         cond.release()
    951 
    952         # check they have all woken
    953         for i in range(10):
    954             try:
    955                 if get_value(woken) == 6:
    956                     break
    957             except NotImplementedError:
    958                 break
    959             time.sleep(DELTA)
    960         self.assertReturnsIfImplemented(6, get_value, woken)
    961 
    962         # check state is not mucked up
    963         self.check_invariant(cond)
    964 
    965     def test_timeout(self):
    966         cond = self.Condition()
    967         wait = TimingWrapper(cond.wait)
    968         cond.acquire()
    969         res = wait(TIMEOUT1)
    970         cond.release()
    971         self.assertEqual(res, False)
    972         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    973 
    974     @classmethod
    975     def _test_waitfor_f(cls, cond, state):
    976         with cond:
    977             state.value = 0
    978             cond.notify()
    979             result = cond.wait_for(lambda : state.value==4)
    980             if not result or state.value != 4:
    981                 sys.exit(1)
    982 
    983     @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    984     def test_waitfor(self):
    985         # based on test in test/lock_tests.py
    986         cond = self.Condition()
    987         state = self.Value('i', -1)
    988 
    989         p = self.Process(target=self._test_waitfor_f, args=(cond, state))
    990         p.daemon = True
    991         p.start()
    992 
    993         with cond:
    994             result = cond.wait_for(lambda : state.value==0)
    995             self.assertTrue(result)
    996             self.assertEqual(state.value, 0)
    997 
    998         for i in range(4):
    999             time.sleep(0.01)
   1000             with cond:
   1001                 state.value += 1
   1002                 cond.notify()
   1003 
   1004         p.join(5)
   1005         self.assertFalse(p.is_alive())
   1006         self.assertEqual(p.exitcode, 0)
   1007 
   1008     @classmethod
   1009     def _test_waitfor_timeout_f(cls, cond, state, success, sem):
   1010         sem.release()
   1011         with cond:
   1012             expected = 0.1
   1013             dt = time.time()
   1014             result = cond.wait_for(lambda : state.value==4, timeout=expected)
   1015             dt = time.time() - dt
   1016             # borrow logic in assertTimeout() from test/lock_tests.py
   1017             if not result and expected * 0.6 < dt < expected * 10.0:
   1018                 success.value = True
   1019 
   1020     @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
   1021     def test_waitfor_timeout(self):
   1022         # based on test in test/lock_tests.py
   1023         cond = self.Condition()
   1024         state = self.Value('i', 0)
   1025         success = self.Value('i', False)
   1026         sem = self.Semaphore(0)
   1027 
   1028         p = self.Process(target=self._test_waitfor_timeout_f,
   1029                          args=(cond, state, success, sem))
   1030         p.daemon = True
   1031         p.start()
   1032         self.assertTrue(sem.acquire(timeout=10))
   1033 
   1034         # Only increment 3 times, so state == 4 is never reached.
   1035         for i in range(3):
   1036             time.sleep(0.01)
   1037             with cond:
   1038                 state.value += 1
   1039                 cond.notify()
   1040 
   1041         p.join(5)
   1042         self.assertTrue(success.value)
   1043 
   1044     @classmethod
   1045     def _test_wait_result(cls, c, pid):
   1046         with c:
   1047             c.notify()
   1048         time.sleep(1)
   1049         if pid is not None:
   1050             os.kill(pid, signal.SIGINT)
   1051 
   1052     def test_wait_result(self):
   1053         if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
   1054             pid = os.getpid()
   1055         else:
   1056             pid = None
   1057 
   1058         c = self.Condition()
   1059         with c:
   1060             self.assertFalse(c.wait(0))
   1061             self.assertFalse(c.wait(0.1))
   1062 
   1063             p = self.Process(target=self._test_wait_result, args=(c, pid))
   1064             p.start()
   1065 
   1066             self.assertTrue(c.wait(10))
   1067             if pid is not None:
   1068                 self.assertRaises(KeyboardInterrupt, c.wait, 10)
   1069 
   1070             p.join()
   1071 
   1072 
   1073 class _TestEvent(BaseTestCase):
   1074 
   1075     @classmethod
   1076     def _test_event(cls, event):
   1077         time.sleep(TIMEOUT2)
   1078         event.set()
   1079 
   1080     def test_event(self):
   1081         event = self.Event()
   1082         wait = TimingWrapper(event.wait)
   1083 
   1084         # Removed temporarily, due to API shear, this does not
   1085         # work with threading._Event objects. is_set == isSet
   1086         self.assertEqual(event.is_set(), False)
   1087 
   1088         # Removed, threading.Event.wait() will return the value of the __flag
   1089         # instead of None. API Shear with the semaphore backed mp.Event
   1090         self.assertEqual(wait(0.0), False)
   1091         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
   1092         self.assertEqual(wait(TIMEOUT1), False)
   1093         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
   1094 
   1095         event.set()
   1096 
   1097         # See note above on the API differences
   1098         self.assertEqual(event.is_set(), True)
   1099         self.assertEqual(wait(), True)
   1100         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
   1101         self.assertEqual(wait(TIMEOUT1), True)
   1102         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
   1103         # self.assertEqual(event.is_set(), True)
   1104 
   1105         event.clear()
   1106 
   1107         #self.assertEqual(event.is_set(), False)
   1108 
   1109         p = self.Process(target=self._test_event, args=(event,))
   1110         p.daemon = True
   1111         p.start()
   1112         self.assertEqual(wait(), True)
   1113 
   1114 #
   1115 # Tests for Barrier - adapted from tests in test/lock_tests.py
   1116 #
   1117 
   1118 # Many of the tests for threading.Barrier use a list as an atomic
   1119 # counter: a value is appended to increment the counter, and the
   1120 # length of the list gives the value.  We use the class DummyList
   1121 # for the same purpose.
   1122 
   1123 class _DummyList(object):
   1124 
   1125     def __init__(self):
   1126         wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
   1127         lock = multiprocessing.Lock()
   1128         self.__setstate__((wrapper, lock))
   1129         self._lengthbuf[0] = 0
   1130 
   1131     def __setstate__(self, state):
   1132         (self._wrapper, self._lock) = state
   1133         self._lengthbuf = self._wrapper.create_memoryview().cast('i')
   1134 
   1135     def __getstate__(self):
   1136         return (self._wrapper, self._lock)
   1137 
   1138     def append(self, _):
   1139         with self._lock:
   1140             self._lengthbuf[0] += 1
   1141 
   1142     def __len__(self):
   1143         with self._lock:
   1144             return self._lengthbuf[0]
   1145 
   1146 def _wait():
   1147     # A crude wait/yield function not relying on synchronization primitives.
   1148     time.sleep(0.01)
   1149 
   1150 
   1151 class Bunch(object):
   1152     """
   1153     A bunch of threads.
   1154     """
   1155     def __init__(self, namespace, f, args, n, wait_before_exit=False):
   1156         """
   1157         Construct a bunch of `n` threads running the same function `f`.
   1158         If `wait_before_exit` is True, the threads won't terminate until
   1159         do_finish() is called.
   1160         """
   1161         self.f = f
   1162         self.args = args
   1163         self.n = n
   1164         self.started = namespace.DummyList()
   1165         self.finished = namespace.DummyList()
   1166         self._can_exit = namespace.Event()
   1167         if not wait_before_exit:
   1168             self._can_exit.set()
   1169         for i in range(n):
   1170             p = namespace.Process(target=self.task)
   1171             p.daemon = True
   1172             p.start()
   1173 
   1174     def task(self):
   1175         pid = os.getpid()
   1176         self.started.append(pid)
   1177         try:
   1178             self.f(*self.args)
   1179         finally:
   1180             self.finished.append(pid)
   1181             self._can_exit.wait(30)
   1182             assert self._can_exit.is_set()
   1183 
   1184     def wait_for_started(self):
   1185         while len(self.started) < self.n:
   1186             _wait()
   1187 
   1188     def wait_for_finished(self):
   1189         while len(self.finished) < self.n:
   1190             _wait()
   1191 
   1192     def do_finish(self):
   1193         self._can_exit.set()
   1194 
   1195 
   1196 class AppendTrue(object):
   1197     def __init__(self, obj):
   1198         self.obj = obj
   1199     def __call__(self):
   1200         self.obj.append(True)
   1201 
   1202 
   1203 class _TestBarrier(BaseTestCase):
   1204     """
   1205     Tests for Barrier objects.
   1206     """
   1207     N = 5
   1208     defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout
   1209 
   1210     def setUp(self):
   1211         self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
   1212 
   1213     def tearDown(self):
   1214         self.barrier.abort()
   1215         self.barrier = None
   1216 
   1217     def DummyList(self):
   1218         if self.TYPE == 'threads':
   1219             return []
   1220         elif self.TYPE == 'manager':
   1221             return self.manager.list()
   1222         else:
   1223             return _DummyList()
   1224 
   1225     def run_threads(self, f, args):
   1226         b = Bunch(self, f, args, self.N-1)
   1227         f(*args)
   1228         b.wait_for_finished()
   1229 
   1230     @classmethod
   1231     def multipass(cls, barrier, results, n):
   1232         m = barrier.parties
   1233         assert m == cls.N
   1234         for i in range(n):
   1235             results[0].append(True)
   1236             assert len(results[1]) == i * m
   1237             barrier.wait()
   1238             results[1].append(True)
   1239             assert len(results[0]) == (i + 1) * m
   1240             barrier.wait()
   1241         try:
   1242             assert barrier.n_waiting == 0
   1243         except NotImplementedError:
   1244             pass
   1245         assert not barrier.broken
   1246 
   1247     def test_barrier(self, passes=1):
   1248         """
   1249         Test that a barrier is passed in lockstep
   1250         """
   1251         results = [self.DummyList(), self.DummyList()]
   1252         self.run_threads(self.multipass, (self.barrier, results, passes))
   1253 
   1254     def test_barrier_10(self):
   1255         """
   1256         Test that a barrier works for 10 consecutive runs
   1257         """
   1258         return self.test_barrier(10)
   1259 
   1260     @classmethod
   1261     def _test_wait_return_f(cls, barrier, queue):
   1262         res = barrier.wait()
   1263         queue.put(res)
   1264 
   1265     def test_wait_return(self):
   1266         """
   1267         test the return value from barrier.wait
   1268         """
   1269         queue = self.Queue()
   1270         self.run_threads(self._test_wait_return_f, (self.barrier, queue))
   1271         results = [queue.get() for i in range(self.N)]
   1272         self.assertEqual(results.count(0), 1)
   1273 
   1274     @classmethod
   1275     def _test_action_f(cls, barrier, results):
   1276         barrier.wait()
   1277         if len(results) != 1:
   1278             raise RuntimeError
   1279 
   1280     def test_action(self):
   1281         """
   1282         Test the 'action' callback
   1283         """
   1284         results = self.DummyList()
   1285         barrier = self.Barrier(self.N, action=AppendTrue(results))
   1286         self.run_threads(self._test_action_f, (barrier, results))
   1287         self.assertEqual(len(results), 1)
   1288 
   1289     @classmethod
   1290     def _test_abort_f(cls, barrier, results1, results2):
   1291         try:
   1292             i = barrier.wait()
   1293             if i == cls.N//2:
   1294                 raise RuntimeError
   1295             barrier.wait()
   1296             results1.append(True)
   1297         except threading.BrokenBarrierError:
   1298             results2.append(True)
   1299         except RuntimeError:
   1300             barrier.abort()
   1301 
   1302     def test_abort(self):
   1303         """
   1304         Test that an abort will put the barrier in a broken state
   1305         """
   1306         results1 = self.DummyList()
   1307         results2 = self.DummyList()
   1308         self.run_threads(self._test_abort_f,
   1309                          (self.barrier, results1, results2))
   1310         self.assertEqual(len(results1), 0)
   1311         self.assertEqual(len(results2), self.N-1)
   1312         self.assertTrue(self.barrier.broken)
   1313 
   1314     @classmethod
   1315     def _test_reset_f(cls, barrier, results1, results2, results3):
   1316         i = barrier.wait()
   1317         if i == cls.N//2:
   1318             # Wait until the other threads are all in the barrier.
   1319             while barrier.n_waiting < cls.N-1:
   1320                 time.sleep(0.001)
   1321             barrier.reset()
   1322         else:
   1323             try:
   1324                 barrier.wait()
   1325                 results1.append(True)
   1326             except threading.BrokenBarrierError:
   1327                 results2.append(True)
   1328         # Now, pass the barrier again
   1329         barrier.wait()
   1330         results3.append(True)
   1331 
   1332     def test_reset(self):
   1333         """
   1334         Test that a 'reset' on a barrier frees the waiting threads
   1335         """
   1336         results1 = self.DummyList()
   1337         results2 = self.DummyList()
   1338         results3 = self.DummyList()
   1339         self.run_threads(self._test_reset_f,
   1340                          (self.barrier, results1, results2, results3))
   1341         self.assertEqual(len(results1), 0)
   1342         self.assertEqual(len(results2), self.N-1)
   1343         self.assertEqual(len(results3), self.N)
   1344 
   1345     @classmethod
   1346     def _test_abort_and_reset_f(cls, barrier, barrier2,
   1347                                 results1, results2, results3):
   1348         try:
   1349             i = barrier.wait()
   1350             if i == cls.N//2:
   1351                 raise RuntimeError
   1352             barrier.wait()
   1353             results1.append(True)
   1354         except threading.BrokenBarrierError:
   1355             results2.append(True)
   1356         except RuntimeError:
   1357             barrier.abort()
   1358         # Synchronize and reset the barrier.  Must synchronize first so
   1359         # that everyone has left it when we reset, and after so that no
   1360         # one enters it before the reset.
   1361         if barrier2.wait() == cls.N//2:
   1362             barrier.reset()
   1363         barrier2.wait()
   1364         barrier.wait()
   1365         results3.append(True)
   1366 
   1367     def test_abort_and_reset(self):
   1368         """
   1369         Test that a barrier can be reset after being broken.
   1370         """
   1371         results1 = self.DummyList()
   1372         results2 = self.DummyList()
   1373         results3 = self.DummyList()
   1374         barrier2 = self.Barrier(self.N)
   1375 
   1376         self.run_threads(self._test_abort_and_reset_f,
   1377                          (self.barrier, barrier2, results1, results2, results3))
   1378         self.assertEqual(len(results1), 0)
   1379         self.assertEqual(len(results2), self.N-1)
   1380         self.assertEqual(len(results3), self.N)
   1381 
   1382     @classmethod
   1383     def _test_timeout_f(cls, barrier, results):
   1384         i = barrier.wait()
   1385         if i == cls.N//2:
   1386             # One thread is late!
   1387             time.sleep(1.0)
   1388         try:
   1389             barrier.wait(0.5)
   1390         except threading.BrokenBarrierError:
   1391             results.append(True)
   1392 
   1393     def test_timeout(self):
   1394         """
   1395         Test wait(timeout)
   1396         """
   1397         results = self.DummyList()
   1398         self.run_threads(self._test_timeout_f, (self.barrier, results))
   1399         self.assertEqual(len(results), self.barrier.parties)
   1400 
   1401     @classmethod
   1402     def _test_default_timeout_f(cls, barrier, results):
   1403         i = barrier.wait(cls.defaultTimeout)
   1404         if i == cls.N//2:
   1405             # One thread is later than the default timeout
   1406             time.sleep(1.0)
   1407         try:
   1408             barrier.wait()
   1409         except threading.BrokenBarrierError:
   1410             results.append(True)
   1411 
   1412     def test_default_timeout(self):
   1413         """
   1414         Test the barrier's default timeout
   1415         """
   1416         barrier = self.Barrier(self.N, timeout=0.5)
   1417         results = self.DummyList()
   1418         self.run_threads(self._test_default_timeout_f, (barrier, results))
   1419         self.assertEqual(len(results), barrier.parties)
   1420 
   1421     def test_single_thread(self):
   1422         b = self.Barrier(1)
   1423         b.wait()
   1424         b.wait()
   1425 
   1426     @classmethod
   1427     def _test_thousand_f(cls, barrier, passes, conn, lock):
   1428         for i in range(passes):
   1429             barrier.wait()
   1430             with lock:
   1431                 conn.send(i)
   1432 
   1433     def test_thousand(self):
   1434         if self.TYPE == 'manager':
   1435             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1436         passes = 1000
   1437         lock = self.Lock()
   1438         conn, child_conn = self.Pipe(False)
   1439         for j in range(self.N):
   1440             p = self.Process(target=self._test_thousand_f,
   1441                            args=(self.barrier, passes, child_conn, lock))
   1442             p.start()
   1443 
   1444         for i in range(passes):
   1445             for j in range(self.N):
   1446                 self.assertEqual(conn.recv(), i)
   1447 
   1448 #
   1449 #
   1450 #
   1451 
   1452 class _TestValue(BaseTestCase):
   1453 
   1454     ALLOWED_TYPES = ('processes',)
   1455 
   1456     codes_values = [
   1457         ('i', 4343, 24234),
   1458         ('d', 3.625, -4.25),
   1459         ('h', -232, 234),
   1460         ('c', latin('x'), latin('y'))
   1461         ]
   1462 
   1463     def setUp(self):
   1464         if not HAS_SHAREDCTYPES:
   1465             self.skipTest("requires multiprocessing.sharedctypes")
   1466 
   1467     @classmethod
   1468     def _test(cls, values):
   1469         for sv, cv in zip(values, cls.codes_values):
   1470             sv.value = cv[2]
   1471 
   1472 
   1473     def test_value(self, raw=False):
   1474         if raw:
   1475             values = [self.RawValue(code, value)
   1476                       for code, value, _ in self.codes_values]
   1477         else:
   1478             values = [self.Value(code, value)
   1479                       for code, value, _ in self.codes_values]
   1480 
   1481         for sv, cv in zip(values, self.codes_values):
   1482             self.assertEqual(sv.value, cv[1])
   1483 
   1484         proc = self.Process(target=self._test, args=(values,))
   1485         proc.daemon = True
   1486         proc.start()
   1487         proc.join()
   1488 
   1489         for sv, cv in zip(values, self.codes_values):
   1490             self.assertEqual(sv.value, cv[2])
   1491 
   1492     def test_rawvalue(self):
   1493         self.test_value(raw=True)
   1494 
   1495     def test_getobj_getlock(self):
   1496         val1 = self.Value('i', 5)
   1497         lock1 = val1.get_lock()
   1498         obj1 = val1.get_obj()
   1499 
   1500         val2 = self.Value('i', 5, lock=None)
   1501         lock2 = val2.get_lock()
   1502         obj2 = val2.get_obj()
   1503 
   1504         lock = self.Lock()
   1505         val3 = self.Value('i', 5, lock=lock)
   1506         lock3 = val3.get_lock()
   1507         obj3 = val3.get_obj()
   1508         self.assertEqual(lock, lock3)
   1509 
   1510         arr4 = self.Value('i', 5, lock=False)
   1511         self.assertFalse(hasattr(arr4, 'get_lock'))
   1512         self.assertFalse(hasattr(arr4, 'get_obj'))
   1513 
   1514         self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
   1515 
   1516         arr5 = self.RawValue('i', 5)
   1517         self.assertFalse(hasattr(arr5, 'get_lock'))
   1518         self.assertFalse(hasattr(arr5, 'get_obj'))
   1519 
   1520 
   1521 class _TestArray(BaseTestCase):
   1522 
   1523     ALLOWED_TYPES = ('processes',)
   1524 
   1525     @classmethod
   1526     def f(cls, seq):
   1527         for i in range(1, len(seq)):
   1528             seq[i] += seq[i-1]
   1529 
   1530     @unittest.skipIf(c_int is None, "requires _ctypes")
   1531     def test_array(self, raw=False):
   1532         seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
   1533         if raw:
   1534             arr = self.RawArray('i', seq)
   1535         else:
   1536             arr = self.Array('i', seq)
   1537 
   1538         self.assertEqual(len(arr), len(seq))
   1539         self.assertEqual(arr[3], seq[3])
   1540         self.assertEqual(list(arr[2:7]), list(seq[2:7]))
   1541 
   1542         arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
   1543 
   1544         self.assertEqual(list(arr[:]), seq)
   1545 
   1546         self.f(seq)
   1547 
   1548         p = self.Process(target=self.f, args=(arr,))
   1549         p.daemon = True
   1550         p.start()
   1551         p.join()
   1552 
   1553         self.assertEqual(list(arr[:]), seq)
   1554 
   1555     @unittest.skipIf(c_int is None, "requires _ctypes")
   1556     def test_array_from_size(self):
   1557         size = 10
   1558         # Test for zeroing (see issue #11675).
   1559         # The repetition below strengthens the test by increasing the chances
   1560         # of previously allocated non-zero memory being used for the new array
   1561         # on the 2nd and 3rd loops.
   1562         for _ in range(3):
   1563             arr = self.Array('i', size)
   1564             self.assertEqual(len(arr), size)
   1565             self.assertEqual(list(arr), [0] * size)
   1566             arr[:] = range(10)
   1567             self.assertEqual(list(arr), list(range(10)))
   1568             del arr
   1569 
   1570     @unittest.skipIf(c_int is None, "requires _ctypes")
   1571     def test_rawarray(self):
   1572         self.test_array(raw=True)
   1573 
   1574     @unittest.skipIf(c_int is None, "requires _ctypes")
   1575     def test_getobj_getlock_obj(self):
   1576         arr1 = self.Array('i', list(range(10)))
   1577         lock1 = arr1.get_lock()
   1578         obj1 = arr1.get_obj()
   1579 
   1580         arr2 = self.Array('i', list(range(10)), lock=None)
   1581         lock2 = arr2.get_lock()
   1582         obj2 = arr2.get_obj()
   1583 
   1584         lock = self.Lock()
   1585         arr3 = self.Array('i', list(range(10)), lock=lock)
   1586         lock3 = arr3.get_lock()
   1587         obj3 = arr3.get_obj()
   1588         self.assertEqual(lock, lock3)
   1589 
   1590         arr4 = self.Array('i', range(10), lock=False)
   1591         self.assertFalse(hasattr(arr4, 'get_lock'))
   1592         self.assertFalse(hasattr(arr4, 'get_obj'))
   1593         self.assertRaises(AttributeError,
   1594                           self.Array, 'i', range(10), lock='notalock')
   1595 
   1596         arr5 = self.RawArray('i', range(10))
   1597         self.assertFalse(hasattr(arr5, 'get_lock'))
   1598         self.assertFalse(hasattr(arr5, 'get_obj'))
   1599 
   1600 #
   1601 #
   1602 #
   1603 
   1604 class _TestContainers(BaseTestCase):
   1605 
   1606     ALLOWED_TYPES = ('manager',)
   1607 
   1608     def test_list(self):
   1609         a = self.list(list(range(10)))
   1610         self.assertEqual(a[:], list(range(10)))
   1611 
   1612         b = self.list()
   1613         self.assertEqual(b[:], [])
   1614 
   1615         b.extend(list(range(5)))
   1616         self.assertEqual(b[:], list(range(5)))
   1617 
   1618         self.assertEqual(b[2], 2)
   1619         self.assertEqual(b[2:10], [2,3,4])
   1620 
   1621         b *= 2
   1622         self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
   1623 
   1624         self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
   1625 
   1626         self.assertEqual(a[:], list(range(10)))
   1627 
   1628         d = [a, b]
   1629         e = self.list(d)
   1630         self.assertEqual(
   1631             [element[:] for element in e],
   1632             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
   1633             )
   1634 
   1635         f = self.list([a])
   1636         a.append('hello')
   1637         self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
   1638 
   1639     def test_list_proxy_in_list(self):
   1640         a = self.list([self.list(range(3)) for _i in range(3)])
   1641         self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
   1642 
   1643         a[0][-1] = 55
   1644         self.assertEqual(a[0][:], [0, 1, 55])
   1645         for i in range(1, 3):
   1646             self.assertEqual(a[i][:], [0, 1, 2])
   1647 
   1648         self.assertEqual(a[1].pop(), 2)
   1649         self.assertEqual(len(a[1]), 2)
   1650         for i in range(0, 3, 2):
   1651             self.assertEqual(len(a[i]), 3)
   1652 
   1653         del a
   1654 
   1655         b = self.list()
   1656         b.append(b)
   1657         del b
   1658 
   1659     def test_dict(self):
   1660         d = self.dict()
   1661         indices = list(range(65, 70))
   1662         for i in indices:
   1663             d[i] = chr(i)
   1664         self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
   1665         self.assertEqual(sorted(d.keys()), indices)
   1666         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
   1667         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
   1668 
   1669     def test_dict_proxy_nested(self):
   1670         pets = self.dict(ferrets=2, hamsters=4)
   1671         supplies = self.dict(water=10, feed=3)
   1672         d = self.dict(pets=pets, supplies=supplies)
   1673 
   1674         self.assertEqual(supplies['water'], 10)
   1675         self.assertEqual(d['supplies']['water'], 10)
   1676 
   1677         d['supplies']['blankets'] = 5
   1678         self.assertEqual(supplies['blankets'], 5)
   1679         self.assertEqual(d['supplies']['blankets'], 5)
   1680 
   1681         d['supplies']['water'] = 7
   1682         self.assertEqual(supplies['water'], 7)
   1683         self.assertEqual(d['supplies']['water'], 7)
   1684 
   1685         del pets
   1686         del supplies
   1687         self.assertEqual(d['pets']['ferrets'], 2)
   1688         d['supplies']['blankets'] = 11
   1689         self.assertEqual(d['supplies']['blankets'], 11)
   1690 
   1691         pets = d['pets']
   1692         supplies = d['supplies']
   1693         supplies['water'] = 7
   1694         self.assertEqual(supplies['water'], 7)
   1695         self.assertEqual(d['supplies']['water'], 7)
   1696 
   1697         d.clear()
   1698         self.assertEqual(len(d), 0)
   1699         self.assertEqual(supplies['water'], 7)
   1700         self.assertEqual(pets['hamsters'], 4)
   1701 
   1702         l = self.list([pets, supplies])
   1703         l[0]['marmots'] = 1
   1704         self.assertEqual(pets['marmots'], 1)
   1705         self.assertEqual(l[0]['marmots'], 1)
   1706 
   1707         del pets
   1708         del supplies
   1709         self.assertEqual(l[0]['marmots'], 1)
   1710 
   1711         outer = self.list([[88, 99], l])
   1712         self.assertIsInstance(outer[0], list)  # Not a ListProxy
   1713         self.assertEqual(outer[-1][-1]['feed'], 3)
   1714 
   1715     def test_namespace(self):
   1716         n = self.Namespace()
   1717         n.name = 'Bob'
   1718         n.job = 'Builder'
   1719         n._hidden = 'hidden'
   1720         self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
   1721         del n.job
   1722         self.assertEqual(str(n), "Namespace(name='Bob')")
   1723         self.assertTrue(hasattr(n, 'name'))
   1724         self.assertTrue(not hasattr(n, 'job'))
   1725 
   1726 #
   1727 #
   1728 #
   1729 
   1730 def sqr(x, wait=0.0):
   1731     time.sleep(wait)
   1732     return x*x
   1733 
   1734 def mul(x, y):
   1735     return x*y
   1736 
   1737 def raise_large_valuerror(wait):
   1738     time.sleep(wait)
   1739     raise ValueError("x" * 1024**2)
   1740 
   1741 class SayWhenError(ValueError): pass
   1742 
   1743 def exception_throwing_generator(total, when):
   1744     for i in range(total):
   1745         if i == when:
   1746             raise SayWhenError("Somebody said when")
   1747         yield i
   1748 
   1749 class _TestPool(BaseTestCase):
   1750 
   1751     @classmethod
   1752     def setUpClass(cls):
   1753         super().setUpClass()
   1754         cls.pool = cls.Pool(4)
   1755 
   1756     @classmethod
   1757     def tearDownClass(cls):
   1758         cls.pool.terminate()
   1759         cls.pool.join()
   1760         cls.pool = None
   1761         super().tearDownClass()
   1762 
   1763     def test_apply(self):
   1764         papply = self.pool.apply
   1765         self.assertEqual(papply(sqr, (5,)), sqr(5))
   1766         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
   1767 
   1768     def test_map(self):
   1769         pmap = self.pool.map
   1770         self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
   1771         self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
   1772                          list(map(sqr, list(range(100)))))
   1773 
   1774     def test_starmap(self):
   1775         psmap = self.pool.starmap
   1776         tuples = list(zip(range(10), range(9,-1, -1)))
   1777         self.assertEqual(psmap(mul, tuples),
   1778                          list(itertools.starmap(mul, tuples)))
   1779         tuples = list(zip(range(100), range(99,-1, -1)))
   1780         self.assertEqual(psmap(mul, tuples, chunksize=20),
   1781                          list(itertools.starmap(mul, tuples)))
   1782 
   1783     def test_starmap_async(self):
   1784         tuples = list(zip(range(100), range(99,-1, -1)))
   1785         self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
   1786                          list(itertools.starmap(mul, tuples)))
   1787 
   1788     def test_map_async(self):
   1789         self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
   1790                          list(map(sqr, list(range(10)))))
   1791 
   1792     def test_map_async_callbacks(self):
   1793         call_args = self.manager.list() if self.TYPE == 'manager' else []
   1794         self.pool.map_async(int, ['1'],
   1795                             callback=call_args.append,
   1796                             error_callback=call_args.append).wait()
   1797         self.assertEqual(1, len(call_args))
   1798         self.assertEqual([1], call_args[0])
   1799         self.pool.map_async(int, ['a'],
   1800                             callback=call_args.append,
   1801                             error_callback=call_args.append).wait()
   1802         self.assertEqual(2, len(call_args))
   1803         self.assertIsInstance(call_args[1], ValueError)
   1804 
   1805     def test_map_unplicklable(self):
   1806         # Issue #19425 -- failure to pickle should not cause a hang
   1807         if self.TYPE == 'threads':
   1808             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1809         class A(object):
   1810             def __reduce__(self):
   1811                 raise RuntimeError('cannot pickle')
   1812         with self.assertRaises(RuntimeError):
   1813             self.pool.map(sqr, [A()]*10)
   1814 
   1815     def test_map_chunksize(self):
   1816         try:
   1817             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
   1818         except multiprocessing.TimeoutError:
   1819             self.fail("pool.map_async with chunksize stalled on null list")
   1820 
   1821     def test_async(self):
   1822         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
   1823         get = TimingWrapper(res.get)
   1824         self.assertEqual(get(), 49)
   1825         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
   1826 
   1827     def test_async_timeout(self):
   1828         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
   1829         get = TimingWrapper(res.get)
   1830         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
   1831         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
   1832 
   1833     def test_imap(self):
   1834         it = self.pool.imap(sqr, list(range(10)))
   1835         self.assertEqual(list(it), list(map(sqr, list(range(10)))))
   1836 
   1837         it = self.pool.imap(sqr, list(range(10)))
   1838         for i in range(10):
   1839             self.assertEqual(next(it), i*i)
   1840         self.assertRaises(StopIteration, it.__next__)
   1841 
   1842         it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
   1843         for i in range(1000):
   1844             self.assertEqual(next(it), i*i)
   1845         self.assertRaises(StopIteration, it.__next__)
   1846 
   1847     def test_imap_handle_iterable_exception(self):
   1848         if self.TYPE == 'manager':
   1849             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1850 
   1851         it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
   1852         for i in range(3):
   1853             self.assertEqual(next(it), i*i)
   1854         self.assertRaises(SayWhenError, it.__next__)
   1855 
   1856         # SayWhenError seen at start of problematic chunk's results
   1857         it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
   1858         for i in range(6):
   1859             self.assertEqual(next(it), i*i)
   1860         self.assertRaises(SayWhenError, it.__next__)
   1861         it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
   1862         for i in range(4):
   1863             self.assertEqual(next(it), i*i)
   1864         self.assertRaises(SayWhenError, it.__next__)
   1865 
   1866     def test_imap_unordered(self):
   1867         it = self.pool.imap_unordered(sqr, list(range(1000)))
   1868         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
   1869 
   1870         it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
   1871         self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
   1872 
   1873     def test_imap_unordered_handle_iterable_exception(self):
   1874         if self.TYPE == 'manager':
   1875             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1876 
   1877         it = self.pool.imap_unordered(sqr,
   1878                                       exception_throwing_generator(10, 3),
   1879                                       1)
   1880         expected_values = list(map(sqr, list(range(10))))
   1881         with self.assertRaises(SayWhenError):
   1882             # imap_unordered makes it difficult to anticipate the SayWhenError
   1883             for i in range(10):
   1884                 value = next(it)
   1885                 self.assertIn(value, expected_values)
   1886                 expected_values.remove(value)
   1887 
   1888         it = self.pool.imap_unordered(sqr,
   1889                                       exception_throwing_generator(20, 7),
   1890                                       2)
   1891         expected_values = list(map(sqr, list(range(20))))
   1892         with self.assertRaises(SayWhenError):
   1893             for i in range(20):
   1894                 value = next(it)
   1895                 self.assertIn(value, expected_values)
   1896                 expected_values.remove(value)
   1897 
   1898     def test_make_pool(self):
   1899         expected_error = (RemoteError if self.TYPE == 'manager'
   1900                           else ValueError)
   1901 
   1902         self.assertRaises(expected_error, self.Pool, -1)
   1903         self.assertRaises(expected_error, self.Pool, 0)
   1904 
   1905         if self.TYPE != 'manager':
   1906             p = self.Pool(3)
   1907             try:
   1908                 self.assertEqual(3, len(p._pool))
   1909             finally:
   1910                 p.close()
   1911                 p.join()
   1912 
   1913     def test_terminate(self):
   1914         result = self.pool.map_async(
   1915             time.sleep, [0.1 for i in range(10000)], chunksize=1
   1916             )
   1917         self.pool.terminate()
   1918         join = TimingWrapper(self.pool.join)
   1919         join()
   1920         # Sanity check the pool didn't wait for all tasks to finish
   1921         self.assertLess(join.elapsed, 2.0)
   1922 
   1923     def test_empty_iterable(self):
   1924         # See Issue 12157
   1925         p = self.Pool(1)
   1926 
   1927         self.assertEqual(p.map(sqr, []), [])
   1928         self.assertEqual(list(p.imap(sqr, [])), [])
   1929         self.assertEqual(list(p.imap_unordered(sqr, [])), [])
   1930         self.assertEqual(p.map_async(sqr, []).get(), [])
   1931 
   1932         p.close()
   1933         p.join()
   1934 
   1935     def test_context(self):
   1936         if self.TYPE == 'processes':
   1937             L = list(range(10))
   1938             expected = [sqr(i) for i in L]
   1939             with self.Pool(2) as p:
   1940                 r = p.map_async(sqr, L)
   1941                 self.assertEqual(r.get(), expected)
   1942             self.assertRaises(ValueError, p.map_async, sqr, L)
   1943 
   1944     @classmethod
   1945     def _test_traceback(cls):
   1946         raise RuntimeError(123) # some comment
   1947 
   1948     def test_traceback(self):
   1949         # We want ensure that the traceback from the child process is
   1950         # contained in the traceback raised in the main process.
   1951         if self.TYPE == 'processes':
   1952             with self.Pool(1) as p:
   1953                 try:
   1954                     p.apply(self._test_traceback)
   1955                 except Exception as e:
   1956                     exc = e
   1957                 else:
   1958                     raise AssertionError('expected RuntimeError')
   1959             self.assertIs(type(exc), RuntimeError)
   1960             self.assertEqual(exc.args, (123,))
   1961             cause = exc.__cause__
   1962             self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
   1963             self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
   1964 
   1965             with test.support.captured_stderr() as f1:
   1966                 try:
   1967                     raise exc
   1968                 except RuntimeError:
   1969                     sys.excepthook(*sys.exc_info())
   1970             self.assertIn('raise RuntimeError(123) # some comment',
   1971                           f1.getvalue())
   1972 
   1973     @classmethod
   1974     def _test_wrapped_exception(cls):
   1975         raise RuntimeError('foo')
   1976 
   1977     def test_wrapped_exception(self):
   1978         # Issue #20980: Should not wrap exception when using thread pool
   1979         with self.Pool(1) as p:
   1980             with self.assertRaises(RuntimeError):
   1981                 p.apply(self._test_wrapped_exception)
   1982 
   1983     def test_map_no_failfast(self):
   1984         # Issue #23992: the fail-fast behaviour when an exception is raised
   1985         # during map() would make Pool.join() deadlock, because a worker
   1986         # process would fill the result queue (after the result handler thread
   1987         # terminated, hence not draining it anymore).
   1988 
   1989         t_start = time.time()
   1990 
   1991         with self.assertRaises(ValueError):
   1992             with self.Pool(2) as p:
   1993                 try:
   1994                     p.map(raise_large_valuerror, [0, 1])
   1995                 finally:
   1996                     time.sleep(0.5)
   1997                     p.close()
   1998                     p.join()
   1999 
   2000         # check that we indeed waited for all jobs
   2001         self.assertGreater(time.time() - t_start, 0.9)
   2002 
   2003 
   2004 def raising():
   2005     raise KeyError("key")
   2006 
   2007 def unpickleable_result():
   2008     return lambda: 42
   2009 
   2010 class _TestPoolWorkerErrors(BaseTestCase):
   2011     ALLOWED_TYPES = ('processes', )
   2012 
   2013     def test_async_error_callback(self):
   2014         p = multiprocessing.Pool(2)
   2015 
   2016         scratchpad = [None]
   2017         def errback(exc):
   2018             scratchpad[0] = exc
   2019 
   2020         res = p.apply_async(raising, error_callback=errback)
   2021         self.assertRaises(KeyError, res.get)
   2022         self.assertTrue(scratchpad[0])
   2023         self.assertIsInstance(scratchpad[0], KeyError)
   2024 
   2025         p.close()
   2026         p.join()
   2027 
   2028     def test_unpickleable_result(self):
   2029         from multiprocessing.pool import MaybeEncodingError
   2030         p = multiprocessing.Pool(2)
   2031 
   2032         # Make sure we don't lose pool processes because of encoding errors.
   2033         for iteration in range(20):
   2034 
   2035             scratchpad = [None]
   2036             def errback(exc):
   2037                 scratchpad[0] = exc
   2038 
   2039             res = p.apply_async(unpickleable_result, error_callback=errback)
   2040             self.assertRaises(MaybeEncodingError, res.get)
   2041             wrapped = scratchpad[0]
   2042             self.assertTrue(wrapped)
   2043             self.assertIsInstance(scratchpad[0], MaybeEncodingError)
   2044             self.assertIsNotNone(wrapped.exc)
   2045             self.assertIsNotNone(wrapped.value)
   2046 
   2047         p.close()
   2048         p.join()
   2049 
   2050 class _TestPoolWorkerLifetime(BaseTestCase):
   2051     ALLOWED_TYPES = ('processes', )
   2052 
   2053     def test_pool_worker_lifetime(self):
   2054         p = multiprocessing.Pool(3, maxtasksperchild=10)
   2055         self.assertEqual(3, len(p._pool))
   2056         origworkerpids = [w.pid for w in p._pool]
   2057         # Run many tasks so each worker gets replaced (hopefully)
   2058         results = []
   2059         for i in range(100):
   2060             results.append(p.apply_async(sqr, (i, )))
   2061         # Fetch the results and verify we got the right answers,
   2062         # also ensuring all the tasks have completed.
   2063         for (j, res) in enumerate(results):
   2064             self.assertEqual(res.get(), sqr(j))
   2065         # Refill the pool
   2066         p._repopulate_pool()
   2067         # Wait until all workers are alive
   2068         # (countdown * DELTA = 5 seconds max startup process time)
   2069         countdown = 50
   2070         while countdown and not all(w.is_alive() for w in p._pool):
   2071             countdown -= 1
   2072             time.sleep(DELTA)
   2073         finalworkerpids = [w.pid for w in p._pool]
   2074         # All pids should be assigned.  See issue #7805.
   2075         self.assertNotIn(None, origworkerpids)
   2076         self.assertNotIn(None, finalworkerpids)
   2077         # Finally, check that the worker pids have changed
   2078         self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
   2079         p.close()
   2080         p.join()
   2081 
   2082     def test_pool_worker_lifetime_early_close(self):
   2083         # Issue #10332: closing a pool whose workers have limited lifetimes
   2084         # before all the tasks completed would make join() hang.
   2085         p = multiprocessing.Pool(3, maxtasksperchild=1)
   2086         results = []
   2087         for i in range(6):
   2088             results.append(p.apply_async(sqr, (i, 0.3)))
   2089         p.close()
   2090         p.join()
   2091         # check the results
   2092         for (j, res) in enumerate(results):
   2093             self.assertEqual(res.get(), sqr(j))
   2094 
   2095 #
   2096 # Test of creating a customized manager class
   2097 #
   2098 
   2099 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
   2100 
   2101 class FooBar(object):
   2102     def f(self):
   2103         return 'f()'
   2104     def g(self):
   2105         raise ValueError
   2106     def _h(self):
   2107         return '_h()'
   2108 
   2109 def baz():
   2110     for i in range(10):
   2111         yield i*i
   2112 
   2113 class IteratorProxy(BaseProxy):
   2114     _exposed_ = ('__next__',)
   2115     def __iter__(self):
   2116         return self
   2117     def __next__(self):
   2118         return self._callmethod('__next__')
   2119 
   2120 class MyManager(BaseManager):
   2121     pass
   2122 
   2123 MyManager.register('Foo', callable=FooBar)
   2124 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
   2125 MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
   2126 
   2127 
   2128 class _TestMyManager(BaseTestCase):
   2129 
   2130     ALLOWED_TYPES = ('manager',)
   2131 
   2132     def test_mymanager(self):
   2133         manager = MyManager()
   2134         manager.start()
   2135         self.common(manager)
   2136         manager.shutdown()
   2137 
   2138         # If the manager process exited cleanly then the exitcode
   2139         # will be zero.  Otherwise (after a short timeout)
   2140         # terminate() is used, resulting in an exitcode of -SIGTERM.
   2141         self.assertEqual(manager._process.exitcode, 0)
   2142 
   2143     def test_mymanager_context(self):
   2144         with MyManager() as manager:
   2145             self.common(manager)
   2146         self.assertEqual(manager._process.exitcode, 0)
   2147 
   2148     def test_mymanager_context_prestarted(self):
   2149         manager = MyManager()
   2150         manager.start()
   2151         with manager:
   2152             self.common(manager)
   2153         self.assertEqual(manager._process.exitcode, 0)
   2154 
   2155     def common(self, manager):
   2156         foo = manager.Foo()
   2157         bar = manager.Bar()
   2158         baz = manager.baz()
   2159 
   2160         foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
   2161         bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
   2162 
   2163         self.assertEqual(foo_methods, ['f', 'g'])
   2164         self.assertEqual(bar_methods, ['f', '_h'])
   2165 
   2166         self.assertEqual(foo.f(), 'f()')
   2167         self.assertRaises(ValueError, foo.g)
   2168         self.assertEqual(foo._callmethod('f'), 'f()')
   2169         self.assertRaises(RemoteError, foo._callmethod, '_h')
   2170 
   2171         self.assertEqual(bar.f(), 'f()')
   2172         self.assertEqual(bar._h(), '_h()')
   2173         self.assertEqual(bar._callmethod('f'), 'f()')
   2174         self.assertEqual(bar._callmethod('_h'), '_h()')
   2175 
   2176         self.assertEqual(list(baz), [i*i for i in range(10)])
   2177 
   2178 
   2179 #
   2180 # Test of connecting to a remote server and using xmlrpclib for serialization
   2181 #
   2182 
   2183 _queue = pyqueue.Queue()
   2184 def get_queue():
   2185     return _queue
   2186 
   2187 class QueueManager(BaseManager):
   2188     '''manager class used by server process'''
   2189 QueueManager.register('get_queue', callable=get_queue)
   2190 
   2191 class QueueManager2(BaseManager):
   2192     '''manager class which specifies the same interface as QueueManager'''
   2193 QueueManager2.register('get_queue')
   2194 
   2195 
   2196 SERIALIZER = 'xmlrpclib'
   2197 
   2198 class _TestRemoteManager(BaseTestCase):
   2199 
   2200     ALLOWED_TYPES = ('manager',)
   2201     values = ['hello world', None, True, 2.25,
   2202               'hall\xe5 v\xe4rlden',
   2203               '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
   2204               b'hall\xe5 v\xe4rlden',
   2205              ]
   2206     result = values[:]
   2207 
   2208     @classmethod
   2209     def _putter(cls, address, authkey):
   2210         manager = QueueManager2(
   2211             address=address, authkey=authkey, serializer=SERIALIZER
   2212             )
   2213         manager.connect()
   2214         queue = manager.get_queue()
   2215         # Note that xmlrpclib will deserialize object as a list not a tuple
   2216         queue.put(tuple(cls.values))
   2217 
   2218     def test_remote(self):
   2219         authkey = os.urandom(32)
   2220 
   2221         manager = QueueManager(
   2222             address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
   2223             )
   2224         manager.start()
   2225 
   2226         p = self.Process(target=self._putter, args=(manager.address, authkey))
   2227         p.daemon = True
   2228         p.start()
   2229 
   2230         manager2 = QueueManager2(
   2231             address=manager.address, authkey=authkey, serializer=SERIALIZER
   2232             )
   2233         manager2.connect()
   2234         queue = manager2.get_queue()
   2235 
   2236         self.assertEqual(queue.get(), self.result)
   2237 
   2238         # Because we are using xmlrpclib for serialization instead of
   2239         # pickle this will cause a serialization error.
   2240         self.assertRaises(Exception, queue.put, time.sleep)
   2241 
   2242         # Make queue finalizer run before the server is stopped
   2243         del queue
   2244         manager.shutdown()
   2245 
   2246 class _TestManagerRestart(BaseTestCase):
   2247 
   2248     @classmethod
   2249     def _putter(cls, address, authkey):
   2250         manager = QueueManager(
   2251             address=address, authkey=authkey, serializer=SERIALIZER)
   2252         manager.connect()
   2253         queue = manager.get_queue()
   2254         queue.put('hello world')
   2255 
   2256     def test_rapid_restart(self):
   2257         authkey = os.urandom(32)
   2258         manager = QueueManager(
   2259             address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
   2260         srvr = manager.get_server()
   2261         addr = srvr.address
   2262         # Close the connection.Listener socket which gets opened as a part
   2263         # of manager.get_server(). It's not needed for the test.
   2264         srvr.listener.close()
   2265         manager.start()
   2266 
   2267         p = self.Process(target=self._putter, args=(manager.address, authkey))
   2268         p.daemon = True
   2269         p.start()
   2270         queue = manager.get_queue()
   2271         self.assertEqual(queue.get(), 'hello world')
   2272         del queue
   2273         manager.shutdown()
   2274         manager = QueueManager(
   2275             address=addr, authkey=authkey, serializer=SERIALIZER)
   2276         try:
   2277             manager.start()
   2278         except OSError as e:
   2279             if e.errno != errno.EADDRINUSE:
   2280                 raise
   2281             # Retry after some time, in case the old socket was lingering
   2282             # (sporadic failure on buildbots)
   2283             time.sleep(1.0)
   2284             manager = QueueManager(
   2285                 address=addr, authkey=authkey, serializer=SERIALIZER)
   2286         manager.shutdown()
   2287 
   2288 #
   2289 #
   2290 #
   2291 
   2292 SENTINEL = latin('')
   2293 
   2294 class _TestConnection(BaseTestCase):
   2295 
   2296     ALLOWED_TYPES = ('processes', 'threads')
   2297 
   2298     @classmethod
   2299     def _echo(cls, conn):
   2300         for msg in iter(conn.recv_bytes, SENTINEL):
   2301             conn.send_bytes(msg)
   2302         conn.close()
   2303 
   2304     def test_connection(self):
   2305         conn, child_conn = self.Pipe()
   2306 
   2307         p = self.Process(target=self._echo, args=(child_conn,))
   2308         p.daemon = True
   2309         p.start()
   2310 
   2311         seq = [1, 2.25, None]
   2312         msg = latin('hello world')
   2313         longmsg = msg * 10
   2314         arr = array.array('i', list(range(4)))
   2315 
   2316         if self.TYPE == 'processes':
   2317             self.assertEqual(type(conn.fileno()), int)
   2318 
   2319         self.assertEqual(conn.send(seq), None)
   2320         self.assertEqual(conn.recv(), seq)
   2321 
   2322         self.assertEqual(conn.send_bytes(msg), None)
   2323         self.assertEqual(conn.recv_bytes(), msg)
   2324 
   2325         if self.TYPE == 'processes':
   2326             buffer = array.array('i', [0]*10)
   2327             expected = list(arr) + [0] * (10 - len(arr))
   2328             self.assertEqual(conn.send_bytes(arr), None)
   2329             self.assertEqual(conn.recv_bytes_into(buffer),
   2330                              len(arr) * buffer.itemsize)
   2331             self.assertEqual(list(buffer), expected)
   2332 
   2333             buffer = array.array('i', [0]*10)
   2334             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
   2335             self.assertEqual(conn.send_bytes(arr), None)
   2336             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
   2337                              len(arr) * buffer.itemsize)
   2338             self.assertEqual(list(buffer), expected)
   2339 
   2340             buffer = bytearray(latin(' ' * 40))
   2341             self.assertEqual(conn.send_bytes(longmsg), None)
   2342             try:
   2343                 res = conn.recv_bytes_into(buffer)
   2344             except multiprocessing.BufferTooShort as e:
   2345                 self.assertEqual(e.args, (longmsg,))
   2346             else:
   2347                 self.fail('expected BufferTooShort, got %s' % res)
   2348 
   2349         poll = TimingWrapper(conn.poll)
   2350 
   2351         self.assertEqual(poll(), False)
   2352         self.assertTimingAlmostEqual(poll.elapsed, 0)
   2353 
   2354         self.assertEqual(poll(-1), False)
   2355         self.assertTimingAlmostEqual(poll.elapsed, 0)
   2356 
   2357         self.assertEqual(poll(TIMEOUT1), False)
   2358         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
   2359 
   2360         conn.send(None)
   2361         time.sleep(.1)
   2362 
   2363         self.assertEqual(poll(TIMEOUT1), True)
   2364         self.assertTimingAlmostEqual(poll.elapsed, 0)
   2365 
   2366         self.assertEqual(conn.recv(), None)
   2367 
   2368         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
   2369         conn.send_bytes(really_big_msg)
   2370         self.assertEqual(conn.recv_bytes(), really_big_msg)
   2371 
   2372         conn.send_bytes(SENTINEL)                          # tell child to quit
   2373         child_conn.close()
   2374 
   2375         if self.TYPE == 'processes':
   2376             self.assertEqual(conn.readable, True)
   2377             self.assertEqual(conn.writable, True)
   2378             self.assertRaises(EOFError, conn.recv)
   2379             self.assertRaises(EOFError, conn.recv_bytes)
   2380 
   2381         p.join()
   2382 
   2383     def test_duplex_false(self):
   2384         reader, writer = self.Pipe(duplex=False)
   2385         self.assertEqual(writer.send(1), None)
   2386         self.assertEqual(reader.recv(), 1)
   2387         if self.TYPE == 'processes':
   2388             self.assertEqual(reader.readable, True)
   2389             self.assertEqual(reader.writable, False)
   2390             self.assertEqual(writer.readable, False)
   2391             self.assertEqual(writer.writable, True)
   2392             self.assertRaises(OSError, reader.send, 2)
   2393             self.assertRaises(OSError, writer.recv)
   2394             self.assertRaises(OSError, writer.poll)
   2395 
   2396     def test_spawn_close(self):
   2397         # We test that a pipe connection can be closed by parent
   2398         # process immediately after child is spawned.  On Windows this
   2399         # would have sometimes failed on old versions because
   2400         # child_conn would be closed before the child got a chance to
   2401         # duplicate it.
   2402         conn, child_conn = self.Pipe()
   2403 
   2404         p = self.Process(target=self._echo, args=(child_conn,))
   2405         p.daemon = True
   2406         p.start()
   2407         child_conn.close()    # this might complete before child initializes
   2408 
   2409         msg = latin('hello')
   2410         conn.send_bytes(msg)
   2411         self.assertEqual(conn.recv_bytes(), msg)
   2412 
   2413         conn.send_bytes(SENTINEL)
   2414         conn.close()
   2415         p.join()
   2416 
   2417     def test_sendbytes(self):
   2418         if self.TYPE != 'processes':
   2419             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   2420 
   2421         msg = latin('abcdefghijklmnopqrstuvwxyz')
   2422         a, b = self.Pipe()
   2423 
   2424         a.send_bytes(msg)
   2425         self.assertEqual(b.recv_bytes(), msg)
   2426 
   2427         a.send_bytes(msg, 5)
   2428         self.assertEqual(b.recv_bytes(), msg[5:])
   2429 
   2430         a.send_bytes(msg, 7, 8)
   2431         self.assertEqual(b.recv_bytes(), msg[7:7+8])
   2432 
   2433         a.send_bytes(msg, 26)
   2434         self.assertEqual(b.recv_bytes(), latin(''))
   2435 
   2436         a.send_bytes(msg, 26, 0)
   2437         self.assertEqual(b.recv_bytes(), latin(''))
   2438 
   2439         self.assertRaises(ValueError, a.send_bytes, msg, 27)
   2440 
   2441         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
   2442 
   2443         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
   2444 
   2445         self.assertRaises(ValueError, a.send_bytes, msg, -1)
   2446 
   2447         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
   2448 
   2449     @classmethod
   2450     def _is_fd_assigned(cls, fd):
   2451         try:
   2452             os.fstat(fd)
   2453         except OSError as e:
   2454             if e.errno == errno.EBADF:
   2455                 return False
   2456             raise
   2457         else:
   2458             return True
   2459 
   2460     @classmethod
   2461     def _writefd(cls, conn, data, create_dummy_fds=False):
   2462         if create_dummy_fds:
   2463             for i in range(0, 256):
   2464                 if not cls._is_fd_assigned(i):
   2465                     os.dup2(conn.fileno(), i)
   2466         fd = reduction.recv_handle(conn)
   2467         if msvcrt:
   2468             fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
   2469         os.write(fd, data)
   2470         os.close(fd)
   2471 
   2472     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   2473     def test_fd_transfer(self):
   2474         if self.TYPE != 'processes':
   2475             self.skipTest("only makes sense with processes")
   2476         conn, child_conn = self.Pipe(duplex=True)
   2477 
   2478         p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
   2479         p.daemon = True
   2480         p.start()
   2481         self.addCleanup(test.support.unlink, test.support.TESTFN)
   2482         with open(test.support.TESTFN, "wb") as f:
   2483             fd = f.fileno()
   2484             if msvcrt:
   2485                 fd = msvcrt.get_osfhandle(fd)
   2486             reduction.send_handle(conn, fd, p.pid)
   2487         p.join()
   2488         with open(test.support.TESTFN, "rb") as f:
   2489             self.assertEqual(f.read(), b"foo")
   2490 
   2491     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   2492     @unittest.skipIf(sys.platform == "win32",
   2493                      "test semantics don't make sense on Windows")
   2494     @unittest.skipIf(MAXFD <= 256,
   2495                      "largest assignable fd number is too small")
   2496     @unittest.skipUnless(hasattr(os, "dup2"),
   2497                          "test needs os.dup2()")
   2498     def test_large_fd_transfer(self):
   2499         # With fd > 256 (issue #11657)
   2500         if self.TYPE != 'processes':
   2501             self.skipTest("only makes sense with processes")
   2502         conn, child_conn = self.Pipe(duplex=True)
   2503 
   2504         p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
   2505         p.daemon = True
   2506         p.start()
   2507         self.addCleanup(test.support.unlink, test.support.TESTFN)
   2508         with open(test.support.TESTFN, "wb") as f:
   2509             fd = f.fileno()
   2510             for newfd in range(256, MAXFD):
   2511                 if not self._is_fd_assigned(newfd):
   2512                     break
   2513             else:
   2514                 self.fail("could not find an unassigned large file descriptor")
   2515             os.dup2(fd, newfd)
   2516             try:
   2517                 reduction.send_handle(conn, newfd, p.pid)
   2518             finally:
   2519                 os.close(newfd)
   2520         p.join()
   2521         with open(test.support.TESTFN, "rb") as f:
   2522             self.assertEqual(f.read(), b"bar")
   2523 
   2524     @classmethod
   2525     def _send_data_without_fd(self, conn):
   2526         os.write(conn.fileno(), b"\0")
   2527 
   2528     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   2529     @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
   2530     def test_missing_fd_transfer(self):
   2531         # Check that exception is raised when received data is not
   2532         # accompanied by a file descriptor in ancillary data.
   2533         if self.TYPE != 'processes':
   2534             self.skipTest("only makes sense with processes")
   2535         conn, child_conn = self.Pipe(duplex=True)
   2536 
   2537         p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
   2538         p.daemon = True
   2539         p.start()
   2540         self.assertRaises(RuntimeError, reduction.recv_handle, conn)
   2541         p.join()
   2542 
   2543     def test_context(self):
   2544         a, b = self.Pipe()
   2545 
   2546         with a, b:
   2547             a.send(1729)
   2548             self.assertEqual(b.recv(), 1729)
   2549             if self.TYPE == 'processes':
   2550                 self.assertFalse(a.closed)
   2551                 self.assertFalse(b.closed)
   2552 
   2553         if self.TYPE == 'processes':
   2554             self.assertTrue(a.closed)
   2555             self.assertTrue(b.closed)
   2556             self.assertRaises(OSError, a.recv)
   2557             self.assertRaises(OSError, b.recv)
   2558 
   2559 class _TestListener(BaseTestCase):
   2560 
   2561     ALLOWED_TYPES = ('processes',)
   2562 
   2563     def test_multiple_bind(self):
   2564         for family in self.connection.families:
   2565             l = self.connection.Listener(family=family)
   2566             self.addCleanup(l.close)
   2567             self.assertRaises(OSError, self.connection.Listener,
   2568                               l.address, family)
   2569 
   2570     def test_context(self):
   2571         with self.connection.Listener() as l:
   2572             with self.connection.Client(l.address) as c:
   2573                 with l.accept() as d:
   2574                     c.send(1729)
   2575                     self.assertEqual(d.recv(), 1729)
   2576 
   2577         if self.TYPE == 'processes':
   2578             self.assertRaises(OSError, l.accept)
   2579 
   2580 class _TestListenerClient(BaseTestCase):
   2581 
   2582     ALLOWED_TYPES = ('processes', 'threads')
   2583 
   2584     @classmethod
   2585     def _test(cls, address):
   2586         conn = cls.connection.Client(address)
   2587         conn.send('hello')
   2588         conn.close()
   2589 
   2590     def test_listener_client(self):
   2591         for family in self.connection.families:
   2592             l = self.connection.Listener(family=family)
   2593             p = self.Process(target=self._test, args=(l.address,))
   2594             p.daemon = True
   2595             p.start()
   2596             conn = l.accept()
   2597             self.assertEqual(conn.recv(), 'hello')
   2598             p.join()
   2599             l.close()
   2600 
   2601     def test_issue14725(self):
   2602         l = self.connection.Listener()
   2603         p = self.Process(target=self._test, args=(l.address,))
   2604         p.daemon = True
   2605         p.start()
   2606         time.sleep(1)
   2607         # On Windows the client process should by now have connected,
   2608         # written data and closed the pipe handle by now.  This causes
   2609         # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
   2610         # 14725.
   2611         conn = l.accept()
   2612         self.assertEqual(conn.recv(), 'hello')
   2613         conn.close()
   2614         p.join()
   2615         l.close()
   2616 
   2617     def test_issue16955(self):
   2618         for fam in self.connection.families:
   2619             l = self.connection.Listener(family=fam)
   2620             c = self.connection.Client(l.address)
   2621             a = l.accept()
   2622             a.send_bytes(b"hello")
   2623             self.assertTrue(c.poll(1))
   2624             a.close()
   2625             c.close()
   2626             l.close()
   2627 
   2628 class _TestPoll(BaseTestCase):
   2629 
   2630     ALLOWED_TYPES = ('processes', 'threads')
   2631 
   2632     def test_empty_string(self):
   2633         a, b = self.Pipe()
   2634         self.assertEqual(a.poll(), False)
   2635         b.send_bytes(b'')
   2636         self.assertEqual(a.poll(), True)
   2637         self.assertEqual(a.poll(), True)
   2638 
   2639     @classmethod
   2640     def _child_strings(cls, conn, strings):
   2641         for s in strings:
   2642             time.sleep(0.1)
   2643             conn.send_bytes(s)
   2644         conn.close()
   2645 
   2646     def test_strings(self):
   2647         strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
   2648         a, b = self.Pipe()
   2649         p = self.Process(target=self._child_strings, args=(b, strings))
   2650         p.start()
   2651 
   2652         for s in strings:
   2653             for i in range(200):
   2654                 if a.poll(0.01):
   2655                     break
   2656             x = a.recv_bytes()
   2657             self.assertEqual(s, x)
   2658 
   2659         p.join()
   2660 
   2661     @classmethod
   2662     def _child_boundaries(cls, r):
   2663         # Polling may "pull" a message in to the child process, but we
   2664         # don't want it to pull only part of a message, as that would
   2665         # corrupt the pipe for any other processes which might later
   2666         # read from it.
   2667         r.poll(5)
   2668 
   2669     def test_boundaries(self):
   2670         r, w = self.Pipe(False)
   2671         p = self.Process(target=self._child_boundaries, args=(r,))
   2672         p.start()
   2673         time.sleep(2)
   2674         L = [b"first", b"second"]
   2675         for obj in L:
   2676             w.send_bytes(obj)
   2677         w.close()
   2678         p.join()
   2679         self.assertIn(r.recv_bytes(), L)
   2680 
   2681     @classmethod
   2682     def _child_dont_merge(cls, b):
   2683         b.send_bytes(b'a')
   2684         b.send_bytes(b'b')
   2685         b.send_bytes(b'cd')
   2686 
   2687     def test_dont_merge(self):
   2688         a, b = self.Pipe()
   2689         self.assertEqual(a.poll(0.0), False)
   2690         self.assertEqual(a.poll(0.1), False)
   2691 
   2692         p = self.Process(target=self._child_dont_merge, args=(b,))
   2693         p.start()
   2694 
   2695         self.assertEqual(a.recv_bytes(), b'a')
   2696         self.assertEqual(a.poll(1.0), True)
   2697         self.assertEqual(a.poll(1.0), True)
   2698         self.assertEqual(a.recv_bytes(), b'b')
   2699         self.assertEqual(a.poll(1.0), True)
   2700         self.assertEqual(a.poll(1.0), True)
   2701         self.assertEqual(a.poll(0.0), True)
   2702         self.assertEqual(a.recv_bytes(), b'cd')
   2703 
   2704         p.join()
   2705 
   2706 #
   2707 # Test of sending connection and socket objects between processes
   2708 #
   2709 
   2710 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   2711 class _TestPicklingConnections(BaseTestCase):
   2712 
   2713     ALLOWED_TYPES = ('processes',)
   2714 
   2715     @classmethod
   2716     def tearDownClass(cls):
   2717         from multiprocessing import resource_sharer
   2718         resource_sharer.stop(timeout=5)
   2719 
   2720     @classmethod
   2721     def _listener(cls, conn, families):
   2722         for fam in families:
   2723             l = cls.connection.Listener(family=fam)
   2724             conn.send(l.address)
   2725             new_conn = l.accept()
   2726             conn.send(new_conn)
   2727             new_conn.close()
   2728             l.close()
   2729 
   2730         l = socket.socket()
   2731         l.bind((test.support.HOST, 0))
   2732         l.listen()
   2733         conn.send(l.getsockname())
   2734         new_conn, addr = l.accept()
   2735         conn.send(new_conn)
   2736         new_conn.close()
   2737         l.close()
   2738 
   2739         conn.recv()
   2740 
   2741     @classmethod
   2742     def _remote(cls, conn):
   2743         for (address, msg) in iter(conn.recv, None):
   2744             client = cls.connection.Client(address)
   2745             client.send(msg.upper())
   2746             client.close()
   2747 
   2748         address, msg = conn.recv()
   2749         client = socket.socket()
   2750         client.connect(address)
   2751         client.sendall(msg.upper())
   2752         client.close()
   2753 
   2754         conn.close()
   2755 
   2756     def test_pickling(self):
   2757         families = self.connection.families
   2758 
   2759         lconn, lconn0 = self.Pipe()
   2760         lp = self.Process(target=self._listener, args=(lconn0, families))
   2761         lp.daemon = True
   2762         lp.start()
   2763         lconn0.close()
   2764 
   2765         rconn, rconn0 = self.Pipe()
   2766         rp = self.Process(target=self._remote, args=(rconn0,))
   2767         rp.daemon = True
   2768         rp.start()
   2769         rconn0.close()
   2770 
   2771         for fam in families:
   2772             msg = ('This connection uses family %s' % fam).encode('ascii')
   2773             address = lconn.recv()
   2774             rconn.send((address, msg))
   2775             new_conn = lconn.recv()
   2776             self.assertEqual(new_conn.recv(), msg.upper())
   2777 
   2778         rconn.send(None)
   2779 
   2780         msg = latin('This connection uses a normal socket')
   2781         address = lconn.recv()
   2782         rconn.send((address, msg))
   2783         new_conn = lconn.recv()
   2784         buf = []
   2785         while True:
   2786             s = new_conn.recv(100)
   2787             if not s:
   2788                 break
   2789             buf.append(s)
   2790         buf = b''.join(buf)
   2791         self.assertEqual(buf, msg.upper())
   2792         new_conn.close()
   2793 
   2794         lconn.send(None)
   2795 
   2796         rconn.close()
   2797         lconn.close()
   2798 
   2799         lp.join()
   2800         rp.join()
   2801 
   2802     @classmethod
   2803     def child_access(cls, conn):
   2804         w = conn.recv()
   2805         w.send('all is well')
   2806         w.close()
   2807 
   2808         r = conn.recv()
   2809         msg = r.recv()
   2810         conn.send(msg*2)
   2811 
   2812         conn.close()
   2813 
   2814     def test_access(self):
   2815         # On Windows, if we do not specify a destination pid when
   2816         # using DupHandle then we need to be careful to use the
   2817         # correct access flags for DuplicateHandle(), or else
   2818         # DupHandle.detach() will raise PermissionError.  For example,
   2819         # for a read only pipe handle we should use
   2820         # access=FILE_GENERIC_READ.  (Unfortunately
   2821         # DUPLICATE_SAME_ACCESS does not work.)
   2822         conn, child_conn = self.Pipe()
   2823         p = self.Process(target=self.child_access, args=(child_conn,))
   2824         p.daemon = True
   2825         p.start()
   2826         child_conn.close()
   2827 
   2828         r, w = self.Pipe(duplex=False)
   2829         conn.send(w)
   2830         w.close()
   2831         self.assertEqual(r.recv(), 'all is well')
   2832         r.close()
   2833 
   2834         r, w = self.Pipe(duplex=False)
   2835         conn.send(r)
   2836         r.close()
   2837         w.send('foobar')
   2838         w.close()
   2839         self.assertEqual(conn.recv(), 'foobar'*2)
   2840 
   2841 #
   2842 #
   2843 #
   2844 
   2845 class _TestHeap(BaseTestCase):
   2846 
   2847     ALLOWED_TYPES = ('processes',)
   2848 
   2849     def test_heap(self):
   2850         iterations = 5000
   2851         maxblocks = 50
   2852         blocks = []
   2853 
   2854         # create and destroy lots of blocks of different sizes
   2855         for i in range(iterations):
   2856             size = int(random.lognormvariate(0, 1) * 1000)
   2857             b = multiprocessing.heap.BufferWrapper(size)
   2858             blocks.append(b)
   2859             if len(blocks) > maxblocks:
   2860                 i = random.randrange(maxblocks)
   2861                 del blocks[i]
   2862 
   2863         # get the heap object
   2864         heap = multiprocessing.heap.BufferWrapper._heap
   2865 
   2866         # verify the state of the heap
   2867         all = []
   2868         occupied = 0
   2869         heap._lock.acquire()
   2870         self.addCleanup(heap._lock.release)
   2871         for L in list(heap._len_to_seq.values()):
   2872             for arena, start, stop in L:
   2873                 all.append((heap._arenas.index(arena), start, stop,
   2874                             stop-start, 'free'))
   2875         for arena, start, stop in heap._allocated_blocks:
   2876             all.append((heap._arenas.index(arena), start, stop,
   2877                         stop-start, 'occupied'))
   2878             occupied += (stop-start)
   2879 
   2880         all.sort()
   2881 
   2882         for i in range(len(all)-1):
   2883             (arena, start, stop) = all[i][:3]
   2884             (narena, nstart, nstop) = all[i+1][:3]
   2885             self.assertTrue((arena != narena and nstart == 0) or
   2886                             (stop == nstart))
   2887 
   2888     def test_free_from_gc(self):
   2889         # Check that freeing of blocks by the garbage collector doesn't deadlock
   2890         # (issue #12352).
   2891         # Make sure the GC is enabled, and set lower collection thresholds to
   2892         # make collections more frequent (and increase the probability of
   2893         # deadlock).
   2894         if not gc.isenabled():
   2895             gc.enable()
   2896             self.addCleanup(gc.disable)
   2897         thresholds = gc.get_threshold()
   2898         self.addCleanup(gc.set_threshold, *thresholds)
   2899         gc.set_threshold(10)
   2900 
   2901         # perform numerous block allocations, with cyclic references to make
   2902         # sure objects are collected asynchronously by the gc
   2903         for i in range(5000):
   2904             a = multiprocessing.heap.BufferWrapper(1)
   2905             b = multiprocessing.heap.BufferWrapper(1)
   2906             # circular references
   2907             a.buddy = b
   2908             b.buddy = a
   2909 
   2910 #
   2911 #
   2912 #
   2913 
   2914 class _Foo(Structure):
   2915     _fields_ = [
   2916         ('x', c_int),
   2917         ('y', c_double)
   2918         ]
   2919 
   2920 class _TestSharedCTypes(BaseTestCase):
   2921 
   2922     ALLOWED_TYPES = ('processes',)
   2923 
   2924     def setUp(self):
   2925         if not HAS_SHAREDCTYPES:
   2926             self.skipTest("requires multiprocessing.sharedctypes")
   2927 
   2928     @classmethod
   2929     def _double(cls, x, y, foo, arr, string):
   2930         x.value *= 2
   2931         y.value *= 2
   2932         foo.x *= 2
   2933         foo.y *= 2
   2934         string.value *= 2
   2935         for i in range(len(arr)):
   2936             arr[i] *= 2
   2937 
   2938     def test_sharedctypes(self, lock=False):
   2939         x = Value('i', 7, lock=lock)
   2940         y = Value(c_double, 1.0/3.0, lock=lock)
   2941         foo = Value(_Foo, 3, 2, lock=lock)
   2942         arr = self.Array('d', list(range(10)), lock=lock)
   2943         string = self.Array('c', 20, lock=lock)
   2944         string.value = latin('hello')
   2945 
   2946         p = self.Process(target=self._double, args=(x, y, foo, arr, string))
   2947         p.daemon = True
   2948         p.start()
   2949         p.join()
   2950 
   2951         self.assertEqual(x.value, 14)
   2952         self.assertAlmostEqual(y.value, 2.0/3.0)
   2953         self.assertEqual(foo.x, 6)
   2954         self.assertAlmostEqual(foo.y, 4.0)
   2955         for i in range(10):
   2956             self.assertAlmostEqual(arr[i], i*2)
   2957         self.assertEqual(string.value, latin('hellohello'))
   2958 
   2959     def test_synchronize(self):
   2960         self.test_sharedctypes(lock=True)
   2961 
   2962     def test_copy(self):
   2963         foo = _Foo(2, 5.0)
   2964         bar = copy(foo)
   2965         foo.x = 0
   2966         foo.y = 0
   2967         self.assertEqual(bar.x, 2)
   2968         self.assertAlmostEqual(bar.y, 5.0)
   2969 
   2970 #
   2971 #
   2972 #
   2973 
   2974 class _TestFinalize(BaseTestCase):
   2975 
   2976     ALLOWED_TYPES = ('processes',)
   2977 
   2978     @classmethod
   2979     def _test_finalize(cls, conn):
   2980         class Foo(object):
   2981             pass
   2982 
   2983         a = Foo()
   2984         util.Finalize(a, conn.send, args=('a',))
   2985         del a           # triggers callback for a
   2986 
   2987         b = Foo()
   2988         close_b = util.Finalize(b, conn.send, args=('b',))
   2989         close_b()       # triggers callback for b
   2990         close_b()       # does nothing because callback has already been called
   2991         del b           # does nothing because callback has already been called
   2992 
   2993         c = Foo()
   2994         util.Finalize(c, conn.send, args=('c',))
   2995 
   2996         d10 = Foo()
   2997         util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
   2998 
   2999         d01 = Foo()
   3000         util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
   3001         d02 = Foo()
   3002         util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
   3003         d03 = Foo()
   3004         util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
   3005 
   3006         util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
   3007 
   3008         util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
   3009 
   3010         # call multiprocessing's cleanup function then exit process without
   3011         # garbage collecting locals
   3012         util._exit_function()
   3013         conn.close()
   3014         os._exit(0)
   3015 
   3016     def test_finalize(self):
   3017         conn, child_conn = self.Pipe()
   3018 
   3019         p = self.Process(target=self._test_finalize, args=(child_conn,))
   3020         p.daemon = True
   3021         p.start()
   3022         p.join()
   3023 
   3024         result = [obj for obj in iter(conn.recv, 'STOP')]
   3025         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
   3026 
   3027 #
   3028 # Test that from ... import * works for each module
   3029 #
   3030 
   3031 class _TestImportStar(unittest.TestCase):
   3032 
   3033     def get_module_names(self):
   3034         import glob
   3035         folder = os.path.dirname(multiprocessing.__file__)
   3036         pattern = os.path.join(folder, '*.py')
   3037         files = glob.glob(pattern)
   3038         modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
   3039         modules = ['multiprocessing.' + m for m in modules]
   3040         modules.remove('multiprocessing.__init__')
   3041         modules.append('multiprocessing')
   3042         return modules
   3043 
   3044     def test_import(self):
   3045         modules = self.get_module_names()
   3046         if sys.platform == 'win32':
   3047             modules.remove('multiprocessing.popen_fork')
   3048             modules.remove('multiprocessing.popen_forkserver')
   3049             modules.remove('multiprocessing.popen_spawn_posix')
   3050         else:
   3051             modules.remove('multiprocessing.popen_spawn_win32')
   3052             if not HAS_REDUCTION:
   3053                 modules.remove('multiprocessing.popen_forkserver')
   3054 
   3055         if c_int is None:
   3056             # This module requires _ctypes
   3057             modules.remove('multiprocessing.sharedctypes')
   3058 
   3059         for name in modules:
   3060             __import__(name)
   3061             mod = sys.modules[name]
   3062             self.assertTrue(hasattr(mod, '__all__'), name)
   3063 
   3064             for attr in mod.__all__:
   3065                 self.assertTrue(
   3066                     hasattr(mod, attr),
   3067                     '%r does not have attribute %r' % (mod, attr)
   3068                     )
   3069 
   3070 #
   3071 # Quick test that logging works -- does not test logging output
   3072 #
   3073 
   3074 class _TestLogging(BaseTestCase):
   3075 
   3076     ALLOWED_TYPES = ('processes',)
   3077 
   3078     def test_enable_logging(self):
   3079         logger = multiprocessing.get_logger()
   3080         logger.setLevel(util.SUBWARNING)
   3081         self.assertTrue(logger is not None)
   3082         logger.debug('this will not be printed')
   3083         logger.info('nor will this')
   3084         logger.setLevel(LOG_LEVEL)
   3085 
   3086     @classmethod
   3087     def _test_level(cls, conn):
   3088         logger = multiprocessing.get_logger()
   3089         conn.send(logger.getEffectiveLevel())
   3090 
   3091     def test_level(self):
   3092         LEVEL1 = 32
   3093         LEVEL2 = 37
   3094 
   3095         logger = multiprocessing.get_logger()
   3096         root_logger = logging.getLogger()
   3097         root_level = root_logger.level
   3098 
   3099         reader, writer = multiprocessing.Pipe(duplex=False)
   3100 
   3101         logger.setLevel(LEVEL1)
   3102         p = self.Process(target=self._test_level, args=(writer,))
   3103         p.daemon = True
   3104         p.start()
   3105         self.assertEqual(LEVEL1, reader.recv())
   3106 
   3107         logger.setLevel(logging.NOTSET)
   3108         root_logger.setLevel(LEVEL2)
   3109         p = self.Process(target=self._test_level, args=(writer,))
   3110         p.daemon = True
   3111         p.start()
   3112         self.assertEqual(LEVEL2, reader.recv())
   3113 
   3114         root_logger.setLevel(root_level)
   3115         logger.setLevel(level=LOG_LEVEL)
   3116 
   3117 
   3118 # class _TestLoggingProcessName(BaseTestCase):
   3119 #
   3120 #     def handle(self, record):
   3121 #         assert record.processName == multiprocessing.current_process().name
   3122 #         self.__handled = True
   3123 #
   3124 #     def test_logging(self):
   3125 #         handler = logging.Handler()
   3126 #         handler.handle = self.handle
   3127 #         self.__handled = False
   3128 #         # Bypass getLogger() and side-effects
   3129 #         logger = logging.getLoggerClass()(
   3130 #                 'multiprocessing.test.TestLoggingProcessName')
   3131 #         logger.addHandler(handler)
   3132 #         logger.propagate = False
   3133 #
   3134 #         logger.warn('foo')
   3135 #         assert self.__handled
   3136 
   3137 #
   3138 # Check that Process.join() retries if os.waitpid() fails with EINTR
   3139 #
   3140 
   3141 class _TestPollEintr(BaseTestCase):
   3142 
   3143     ALLOWED_TYPES = ('processes',)
   3144 
   3145     @classmethod
   3146     def _killer(cls, pid):
   3147         time.sleep(0.1)
   3148         os.kill(pid, signal.SIGUSR1)
   3149 
   3150     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
   3151     def test_poll_eintr(self):
   3152         got_signal = [False]
   3153         def record(*args):
   3154             got_signal[0] = True
   3155         pid = os.getpid()
   3156         oldhandler = signal.signal(signal.SIGUSR1, record)
   3157         try:
   3158             killer = self.Process(target=self._killer, args=(pid,))
   3159             killer.start()
   3160             try:
   3161                 p = self.Process(target=time.sleep, args=(2,))
   3162                 p.start()
   3163                 p.join()
   3164             finally:
   3165                 killer.join()
   3166             self.assertTrue(got_signal[0])
   3167             self.assertEqual(p.exitcode, 0)
   3168         finally:
   3169             signal.signal(signal.SIGUSR1, oldhandler)
   3170 
   3171 #
   3172 # Test to verify handle verification, see issue 3321
   3173 #
   3174 
   3175 class TestInvalidHandle(unittest.TestCase):
   3176 
   3177     @unittest.skipIf(WIN32, "skipped on Windows")
   3178     def test_invalid_handles(self):
   3179         conn = multiprocessing.connection.Connection(44977608)
   3180         # check that poll() doesn't crash
   3181         try:
   3182             conn.poll()
   3183         except (ValueError, OSError):
   3184             pass
   3185         finally:
   3186             # Hack private attribute _handle to avoid printing an error
   3187             # in conn.__del__
   3188             conn._handle = None
   3189         self.assertRaises((ValueError, OSError),
   3190                           multiprocessing.connection.Connection, -1)
   3191 
   3192 
   3193 
   3194 class OtherTest(unittest.TestCase):
   3195     # TODO: add more tests for deliver/answer challenge.
   3196     def test_deliver_challenge_auth_failure(self):
   3197         class _FakeConnection(object):
   3198             def recv_bytes(self, size):
   3199                 return b'something bogus'
   3200             def send_bytes(self, data):
   3201                 pass
   3202         self.assertRaises(multiprocessing.AuthenticationError,
   3203                           multiprocessing.connection.deliver_challenge,
   3204                           _FakeConnection(), b'abc')
   3205 
   3206     def test_answer_challenge_auth_failure(self):
   3207         class _FakeConnection(object):
   3208             def __init__(self):
   3209                 self.count = 0
   3210             def recv_bytes(self, size):
   3211                 self.count += 1
   3212                 if self.count == 1:
   3213                     return multiprocessing.connection.CHALLENGE
   3214                 elif self.count == 2:
   3215                     return b'something bogus'
   3216                 return b''
   3217             def send_bytes(self, data):
   3218                 pass
   3219         self.assertRaises(multiprocessing.AuthenticationError,
   3220                           multiprocessing.connection.answer_challenge,
   3221                           _FakeConnection(), b'abc')
   3222 
   3223 #
   3224 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
   3225 #
   3226 
   3227 def initializer(ns):
   3228     ns.test += 1
   3229 
   3230 class TestInitializers(unittest.TestCase):
   3231     def setUp(self):
   3232         self.mgr = multiprocessing.Manager()
   3233         self.ns = self.mgr.Namespace()
   3234         self.ns.test = 0
   3235 
   3236     def tearDown(self):
   3237         self.mgr.shutdown()
   3238         self.mgr.join()
   3239 
   3240     def test_manager_initializer(self):
   3241         m = multiprocessing.managers.SyncManager()
   3242         self.assertRaises(TypeError, m.start, 1)
   3243         m.start(initializer, (self.ns,))
   3244         self.assertEqual(self.ns.test, 1)
   3245         m.shutdown()
   3246         m.join()
   3247 
   3248     def test_pool_initializer(self):
   3249         self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
   3250         p = multiprocessing.Pool(1, initializer, (self.ns,))
   3251         p.close()
   3252         p.join()
   3253         self.assertEqual(self.ns.test, 1)
   3254 
   3255 #
   3256 # Issue 5155, 5313, 5331: Test process in processes
   3257 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
   3258 #
   3259 
   3260 def _this_sub_process(q):
   3261     try:
   3262         item = q.get(block=False)
   3263     except pyqueue.Empty:
   3264         pass
   3265 
   3266 def _test_process(q):
   3267     queue = multiprocessing.Queue()
   3268     subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
   3269     subProc.daemon = True
   3270     subProc.start()
   3271     subProc.join()
   3272 
   3273 def _afunc(x):
   3274     return x*x
   3275 
   3276 def pool_in_process():
   3277     pool = multiprocessing.Pool(processes=4)
   3278     x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
   3279     pool.close()
   3280     pool.join()
   3281 
   3282 class _file_like(object):
   3283     def __init__(self, delegate):
   3284         self._delegate = delegate
   3285         self._pid = None
   3286 
   3287     @property
   3288     def cache(self):
   3289         pid = os.getpid()
   3290         # There are no race conditions since fork keeps only the running thread
   3291         if pid != self._pid:
   3292             self._pid = pid
   3293             self._cache = []
   3294         return self._cache
   3295 
   3296     def write(self, data):
   3297         self.cache.append(data)
   3298 
   3299     def flush(self):
   3300         self._delegate.write(''.join(self.cache))
   3301         self._cache = []
   3302 
   3303 class TestStdinBadfiledescriptor(unittest.TestCase):
   3304 
   3305     def test_queue_in_process(self):
   3306         queue = multiprocessing.Queue()
   3307         proc = multiprocessing.Process(target=_test_process, args=(queue,))
   3308         proc.start()
   3309         proc.join()
   3310 
   3311     def test_pool_in_process(self):
   3312         p = multiprocessing.Process(target=pool_in_process)
   3313         p.start()
   3314         p.join()
   3315 
   3316     def test_flushing(self):
   3317         sio = io.StringIO()
   3318         flike = _file_like(sio)
   3319         flike.write('foo')
   3320         proc = multiprocessing.Process(target=lambda: flike.flush())
   3321         flike.flush()
   3322         assert sio.getvalue() == 'foo'
   3323 
   3324 
   3325 class TestWait(unittest.TestCase):
   3326 
   3327     @classmethod
   3328     def _child_test_wait(cls, w, slow):
   3329         for i in range(10):
   3330             if slow:
   3331                 time.sleep(random.random()*0.1)
   3332             w.send((i, os.getpid()))
   3333         w.close()
   3334 
   3335     def test_wait(self, slow=False):
   3336         from multiprocessing.connection import wait
   3337         readers = []
   3338         procs = []
   3339         messages = []
   3340 
   3341         for i in range(4):
   3342             r, w = multiprocessing.Pipe(duplex=False)
   3343             p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
   3344             p.daemon = True
   3345             p.start()
   3346             w.close()
   3347             readers.append(r)
   3348             procs.append(p)
   3349             self.addCleanup(p.join)
   3350 
   3351         while readers:
   3352             for r in wait(readers):
   3353                 try:
   3354                     msg = r.recv()
   3355                 except EOFError:
   3356                     readers.remove(r)
   3357                     r.close()
   3358                 else:
   3359                     messages.append(msg)
   3360 
   3361         messages.sort()
   3362         expected = sorted((i, p.pid) for i in range(10) for p in procs)
   3363         self.assertEqual(messages, expected)
   3364 
   3365     @classmethod
   3366     def _child_test_wait_socket(cls, address, slow):
   3367         s = socket.socket()
   3368         s.connect(address)
   3369         for i in range(10):
   3370             if slow:
   3371                 time.sleep(random.random()*0.1)
   3372             s.sendall(('%s\n' % i).encode('ascii'))
   3373         s.close()
   3374 
   3375     def test_wait_socket(self, slow=False):
   3376         from multiprocessing.connection import wait
   3377         l = socket.socket()
   3378         l.bind((test.support.HOST, 0))
   3379         l.listen()
   3380         addr = l.getsockname()
   3381         readers = []
   3382         procs = []
   3383         dic = {}
   3384 
   3385         for i in range(4):
   3386             p = multiprocessing.Process(target=self._child_test_wait_socket,
   3387                                         args=(addr, slow))
   3388             p.daemon = True
   3389             p.start()
   3390             procs.append(p)
   3391             self.addCleanup(p.join)
   3392 
   3393         for i in range(4):
   3394             r, _ = l.accept()
   3395             readers.append(r)
   3396             dic[r] = []
   3397         l.close()
   3398 
   3399         while readers:
   3400             for r in wait(readers):
   3401                 msg = r.recv(32)
   3402                 if not msg:
   3403                     readers.remove(r)
   3404                     r.close()
   3405                 else:
   3406                     dic[r].append(msg)
   3407 
   3408         expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
   3409         for v in dic.values():
   3410             self.assertEqual(b''.join(v), expected)
   3411 
   3412     def test_wait_slow(self):
   3413         self.test_wait(True)
   3414 
   3415     def test_wait_socket_slow(self):
   3416         self.test_wait_socket(True)
   3417 
   3418     def test_wait_timeout(self):
   3419         from multiprocessing.connection import wait
   3420 
   3421         expected = 5
   3422         a, b = multiprocessing.Pipe()
   3423 
   3424         start = time.time()
   3425         res = wait([a, b], expected)
   3426         delta = time.time() - start
   3427 
   3428         self.assertEqual(res, [])
   3429         self.assertLess(delta, expected * 2)
   3430         self.assertGreater(delta, expected * 0.5)
   3431 
   3432         b.send(None)
   3433 
   3434         start = time.time()
   3435         res = wait([a, b], 20)
   3436         delta = time.time() - start
   3437 
   3438         self.assertEqual(res, [a])
   3439         self.assertLess(delta, 0.4)
   3440 
   3441     @classmethod
   3442     def signal_and_sleep(cls, sem, period):
   3443         sem.release()
   3444         time.sleep(period)
   3445 
   3446     def test_wait_integer(self):
   3447         from multiprocessing.connection import wait
   3448 
   3449         expected = 3
   3450         sorted_ = lambda l: sorted(l, key=lambda x: id(x))
   3451         sem = multiprocessing.Semaphore(0)
   3452         a, b = multiprocessing.Pipe()
   3453         p = multiprocessing.Process(target=self.signal_and_sleep,
   3454                                     args=(sem, expected))
   3455 
   3456         p.start()
   3457         self.assertIsInstance(p.sentinel, int)
   3458         self.assertTrue(sem.acquire(timeout=20))
   3459 
   3460         start = time.time()
   3461         res = wait([a, p.sentinel, b], expected + 20)
   3462         delta = time.time() - start
   3463 
   3464         self.assertEqual(res, [p.sentinel])
   3465         self.assertLess(delta, expected + 2)
   3466         self.assertGreater(delta, expected - 2)
   3467 
   3468         a.send(None)
   3469 
   3470         start = time.time()
   3471         res = wait([a, p.sentinel, b], 20)
   3472         delta = time.time() - start
   3473 
   3474         self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
   3475         self.assertLess(delta, 0.4)
   3476 
   3477         b.send(None)
   3478 
   3479         start = time.time()
   3480         res = wait([a, p.sentinel, b], 20)
   3481         delta = time.time() - start
   3482 
   3483         self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
   3484         self.assertLess(delta, 0.4)
   3485 
   3486         p.terminate()
   3487         p.join()
   3488 
   3489     def test_neg_timeout(self):
   3490         from multiprocessing.connection import wait
   3491         a, b = multiprocessing.Pipe()
   3492         t = time.time()
   3493         res = wait([a], timeout=-1)
   3494         t = time.time() - t
   3495         self.assertEqual(res, [])
   3496         self.assertLess(t, 1)
   3497         a.close()
   3498         b.close()
   3499 
   3500 #
   3501 # Issue 14151: Test invalid family on invalid environment
   3502 #
   3503 
   3504 class TestInvalidFamily(unittest.TestCase):
   3505 
   3506     @unittest.skipIf(WIN32, "skipped on Windows")
   3507     def test_invalid_family(self):
   3508         with self.assertRaises(ValueError):
   3509             multiprocessing.connection.Listener(r'\\.\test')
   3510 
   3511     @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
   3512     def test_invalid_family_win32(self):
   3513         with self.assertRaises(ValueError):
   3514             multiprocessing.connection.Listener('/var/test.pipe')
   3515 
   3516 #
   3517 # Issue 12098: check sys.flags of child matches that for parent
   3518 #
   3519 
   3520 class TestFlags(unittest.TestCase):
   3521     @classmethod
   3522     def run_in_grandchild(cls, conn):
   3523         conn.send(tuple(sys.flags))
   3524 
   3525     @classmethod
   3526     def run_in_child(cls):
   3527         import json
   3528         r, w = multiprocessing.Pipe(duplex=False)
   3529         p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
   3530         p.start()
   3531         grandchild_flags = r.recv()
   3532         p.join()
   3533         r.close()
   3534         w.close()
   3535         flags = (tuple(sys.flags), grandchild_flags)
   3536         print(json.dumps(flags))
   3537 
   3538     def test_flags(self):
   3539         import json, subprocess
   3540         # start child process using unusual flags
   3541         prog = ('from test._test_multiprocessing import TestFlags; ' +
   3542                 'TestFlags.run_in_child()')
   3543         data = subprocess.check_output(
   3544             [sys.executable, '-E', '-S', '-O', '-c', prog])
   3545         child_flags, grandchild_flags = json.loads(data.decode('ascii'))
   3546         self.assertEqual(child_flags, grandchild_flags)
   3547 
   3548 #
   3549 # Test interaction with socket timeouts - see Issue #6056
   3550 #
   3551 
   3552 class TestTimeouts(unittest.TestCase):
   3553     @classmethod
   3554     def _test_timeout(cls, child, address):
   3555         time.sleep(1)
   3556         child.send(123)
   3557         child.close()
   3558         conn = multiprocessing.connection.Client(address)
   3559         conn.send(456)
   3560         conn.close()
   3561 
   3562     def test_timeout(self):
   3563         old_timeout = socket.getdefaulttimeout()
   3564         try:
   3565             socket.setdefaulttimeout(0.1)
   3566             parent, child = multiprocessing.Pipe(duplex=True)
   3567             l = multiprocessing.connection.Listener(family='AF_INET')
   3568             p = multiprocessing.Process(target=self._test_timeout,
   3569                                         args=(child, l.address))
   3570             p.start()
   3571             child.close()
   3572             self.assertEqual(parent.recv(), 123)
   3573             parent.close()
   3574             conn = l.accept()
   3575             self.assertEqual(conn.recv(), 456)
   3576             conn.close()
   3577             l.close()
   3578             p.join(10)
   3579         finally:
   3580             socket.setdefaulttimeout(old_timeout)
   3581 
   3582 #
   3583 # Test what happens with no "if __name__ == '__main__'"
   3584 #
   3585 
   3586 class TestNoForkBomb(unittest.TestCase):
   3587     def test_noforkbomb(self):
   3588         sm = multiprocessing.get_start_method()
   3589         name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
   3590         if sm != 'fork':
   3591             rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
   3592             self.assertEqual(out, b'')
   3593             self.assertIn(b'RuntimeError', err)
   3594         else:
   3595             rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
   3596             self.assertEqual(out.rstrip(), b'123')
   3597             self.assertEqual(err, b'')
   3598 
   3599 #
   3600 # Issue #17555: ForkAwareThreadLock
   3601 #
   3602 
   3603 class TestForkAwareThreadLock(unittest.TestCase):
   3604     # We recurisvely start processes.  Issue #17555 meant that the
   3605     # after fork registry would get duplicate entries for the same
   3606     # lock.  The size of the registry at generation n was ~2**n.
   3607 
   3608     @classmethod
   3609     def child(cls, n, conn):
   3610         if n > 1:
   3611             p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
   3612             p.start()
   3613             conn.close()
   3614             p.join(timeout=5)
   3615         else:
   3616             conn.send(len(util._afterfork_registry))
   3617         conn.close()
   3618 
   3619     def test_lock(self):
   3620         r, w = multiprocessing.Pipe(False)
   3621         l = util.ForkAwareThreadLock()
   3622         old_size = len(util._afterfork_registry)
   3623         p = multiprocessing.Process(target=self.child, args=(5, w))
   3624         p.start()
   3625         w.close()
   3626         new_size = r.recv()
   3627         p.join(timeout=5)
   3628         self.assertLessEqual(new_size, old_size)
   3629 
   3630 #
   3631 # Check that non-forked child processes do not inherit unneeded fds/handles
   3632 #
   3633 
   3634 class TestCloseFds(unittest.TestCase):
   3635 
   3636     def get_high_socket_fd(self):
   3637         if WIN32:
   3638             # The child process will not have any socket handles, so
   3639             # calling socket.fromfd() should produce WSAENOTSOCK even
   3640             # if there is a handle of the same number.
   3641             return socket.socket().detach()
   3642         else:
   3643             # We want to produce a socket with an fd high enough that a
   3644             # freshly created child process will not have any fds as high.
   3645             fd = socket.socket().detach()
   3646             to_close = []
   3647             while fd < 50:
   3648                 to_close.append(fd)
   3649                 fd = os.dup(fd)
   3650             for x in to_close:
   3651                 os.close(x)
   3652             return fd
   3653 
   3654     def close(self, fd):
   3655         if WIN32:
   3656             socket.socket(fileno=fd).close()
   3657         else:
   3658             os.close(fd)
   3659 
   3660     @classmethod
   3661     def _test_closefds(cls, conn, fd):
   3662         try:
   3663             s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
   3664         except Exception as e:
   3665             conn.send(e)
   3666         else:
   3667             s.close()
   3668             conn.send(None)
   3669 
   3670     def test_closefd(self):
   3671         if not HAS_REDUCTION:
   3672             raise unittest.SkipTest('requires fd pickling')
   3673 
   3674         reader, writer = multiprocessing.Pipe()
   3675         fd = self.get_high_socket_fd()
   3676         try:
   3677             p = multiprocessing.Process(target=self._test_closefds,
   3678                                         args=(writer, fd))
   3679             p.start()
   3680             writer.close()
   3681             e = reader.recv()
   3682             p.join(timeout=5)
   3683         finally:
   3684             self.close(fd)
   3685             writer.close()
   3686             reader.close()
   3687 
   3688         if multiprocessing.get_start_method() == 'fork':
   3689             self.assertIs(e, None)
   3690         else:
   3691             WSAENOTSOCK = 10038
   3692             self.assertIsInstance(e, OSError)
   3693             self.assertTrue(e.errno == errno.EBADF or
   3694                             e.winerror == WSAENOTSOCK, e)
   3695 
   3696 #
   3697 # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
   3698 #
   3699 
   3700 class TestIgnoreEINTR(unittest.TestCase):
   3701 
   3702     @classmethod
   3703     def _test_ignore(cls, conn):
   3704         def handler(signum, frame):
   3705             pass
   3706         signal.signal(signal.SIGUSR1, handler)
   3707         conn.send('ready')
   3708         x = conn.recv()
   3709         conn.send(x)
   3710         conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block
   3711 
   3712     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
   3713     def test_ignore(self):
   3714         conn, child_conn = multiprocessing.Pipe()
   3715         try:
   3716             p = multiprocessing.Process(target=self._test_ignore,
   3717                                         args=(child_conn,))
   3718             p.daemon = True
   3719             p.start()
   3720             child_conn.close()
   3721             self.assertEqual(conn.recv(), 'ready')
   3722             time.sleep(0.1)
   3723             os.kill(p.pid, signal.SIGUSR1)
   3724             time.sleep(0.1)
   3725             conn.send(1234)
   3726             self.assertEqual(conn.recv(), 1234)
   3727             time.sleep(0.1)
   3728             os.kill(p.pid, signal.SIGUSR1)
   3729             self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
   3730             time.sleep(0.1)
   3731             p.join()
   3732         finally:
   3733             conn.close()
   3734 
   3735     @classmethod
   3736     def _test_ignore_listener(cls, conn):
   3737         def handler(signum, frame):
   3738             pass
   3739         signal.signal(signal.SIGUSR1, handler)
   3740         with multiprocessing.connection.Listener() as l:
   3741             conn.send(l.address)
   3742             a = l.accept()
   3743             a.send('welcome')
   3744 
   3745     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
   3746     def test_ignore_listener(self):
   3747         conn, child_conn = multiprocessing.Pipe()
   3748         try:
   3749             p = multiprocessing.Process(target=self._test_ignore_listener,
   3750                                         args=(child_conn,))
   3751             p.daemon = True
   3752             p.start()
   3753             child_conn.close()
   3754             address = conn.recv()
   3755             time.sleep(0.1)
   3756             os.kill(p.pid, signal.SIGUSR1)
   3757             time.sleep(0.1)
   3758             client = multiprocessing.connection.Client(address)
   3759             self.assertEqual(client.recv(), 'welcome')
   3760             p.join()
   3761         finally:
   3762             conn.close()
   3763 
   3764 class TestStartMethod(unittest.TestCase):
   3765     @classmethod
   3766     def _check_context(cls, conn):
   3767         conn.send(multiprocessing.get_start_method())
   3768 
   3769     def check_context(self, ctx):
   3770         r, w = ctx.Pipe(duplex=False)
   3771         p = ctx.Process(target=self._check_context, args=(w,))
   3772         p.start()
   3773         w.close()
   3774         child_method = r.recv()
   3775         r.close()
   3776         p.join()
   3777         self.assertEqual(child_method, ctx.get_start_method())
   3778 
   3779     def test_context(self):
   3780         for method in ('fork', 'spawn', 'forkserver'):
   3781             try:
   3782                 ctx = multiprocessing.get_context(method)
   3783             except ValueError:
   3784                 continue
   3785             self.assertEqual(ctx.get_start_method(), method)
   3786             self.assertIs(ctx.get_context(), ctx)
   3787             self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
   3788             self.assertRaises(ValueError, ctx.set_start_method, None)
   3789             self.check_context(ctx)
   3790 
   3791     def test_set_get(self):
   3792         multiprocessing.set_forkserver_preload(PRELOAD)
   3793         count = 0
   3794         old_method = multiprocessing.get_start_method()
   3795         try:
   3796             for method in ('fork', 'spawn', 'forkserver'):
   3797                 try:
   3798                     multiprocessing.set_start_method(method, force=True)
   3799                 except ValueError:
   3800                     continue
   3801                 self.assertEqual(multiprocessing.get_start_method(), method)
   3802                 ctx = multiprocessing.get_context()
   3803                 self.assertEqual(ctx.get_start_method(), method)
   3804                 self.assertTrue(type(ctx).__name__.lower().startswith(method))
   3805                 self.assertTrue(
   3806                     ctx.Process.__name__.lower().startswith(method))
   3807                 self.check_context(multiprocessing)
   3808                 count += 1
   3809         finally:
   3810             multiprocessing.set_start_method(old_method, force=True)
   3811         self.assertGreaterEqual(count, 1)
   3812 
   3813     def test_get_all(self):
   3814         methods = multiprocessing.get_all_start_methods()
   3815         if sys.platform == 'win32':
   3816             self.assertEqual(methods, ['spawn'])
   3817         else:
   3818             self.assertTrue(methods == ['fork', 'spawn'] or
   3819                             methods == ['fork', 'spawn', 'forkserver'])
   3820 
   3821     def test_preload_resources(self):
   3822         if multiprocessing.get_start_method() != 'forkserver':
   3823             self.skipTest("test only relevant for 'forkserver' method")
   3824         name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
   3825         rc, out, err = test.support.script_helper.assert_python_ok(name)
   3826         out = out.decode()
   3827         err = err.decode()
   3828         if out.rstrip() != 'ok' or err != '':
   3829             print(out)
   3830             print(err)
   3831             self.fail("failed spawning forkserver or grandchild")
   3832 
   3833 
   3834 #
   3835 # Check that killing process does not leak named semaphores
   3836 #
   3837 
   3838 @unittest.skipIf(sys.platform == "win32",
   3839                  "test semantics don't make sense on Windows")
   3840 class TestSemaphoreTracker(unittest.TestCase):
   3841     def test_semaphore_tracker(self):
   3842         import subprocess
   3843         cmd = '''if 1:
   3844             import multiprocessing as mp, time, os
   3845             mp.set_start_method("spawn")
   3846             lock1 = mp.Lock()
   3847             lock2 = mp.Lock()
   3848             os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
   3849             os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
   3850             time.sleep(10)
   3851         '''
   3852         r, w = os.pipe()
   3853         p = subprocess.Popen([sys.executable,
   3854                              '-c', cmd % (w, w)],
   3855                              pass_fds=[w],
   3856                              stderr=subprocess.PIPE)
   3857         os.close(w)
   3858         with open(r, 'rb', closefd=True) as f:
   3859             name1 = f.readline().rstrip().decode('ascii')
   3860             name2 = f.readline().rstrip().decode('ascii')
   3861         _multiprocessing.sem_unlink(name1)
   3862         p.terminate()
   3863         p.wait()
   3864         time.sleep(2.0)
   3865         with self.assertRaises(OSError) as ctx:
   3866             _multiprocessing.sem_unlink(name2)
   3867         # docs say it should be ENOENT, but OSX seems to give EINVAL
   3868         self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
   3869         err = p.stderr.read().decode('utf-8')
   3870         p.stderr.close()
   3871         expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
   3872         self.assertRegex(err, expected)
   3873         self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
   3874 
   3875 #
   3876 # Mixins
   3877 #
   3878 
   3879 class ProcessesMixin(object):
   3880     TYPE = 'processes'
   3881     Process = multiprocessing.Process
   3882     connection = multiprocessing.connection
   3883     current_process = staticmethod(multiprocessing.current_process)
   3884     active_children = staticmethod(multiprocessing.active_children)
   3885     Pool = staticmethod(multiprocessing.Pool)
   3886     Pipe = staticmethod(multiprocessing.Pipe)
   3887     Queue = staticmethod(multiprocessing.Queue)
   3888     JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
   3889     Lock = staticmethod(multiprocessing.Lock)
   3890     RLock = staticmethod(multiprocessing.RLock)
   3891     Semaphore = staticmethod(multiprocessing.Semaphore)
   3892     BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
   3893     Condition = staticmethod(multiprocessing.Condition)
   3894     Event = staticmethod(multiprocessing.Event)
   3895     Barrier = staticmethod(multiprocessing.Barrier)
   3896     Value = staticmethod(multiprocessing.Value)
   3897     Array = staticmethod(multiprocessing.Array)
   3898     RawValue = staticmethod(multiprocessing.RawValue)
   3899     RawArray = staticmethod(multiprocessing.RawArray)
   3900 
   3901 
   3902 class ManagerMixin(object):
   3903     TYPE = 'manager'
   3904     Process = multiprocessing.Process
   3905     Queue = property(operator.attrgetter('manager.Queue'))
   3906     JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
   3907     Lock = property(operator.attrgetter('manager.Lock'))
   3908     RLock = property(operator.attrgetter('manager.RLock'))
   3909     Semaphore = property(operator.attrgetter('manager.Semaphore'))
   3910     BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
   3911     Condition = property(operator.attrgetter('manager.Condition'))
   3912     Event = property(operator.attrgetter('manager.Event'))
   3913     Barrier = property(operator.attrgetter('manager.Barrier'))
   3914     Value = property(operator.attrgetter('manager.Value'))
   3915     Array = property(operator.attrgetter('manager.Array'))
   3916     list = property(operator.attrgetter('manager.list'))
   3917     dict = property(operator.attrgetter('manager.dict'))
   3918     Namespace = property(operator.attrgetter('manager.Namespace'))
   3919 
   3920     @classmethod
   3921     def Pool(cls, *args, **kwds):
   3922         return cls.manager.Pool(*args, **kwds)
   3923 
   3924     @classmethod
   3925     def setUpClass(cls):
   3926         cls.manager = multiprocessing.Manager()
   3927 
   3928     @classmethod
   3929     def tearDownClass(cls):
   3930         # only the manager process should be returned by active_children()
   3931         # but this can take a bit on slow machines, so wait a few seconds
   3932         # if there are other children too (see #17395)
   3933         t = 0.01
   3934         while len(multiprocessing.active_children()) > 1 and t < 5:
   3935             time.sleep(t)
   3936             t *= 2
   3937         gc.collect()                       # do garbage collection
   3938         if cls.manager._number_of_objects() != 0:
   3939             # This is not really an error since some tests do not
   3940             # ensure that all processes which hold a reference to a
   3941             # managed object have been joined.
   3942             print('Shared objects which still exist at manager shutdown:')
   3943             print(cls.manager._debug_info())
   3944         cls.manager.shutdown()
   3945         cls.manager.join()
   3946         cls.manager = None
   3947 
   3948 
   3949 class ThreadsMixin(object):
   3950     TYPE = 'threads'
   3951     Process = multiprocessing.dummy.Process
   3952     connection = multiprocessing.dummy.connection
   3953     current_process = staticmethod(multiprocessing.dummy.current_process)
   3954     active_children = staticmethod(multiprocessing.dummy.active_children)
   3955     Pool = staticmethod(multiprocessing.dummy.Pool)
   3956     Pipe = staticmethod(multiprocessing.dummy.Pipe)
   3957     Queue = staticmethod(multiprocessing.dummy.Queue)
   3958     JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
   3959     Lock = staticmethod(multiprocessing.dummy.Lock)
   3960     RLock = staticmethod(multiprocessing.dummy.RLock)
   3961     Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
   3962     BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
   3963     Condition = staticmethod(multiprocessing.dummy.Condition)
   3964     Event = staticmethod(multiprocessing.dummy.Event)
   3965     Barrier = staticmethod(multiprocessing.dummy.Barrier)
   3966     Value = staticmethod(multiprocessing.dummy.Value)
   3967     Array = staticmethod(multiprocessing.dummy.Array)
   3968 
   3969 #
   3970 # Functions used to create test cases from the base ones in this module
   3971 #
   3972 
   3973 def install_tests_in_module_dict(remote_globs, start_method):
   3974     __module__ = remote_globs['__name__']
   3975     local_globs = globals()
   3976     ALL_TYPES = {'processes', 'threads', 'manager'}
   3977 
   3978     for name, base in local_globs.items():
   3979         if not isinstance(base, type):
   3980             continue
   3981         if issubclass(base, BaseTestCase):
   3982             if base is BaseTestCase:
   3983                 continue
   3984             assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
   3985             for type_ in base.ALLOWED_TYPES:
   3986                 newname = 'With' + type_.capitalize() + name[1:]
   3987                 Mixin = local_globs[type_.capitalize() + 'Mixin']
   3988                 class Temp(base, Mixin, unittest.TestCase):
   3989                     pass
   3990                 Temp.__name__ = Temp.__qualname__ = newname
   3991                 Temp.__module__ = __module__
   3992                 remote_globs[newname] = Temp
   3993         elif issubclass(base, unittest.TestCase):
   3994             class Temp(base, object):
   3995                 pass
   3996             Temp.__name__ = Temp.__qualname__ = name
   3997             Temp.__module__ = __module__
   3998             remote_globs[name] = Temp
   3999 
   4000     dangling = [None, None]
   4001     old_start_method = [None]
   4002 
   4003     def setUpModule():
   4004         multiprocessing.set_forkserver_preload(PRELOAD)
   4005         multiprocessing.process._cleanup()
   4006         dangling[0] = multiprocessing.process._dangling.copy()
   4007         dangling[1] = threading._dangling.copy()
   4008         old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
   4009         try:
   4010             multiprocessing.set_start_method(start_method, force=True)
   4011         except ValueError:
   4012             raise unittest.SkipTest(start_method +
   4013                                     ' start method not supported')
   4014 
   4015         if sys.platform.startswith("linux"):
   4016             try:
   4017                 lock = multiprocessing.RLock()
   4018             except OSError:
   4019                 raise unittest.SkipTest("OSError raises on RLock creation, "
   4020                                         "see issue 3111!")
   4021         check_enough_semaphores()
   4022         util.get_temp_dir()     # creates temp directory
   4023         multiprocessing.get_logger().setLevel(LOG_LEVEL)
   4024 
   4025     def tearDownModule():
   4026         multiprocessing.set_start_method(old_start_method[0], force=True)
   4027         # pause a bit so we don't get warning about dangling threads/processes
   4028         time.sleep(0.5)
   4029         multiprocessing.process._cleanup()
   4030         gc.collect()
   4031         tmp = set(multiprocessing.process._dangling) - set(dangling[0])
   4032         if tmp:
   4033             print('Dangling processes:', tmp, file=sys.stderr)
   4034         del tmp
   4035         tmp = set(threading._dangling) - set(dangling[1])
   4036         if tmp:
   4037             print('Dangling threads:', tmp, file=sys.stderr)
   4038 
   4039     remote_globs['setUpModule'] = setUpModule
   4040     remote_globs['tearDownModule'] = tearDownModule
   4041