Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module providing the `SyncManager` class for dealing
      3 # with shared objects
      4 #
      5 # multiprocessing/managers.py
      6 #
      7 # Copyright (c) 2006-2008, R Oudkerk
      8 # All rights reserved.
      9 #
     10 # Redistribution and use in source and binary forms, with or without
     11 # modification, are permitted provided that the following conditions
     12 # are met:
     13 #
     14 # 1. Redistributions of source code must retain the above copyright
     15 #    notice, this list of conditions and the following disclaimer.
     16 # 2. Redistributions in binary form must reproduce the above copyright
     17 #    notice, this list of conditions and the following disclaimer in the
     18 #    documentation and/or other materials provided with the distribution.
     19 # 3. Neither the name of author nor the names of any contributors may be
     20 #    used to endorse or promote products derived from this software
     21 #    without specific prior written permission.
     22 #
     23 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     24 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     25 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     26 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     27 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     28 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     29 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     30 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     32 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     33 # SUCH DAMAGE.
     34 #
     35 
     36 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
     37 
     38 #
     39 # Imports
     40 #
     41 
     42 import os
     43 import sys
     44 import weakref
     45 import threading
     46 import array
     47 import Queue
     48 
     49 from traceback import format_exc
     50 from multiprocessing import Process, current_process, active_children, Pool, util, connection
     51 from multiprocessing.process import AuthenticationString
     52 from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
     53 from multiprocessing.util import Finalize, info
     54 
     55 try:
     56     from cPickle import PicklingError
     57 except ImportError:
     58     from pickle import PicklingError
     59 
     60 #
     61 # Register some things for pickling
     62 #
     63 
     64 def reduce_array(a):
     65     return array.array, (a.typecode, a.tostring())
# Make ForkingPickler pickle array.array instances through reduce_array().
ForkingPickler.register(array.array, reduce_array)

# Types of the objects returned by dict.items(), dict.keys() and
# dict.values(), in that order.
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
     69 
     70 #
     71 # Type for identifying shared objects
     72 #
     73 
     74 class Token(object):
     75     '''
     76     Type to uniquely indentify a shared object
     77     '''
     78     __slots__ = ('typeid', 'address', 'id')
     79 
     80     def __init__(self, typeid, address, id):
     81         (self.typeid, self.address, self.id) = (typeid, address, id)
     82 
     83     def __getstate__(self):
     84         return (self.typeid, self.address, self.id)
     85 
     86     def __setstate__(self, state):
     87         (self.typeid, self.address, self.id) = state
     88 
     89     def __repr__(self):
     90         return 'Token(typeid=%r, address=%r, id=%r)' % \
     91                (self.typeid, self.address, self.id)
     92 
     93 #
     94 # Function for communication with a manager's server process
     95 #
     96 
     97 def dispatch(c, id, methodname, args=(), kwds={}):
     98     '''
     99     Send a message to manager using connection `c` and return response
    100     '''
    101     c.send((id, methodname, args, kwds))
    102     kind, result = c.recv()
    103     if kind == '#RETURN':
    104         return result
    105     raise convert_to_error(kind, result)
    106 
    107 def convert_to_error(kind, result):
    108     if kind == '#ERROR':
    109         return result
    110     elif kind == '#TRACEBACK':
    111         assert type(result) is str
    112         return  RemoteError(result)
    113     elif kind == '#UNSERIALIZABLE':
    114         assert type(result) is str
    115         return RemoteError('Unserializable message: %s\n' % result)
    116     else:
    117         return ValueError('Unrecognized message type')
    118 
    119 class RemoteError(Exception):
    120     def __str__(self):
    121         return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
    122 
    123 #
    124 # Functions for finding the method names of an object
    125 #
    126 
    127 def all_methods(obj):
    128     '''
    129     Return a list of names of methods of `obj`
    130     '''
    131     temp = []
    132     for name in dir(obj):
    133         func = getattr(obj, name)
    134         if hasattr(func, '__call__'):
    135             temp.append(name)
    136     return temp
    137 
    138 def public_methods(obj):
    139     '''
    140     Return a list of names of methods of `obj` which do not start with '_'
    141     '''
    142     return [name for name in all_methods(obj) if name[0] != '_']
    143 
    144 #
    145 # Server which is run in a process controlled by a manager
    146 #
    147 
    148 class Server(object):
    149     '''
    150     Server class which runs in a process controlled by a manager object
    151     '''
    152     public = ['shutdown', 'create', 'accept_connection', 'get_methods',
    153               'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
    154 
    155     def __init__(self, registry, address, authkey, serializer):
    156         assert isinstance(authkey, bytes)
    157         self.registry = registry
    158         self.authkey = AuthenticationString(authkey)
    159         Listener, Client = listener_client[serializer]
    160 
    161         # do authentication later
    162         self.listener = Listener(address=address, backlog=16)
    163         self.address = self.listener.address
    164 
    165         self.id_to_obj = {'0': (None, ())}
    166         self.id_to_refcount = {}
    167         self.mutex = threading.RLock()
    168         self.stop = 0
    169 
    170     def serve_forever(self):
    171         '''
    172         Run the server forever
    173         '''
    174         current_process()._manager_server = self
    175         try:
    176             try:
    177                 while 1:
    178                     try:
    179                         c = self.listener.accept()
    180                     except (OSError, IOError):
    181                         continue
    182                     t = threading.Thread(target=self.handle_request, args=(c,))
    183                     t.daemon = True
    184                     t.start()
    185             except (KeyboardInterrupt, SystemExit):
    186                 pass
    187         finally:
    188             self.stop = 999
    189             self.listener.close()
    190 
    191     def handle_request(self, c):
    192         '''
    193         Handle a new connection
    194         '''
    195         funcname = result = request = None
    196         try:
    197             connection.deliver_challenge(c, self.authkey)
    198             connection.answer_challenge(c, self.authkey)
    199             request = c.recv()
    200             ignore, funcname, args, kwds = request
    201             assert funcname in self.public, '%r unrecognized' % funcname
    202             func = getattr(self, funcname)
    203         except Exception:
    204             msg = ('#TRACEBACK', format_exc())
    205         else:
    206             try:
    207                 result = func(c, *args, **kwds)
    208             except Exception:
    209                 msg = ('#TRACEBACK', format_exc())
    210             else:
    211                 msg = ('#RETURN', result)
    212         try:
    213             c.send(msg)
    214         except Exception, e:
    215             try:
    216                 c.send(('#TRACEBACK', format_exc()))
    217             except Exception:
    218                 pass
    219             util.info('Failure to send message: %r', msg)
    220             util.info(' ... request was %r', request)
    221             util.info(' ... exception was %r', e)
    222 
    223         c.close()
    224 
    225     def serve_client(self, conn):
    226         '''
    227         Handle requests from the proxies in a particular process/thread
    228         '''
    229         util.debug('starting server thread to service %r',
    230                    threading.current_thread().name)
    231 
    232         recv = conn.recv
    233         send = conn.send
    234         id_to_obj = self.id_to_obj
    235 
    236         while not self.stop:
    237 
    238             try:
    239                 methodname = obj = None
    240                 request = recv()
    241                 ident, methodname, args, kwds = request
    242                 obj, exposed, gettypeid = id_to_obj[ident]
    243 
    244                 if methodname not in exposed:
    245                     raise AttributeError(
    246                         'method %r of %r object is not in exposed=%r' %
    247                         (methodname, type(obj), exposed)
    248                         )
    249 
    250                 function = getattr(obj, methodname)
    251 
    252                 try:
    253                     res = function(*args, **kwds)
    254                 except Exception, e:
    255                     msg = ('#ERROR', e)
    256                 else:
    257                     typeid = gettypeid and gettypeid.get(methodname, None)
    258                     if typeid:
    259                         rident, rexposed = self.create(conn, typeid, res)
    260                         token = Token(typeid, self.address, rident)
    261                         msg = ('#PROXY', (rexposed, token))
    262                     else:
    263                         msg = ('#RETURN', res)
    264 
    265             except AttributeError:
    266                 if methodname is None:
    267                     msg = ('#TRACEBACK', format_exc())
    268                 else:
    269                     try:
    270                         fallback_func = self.fallback_mapping[methodname]
    271                         result = fallback_func(
    272                             self, conn, ident, obj, *args, **kwds
    273                             )
    274                         msg = ('#RETURN', result)
    275                     except Exception:
    276                         msg = ('#TRACEBACK', format_exc())
    277 
    278             except EOFError:
    279                 util.debug('got EOF -- exiting thread serving %r',
    280                            threading.current_thread().name)
    281                 sys.exit(0)
    282 
    283             except Exception:
    284                 msg = ('#TRACEBACK', format_exc())
    285 
    286             try:
    287                 try:
    288                     send(msg)
    289                 except Exception, e:
    290                     send(('#UNSERIALIZABLE', repr(msg)))
    291             except Exception, e:
    292                 util.info('exception in thread serving %r',
    293                         threading.current_thread().name)
    294                 util.info(' ... message was %r', msg)
    295                 util.info(' ... exception was %r', e)
    296                 conn.close()
    297                 sys.exit(1)
    298 
    299     def fallback_getvalue(self, conn, ident, obj):
    300         return obj
    301 
    302     def fallback_str(self, conn, ident, obj):
    303         return str(obj)
    304 
    305     def fallback_repr(self, conn, ident, obj):
    306         return repr(obj)
    307 
    308     fallback_mapping = {
    309         '__str__':fallback_str,
    310         '__repr__':fallback_repr,
    311         '#GETVALUE':fallback_getvalue
    312         }
    313 
    314     def dummy(self, c):
    315         pass
    316 
    317     def debug_info(self, c):
    318         '''
    319         Return some info --- useful to spot problems with refcounting
    320         '''
    321         self.mutex.acquire()
    322         try:
    323             result = []
    324             keys = self.id_to_obj.keys()
    325             keys.sort()
    326             for ident in keys:
    327                 if ident != '0':
    328                     result.append('  %s:       refcount=%s\n    %s' %
    329                                   (ident, self.id_to_refcount[ident],
    330                                    str(self.id_to_obj[ident][0])[:75]))
    331             return '\n'.join(result)
    332         finally:
    333             self.mutex.release()
    334 
    335     def number_of_objects(self, c):
    336         '''
    337         Number of shared objects
    338         '''
    339         return len(self.id_to_obj) - 1      # don't count ident='0'
    340 
    341     def shutdown(self, c):
    342         '''
    343         Shutdown this process
    344         '''
    345         try:
    346             try:
    347                 util.debug('manager received shutdown message')
    348                 c.send(('#RETURN', None))
    349 
    350                 if sys.stdout != sys.__stdout__:
    351                     util.debug('resetting stdout, stderr')
    352                     sys.stdout = sys.__stdout__
    353                     sys.stderr = sys.__stderr__
    354 
    355                 util._run_finalizers(0)
    356 
    357                 for p in active_children():
    358                     util.debug('terminating a child process of manager')
    359                     p.terminate()
    360 
    361                 for p in active_children():
    362                     util.debug('terminating a child process of manager')
    363                     p.join()
    364 
    365                 util._run_finalizers()
    366                 util.info('manager exiting with exitcode 0')
    367             except:
    368                 import traceback
    369                 traceback.print_exc()
    370         finally:
    371             exit(0)
    372 
    373     def create(self, c, typeid, *args, **kwds):
    374         '''
    375         Create a new shared object and return its id
    376         '''
    377         self.mutex.acquire()
    378         try:
    379             callable, exposed, method_to_typeid, proxytype = \
    380                       self.registry[typeid]
    381 
    382             if callable is None:
    383                 assert len(args) == 1 and not kwds
    384                 obj = args[0]
    385             else:
    386                 obj = callable(*args, **kwds)
    387 
    388             if exposed is None:
    389                 exposed = public_methods(obj)
    390             if method_to_typeid is not None:
    391                 assert type(method_to_typeid) is dict
    392                 exposed = list(exposed) + list(method_to_typeid)
    393 
    394             ident = '%x' % id(obj)  # convert to string because xmlrpclib
    395                                     # only has 32 bit signed integers
    396             util.debug('%r callable returned object with id %r', typeid, ident)
    397 
    398             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
    399             if ident not in self.id_to_refcount:
    400                 self.id_to_refcount[ident] = 0
    401             # increment the reference count immediately, to avoid
    402             # this object being garbage collected before a Proxy
    403             # object for it can be created.  The caller of create()
    404             # is responsible for doing a decref once the Proxy object
    405             # has been created.
    406             self.incref(c, ident)
    407             return ident, tuple(exposed)
    408         finally:
    409             self.mutex.release()
    410 
    411     def get_methods(self, c, token):
    412         '''
    413         Return the methods of the shared object indicated by token
    414         '''
    415         return tuple(self.id_to_obj[token.id][1])
    416 
    417     def accept_connection(self, c, name):
    418         '''
    419         Spawn a new thread to serve this connection
    420         '''
    421         threading.current_thread().name = name
    422         c.send(('#RETURN', None))
    423         self.serve_client(c)
    424 
    425     def incref(self, c, ident):
    426         self.mutex.acquire()
    427         try:
    428             self.id_to_refcount[ident] += 1
    429         finally:
    430             self.mutex.release()
    431 
    432     def decref(self, c, ident):
    433         self.mutex.acquire()
    434         try:
    435             assert self.id_to_refcount[ident] >= 1
    436             self.id_to_refcount[ident] -= 1
    437             if self.id_to_refcount[ident] == 0:
    438                 del self.id_to_obj[ident], self.id_to_refcount[ident]
    439                 util.debug('disposing of obj with id %r', ident)
    440         finally:
    441             self.mutex.release()
    442 
    443 #
    444 # Class to represent state of a manager
    445 #
    446 
    447 class State(object):
    448     __slots__ = ['value']
    449     INITIAL = 0
    450     STARTED = 1
    451     SHUTDOWN = 2
    452 
    453 #
    454 # Mapping from serializer name to Listener and Client types
    455 #
    456 
# serializer name -> (Listener type, Client type) pair from the
# connection module
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
    461 
    462 #
    463 # Definition of BaseManager
    464 #
    465 
    466 class BaseManager(object):
    467     '''
    468     Base class for managers
    469     '''
    470     _registry = {}
    471     _Server = Server
    472 
    473     def __init__(self, address=None, authkey=None, serializer='pickle'):
    474         if authkey is None:
    475             authkey = current_process().authkey
    476         self._address = address     # XXX not final address if eg ('', 0)
    477         self._authkey = AuthenticationString(authkey)
    478         self._state = State()
    479         self._state.value = State.INITIAL
    480         self._serializer = serializer
    481         self._Listener, self._Client = listener_client[serializer]
    482 
    483     def __reduce__(self):
    484         return type(self).from_address, \
    485                (self._address, self._authkey, self._serializer)
    486 
    487     def get_server(self):
    488         '''
    489         Return server object with serve_forever() method and address attribute
    490         '''
    491         assert self._state.value == State.INITIAL
    492         return Server(self._registry, self._address,
    493                       self._authkey, self._serializer)
    494 
    495     def connect(self):
    496         '''
    497         Connect manager object to the server process
    498         '''
    499         Listener, Client = listener_client[self._serializer]
    500         conn = Client(self._address, authkey=self._authkey)
    501         dispatch(conn, None, 'dummy')
    502         self._state.value = State.STARTED
    503 
    504     def start(self, initializer=None, initargs=()):
    505         '''
    506         Spawn a server process for this manager object
    507         '''
    508         assert self._state.value == State.INITIAL
    509 
    510         if initializer is not None and not hasattr(initializer, '__call__'):
    511             raise TypeError('initializer must be a callable')
    512 
    513         # pipe over which we will retrieve address of server
    514         reader, writer = connection.Pipe(duplex=False)
    515 
    516         # spawn process which runs a server
    517         self._process = Process(
    518             target=type(self)._run_server,
    519             args=(self._registry, self._address, self._authkey,
    520                   self._serializer, writer, initializer, initargs),
    521             )
    522         ident = ':'.join(str(i) for i in self._process._identity)
    523         self._process.name = type(self).__name__  + '-' + ident
    524         self._process.start()
    525 
    526         # get address of server
    527         writer.close()
    528         self._address = reader.recv()
    529         reader.close()
    530 
    531         # register a finalizer
    532         self._state.value = State.STARTED
    533         self.shutdown = util.Finalize(
    534             self, type(self)._finalize_manager,
    535             args=(self._process, self._address, self._authkey,
    536                   self._state, self._Client),
    537             exitpriority=0
    538             )
    539 
    540     @classmethod
    541     def _run_server(cls, registry, address, authkey, serializer, writer,
    542                     initializer=None, initargs=()):
    543         '''
    544         Create a server, report its address and run it
    545         '''
    546         if initializer is not None:
    547             initializer(*initargs)
    548 
    549         # create server
    550         server = cls._Server(registry, address, authkey, serializer)
    551 
    552         # inform parent process of the server's address
    553         writer.send(server.address)
    554         writer.close()
    555 
    556         # run the manager
    557         util.info('manager serving at %r', server.address)
    558         server.serve_forever()
    559 
    560     def _create(self, typeid, *args, **kwds):
    561         '''
    562         Create a new shared object; return the token and exposed tuple
    563         '''
    564         assert self._state.value == State.STARTED, 'server not yet started'
    565         conn = self._Client(self._address, authkey=self._authkey)
    566         try:
    567             id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
    568         finally:
    569             conn.close()
    570         return Token(typeid, self._address, id), exposed
    571 
    572     def join(self, timeout=None):
    573         '''
    574         Join the manager process (if it has been spawned)
    575         '''
    576         self._process.join(timeout)
    577 
    578     def _debug_info(self):
    579         '''
    580         Return some info about the servers shared objects and connections
    581         '''
    582         conn = self._Client(self._address, authkey=self._authkey)
    583         try:
    584             return dispatch(conn, None, 'debug_info')
    585         finally:
    586             conn.close()
    587 
    588     def _number_of_objects(self):
    589         '''
    590         Return the number of shared objects
    591         '''
    592         conn = self._Client(self._address, authkey=self._authkey)
    593         try:
    594             return dispatch(conn, None, 'number_of_objects')
    595         finally:
    596             conn.close()
    597 
    598     def __enter__(self):
    599         return self
    600 
    601     def __exit__(self, exc_type, exc_val, exc_tb):
    602         self.shutdown()
    603 
    604     @staticmethod
    605     def _finalize_manager(process, address, authkey, state, _Client):
    606         '''
    607         Shutdown the manager process; will be registered as a finalizer
    608         '''
    609         if process.is_alive():
    610             util.info('sending shutdown message to manager')
    611             try:
    612                 conn = _Client(address, authkey=authkey)
    613                 try:
    614                     dispatch(conn, None, 'shutdown')
    615                 finally:
    616                     conn.close()
    617             except Exception:
    618                 pass
    619 
    620             process.join(timeout=0.2)
    621             if process.is_alive():
    622                 util.info('manager still alive')
    623                 if hasattr(process, 'terminate'):
    624                     util.info('trying to `terminate()` manager process')
    625                     process.terminate()
    626                     process.join(timeout=0.1)
    627                     if process.is_alive():
    628                         util.info('manager still alive after terminate')
    629 
    630         state.value = State.SHUTDOWN
    631         try:
    632             del BaseProxy._address_to_local[address]
    633         except KeyError:
    634             pass
    635 
    636     address = property(lambda self: self._address)
    637 
    638     @classmethod
    639     def register(cls, typeid, callable=None, proxytype=None, exposed=None,
    640                  method_to_typeid=None, create_method=True):
    641         '''
    642         Register a typeid with the manager type
    643         '''
    644         if '_registry' not in cls.__dict__:
    645             cls._registry = cls._registry.copy()
    646 
    647         if proxytype is None:
    648             proxytype = AutoProxy
    649 
    650         exposed = exposed or getattr(proxytype, '_exposed_', None)
    651 
    652         method_to_typeid = method_to_typeid or \
    653                            getattr(proxytype, '_method_to_typeid_', None)
    654 
    655         if method_to_typeid:
    656             for key, value in method_to_typeid.items():
    657                 assert type(key) is str, '%r is not a string' % key
    658                 assert type(value) is str, '%r is not a string' % value
    659 
    660         cls._registry[typeid] = (
    661             callable, exposed, method_to_typeid, proxytype
    662             )
    663 
    664         if create_method:
    665             def temp(self, *args, **kwds):
    666                 util.debug('requesting creation of a shared %r object', typeid)
    667                 token, exp = self._create(typeid, *args, **kwds)
    668                 proxy = proxytype(
    669                     token, self._serializer, manager=self,
    670                     authkey=self._authkey, exposed=exp
    671                     )
    672                 conn = self._Client(token.address, authkey=self._authkey)
    673                 dispatch(conn, None, 'decref', (token.id,))
    674                 return proxy
    675             temp.__name__ = typeid
    676             setattr(cls, typeid, temp)
    677 
    678 #
    679 # Subclass of set which gets cleared after a fork
    680 #
    681 
    682 class ProcessLocalSet(set):
    683     def __init__(self):
    684         util.register_after_fork(self, lambda obj: obj.clear())
    685     def __reduce__(self):
    686         return type(self), ()
    687 
    688 #
    689 # Definition of BaseProxy
    690 #
    691 
    692 class BaseProxy(object):
    693     '''
    694     A base for proxies of shared objects
    695     '''
    696     _address_to_local = {}
    697     _mutex = util.ForkAwareThreadLock()
    698 
    699     def __init__(self, token, serializer, manager=None,
    700                  authkey=None, exposed=None, incref=True):
    701         BaseProxy._mutex.acquire()
    702         try:
    703             tls_idset = BaseProxy._address_to_local.get(token.address, None)
    704             if tls_idset is None:
    705                 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
    706                 BaseProxy._address_to_local[token.address] = tls_idset
    707         finally:
    708             BaseProxy._mutex.release()
    709 
    710         # self._tls is used to record the connection used by this
    711         # thread to communicate with the manager at token.address
    712         self._tls = tls_idset[0]
    713 
    714         # self._idset is used to record the identities of all shared
    715         # objects for which the current process owns references and
    716         # which are in the manager at token.address
    717         self._idset = tls_idset[1]
    718 
    719         self._token = token
    720         self._id = self._token.id
    721         self._manager = manager
    722         self._serializer = serializer
    723         self._Client = listener_client[serializer][1]
    724 
    725         if authkey is not None:
    726             self._authkey = AuthenticationString(authkey)
    727         elif self._manager is not None:
    728             self._authkey = self._manager._authkey
    729         else:
    730             self._authkey = current_process().authkey
    731 
    732         if incref:
    733             self._incref()
    734 
    735         util.register_after_fork(self, BaseProxy._after_fork)
    736 
    737     def _connect(self):
    738         util.debug('making connection to manager')
    739         name = current_process().name
    740         if threading.current_thread().name != 'MainThread':
    741             name += '|' + threading.current_thread().name
    742         conn = self._Client(self._token.address, authkey=self._authkey)
    743         dispatch(conn, None, 'accept_connection', (name,))
    744         self._tls.connection = conn
    745 
    746     def _callmethod(self, methodname, args=(), kwds={}):
    747         '''
    748         Try to call a method of the referrent and return a copy of the result
    749         '''
    750         try:
    751             conn = self._tls.connection
    752         except AttributeError:
    753             util.debug('thread %r does not own a connection',
    754                        threading.current_thread().name)
    755             self._connect()
    756             conn = self._tls.connection
    757 
    758         conn.send((self._id, methodname, args, kwds))
    759         kind, result = conn.recv()
    760 
    761         if kind == '#RETURN':
    762             return result
    763         elif kind == '#PROXY':
    764             exposed, token = result
    765             proxytype = self._manager._registry[token.typeid][-1]
    766             proxy = proxytype(
    767                 token, self._serializer, manager=self._manager,
    768                 authkey=self._authkey, exposed=exposed
    769                 )
    770             conn = self._Client(token.address, authkey=self._authkey)
    771             dispatch(conn, None, 'decref', (token.id,))
    772             return proxy
    773         raise convert_to_error(kind, result)
    774 
    775     def _getvalue(self):
    776         '''
    777         Get a copy of the value of the referent
    778         '''
    779         return self._callmethod('#GETVALUE')
    780 
    def _incref(self):
        '''
        Send an 'incref' request for this proxy's id, record the id in
        `self._idset`, and register a finalizer (stored as `self._close`)
        that will call `BaseProxy._decref` with this proxy's token.
        '''
        channel = self._Client(self._token.address, authkey=self._authkey)
        dispatch(channel, None, 'incref', (self._id,))
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        # the manager's state object, or the (falsy) manager value itself
        # when the proxy has no manager
        manager = self._manager
        state = manager and manager._state

        decref_args = (self._token, self._authkey, state,
                       self._tls, self._idset, self._Client)
        self._close = util.Finalize(
            self, BaseProxy._decref, args=decref_args, exitpriority=10
            )
    796 
    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Drop this process's reference to the object identified by `token`

        `token.id` is removed from `idset`.  If `state` is None or reports
        `State.STARTED`, a 'decref' request is sent over a fresh
        connection made with `_Client`; failures of that request are only
        logged.  Finally, when `idset` has become empty and `tls` still
        holds a connection, that connection is closed and removed.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                dispatch(conn, None, 'decref', (token.id,))
            except Exception as e:
                # best effort: a failed decref is logged, never raised
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection
    821 
    def _after_fork(self):
        '''
        Forget the manager reference and send a fresh incref

        `self._manager` is reset to None and `self._incref()` is called
        again; any exception from the incref is logged, not raised.
        (Judging by the name this runs in a forked child -- the place
        where it is registered is not part of this section.)
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s', e)
    829 
    def __reduce__(self):
        '''
        Pickle support: describe the proxy as a call to `RebuildProxy`

        The authkey is included only while `Popen.thread_is_spawning()`
        is true.  A proxy flagged with `_isauto` is rebuilt through
        `AutoProxy` (with its exposed method names passed along);
        any other proxy is rebuilt through its own type.
        '''
        extra = {}
        if Popen.thread_is_spawning():
            extra['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            extra['exposed'] = self._exposed_
            factory = AutoProxy
        else:
            factory = type(self)

        return (RebuildProxy,
                (factory, self._token, self._serializer, extra))
    842 
    def __deepcopy__(self, memo):
        '''
        Deep-copying a proxy gives the value returned by `_getvalue()`
        rather than another proxy; `memo` is not used.
        '''
        referent_copy = self._getvalue()
        return referent_copy
    845 
    def __repr__(self):
        '''
        Describe the proxy itself: its class name, the typeid of its
        token and its own address in hex.
        '''
        class_name = type(self).__name__
        address = '0x%x' % id(self)
        return '<%s object, typeid %r at %s>' % (
            class_name, self._token.typeid, address)
    849 
    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)

        The fall-back is the proxy's own repr with its last character
        replaced by a note that '__str__()' failed.
        '''
        try:
            text = self._callmethod('__repr__')
        except Exception:
            own_repr = repr(self)
            return own_repr[:-1] + "; '__str__()' failed>"
        return text
    858 
    859 #
    860 # Function used for unpickling
    861 #
    862 
    863 def RebuildProxy(func, token, serializer, kwds):
    864     '''
    865     Function used for unpickling proxy objects.
    866 
    867     If possible the shared object is returned, or otherwise a proxy for it.
    868     '''
    869     server = getattr(current_process(), '_manager_server', None)
    870 
    871     if server and server.address == token.address:
    872         return server.id_to_obj[token.id][0]
    873     else:
    874         incref = (
    875             kwds.pop('incref', True) and
    876             not getattr(current_process(), '_inheriting', False)
    877             )
    878         return func(token, serializer, incref=incref, **kwds)
    879 
    880 #
    881 # Functions to create proxies and proxy types
    882 #
    883 
    884 def MakeProxyType(name, exposed, _cache={}):
    885     '''
    886     Return an proxy type whose methods are given by `exposed`
    887     '''
    888     exposed = tuple(exposed)
    889     try:
    890         return _cache[(name, exposed)]
    891     except KeyError:
    892         pass
    893 
    894     dic = {}
    895 
    896     for meth in exposed:
    897         exec '''def %s(self, *args, **kwds):
    898         return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
    899 
    900     ProxyType = type(name, (BaseProxy,), dic)
    901     ProxyType._exposed_ = exposed
    902     _cache[(name, exposed)] = ProxyType
    903     return ProxyType
    904 
    905 
    906 def AutoProxy(token, serializer, manager=None, authkey=None,
    907               exposed=None, incref=True):
    908     '''
    909     Return an auto-proxy for `token`
    910     '''
    911     _Client = listener_client[serializer][1]
    912 
    913     if exposed is None:
    914         conn = _Client(token.address, authkey=authkey)
    915         try:
    916             exposed = dispatch(conn, None, 'get_methods', (token,))
    917         finally:
    918             conn.close()
    919 
    920     if authkey is None and manager is not None:
    921         authkey = manager._authkey
    922     if authkey is None:
    923         authkey = current_process().authkey
    924 
    925     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    926     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
    927                       incref=incref)
    928     proxy._isauto = True
    929     return proxy
    930 
    931 #
    932 # Types/callables which we will register with SyncManager
    933 #
    934 
    935 class Namespace(object):
    936     def __init__(self, **kwds):
    937         self.__dict__.update(kwds)
    938     def __repr__(self):
    939         items = self.__dict__.items()
    940         temp = []
    941         for name, value in items:
    942             if not name.startswith('_'):
    943                 temp.append('%s=%r' % (name, value))
    944         temp.sort()
    945         return 'Namespace(%s)' % str.join(', ', temp)
    946 
    947 class Value(object):
    948     def __init__(self, typecode, value, lock=True):
    949         self._typecode = typecode
    950         self._value = value
    951     def get(self):
    952         return self._value
    953     def set(self, value):
    954         self._value = value
    955     def __repr__(self):
    956         return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    957     value = property(get, set)
    958 
    959 def Array(typecode, sequence, lock=True):
    960     return array.array(typecode, sequence)
    961 
    962 #
    963 # Proxy types used by SyncManager
    964 #
    965 
    966 class IteratorProxy(BaseProxy):
    967     # XXX remove methods for Py3.0 and Py2.6
    968     _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
    969     def __iter__(self):
    970         return self
    971     def __next__(self, *args):
    972         return self._callmethod('__next__', args)
    973     def next(self, *args):
    974         return self._callmethod('next', args)
    975     def send(self, *args):
    976         return self._callmethod('send', args)
    977     def throw(self, *args):
    978         return self._callmethod('throw', args)
    979     def close(self, *args):
    980         return self._callmethod('close', args)
    981 
    982 
    983 class AcquirerProxy(BaseProxy):
    984     _exposed_ = ('acquire', 'release')
    985     def acquire(self, blocking=True):
    986         return self._callmethod('acquire', (blocking,))
    987     def release(self):
    988         return self._callmethod('release')
    989     def __enter__(self):
    990         return self._callmethod('acquire')
    991     def __exit__(self, exc_type, exc_val, exc_tb):
    992         return self._callmethod('release')
    993 
    994 
    995 class ConditionProxy(AcquirerProxy):
    996     # XXX will Condition.notfyAll() name be available in Py3.0?
    997     _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    998     def wait(self, timeout=None):
    999         return self._callmethod('wait', (timeout,))
   1000     def notify(self):
   1001         return self._callmethod('notify')
   1002     def notify_all(self):
   1003         return self._callmethod('notify_all')
   1004 
   1005 class EventProxy(BaseProxy):
   1006     _exposed_ = ('is_set', 'set', 'clear', 'wait')
   1007     def is_set(self):
   1008         return self._callmethod('is_set')
   1009     def set(self):
   1010         return self._callmethod('set')
   1011     def clear(self):
   1012         return self._callmethod('clear')
   1013     def wait(self, timeout=None):
   1014         return self._callmethod('wait', (timeout,))
   1015 
   1016 class NamespaceProxy(BaseProxy):
   1017     _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
   1018     def __getattr__(self, key):
   1019         if key[0] == '_':
   1020             return object.__getattribute__(self, key)
   1021         callmethod = object.__getattribute__(self, '_callmethod')
   1022         return callmethod('__getattribute__', (key,))
   1023     def __setattr__(self, key, value):
   1024         if key[0] == '_':
   1025             return object.__setattr__(self, key, value)
   1026         callmethod = object.__getattribute__(self, '_callmethod')
   1027         return callmethod('__setattr__', (key, value))
   1028     def __delattr__(self, key):
   1029         if key[0] == '_':
   1030             return object.__delattr__(self, key)
   1031         callmethod = object.__getattribute__(self, '_callmethod')
   1032         return callmethod('__delattr__', (key,))
   1033 
   1034 
   1035 class ValueProxy(BaseProxy):
   1036     _exposed_ = ('get', 'set')
   1037     def get(self):
   1038         return self._callmethod('get')
   1039     def set(self, value):
   1040         return self._callmethod('set', (value,))
   1041     value = property(get, set)
   1042 
   1043 
# Generated proxy type whose methods are the list methods named below;
# each one is produced by MakeProxyType and forwards to _callmethod().
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__delslice__',
    '__getitem__', '__getslice__', '__len__', '__mul__',
    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
   1051 class ListProxy(BaseListProxy):
   1052     def __iadd__(self, value):
   1053         self._callmethod('extend', (value,))
   1054         return self
   1055     def __imul__(self, value):
   1056         self._callmethod('__imul__', (value,))
   1057         return self
   1058 
   1059 
# Generated proxy type for dicts: one forwarding method per name below.
# ('has_key' is in the list, so this targets the Python 2 dict API.)
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
   1065 
   1066 
# Generated proxy type exposing only length, item and slice access.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
   1070 
   1071 
# Generated proxy type forwarding the pool methods named below.
PoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'terminate'
    ))
# Maps four of the method names above to the typeids 'AsyncResult' and
# 'Iterator'.  Presumably the manager uses this table to return the
# results of those methods as proxies of that typeid -- the code that
# reads `_method_to_typeid_` is not in this section; TODO confirm.
PoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
   1082 
   1083 #
   1084 # Definition of SyncManager
   1085 #
   1086 
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The class body defines nothing of its own: the types (which also
    include `Queue`, `JoinableQueue`, `Pool`, `Value` and `Array`) are
    attached by the module-level `SyncManager.register()` calls that
    follow the class definition.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
   1097 
# Each call registers a typeid with the callable that creates the shared
# object and, where one is given, the proxy type used to access it.

# The two queue typeids are registered without an explicit proxy type.
# NOTE: 'JoinableQueue' is registered with the same callable as 'Queue'.
SyncManager.register('Queue', Queue.Queue)
SyncManager.register('JoinableQueue', Queue.Queue)
# synchronization primitives from the threading module
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
# pool, containers and value holders
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (registered with create_method=False and without a callable;
# 'AsyncResult' is also given no explicit proxy type)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
   1117