1 # 2 # Module providing the `SyncManager` class for dealing 3 # with shared objects 4 # 5 # multiprocessing/managers.py 6 # 7 # Copyright (c) 2006-2008, R Oudkerk 8 # All rights reserved. 9 # 10 # Redistribution and use in source and binary forms, with or without 11 # modification, are permitted provided that the following conditions 12 # are met: 13 # 14 # 1. Redistributions of source code must retain the above copyright 15 # notice, this list of conditions and the following disclaimer. 16 # 2. Redistributions in binary form must reproduce the above copyright 17 # notice, this list of conditions and the following disclaimer in the 18 # documentation and/or other materials provided with the distribution. 19 # 3. Neither the name of author nor the names of any contributors may be 20 # used to endorse or promote products derived from this software 21 # without specific prior written permission. 22 # 23 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 24 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 27 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 29 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 30 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 32 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 # SUCH DAMAGE. 34 # 35 36 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] 37 38 # 39 # Imports 40 # 41 42 import os 43 import sys 44 import weakref 45 import threading 46 import array 47 import Queue 48 49 from traceback import format_exc 50 from multiprocessing import Process, current_process, active_children, Pool, util, connection 51 from multiprocessing.process import AuthenticationString 52 from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler 53 from multiprocessing.util import Finalize, info 54 55 try: 56 from cPickle import PicklingError 57 except ImportError: 58 from pickle import PicklingError 59 60 # 61 # Register some things for pickling 62 # 63 64 def reduce_array(a): 65 return array.array, (a.typecode, a.tostring()) 66 ForkingPickler.register(array.array, reduce_array) 67 68 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 69 70 # 71 # Type for identifying shared objects 72 # 73 74 class Token(object): 75 ''' 76 Type to uniquely indentify a shared object 77 ''' 78 __slots__ = ('typeid', 'address', 'id') 79 80 def __init__(self, typeid, address, id): 81 (self.typeid, self.address, self.id) = (typeid, address, id) 82 83 def __getstate__(self): 84 return (self.typeid, self.address, self.id) 85 86 def __setstate__(self, state): 87 (self.typeid, self.address, self.id) = state 88 89 def __repr__(self): 90 return 'Token(typeid=%r, address=%r, id=%r)' % \ 91 (self.typeid, self.address, self.id) 92 93 # 94 # Function for communication with a manager's server process 95 # 96 97 def dispatch(c, id, methodname, args=(), kwds={}): 98 ''' 99 Send a message to manager using connection `c` and return response 100 ''' 101 c.send((id, methodname, args, kwds)) 102 kind, result = c.recv() 103 if kind == '#RETURN': 104 return result 105 raise convert_to_error(kind, result) 106 107 def convert_to_error(kind, result): 108 if kind == '#ERROR': 109 return result 110 elif kind == '#TRACEBACK': 111 assert type(result) is str 112 return RemoteError(result) 113 elif kind == '#UNSERIALIZABLE': 114 assert type(result) is str 115 return RemoteError('Unserializable message: %s\n' % result) 116 else: 117 return ValueError('Unrecognized message type') 118 119 class RemoteError(Exception): 120 def __str__(self): 121 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 122 123 # 124 # Functions for finding the method names of an object 125 # 126 127 def all_methods(obj): 128 ''' 129 Return a list of names of methods of `obj` 130 ''' 131 temp = [] 132 for name in dir(obj): 133 func = getattr(obj, name) 134 if hasattr(func, '__call__'): 135 temp.append(name) 136 return temp 137 138 def public_methods(obj): 139 ''' 140 Return a list of names of methods of `obj` which do not start with '_' 141 ''' 142 return [name for name in all_methods(obj) if name[0] != '_'] 143 144 # 145 # Server which is run in a process controlled by a manager 146 # 147 148 class Server(object): 149 ''' 150 Server class which runs in a process controlled by a manager object 151 ''' 152 public = ['shutdown', 'create', 'accept_connection', 'get_methods', 153 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 154 155 def __init__(self, registry, address, authkey, serializer): 156 assert isinstance(authkey, bytes) 157 self.registry = registry 158 self.authkey = AuthenticationString(authkey) 159 Listener, Client = listener_client[serializer] 160 161 # do authentication later 162 self.listener = Listener(address=address, backlog=16) 163 self.address = self.listener.address 164 165 self.id_to_obj = {'0': (None, ())} 166 self.id_to_refcount = {} 167 self.mutex = threading.RLock() 168 self.stop = 0 169 170 def serve_forever(self): 171 ''' 172 Run the server forever 173 ''' 174 current_process()._manager_server = self 175 try: 176 try: 177 while 1: 178 try: 179 c = self.listener.accept() 180 except (OSError, IOError): 181 continue 182 t = threading.Thread(target=self.handle_request, args=(c,)) 183 t.daemon = True 184 t.start() 185 except (KeyboardInterrupt, SystemExit): 186 pass 187 finally: 188 self.stop = 999 189 self.listener.close() 190 191 def handle_request(self, c): 192 ''' 193 Handle a new connection 194 ''' 195 funcname = result = request = None 196 try: 197 connection.deliver_challenge(c, self.authkey) 198 connection.answer_challenge(c, self.authkey) 199 request = c.recv() 200 ignore, funcname, args, kwds = request 201 assert funcname in self.public, '%r unrecognized' % funcname 202 func = getattr(self, funcname) 203 except Exception: 204 msg = ('#TRACEBACK', format_exc()) 205 else: 206 try: 207 result = func(c, *args, **kwds) 208 except Exception: 209 msg = ('#TRACEBACK', format_exc()) 210 else: 211 msg = ('#RETURN', result) 212 try: 213 c.send(msg) 214 except Exception, e: 215 try: 216 c.send(('#TRACEBACK', format_exc())) 217 except Exception: 218 pass 219 util.info('Failure to send message: %r', msg) 220 util.info(' ... request was %r', request) 221 util.info(' ... exception was %r', e) 222 223 c.close() 224 225 def serve_client(self, conn): 226 ''' 227 Handle requests from the proxies in a particular process/thread 228 ''' 229 util.debug('starting server thread to service %r', 230 threading.current_thread().name) 231 232 recv = conn.recv 233 send = conn.send 234 id_to_obj = self.id_to_obj 235 236 while not self.stop: 237 238 try: 239 methodname = obj = None 240 request = recv() 241 ident, methodname, args, kwds = request 242 obj, exposed, gettypeid = id_to_obj[ident] 243 244 if methodname not in exposed: 245 raise AttributeError( 246 'method %r of %r object is not in exposed=%r' % 247 (methodname, type(obj), exposed) 248 ) 249 250 function = getattr(obj, methodname) 251 252 try: 253 res = function(*args, **kwds) 254 except Exception, e: 255 msg = ('#ERROR', e) 256 else: 257 typeid = gettypeid and gettypeid.get(methodname, None) 258 if typeid: 259 rident, rexposed = self.create(conn, typeid, res) 260 token = Token(typeid, self.address, rident) 261 msg = ('#PROXY', (rexposed, token)) 262 else: 263 msg = ('#RETURN', res) 264 265 except AttributeError: 266 if methodname is None: 267 msg = ('#TRACEBACK', format_exc()) 268 else: 269 try: 270 fallback_func = self.fallback_mapping[methodname] 271 result = fallback_func( 272 self, conn, ident, obj, *args, **kwds 273 ) 274 msg = ('#RETURN', result) 275 except Exception: 276 msg = ('#TRACEBACK', format_exc()) 277 278 except EOFError: 279 util.debug('got EOF -- exiting thread serving %r', 280 threading.current_thread().name) 281 sys.exit(0) 282 283 except Exception: 284 msg = ('#TRACEBACK', format_exc()) 285 286 try: 287 try: 288 send(msg) 289 except Exception, e: 290 send(('#UNSERIALIZABLE', repr(msg))) 291 except Exception, e: 292 util.info('exception in thread serving %r', 293 threading.current_thread().name) 294 util.info(' ... message was %r', msg) 295 util.info(' ... exception was %r', e) 296 conn.close() 297 sys.exit(1) 298 299 def fallback_getvalue(self, conn, ident, obj): 300 return obj 301 302 def fallback_str(self, conn, ident, obj): 303 return str(obj) 304 305 def fallback_repr(self, conn, ident, obj): 306 return repr(obj) 307 308 fallback_mapping = { 309 '__str__':fallback_str, 310 '__repr__':fallback_repr, 311 '#GETVALUE':fallback_getvalue 312 } 313 314 def dummy(self, c): 315 pass 316 317 def debug_info(self, c): 318 ''' 319 Return some info --- useful to spot problems with refcounting 320 ''' 321 self.mutex.acquire() 322 try: 323 result = [] 324 keys = self.id_to_obj.keys() 325 keys.sort() 326 for ident in keys: 327 if ident != '0': 328 result.append(' %s: refcount=%s\n %s' % 329 (ident, self.id_to_refcount[ident], 330 str(self.id_to_obj[ident][0])[:75])) 331 return '\n'.join(result) 332 finally: 333 self.mutex.release() 334 335 def number_of_objects(self, c): 336 ''' 337 Number of shared objects 338 ''' 339 return len(self.id_to_obj) - 1 # don't count ident='0' 340 341 def shutdown(self, c): 342 ''' 343 Shutdown this process 344 ''' 345 try: 346 try: 347 util.debug('manager received shutdown message') 348 c.send(('#RETURN', None)) 349 350 if sys.stdout != sys.__stdout__: 351 util.debug('resetting stdout, stderr') 352 sys.stdout = sys.__stdout__ 353 sys.stderr = sys.__stderr__ 354 355 util._run_finalizers(0) 356 357 for p in active_children(): 358 util.debug('terminating a child process of manager') 359 p.terminate() 360 361 for p in active_children(): 362 util.debug('terminating a child process of manager') 363 p.join() 364 365 util._run_finalizers() 366 util.info('manager exiting with exitcode 0') 367 except: 368 import traceback 369 traceback.print_exc() 370 finally: 371 exit(0) 372 373 def create(self, c, typeid, *args, **kwds): 374 ''' 375 Create a new shared object and return its id 376 ''' 377 self.mutex.acquire() 378 try: 379 callable, exposed, method_to_typeid, proxytype = \ 380 self.registry[typeid] 381 382 if callable is None: 383 assert len(args) == 1 and not kwds 384 obj = args[0] 385 else: 386 obj = callable(*args, **kwds) 387 388 if exposed is None: 389 exposed = public_methods(obj) 390 if method_to_typeid is not None: 391 assert type(method_to_typeid) is dict 392 exposed = list(exposed) + list(method_to_typeid) 393 394 ident = '%x' % id(obj) # convert to string because xmlrpclib 395 # only has 32 bit signed integers 396 util.debug('%r callable returned object with id %r', typeid, ident) 397 398 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 399 if ident not in self.id_to_refcount: 400 self.id_to_refcount[ident] = 0 401 # increment the reference count immediately, to avoid 402 # this object being garbage collected before a Proxy 403 # object for it can be created. The caller of create() 404 # is responsible for doing a decref once the Proxy object 405 # has been created. 406 self.incref(c, ident) 407 return ident, tuple(exposed) 408 finally: 409 self.mutex.release() 410 411 def get_methods(self, c, token): 412 ''' 413 Return the methods of the shared object indicated by token 414 ''' 415 return tuple(self.id_to_obj[token.id][1]) 416 417 def accept_connection(self, c, name): 418 ''' 419 Spawn a new thread to serve this connection 420 ''' 421 threading.current_thread().name = name 422 c.send(('#RETURN', None)) 423 self.serve_client(c) 424 425 def incref(self, c, ident): 426 self.mutex.acquire() 427 try: 428 self.id_to_refcount[ident] += 1 429 finally: 430 self.mutex.release() 431 432 def decref(self, c, ident): 433 self.mutex.acquire() 434 try: 435 assert self.id_to_refcount[ident] >= 1 436 self.id_to_refcount[ident] -= 1 437 if self.id_to_refcount[ident] == 0: 438 del self.id_to_obj[ident], self.id_to_refcount[ident] 439 util.debug('disposing of obj with id %r', ident) 440 finally: 441 self.mutex.release() 442 443 # 444 # Class to represent state of a manager 445 # 446 447 class State(object): 448 __slots__ = ['value'] 449 INITIAL = 0 450 STARTED = 1 451 SHUTDOWN = 2 452 453 # 454 # Mapping from serializer name to Listener and Client types 455 # 456 457 listener_client = { 458 'pickle' : (connection.Listener, connection.Client), 459 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 460 } 461 462 # 463 # Definition of BaseManager 464 # 465 466 class BaseManager(object): 467 ''' 468 Base class for managers 469 ''' 470 _registry = {} 471 _Server = Server 472 473 def __init__(self, address=None, authkey=None, serializer='pickle'): 474 if authkey is None: 475 authkey = current_process().authkey 476 self._address = address # XXX not final address if eg ('', 0) 477 self._authkey = AuthenticationString(authkey) 478 self._state = State() 479 self._state.value = State.INITIAL 480 self._serializer = serializer 481 self._Listener, self._Client = listener_client[serializer] 482 483 def __reduce__(self): 484 return type(self).from_address, \ 485 (self._address, self._authkey, self._serializer) 486 487 def get_server(self): 488 ''' 489 Return server object with serve_forever() method and address attribute 490 ''' 491 assert self._state.value == State.INITIAL 492 return Server(self._registry, self._address, 493 self._authkey, self._serializer) 494 495 def connect(self): 496 ''' 497 Connect manager object to the server process 498 ''' 499 Listener, Client = listener_client[self._serializer] 500 conn = Client(self._address, authkey=self._authkey) 501 dispatch(conn, None, 'dummy') 502 self._state.value = State.STARTED 503 504 def start(self, initializer=None, initargs=()): 505 ''' 506 Spawn a server process for this manager object 507 ''' 508 assert self._state.value == State.INITIAL 509 510 if initializer is not None and not hasattr(initializer, '__call__'): 511 raise TypeError('initializer must be a callable') 512 513 # pipe over which we will retrieve address of server 514 reader, writer = connection.Pipe(duplex=False) 515 516 # spawn process which runs a server 517 self._process = Process( 518 target=type(self)._run_server, 519 args=(self._registry, self._address, self._authkey, 520 self._serializer, writer, initializer, initargs), 521 ) 522 ident = ':'.join(str(i) for i in self._process._identity) 523 self._process.name = type(self).__name__ + '-' + ident 524 self._process.start() 525 526 # get address of server 527 writer.close() 528 self._address = reader.recv() 529 reader.close() 530 531 # register a finalizer 532 self._state.value = State.STARTED 533 self.shutdown = util.Finalize( 534 self, type(self)._finalize_manager, 535 args=(self._process, self._address, self._authkey, 536 self._state, self._Client), 537 exitpriority=0 538 ) 539 540 @classmethod 541 def _run_server(cls, registry, address, authkey, serializer, writer, 542 initializer=None, initargs=()): 543 ''' 544 Create a server, report its address and run it 545 ''' 546 if initializer is not None: 547 initializer(*initargs) 548 549 # create server 550 server = cls._Server(registry, address, authkey, serializer) 551 552 # inform parent process of the server's address 553 writer.send(server.address) 554 writer.close() 555 556 # run the manager 557 util.info('manager serving at %r', server.address) 558 server.serve_forever() 559 560 def _create(self, typeid, *args, **kwds): 561 ''' 562 Create a new shared object; return the token and exposed tuple 563 ''' 564 assert self._state.value == State.STARTED, 'server not yet started' 565 conn = self._Client(self._address, authkey=self._authkey) 566 try: 567 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 568 finally: 569 conn.close() 570 return Token(typeid, self._address, id), exposed 571 572 def join(self, timeout=None): 573 ''' 574 Join the manager process (if it has been spawned) 575 ''' 576 self._process.join(timeout) 577 578 def _debug_info(self): 579 ''' 580 Return some info about the servers shared objects and connections 581 ''' 582 conn = self._Client(self._address, authkey=self._authkey) 583 try: 584 return dispatch(conn, None, 'debug_info') 585 finally: 586 conn.close() 587 588 def _number_of_objects(self): 589 ''' 590 Return the number of shared objects 591 ''' 592 conn = self._Client(self._address, authkey=self._authkey) 593 try: 594 return dispatch(conn, None, 'number_of_objects') 595 finally: 596 conn.close() 597 598 def __enter__(self): 599 return self 600 601 def __exit__(self, exc_type, exc_val, exc_tb): 602 self.shutdown() 603 604 @staticmethod 605 def _finalize_manager(process, address, authkey, state, _Client): 606 ''' 607 Shutdown the manager process; will be registered as a finalizer 608 ''' 609 if process.is_alive(): 610 util.info('sending shutdown message to manager') 611 try: 612 conn = _Client(address, authkey=authkey) 613 try: 614 dispatch(conn, None, 'shutdown') 615 finally: 616 conn.close() 617 except Exception: 618 pass 619 620 process.join(timeout=0.2) 621 if process.is_alive(): 622 util.info('manager still alive') 623 if hasattr(process, 'terminate'): 624 util.info('trying to `terminate()` manager process') 625 process.terminate() 626 process.join(timeout=0.1) 627 if process.is_alive(): 628 util.info('manager still alive after terminate') 629 630 state.value = State.SHUTDOWN 631 try: 632 del BaseProxy._address_to_local[address] 633 except KeyError: 634 pass 635 636 address = property(lambda self: self._address) 637 638 @classmethod 639 def register(cls, typeid, callable=None, proxytype=None, exposed=None, 640 method_to_typeid=None, create_method=True): 641 ''' 642 Register a typeid with the manager type 643 ''' 644 if '_registry' not in cls.__dict__: 645 cls._registry = cls._registry.copy() 646 647 if proxytype is None: 648 proxytype = AutoProxy 649 650 exposed = exposed or getattr(proxytype, '_exposed_', None) 651 652 method_to_typeid = method_to_typeid or \ 653 getattr(proxytype, '_method_to_typeid_', None) 654 655 if method_to_typeid: 656 for key, value in method_to_typeid.items(): 657 assert type(key) is str, '%r is not a string' % key 658 assert type(value) is str, '%r is not a string' % value 659 660 cls._registry[typeid] = ( 661 callable, exposed, method_to_typeid, proxytype 662 ) 663 664 if create_method: 665 def temp(self, *args, **kwds): 666 util.debug('requesting creation of a shared %r object', typeid) 667 token, exp = self._create(typeid, *args, **kwds) 668 proxy = proxytype( 669 token, self._serializer, manager=self, 670 authkey=self._authkey, exposed=exp 671 ) 672 conn = self._Client(token.address, authkey=self._authkey) 673 dispatch(conn, None, 'decref', (token.id,)) 674 return proxy 675 temp.__name__ = typeid 676 setattr(cls, typeid, temp) 677 678 # 679 # Subclass of set which get cleared after a fork 680 # 681 682 class ProcessLocalSet(set): 683 def __init__(self): 684 util.register_after_fork(self, lambda obj: obj.clear()) 685 def __reduce__(self): 686 return type(self), () 687 688 # 689 # Definition of BaseProxy 690 # 691 692 class BaseProxy(object): 693 ''' 694 A base for proxies of shared objects 695 ''' 696 _address_to_local = {} 697 _mutex = util.ForkAwareThreadLock() 698 699 def __init__(self, token, serializer, manager=None, 700 authkey=None, exposed=None, incref=True): 701 BaseProxy._mutex.acquire() 702 try: 703 tls_idset = BaseProxy._address_to_local.get(token.address, None) 704 if tls_idset is None: 705 tls_idset = util.ForkAwareLocal(), ProcessLocalSet() 706 BaseProxy._address_to_local[token.address] = tls_idset 707 finally: 708 BaseProxy._mutex.release() 709 710 # self._tls is used to record the connection used by this 711 # thread to communicate with the manager at token.address 712 self._tls = tls_idset[0] 713 714 # self._idset is used to record the identities of all shared 715 # objects for which the current process owns references and 716 # which are in the manager at token.address 717 self._idset = tls_idset[1] 718 719 self._token = token 720 self._id = self._token.id 721 self._manager = manager 722 self._serializer = serializer 723 self._Client = listener_client[serializer][1] 724 725 if authkey is not None: 726 self._authkey = AuthenticationString(authkey) 727 elif self._manager is not None: 728 self._authkey = self._manager._authkey 729 else: 730 self._authkey = current_process().authkey 731 732 if incref: 733 self._incref() 734 735 util.register_after_fork(self, BaseProxy._after_fork) 736 737 def _connect(self): 738 util.debug('making connection to manager') 739 name = current_process().name 740 if threading.current_thread().name != 'MainThread': 741 name += '|' + threading.current_thread().name 742 conn = self._Client(self._token.address, authkey=self._authkey) 743 dispatch(conn, None, 'accept_connection', (name,)) 744 self._tls.connection = conn 745 746 def _callmethod(self, methodname, args=(), kwds={}): 747 ''' 748 Try to call a method of the referrent and return a copy of the result 749 ''' 750 try: 751 conn = self._tls.connection 752 except AttributeError: 753 util.debug('thread %r does not own a connection', 754 threading.current_thread().name) 755 self._connect() 756 conn = self._tls.connection 757 758 conn.send((self._id, methodname, args, kwds)) 759 kind, result = conn.recv() 760 761 if kind == '#RETURN': 762 return result 763 elif kind == '#PROXY': 764 exposed, token = result 765 proxytype = self._manager._registry[token.typeid][-1] 766 proxy = proxytype( 767 token, self._serializer, manager=self._manager, 768 authkey=self._authkey, exposed=exposed 769 ) 770 conn = self._Client(token.address, authkey=self._authkey) 771 dispatch(conn, None, 'decref', (token.id,)) 772 return proxy 773 raise convert_to_error(kind, result) 774 775 def _getvalue(self): 776 ''' 777 Get a copy of the value of the referent 778 ''' 779 return self._callmethod('#GETVALUE') 780 781 def _incref(self): 782 conn = self._Client(self._token.address, authkey=self._authkey) 783 dispatch(conn, None, 'incref', (self._id,)) 784 util.debug('INCREF %r', self._token.id) 785 786 self._idset.add(self._id) 787 788 state = self._manager and self._manager._state 789 790 self._close = util.Finalize( 791 self, BaseProxy._decref, 792 args=(self._token, self._authkey, state, 793 self._tls, self._idset, self._Client), 794 exitpriority=10 795 ) 796 797 @staticmethod 798 def _decref(token, authkey, state, tls, idset, _Client): 799 idset.discard(token.id) 800 801 # check whether manager is still alive 802 if state is None or state.value == State.STARTED: 803 # tell manager this process no longer cares about referent 804 try: 805 util.debug('DECREF %r', token.id) 806 conn = _Client(token.address, authkey=authkey) 807 dispatch(conn, None, 'decref', (token.id,)) 808 except Exception, e: 809 util.debug('... decref failed %s', e) 810 811 else: 812 util.debug('DECREF %r -- manager already shutdown', token.id) 813 814 # check whether we can close this thread's connection because 815 # the process owns no more references to objects for this manager 816 if not idset and hasattr(tls, 'connection'): 817 util.debug('thread %r has no more proxies so closing conn', 818 threading.current_thread().name) 819 tls.connection.close() 820 del tls.connection 821 822 def _after_fork(self): 823 self._manager = None 824 try: 825 self._incref() 826 except Exception, e: 827 # the proxy may just be for a manager which has shutdown 828 util.info('incref failed: %s' % e) 829 830 def __reduce__(self): 831 kwds = {} 832 if Popen.thread_is_spawning(): 833 kwds['authkey'] = self._authkey 834 835 if getattr(self, '_isauto', False): 836 kwds['exposed'] = self._exposed_ 837 return (RebuildProxy, 838 (AutoProxy, self._token, self._serializer, kwds)) 839 else: 840 return (RebuildProxy, 841 (type(self), self._token, self._serializer, kwds)) 842 843 def __deepcopy__(self, memo): 844 return self._getvalue() 845 846 def __repr__(self): 847 return '<%s object, typeid %r at %s>' % \ 848 (type(self).__name__, self._token.typeid, '0x%x' % id(self)) 849 850 def __str__(self): 851 ''' 852 Return representation of the referent (or a fall-back if that fails) 853 ''' 854 try: 855 return self._callmethod('__repr__') 856 except Exception: 857 return repr(self)[:-1] + "; '__str__()' failed>" 858 859 # 860 # Function used for unpickling 861 # 862 863 def RebuildProxy(func, token, serializer, kwds): 864 ''' 865 Function used for unpickling proxy objects. 866 867 If possible the shared object is returned, or otherwise a proxy for it. 868 ''' 869 server = getattr(current_process(), '_manager_server', None) 870 871 if server and server.address == token.address: 872 return server.id_to_obj[token.id][0] 873 else: 874 incref = ( 875 kwds.pop('incref', True) and 876 not getattr(current_process(), '_inheriting', False) 877 ) 878 return func(token, serializer, incref=incref, **kwds) 879 880 # 881 # Functions to create proxies and proxy types 882 # 883 884 def MakeProxyType(name, exposed, _cache={}): 885 ''' 886 Return an proxy type whose methods are given by `exposed` 887 ''' 888 exposed = tuple(exposed) 889 try: 890 return _cache[(name, exposed)] 891 except KeyError: 892 pass 893 894 dic = {} 895 896 for meth in exposed: 897 exec '''def %s(self, *args, **kwds): 898 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic 899 900 ProxyType = type(name, (BaseProxy,), dic) 901 ProxyType._exposed_ = exposed 902 _cache[(name, exposed)] = ProxyType 903 return ProxyType 904 905 906 def AutoProxy(token, serializer, manager=None, authkey=None, 907 exposed=None, incref=True): 908 ''' 909 Return an auto-proxy for `token` 910 ''' 911 _Client = listener_client[serializer][1] 912 913 if exposed is None: 914 conn = _Client(token.address, authkey=authkey) 915 try: 916 exposed = dispatch(conn, None, 'get_methods', (token,)) 917 finally: 918 conn.close() 919 920 if authkey is None and manager is not None: 921 authkey = manager._authkey 922 if authkey is None: 923 authkey = current_process().authkey 924 925 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 926 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 927 incref=incref) 928 proxy._isauto = True 929 return proxy 930 931 # 932 # Types/callables which we will register with SyncManager 933 # 934 935 class Namespace(object): 936 def __init__(self, **kwds): 937 self.__dict__.update(kwds) 938 def __repr__(self): 939 items = self.__dict__.items() 940 temp = [] 941 for name, value in items: 942 if not name.startswith('_'): 943 temp.append('%s=%r' % (name, value)) 944 temp.sort() 945 return 'Namespace(%s)' % str.join(', ', temp) 946 947 class Value(object): 948 def __init__(self, typecode, value, lock=True): 949 self._typecode = typecode 950 self._value = value 951 def get(self): 952 return self._value 953 def set(self, value): 954 self._value = value 955 def __repr__(self): 956 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 957 value = property(get, set) 958 959 def Array(typecode, sequence, lock=True): 960 return array.array(typecode, sequence) 961 962 # 963 # Proxy types used by SyncManager 964 # 965 966 class IteratorProxy(BaseProxy): 967 # XXX remove methods for Py3.0 and Py2.6 968 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close') 969 def __iter__(self): 970 return self 971 def __next__(self, *args): 972 return self._callmethod('__next__', args) 973 def next(self, *args): 974 return self._callmethod('next', args) 975 def send(self, *args): 976 return self._callmethod('send', args) 977 def throw(self, *args): 978 return self._callmethod('throw', args) 979 def close(self, *args): 980 return self._callmethod('close', args) 981 982 983 class AcquirerProxy(BaseProxy): 984 _exposed_ = ('acquire', 'release') 985 def acquire(self, blocking=True): 986 return self._callmethod('acquire', (blocking,)) 987 def release(self): 988 return self._callmethod('release') 989 def __enter__(self): 990 return self._callmethod('acquire') 991 def __exit__(self, exc_type, exc_val, exc_tb): 992 return self._callmethod('release') 993 994 995 class ConditionProxy(AcquirerProxy): 996 # XXX will Condition.notfyAll() name be available in Py3.0? 997 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 998 def wait(self, timeout=None): 999 return self._callmethod('wait', (timeout,)) 1000 def notify(self): 1001 return self._callmethod('notify') 1002 def notify_all(self): 1003 return self._callmethod('notify_all') 1004 1005 class EventProxy(BaseProxy): 1006 _exposed_ = ('is_set', 'set', 'clear', 'wait') 1007 def is_set(self): 1008 return self._callmethod('is_set') 1009 def set(self): 1010 return self._callmethod('set') 1011 def clear(self): 1012 return self._callmethod('clear') 1013 def wait(self, timeout=None): 1014 return self._callmethod('wait', (timeout,)) 1015 1016 class NamespaceProxy(BaseProxy): 1017 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') 1018 def __getattr__(self, key): 1019 if key[0] == '_': 1020 return object.__getattribute__(self, key) 1021 callmethod = object.__getattribute__(self, '_callmethod') 1022 return callmethod('__getattribute__', (key,)) 1023 def __setattr__(self, key, value): 1024 if key[0] == '_': 1025 return object.__setattr__(self, key, value) 1026 callmethod = object.__getattribute__(self, '_callmethod') 1027 return callmethod('__setattr__', (key, value)) 1028 def __delattr__(self, key): 1029 if key[0] == '_': 1030 return object.__delattr__(self, key) 1031 callmethod = object.__getattribute__(self, '_callmethod') 1032 return callmethod('__delattr__', (key,)) 1033 1034 1035 class ValueProxy(BaseProxy): 1036 _exposed_ = ('get', 'set') 1037 def get(self): 1038 return self._callmethod('get') 1039 def set(self, value): 1040 return self._callmethod('set', (value,)) 1041 value = property(get, set) 1042 1043 1044 BaseListProxy = MakeProxyType('BaseListProxy', ( 1045 '__add__', '__contains__', '__delitem__', '__delslice__', 1046 '__getitem__', '__getslice__', '__len__', '__mul__', 1047 '__reversed__', '__rmul__', '__setitem__', '__setslice__', 1048 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 1049 'reverse', 'sort', '__imul__' 1050 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 1051 class ListProxy(BaseListProxy): 1052 def __iadd__(self, value): 1053 self._callmethod('extend', (value,)) 1054 return self 1055 def __imul__(self, value): 1056 self._callmethod('__imul__', (value,)) 1057 return self 1058 1059 1060 DictProxy = MakeProxyType('DictProxy', ( 1061 '__contains__', '__delitem__', '__getitem__', '__len__', 1062 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', 1063 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' 1064 )) 1065 1066 1067 ArrayProxy = MakeProxyType('ArrayProxy', ( 1068 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' 1069 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 1070 1071 1072 PoolProxy = MakeProxyType('PoolProxy', ( 1073 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 1074 'map', 'map_async', 'terminate' 1075 )) 1076 PoolProxy._method_to_typeid_ = { 1077 'apply_async': 'AsyncResult', 1078 'map_async': 'AsyncResult', 1079 'imap': 'Iterator', 1080 'imap_unordered': 'Iterator' 1081 } 1082 1083 # 1084 # Definition of SyncManager 1085 # 1086 1087 class SyncManager(BaseManager): 1088 ''' 1089 Subclass of `BaseManager` which supports a number of shared object types. 1090 1091 The types registered are those intended for the synchronization 1092 of threads, plus `dict`, `list` and `Namespace`. 1093 1094 The `multiprocessing.Manager()` function creates started instances of 1095 this class. 1096 ''' 1097 1098 SyncManager.register('Queue', Queue.Queue) 1099 SyncManager.register('JoinableQueue', Queue.Queue) 1100 SyncManager.register('Event', threading.Event, EventProxy) 1101 SyncManager.register('Lock', threading.Lock, AcquirerProxy) 1102 SyncManager.register('RLock', threading.RLock, AcquirerProxy) 1103 SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) 1104 SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, 1105 AcquirerProxy) 1106 SyncManager.register('Condition', threading.Condition, ConditionProxy) 1107 SyncManager.register('Pool', Pool, PoolProxy) 1108 SyncManager.register('list', list, ListProxy) 1109 SyncManager.register('dict', dict, DictProxy) 1110 SyncManager.register('Value', Value, ValueProxy) 1111 SyncManager.register('Array', Array, ArrayProxy) 1112 SyncManager.register('Namespace', Namespace, NamespaceProxy) 1113 1114 # types returned by methods of PoolProxy 1115 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) 1116 SyncManager.register('AsyncResult', create_method=False) 1117