1 #!/usr/bin/env python 2 3 # 4 # Unit tests for the multiprocessing package 5 # 6 7 import unittest 8 import Queue 9 import time 10 import sys 11 import os 12 import gc 13 import signal 14 import array 15 import socket 16 import random 17 import logging 18 from test import test_support 19 from StringIO import StringIO 20 _multiprocessing = test_support.import_module('_multiprocessing') 21 # import threading after _multiprocessing to raise a more relevant error 22 # message: "No module named _multiprocessing". _multiprocessing is not compiled 23 # without thread support. 24 import threading 25 26 # Work around broken sem_open implementations 27 test_support.import_module('multiprocessing.synchronize') 28 29 import multiprocessing.dummy 30 import multiprocessing.connection 31 import multiprocessing.managers 32 import multiprocessing.heap 33 import multiprocessing.pool 34 35 from multiprocessing import util 36 37 try: 38 from multiprocessing.sharedctypes import Value, copy 39 HAS_SHAREDCTYPES = True 40 except ImportError: 41 HAS_SHAREDCTYPES = False 42 43 # 44 # 45 # 46 47 latin = str 48 49 # 50 # Constants 51 # 52 53 LOG_LEVEL = util.SUBWARNING 54 #LOG_LEVEL = logging.DEBUG 55 56 DELTA = 0.1 57 CHECK_TIMINGS = False # making true makes tests take a lot longer 58 # and can sometimes cause some non-serious 59 # failures because some calls block a bit 60 # longer than expected 61 if CHECK_TIMINGS: 62 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 63 else: 64 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 65 66 HAVE_GETVALUE = not getattr(_multiprocessing, 67 'HAVE_BROKEN_SEM_GETVALUE', False) 68 69 WIN32 = (sys.platform == "win32") 70 71 # 72 # Some tests require ctypes 73 # 74 75 try: 76 from ctypes import Structure, c_int, c_double 77 except ImportError: 78 Structure = object 79 c_int = c_double = None 80 81 # 82 # Creates a wrapper for a function which records the time it takes to finish 83 # 84 85 class TimingWrapper(object): 86 87 def __init__(self, func): 88 
self.func = func 89 self.elapsed = None 90 91 def __call__(self, *args, **kwds): 92 t = time.time() 93 try: 94 return self.func(*args, **kwds) 95 finally: 96 self.elapsed = time.time() - t 97 98 # 99 # Base class for test cases 100 # 101 102 class BaseTestCase(object): 103 104 ALLOWED_TYPES = ('processes', 'manager', 'threads') 105 106 def assertTimingAlmostEqual(self, a, b): 107 if CHECK_TIMINGS: 108 self.assertAlmostEqual(a, b, 1) 109 110 def assertReturnsIfImplemented(self, value, func, *args): 111 try: 112 res = func(*args) 113 except NotImplementedError: 114 pass 115 else: 116 return self.assertEqual(value, res) 117 118 # For the sanity of Windows users, rather than crashing or freezing in 119 # multiple ways. 120 def __reduce__(self, *args): 121 raise NotImplementedError("shouldn't try to pickle a test case") 122 123 __reduce_ex__ = __reduce__ 124 125 # 126 # Return the value of a semaphore 127 # 128 129 def get_value(self): 130 try: 131 return self.get_value() 132 except AttributeError: 133 try: 134 return self._Semaphore__value 135 except AttributeError: 136 try: 137 return self._value 138 except AttributeError: 139 raise NotImplementedError 140 141 # 142 # Testcases 143 # 144 145 class _TestProcess(BaseTestCase): 146 147 ALLOWED_TYPES = ('processes', 'threads') 148 149 def test_current(self): 150 if self.TYPE == 'threads': 151 return 152 153 current = self.current_process() 154 authkey = current.authkey 155 156 self.assertTrue(current.is_alive()) 157 self.assertTrue(not current.daemon) 158 self.assertIsInstance(authkey, bytes) 159 self.assertTrue(len(authkey) > 0) 160 self.assertEqual(current.ident, os.getpid()) 161 self.assertEqual(current.exitcode, None) 162 163 @classmethod 164 def _test(cls, q, *args, **kwds): 165 current = cls.current_process() 166 q.put(args) 167 q.put(kwds) 168 q.put(current.name) 169 if cls.TYPE != 'threads': 170 q.put(bytes(current.authkey)) 171 q.put(current.pid) 172 173 def test_process(self): 174 q = self.Queue(1) 175 e = 
self.Event() 176 args = (q, 1, 2) 177 kwargs = {'hello':23, 'bye':2.54} 178 name = 'SomeProcess' 179 p = self.Process( 180 target=self._test, args=args, kwargs=kwargs, name=name 181 ) 182 p.daemon = True 183 current = self.current_process() 184 185 if self.TYPE != 'threads': 186 self.assertEqual(p.authkey, current.authkey) 187 self.assertEqual(p.is_alive(), False) 188 self.assertEqual(p.daemon, True) 189 self.assertNotIn(p, self.active_children()) 190 self.assertTrue(type(self.active_children()) is list) 191 self.assertEqual(p.exitcode, None) 192 193 p.start() 194 195 self.assertEqual(p.exitcode, None) 196 self.assertEqual(p.is_alive(), True) 197 self.assertIn(p, self.active_children()) 198 199 self.assertEqual(q.get(), args[1:]) 200 self.assertEqual(q.get(), kwargs) 201 self.assertEqual(q.get(), p.name) 202 if self.TYPE != 'threads': 203 self.assertEqual(q.get(), current.authkey) 204 self.assertEqual(q.get(), p.pid) 205 206 p.join() 207 208 self.assertEqual(p.exitcode, 0) 209 self.assertEqual(p.is_alive(), False) 210 self.assertNotIn(p, self.active_children()) 211 212 @classmethod 213 def _test_terminate(cls): 214 time.sleep(1000) 215 216 def test_terminate(self): 217 if self.TYPE == 'threads': 218 return 219 220 p = self.Process(target=self._test_terminate) 221 p.daemon = True 222 p.start() 223 224 self.assertEqual(p.is_alive(), True) 225 self.assertIn(p, self.active_children()) 226 self.assertEqual(p.exitcode, None) 227 228 p.terminate() 229 230 join = TimingWrapper(p.join) 231 self.assertEqual(join(), None) 232 self.assertTimingAlmostEqual(join.elapsed, 0.0) 233 234 self.assertEqual(p.is_alive(), False) 235 self.assertNotIn(p, self.active_children()) 236 237 p.join() 238 239 # XXX sometimes get p.exitcode == 0 on Windows ... 
240 #self.assertEqual(p.exitcode, -signal.SIGTERM) 241 242 def test_cpu_count(self): 243 try: 244 cpus = multiprocessing.cpu_count() 245 except NotImplementedError: 246 cpus = 1 247 self.assertTrue(type(cpus) is int) 248 self.assertTrue(cpus >= 1) 249 250 def test_active_children(self): 251 self.assertEqual(type(self.active_children()), list) 252 253 p = self.Process(target=time.sleep, args=(DELTA,)) 254 self.assertNotIn(p, self.active_children()) 255 256 p.start() 257 self.assertIn(p, self.active_children()) 258 259 p.join() 260 self.assertNotIn(p, self.active_children()) 261 262 @classmethod 263 def _test_recursion(cls, wconn, id): 264 from multiprocessing import forking 265 wconn.send(id) 266 if len(id) < 2: 267 for i in range(2): 268 p = cls.Process( 269 target=cls._test_recursion, args=(wconn, id+[i]) 270 ) 271 p.start() 272 p.join() 273 274 def test_recursion(self): 275 rconn, wconn = self.Pipe(duplex=False) 276 self._test_recursion(wconn, []) 277 278 time.sleep(DELTA) 279 result = [] 280 while rconn.poll(): 281 result.append(rconn.recv()) 282 283 expected = [ 284 [], 285 [0], 286 [0, 0], 287 [0, 1], 288 [1], 289 [1, 0], 290 [1, 1] 291 ] 292 self.assertEqual(result, expected) 293 294 # 295 # 296 # 297 298 class _UpperCaser(multiprocessing.Process): 299 300 def __init__(self): 301 multiprocessing.Process.__init__(self) 302 self.child_conn, self.parent_conn = multiprocessing.Pipe() 303 304 def run(self): 305 self.parent_conn.close() 306 for s in iter(self.child_conn.recv, None): 307 self.child_conn.send(s.upper()) 308 self.child_conn.close() 309 310 def submit(self, s): 311 assert type(s) is str 312 self.parent_conn.send(s) 313 return self.parent_conn.recv() 314 315 def stop(self): 316 self.parent_conn.send(None) 317 self.parent_conn.close() 318 self.child_conn.close() 319 320 class _TestSubclassingProcess(BaseTestCase): 321 322 ALLOWED_TYPES = ('processes',) 323 324 def test_subclassing(self): 325 uppercaser = _UpperCaser() 326 uppercaser.start() 327 
self.assertEqual(uppercaser.submit('hello'), 'HELLO') 328 self.assertEqual(uppercaser.submit('world'), 'WORLD') 329 uppercaser.stop() 330 uppercaser.join() 331 332 # 333 # 334 # 335 336 def queue_empty(q): 337 if hasattr(q, 'empty'): 338 return q.empty() 339 else: 340 return q.qsize() == 0 341 342 def queue_full(q, maxsize): 343 if hasattr(q, 'full'): 344 return q.full() 345 else: 346 return q.qsize() == maxsize 347 348 349 class _TestQueue(BaseTestCase): 350 351 352 @classmethod 353 def _test_put(cls, queue, child_can_start, parent_can_continue): 354 child_can_start.wait() 355 for i in range(6): 356 queue.get() 357 parent_can_continue.set() 358 359 def test_put(self): 360 MAXSIZE = 6 361 queue = self.Queue(maxsize=MAXSIZE) 362 child_can_start = self.Event() 363 parent_can_continue = self.Event() 364 365 proc = self.Process( 366 target=self._test_put, 367 args=(queue, child_can_start, parent_can_continue) 368 ) 369 proc.daemon = True 370 proc.start() 371 372 self.assertEqual(queue_empty(queue), True) 373 self.assertEqual(queue_full(queue, MAXSIZE), False) 374 375 queue.put(1) 376 queue.put(2, True) 377 queue.put(3, True, None) 378 queue.put(4, False) 379 queue.put(5, False, None) 380 queue.put_nowait(6) 381 382 # the values may be in buffer but not yet in pipe so sleep a bit 383 time.sleep(DELTA) 384 385 self.assertEqual(queue_empty(queue), False) 386 self.assertEqual(queue_full(queue, MAXSIZE), True) 387 388 put = TimingWrapper(queue.put) 389 put_nowait = TimingWrapper(queue.put_nowait) 390 391 self.assertRaises(Queue.Full, put, 7, False) 392 self.assertTimingAlmostEqual(put.elapsed, 0) 393 394 self.assertRaises(Queue.Full, put, 7, False, None) 395 self.assertTimingAlmostEqual(put.elapsed, 0) 396 397 self.assertRaises(Queue.Full, put_nowait, 7) 398 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 399 400 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1) 401 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 402 403 self.assertRaises(Queue.Full, put, 7, 
False, TIMEOUT2) 404 self.assertTimingAlmostEqual(put.elapsed, 0) 405 406 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3) 407 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 408 409 child_can_start.set() 410 parent_can_continue.wait() 411 412 self.assertEqual(queue_empty(queue), True) 413 self.assertEqual(queue_full(queue, MAXSIZE), False) 414 415 proc.join() 416 417 @classmethod 418 def _test_get(cls, queue, child_can_start, parent_can_continue): 419 child_can_start.wait() 420 #queue.put(1) 421 queue.put(2) 422 queue.put(3) 423 queue.put(4) 424 queue.put(5) 425 parent_can_continue.set() 426 427 def test_get(self): 428 queue = self.Queue() 429 child_can_start = self.Event() 430 parent_can_continue = self.Event() 431 432 proc = self.Process( 433 target=self._test_get, 434 args=(queue, child_can_start, parent_can_continue) 435 ) 436 proc.daemon = True 437 proc.start() 438 439 self.assertEqual(queue_empty(queue), True) 440 441 child_can_start.set() 442 parent_can_continue.wait() 443 444 time.sleep(DELTA) 445 self.assertEqual(queue_empty(queue), False) 446 447 # Hangs unexpectedly, remove for now 448 #self.assertEqual(queue.get(), 1) 449 self.assertEqual(queue.get(True, None), 2) 450 self.assertEqual(queue.get(True), 3) 451 self.assertEqual(queue.get(timeout=1), 4) 452 self.assertEqual(queue.get_nowait(), 5) 453 454 self.assertEqual(queue_empty(queue), True) 455 456 get = TimingWrapper(queue.get) 457 get_nowait = TimingWrapper(queue.get_nowait) 458 459 self.assertRaises(Queue.Empty, get, False) 460 self.assertTimingAlmostEqual(get.elapsed, 0) 461 462 self.assertRaises(Queue.Empty, get, False, None) 463 self.assertTimingAlmostEqual(get.elapsed, 0) 464 465 self.assertRaises(Queue.Empty, get_nowait) 466 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 467 468 self.assertRaises(Queue.Empty, get, True, TIMEOUT1) 469 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 470 471 self.assertRaises(Queue.Empty, get, False, TIMEOUT2) 472 
self.assertTimingAlmostEqual(get.elapsed, 0) 473 474 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3) 475 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 476 477 proc.join() 478 479 @classmethod 480 def _test_fork(cls, queue): 481 for i in range(10, 20): 482 queue.put(i) 483 # note that at this point the items may only be buffered, so the 484 # process cannot shutdown until the feeder thread has finished 485 # pushing items onto the pipe. 486 487 def test_fork(self): 488 # Old versions of Queue would fail to create a new feeder 489 # thread for a forked process if the original process had its 490 # own feeder thread. This test checks that this no longer 491 # happens. 492 493 queue = self.Queue() 494 495 # put items on queue so that main process starts a feeder thread 496 for i in range(10): 497 queue.put(i) 498 499 # wait to make sure thread starts before we fork a new process 500 time.sleep(DELTA) 501 502 # fork process 503 p = self.Process(target=self._test_fork, args=(queue,)) 504 p.start() 505 506 # check that all expected items are in the queue 507 for i in range(20): 508 self.assertEqual(queue.get(), i) 509 self.assertRaises(Queue.Empty, queue.get, False) 510 511 p.join() 512 513 def test_qsize(self): 514 q = self.Queue() 515 try: 516 self.assertEqual(q.qsize(), 0) 517 except NotImplementedError: 518 return 519 q.put(1) 520 self.assertEqual(q.qsize(), 1) 521 q.put(5) 522 self.assertEqual(q.qsize(), 2) 523 q.get() 524 self.assertEqual(q.qsize(), 1) 525 q.get() 526 self.assertEqual(q.qsize(), 0) 527 528 @classmethod 529 def _test_task_done(cls, q): 530 for obj in iter(q.get, None): 531 time.sleep(DELTA) 532 q.task_done() 533 534 def test_task_done(self): 535 queue = self.JoinableQueue() 536 537 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'): 538 self.skipTest("requires 'queue.task_done()' method") 539 540 workers = [self.Process(target=self._test_task_done, args=(queue,)) 541 for i in xrange(4)] 542 543 for p in workers: 544 
p.start() 545 546 for i in xrange(10): 547 queue.put(i) 548 549 queue.join() 550 551 for p in workers: 552 queue.put(None) 553 554 for p in workers: 555 p.join() 556 557 # 558 # 559 # 560 561 class _TestLock(BaseTestCase): 562 563 def test_lock(self): 564 lock = self.Lock() 565 self.assertEqual(lock.acquire(), True) 566 self.assertEqual(lock.acquire(False), False) 567 self.assertEqual(lock.release(), None) 568 self.assertRaises((ValueError, threading.ThreadError), lock.release) 569 570 def test_rlock(self): 571 lock = self.RLock() 572 self.assertEqual(lock.acquire(), True) 573 self.assertEqual(lock.acquire(), True) 574 self.assertEqual(lock.acquire(), True) 575 self.assertEqual(lock.release(), None) 576 self.assertEqual(lock.release(), None) 577 self.assertEqual(lock.release(), None) 578 self.assertRaises((AssertionError, RuntimeError), lock.release) 579 580 def test_lock_context(self): 581 with self.Lock(): 582 pass 583 584 585 class _TestSemaphore(BaseTestCase): 586 587 def _test_semaphore(self, sem): 588 self.assertReturnsIfImplemented(2, get_value, sem) 589 self.assertEqual(sem.acquire(), True) 590 self.assertReturnsIfImplemented(1, get_value, sem) 591 self.assertEqual(sem.acquire(), True) 592 self.assertReturnsIfImplemented(0, get_value, sem) 593 self.assertEqual(sem.acquire(False), False) 594 self.assertReturnsIfImplemented(0, get_value, sem) 595 self.assertEqual(sem.release(), None) 596 self.assertReturnsIfImplemented(1, get_value, sem) 597 self.assertEqual(sem.release(), None) 598 self.assertReturnsIfImplemented(2, get_value, sem) 599 600 def test_semaphore(self): 601 sem = self.Semaphore(2) 602 self._test_semaphore(sem) 603 self.assertEqual(sem.release(), None) 604 self.assertReturnsIfImplemented(3, get_value, sem) 605 self.assertEqual(sem.release(), None) 606 self.assertReturnsIfImplemented(4, get_value, sem) 607 608 def test_bounded_semaphore(self): 609 sem = self.BoundedSemaphore(2) 610 self._test_semaphore(sem) 611 # Currently fails on OS/X 612 #if 
HAVE_GETVALUE: 613 # self.assertRaises(ValueError, sem.release) 614 # self.assertReturnsIfImplemented(2, get_value, sem) 615 616 def test_timeout(self): 617 if self.TYPE != 'processes': 618 return 619 620 sem = self.Semaphore(0) 621 acquire = TimingWrapper(sem.acquire) 622 623 self.assertEqual(acquire(False), False) 624 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 625 626 self.assertEqual(acquire(False, None), False) 627 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 628 629 self.assertEqual(acquire(False, TIMEOUT1), False) 630 self.assertTimingAlmostEqual(acquire.elapsed, 0) 631 632 self.assertEqual(acquire(True, TIMEOUT2), False) 633 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 634 635 self.assertEqual(acquire(timeout=TIMEOUT3), False) 636 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 637 638 639 class _TestCondition(BaseTestCase): 640 641 @classmethod 642 def f(cls, cond, sleeping, woken, timeout=None): 643 cond.acquire() 644 sleeping.release() 645 cond.wait(timeout) 646 woken.release() 647 cond.release() 648 649 def check_invariant(self, cond): 650 # this is only supposed to succeed when there are no sleepers 651 if self.TYPE == 'processes': 652 try: 653 sleepers = (cond._sleeping_count.get_value() - 654 cond._woken_count.get_value()) 655 self.assertEqual(sleepers, 0) 656 self.assertEqual(cond._wait_semaphore.get_value(), 0) 657 except NotImplementedError: 658 pass 659 660 def test_notify(self): 661 cond = self.Condition() 662 sleeping = self.Semaphore(0) 663 woken = self.Semaphore(0) 664 665 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 666 p.daemon = True 667 p.start() 668 669 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 670 p.daemon = True 671 p.start() 672 673 # wait for both children to start sleeping 674 sleeping.acquire() 675 sleeping.acquire() 676 677 # check no process/thread has woken up 678 time.sleep(DELTA) 679 self.assertReturnsIfImplemented(0, get_value, woken) 680 681 # wake up 
one process/thread 682 cond.acquire() 683 cond.notify() 684 cond.release() 685 686 # check one process/thread has woken up 687 time.sleep(DELTA) 688 self.assertReturnsIfImplemented(1, get_value, woken) 689 690 # wake up another 691 cond.acquire() 692 cond.notify() 693 cond.release() 694 695 # check other has woken up 696 time.sleep(DELTA) 697 self.assertReturnsIfImplemented(2, get_value, woken) 698 699 # check state is not mucked up 700 self.check_invariant(cond) 701 p.join() 702 703 def test_notify_all(self): 704 cond = self.Condition() 705 sleeping = self.Semaphore(0) 706 woken = self.Semaphore(0) 707 708 # start some threads/processes which will timeout 709 for i in range(3): 710 p = self.Process(target=self.f, 711 args=(cond, sleeping, woken, TIMEOUT1)) 712 p.daemon = True 713 p.start() 714 715 t = threading.Thread(target=self.f, 716 args=(cond, sleeping, woken, TIMEOUT1)) 717 t.daemon = True 718 t.start() 719 720 # wait for them all to sleep 721 for i in xrange(6): 722 sleeping.acquire() 723 724 # check they have all timed out 725 for i in xrange(6): 726 woken.acquire() 727 self.assertReturnsIfImplemented(0, get_value, woken) 728 729 # check state is not mucked up 730 self.check_invariant(cond) 731 732 # start some more threads/processes 733 for i in range(3): 734 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 735 p.daemon = True 736 p.start() 737 738 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 739 t.daemon = True 740 t.start() 741 742 # wait for them to all sleep 743 for i in xrange(6): 744 sleeping.acquire() 745 746 # check no process/thread has woken up 747 time.sleep(DELTA) 748 self.assertReturnsIfImplemented(0, get_value, woken) 749 750 # wake them all up 751 cond.acquire() 752 cond.notify_all() 753 cond.release() 754 755 # check they have all woken 756 time.sleep(DELTA) 757 self.assertReturnsIfImplemented(6, get_value, woken) 758 759 # check state is not mucked up 760 self.check_invariant(cond) 761 762 def 
test_timeout(self): 763 cond = self.Condition() 764 wait = TimingWrapper(cond.wait) 765 cond.acquire() 766 res = wait(TIMEOUT1) 767 cond.release() 768 self.assertEqual(res, None) 769 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 770 771 772 class _TestEvent(BaseTestCase): 773 774 @classmethod 775 def _test_event(cls, event): 776 time.sleep(TIMEOUT2) 777 event.set() 778 779 def test_event(self): 780 event = self.Event() 781 wait = TimingWrapper(event.wait) 782 783 # Removed temporarily, due to API shear, this does not 784 # work with threading._Event objects. is_set == isSet 785 self.assertEqual(event.is_set(), False) 786 787 # Removed, threading.Event.wait() will return the value of the __flag 788 # instead of None. API Shear with the semaphore backed mp.Event 789 self.assertEqual(wait(0.0), False) 790 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 791 self.assertEqual(wait(TIMEOUT1), False) 792 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 793 794 event.set() 795 796 # See note above on the API differences 797 self.assertEqual(event.is_set(), True) 798 self.assertEqual(wait(), True) 799 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 800 self.assertEqual(wait(TIMEOUT1), True) 801 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 802 # self.assertEqual(event.is_set(), True) 803 804 event.clear() 805 806 #self.assertEqual(event.is_set(), False) 807 808 self.Process(target=self._test_event, args=(event,)).start() 809 self.assertEqual(wait(), True) 810 811 # 812 # 813 # 814 815 class _TestValue(BaseTestCase): 816 817 ALLOWED_TYPES = ('processes',) 818 819 codes_values = [ 820 ('i', 4343, 24234), 821 ('d', 3.625, -4.25), 822 ('h', -232, 234), 823 ('c', latin('x'), latin('y')) 824 ] 825 826 def setUp(self): 827 if not HAS_SHAREDCTYPES: 828 self.skipTest("requires multiprocessing.sharedctypes") 829 830 @classmethod 831 def _test(cls, values): 832 for sv, cv in zip(values, cls.codes_values): 833 sv.value = cv[2] 834 835 836 def test_value(self, raw=False): 
837 if raw: 838 values = [self.RawValue(code, value) 839 for code, value, _ in self.codes_values] 840 else: 841 values = [self.Value(code, value) 842 for code, value, _ in self.codes_values] 843 844 for sv, cv in zip(values, self.codes_values): 845 self.assertEqual(sv.value, cv[1]) 846 847 proc = self.Process(target=self._test, args=(values,)) 848 proc.start() 849 proc.join() 850 851 for sv, cv in zip(values, self.codes_values): 852 self.assertEqual(sv.value, cv[2]) 853 854 def test_rawvalue(self): 855 self.test_value(raw=True) 856 857 def test_getobj_getlock(self): 858 val1 = self.Value('i', 5) 859 lock1 = val1.get_lock() 860 obj1 = val1.get_obj() 861 862 val2 = self.Value('i', 5, lock=None) 863 lock2 = val2.get_lock() 864 obj2 = val2.get_obj() 865 866 lock = self.Lock() 867 val3 = self.Value('i', 5, lock=lock) 868 lock3 = val3.get_lock() 869 obj3 = val3.get_obj() 870 self.assertEqual(lock, lock3) 871 872 arr4 = self.Value('i', 5, lock=False) 873 self.assertFalse(hasattr(arr4, 'get_lock')) 874 self.assertFalse(hasattr(arr4, 'get_obj')) 875 876 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 877 878 arr5 = self.RawValue('i', 5) 879 self.assertFalse(hasattr(arr5, 'get_lock')) 880 self.assertFalse(hasattr(arr5, 'get_obj')) 881 882 883 class _TestArray(BaseTestCase): 884 885 ALLOWED_TYPES = ('processes',) 886 887 @classmethod 888 def f(cls, seq): 889 for i in range(1, len(seq)): 890 seq[i] += seq[i-1] 891 892 @unittest.skipIf(c_int is None, "requires _ctypes") 893 def test_array(self, raw=False): 894 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 895 if raw: 896 arr = self.RawArray('i', seq) 897 else: 898 arr = self.Array('i', seq) 899 900 self.assertEqual(len(arr), len(seq)) 901 self.assertEqual(arr[3], seq[3]) 902 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 903 904 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 905 906 self.assertEqual(list(arr[:]), seq) 907 908 self.f(seq) 909 910 p = self.Process(target=self.f, 
args=(arr,)) 911 p.start() 912 p.join() 913 914 self.assertEqual(list(arr[:]), seq) 915 916 @unittest.skipIf(c_int is None, "requires _ctypes") 917 def test_array_from_size(self): 918 size = 10 919 # Test for zeroing (see issue #11675). 920 # The repetition below strengthens the test by increasing the chances 921 # of previously allocated non-zero memory being used for the new array 922 # on the 2nd and 3rd loops. 923 for _ in range(3): 924 arr = self.Array('i', size) 925 self.assertEqual(len(arr), size) 926 self.assertEqual(list(arr), [0] * size) 927 arr[:] = range(10) 928 self.assertEqual(list(arr), range(10)) 929 del arr 930 931 @unittest.skipIf(c_int is None, "requires _ctypes") 932 def test_rawarray(self): 933 self.test_array(raw=True) 934 935 @unittest.skipIf(c_int is None, "requires _ctypes") 936 def test_array_accepts_long(self): 937 arr = self.Array('i', 10L) 938 self.assertEqual(len(arr), 10) 939 raw_arr = self.RawArray('i', 10L) 940 self.assertEqual(len(raw_arr), 10) 941 942 @unittest.skipIf(c_int is None, "requires _ctypes") 943 def test_getobj_getlock_obj(self): 944 arr1 = self.Array('i', range(10)) 945 lock1 = arr1.get_lock() 946 obj1 = arr1.get_obj() 947 948 arr2 = self.Array('i', range(10), lock=None) 949 lock2 = arr2.get_lock() 950 obj2 = arr2.get_obj() 951 952 lock = self.Lock() 953 arr3 = self.Array('i', range(10), lock=lock) 954 lock3 = arr3.get_lock() 955 obj3 = arr3.get_obj() 956 self.assertEqual(lock, lock3) 957 958 arr4 = self.Array('i', range(10), lock=False) 959 self.assertFalse(hasattr(arr4, 'get_lock')) 960 self.assertFalse(hasattr(arr4, 'get_obj')) 961 self.assertRaises(AttributeError, 962 self.Array, 'i', range(10), lock='notalock') 963 964 arr5 = self.RawArray('i', range(10)) 965 self.assertFalse(hasattr(arr5, 'get_lock')) 966 self.assertFalse(hasattr(arr5, 'get_obj')) 967 968 # 969 # 970 # 971 972 class _TestContainers(BaseTestCase): 973 974 ALLOWED_TYPES = ('manager',) 975 976 def test_list(self): 977 a = self.list(range(10)) 978 
self.assertEqual(a[:], range(10)) 979 980 b = self.list() 981 self.assertEqual(b[:], []) 982 983 b.extend(range(5)) 984 self.assertEqual(b[:], range(5)) 985 986 self.assertEqual(b[2], 2) 987 self.assertEqual(b[2:10], [2,3,4]) 988 989 b *= 2 990 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 991 992 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 993 994 self.assertEqual(a[:], range(10)) 995 996 d = [a, b] 997 e = self.list(d) 998 self.assertEqual( 999 e[:], 1000 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 1001 ) 1002 1003 f = self.list([a]) 1004 a.append('hello') 1005 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) 1006 1007 def test_dict(self): 1008 d = self.dict() 1009 indices = range(65, 70) 1010 for i in indices: 1011 d[i] = chr(i) 1012 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 1013 self.assertEqual(sorted(d.keys()), indices) 1014 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 1015 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 1016 1017 def test_namespace(self): 1018 n = self.Namespace() 1019 n.name = 'Bob' 1020 n.job = 'Builder' 1021 n._hidden = 'hidden' 1022 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 1023 del n.job 1024 self.assertEqual(str(n), "Namespace(name='Bob')") 1025 self.assertTrue(hasattr(n, 'name')) 1026 self.assertTrue(not hasattr(n, 'job')) 1027 1028 # 1029 # 1030 # 1031 1032 def sqr(x, wait=0.0): 1033 time.sleep(wait) 1034 return x*x 1035 class _TestPool(BaseTestCase): 1036 1037 def test_apply(self): 1038 papply = self.pool.apply 1039 self.assertEqual(papply(sqr, (5,)), sqr(5)) 1040 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 1041 1042 def test_map(self): 1043 pmap = self.pool.map 1044 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10))) 1045 self.assertEqual(pmap(sqr, range(100), chunksize=20), 1046 map(sqr, range(100))) 1047 1048 def test_map_chunksize(self): 1049 try: 1050 
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 1051 except multiprocessing.TimeoutError: 1052 self.fail("pool.map_async with chunksize stalled on null list") 1053 1054 def test_async(self): 1055 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 1056 get = TimingWrapper(res.get) 1057 self.assertEqual(get(), 49) 1058 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1059 1060 def test_async_timeout(self): 1061 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2)) 1062 get = TimingWrapper(res.get) 1063 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 1064 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 1065 1066 def test_imap(self): 1067 it = self.pool.imap(sqr, range(10)) 1068 self.assertEqual(list(it), map(sqr, range(10))) 1069 1070 it = self.pool.imap(sqr, range(10)) 1071 for i in range(10): 1072 self.assertEqual(it.next(), i*i) 1073 self.assertRaises(StopIteration, it.next) 1074 1075 it = self.pool.imap(sqr, range(1000), chunksize=100) 1076 for i in range(1000): 1077 self.assertEqual(it.next(), i*i) 1078 self.assertRaises(StopIteration, it.next) 1079 1080 def test_imap_unordered(self): 1081 it = self.pool.imap_unordered(sqr, range(1000)) 1082 self.assertEqual(sorted(it), map(sqr, range(1000))) 1083 1084 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53) 1085 self.assertEqual(sorted(it), map(sqr, range(1000))) 1086 1087 def test_make_pool(self): 1088 p = multiprocessing.Pool(3) 1089 self.assertEqual(3, len(p._pool)) 1090 p.close() 1091 p.join() 1092 1093 def test_terminate(self): 1094 if self.TYPE == 'manager': 1095 # On Unix a forked process increfs each shared object to 1096 # which its parent process held a reference. If the 1097 # forked process gets terminated then there is likely to 1098 # be a reference leak. So to prevent 1099 # _TestZZZNumberOfObjects from failing we skip this test 1100 # when using a manager. 
1101 return 1102 1103 result = self.pool.map_async( 1104 time.sleep, [0.1 for i in range(10000)], chunksize=1 1105 ) 1106 self.pool.terminate() 1107 join = TimingWrapper(self.pool.join) 1108 join() 1109 self.assertTrue(join.elapsed < 0.2) 1110 1111 class _TestPoolWorkerLifetime(BaseTestCase): 1112 1113 ALLOWED_TYPES = ('processes', ) 1114 def test_pool_worker_lifetime(self): 1115 p = multiprocessing.Pool(3, maxtasksperchild=10) 1116 self.assertEqual(3, len(p._pool)) 1117 origworkerpids = [w.pid for w in p._pool] 1118 # Run many tasks so each worker gets replaced (hopefully) 1119 results = [] 1120 for i in range(100): 1121 results.append(p.apply_async(sqr, (i, ))) 1122 # Fetch the results and verify we got the right answers, 1123 # also ensuring all the tasks have completed. 1124 for (j, res) in enumerate(results): 1125 self.assertEqual(res.get(), sqr(j)) 1126 # Refill the pool 1127 p._repopulate_pool() 1128 # Wait until all workers are alive 1129 # (countdown * DELTA = 5 seconds max startup process time) 1130 countdown = 50 1131 while countdown and not all(w.is_alive() for w in p._pool): 1132 countdown -= 1 1133 time.sleep(DELTA) 1134 finalworkerpids = [w.pid for w in p._pool] 1135 # All pids should be assigned. See issue #7805. 1136 self.assertNotIn(None, origworkerpids) 1137 self.assertNotIn(None, finalworkerpids) 1138 # Finally, check that the worker pids have changed 1139 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 1140 p.close() 1141 p.join() 1142 1143 # 1144 # Test that manager has expected number of shared objects left 1145 # 1146 1147 class _TestZZZNumberOfObjects(BaseTestCase): 1148 # Because test cases are sorted alphabetically, this one will get 1149 # run after all the other tests for the manager. It tests that 1150 # there have been no "reference leaks" for the manager's shared 1151 # objects. Note the comment in _TestPool.test_terminate(). 
1152 ALLOWED_TYPES = ('manager',) 1153 1154 def test_number_of_objects(self): 1155 EXPECTED_NUMBER = 1 # the pool object is still alive 1156 multiprocessing.active_children() # discard dead process objs 1157 gc.collect() # do garbage collection 1158 refs = self.manager._number_of_objects() 1159 debug_info = self.manager._debug_info() 1160 if refs != EXPECTED_NUMBER: 1161 print self.manager._debug_info() 1162 print debug_info 1163 1164 self.assertEqual(refs, EXPECTED_NUMBER) 1165 1166 # 1167 # Test of creating a customized manager class 1168 # 1169 1170 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 1171 1172 class FooBar(object): 1173 def f(self): 1174 return 'f()' 1175 def g(self): 1176 raise ValueError 1177 def _h(self): 1178 return '_h()' 1179 1180 def baz(): 1181 for i in xrange(10): 1182 yield i*i 1183 1184 class IteratorProxy(BaseProxy): 1185 _exposed_ = ('next', '__next__') 1186 def __iter__(self): 1187 return self 1188 def next(self): 1189 return self._callmethod('next') 1190 def __next__(self): 1191 return self._callmethod('__next__') 1192 1193 class MyManager(BaseManager): 1194 pass 1195 1196 MyManager.register('Foo', callable=FooBar) 1197 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 1198 MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 1199 1200 1201 class _TestMyManager(BaseTestCase): 1202 1203 ALLOWED_TYPES = ('manager',) 1204 1205 def test_mymanager(self): 1206 manager = MyManager() 1207 manager.start() 1208 1209 foo = manager.Foo() 1210 bar = manager.Bar() 1211 baz = manager.baz() 1212 1213 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 1214 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 1215 1216 self.assertEqual(foo_methods, ['f', 'g']) 1217 self.assertEqual(bar_methods, ['f', '_h']) 1218 1219 self.assertEqual(foo.f(), 'f()') 1220 self.assertRaises(ValueError, foo.g) 1221 self.assertEqual(foo._callmethod('f'), 'f()') 1222 
self.assertRaises(RemoteError, foo._callmethod, '_h') 1223 1224 self.assertEqual(bar.f(), 'f()') 1225 self.assertEqual(bar._h(), '_h()') 1226 self.assertEqual(bar._callmethod('f'), 'f()') 1227 self.assertEqual(bar._callmethod('_h'), '_h()') 1228 1229 self.assertEqual(list(baz), [i*i for i in range(10)]) 1230 1231 manager.shutdown() 1232 1233 # 1234 # Test of connecting to a remote server and using xmlrpclib for serialization 1235 # 1236 1237 _queue = Queue.Queue() 1238 def get_queue(): 1239 return _queue 1240 1241 class QueueManager(BaseManager): 1242 '''manager class used by server process''' 1243 QueueManager.register('get_queue', callable=get_queue) 1244 1245 class QueueManager2(BaseManager): 1246 '''manager class which specifies the same interface as QueueManager''' 1247 QueueManager2.register('get_queue') 1248 1249 1250 SERIALIZER = 'xmlrpclib' 1251 1252 class _TestRemoteManager(BaseTestCase): 1253 1254 ALLOWED_TYPES = ('manager',) 1255 1256 @classmethod 1257 def _putter(cls, address, authkey): 1258 manager = QueueManager2( 1259 address=address, authkey=authkey, serializer=SERIALIZER 1260 ) 1261 manager.connect() 1262 queue = manager.get_queue() 1263 queue.put(('hello world', None, True, 2.25)) 1264 1265 def test_remote(self): 1266 authkey = os.urandom(32) 1267 1268 manager = QueueManager( 1269 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER 1270 ) 1271 manager.start() 1272 1273 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1274 p.start() 1275 1276 manager2 = QueueManager2( 1277 address=manager.address, authkey=authkey, serializer=SERIALIZER 1278 ) 1279 manager2.connect() 1280 queue = manager2.get_queue() 1281 1282 # Note that xmlrpclib will deserialize object as a list not a tuple 1283 self.assertEqual(queue.get(), ['hello world', None, True, 2.25]) 1284 1285 # Because we are using xmlrpclib for serialization instead of 1286 # pickle this will cause a serialization error. 
1287 self.assertRaises(Exception, queue.put, time.sleep) 1288 1289 # Make queue finalizer run before the server is stopped 1290 del queue 1291 manager.shutdown() 1292 1293 class _TestManagerRestart(BaseTestCase): 1294 1295 @classmethod 1296 def _putter(cls, address, authkey): 1297 manager = QueueManager( 1298 address=address, authkey=authkey, serializer=SERIALIZER) 1299 manager.connect() 1300 queue = manager.get_queue() 1301 queue.put('hello world') 1302 1303 def test_rapid_restart(self): 1304 authkey = os.urandom(32) 1305 manager = QueueManager( 1306 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER) 1307 srvr = manager.get_server() 1308 addr = srvr.address 1309 # Close the connection.Listener socket which gets opened as a part 1310 # of manager.get_server(). It's not needed for the test. 1311 srvr.listener.close() 1312 manager.start() 1313 1314 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1315 p.start() 1316 queue = manager.get_queue() 1317 self.assertEqual(queue.get(), 'hello world') 1318 del queue 1319 manager.shutdown() 1320 manager = QueueManager( 1321 address=addr, authkey=authkey, serializer=SERIALIZER) 1322 manager.start() 1323 manager.shutdown() 1324 1325 # 1326 # 1327 # 1328 1329 SENTINEL = latin('') 1330 1331 class _TestConnection(BaseTestCase): 1332 1333 ALLOWED_TYPES = ('processes', 'threads') 1334 1335 @classmethod 1336 def _echo(cls, conn): 1337 for msg in iter(conn.recv_bytes, SENTINEL): 1338 conn.send_bytes(msg) 1339 conn.close() 1340 1341 def test_connection(self): 1342 conn, child_conn = self.Pipe() 1343 1344 p = self.Process(target=self._echo, args=(child_conn,)) 1345 p.daemon = True 1346 p.start() 1347 1348 seq = [1, 2.25, None] 1349 msg = latin('hello world') 1350 longmsg = msg * 10 1351 arr = array.array('i', range(4)) 1352 1353 if self.TYPE == 'processes': 1354 self.assertEqual(type(conn.fileno()), int) 1355 1356 self.assertEqual(conn.send(seq), None) 1357 self.assertEqual(conn.recv(), seq) 1358 1359 
self.assertEqual(conn.send_bytes(msg), None) 1360 self.assertEqual(conn.recv_bytes(), msg) 1361 1362 if self.TYPE == 'processes': 1363 buffer = array.array('i', [0]*10) 1364 expected = list(arr) + [0] * (10 - len(arr)) 1365 self.assertEqual(conn.send_bytes(arr), None) 1366 self.assertEqual(conn.recv_bytes_into(buffer), 1367 len(arr) * buffer.itemsize) 1368 self.assertEqual(list(buffer), expected) 1369 1370 buffer = array.array('i', [0]*10) 1371 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 1372 self.assertEqual(conn.send_bytes(arr), None) 1373 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 1374 len(arr) * buffer.itemsize) 1375 self.assertEqual(list(buffer), expected) 1376 1377 buffer = bytearray(latin(' ' * 40)) 1378 self.assertEqual(conn.send_bytes(longmsg), None) 1379 try: 1380 res = conn.recv_bytes_into(buffer) 1381 except multiprocessing.BufferTooShort, e: 1382 self.assertEqual(e.args, (longmsg,)) 1383 else: 1384 self.fail('expected BufferTooShort, got %s' % res) 1385 1386 poll = TimingWrapper(conn.poll) 1387 1388 self.assertEqual(poll(), False) 1389 self.assertTimingAlmostEqual(poll.elapsed, 0) 1390 1391 self.assertEqual(poll(TIMEOUT1), False) 1392 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 1393 1394 conn.send(None) 1395 1396 self.assertEqual(poll(TIMEOUT1), True) 1397 self.assertTimingAlmostEqual(poll.elapsed, 0) 1398 1399 self.assertEqual(conn.recv(), None) 1400 1401 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 1402 conn.send_bytes(really_big_msg) 1403 self.assertEqual(conn.recv_bytes(), really_big_msg) 1404 1405 conn.send_bytes(SENTINEL) # tell child to quit 1406 child_conn.close() 1407 1408 if self.TYPE == 'processes': 1409 self.assertEqual(conn.readable, True) 1410 self.assertEqual(conn.writable, True) 1411 self.assertRaises(EOFError, conn.recv) 1412 self.assertRaises(EOFError, conn.recv_bytes) 1413 1414 p.join() 1415 1416 def test_duplex_false(self): 1417 reader, writer = self.Pipe(duplex=False) 1418 
self.assertEqual(writer.send(1), None) 1419 self.assertEqual(reader.recv(), 1) 1420 if self.TYPE == 'processes': 1421 self.assertEqual(reader.readable, True) 1422 self.assertEqual(reader.writable, False) 1423 self.assertEqual(writer.readable, False) 1424 self.assertEqual(writer.writable, True) 1425 self.assertRaises(IOError, reader.send, 2) 1426 self.assertRaises(IOError, writer.recv) 1427 self.assertRaises(IOError, writer.poll) 1428 1429 def test_spawn_close(self): 1430 # We test that a pipe connection can be closed by parent 1431 # process immediately after child is spawned. On Windows this 1432 # would have sometimes failed on old versions because 1433 # child_conn would be closed before the child got a chance to 1434 # duplicate it. 1435 conn, child_conn = self.Pipe() 1436 1437 p = self.Process(target=self._echo, args=(child_conn,)) 1438 p.start() 1439 child_conn.close() # this might complete before child initializes 1440 1441 msg = latin('hello') 1442 conn.send_bytes(msg) 1443 self.assertEqual(conn.recv_bytes(), msg) 1444 1445 conn.send_bytes(SENTINEL) 1446 conn.close() 1447 p.join() 1448 1449 def test_sendbytes(self): 1450 if self.TYPE != 'processes': 1451 return 1452 1453 msg = latin('abcdefghijklmnopqrstuvwxyz') 1454 a, b = self.Pipe() 1455 1456 a.send_bytes(msg) 1457 self.assertEqual(b.recv_bytes(), msg) 1458 1459 a.send_bytes(msg, 5) 1460 self.assertEqual(b.recv_bytes(), msg[5:]) 1461 1462 a.send_bytes(msg, 7, 8) 1463 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 1464 1465 a.send_bytes(msg, 26) 1466 self.assertEqual(b.recv_bytes(), latin('')) 1467 1468 a.send_bytes(msg, 26, 0) 1469 self.assertEqual(b.recv_bytes(), latin('')) 1470 1471 self.assertRaises(ValueError, a.send_bytes, msg, 27) 1472 1473 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 1474 1475 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 1476 1477 self.assertRaises(ValueError, a.send_bytes, msg, -1) 1478 1479 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 1480 1481 
class _TestListenerClient(BaseTestCase):
    """Accept one client connection per supported address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child target: connect to `address`, send 'hello' and close."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """Each family in connection.families must deliver the child's message."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            # Close the listener even if the assertion fails.
            try:
                p = self.Process(target=self._test, args=(listener.address,))
                p.daemon = True
                p.start()
                conn = listener.accept()
                self.assertEqual(conn.recv(), 'hello')
                p.join()
            finally:
                listener.close()
#
# Test of sending connection and socket objects between processes
#
# NOTE: the test case below is disabled; it lives inside a string literal
# and is therefore never defined.
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
#
#
#

class _TestHeap(BaseTestCase):
    """Consistency check of the multiprocessing.heap block allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Churn many random-sized blocks, then check the arenas are tiled.

        After sorting all free and allocated regions, each region must end
        exactly where the next one starts, or the next one must start at
        offset 0 of a different arena.
        """
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Drop a random block; a separate name is used so the loop
                # variable is not rebound.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        regions = []
        for L in heap._len_to_seq.values():
            for arena, start, stop in L:
                regions.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            regions.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'occupied'))

        regions.sort()

        # Compare each region with its successor.
        for current, following in zip(regions, regions[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

#
#
#

class _Foo(Structure):
    """ctypes structure with an int field `x` and a double field `y`."""
    _fields_ = [('x', c_int), ('y', c_double)]

class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes values and arrays mutated from a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test here when sharedctypes could not be imported."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child target: double every shared object in place."""
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Doubling done in the child must be visible in the parent."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Same scenario as test_sharedctypes, with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() must return a structure independent of its source."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)

#
#
#

class _TestFinalize(BaseTestCase):
    """Order in which util.Finalize callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child target: register finalizers that report their tag on `conn`."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0, registered in the order
        # d01, d02, d03; the list keeps the objects referenced until exit.
        same_priority = []
        for tag in ('d01', 'd02', 'd03'):
            holder = Foo()
            same_priority.append(holder)
            util.Finalize(holder, conn.send, args=(tag,), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Collect the tags sent before 'STOP' and compare with the expected order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

#
# Test that from ...
# import * works for each module
#

class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must exist on the module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and check its __all__ entries."""
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#

class _TestLogging(BaseTestCase):
    """Smoke tests for the multiprocessing logger."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child target: send the logger's effective level over `conn`."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child must report the level set on the logger, or on root when unset."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        # Restore the logger levels and close the pipe even if an
        # assertion fails; each child is joined once its answer is read.
        try:
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()


# class _TestLoggingProcessName(BaseTestCase):
#
#     def handle(self, record):
#         assert record.processName == multiprocessing.current_process().name
#         self.__handled = True
#
#     def test_logging(self):
#         handler = logging.Handler()
#         handler.handle = self.handle
#         self.__handled = False
#         # Bypass getLogger() and side-effects
#         logger = logging.getLoggerClass()(
#                 'multiprocessing.test.TestLoggingProcessName')
#         logger.addHandler(handler)
#         logger.propagate = False
#
#         logger.warn('foo')
#         assert self.__handled

#
# Test to verify handle verification, see issue 3321
#

class TestInvalidHandle(unittest.TestCase):
    """Connection objects built from invalid handles must raise IOError."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, conn.poll)
        self.assertRaises(IOError, _multiprocessing.Connection, -1)

#
# Functions used to create test cases from the base ones in this module
#

def get_attributes(Source, names):
    """Return a dict mapping each name in `names` to that attribute of `Source`.

    Plain Python functions are wrapped in staticmethod so that they do not
    become bound methods once the dict is merged into a class namespace.
    """
    function_type = type(get_attributes)
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        d[name] = obj
    return d

def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the suite.

    For every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains `type`, a class deriving from that base,
    unittest.TestCase and `Mixin` is created and named
    'With' + type.capitalize() + name[1:].  Returns a dict mapping the new
    names to the new classes.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()

    # Iterate over a snapshot of the names.
    for name in list(glob):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type in base.ALLOWED_TYPES:
            newname = 'With' + Type + name[1:]
            class Temp(base, unittest.TestCase, Mixin):
                pass
            result[newname] = Temp
            Temp.__name__ = newname
            Temp.__module__ = Mixin.__module__
    return result

#
# Create test cases
#

class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)


class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without calling __init__; test_main() calls
    # ManagerMixin.manager.__init__() before starting it.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)


class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)

class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                # First read returns the CHALLENGE prefix, second a bogus
                # reply, any later read an empty string.
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')

#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#

def initializer(ns):
    """Increment the `test` attribute of the namespace `ns`."""
    ns.test += 1

class TestInitializers(unittest.TestCase):
    """Initializer callables passed to SyncManager.start() and Pool()."""

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        """A non-callable initializer raises TypeError; a valid one runs once."""
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        # Stop the started manager even if the assertion fails.
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            m.shutdown()

    def test_pool_initializer(self):
        """A non-callable initializer raises TypeError; a valid one runs once."""
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)

#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno) vs.
# sys.stdin.close() behavior
#

def _ThisSubProcess(q):
    """Grandchild target: try a non-blocking get and ignore an empty queue."""
    try:
        q.get(block=False)
    except Queue.Empty:
        pass

# NOTE(review): this function shares its name with the _TestProcess test
# class defined earlier in the file and rebinds that module global.
def _TestProcess(q):
    """Child target: start a subprocess of its own and wait for it."""
    queue = multiprocessing.Queue()
    subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
    subProc.start()
    subProc.join()

def _afunc(x):
    """Return the square of `x`."""
    return x*x

def pool_in_process():
    """Child target: create a 4-worker pool and map _afunc over a short list."""
    pool = multiprocessing.Pool(processes=4)
    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])

class _file_like(object):
    """Write buffer kept per process id, flushed to a delegate file object."""

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """Return the buffer, starting a fresh one when the pid has changed."""
        pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache

    def write(self, data):
        """Append `data` to the buffer."""
        self.cache.append(data)

    def flush(self):
        """Write the joined buffer to the delegate and empty the buffer."""
        self._delegate.write(''.join(self.cache))
        self._cache = []

class TestStdinBadfiledescriptor(unittest.TestCase):
    """Queues and pools created inside a child process (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """flush() must forward the buffered data to the delegate."""
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is created but never started -- confirm
        # whether that is intentional.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual rather than a bare assert, which is stripped under -O.
        self.assertEqual(sio.getvalue(), 'foo')

testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]

#
#
#

def test_main(run=None):
    """Create the shared pools and manager, run every test case, clean up.

    `run` is called with the assembled unittest.TestSuite; it defaults to
    test.test_support.run_unittest.  Raises unittest.SkipTest on Linux when
    creating an RLock fails with OSError.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Tear the pools and the manager down even if the run raises.
    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool

def main():
    """Run the suite with a verbose text runner."""
    test_main(unittest.TextTestRunner(verbosity=2).run)

if __name__ == '__main__':
    main()