1 #!/usr/bin/env python 2 3 # 4 # Unit tests for the multiprocessing package 5 # 6 7 import unittest 8 import Queue 9 import time 10 import sys 11 import os 12 import gc 13 import signal 14 import array 15 import socket 16 import random 17 import logging 18 import errno 19 import test.script_helper 20 from test import test_support 21 from StringIO import StringIO 22 _multiprocessing = test_support.import_module('_multiprocessing') 23 # import threading after _multiprocessing to raise a more relevant error 24 # message: "No module named _multiprocessing". _multiprocessing is not compiled 25 # without thread support. 26 import threading 27 28 # Work around broken sem_open implementations 29 test_support.import_module('multiprocessing.synchronize') 30 31 import multiprocessing.dummy 32 import multiprocessing.connection 33 import multiprocessing.managers 34 import multiprocessing.heap 35 import multiprocessing.pool 36 37 from multiprocessing import util 38 39 try: 40 from multiprocessing import reduction 41 HAS_REDUCTION = True 42 except ImportError: 43 HAS_REDUCTION = False 44 45 try: 46 from multiprocessing.sharedctypes import Value, copy 47 HAS_SHAREDCTYPES = True 48 except ImportError: 49 HAS_SHAREDCTYPES = False 50 51 try: 52 import msvcrt 53 except ImportError: 54 msvcrt = None 55 56 # 57 # 58 # 59 60 latin = str 61 62 # 63 # Constants 64 # 65 66 LOG_LEVEL = util.SUBWARNING 67 #LOG_LEVEL = logging.DEBUG 68 69 DELTA = 0.1 70 CHECK_TIMINGS = False # making true makes tests take a lot longer 71 # and can sometimes cause some non-serious 72 # failures because some calls block a bit 73 # longer than expected 74 if CHECK_TIMINGS: 75 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 76 else: 77 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 78 79 HAVE_GETVALUE = not getattr(_multiprocessing, 80 'HAVE_BROKEN_SEM_GETVALUE', False) 81 82 WIN32 = (sys.platform == "win32") 83 84 try: 85 MAXFD = os.sysconf("SC_OPEN_MAX") 86 except: 87 MAXFD = 256 88 89 # 90 # Some tests require ctypes 91 # 92 93 try: 94 from ctypes import Structure, c_int, c_double 95 except ImportError: 96 Structure = object 97 c_int = c_double = None 98 99 100 def check_enough_semaphores(): 101 """Check that the system supports enough semaphores to run the test.""" 102 # minimum number of semaphores available according to POSIX 103 nsems_min = 256 104 try: 105 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 106 except (AttributeError, ValueError): 107 # sysconf not available or setting not available 108 return 109 if nsems == -1 or nsems >= nsems_min: 110 return 111 raise unittest.SkipTest("The OS doesn't support enough semaphores " 112 "to run the test (required: %d)." % nsems_min) 113 114 115 # 116 # Creates a wrapper for a function which records the time it takes to finish 117 # 118 119 class TimingWrapper(object): 120 121 def __init__(self, func): 122 self.func = func 123 self.elapsed = None 124 125 def __call__(self, *args, **kwds): 126 t = time.time() 127 try: 128 return self.func(*args, **kwds) 129 finally: 130 self.elapsed = time.time() - t 131 132 # 133 # Base class for test cases 134 # 135 136 class BaseTestCase(object): 137 138 ALLOWED_TYPES = ('processes', 'manager', 'threads') 139 140 def assertTimingAlmostEqual(self, a, b): 141 if CHECK_TIMINGS: 142 self.assertAlmostEqual(a, b, 1) 143 144 def assertReturnsIfImplemented(self, value, func, *args): 145 try: 146 res = func(*args) 147 except NotImplementedError: 148 pass 149 else: 150 return self.assertEqual(value, res) 151 152 # For the sanity of Windows users, rather than crashing or freezing in 153 # multiple ways. 154 def __reduce__(self, *args): 155 raise NotImplementedError("shouldn't try to pickle a test case") 156 157 __reduce_ex__ = __reduce__ 158 159 # 160 # Return the value of a semaphore 161 # 162 163 def get_value(self): 164 try: 165 return self.get_value() 166 except AttributeError: 167 try: 168 return self._Semaphore__value 169 except AttributeError: 170 try: 171 return self._value 172 except AttributeError: 173 raise NotImplementedError 174 175 # 176 # Testcases 177 # 178 179 class _TestProcess(BaseTestCase): 180 181 ALLOWED_TYPES = ('processes', 'threads') 182 183 def test_current(self): 184 if self.TYPE == 'threads': 185 return 186 187 current = self.current_process() 188 authkey = current.authkey 189 190 self.assertTrue(current.is_alive()) 191 self.assertTrue(not current.daemon) 192 self.assertIsInstance(authkey, bytes) 193 self.assertTrue(len(authkey) > 0) 194 self.assertEqual(current.ident, os.getpid()) 195 self.assertEqual(current.exitcode, None) 196 197 @classmethod 198 def _test(cls, q, *args, **kwds): 199 current = cls.current_process() 200 q.put(args) 201 q.put(kwds) 202 q.put(current.name) 203 if cls.TYPE != 'threads': 204 q.put(bytes(current.authkey)) 205 q.put(current.pid) 206 207 def test_process(self): 208 q = self.Queue(1) 209 e = self.Event() 210 args = (q, 1, 2) 211 kwargs = {'hello':23, 'bye':2.54} 212 name = 'SomeProcess' 213 p = self.Process( 214 target=self._test, args=args, kwargs=kwargs, name=name 215 ) 216 p.daemon = True 217 current = self.current_process() 218 219 if self.TYPE != 'threads': 220 self.assertEqual(p.authkey, current.authkey) 221 self.assertEqual(p.is_alive(), False) 222 self.assertEqual(p.daemon, True) 223 self.assertNotIn(p, self.active_children()) 224 self.assertTrue(type(self.active_children()) is list) 225 self.assertEqual(p.exitcode, None) 226 227 p.start() 228 229 self.assertEqual(p.exitcode, None) 230 self.assertEqual(p.is_alive(), True) 231 self.assertIn(p, self.active_children()) 232 233 self.assertEqual(q.get(), args[1:]) 234 self.assertEqual(q.get(), kwargs) 235 self.assertEqual(q.get(), p.name) 236 if self.TYPE != 'threads': 237 self.assertEqual(q.get(), current.authkey) 238 self.assertEqual(q.get(), p.pid) 239 240 p.join() 241 242 self.assertEqual(p.exitcode, 0) 243 self.assertEqual(p.is_alive(), False) 244 self.assertNotIn(p, self.active_children()) 245 246 @classmethod 247 def _test_terminate(cls): 248 time.sleep(1000) 249 250 def test_terminate(self): 251 if self.TYPE == 'threads': 252 return 253 254 p = self.Process(target=self._test_terminate) 255 p.daemon = True 256 p.start() 257 258 self.assertEqual(p.is_alive(), True) 259 self.assertIn(p, self.active_children()) 260 self.assertEqual(p.exitcode, None) 261 262 p.terminate() 263 264 join = TimingWrapper(p.join) 265 self.assertEqual(join(), None) 266 self.assertTimingAlmostEqual(join.elapsed, 0.0) 267 268 self.assertEqual(p.is_alive(), False) 269 self.assertNotIn(p, self.active_children()) 270 271 p.join() 272 273 # XXX sometimes get p.exitcode == 0 on Windows ... 274 #self.assertEqual(p.exitcode, -signal.SIGTERM) 275 276 def test_cpu_count(self): 277 try: 278 cpus = multiprocessing.cpu_count() 279 except NotImplementedError: 280 cpus = 1 281 self.assertTrue(type(cpus) is int) 282 self.assertTrue(cpus >= 1) 283 284 def test_active_children(self): 285 self.assertEqual(type(self.active_children()), list) 286 287 p = self.Process(target=time.sleep, args=(DELTA,)) 288 self.assertNotIn(p, self.active_children()) 289 290 p.daemon = True 291 p.start() 292 self.assertIn(p, self.active_children()) 293 294 p.join() 295 self.assertNotIn(p, self.active_children()) 296 297 @classmethod 298 def _test_recursion(cls, wconn, id): 299 from multiprocessing import forking 300 wconn.send(id) 301 if len(id) < 2: 302 for i in range(2): 303 p = cls.Process( 304 target=cls._test_recursion, args=(wconn, id+[i]) 305 ) 306 p.start() 307 p.join() 308 309 def test_recursion(self): 310 rconn, wconn = self.Pipe(duplex=False) 311 self._test_recursion(wconn, []) 312 313 time.sleep(DELTA) 314 result = [] 315 while rconn.poll(): 316 result.append(rconn.recv()) 317 318 expected = [ 319 [], 320 [0], 321 [0, 0], 322 [0, 1], 323 [1], 324 [1, 0], 325 [1, 1] 326 ] 327 self.assertEqual(result, expected) 328 329 @classmethod 330 def _test_sys_exit(cls, reason, testfn): 331 sys.stderr = open(testfn, 'w') 332 sys.exit(reason) 333 334 def test_sys_exit(self): 335 # See Issue 13854 336 if self.TYPE == 'threads': 337 return 338 339 testfn = test_support.TESTFN 340 self.addCleanup(test_support.unlink, testfn) 341 342 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)): 343 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 344 p.daemon = True 345 p.start() 346 p.join(5) 347 self.assertEqual(p.exitcode, code) 348 349 with open(testfn, 'r') as f: 350 self.assertEqual(f.read().rstrip(), str(reason)) 351 352 for reason in (True, False, 8): 353 p = self.Process(target=sys.exit, args=(reason,)) 354 p.daemon = True 355 p.start() 356 p.join(5) 357 self.assertEqual(p.exitcode, reason) 358 359 # 360 # 361 # 362 363 class _UpperCaser(multiprocessing.Process): 364 365 def __init__(self): 366 multiprocessing.Process.__init__(self) 367 self.child_conn, self.parent_conn = multiprocessing.Pipe() 368 369 def run(self): 370 self.parent_conn.close() 371 for s in iter(self.child_conn.recv, None): 372 self.child_conn.send(s.upper()) 373 self.child_conn.close() 374 375 def submit(self, s): 376 assert type(s) is str 377 self.parent_conn.send(s) 378 return self.parent_conn.recv() 379 380 def stop(self): 381 self.parent_conn.send(None) 382 self.parent_conn.close() 383 self.child_conn.close() 384 385 class _TestSubclassingProcess(BaseTestCase): 386 387 ALLOWED_TYPES = ('processes',) 388 389 def test_subclassing(self): 390 uppercaser = _UpperCaser() 391 uppercaser.daemon = True 392 uppercaser.start() 393 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 394 self.assertEqual(uppercaser.submit('world'), 'WORLD') 395 uppercaser.stop() 396 uppercaser.join() 397 398 # 399 # 400 # 401 402 def queue_empty(q): 403 if hasattr(q, 'empty'): 404 return q.empty() 405 else: 406 return q.qsize() == 0 407 408 def queue_full(q, maxsize): 409 if hasattr(q, 'full'): 410 return q.full() 411 else: 412 return q.qsize() == maxsize 413 414 415 class _TestQueue(BaseTestCase): 416 417 418 @classmethod 419 def _test_put(cls, queue, child_can_start, parent_can_continue): 420 child_can_start.wait() 421 for i in range(6): 422 queue.get() 423 parent_can_continue.set() 424 425 def test_put(self): 426 MAXSIZE = 6 427 queue = self.Queue(maxsize=MAXSIZE) 428 child_can_start = self.Event() 429 parent_can_continue = self.Event() 430 431 proc = self.Process( 432 target=self._test_put, 433 args=(queue, child_can_start, parent_can_continue) 434 ) 435 proc.daemon = True 436 proc.start() 437 438 self.assertEqual(queue_empty(queue), True) 439 self.assertEqual(queue_full(queue, MAXSIZE), False) 440 441 queue.put(1) 442 queue.put(2, True) 443 queue.put(3, True, None) 444 queue.put(4, False) 445 queue.put(5, False, None) 446 queue.put_nowait(6) 447 448 # the values may be in buffer but not yet in pipe so sleep a bit 449 time.sleep(DELTA) 450 451 self.assertEqual(queue_empty(queue), False) 452 self.assertEqual(queue_full(queue, MAXSIZE), True) 453 454 put = TimingWrapper(queue.put) 455 put_nowait = TimingWrapper(queue.put_nowait) 456 457 self.assertRaises(Queue.Full, put, 7, False) 458 self.assertTimingAlmostEqual(put.elapsed, 0) 459 460 self.assertRaises(Queue.Full, put, 7, False, None) 461 self.assertTimingAlmostEqual(put.elapsed, 0) 462 463 self.assertRaises(Queue.Full, put_nowait, 7) 464 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 465 466 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1) 467 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 468 469 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2) 470 self.assertTimingAlmostEqual(put.elapsed, 0) 471 472 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3) 473 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 474 475 child_can_start.set() 476 parent_can_continue.wait() 477 478 self.assertEqual(queue_empty(queue), True) 479 self.assertEqual(queue_full(queue, MAXSIZE), False) 480 481 proc.join() 482 483 @classmethod 484 def _test_get(cls, queue, child_can_start, parent_can_continue): 485 child_can_start.wait() 486 #queue.put(1) 487 queue.put(2) 488 queue.put(3) 489 queue.put(4) 490 queue.put(5) 491 parent_can_continue.set() 492 493 def test_get(self): 494 queue = self.Queue() 495 child_can_start = self.Event() 496 parent_can_continue = self.Event() 497 498 proc = self.Process( 499 target=self._test_get, 500 args=(queue, child_can_start, parent_can_continue) 501 ) 502 proc.daemon = True 503 proc.start() 504 505 self.assertEqual(queue_empty(queue), True) 506 507 child_can_start.set() 508 parent_can_continue.wait() 509 510 time.sleep(DELTA) 511 self.assertEqual(queue_empty(queue), False) 512 513 # Hangs unexpectedly, remove for now 514 #self.assertEqual(queue.get(), 1) 515 self.assertEqual(queue.get(True, None), 2) 516 self.assertEqual(queue.get(True), 3) 517 self.assertEqual(queue.get(timeout=1), 4) 518 self.assertEqual(queue.get_nowait(), 5) 519 520 self.assertEqual(queue_empty(queue), True) 521 522 get = TimingWrapper(queue.get) 523 get_nowait = TimingWrapper(queue.get_nowait) 524 525 self.assertRaises(Queue.Empty, get, False) 526 self.assertTimingAlmostEqual(get.elapsed, 0) 527 528 self.assertRaises(Queue.Empty, get, False, None) 529 self.assertTimingAlmostEqual(get.elapsed, 0) 530 531 self.assertRaises(Queue.Empty, get_nowait) 532 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 533 534 self.assertRaises(Queue.Empty, get, True, TIMEOUT1) 535 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 536 537 self.assertRaises(Queue.Empty, get, False, TIMEOUT2) 538 self.assertTimingAlmostEqual(get.elapsed, 0) 539 540 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3) 541 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 542 543 proc.join() 544 545 @classmethod 546 def _test_fork(cls, queue): 547 for i in range(10, 20): 548 queue.put(i) 549 # note that at this point the items may only be buffered, so the 550 # process cannot shutdown until the feeder thread has finished 551 # pushing items onto the pipe. 552 553 def test_fork(self): 554 # Old versions of Queue would fail to create a new feeder 555 # thread for a forked process if the original process had its 556 # own feeder thread. This test checks that this no longer 557 # happens. 558 559 queue = self.Queue() 560 561 # put items on queue so that main process starts a feeder thread 562 for i in range(10): 563 queue.put(i) 564 565 # wait to make sure thread starts before we fork a new process 566 time.sleep(DELTA) 567 568 # fork process 569 p = self.Process(target=self._test_fork, args=(queue,)) 570 p.daemon = True 571 p.start() 572 573 # check that all expected items are in the queue 574 for i in range(20): 575 self.assertEqual(queue.get(), i) 576 self.assertRaises(Queue.Empty, queue.get, False) 577 578 p.join() 579 580 def test_qsize(self): 581 q = self.Queue() 582 try: 583 self.assertEqual(q.qsize(), 0) 584 except NotImplementedError: 585 return 586 q.put(1) 587 self.assertEqual(q.qsize(), 1) 588 q.put(5) 589 self.assertEqual(q.qsize(), 2) 590 q.get() 591 self.assertEqual(q.qsize(), 1) 592 q.get() 593 self.assertEqual(q.qsize(), 0) 594 595 @classmethod 596 def _test_task_done(cls, q): 597 for obj in iter(q.get, None): 598 time.sleep(DELTA) 599 q.task_done() 600 601 def test_task_done(self): 602 queue = self.JoinableQueue() 603 604 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'): 605 self.skipTest("requires 'queue.task_done()' method") 606 607 workers = [self.Process(target=self._test_task_done, args=(queue,)) 608 for i in xrange(4)] 609 610 for p in workers: 611 p.daemon = True 612 p.start() 613 614 for i in xrange(10): 615 queue.put(i) 616 617 queue.join() 618 619 for p in workers: 620 queue.put(None) 621 622 for p in workers: 623 p.join() 624 625 # 626 # 627 # 628 629 class _TestLock(BaseTestCase): 630 631 def test_lock(self): 632 lock = self.Lock() 633 self.assertEqual(lock.acquire(), True) 634 self.assertEqual(lock.acquire(False), False) 635 self.assertEqual(lock.release(), None) 636 self.assertRaises((ValueError, threading.ThreadError), lock.release) 637 638 def test_rlock(self): 639 lock = self.RLock() 640 self.assertEqual(lock.acquire(), True) 641 self.assertEqual(lock.acquire(), True) 642 self.assertEqual(lock.acquire(), True) 643 self.assertEqual(lock.release(), None) 644 self.assertEqual(lock.release(), None) 645 self.assertEqual(lock.release(), None) 646 self.assertRaises((AssertionError, RuntimeError), lock.release) 647 648 def test_lock_context(self): 649 with self.Lock(): 650 pass 651 652 653 class _TestSemaphore(BaseTestCase): 654 655 def _test_semaphore(self, sem): 656 self.assertReturnsIfImplemented(2, get_value, sem) 657 self.assertEqual(sem.acquire(), True) 658 self.assertReturnsIfImplemented(1, get_value, sem) 659 self.assertEqual(sem.acquire(), True) 660 self.assertReturnsIfImplemented(0, get_value, sem) 661 self.assertEqual(sem.acquire(False), False) 662 self.assertReturnsIfImplemented(0, get_value, sem) 663 self.assertEqual(sem.release(), None) 664 self.assertReturnsIfImplemented(1, get_value, sem) 665 self.assertEqual(sem.release(), None) 666 self.assertReturnsIfImplemented(2, get_value, sem) 667 668 def test_semaphore(self): 669 sem = self.Semaphore(2) 670 self._test_semaphore(sem) 671 self.assertEqual(sem.release(), None) 672 self.assertReturnsIfImplemented(3, get_value, sem) 673 self.assertEqual(sem.release(), None) 674 self.assertReturnsIfImplemented(4, get_value, sem) 675 676 def test_bounded_semaphore(self): 677 sem = self.BoundedSemaphore(2) 678 self._test_semaphore(sem) 679 # Currently fails on OS/X 680 #if HAVE_GETVALUE: 681 # self.assertRaises(ValueError, sem.release) 682 # self.assertReturnsIfImplemented(2, get_value, sem) 683 684 def test_timeout(self): 685 if self.TYPE != 'processes': 686 return 687 688 sem = self.Semaphore(0) 689 acquire = TimingWrapper(sem.acquire) 690 691 self.assertEqual(acquire(False), False) 692 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 693 694 self.assertEqual(acquire(False, None), False) 695 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 696 697 self.assertEqual(acquire(False, TIMEOUT1), False) 698 self.assertTimingAlmostEqual(acquire.elapsed, 0) 699 700 self.assertEqual(acquire(True, TIMEOUT2), False) 701 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 702 703 self.assertEqual(acquire(timeout=TIMEOUT3), False) 704 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 705 706 707 class _TestCondition(BaseTestCase): 708 709 @classmethod 710 def f(cls, cond, sleeping, woken, timeout=None): 711 cond.acquire() 712 sleeping.release() 713 cond.wait(timeout) 714 woken.release() 715 cond.release() 716 717 def check_invariant(self, cond): 718 # this is only supposed to succeed when there are no sleepers 719 if self.TYPE == 'processes': 720 try: 721 sleepers = (cond._sleeping_count.get_value() - 722 cond._woken_count.get_value()) 723 self.assertEqual(sleepers, 0) 724 self.assertEqual(cond._wait_semaphore.get_value(), 0) 725 except NotImplementedError: 726 pass 727 728 def test_notify(self): 729 cond = self.Condition() 730 sleeping = self.Semaphore(0) 731 woken = self.Semaphore(0) 732 733 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 734 p.daemon = True 735 p.start() 736 737 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 738 p.daemon = True 739 p.start() 740 741 # wait for both children to start sleeping 742 sleeping.acquire() 743 sleeping.acquire() 744 745 # check no process/thread has woken up 746 time.sleep(DELTA) 747 self.assertReturnsIfImplemented(0, get_value, woken) 748 749 # wake up one process/thread 750 cond.acquire() 751 cond.notify() 752 cond.release() 753 754 # check one process/thread has woken up 755 time.sleep(DELTA) 756 self.assertReturnsIfImplemented(1, get_value, woken) 757 758 # wake up another 759 cond.acquire() 760 cond.notify() 761 cond.release() 762 763 # check other has woken up 764 time.sleep(DELTA) 765 self.assertReturnsIfImplemented(2, get_value, woken) 766 767 # check state is not mucked up 768 self.check_invariant(cond) 769 p.join() 770 771 def test_notify_all(self): 772 cond = self.Condition() 773 sleeping = self.Semaphore(0) 774 woken = self.Semaphore(0) 775 776 # start some threads/processes which will timeout 777 for i in range(3): 778 p = self.Process(target=self.f, 779 args=(cond, sleeping, woken, TIMEOUT1)) 780 p.daemon = True 781 p.start() 782 783 t = threading.Thread(target=self.f, 784 args=(cond, sleeping, woken, TIMEOUT1)) 785 t.daemon = True 786 t.start() 787 788 # wait for them all to sleep 789 for i in xrange(6): 790 sleeping.acquire() 791 792 # check they have all timed out 793 for i in xrange(6): 794 woken.acquire() 795 self.assertReturnsIfImplemented(0, get_value, woken) 796 797 # check state is not mucked up 798 self.check_invariant(cond) 799 800 # start some more threads/processes 801 for i in range(3): 802 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 803 p.daemon = True 804 p.start() 805 806 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 807 t.daemon = True 808 t.start() 809 810 # wait for them to all sleep 811 for i in xrange(6): 812 sleeping.acquire() 813 814 # check no process/thread has woken up 815 time.sleep(DELTA) 816 self.assertReturnsIfImplemented(0, get_value, woken) 817 818 # wake them all up 819 cond.acquire() 820 cond.notify_all() 821 cond.release() 822 823 # check they have all woken 824 time.sleep(DELTA) 825 self.assertReturnsIfImplemented(6, get_value, woken) 826 827 # check state is not mucked up 828 self.check_invariant(cond) 829 830 def test_timeout(self): 831 cond = self.Condition() 832 wait = TimingWrapper(cond.wait) 833 cond.acquire() 834 res = wait(TIMEOUT1) 835 cond.release() 836 self.assertEqual(res, None) 837 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 838 839 840 class _TestEvent(BaseTestCase): 841 842 @classmethod 843 def _test_event(cls, event): 844 time.sleep(TIMEOUT2) 845 event.set() 846 847 def test_event(self): 848 event = self.Event() 849 wait = TimingWrapper(event.wait) 850 851 # Removed temporarily, due to API shear, this does not 852 # work with threading._Event objects. is_set == isSet 853 self.assertEqual(event.is_set(), False) 854 855 # Removed, threading.Event.wait() will return the value of the __flag 856 # instead of None. API Shear with the semaphore backed mp.Event 857 self.assertEqual(wait(0.0), False) 858 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 859 self.assertEqual(wait(TIMEOUT1), False) 860 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 861 862 event.set() 863 864 # See note above on the API differences 865 self.assertEqual(event.is_set(), True) 866 self.assertEqual(wait(), True) 867 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 868 self.assertEqual(wait(TIMEOUT1), True) 869 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 870 # self.assertEqual(event.is_set(), True) 871 872 event.clear() 873 874 #self.assertEqual(event.is_set(), False) 875 876 p = self.Process(target=self._test_event, args=(event,)) 877 p.daemon = True 878 p.start() 879 self.assertEqual(wait(), True) 880 881 # 882 # 883 # 884 885 class _TestValue(BaseTestCase): 886 887 ALLOWED_TYPES = ('processes',) 888 889 codes_values = [ 890 ('i', 4343, 24234), 891 ('d', 3.625, -4.25), 892 ('h', -232, 234), 893 ('c', latin('x'), latin('y')) 894 ] 895 896 def setUp(self): 897 if not HAS_SHAREDCTYPES: 898 self.skipTest("requires multiprocessing.sharedctypes") 899 900 @classmethod 901 def _test(cls, values): 902 for sv, cv in zip(values, cls.codes_values): 903 sv.value = cv[2] 904 905 906 def test_value(self, raw=False): 907 if raw: 908 values = [self.RawValue(code, value) 909 for code, value, _ in self.codes_values] 910 else: 911 values = [self.Value(code, value) 912 for code, value, _ in self.codes_values] 913 914 for sv, cv in zip(values, self.codes_values): 915 self.assertEqual(sv.value, cv[1]) 916 917 proc = self.Process(target=self._test, args=(values,)) 918 proc.daemon = True 919 proc.start() 920 proc.join() 921 922 for sv, cv in zip(values, self.codes_values): 923 self.assertEqual(sv.value, cv[2]) 924 925 def test_rawvalue(self): 926 self.test_value(raw=True) 927 928 def test_getobj_getlock(self): 929 val1 = self.Value('i', 5) 930 lock1 = val1.get_lock() 931 obj1 = val1.get_obj() 932 933 val2 = self.Value('i', 5, lock=None) 934 lock2 = val2.get_lock() 935 obj2 = val2.get_obj() 936 937 lock = self.Lock() 938 val3 = self.Value('i', 5, lock=lock) 939 lock3 = val3.get_lock() 940 obj3 = val3.get_obj() 941 self.assertEqual(lock, lock3) 942 943 arr4 = self.Value('i', 5, lock=False) 944 self.assertFalse(hasattr(arr4, 'get_lock')) 945 self.assertFalse(hasattr(arr4, 'get_obj')) 946 947 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 948 949 arr5 = self.RawValue('i', 5) 950 self.assertFalse(hasattr(arr5, 'get_lock')) 951 self.assertFalse(hasattr(arr5, 'get_obj')) 952 953 954 class _TestArray(BaseTestCase): 955 956 ALLOWED_TYPES = ('processes',) 957 958 @classmethod 959 def f(cls, seq): 960 for i in range(1, len(seq)): 961 seq[i] += seq[i-1] 962 963 @unittest.skipIf(c_int is None, "requires _ctypes") 964 def test_array(self, raw=False): 965 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 966 if raw: 967 arr = self.RawArray('i', seq) 968 else: 969 arr = self.Array('i', seq) 970 971 self.assertEqual(len(arr), len(seq)) 972 self.assertEqual(arr[3], seq[3]) 973 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 974 975 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 976 977 self.assertEqual(list(arr[:]), seq) 978 979 self.f(seq) 980 981 p = self.Process(target=self.f, args=(arr,)) 982 p.daemon = True 983 p.start() 984 p.join() 985 986 self.assertEqual(list(arr[:]), seq) 987 988 @unittest.skipIf(c_int is None, "requires _ctypes") 989 def test_array_from_size(self): 990 size = 10 991 # Test for zeroing (see issue #11675). 992 # The repetition below strengthens the test by increasing the chances 993 # of previously allocated non-zero memory being used for the new array 994 # on the 2nd and 3rd loops. 995 for _ in range(3): 996 arr = self.Array('i', size) 997 self.assertEqual(len(arr), size) 998 self.assertEqual(list(arr), [0] * size) 999 arr[:] = range(10) 1000 self.assertEqual(list(arr), range(10)) 1001 del arr 1002 1003 @unittest.skipIf(c_int is None, "requires _ctypes") 1004 def test_rawarray(self): 1005 self.test_array(raw=True) 1006 1007 @unittest.skipIf(c_int is None, "requires _ctypes") 1008 def test_array_accepts_long(self): 1009 arr = self.Array('i', 10L) 1010 self.assertEqual(len(arr), 10) 1011 raw_arr = self.RawArray('i', 10L) 1012 self.assertEqual(len(raw_arr), 10) 1013 1014 @unittest.skipIf(c_int is None, "requires _ctypes") 1015 def test_getobj_getlock_obj(self): 1016 arr1 = self.Array('i', range(10)) 1017 lock1 = arr1.get_lock() 1018 obj1 = arr1.get_obj() 1019 1020 arr2 = self.Array('i', range(10), lock=None) 1021 lock2 = arr2.get_lock() 1022 obj2 = arr2.get_obj() 1023 1024 lock = self.Lock() 1025 arr3 = self.Array('i', range(10), lock=lock) 1026 lock3 = arr3.get_lock() 1027 obj3 = arr3.get_obj() 1028 self.assertEqual(lock, lock3) 1029 1030 arr4 = self.Array('i', range(10), lock=False) 1031 self.assertFalse(hasattr(arr4, 'get_lock')) 1032 self.assertFalse(hasattr(arr4, 'get_obj')) 1033 self.assertRaises(AttributeError, 1034 self.Array, 'i', range(10), lock='notalock') 1035 1036 arr5 = self.RawArray('i', range(10)) 1037 self.assertFalse(hasattr(arr5, 'get_lock')) 1038 self.assertFalse(hasattr(arr5, 'get_obj')) 1039 1040 # 1041 # 1042 # 1043 1044 class _TestContainers(BaseTestCase): 1045 1046 ALLOWED_TYPES = ('manager',) 1047 1048 def test_list(self): 1049 a = self.list(range(10)) 1050 self.assertEqual(a[:], range(10)) 1051 1052 b = self.list() 1053 self.assertEqual(b[:], []) 1054 1055 b.extend(range(5)) 1056 self.assertEqual(b[:], range(5)) 1057 1058 self.assertEqual(b[2], 2) 1059 self.assertEqual(b[2:10], [2,3,4]) 1060 1061 b *= 2 1062 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 1063 1064 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 1065 1066 self.assertEqual(a[:], range(10)) 1067 1068 d = [a, b] 1069 e = self.list(d) 1070 self.assertEqual( 1071 e[:], 1072 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 1073 ) 1074 1075 f = self.list([a]) 1076 a.append('hello') 1077 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) 1078 1079 def test_dict(self): 1080 d = self.dict() 1081 indices = range(65, 70) 1082 for i in indices: 1083 d[i] = chr(i) 1084 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 1085 self.assertEqual(sorted(d.keys()), indices) 1086 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 1087 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 1088 1089 def test_namespace(self): 1090 n = self.Namespace() 1091 n.name = 'Bob' 1092 n.job = 'Builder' 1093 n._hidden = 'hidden' 1094 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 1095 del n.job 1096 self.assertEqual(str(n), "Namespace(name='Bob')") 1097 self.assertTrue(hasattr(n, 'name')) 1098 self.assertTrue(not hasattr(n, 'job')) 1099 1100 # 1101 # 1102 # 1103 1104 def sqr(x, wait=0.0): 1105 time.sleep(wait) 1106 return x*x 1107 class _TestPool(BaseTestCase): 1108 1109 def test_apply(self): 1110 papply = self.pool.apply 1111 self.assertEqual(papply(sqr, (5,)), sqr(5)) 1112 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 1113 1114 def test_map(self): 1115 pmap = self.pool.map 1116 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10))) 1117 self.assertEqual(pmap(sqr, range(100), chunksize=20), 1118 map(sqr, range(100))) 1119 1120 def test_map_chunksize(self): 1121 try: 1122 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 1123 except multiprocessing.TimeoutError: 1124 self.fail("pool.map_async with chunksize stalled on null list") 1125 1126 def test_async(self): 1127 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 1128 get = TimingWrapper(res.get) 1129 self.assertEqual(get(), 49) 1130 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1131 1132 def test_async_timeout(self): 1133 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2)) 1134 get = TimingWrapper(res.get) 1135 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 1136 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 1137 1138 def test_imap(self): 1139 it = self.pool.imap(sqr, range(10)) 1140 self.assertEqual(list(it), map(sqr, range(10))) 1141 1142 it = self.pool.imap(sqr, range(10)) 1143 for i in range(10): 1144 self.assertEqual(it.next(), i*i) 1145 self.assertRaises(StopIteration, it.next) 1146 1147 it = self.pool.imap(sqr, range(1000), chunksize=100) 1148 for i in range(1000): 1149 self.assertEqual(it.next(), i*i) 1150 self.assertRaises(StopIteration, it.next) 1151 1152 def test_imap_unordered(self): 1153 it = self.pool.imap_unordered(sqr, range(1000)) 1154 self.assertEqual(sorted(it), map(sqr, range(1000))) 1155 1156 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53) 1157 self.assertEqual(sorted(it), map(sqr, range(1000))) 1158 1159 def test_make_pool(self): 1160 self.assertRaises(ValueError, multiprocessing.Pool, -1) 1161 self.assertRaises(ValueError, multiprocessing.Pool, 0) 1162 1163 p = multiprocessing.Pool(3) 1164 self.assertEqual(3, len(p._pool)) 1165 p.close() 1166 p.join() 1167 1168 def test_terminate(self): 1169 if self.TYPE == 'manager': 1170 # On Unix a forked process increfs each shared object to 1171 # which its parent process held a reference. If the 1172 # forked process gets terminated then there is likely to 1173 # be a reference leak. So to prevent 1174 # _TestZZZNumberOfObjects from failing we skip this test 1175 # when using a manager. 1176 return 1177 1178 result = self.pool.map_async( 1179 time.sleep, [0.1 for i in range(10000)], chunksize=1 1180 ) 1181 self.pool.terminate() 1182 join = TimingWrapper(self.pool.join) 1183 join() 1184 self.assertTrue(join.elapsed < 0.2) 1185 1186 def test_empty_iterable(self): 1187 # See Issue 12157 1188 p = self.Pool(1) 1189 1190 self.assertEqual(p.map(sqr, []), []) 1191 self.assertEqual(list(p.imap(sqr, [])), []) 1192 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 1193 self.assertEqual(p.map_async(sqr, []).get(), []) 1194 1195 p.close() 1196 p.join() 1197 1198 def unpickleable_result(): 1199 return lambda: 42 1200 1201 class _TestPoolWorkerErrors(BaseTestCase): 1202 ALLOWED_TYPES = ('processes', ) 1203 1204 def test_unpickleable_result(self): 1205 from multiprocessing.pool import MaybeEncodingError 1206 p = multiprocessing.Pool(2) 1207 1208 # Make sure we don't lose pool processes because of encoding errors. 1209 for iteration in range(20): 1210 res = p.apply_async(unpickleable_result) 1211 self.assertRaises(MaybeEncodingError, res.get) 1212 1213 p.close() 1214 p.join() 1215 1216 class _TestPoolWorkerLifetime(BaseTestCase): 1217 1218 ALLOWED_TYPES = ('processes', ) 1219 def test_pool_worker_lifetime(self): 1220 p = multiprocessing.Pool(3, maxtasksperchild=10) 1221 self.assertEqual(3, len(p._pool)) 1222 origworkerpids = [w.pid for w in p._pool] 1223 # Run many tasks so each worker gets replaced (hopefully) 1224 results = [] 1225 for i in range(100): 1226 results.append(p.apply_async(sqr, (i, ))) 1227 # Fetch the results and verify we got the right answers, 1228 # also ensuring all the tasks have completed. 1229 for (j, res) in enumerate(results): 1230 self.assertEqual(res.get(), sqr(j)) 1231 # Refill the pool 1232 p._repopulate_pool() 1233 # Wait until all workers are alive 1234 # (countdown * DELTA = 5 seconds max startup process time) 1235 countdown = 50 1236 while countdown and not all(w.is_alive() for w in p._pool): 1237 countdown -= 1 1238 time.sleep(DELTA) 1239 finalworkerpids = [w.pid for w in p._pool] 1240 # All pids should be assigned. See issue #7805. 1241 self.assertNotIn(None, origworkerpids) 1242 self.assertNotIn(None, finalworkerpids) 1243 # Finally, check that the worker pids have changed 1244 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 1245 p.close() 1246 p.join() 1247 1248 def test_pool_worker_lifetime_early_close(self): 1249 # Issue #10332: closing a pool whose workers have limited lifetimes 1250 # before all the tasks completed would make join() hang. 1251 p = multiprocessing.Pool(3, maxtasksperchild=1) 1252 results = [] 1253 for i in range(6): 1254 results.append(p.apply_async(sqr, (i, 0.3))) 1255 p.close() 1256 p.join() 1257 # check the results 1258 for (j, res) in enumerate(results): 1259 self.assertEqual(res.get(), sqr(j)) 1260 1261 1262 # 1263 # Test that manager has expected number of shared objects left 1264 # 1265 1266 class _TestZZZNumberOfObjects(BaseTestCase): 1267 # Because test cases are sorted alphabetically, this one will get 1268 # run after all the other tests for the manager. It tests that 1269 # there have been no "reference leaks" for the manager's shared 1270 # objects. Note the comment in _TestPool.test_terminate(). 1271 ALLOWED_TYPES = ('manager',) 1272 1273 def test_number_of_objects(self): 1274 EXPECTED_NUMBER = 1 # the pool object is still alive 1275 multiprocessing.active_children() # discard dead process objs 1276 gc.collect() # do garbage collection 1277 refs = self.manager._number_of_objects() 1278 debug_info = self.manager._debug_info() 1279 if refs != EXPECTED_NUMBER: 1280 print self.manager._debug_info() 1281 print debug_info 1282 1283 self.assertEqual(refs, EXPECTED_NUMBER) 1284 1285 # 1286 # Test of creating a customized manager class 1287 # 1288 1289 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 1290 1291 class FooBar(object): 1292 def f(self): 1293 return 'f()' 1294 def g(self): 1295 raise ValueError 1296 def _h(self): 1297 return '_h()' 1298 1299 def baz(): 1300 for i in xrange(10): 1301 yield i*i 1302 1303 class IteratorProxy(BaseProxy): 1304 _exposed_ = ('next', '__next__') 1305 def __iter__(self): 1306 return self 1307 def next(self): 1308 return self._callmethod('next') 1309 def __next__(self): 1310 return self._callmethod('__next__') 1311 1312 class MyManager(BaseManager): 1313 pass 1314 1315 MyManager.register('Foo', callable=FooBar) 1316 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 1317 MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 1318 1319 1320 class _TestMyManager(BaseTestCase): 1321 1322 ALLOWED_TYPES = ('manager',) 1323 1324 def test_mymanager(self): 1325 manager = MyManager() 1326 manager.start() 1327 1328 foo = manager.Foo() 1329 bar = manager.Bar() 1330 baz = manager.baz() 1331 1332 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 1333 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 1334 1335 self.assertEqual(foo_methods, ['f', 'g']) 1336 self.assertEqual(bar_methods, ['f', '_h']) 1337 1338 self.assertEqual(foo.f(), 'f()') 1339 self.assertRaises(ValueError, foo.g) 1340 self.assertEqual(foo._callmethod('f'), 'f()') 1341 self.assertRaises(RemoteError, foo._callmethod, '_h') 1342 1343 self.assertEqual(bar.f(), 'f()') 1344 self.assertEqual(bar._h(), '_h()') 1345 self.assertEqual(bar._callmethod('f'), 'f()') 1346 self.assertEqual(bar._callmethod('_h'), '_h()') 1347 1348 self.assertEqual(list(baz), [i*i for i in range(10)]) 1349 1350 manager.shutdown() 1351 1352 # 1353 # Test of connecting to a remote server and using xmlrpclib for serialization 1354 # 1355 1356 _queue = Queue.Queue() 1357 def get_queue(): 1358 return _queue 1359 1360 class QueueManager(BaseManager): 1361 '''manager class used by server process''' 1362 QueueManager.register('get_queue', callable=get_queue) 1363 1364 class QueueManager2(BaseManager): 1365 '''manager class which specifies the same interface as QueueManager''' 1366 QueueManager2.register('get_queue') 1367 1368 1369 SERIALIZER = 'xmlrpclib' 1370 1371 class _TestRemoteManager(BaseTestCase): 1372 1373 ALLOWED_TYPES = ('manager',) 1374 1375 @classmethod 1376 def _putter(cls, address, authkey): 1377 manager = QueueManager2( 1378 address=address, authkey=authkey, serializer=SERIALIZER 1379 ) 1380 manager.connect() 1381 queue = manager.get_queue() 1382 queue.put(('hello world', None, True, 2.25)) 1383 1384 def test_remote(self): 1385 authkey = os.urandom(32) 1386 1387 manager = QueueManager( 1388 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER 1389 ) 1390 manager.start() 1391 1392 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1393 p.daemon = True 1394 p.start() 1395 1396 manager2 = QueueManager2( 1397 address=manager.address, authkey=authkey, serializer=SERIALIZER 1398 ) 1399 manager2.connect() 1400 queue = manager2.get_queue() 1401 1402 # Note that xmlrpclib will deserialize object as a list not a tuple 1403 self.assertEqual(queue.get(), ['hello world', None, True, 2.25]) 1404 1405 # Because we are using xmlrpclib for serialization instead of 1406 # pickle this will cause a serialization error. 1407 self.assertRaises(Exception, queue.put, time.sleep) 1408 1409 # Make queue finalizer run before the server is stopped 1410 del queue 1411 manager.shutdown() 1412 1413 class _TestManagerRestart(BaseTestCase): 1414 1415 @classmethod 1416 def _putter(cls, address, authkey): 1417 manager = QueueManager( 1418 address=address, authkey=authkey, serializer=SERIALIZER) 1419 manager.connect() 1420 queue = manager.get_queue() 1421 queue.put('hello world') 1422 1423 def test_rapid_restart(self): 1424 authkey = os.urandom(32) 1425 manager = QueueManager( 1426 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER) 1427 srvr = manager.get_server() 1428 addr = srvr.address 1429 # Close the connection.Listener socket which gets opened as a part 1430 # of manager.get_server(). It's not needed for the test. 1431 srvr.listener.close() 1432 manager.start() 1433 1434 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1435 p.daemon = True 1436 p.start() 1437 queue = manager.get_queue() 1438 self.assertEqual(queue.get(), 'hello world') 1439 del queue 1440 manager.shutdown() 1441 manager = QueueManager( 1442 address=addr, authkey=authkey, serializer=SERIALIZER) 1443 manager.start() 1444 manager.shutdown() 1445 1446 # 1447 # 1448 # 1449 1450 SENTINEL = latin('') 1451 1452 class _TestConnection(BaseTestCase): 1453 1454 ALLOWED_TYPES = ('processes', 'threads') 1455 1456 @classmethod 1457 def _echo(cls, conn): 1458 for msg in iter(conn.recv_bytes, SENTINEL): 1459 conn.send_bytes(msg) 1460 conn.close() 1461 1462 def test_connection(self): 1463 conn, child_conn = self.Pipe() 1464 1465 p = self.Process(target=self._echo, args=(child_conn,)) 1466 p.daemon = True 1467 p.start() 1468 1469 seq = [1, 2.25, None] 1470 msg = latin('hello world') 1471 longmsg = msg * 10 1472 arr = array.array('i', range(4)) 1473 1474 if self.TYPE == 'processes': 1475 self.assertEqual(type(conn.fileno()), int) 1476 1477 self.assertEqual(conn.send(seq), None) 1478 self.assertEqual(conn.recv(), seq) 1479 1480 self.assertEqual(conn.send_bytes(msg), None) 1481 self.assertEqual(conn.recv_bytes(), msg) 1482 1483 if self.TYPE == 'processes': 1484 buffer = array.array('i', [0]*10) 1485 expected = list(arr) + [0] * (10 - len(arr)) 1486 self.assertEqual(conn.send_bytes(arr), None) 1487 self.assertEqual(conn.recv_bytes_into(buffer), 1488 len(arr) * buffer.itemsize) 1489 self.assertEqual(list(buffer), expected) 1490 1491 buffer = array.array('i', [0]*10) 1492 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 1493 self.assertEqual(conn.send_bytes(arr), None) 1494 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 1495 len(arr) * buffer.itemsize) 1496 self.assertEqual(list(buffer), expected) 1497 1498 buffer = bytearray(latin(' ' * 40)) 1499 self.assertEqual(conn.send_bytes(longmsg), None) 1500 try: 1501 res = conn.recv_bytes_into(buffer) 1502 except multiprocessing.BufferTooShort, e: 1503 self.assertEqual(e.args, (longmsg,)) 1504 else: 1505 self.fail('expected BufferTooShort, got %s' % res) 1506 1507 poll = TimingWrapper(conn.poll) 1508 1509 self.assertEqual(poll(), False) 1510 self.assertTimingAlmostEqual(poll.elapsed, 0) 1511 1512 self.assertEqual(poll(TIMEOUT1), False) 1513 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 1514 1515 conn.send(None) 1516 time.sleep(.1) 1517 1518 self.assertEqual(poll(TIMEOUT1), True) 1519 self.assertTimingAlmostEqual(poll.elapsed, 0) 1520 1521 self.assertEqual(conn.recv(), None) 1522 1523 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 1524 conn.send_bytes(really_big_msg) 1525 self.assertEqual(conn.recv_bytes(), really_big_msg) 1526 1527 conn.send_bytes(SENTINEL) # tell child to quit 1528 child_conn.close() 1529 1530 if self.TYPE == 'processes': 1531 self.assertEqual(conn.readable, True) 1532 self.assertEqual(conn.writable, True) 1533 self.assertRaises(EOFError, conn.recv) 1534 self.assertRaises(EOFError, conn.recv_bytes) 1535 1536 p.join() 1537 1538 def test_duplex_false(self): 1539 reader, writer = self.Pipe(duplex=False) 1540 self.assertEqual(writer.send(1), None) 1541 self.assertEqual(reader.recv(), 1) 1542 if self.TYPE == 'processes': 1543 self.assertEqual(reader.readable, True) 1544 self.assertEqual(reader.writable, False) 1545 self.assertEqual(writer.readable, False) 1546 self.assertEqual(writer.writable, True) 1547 self.assertRaises(IOError, reader.send, 2) 1548 self.assertRaises(IOError, writer.recv) 1549 self.assertRaises(IOError, writer.poll) 1550 1551 def test_spawn_close(self): 1552 # We test that a pipe connection can be closed by parent 1553 # process immediately after child is spawned. On Windows this 1554 # would have sometimes failed on old versions because 1555 # child_conn would be closed before the child got a chance to 1556 # duplicate it. 1557 conn, child_conn = self.Pipe() 1558 1559 p = self.Process(target=self._echo, args=(child_conn,)) 1560 p.daemon = True 1561 p.start() 1562 child_conn.close() # this might complete before child initializes 1563 1564 msg = latin('hello') 1565 conn.send_bytes(msg) 1566 self.assertEqual(conn.recv_bytes(), msg) 1567 1568 conn.send_bytes(SENTINEL) 1569 conn.close() 1570 p.join() 1571 1572 def test_sendbytes(self): 1573 if self.TYPE != 'processes': 1574 return 1575 1576 msg = latin('abcdefghijklmnopqrstuvwxyz') 1577 a, b = self.Pipe() 1578 1579 a.send_bytes(msg) 1580 self.assertEqual(b.recv_bytes(), msg) 1581 1582 a.send_bytes(msg, 5) 1583 self.assertEqual(b.recv_bytes(), msg[5:]) 1584 1585 a.send_bytes(msg, 7, 8) 1586 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 1587 1588 a.send_bytes(msg, 26) 1589 self.assertEqual(b.recv_bytes(), latin('')) 1590 1591 a.send_bytes(msg, 26, 0) 1592 self.assertEqual(b.recv_bytes(), latin('')) 1593 1594 self.assertRaises(ValueError, a.send_bytes, msg, 27) 1595 1596 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 1597 1598 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 1599 1600 self.assertRaises(ValueError, a.send_bytes, msg, -1) 1601 1602 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 1603 1604 @classmethod 1605 def _is_fd_assigned(cls, fd): 1606 try: 1607 os.fstat(fd) 1608 except OSError as e: 1609 if e.errno == errno.EBADF: 1610 return False 1611 raise 1612 else: 1613 return True 1614 1615 @classmethod 1616 def _writefd(cls, conn, data, create_dummy_fds=False): 1617 if create_dummy_fds: 1618 for i in range(0, 256): 1619 if not cls._is_fd_assigned(i): 1620 os.dup2(conn.fileno(), i) 1621 fd = reduction.recv_handle(conn) 1622 if msvcrt: 1623 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 1624 os.write(fd, data) 1625 os.close(fd) 1626 1627 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1628 def test_fd_transfer(self): 1629 if self.TYPE != 'processes': 1630 self.skipTest("only makes sense with processes") 1631 conn, child_conn = self.Pipe(duplex=True) 1632 1633 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 1634 p.daemon = True 1635 p.start() 1636 with open(test_support.TESTFN, "wb") as f: 1637 fd = f.fileno() 1638 if msvcrt: 1639 fd = msvcrt.get_osfhandle(fd) 1640 reduction.send_handle(conn, fd, p.pid) 1641 p.join() 1642 with open(test_support.TESTFN, "rb") as f: 1643 self.assertEqual(f.read(), b"foo") 1644 1645 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1646 @unittest.skipIf(sys.platform == "win32", 1647 "test semantics don't make sense on Windows") 1648 @unittest.skipIf(MAXFD <= 256, 1649 "largest assignable fd number is too small") 1650 @unittest.skipUnless(hasattr(os, "dup2"), 1651 "test needs os.dup2()") 1652 def test_large_fd_transfer(self): 1653 # With fd > 256 (issue #11657) 1654 if self.TYPE != 'processes': 1655 self.skipTest("only makes sense with processes") 1656 conn, child_conn = self.Pipe(duplex=True) 1657 1658 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 1659 p.daemon = True 1660 p.start() 1661 with open(test_support.TESTFN, "wb") as f: 1662 fd = f.fileno() 1663 for newfd in range(256, MAXFD): 1664 if not self._is_fd_assigned(newfd): 1665 break 1666 else: 1667 self.fail("could not find an unassigned large file descriptor") 1668 os.dup2(fd, newfd) 1669 try: 1670 reduction.send_handle(conn, newfd, p.pid) 1671 finally: 1672 os.close(newfd) 1673 p.join() 1674 with open(test_support.TESTFN, "rb") as f: 1675 self.assertEqual(f.read(), b"bar") 1676 1677 @classmethod 1678 def _send_data_without_fd(self, conn): 1679 os.write(conn.fileno(), b"\0") 1680 1681 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 1682 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 1683 def test_missing_fd_transfer(self): 1684 # Check that exception is raised when received data is not 1685 # accompanied by a file descriptor in ancillary data. 1686 if self.TYPE != 'processes': 1687 self.skipTest("only makes sense with processes") 1688 conn, child_conn = self.Pipe(duplex=True) 1689 1690 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 1691 p.daemon = True 1692 p.start() 1693 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 1694 p.join() 1695 1696 class _TestListenerClient(BaseTestCase): 1697 1698 ALLOWED_TYPES = ('processes', 'threads') 1699 1700 @classmethod 1701 def _test(cls, address): 1702 conn = cls.connection.Client(address) 1703 conn.send('hello') 1704 conn.close() 1705 1706 def test_listener_client(self): 1707 for family in self.connection.families: 1708 l = self.connection.Listener(family=family) 1709 p = self.Process(target=self._test, args=(l.address,)) 1710 p.daemon = True 1711 p.start() 1712 conn = l.accept() 1713 self.assertEqual(conn.recv(), 'hello') 1714 p.join() 1715 l.close() 1716 1717 def test_issue14725(self): 1718 l = self.connection.Listener() 1719 p = self.Process(target=self._test, args=(l.address,)) 1720 p.daemon = True 1721 p.start() 1722 time.sleep(1) 1723 # On Windows the client process should by now have connected, 1724 # written data and closed the pipe handle by now. This causes 1725 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 1726 # 14725. 1727 conn = l.accept() 1728 self.assertEqual(conn.recv(), 'hello') 1729 conn.close() 1730 p.join() 1731 l.close() 1732 1733 # 1734 # Test of sending connection and socket objects between processes 1735 # 1736 """ 1737 class _TestPicklingConnections(BaseTestCase): 1738 1739 ALLOWED_TYPES = ('processes',) 1740 1741 def _listener(self, conn, families): 1742 for fam in families: 1743 l = self.connection.Listener(family=fam) 1744 conn.send(l.address) 1745 new_conn = l.accept() 1746 conn.send(new_conn) 1747 1748 if self.TYPE == 'processes': 1749 l = socket.socket() 1750 l.bind(('localhost', 0)) 1751 conn.send(l.getsockname()) 1752 l.listen(1) 1753 new_conn, addr = l.accept() 1754 conn.send(new_conn) 1755 1756 conn.recv() 1757 1758 def _remote(self, conn): 1759 for (address, msg) in iter(conn.recv, None): 1760 client = self.connection.Client(address) 1761 client.send(msg.upper()) 1762 client.close() 1763 1764 if self.TYPE == 'processes': 1765 address, msg = conn.recv() 1766 client = socket.socket() 1767 client.connect(address) 1768 client.sendall(msg.upper()) 1769 client.close() 1770 1771 conn.close() 1772 1773 def test_pickling(self): 1774 try: 1775 multiprocessing.allow_connection_pickling() 1776 except ImportError: 1777 return 1778 1779 families = self.connection.families 1780 1781 lconn, lconn0 = self.Pipe() 1782 lp = self.Process(target=self._listener, args=(lconn0, families)) 1783 lp.daemon = True 1784 lp.start() 1785 lconn0.close() 1786 1787 rconn, rconn0 = self.Pipe() 1788 rp = self.Process(target=self._remote, args=(rconn0,)) 1789 rp.daemon = True 1790 rp.start() 1791 rconn0.close() 1792 1793 for fam in families: 1794 msg = ('This connection uses family %s' % fam).encode('ascii') 1795 address = lconn.recv() 1796 rconn.send((address, msg)) 1797 new_conn = lconn.recv() 1798 self.assertEqual(new_conn.recv(), msg.upper()) 1799 1800 rconn.send(None) 1801 1802 if self.TYPE == 'processes': 1803 msg = latin('This connection uses a normal socket') 1804 address = lconn.recv() 1805 rconn.send((address, msg)) 1806 if hasattr(socket, 'fromfd'): 1807 new_conn = lconn.recv() 1808 self.assertEqual(new_conn.recv(100), msg.upper()) 1809 else: 1810 # XXX On Windows with Py2.6 need to backport fromfd() 1811 discard = lconn.recv_bytes() 1812 1813 lconn.send(None) 1814 1815 rconn.close() 1816 lconn.close() 1817 1818 lp.join() 1819 rp.join() 1820 """ 1821 # 1822 # 1823 # 1824 1825 class _TestHeap(BaseTestCase): 1826 1827 ALLOWED_TYPES = ('processes',) 1828 1829 def test_heap(self): 1830 iterations = 5000 1831 maxblocks = 50 1832 blocks = [] 1833 1834 # create and destroy lots of blocks of different sizes 1835 for i in xrange(iterations): 1836 size = int(random.lognormvariate(0, 1) * 1000) 1837 b = multiprocessing.heap.BufferWrapper(size) 1838 blocks.append(b) 1839 if len(blocks) > maxblocks: 1840 i = random.randrange(maxblocks) 1841 del blocks[i] 1842 1843 # get the heap object 1844 heap = multiprocessing.heap.BufferWrapper._heap 1845 1846 # verify the state of the heap 1847 all = [] 1848 occupied = 0 1849 heap._lock.acquire() 1850 self.addCleanup(heap._lock.release) 1851 for L in heap._len_to_seq.values(): 1852 for arena, start, stop in L: 1853 all.append((heap._arenas.index(arena), start, stop, 1854 stop-start, 'free')) 1855 for arena, start, stop in heap._allocated_blocks: 1856 all.append((heap._arenas.index(arena), start, stop, 1857 stop-start, 'occupied')) 1858 occupied += (stop-start) 1859 1860 all.sort() 1861 1862 for i in range(len(all)-1): 1863 (arena, start, stop) = all[i][:3] 1864 (narena, nstart, nstop) = all[i+1][:3] 1865 self.assertTrue((arena != narena and nstart == 0) or 1866 (stop == nstart)) 1867 1868 def test_free_from_gc(self): 1869 # Check that freeing of blocks by the garbage collector doesn't deadlock 1870 # (issue #12352). 1871 # Make sure the GC is enabled, and set lower collection thresholds to 1872 # make collections more frequent (and increase the probability of 1873 # deadlock). 1874 if not gc.isenabled(): 1875 gc.enable() 1876 self.addCleanup(gc.disable) 1877 thresholds = gc.get_threshold() 1878 self.addCleanup(gc.set_threshold, *thresholds) 1879 gc.set_threshold(10) 1880 1881 # perform numerous block allocations, with cyclic references to make 1882 # sure objects are collected asynchronously by the gc 1883 for i in range(5000): 1884 a = multiprocessing.heap.BufferWrapper(1) 1885 b = multiprocessing.heap.BufferWrapper(1) 1886 # circular references 1887 a.buddy = b 1888 b.buddy = a 1889 1890 # 1891 # 1892 # 1893 1894 class _Foo(Structure): 1895 _fields_ = [ 1896 ('x', c_int), 1897 ('y', c_double) 1898 ] 1899 1900 class _TestSharedCTypes(BaseTestCase): 1901 1902 ALLOWED_TYPES = ('processes',) 1903 1904 def setUp(self): 1905 if not HAS_SHAREDCTYPES: 1906 self.skipTest("requires multiprocessing.sharedctypes") 1907 1908 @classmethod 1909 def _double(cls, x, y, foo, arr, string): 1910 x.value *= 2 1911 y.value *= 2 1912 foo.x *= 2 1913 foo.y *= 2 1914 string.value *= 2 1915 for i in range(len(arr)): 1916 arr[i] *= 2 1917 1918 def test_sharedctypes(self, lock=False): 1919 x = Value('i', 7, lock=lock) 1920 y = Value(c_double, 1.0/3.0, lock=lock) 1921 foo = Value(_Foo, 3, 2, lock=lock) 1922 arr = self.Array('d', range(10), lock=lock) 1923 string = self.Array('c', 20, lock=lock) 1924 string.value = latin('hello') 1925 1926 p = self.Process(target=self._double, args=(x, y, foo, arr, string)) 1927 p.daemon = True 1928 p.start() 1929 p.join() 1930 1931 self.assertEqual(x.value, 14) 1932 self.assertAlmostEqual(y.value, 2.0/3.0) 1933 self.assertEqual(foo.x, 6) 1934 self.assertAlmostEqual(foo.y, 4.0) 1935 for i in range(10): 1936 self.assertAlmostEqual(arr[i], i*2) 1937 self.assertEqual(string.value, latin('hellohello')) 1938 1939 def test_synchronize(self): 1940 self.test_sharedctypes(lock=True) 1941 1942 def test_copy(self): 1943 foo = _Foo(2, 5.0) 1944 bar = copy(foo) 1945 foo.x = 0 1946 foo.y = 0 1947 self.assertEqual(bar.x, 2) 1948 self.assertAlmostEqual(bar.y, 5.0) 1949 1950 # 1951 # 1952 # 1953 1954 class _TestFinalize(BaseTestCase): 1955 1956 ALLOWED_TYPES = ('processes',) 1957 1958 @classmethod 1959 def _test_finalize(cls, conn): 1960 class Foo(object): 1961 pass 1962 1963 a = Foo() 1964 util.Finalize(a, conn.send, args=('a',)) 1965 del a # triggers callback for a 1966 1967 b = Foo() 1968 close_b = util.Finalize(b, conn.send, args=('b',)) 1969 close_b() # triggers callback for b 1970 close_b() # does nothing because callback has already been called 1971 del b # does nothing because callback has already been called 1972 1973 c = Foo() 1974 util.Finalize(c, conn.send, args=('c',)) 1975 1976 d10 = Foo() 1977 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 1978 1979 d01 = Foo() 1980 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 1981 d02 = Foo() 1982 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 1983 d03 = Foo() 1984 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 1985 1986 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 1987 1988 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 1989 1990 # call multiprocessing's cleanup function then exit process without 1991 # garbage collecting locals 1992 util._exit_function() 1993 conn.close() 1994 os._exit(0) 1995 1996 def test_finalize(self): 1997 conn, child_conn = self.Pipe() 1998 1999 p = self.Process(target=self._test_finalize, args=(child_conn,)) 2000 p.daemon = True 2001 p.start() 2002 p.join() 2003 2004 result = [obj for obj in iter(conn.recv, 'STOP')] 2005 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 2006 2007 # 2008 # Test that from ... import * works for each module 2009 # 2010 2011 class _TestImportStar(BaseTestCase): 2012 2013 ALLOWED_TYPES = ('processes',) 2014 2015 def test_import(self): 2016 modules = [ 2017 'multiprocessing', 'multiprocessing.connection', 2018 'multiprocessing.heap', 'multiprocessing.managers', 2019 'multiprocessing.pool', 'multiprocessing.process', 2020 'multiprocessing.synchronize', 'multiprocessing.util' 2021 ] 2022 2023 if HAS_REDUCTION: 2024 modules.append('multiprocessing.reduction') 2025 2026 if c_int is not None: 2027 # This module requires _ctypes 2028 modules.append('multiprocessing.sharedctypes') 2029 2030 for name in modules: 2031 __import__(name) 2032 mod = sys.modules[name] 2033 2034 for attr in getattr(mod, '__all__', ()): 2035 self.assertTrue( 2036 hasattr(mod, attr), 2037 '%r does not have attribute %r' % (mod, attr) 2038 ) 2039 2040 # 2041 # Quick test that logging works -- does not test logging output 2042 # 2043 2044 class _TestLogging(BaseTestCase): 2045 2046 ALLOWED_TYPES = ('processes',) 2047 2048 def test_enable_logging(self): 2049 logger = multiprocessing.get_logger() 2050 logger.setLevel(util.SUBWARNING) 2051 self.assertTrue(logger is not None) 2052 logger.debug('this will not be printed') 2053 logger.info('nor will this') 2054 logger.setLevel(LOG_LEVEL) 2055 2056 @classmethod 2057 def _test_level(cls, conn): 2058 logger = multiprocessing.get_logger() 2059 conn.send(logger.getEffectiveLevel()) 2060 2061 def test_level(self): 2062 LEVEL1 = 32 2063 LEVEL2 = 37 2064 2065 logger = multiprocessing.get_logger() 2066 root_logger = logging.getLogger() 2067 root_level = root_logger.level 2068 2069 reader, writer = multiprocessing.Pipe(duplex=False) 2070 2071 logger.setLevel(LEVEL1) 2072 p = self.Process(target=self._test_level, args=(writer,)) 2073 p.daemon = True 2074 p.start() 2075 self.assertEqual(LEVEL1, reader.recv()) 2076 2077 logger.setLevel(logging.NOTSET) 2078 root_logger.setLevel(LEVEL2) 2079 p = self.Process(target=self._test_level, args=(writer,)) 2080 p.daemon = True 2081 p.start() 2082 self.assertEqual(LEVEL2, reader.recv()) 2083 2084 root_logger.setLevel(root_level) 2085 logger.setLevel(level=LOG_LEVEL) 2086 2087 2088 # class _TestLoggingProcessName(BaseTestCase): 2089 # 2090 # def handle(self, record): 2091 # assert record.processName == multiprocessing.current_process().name 2092 # self.__handled = True 2093 # 2094 # def test_logging(self): 2095 # handler = logging.Handler() 2096 # handler.handle = self.handle 2097 # self.__handled = False 2098 # # Bypass getLogger() and side-effects 2099 # logger = logging.getLoggerClass()( 2100 # 'multiprocessing.test.TestLoggingProcessName') 2101 # logger.addHandler(handler) 2102 # logger.propagate = False 2103 # 2104 # logger.warn('foo') 2105 # assert self.__handled 2106 2107 # 2108 # Check that Process.join() retries if os.waitpid() fails with EINTR 2109 # 2110 2111 class _TestPollEintr(BaseTestCase): 2112 2113 ALLOWED_TYPES = ('processes',) 2114 2115 @classmethod 2116 def _killer(cls, pid): 2117 time.sleep(0.5) 2118 os.kill(pid, signal.SIGUSR1) 2119 2120 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2121 def test_poll_eintr(self): 2122 got_signal = [False] 2123 def record(*args): 2124 got_signal[0] = True 2125 pid = os.getpid() 2126 oldhandler = signal.signal(signal.SIGUSR1, record) 2127 try: 2128 killer = self.Process(target=self._killer, args=(pid,)) 2129 killer.start() 2130 p = self.Process(target=time.sleep, args=(1,)) 2131 p.start() 2132 p.join() 2133 self.assertTrue(got_signal[0]) 2134 self.assertEqual(p.exitcode, 0) 2135 killer.join() 2136 finally: 2137 signal.signal(signal.SIGUSR1, oldhandler) 2138 2139 # 2140 # Test to verify handle verification, see issue 3321 2141 # 2142 2143 class TestInvalidHandle(unittest.TestCase): 2144 2145 @unittest.skipIf(WIN32, "skipped on Windows") 2146 def test_invalid_handles(self): 2147 conn = _multiprocessing.Connection(44977608) 2148 self.assertRaises(IOError, conn.poll) 2149 self.assertRaises(IOError, _multiprocessing.Connection, -1) 2150 2151 # 2152 # Functions used to create test cases from the base ones in this module 2153 # 2154 2155 def get_attributes(Source, names): 2156 d = {} 2157 for name in names: 2158 obj = getattr(Source, name) 2159 if type(obj) == type(get_attributes): 2160 obj = staticmethod(obj) 2161 d[name] = obj 2162 return d 2163 2164 def create_test_cases(Mixin, type): 2165 result = {} 2166 glob = globals() 2167 Type = type.capitalize() 2168 2169 for name in glob.keys(): 2170 if name.startswith('_Test'): 2171 base = glob[name] 2172 if type in base.ALLOWED_TYPES: 2173 newname = 'With' + Type + name[1:] 2174 class Temp(base, unittest.TestCase, Mixin): 2175 pass 2176 result[newname] = Temp 2177 Temp.__name__ = newname 2178 Temp.__module__ = Mixin.__module__ 2179 return result 2180 2181 # 2182 # Create test cases 2183 # 2184 2185 class ProcessesMixin(object): 2186 TYPE = 'processes' 2187 Process = multiprocessing.Process 2188 locals().update(get_attributes(multiprocessing, ( 2189 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 2190 'Condition', 'Event', 'Value', 'Array', 'RawValue', 2191 'RawArray', 'current_process', 'active_children', 'Pipe', 2192 'connection', 'JoinableQueue', 'Pool' 2193 ))) 2194 2195 testcases_processes = create_test_cases(ProcessesMixin, type='processes') 2196 globals().update(testcases_processes) 2197 2198 2199 class ManagerMixin(object): 2200 TYPE = 'manager' 2201 Process = multiprocessing.Process 2202 manager = object.__new__(multiprocessing.managers.SyncManager) 2203 locals().update(get_attributes(manager, ( 2204 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 2205 'Condition', 'Event', 'Value', 'Array', 'list', 'dict', 2206 'Namespace', 'JoinableQueue', 'Pool' 2207 ))) 2208 2209 testcases_manager = create_test_cases(ManagerMixin, type='manager') 2210 globals().update(testcases_manager) 2211 2212 2213 class ThreadsMixin(object): 2214 TYPE = 'threads' 2215 Process = multiprocessing.dummy.Process 2216 locals().update(get_attributes(multiprocessing.dummy, ( 2217 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 2218 'Condition', 'Event', 'Value', 'Array', 'current_process', 2219 'active_children', 'Pipe', 'connection', 'dict', 'list', 2220 'Namespace', 'JoinableQueue', 'Pool' 2221 ))) 2222 2223 testcases_threads = create_test_cases(ThreadsMixin, type='threads') 2224 globals().update(testcases_threads) 2225 2226 class OtherTest(unittest.TestCase): 2227 # TODO: add more tests for deliver/answer challenge. 2228 def test_deliver_challenge_auth_failure(self): 2229 class _FakeConnection(object): 2230 def recv_bytes(self, size): 2231 return b'something bogus' 2232 def send_bytes(self, data): 2233 pass 2234 self.assertRaises(multiprocessing.AuthenticationError, 2235 multiprocessing.connection.deliver_challenge, 2236 _FakeConnection(), b'abc') 2237 2238 def test_answer_challenge_auth_failure(self): 2239 class _FakeConnection(object): 2240 def __init__(self): 2241 self.count = 0 2242 def recv_bytes(self, size): 2243 self.count += 1 2244 if self.count == 1: 2245 return multiprocessing.connection.CHALLENGE 2246 elif self.count == 2: 2247 return b'something bogus' 2248 return b'' 2249 def send_bytes(self, data): 2250 pass 2251 self.assertRaises(multiprocessing.AuthenticationError, 2252 multiprocessing.connection.answer_challenge, 2253 _FakeConnection(), b'abc') 2254 2255 # 2256 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 2257 # 2258 2259 def initializer(ns): 2260 ns.test += 1 2261 2262 class TestInitializers(unittest.TestCase): 2263 def setUp(self): 2264 self.mgr = multiprocessing.Manager() 2265 self.ns = self.mgr.Namespace() 2266 self.ns.test = 0 2267 2268 def tearDown(self): 2269 self.mgr.shutdown() 2270 2271 def test_manager_initializer(self): 2272 m = multiprocessing.managers.SyncManager() 2273 self.assertRaises(TypeError, m.start, 1) 2274 m.start(initializer, (self.ns,)) 2275 self.assertEqual(self.ns.test, 1) 2276 m.shutdown() 2277 2278 def test_pool_initializer(self): 2279 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 2280 p = multiprocessing.Pool(1, initializer, (self.ns,)) 2281 p.close() 2282 p.join() 2283 self.assertEqual(self.ns.test, 1) 2284 2285 # 2286 # Issue 5155, 5313, 5331: Test process in processes 2287 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 2288 # 2289 2290 def _ThisSubProcess(q): 2291 try: 2292 item = q.get(block=False) 2293 except Queue.Empty: 2294 pass 2295 2296 def _TestProcess(q): 2297 queue = multiprocessing.Queue() 2298 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,)) 2299 subProc.daemon = True 2300 subProc.start() 2301 subProc.join() 2302 2303 def _afunc(x): 2304 return x*x 2305 2306 def pool_in_process(): 2307 pool = multiprocessing.Pool(processes=4) 2308 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 2309 2310 class _file_like(object): 2311 def __init__(self, delegate): 2312 self._delegate = delegate 2313 self._pid = None 2314 2315 @property 2316 def cache(self): 2317 pid = os.getpid() 2318 # There are no race conditions since fork keeps only the running thread 2319 if pid != self._pid: 2320 self._pid = pid 2321 self._cache = [] 2322 return self._cache 2323 2324 def write(self, data): 2325 self.cache.append(data) 2326 2327 def flush(self): 2328 self._delegate.write(''.join(self.cache)) 2329 self._cache = [] 2330 2331 class TestStdinBadfiledescriptor(unittest.TestCase): 2332 2333 def test_queue_in_process(self): 2334 queue = multiprocessing.Queue() 2335 proc = multiprocessing.Process(target=_TestProcess, args=(queue,)) 2336 proc.start() 2337 proc.join() 2338 2339 def test_pool_in_process(self): 2340 p = multiprocessing.Process(target=pool_in_process) 2341 p.start() 2342 p.join() 2343 2344 def test_flushing(self): 2345 sio = StringIO() 2346 flike = _file_like(sio) 2347 flike.write('foo') 2348 proc = multiprocessing.Process(target=lambda: flike.flush()) 2349 flike.flush() 2350 assert sio.getvalue() == 'foo' 2351 2352 # 2353 # Test interaction with socket timeouts - see Issue #6056 2354 # 2355 2356 class TestTimeouts(unittest.TestCase): 2357 @classmethod 2358 def _test_timeout(cls, child, address): 2359 time.sleep(1) 2360 child.send(123) 2361 child.close() 2362 conn = multiprocessing.connection.Client(address) 2363 conn.send(456) 2364 conn.close() 2365 2366 def test_timeout(self): 2367 old_timeout = socket.getdefaulttimeout() 2368 try: 2369 socket.setdefaulttimeout(0.1) 2370 parent, child = multiprocessing.Pipe(duplex=True) 2371 l = multiprocessing.connection.Listener(family='AF_INET') 2372 p = multiprocessing.Process(target=self._test_timeout, 2373 args=(child, l.address)) 2374 p.start() 2375 child.close() 2376 self.assertEqual(parent.recv(), 123) 2377 parent.close() 2378 conn = l.accept() 2379 self.assertEqual(conn.recv(), 456) 2380 conn.close() 2381 l.close() 2382 p.join(10) 2383 finally: 2384 socket.setdefaulttimeout(old_timeout) 2385 2386 # 2387 # Test what happens with no "if __name__ == '__main__'" 2388 # 2389 2390 class TestNoForkBomb(unittest.TestCase): 2391 def test_noforkbomb(self): 2392 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 2393 if WIN32: 2394 rc, out, err = test.script_helper.assert_python_failure(name) 2395 self.assertEqual('', out.decode('ascii')) 2396 self.assertIn('RuntimeError', err.decode('ascii')) 2397 else: 2398 rc, out, err = test.script_helper.assert_python_ok(name) 2399 self.assertEqual('123', out.decode('ascii').rstrip()) 2400 self.assertEqual('', err.decode('ascii')) 2401 2402 # 2403 # Issue 12098: check sys.flags of child matches that for parent 2404 # 2405 2406 class TestFlags(unittest.TestCase): 2407 @classmethod 2408 def run_in_grandchild(cls, conn): 2409 conn.send(tuple(sys.flags)) 2410 2411 @classmethod 2412 def run_in_child(cls): 2413 import json 2414 r, w = multiprocessing.Pipe(duplex=False) 2415 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 2416 p.start() 2417 grandchild_flags = r.recv() 2418 p.join() 2419 r.close() 2420 w.close() 2421 flags = (tuple(sys.flags), grandchild_flags) 2422 print(json.dumps(flags)) 2423 2424 def test_flags(self): 2425 import json, subprocess 2426 # start child process using unusual flags 2427 prog = ('from test.test_multiprocessing import TestFlags; ' + 2428 'TestFlags.run_in_child()') 2429 data = subprocess.check_output( 2430 [sys.executable, '-E', '-B', '-O', '-c', prog]) 2431 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 2432 self.assertEqual(child_flags, grandchild_flags) 2433 2434 # 2435 # Issue #17555: ForkAwareThreadLock 2436 # 2437 2438 class TestForkAwareThreadLock(unittest.TestCase): 2439 # We recurisvely start processes. Issue #17555 meant that the 2440 # after fork registry would get duplicate entries for the same 2441 # lock. The size of the registry at generation n was ~2**n. 2442 2443 @classmethod 2444 def child(cls, n, conn): 2445 if n > 1: 2446 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 2447 p.start() 2448 p.join() 2449 else: 2450 conn.send(len(util._afterfork_registry)) 2451 conn.close() 2452 2453 def test_lock(self): 2454 r, w = multiprocessing.Pipe(False) 2455 l = util.ForkAwareThreadLock() 2456 old_size = len(util._afterfork_registry) 2457 p = multiprocessing.Process(target=self.child, args=(5, w)) 2458 p.start() 2459 new_size = r.recv() 2460 p.join() 2461 self.assertLessEqual(new_size, old_size) 2462 2463 # 2464 # 2465 # 2466 2467 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, 2468 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb, 2469 TestFlags, TestForkAwareThreadLock] 2470 2471 # 2472 # 2473 # 2474 2475 def test_main(run=None): 2476 if sys.platform.startswith("linux"): 2477 try: 2478 lock = multiprocessing.RLock() 2479 except OSError: 2480 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!") 2481 2482 check_enough_semaphores() 2483 2484 if run is None: 2485 from test.test_support import run_unittest as run 2486 2487 util.get_temp_dir() # creates temp directory for use by all processes 2488 2489 multiprocessing.get_logger().setLevel(LOG_LEVEL) 2490 2491 ProcessesMixin.pool = multiprocessing.Pool(4) 2492 ThreadsMixin.pool = multiprocessing.dummy.Pool(4) 2493 ManagerMixin.manager.__init__() 2494 ManagerMixin.manager.start() 2495 ManagerMixin.pool = ManagerMixin.manager.Pool(4) 2496 2497 testcases = ( 2498 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) + 2499 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) + 2500 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) + 2501 testcases_other 2502 ) 2503 2504 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase 2505 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases) 2506 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading 2507 # module during these tests is at least platform dependent and possibly 2508 # non-deterministic on any given platform. So we don't mind if the listed 2509 # warnings aren't actually raised. 2510 with test_support.check_py3k_warnings( 2511 (".+__(get|set)slice__ has been removed", DeprecationWarning), 2512 (r"sys.exc_clear\(\) not supported", DeprecationWarning), 2513 quiet=True): 2514 run(suite) 2515 2516 ThreadsMixin.pool.terminate() 2517 ProcessesMixin.pool.terminate() 2518 ManagerMixin.pool.terminate() 2519 ManagerMixin.manager.shutdown() 2520 2521 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool 2522 2523 def main(): 2524 test_main(unittest.TextTestRunner(verbosity=2).run) 2525 2526 if __name__ == '__main__': 2527 main() 2528