Home | History | Annotate | Download | only in test
      1 #
      2 # Unit tests for the multiprocessing package
      3 #
      4 
      5 import unittest
      6 import Queue
      7 import time
      8 import sys
      9 import os
     10 import gc
     11 import signal
     12 import array
     13 import socket
     14 import random
     15 import logging
     16 import errno
     17 import test.script_helper
     18 from test import test_support
     19 from StringIO import StringIO
     20 _multiprocessing = test_support.import_module('_multiprocessing')
     21 # import threading after _multiprocessing to raise a more relevant error
     22 # message: "No module named _multiprocessing". _multiprocessing is not compiled
     23 # without thread support.
     24 import threading
     25 
     26 # Work around broken sem_open implementations
     27 test_support.import_module('multiprocessing.synchronize')
     28 
     29 import multiprocessing.dummy
     30 import multiprocessing.connection
     31 import multiprocessing.managers
     32 import multiprocessing.heap
     33 import multiprocessing.pool
     34 
     35 from multiprocessing import util
     36 
     37 try:
     38     from multiprocessing import reduction
     39     HAS_REDUCTION = True
     40 except ImportError:
     41     HAS_REDUCTION = False
     42 
     43 try:
     44     from multiprocessing.sharedctypes import Value, copy
     45     HAS_SHAREDCTYPES = True
     46 except ImportError:
     47     HAS_SHAREDCTYPES = False
     48 
     49 try:
     50     import msvcrt
     51 except ImportError:
     52     msvcrt = None
     53 
     54 #
     55 #
     56 #
     57 
     58 latin = str
     59 
     60 #
     61 # Constants
     62 #
     63 
     64 LOG_LEVEL = util.SUBWARNING
     65 #LOG_LEVEL = logging.DEBUG
     66 
     67 DELTA = 0.1
     68 CHECK_TIMINGS = False     # making true makes tests take a lot longer
     69                           # and can sometimes cause some non-serious
     70                           # failures because some calls block a bit
     71                           # longer than expected
     72 if CHECK_TIMINGS:
     73     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
     74 else:
     75     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
     76 
     77 HAVE_GETVALUE = not getattr(_multiprocessing,
     78                             'HAVE_BROKEN_SEM_GETVALUE', False)
     79 
     80 WIN32 = (sys.platform == "win32")
     81 
     82 try:
     83     MAXFD = os.sysconf("SC_OPEN_MAX")
     84 except:
     85     MAXFD = 256
     86 
     87 #
     88 # Some tests require ctypes
     89 #
     90 
     91 try:
     92     from ctypes import Structure, c_int, c_double
     93 except ImportError:
     94     Structure = object
     95     c_int = c_double = None
     96 
     97 
     98 def check_enough_semaphores():
     99     """Check that the system supports enough semaphores to run the test."""
    100     # minimum number of semaphores available according to POSIX
    101     nsems_min = 256
    102     try:
    103         nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    104     except (AttributeError, ValueError):
    105         # sysconf not available or setting not available
    106         return
    107     if nsems == -1 or nsems >= nsems_min:
    108         return
    109     raise unittest.SkipTest("The OS doesn't support enough semaphores "
    110                             "to run the test (required: %d)." % nsems_min)
    111 
    112 
    113 #
    114 # Creates a wrapper for a function which records the time it takes to finish
    115 #
    116 
    117 class TimingWrapper(object):
    118 
    119     def __init__(self, func):
    120         self.func = func
    121         self.elapsed = None
    122 
    123     def __call__(self, *args, **kwds):
    124         t = time.time()
    125         try:
    126             return self.func(*args, **kwds)
    127         finally:
    128             self.elapsed = time.time() - t
    129 
    130 #
    131 # Base class for test cases
    132 #
    133 
    134 class BaseTestCase(object):
    135 
    136     ALLOWED_TYPES = ('processes', 'manager', 'threads')
    137 
    138     def assertTimingAlmostEqual(self, a, b):
    139         if CHECK_TIMINGS:
    140             self.assertAlmostEqual(a, b, 1)
    141 
    142     def assertReturnsIfImplemented(self, value, func, *args):
    143         try:
    144             res = func(*args)
    145         except NotImplementedError:
    146             pass
    147         else:
    148             return self.assertEqual(value, res)
    149 
    150     # For the sanity of Windows users, rather than crashing or freezing in
    151     # multiple ways.
    152     def __reduce__(self, *args):
    153         raise NotImplementedError("shouldn't try to pickle a test case")
    154 
    155     __reduce_ex__ = __reduce__
    156 
    157 #
    158 # Return the value of a semaphore
    159 #
    160 
    161 def get_value(self):
    162     try:
    163         return self.get_value()
    164     except AttributeError:
    165         try:
    166             return self._Semaphore__value
    167         except AttributeError:
    168             try:
    169                 return self._value
    170             except AttributeError:
    171                 raise NotImplementedError
    172 
    173 #
    174 # Testcases
    175 #
    176 
    177 class _TestProcess(BaseTestCase):
    178 
    179     ALLOWED_TYPES = ('processes', 'threads')
    180 
    181     def test_current(self):
    182         if self.TYPE == 'threads':
    183             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    184 
    185         current = self.current_process()
    186         authkey = current.authkey
    187 
    188         self.assertTrue(current.is_alive())
    189         self.assertTrue(not current.daemon)
    190         self.assertIsInstance(authkey, bytes)
    191         self.assertTrue(len(authkey) > 0)
    192         self.assertEqual(current.ident, os.getpid())
    193         self.assertEqual(current.exitcode, None)
    194 
    195     @classmethod
    196     def _test(cls, q, *args, **kwds):
    197         current = cls.current_process()
    198         q.put(args)
    199         q.put(kwds)
    200         q.put(current.name)
    201         if cls.TYPE != 'threads':
    202             q.put(bytes(current.authkey))
    203             q.put(current.pid)
    204 
    205     def test_process(self):
    206         q = self.Queue(1)
    207         e = self.Event()
    208         args = (q, 1, 2)
    209         kwargs = {'hello':23, 'bye':2.54}
    210         name = 'SomeProcess'
    211         p = self.Process(
    212             target=self._test, args=args, kwargs=kwargs, name=name
    213             )
    214         p.daemon = True
    215         current = self.current_process()
    216 
    217         if self.TYPE != 'threads':
    218             self.assertEqual(p.authkey, current.authkey)
    219         self.assertEqual(p.is_alive(), False)
    220         self.assertEqual(p.daemon, True)
    221         self.assertNotIn(p, self.active_children())
    222         self.assertTrue(type(self.active_children()) is list)
    223         self.assertEqual(p.exitcode, None)
    224 
    225         p.start()
    226 
    227         self.assertEqual(p.exitcode, None)
    228         self.assertEqual(p.is_alive(), True)
    229         self.assertIn(p, self.active_children())
    230 
    231         self.assertEqual(q.get(), args[1:])
    232         self.assertEqual(q.get(), kwargs)
    233         self.assertEqual(q.get(), p.name)
    234         if self.TYPE != 'threads':
    235             self.assertEqual(q.get(), current.authkey)
    236             self.assertEqual(q.get(), p.pid)
    237 
    238         p.join()
    239 
    240         self.assertEqual(p.exitcode, 0)
    241         self.assertEqual(p.is_alive(), False)
    242         self.assertNotIn(p, self.active_children())
    243 
    244     @classmethod
    245     def _test_terminate(cls):
    246         time.sleep(1000)
    247 
    248     def test_terminate(self):
    249         if self.TYPE == 'threads':
    250             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    251 
    252         p = self.Process(target=self._test_terminate)
    253         p.daemon = True
    254         p.start()
    255 
    256         self.assertEqual(p.is_alive(), True)
    257         self.assertIn(p, self.active_children())
    258         self.assertEqual(p.exitcode, None)
    259 
    260         p.terminate()
    261 
    262         join = TimingWrapper(p.join)
    263         self.assertEqual(join(), None)
    264         self.assertTimingAlmostEqual(join.elapsed, 0.0)
    265 
    266         self.assertEqual(p.is_alive(), False)
    267         self.assertNotIn(p, self.active_children())
    268 
    269         p.join()
    270 
    271         # XXX sometimes get p.exitcode == 0 on Windows ...
    272         #self.assertEqual(p.exitcode, -signal.SIGTERM)
    273 
    274     def test_cpu_count(self):
    275         try:
    276             cpus = multiprocessing.cpu_count()
    277         except NotImplementedError:
    278             cpus = 1
    279         self.assertTrue(type(cpus) is int)
    280         self.assertTrue(cpus >= 1)
    281 
    282     def test_active_children(self):
    283         self.assertEqual(type(self.active_children()), list)
    284 
    285         p = self.Process(target=time.sleep, args=(DELTA,))
    286         self.assertNotIn(p, self.active_children())
    287 
    288         p.daemon = True
    289         p.start()
    290         self.assertIn(p, self.active_children())
    291 
    292         p.join()
    293         self.assertNotIn(p, self.active_children())
    294 
    295     @classmethod
    296     def _test_recursion(cls, wconn, id):
    297         from multiprocessing import forking
    298         wconn.send(id)
    299         if len(id) < 2:
    300             for i in range(2):
    301                 p = cls.Process(
    302                     target=cls._test_recursion, args=(wconn, id+[i])
    303                     )
    304                 p.start()
    305                 p.join()
    306 
    307     def test_recursion(self):
    308         rconn, wconn = self.Pipe(duplex=False)
    309         self._test_recursion(wconn, [])
    310 
    311         time.sleep(DELTA)
    312         result = []
    313         while rconn.poll():
    314             result.append(rconn.recv())
    315 
    316         expected = [
    317             [],
    318               [0],
    319                 [0, 0],
    320                 [0, 1],
    321               [1],
    322                 [1, 0],
    323                 [1, 1]
    324             ]
    325         self.assertEqual(result, expected)
    326 
    327     @classmethod
    328     def _test_sys_exit(cls, reason, testfn):
    329         sys.stderr = open(testfn, 'w')
    330         sys.exit(reason)
    331 
    332     def test_sys_exit(self):
    333         # See Issue 13854
    334         if self.TYPE == 'threads':
    335             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    336 
    337         testfn = test_support.TESTFN
    338         self.addCleanup(test_support.unlink, testfn)
    339 
    340         for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
    341             p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
    342             p.daemon = True
    343             p.start()
    344             p.join(5)
    345             self.assertEqual(p.exitcode, code)
    346 
    347             with open(testfn, 'r') as f:
    348                 self.assertEqual(f.read().rstrip(), str(reason))
    349 
    350         for reason in (True, False, 8):
    351             p = self.Process(target=sys.exit, args=(reason,))
    352             p.daemon = True
    353             p.start()
    354             p.join(5)
    355             self.assertEqual(p.exitcode, reason)
    356 
    357 #
    358 #
    359 #
    360 
    361 class _UpperCaser(multiprocessing.Process):
    362 
    363     def __init__(self):
    364         multiprocessing.Process.__init__(self)
    365         self.child_conn, self.parent_conn = multiprocessing.Pipe()
    366 
    367     def run(self):
    368         self.parent_conn.close()
    369         for s in iter(self.child_conn.recv, None):
    370             self.child_conn.send(s.upper())
    371         self.child_conn.close()
    372 
    373     def submit(self, s):
    374         assert type(s) is str
    375         self.parent_conn.send(s)
    376         return self.parent_conn.recv()
    377 
    378     def stop(self):
    379         self.parent_conn.send(None)
    380         self.parent_conn.close()
    381         self.child_conn.close()
    382 
    383 class _TestSubclassingProcess(BaseTestCase):
    384 
    385     ALLOWED_TYPES = ('processes',)
    386 
    387     def test_subclassing(self):
    388         uppercaser = _UpperCaser()
    389         uppercaser.daemon = True
    390         uppercaser.start()
    391         self.assertEqual(uppercaser.submit('hello'), 'HELLO')
    392         self.assertEqual(uppercaser.submit('world'), 'WORLD')
    393         uppercaser.stop()
    394         uppercaser.join()
    395 
    396 #
    397 #
    398 #
    399 
    400 def queue_empty(q):
    401     if hasattr(q, 'empty'):
    402         return q.empty()
    403     else:
    404         return q.qsize() == 0
    405 
    406 def queue_full(q, maxsize):
    407     if hasattr(q, 'full'):
    408         return q.full()
    409     else:
    410         return q.qsize() == maxsize
    411 
    412 
    413 class _TestQueue(BaseTestCase):
    414 
    415 
    416     @classmethod
    417     def _test_put(cls, queue, child_can_start, parent_can_continue):
    418         child_can_start.wait()
    419         for i in range(6):
    420             queue.get()
    421         parent_can_continue.set()
    422 
    423     def test_put(self):
    424         MAXSIZE = 6
    425         queue = self.Queue(maxsize=MAXSIZE)
    426         child_can_start = self.Event()
    427         parent_can_continue = self.Event()
    428 
    429         proc = self.Process(
    430             target=self._test_put,
    431             args=(queue, child_can_start, parent_can_continue)
    432             )
    433         proc.daemon = True
    434         proc.start()
    435 
    436         self.assertEqual(queue_empty(queue), True)
    437         self.assertEqual(queue_full(queue, MAXSIZE), False)
    438 
    439         queue.put(1)
    440         queue.put(2, True)
    441         queue.put(3, True, None)
    442         queue.put(4, False)
    443         queue.put(5, False, None)
    444         queue.put_nowait(6)
    445 
    446         # the values may be in buffer but not yet in pipe so sleep a bit
    447         time.sleep(DELTA)
    448 
    449         self.assertEqual(queue_empty(queue), False)
    450         self.assertEqual(queue_full(queue, MAXSIZE), True)
    451 
    452         put = TimingWrapper(queue.put)
    453         put_nowait = TimingWrapper(queue.put_nowait)
    454 
    455         self.assertRaises(Queue.Full, put, 7, False)
    456         self.assertTimingAlmostEqual(put.elapsed, 0)
    457 
    458         self.assertRaises(Queue.Full, put, 7, False, None)
    459         self.assertTimingAlmostEqual(put.elapsed, 0)
    460 
    461         self.assertRaises(Queue.Full, put_nowait, 7)
    462         self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
    463 
    464         self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
    465         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
    466 
    467         self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
    468         self.assertTimingAlmostEqual(put.elapsed, 0)
    469 
    470         self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
    471         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
    472 
    473         child_can_start.set()
    474         parent_can_continue.wait()
    475 
    476         self.assertEqual(queue_empty(queue), True)
    477         self.assertEqual(queue_full(queue, MAXSIZE), False)
    478 
    479         proc.join()
    480 
    481     @classmethod
    482     def _test_get(cls, queue, child_can_start, parent_can_continue):
    483         child_can_start.wait()
    484         #queue.put(1)
    485         queue.put(2)
    486         queue.put(3)
    487         queue.put(4)
    488         queue.put(5)
    489         parent_can_continue.set()
    490 
    491     def test_get(self):
    492         queue = self.Queue()
    493         child_can_start = self.Event()
    494         parent_can_continue = self.Event()
    495 
    496         proc = self.Process(
    497             target=self._test_get,
    498             args=(queue, child_can_start, parent_can_continue)
    499             )
    500         proc.daemon = True
    501         proc.start()
    502 
    503         self.assertEqual(queue_empty(queue), True)
    504 
    505         child_can_start.set()
    506         parent_can_continue.wait()
    507 
    508         time.sleep(DELTA)
    509         self.assertEqual(queue_empty(queue), False)
    510 
    511         # Hangs unexpectedly, remove for now
    512         #self.assertEqual(queue.get(), 1)
    513         self.assertEqual(queue.get(True, None), 2)
    514         self.assertEqual(queue.get(True), 3)
    515         self.assertEqual(queue.get(timeout=1), 4)
    516         self.assertEqual(queue.get_nowait(), 5)
    517 
    518         self.assertEqual(queue_empty(queue), True)
    519 
    520         get = TimingWrapper(queue.get)
    521         get_nowait = TimingWrapper(queue.get_nowait)
    522 
    523         self.assertRaises(Queue.Empty, get, False)
    524         self.assertTimingAlmostEqual(get.elapsed, 0)
    525 
    526         self.assertRaises(Queue.Empty, get, False, None)
    527         self.assertTimingAlmostEqual(get.elapsed, 0)
    528 
    529         self.assertRaises(Queue.Empty, get_nowait)
    530         self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
    531 
    532         self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
    533         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
    534 
    535         self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
    536         self.assertTimingAlmostEqual(get.elapsed, 0)
    537 
    538         self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
    539         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
    540 
    541         proc.join()
    542 
    543     @classmethod
    544     def _test_fork(cls, queue):
    545         for i in range(10, 20):
    546             queue.put(i)
    547         # note that at this point the items may only be buffered, so the
    548         # process cannot shutdown until the feeder thread has finished
    549         # pushing items onto the pipe.
    550 
    551     def test_fork(self):
    552         # Old versions of Queue would fail to create a new feeder
    553         # thread for a forked process if the original process had its
    554         # own feeder thread.  This test checks that this no longer
    555         # happens.
    556 
    557         queue = self.Queue()
    558 
    559         # put items on queue so that main process starts a feeder thread
    560         for i in range(10):
    561             queue.put(i)
    562 
    563         # wait to make sure thread starts before we fork a new process
    564         time.sleep(DELTA)
    565 
    566         # fork process
    567         p = self.Process(target=self._test_fork, args=(queue,))
    568         p.daemon = True
    569         p.start()
    570 
    571         # check that all expected items are in the queue
    572         for i in range(20):
    573             self.assertEqual(queue.get(), i)
    574         self.assertRaises(Queue.Empty, queue.get, False)
    575 
    576         p.join()
    577 
    578     def test_qsize(self):
    579         q = self.Queue()
    580         try:
    581             self.assertEqual(q.qsize(), 0)
    582         except NotImplementedError:
    583             self.skipTest('qsize method not implemented')
    584         q.put(1)
    585         self.assertEqual(q.qsize(), 1)
    586         q.put(5)
    587         self.assertEqual(q.qsize(), 2)
    588         q.get()
    589         self.assertEqual(q.qsize(), 1)
    590         q.get()
    591         self.assertEqual(q.qsize(), 0)
    592 
    593     @classmethod
    594     def _test_task_done(cls, q):
    595         for obj in iter(q.get, None):
    596             time.sleep(DELTA)
    597             q.task_done()
    598 
    599     def test_task_done(self):
    600         queue = self.JoinableQueue()
    601 
    602         if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
    603             self.skipTest("requires 'queue.task_done()' method")
    604 
    605         workers = [self.Process(target=self._test_task_done, args=(queue,))
    606                    for i in xrange(4)]
    607 
    608         for p in workers:
    609             p.daemon = True
    610             p.start()
    611 
    612         for i in xrange(10):
    613             queue.put(i)
    614 
    615         queue.join()
    616 
    617         for p in workers:
    618             queue.put(None)
    619 
    620         for p in workers:
    621             p.join()
    622 
    623     def test_no_import_lock_contention(self):
    624         with test_support.temp_cwd():
    625             module_name = 'imported_by_an_imported_module'
    626             with open(module_name + '.py', 'w') as f:
    627                 f.write("""if 1:
    628                     import multiprocessing
    629 
    630                     q = multiprocessing.Queue()
    631                     q.put('knock knock')
    632                     q.get(timeout=3)
    633                     q.close()
    634                 """)
    635 
    636             with test_support.DirsOnSysPath(os.getcwd()):
    637                 try:
    638                     __import__(module_name)
    639                 except Queue.Empty:
    640                     self.fail("Probable regression on import lock contention;"
    641                               " see Issue #22853")
    642 
    643 #
    644 #
    645 #
    646 
    647 class _TestLock(BaseTestCase):
    648 
    649     def test_lock(self):
    650         lock = self.Lock()
    651         self.assertEqual(lock.acquire(), True)
    652         self.assertEqual(lock.acquire(False), False)
    653         self.assertEqual(lock.release(), None)
    654         self.assertRaises((ValueError, threading.ThreadError), lock.release)
    655 
    656     def test_rlock(self):
    657         lock = self.RLock()
    658         self.assertEqual(lock.acquire(), True)
    659         self.assertEqual(lock.acquire(), True)
    660         self.assertEqual(lock.acquire(), True)
    661         self.assertEqual(lock.release(), None)
    662         self.assertEqual(lock.release(), None)
    663         self.assertEqual(lock.release(), None)
    664         self.assertRaises((AssertionError, RuntimeError), lock.release)
    665 
    666     def test_lock_context(self):
    667         with self.Lock():
    668             pass
    669 
    670 
    671 class _TestSemaphore(BaseTestCase):
    672 
    673     def _test_semaphore(self, sem):
    674         self.assertReturnsIfImplemented(2, get_value, sem)
    675         self.assertEqual(sem.acquire(), True)
    676         self.assertReturnsIfImplemented(1, get_value, sem)
    677         self.assertEqual(sem.acquire(), True)
    678         self.assertReturnsIfImplemented(0, get_value, sem)
    679         self.assertEqual(sem.acquire(False), False)
    680         self.assertReturnsIfImplemented(0, get_value, sem)
    681         self.assertEqual(sem.release(), None)
    682         self.assertReturnsIfImplemented(1, get_value, sem)
    683         self.assertEqual(sem.release(), None)
    684         self.assertReturnsIfImplemented(2, get_value, sem)
    685 
    686     def test_semaphore(self):
    687         sem = self.Semaphore(2)
    688         self._test_semaphore(sem)
    689         self.assertEqual(sem.release(), None)
    690         self.assertReturnsIfImplemented(3, get_value, sem)
    691         self.assertEqual(sem.release(), None)
    692         self.assertReturnsIfImplemented(4, get_value, sem)
    693 
    694     def test_bounded_semaphore(self):
    695         sem = self.BoundedSemaphore(2)
    696         self._test_semaphore(sem)
    697         # Currently fails on OS/X
    698         #if HAVE_GETVALUE:
    699         #    self.assertRaises(ValueError, sem.release)
    700         #    self.assertReturnsIfImplemented(2, get_value, sem)
    701 
    702     def test_timeout(self):
    703         if self.TYPE != 'processes':
    704             self.skipTest('test not appropriate for {}'.format(self.TYPE))
    705 
    706         sem = self.Semaphore(0)
    707         acquire = TimingWrapper(sem.acquire)
    708 
    709         self.assertEqual(acquire(False), False)
    710         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    711 
    712         self.assertEqual(acquire(False, None), False)
    713         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
    714 
    715         self.assertEqual(acquire(False, TIMEOUT1), False)
    716         self.assertTimingAlmostEqual(acquire.elapsed, 0)
    717 
    718         self.assertEqual(acquire(True, TIMEOUT2), False)
    719         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
    720 
    721         self.assertEqual(acquire(timeout=TIMEOUT3), False)
    722         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
    723 
    724 
    725 class _TestCondition(BaseTestCase):
    726 
    727     @classmethod
    728     def f(cls, cond, sleeping, woken, timeout=None):
    729         cond.acquire()
    730         sleeping.release()
    731         cond.wait(timeout)
    732         woken.release()
    733         cond.release()
    734 
    735     def check_invariant(self, cond):
    736         # this is only supposed to succeed when there are no sleepers
    737         if self.TYPE == 'processes':
    738             try:
    739                 sleepers = (cond._sleeping_count.get_value() -
    740                             cond._woken_count.get_value())
    741                 self.assertEqual(sleepers, 0)
    742                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
    743             except NotImplementedError:
    744                 pass
    745 
    746     def test_notify(self):
    747         cond = self.Condition()
    748         sleeping = self.Semaphore(0)
    749         woken = self.Semaphore(0)
    750 
    751         p = self.Process(target=self.f, args=(cond, sleeping, woken))
    752         p.daemon = True
    753         p.start()
    754 
    755         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    756         p.daemon = True
    757         p.start()
    758 
    759         # wait for both children to start sleeping
    760         sleeping.acquire()
    761         sleeping.acquire()
    762 
    763         # check no process/thread has woken up
    764         time.sleep(DELTA)
    765         self.assertReturnsIfImplemented(0, get_value, woken)
    766 
    767         # wake up one process/thread
    768         cond.acquire()
    769         cond.notify()
    770         cond.release()
    771 
    772         # check one process/thread has woken up
    773         time.sleep(DELTA)
    774         self.assertReturnsIfImplemented(1, get_value, woken)
    775 
    776         # wake up another
    777         cond.acquire()
    778         cond.notify()
    779         cond.release()
    780 
    781         # check other has woken up
    782         time.sleep(DELTA)
    783         self.assertReturnsIfImplemented(2, get_value, woken)
    784 
    785         # check state is not mucked up
    786         self.check_invariant(cond)
    787         p.join()
    788 
    789     def test_notify_all(self):
    790         cond = self.Condition()
    791         sleeping = self.Semaphore(0)
    792         woken = self.Semaphore(0)
    793 
    794         # start some threads/processes which will timeout
    795         for i in range(3):
    796             p = self.Process(target=self.f,
    797                              args=(cond, sleeping, woken, TIMEOUT1))
    798             p.daemon = True
    799             p.start()
    800 
    801             t = threading.Thread(target=self.f,
    802                                  args=(cond, sleeping, woken, TIMEOUT1))
    803             t.daemon = True
    804             t.start()
    805 
    806         # wait for them all to sleep
    807         for i in xrange(6):
    808             sleeping.acquire()
    809 
    810         # check they have all timed out
    811         for i in xrange(6):
    812             woken.acquire()
    813         self.assertReturnsIfImplemented(0, get_value, woken)
    814 
    815         # check state is not mucked up
    816         self.check_invariant(cond)
    817 
    818         # start some more threads/processes
    819         for i in range(3):
    820             p = self.Process(target=self.f, args=(cond, sleeping, woken))
    821             p.daemon = True
    822             p.start()
    823 
    824             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
    825             t.daemon = True
    826             t.start()
    827 
    828         # wait for them to all sleep
    829         for i in xrange(6):
    830             sleeping.acquire()
    831 
    832         # check no process/thread has woken up
    833         time.sleep(DELTA)
    834         self.assertReturnsIfImplemented(0, get_value, woken)
    835 
    836         # wake them all up
    837         cond.acquire()
    838         cond.notify_all()
    839         cond.release()
    840 
    841         # check they have all woken
    842         time.sleep(DELTA)
    843         self.assertReturnsIfImplemented(6, get_value, woken)
    844 
    845         # check state is not mucked up
    846         self.check_invariant(cond)
    847 
    848     def test_timeout(self):
    849         cond = self.Condition()
    850         wait = TimingWrapper(cond.wait)
    851         cond.acquire()
    852         res = wait(TIMEOUT1)
    853         cond.release()
    854         self.assertEqual(res, None)
    855         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    856 
    857 
    858 class _TestEvent(BaseTestCase):
    859 
    860     @classmethod
    861     def _test_event(cls, event):
    862         time.sleep(TIMEOUT2)
    863         event.set()
    864 
    865     def test_event(self):
    866         event = self.Event()
    867         wait = TimingWrapper(event.wait)
    868 
    869         # Removed temporarily, due to API shear, this does not
    870         # work with threading._Event objects. is_set == isSet
    871         self.assertEqual(event.is_set(), False)
    872 
    873         # Removed, threading.Event.wait() will return the value of the __flag
    874         # instead of None. API Shear with the semaphore backed mp.Event
    875         self.assertEqual(wait(0.0), False)
    876         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    877         self.assertEqual(wait(TIMEOUT1), False)
    878         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
    879 
    880         event.set()
    881 
    882         # See note above on the API differences
    883         self.assertEqual(event.is_set(), True)
    884         self.assertEqual(wait(), True)
    885         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    886         self.assertEqual(wait(TIMEOUT1), True)
    887         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    888         # self.assertEqual(event.is_set(), True)
    889 
    890         event.clear()
    891 
    892         #self.assertEqual(event.is_set(), False)
    893 
    894         p = self.Process(target=self._test_event, args=(event,))
    895         p.daemon = True
    896         p.start()
    897         self.assertEqual(wait(), True)
    898 
    899 #
    900 #
    901 #
    902 
    903 class _TestValue(BaseTestCase):
    904 
    905     ALLOWED_TYPES = ('processes',)
    906 
    907     codes_values = [
    908         ('i', 4343, 24234),
    909         ('d', 3.625, -4.25),
    910         ('h', -232, 234),
    911         ('c', latin('x'), latin('y'))
    912         ]
    913 
    914     def setUp(self):
    915         if not HAS_SHAREDCTYPES:
    916             self.skipTest("requires multiprocessing.sharedctypes")
    917 
    918     @classmethod
    919     def _test(cls, values):
    920         for sv, cv in zip(values, cls.codes_values):
    921             sv.value = cv[2]
    922 
    923 
    924     def test_value(self, raw=False):
    925         if raw:
    926             values = [self.RawValue(code, value)
    927                       for code, value, _ in self.codes_values]
    928         else:
    929             values = [self.Value(code, value)
    930                       for code, value, _ in self.codes_values]
    931 
    932         for sv, cv in zip(values, self.codes_values):
    933             self.assertEqual(sv.value, cv[1])
    934 
    935         proc = self.Process(target=self._test, args=(values,))
    936         proc.daemon = True
    937         proc.start()
    938         proc.join()
    939 
    940         for sv, cv in zip(values, self.codes_values):
    941             self.assertEqual(sv.value, cv[2])
    942 
    943     def test_rawvalue(self):
    944         self.test_value(raw=True)
    945 
    946     def test_getobj_getlock(self):
    947         val1 = self.Value('i', 5)
    948         lock1 = val1.get_lock()
    949         obj1 = val1.get_obj()
    950 
    951         val2 = self.Value('i', 5, lock=None)
    952         lock2 = val2.get_lock()
    953         obj2 = val2.get_obj()
    954 
    955         lock = self.Lock()
    956         val3 = self.Value('i', 5, lock=lock)
    957         lock3 = val3.get_lock()
    958         obj3 = val3.get_obj()
    959         self.assertEqual(lock, lock3)
    960 
    961         arr4 = self.Value('i', 5, lock=False)
    962         self.assertFalse(hasattr(arr4, 'get_lock'))
    963         self.assertFalse(hasattr(arr4, 'get_obj'))
    964 
    965         self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
    966 
    967         arr5 = self.RawValue('i', 5)
    968         self.assertFalse(hasattr(arr5, 'get_lock'))
    969         self.assertFalse(hasattr(arr5, 'get_obj'))
    970 
    971 
    972 class _TestArray(BaseTestCase):
    973 
    974     ALLOWED_TYPES = ('processes',)
    975 
    976     @classmethod
    977     def f(cls, seq):
    978         for i in range(1, len(seq)):
    979             seq[i] += seq[i-1]
    980 
    981     @unittest.skipIf(c_int is None, "requires _ctypes")
    982     def test_array(self, raw=False):
    983         seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
    984         if raw:
    985             arr = self.RawArray('i', seq)
    986         else:
    987             arr = self.Array('i', seq)
    988 
    989         self.assertEqual(len(arr), len(seq))
    990         self.assertEqual(arr[3], seq[3])
    991         self.assertEqual(list(arr[2:7]), list(seq[2:7]))
    992 
    993         arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
    994 
    995         self.assertEqual(list(arr[:]), seq)
    996 
    997         self.f(seq)
    998 
    999         p = self.Process(target=self.f, args=(arr,))
   1000         p.daemon = True
   1001         p.start()
   1002         p.join()
   1003 
   1004         self.assertEqual(list(arr[:]), seq)
   1005 
   1006     @unittest.skipIf(c_int is None, "requires _ctypes")
   1007     def test_array_from_size(self):
   1008         size = 10
   1009         # Test for zeroing (see issue #11675).
   1010         # The repetition below strengthens the test by increasing the chances
   1011         # of previously allocated non-zero memory being used for the new array
   1012         # on the 2nd and 3rd loops.
   1013         for _ in range(3):
   1014             arr = self.Array('i', size)
   1015             self.assertEqual(len(arr), size)
   1016             self.assertEqual(list(arr), [0] * size)
   1017             arr[:] = range(10)
   1018             self.assertEqual(list(arr), range(10))
   1019             del arr
   1020 
   1021     @unittest.skipIf(c_int is None, "requires _ctypes")
   1022     def test_rawarray(self):
   1023         self.test_array(raw=True)
   1024 
   1025     @unittest.skipIf(c_int is None, "requires _ctypes")
   1026     def test_array_accepts_long(self):
   1027         arr = self.Array('i', 10L)
   1028         self.assertEqual(len(arr), 10)
   1029         raw_arr = self.RawArray('i', 10L)
   1030         self.assertEqual(len(raw_arr), 10)
   1031 
   1032     @unittest.skipIf(c_int is None, "requires _ctypes")
   1033     def test_getobj_getlock_obj(self):
   1034         arr1 = self.Array('i', range(10))
   1035         lock1 = arr1.get_lock()
   1036         obj1 = arr1.get_obj()
   1037 
   1038         arr2 = self.Array('i', range(10), lock=None)
   1039         lock2 = arr2.get_lock()
   1040         obj2 = arr2.get_obj()
   1041 
   1042         lock = self.Lock()
   1043         arr3 = self.Array('i', range(10), lock=lock)
   1044         lock3 = arr3.get_lock()
   1045         obj3 = arr3.get_obj()
   1046         self.assertEqual(lock, lock3)
   1047 
   1048         arr4 = self.Array('i', range(10), lock=False)
   1049         self.assertFalse(hasattr(arr4, 'get_lock'))
   1050         self.assertFalse(hasattr(arr4, 'get_obj'))
   1051         self.assertRaises(AttributeError,
   1052                           self.Array, 'i', range(10), lock='notalock')
   1053 
   1054         arr5 = self.RawArray('i', range(10))
   1055         self.assertFalse(hasattr(arr5, 'get_lock'))
   1056         self.assertFalse(hasattr(arr5, 'get_obj'))
   1057 
   1058 #
   1059 #
   1060 #
   1061 
   1062 class _TestContainers(BaseTestCase):
   1063 
   1064     ALLOWED_TYPES = ('manager',)
   1065 
   1066     def test_list(self):
   1067         a = self.list(range(10))
   1068         self.assertEqual(a[:], range(10))
   1069 
   1070         b = self.list()
   1071         self.assertEqual(b[:], [])
   1072 
   1073         b.extend(range(5))
   1074         self.assertEqual(b[:], range(5))
   1075 
   1076         self.assertEqual(b[2], 2)
   1077         self.assertEqual(b[2:10], [2,3,4])
   1078 
   1079         b *= 2
   1080         self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
   1081 
   1082         self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
   1083 
   1084         self.assertEqual(a[:], range(10))
   1085 
   1086         d = [a, b]
   1087         e = self.list(d)
   1088         self.assertEqual(
   1089             e[:],
   1090             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
   1091             )
   1092 
   1093         f = self.list([a])
   1094         a.append('hello')
   1095         self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
   1096 
   1097     def test_dict(self):
   1098         d = self.dict()
   1099         indices = range(65, 70)
   1100         for i in indices:
   1101             d[i] = chr(i)
   1102         self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
   1103         self.assertEqual(sorted(d.keys()), indices)
   1104         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
   1105         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
   1106 
   1107     def test_namespace(self):
   1108         n = self.Namespace()
   1109         n.name = 'Bob'
   1110         n.job = 'Builder'
   1111         n._hidden = 'hidden'
   1112         self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
   1113         del n.job
   1114         self.assertEqual(str(n), "Namespace(name='Bob')")
   1115         self.assertTrue(hasattr(n, 'name'))
   1116         self.assertTrue(not hasattr(n, 'job'))
   1117 
   1118 #
   1119 #
   1120 #
   1121 
   1122 def sqr(x, wait=0.0):
   1123     time.sleep(wait)
   1124     return x*x
   1125 
   1126 class SayWhenError(ValueError): pass
   1127 
   1128 def exception_throwing_generator(total, when):
   1129     for i in range(total):
   1130         if i == when:
   1131             raise SayWhenError("Somebody said when")
   1132         yield i
   1133 
   1134 class _TestPool(BaseTestCase):
   1135 
   1136     def test_apply(self):
   1137         papply = self.pool.apply
   1138         self.assertEqual(papply(sqr, (5,)), sqr(5))
   1139         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
   1140 
   1141     def test_map(self):
   1142         pmap = self.pool.map
   1143         self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
   1144         self.assertEqual(pmap(sqr, range(100), chunksize=20),
   1145                          map(sqr, range(100)))
   1146 
   1147     def test_map_unplicklable(self):
   1148         # Issue #19425 -- failure to pickle should not cause a hang
   1149         if self.TYPE == 'threads':
   1150             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1151         class A(object):
   1152             def __reduce__(self):
   1153                 raise RuntimeError('cannot pickle')
   1154         with self.assertRaises(RuntimeError):
   1155             self.pool.map(sqr, [A()]*10)
   1156 
   1157     def test_map_chunksize(self):
   1158         try:
   1159             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
   1160         except multiprocessing.TimeoutError:
   1161             self.fail("pool.map_async with chunksize stalled on null list")
   1162 
   1163     def test_async(self):
   1164         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
   1165         get = TimingWrapper(res.get)
   1166         self.assertEqual(get(), 49)
   1167         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
   1168 
   1169     def test_async_timeout(self):
   1170         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
   1171         get = TimingWrapper(res.get)
   1172         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
   1173         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
   1174 
   1175     def test_imap(self):
   1176         it = self.pool.imap(sqr, range(10))
   1177         self.assertEqual(list(it), map(sqr, range(10)))
   1178 
   1179         it = self.pool.imap(sqr, range(10))
   1180         for i in range(10):
   1181             self.assertEqual(it.next(), i*i)
   1182         self.assertRaises(StopIteration, it.next)
   1183 
   1184         it = self.pool.imap(sqr, range(1000), chunksize=100)
   1185         for i in range(1000):
   1186             self.assertEqual(it.next(), i*i)
   1187         self.assertRaises(StopIteration, it.next)
   1188 
   1189     def test_imap_handle_iterable_exception(self):
   1190         if self.TYPE == 'manager':
   1191             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1192 
   1193         it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
   1194         for i in range(3):
   1195             self.assertEqual(next(it), i*i)
   1196         self.assertRaises(SayWhenError, it.next)
   1197 
   1198         # SayWhenError seen at start of problematic chunk's results
   1199         it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
   1200         for i in range(6):
   1201             self.assertEqual(next(it), i*i)
   1202         self.assertRaises(SayWhenError, it.next)
   1203         it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
   1204         for i in range(4):
   1205             self.assertEqual(next(it), i*i)
   1206         self.assertRaises(SayWhenError, it.next)
   1207 
   1208     def test_imap_unordered(self):
   1209         it = self.pool.imap_unordered(sqr, range(1000))
   1210         self.assertEqual(sorted(it), map(sqr, range(1000)))
   1211 
   1212         it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
   1213         self.assertEqual(sorted(it), map(sqr, range(1000)))
   1214 
   1215     def test_imap_unordered_handle_iterable_exception(self):
   1216         if self.TYPE == 'manager':
   1217             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1218 
   1219         it = self.pool.imap_unordered(sqr,
   1220                                       exception_throwing_generator(10, 3),
   1221                                       1)
   1222         expected_values = map(sqr, range(10))
   1223         with self.assertRaises(SayWhenError):
   1224             # imap_unordered makes it difficult to anticipate the SayWhenError
   1225             for i in range(10):
   1226                 value = next(it)
   1227                 self.assertIn(value, expected_values)
   1228                 expected_values.remove(value)
   1229 
   1230         it = self.pool.imap_unordered(sqr,
   1231                                       exception_throwing_generator(20, 7),
   1232                                       2)
   1233         expected_values = map(sqr, range(20))
   1234         with self.assertRaises(SayWhenError):
   1235             for i in range(20):
   1236                 value = next(it)
   1237                 self.assertIn(value, expected_values)
   1238                 expected_values.remove(value)
   1239 
   1240     def test_make_pool(self):
   1241         self.assertRaises(ValueError, multiprocessing.Pool, -1)
   1242         self.assertRaises(ValueError, multiprocessing.Pool, 0)
   1243 
   1244         p = multiprocessing.Pool(3)
   1245         self.assertEqual(3, len(p._pool))
   1246         p.close()
   1247         p.join()
   1248 
   1249     def test_terminate(self):
   1250         p = self.Pool(4)
   1251         result = p.map_async(
   1252             time.sleep, [0.1 for i in range(10000)], chunksize=1
   1253             )
   1254         p.terminate()
   1255         join = TimingWrapper(p.join)
   1256         join()
   1257         self.assertTrue(join.elapsed < 0.2)
   1258 
   1259     def test_empty_iterable(self):
   1260         # See Issue 12157
   1261         p = self.Pool(1)
   1262 
   1263         self.assertEqual(p.map(sqr, []), [])
   1264         self.assertEqual(list(p.imap(sqr, [])), [])
   1265         self.assertEqual(list(p.imap_unordered(sqr, [])), [])
   1266         self.assertEqual(p.map_async(sqr, []).get(), [])
   1267 
   1268         p.close()
   1269         p.join()
   1270 
   1271 def unpickleable_result():
   1272     return lambda: 42
   1273 
   1274 class _TestPoolWorkerErrors(BaseTestCase):
   1275     ALLOWED_TYPES = ('processes', )
   1276 
   1277     def test_unpickleable_result(self):
   1278         from multiprocessing.pool import MaybeEncodingError
   1279         p = multiprocessing.Pool(2)
   1280 
   1281         # Make sure we don't lose pool processes because of encoding errors.
   1282         for iteration in range(20):
   1283             res = p.apply_async(unpickleable_result)
   1284             self.assertRaises(MaybeEncodingError, res.get)
   1285 
   1286         p.close()
   1287         p.join()
   1288 
   1289 class _TestPoolWorkerLifetime(BaseTestCase):
   1290 
   1291     ALLOWED_TYPES = ('processes', )
   1292     def test_pool_worker_lifetime(self):
   1293         p = multiprocessing.Pool(3, maxtasksperchild=10)
   1294         self.assertEqual(3, len(p._pool))
   1295         origworkerpids = [w.pid for w in p._pool]
   1296         # Run many tasks so each worker gets replaced (hopefully)
   1297         results = []
   1298         for i in range(100):
   1299             results.append(p.apply_async(sqr, (i, )))
   1300         # Fetch the results and verify we got the right answers,
   1301         # also ensuring all the tasks have completed.
   1302         for (j, res) in enumerate(results):
   1303             self.assertEqual(res.get(), sqr(j))
   1304         # Refill the pool
   1305         p._repopulate_pool()
   1306         # Wait until all workers are alive
   1307         # (countdown * DELTA = 5 seconds max startup process time)
   1308         countdown = 50
   1309         while countdown and not all(w.is_alive() for w in p._pool):
   1310             countdown -= 1
   1311             time.sleep(DELTA)
   1312         finalworkerpids = [w.pid for w in p._pool]
   1313         # All pids should be assigned.  See issue #7805.
   1314         self.assertNotIn(None, origworkerpids)
   1315         self.assertNotIn(None, finalworkerpids)
   1316         # Finally, check that the worker pids have changed
   1317         self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
   1318         p.close()
   1319         p.join()
   1320 
   1321     def test_pool_worker_lifetime_early_close(self):
   1322         # Issue #10332: closing a pool whose workers have limited lifetimes
   1323         # before all the tasks completed would make join() hang.
   1324         p = multiprocessing.Pool(3, maxtasksperchild=1)
   1325         results = []
   1326         for i in range(6):
   1327             results.append(p.apply_async(sqr, (i, 0.3)))
   1328         p.close()
   1329         p.join()
   1330         # check the results
   1331         for (j, res) in enumerate(results):
   1332             self.assertEqual(res.get(), sqr(j))
   1333 
   1334 
   1335 #
   1336 # Test that manager has expected number of shared objects left
   1337 #
   1338 
   1339 class _TestZZZNumberOfObjects(BaseTestCase):
   1340     # Because test cases are sorted alphabetically, this one will get
   1341     # run after all the other tests for the manager.  It tests that
   1342     # there have been no "reference leaks" for the manager's shared
   1343     # objects.  Note the comment in _TestPool.test_terminate().
   1344     ALLOWED_TYPES = ('manager',)
   1345 
   1346     def test_number_of_objects(self):
   1347         EXPECTED_NUMBER = 1                # the pool object is still alive
   1348         multiprocessing.active_children()  # discard dead process objs
   1349         gc.collect()                       # do garbage collection
   1350         refs = self.manager._number_of_objects()
   1351         debug_info = self.manager._debug_info()
   1352         if refs != EXPECTED_NUMBER:
   1353             print self.manager._debug_info()
   1354             print debug_info
   1355 
   1356         self.assertEqual(refs, EXPECTED_NUMBER)
   1357 
   1358 #
   1359 # Test of creating a customized manager class
   1360 #
   1361 
   1362 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
   1363 
   1364 class FooBar(object):
   1365     def f(self):
   1366         return 'f()'
   1367     def g(self):
   1368         raise ValueError
   1369     def _h(self):
   1370         return '_h()'
   1371 
   1372 def baz():
   1373     for i in xrange(10):
   1374         yield i*i
   1375 
   1376 class IteratorProxy(BaseProxy):
   1377     _exposed_ = ('next', '__next__')
   1378     def __iter__(self):
   1379         return self
   1380     def next(self):
   1381         return self._callmethod('next')
   1382     def __next__(self):
   1383         return self._callmethod('__next__')
   1384 
   1385 class MyManager(BaseManager):
   1386     pass
   1387 
   1388 MyManager.register('Foo', callable=FooBar)
   1389 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
   1390 MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
   1391 
   1392 
   1393 class _TestMyManager(BaseTestCase):
   1394 
   1395     ALLOWED_TYPES = ('manager',)
   1396 
   1397     def test_mymanager(self):
   1398         manager = MyManager()
   1399         manager.start()
   1400 
   1401         foo = manager.Foo()
   1402         bar = manager.Bar()
   1403         baz = manager.baz()
   1404 
   1405         foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
   1406         bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
   1407 
   1408         self.assertEqual(foo_methods, ['f', 'g'])
   1409         self.assertEqual(bar_methods, ['f', '_h'])
   1410 
   1411         self.assertEqual(foo.f(), 'f()')
   1412         self.assertRaises(ValueError, foo.g)
   1413         self.assertEqual(foo._callmethod('f'), 'f()')
   1414         self.assertRaises(RemoteError, foo._callmethod, '_h')
   1415 
   1416         self.assertEqual(bar.f(), 'f()')
   1417         self.assertEqual(bar._h(), '_h()')
   1418         self.assertEqual(bar._callmethod('f'), 'f()')
   1419         self.assertEqual(bar._callmethod('_h'), '_h()')
   1420 
   1421         self.assertEqual(list(baz), [i*i for i in range(10)])
   1422 
   1423         manager.shutdown()
   1424 
   1425 #
   1426 # Test of connecting to a remote server and using xmlrpclib for serialization
   1427 #
   1428 
   1429 _queue = Queue.Queue()
   1430 def get_queue():
   1431     return _queue
   1432 
   1433 class QueueManager(BaseManager):
   1434     '''manager class used by server process'''
   1435 QueueManager.register('get_queue', callable=get_queue)
   1436 
   1437 class QueueManager2(BaseManager):
   1438     '''manager class which specifies the same interface as QueueManager'''
   1439 QueueManager2.register('get_queue')
   1440 
   1441 
   1442 SERIALIZER = 'xmlrpclib'
   1443 
   1444 class _TestRemoteManager(BaseTestCase):
   1445 
   1446     ALLOWED_TYPES = ('manager',)
   1447     values = ['hello world', None, True, 2.25,
   1448               #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
   1449               ]
   1450     result = values[:]
   1451     if test_support.have_unicode:
   1452         #result[-1] = u'hall\xe5 v\xe4rlden'
   1453         uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
   1454                                 r'\u0441\u0432\u0456\u0442')
   1455         values.append(uvalue)
   1456         result.append(uvalue)
   1457 
   1458     @classmethod
   1459     def _putter(cls, address, authkey):
   1460         manager = QueueManager2(
   1461             address=address, authkey=authkey, serializer=SERIALIZER
   1462             )
   1463         manager.connect()
   1464         queue = manager.get_queue()
   1465         # Note that xmlrpclib will deserialize object as a list not a tuple
   1466         queue.put(tuple(cls.values))
   1467 
   1468     def test_remote(self):
   1469         authkey = os.urandom(32)
   1470 
   1471         manager = QueueManager(
   1472             address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
   1473             )
   1474         manager.start()
   1475 
   1476         p = self.Process(target=self._putter, args=(manager.address, authkey))
   1477         p.daemon = True
   1478         p.start()
   1479 
   1480         manager2 = QueueManager2(
   1481             address=manager.address, authkey=authkey, serializer=SERIALIZER
   1482             )
   1483         manager2.connect()
   1484         queue = manager2.get_queue()
   1485 
   1486         self.assertEqual(queue.get(), self.result)
   1487 
   1488         # Because we are using xmlrpclib for serialization instead of
   1489         # pickle this will cause a serialization error.
   1490         self.assertRaises(Exception, queue.put, time.sleep)
   1491 
   1492         # Make queue finalizer run before the server is stopped
   1493         del queue
   1494         manager.shutdown()
   1495 
   1496 class _TestManagerRestart(BaseTestCase):
   1497 
   1498     @classmethod
   1499     def _putter(cls, address, authkey):
   1500         manager = QueueManager(
   1501             address=address, authkey=authkey, serializer=SERIALIZER)
   1502         manager.connect()
   1503         queue = manager.get_queue()
   1504         queue.put('hello world')
   1505 
   1506     def test_rapid_restart(self):
   1507         authkey = os.urandom(32)
   1508         manager = QueueManager(
   1509             address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
   1510         srvr = manager.get_server()
   1511         addr = srvr.address
   1512         # Close the connection.Listener socket which gets opened as a part
   1513         # of manager.get_server(). It's not needed for the test.
   1514         srvr.listener.close()
   1515         manager.start()
   1516 
   1517         p = self.Process(target=self._putter, args=(manager.address, authkey))
   1518         p.daemon = True
   1519         p.start()
   1520         queue = manager.get_queue()
   1521         self.assertEqual(queue.get(), 'hello world')
   1522         del queue
   1523         manager.shutdown()
   1524         manager = QueueManager(
   1525             address=addr, authkey=authkey, serializer=SERIALIZER)
   1526         manager.start()
   1527         manager.shutdown()
   1528 
   1529 #
   1530 #
   1531 #
   1532 
   1533 SENTINEL = latin('')
   1534 
   1535 class _TestConnection(BaseTestCase):
   1536 
   1537     ALLOWED_TYPES = ('processes', 'threads')
   1538 
   1539     @classmethod
   1540     def _echo(cls, conn):
   1541         for msg in iter(conn.recv_bytes, SENTINEL):
   1542             conn.send_bytes(msg)
   1543         conn.close()
   1544 
   1545     def test_connection(self):
   1546         conn, child_conn = self.Pipe()
   1547 
   1548         p = self.Process(target=self._echo, args=(child_conn,))
   1549         p.daemon = True
   1550         p.start()
   1551 
   1552         seq = [1, 2.25, None]
   1553         msg = latin('hello world')
   1554         longmsg = msg * 10
   1555         arr = array.array('i', range(4))
   1556 
   1557         if self.TYPE == 'processes':
   1558             self.assertEqual(type(conn.fileno()), int)
   1559 
   1560         self.assertEqual(conn.send(seq), None)
   1561         self.assertEqual(conn.recv(), seq)
   1562 
   1563         self.assertEqual(conn.send_bytes(msg), None)
   1564         self.assertEqual(conn.recv_bytes(), msg)
   1565 
   1566         if self.TYPE == 'processes':
   1567             buffer = array.array('i', [0]*10)
   1568             expected = list(arr) + [0] * (10 - len(arr))
   1569             self.assertEqual(conn.send_bytes(arr), None)
   1570             self.assertEqual(conn.recv_bytes_into(buffer),
   1571                              len(arr) * buffer.itemsize)
   1572             self.assertEqual(list(buffer), expected)
   1573 
   1574             buffer = array.array('i', [0]*10)
   1575             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
   1576             self.assertEqual(conn.send_bytes(arr), None)
   1577             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
   1578                              len(arr) * buffer.itemsize)
   1579             self.assertEqual(list(buffer), expected)
   1580 
   1581             buffer = bytearray(latin(' ' * 40))
   1582             self.assertEqual(conn.send_bytes(longmsg), None)
   1583             try:
   1584                 res = conn.recv_bytes_into(buffer)
   1585             except multiprocessing.BufferTooShort, e:
   1586                 self.assertEqual(e.args, (longmsg,))
   1587             else:
   1588                 self.fail('expected BufferTooShort, got %s' % res)
   1589 
   1590         poll = TimingWrapper(conn.poll)
   1591 
   1592         self.assertEqual(poll(), False)
   1593         self.assertTimingAlmostEqual(poll.elapsed, 0)
   1594 
   1595         self.assertEqual(poll(TIMEOUT1), False)
   1596         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
   1597 
   1598         conn.send(None)
   1599         time.sleep(.1)
   1600 
   1601         self.assertEqual(poll(TIMEOUT1), True)
   1602         self.assertTimingAlmostEqual(poll.elapsed, 0)
   1603 
   1604         self.assertEqual(conn.recv(), None)
   1605 
   1606         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
   1607         conn.send_bytes(really_big_msg)
   1608         self.assertEqual(conn.recv_bytes(), really_big_msg)
   1609 
   1610         conn.send_bytes(SENTINEL)                          # tell child to quit
   1611         child_conn.close()
   1612 
   1613         if self.TYPE == 'processes':
   1614             self.assertEqual(conn.readable, True)
   1615             self.assertEqual(conn.writable, True)
   1616             self.assertRaises(EOFError, conn.recv)
   1617             self.assertRaises(EOFError, conn.recv_bytes)
   1618 
   1619         p.join()
   1620 
   1621     def test_duplex_false(self):
   1622         reader, writer = self.Pipe(duplex=False)
   1623         self.assertEqual(writer.send(1), None)
   1624         self.assertEqual(reader.recv(), 1)
   1625         if self.TYPE == 'processes':
   1626             self.assertEqual(reader.readable, True)
   1627             self.assertEqual(reader.writable, False)
   1628             self.assertEqual(writer.readable, False)
   1629             self.assertEqual(writer.writable, True)
   1630             self.assertRaises(IOError, reader.send, 2)
   1631             self.assertRaises(IOError, writer.recv)
   1632             self.assertRaises(IOError, writer.poll)
   1633 
   1634     def test_spawn_close(self):
   1635         # We test that a pipe connection can be closed by parent
   1636         # process immediately after child is spawned.  On Windows this
   1637         # would have sometimes failed on old versions because
   1638         # child_conn would be closed before the child got a chance to
   1639         # duplicate it.
   1640         conn, child_conn = self.Pipe()
   1641 
   1642         p = self.Process(target=self._echo, args=(child_conn,))
   1643         p.daemon = True
   1644         p.start()
   1645         child_conn.close()    # this might complete before child initializes
   1646 
   1647         msg = latin('hello')
   1648         conn.send_bytes(msg)
   1649         self.assertEqual(conn.recv_bytes(), msg)
   1650 
   1651         conn.send_bytes(SENTINEL)
   1652         conn.close()
   1653         p.join()
   1654 
   1655     def test_sendbytes(self):
   1656         if self.TYPE != 'processes':
   1657             self.skipTest('test not appropriate for {}'.format(self.TYPE))
   1658 
   1659         msg = latin('abcdefghijklmnopqrstuvwxyz')
   1660         a, b = self.Pipe()
   1661 
   1662         a.send_bytes(msg)
   1663         self.assertEqual(b.recv_bytes(), msg)
   1664 
   1665         a.send_bytes(msg, 5)
   1666         self.assertEqual(b.recv_bytes(), msg[5:])
   1667 
   1668         a.send_bytes(msg, 7, 8)
   1669         self.assertEqual(b.recv_bytes(), msg[7:7+8])
   1670 
   1671         a.send_bytes(msg, 26)
   1672         self.assertEqual(b.recv_bytes(), latin(''))
   1673 
   1674         a.send_bytes(msg, 26, 0)
   1675         self.assertEqual(b.recv_bytes(), latin(''))
   1676 
   1677         self.assertRaises(ValueError, a.send_bytes, msg, 27)
   1678 
   1679         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
   1680 
   1681         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
   1682 
   1683         self.assertRaises(ValueError, a.send_bytes, msg, -1)
   1684 
   1685         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
   1686 
   1687     @classmethod
   1688     def _is_fd_assigned(cls, fd):
   1689         try:
   1690             os.fstat(fd)
   1691         except OSError as e:
   1692             if e.errno == errno.EBADF:
   1693                 return False
   1694             raise
   1695         else:
   1696             return True
   1697 
   1698     @classmethod
   1699     def _writefd(cls, conn, data, create_dummy_fds=False):
   1700         if create_dummy_fds:
   1701             for i in range(0, 256):
   1702                 if not cls._is_fd_assigned(i):
   1703                     os.dup2(conn.fileno(), i)
   1704         fd = reduction.recv_handle(conn)
   1705         if msvcrt:
   1706             fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
   1707         os.write(fd, data)
   1708         os.close(fd)
   1709 
   1710     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   1711     def test_fd_transfer(self):
   1712         if self.TYPE != 'processes':
   1713             self.skipTest("only makes sense with processes")
   1714         conn, child_conn = self.Pipe(duplex=True)
   1715 
   1716         p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
   1717         p.daemon = True
   1718         p.start()
   1719         with open(test_support.TESTFN, "wb") as f:
   1720             fd = f.fileno()
   1721             if msvcrt:
   1722                 fd = msvcrt.get_osfhandle(fd)
   1723             reduction.send_handle(conn, fd, p.pid)
   1724         p.join()
   1725         with open(test_support.TESTFN, "rb") as f:
   1726             self.assertEqual(f.read(), b"foo")
   1727 
   1728     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   1729     @unittest.skipIf(sys.platform == "win32",
   1730                      "test semantics don't make sense on Windows")
   1731     @unittest.skipIf(MAXFD <= 256,
   1732                      "largest assignable fd number is too small")
   1733     @unittest.skipUnless(hasattr(os, "dup2"),
   1734                          "test needs os.dup2()")
   1735     def test_large_fd_transfer(self):
   1736         # With fd > 256 (issue #11657)
   1737         if self.TYPE != 'processes':
   1738             self.skipTest("only makes sense with processes")
   1739         conn, child_conn = self.Pipe(duplex=True)
   1740 
   1741         p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
   1742         p.daemon = True
   1743         p.start()
   1744         with open(test_support.TESTFN, "wb") as f:
   1745             fd = f.fileno()
   1746             for newfd in range(256, MAXFD):
   1747                 if not self._is_fd_assigned(newfd):
   1748                     break
   1749             else:
   1750                 self.fail("could not find an unassigned large file descriptor")
   1751             os.dup2(fd, newfd)
   1752             try:
   1753                 reduction.send_handle(conn, newfd, p.pid)
   1754             finally:
   1755                 os.close(newfd)
   1756         p.join()
   1757         with open(test_support.TESTFN, "rb") as f:
   1758             self.assertEqual(f.read(), b"bar")
   1759 
   1760     @classmethod
   1761     def _send_data_without_fd(self, conn):
   1762         os.write(conn.fileno(), b"\0")
   1763 
   1764     @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
   1765     @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
   1766     def test_missing_fd_transfer(self):
   1767         # Check that exception is raised when received data is not
   1768         # accompanied by a file descriptor in ancillary data.
   1769         if self.TYPE != 'processes':
   1770             self.skipTest("only makes sense with processes")
   1771         conn, child_conn = self.Pipe(duplex=True)
   1772 
   1773         p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
   1774         p.daemon = True
   1775         p.start()
   1776         self.assertRaises(RuntimeError, reduction.recv_handle, conn)
   1777         p.join()
   1778 
   1779 class _TestListenerClient(BaseTestCase):
   1780 
   1781     ALLOWED_TYPES = ('processes', 'threads')
   1782 
   1783     @classmethod
   1784     def _test(cls, address):
   1785         conn = cls.connection.Client(address)
   1786         conn.send('hello')
   1787         conn.close()
   1788 
   1789     def test_listener_client(self):
   1790         for family in self.connection.families:
   1791             l = self.connection.Listener(family=family)
   1792             p = self.Process(target=self._test, args=(l.address,))
   1793             p.daemon = True
   1794             p.start()
   1795             conn = l.accept()
   1796             self.assertEqual(conn.recv(), 'hello')
   1797             p.join()
   1798             l.close()
   1799 
   1800     def test_issue14725(self):
   1801         l = self.connection.Listener()
   1802         p = self.Process(target=self._test, args=(l.address,))
   1803         p.daemon = True
   1804         p.start()
   1805         time.sleep(1)
   1806         # On Windows the client process should by now have connected,
   1807         # written data and closed the pipe handle by now.  This causes
   1808         # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
   1809         # 14725.
   1810         conn = l.accept()
   1811         self.assertEqual(conn.recv(), 'hello')
   1812         conn.close()
   1813         p.join()
   1814         l.close()
   1815 
   1816 #
   1817 # Test of sending connection and socket objects between processes
   1818 #
   1819 """
   1820 class _TestPicklingConnections(BaseTestCase):
   1821 
   1822     ALLOWED_TYPES = ('processes',)
   1823 
   1824     def _listener(self, conn, families):
   1825         for fam in families:
   1826             l = self.connection.Listener(family=fam)
   1827             conn.send(l.address)
   1828             new_conn = l.accept()
   1829             conn.send(new_conn)
   1830 
   1831         if self.TYPE == 'processes':
   1832             l = socket.socket()
   1833             l.bind(('localhost', 0))
   1834             conn.send(l.getsockname())
   1835             l.listen(1)
   1836             new_conn, addr = l.accept()
   1837             conn.send(new_conn)
   1838 
   1839         conn.recv()
   1840 
   1841     def _remote(self, conn):
   1842         for (address, msg) in iter(conn.recv, None):
   1843             client = self.connection.Client(address)
   1844             client.send(msg.upper())
   1845             client.close()
   1846 
   1847         if self.TYPE == 'processes':
   1848             address, msg = conn.recv()
   1849             client = socket.socket()
   1850             client.connect(address)
   1851             client.sendall(msg.upper())
   1852             client.close()
   1853 
   1854         conn.close()
   1855 
   1856     def test_pickling(self):
   1857         try:
   1858             multiprocessing.allow_connection_pickling()
   1859         except ImportError:
   1860             return
   1861 
   1862         families = self.connection.families
   1863 
   1864         lconn, lconn0 = self.Pipe()
   1865         lp = self.Process(target=self._listener, args=(lconn0, families))
   1866         lp.daemon = True
   1867         lp.start()
   1868         lconn0.close()
   1869 
   1870         rconn, rconn0 = self.Pipe()
   1871         rp = self.Process(target=self._remote, args=(rconn0,))
   1872         rp.daemon = True
   1873         rp.start()
   1874         rconn0.close()
   1875 
   1876         for fam in families:
   1877             msg = ('This connection uses family %s' % fam).encode('ascii')
   1878             address = lconn.recv()
   1879             rconn.send((address, msg))
   1880             new_conn = lconn.recv()
   1881             self.assertEqual(new_conn.recv(), msg.upper())
   1882 
   1883         rconn.send(None)
   1884 
   1885         if self.TYPE == 'processes':
   1886             msg = latin('This connection uses a normal socket')
   1887             address = lconn.recv()
   1888             rconn.send((address, msg))
   1889             if hasattr(socket, 'fromfd'):
   1890                 new_conn = lconn.recv()
   1891                 self.assertEqual(new_conn.recv(100), msg.upper())
   1892             else:
   1893                 # XXX On Windows with Py2.6 need to backport fromfd()
   1894                 discard = lconn.recv_bytes()
   1895 
   1896         lconn.send(None)
   1897 
   1898         rconn.close()
   1899         lconn.close()
   1900 
   1901         lp.join()
   1902         rp.join()
   1903 """
   1904 #
   1905 #
   1906 #
   1907 
   1908 class _TestHeap(BaseTestCase):
   1909 
   1910     ALLOWED_TYPES = ('processes',)
   1911 
   1912     def test_heap(self):
   1913         iterations = 5000
   1914         maxblocks = 50
   1915         blocks = []
   1916 
   1917         # create and destroy lots of blocks of different sizes
   1918         for i in xrange(iterations):
   1919             size = int(random.lognormvariate(0, 1) * 1000)
   1920             b = multiprocessing.heap.BufferWrapper(size)
   1921             blocks.append(b)
   1922             if len(blocks) > maxblocks:
   1923                 i = random.randrange(maxblocks)
   1924                 del blocks[i]
   1925 
   1926         # get the heap object
   1927         heap = multiprocessing.heap.BufferWrapper._heap
   1928 
   1929         # verify the state of the heap
   1930         all = []
   1931         occupied = 0
   1932         heap._lock.acquire()
   1933         self.addCleanup(heap._lock.release)
   1934         for L in heap._len_to_seq.values():
   1935             for arena, start, stop in L:
   1936                 all.append((heap._arenas.index(arena), start, stop,
   1937                             stop-start, 'free'))
   1938         for arena, start, stop in heap._allocated_blocks:
   1939             all.append((heap._arenas.index(arena), start, stop,
   1940                         stop-start, 'occupied'))
   1941             occupied += (stop-start)
   1942 
   1943         all.sort()
   1944 
   1945         for i in range(len(all)-1):
   1946             (arena, start, stop) = all[i][:3]
   1947             (narena, nstart, nstop) = all[i+1][:3]
   1948             self.assertTrue((arena != narena and nstart == 0) or
   1949                             (stop == nstart))
   1950 
   1951     def test_free_from_gc(self):
   1952         # Check that freeing of blocks by the garbage collector doesn't deadlock
   1953         # (issue #12352).
   1954         # Make sure the GC is enabled, and set lower collection thresholds to
   1955         # make collections more frequent (and increase the probability of
   1956         # deadlock).
   1957         if not gc.isenabled():
   1958             gc.enable()
   1959             self.addCleanup(gc.disable)
   1960         thresholds = gc.get_threshold()
   1961         self.addCleanup(gc.set_threshold, *thresholds)
   1962         gc.set_threshold(10)
   1963 
   1964         # perform numerous block allocations, with cyclic references to make
   1965         # sure objects are collected asynchronously by the gc
   1966         for i in range(5000):
   1967             a = multiprocessing.heap.BufferWrapper(1)
   1968             b = multiprocessing.heap.BufferWrapper(1)
   1969             # circular references
   1970             a.buddy = b
   1971             b.buddy = a
   1972 
   1973 #
   1974 #
   1975 #
   1976 
   1977 class _Foo(Structure):
   1978     _fields_ = [
   1979         ('x', c_int),
   1980         ('y', c_double)
   1981         ]
   1982 
   1983 class _TestSharedCTypes(BaseTestCase):
   1984 
   1985     ALLOWED_TYPES = ('processes',)
   1986 
   1987     def setUp(self):
   1988         if not HAS_SHAREDCTYPES:
   1989             self.skipTest("requires multiprocessing.sharedctypes")
   1990 
   1991     @classmethod
   1992     def _double(cls, x, y, foo, arr, string):
   1993         x.value *= 2
   1994         y.value *= 2
   1995         foo.x *= 2
   1996         foo.y *= 2
   1997         string.value *= 2
   1998         for i in range(len(arr)):
   1999             arr[i] *= 2
   2000 
   2001     def test_sharedctypes(self, lock=False):
   2002         x = Value('i', 7, lock=lock)
   2003         y = Value(c_double, 1.0/3.0, lock=lock)
   2004         foo = Value(_Foo, 3, 2, lock=lock)
   2005         arr = self.Array('d', range(10), lock=lock)
   2006         string = self.Array('c', 20, lock=lock)
   2007         string.value = latin('hello')
   2008 
   2009         p = self.Process(target=self._double, args=(x, y, foo, arr, string))
   2010         p.daemon = True
   2011         p.start()
   2012         p.join()
   2013 
   2014         self.assertEqual(x.value, 14)
   2015         self.assertAlmostEqual(y.value, 2.0/3.0)
   2016         self.assertEqual(foo.x, 6)
   2017         self.assertAlmostEqual(foo.y, 4.0)
   2018         for i in range(10):
   2019             self.assertAlmostEqual(arr[i], i*2)
   2020         self.assertEqual(string.value, latin('hellohello'))
   2021 
   2022     def test_synchronize(self):
   2023         self.test_sharedctypes(lock=True)
   2024 
   2025     def test_copy(self):
   2026         foo = _Foo(2, 5.0)
   2027         bar = copy(foo)
   2028         foo.x = 0
   2029         foo.y = 0
   2030         self.assertEqual(bar.x, 2)
   2031         self.assertAlmostEqual(bar.y, 5.0)
   2032 
   2033 #
   2034 #
   2035 #
   2036 
   2037 class _TestFinalize(BaseTestCase):
   2038 
   2039     ALLOWED_TYPES = ('processes',)
   2040 
   2041     @classmethod
   2042     def _test_finalize(cls, conn):
   2043         class Foo(object):
   2044             pass
   2045 
   2046         a = Foo()
   2047         util.Finalize(a, conn.send, args=('a',))
   2048         del a           # triggers callback for a
   2049 
   2050         b = Foo()
   2051         close_b = util.Finalize(b, conn.send, args=('b',))
   2052         close_b()       # triggers callback for b
   2053         close_b()       # does nothing because callback has already been called
   2054         del b           # does nothing because callback has already been called
   2055 
   2056         c = Foo()
   2057         util.Finalize(c, conn.send, args=('c',))
   2058 
   2059         d10 = Foo()
   2060         util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
   2061 
   2062         d01 = Foo()
   2063         util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
   2064         d02 = Foo()
   2065         util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
   2066         d03 = Foo()
   2067         util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
   2068 
   2069         util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
   2070 
   2071         util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
   2072 
   2073         # call multiprocessing's cleanup function then exit process without
   2074         # garbage collecting locals
   2075         util._exit_function()
   2076         conn.close()
   2077         os._exit(0)
   2078 
   2079     def test_finalize(self):
   2080         conn, child_conn = self.Pipe()
   2081 
   2082         p = self.Process(target=self._test_finalize, args=(child_conn,))
   2083         p.daemon = True
   2084         p.start()
   2085         p.join()
   2086 
   2087         result = [obj for obj in iter(conn.recv, 'STOP')]
   2088         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
   2089 
   2090 #
   2091 # Test that from ... import * works for each module
   2092 #
   2093 
   2094 class _TestImportStar(BaseTestCase):
   2095 
   2096     ALLOWED_TYPES = ('processes',)
   2097 
   2098     def test_import(self):
   2099         modules = [
   2100             'multiprocessing', 'multiprocessing.connection',
   2101             'multiprocessing.heap', 'multiprocessing.managers',
   2102             'multiprocessing.pool', 'multiprocessing.process',
   2103             'multiprocessing.synchronize', 'multiprocessing.util'
   2104             ]
   2105 
   2106         if HAS_REDUCTION:
   2107             modules.append('multiprocessing.reduction')
   2108 
   2109         if c_int is not None:
   2110             # This module requires _ctypes
   2111             modules.append('multiprocessing.sharedctypes')
   2112 
   2113         for name in modules:
   2114             __import__(name)
   2115             mod = sys.modules[name]
   2116 
   2117             for attr in getattr(mod, '__all__', ()):
   2118                 self.assertTrue(
   2119                     hasattr(mod, attr),
   2120                     '%r does not have attribute %r' % (mod, attr)
   2121                     )
   2122 
   2123 #
   2124 # Quick test that logging works -- does not test logging output
   2125 #
   2126 
   2127 class _TestLogging(BaseTestCase):
   2128 
   2129     ALLOWED_TYPES = ('processes',)
   2130 
   2131     def test_enable_logging(self):
   2132         logger = multiprocessing.get_logger()
   2133         logger.setLevel(util.SUBWARNING)
   2134         self.assertTrue(logger is not None)
   2135         logger.debug('this will not be printed')
   2136         logger.info('nor will this')
   2137         logger.setLevel(LOG_LEVEL)
   2138 
   2139     @classmethod
   2140     def _test_level(cls, conn):
   2141         logger = multiprocessing.get_logger()
   2142         conn.send(logger.getEffectiveLevel())
   2143 
   2144     def test_level(self):
   2145         LEVEL1 = 32
   2146         LEVEL2 = 37
   2147 
   2148         logger = multiprocessing.get_logger()
   2149         root_logger = logging.getLogger()
   2150         root_level = root_logger.level
   2151 
   2152         reader, writer = multiprocessing.Pipe(duplex=False)
   2153 
   2154         logger.setLevel(LEVEL1)
   2155         p = self.Process(target=self._test_level, args=(writer,))
   2156         p.daemon = True
   2157         p.start()
   2158         self.assertEqual(LEVEL1, reader.recv())
   2159 
   2160         logger.setLevel(logging.NOTSET)
   2161         root_logger.setLevel(LEVEL2)
   2162         p = self.Process(target=self._test_level, args=(writer,))
   2163         p.daemon = True
   2164         p.start()
   2165         self.assertEqual(LEVEL2, reader.recv())
   2166 
   2167         root_logger.setLevel(root_level)
   2168         logger.setLevel(level=LOG_LEVEL)
   2169 
   2170 
   2171 # class _TestLoggingProcessName(BaseTestCase):
   2172 #
   2173 #     def handle(self, record):
   2174 #         assert record.processName == multiprocessing.current_process().name
   2175 #         self.__handled = True
   2176 #
   2177 #     def test_logging(self):
   2178 #         handler = logging.Handler()
   2179 #         handler.handle = self.handle
   2180 #         self.__handled = False
   2181 #         # Bypass getLogger() and side-effects
   2182 #         logger = logging.getLoggerClass()(
   2183 #                 'multiprocessing.test.TestLoggingProcessName')
   2184 #         logger.addHandler(handler)
   2185 #         logger.propagate = False
   2186 #
   2187 #         logger.warn('foo')
   2188 #         assert self.__handled
   2189 
   2190 #
   2191 # Check that Process.join() retries if os.waitpid() fails with EINTR
   2192 #
   2193 
   2194 class _TestPollEintr(BaseTestCase):
   2195 
   2196     ALLOWED_TYPES = ('processes',)
   2197 
   2198     @classmethod
   2199     def _killer(cls, pid):
   2200         time.sleep(0.5)
   2201         os.kill(pid, signal.SIGUSR1)
   2202 
   2203     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
   2204     def test_poll_eintr(self):
   2205         got_signal = [False]
   2206         def record(*args):
   2207             got_signal[0] = True
   2208         pid = os.getpid()
   2209         oldhandler = signal.signal(signal.SIGUSR1, record)
   2210         try:
   2211             killer = self.Process(target=self._killer, args=(pid,))
   2212             killer.start()
   2213             p = self.Process(target=time.sleep, args=(1,))
   2214             p.start()
   2215             p.join()
   2216             self.assertTrue(got_signal[0])
   2217             self.assertEqual(p.exitcode, 0)
   2218             killer.join()
   2219         finally:
   2220             signal.signal(signal.SIGUSR1, oldhandler)
   2221 
   2222 #
   2223 # Test to verify handle verification, see issue 3321
   2224 #
   2225 
   2226 class TestInvalidHandle(unittest.TestCase):
   2227 
   2228     @unittest.skipIf(WIN32, "skipped on Windows")
   2229     def test_invalid_handles(self):
   2230         conn = _multiprocessing.Connection(44977608)
   2231         self.assertRaises(IOError, conn.poll)
   2232         self.assertRaises(IOError, _multiprocessing.Connection, -1)
   2233 
   2234 #
   2235 # Functions used to create test cases from the base ones in this module
   2236 #
   2237 
   2238 def get_attributes(Source, names):
   2239     d = {}
   2240     for name in names:
   2241         obj = getattr(Source, name)
   2242         if type(obj) == type(get_attributes):
   2243             obj = staticmethod(obj)
   2244         d[name] = obj
   2245     return d
   2246 
   2247 def create_test_cases(Mixin, type):
   2248     result = {}
   2249     glob = globals()
   2250     Type = type.capitalize()
   2251 
   2252     for name in glob.keys():
   2253         if name.startswith('_Test'):
   2254             base = glob[name]
   2255             if type in base.ALLOWED_TYPES:
   2256                 newname = 'With' + Type + name[1:]
   2257                 class Temp(base, unittest.TestCase, Mixin):
   2258                     pass
   2259                 result[newname] = Temp
   2260                 Temp.__name__ = newname
   2261                 Temp.__module__ = Mixin.__module__
   2262     return result
   2263 
   2264 #
   2265 # Create test cases
   2266 #
   2267 
   2268 class ProcessesMixin(object):
   2269     TYPE = 'processes'
   2270     Process = multiprocessing.Process
   2271     locals().update(get_attributes(multiprocessing, (
   2272         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   2273         'Condition', 'Event', 'Value', 'Array', 'RawValue',
   2274         'RawArray', 'current_process', 'active_children', 'Pipe',
   2275         'connection', 'JoinableQueue', 'Pool'
   2276         )))
   2277 
   2278 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
   2279 globals().update(testcases_processes)
   2280 
   2281 
   2282 class ManagerMixin(object):
   2283     TYPE = 'manager'
   2284     Process = multiprocessing.Process
   2285     manager = object.__new__(multiprocessing.managers.SyncManager)
   2286     locals().update(get_attributes(manager, (
   2287         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   2288        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
   2289         'Namespace', 'JoinableQueue', 'Pool'
   2290         )))
   2291 
   2292 testcases_manager = create_test_cases(ManagerMixin, type='manager')
   2293 globals().update(testcases_manager)
   2294 
   2295 
   2296 class ThreadsMixin(object):
   2297     TYPE = 'threads'
   2298     Process = multiprocessing.dummy.Process
   2299     locals().update(get_attributes(multiprocessing.dummy, (
   2300         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
   2301         'Condition', 'Event', 'Value', 'Array', 'current_process',
   2302         'active_children', 'Pipe', 'connection', 'dict', 'list',
   2303         'Namespace', 'JoinableQueue', 'Pool'
   2304         )))
   2305 
   2306 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
   2307 globals().update(testcases_threads)
   2308 
   2309 class OtherTest(unittest.TestCase):
   2310     # TODO: add more tests for deliver/answer challenge.
   2311     def test_deliver_challenge_auth_failure(self):
   2312         class _FakeConnection(object):
   2313             def recv_bytes(self, size):
   2314                 return b'something bogus'
   2315             def send_bytes(self, data):
   2316                 pass
   2317         self.assertRaises(multiprocessing.AuthenticationError,
   2318                           multiprocessing.connection.deliver_challenge,
   2319                           _FakeConnection(), b'abc')
   2320 
   2321     def test_answer_challenge_auth_failure(self):
   2322         class _FakeConnection(object):
   2323             def __init__(self):
   2324                 self.count = 0
   2325             def recv_bytes(self, size):
   2326                 self.count += 1
   2327                 if self.count == 1:
   2328                     return multiprocessing.connection.CHALLENGE
   2329                 elif self.count == 2:
   2330                     return b'something bogus'
   2331                 return b''
   2332             def send_bytes(self, data):
   2333                 pass
   2334         self.assertRaises(multiprocessing.AuthenticationError,
   2335                           multiprocessing.connection.answer_challenge,
   2336                           _FakeConnection(), b'abc')
   2337 
   2338 #
   2339 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
   2340 #
   2341 
   2342 def initializer(ns):
   2343     ns.test += 1
   2344 
   2345 class TestInitializers(unittest.TestCase):
   2346     def setUp(self):
   2347         self.mgr = multiprocessing.Manager()
   2348         self.ns = self.mgr.Namespace()
   2349         self.ns.test = 0
   2350 
   2351     def tearDown(self):
   2352         self.mgr.shutdown()
   2353 
   2354     def test_manager_initializer(self):
   2355         m = multiprocessing.managers.SyncManager()
   2356         self.assertRaises(TypeError, m.start, 1)
   2357         m.start(initializer, (self.ns,))
   2358         self.assertEqual(self.ns.test, 1)
   2359         m.shutdown()
   2360 
   2361     def test_pool_initializer(self):
   2362         self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
   2363         p = multiprocessing.Pool(1, initializer, (self.ns,))
   2364         p.close()
   2365         p.join()
   2366         self.assertEqual(self.ns.test, 1)
   2367 
   2368 #
   2369 # Issue 5155, 5313, 5331: Test process in processes
   2370 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
   2371 #
   2372 
   2373 def _this_sub_process(q):
   2374     try:
   2375         item = q.get(block=False)
   2376     except Queue.Empty:
   2377         pass
   2378 
   2379 def _test_process(q):
   2380     queue = multiprocessing.Queue()
   2381     subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
   2382     subProc.daemon = True
   2383     subProc.start()
   2384     subProc.join()
   2385 
   2386 def _afunc(x):
   2387     return x*x
   2388 
   2389 def pool_in_process():
   2390     pool = multiprocessing.Pool(processes=4)
   2391     x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
   2392 
   2393 class _file_like(object):
   2394     def __init__(self, delegate):
   2395         self._delegate = delegate
   2396         self._pid = None
   2397 
   2398     @property
   2399     def cache(self):
   2400         pid = os.getpid()
   2401         # There are no race conditions since fork keeps only the running thread
   2402         if pid != self._pid:
   2403             self._pid = pid
   2404             self._cache = []
   2405         return self._cache
   2406 
   2407     def write(self, data):
   2408         self.cache.append(data)
   2409 
   2410     def flush(self):
   2411         self._delegate.write(''.join(self.cache))
   2412         self._cache = []
   2413 
   2414 class TestStdinBadfiledescriptor(unittest.TestCase):
   2415 
   2416     def test_queue_in_process(self):
   2417         queue = multiprocessing.Queue()
   2418         proc = multiprocessing.Process(target=_test_process, args=(queue,))
   2419         proc.start()
   2420         proc.join()
   2421 
   2422     def test_pool_in_process(self):
   2423         p = multiprocessing.Process(target=pool_in_process)
   2424         p.start()
   2425         p.join()
   2426 
   2427     def test_flushing(self):
   2428         sio = StringIO()
   2429         flike = _file_like(sio)
   2430         flike.write('foo')
   2431         proc = multiprocessing.Process(target=lambda: flike.flush())
   2432         flike.flush()
   2433         assert sio.getvalue() == 'foo'
   2434 
   2435 #
   2436 # Test interaction with socket timeouts - see Issue #6056
   2437 #
   2438 
   2439 class TestTimeouts(unittest.TestCase):
   2440     @classmethod
   2441     def _test_timeout(cls, child, address):
   2442         time.sleep(1)
   2443         child.send(123)
   2444         child.close()
   2445         conn = multiprocessing.connection.Client(address)
   2446         conn.send(456)
   2447         conn.close()
   2448 
   2449     def test_timeout(self):
   2450         old_timeout = socket.getdefaulttimeout()
   2451         try:
   2452             socket.setdefaulttimeout(0.1)
   2453             parent, child = multiprocessing.Pipe(duplex=True)
   2454             l = multiprocessing.connection.Listener(family='AF_INET')
   2455             p = multiprocessing.Process(target=self._test_timeout,
   2456                                         args=(child, l.address))
   2457             p.start()
   2458             child.close()
   2459             self.assertEqual(parent.recv(), 123)
   2460             parent.close()
   2461             conn = l.accept()
   2462             self.assertEqual(conn.recv(), 456)
   2463             conn.close()
   2464             l.close()
   2465             p.join(10)
   2466         finally:
   2467             socket.setdefaulttimeout(old_timeout)
   2468 
   2469 #
   2470 # Test what happens with no "if __name__ == '__main__'"
   2471 #
   2472 
   2473 class TestNoForkBomb(unittest.TestCase):
   2474     def test_noforkbomb(self):
   2475         name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
   2476         if WIN32:
   2477             rc, out, err = test.script_helper.assert_python_failure(name)
   2478             self.assertEqual(out, '')
   2479             self.assertIn('RuntimeError', err)
   2480         else:
   2481             rc, out, err = test.script_helper.assert_python_ok(name)
   2482             self.assertEqual(out.rstrip(), '123')
   2483             self.assertEqual(err, '')
   2484 
   2485 #
   2486 # Issue 12098: check sys.flags of child matches that for parent
   2487 #
   2488 
   2489 class TestFlags(unittest.TestCase):
   2490     @classmethod
   2491     def run_in_grandchild(cls, conn):
   2492         conn.send(tuple(sys.flags))
   2493 
   2494     @classmethod
   2495     def run_in_child(cls):
   2496         import json
   2497         r, w = multiprocessing.Pipe(duplex=False)
   2498         p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
   2499         p.start()
   2500         grandchild_flags = r.recv()
   2501         p.join()
   2502         r.close()
   2503         w.close()
   2504         flags = (tuple(sys.flags), grandchild_flags)
   2505         print(json.dumps(flags))
   2506 
   2507     @test_support.requires_unicode  # XXX json needs unicode support
   2508     def test_flags(self):
   2509         import json, subprocess
   2510         # start child process using unusual flags
   2511         prog = ('from test.test_multiprocessing import TestFlags; ' +
   2512                 'TestFlags.run_in_child()')
   2513         data = subprocess.check_output(
   2514             [sys.executable, '-E', '-B', '-O', '-c', prog])
   2515         child_flags, grandchild_flags = json.loads(data.decode('ascii'))
   2516         self.assertEqual(child_flags, grandchild_flags)
   2517 
   2518 #
   2519 # Issue #17555: ForkAwareThreadLock
   2520 #
   2521 
   2522 class TestForkAwareThreadLock(unittest.TestCase):
   2523     # We recurisvely start processes.  Issue #17555 meant that the
   2524     # after fork registry would get duplicate entries for the same
   2525     # lock.  The size of the registry at generation n was ~2**n.
   2526 
   2527     @classmethod
   2528     def child(cls, n, conn):
   2529         if n > 1:
   2530             p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
   2531             p.start()
   2532             p.join()
   2533         else:
   2534             conn.send(len(util._afterfork_registry))
   2535         conn.close()
   2536 
   2537     def test_lock(self):
   2538         r, w = multiprocessing.Pipe(False)
   2539         l = util.ForkAwareThreadLock()
   2540         old_size = len(util._afterfork_registry)
   2541         p = multiprocessing.Process(target=self.child, args=(5, w))
   2542         p.start()
   2543         new_size = r.recv()
   2544         p.join()
   2545         self.assertLessEqual(new_size, old_size)
   2546 
   2547 #
   2548 # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
   2549 #
   2550 
   2551 class TestIgnoreEINTR(unittest.TestCase):
   2552 
   2553     @classmethod
   2554     def _test_ignore(cls, conn):
   2555         def handler(signum, frame):
   2556             pass
   2557         signal.signal(signal.SIGUSR1, handler)
   2558         conn.send('ready')
   2559         x = conn.recv()
   2560         conn.send(x)
   2561         conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block
   2562 
   2563     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
   2564     def test_ignore(self):
   2565         conn, child_conn = multiprocessing.Pipe()
   2566         try:
   2567             p = multiprocessing.Process(target=self._test_ignore,
   2568                                         args=(child_conn,))
   2569             p.daemon = True
   2570             p.start()
   2571             child_conn.close()
   2572             self.assertEqual(conn.recv(), 'ready')
   2573             time.sleep(0.1)
   2574             os.kill(p.pid, signal.SIGUSR1)
   2575             time.sleep(0.1)
   2576             conn.send(1234)
   2577             self.assertEqual(conn.recv(), 1234)
   2578             time.sleep(0.1)
   2579             os.kill(p.pid, signal.SIGUSR1)
   2580             self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
   2581             time.sleep(0.1)
   2582             p.join()
   2583         finally:
   2584             conn.close()
   2585 
   2586     @classmethod
   2587     def _test_ignore_listener(cls, conn):
   2588         def handler(signum, frame):
   2589             pass
   2590         signal.signal(signal.SIGUSR1, handler)
   2591         l = multiprocessing.connection.Listener()
   2592         conn.send(l.address)
   2593         a = l.accept()
   2594         a.send('welcome')
   2595 
   2596     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
   2597     def test_ignore_listener(self):
   2598         conn, child_conn = multiprocessing.Pipe()
   2599         try:
   2600             p = multiprocessing.Process(target=self._test_ignore_listener,
   2601                                         args=(child_conn,))
   2602             p.daemon = True
   2603             p.start()
   2604             child_conn.close()
   2605             address = conn.recv()
   2606             time.sleep(0.1)
   2607             os.kill(p.pid, signal.SIGUSR1)
   2608             time.sleep(0.1)
   2609             client = multiprocessing.connection.Client(address)
   2610             self.assertEqual(client.recv(), 'welcome')
   2611             p.join()
   2612         finally:
   2613             conn.close()
   2614 
   2615 #
   2616 #
   2617 #
   2618 
   2619 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
   2620                    TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
   2621                    TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
   2622 
   2623 #
   2624 #
   2625 #
   2626 
   2627 def test_main(run=None):
   2628     if sys.platform.startswith("linux"):
   2629         try:
   2630             lock = multiprocessing.RLock()
   2631         except OSError:
   2632             raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
   2633 
   2634     check_enough_semaphores()
   2635 
   2636     if run is None:
   2637         from test.test_support import run_unittest as run
   2638 
   2639     util.get_temp_dir()     # creates temp directory for use by all processes
   2640 
   2641     multiprocessing.get_logger().setLevel(LOG_LEVEL)
   2642 
   2643     ProcessesMixin.pool = multiprocessing.Pool(4)
   2644     ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
   2645     ManagerMixin.manager.__init__()
   2646     ManagerMixin.manager.start()
   2647     ManagerMixin.pool = ManagerMixin.manager.Pool(4)
   2648 
   2649     testcases = (
   2650         sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
   2651         sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
   2652         sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
   2653         testcases_other
   2654         )
   2655 
   2656     loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
   2657     suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
   2658     # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
   2659     # module during these tests is at least platform dependent and possibly
   2660     # non-deterministic on any given platform. So we don't mind if the listed
   2661     # warnings aren't actually raised.
   2662     with test_support.check_py3k_warnings(
   2663             (".+__(get|set)slice__ has been removed", DeprecationWarning),
   2664             (r"sys.exc_clear\(\) not supported", DeprecationWarning),
   2665             quiet=True):
   2666         run(suite)
   2667 
   2668     ThreadsMixin.pool.terminate()
   2669     ProcessesMixin.pool.terminate()
   2670     ManagerMixin.pool.terminate()
   2671     ManagerMixin.manager.shutdown()
   2672 
   2673     del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
   2674 
   2675 def main():
   2676     test_main(unittest.TextTestRunner(verbosity=2).run)
   2677 
   2678 if __name__ == '__main__':
   2679     main()
   2680