#
# Module to allow connection and socket objects to be transferred
# between processes
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = []

import os
import sys
import socket
import threading

import _multiprocessing
from multiprocessing import current_process
from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
from multiprocessing.connection import Client, Listener


#
# Refuse to import on platforms that can neither duplicate handles into
# another process (win32) nor pass file descriptors (`recvfd`).
#

if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
    raise ImportError('pickling of connections not supported')

#
# Platform specific definitions
#

if sys.platform == 'win32':
    import _subprocess
    from _multiprocessing import win32

    def send_handle(conn, handle, destination_pid):
        """Duplicate `handle` for process `destination_pid` and send it.

        The destination process is opened, `handle` is duplicated with that
        process as the target, and the resulting handle value is sent over
        `conn`.  The process handle is always closed afterwards.
        """
        process_handle = win32.OpenProcess(
            win32.PROCESS_ALL_ACCESS, False, destination_pid
        )
        try:
            new_handle = duplicate(handle, process_handle)
            conn.send(new_handle)
        finally:
            close(process_handle)

    def recv_handle(conn):
        """Receive a handle value that was sent with `send_handle`."""
        return conn.recv()

else:
    def send_handle(conn, handle, destination_pid):
        """Send file descriptor `handle` over the descriptor of `conn`.

        `destination_pid` is accepted for signature parity with the win32
        implementation and is not used here.
        """
        _multiprocessing.sendfd(conn.fileno(), handle)

    def recv_handle(conn):
        """Receive a file descriptor over the descriptor of `conn`."""
        return _multiprocessing.recvfd(conn.fileno())

#
# Support for a per-process server thread which caches pickled handles
#

# Duplicated handles that have been pickled but not yet handed over to
# the process that unpickles them.
_cache = set()


def _reset(obj):
    """Close all cached handles and forget the lock and the listener.

    Called once at import time and registered with `register_after_fork`.
    `obj` is not used.
    """
    global _lock, _listener
    for h in _cache:
        close(h)
    _cache.clear()
    _lock = threading.Lock()
    _listener = None

_reset(None)
register_after_fork(_reset, _reset)


def _get_listener():
    """Return the process-wide `Listener`, creating it on first use.

    Creating the listener also starts the daemon thread running `_serve`.
    """
    global _listener

    if _listener is None:
        with _lock:
            # Check again while holding the lock: another thread may have
            # created the listener between the first test and the acquire.
            if _listener is None:
                debug('starting listener and thread for sending handles')
                _listener = Listener(authkey=current_process().authkey)
                t = threading.Thread(target=_serve)
                t.daemon = True
                t.start()

    return _listener


def _serve():
    """Serve requests for cached handles forever.

    Each request is a `(handle_wanted, destination_pid)` pair received on a
    connection accepted from `_listener`.  The handle is removed from
    `_cache`, sent with `send_handle` and then closed locally.  Failures are
    reported through `sub_warning` (unless the process is exiting) and the
    loop carries on with the next request.
    """
    from .util import is_exiting, sub_warning

    while 1:
        try:
            conn = _listener.accept()
            try:
                handle_wanted, destination_pid = conn.recv()
                _cache.remove(handle_wanted)
                try:
                    send_handle(conn, handle_wanted, destination_pid)
                finally:
                    # The handle is no longer in the cache, so nothing else
                    # would close it if the send failed.
                    close(handle_wanted)
            finally:
                # Release the connection even when the request failed.
                conn.close()
        except:
            # Deliberately broad: one bad request must not stop the
            # server thread.
            if not is_exiting():
                import traceback
                sub_warning(
                    'thread for sharing handles raised exception :\n' +
                    '-'*79 + '\n' + traceback.format_exc() + '-'*79
                )

#
# Functions to be used for pickling/unpickling objects with handles
#

def reduce_handle(handle):
    """Return picklable data from which `rebuild_handle` recreates `handle`.

    The result is a tuple `(address, handle, inherited)`.  While a child
    process is being spawned, the handle is duplicated for the child and
    `inherited` is True.  Otherwise a duplicate is stored in `_cache` and
    `address` is the address of this process's listener, from which the
    duplicate can be requested.
    """
    if Popen.thread_is_spawning():
        return (None, Popen.duplicate_for_child(handle), True)
    dup_handle = duplicate(handle)
    _cache.add(dup_handle)
    sub_debug('reducing handle %d', handle)
    return (_get_listener().address, dup_handle, False)


def rebuild_handle(pickled_data):
    """Recreate a handle from the tuple produced by `reduce_handle`.

    An inherited handle is returned as is.  Otherwise the handle is
    requested from the listener at the pickled address, passing this
    process's pid, and the received handle is returned.
    """
    address, handle, inherited = pickled_data
    if inherited:
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=current_process().authkey)
    try:
        conn.send((handle, os.getpid()))
        return recv_handle(conn)
    finally:
        # Close the connection even if the request or the receive failed.
        conn.close()

#
# Register `_multiprocessing.Connection` with `ForkingPickler`
#

def reduce_connection(conn):
    """Reduce a `_multiprocessing.Connection` for pickling."""
    rh = reduce_handle(conn.fileno())
    return rebuild_connection, (rh, conn.readable, conn.writable)


def rebuild_connection(reduced_handle, readable, writable):
    """Rebuild a `_multiprocessing.Connection` from its reduced form."""
    handle = rebuild_handle(reduced_handle)
    return _multiprocessing.Connection(
        handle, readable=readable, writable=writable
    )

ForkingPickler.register(_multiprocessing.Connection, reduce_connection)

#
# Register `socket.socket` with `ForkingPickler`
#

def fromfd(fd, family, type_, proto=0):
    """Return a `socket.socket` built from file descriptor `fd`.

    If `socket.fromfd` returns an object of another class, it is wrapped in
    `socket.socket`.
    """
    s = socket.fromfd(fd, family, type_, proto)
    if s.__class__ is not socket.socket:
        s = socket.socket(_sock=s)
    return s


def reduce_socket(s):
    """Reduce a `socket.socket` for pickling."""
    reduced_handle = reduce_handle(s.fileno())
    return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)


def rebuild_socket(reduced_handle, family, type_, proto):
    """Rebuild a `socket.socket` from its reduced form."""
    fd = rebuild_handle(reduced_handle)
    try:
        return fromfd(fd, family, type_, proto)
    finally:
        # `socket.fromfd` works on a duplicate of `fd`, so the received
        # descriptor is always closed here, including when `fromfd` fails.
        close(fd)

ForkingPickler.register(socket.socket, reduce_socket)

#
# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
#

if sys.platform == 'win32':

    def reduce_pipe_connection(conn):
        """Reduce a `_multiprocessing.PipeConnection` for pickling."""
        rh = reduce_handle(conn.fileno())
        return rebuild_pipe_connection, (rh, conn.readable, conn.writable)

    def rebuild_pipe_connection(reduced_handle, readable, writable):
        """Rebuild a `_multiprocessing.PipeConnection` from its reduced form."""
        handle = rebuild_handle(reduced_handle)
        return _multiprocessing.PipeConnection(
            handle, readable=readable, writable=writable
        )

    ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)