Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # We use a background thread for sharing fds on Unix, and for sharing sockets on
      3 # Windows.
      4 #
      5 # A client which wants to pickle a resource registers it with the resource
      6 # sharer and gets an identifier in return.  The unpickling process connects
      7 # to the resource sharer, sends the identifier and its pid, and then receives
      8 # the resource.
      9 #
     10 
     11 import os
     12 import signal
     13 import socket
     14 import sys
     15 import threading
     16 
     17 from . import process
     18 from .context import reduction
     19 from . import util
     20 
     21 __all__ = ['stop']
     22 
     23 
     24 if sys.platform == 'win32':
     25     __all__ += ['DupSocket']
     26 
     27     class DupSocket(object):
     28         '''Picklable wrapper for a socket.'''
     29         def __init__(self, sock):
     30             new_sock = sock.dup()
     31             def send(conn, pid):
     32                 share = new_sock.share(pid)
     33                 conn.send_bytes(share)
     34             self._id = _resource_sharer.register(send, new_sock.close)
     35 
     36         def detach(self):
     37             '''Get the socket.  This should only be called once.'''
     38             with _resource_sharer.get_connection(self._id) as conn:
     39                 share = conn.recv_bytes()
     40                 return socket.fromshare(share)
     41 
     42 else:
     43     __all__ += ['DupFd']
     44 
     45     class DupFd(object):
     46         '''Wrapper for fd which can be used at any time.'''
     47         def __init__(self, fd):
     48             new_fd = os.dup(fd)
     49             def send(conn, pid):
     50                 reduction.send_handle(conn, new_fd, pid)
     51             def close():
     52                 os.close(new_fd)
     53             self._id = _resource_sharer.register(send, close)
     54 
     55         def detach(self):
     56             '''Get the fd.  This should only be called once.'''
     57             with _resource_sharer.get_connection(self._id) as conn:
     58                 return reduction.recv_handle(conn)
     59 
     60 
     61 class _ResourceSharer(object):
     62     '''Manager for resouces using background thread.'''
     63     def __init__(self):
     64         self._key = 0
     65         self._cache = {}
     66         self._old_locks = []
     67         self._lock = threading.Lock()
     68         self._listener = None
     69         self._address = None
     70         self._thread = None
     71         util.register_after_fork(self, _ResourceSharer._afterfork)
     72 
     73     def register(self, send, close):
     74         '''Register resource, returning an identifier.'''
     75         with self._lock:
     76             if self._address is None:
     77                 self._start()
     78             self._key += 1
     79             self._cache[self._key] = (send, close)
     80             return (self._address, self._key)
     81 
     82     @staticmethod
     83     def get_connection(ident):
     84         '''Return connection from which to receive identified resource.'''
     85         from .connection import Client
     86         address, key = ident
     87         c = Client(address, authkey=process.current_process().authkey)
     88         c.send((key, os.getpid()))
     89         return c
     90 
     91     def stop(self, timeout=None):
     92         '''Stop the background thread and clear registered resources.'''
     93         from .connection import Client
     94         with self._lock:
     95             if self._address is not None:
     96                 c = Client(self._address,
     97                            authkey=process.current_process().authkey)
     98                 c.send(None)
     99                 c.close()
    100                 self._thread.join(timeout)
    101                 if self._thread.is_alive():
    102                     util.sub_warning('_ResourceSharer thread did '
    103                                      'not stop when asked')
    104                 self._listener.close()
    105                 self._thread = None
    106                 self._address = None
    107                 self._listener = None
    108                 for key, (send, close) in self._cache.items():
    109                     close()
    110                 self._cache.clear()
    111 
    112     def _afterfork(self):
    113         for key, (send, close) in self._cache.items():
    114             close()
    115         self._cache.clear()
    116         # If self._lock was locked at the time of the fork, it may be broken
    117         # -- see issue 6721.  Replace it without letting it be gc'ed.
    118         self._old_locks.append(self._lock)
    119         self._lock = threading.Lock()
    120         if self._listener is not None:
    121             self._listener.close()
    122         self._listener = None
    123         self._address = None
    124         self._thread = None
    125 
    126     def _start(self):
    127         from .connection import Listener
    128         assert self._listener is None
    129         util.debug('starting listener and thread for sending handles')
    130         self._listener = Listener(authkey=process.current_process().authkey)
    131         self._address = self._listener.address
    132         t = threading.Thread(target=self._serve)
    133         t.daemon = True
    134         t.start()
    135         self._thread = t
    136 
    137     def _serve(self):
    138         if hasattr(signal, 'pthread_sigmask'):
    139             signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
    140         while 1:
    141             try:
    142                 with self._listener.accept() as conn:
    143                     msg = conn.recv()
    144                     if msg is None:
    145                         break
    146                     key, destination_pid = msg
    147                     send, close = self._cache.pop(key)
    148                     try:
    149                         send(conn, destination_pid)
    150                     finally:
    151                         close()
    152             except:
    153                 if not util.is_exiting():
    154                     sys.excepthook(*sys.exc_info())
    155 
    156 
# Module-level singleton; the platform-specific wrapper class defined earlier
# in this module registers with and connects to this instance.
_resource_sharer = _ResourceSharer()
# Public alias (listed in __all__) for stopping the singleton's background
# thread and clearing its registered resources.
stop = _resource_sharer.stop
    159