Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python
      2 
      3 #
      4 # Unit tests for the multiprocessing package
      5 #
      6 
      7 import unittest
      8 import Queue
      9 import time
     10 import sys
     11 import os
     12 import gc
     13 import signal
     14 import array
     15 import socket
     16 import random
     17 import logging
     18 import errno
     19 import test.script_helper
     20 from test import test_support
     21 from StringIO import StringIO
     22 _multiprocessing = test_support.import_module('_multiprocessing')
     23 # import threading after _multiprocessing to raise a more relevant error
     24 # message: "No module named _multiprocessing". _multiprocessing is not compiled
     25 # without thread support.
     26 import threading
     27 
     28 # Work around broken sem_open implementations
     29 test_support.import_module('multiprocessing.synchronize')
     30 
     31 import multiprocessing.dummy
     32 import multiprocessing.connection
     33 import multiprocessing.managers
     34 import multiprocessing.heap
     35 import multiprocessing.pool
     36 
     37 from multiprocessing import util
     38 
     39 try:
     40     from multiprocessing import reduction
     41     HAS_REDUCTION = True
     42 except ImportError:
     43     HAS_REDUCTION = False
     44 
     45 try:
     46     from multiprocessing.sharedctypes import Value, copy
     47     HAS_SHAREDCTYPES = True
     48 except ImportError:
     49     HAS_SHAREDCTYPES = False
     50 
     51 try:
     52     import msvcrt
     53 except ImportError:
     54     msvcrt = None
     55 
     56 #
     57 #
     58 #
     59 
     60 latin = str
     61 
     62 #
     63 # Constants
     64 #
     65 
     66 LOG_LEVEL = util.SUBWARNING
     67 #LOG_LEVEL = logging.DEBUG
     68 
     69 DELTA = 0.1
     70 CHECK_TIMINGS = False     # making true makes tests take a lot longer
     71                           # and can sometimes cause some non-serious
     72                           # failures because some calls block a bit
     73                           # longer than expected
     74 if CHECK_TIMINGS:
     75     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
     76 else:
     77     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
     78 
     79 HAVE_GETVALUE = not getattr(_multiprocessing,
     80                             'HAVE_BROKEN_SEM_GETVALUE', False)
     81 
     82 WIN32 = (sys.platform == "win32")
     83 
     84 try:
     85     MAXFD = os.sysconf("SC_OPEN_MAX")
     86 except:
     87     MAXFD = 256
     88 
     89 #
     90 # Some tests require ctypes
     91 #
     92 
     93 try:
     94     from ctypes import Structure, c_int, c_double
     95 except ImportError:
     96     Structure = object
     97     c_int = c_double = None
     98 
     99 
    100 def check_enough_semaphores():
    101     """Check that the system supports enough semaphores to run the test."""
    102     # minimum number of semaphores available according to POSIX
    103     nsems_min = 256
    104     try:
    105         nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    106     except (AttributeError, ValueError):
    107         # sysconf not available or setting not available
    108         return
    109     if nsems == -1 or nsems >= nsems_min:
    110         return
    111     raise unittest.SkipTest("The OS doesn't support enough semaphores "
    112                             "to run the test (required: %d)." % nsems_min)
    113 
    114 
    115 #
    116 # Creates a wrapper for a function which records the time it takes to finish
    117 #
    118 
    119 class TimingWrapper(object):
    120 
    121     def __init__(self, func):
    122         self.func = func
    123         self.elapsed = None
    124 
    125     def __call__(self, *args, **kwds):
    126         t = time.time()
    127         try:
    128             return self.func(*args, **kwds)
    129         finally:
    130             self.elapsed = time.time() - t
    131 
    132 #
    133 # Base class for test cases
    134 #
    135 
    136 class BaseTestCase(object):
    137 
    138     ALLOWED_TYPES = ('processes', 'manager', 'threads')
    139 
    140     def assertTimingAlmostEqual(self, a, b):
    141         if CHECK_TIMINGS:
    142             self.assertAlmostEqual(a, b, 1)
    143 
    144     def assertReturnsIfImplemented(self, value, func, *args):
    145         try:
    146             res = func(*args)
    147         except NotImplementedError:
    148             pass
    149         else:
    150             return self.assertEqual(value, res)
    151 
    152     # For the sanity of Windows users, rather than crashing or freezing in
    153     # multiple ways.
    154     def __reduce__(self, *args):
    155         raise NotImplementedError("shouldn't try to pickle a test case")
    156 
    157     __reduce_ex__ = __reduce__
    158 
    159 #
    160 # Return the value of a semaphore
    161 #
    162 
    163 def get_value(self):
    164     try:
    165         return self.get_value()
    166     except AttributeError:
    167         try:
    168             return self._Semaphore__value
    169         except AttributeError:
    170             try:
    171                 return self._value
    172             except AttributeError:
    173                 raise NotImplementedError
    174 
    175 #
    176 # Testcases
    177 #
    178 
    179 class _TestProcess(BaseTestCase):
    180 
    181     ALLOWED_TYPES = ('processes', 'threads')
    182 
    183     def test_current(self):
    184         if self.TYPE == 'threads':
    185             return
    186 
    187         current = self.current_process()
    188         authkey = current.authkey
    189 
    190         self.assertTrue(current.is_alive())
    191         self.assertTrue(not current.daemon)
    192         self.assertIsInstance(authkey, bytes)
    193         self.assertTrue(len(authkey) > 0)
    194         self.assertEqual(current.ident, os.getpid())
    195         self.assertEqual(current.exitcode, None)
    196 
    197     @classmethod
    198     def _test(cls, q, *args, **kwds):
    199         current = cls.current_process()
    200         q.put(args)
    201         q.put(kwds)
    202         q.put(current.name)
    203         if cls.TYPE != 'threads':
    204             q.put(bytes(current.authkey))
    205             q.put(current.pid)
    206 
    207     def test_process(self):
    208         q = self.Queue(1)
    209         e = self.Event()
    210         args = (q, 1, 2)
    211         kwargs = {'hello':23, 'bye':2.54}
    212         name = 'SomeProcess'
    213         p = self.Process(
    214             target=self._test, args=args, kwargs=kwargs, name=name
    215             )
    216         p.daemon = True
    217         current = self.current_process()
    218 
    219         if self.TYPE != 'threads':
    220             self.assertEqual(p.authkey, current.authkey)
    221         self.assertEqual(p.is_alive(), False)
    222         self.assertEqual(p.daemon, True)
    223         self.assertNotIn(p, self.active_children())
    224         self.assertTrue(type(self.active_children()) is list)
    225         self.assertEqual(p.exitcode, None)
    226 
    227         p.start()
    228 
    229         self.assertEqual(p.exitcode, None)
    230         self.assertEqual(p.is_alive(), True)
    231         self.assertIn(p, self.active_children())
    232 
    233         self.assertEqual(q.get(), args[1:])
    234         self.assertEqual(q.get(), kwargs)
    235         self.assertEqual(q.get(), p.name)
    236         if self.TYPE != 'threads':
    237             self.assertEqual(q.get(), current.authkey)
    238             self.assertEqual(q.get(), p.pid)
    239 
    240         p.join()
    241 
    242         self.assertEqual(p.exitcode, 0)
    243         self.assertEqual(p.is_alive(), False)
    244         self.assertNotIn(p, self.active_children())
    245 
    246     @classmethod
    247     def _test_terminate(cls):
    248         time.sleep(1000)
    249 
    250     def test_terminate(self):
    251         if self.TYPE == 'threads':
    252             return
    253 
    254         p = self.Process(target=self._test_terminate)
    255         p.daemon = True
    256         p.start()
    257 
    258         self.assertEqual(p.is_alive(), True)
    259         self.assertIn(p, self.active_children())
    260         self.assertEqual(p.exitcode, None)
    261 
    262         p.terminate()
    263 
    264         join = TimingWrapper(p.join)
    265         self.assertEqual(join(), None)
    266         self.assertTimingAlmostEqual(join.elapsed, 0.0)
    267 
    268         self.assertEqual(p.is_alive(), False)
    269         self.assertNotIn(p, self.active_children())
    270 
    271         p.join()
    272 
    273         # XXX sometimes get p.exitcode == 0 on Windows ...
    274         #self.assertEqual(p.exitcode, -signal.SIGTERM)
    275 
    276     def test_cpu_count(self):
    277         try:
    278             cpus = multiprocessing.cpu_count()
    279         except NotImplementedError:
    280             cpus = 1
    281         self.assertTrue(type(cpus) is int)
    282         self.assertTrue(cpus >= 1)
    283 
    284     def test_active_children(self):
    285         self.assertEqual(type(self.active_children()), list)
    286 
    287         p = self.Process(target=time.sleep, args=(DELTA,))
    288         self.assertNotIn(p, self.active_children())
    289 
    290         p.daemon = True
    291         p.start()
    292         self.assertIn(p, self.active_children())
    293 
    294         p.join()
    295         self.assertNotIn(p, self.active_children())
    296 
    297     @classmethod
    298     def _test_recursion(cls, wconn, id):
    299         from multiprocessing import forking
    300         wconn.send(id)
    301         if len(id) < 2:
    302             for i in range(2):
    303                 p = cls.Process(
    304                     target=cls._test_recursion, args=(wconn, id+[i])
    305                     )
    306                 p.start()
    307                 p.join()
    308 
    309     def test_recursion(self):
    310         rconn, wconn = self.Pipe(duplex=False)
    311         self._test_recursion(wconn, [])
    312 
    313         time.sleep(DELTA)
    314         result = []
    315         while rconn.poll():
    316             result.append(rconn.recv())
    317 
    318         expected = [
    319             [],
    320               [0],
    321                 [0, 0],
    322                 [0, 1],
    323               [1],
    324                 [1, 0],
    325                 [1, 1]
    326             ]
    327         self.assertEqual(result, expected)
    328 
    329     @classmethod
    330     def _test_sys_exit(cls, reason, testfn):
    331         sys.stderr = open(testfn, 'w')
    332         sys.exit(reason)
    333 
    334     def test_sys_exit(self):
    335         # See Issue 13854
    336         if self.TYPE == 'threads':
    337             return
    338 
    339         testfn = test_support.TESTFN
    340         self.addCleanup(test_support.unlink, testfn)
    341 
    342         for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
    343             p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
    344             p.daemon = True
    345             p.start()
    346             p.join(5)
    347             self.assertEqual(p.exitcode, code)
    348 
    349             with open(testfn, 'r') as f:
    350                 self.assertEqual(f.read().rstrip(), str(reason))
    351 
    352         for reason in (True, False, 8):
    353             p = self.Process(target=sys.exit, args=(reason,))
    354             p.daemon = True
    355             p.start()
    356             p.join(5)
    357             self.assertEqual(p.exitcode, reason)
    358 
    359 #
    360 #
    361 #
    362 
    363 class _UpperCaser(multiprocessing.Process):
    364 
    365     def __init__(self):
    366         multiprocessing.Process.__init__(self)
    367         self.child_conn, self.parent_conn = multiprocessing.Pipe()
    368 
    369     def run(self):
    370         self.parent_conn.close()
    371         for s in iter(self.child_conn.recv, None):
    372             self.child_conn.send(s.upper())
    373         self.child_conn.close()
    374 
    375     def submit(self, s):
    376         assert type(s) is str
    377         self.parent_conn.send(s)
    378         return self.parent_conn.recv()
    379 
    380     def stop(self):
    381         self.parent_conn.send(None)
    382         self.parent_conn.close()
    383         self.child_conn.close()
    384 
    385 class _TestSubclassingProcess(BaseTestCase):
    386 
    387     ALLOWED_TYPES = ('processes',)
    388 
    389     def test_subclassing(self):
    390         uppercaser = _UpperCaser()
    391         uppercaser.daemon = True
    392         uppercaser.start()
    393         self.assertEqual(uppercaser.submit('hello'), 'HELLO')
    394         self.assertEqual(uppercaser.submit('world'), 'WORLD')
    395         uppercaser.stop()
    396         uppercaser.join()
    397 
    398 #
    399 #
    400 #
    401 
    402 def queue_empty(q):
    403     if hasattr(q, 'empty'):
    404         return q.empty()
    405     else:
    406         return q.qsize() == 0
    407 
    408 def queue_full(q, maxsize):
    409     if hasattr(q, 'full'):
    410         return q.full()
    411     else:
    412         return q.qsize() == maxsize
    413 
    414 
    415 class _TestQueue(BaseTestCase):
    416 
    417 
    418     @classmethod
    419     def _test_put(cls, queue, child_can_start, parent_can_continue):
    420         child_can_start.wait()
    421         for i in range(6):
    422             queue.get()
    423         parent_can_continue.set()
    424 
    425     def test_put(self):
    426         MAXSIZE = 6
    427         queue = self.Queue(maxsize=MAXSIZE)
    428         child_can_start = self.Event()
    429         parent_can_continue = self.Event()
    430 
    431         proc = self.Process(
    432             target=self._test_put,
    433             args=(queue, child_can_start, parent_can_continue)
    434             )
    435         proc.daemon = True
    436         proc.start()
    437 
    438         self.assertEqual(queue_empty(queue), True)
    439         self.assertEqual(queue_full(queue, MAXSIZE), False)
    440 
    441         queue.put(1)
    442         queue.put(2, True)
    443         queue.put(3, True, None)
    444         queue.put(4, False)
    445         queue.put(5, False, None)
    446         queue.put_nowait(6)
    447 
    448         # the values may be in buffer but not yet in pipe so sleep a bit
    449         time.sleep(DELTA)
    450 
    451         self.assertEqual(queue_empty(queue), False)
    452         self.assertEqual(queue_full(queue, MAXSIZE), True)
    453 
    454         put = TimingWrapper(queue.put)
    455         put_nowait = TimingWrapper(queue.put_nowait)
    456 
    457         self.assertRaises(Queue.Full, put, 7, False)
    458         self.assertTimingAlmostEqual(put.elapsed, 0)
    459 
    460         self.assertRaises(Queue.Full, put, 7, False, None)
    461         self.assertTimingAlmostEqual(put.elapsed, 0)
    462 
    463         self.assertRaises(Queue.Full, put_nowait, 7)
    464         self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
    465 
    466         self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
    467         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
    468 
    469         self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
    470         self.assertTimingAlmostEqual(put.elapsed, 0)
    471 
    472         self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
    473         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
    474 
    475         child_can_start.set()
    476         parent_can_continue.wait()
    477 
    478         self.assertEqual(queue_empty(queue), True)
    479         self.assertEqual(queue_full(queue, MAXSIZE), False)
    480 
    481         proc.join()
    482 
    483     @classmethod
    484     def _test_get(cls, queue, child_can_start, parent_can_continue):
    485         child_can_start.wait()
    486         #queue.put(1)
    487         queue.put(2)
    488         queue.put(3)
    489         queue.put(4)
    490         queue.put(5)
    491         parent_can_continue.set()
    492 
    493     def test_get(self):
    494         queue = self.Queue()
    495         child_can_start = self.Event()
    496         parent_can_continue = self.Event()
    497 
    498         proc = self.Process(
    499             target=self._test_get,
    500             args=(queue, child_can_start, parent_can_continue)
    501             )
    502         proc.daemon = True
    503         proc.start()
    504 
    505         self.assertEqual(queue_empty(queue), True)
    506 
    507         child_can_start.set()
    508         parent_can_continue.wait()
    509 
    510         time.sleep(DELTA)
    511         self.assertEqual(queue_empty(queue), False)
    512 
    513         # Hangs unexpectedly, remove for now
    514         #self.assertEqual(queue.get(), 1)
    515         self.assertEqual(queue.get(True, None), 2)
    516         self.assertEqual(queue.get(True), 3)
    517         self.assertEqual(queue.get(timeout=1), 4)
    518         self.assertEqual(queue.get_nowait(), 5)
    519 
    520         self.assertEqual(queue_empty(queue), True)
    521 
    522         get = TimingWrapper(queue.get)
    523         get_nowait = TimingWrapper(queue.get_nowait)
    524 
    525         self.assertRaises(Queue.Empty, get, False)
    526         self.assertTimingAlmostEqual(get.elapsed, 0)
    527 
    528         self.assertRaises(Queue.Empty, get, False, None)
    529         self.assertTimingAlmostEqual(get.elapsed, 0)
    530 
    531         self.assertRaises(Queue.Empty, get_nowait)
    532         self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
    533 
    534         self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
    535         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
    536 
    537         self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
    538         self.assertTimingAlmostEqual(get.elapsed, 0)
    539 
    540         self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
    541         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
    542 
    543         proc.join()
    544 
    545     @classmethod
    546     def _test_fork(cls, queue):
    547         for i in range(10, 20):
    548             queue.put(i)
    549         # note that at this point the items may only be buffered, so the
    550         # process cannot shutdown until the feeder thread has finished
    551         # pushing items onto the pipe.
    552 
    553     def test_fork(self):
    554         # Old versions of Queue would fail to create a new feeder
    555         # thread for a forked process if the original process had its
    556         # own feeder thread.  This test checks that this no longer
    557         # happens.
    558 
    559         queue = self.Queue()
    560 
    561         # put items on queue so that main process starts a feeder thread
    562         for i in range(10):
    563             queue.put(i)
    564 
    565         # wait to make sure thread starts before we fork a new process
    566         time.sleep(DELTA)
    567 
    568         # fork process
    569         p = self.Process(target=self._test_fork, args=(queue,))
    570         p.daemon = True
    571         p.start()
    572 
    573         # check that all expected items are in the queue
    574         for i in range(20):
    575             self.assertEqual(queue.get(), i)
    576         self.assertRaises(Queue.Empty, queue.get, False)
    577 
    578         p.join()
    579 
    580     def test_qsize(self):
    581         q = self.Queue()
    582         try:
    583             self.assertEqual(q.qsize(), 0)
    584         except NotImplementedError:
    585             return
    586         q.put(1)
    587         self.assertEqual(q.qsize(), 1)
    588         q.put(5)
    589         self.assertEqual(q.qsize(), 2)
    590         q.get()
    591         self.assertEqual(q.qsize(), 1)
    592         q.get()
    593         self.assertEqual(q.qsize(), 0)
    594 
    595     @classmethod
    596     def _test_task_done(cls, q):
    597         for obj in iter(q.get, None):
    598             time.sleep(DELTA)
    599             q.task_done()
    600 
    601     def test_task_done(self):
    602         queue = self.JoinableQueue()
    603 
    604         if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
    605             self.skipTest("requires 'queue.task_done()' method")
    606 
    607         workers = [self.Process(target=self._test_task_done, args=(queue,))
    608                    for i in xrange(4)]
    609 
    610         for p in workers:
    611             p.daemon = True
    612             p.start()
    613 
    614         for i in xrange(10):
    615             queue.put(i)
    616 
    617         queue.join()
    618 
    619         for p in workers:
    620             queue.put(None)
    621 
    622         for p in workers:
    623             p.join()
    624 
    625 #
    626 #
    627 #
    628 
    629 class _TestLock(BaseTestCase):
    630 
    631     def test_lock(self):
    632         lock = self.Lock()
    633         self.assertEqual(lock.acquire(), True)
    634         self.assertEqual(lock.acquire(False), False)
    635         self.assertEqual(lock.release(), None)
    636         self.assertRaises((ValueError, threading.ThreadError), lock.release)
    637 
    638     def test_rlock(self):
    639         lock = self.RLock()
    640         self.assertEqual(lock.acquire(), True)
    641         self.assertEqual(lock.acquire(), True)
    642         self.assertEqual(lock.acquire(), True)
    643         self.assertEqual(lock.release(), None)
    644         self.assertEqual(lock.release(), None)
    645         self.assertEqual(lock.release(), None)
    646         self.assertRaises((AssertionError, RuntimeError), lock.release)
    647 
    648     def test_lock_context(self):
    649         with self.Lock():
    650             pass
    651 
    652 
    653 class _TestSemaphore(BaseTestCase):
    654 
    655     def _test_semaphore(self, sem):
    656         self.assertReturnsIfImplemented(2, get_value, sem)
    657         self.assertEqual(sem.acquire(), True)
    658         self.assertReturnsIfImplemented(1, get_value, sem)
    659         self.assertEqual(sem.acquire(), True)
    660         self.assertReturnsIfImplemented(0, get_value, sem)
    661         self.assertEqual(sem.acquire(False), False)
    662         self.assertReturnsIfImplemented(0, get_value, sem)
    663         self.assertEqual(sem.release(), None)
    664         self.assertReturnsIfImplemented(1, get_value, sem)
    665         self.assertEqual(sem.release(), None)
    666         self.assertReturnsIfImplemented(2, get_value, sem)
    667 
    668     def test_semaphore(self):
    669         sem = self.Semaphore(2)
    670         self._test_semaphore(sem)
    671         self.assertEqual(sem.release(), None)
    672         self.assertReturnsIfImplemented(3, get_value, sem)
    673         self.assertEqual(sem.release(), None)
    674         self.assertReturnsIfImplemented(4, get_value, sem)
    675 
    676     def test_bounded_semaphore(self):
    677         sem = self.BoundedSemaphore(2)
    678         self._test_semaphore(sem)
    679         # Currently fails on OS/X
    680         #if HAVE_GETVALUE:
    681         #    self.assertRaises(ValueError, sem.release)
    682         #    self.assertReturnsIfImplemented(2, get_value, sem)
    683 
    684     def test_timeout(self):
    685         if self.TYPE != 'processes':
    686             return
    687 
    688         sem = self.Semaphore(0)
    689         acquire = TimingWrapper(sem.acquire)
    690 
    691         self.assertEqual(acquire(False), False)
    692         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    693 
    694         self.assertEqual(acquire(False, None), False)
    695         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    696 
    697         self.assertEqual(acquire(False, TIMEOUT1), False)
    698         self.assertTimingAlmostEqual(acquire.elapsed, 0)
    699 
    700         self.assertEqual(acquire(True, TIMEOUT2), False)
    701         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
    702 
    703         self.assertEqual(acquire(timeout=TIMEOUT3), False)
    704         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
    705 
    706 
    707 class _TestCondition(BaseTestCase):
    708 
    709     @classmethod
    710     def f(cls, cond, sleeping, woken, timeout=None):
    711         cond.acquire()
    712         sleeping.release()
    713         cond.wait(timeout)
    714         woken.release()
    715         cond.release()
    716 
    717     def check_invariant(self, cond):
    718         # this is only supposed to succeed when there are no sleepers
    719         if self.TYPE == 'processes':
    720             try:
    721                 sleepers = (cond._sleeping_count.get_value() -
    722                             cond._woken_count.get_value())
    723                 self.assertEqual(sleepers, 0)
    724                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
    725             except NotImplementedError:
    726                 pass
    727 
    728     def test_notify(self):
    729         cond = self.Condition()
    730         sleeping = self.Semaphore(0)
    731         woken = self.Semaphore(0)
    732 
    733         p = self.Process(target=self.f, args=(cond, sleeping, woken))
    734         p.daemon = True
    735         p.start()
    736 
    737         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    738         p.daemon = True
    739         p.start()
    740 
    741         # wait for both children to start sleeping
    742         sleeping.acquire()
    743         sleeping.acquire()
    744 
    745         # check no process/thread has woken up
    746         time.sleep(DELTA)
    747         self.assertReturnsIfImplemented(0, get_value, woken)
    748 
    749         # wake up one process/thread
    750         cond.acquire()
    751         cond.notify()
    752         cond.release()
    753 
    754         # check one process/thread has woken up
    755         time.sleep(DELTA)
    756         self.assertReturnsIfImplemented(1, get_value, woken)
    757 
    758         # wake up another
    759         cond.acquire()
    760         cond.notify()
    761         cond.release()
    762 
    763         # check other has woken up
    764         time.sleep(DELTA)
    765         self.assertReturnsIfImplemented(2, get_value, woken)
    766 
    767         # check state is not mucked up
    768         self.check_invariant(cond)
    769         p.join()
    770 
    771     def test_notify_all(self):
    772         cond = self.Condition()
    773         sleeping = self.Semaphore(0)
    774         woken = self.Semaphore(0)
    775 
    776         # start some threads/processes which will timeout
    777         for i in range(3):
    778             p = self.Process(target=self.f,
    779                              args=(cond, sleeping, woken, TIMEOUT1))
    780             p.daemon = True
    781             p.start()
    782 
    783             t = threading.Thread(target=self.f,
    784                                  args=(cond, sleeping, woken, TIMEOUT1))
    785             t.daemon = True
    786             t.start()
    787 
    788         # wait for them all to sleep
    789         for i in xrange(6):
    790             sleeping.acquire()
    791 
    792         # check they have all timed out
    793         for i in xrange(6):
    794             woken.acquire()
    795         self.assertReturnsIfImplemented(0, get_value, woken)
    796 
    797         # check state is not mucked up
    798         self.check_invariant(cond)
    799 
    800         # start some more threads/processes
    801         for i in range(3):
    802             p = self.Process(target=self.f, args=(cond, sleeping, woken))
    803             p.daemon = True
    804             p.start()
    805 
    806             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    807             t.daemon = True
    808             t.start()
    809 
    810         # wait for them to all sleep
    811         for i in xrange(6):
    812             sleeping.acquire()
    813 
    814         # check no process/thread has woken up
    815         time.sleep(DELTA)
    816         self.assertReturnsIfImplemented(0, get_value, woken)
    817 
    818         # wake them all up
    819         cond.acquire()
    820         cond.notify_all()
    821         cond.release()
    822 
    823         # check they have all woken
    824         time.sleep(DELTA)
    825         self.assertReturnsIfImplemented(6, get_value, woken)
    826 
    827         # check state is not mucked up
    828         self.check_invariant(cond)
    829 
    830     def test_timeout(self):
    831         cond = self.Condition()
    832         wait = TimingWrapper(cond.wait)
    833         cond.acquire()
    834         res = wait(TIMEOUT1)
    835         cond.release()
    836         self.assertEqual(res, None)
    837         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    838 
    839 
    840 class _TestEvent(BaseTestCase):
    841 
    842     @classmethod
    843     def _test_event(cls, event):
    844         time.sleep(TIMEOUT2)
    845         event.set()
    846 
    847     def test_event(self):
    848         event = self.Event()
    849         wait = TimingWrapper(event.wait)
    850 
    851         # Removed temporarily, due to API shear, this does not
    852         # work with threading._Event objects. is_set == isSet
    853         self.assertEqual(event.is_set(), False)
    854 
    855         # Removed, threading.Event.wait() will return the value of the __flag
    856         # instead of None. API Shear with the semaphore backed mp.Event
    857         self.assertEqual(wait(0.0), False)
    858         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    859         self.assertEqual(wait(TIMEOUT1), False)
    860         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    861 
    862         event.set()
    863 
    864         # See note above on the API differences
    865         self.assertEqual(event.is_set(), True)
    866         self.assertEqual(wait(), True)
    867         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    868         self.assertEqual(wait(TIMEOUT1), True)
    869         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    870         # self.assertEqual(event.is_set(), True)
    871 
    872         event.clear()
    873 
    874         #self.assertEqual(event.is_set(), False)
    875 
    876         p = self.Process(target=self._test_event, args=(event,))
    877         p.daemon = True
    878         p.start()
    879         self.assertEqual(wait(), True)
    880 
    881 #
    882 #
    883 #
    884 
    885 class _TestValue(BaseTestCase):
    886 
    887     ALLOWED_TYPES = ('processes',)
    888 
    889     codes_values = [
    890         ('i', 4343, 24234),
    891         ('d', 3.625, -4.25),
    892         ('h', -232, 234),
    893         ('c', latin('x'), latin('y'))
    894         ]
    895 
    896     def setUp(self):
    897         if not HAS_SHAREDCTYPES:
    898             self.skipTest("requires multiprocessing.sharedctypes")
    899 
    900     @classmethod
    901     def _test(cls, values):
    902         for sv, cv in zip(values, cls.codes_values):
    903             sv.value = cv[2]
    904 
    905 
    906     def test_value(self, raw=False):
    907         if raw:
    908             values = [self.RawValue(code, value)
    909                       for code, value, _ in self.codes_values]
    910         else:
    911             values = [self.Value(code, value)
    912                       for code, value, _ in self.codes_values]
    913 
    914         for sv, cv in zip(values, self.codes_values):
    915             self.assertEqual(sv.value, cv[1])
    916 
    917         proc = self.Process(target=self._test, args=(values,))
    918         proc.daemon = True
    919         proc.start()
    920         proc.join()
    921 
    922         for sv, cv in zip(values, self.codes_values):
    923             self.assertEqual(sv.value, cv[2])
    924 
    925     def test_rawvalue(self):
    926         self.test_value(raw=True)
    927 
    928     def test_getobj_getlock(self):
    929         val1 = self.Value('i', 5)
    930         lock1 = val1.get_lock()
    931         obj1 = val1.get_obj()
    932 
    933         val2 = self.Value('i', 5, lock=None)
    934         lock2 = val2.get_lock()
    935         obj2 = val2.get_obj()
    936 
    937         lock = self.Lock()
    938         val3 = self.Value('i', 5, lock=lock)
    939         lock3 = val3.get_lock()
    940         obj3 = val3.get_obj()
    941         self.assertEqual(lock, lock3)
    942 
    943         arr4 = self.Value('i', 5, lock=False)
    944         self.assertFalse(hasattr(arr4, 'get_lock'))
    945         self.assertFalse(hasattr(arr4, 'get_obj'))
    946 
    947         self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
    948 
    949         arr5 = self.RawValue('i', 5)
    950         self.assertFalse(hasattr(arr5, 'get_lock'))
    951         self.assertFalse(hasattr(arr5, 'get_obj'))
    952 
    953 
    954 class _TestArray(BaseTestCase):
    955 
    956     ALLOWED_TYPES = ('processes',)
    957 
    958     @classmethod
    959     def f(cls, seq):
    960         for i in range(1, len(seq)):
    961             seq[i] += seq[i-1]
    962 
    963     @unittest.skipIf(c_int is None, "requires _ctypes")
    964     def test_array(self, raw=False):
    965         seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
    966         if raw:
    967             arr = self.RawArray('i', seq)
    968         else:
    969             arr = self.Array('i', seq)
    970 
    971         self.assertEqual(len(arr), len(seq))
    972         self.assertEqual(arr[3], seq[3])
    973         self.assertEqual(list(arr[2:7]), list(seq[2:7]))
    974 
    975         arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
    976 
    977         self.assertEqual(list(arr[:]), seq)
    978 
    979         self.f(seq)
    980 
    981         p = self.Process(target=self.f, args=(arr,))
    982         p.daemon = True
    983         p.start()
    984         p.join()
    985 
    986         self.assertEqual(list(arr[:]), seq)
    987 
    988     @unittest.skipIf(c_int is None, "requires _ctypes")
    989     def test_array_from_size(self):
    990         size = 10
    991         # Test for zeroing (see issue #11675).
    992         # The repetition below strengthens the test by increasing the chances
    993         # of previously allocated non-zero memory being used for the new array
    994         # on the 2nd and 3rd loops.
    995         for _ in range(3):
    996             arr = self.Array('i', size)
    997             self.assertEqual(len(arr), size)
    998             self.assertEqual(list(arr), [0] * size)
    999             arr[:] = range(10)
   1000             self.assertEqual(list(arr), range(10))
   1001             del arr
   1002 
   1003     @unittest.skipIf(c_int is None, "requires _ctypes")
   1004     def test_rawarray(self):
   1005         self.test_array(raw=True)
   1006 
   1007     @unittest.skipIf(c_int is None, "requires _ctypes")
   1008     def test_array_accepts_long(self):
   1009         arr = self.Array('i', 10L)
   1010         self.assertEqual(len(arr), 10)
   1011         raw_arr = self.RawArray('i', 10L)
   1012         self.assertEqual(len(raw_arr), 10)
   1013 
   1014     @unittest.skipIf(c_int is None, "requires _ctypes")
   1015     def test_getobj_getlock_obj(self):
   1016         arr1 = self.Array('i', range(10))
   1017         lock1 = arr1.get_lock()
   1018         obj1 = arr1.get_obj()
   1019 
   1020         arr2 = self.Array('i', range(10), lock=None)
   1021         lock2 = arr2.get_lock()
   1022         obj2 = arr2.get_obj()
   1023 
   1024         lock = self.Lock()
   1025         arr3 = self.Array('i', range(10), lock=lock)
   1026         lock3 = arr3.get_lock()
   1027         obj3 = arr3.get_obj()
   1028         self.assertEqual(lock, lock3)
   1029 
   1030         arr4 = self.Array('i', range(10), lock=False)
   1031         self.assertFalse(hasattr(arr4, 'get_lock'))
   1032         self.assertFalse(hasattr(arr4, 'get_obj'))
   1033         self.assertRaises(AttributeError,
   1034                           self.Array, 'i', range(10), lock='notalock')
   1035 
   1036         arr5 = self.RawArray('i', range(10))
   1037         self.assertFalse(hasattr(arr5, 'get_lock'))
   1038         self.assertFalse(hasattr(arr5, 'get_obj'))
   1039 
   1040 #
   1041 #
   1042 #
   1043 
   1044 class _TestContainers(BaseTestCase):
   1045 
   1046     ALLOWED_TYPES = ('manager',)
   1047 
   1048     def test_list(self):
   1049         a = self.list(range(10))
   1050         self.assertEqual(a[:], range(10))
   1051 
   1052         b = self.list()
   1053         self.assertEqual(b[:], [])
   1054 
   1055         b.extend(range(5))
   1056         self.assertEqual(b[:], range(5))
   1057 
   1058         self.assertEqual(b[2], 2)
   1059         self.assertEqual(b[2:10], [2,3,4])
   1060 
   1061         b *= 2
   1062         self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
   1063 
   1064         self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
   1065 
   1066         self.assertEqual(a[:], range(10))
   1067 
   1068         d = [a, b]
   1069         e = self.list(d)
   1070         self.assertEqual(
   1071             e[:],
   1072             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
   1073             )
   1074 
   1075         f = self.list([a])
   1076         a.append('hello')
   1077         self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
   1078 
   1079     def test_dict(self):
   1080         d = self.dict()
   1081         indices = range(65, 70)
   1082         for i in indices:
   1083             d[i] = chr(i)
   1084         self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
   1085         self.assertEqual(sorted(d.keys()), indices)
   1086         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
   1087         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
   1088 
   1089     def test_namespace(self):
   1090         n = self.Namespace()
   1091         n.name = 'Bob'
   1092         n.job = 'Builder'
   1093         n._hidden = 'hidden'
   1094         self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
   1095         del n.job
   1096         self.assertEqual(str(n), "Namespace(name='Bob')")
   1097         self.assertTrue(hasattr(n, 'name'))
   1098         self.assertTrue(not hasattr(n, 'job'))
   1099 
   1100 #
   1101 #
   1102 #
   1103 
   1104 def sqr(x, wait=0.0):
   1105     time.sleep(wait)
   1106     return x*x
   1107 class _TestPool(BaseTestCase):
   1108 
   1109     def test_apply(self):
   1110         papply = self.pool.apply
   1111         self.assertEqual(papply(sqr, (5,)), sqr(5))
   1112         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
   1113 
   1114     def test_map(self):
   1115         pmap = self.pool.map
   1116         self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
   1117         self.assertEqual(pmap(sqr, range(100), chunksize=20),
   1118                          map(sqr, range(100)))
   1119 
   1120     def test_map_chunksize(self):
   1121         try:
   1122             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
   1123         except multiprocessing.TimeoutError:
   1124             self.fail("pool.map_async with chunksize stalled on null list")
   1125 
   1126     def test_async(self):
   1127         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
   1128         get = TimingWrapper(res.get)
   1129         self.assertEqual(get(), 49)
   1130         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
   1131 
   1132     def test_async_timeout(self):
   1133         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
   1134         get = TimingWrapper(res.get)
   1135         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
   1136         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
   1137 
   1138     def test_imap(self):
   1139         it = self.pool.imap(sqr, range(10))
   1140         self.assertEqual(list(it), map(sqr, range(10)))
   1141 
   1142         it = self.pool.imap(sqr, range(10))
   1143         for i in range(10):
   1144             self.assertEqual(it.next(), i*i)
   1145         self.assertRaises(StopIteration, it.next)
   1146 
   1147         it = self.pool.imap(sqr, range(1000), chunksize=100)
   1148         for i in range(1000):
   1149             self.assertEqual(it.next(), i*i)
   1150         self.assertRaises(StopIteration, it.next)
   1151 
   1152     def test_imap_unordered(self):
   1153         it = self.pool.imap_unordered(sqr, range(1000))
   1154         self.assertEqual(sorted(it), map(sqr, range(1000)))
   1155 
   1156         it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
   1157         self.assertEqual(sorted(it), map(sqr, range(1000)))
   1158 
   1159     def test_make_pool(self):
   1160         self.assertRaises(ValueError, multiprocessing.Pool, -1)
   1161         self.assertRaises(ValueError, multiprocessing.Pool, 0)
   1162 
   1163         p = multiprocessing.Pool(3)
   1164         self.assertEqual(3, len(p._pool))
   1165         p.close()
   1166         p.join()
   1167 
   1168     def test_terminate(self):
   1169         if self.TYPE == 'manager':
   1170             # On Unix a forked process increfs each shared object to
   1171             # which its parent process held a reference.  If the
   1172             # forked process gets terminated then there is likely to
   1173             # be a reference leak.  So to prevent
   1174             # _TestZZZNumberOfObjects from failing we skip this test
   1175             # when using a manager.
   1176             return
   1177 
   1178         result = self.pool.map_async(
   1179             time.sleep, [0.1 for i in range(10000)], chunksize=1
   1180             )
   1181         self.pool.terminate()
   1182         join = TimingWrapper(self.pool.join)
   1183         join()
   1184         self.assertTrue(join.elapsed < 0.2)
   1185 
   1186     def test_empty_iterable(self):
   1187         # See Issue 12157
   1188         p = self.Pool(1)
   1189 
   1190         self.assertEqual(p.map(sqr, []), [])
   1191         self.assertEqual(list(p.imap(sqr, [])), [])
   1192         self.assertEqual(list(p.imap_unordered(sqr, [])), [])
   1193         self.assertEqual(p.map_async(sqr, []).get(), [])
   1194 
   1195         p.close()
   1196         p.join()
   1197 
   1198 def unpickleable_result():
   1199     return lambda: 42
   1200 
   1201 class _TestPoolWorkerErrors(BaseTestCase):
   1202     ALLOWED_TYPES = ('processes', )
   1203 
   1204     def test_unpickleable_result(self):
   1205         from multiprocessing.pool import MaybeEncodingError
   1206         p = multiprocessing.Pool(2)
   1207 
   1208         # Make sure we don't lose pool processes because of encoding errors.
   1209         for iteration in range(20):
   1210             res = p.apply_async(unpickleable_result)
   1211             self.assertRaises(MaybeEncodingError, res.get)
   1212 
   1213         p.close()
   1214         p.join()
   1215 
   1216 class _TestPoolWorkerLifetime(BaseTestCase):
   1217 
   1218     ALLOWED_TYPES = ('processes', )
   1219     def test_pool_worker_lifetime(self):
   1220         p = multiprocessing.Pool(3, maxtasksperchild=10)
   1221         self.assertEqual(3, len(p._pool))
   1222         origworkerpids = [w.pid for w in p._pool]
   1223         # Run many tasks so each worker gets replaced (hopefully)
   1224         results = []
   1225         for i in range(100):
   1226             results.append(p.apply_async(sqr, (i, )))
   1227         # Fetch the results and verify we got the right answers,
   1228         # also ensuring all the tasks have completed.
   1229         for (j, res) in enumerate(results):
   1230             self.assertEqual(res.get(), sqr(j))
   1231         # Refill the pool
   1232         p._repopulate_pool()
   1233         # Wait until all workers are alive
   1234         # (countdown * DELTA = 5 seconds max startup process time)
   1235         countdown = 50
   1236         while countdown and not all(w.is_alive() for w in p._pool):
   1237             countdown -= 1
   1238             time.sleep(DELTA)
   1239         finalworkerpids = [w.pid for w in p._pool]
   1240         # All pids should be assigned.  See issue #7805.
   1241         self.assertNotIn(None, origworkerpids)
   1242         self.assertNotIn(None, finalworkerpids)
   1243         # Finally, check that the worker pids have changed
   1244         self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
   1245         p.close()
   1246         p.join()
   1247 
   1248     def test_pool_worker_lifetime_early_close(self):
   1249         # Issue #10332: closing a pool whose workers have limited lifetimes
   1250         # before all the tasks completed would make join() hang.
   1251         p = multiprocessing.Pool(3, maxtasksperchild=1)
   1252         results = []
   1253         for i in range(6):
   1254             results.append(p.apply_async(sqr, (i, 0.3)))
   1255         p.close()
   1256         p.join()
   1257         # check the results
   1258         for (j, res) in enumerate(results):
   1259             self.assertEqual(res.get(), sqr(j))
   1260 
   1261 
   1262 #
   1263 # Test that manager has expected number of shared objects left
   1264 #
   1265 
   1266 class _TestZZZNumberOfObjects(BaseTestCase):
   1267     # Because test cases are sorted alphabetically, this one will get
   1268     # run after all the other tests for the manager.  It tests that
   1269     # there have been no "reference leaks" for the manager's shared
   1270     # objects.  Note the comment in _TestPool.test_terminate().
   1271     ALLOWED_TYPES = ('manager',)
   1272 
   1273     def test_number_of_objects(self):
   1274         EXPECTED_NUMBER = 1                # the pool object is still alive
   1275         multiprocessing.active_children()  # discard dead process objs
   1276         gc.collect()                       # do garbage collection
   1277         refs = self.manager._number_of_objects()
   1278         debug_info = self.manager._debug_info()
   1279         if refs != EXPECTED_NUMBER:
   1280             print self.manager._debug_info()
   1281             print debug_info
   1282 
   1283         self.assertEqual(refs, EXPECTED_NUMBER)
   1284 
   1285 #
   1286 # Test of creating a customized manager class
   1287 #
   1288 
   1289 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
   1290 
   1291 class FooBar(object):
   1292     def f(self):
   1293         return 'f()'
   1294     def g(self):
   1295         raise ValueError
   1296     def _h(self):
   1297         return '_h()'
   1298 
   1299 def baz():
   1300     for i in xrange(10):
   1301         yield i*i
   1302 
   1303 class IteratorProxy(BaseProxy):
   1304     _exposed_ = ('next', '__next__')
   1305     def __iter__(self):
   1306         return self
   1307     def next(self):
   1308         return self._callmethod('next')
   1309     def __next__(self):
   1310         return self._callmethod('__next__')
   1311 
# Manager subclass that exists only to carry the registrations below.
class MyManager(BaseManager):
    pass

# 'Foo' serves FooBar instances with the default set of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar' serves FooBar instances but exposes exactly f() and _h().
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' serves the baz() generator through the IteratorProxy proxy type.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
   1318 
   1319 
   1320 class _TestMyManager(BaseTestCase):
   1321 
   1322     ALLOWED_TYPES = ('manager',)
   1323 
   1324     def test_mymanager(self):
   1325         manager = MyManager()
   1326         manager.start()
   1327 
   1328         foo = manager.Foo()
   1329         bar = manager.Bar()
   1330         baz = manager.baz()
   1331 
   1332         foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
   1333         bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
   1334 
   1335         self.assertEqual(foo_methods, ['f', 'g'])
   1336         self.assertEqual(bar_methods, ['f', '_h'])
   1337 
   1338         self.assertEqual(foo.f(), 'f()')
   1339         self.assertRaises(ValueError, foo.g)
   1340         self.assertEqual(foo._callmethod('f'), 'f()')
   1341         self.assertRaises(RemoteError, foo._callmethod, '_h')
   1342 
   1343         self.assertEqual(bar.f(), 'f()')
   1344         self.assertEqual(bar._h(), '_h()')
   1345         self.assertEqual(bar._callmethod('f'), 'f()')
   1346         self.assertEqual(bar._callmethod('_h'), '_h()')
   1347 
   1348         self.assertEqual(list(baz), [i*i for i in range(10)])
   1349 
   1350         manager.shutdown()
   1351 
   1352 #
   1353 # Test of connecting to a remote server and using xmlrpclib for serialization
   1354 #
   1355 
# Single module-level queue; get_queue() always hands out this object.
_queue = Queue.Queue()
def get_queue():
    """Return the module-level queue ``_queue``."""
    return _queue
   1359 
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Requests for 'get_queue' are answered by calling get_queue().
QueueManager.register('get_queue', callable=get_queue)
   1363 
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Only the type name is registered here (no callable is supplied).
QueueManager2.register('get_queue')
   1367 
   1368 
# Serializer name passed as serializer= to the queue managers in the tests
# below.
SERIALIZER = 'xmlrpclib'
   1370 
   1371 class _TestRemoteManager(BaseTestCase):
   1372 
   1373     ALLOWED_TYPES = ('manager',)
   1374 
   1375     @classmethod
   1376     def _putter(cls, address, authkey):
   1377         manager = QueueManager2(
   1378             address=address, authkey=authkey, serializer=SERIALIZER
   1379             )
   1380         manager.connect()
   1381         queue = manager.get_queue()
   1382         queue.put(('hello world', None, True, 2.25))
   1383 
   1384     def test_remote(self):
   1385         authkey = os.urandom(32)
   1386 
   1387         manager = QueueManager(
   1388             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
   1389             )
   1390         manager.start()
   1391 
   1392         p = self.Process(target=self._putter, args=(manager.address, authkey))
   1393         p.daemon = True
   1394         p.start()
   1395 
   1396         manager2 = QueueManager2(
   1397             address=manager.address, authkey=authkey, serializer=SERIALIZER
   1398             )
   1399         manager2.connect()
   1400         queue = manager2.get_queue()
   1401 
   1402         # Note that xmlrpclib will deserialize object as a list not a tuple
   1403         self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
   1404 
   1405         # Because we are using xmlrpclib for serialization instead of
   1406         # pickle this will cause a serialization error.
   1407         self.assertRaises(Exception, queue.put, time.sleep)
   1408 
   1409         # Make queue finalizer run before the server is stopped
   1410         del queue
   1411         manager.shutdown()
   1412 
   1413 class _TestManagerRestart(BaseTestCase):
   1414 
   1415     @classmethod
   1416     def _putter(cls, address, authkey):
   1417         manager = QueueManager(
   1418             address=address, authkey=authkey, serializer=SERIALIZER)
   1419         manager.connect()
   1420         queue = manager.get_queue()
   1421         queue.put('hello world')
   1422 
   1423     def test_rapid_restart(self):
   1424         authkey = os.urandom(32)
   1425         manager = QueueManager(
   1426             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
   1427         srvr = manager.get_server()
   1428         addr = srvr.address
   1429         # Close the connection.Listener socket which gets opened as a part
   1430         # of manager.get_server(). It's not needed for the test.
   1431         srvr.listener.close()
   1432         manager.start()
   1433 
   1434         p = self.Process(target=self._putter, args=(manager.address, authkey))
   1435         p.daemon = True
   1436         p.start()
   1437         queue = manager.get_queue()
   1438         self.assertEqual(queue.get(), 'hello world')
   1439         del queue
   1440         manager.shutdown()
   1441         manager = QueueManager(
   1442             address=addr, authkey=authkey, serializer=SERIALIZER)
   1443         manager.start()
   1444         manager.shutdown()
   1445 
   1446 #
   1447 #
   1448 #
   1449 
# Empty byte message; _TestConnection._echo stops echoing when it receives it.
SENTINEL = latin('')
   1451 
   1452 class _TestConnection(BaseTestCase):
   1453 
   1454     ALLOWED_TYPES = ('processes', 'threads')
   1455 
   1456     @classmethod
   1457     def _echo(cls, conn):
   1458         for msg in iter(conn.recv_bytes, SENTINEL):
   1459             conn.send_bytes(msg)
   1460         conn.close()
   1461 
   1462     def test_connection(self):
   1463         conn, child_conn = self.Pipe()
   1464 
   1465         p = self.Process(target=self._echo, args=(child_conn,))
   1466         p.daemon = True
   1467         p.start()
   1468 
   1469         seq = [1, 2.25, None]
   1470         msg = latin('hello world')
   1471         longmsg = msg * 10
   1472         arr = array.array('i', range(4))
   1473 
   1474         if self.TYPE == 'processes':
   1475             self.assertEqual(type(conn.fileno()), int)
   1476 
   1477         self.assertEqual(conn.send(seq), None)
   1478         self.assertEqual(conn.recv(), seq)
   1479 
   1480         self.assertEqual(conn.send_bytes(msg), None)
   1481         self.assertEqual(conn.recv_bytes(), msg)
   1482 
   1483         if self.TYPE == 'processes':
   1484             buffer = array.array('i', [0]*10)
   1485             expected = list(arr) + [0] * (10 - len(arr))
   1486             self.assertEqual(conn.send_bytes(arr), None)
   1487             self.assertEqual(conn.recv_bytes_into(buffer),
   1488                              len(arr) * buffer.itemsize)
   1489             self.assertEqual(list(buffer), expected)
   1490 
   1491             buffer = array.array('i', [0]*10)
   1492             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
   1493             self.assertEqual(conn.send_bytes(arr), None)
   1494             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
   1495                              len(arr) * buffer.itemsize)
   1496             self.assertEqual(list(buffer), expected)
   1497 
   1498             buffer = bytearray(latin(' ' * 40))
   1499             self.assertEqual(conn.send_bytes(longmsg), None)
   1500             try:
   1501                 res = conn.recv_bytes_into(buffer)
   1502             except multiprocessing.BufferTooShort, e:
   1503                 self.assertEqual(e.args, (longmsg,))
   1504             else:
   1505                 self.fail('expected BufferTooShort, got %s' % res)
   1506 
   1507         poll = TimingWrapper(conn.poll)
   1508 
   1509         self.assertEqual(poll(), False)
   1510         self.assertTimingAlmostEqual(poll.elapsed, 0)
   1511 
   1512         self.assertEqual(poll(TIMEOUT1), False)
   1513         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
   1514 
   1515         conn.send(None)
   1516         time.sleep(.1)
   1517 
   1518         self.assertEqual(poll(TIMEOUT1), True)
   1519         self.assertTimingAlmostEqual(poll.elapsed, 0)
   1520 
   1521         self.assertEqual(conn.recv(), None)
   1522 
   1523         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
   1524         conn.send_bytes(really_big_msg)
   1525         self.assertEqual(conn.recv_bytes(), really_big_msg)
   1526 
   1527         conn.send_bytes(SENTINEL)                          # tell child to quit
   1528         child_conn.close()
   1529 
   1530         if self.TYPE == 'processes':
   1531             self.assertEqual(conn.readable, True)
   1532             self.assertEqual(conn.writable, True)
   1533             self.assertRaises(EOFError, conn.recv)
   1534             self.assertRaises(EOFError, conn.recv_bytes)
   1535 
   1536         p.join()
   1537 
   1538     def test_duplex_false(self):
   1539         reader, writer = self.Pipe(duplex=False)
   1540         self.assertEqual(writer.send(1), None)
   1541         self.assertEqual(reader.recv(), 1)
   1542         if self.TYPE == 'processes':
   1543             self.assertEqual(reader.readable, True)
   1544             self.assertEqual(reader.writable, False)
   1545             self.assertEqual(writer.readable, False)
   1546             self.assertEqual(writer.writable, True)
   1547             self.assertRaises(IOError, reader.send, 2)
   1548             self.assertRaises(IOError, writer.recv)
   1549             self.assertRaises(IOError, writer.poll)
   1550 
   1551     def test_spawn_close(self):
   1552         # We test that a pipe connection can be closed by parent
   1553         # process immediately after child is spawned.  On Windows this
   1554         # would have sometimes failed on old versions because
   1555         # child_conn would be closed before the child got a chance to
   1556         # duplicate it.
   1557         conn, child_conn = self.Pipe()
   1558 
   1559         p = self.Process(target=self._echo, args=(child_conn,))
   1560         p.daemon = True
   1561         p.start()
   1562         child_conn.close()    # this might complete before child initializes
   1563 
   1564         msg = latin('hello')
   1565         conn.send_bytes(msg)
   1566         self.assertEqual(conn.recv_bytes(), msg)
   1567 
   1568         conn.send_bytes(SENTINEL)
   1569         conn.close()
   1570         p.join()
   1571 
   1572     def test_sendbytes(self):
   1573         if self.TYPE != 'processes':
   1574             return
   1575 
   1576         msg = latin('abcdefghijklmnopqrstuvwxyz')
   1577         a, b = self.Pipe()
   1578 
   1579         a.send_bytes(msg)
   1580         self.assertEqual(b.recv_bytes(), msg)
   1581 
   1582         a.send_bytes(msg, 5)
   1583         self.assertEqual(b.recv_bytes(), msg[5:])
   1584 
   1585         a.send_bytes(msg, 7, 8)
   1586         self.assertEqual(b.recv_bytes(), msg[7:7+8])
   1587 
   1588         a.send_bytes(msg, 26)
   1589         self.assertEqual(b.recv_bytes(), latin(''))
   1590 
   1591         a.send_bytes(msg, 26, 0)
   1592         self.assertEqual(b.recv_bytes(), latin(''))
   1593 
   1594         self.assertRaises(ValueError, a.send_bytes, msg, 27)
   1595 
   1596         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
   1597 
   1598         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
   1599 
   1600         self.assertRaises(ValueError, a.send_bytes, msg, -1)
   1601 
   1602         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
   1603 
   1604     @classmethod
   1605     def _is_fd_assigned(cls, fd):
   1606         try:
   1607             os.fstat(fd)
   1608         except OSError as e:
   1609             if e.errno == errno.EBADF:
   1610                 return False
   1611             raise
   1612         else:
   1613             return True
   1614 
   1615     @classmethod
   1616     def _writefd(cls, conn, data, create_dummy_fds=False):
   1617         if create_dummy_fds:
   1618             for i in range(0, 256):
   1619                 if not cls._is_fd_assigned(i):
   1620                     os.dup2(conn.fileno(), i)
   1621         fd = reduction.recv_handle(conn)
   1622         if msvcrt:
   1623             fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
   1624         os.write(fd, data)
   1625         os.close(fd)
   1626 
   1627     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   1628     def test_fd_transfer(self):
   1629         if self.TYPE != 'processes':
   1630             self.skipTest("only makes sense with processes")
   1631         conn, child_conn = self.Pipe(duplex=True)
   1632 
   1633         p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
   1634         p.daemon = True
   1635         p.start()
   1636         with open(test_support.TESTFN, "wb") as f:
   1637             fd = f.fileno()
   1638             if msvcrt:
   1639                 fd = msvcrt.get_osfhandle(fd)
   1640             reduction.send_handle(conn, fd, p.pid)
   1641         p.join()
   1642         with open(test_support.TESTFN, "rb") as f:
   1643             self.assertEqual(f.read(), b"foo")
   1644 
   1645     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   1646     @unittest.skipIf(sys.platform == "win32",
   1647                      "test semantics don't make sense on Windows")
   1648     @unittest.skipIf(MAXFD <= 256,
   1649                      "largest assignable fd number is too small")
   1650     @unittest.skipUnless(hasattr(os, "dup2"),
   1651                          "test needs os.dup2()")
   1652     def test_large_fd_transfer(self):
   1653         # With fd > 256 (issue #11657)
   1654         if self.TYPE != 'processes':
   1655             self.skipTest("only makes sense with processes")
   1656         conn, child_conn = self.Pipe(duplex=True)
   1657 
   1658         p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
   1659         p.daemon = True
   1660         p.start()
   1661         with open(test_support.TESTFN, "wb") as f:
   1662             fd = f.fileno()
   1663             for newfd in range(256, MAXFD):
   1664                 if not self._is_fd_assigned(newfd):
   1665                     break
   1666             else:
   1667                 self.fail("could not find an unassigned large file descriptor")
   1668             os.dup2(fd, newfd)
   1669             try:
   1670                 reduction.send_handle(conn, newfd, p.pid)
   1671             finally:
   1672                 os.close(newfd)
   1673         p.join()
   1674         with open(test_support.TESTFN, "rb") as f:
   1675             self.assertEqual(f.read(), b"bar")
   1676 
   1677     @classmethod
   1678     def _send_data_without_fd(self, conn):
   1679         os.write(conn.fileno(), b"\0")
   1680 
   1681     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   1682     @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
   1683     def test_missing_fd_transfer(self):
   1684         # Check that exception is raised when received data is not
   1685         # accompanied by a file descriptor in ancillary data.
   1686         if self.TYPE != 'processes':
   1687             self.skipTest("only makes sense with processes")
   1688         conn, child_conn = self.Pipe(duplex=True)
   1689 
   1690         p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
   1691         p.daemon = True
   1692         p.start()
   1693         self.assertRaises(RuntimeError, reduction.recv_handle, conn)
   1694         p.join()
   1695 
   1696 class _TestListenerClient(BaseTestCase):
   1697 
   1698     ALLOWED_TYPES = ('processes', 'threads')
   1699 
   1700     @classmethod
   1701     def _test(cls, address):
   1702         conn = cls.connection.Client(address)
   1703         conn.send('hello')
   1704         conn.close()
   1705 
   1706     def test_listener_client(self):
   1707         for family in self.connection.families:
   1708             l = self.connection.Listener(family=family)
   1709             p = self.Process(target=self._test, args=(l.address,))
   1710             p.daemon = True
   1711             p.start()
   1712             conn = l.accept()
   1713             self.assertEqual(conn.recv(), 'hello')
   1714             p.join()
   1715             l.close()
   1716 
   1717     def test_issue14725(self):
   1718         l = self.connection.Listener()
   1719         p = self.Process(target=self._test, args=(l.address,))
   1720         p.daemon = True
   1721         p.start()
   1722         time.sleep(1)
   1723         # On Windows the client process should by now have connected,
   1724         # written data and closed the pipe handle by now.  This causes
   1725         # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
   1726         # 14725.
   1727         conn = l.accept()
   1728         self.assertEqual(conn.recv(), 'hello')
   1729         conn.close()
   1730         p.join()
   1731         l.close()
   1732 
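#
# Test of sending connection and socket objects between processes
# (currently disabled: the class below is wrapped in a module-level
# string literal, so it is never defined or run)
#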
   1736 """
   1737 class _TestPicklingConnections(BaseTestCase):
   1738 
   1739     ALLOWED_TYPES = ('processes',)
   1740 
   1741     def _listener(self, conn, families):
   1742         for fam in families:
   1743             l = self.connection.Listener(family=fam)
   1744             conn.send(l.address)
   1745             new_conn = l.accept()
   1746             conn.send(new_conn)
   1747 
   1748         if self.TYPE == 'processes':
   1749             l = socket.socket()
   1750             l.bind(('localhost', 0))
   1751             conn.send(l.getsockname())
   1752             l.listen(1)
   1753             new_conn, addr = l.accept()
   1754             conn.send(new_conn)
   1755 
   1756         conn.recv()
   1757 
   1758     def _remote(self, conn):
   1759         for (address, msg) in iter(conn.recv, None):
   1760             client = self.connection.Client(address)
   1761             client.send(msg.upper())
   1762             client.close()
   1763 
   1764         if self.TYPE == 'processes':
   1765             address, msg = conn.recv()
   1766             client = socket.socket()
   1767             client.connect(address)
   1768             client.sendall(msg.upper())
   1769             client.close()
   1770 
   1771         conn.close()
   1772 
   1773     def test_pickling(self):
   1774         try:
   1775             multiprocessing.allow_connection_pickling()
   1776         except ImportError:
   1777             return
   1778 
   1779         families = self.connection.families
   1780 
   1781         lconn, lconn0 = self.Pipe()
   1782         lp = self.Process(target=self._listener, args=(lconn0, families))
   1783         lp.daemon = True
   1784         lp.start()
   1785         lconn0.close()
   1786 
   1787         rconn, rconn0 = self.Pipe()
   1788         rp = self.Process(target=self._remote, args=(rconn0,))
   1789         rp.daemon = True
   1790         rp.start()
   1791         rconn0.close()
   1792 
   1793         for fam in families:
   1794             msg = ('This connection uses family %s' % fam).encode('ascii')
   1795             address = lconn.recv()
   1796             rconn.send((address, msg))
   1797             new_conn = lconn.recv()
   1798             self.assertEqual(new_conn.recv(), msg.upper())
   1799 
   1800         rconn.send(None)
   1801 
   1802         if self.TYPE == 'processes':
   1803             msg = latin('This connection uses a normal socket')
   1804             address = lconn.recv()
   1805             rconn.send((address, msg))
   1806             if hasattr(socket, 'fromfd'):
   1807                 new_conn = lconn.recv()
   1808                 self.assertEqual(new_conn.recv(100), msg.upper())
   1809             else:
   1810                 # XXX On Windows with Py2.6 need to backport fromfd()
   1811                 discard = lconn.recv_bytes()
   1812 
   1813         lconn.send(None)
   1814 
   1815         rconn.close()
   1816         lconn.close()
   1817 
   1818         lp.join()
   1819         rp.join()
   1820 """
   1821 #
   1822 #
   1823 #
   1824 
   1825 class _TestHeap(BaseTestCase):
   1826 
   1827     ALLOWED_TYPES = ('processes',)
   1828 
   1829     def test_heap(self):
   1830         iterations = 5000
   1831         maxblocks = 50
   1832         blocks = []
   1833 
   1834         # create and destroy lots of blocks of different sizes
   1835         for i in xrange(iterations):
   1836             size = int(random.lognormvariate(0, 1) * 1000)
   1837             b = multiprocessing.heap.BufferWrapper(size)
   1838             blocks.append(b)
   1839             if len(blocks) > maxblocks:
   1840                 i = random.randrange(maxblocks)
   1841                 del blocks[i]
   1842 
   1843         # get the heap object
   1844         heap = multiprocessing.heap.BufferWrapper._heap
   1845 
   1846         # verify the state of the heap
   1847         all = []
   1848         occupied = 0
   1849         heap._lock.acquire()
   1850         self.addCleanup(heap._lock.release)
   1851         for L in heap._len_to_seq.values():
   1852             for arena, start, stop in L:
   1853                 all.append((heap._arenas.index(arena), start, stop,
   1854                             stop-start, 'free'))
   1855         for arena, start, stop in heap._allocated_blocks:
   1856             all.append((heap._arenas.index(arena), start, stop,
   1857                         stop-start, 'occupied'))
   1858             occupied += (stop-start)
   1859 
   1860         all.sort()
   1861 
   1862         for i in range(len(all)-1):
   1863             (arena, start, stop) = all[i][:3]
   1864             (narena, nstart, nstop) = all[i+1][:3]
   1865             self.assertTrue((arena != narena and nstart == 0) or
   1866                             (stop == nstart))
   1867 
   1868     def test_free_from_gc(self):
   1869         # Check that freeing of blocks by the garbage collector doesn't deadlock
   1870         # (issue #12352).
   1871         # Make sure the GC is enabled, and set lower collection thresholds to
   1872         # make collections more frequent (and increase the probability of
   1873         # deadlock).
   1874         if not gc.isenabled():
   1875             gc.enable()
   1876             self.addCleanup(gc.disable)
   1877         thresholds = gc.get_threshold()
   1878         self.addCleanup(gc.set_threshold, *thresholds)
   1879         gc.set_threshold(10)
   1880 
   1881         # perform numerous block allocations, with cyclic references to make
   1882         # sure objects are collected asynchronously by the gc
   1883         for i in range(5000):
   1884             a = multiprocessing.heap.BufferWrapper(1)
   1885             b = multiprocessing.heap.BufferWrapper(1)
   1886             # circular references
   1887             a.buddy = b
   1888             b.buddy = a
   1889 
   1890 #
   1891 #
   1892 #
   1893 
   1894 class _Foo(Structure):
   1895     _fields_ = [
   1896         ('x', c_int),
   1897         ('y', c_double)
   1898         ]
   1899 
   1900 class _TestSharedCTypes(BaseTestCase):
   1901 
   1902     ALLOWED_TYPES = ('processes',)
   1903 
   1904     def setUp(self):
   1905         if not HAS_SHAREDCTYPES:
   1906             self.skipTest("requires multiprocessing.sharedctypes")
   1907 
   1908     @classmethod
   1909     def _double(cls, x, y, foo, arr, string):
   1910         x.value *= 2
   1911         y.value *= 2
   1912         foo.x *= 2
   1913         foo.y *= 2
   1914         string.value *= 2
   1915         for i in range(len(arr)):
   1916             arr[i] *= 2
   1917 
   1918     def test_sharedctypes(self, lock=False):
   1919         x = Value('i', 7, lock=lock)
   1920         y = Value(c_double, 1.0/3.0, lock=lock)
   1921         foo = Value(_Foo, 3, 2, lock=lock)
   1922         arr = self.Array('d', range(10), lock=lock)
   1923         string = self.Array('c', 20, lock=lock)
   1924         string.value = latin('hello')
   1925 
   1926         p = self.Process(target=self._double, args=(x, y, foo, arr, string))
   1927         p.daemon = True
   1928         p.start()
   1929         p.join()
   1930 
   1931         self.assertEqual(x.value, 14)
   1932         self.assertAlmostEqual(y.value, 2.0/3.0)
   1933         self.assertEqual(foo.x, 6)
   1934         self.assertAlmostEqual(foo.y, 4.0)
   1935         for i in range(10):
   1936             self.assertAlmostEqual(arr[i], i*2)
   1937         self.assertEqual(string.value, latin('hellohello'))
   1938 
   1939     def test_synchronize(self):
   1940         self.test_sharedctypes(lock=True)
   1941 
   1942     def test_copy(self):
   1943         foo = _Foo(2, 5.0)
   1944         bar = copy(foo)
   1945         foo.x = 0
   1946         foo.y = 0
   1947         self.assertEqual(bar.x, 2)
   1948         self.assertAlmostEqual(bar.y, 5.0)
   1949 
   1950 #
   1951 #
   1952 #
   1953 
   1954 class _TestFinalize(BaseTestCase):
   1955 
   1956     ALLOWED_TYPES = ('processes',)
   1957 
   1958     @classmethod
   1959     def _test_finalize(cls, conn):
   1960         class Foo(object):
   1961             pass
   1962 
   1963         a = Foo()
   1964         util.Finalize(a, conn.send, args=('a',))
   1965         del a           # triggers callback for a
   1966 
   1967         b = Foo()
   1968         close_b = util.Finalize(b, conn.send, args=('b',))
   1969         close_b()       # triggers callback for b
   1970         close_b()       # does nothing because callback has already been called
   1971         del b           # does nothing because callback has already been called
   1972 
   1973         c = Foo()
   1974         util.Finalize(c, conn.send, args=('c',))
   1975 
   1976         d10 = Foo()
   1977         util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
   1978 
   1979         d01 = Foo()
   1980         util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
   1981         d02 = Foo()
   1982         util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
   1983         d03 = Foo()
   1984         util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
   1985 
   1986         util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
   1987 
   1988         util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
   1989 
   1990         # call multiprocessing's cleanup function then exit process without
   1991         # garbage collecting locals
   1992         util._exit_function()
   1993         conn.close()
   1994         os._exit(0)
   1995 
   1996     def test_finalize(self):
   1997         conn, child_conn = self.Pipe()
   1998 
   1999         p = self.Process(target=self._test_finalize, args=(child_conn,))
   2000         p.daemon = True
   2001         p.start()
   2002         p.join()
   2003 
   2004         result = [obj for obj in iter(conn.recv, 'STOP')]
   2005         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
   2006 
   2007 #
   2008 # Test that from ... import * works for each module
   2009 #
   2010 
   2011 class _TestImportStar(BaseTestCase):
   2012 
   2013     ALLOWED_TYPES = ('processes',)
   2014 
   2015     def test_import(self):
   2016         modules = [
   2017             'multiprocessing', 'multiprocessing.connection',
   2018             'multiprocessing.heap', 'multiprocessing.managers',
   2019             'multiprocessing.pool', 'multiprocessing.process',
   2020             'multiprocessing.synchronize', 'multiprocessing.util'
   2021             ]
   2022 
   2023         if HAS_REDUCTION:
   2024             modules.append('multiprocessing.reduction')
   2025 
   2026         if c_int is not None:
   2027             # This module requires _ctypes
   2028             modules.append('multiprocessing.sharedctypes')
   2029 
   2030         for name in modules:
   2031             __import__(name)
   2032             mod = sys.modules[name]
   2033 
   2034             for attr in getattr(mod, '__all__', ()):
   2035                 self.assertTrue(
   2036                     hasattr(mod, attr),
   2037                     '%r does not have attribute %r' % (mod, attr)
   2038                     )
   2039 
   2040 #
   2041 # Quick test that logging works -- does not test logging output
   2042 #
   2043 
   2044 class _TestLogging(BaseTestCase):
   2045 
   2046     ALLOWED_TYPES = ('processes',)
   2047 
   2048     def test_enable_logging(self):
   2049         logger = multiprocessing.get_logger()
   2050         logger.setLevel(util.SUBWARNING)
   2051         self.assertTrue(logger is not None)
   2052         logger.debug('this will not be printed')
   2053         logger.info('nor will this')
   2054         logger.setLevel(LOG_LEVEL)
   2055 
   2056     @classmethod
   2057     def _test_level(cls, conn):
   2058         logger = multiprocessing.get_logger()
   2059         conn.send(logger.getEffectiveLevel())
   2060 
   2061     def test_level(self):
   2062         LEVEL1 = 32
   2063         LEVEL2 = 37
   2064 
   2065         logger = multiprocessing.get_logger()
   2066         root_logger = logging.getLogger()
   2067         root_level = root_logger.level
   2068 
   2069         reader, writer = multiprocessing.Pipe(duplex=False)
   2070 
   2071         logger.setLevel(LEVEL1)
   2072         p = self.Process(target=self._test_level, args=(writer,))
   2073         p.daemon = True
   2074         p.start()
   2075         self.assertEqual(LEVEL1, reader.recv())
   2076 
   2077         logger.setLevel(logging.NOTSET)
   2078         root_logger.setLevel(LEVEL2)
   2079         p = self.Process(target=self._test_level, args=(writer,))
   2080         p.daemon = True
   2081         p.start()
   2082         self.assertEqual(LEVEL2, reader.recv())
   2083 
   2084         root_logger.setLevel(root_level)
   2085         logger.setLevel(level=LOG_LEVEL)
   2086 
   2087 
   2088 # class _TestLoggingProcessName(BaseTestCase):
   2089 #
   2090 #     def handle(self, record):
   2091 #         assert record.processName == multiprocessing.current_process().name
   2092 #         self.__handled = True
   2093 #
   2094 #     def test_logging(self):
   2095 #         handler = logging.Handler()
   2096 #         handler.handle = self.handle
   2097 #         self.__handled = False
   2098 #         # Bypass getLogger() and side-effects
   2099 #         logger = logging.getLoggerClass()(
   2100 #                 'multiprocessing.test.TestLoggingProcessName')
   2101 #         logger.addHandler(handler)
   2102 #         logger.propagate = False
   2103 #
   2104 #         logger.warn('foo')
   2105 #         assert self.__handled
   2106 
   2107 #
   2108 # Check that Process.join() retries if os.waitpid() fails with EINTR
   2109 #
   2110 
   2111 class _TestPollEintr(BaseTestCase):
   2112 
   2113     ALLOWED_TYPES = ('processes',)
   2114 
   2115     @classmethod
   2116     def _killer(cls, pid):
   2117         time.sleep(0.5)
   2118         os.kill(pid, signal.SIGUSR1)
   2119 
   2120     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
   2121     def test_poll_eintr(self):
   2122         got_signal = [False]
   2123         def record(*args):
   2124             got_signal[0] = True
   2125         pid = os.getpid()
   2126         oldhandler = signal.signal(signal.SIGUSR1, record)
   2127         try:
   2128             killer = self.Process(target=self._killer, args=(pid,))
   2129             killer.start()
   2130             p = self.Process(target=time.sleep, args=(1,))
   2131             p.start()
   2132             p.join()
   2133             self.assertTrue(got_signal[0])
   2134             self.assertEqual(p.exitcode, 0)
   2135             killer.join()
   2136         finally:
   2137             signal.signal(signal.SIGUSR1, oldhandler)
   2138 
   2139 #
   2140 # Test to verify handle verification, see issue 3321
   2141 #
   2142 
   2143 class TestInvalidHandle(unittest.TestCase):
   2144 
   2145     @unittest.skipIf(WIN32, "skipped on Windows")
   2146     def test_invalid_handles(self):
   2147         conn = _multiprocessing.Connection(44977608)
   2148         self.assertRaises(IOError, conn.poll)
   2149         self.assertRaises(IOError, _multiprocessing.Connection, -1)
   2150 
   2151 #
   2152 # Functions used to create test cases from the base ones in this module
   2153 #
   2154 
   2155 def get_attributes(Source, names):
   2156     d = {}
   2157     for name in names:
   2158         obj = getattr(Source, name)
   2159         if type(obj) == type(get_attributes):
   2160             obj = staticmethod(obj)
   2161         d[name] = obj
   2162     return d
   2163 
   2164 def create_test_cases(Mixin, type):
   2165     result = {}
   2166     glob = globals()
   2167     Type = type.capitalize()
   2168 
   2169     for name in glob.keys():
   2170         if name.startswith('_Test'):
   2171             base = glob[name]
   2172             if type in base.ALLOWED_TYPES:
   2173                 newname = 'With' + Type + name[1:]
   2174                 class Temp(base, unittest.TestCase, Mixin):
   2175                     pass
   2176                 result[newname] = Temp
   2177                 Temp.__name__ = newname
   2178                 Temp.__module__ = Mixin.__module__
   2179     return result
   2180 
   2181 #
   2182 # Create test cases
   2183 #
   2184 
   2185 class ProcessesMixin(object):
   2186     TYPE = 'processes'
   2187     Process = multiprocessing.Process
   2188     locals().update(get_attributes(multiprocessing, (
   2189         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   2190         'Condition', 'Event', 'Value', 'Array', 'RawValue',
   2191         'RawArray', 'current_process', 'active_children', 'Pipe',
   2192         'connection', 'JoinableQueue', 'Pool'
   2193         )))
   2194 
   2195 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
   2196 globals().update(testcases_processes)
   2197 
   2198 
   2199 class ManagerMixin(object):
   2200     TYPE = 'manager'
   2201     Process = multiprocessing.Process
   2202     manager = object.__new__(multiprocessing.managers.SyncManager)
   2203     locals().update(get_attributes(manager, (
   2204         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   2205        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
   2206         'Namespace', 'JoinableQueue', 'Pool'
   2207         )))
   2208 
   2209 testcases_manager = create_test_cases(ManagerMixin, type='manager')
   2210 globals().update(testcases_manager)
   2211 
   2212 
   2213 class ThreadsMixin(object):
   2214     TYPE = 'threads'
   2215     Process = multiprocessing.dummy.Process
   2216     locals().update(get_attributes(multiprocessing.dummy, (
   2217         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   2218         'Condition', 'Event', 'Value', 'Array', 'current_process',
   2219         'active_children', 'Pipe', 'connection', 'dict', 'list',
   2220         'Namespace', 'JoinableQueue', 'Pool'
   2221         )))
   2222 
   2223 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
   2224 globals().update(testcases_threads)
   2225 
   2226 class OtherTest(unittest.TestCase):
   2227     # TODO: add more tests for deliver/answer challenge.
   2228     def test_deliver_challenge_auth_failure(self):
   2229         class _FakeConnection(object):
   2230             def recv_bytes(self, size):
   2231                 return b'something bogus'
   2232             def send_bytes(self, data):
   2233                 pass
   2234         self.assertRaises(multiprocessing.AuthenticationError,
   2235                           multiprocessing.connection.deliver_challenge,
   2236                           _FakeConnection(), b'abc')
   2237 
   2238     def test_answer_challenge_auth_failure(self):
   2239         class _FakeConnection(object):
   2240             def __init__(self):
   2241                 self.count = 0
   2242             def recv_bytes(self, size):
   2243                 self.count += 1
   2244                 if self.count == 1:
   2245                     return multiprocessing.connection.CHALLENGE
   2246                 elif self.count == 2:
   2247                     return b'something bogus'
   2248                 return b''
   2249             def send_bytes(self, data):
   2250                 pass
   2251         self.assertRaises(multiprocessing.AuthenticationError,
   2252                           multiprocessing.connection.answer_challenge,
   2253                           _FakeConnection(), b'abc')
   2254 
   2255 #
   2256 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
   2257 #
   2258 
   2259 def initializer(ns):
   2260     ns.test += 1
   2261 
   2262 class TestInitializers(unittest.TestCase):
   2263     def setUp(self):
   2264         self.mgr = multiprocessing.Manager()
   2265         self.ns = self.mgr.Namespace()
   2266         self.ns.test = 0
   2267 
   2268     def tearDown(self):
   2269         self.mgr.shutdown()
   2270 
   2271     def test_manager_initializer(self):
   2272         m = multiprocessing.managers.SyncManager()
   2273         self.assertRaises(TypeError, m.start, 1)
   2274         m.start(initializer, (self.ns,))
   2275         self.assertEqual(self.ns.test, 1)
   2276         m.shutdown()
   2277 
   2278     def test_pool_initializer(self):
   2279         self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
   2280         p = multiprocessing.Pool(1, initializer, (self.ns,))
   2281         p.close()
   2282         p.join()
   2283         self.assertEqual(self.ns.test, 1)
   2284 
   2285 #
   2286 # Issue 5155, 5313, 5331: Test process in processes
   2287 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
   2288 #
   2289 
   2290 def _ThisSubProcess(q):
   2291     try:
   2292         item = q.get(block=False)
   2293     except Queue.Empty:
   2294         pass
   2295 
   2296 def _TestProcess(q):
   2297     queue = multiprocessing.Queue()
   2298     subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
   2299     subProc.daemon = True
   2300     subProc.start()
   2301     subProc.join()
   2302 
   2303 def _afunc(x):
   2304     return x*x
   2305 
   2306 def pool_in_process():
   2307     pool = multiprocessing.Pool(processes=4)
   2308     x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
   2309 
   2310 class _file_like(object):
   2311     def __init__(self, delegate):
   2312         self._delegate = delegate
   2313         self._pid = None
   2314 
   2315     @property
   2316     def cache(self):
   2317         pid = os.getpid()
   2318         # There are no race conditions since fork keeps only the running thread
   2319         if pid != self._pid:
   2320             self._pid = pid
   2321             self._cache = []
   2322         return self._cache
   2323 
   2324     def write(self, data):
   2325         self.cache.append(data)
   2326 
   2327     def flush(self):
   2328         self._delegate.write(''.join(self.cache))
   2329         self._cache = []
   2330 
   2331 class TestStdinBadfiledescriptor(unittest.TestCase):
   2332 
   2333     def test_queue_in_process(self):
   2334         queue = multiprocessing.Queue()
   2335         proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
   2336         proc.start()
   2337         proc.join()
   2338 
   2339     def test_pool_in_process(self):
   2340         p = multiprocessing.Process(target=pool_in_process)
   2341         p.start()
   2342         p.join()
   2343 
   2344     def test_flushing(self):
   2345         sio = StringIO()
   2346         flike = _file_like(sio)
   2347         flike.write('foo')
   2348         proc = multiprocessing.Process(target=lambda: flike.flush())
   2349         flike.flush()
   2350         assert sio.getvalue() == 'foo'
   2351 
   2352 #
   2353 # Test interaction with socket timeouts - see Issue #6056
   2354 #
   2355 
   2356 class TestTimeouts(unittest.TestCase):
   2357     @classmethod
   2358     def _test_timeout(cls, child, address):
   2359         time.sleep(1)
   2360         child.send(123)
   2361         child.close()
   2362         conn = multiprocessing.connection.Client(address)
   2363         conn.send(456)
   2364         conn.close()
   2365 
   2366     def test_timeout(self):
   2367         old_timeout = socket.getdefaulttimeout()
   2368         try:
   2369             socket.setdefaulttimeout(0.1)
   2370             parent, child = multiprocessing.Pipe(duplex=True)
   2371             l = multiprocessing.connection.Listener(family='AF_INET')
   2372             p = multiprocessing.Process(target=self._test_timeout,
   2373                                         args=(child, l.address))
   2374             p.start()
   2375             child.close()
   2376             self.assertEqual(parent.recv(), 123)
   2377             parent.close()
   2378             conn = l.accept()
   2379             self.assertEqual(conn.recv(), 456)
   2380             conn.close()
   2381             l.close()
   2382             p.join(10)
   2383         finally:
   2384             socket.setdefaulttimeout(old_timeout)
   2385 
   2386 #
   2387 # Test what happens with no "if __name__ == '__main__'"
   2388 #
   2389 
   2390 class TestNoForkBomb(unittest.TestCase):
   2391     def test_noforkbomb(self):
   2392         name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
   2393         if WIN32:
   2394             rc, out, err = test.script_helper.assert_python_failure(name)
   2395             self.assertEqual('', out.decode('ascii'))
   2396             self.assertIn('RuntimeError', err.decode('ascii'))
   2397         else:
   2398             rc, out, err = test.script_helper.assert_python_ok(name)
   2399             self.assertEqual('123', out.decode('ascii').rstrip())
   2400             self.assertEqual('', err.decode('ascii'))
   2401 
   2402 #
   2403 # Issue 12098: check sys.flags of child matches that for parent
   2404 #
   2405 
   2406 class TestFlags(unittest.TestCase):
   2407     @classmethod
   2408     def run_in_grandchild(cls, conn):
   2409         conn.send(tuple(sys.flags))
   2410 
   2411     @classmethod
   2412     def run_in_child(cls):
   2413         import json
   2414         r, w = multiprocessing.Pipe(duplex=False)
   2415         p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
   2416         p.start()
   2417         grandchild_flags = r.recv()
   2418         p.join()
   2419         r.close()
   2420         w.close()
   2421         flags = (tuple(sys.flags), grandchild_flags)
   2422         print(json.dumps(flags))
   2423 
   2424     def test_flags(self):
   2425         import json, subprocess
   2426         # start child process using unusual flags
   2427         prog = ('from test.test_multiprocessing import TestFlags; ' +
   2428                 'TestFlags.run_in_child()')
   2429         data = subprocess.check_output(
   2430             [sys.executable, '-E', '-B', '-O', '-c', prog])
   2431         child_flags, grandchild_flags = json.loads(data.decode('ascii'))
   2432         self.assertEqual(child_flags, grandchild_flags)
   2433 
   2434 #
   2435 # Issue #17555: ForkAwareThreadLock
   2436 #
   2437 
   2438 class TestForkAwareThreadLock(unittest.TestCase):
   2439     # We recurisvely start processes.  Issue #17555 meant that the
   2440     # after fork registry would get duplicate entries for the same
   2441     # lock.  The size of the registry at generation n was ~2**n.
   2442 
   2443     @classmethod
   2444     def child(cls, n, conn):
   2445         if n > 1:
   2446             p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
   2447             p.start()
   2448             p.join()
   2449         else:
   2450             conn.send(len(util._afterfork_registry))
   2451         conn.close()
   2452 
   2453     def test_lock(self):
   2454         r, w = multiprocessing.Pipe(False)
   2455         l = util.ForkAwareThreadLock()
   2456         old_size = len(util._afterfork_registry)
   2457         p = multiprocessing.Process(target=self.child, args=(5, w))
   2458         p.start()
   2459         new_size = r.recv()
   2460         p.join()
   2461         self.assertLessEqual(new_size, old_size)
   2462 
   2463 #
   2464 #
   2465 #
   2466 
   2467 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
   2468                    TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
   2469                    TestFlags, TestForkAwareThreadLock]
   2470 
   2471 #
   2472 #
   2473 #
   2474 
   2475 def test_main(run=None):
   2476     if sys.platform.startswith("linux"):
   2477         try:
   2478             lock = multiprocessing.RLock()
   2479         except OSError:
   2480             raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
   2481 
   2482     check_enough_semaphores()
   2483 
   2484     if run is None:
   2485         from test.test_support import run_unittest as run
   2486 
   2487     util.get_temp_dir()     # creates temp directory for use by all processes
   2488 
   2489     multiprocessing.get_logger().setLevel(LOG_LEVEL)
   2490 
   2491     ProcessesMixin.pool = multiprocessing.Pool(4)
   2492     ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
   2493     ManagerMixin.manager.__init__()
   2494     ManagerMixin.manager.start()
   2495     ManagerMixin.pool = ManagerMixin.manager.Pool(4)
   2496 
   2497     testcases = (
   2498         sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
   2499         sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
   2500         sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
   2501         testcases_other
   2502         )
   2503 
   2504     loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
   2505     suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
   2506     # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
   2507     # module during these tests is at least platform dependent and possibly
   2508     # non-deterministic on any given platform. So we don't mind if the listed
   2509     # warnings aren't actually raised.
   2510     with test_support.check_py3k_warnings(
   2511             (".+__(get|set)slice__ has been removed", DeprecationWarning),
   2512             (r"sys.exc_clear\(\) not supported", DeprecationWarning),
   2513             quiet=True):
   2514         run(suite)
   2515 
   2516     ThreadsMixin.pool.terminate()
   2517     ProcessesMixin.pool.terminate()
   2518     ManagerMixin.pool.terminate()
   2519     ManagerMixin.manager.shutdown()
   2520 
   2521     del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
   2522 
   2523 def main():
   2524     test_main(unittest.TextTestRunner(verbosity=2).run)
   2525 
   2526 if __name__ == '__main__':
   2527     main()
   2528