Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module providing the `SyncManager` class for dealing
      3 # with shared objects
      4 #
      5 # multiprocessing/managers.py
      6 #
      7 # Copyright (c) 2006-2008, R Oudkerk
      8 # All rights reserved.
      9 #
     10 # Redistribution and use in source and binary forms, with or without
     11 # modification, are permitted provided that the following conditions
     12 # are met:
     13 #
     14 # 1. Redistributions of source code must retain the above copyright
     15 #    notice, this list of conditions and the following disclaimer.
     16 # 2. Redistributions in binary form must reproduce the above copyright
     17 #    notice, this list of conditions and the following disclaimer in the
     18 #    documentation and/or other materials provided with the distribution.
     19 # 3. Neither the name of author nor the names of any contributors may be
     20 #    used to endorse or promote products derived from this software
     21 #    without specific prior written permission.
     22 #
     23 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     24 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     25 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     26 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     27 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     28 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     29 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     30 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     32 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     33 # SUCH DAMAGE.
     34 #
     35 
     36 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
     37 
     38 #
     39 # Imports
     40 #
     41 
     42 import os
     43 import sys
     44 import weakref
     45 import threading
     46 import array
     47 import Queue
     48 
     49 from traceback import format_exc
     50 from multiprocessing import Process, current_process, active_children, Pool, util, connection
     51 from multiprocessing.process import AuthenticationString
     52 from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
     53 from multiprocessing.util import Finalize, info
     54 
     55 try:
     56     from cPickle import PicklingError
     57 except ImportError:
     58     from pickle import PicklingError
     59 
     60 #
     61 # Register some things for pickling
     62 #
     63 
     64 def reduce_array(a):
     65     return array.array, (a.typecode, a.tostring())
# Make ForkingPickler use reduce_array() whenever it pickles an array.array
ForkingPickler.register(array.array, reduce_array)

# Types of the objects obtained by calling items(), keys() and values()
# on an empty dict
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
     69 
     70 #
     71 # Type for identifying shared objects
     72 #
     73 
     74 class Token(object):
     75     '''
     76     Type to uniquely indentify a shared object
     77     '''
     78     __slots__ = ('typeid', 'address', 'id')
     79 
     80     def __init__(self, typeid, address, id):
     81         (self.typeid, self.address, self.id) = (typeid, address, id)
     82 
     83     def __getstate__(self):
     84         return (self.typeid, self.address, self.id)
     85 
     86     def __setstate__(self, state):
     87         (self.typeid, self.address, self.id) = state
     88 
     89     def __repr__(self):
     90         return 'Token(typeid=%r, address=%r, id=%r)' % \
     91                (self.typeid, self.address, self.id)
     92 
     93 #
     94 # Function for communication with a manager's server process
     95 #
     96 
     97 def dispatch(c, id, methodname, args=(), kwds={}):
     98     '''
     99     Send a message to manager using connection `c` and return response
    100     '''
    101     c.send((id, methodname, args, kwds))
    102     kind, result = c.recv()
    103     if kind == '#RETURN':
    104         return result
    105     raise convert_to_error(kind, result)
    106 
    107 def convert_to_error(kind, result):
    108     if kind == '#ERROR':
    109         return result
    110     elif kind == '#TRACEBACK':
    111         assert type(result) is str
    112         return  RemoteError(result)
    113     elif kind == '#UNSERIALIZABLE':
    114         assert type(result) is str
    115         return RemoteError('Unserializable message: %s\n' % result)
    116     else:
    117         return ValueError('Unrecognized message type')
    118 
    119 class RemoteError(Exception):
    120     def __str__(self):
    121         return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
    122 
    123 #
    124 # Functions for finding the method names of an object
    125 #
    126 
    127 def all_methods(obj):
    128     '''
    129     Return a list of names of methods of `obj`
    130     '''
    131     temp = []
    132     for name in dir(obj):
    133         func = getattr(obj, name)
    134         if hasattr(func, '__call__'):
    135             temp.append(name)
    136     return temp
    137 
    138 def public_methods(obj):
    139     '''
    140     Return a list of names of methods of `obj` which do not start with '_'
    141     '''
    142     return [name for name in all_methods(obj) if name[0] != '_']
    143 
    144 #
    145 # Server which is run in a process controlled by a manager
    146 #
    147 
    148 class Server(object):
    149     '''
    150     Server class which runs in a process controlled by a manager object
    151     '''
    152     public = ['shutdown', 'create', 'accept_connection', 'get_methods',
    153               'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
    154 
    155     def __init__(self, registry, address, authkey, serializer):
    156         assert isinstance(authkey, bytes)
    157         self.registry = registry
    158         self.authkey = AuthenticationString(authkey)
    159         Listener, Client = listener_client[serializer]
    160 
    161         # do authentication later
    162         self.listener = Listener(address=address, backlog=16)
    163         self.address = self.listener.address
    164 
    165         self.id_to_obj = {'0': (None, ())}
    166         self.id_to_refcount = {}
    167         self.mutex = threading.RLock()
    168         self.stop = 0
    169 
    170     def serve_forever(self):
    171         '''
    172         Run the server forever
    173         '''
    174         current_process()._manager_server = self
    175         try:
    176             try:
    177                 while 1:
    178                     try:
    179                         c = self.listener.accept()
    180                     except (OSError, IOError):
    181                         continue
    182                     t = threading.Thread(target=self.handle_request, args=(c,))
    183                     t.daemon = True
    184                     t.start()
    185             except (KeyboardInterrupt, SystemExit):
    186                 pass
    187         finally:
    188             self.stop = 999
    189             self.listener.close()
    190 
    191     def handle_request(self, c):
    192         '''
    193         Handle a new connection
    194         '''
    195         funcname = result = request = None
    196         try:
    197             connection.deliver_challenge(c, self.authkey)
    198             connection.answer_challenge(c, self.authkey)
    199             request = c.recv()
    200             ignore, funcname, args, kwds = request
    201             assert funcname in self.public, '%r unrecognized' % funcname
    202             func = getattr(self, funcname)
    203         except Exception:
    204             msg = ('#TRACEBACK', format_exc())
    205         else:
    206             try:
    207                 result = func(c, *args, **kwds)
    208             except Exception:
    209                 msg = ('#TRACEBACK', format_exc())
    210             else:
    211                 msg = ('#RETURN', result)
    212         try:
    213             c.send(msg)
    214         except Exception, e:
    215             try:
    216                 c.send(('#TRACEBACK', format_exc()))
    217             except Exception:
    218                 pass
    219             util.info('Failure to send message: %r', msg)
    220             util.info(' ... request was %r', request)
    221             util.info(' ... exception was %r', e)
    222 
    223         c.close()
    224 
    225     def serve_client(self, conn):
    226         '''
    227         Handle requests from the proxies in a particular process/thread
    228         '''
    229         util.debug('starting server thread to service %r',
    230                    threading.current_thread().name)
    231 
    232         recv = conn.recv
    233         send = conn.send
    234         id_to_obj = self.id_to_obj
    235 
    236         while not self.stop:
    237 
    238             try:
    239                 methodname = obj = None
    240                 request = recv()
    241                 ident, methodname, args, kwds = request
    242                 obj, exposed, gettypeid = id_to_obj[ident]
    243 
    244                 if methodname not in exposed:
    245                     raise AttributeError(
    246                         'method %r of %r object is not in exposed=%r' %
    247                         (methodname, type(obj), exposed)
    248                         )
    249 
    250                 function = getattr(obj, methodname)
    251 
    252                 try:
    253                     res = function(*args, **kwds)
    254                 except Exception, e:
    255                     msg = ('#ERROR', e)
    256                 else:
    257                     typeid = gettypeid and gettypeid.get(methodname, None)
    258                     if typeid:
    259                         rident, rexposed = self.create(conn, typeid, res)
    260                         token = Token(typeid, self.address, rident)
    261                         msg = ('#PROXY', (rexposed, token))
    262                     else:
    263                         msg = ('#RETURN', res)
    264 
    265             except AttributeError:
    266                 if methodname is None:
    267                     msg = ('#TRACEBACK', format_exc())
    268                 else:
    269                     try:
    270                         fallback_func = self.fallback_mapping[methodname]
    271                         result = fallback_func(
    272                             self, conn, ident, obj, *args, **kwds
    273                             )
    274                         msg = ('#RETURN', result)
    275                     except Exception:
    276                         msg = ('#TRACEBACK', format_exc())
    277 
    278             except EOFError:
    279                 util.debug('got EOF -- exiting thread serving %r',
    280                            threading.current_thread().name)
    281                 sys.exit(0)
    282 
    283             except Exception:
    284                 msg = ('#TRACEBACK', format_exc())
    285 
    286             try:
    287                 try:
    288                     send(msg)
    289                 except Exception, e:
    290                     send(('#UNSERIALIZABLE', format_exc()))
    291             except Exception, e:
    292                 util.info('exception in thread serving %r',
    293                         threading.current_thread().name)
    294                 util.info(' ... message was %r', msg)
    295                 util.info(' ... exception was %r', e)
    296                 conn.close()
    297                 sys.exit(1)
    298 
    299     def fallback_getvalue(self, conn, ident, obj):
    300         return obj
    301 
    302     def fallback_str(self, conn, ident, obj):
    303         return str(obj)
    304 
    305     def fallback_repr(self, conn, ident, obj):
    306         return repr(obj)
    307 
    308     fallback_mapping = {
    309         '__str__':fallback_str,
    310         '__repr__':fallback_repr,
    311         '#GETVALUE':fallback_getvalue
    312         }
    313 
    314     def dummy(self, c):
    315         pass
    316 
    317     def debug_info(self, c):
    318         '''
    319         Return some info --- useful to spot problems with refcounting
    320         '''
    321         self.mutex.acquire()
    322         try:
    323             result = []
    324             keys = self.id_to_obj.keys()
    325             keys.sort()
    326             for ident in keys:
    327                 if ident != '0':
    328                     result.append('  %s:       refcount=%s\n    %s' %
    329                                   (ident, self.id_to_refcount[ident],
    330                                    str(self.id_to_obj[ident][0])[:75]))
    331             return '\n'.join(result)
    332         finally:
    333             self.mutex.release()
    334 
    335     def number_of_objects(self, c):
    336         '''
    337         Number of shared objects
    338         '''
    339         return len(self.id_to_obj) - 1      # don't count ident='0'
    340 
    341     def shutdown(self, c):
    342         '''
    343         Shutdown this process
    344         '''
    345         try:
    346             try:
    347                 util.debug('manager received shutdown message')
    348                 c.send(('#RETURN', None))
    349 
    350                 if sys.stdout != sys.__stdout__:
    351                     util.debug('resetting stdout, stderr')
    352                     sys.stdout = sys.__stdout__
    353                     sys.stderr = sys.__stderr__
    354 
    355                 util._run_finalizers(0)
    356 
    357                 for p in active_children():
    358                     util.debug('terminating a child process of manager')
    359                     p.terminate()
    360 
    361                 for p in active_children():
    362                     util.debug('terminating a child process of manager')
    363                     p.join()
    364 
    365                 util._run_finalizers()
    366                 util.info('manager exiting with exitcode 0')
    367             except:
    368                 import traceback
    369                 traceback.print_exc()
    370         finally:
    371             exit(0)
    372 
    373     def create(self, c, typeid, *args, **kwds):
    374         '''
    375         Create a new shared object and return its id
    376         '''
    377         self.mutex.acquire()
    378         try:
    379             callable, exposed, method_to_typeid, proxytype = \
    380                       self.registry[typeid]
    381 
    382             if callable is None:
    383                 assert len(args) == 1 and not kwds
    384                 obj = args[0]
    385             else:
    386                 obj = callable(*args, **kwds)
    387 
    388             if exposed is None:
    389                 exposed = public_methods(obj)
    390             if method_to_typeid is not None:
    391                 assert type(method_to_typeid) is dict
    392                 exposed = list(exposed) + list(method_to_typeid)
    393 
    394             ident = '%x' % id(obj)  # convert to string because xmlrpclib
    395                                     # only has 32 bit signed integers
    396             util.debug('%r callable returned object with id %r', typeid, ident)
    397 
    398             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
    399             if ident not in self.id_to_refcount:
    400                 self.id_to_refcount[ident] = 0
    401             # increment the reference count immediately, to avoid
    402             # this object being garbage collected before a Proxy
    403             # object for it can be created.  The caller of create()
    404             # is responsible for doing a decref once the Proxy object
    405             # has been created.
    406             self.incref(c, ident)
    407             return ident, tuple(exposed)
    408         finally:
    409             self.mutex.release()
    410 
    411     def get_methods(self, c, token):
    412         '''
    413         Return the methods of the shared object indicated by token
    414         '''
    415         return tuple(self.id_to_obj[token.id][1])
    416 
    417     def accept_connection(self, c, name):
    418         '''
    419         Spawn a new thread to serve this connection
    420         '''
    421         threading.current_thread().name = name
    422         c.send(('#RETURN', None))
    423         self.serve_client(c)
    424 
    425     def incref(self, c, ident):
    426         self.mutex.acquire()
    427         try:
    428             self.id_to_refcount[ident] += 1
    429         finally:
    430             self.mutex.release()
    431 
    432     def decref(self, c, ident):
    433         self.mutex.acquire()
    434         try:
    435             assert self.id_to_refcount[ident] >= 1
    436             self.id_to_refcount[ident] -= 1
    437             if self.id_to_refcount[ident] == 0:
    438                 del self.id_to_obj[ident], self.id_to_refcount[ident]
    439                 util.debug('disposing of obj with id %r', ident)
    440         finally:
    441             self.mutex.release()
    442 
    443 #
    444 # Class to represent state of a manager
    445 #
    446 
class State(object):
    '''
    Holder for the state of a manager: `value` is set to one of the
    constants INITIAL, STARTED or SHUTDOWN
    '''
    # `value` is the only attribute an instance can carry
    __slots__ = ['value']
    INITIAL = 0     # assigned by BaseManager.__init__()
    STARTED = 1     # assigned by BaseManager.start() and connect()
    SHUTDOWN = 2    # assigned by BaseManager._finalize_manager()
    452 
    453 #
    454 # Mapping from serializer name to Listener and Client types
    455 #
    456 
listener_client = {
    # serializer name : (Listener type, Client type)
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
    461 
    462 #
    463 # Definition of BaseManager
    464 #
    465 
    466 class BaseManager(object):
    467     '''
    468     Base class for managers
    469     '''
    470     _registry = {}
    471     _Server = Server
    472 
    473     def __init__(self, address=None, authkey=None, serializer='pickle'):
    474         if authkey is None:
    475             authkey = current_process().authkey
    476         self._address = address     # XXX not final address if eg ('', 0)
    477         self._authkey = AuthenticationString(authkey)
    478         self._state = State()
    479         self._state.value = State.INITIAL
    480         self._serializer = serializer
    481         self._Listener, self._Client = listener_client[serializer]
    482 
    483     def __reduce__(self):
    484         return type(self).from_address, \
    485                (self._address, self._authkey, self._serializer)
    486 
    487     def get_server(self):
    488         '''
    489         Return server object with serve_forever() method and address attribute
    490         '''
    491         assert self._state.value == State.INITIAL
    492         return Server(self._registry, self._address,
    493                       self._authkey, self._serializer)
    494 
    495     def connect(self):
    496         '''
    497         Connect manager object to the server process
    498         '''
    499         Listener, Client = listener_client[self._serializer]
    500         conn = Client(self._address, authkey=self._authkey)
    501         dispatch(conn, None, 'dummy')
    502         self._state.value = State.STARTED
    503 
    504     def start(self, initializer=None, initargs=()):
    505         '''
    506         Spawn a server process for this manager object
    507         '''
    508         assert self._state.value == State.INITIAL
    509 
    510         if initializer is not None and not hasattr(initializer, '__call__'):
    511             raise TypeError('initializer must be a callable')
    512 
    513         # pipe over which we will retrieve address of server
    514         reader, writer = connection.Pipe(duplex=False)
    515 
    516         # spawn process which runs a server
    517         self._process = Process(
    518             target=type(self)._run_server,
    519             args=(self._registry, self._address, self._authkey,
    520                   self._serializer, writer, initializer, initargs),
    521             )
    522         ident = ':'.join(str(i) for i in self._process._identity)
    523         self._process.name = type(self).__name__  + '-' + ident
    524         self._process.start()
    525 
    526         # get address of server
    527         writer.close()
    528         self._address = reader.recv()
    529         reader.close()
    530 
    531         # register a finalizer
    532         self._state.value = State.STARTED
    533         self.shutdown = util.Finalize(
    534             self, type(self)._finalize_manager,
    535             args=(self._process, self._address, self._authkey,
    536                   self._state, self._Client),
    537             exitpriority=0
    538             )
    539 
    540     @classmethod
    541     def _run_server(cls, registry, address, authkey, serializer, writer,
    542                     initializer=None, initargs=()):
    543         '''
    544         Create a server, report its address and run it
    545         '''
    546         if initializer is not None:
    547             initializer(*initargs)
    548 
    549         # create server
    550         server = cls._Server(registry, address, authkey, serializer)
    551 
    552         # inform parent process of the server's address
    553         writer.send(server.address)
    554         writer.close()
    555 
    556         # run the manager
    557         util.info('manager serving at %r', server.address)
    558         server.serve_forever()
    559 
    560     def _create(self, typeid, *args, **kwds):
    561         '''
    562         Create a new shared object; return the token and exposed tuple
    563         '''
    564         assert self._state.value == State.STARTED, 'server not yet started'
    565         conn = self._Client(self._address, authkey=self._authkey)
    566         try:
    567             id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
    568         finally:
    569             conn.close()
    570         return Token(typeid, self._address, id), exposed
    571 
    572     def join(self, timeout=None):
    573         '''
    574         Join the manager process (if it has been spawned)
    575         '''
    576         self._process.join(timeout)
    577 
    578     def _debug_info(self):
    579         '''
    580         Return some info about the servers shared objects and connections
    581         '''
    582         conn = self._Client(self._address, authkey=self._authkey)
    583         try:
    584             return dispatch(conn, None, 'debug_info')
    585         finally:
    586             conn.close()
    587 
    588     def _number_of_objects(self):
    589         '''
    590         Return the number of shared objects
    591         '''
    592         conn = self._Client(self._address, authkey=self._authkey)
    593         try:
    594             return dispatch(conn, None, 'number_of_objects')
    595         finally:
    596             conn.close()
    597 
    598     def __enter__(self):
    599         return self
    600 
    601     def __exit__(self, exc_type, exc_val, exc_tb):
    602         self.shutdown()
    603 
    604     @staticmethod
    605     def _finalize_manager(process, address, authkey, state, _Client):
    606         '''
    607         Shutdown the manager process; will be registered as a finalizer
    608         '''
    609         if process.is_alive():
    610             util.info('sending shutdown message to manager')
    611             try:
    612                 conn = _Client(address, authkey=authkey)
    613                 try:
    614                     dispatch(conn, None, 'shutdown')
    615                 finally:
    616                     conn.close()
    617             except Exception:
    618                 pass
    619 
    620             process.join(timeout=0.2)
    621             if process.is_alive():
    622                 util.info('manager still alive')
    623                 if hasattr(process, 'terminate'):
    624                     util.info('trying to `terminate()` manager process')
    625                     process.terminate()
    626                     process.join(timeout=0.1)
    627                     if process.is_alive():
    628                         util.info('manager still alive after terminate')
    629 
    630         state.value = State.SHUTDOWN
    631         try:
    632             del BaseProxy._address_to_local[address]
    633         except KeyError:
    634             pass
    635 
    636     address = property(lambda self: self._address)
    637 
    638     @classmethod
    639     def register(cls, typeid, callable=None, proxytype=None, exposed=None,
    640                  method_to_typeid=None, create_method=True):
    641         '''
    642         Register a typeid with the manager type
    643         '''
    644         if '_registry' not in cls.__dict__:
    645             cls._registry = cls._registry.copy()
    646 
    647         if proxytype is None:
    648             proxytype = AutoProxy
    649 
    650         exposed = exposed or getattr(proxytype, '_exposed_', None)
    651 
    652         method_to_typeid = method_to_typeid or \
    653                            getattr(proxytype, '_method_to_typeid_', None)
    654 
    655         if method_to_typeid:
    656             for key, value in method_to_typeid.items():
    657                 assert type(key) is str, '%r is not a string' % key
    658                 assert type(value) is str, '%r is not a string' % value
    659 
    660         cls._registry[typeid] = (
    661             callable, exposed, method_to_typeid, proxytype
    662             )
    663 
    664         if create_method:
    665             def temp(self, *args, **kwds):
    666                 util.debug('requesting creation of a shared %r object', typeid)
    667                 token, exp = self._create(typeid, *args, **kwds)
    668                 proxy = proxytype(
    669                     token, self._serializer, manager=self,
    670                     authkey=self._authkey, exposed=exp
    671                     )
    672                 conn = self._Client(token.address, authkey=self._authkey)
    673                 dispatch(conn, None, 'decref', (token.id,))
    674                 return proxy
    675             temp.__name__ = typeid
    676             setattr(cls, typeid, temp)
    677 
    678 #
# Subclass of set which gets cleared after a fork
    680 #
    681 
    682 class ProcessLocalSet(set):
    683     def __init__(self):
    684         util.register_after_fork(self, lambda obj: obj.clear())
    685     def __reduce__(self):
    686         return type(self), ()
    687 
    688 #
    689 # Definition of BaseProxy
    690 #
    691 
    692 class BaseProxy(object):
    693     '''
    694     A base for proxies of shared objects
    695     '''
    696     _address_to_local = {}
    697     _mutex = util.ForkAwareThreadLock()
    698 
    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        '''
        Set up a proxy for the shared object identified by `token`

        The authkey is taken from `authkey`, else from `manager`, else from
        the current process.  Unless `incref` is false the manager is asked
        to increment the referent's reference count.
        '''
        address = token.address
        BaseProxy._mutex.acquire()
        try:
            # one (thread local storage, id set) pair is shared by all
            # proxies which refer to the same manager address
            known = BaseProxy._address_to_local
            tls_idset = known.get(address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                known[address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif manager is not None:
            self._authkey = manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)
    736 
    def _connect(self):
        '''
        Open a connection to the manager for the current thread and store
        it as `self._tls.connection`

        The server is told a name made of the process name, followed by
        '|' and the thread name unless the thread is named 'MainThread'.
        '''
        util.debug('making connection to manager')
        name = current_process().name
        thread_name = threading.current_thread().name
        if thread_name != 'MainThread':
            name = name + '|' + thread_name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn
    745 
    746     def _callmethod(self, methodname, args=(), kwds={}):
    747         '''
    748         Try to call a method of the referrent and return a copy of the result
    749         '''
    750         try:
    751             conn = self._tls.connection
    752         except AttributeError:
    753             util.debug('thread %r does not own a connection',
    754                        threading.current_thread().name)
    755             self._connect()
    756             conn = self._tls.connection
    757 
    758         conn.send((self._id, methodname, args, kwds))
    759         kind, result = conn.recv()
    760 
    761         if kind == '#RETURN':
    762             return result
    763         elif kind == '#PROXY':
    764             exposed, token = result
    765             proxytype = self._manager._registry[token.typeid][-1]
    766             token.address = self._token.address
    767             proxy = proxytype(
    768                 token, self._serializer, manager=self._manager,
    769                 authkey=self._authkey, exposed=exposed
    770                 )
    771             conn = self._Client(token.address, authkey=self._authkey)
    772             dispatch(conn, None, 'decref', (token.id,))
    773             return proxy
    774         raise convert_to_error(kind, result)
    775 
    776     def _getvalue(self):
    777         '''
    778         Get a copy of the value of the referent
    779         '''
    780         return self._callmethod('#GETVALUE')
    781 
    def _incref(self):
        '''
        Ask the manager to increment the referent's reference count and
        register a finalizer which will decrement it again
        '''
        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # the connection is only needed for this one request
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        # None when the proxy has no manager
        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )
    797 
    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Forget `token.id`, send a 'decref' request when the manager is not
        known to be stopped, and close this thread's connection once no ids
        remain in `idset`.

        Failures while sending the request are logged and otherwise ignored.
        '''
        idset.discard(token.id)

        # with no state object we cannot tell, so assume the manager is alive
        manager_alive = state is None or state.value == State.STARTED

        if not manager_alive:
            util.debug('DECREF %r -- manager already shutdown', token.id)
        else:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                dispatch(conn, None, 'decref', (token.id,))
            except Exception as e:
                util.debug('... decref failed %s', e)

        # the thread's connection can only be closed when the process owns
        # no more references to objects for this manager
        if idset or not hasattr(tls, 'connection'):
            return

        util.debug('thread %r has no more proxies so closing conn',
                   threading.current_thread().name)
        tls.connection.close()
        del tls.connection
    822 
    def _after_fork(self):
        '''
        Detach the proxy from its manager object and take a new reference
        to the referent; a failed incref is only logged.
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as exc:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % exc)
    830 
    def __reduce__(self):
        '''
        Pickle support: the proxy is rebuilt by `RebuildProxy` from its
        token and serializer.

        The authkey is included only while a process is being spawned, and
        auto-proxies also carry their exposed method names.
        '''
        kwds = {}
        if Popen.thread_is_spawning():
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            factory = AutoProxy
        else:
            factory = type(self)

        return (RebuildProxy, (factory, self._token, self._serializer, kwds))
    843 
    def __deepcopy__(self, memo):
        '''
        Deep-copying a proxy gives a copy of the referent's value; `memo`
        is not used.
        '''
        referent_copy = self._getvalue()
        return referent_copy
    846 
    def __repr__(self):
        '''
        Describe the proxy itself: its class name, typeid and id() in hex
        '''
        cls_name = type(self).__name__
        location = '0x%x' % id(self)
        return '<%s object, typeid %r at %s>' % (
            cls_name, self._token.typeid, location)
    850 
    def __str__(self):
        '''
        Return the referent's repr, or a marked-up repr of the proxy when
        the call to the referent fails
        '''
        try:
            text = self._callmethod('__repr__')
        except Exception:
            # drop the closing '>' of our own repr and append the failure note
            text = repr(self)[:-1] + "; '__str__()' failed>"
        return text
    859 
    860 #
    861 # Function used for unpickling
    862 #
    863 
    864 def RebuildProxy(func, token, serializer, kwds):
    865     '''
    866     Function used for unpickling proxy objects.
    867 
    868     If possible the shared object is returned, or otherwise a proxy for it.
    869     '''
    870     server = getattr(current_process(), '_manager_server', None)
    871 
    872     if server and server.address == token.address:
    873         return server.id_to_obj[token.id][0]
    874     else:
    875         incref = (
    876             kwds.pop('incref', True) and
    877             not getattr(current_process(), '_inheriting', False)
    878             )
    879         return func(token, serializer, incref=incref, **kwds)
    880 
    881 #
    882 # Functions to create proxies and proxy types
    883 #
    884 
    885 def MakeProxyType(name, exposed, _cache={}):
    886     '''
    887     Return a proxy type whose methods are given by `exposed`
    888     '''
    889     exposed = tuple(exposed)
    890     try:
    891         return _cache[(name, exposed)]
    892     except KeyError:
    893         pass
    894 
    895     dic = {}
    896 
    897     for meth in exposed:
    898         exec '''def %s(self, *args, **kwds):
    899         return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
    900 
    901     ProxyType = type(name, (BaseProxy,), dic)
    902     ProxyType._exposed_ = exposed
    903     _cache[(name, exposed)] = ProxyType
    904     return ProxyType
    905 
    906 
    907 def AutoProxy(token, serializer, manager=None, authkey=None,
    908               exposed=None, incref=True):
    909     '''
    910     Return an auto-proxy for `token`
    911     '''
    912     _Client = listener_client[serializer][1]
    913 
    914     if exposed is None:
    915         conn = _Client(token.address, authkey=authkey)
    916         try:
    917             exposed = dispatch(conn, None, 'get_methods', (token,))
    918         finally:
    919             conn.close()
    920 
    921     if authkey is None and manager is not None:
    922         authkey = manager._authkey
    923     if authkey is None:
    924         authkey = current_process().authkey
    925 
    926     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    927     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
    928                       incref=incref)
    929     proxy._isauto = True
    930     return proxy
    931 
    932 #
    933 # Types/callables which we will register with SyncManager
    934 #
    935 
    936 class Namespace(object):
    937     def __init__(self, **kwds):
    938         self.__dict__.update(kwds)
    939     def __repr__(self):
    940         items = self.__dict__.items()
    941         temp = []
    942         for name, value in items:
    943             if not name.startswith('_'):
    944                 temp.append('%s=%r' % (name, value))
    945         temp.sort()
    946         return 'Namespace(%s)' % str.join(', ', temp)
    947 
    948 class Value(object):
    949     def __init__(self, typecode, value, lock=True):
    950         self._typecode = typecode
    951         self._value = value
    952     def get(self):
    953         return self._value
    954     def set(self, value):
    955         self._value = value
    956     def __repr__(self):
    957         return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    958     value = property(get, set)
    959 
    960 def Array(typecode, sequence, lock=True):
    961     return array.array(typecode, sequence)
    962 
    963 #
    964 # Proxy types used by SyncManager
    965 #
    966 
    967 class IteratorProxy(BaseProxy):
    968     # XXX remove methods for Py3.0 and Py2.6
    969     _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
    970     def __iter__(self):
    971         return self
    972     def __next__(self, *args):
    973         return self._callmethod('__next__', args)
    974     def next(self, *args):
    975         return self._callmethod('next', args)
    976     def send(self, *args):
    977         return self._callmethod('send', args)
    978     def throw(self, *args):
    979         return self._callmethod('throw', args)
    980     def close(self, *args):
    981         return self._callmethod('close', args)
    982 
    983 
    984 class AcquirerProxy(BaseProxy):
    985     _exposed_ = ('acquire', 'release')
    986     def acquire(self, blocking=True):
    987         return self._callmethod('acquire', (blocking,))
    988     def release(self):
    989         return self._callmethod('release')
    990     def __enter__(self):
    991         return self._callmethod('acquire')
    992     def __exit__(self, exc_type, exc_val, exc_tb):
    993         return self._callmethod('release')
    994 
    995 
    996 class ConditionProxy(AcquirerProxy):
    997     # XXX will Condition.notfyAll() name be available in Py3.0?
    998     _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    999     def wait(self, timeout=None):
   1000         return self._callmethod('wait', (timeout,))
   1001     def notify(self):
   1002         return self._callmethod('notify')
   1003     def notify_all(self):
   1004         return self._callmethod('notify_all')
   1005 
   1006 class EventProxy(BaseProxy):
   1007     _exposed_ = ('is_set', 'set', 'clear', 'wait')
   1008     def is_set(self):
   1009         return self._callmethod('is_set')
   1010     def set(self):
   1011         return self._callmethod('set')
   1012     def clear(self):
   1013         return self._callmethod('clear')
   1014     def wait(self, timeout=None):
   1015         return self._callmethod('wait', (timeout,))
   1016 
   1017 class NamespaceProxy(BaseProxy):
   1018     _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
   1019     def __getattr__(self, key):
   1020         if key[0] == '_':
   1021             return object.__getattribute__(self, key)
   1022         callmethod = object.__getattribute__(self, '_callmethod')
   1023         return callmethod('__getattribute__', (key,))
   1024     def __setattr__(self, key, value):
   1025         if key[0] == '_':
   1026             return object.__setattr__(self, key, value)
   1027         callmethod = object.__getattribute__(self, '_callmethod')
   1028         return callmethod('__setattr__', (key, value))
   1029     def __delattr__(self, key):
   1030         if key[0] == '_':
   1031             return object.__delattr__(self, key)
   1032         callmethod = object.__getattribute__(self, '_callmethod')
   1033         return callmethod('__delattr__', (key,))
   1034 
   1035 
   1036 class ValueProxy(BaseProxy):
   1037     _exposed_ = ('get', 'set')
   1038     def get(self):
   1039         return self._callmethod('get')
   1040     def set(self, value):
   1041         return self._callmethod('set', (value,))
   1042     value = property(get, set)
   1043 
   1044 
   1045 BaseListProxy = MakeProxyType('BaseListProxy', (
   1046     '__add__', '__contains__', '__delitem__', '__delslice__',
   1047     '__getitem__', '__getslice__', '__len__', '__mul__',
   1048     '__reversed__', '__rmul__', '__setitem__', '__setslice__',
   1049     'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
   1050     'reverse', 'sort', '__imul__'
   1051     ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
   1052 class ListProxy(BaseListProxy):
   1053     def __iadd__(self, value):
   1054         self._callmethod('extend', (value,))
   1055         return self
   1056     def __imul__(self, value):
   1057         self._callmethod('__imul__', (value,))
   1058         return self
   1059 
   1060 
   1061 DictProxy = MakeProxyType('DictProxy', (
   1062     '__contains__', '__delitem__', '__getitem__', '__len__',
   1063     '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
   1064     'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
   1065     ))
   1066 
   1067 
   1068 ArrayProxy = MakeProxyType('ArrayProxy', (
   1069     '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
   1070     ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
   1071 
   1072 
   1073 PoolProxy = MakeProxyType('PoolProxy', (
   1074     'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
   1075     'map', 'map_async', 'terminate'
   1076     ))
   1077 PoolProxy._method_to_typeid_ = {
   1078     'apply_async': 'AsyncResult',
   1079     'map_async': 'AsyncResult',
   1080     'imap': 'Iterator',
   1081     'imap_unordered': 'Iterator'
   1082     }
   1083 
   1084 #
   1085 # Definition of SyncManager
   1086 #
   1087 
   1088 class SyncManager(BaseManager):
   1089     '''
   1090     Subclass of `BaseManager` which supports a number of shared object types.
   1091 
   1092     The types registered are those intended for the synchronization
   1093     of threads, plus `dict`, `list` and `Namespace`.
   1094 
   1095     The `multiprocessing.Manager()` function creates started instances of
   1096     this class.
   1097     '''
   1098 
   1099 SyncManager.register('Queue', Queue.Queue)
   1100 SyncManager.register('JoinableQueue', Queue.Queue)
   1101 SyncManager.register('Event', threading.Event, EventProxy)
   1102 SyncManager.register('Lock', threading.Lock, AcquirerProxy)
   1103 SyncManager.register('RLock', threading.RLock, AcquirerProxy)
   1104 SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
   1105 SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
   1106                      AcquirerProxy)
   1107 SyncManager.register('Condition', threading.Condition, ConditionProxy)
   1108 SyncManager.register('Pool', Pool, PoolProxy)
   1109 SyncManager.register('list', list, ListProxy)
   1110 SyncManager.register('dict', dict, DictProxy)
   1111 SyncManager.register('Value', Value, ValueProxy)
   1112 SyncManager.register('Array', Array, ArrayProxy)
   1113 SyncManager.register('Namespace', Namespace, NamespaceProxy)
   1114 
   1115 # types returned by methods of PoolProxy
   1116 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
   1117 SyncManager.register('AsyncResult', create_method=False)
   1118