1 # 2 # Unit tests for the multiprocessing package 3 # 4 5 import unittest 6 import queue as pyqueue 7 import time 8 import io 9 import itertools 10 import sys 11 import os 12 import gc 13 import errno 14 import signal 15 import array 16 import socket 17 import random 18 import logging 19 import struct 20 import operator 21 import test.support 22 import test.support.script_helper 23 24 25 # Skip tests if _multiprocessing wasn't built. 26 _multiprocessing = test.support.import_module('_multiprocessing') 27 # Skip tests if sem_open implementation is broken. 28 test.support.import_module('multiprocessing.synchronize') 29 # import threading after _multiprocessing to raise a more relevant error 30 # message: "No module named _multiprocessing". _multiprocessing is not compiled 31 # without thread support. 32 import threading 33 34 import multiprocessing.dummy 35 import multiprocessing.connection 36 import multiprocessing.managers 37 import multiprocessing.heap 38 import multiprocessing.pool 39 40 from multiprocessing import util 41 42 try: 43 from multiprocessing import reduction 44 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 45 except ImportError: 46 HAS_REDUCTION = False 47 48 try: 49 from multiprocessing.sharedctypes import Value, copy 50 HAS_SHAREDCTYPES = True 51 except ImportError: 52 HAS_SHAREDCTYPES = False 53 54 try: 55 import msvcrt 56 except ImportError: 57 msvcrt = None 58 59 # 60 # 61 # 62 63 def latin(s): 64 return s.encode('latin') 65 66 # 67 # Constants 68 # 69 70 LOG_LEVEL = util.SUBWARNING 71 #LOG_LEVEL = logging.DEBUG 72 73 DELTA = 0.1 74 CHECK_TIMINGS = False # making true makes tests take a lot longer 75 # and can sometimes cause some non-serious 76 # failures because some calls block a bit 77 # longer than expected 78 if CHECK_TIMINGS: 79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 80 else: 81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 82 83 HAVE_GETVALUE = not getattr(_multiprocessing, 84 'HAVE_BROKEN_SEM_GETVALUE', False) 85 86 WIN32 = (sys.platform == "win32") 87 88 from multiprocessing.connection import wait 89 90 def wait_for_handle(handle, timeout): 91 if timeout is not None and timeout < 0.0: 92 timeout = None 93 return wait([handle], timeout) 94 95 try: 96 MAXFD = os.sysconf("SC_OPEN_MAX") 97 except: 98 MAXFD = 256 99 100 # To speed up tests when using the forkserver, we can preload these: 101 PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 102 103 # 104 # Some tests require ctypes 105 # 106 107 try: 108 from ctypes import Structure, c_int, c_double 109 except ImportError: 110 Structure = object 111 c_int = c_double = None 112 113 114 def check_enough_semaphores(): 115 """Check that the system supports enough semaphores to run the test.""" 116 # minimum number of semaphores available according to POSIX 117 nsems_min = 256 118 try: 119 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 120 except (AttributeError, ValueError): 121 # sysconf not available or setting not available 122 return 123 if nsems == -1 or nsems >= nsems_min: 124 return 125 raise unittest.SkipTest("The OS doesn't support enough semaphores " 126 "to run the test (required: %d)." % nsems_min) 127 128 129 # 130 # Creates a wrapper for a function which records the time it takes to finish 131 # 132 133 class TimingWrapper(object): 134 135 def __init__(self, func): 136 self.func = func 137 self.elapsed = None 138 139 def __call__(self, *args, **kwds): 140 t = time.time() 141 try: 142 return self.func(*args, **kwds) 143 finally: 144 self.elapsed = time.time() - t 145 146 # 147 # Base class for test cases 148 # 149 150 class BaseTestCase(object): 151 152 ALLOWED_TYPES = ('processes', 'manager', 'threads') 153 154 def assertTimingAlmostEqual(self, a, b): 155 if CHECK_TIMINGS: 156 self.assertAlmostEqual(a, b, 1) 157 158 def assertReturnsIfImplemented(self, value, func, *args): 159 try: 160 res = func(*args) 161 except NotImplementedError: 162 pass 163 else: 164 return self.assertEqual(value, res) 165 166 # For the sanity of Windows users, rather than crashing or freezing in 167 # multiple ways. 168 def __reduce__(self, *args): 169 raise NotImplementedError("shouldn't try to pickle a test case") 170 171 __reduce_ex__ = __reduce__ 172 173 # 174 # Return the value of a semaphore 175 # 176 177 def get_value(self): 178 try: 179 return self.get_value() 180 except AttributeError: 181 try: 182 return self._Semaphore__value 183 except AttributeError: 184 try: 185 return self._value 186 except AttributeError: 187 raise NotImplementedError 188 189 # 190 # Testcases 191 # 192 193 class _TestProcess(BaseTestCase): 194 195 ALLOWED_TYPES = ('processes', 'threads') 196 197 def test_current(self): 198 if self.TYPE == 'threads': 199 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 200 201 current = self.current_process() 202 authkey = current.authkey 203 204 self.assertTrue(current.is_alive()) 205 self.assertTrue(not current.daemon) 206 self.assertIsInstance(authkey, bytes) 207 self.assertTrue(len(authkey) > 0) 208 self.assertEqual(current.ident, os.getpid()) 209 self.assertEqual(current.exitcode, None) 210 211 def test_daemon_argument(self): 212 if self.TYPE == "threads": 213 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 214 215 # By default uses the current process's daemon flag. 216 proc0 = self.Process(target=self._test) 217 self.assertEqual(proc0.daemon, self.current_process().daemon) 218 proc1 = self.Process(target=self._test, daemon=True) 219 self.assertTrue(proc1.daemon) 220 proc2 = self.Process(target=self._test, daemon=False) 221 self.assertFalse(proc2.daemon) 222 223 @classmethod 224 def _test(cls, q, *args, **kwds): 225 current = cls.current_process() 226 q.put(args) 227 q.put(kwds) 228 q.put(current.name) 229 if cls.TYPE != 'threads': 230 q.put(bytes(current.authkey)) 231 q.put(current.pid) 232 233 def test_process(self): 234 q = self.Queue(1) 235 e = self.Event() 236 args = (q, 1, 2) 237 kwargs = {'hello':23, 'bye':2.54} 238 name = 'SomeProcess' 239 p = self.Process( 240 target=self._test, args=args, kwargs=kwargs, name=name 241 ) 242 p.daemon = True 243 current = self.current_process() 244 245 if self.TYPE != 'threads': 246 self.assertEqual(p.authkey, current.authkey) 247 self.assertEqual(p.is_alive(), False) 248 self.assertEqual(p.daemon, True) 249 self.assertNotIn(p, self.active_children()) 250 self.assertTrue(type(self.active_children()) is list) 251 self.assertEqual(p.exitcode, None) 252 253 p.start() 254 255 self.assertEqual(p.exitcode, None) 256 self.assertEqual(p.is_alive(), True) 257 self.assertIn(p, self.active_children()) 258 259 self.assertEqual(q.get(), args[1:]) 260 self.assertEqual(q.get(), kwargs) 261 self.assertEqual(q.get(), p.name) 262 if self.TYPE != 'threads': 263 self.assertEqual(q.get(), current.authkey) 264 self.assertEqual(q.get(), p.pid) 265 266 p.join() 267 268 self.assertEqual(p.exitcode, 0) 269 self.assertEqual(p.is_alive(), False) 270 self.assertNotIn(p, self.active_children()) 271 272 @classmethod 273 def _test_terminate(cls): 274 time.sleep(100) 275 276 def test_terminate(self): 277 if self.TYPE == 'threads': 278 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 279 280 p = self.Process(target=self._test_terminate) 281 p.daemon = True 282 p.start() 283 284 self.assertEqual(p.is_alive(), True) 285 self.assertIn(p, self.active_children()) 286 self.assertEqual(p.exitcode, None) 287 288 join = TimingWrapper(p.join) 289 290 self.assertEqual(join(0), None) 291 self.assertTimingAlmostEqual(join.elapsed, 0.0) 292 self.assertEqual(p.is_alive(), True) 293 294 self.assertEqual(join(-1), None) 295 self.assertTimingAlmostEqual(join.elapsed, 0.0) 296 self.assertEqual(p.is_alive(), True) 297 298 # XXX maybe terminating too soon causes the problems on Gentoo... 299 time.sleep(1) 300 301 p.terminate() 302 303 if hasattr(signal, 'alarm'): 304 # On the Gentoo buildbot waitpid() often seems to block forever. 305 # We use alarm() to interrupt it if it blocks for too long. 306 def handler(*args): 307 raise RuntimeError('join took too long: %s' % p) 308 old_handler = signal.signal(signal.SIGALRM, handler) 309 try: 310 signal.alarm(10) 311 self.assertEqual(join(), None) 312 finally: 313 signal.alarm(0) 314 signal.signal(signal.SIGALRM, old_handler) 315 else: 316 self.assertEqual(join(), None) 317 318 self.assertTimingAlmostEqual(join.elapsed, 0.0) 319 320 self.assertEqual(p.is_alive(), False) 321 self.assertNotIn(p, self.active_children()) 322 323 p.join() 324 325 # XXX sometimes get p.exitcode == 0 on Windows ... 326 #self.assertEqual(p.exitcode, -signal.SIGTERM) 327 328 def test_cpu_count(self): 329 try: 330 cpus = multiprocessing.cpu_count() 331 except NotImplementedError: 332 cpus = 1 333 self.assertTrue(type(cpus) is int) 334 self.assertTrue(cpus >= 1) 335 336 def test_active_children(self): 337 self.assertEqual(type(self.active_children()), list) 338 339 p = self.Process(target=time.sleep, args=(DELTA,)) 340 self.assertNotIn(p, self.active_children()) 341 342 p.daemon = True 343 p.start() 344 self.assertIn(p, self.active_children()) 345 346 p.join() 347 self.assertNotIn(p, self.active_children()) 348 349 @classmethod 350 def _test_recursion(cls, wconn, id): 351 wconn.send(id) 352 if len(id) < 2: 353 for i in range(2): 354 p = cls.Process( 355 target=cls._test_recursion, args=(wconn, id+[i]) 356 ) 357 p.start() 358 p.join() 359 360 def test_recursion(self): 361 rconn, wconn = self.Pipe(duplex=False) 362 self._test_recursion(wconn, []) 363 364 time.sleep(DELTA) 365 result = [] 366 while rconn.poll(): 367 result.append(rconn.recv()) 368 369 expected = [ 370 [], 371 [0], 372 [0, 0], 373 [0, 1], 374 [1], 375 [1, 0], 376 [1, 1] 377 ] 378 self.assertEqual(result, expected) 379 380 @classmethod 381 def _test_sentinel(cls, event): 382 event.wait(10.0) 383 384 def test_sentinel(self): 385 if self.TYPE == "threads": 386 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 387 event = self.Event() 388 p = self.Process(target=self._test_sentinel, args=(event,)) 389 with self.assertRaises(ValueError): 390 p.sentinel 391 p.start() 392 self.addCleanup(p.join) 393 sentinel = p.sentinel 394 self.assertIsInstance(sentinel, int) 395 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 396 event.set() 397 p.join() 398 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 399 400 # 401 # 402 # 403 404 class _UpperCaser(multiprocessing.Process): 405 406 def __init__(self): 407 multiprocessing.Process.__init__(self) 408 self.child_conn, self.parent_conn = multiprocessing.Pipe() 409 410 def run(self): 411 self.parent_conn.close() 412 for s in iter(self.child_conn.recv, None): 413 self.child_conn.send(s.upper()) 414 self.child_conn.close() 415 416 def submit(self, s): 417 assert type(s) is str 418 self.parent_conn.send(s) 419 return self.parent_conn.recv() 420 421 def stop(self): 422 self.parent_conn.send(None) 423 self.parent_conn.close() 424 self.child_conn.close() 425 426 class _TestSubclassingProcess(BaseTestCase): 427 428 ALLOWED_TYPES = ('processes',) 429 430 def test_subclassing(self): 431 uppercaser = _UpperCaser() 432 uppercaser.daemon = True 433 uppercaser.start() 434 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 435 self.assertEqual(uppercaser.submit('world'), 'WORLD') 436 uppercaser.stop() 437 uppercaser.join() 438 439 def test_stderr_flush(self): 440 # sys.stderr is flushed at process shutdown (issue #13812) 441 if self.TYPE == "threads": 442 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 443 444 testfn = test.support.TESTFN 445 self.addCleanup(test.support.unlink, testfn) 446 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 447 proc.start() 448 proc.join() 449 with open(testfn, 'r') as f: 450 err = f.read() 451 # The whole traceback was printed 452 self.assertIn("ZeroDivisionError", err) 453 self.assertIn("test_multiprocessing.py", err) 454 self.assertIn("1/0 # MARKER", err) 455 456 @classmethod 457 def _test_stderr_flush(cls, testfn): 458 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 459 sys.stderr = open(fd, 'w', closefd=False) 460 1/0 # MARKER 461 462 463 @classmethod 464 def _test_sys_exit(cls, reason, testfn): 465 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 466 sys.stderr = open(fd, 'w', closefd=False) 467 sys.exit(reason) 468 469 def test_sys_exit(self): 470 # See Issue 13854 471 if self.TYPE == 'threads': 472 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 473 474 testfn = test.support.TESTFN 475 self.addCleanup(test.support.unlink, testfn) 476 477 for reason in ( 478 [1, 2, 3], 479 'ignore this', 480 ): 481 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 482 p.daemon = True 483 p.start() 484 p.join(5) 485 self.assertEqual(p.exitcode, 1) 486 487 with open(testfn, 'r') as f: 488 content = f.read() 489 self.assertEqual(content.rstrip(), str(reason)) 490 491 os.unlink(testfn) 492 493 for reason in (True, False, 8): 494 p = self.Process(target=sys.exit, args=(reason,)) 495 p.daemon = True 496 p.start() 497 p.join(5) 498 self.assertEqual(p.exitcode, reason) 499 500 # 501 # 502 # 503 504 def queue_empty(q): 505 if hasattr(q, 'empty'): 506 return q.empty() 507 else: 508 return q.qsize() == 0 509 510 def queue_full(q, maxsize): 511 if hasattr(q, 'full'): 512 return q.full() 513 else: 514 return q.qsize() == maxsize 515 516 517 class _TestQueue(BaseTestCase): 518 519 520 @classmethod 521 def _test_put(cls, queue, child_can_start, parent_can_continue): 522 child_can_start.wait() 523 for i in range(6): 524 queue.get() 525 parent_can_continue.set() 526 527 def test_put(self): 528 MAXSIZE = 6 529 queue = self.Queue(maxsize=MAXSIZE) 530 child_can_start = self.Event() 531 parent_can_continue = self.Event() 532 533 proc = self.Process( 534 target=self._test_put, 535 args=(queue, child_can_start, parent_can_continue) 536 ) 537 proc.daemon = True 538 proc.start() 539 540 self.assertEqual(queue_empty(queue), True) 541 self.assertEqual(queue_full(queue, MAXSIZE), False) 542 543 queue.put(1) 544 queue.put(2, True) 545 queue.put(3, True, None) 546 queue.put(4, False) 547 queue.put(5, False, None) 548 queue.put_nowait(6) 549 550 # the values may be in buffer but not yet in pipe so sleep a bit 551 time.sleep(DELTA) 552 553 self.assertEqual(queue_empty(queue), False) 554 self.assertEqual(queue_full(queue, MAXSIZE), True) 555 556 put = TimingWrapper(queue.put) 557 put_nowait = TimingWrapper(queue.put_nowait) 558 559 self.assertRaises(pyqueue.Full, put, 7, False) 560 self.assertTimingAlmostEqual(put.elapsed, 0) 561 562 self.assertRaises(pyqueue.Full, put, 7, False, None) 563 self.assertTimingAlmostEqual(put.elapsed, 0) 564 565 self.assertRaises(pyqueue.Full, put_nowait, 7) 566 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 567 568 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 569 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 570 571 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 572 self.assertTimingAlmostEqual(put.elapsed, 0) 573 574 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 575 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 576 577 child_can_start.set() 578 parent_can_continue.wait() 579 580 self.assertEqual(queue_empty(queue), True) 581 self.assertEqual(queue_full(queue, MAXSIZE), False) 582 583 proc.join() 584 585 @classmethod 586 def _test_get(cls, queue, child_can_start, parent_can_continue): 587 child_can_start.wait() 588 #queue.put(1) 589 queue.put(2) 590 queue.put(3) 591 queue.put(4) 592 queue.put(5) 593 parent_can_continue.set() 594 595 def test_get(self): 596 queue = self.Queue() 597 child_can_start = self.Event() 598 parent_can_continue = self.Event() 599 600 proc = self.Process( 601 target=self._test_get, 602 args=(queue, child_can_start, parent_can_continue) 603 ) 604 proc.daemon = True 605 proc.start() 606 607 self.assertEqual(queue_empty(queue), True) 608 609 child_can_start.set() 610 parent_can_continue.wait() 611 612 time.sleep(DELTA) 613 self.assertEqual(queue_empty(queue), False) 614 615 # Hangs unexpectedly, remove for now 616 #self.assertEqual(queue.get(), 1) 617 self.assertEqual(queue.get(True, None), 2) 618 self.assertEqual(queue.get(True), 3) 619 self.assertEqual(queue.get(timeout=1), 4) 620 self.assertEqual(queue.get_nowait(), 5) 621 622 self.assertEqual(queue_empty(queue), True) 623 624 get = TimingWrapper(queue.get) 625 get_nowait = TimingWrapper(queue.get_nowait) 626 627 self.assertRaises(pyqueue.Empty, get, False) 628 self.assertTimingAlmostEqual(get.elapsed, 0) 629 630 self.assertRaises(pyqueue.Empty, get, False, None) 631 self.assertTimingAlmostEqual(get.elapsed, 0) 632 633 self.assertRaises(pyqueue.Empty, get_nowait) 634 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 635 636 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 637 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 638 639 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 640 self.assertTimingAlmostEqual(get.elapsed, 0) 641 642 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 643 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 644 645 proc.join() 646 647 @classmethod 648 def _test_fork(cls, queue): 649 for i in range(10, 20): 650 queue.put(i) 651 # note that at this point the items may only be buffered, so the 652 # process cannot shutdown until the feeder thread has finished 653 # pushing items onto the pipe. 654 655 def test_fork(self): 656 # Old versions of Queue would fail to create a new feeder 657 # thread for a forked process if the original process had its 658 # own feeder thread. This test checks that this no longer 659 # happens. 660 661 queue = self.Queue() 662 663 # put items on queue so that main process starts a feeder thread 664 for i in range(10): 665 queue.put(i) 666 667 # wait to make sure thread starts before we fork a new process 668 time.sleep(DELTA) 669 670 # fork process 671 p = self.Process(target=self._test_fork, args=(queue,)) 672 p.daemon = True 673 p.start() 674 675 # check that all expected items are in the queue 676 for i in range(20): 677 self.assertEqual(queue.get(), i) 678 self.assertRaises(pyqueue.Empty, queue.get, False) 679 680 p.join() 681 682 def test_qsize(self): 683 q = self.Queue() 684 try: 685 self.assertEqual(q.qsize(), 0) 686 except NotImplementedError: 687 self.skipTest('qsize method not implemented') 688 q.put(1) 689 self.assertEqual(q.qsize(), 1) 690 q.put(5) 691 self.assertEqual(q.qsize(), 2) 692 q.get() 693 self.assertEqual(q.qsize(), 1) 694 q.get() 695 self.assertEqual(q.qsize(), 0) 696 697 @classmethod 698 def _test_task_done(cls, q): 699 for obj in iter(q.get, None): 700 time.sleep(DELTA) 701 q.task_done() 702 703 def test_task_done(self): 704 queue = self.JoinableQueue() 705 706 workers = [self.Process(target=self._test_task_done, args=(queue,)) 707 for i in range(4)] 708 709 for p in workers: 710 p.daemon = True 711 p.start() 712 713 for i in range(10): 714 queue.put(i) 715 716 queue.join() 717 718 for p in workers: 719 queue.put(None) 720 721 for p in workers: 722 p.join() 723 724 def test_no_import_lock_contention(self): 725 with test.support.temp_cwd(): 726 module_name = 'imported_by_an_imported_module' 727 with open(module_name + '.py', 'w') as f: 728 f.write("""if 1: 729 import multiprocessing 730 731 q = multiprocessing.Queue() 732 q.put('knock knock') 733 q.get(timeout=3) 734 q.close() 735 del q 736 """) 737 738 with test.support.DirsOnSysPath(os.getcwd()): 739 try: 740 __import__(module_name) 741 except pyqueue.Empty: 742 self.fail("Probable regression on import lock contention;" 743 " see Issue #22853") 744 745 def test_timeout(self): 746 q = multiprocessing.Queue() 747 start = time.time() 748 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 749 delta = time.time() - start 750 # Tolerate a delta of 30 ms because of the bad clock resolution on 751 # Windows (usually 15.6 ms) 752 self.assertGreaterEqual(delta, 0.170) 753 754 # 755 # 756 # 757 758 class _TestLock(BaseTestCase): 759 760 def test_lock(self): 761 lock = self.Lock() 762 self.assertEqual(lock.acquire(), True) 763 self.assertEqual(lock.acquire(False), False) 764 self.assertEqual(lock.release(), None) 765 self.assertRaises((ValueError, threading.ThreadError), lock.release) 766 767 def test_rlock(self): 768 lock = self.RLock() 769 self.assertEqual(lock.acquire(), True) 770 self.assertEqual(lock.acquire(), True) 771 self.assertEqual(lock.acquire(), True) 772 self.assertEqual(lock.release(), None) 773 self.assertEqual(lock.release(), None) 774 self.assertEqual(lock.release(), None) 775 self.assertRaises((AssertionError, RuntimeError), lock.release) 776 777 def test_lock_context(self): 778 with self.Lock(): 779 pass 780 781 782 class _TestSemaphore(BaseTestCase): 783 784 def _test_semaphore(self, sem): 785 self.assertReturnsIfImplemented(2, get_value, sem) 786 self.assertEqual(sem.acquire(), True) 787 self.assertReturnsIfImplemented(1, get_value, sem) 788 self.assertEqual(sem.acquire(), True) 789 self.assertReturnsIfImplemented(0, get_value, sem) 790 self.assertEqual(sem.acquire(False), False) 791 self.assertReturnsIfImplemented(0, get_value, sem) 792 self.assertEqual(sem.release(), None) 793 self.assertReturnsIfImplemented(1, get_value, sem) 794 self.assertEqual(sem.release(), None) 795 self.assertReturnsIfImplemented(2, get_value, sem) 796 797 def test_semaphore(self): 798 sem = self.Semaphore(2) 799 self._test_semaphore(sem) 800 self.assertEqual(sem.release(), None) 801 self.assertReturnsIfImplemented(3, get_value, sem) 802 self.assertEqual(sem.release(), None) 803 self.assertReturnsIfImplemented(4, get_value, sem) 804 805 def test_bounded_semaphore(self): 806 sem = self.BoundedSemaphore(2) 807 self._test_semaphore(sem) 808 # Currently fails on OS/X 809 #if HAVE_GETVALUE: 810 # self.assertRaises(ValueError, sem.release) 811 # self.assertReturnsIfImplemented(2, get_value, sem) 812 813 def test_timeout(self): 814 if self.TYPE != 'processes': 815 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 816 817 sem = self.Semaphore(0) 818 acquire = TimingWrapper(sem.acquire) 819 820 self.assertEqual(acquire(False), False) 821 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 822 823 self.assertEqual(acquire(False, None), False) 824 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 825 826 self.assertEqual(acquire(False, TIMEOUT1), False) 827 self.assertTimingAlmostEqual(acquire.elapsed, 0) 828 829 self.assertEqual(acquire(True, TIMEOUT2), False) 830 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 831 832 self.assertEqual(acquire(timeout=TIMEOUT3), False) 833 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 834 835 836 class _TestCondition(BaseTestCase): 837 838 @classmethod 839 def f(cls, cond, sleeping, woken, timeout=None): 840 cond.acquire() 841 sleeping.release() 842 cond.wait(timeout) 843 woken.release() 844 cond.release() 845 846 def check_invariant(self, cond): 847 # this is only supposed to succeed when there are no sleepers 848 if self.TYPE == 'processes': 849 try: 850 sleepers = (cond._sleeping_count.get_value() - 851 cond._woken_count.get_value()) 852 self.assertEqual(sleepers, 0) 853 self.assertEqual(cond._wait_semaphore.get_value(), 0) 854 except NotImplementedError: 855 pass 856 857 def test_notify(self): 858 cond = self.Condition() 859 sleeping = self.Semaphore(0) 860 woken = self.Semaphore(0) 861 862 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 863 p.daemon = True 864 p.start() 865 866 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 867 p.daemon = True 868 p.start() 869 870 # wait for both children to start sleeping 871 sleeping.acquire() 872 sleeping.acquire() 873 874 # check no process/thread has woken up 875 time.sleep(DELTA) 876 self.assertReturnsIfImplemented(0, get_value, woken) 877 878 # wake up one process/thread 879 cond.acquire() 880 cond.notify() 881 cond.release() 882 883 # check one process/thread has woken up 884 time.sleep(DELTA) 885 self.assertReturnsIfImplemented(1, get_value, woken) 886 887 # wake up another 888 cond.acquire() 889 cond.notify() 890 cond.release() 891 892 # check other has woken up 893 time.sleep(DELTA) 894 self.assertReturnsIfImplemented(2, get_value, woken) 895 896 # check state is not mucked up 897 self.check_invariant(cond) 898 p.join() 899 900 def test_notify_all(self): 901 cond = self.Condition() 902 sleeping = self.Semaphore(0) 903 woken = self.Semaphore(0) 904 905 # start some threads/processes which will timeout 906 for i in range(3): 907 p = self.Process(target=self.f, 908 args=(cond, sleeping, woken, TIMEOUT1)) 909 p.daemon = True 910 p.start() 911 912 t = threading.Thread(target=self.f, 913 args=(cond, sleeping, woken, TIMEOUT1)) 914 t.daemon = True 915 t.start() 916 917 # wait for them all to sleep 918 for i in range(6): 919 sleeping.acquire() 920 921 # check they have all timed out 922 for i in range(6): 923 woken.acquire() 924 self.assertReturnsIfImplemented(0, get_value, woken) 925 926 # check state is not mucked up 927 self.check_invariant(cond) 928 929 # start some more threads/processes 930 for i in range(3): 931 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 932 p.daemon = True 933 p.start() 934 935 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 936 t.daemon = True 937 t.start() 938 939 # wait for them to all sleep 940 for i in range(6): 941 sleeping.acquire() 942 943 # check no process/thread has woken up 944 time.sleep(DELTA) 945 self.assertReturnsIfImplemented(0, get_value, woken) 946 947 # wake them all up 948 cond.acquire() 949 cond.notify_all() 950 cond.release() 951 952 # check they have all woken 953 for i in range(10): 954 try: 955 if get_value(woken) == 6: 956 break 957 except NotImplementedError: 958 break 959 time.sleep(DELTA) 960 self.assertReturnsIfImplemented(6, get_value, woken) 961 962 # check state is not mucked up 963 self.check_invariant(cond) 964 965 def test_timeout(self): 966 cond = self.Condition() 967 wait = TimingWrapper(cond.wait) 968 cond.acquire() 969 res = wait(TIMEOUT1) 970 cond.release() 971 self.assertEqual(res, False) 972 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 973 974 @classmethod 975 def _test_waitfor_f(cls, cond, state): 976 with cond: 977 state.value = 0 978 cond.notify() 979 result = cond.wait_for(lambda : state.value==4) 980 if not result or state.value != 4: 981 sys.exit(1) 982 983 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 984 def test_waitfor(self): 985 # based on test in test/lock_tests.py 986 cond = self.Condition() 987 state = self.Value('i', -1) 988 989 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 990 p.daemon = True 991 p.start() 992 993 with cond: 994 result = cond.wait_for(lambda : state.value==0) 995 self.assertTrue(result) 996 self.assertEqual(state.value, 0) 997 998 for i in range(4): 999 time.sleep(0.01) 1000 with cond: 1001 state.value += 1 1002 cond.notify() 1003 1004 p.join(5) 1005 self.assertFalse(p.is_alive()) 1006 self.assertEqual(p.exitcode, 0) 1007 1008 @classmethod 1009 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1010 sem.release() 1011 with cond: 1012 expected = 0.1 1013 dt = time.time() 1014 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1015 dt = time.time() - dt 1016 # borrow logic in assertTimeout() from test/lock_tests.py 1017 if not result and expected * 0.6 < dt < expected * 10.0: 1018 success.value = True 1019 1020 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1021 def test_waitfor_timeout(self): 1022 # based on test in test/lock_tests.py 1023 cond = self.Condition() 1024 state = self.Value('i', 0) 1025 success = self.Value('i', False) 1026 sem = self.Semaphore(0) 1027 1028 p = self.Process(target=self._test_waitfor_timeout_f, 1029 args=(cond, state, success, sem)) 1030 p.daemon = True 1031 p.start() 1032 self.assertTrue(sem.acquire(timeout=10)) 1033 1034 # Only increment 3 times, so state == 4 is never reached. 1035 for i in range(3): 1036 time.sleep(0.01) 1037 with cond: 1038 state.value += 1 1039 cond.notify() 1040 1041 p.join(5) 1042 self.assertTrue(success.value) 1043 1044 @classmethod 1045 def _test_wait_result(cls, c, pid): 1046 with c: 1047 c.notify() 1048 time.sleep(1) 1049 if pid is not None: 1050 os.kill(pid, signal.SIGINT) 1051 1052 def test_wait_result(self): 1053 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1054 pid = os.getpid() 1055 else: 1056 pid = None 1057 1058 c = self.Condition() 1059 with c: 1060 self.assertFalse(c.wait(0)) 1061 self.assertFalse(c.wait(0.1)) 1062 1063 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1064 p.start() 1065 1066 self.assertTrue(c.wait(10)) 1067 if pid is not None: 1068 self.assertRaises(KeyboardInterrupt, c.wait, 10) 1069 1070 p.join() 1071 1072 1073 class _TestEvent(BaseTestCase): 1074 1075 @classmethod 1076 def _test_event(cls, event): 1077 time.sleep(TIMEOUT2) 1078 event.set() 1079 1080 def test_event(self): 1081 event = self.Event() 1082 wait = TimingWrapper(event.wait) 1083 1084 # Removed temporarily, due to API shear, this does not 1085 # work with threading._Event objects. is_set == isSet 1086 self.assertEqual(event.is_set(), False) 1087 1088 # Removed, threading.Event.wait() will return the value of the __flag 1089 # instead of None. API Shear with the semaphore backed mp.Event 1090 self.assertEqual(wait(0.0), False) 1091 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1092 self.assertEqual(wait(TIMEOUT1), False) 1093 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1094 1095 event.set() 1096 1097 # See note above on the API differences 1098 self.assertEqual(event.is_set(), True) 1099 self.assertEqual(wait(), True) 1100 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1101 self.assertEqual(wait(TIMEOUT1), True) 1102 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1103 # self.assertEqual(event.is_set(), True) 1104 1105 event.clear() 1106 1107 #self.assertEqual(event.is_set(), False) 1108 1109 p = self.Process(target=self._test_event, args=(event,)) 1110 p.daemon = True 1111 p.start() 1112 self.assertEqual(wait(), True) 1113 1114 # 1115 # Tests for Barrier - adapted from tests in test/lock_tests.py 1116 # 1117 1118 # Many of the tests for threading.Barrier use a list as an atomic 1119 # counter: a value is appended to increment the counter, and the 1120 # length of the list gives the value. We use the class DummyList 1121 # for the same purpose. 1122 1123 class _DummyList(object): 1124 1125 def __init__(self): 1126 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1127 lock = multiprocessing.Lock() 1128 self.__setstate__((wrapper, lock)) 1129 self._lengthbuf[0] = 0 1130 1131 def __setstate__(self, state): 1132 (self._wrapper, self._lock) = state 1133 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1134 1135 def __getstate__(self): 1136 return (self._wrapper, self._lock) 1137 1138 def append(self, _): 1139 with self._lock: 1140 self._lengthbuf[0] += 1 1141 1142 def __len__(self): 1143 with self._lock: 1144 return self._lengthbuf[0] 1145 1146 def _wait(): 1147 # A crude wait/yield function not relying on synchronization primitives. 1148 time.sleep(0.01) 1149 1150 1151 class Bunch(object): 1152 """ 1153 A bunch of threads. 1154 """ 1155 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1156 """ 1157 Construct a bunch of `n` threads running the same function `f`. 1158 If `wait_before_exit` is True, the threads won't terminate until 1159 do_finish() is called. 1160 """ 1161 self.f = f 1162 self.args = args 1163 self.n = n 1164 self.started = namespace.DummyList() 1165 self.finished = namespace.DummyList() 1166 self._can_exit = namespace.Event() 1167 if not wait_before_exit: 1168 self._can_exit.set() 1169 for i in range(n): 1170 p = namespace.Process(target=self.task) 1171 p.daemon = True 1172 p.start() 1173 1174 def task(self): 1175 pid = os.getpid() 1176 self.started.append(pid) 1177 try: 1178 self.f(*self.args) 1179 finally: 1180 self.finished.append(pid) 1181 self._can_exit.wait(30) 1182 assert self._can_exit.is_set() 1183 1184 def wait_for_started(self): 1185 while len(self.started) < self.n: 1186 _wait() 1187 1188 def wait_for_finished(self): 1189 while len(self.finished) < self.n: 1190 _wait() 1191 1192 def do_finish(self): 1193 self._can_exit.set() 1194 1195 1196 class AppendTrue(object): 1197 def __init__(self, obj): 1198 self.obj = obj 1199 def __call__(self): 1200 self.obj.append(True) 1201 1202 1203 class _TestBarrier(BaseTestCase): 1204 """ 1205 Tests for Barrier objects. 1206 """ 1207 N = 5 1208 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1209 1210 def setUp(self): 1211 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1212 1213 def tearDown(self): 1214 self.barrier.abort() 1215 self.barrier = None 1216 1217 def DummyList(self): 1218 if self.TYPE == 'threads': 1219 return [] 1220 elif self.TYPE == 'manager': 1221 return self.manager.list() 1222 else: 1223 return _DummyList() 1224 1225 def run_threads(self, f, args): 1226 b = Bunch(self, f, args, self.N-1) 1227 f(*args) 1228 b.wait_for_finished() 1229 1230 @classmethod 1231 def multipass(cls, barrier, results, n): 1232 m = barrier.parties 1233 assert m == cls.N 1234 for i in range(n): 1235 results[0].append(True) 1236 assert len(results[1]) == i * m 1237 barrier.wait() 1238 results[1].append(True) 1239 assert len(results[0]) == (i + 1) * m 1240 barrier.wait() 1241 try: 1242 assert barrier.n_waiting == 0 1243 except NotImplementedError: 1244 pass 1245 assert not barrier.broken 1246 1247 def test_barrier(self, passes=1): 1248 """ 1249 Test that a barrier is passed in lockstep 1250 """ 1251 results = [self.DummyList(), self.DummyList()] 1252 self.run_threads(self.multipass, (self.barrier, results, passes)) 1253 1254 def test_barrier_10(self): 1255 """ 1256 Test that a barrier works for 10 consecutive runs 1257 """ 1258 return self.test_barrier(10) 1259 1260 @classmethod 1261 def _test_wait_return_f(cls, barrier, queue): 1262 res = barrier.wait() 1263 queue.put(res) 1264 1265 def test_wait_return(self): 1266 """ 1267 test the return value from barrier.wait 1268 """ 1269 queue = self.Queue() 1270 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1271 results = [queue.get() for i in range(self.N)] 1272 self.assertEqual(results.count(0), 1) 1273 1274 @classmethod 1275 def _test_action_f(cls, barrier, results): 1276 barrier.wait() 1277 if len(results) != 1: 1278 raise RuntimeError 1279 1280 def test_action(self): 1281 """ 1282 Test the 'action' callback 1283 """ 1284 results = self.DummyList() 1285 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1286 self.run_threads(self._test_action_f, (barrier, results)) 1287 self.assertEqual(len(results), 1) 1288 1289 @classmethod 1290 def _test_abort_f(cls, barrier, results1, results2): 1291 try: 1292 i = barrier.wait() 1293 if i == cls.N//2: 1294 raise RuntimeError 1295 barrier.wait() 1296 results1.append(True) 1297 except threading.BrokenBarrierError: 1298 results2.append(True) 1299 except RuntimeError: 1300 barrier.abort() 1301 1302 def test_abort(self): 1303 """ 1304 Test that an abort will put the barrier in a broken state 1305 """ 1306 results1 = self.DummyList() 1307 results2 = self.DummyList() 1308 self.run_threads(self._test_abort_f, 1309 (self.barrier, results1, results2)) 1310 self.assertEqual(len(results1), 0) 1311 self.assertEqual(len(results2), self.N-1) 1312 self.assertTrue(self.barrier.broken) 1313 1314 @classmethod 1315 def _test_reset_f(cls, barrier, results1, results2, results3): 1316 i = barrier.wait() 1317 if i == cls.N//2: 1318 # Wait until the other threads are all in the barrier. 1319 while barrier.n_waiting < cls.N-1: 1320 time.sleep(0.001) 1321 barrier.reset() 1322 else: 1323 try: 1324 barrier.wait() 1325 results1.append(True) 1326 except threading.BrokenBarrierError: 1327 results2.append(True) 1328 # Now, pass the barrier again 1329 barrier.wait() 1330 results3.append(True) 1331 1332 def test_reset(self): 1333 """ 1334 Test that a 'reset' on a barrier frees the waiting threads 1335 """ 1336 results1 = self.DummyList() 1337 results2 = self.DummyList() 1338 results3 = self.DummyList() 1339 self.run_threads(self._test_reset_f, 1340 (self.barrier, results1, results2, results3)) 1341 self.assertEqual(len(results1), 0) 1342 self.assertEqual(len(results2), self.N-1) 1343 self.assertEqual(len(results3), self.N) 1344 1345 @classmethod 1346 def _test_abort_and_reset_f(cls, barrier, barrier2, 1347 results1, results2, results3): 1348 try: 1349 i = barrier.wait() 1350 if i == cls.N//2: 1351 raise RuntimeError 1352 barrier.wait() 1353 results1.append(True) 1354 except threading.BrokenBarrierError: 1355 results2.append(True) 1356 except RuntimeError: 1357 barrier.abort() 1358 # Synchronize and reset the barrier. Must synchronize first so 1359 # that everyone has left it when we reset, and after so that no 1360 # one enters it before the reset. 1361 if barrier2.wait() == cls.N//2: 1362 barrier.reset() 1363 barrier2.wait() 1364 barrier.wait() 1365 results3.append(True) 1366 1367 def test_abort_and_reset(self): 1368 """ 1369 Test that a barrier can be reset after being broken. 1370 """ 1371 results1 = self.DummyList() 1372 results2 = self.DummyList() 1373 results3 = self.DummyList() 1374 barrier2 = self.Barrier(self.N) 1375 1376 self.run_threads(self._test_abort_and_reset_f, 1377 (self.barrier, barrier2, results1, results2, results3)) 1378 self.assertEqual(len(results1), 0) 1379 self.assertEqual(len(results2), self.N-1) 1380 self.assertEqual(len(results3), self.N) 1381 1382 @classmethod 1383 def _test_timeout_f(cls, barrier, results): 1384 i = barrier.wait() 1385 if i == cls.N//2: 1386 # One thread is late! 1387 time.sleep(1.0) 1388 try: 1389 barrier.wait(0.5) 1390 except threading.BrokenBarrierError: 1391 results.append(True) 1392 1393 def test_timeout(self): 1394 """ 1395 Test wait(timeout) 1396 """ 1397 results = self.DummyList() 1398 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1399 self.assertEqual(len(results), self.barrier.parties) 1400 1401 @classmethod 1402 def _test_default_timeout_f(cls, barrier, results): 1403 i = barrier.wait(cls.defaultTimeout) 1404 if i == cls.N//2: 1405 # One thread is later than the default timeout 1406 time.sleep(1.0) 1407 try: 1408 barrier.wait() 1409 except threading.BrokenBarrierError: 1410 results.append(True) 1411 1412 def test_default_timeout(self): 1413 """ 1414 Test the barrier's default timeout 1415 """ 1416 barrier = self.Barrier(self.N, timeout=0.5) 1417 results = self.DummyList() 1418 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1419 self.assertEqual(len(results), barrier.parties) 1420 1421 def test_single_thread(self): 1422 b = self.Barrier(1) 1423 b.wait() 1424 b.wait() 1425 1426 @classmethod 1427 def _test_thousand_f(cls, barrier, passes, conn, lock): 1428 for i in range(passes): 1429 barrier.wait() 1430 with lock: 1431 conn.send(i) 1432 1433 def test_thousand(self): 1434 if self.TYPE == 'manager': 1435 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1436 passes = 1000 1437 lock = self.Lock() 1438 conn, child_conn = self.Pipe(False) 1439 for j in range(self.N): 1440 p = self.Process(target=self._test_thousand_f, 1441 args=(self.barrier, passes, child_conn, lock)) 1442 p.start() 1443 1444 for i in range(passes): 1445 for j in range(self.N): 1446 self.assertEqual(conn.recv(), i) 1447 1448 # 1449 # 1450 # 1451 1452 class _TestValue(BaseTestCase): 1453 1454 ALLOWED_TYPES = ('processes',) 1455 1456 codes_values = [ 1457 ('i', 4343, 24234), 1458 ('d', 3.625, -4.25), 1459 ('h', -232, 234), 1460 ('c', latin('x'), latin('y')) 1461 ] 1462 1463 def setUp(self): 1464 if not HAS_SHAREDCTYPES: 1465 self.skipTest("requires multiprocessing.sharedctypes") 1466 1467 @classmethod 1468 def _test(cls, values): 1469 for sv, cv in zip(values, cls.codes_values): 1470 sv.value = cv[2] 1471 1472 1473 def test_value(self, raw=False): 1474 if raw: 1475 values = [self.RawValue(code, value) 1476 for code, value, _ in self.codes_values] 1477 else: 1478 values = [self.Value(code, value) 1479 for code, value, _ in self.codes_values] 1480 1481 for sv, cv in zip(values, self.codes_values): 1482 self.assertEqual(sv.value, cv[1]) 1483 1484 proc = self.Process(target=self._test, args=(values,)) 1485 proc.daemon = True 1486 proc.start() 1487 proc.join() 1488 1489 for sv, cv in zip(values, self.codes_values): 1490 self.assertEqual(sv.value, cv[2]) 1491 1492 def test_rawvalue(self): 1493 self.test_value(raw=True) 1494 1495 def test_getobj_getlock(self): 1496 val1 = self.Value('i', 5) 1497 lock1 = val1.get_lock() 1498 obj1 = val1.get_obj() 1499 1500 val2 = self.Value('i', 5, lock=None) 1501 lock2 = val2.get_lock() 1502 obj2 = val2.get_obj() 1503 1504 lock = self.Lock() 1505 val3 = self.Value('i', 5, lock=lock) 1506 lock3 = val3.get_lock() 1507 obj3 = val3.get_obj() 1508 self.assertEqual(lock, lock3) 1509 1510 arr4 = self.Value('i', 5, lock=False) 1511 self.assertFalse(hasattr(arr4, 'get_lock')) 1512 self.assertFalse(hasattr(arr4, 'get_obj')) 1513 1514 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 1515 1516 arr5 = self.RawValue('i', 5) 1517 self.assertFalse(hasattr(arr5, 'get_lock')) 1518 self.assertFalse(hasattr(arr5, 'get_obj')) 1519 1520 1521 class _TestArray(BaseTestCase): 1522 1523 ALLOWED_TYPES = ('processes',) 1524 1525 @classmethod 1526 def f(cls, seq): 1527 for i in range(1, len(seq)): 1528 seq[i] += seq[i-1] 1529 1530 @unittest.skipIf(c_int is None, "requires _ctypes") 1531 def test_array(self, raw=False): 1532 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 1533 if raw: 1534 arr = self.RawArray('i', seq) 1535 else: 1536 arr = self.Array('i', seq) 1537 1538 self.assertEqual(len(arr), len(seq)) 1539 self.assertEqual(arr[3], seq[3]) 1540 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 1541 1542 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 1543 1544 self.assertEqual(list(arr[:]), seq) 1545 1546 self.f(seq) 1547 1548 p = self.Process(target=self.f, args=(arr,)) 1549 p.daemon = True 1550 p.start() 1551 p.join() 1552 1553 self.assertEqual(list(arr[:]), seq) 1554 1555 @unittest.skipIf(c_int is None, "requires _ctypes") 1556 def test_array_from_size(self): 1557 size = 10 1558 # Test for zeroing (see issue #11675). 1559 # The repetition below strengthens the test by increasing the chances 1560 # of previously allocated non-zero memory being used for the new array 1561 # on the 2nd and 3rd loops. 1562 for _ in range(3): 1563 arr = self.Array('i', size) 1564 self.assertEqual(len(arr), size) 1565 self.assertEqual(list(arr), [0] * size) 1566 arr[:] = range(10) 1567 self.assertEqual(list(arr), list(range(10))) 1568 del arr 1569 1570 @unittest.skipIf(c_int is None, "requires _ctypes") 1571 def test_rawarray(self): 1572 self.test_array(raw=True) 1573 1574 @unittest.skipIf(c_int is None, "requires _ctypes") 1575 def test_getobj_getlock_obj(self): 1576 arr1 = self.Array('i', list(range(10))) 1577 lock1 = arr1.get_lock() 1578 obj1 = arr1.get_obj() 1579 1580 arr2 = self.Array('i', list(range(10)), lock=None) 1581 lock2 = arr2.get_lock() 1582 obj2 = arr2.get_obj() 1583 1584 lock = self.Lock() 1585 arr3 = self.Array('i', list(range(10)), lock=lock) 1586 lock3 = arr3.get_lock() 1587 obj3 = arr3.get_obj() 1588 self.assertEqual(lock, lock3) 1589 1590 arr4 = self.Array('i', range(10), lock=False) 1591 self.assertFalse(hasattr(arr4, 'get_lock')) 1592 self.assertFalse(hasattr(arr4, 'get_obj')) 1593 self.assertRaises(AttributeError, 1594 self.Array, 'i', range(10), lock='notalock') 1595 1596 arr5 = self.RawArray('i', range(10)) 1597 self.assertFalse(hasattr(arr5, 'get_lock')) 1598 self.assertFalse(hasattr(arr5, 'get_obj')) 1599 1600 # 1601 # 1602 # 1603 1604 class _TestContainers(BaseTestCase): 1605 1606 ALLOWED_TYPES = ('manager',) 1607 1608 def test_list(self): 1609 a = self.list(list(range(10))) 1610 self.assertEqual(a[:], list(range(10))) 1611 1612 b = self.list() 1613 self.assertEqual(b[:], []) 1614 1615 b.extend(list(range(5))) 1616 self.assertEqual(b[:], list(range(5))) 1617 1618 self.assertEqual(b[2], 2) 1619 self.assertEqual(b[2:10], [2,3,4]) 1620 1621 b *= 2 1622 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 1623 1624 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 1625 1626 self.assertEqual(a[:], list(range(10))) 1627 1628 d = [a, b] 1629 e = self.list(d) 1630 self.assertEqual( 1631 [element[:] for element in e], 1632 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 1633 ) 1634 1635 f = self.list([a]) 1636 a.append('hello') 1637 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 1638 1639 def test_list_proxy_in_list(self): 1640 a = self.list([self.list(range(3)) for _i in range(3)]) 1641 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 1642 1643 a[0][-1] = 55 1644 self.assertEqual(a[0][:], [0, 1, 55]) 1645 for i in range(1, 3): 1646 self.assertEqual(a[i][:], [0, 1, 2]) 1647 1648 self.assertEqual(a[1].pop(), 2) 1649 self.assertEqual(len(a[1]), 2) 1650 for i in range(0, 3, 2): 1651 self.assertEqual(len(a[i]), 3) 1652 1653 del a 1654 1655 b = self.list() 1656 b.append(b) 1657 del b 1658 1659 def test_dict(self): 1660 d = self.dict() 1661 indices = list(range(65, 70)) 1662 for i in indices: 1663 d[i] = chr(i) 1664 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 1665 self.assertEqual(sorted(d.keys()), indices) 1666 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 1667 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 1668 1669 def test_dict_proxy_nested(self): 1670 pets = self.dict(ferrets=2, hamsters=4) 1671 supplies = self.dict(water=10, feed=3) 1672 d = self.dict(pets=pets, supplies=supplies) 1673 1674 self.assertEqual(supplies['water'], 10) 1675 self.assertEqual(d['supplies']['water'], 10) 1676 1677 d['supplies']['blankets'] = 5 1678 self.assertEqual(supplies['blankets'], 5) 1679 self.assertEqual(d['supplies']['blankets'], 5) 1680 1681 d['supplies']['water'] = 7 1682 self.assertEqual(supplies['water'], 7) 1683 self.assertEqual(d['supplies']['water'], 7) 1684 1685 del pets 1686 del supplies 1687 self.assertEqual(d['pets']['ferrets'], 2) 1688 d['supplies']['blankets'] = 11 1689 self.assertEqual(d['supplies']['blankets'], 11) 1690 1691 pets = d['pets'] 1692 supplies = d['supplies'] 1693 supplies['water'] = 7 1694 self.assertEqual(supplies['water'], 7) 1695 self.assertEqual(d['supplies']['water'], 7) 1696 1697 d.clear() 1698 self.assertEqual(len(d), 0) 1699 self.assertEqual(supplies['water'], 7) 1700 self.assertEqual(pets['hamsters'], 4) 1701 1702 l = self.list([pets, supplies]) 1703 l[0]['marmots'] = 1 1704 self.assertEqual(pets['marmots'], 1) 1705 self.assertEqual(l[0]['marmots'], 1) 1706 1707 del pets 1708 del supplies 1709 self.assertEqual(l[0]['marmots'], 1) 1710 1711 outer = self.list([[88, 99], l]) 1712 self.assertIsInstance(outer[0], list) # Not a ListProxy 1713 self.assertEqual(outer[-1][-1]['feed'], 3) 1714 1715 def test_namespace(self): 1716 n = self.Namespace() 1717 n.name = 'Bob' 1718 n.job = 'Builder' 1719 n._hidden = 'hidden' 1720 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 1721 del n.job 1722 self.assertEqual(str(n), "Namespace(name='Bob')") 1723 self.assertTrue(hasattr(n, 'name')) 1724 self.assertTrue(not hasattr(n, 'job')) 1725 1726 # 1727 # 1728 # 1729 1730 def sqr(x, wait=0.0): 1731 time.sleep(wait) 1732 return x*x 1733 1734 def mul(x, y): 1735 return x*y 1736 1737 def raise_large_valuerror(wait): 1738 time.sleep(wait) 1739 raise ValueError("x" * 1024**2) 1740 1741 class SayWhenError(ValueError): pass 1742 1743 def exception_throwing_generator(total, when): 1744 for i in range(total): 1745 if i == when: 1746 raise SayWhenError("Somebody said when") 1747 yield i 1748 1749 class _TestPool(BaseTestCase): 1750 1751 @classmethod 1752 def setUpClass(cls): 1753 super().setUpClass() 1754 cls.pool = cls.Pool(4) 1755 1756 @classmethod 1757 def tearDownClass(cls): 1758 cls.pool.terminate() 1759 cls.pool.join() 1760 cls.pool = None 1761 super().tearDownClass() 1762 1763 def test_apply(self): 1764 papply = self.pool.apply 1765 self.assertEqual(papply(sqr, (5,)), sqr(5)) 1766 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 1767 1768 def test_map(self): 1769 pmap = self.pool.map 1770 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 1771 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 1772 list(map(sqr, list(range(100))))) 1773 1774 def test_starmap(self): 1775 psmap = self.pool.starmap 1776 tuples = list(zip(range(10), range(9,-1, -1))) 1777 self.assertEqual(psmap(mul, tuples), 1778 list(itertools.starmap(mul, tuples))) 1779 tuples = list(zip(range(100), range(99,-1, -1))) 1780 self.assertEqual(psmap(mul, tuples, chunksize=20), 1781 list(itertools.starmap(mul, tuples))) 1782 1783 def test_starmap_async(self): 1784 tuples = list(zip(range(100), range(99,-1, -1))) 1785 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 1786 list(itertools.starmap(mul, tuples))) 1787 1788 def test_map_async(self): 1789 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 1790 list(map(sqr, list(range(10))))) 1791 1792 def test_map_async_callbacks(self): 1793 call_args = self.manager.list() if self.TYPE == 'manager' else [] 1794 self.pool.map_async(int, ['1'], 1795 callback=call_args.append, 1796 error_callback=call_args.append).wait() 1797 self.assertEqual(1, len(call_args)) 1798 self.assertEqual([1], call_args[0]) 1799 self.pool.map_async(int, ['a'], 1800 callback=call_args.append, 1801 error_callback=call_args.append).wait() 1802 self.assertEqual(2, len(call_args)) 1803 self.assertIsInstance(call_args[1], ValueError) 1804 1805 def test_map_unplicklable(self): 1806 # Issue #19425 -- failure to pickle should not cause a hang 1807 if self.TYPE == 'threads': 1808 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1809 class A(object): 1810 def __reduce__(self): 1811 raise RuntimeError('cannot pickle') 1812 with self.assertRaises(RuntimeError): 1813 self.pool.map(sqr, [A()]*10) 1814 1815 def test_map_chunksize(self): 1816 try: 1817 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 1818 except multiprocessing.TimeoutError: 1819 self.fail("pool.map_async with chunksize stalled on null list") 1820 1821 def test_async(self): 1822 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 1823 get = TimingWrapper(res.get) 1824 self.assertEqual(get(), 49) 1825 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1826 1827 def test_async_timeout(self): 1828 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 1829 get = TimingWrapper(res.get) 1830 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 1831 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 1832 1833 def test_imap(self): 1834 it = self.pool.imap(sqr, list(range(10))) 1835 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 1836 1837 it = self.pool.imap(sqr, list(range(10))) 1838 for i in range(10): 1839 self.assertEqual(next(it), i*i) 1840 self.assertRaises(StopIteration, it.__next__) 1841 1842 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 1843 for i in range(1000): 1844 self.assertEqual(next(it), i*i) 1845 self.assertRaises(StopIteration, it.__next__) 1846 1847 def test_imap_handle_iterable_exception(self): 1848 if self.TYPE == 'manager': 1849 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1850 1851 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 1852 for i in range(3): 1853 self.assertEqual(next(it), i*i) 1854 self.assertRaises(SayWhenError, it.__next__) 1855 1856 # SayWhenError seen at start of problematic chunk's results 1857 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 1858 for i in range(6): 1859 self.assertEqual(next(it), i*i) 1860 self.assertRaises(SayWhenError, it.__next__) 1861 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 1862 for i in range(4): 1863 self.assertEqual(next(it), i*i) 1864 self.assertRaises(SayWhenError, it.__next__) 1865 1866 def test_imap_unordered(self): 1867 it = self.pool.imap_unordered(sqr, list(range(1000))) 1868 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 1869 1870 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53) 1871 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 1872 1873 def test_imap_unordered_handle_iterable_exception(self): 1874 if self.TYPE == 'manager': 1875 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1876 1877 it = self.pool.imap_unordered(sqr, 1878 exception_throwing_generator(10, 3), 1879 1) 1880 expected_values = list(map(sqr, list(range(10)))) 1881 with self.assertRaises(SayWhenError): 1882 # imap_unordered makes it difficult to anticipate the SayWhenError 1883 for i in range(10): 1884 value = next(it) 1885 self.assertIn(value, expected_values) 1886 expected_values.remove(value) 1887 1888 it = self.pool.imap_unordered(sqr, 1889 exception_throwing_generator(20, 7), 1890 2) 1891 expected_values = list(map(sqr, list(range(20)))) 1892 with self.assertRaises(SayWhenError): 1893 for i in range(20): 1894 value = next(it) 1895 self.assertIn(value, expected_values) 1896 expected_values.remove(value) 1897 1898 def test_make_pool(self): 1899 expected_error = (RemoteError if self.TYPE == 'manager' 1900 else ValueError) 1901 1902 self.assertRaises(expected_error, self.Pool, -1) 1903 self.assertRaises(expected_error, self.Pool, 0) 1904 1905 if self.TYPE != 'manager': 1906 p = self.Pool(3) 1907 try: 1908 self.assertEqual(3, len(p._pool)) 1909 finally: 1910 p.close() 1911 p.join() 1912 1913 def test_terminate(self): 1914 result = self.pool.map_async( 1915 time.sleep, [0.1 for i in range(10000)], chunksize=1 1916 ) 1917 self.pool.terminate() 1918 join = TimingWrapper(self.pool.join) 1919 join() 1920 # Sanity check the pool didn't wait for all tasks to finish 1921 self.assertLess(join.elapsed, 2.0) 1922 1923 def test_empty_iterable(self): 1924 # See Issue 12157 1925 p = self.Pool(1) 1926 1927 self.assertEqual(p.map(sqr, []), []) 1928 self.assertEqual(list(p.imap(sqr, [])), []) 1929 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 1930 self.assertEqual(p.map_async(sqr, []).get(), []) 1931 1932 p.close() 1933 p.join() 1934 1935 def test_context(self): 1936 if self.TYPE == 'processes': 1937 L = list(range(10)) 1938 expected = [sqr(i) for i in L] 1939 with self.Pool(2) as p: 1940 r = p.map_async(sqr, L) 1941 self.assertEqual(r.get(), expected) 1942 self.assertRaises(ValueError, p.map_async, sqr, L) 1943 1944 @classmethod 1945 def _test_traceback(cls): 1946 raise RuntimeError(123) # some comment 1947 1948 def test_traceback(self): 1949 # We want ensure that the traceback from the child process is 1950 # contained in the traceback raised in the main process. 1951 if self.TYPE == 'processes': 1952 with self.Pool(1) as p: 1953 try: 1954 p.apply(self._test_traceback) 1955 except Exception as e: 1956 exc = e 1957 else: 1958 raise AssertionError('expected RuntimeError') 1959 self.assertIs(type(exc), RuntimeError) 1960 self.assertEqual(exc.args, (123,)) 1961 cause = exc.__cause__ 1962 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 1963 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 1964 1965 with test.support.captured_stderr() as f1: 1966 try: 1967 raise exc 1968 except RuntimeError: 1969 sys.excepthook(*sys.exc_info()) 1970 self.assertIn('raise RuntimeError(123) # some comment', 1971 f1.getvalue()) 1972 1973 @classmethod 1974 def _test_wrapped_exception(cls): 1975 raise RuntimeError('foo') 1976 1977 def test_wrapped_exception(self): 1978 # Issue #20980: Should not wrap exception when using thread pool 1979 with self.Pool(1) as p: 1980 with self.assertRaises(RuntimeError): 1981 p.apply(self._test_wrapped_exception) 1982 1983 def test_map_no_failfast(self): 1984 # Issue #23992: the fail-fast behaviour when an exception is raised 1985 # during map() would make Pool.join() deadlock, because a worker 1986 # process would fill the result queue (after the result handler thread 1987 # terminated, hence not draining it anymore). 1988 1989 t_start = time.time() 1990 1991 with self.assertRaises(ValueError): 1992 with self.Pool(2) as p: 1993 try: 1994 p.map(raise_large_valuerror, [0, 1]) 1995 finally: 1996 time.sleep(0.5) 1997 p.close() 1998 p.join() 1999 2000 # check that we indeed waited for all jobs 2001 self.assertGreater(time.time() - t_start, 0.9) 2002 2003 2004 def raising(): 2005 raise KeyError("key") 2006 2007 def unpickleable_result(): 2008 return lambda: 42 2009 2010 class _TestPoolWorkerErrors(BaseTestCase): 2011 ALLOWED_TYPES = ('processes', ) 2012 2013 def test_async_error_callback(self): 2014 p = multiprocessing.Pool(2) 2015 2016 scratchpad = [None] 2017 def errback(exc): 2018 scratchpad[0] = exc 2019 2020 res = p.apply_async(raising, error_callback=errback) 2021 self.assertRaises(KeyError, res.get) 2022 self.assertTrue(scratchpad[0]) 2023 self.assertIsInstance(scratchpad[0], KeyError) 2024 2025 p.close() 2026 p.join() 2027 2028 def test_unpickleable_result(self): 2029 from multiprocessing.pool import MaybeEncodingError 2030 p = multiprocessing.Pool(2) 2031 2032 # Make sure we don't lose pool processes because of encoding errors. 2033 for iteration in range(20): 2034 2035 scratchpad = [None] 2036 def errback(exc): 2037 scratchpad[0] = exc 2038 2039 res = p.apply_async(unpickleable_result, error_callback=errback) 2040 self.assertRaises(MaybeEncodingError, res.get) 2041 wrapped = scratchpad[0] 2042 self.assertTrue(wrapped) 2043 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2044 self.assertIsNotNone(wrapped.exc) 2045 self.assertIsNotNone(wrapped.value) 2046 2047 p.close() 2048 p.join() 2049 2050 class _TestPoolWorkerLifetime(BaseTestCase): 2051 ALLOWED_TYPES = ('processes', ) 2052 2053 def test_pool_worker_lifetime(self): 2054 p = multiprocessing.Pool(3, maxtasksperchild=10) 2055 self.assertEqual(3, len(p._pool)) 2056 origworkerpids = [w.pid for w in p._pool] 2057 # Run many tasks so each worker gets replaced (hopefully) 2058 results = [] 2059 for i in range(100): 2060 results.append(p.apply_async(sqr, (i, ))) 2061 # Fetch the results and verify we got the right answers, 2062 # also ensuring all the tasks have completed. 2063 for (j, res) in enumerate(results): 2064 self.assertEqual(res.get(), sqr(j)) 2065 # Refill the pool 2066 p._repopulate_pool() 2067 # Wait until all workers are alive 2068 # (countdown * DELTA = 5 seconds max startup process time) 2069 countdown = 50 2070 while countdown and not all(w.is_alive() for w in p._pool): 2071 countdown -= 1 2072 time.sleep(DELTA) 2073 finalworkerpids = [w.pid for w in p._pool] 2074 # All pids should be assigned. See issue #7805. 2075 self.assertNotIn(None, origworkerpids) 2076 self.assertNotIn(None, finalworkerpids) 2077 # Finally, check that the worker pids have changed 2078 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2079 p.close() 2080 p.join() 2081 2082 def test_pool_worker_lifetime_early_close(self): 2083 # Issue #10332: closing a pool whose workers have limited lifetimes 2084 # before all the tasks completed would make join() hang. 2085 p = multiprocessing.Pool(3, maxtasksperchild=1) 2086 results = [] 2087 for i in range(6): 2088 results.append(p.apply_async(sqr, (i, 0.3))) 2089 p.close() 2090 p.join() 2091 # check the results 2092 for (j, res) in enumerate(results): 2093 self.assertEqual(res.get(), sqr(j)) 2094 2095 # 2096 # Test of creating a customized manager class 2097 # 2098 2099 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2100 2101 class FooBar(object): 2102 def f(self): 2103 return 'f()' 2104 def g(self): 2105 raise ValueError 2106 def _h(self): 2107 return '_h()' 2108 2109 def baz(): 2110 for i in range(10): 2111 yield i*i 2112 2113 class IteratorProxy(BaseProxy): 2114 _exposed_ = ('__next__',) 2115 def __iter__(self): 2116 return self 2117 def __next__(self): 2118 return self._callmethod('__next__') 2119 2120 class MyManager(BaseManager): 2121 pass 2122 2123 MyManager.register('Foo', callable=FooBar) 2124 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2125 MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2126 2127 2128 class _TestMyManager(BaseTestCase): 2129 2130 ALLOWED_TYPES = ('manager',) 2131 2132 def test_mymanager(self): 2133 manager = MyManager() 2134 manager.start() 2135 self.common(manager) 2136 manager.shutdown() 2137 2138 # If the manager process exited cleanly then the exitcode 2139 # will be zero. Otherwise (after a short timeout) 2140 # terminate() is used, resulting in an exitcode of -SIGTERM. 2141 self.assertEqual(manager._process.exitcode, 0) 2142 2143 def test_mymanager_context(self): 2144 with MyManager() as manager: 2145 self.common(manager) 2146 self.assertEqual(manager._process.exitcode, 0) 2147 2148 def test_mymanager_context_prestarted(self): 2149 manager = MyManager() 2150 manager.start() 2151 with manager: 2152 self.common(manager) 2153 self.assertEqual(manager._process.exitcode, 0) 2154 2155 def common(self, manager): 2156 foo = manager.Foo() 2157 bar = manager.Bar() 2158 baz = manager.baz() 2159 2160 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2161 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2162 2163 self.assertEqual(foo_methods, ['f', 'g']) 2164 self.assertEqual(bar_methods, ['f', '_h']) 2165 2166 self.assertEqual(foo.f(), 'f()') 2167 self.assertRaises(ValueError, foo.g) 2168 self.assertEqual(foo._callmethod('f'), 'f()') 2169 self.assertRaises(RemoteError, foo._callmethod, '_h') 2170 2171 self.assertEqual(bar.f(), 'f()') 2172 self.assertEqual(bar._h(), '_h()') 2173 self.assertEqual(bar._callmethod('f'), 'f()') 2174 self.assertEqual(bar._callmethod('_h'), '_h()') 2175 2176 self.assertEqual(list(baz), [i*i for i in range(10)]) 2177 2178 2179 # 2180 # Test of connecting to a remote server and using xmlrpclib for serialization 2181 # 2182 2183 _queue = pyqueue.Queue() 2184 def get_queue(): 2185 return _queue 2186 2187 class QueueManager(BaseManager): 2188 '''manager class used by server process''' 2189 QueueManager.register('get_queue', callable=get_queue) 2190 2191 class QueueManager2(BaseManager): 2192 '''manager class which specifies the same interface as QueueManager''' 2193 QueueManager2.register('get_queue') 2194 2195 2196 SERIALIZER = 'xmlrpclib' 2197 2198 class _TestRemoteManager(BaseTestCase): 2199 2200 ALLOWED_TYPES = ('manager',) 2201 values = ['hello world', None, True, 2.25, 2202 'hall\xe5 v\xe4rlden', 2203 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2204 b'hall\xe5 v\xe4rlden', 2205 ] 2206 result = values[:] 2207 2208 @classmethod 2209 def _putter(cls, address, authkey): 2210 manager = QueueManager2( 2211 address=address, authkey=authkey, serializer=SERIALIZER 2212 ) 2213 manager.connect() 2214 queue = manager.get_queue() 2215 # Note that xmlrpclib will deserialize object as a list not a tuple 2216 queue.put(tuple(cls.values)) 2217 2218 def test_remote(self): 2219 authkey = os.urandom(32) 2220 2221 manager = QueueManager( 2222 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER 2223 ) 2224 manager.start() 2225 2226 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2227 p.daemon = True 2228 p.start() 2229 2230 manager2 = QueueManager2( 2231 address=manager.address, authkey=authkey, serializer=SERIALIZER 2232 ) 2233 manager2.connect() 2234 queue = manager2.get_queue() 2235 2236 self.assertEqual(queue.get(), self.result) 2237 2238 # Because we are using xmlrpclib for serialization instead of 2239 # pickle this will cause a serialization error. 2240 self.assertRaises(Exception, queue.put, time.sleep) 2241 2242 # Make queue finalizer run before the server is stopped 2243 del queue 2244 manager.shutdown() 2245 2246 class _TestManagerRestart(BaseTestCase): 2247 2248 @classmethod 2249 def _putter(cls, address, authkey): 2250 manager = QueueManager( 2251 address=address, authkey=authkey, serializer=SERIALIZER) 2252 manager.connect() 2253 queue = manager.get_queue() 2254 queue.put('hello world') 2255 2256 def test_rapid_restart(self): 2257 authkey = os.urandom(32) 2258 manager = QueueManager( 2259 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2260 srvr = manager.get_server() 2261 addr = srvr.address 2262 # Close the connection.Listener socket which gets opened as a part 2263 # of manager.get_server(). It's not needed for the test. 2264 srvr.listener.close() 2265 manager.start() 2266 2267 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2268 p.daemon = True 2269 p.start() 2270 queue = manager.get_queue() 2271 self.assertEqual(queue.get(), 'hello world') 2272 del queue 2273 manager.shutdown() 2274 manager = QueueManager( 2275 address=addr, authkey=authkey, serializer=SERIALIZER) 2276 try: 2277 manager.start() 2278 except OSError as e: 2279 if e.errno != errno.EADDRINUSE: 2280 raise 2281 # Retry after some time, in case the old socket was lingering 2282 # (sporadic failure on buildbots) 2283 time.sleep(1.0) 2284 manager = QueueManager( 2285 address=addr, authkey=authkey, serializer=SERIALIZER) 2286 manager.shutdown() 2287 2288 # 2289 # 2290 # 2291 2292 SENTINEL = latin('') 2293 2294 class _TestConnection(BaseTestCase): 2295 2296 ALLOWED_TYPES = ('processes', 'threads') 2297 2298 @classmethod 2299 def _echo(cls, conn): 2300 for msg in iter(conn.recv_bytes, SENTINEL): 2301 conn.send_bytes(msg) 2302 conn.close() 2303 2304 def test_connection(self): 2305 conn, child_conn = self.Pipe() 2306 2307 p = self.Process(target=self._echo, args=(child_conn,)) 2308 p.daemon = True 2309 p.start() 2310 2311 seq = [1, 2.25, None] 2312 msg = latin('hello world') 2313 longmsg = msg * 10 2314 arr = array.array('i', list(range(4))) 2315 2316 if self.TYPE == 'processes': 2317 self.assertEqual(type(conn.fileno()), int) 2318 2319 self.assertEqual(conn.send(seq), None) 2320 self.assertEqual(conn.recv(), seq) 2321 2322 self.assertEqual(conn.send_bytes(msg), None) 2323 self.assertEqual(conn.recv_bytes(), msg) 2324 2325 if self.TYPE == 'processes': 2326 buffer = array.array('i', [0]*10) 2327 expected = list(arr) + [0] * (10 - len(arr)) 2328 self.assertEqual(conn.send_bytes(arr), None) 2329 self.assertEqual(conn.recv_bytes_into(buffer), 2330 len(arr) * buffer.itemsize) 2331 self.assertEqual(list(buffer), expected) 2332 2333 buffer = array.array('i', [0]*10) 2334 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 2335 self.assertEqual(conn.send_bytes(arr), None) 2336 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 2337 len(arr) * buffer.itemsize) 2338 self.assertEqual(list(buffer), expected) 2339 2340 buffer = bytearray(latin(' ' * 40)) 2341 self.assertEqual(conn.send_bytes(longmsg), None) 2342 try: 2343 res = conn.recv_bytes_into(buffer) 2344 except multiprocessing.BufferTooShort as e: 2345 self.assertEqual(e.args, (longmsg,)) 2346 else: 2347 self.fail('expected BufferTooShort, got %s' % res) 2348 2349 poll = TimingWrapper(conn.poll) 2350 2351 self.assertEqual(poll(), False) 2352 self.assertTimingAlmostEqual(poll.elapsed, 0) 2353 2354 self.assertEqual(poll(-1), False) 2355 self.assertTimingAlmostEqual(poll.elapsed, 0) 2356 2357 self.assertEqual(poll(TIMEOUT1), False) 2358 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 2359 2360 conn.send(None) 2361 time.sleep(.1) 2362 2363 self.assertEqual(poll(TIMEOUT1), True) 2364 self.assertTimingAlmostEqual(poll.elapsed, 0) 2365 2366 self.assertEqual(conn.recv(), None) 2367 2368 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 2369 conn.send_bytes(really_big_msg) 2370 self.assertEqual(conn.recv_bytes(), really_big_msg) 2371 2372 conn.send_bytes(SENTINEL) # tell child to quit 2373 child_conn.close() 2374 2375 if self.TYPE == 'processes': 2376 self.assertEqual(conn.readable, True) 2377 self.assertEqual(conn.writable, True) 2378 self.assertRaises(EOFError, conn.recv) 2379 self.assertRaises(EOFError, conn.recv_bytes) 2380 2381 p.join() 2382 2383 def test_duplex_false(self): 2384 reader, writer = self.Pipe(duplex=False) 2385 self.assertEqual(writer.send(1), None) 2386 self.assertEqual(reader.recv(), 1) 2387 if self.TYPE == 'processes': 2388 self.assertEqual(reader.readable, True) 2389 self.assertEqual(reader.writable, False) 2390 self.assertEqual(writer.readable, False) 2391 self.assertEqual(writer.writable, True) 2392 self.assertRaises(OSError, reader.send, 2) 2393 self.assertRaises(OSError, writer.recv) 2394 self.assertRaises(OSError, writer.poll) 2395 2396 def test_spawn_close(self): 2397 # We test that a pipe connection can be closed by parent 2398 # process immediately after child is spawned. On Windows this 2399 # would have sometimes failed on old versions because 2400 # child_conn would be closed before the child got a chance to 2401 # duplicate it. 2402 conn, child_conn = self.Pipe() 2403 2404 p = self.Process(target=self._echo, args=(child_conn,)) 2405 p.daemon = True 2406 p.start() 2407 child_conn.close() # this might complete before child initializes 2408 2409 msg = latin('hello') 2410 conn.send_bytes(msg) 2411 self.assertEqual(conn.recv_bytes(), msg) 2412 2413 conn.send_bytes(SENTINEL) 2414 conn.close() 2415 p.join() 2416 2417 def test_sendbytes(self): 2418 if self.TYPE != 'processes': 2419 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2420 2421 msg = latin('abcdefghijklmnopqrstuvwxyz') 2422 a, b = self.Pipe() 2423 2424 a.send_bytes(msg) 2425 self.assertEqual(b.recv_bytes(), msg) 2426 2427 a.send_bytes(msg, 5) 2428 self.assertEqual(b.recv_bytes(), msg[5:]) 2429 2430 a.send_bytes(msg, 7, 8) 2431 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 2432 2433 a.send_bytes(msg, 26) 2434 self.assertEqual(b.recv_bytes(), latin('')) 2435 2436 a.send_bytes(msg, 26, 0) 2437 self.assertEqual(b.recv_bytes(), latin('')) 2438 2439 self.assertRaises(ValueError, a.send_bytes, msg, 27) 2440 2441 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 2442 2443 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 2444 2445 self.assertRaises(ValueError, a.send_bytes, msg, -1) 2446 2447 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 2448 2449 @classmethod 2450 def _is_fd_assigned(cls, fd): 2451 try: 2452 os.fstat(fd) 2453 except OSError as e: 2454 if e.errno == errno.EBADF: 2455 return False 2456 raise 2457 else: 2458 return True 2459 2460 @classmethod 2461 def _writefd(cls, conn, data, create_dummy_fds=False): 2462 if create_dummy_fds: 2463 for i in range(0, 256): 2464 if not cls._is_fd_assigned(i): 2465 os.dup2(conn.fileno(), i) 2466 fd = reduction.recv_handle(conn) 2467 if msvcrt: 2468 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 2469 os.write(fd, data) 2470 os.close(fd) 2471 2472 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2473 def test_fd_transfer(self): 2474 if self.TYPE != 'processes': 2475 self.skipTest("only makes sense with processes") 2476 conn, child_conn = self.Pipe(duplex=True) 2477 2478 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 2479 p.daemon = True 2480 p.start() 2481 self.addCleanup(test.support.unlink, test.support.TESTFN) 2482 with open(test.support.TESTFN, "wb") as f: 2483 fd = f.fileno() 2484 if msvcrt: 2485 fd = msvcrt.get_osfhandle(fd) 2486 reduction.send_handle(conn, fd, p.pid) 2487 p.join() 2488 with open(test.support.TESTFN, "rb") as f: 2489 self.assertEqual(f.read(), b"foo") 2490 2491 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2492 @unittest.skipIf(sys.platform == "win32", 2493 "test semantics don't make sense on Windows") 2494 @unittest.skipIf(MAXFD <= 256, 2495 "largest assignable fd number is too small") 2496 @unittest.skipUnless(hasattr(os, "dup2"), 2497 "test needs os.dup2()") 2498 def test_large_fd_transfer(self): 2499 # With fd > 256 (issue #11657) 2500 if self.TYPE != 'processes': 2501 self.skipTest("only makes sense with processes") 2502 conn, child_conn = self.Pipe(duplex=True) 2503 2504 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 2505 p.daemon = True 2506 p.start() 2507 self.addCleanup(test.support.unlink, test.support.TESTFN) 2508 with open(test.support.TESTFN, "wb") as f: 2509 fd = f.fileno() 2510 for newfd in range(256, MAXFD): 2511 if not self._is_fd_assigned(newfd): 2512 break 2513 else: 2514 self.fail("could not find an unassigned large file descriptor") 2515 os.dup2(fd, newfd) 2516 try: 2517 reduction.send_handle(conn, newfd, p.pid) 2518 finally: 2519 os.close(newfd) 2520 p.join() 2521 with open(test.support.TESTFN, "rb") as f: 2522 self.assertEqual(f.read(), b"bar") 2523 2524 @classmethod 2525 def _send_data_without_fd(self, conn): 2526 os.write(conn.fileno(), b"\0") 2527 2528 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2529 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 2530 def test_missing_fd_transfer(self): 2531 # Check that exception is raised when received data is not 2532 # accompanied by a file descriptor in ancillary data. 2533 if self.TYPE != 'processes': 2534 self.skipTest("only makes sense with processes") 2535 conn, child_conn = self.Pipe(duplex=True) 2536 2537 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 2538 p.daemon = True 2539 p.start() 2540 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 2541 p.join() 2542 2543 def test_context(self): 2544 a, b = self.Pipe() 2545 2546 with a, b: 2547 a.send(1729) 2548 self.assertEqual(b.recv(), 1729) 2549 if self.TYPE == 'processes': 2550 self.assertFalse(a.closed) 2551 self.assertFalse(b.closed) 2552 2553 if self.TYPE == 'processes': 2554 self.assertTrue(a.closed) 2555 self.assertTrue(b.closed) 2556 self.assertRaises(OSError, a.recv) 2557 self.assertRaises(OSError, b.recv) 2558 2559 class _TestListener(BaseTestCase): 2560 2561 ALLOWED_TYPES = ('processes',) 2562 2563 def test_multiple_bind(self): 2564 for family in self.connection.families: 2565 l = self.connection.Listener(family=family) 2566 self.addCleanup(l.close) 2567 self.assertRaises(OSError, self.connection.Listener, 2568 l.address, family) 2569 2570 def test_context(self): 2571 with self.connection.Listener() as l: 2572 with self.connection.Client(l.address) as c: 2573 with l.accept() as d: 2574 c.send(1729) 2575 self.assertEqual(d.recv(), 1729) 2576 2577 if self.TYPE == 'processes': 2578 self.assertRaises(OSError, l.accept) 2579 2580 class _TestListenerClient(BaseTestCase): 2581 2582 ALLOWED_TYPES = ('processes', 'threads') 2583 2584 @classmethod 2585 def _test(cls, address): 2586 conn = cls.connection.Client(address) 2587 conn.send('hello') 2588 conn.close() 2589 2590 def test_listener_client(self): 2591 for family in self.connection.families: 2592 l = self.connection.Listener(family=family) 2593 p = self.Process(target=self._test, args=(l.address,)) 2594 p.daemon = True 2595 p.start() 2596 conn = l.accept() 2597 self.assertEqual(conn.recv(), 'hello') 2598 p.join() 2599 l.close() 2600 2601 def test_issue14725(self): 2602 l = self.connection.Listener() 2603 p = self.Process(target=self._test, args=(l.address,)) 2604 p.daemon = True 2605 p.start() 2606 time.sleep(1) 2607 # On Windows the client process should by now have connected, 2608 # written data and closed the pipe handle by now. This causes 2609 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 2610 # 14725. 2611 conn = l.accept() 2612 self.assertEqual(conn.recv(), 'hello') 2613 conn.close() 2614 p.join() 2615 l.close() 2616 2617 def test_issue16955(self): 2618 for fam in self.connection.families: 2619 l = self.connection.Listener(family=fam) 2620 c = self.connection.Client(l.address) 2621 a = l.accept() 2622 a.send_bytes(b"hello") 2623 self.assertTrue(c.poll(1)) 2624 a.close() 2625 c.close() 2626 l.close() 2627 2628 class _TestPoll(BaseTestCase): 2629 2630 ALLOWED_TYPES = ('processes', 'threads') 2631 2632 def test_empty_string(self): 2633 a, b = self.Pipe() 2634 self.assertEqual(a.poll(), False) 2635 b.send_bytes(b'') 2636 self.assertEqual(a.poll(), True) 2637 self.assertEqual(a.poll(), True) 2638 2639 @classmethod 2640 def _child_strings(cls, conn, strings): 2641 for s in strings: 2642 time.sleep(0.1) 2643 conn.send_bytes(s) 2644 conn.close() 2645 2646 def test_strings(self): 2647 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 2648 a, b = self.Pipe() 2649 p = self.Process(target=self._child_strings, args=(b, strings)) 2650 p.start() 2651 2652 for s in strings: 2653 for i in range(200): 2654 if a.poll(0.01): 2655 break 2656 x = a.recv_bytes() 2657 self.assertEqual(s, x) 2658 2659 p.join() 2660 2661 @classmethod 2662 def _child_boundaries(cls, r): 2663 # Polling may "pull" a message in to the child process, but we 2664 # don't want it to pull only part of a message, as that would 2665 # corrupt the pipe for any other processes which might later 2666 # read from it. 2667 r.poll(5) 2668 2669 def test_boundaries(self): 2670 r, w = self.Pipe(False) 2671 p = self.Process(target=self._child_boundaries, args=(r,)) 2672 p.start() 2673 time.sleep(2) 2674 L = [b"first", b"second"] 2675 for obj in L: 2676 w.send_bytes(obj) 2677 w.close() 2678 p.join() 2679 self.assertIn(r.recv_bytes(), L) 2680 2681 @classmethod 2682 def _child_dont_merge(cls, b): 2683 b.send_bytes(b'a') 2684 b.send_bytes(b'b') 2685 b.send_bytes(b'cd') 2686 2687 def test_dont_merge(self): 2688 a, b = self.Pipe() 2689 self.assertEqual(a.poll(0.0), False) 2690 self.assertEqual(a.poll(0.1), False) 2691 2692 p = self.Process(target=self._child_dont_merge, args=(b,)) 2693 p.start() 2694 2695 self.assertEqual(a.recv_bytes(), b'a') 2696 self.assertEqual(a.poll(1.0), True) 2697 self.assertEqual(a.poll(1.0), True) 2698 self.assertEqual(a.recv_bytes(), b'b') 2699 self.assertEqual(a.poll(1.0), True) 2700 self.assertEqual(a.poll(1.0), True) 2701 self.assertEqual(a.poll(0.0), True) 2702 self.assertEqual(a.recv_bytes(), b'cd') 2703 2704 p.join() 2705 2706 # 2707 # Test of sending connection and socket objects between processes 2708 # 2709 2710 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2711 class _TestPicklingConnections(BaseTestCase): 2712 2713 ALLOWED_TYPES = ('processes',) 2714 2715 @classmethod 2716 def tearDownClass(cls): 2717 from multiprocessing import resource_sharer 2718 resource_sharer.stop(timeout=5) 2719 2720 @classmethod 2721 def _listener(cls, conn, families): 2722 for fam in families: 2723 l = cls.connection.Listener(family=fam) 2724 conn.send(l.address) 2725 new_conn = l.accept() 2726 conn.send(new_conn) 2727 new_conn.close() 2728 l.close() 2729 2730 l = socket.socket() 2731 l.bind((test.support.HOST, 0)) 2732 l.listen() 2733 conn.send(l.getsockname()) 2734 new_conn, addr = l.accept() 2735 conn.send(new_conn) 2736 new_conn.close() 2737 l.close() 2738 2739 conn.recv() 2740 2741 @classmethod 2742 def _remote(cls, conn): 2743 for (address, msg) in iter(conn.recv, None): 2744 client = cls.connection.Client(address) 2745 client.send(msg.upper()) 2746 client.close() 2747 2748 address, msg = conn.recv() 2749 client = socket.socket() 2750 client.connect(address) 2751 client.sendall(msg.upper()) 2752 client.close() 2753 2754 conn.close() 2755 2756 def test_pickling(self): 2757 families = self.connection.families 2758 2759 lconn, lconn0 = self.Pipe() 2760 lp = self.Process(target=self._listener, args=(lconn0, families)) 2761 lp.daemon = True 2762 lp.start() 2763 lconn0.close() 2764 2765 rconn, rconn0 = self.Pipe() 2766 rp = self.Process(target=self._remote, args=(rconn0,)) 2767 rp.daemon = True 2768 rp.start() 2769 rconn0.close() 2770 2771 for fam in families: 2772 msg = ('This connection uses family %s' % fam).encode('ascii') 2773 address = lconn.recv() 2774 rconn.send((address, msg)) 2775 new_conn = lconn.recv() 2776 self.assertEqual(new_conn.recv(), msg.upper()) 2777 2778 rconn.send(None) 2779 2780 msg = latin('This connection uses a normal socket') 2781 address = lconn.recv() 2782 rconn.send((address, msg)) 2783 new_conn = lconn.recv() 2784 buf = [] 2785 while True: 2786 s = new_conn.recv(100) 2787 if not s: 2788 break 2789 buf.append(s) 2790 buf = b''.join(buf) 2791 self.assertEqual(buf, msg.upper()) 2792 new_conn.close() 2793 2794 lconn.send(None) 2795 2796 rconn.close() 2797 lconn.close() 2798 2799 lp.join() 2800 rp.join() 2801 2802 @classmethod 2803 def child_access(cls, conn): 2804 w = conn.recv() 2805 w.send('all is well') 2806 w.close() 2807 2808 r = conn.recv() 2809 msg = r.recv() 2810 conn.send(msg*2) 2811 2812 conn.close() 2813 2814 def test_access(self): 2815 # On Windows, if we do not specify a destination pid when 2816 # using DupHandle then we need to be careful to use the 2817 # correct access flags for DuplicateHandle(), or else 2818 # DupHandle.detach() will raise PermissionError. For example, 2819 # for a read only pipe handle we should use 2820 # access=FILE_GENERIC_READ. (Unfortunately 2821 # DUPLICATE_SAME_ACCESS does not work.) 2822 conn, child_conn = self.Pipe() 2823 p = self.Process(target=self.child_access, args=(child_conn,)) 2824 p.daemon = True 2825 p.start() 2826 child_conn.close() 2827 2828 r, w = self.Pipe(duplex=False) 2829 conn.send(w) 2830 w.close() 2831 self.assertEqual(r.recv(), 'all is well') 2832 r.close() 2833 2834 r, w = self.Pipe(duplex=False) 2835 conn.send(r) 2836 r.close() 2837 w.send('foobar') 2838 w.close() 2839 self.assertEqual(conn.recv(), 'foobar'*2) 2840 2841 # 2842 # 2843 # 2844 2845 class _TestHeap(BaseTestCase): 2846 2847 ALLOWED_TYPES = ('processes',) 2848 2849 def test_heap(self): 2850 iterations = 5000 2851 maxblocks = 50 2852 blocks = [] 2853 2854 # create and destroy lots of blocks of different sizes 2855 for i in range(iterations): 2856 size = int(random.lognormvariate(0, 1) * 1000) 2857 b = multiprocessing.heap.BufferWrapper(size) 2858 blocks.append(b) 2859 if len(blocks) > maxblocks: 2860 i = random.randrange(maxblocks) 2861 del blocks[i] 2862 2863 # get the heap object 2864 heap = multiprocessing.heap.BufferWrapper._heap 2865 2866 # verify the state of the heap 2867 all = [] 2868 occupied = 0 2869 heap._lock.acquire() 2870 self.addCleanup(heap._lock.release) 2871 for L in list(heap._len_to_seq.values()): 2872 for arena, start, stop in L: 2873 all.append((heap._arenas.index(arena), start, stop, 2874 stop-start, 'free')) 2875 for arena, start, stop in heap._allocated_blocks: 2876 all.append((heap._arenas.index(arena), start, stop, 2877 stop-start, 'occupied')) 2878 occupied += (stop-start) 2879 2880 all.sort() 2881 2882 for i in range(len(all)-1): 2883 (arena, start, stop) = all[i][:3] 2884 (narena, nstart, nstop) = all[i+1][:3] 2885 self.assertTrue((arena != narena and nstart == 0) or 2886 (stop == nstart)) 2887 2888 def test_free_from_gc(self): 2889 # Check that freeing of blocks by the garbage collector doesn't deadlock 2890 # (issue #12352). 2891 # Make sure the GC is enabled, and set lower collection thresholds to 2892 # make collections more frequent (and increase the probability of 2893 # deadlock). 2894 if not gc.isenabled(): 2895 gc.enable() 2896 self.addCleanup(gc.disable) 2897 thresholds = gc.get_threshold() 2898 self.addCleanup(gc.set_threshold, *thresholds) 2899 gc.set_threshold(10) 2900 2901 # perform numerous block allocations, with cyclic references to make 2902 # sure objects are collected asynchronously by the gc 2903 for i in range(5000): 2904 a = multiprocessing.heap.BufferWrapper(1) 2905 b = multiprocessing.heap.BufferWrapper(1) 2906 # circular references 2907 a.buddy = b 2908 b.buddy = a 2909 2910 # 2911 # 2912 # 2913 2914 class _Foo(Structure): 2915 _fields_ = [ 2916 ('x', c_int), 2917 ('y', c_double) 2918 ] 2919 2920 class _TestSharedCTypes(BaseTestCase): 2921 2922 ALLOWED_TYPES = ('processes',) 2923 2924 def setUp(self): 2925 if not HAS_SHAREDCTYPES: 2926 self.skipTest("requires multiprocessing.sharedctypes") 2927 2928 @classmethod 2929 def _double(cls, x, y, foo, arr, string): 2930 x.value *= 2 2931 y.value *= 2 2932 foo.x *= 2 2933 foo.y *= 2 2934 string.value *= 2 2935 for i in range(len(arr)): 2936 arr[i] *= 2 2937 2938 def test_sharedctypes(self, lock=False): 2939 x = Value('i', 7, lock=lock) 2940 y = Value(c_double, 1.0/3.0, lock=lock) 2941 foo = Value(_Foo, 3, 2, lock=lock) 2942 arr = self.Array('d', list(range(10)), lock=lock) 2943 string = self.Array('c', 20, lock=lock) 2944 string.value = latin('hello') 2945 2946 p = self.Process(target=self._double, args=(x, y, foo, arr, string)) 2947 p.daemon = True 2948 p.start() 2949 p.join() 2950 2951 self.assertEqual(x.value, 14) 2952 self.assertAlmostEqual(y.value, 2.0/3.0) 2953 self.assertEqual(foo.x, 6) 2954 self.assertAlmostEqual(foo.y, 4.0) 2955 for i in range(10): 2956 self.assertAlmostEqual(arr[i], i*2) 2957 self.assertEqual(string.value, latin('hellohello')) 2958 2959 def test_synchronize(self): 2960 self.test_sharedctypes(lock=True) 2961 2962 def test_copy(self): 2963 foo = _Foo(2, 5.0) 2964 bar = copy(foo) 2965 foo.x = 0 2966 foo.y = 0 2967 self.assertEqual(bar.x, 2) 2968 self.assertAlmostEqual(bar.y, 5.0) 2969 2970 # 2971 # 2972 # 2973 2974 class _TestFinalize(BaseTestCase): 2975 2976 ALLOWED_TYPES = ('processes',) 2977 2978 @classmethod 2979 def _test_finalize(cls, conn): 2980 class Foo(object): 2981 pass 2982 2983 a = Foo() 2984 util.Finalize(a, conn.send, args=('a',)) 2985 del a # triggers callback for a 2986 2987 b = Foo() 2988 close_b = util.Finalize(b, conn.send, args=('b',)) 2989 close_b() # triggers callback for b 2990 close_b() # does nothing because callback has already been called 2991 del b # does nothing because callback has already been called 2992 2993 c = Foo() 2994 util.Finalize(c, conn.send, args=('c',)) 2995 2996 d10 = Foo() 2997 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 2998 2999 d01 = Foo() 3000 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 3001 d02 = Foo() 3002 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 3003 d03 = Foo() 3004 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 3005 3006 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 3007 3008 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 3009 3010 # call multiprocessing's cleanup function then exit process without 3011 # garbage collecting locals 3012 util._exit_function() 3013 conn.close() 3014 os._exit(0) 3015 3016 def test_finalize(self): 3017 conn, child_conn = self.Pipe() 3018 3019 p = self.Process(target=self._test_finalize, args=(child_conn,)) 3020 p.daemon = True 3021 p.start() 3022 p.join() 3023 3024 result = [obj for obj in iter(conn.recv, 'STOP')] 3025 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 3026 3027 # 3028 # Test that from ... import * works for each module 3029 # 3030 3031 class _TestImportStar(unittest.TestCase): 3032 3033 def get_module_names(self): 3034 import glob 3035 folder = os.path.dirname(multiprocessing.__file__) 3036 pattern = os.path.join(folder, '*.py') 3037 files = glob.glob(pattern) 3038 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 3039 modules = ['multiprocessing.' + m for m in modules] 3040 modules.remove('multiprocessing.__init__') 3041 modules.append('multiprocessing') 3042 return modules 3043 3044 def test_import(self): 3045 modules = self.get_module_names() 3046 if sys.platform == 'win32': 3047 modules.remove('multiprocessing.popen_fork') 3048 modules.remove('multiprocessing.popen_forkserver') 3049 modules.remove('multiprocessing.popen_spawn_posix') 3050 else: 3051 modules.remove('multiprocessing.popen_spawn_win32') 3052 if not HAS_REDUCTION: 3053 modules.remove('multiprocessing.popen_forkserver') 3054 3055 if c_int is None: 3056 # This module requires _ctypes 3057 modules.remove('multiprocessing.sharedctypes') 3058 3059 for name in modules: 3060 __import__(name) 3061 mod = sys.modules[name] 3062 self.assertTrue(hasattr(mod, '__all__'), name) 3063 3064 for attr in mod.__all__: 3065 self.assertTrue( 3066 hasattr(mod, attr), 3067 '%r does not have attribute %r' % (mod, attr) 3068 ) 3069 3070 # 3071 # Quick test that logging works -- does not test logging output 3072 # 3073 3074 class _TestLogging(BaseTestCase): 3075 3076 ALLOWED_TYPES = ('processes',) 3077 3078 def test_enable_logging(self): 3079 logger = multiprocessing.get_logger() 3080 logger.setLevel(util.SUBWARNING) 3081 self.assertTrue(logger is not None) 3082 logger.debug('this will not be printed') 3083 logger.info('nor will this') 3084 logger.setLevel(LOG_LEVEL) 3085 3086 @classmethod 3087 def _test_level(cls, conn): 3088 logger = multiprocessing.get_logger() 3089 conn.send(logger.getEffectiveLevel()) 3090 3091 def test_level(self): 3092 LEVEL1 = 32 3093 LEVEL2 = 37 3094 3095 logger = multiprocessing.get_logger() 3096 root_logger = logging.getLogger() 3097 root_level = root_logger.level 3098 3099 reader, writer = multiprocessing.Pipe(duplex=False) 3100 3101 logger.setLevel(LEVEL1) 3102 p = self.Process(target=self._test_level, args=(writer,)) 3103 p.daemon = True 3104 p.start() 3105 self.assertEqual(LEVEL1, reader.recv()) 3106 3107 logger.setLevel(logging.NOTSET) 3108 root_logger.setLevel(LEVEL2) 3109 p = self.Process(target=self._test_level, args=(writer,)) 3110 p.daemon = True 3111 p.start() 3112 self.assertEqual(LEVEL2, reader.recv()) 3113 3114 root_logger.setLevel(root_level) 3115 logger.setLevel(level=LOG_LEVEL) 3116 3117 3118 # class _TestLoggingProcessName(BaseTestCase): 3119 # 3120 # def handle(self, record): 3121 # assert record.processName == multiprocessing.current_process().name 3122 # self.__handled = True 3123 # 3124 # def test_logging(self): 3125 # handler = logging.Handler() 3126 # handler.handle = self.handle 3127 # self.__handled = False 3128 # # Bypass getLogger() and side-effects 3129 # logger = logging.getLoggerClass()( 3130 # 'multiprocessing.test.TestLoggingProcessName') 3131 # logger.addHandler(handler) 3132 # logger.propagate = False 3133 # 3134 # logger.warn('foo') 3135 # assert self.__handled 3136 3137 # 3138 # Check that Process.join() retries if os.waitpid() fails with EINTR 3139 # 3140 3141 class _TestPollEintr(BaseTestCase): 3142 3143 ALLOWED_TYPES = ('processes',) 3144 3145 @classmethod 3146 def _killer(cls, pid): 3147 time.sleep(0.1) 3148 os.kill(pid, signal.SIGUSR1) 3149 3150 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3151 def test_poll_eintr(self): 3152 got_signal = [False] 3153 def record(*args): 3154 got_signal[0] = True 3155 pid = os.getpid() 3156 oldhandler = signal.signal(signal.SIGUSR1, record) 3157 try: 3158 killer = self.Process(target=self._killer, args=(pid,)) 3159 killer.start() 3160 try: 3161 p = self.Process(target=time.sleep, args=(2,)) 3162 p.start() 3163 p.join() 3164 finally: 3165 killer.join() 3166 self.assertTrue(got_signal[0]) 3167 self.assertEqual(p.exitcode, 0) 3168 finally: 3169 signal.signal(signal.SIGUSR1, oldhandler) 3170 3171 # 3172 # Test to verify handle verification, see issue 3321 3173 # 3174 3175 class TestInvalidHandle(unittest.TestCase): 3176 3177 @unittest.skipIf(WIN32, "skipped on Windows") 3178 def test_invalid_handles(self): 3179 conn = multiprocessing.connection.Connection(44977608) 3180 # check that poll() doesn't crash 3181 try: 3182 conn.poll() 3183 except (ValueError, OSError): 3184 pass 3185 finally: 3186 # Hack private attribute _handle to avoid printing an error 3187 # in conn.__del__ 3188 conn._handle = None 3189 self.assertRaises((ValueError, OSError), 3190 multiprocessing.connection.Connection, -1) 3191 3192 3193 3194 class OtherTest(unittest.TestCase): 3195 # TODO: add more tests for deliver/answer challenge. 3196 def test_deliver_challenge_auth_failure(self): 3197 class _FakeConnection(object): 3198 def recv_bytes(self, size): 3199 return b'something bogus' 3200 def send_bytes(self, data): 3201 pass 3202 self.assertRaises(multiprocessing.AuthenticationError, 3203 multiprocessing.connection.deliver_challenge, 3204 _FakeConnection(), b'abc') 3205 3206 def test_answer_challenge_auth_failure(self): 3207 class _FakeConnection(object): 3208 def __init__(self): 3209 self.count = 0 3210 def recv_bytes(self, size): 3211 self.count += 1 3212 if self.count == 1: 3213 return multiprocessing.connection.CHALLENGE 3214 elif self.count == 2: 3215 return b'something bogus' 3216 return b'' 3217 def send_bytes(self, data): 3218 pass 3219 self.assertRaises(multiprocessing.AuthenticationError, 3220 multiprocessing.connection.answer_challenge, 3221 _FakeConnection(), b'abc') 3222 3223 # 3224 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 3225 # 3226 3227 def initializer(ns): 3228 ns.test += 1 3229 3230 class TestInitializers(unittest.TestCase): 3231 def setUp(self): 3232 self.mgr = multiprocessing.Manager() 3233 self.ns = self.mgr.Namespace() 3234 self.ns.test = 0 3235 3236 def tearDown(self): 3237 self.mgr.shutdown() 3238 self.mgr.join() 3239 3240 def test_manager_initializer(self): 3241 m = multiprocessing.managers.SyncManager() 3242 self.assertRaises(TypeError, m.start, 1) 3243 m.start(initializer, (self.ns,)) 3244 self.assertEqual(self.ns.test, 1) 3245 m.shutdown() 3246 m.join() 3247 3248 def test_pool_initializer(self): 3249 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 3250 p = multiprocessing.Pool(1, initializer, (self.ns,)) 3251 p.close() 3252 p.join() 3253 self.assertEqual(self.ns.test, 1) 3254 3255 # 3256 # Issue 5155, 5313, 5331: Test process in processes 3257 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 3258 # 3259 3260 def _this_sub_process(q): 3261 try: 3262 item = q.get(block=False) 3263 except pyqueue.Empty: 3264 pass 3265 3266 def _test_process(q): 3267 queue = multiprocessing.Queue() 3268 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 3269 subProc.daemon = True 3270 subProc.start() 3271 subProc.join() 3272 3273 def _afunc(x): 3274 return x*x 3275 3276 def pool_in_process(): 3277 pool = multiprocessing.Pool(processes=4) 3278 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 3279 pool.close() 3280 pool.join() 3281 3282 class _file_like(object): 3283 def __init__(self, delegate): 3284 self._delegate = delegate 3285 self._pid = None 3286 3287 @property 3288 def cache(self): 3289 pid = os.getpid() 3290 # There are no race conditions since fork keeps only the running thread 3291 if pid != self._pid: 3292 self._pid = pid 3293 self._cache = [] 3294 return self._cache 3295 3296 def write(self, data): 3297 self.cache.append(data) 3298 3299 def flush(self): 3300 self._delegate.write(''.join(self.cache)) 3301 self._cache = [] 3302 3303 class TestStdinBadfiledescriptor(unittest.TestCase): 3304 3305 def test_queue_in_process(self): 3306 queue = multiprocessing.Queue() 3307 proc = multiprocessing.Process(target=_test_process, args=(queue,)) 3308 proc.start() 3309 proc.join() 3310 3311 def test_pool_in_process(self): 3312 p = multiprocessing.Process(target=pool_in_process) 3313 p.start() 3314 p.join() 3315 3316 def test_flushing(self): 3317 sio = io.StringIO() 3318 flike = _file_like(sio) 3319 flike.write('foo') 3320 proc = multiprocessing.Process(target=lambda: flike.flush()) 3321 flike.flush() 3322 assert sio.getvalue() == 'foo' 3323 3324 3325 class TestWait(unittest.TestCase): 3326 3327 @classmethod 3328 def _child_test_wait(cls, w, slow): 3329 for i in range(10): 3330 if slow: 3331 time.sleep(random.random()*0.1) 3332 w.send((i, os.getpid())) 3333 w.close() 3334 3335 def test_wait(self, slow=False): 3336 from multiprocessing.connection import wait 3337 readers = [] 3338 procs = [] 3339 messages = [] 3340 3341 for i in range(4): 3342 r, w = multiprocessing.Pipe(duplex=False) 3343 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 3344 p.daemon = True 3345 p.start() 3346 w.close() 3347 readers.append(r) 3348 procs.append(p) 3349 self.addCleanup(p.join) 3350 3351 while readers: 3352 for r in wait(readers): 3353 try: 3354 msg = r.recv() 3355 except EOFError: 3356 readers.remove(r) 3357 r.close() 3358 else: 3359 messages.append(msg) 3360 3361 messages.sort() 3362 expected = sorted((i, p.pid) for i in range(10) for p in procs) 3363 self.assertEqual(messages, expected) 3364 3365 @classmethod 3366 def _child_test_wait_socket(cls, address, slow): 3367 s = socket.socket() 3368 s.connect(address) 3369 for i in range(10): 3370 if slow: 3371 time.sleep(random.random()*0.1) 3372 s.sendall(('%s\n' % i).encode('ascii')) 3373 s.close() 3374 3375 def test_wait_socket(self, slow=False): 3376 from multiprocessing.connection import wait 3377 l = socket.socket() 3378 l.bind((test.support.HOST, 0)) 3379 l.listen() 3380 addr = l.getsockname() 3381 readers = [] 3382 procs = [] 3383 dic = {} 3384 3385 for i in range(4): 3386 p = multiprocessing.Process(target=self._child_test_wait_socket, 3387 args=(addr, slow)) 3388 p.daemon = True 3389 p.start() 3390 procs.append(p) 3391 self.addCleanup(p.join) 3392 3393 for i in range(4): 3394 r, _ = l.accept() 3395 readers.append(r) 3396 dic[r] = [] 3397 l.close() 3398 3399 while readers: 3400 for r in wait(readers): 3401 msg = r.recv(32) 3402 if not msg: 3403 readers.remove(r) 3404 r.close() 3405 else: 3406 dic[r].append(msg) 3407 3408 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 3409 for v in dic.values(): 3410 self.assertEqual(b''.join(v), expected) 3411 3412 def test_wait_slow(self): 3413 self.test_wait(True) 3414 3415 def test_wait_socket_slow(self): 3416 self.test_wait_socket(True) 3417 3418 def test_wait_timeout(self): 3419 from multiprocessing.connection import wait 3420 3421 expected = 5 3422 a, b = multiprocessing.Pipe() 3423 3424 start = time.time() 3425 res = wait([a, b], expected) 3426 delta = time.time() - start 3427 3428 self.assertEqual(res, []) 3429 self.assertLess(delta, expected * 2) 3430 self.assertGreater(delta, expected * 0.5) 3431 3432 b.send(None) 3433 3434 start = time.time() 3435 res = wait([a, b], 20) 3436 delta = time.time() - start 3437 3438 self.assertEqual(res, [a]) 3439 self.assertLess(delta, 0.4) 3440 3441 @classmethod 3442 def signal_and_sleep(cls, sem, period): 3443 sem.release() 3444 time.sleep(period) 3445 3446 def test_wait_integer(self): 3447 from multiprocessing.connection import wait 3448 3449 expected = 3 3450 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 3451 sem = multiprocessing.Semaphore(0) 3452 a, b = multiprocessing.Pipe() 3453 p = multiprocessing.Process(target=self.signal_and_sleep, 3454 args=(sem, expected)) 3455 3456 p.start() 3457 self.assertIsInstance(p.sentinel, int) 3458 self.assertTrue(sem.acquire(timeout=20)) 3459 3460 start = time.time() 3461 res = wait([a, p.sentinel, b], expected + 20) 3462 delta = time.time() - start 3463 3464 self.assertEqual(res, [p.sentinel]) 3465 self.assertLess(delta, expected + 2) 3466 self.assertGreater(delta, expected - 2) 3467 3468 a.send(None) 3469 3470 start = time.time() 3471 res = wait([a, p.sentinel, b], 20) 3472 delta = time.time() - start 3473 3474 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 3475 self.assertLess(delta, 0.4) 3476 3477 b.send(None) 3478 3479 start = time.time() 3480 res = wait([a, p.sentinel, b], 20) 3481 delta = time.time() - start 3482 3483 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 3484 self.assertLess(delta, 0.4) 3485 3486 p.terminate() 3487 p.join() 3488 3489 def test_neg_timeout(self): 3490 from multiprocessing.connection import wait 3491 a, b = multiprocessing.Pipe() 3492 t = time.time() 3493 res = wait([a], timeout=-1) 3494 t = time.time() - t 3495 self.assertEqual(res, []) 3496 self.assertLess(t, 1) 3497 a.close() 3498 b.close() 3499 3500 # 3501 # Issue 14151: Test invalid family on invalid environment 3502 # 3503 3504 class TestInvalidFamily(unittest.TestCase): 3505 3506 @unittest.skipIf(WIN32, "skipped on Windows") 3507 def test_invalid_family(self): 3508 with self.assertRaises(ValueError): 3509 multiprocessing.connection.Listener(r'\\.\test') 3510 3511 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 3512 def test_invalid_family_win32(self): 3513 with self.assertRaises(ValueError): 3514 multiprocessing.connection.Listener('/var/test.pipe') 3515 3516 # 3517 # Issue 12098: check sys.flags of child matches that for parent 3518 # 3519 3520 class TestFlags(unittest.TestCase): 3521 @classmethod 3522 def run_in_grandchild(cls, conn): 3523 conn.send(tuple(sys.flags)) 3524 3525 @classmethod 3526 def run_in_child(cls): 3527 import json 3528 r, w = multiprocessing.Pipe(duplex=False) 3529 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 3530 p.start() 3531 grandchild_flags = r.recv() 3532 p.join() 3533 r.close() 3534 w.close() 3535 flags = (tuple(sys.flags), grandchild_flags) 3536 print(json.dumps(flags)) 3537 3538 def test_flags(self): 3539 import json, subprocess 3540 # start child process using unusual flags 3541 prog = ('from test._test_multiprocessing import TestFlags; ' + 3542 'TestFlags.run_in_child()') 3543 data = subprocess.check_output( 3544 [sys.executable, '-E', '-S', '-O', '-c', prog]) 3545 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 3546 self.assertEqual(child_flags, grandchild_flags) 3547 3548 # 3549 # Test interaction with socket timeouts - see Issue #6056 3550 # 3551 3552 class TestTimeouts(unittest.TestCase): 3553 @classmethod 3554 def _test_timeout(cls, child, address): 3555 time.sleep(1) 3556 child.send(123) 3557 child.close() 3558 conn = multiprocessing.connection.Client(address) 3559 conn.send(456) 3560 conn.close() 3561 3562 def test_timeout(self): 3563 old_timeout = socket.getdefaulttimeout() 3564 try: 3565 socket.setdefaulttimeout(0.1) 3566 parent, child = multiprocessing.Pipe(duplex=True) 3567 l = multiprocessing.connection.Listener(family='AF_INET') 3568 p = multiprocessing.Process(target=self._test_timeout, 3569 args=(child, l.address)) 3570 p.start() 3571 child.close() 3572 self.assertEqual(parent.recv(), 123) 3573 parent.close() 3574 conn = l.accept() 3575 self.assertEqual(conn.recv(), 456) 3576 conn.close() 3577 l.close() 3578 p.join(10) 3579 finally: 3580 socket.setdefaulttimeout(old_timeout) 3581 3582 # 3583 # Test what happens with no "if __name__ == '__main__'" 3584 # 3585 3586 class TestNoForkBomb(unittest.TestCase): 3587 def test_noforkbomb(self): 3588 sm = multiprocessing.get_start_method() 3589 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 3590 if sm != 'fork': 3591 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 3592 self.assertEqual(out, b'') 3593 self.assertIn(b'RuntimeError', err) 3594 else: 3595 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 3596 self.assertEqual(out.rstrip(), b'123') 3597 self.assertEqual(err, b'') 3598 3599 # 3600 # Issue #17555: ForkAwareThreadLock 3601 # 3602 3603 class TestForkAwareThreadLock(unittest.TestCase): 3604 # We recurisvely start processes. Issue #17555 meant that the 3605 # after fork registry would get duplicate entries for the same 3606 # lock. The size of the registry at generation n was ~2**n. 3607 3608 @classmethod 3609 def child(cls, n, conn): 3610 if n > 1: 3611 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 3612 p.start() 3613 conn.close() 3614 p.join(timeout=5) 3615 else: 3616 conn.send(len(util._afterfork_registry)) 3617 conn.close() 3618 3619 def test_lock(self): 3620 r, w = multiprocessing.Pipe(False) 3621 l = util.ForkAwareThreadLock() 3622 old_size = len(util._afterfork_registry) 3623 p = multiprocessing.Process(target=self.child, args=(5, w)) 3624 p.start() 3625 w.close() 3626 new_size = r.recv() 3627 p.join(timeout=5) 3628 self.assertLessEqual(new_size, old_size) 3629 3630 # 3631 # Check that non-forked child processes do not inherit unneeded fds/handles 3632 # 3633 3634 class TestCloseFds(unittest.TestCase): 3635 3636 def get_high_socket_fd(self): 3637 if WIN32: 3638 # The child process will not have any socket handles, so 3639 # calling socket.fromfd() should produce WSAENOTSOCK even 3640 # if there is a handle of the same number. 3641 return socket.socket().detach() 3642 else: 3643 # We want to produce a socket with an fd high enough that a 3644 # freshly created child process will not have any fds as high. 3645 fd = socket.socket().detach() 3646 to_close = [] 3647 while fd < 50: 3648 to_close.append(fd) 3649 fd = os.dup(fd) 3650 for x in to_close: 3651 os.close(x) 3652 return fd 3653 3654 def close(self, fd): 3655 if WIN32: 3656 socket.socket(fileno=fd).close() 3657 else: 3658 os.close(fd) 3659 3660 @classmethod 3661 def _test_closefds(cls, conn, fd): 3662 try: 3663 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 3664 except Exception as e: 3665 conn.send(e) 3666 else: 3667 s.close() 3668 conn.send(None) 3669 3670 def test_closefd(self): 3671 if not HAS_REDUCTION: 3672 raise unittest.SkipTest('requires fd pickling') 3673 3674 reader, writer = multiprocessing.Pipe() 3675 fd = self.get_high_socket_fd() 3676 try: 3677 p = multiprocessing.Process(target=self._test_closefds, 3678 args=(writer, fd)) 3679 p.start() 3680 writer.close() 3681 e = reader.recv() 3682 p.join(timeout=5) 3683 finally: 3684 self.close(fd) 3685 writer.close() 3686 reader.close() 3687 3688 if multiprocessing.get_start_method() == 'fork': 3689 self.assertIs(e, None) 3690 else: 3691 WSAENOTSOCK = 10038 3692 self.assertIsInstance(e, OSError) 3693 self.assertTrue(e.errno == errno.EBADF or 3694 e.winerror == WSAENOTSOCK, e) 3695 3696 # 3697 # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 3698 # 3699 3700 class TestIgnoreEINTR(unittest.TestCase): 3701 3702 @classmethod 3703 def _test_ignore(cls, conn): 3704 def handler(signum, frame): 3705 pass 3706 signal.signal(signal.SIGUSR1, handler) 3707 conn.send('ready') 3708 x = conn.recv() 3709 conn.send(x) 3710 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block 3711 3712 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3713 def test_ignore(self): 3714 conn, child_conn = multiprocessing.Pipe() 3715 try: 3716 p = multiprocessing.Process(target=self._test_ignore, 3717 args=(child_conn,)) 3718 p.daemon = True 3719 p.start() 3720 child_conn.close() 3721 self.assertEqual(conn.recv(), 'ready') 3722 time.sleep(0.1) 3723 os.kill(p.pid, signal.SIGUSR1) 3724 time.sleep(0.1) 3725 conn.send(1234) 3726 self.assertEqual(conn.recv(), 1234) 3727 time.sleep(0.1) 3728 os.kill(p.pid, signal.SIGUSR1) 3729 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024)) 3730 time.sleep(0.1) 3731 p.join() 3732 finally: 3733 conn.close() 3734 3735 @classmethod 3736 def _test_ignore_listener(cls, conn): 3737 def handler(signum, frame): 3738 pass 3739 signal.signal(signal.SIGUSR1, handler) 3740 with multiprocessing.connection.Listener() as l: 3741 conn.send(l.address) 3742 a = l.accept() 3743 a.send('welcome') 3744 3745 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3746 def test_ignore_listener(self): 3747 conn, child_conn = multiprocessing.Pipe() 3748 try: 3749 p = multiprocessing.Process(target=self._test_ignore_listener, 3750 args=(child_conn,)) 3751 p.daemon = True 3752 p.start() 3753 child_conn.close() 3754 address = conn.recv() 3755 time.sleep(0.1) 3756 os.kill(p.pid, signal.SIGUSR1) 3757 time.sleep(0.1) 3758 client = multiprocessing.connection.Client(address) 3759 self.assertEqual(client.recv(), 'welcome') 3760 p.join() 3761 finally: 3762 conn.close() 3763 3764 class TestStartMethod(unittest.TestCase): 3765 @classmethod 3766 def _check_context(cls, conn): 3767 conn.send(multiprocessing.get_start_method()) 3768 3769 def check_context(self, ctx): 3770 r, w = ctx.Pipe(duplex=False) 3771 p = ctx.Process(target=self._check_context, args=(w,)) 3772 p.start() 3773 w.close() 3774 child_method = r.recv() 3775 r.close() 3776 p.join() 3777 self.assertEqual(child_method, ctx.get_start_method()) 3778 3779 def test_context(self): 3780 for method in ('fork', 'spawn', 'forkserver'): 3781 try: 3782 ctx = multiprocessing.get_context(method) 3783 except ValueError: 3784 continue 3785 self.assertEqual(ctx.get_start_method(), method) 3786 self.assertIs(ctx.get_context(), ctx) 3787 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 3788 self.assertRaises(ValueError, ctx.set_start_method, None) 3789 self.check_context(ctx) 3790 3791 def test_set_get(self): 3792 multiprocessing.set_forkserver_preload(PRELOAD) 3793 count = 0 3794 old_method = multiprocessing.get_start_method() 3795 try: 3796 for method in ('fork', 'spawn', 'forkserver'): 3797 try: 3798 multiprocessing.set_start_method(method, force=True) 3799 except ValueError: 3800 continue 3801 self.assertEqual(multiprocessing.get_start_method(), method) 3802 ctx = multiprocessing.get_context() 3803 self.assertEqual(ctx.get_start_method(), method) 3804 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 3805 self.assertTrue( 3806 ctx.Process.__name__.lower().startswith(method)) 3807 self.check_context(multiprocessing) 3808 count += 1 3809 finally: 3810 multiprocessing.set_start_method(old_method, force=True) 3811 self.assertGreaterEqual(count, 1) 3812 3813 def test_get_all(self): 3814 methods = multiprocessing.get_all_start_methods() 3815 if sys.platform == 'win32': 3816 self.assertEqual(methods, ['spawn']) 3817 else: 3818 self.assertTrue(methods == ['fork', 'spawn'] or 3819 methods == ['fork', 'spawn', 'forkserver']) 3820 3821 def test_preload_resources(self): 3822 if multiprocessing.get_start_method() != 'forkserver': 3823 self.skipTest("test only relevant for 'forkserver' method") 3824 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 3825 rc, out, err = test.support.script_helper.assert_python_ok(name) 3826 out = out.decode() 3827 err = err.decode() 3828 if out.rstrip() != 'ok' or err != '': 3829 print(out) 3830 print(err) 3831 self.fail("failed spawning forkserver or grandchild") 3832 3833 3834 # 3835 # Check that killing process does not leak named semaphores 3836 # 3837 3838 @unittest.skipIf(sys.platform == "win32", 3839 "test semantics don't make sense on Windows") 3840 class TestSemaphoreTracker(unittest.TestCase): 3841 def test_semaphore_tracker(self): 3842 import subprocess 3843 cmd = '''if 1: 3844 import multiprocessing as mp, time, os 3845 mp.set_start_method("spawn") 3846 lock1 = mp.Lock() 3847 lock2 = mp.Lock() 3848 os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n") 3849 os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n") 3850 time.sleep(10) 3851 ''' 3852 r, w = os.pipe() 3853 p = subprocess.Popen([sys.executable, 3854 '-c', cmd % (w, w)], 3855 pass_fds=[w], 3856 stderr=subprocess.PIPE) 3857 os.close(w) 3858 with open(r, 'rb', closefd=True) as f: 3859 name1 = f.readline().rstrip().decode('ascii') 3860 name2 = f.readline().rstrip().decode('ascii') 3861 _multiprocessing.sem_unlink(name1) 3862 p.terminate() 3863 p.wait() 3864 time.sleep(2.0) 3865 with self.assertRaises(OSError) as ctx: 3866 _multiprocessing.sem_unlink(name2) 3867 # docs say it should be ENOENT, but OSX seems to give EINVAL 3868 self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) 3869 err = p.stderr.read().decode('utf-8') 3870 p.stderr.close() 3871 expected = 'semaphore_tracker: There appear to be 2 leaked semaphores' 3872 self.assertRegex(err, expected) 3873 self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) 3874 3875 # 3876 # Mixins 3877 # 3878 3879 class ProcessesMixin(object): 3880 TYPE = 'processes' 3881 Process = multiprocessing.Process 3882 connection = multiprocessing.connection 3883 current_process = staticmethod(multiprocessing.current_process) 3884 active_children = staticmethod(multiprocessing.active_children) 3885 Pool = staticmethod(multiprocessing.Pool) 3886 Pipe = staticmethod(multiprocessing.Pipe) 3887 Queue = staticmethod(multiprocessing.Queue) 3888 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 3889 Lock = staticmethod(multiprocessing.Lock) 3890 RLock = staticmethod(multiprocessing.RLock) 3891 Semaphore = staticmethod(multiprocessing.Semaphore) 3892 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 3893 Condition = staticmethod(multiprocessing.Condition) 3894 Event = staticmethod(multiprocessing.Event) 3895 Barrier = staticmethod(multiprocessing.Barrier) 3896 Value = staticmethod(multiprocessing.Value) 3897 Array = staticmethod(multiprocessing.Array) 3898 RawValue = staticmethod(multiprocessing.RawValue) 3899 RawArray = staticmethod(multiprocessing.RawArray) 3900 3901 3902 class ManagerMixin(object): 3903 TYPE = 'manager' 3904 Process = multiprocessing.Process 3905 Queue = property(operator.attrgetter('manager.Queue')) 3906 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 3907 Lock = property(operator.attrgetter('manager.Lock')) 3908 RLock = property(operator.attrgetter('manager.RLock')) 3909 Semaphore = property(operator.attrgetter('manager.Semaphore')) 3910 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 3911 Condition = property(operator.attrgetter('manager.Condition')) 3912 Event = property(operator.attrgetter('manager.Event')) 3913 Barrier = property(operator.attrgetter('manager.Barrier')) 3914 Value = property(operator.attrgetter('manager.Value')) 3915 Array = property(operator.attrgetter('manager.Array')) 3916 list = property(operator.attrgetter('manager.list')) 3917 dict = property(operator.attrgetter('manager.dict')) 3918 Namespace = property(operator.attrgetter('manager.Namespace')) 3919 3920 @classmethod 3921 def Pool(cls, *args, **kwds): 3922 return cls.manager.Pool(*args, **kwds) 3923 3924 @classmethod 3925 def setUpClass(cls): 3926 cls.manager = multiprocessing.Manager() 3927 3928 @classmethod 3929 def tearDownClass(cls): 3930 # only the manager process should be returned by active_children() 3931 # but this can take a bit on slow machines, so wait a few seconds 3932 # if there are other children too (see #17395) 3933 t = 0.01 3934 while len(multiprocessing.active_children()) > 1 and t < 5: 3935 time.sleep(t) 3936 t *= 2 3937 gc.collect() # do garbage collection 3938 if cls.manager._number_of_objects() != 0: 3939 # This is not really an error since some tests do not 3940 # ensure that all processes which hold a reference to a 3941 # managed object have been joined. 3942 print('Shared objects which still exist at manager shutdown:') 3943 print(cls.manager._debug_info()) 3944 cls.manager.shutdown() 3945 cls.manager.join() 3946 cls.manager = None 3947 3948 3949 class ThreadsMixin(object): 3950 TYPE = 'threads' 3951 Process = multiprocessing.dummy.Process 3952 connection = multiprocessing.dummy.connection 3953 current_process = staticmethod(multiprocessing.dummy.current_process) 3954 active_children = staticmethod(multiprocessing.dummy.active_children) 3955 Pool = staticmethod(multiprocessing.dummy.Pool) 3956 Pipe = staticmethod(multiprocessing.dummy.Pipe) 3957 Queue = staticmethod(multiprocessing.dummy.Queue) 3958 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 3959 Lock = staticmethod(multiprocessing.dummy.Lock) 3960 RLock = staticmethod(multiprocessing.dummy.RLock) 3961 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 3962 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 3963 Condition = staticmethod(multiprocessing.dummy.Condition) 3964 Event = staticmethod(multiprocessing.dummy.Event) 3965 Barrier = staticmethod(multiprocessing.dummy.Barrier) 3966 Value = staticmethod(multiprocessing.dummy.Value) 3967 Array = staticmethod(multiprocessing.dummy.Array) 3968 3969 # 3970 # Functions used to create test cases from the base ones in this module 3971 # 3972 3973 def install_tests_in_module_dict(remote_globs, start_method): 3974 __module__ = remote_globs['__name__'] 3975 local_globs = globals() 3976 ALL_TYPES = {'processes', 'threads', 'manager'} 3977 3978 for name, base in local_globs.items(): 3979 if not isinstance(base, type): 3980 continue 3981 if issubclass(base, BaseTestCase): 3982 if base is BaseTestCase: 3983 continue 3984 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 3985 for type_ in base.ALLOWED_TYPES: 3986 newname = 'With' + type_.capitalize() + name[1:] 3987 Mixin = local_globs[type_.capitalize() + 'Mixin'] 3988 class Temp(base, Mixin, unittest.TestCase): 3989 pass 3990 Temp.__name__ = Temp.__qualname__ = newname 3991 Temp.__module__ = __module__ 3992 remote_globs[newname] = Temp 3993 elif issubclass(base, unittest.TestCase): 3994 class Temp(base, object): 3995 pass 3996 Temp.__name__ = Temp.__qualname__ = name 3997 Temp.__module__ = __module__ 3998 remote_globs[name] = Temp 3999 4000 dangling = [None, None] 4001 old_start_method = [None] 4002 4003 def setUpModule(): 4004 multiprocessing.set_forkserver_preload(PRELOAD) 4005 multiprocessing.process._cleanup() 4006 dangling[0] = multiprocessing.process._dangling.copy() 4007 dangling[1] = threading._dangling.copy() 4008 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 4009 try: 4010 multiprocessing.set_start_method(start_method, force=True) 4011 except ValueError: 4012 raise unittest.SkipTest(start_method + 4013 ' start method not supported') 4014 4015 if sys.platform.startswith("linux"): 4016 try: 4017 lock = multiprocessing.RLock() 4018 except OSError: 4019 raise unittest.SkipTest("OSError raises on RLock creation, " 4020 "see issue 3111!") 4021 check_enough_semaphores() 4022 util.get_temp_dir() # creates temp directory 4023 multiprocessing.get_logger().setLevel(LOG_LEVEL) 4024 4025 def tearDownModule(): 4026 multiprocessing.set_start_method(old_start_method[0], force=True) 4027 # pause a bit so we don't get warning about dangling threads/processes 4028 time.sleep(0.5) 4029 multiprocessing.process._cleanup() 4030 gc.collect() 4031 tmp = set(multiprocessing.process._dangling) - set(dangling[0]) 4032 if tmp: 4033 print('Dangling processes:', tmp, file=sys.stderr) 4034 del tmp 4035 tmp = set(threading._dangling) - set(dangling[1]) 4036 if tmp: 4037 print('Dangling threads:', tmp, file=sys.stderr) 4038 4039 remote_globs['setUpModule'] = setUpModule 4040 remote_globs['tearDownModule'] = tearDownModule 4041