Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module providing the `SyncManager` class for dealing
      3 # with shared objects
      4 #
      5 # multiprocessing/managers.py
      6 #
      7 # Copyright (c) 2006-2008, R Oudkerk
      8 # Licensed to PSF under a Contributor Agreement.
      9 #
     10 
     11 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
     12 
     13 #
     14 # Imports
     15 #
     16 
     17 import sys
     18 import threading
     19 import array
     20 import queue
     21 
     22 from time import time as _time
     23 from traceback import format_exc
     24 
     25 from . import connection
     26 from .context import reduction, get_spawning_popen
     27 from . import pool
     28 from . import process
     29 from . import util
     30 from . import get_context
     31 
     32 #
     33 # Register some things for pickling
     34 #
     35 
     36 def reduce_array(a):
     37     return array.array, (a.typecode, a.tobytes())
     38 reduction.register(array.array, reduce_array)
     39 
     40 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
     41 if view_types[0] is not list:       # only needed in Py3.0
     42     def rebuild_as_list(obj):
     43         return list, (list(obj),)
     44     for view_type in view_types:
     45         reduction.register(view_type, rebuild_as_list)
     46 
     47 #
     48 # Type for identifying shared objects
     49 #
     50 
     51 class Token(object):
     52     '''
     53     Type to uniquely indentify a shared object
     54     '''
     55     __slots__ = ('typeid', 'address', 'id')
     56 
     57     def __init__(self, typeid, address, id):
     58         (self.typeid, self.address, self.id) = (typeid, address, id)
     59 
     60     def __getstate__(self):
     61         return (self.typeid, self.address, self.id)
     62 
     63     def __setstate__(self, state):
     64         (self.typeid, self.address, self.id) = state
     65 
     66     def __repr__(self):
     67         return '%s(typeid=%r, address=%r, id=%r)' % \
     68                (self.__class__.__name__, self.typeid, self.address, self.id)
     69 
     70 #
     71 # Function for communication with a manager's server process
     72 #
     73 
     74 def dispatch(c, id, methodname, args=(), kwds={}):
     75     '''
     76     Send a message to manager using connection `c` and return response
     77     '''
     78     c.send((id, methodname, args, kwds))
     79     kind, result = c.recv()
     80     if kind == '#RETURN':
     81         return result
     82     raise convert_to_error(kind, result)
     83 
     84 def convert_to_error(kind, result):
     85     if kind == '#ERROR':
     86         return result
     87     elif kind == '#TRACEBACK':
     88         assert type(result) is str
     89         return  RemoteError(result)
     90     elif kind == '#UNSERIALIZABLE':
     91         assert type(result) is str
     92         return RemoteError('Unserializable message: %s\n' % result)
     93     else:
     94         return ValueError('Unrecognized message type')
     95 
     96 class RemoteError(Exception):
     97     def __str__(self):
     98         return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
     99 
    100 #
    101 # Functions for finding the method names of an object
    102 #
    103 
    104 def all_methods(obj):
    105     '''
    106     Return a list of names of methods of `obj`
    107     '''
    108     temp = []
    109     for name in dir(obj):
    110         func = getattr(obj, name)
    111         if callable(func):
    112             temp.append(name)
    113     return temp
    114 
    115 def public_methods(obj):
    116     '''
    117     Return a list of names of methods of `obj` which do not start with '_'
    118     '''
    119     return [name for name in all_methods(obj) if name[0] != '_']
    120 
    121 #
    122 # Server which is run in a process controlled by a manager
    123 #
    124 
    125 class Server(object):
    126     '''
    127     Server class which runs in a process controlled by a manager object
    128     '''
    129     public = ['shutdown', 'create', 'accept_connection', 'get_methods',
    130               'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
    131 
    132     def __init__(self, registry, address, authkey, serializer):
    133         assert isinstance(authkey, bytes)
    134         self.registry = registry
    135         self.authkey = process.AuthenticationString(authkey)
    136         Listener, Client = listener_client[serializer]
    137 
    138         # do authentication later
    139         self.listener = Listener(address=address, backlog=16)
    140         self.address = self.listener.address
    141 
    142         self.id_to_obj = {'0': (None, ())}
    143         self.id_to_refcount = {}
    144         self.id_to_local_proxy_obj = {}
    145         self.mutex = threading.Lock()
    146 
    147     def serve_forever(self):
    148         '''
    149         Run the server forever
    150         '''
    151         self.stop_event = threading.Event()
    152         process.current_process()._manager_server = self
    153         try:
    154             accepter = threading.Thread(target=self.accepter)
    155             accepter.daemon = True
    156             accepter.start()
    157             try:
    158                 while not self.stop_event.is_set():
    159                     self.stop_event.wait(1)
    160             except (KeyboardInterrupt, SystemExit):
    161                 pass
    162         finally:
    163             if sys.stdout != sys.__stdout__:
    164                 util.debug('resetting stdout, stderr')
    165                 sys.stdout = sys.__stdout__
    166                 sys.stderr = sys.__stderr__
    167             sys.exit(0)
    168 
    169     def accepter(self):
    170         while True:
    171             try:
    172                 c = self.listener.accept()
    173             except OSError:
    174                 continue
    175             t = threading.Thread(target=self.handle_request, args=(c,))
    176             t.daemon = True
    177             t.start()
    178 
    179     def handle_request(self, c):
    180         '''
    181         Handle a new connection
    182         '''
    183         funcname = result = request = None
    184         try:
    185             connection.deliver_challenge(c, self.authkey)
    186             connection.answer_challenge(c, self.authkey)
    187             request = c.recv()
    188             ignore, funcname, args, kwds = request
    189             assert funcname in self.public, '%r unrecognized' % funcname
    190             func = getattr(self, funcname)
    191         except Exception:
    192             msg = ('#TRACEBACK', format_exc())
    193         else:
    194             try:
    195                 result = func(c, *args, **kwds)
    196             except Exception:
    197                 msg = ('#TRACEBACK', format_exc())
    198             else:
    199                 msg = ('#RETURN', result)
    200         try:
    201             c.send(msg)
    202         except Exception as e:
    203             try:
    204                 c.send(('#TRACEBACK', format_exc()))
    205             except Exception:
    206                 pass
    207             util.info('Failure to send message: %r', msg)
    208             util.info(' ... request was %r', request)
    209             util.info(' ... exception was %r', e)
    210 
    211         c.close()
    212 
    213     def serve_client(self, conn):
    214         '''
    215         Handle requests from the proxies in a particular process/thread
    216         '''
    217         util.debug('starting server thread to service %r',
    218                    threading.current_thread().name)
    219 
    220         recv = conn.recv
    221         send = conn.send
    222         id_to_obj = self.id_to_obj
    223 
    224         while not self.stop_event.is_set():
    225 
    226             try:
    227                 methodname = obj = None
    228                 request = recv()
    229                 ident, methodname, args, kwds = request
    230                 try:
    231                     obj, exposed, gettypeid = id_to_obj[ident]
    232                 except KeyError as ke:
    233                     try:
    234                         obj, exposed, gettypeid = \
    235                             self.id_to_local_proxy_obj[ident]
    236                     except KeyError as second_ke:
    237                         raise ke
    238 
    239                 if methodname not in exposed:
    240                     raise AttributeError(
    241                         'method %r of %r object is not in exposed=%r' %
    242                         (methodname, type(obj), exposed)
    243                         )
    244 
    245                 function = getattr(obj, methodname)
    246 
    247                 try:
    248                     res = function(*args, **kwds)
    249                 except Exception as e:
    250                     msg = ('#ERROR', e)
    251                 else:
    252                     typeid = gettypeid and gettypeid.get(methodname, None)
    253                     if typeid:
    254                         rident, rexposed = self.create(conn, typeid, res)
    255                         token = Token(typeid, self.address, rident)
    256                         msg = ('#PROXY', (rexposed, token))
    257                     else:
    258                         msg = ('#RETURN', res)
    259 
    260             except AttributeError:
    261                 if methodname is None:
    262                     msg = ('#TRACEBACK', format_exc())
    263                 else:
    264                     try:
    265                         fallback_func = self.fallback_mapping[methodname]
    266                         result = fallback_func(
    267                             self, conn, ident, obj, *args, **kwds
    268                             )
    269                         msg = ('#RETURN', result)
    270                     except Exception:
    271                         msg = ('#TRACEBACK', format_exc())
    272 
    273             except EOFError:
    274                 util.debug('got EOF -- exiting thread serving %r',
    275                            threading.current_thread().name)
    276                 sys.exit(0)
    277 
    278             except Exception:
    279                 msg = ('#TRACEBACK', format_exc())
    280 
    281             try:
    282                 try:
    283                     send(msg)
    284                 except Exception as e:
    285                     send(('#UNSERIALIZABLE', format_exc()))
    286             except Exception as e:
    287                 util.info('exception in thread serving %r',
    288                         threading.current_thread().name)
    289                 util.info(' ... message was %r', msg)
    290                 util.info(' ... exception was %r', e)
    291                 conn.close()
    292                 sys.exit(1)
    293 
    294     def fallback_getvalue(self, conn, ident, obj):
    295         return obj
    296 
    297     def fallback_str(self, conn, ident, obj):
    298         return str(obj)
    299 
    300     def fallback_repr(self, conn, ident, obj):
    301         return repr(obj)
    302 
    303     fallback_mapping = {
    304         '__str__':fallback_str,
    305         '__repr__':fallback_repr,
    306         '#GETVALUE':fallback_getvalue
    307         }
    308 
    309     def dummy(self, c):
    310         pass
    311 
    312     def debug_info(self, c):
    313         '''
    314         Return some info --- useful to spot problems with refcounting
    315         '''
    316         with self.mutex:
    317             result = []
    318             keys = list(self.id_to_refcount.keys())
    319             keys.sort()
    320             for ident in keys:
    321                 if ident != '0':
    322                     result.append('  %s:       refcount=%s\n    %s' %
    323                                   (ident, self.id_to_refcount[ident],
    324                                    str(self.id_to_obj[ident][0])[:75]))
    325             return '\n'.join(result)
    326 
    327     def number_of_objects(self, c):
    328         '''
    329         Number of shared objects
    330         '''
    331         # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
    332         return len(self.id_to_refcount)
    333 
    334     def shutdown(self, c):
    335         '''
    336         Shutdown this process
    337         '''
    338         try:
    339             util.debug('manager received shutdown message')
    340             c.send(('#RETURN', None))
    341         except:
    342             import traceback
    343             traceback.print_exc()
    344         finally:
    345             self.stop_event.set()
    346 
    347     def create(self, c, typeid, *args, **kwds):
    348         '''
    349         Create a new shared object and return its id
    350         '''
    351         with self.mutex:
    352             callable, exposed, method_to_typeid, proxytype = \
    353                       self.registry[typeid]
    354 
    355             if callable is None:
    356                 assert len(args) == 1 and not kwds
    357                 obj = args[0]
    358             else:
    359                 obj = callable(*args, **kwds)
    360 
    361             if exposed is None:
    362                 exposed = public_methods(obj)
    363             if method_to_typeid is not None:
    364                 assert type(method_to_typeid) is dict
    365                 exposed = list(exposed) + list(method_to_typeid)
    366 
    367             ident = '%x' % id(obj)  # convert to string because xmlrpclib
    368                                     # only has 32 bit signed integers
    369             util.debug('%r callable returned object with id %r', typeid, ident)
    370 
    371             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
    372             if ident not in self.id_to_refcount:
    373                 self.id_to_refcount[ident] = 0
    374 
    375         self.incref(c, ident)
    376         return ident, tuple(exposed)
    377 
    378     def get_methods(self, c, token):
    379         '''
    380         Return the methods of the shared object indicated by token
    381         '''
    382         return tuple(self.id_to_obj[token.id][1])
    383 
    384     def accept_connection(self, c, name):
    385         '''
    386         Spawn a new thread to serve this connection
    387         '''
    388         threading.current_thread().name = name
    389         c.send(('#RETURN', None))
    390         self.serve_client(c)
    391 
    392     def incref(self, c, ident):
    393         with self.mutex:
    394             try:
    395                 self.id_to_refcount[ident] += 1
    396             except KeyError as ke:
    397                 # If no external references exist but an internal (to the
    398                 # manager) still does and a new external reference is created
    399                 # from it, restore the manager's tracking of it from the
    400                 # previously stashed internal ref.
    401                 if ident in self.id_to_local_proxy_obj:
    402                     self.id_to_refcount[ident] = 1
    403                     self.id_to_obj[ident] = \
    404                         self.id_to_local_proxy_obj[ident]
    405                     obj, exposed, gettypeid = self.id_to_obj[ident]
    406                     util.debug('Server re-enabled tracking & INCREF %r', ident)
    407                 else:
    408                     raise ke
    409 
    410     def decref(self, c, ident):
    411         if ident not in self.id_to_refcount and \
    412             ident in self.id_to_local_proxy_obj:
    413             util.debug('Server DECREF skipping %r', ident)
    414             return
    415 
    416         with self.mutex:
    417             assert self.id_to_refcount[ident] >= 1
    418             self.id_to_refcount[ident] -= 1
    419             if self.id_to_refcount[ident] == 0:
    420                 del self.id_to_refcount[ident]
    421 
    422         if ident not in self.id_to_refcount:
    423             # Two-step process in case the object turns out to contain other
    424             # proxy objects (e.g. a managed list of managed lists).
    425             # Otherwise, deleting self.id_to_obj[ident] would trigger the
    426             # deleting of the stored value (another managed object) which would
    427             # in turn attempt to acquire the mutex that is already held here.
    428             self.id_to_obj[ident] = (None, (), None)  # thread-safe
    429             util.debug('disposing of obj with id %r', ident)
    430             with self.mutex:
    431                 del self.id_to_obj[ident]
    432 
    433 
    434 #
    435 # Class to represent state of a manager
    436 #
    437 
    438 class State(object):
    439     __slots__ = ['value']
    440     INITIAL = 0
    441     STARTED = 1
    442     SHUTDOWN = 2
    443 
    444 #
    445 # Mapping from serializer name to Listener and Client types
    446 #
    447 
    448 listener_client = {
    449     'pickle' : (connection.Listener, connection.Client),
    450     'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    451     }
    452 
    453 #
    454 # Definition of BaseManager
    455 #
    456 
    457 class BaseManager(object):
    458     '''
    459     Base class for managers
    460     '''
    461     _registry = {}
    462     _Server = Server
    463 
    464     def __init__(self, address=None, authkey=None, serializer='pickle',
    465                  ctx=None):
    466         if authkey is None:
    467             authkey = process.current_process().authkey
    468         self._address = address     # XXX not final address if eg ('', 0)
    469         self._authkey = process.AuthenticationString(authkey)
    470         self._state = State()
    471         self._state.value = State.INITIAL
    472         self._serializer = serializer
    473         self._Listener, self._Client = listener_client[serializer]
    474         self._ctx = ctx or get_context()
    475 
    476     def get_server(self):
    477         '''
    478         Return server object with serve_forever() method and address attribute
    479         '''
    480         assert self._state.value == State.INITIAL
    481         return Server(self._registry, self._address,
    482                       self._authkey, self._serializer)
    483 
    484     def connect(self):
    485         '''
    486         Connect manager object to the server process
    487         '''
    488         Listener, Client = listener_client[self._serializer]
    489         conn = Client(self._address, authkey=self._authkey)
    490         dispatch(conn, None, 'dummy')
    491         self._state.value = State.STARTED
    492 
    493     def start(self, initializer=None, initargs=()):
    494         '''
    495         Spawn a server process for this manager object
    496         '''
    497         assert self._state.value == State.INITIAL
    498 
    499         if initializer is not None and not callable(initializer):
    500             raise TypeError('initializer must be a callable')
    501 
    502         # pipe over which we will retrieve address of server
    503         reader, writer = connection.Pipe(duplex=False)
    504 
    505         # spawn process which runs a server
    506         self._process = self._ctx.Process(
    507             target=type(self)._run_server,
    508             args=(self._registry, self._address, self._authkey,
    509                   self._serializer, writer, initializer, initargs),
    510             )
    511         ident = ':'.join(str(i) for i in self._process._identity)
    512         self._process.name = type(self).__name__  + '-' + ident
    513         self._process.start()
    514 
    515         # get address of server
    516         writer.close()
    517         self._address = reader.recv()
    518         reader.close()
    519 
    520         # register a finalizer
    521         self._state.value = State.STARTED
    522         self.shutdown = util.Finalize(
    523             self, type(self)._finalize_manager,
    524             args=(self._process, self._address, self._authkey,
    525                   self._state, self._Client),
    526             exitpriority=0
    527             )
    528 
    529     @classmethod
    530     def _run_server(cls, registry, address, authkey, serializer, writer,
    531                     initializer=None, initargs=()):
    532         '''
    533         Create a server, report its address and run it
    534         '''
    535         if initializer is not None:
    536             initializer(*initargs)
    537 
    538         # create server
    539         server = cls._Server(registry, address, authkey, serializer)
    540 
    541         # inform parent process of the server's address
    542         writer.send(server.address)
    543         writer.close()
    544 
    545         # run the manager
    546         util.info('manager serving at %r', server.address)
    547         server.serve_forever()
    548 
    549     def _create(self, typeid, *args, **kwds):
    550         '''
    551         Create a new shared object; return the token and exposed tuple
    552         '''
    553         assert self._state.value == State.STARTED, 'server not yet started'
    554         conn = self._Client(self._address, authkey=self._authkey)
    555         try:
    556             id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
    557         finally:
    558             conn.close()
    559         return Token(typeid, self._address, id), exposed
    560 
    561     def join(self, timeout=None):
    562         '''
    563         Join the manager process (if it has been spawned)
    564         '''
    565         if self._process is not None:
    566             self._process.join(timeout)
    567             if not self._process.is_alive():
    568                 self._process = None
    569 
    570     def _debug_info(self):
    571         '''
    572         Return some info about the servers shared objects and connections
    573         '''
    574         conn = self._Client(self._address, authkey=self._authkey)
    575         try:
    576             return dispatch(conn, None, 'debug_info')
    577         finally:
    578             conn.close()
    579 
    580     def _number_of_objects(self):
    581         '''
    582         Return the number of shared objects
    583         '''
    584         conn = self._Client(self._address, authkey=self._authkey)
    585         try:
    586             return dispatch(conn, None, 'number_of_objects')
    587         finally:
    588             conn.close()
    589 
    590     def __enter__(self):
    591         if self._state.value == State.INITIAL:
    592             self.start()
    593         assert self._state.value == State.STARTED
    594         return self
    595 
    596     def __exit__(self, exc_type, exc_val, exc_tb):
    597         self.shutdown()
    598 
    599     @staticmethod
    600     def _finalize_manager(process, address, authkey, state, _Client):
    601         '''
    602         Shutdown the manager process; will be registered as a finalizer
    603         '''
    604         if process.is_alive():
    605             util.info('sending shutdown message to manager')
    606             try:
    607                 conn = _Client(address, authkey=authkey)
    608                 try:
    609                     dispatch(conn, None, 'shutdown')
    610                 finally:
    611                     conn.close()
    612             except Exception:
    613                 pass
    614 
    615             process.join(timeout=1.0)
    616             if process.is_alive():
    617                 util.info('manager still alive')
    618                 if hasattr(process, 'terminate'):
    619                     util.info('trying to `terminate()` manager process')
    620                     process.terminate()
    621                     process.join(timeout=0.1)
    622                     if process.is_alive():
    623                         util.info('manager still alive after terminate')
    624 
    625         state.value = State.SHUTDOWN
    626         try:
    627             del BaseProxy._address_to_local[address]
    628         except KeyError:
    629             pass
    630 
    631     address = property(lambda self: self._address)
    632 
    633     @classmethod
    634     def register(cls, typeid, callable=None, proxytype=None, exposed=None,
    635                  method_to_typeid=None, create_method=True):
    636         '''
    637         Register a typeid with the manager type
    638         '''
    639         if '_registry' not in cls.__dict__:
    640             cls._registry = cls._registry.copy()
    641 
    642         if proxytype is None:
    643             proxytype = AutoProxy
    644 
    645         exposed = exposed or getattr(proxytype, '_exposed_', None)
    646 
    647         method_to_typeid = method_to_typeid or \
    648                            getattr(proxytype, '_method_to_typeid_', None)
    649 
    650         if method_to_typeid:
    651             for key, value in list(method_to_typeid.items()):
    652                 assert type(key) is str, '%r is not a string' % key
    653                 assert type(value) is str, '%r is not a string' % value
    654 
    655         cls._registry[typeid] = (
    656             callable, exposed, method_to_typeid, proxytype
    657             )
    658 
    659         if create_method:
    660             def temp(self, *args, **kwds):
    661                 util.debug('requesting creation of a shared %r object', typeid)
    662                 token, exp = self._create(typeid, *args, **kwds)
    663                 proxy = proxytype(
    664                     token, self._serializer, manager=self,
    665                     authkey=self._authkey, exposed=exp
    666                     )
    667                 conn = self._Client(token.address, authkey=self._authkey)
    668                 dispatch(conn, None, 'decref', (token.id,))
    669                 return proxy
    670             temp.__name__ = typeid
    671             setattr(cls, typeid, temp)
    672 
    673 #
    674 # Subclass of set which get cleared after a fork
    675 #
    676 
    677 class ProcessLocalSet(set):
    678     def __init__(self):
    679         util.register_after_fork(self, lambda obj: obj.clear())
    680     def __reduce__(self):
    681         return type(self), ()
    682 
    683 #
    684 # Definition of BaseProxy
    685 #
    686 
    687 class BaseProxy(object):
    688     '''
    689     A base for proxies of shared objects
    690     '''
    691     _address_to_local = {}
    692     _mutex = util.ForkAwareThreadLock()
    693 
    694     def __init__(self, token, serializer, manager=None,
    695                  authkey=None, exposed=None, incref=True, manager_owned=False):
    696         with BaseProxy._mutex:
    697             tls_idset = BaseProxy._address_to_local.get(token.address, None)
    698             if tls_idset is None:
    699                 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
    700                 BaseProxy._address_to_local[token.address] = tls_idset
    701 
    702         # self._tls is used to record the connection used by this
    703         # thread to communicate with the manager at token.address
    704         self._tls = tls_idset[0]
    705 
    706         # self._idset is used to record the identities of all shared
    707         # objects for which the current process owns references and
    708         # which are in the manager at token.address
    709         self._idset = tls_idset[1]
    710 
    711         self._token = token
    712         self._id = self._token.id
    713         self._manager = manager
    714         self._serializer = serializer
    715         self._Client = listener_client[serializer][1]
    716 
    717         # Should be set to True only when a proxy object is being created
    718         # on the manager server; primary use case: nested proxy objects.
    719         # RebuildProxy detects when a proxy is being created on the manager
    720         # and sets this value appropriately.
    721         self._owned_by_manager = manager_owned
    722 
    723         if authkey is not None:
    724             self._authkey = process.AuthenticationString(authkey)
    725         elif self._manager is not None:
    726             self._authkey = self._manager._authkey
    727         else:
    728             self._authkey = process.current_process().authkey
    729 
    730         if incref:
    731             self._incref()
    732 
    733         util.register_after_fork(self, BaseProxy._after_fork)
    734 
    735     def _connect(self):
    736         util.debug('making connection to manager')
    737         name = process.current_process().name
    738         if threading.current_thread().name != 'MainThread':
    739             name += '|' + threading.current_thread().name
    740         conn = self._Client(self._token.address, authkey=self._authkey)
    741         dispatch(conn, None, 'accept_connection', (name,))
    742         self._tls.connection = conn
    743 
    744     def _callmethod(self, methodname, args=(), kwds={}):
    745         '''
    746         Try to call a method of the referrent and return a copy of the result
    747         '''
    748         try:
    749             conn = self._tls.connection
    750         except AttributeError:
    751             util.debug('thread %r does not own a connection',
    752                        threading.current_thread().name)
    753             self._connect()
    754             conn = self._tls.connection
    755 
    756         conn.send((self._id, methodname, args, kwds))
    757         kind, result = conn.recv()
    758 
    759         if kind == '#RETURN':
    760             return result
    761         elif kind == '#PROXY':
    762             exposed, token = result
    763             proxytype = self._manager._registry[token.typeid][-1]
    764             token.address = self._token.address
    765             proxy = proxytype(
    766                 token, self._serializer, manager=self._manager,
    767                 authkey=self._authkey, exposed=exposed
    768                 )
    769             conn = self._Client(token.address, authkey=self._authkey)
    770             dispatch(conn, None, 'decref', (token.id,))
    771             return proxy
    772         raise convert_to_error(kind, result)
    773 
    774     def _getvalue(self):
    775         '''
    776         Get a copy of the value of the referent
    777         '''
    778         return self._callmethod('#GETVALUE')
    779 
    def _incref(self):
        '''
        Tell the manager that this proxy holds a reference to the referent

        Does nothing (beyond a debug message) for proxies owned by the
        manager itself.  Otherwise sends an 'incref' request, records the
        id in the shared id set and installs a finalizer that will run
        `BaseProxy._decref` later.
        '''
        token = self._token
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', token.id)
            return

        conn = self._Client(token.address, authkey=self._authkey)
        dispatch(conn, None, 'incref', (self._id,))
        util.debug('INCREF %r', token.id)

        self._idset.add(self._id)

        # `state` is the manager's state object, or the falsy manager
        # value itself (e.g. None) when no manager is attached.
        state = self._manager and self._manager._state

        finalizer_args = (
            self._token, self._authkey, state,
            self._tls, self._idset, self._Client,
            )
        self._close = util.Finalize(
            self, BaseProxy._decref, args=finalizer_args, exitpriority=10
            )
    799 
    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release one reference to the shared object identified by `token`

        This is a static method so that it can be used as a finalizer
        callback without keeping the proxy alive; everything it needs is
        passed in explicitly:

        token   -- token of the referent; `token.id` and `token.address`
                   are used
        authkey -- authentication key for the connection to the manager
        state   -- manager state object, or None if there is none
        tls     -- thread-local storage that may hold a `connection`
        idset   -- set of ids still referenced; `token.id` is discarded
        _Client -- callable used to open a connection to the manager
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        # (a missing state object is treated the same as a started manager)
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                dispatch(conn, None, 'decref', (token.id,))
            except Exception as e:
                # best effort only: a failed decref is logged, not raised
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection
    824 
    def _after_fork(self):
        '''
        Detach from the manager object and re-register the reference

        The manager attribute is cleared and `_incref()` is attempted
        again; a failure is only logged.
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as exc:
            # the proxy may just be for a manager which has shutdown
            message = 'incref failed: %s' % exc
            util.info(message)
    832 
    def __reduce__(self):
        '''
        Pickle support: rebuild the proxy through `RebuildProxy`

        The authkey is only included while a child process is being
        spawned (`get_spawning_popen()` is not None).  Auto-proxies are
        rebuilt with `AutoProxy` and carry their exposed method names;
        all other proxies are rebuilt with their own type.
        '''
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        is_auto = getattr(self, '_isauto', False)
        if is_auto:
            kwds['exposed'] = self._exposed_
            factory = AutoProxy
        else:
            factory = type(self)

        rebuild_args = (factory, self._token, self._serializer, kwds)
        return (RebuildProxy, rebuild_args)
    845 
    def __deepcopy__(self, memo):
        '''
        Deep-copying a proxy yields a copy of the referent's value

        `memo` is accepted for the `copy.deepcopy` protocol but not used.
        '''
        value_copy = self._getvalue()
        return value_copy
    848 
    def __repr__(self):
        '''
        Return a local description: proxy type name, typeid and address
        '''
        type_name = type(self).__name__
        typeid = self._token.typeid
        return '<%s object, typeid %r at %#x>' % (type_name, typeid, id(self))
    852 
    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)

        The fall-back is the proxy's own repr with a failure note spliced
        in before the closing '>'.
        '''
        try:
            text = self._callmethod('__repr__')
        except Exception:
            text = repr(self)[:-1] + "; '__str__()' failed>"
        return text
    861 
    862 #
    863 # Function used for unpickling
    864 #
    865 
    866 def RebuildProxy(func, token, serializer, kwds):
    867     '''
    868     Function used for unpickling proxy objects.
    869     '''
    870     server = getattr(process.current_process(), '_manager_server', None)
    871     if server and server.address == token.address:
    872         util.debug('Rebuild a proxy owned by manager, token=%r', token)
    873         kwds['manager_owned'] = True
    874         if token.id not in server.id_to_local_proxy_obj:
    875             server.id_to_local_proxy_obj[token.id] = \
    876                 server.id_to_obj[token.id]
    877     incref = (
    878         kwds.pop('incref', True) and
    879         not getattr(process.current_process(), '_inheriting', False)
    880         )
    881     return func(token, serializer, incref=incref, **kwds)
    882 
    883 #
    884 # Functions to create proxies and proxy types
    885 #
    886 
    887 def MakeProxyType(name, exposed, _cache={}):
    888     '''
    889     Return a proxy type whose methods are given by `exposed`
    890     '''
    891     exposed = tuple(exposed)
    892     try:
    893         return _cache[(name, exposed)]
    894     except KeyError:
    895         pass
    896 
    897     dic = {}
    898 
    899     for meth in exposed:
    900         exec('''def %s(self, *args, **kwds):
    901         return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
    902 
    903     ProxyType = type(name, (BaseProxy,), dic)
    904     ProxyType._exposed_ = exposed
    905     _cache[(name, exposed)] = ProxyType
    906     return ProxyType
    907 
    908 
    909 def AutoProxy(token, serializer, manager=None, authkey=None,
    910               exposed=None, incref=True):
    911     '''
    912     Return an auto-proxy for `token`
    913     '''
    914     _Client = listener_client[serializer][1]
    915 
    916     if exposed is None:
    917         conn = _Client(token.address, authkey=authkey)
    918         try:
    919             exposed = dispatch(conn, None, 'get_methods', (token,))
    920         finally:
    921             conn.close()
    922 
    923     if authkey is None and manager is not None:
    924         authkey = manager._authkey
    925     if authkey is None:
    926         authkey = process.current_process().authkey
    927 
    928     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    929     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
    930                       incref=incref)
    931     proxy._isauto = True
    932     return proxy
    933 
    934 #
    935 # Types/callables which we will register with SyncManager
    936 #
    937 
    938 class Namespace(object):
    939     def __init__(self, **kwds):
    940         self.__dict__.update(kwds)
    941     def __repr__(self):
    942         items = list(self.__dict__.items())
    943         temp = []
    944         for name, value in items:
    945             if not name.startswith('_'):
    946                 temp.append('%s=%r' % (name, value))
    947         temp.sort()
    948         return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
    949 
    950 class Value(object):
    951     def __init__(self, typecode, value, lock=True):
    952         self._typecode = typecode
    953         self._value = value
    954     def get(self):
    955         return self._value
    956     def set(self, value):
    957         self._value = value
    958     def __repr__(self):
    959         return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    960     value = property(get, set)
    961 
    962 def Array(typecode, sequence, lock=True):
    963     return array.array(typecode, sequence)
    964 
    965 #
    966 # Proxy types used by SyncManager
    967 #
    968 
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator/generator held by the manager

    Each method forwards its positional arguments to the method of the
    same name on the referent via `_callmethod()`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        # the proxy is its own iterator; iteration happens via __next__
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)
    981 
    982 
    983 class AcquirerProxy(BaseProxy):
    984     _exposed_ = ('acquire', 'release')
    985     def acquire(self, blocking=True, timeout=None):
    986         args = (blocking,) if timeout is None else (blocking, timeout)
    987         return self._callmethod('acquire', args)
    988     def release(self):
    989         return self._callmethod('release')
    990     def __enter__(self):
    991         return self._callmethod('acquire')
    992     def __exit__(self, exc_type, exc_val, exc_tb):
    993         return self._callmethod('release')
    994 
    995 
    996 class ConditionProxy(AcquirerProxy):
    997     _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    998     def wait(self, timeout=None):
    999         return self._callmethod('wait', (timeout,))
   1000     def notify(self):
   1001         return self._callmethod('notify')
   1002     def notify_all(self):
   1003         return self._callmethod('notify_all')
   1004     def wait_for(self, predicate, timeout=None):
   1005         result = predicate()
   1006         if result:
   1007             return result
   1008         if timeout is not None:
   1009             endtime = _time() + timeout
   1010         else:
   1011             endtime = None
   1012             waittime = None
   1013         while not result:
   1014             if endtime is not None:
   1015                 waittime = endtime - _time()
   1016                 if waittime <= 0:
   1017                     break
   1018             self.wait(waittime)
   1019             result = predicate()
   1020         return result
   1021 
   1022 
class EventProxy(BaseProxy):
    '''
    Proxy for an event object held by the manager

    Every method forwards to the method of the same name on the referent.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        # timeout is always sent explicitly, None included
        return self._callmethod('wait', (timeout,))
   1033 
   1034 
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier object held by the manager

    `__getattribute__` is exposed so that the read-only properties below
    can fetch the corresponding attribute from the referent.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    @property
    def parties(self):
        # value of the referent's `parties` attribute
        return self._callmethod('__getattribute__', ('parties',))
    @property
    def n_waiting(self):
        # value of the referent's `n_waiting` attribute
        return self._callmethod('__getattribute__', ('n_waiting',))
    @property
    def broken(self):
        # value of the referent's `broken` attribute
        return self._callmethod('__getattribute__', ('broken',))
   1052 
   1053 
class NamespaceProxy(BaseProxy):
    '''
    Proxy which maps attribute access onto the referent

    Names starting with an underscore are handled locally on the proxy
    itself; every other name is read, written or deleted on the referent
    through `_callmethod()`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    def __getattr__(self, key):
        # __getattr__ is only consulted after normal lookup has failed
        if key[0] == '_':
            return object.__getattribute__(self, key)
        # fetch _callmethod via object.__getattribute__ rather than
        # ordinary attribute access on self
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key[0] == '_':
            # private names are stored on the proxy object itself
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))
    def __delattr__(self, key):
        if key[0] == '_':
            # private names are deleted from the proxy object itself
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
   1071 
   1072 
class ValueProxy(BaseProxy):
    '''
    Proxy exposing the referent's `get()`/`set()` and a `value` property
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    # reading/assigning `proxy.value` goes through get()/set() above
    value = property(get, set)
   1080 
   1081 
# Generated proxy type whose methods all forward to the referent.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself

    `+=` and `*=` rebind the target name to the method's return value, so
    both methods return `self` rather than the result of the remote call.
    '''
    def __iadd__(self, value):
        # in-place add is carried out as a remote extend()
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
   1095 
   1096 
   1097 DictProxy = MakeProxyType('DictProxy', (
   1098     '__contains__', '__delitem__', '__getitem__', '__len__',
   1099     '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
   1100     'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
   1101     ))
   1102 
   1103 
# Generated proxy type exposing only length and item get/set.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))
   1107 
   1108 
# Generated proxy type whose methods all forward to the referent.
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
# Maps method names to the typeid of what they return.
# NOTE(review): presumably read when the proxy type is registered so that
# these results come back as proxies -- confirm in BaseManager.register.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager
    '''
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # leaving the `with` block terminates the pool
        self.terminate()
   1125 
   1126 #
   1127 # Definition of SyncManager
   1128 #
   1129 
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# Each call below registers a typeid, the callable that creates the
# referent and, where given, the proxy type used to access it.
# NOTE(review): entries without an explicit proxy type presumably fall
# back to a default chosen by BaseManager.register -- confirm there.
# 'Queue' and 'JoinableQueue' are both backed by `queue.Queue`.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (registered with create_method=False and without a creating callable)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
   1161