1 # 2 # A higher level module for using sockets (or Windows named pipes) 3 # 4 # multiprocessing/connection.py 5 # 6 # Copyright (c) 2006-2008, R Oudkerk 7 # All rights reserved. 8 # 9 # Redistribution and use in source and binary forms, with or without 10 # modification, are permitted provided that the following conditions 11 # are met: 12 # 13 # 1. Redistributions of source code must retain the above copyright 14 # notice, this list of conditions and the following disclaimer. 15 # 2. Redistributions in binary form must reproduce the above copyright 16 # notice, this list of conditions and the following disclaimer in the 17 # documentation and/or other materials provided with the distribution. 18 # 3. Neither the name of author nor the names of any contributors may be 19 # used to endorse or promote products derived from this software 20 # without specific prior written permission. 21 # 22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 # SUCH DAMAGE. 33 # 34 35 __all__ = [ 'Client', 'Listener', 'Pipe' ] 36 37 import os 38 import sys 39 import socket 40 import errno 41 import time 42 import tempfile 43 import itertools 44 45 import _multiprocessing 46 from multiprocessing import current_process, AuthenticationError 47 from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug 48 from multiprocessing.forking import duplicate, close 49 50 51 # 52 # 53 # 54 55 BUFSIZE = 8192 56 # A very generous timeout when it comes to local connections... 57 CONNECTION_TIMEOUT = 20. 58 59 _mmap_counter = itertools.count() 60 61 default_family = 'AF_INET' 62 families = ['AF_INET'] 63 64 if hasattr(socket, 'AF_UNIX'): 65 default_family = 'AF_UNIX' 66 families += ['AF_UNIX'] 67 68 if sys.platform == 'win32': 69 default_family = 'AF_PIPE' 70 families += ['AF_PIPE'] 71 72 73 def _init_timeout(timeout=CONNECTION_TIMEOUT): 74 return time.time() + timeout 75 76 def _check_timeout(t): 77 return time.time() > t 78 79 # 80 # 81 # 82 83 def arbitrary_address(family): 84 ''' 85 Return an arbitrary free address for the given family 86 ''' 87 if family == 'AF_INET': 88 return ('localhost', 0) 89 elif family == 'AF_UNIX': 90 return tempfile.mktemp(prefix='listener-', dir=get_temp_dir()) 91 elif family == 'AF_PIPE': 92 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % 93 (os.getpid(), _mmap_counter.next())) 94 else: 95 raise ValueError('unrecognized family') 96 97 98 def address_type(address): 99 ''' 100 Return the types of the address 101 102 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' 103 ''' 104 if type(address) == tuple: 105 return 'AF_INET' 106 elif type(address) is str and address.startswith('\\\\'): 107 return 'AF_PIPE' 108 elif type(address) is str: 109 return 'AF_UNIX' 110 else: 111 raise ValueError('address type of %r unrecognized' % address) 112 113 # 114 # Public functions 115 # 116 117 class Listener(object): 118 ''' 119 Returns a listener object. 120 121 This is a wrapper for a bound socket which is 'listening' for 122 connections, or for a Windows named pipe. 123 ''' 124 def __init__(self, address=None, family=None, backlog=1, authkey=None): 125 family = family or (address and address_type(address)) \ 126 or default_family 127 address = address or arbitrary_address(family) 128 129 if family == 'AF_PIPE': 130 self._listener = PipeListener(address, backlog) 131 else: 132 self._listener = SocketListener(address, family, backlog) 133 134 if authkey is not None and not isinstance(authkey, bytes): 135 raise TypeError, 'authkey should be a byte string' 136 137 self._authkey = authkey 138 139 def accept(self): 140 ''' 141 Accept a connection on the bound socket or named pipe of `self`. 142 143 Returns a `Connection` object. 144 ''' 145 c = self._listener.accept() 146 if self._authkey: 147 deliver_challenge(c, self._authkey) 148 answer_challenge(c, self._authkey) 149 return c 150 151 def close(self): 152 ''' 153 Close the bound socket or named pipe of `self`. 154 ''' 155 return self._listener.close() 156 157 address = property(lambda self: self._listener._address) 158 last_accepted = property(lambda self: self._listener._last_accepted) 159 160 161 def Client(address, family=None, authkey=None): 162 ''' 163 Returns a connection to the address of a `Listener` 164 ''' 165 family = family or address_type(address) 166 if family == 'AF_PIPE': 167 c = PipeClient(address) 168 else: 169 c = SocketClient(address) 170 171 if authkey is not None and not isinstance(authkey, bytes): 172 raise TypeError, 'authkey should be a byte string' 173 174 if authkey is not None: 175 answer_challenge(c, authkey) 176 deliver_challenge(c, authkey) 177 178 return c 179 180 181 if sys.platform != 'win32': 182 183 def Pipe(duplex=True): 184 ''' 185 Returns pair of connection objects at either end of a pipe 186 ''' 187 if duplex: 188 s1, s2 = socket.socketpair() 189 s1.setblocking(True) 190 s2.setblocking(True) 191 c1 = _multiprocessing.Connection(os.dup(s1.fileno())) 192 c2 = _multiprocessing.Connection(os.dup(s2.fileno())) 193 s1.close() 194 s2.close() 195 else: 196 fd1, fd2 = os.pipe() 197 c1 = _multiprocessing.Connection(fd1, writable=False) 198 c2 = _multiprocessing.Connection(fd2, readable=False) 199 200 return c1, c2 201 202 else: 203 from _multiprocessing import win32 204 205 def Pipe(duplex=True): 206 ''' 207 Returns pair of connection objects at either end of a pipe 208 ''' 209 address = arbitrary_address('AF_PIPE') 210 if duplex: 211 openmode = win32.PIPE_ACCESS_DUPLEX 212 access = win32.GENERIC_READ | win32.GENERIC_WRITE 213 obsize, ibsize = BUFSIZE, BUFSIZE 214 else: 215 openmode = win32.PIPE_ACCESS_INBOUND 216 access = win32.GENERIC_WRITE 217 obsize, ibsize = 0, BUFSIZE 218 219 h1 = win32.CreateNamedPipe( 220 address, openmode, 221 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | 222 win32.PIPE_WAIT, 223 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL 224 ) 225 h2 = win32.CreateFile( 226 address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL 227 ) 228 win32.SetNamedPipeHandleState( 229 h2, win32.PIPE_READMODE_MESSAGE, None, None 230 ) 231 232 try: 233 win32.ConnectNamedPipe(h1, win32.NULL) 234 except WindowsError, e: 235 if e.args[0] != win32.ERROR_PIPE_CONNECTED: 236 raise 237 238 c1 = _multiprocessing.PipeConnection(h1, writable=duplex) 239 c2 = _multiprocessing.PipeConnection(h2, readable=duplex) 240 241 return c1, c2 242 243 # 244 # Definitions for connections based on sockets 245 # 246 247 class SocketListener(object): 248 ''' 249 Representation of a socket which is bound to an address and listening 250 ''' 251 def __init__(self, address, family, backlog=1): 252 self._socket = socket.socket(getattr(socket, family)) 253 try: 254 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 255 self._socket.setblocking(True) 256 self._socket.bind(address) 257 self._socket.listen(backlog) 258 self._address = self._socket.getsockname() 259 except socket.error: 260 self._socket.close() 261 raise 262 self._family = family 263 self._last_accepted = None 264 265 if family == 'AF_UNIX': 266 self._unlink = Finalize( 267 self, os.unlink, args=(address,), exitpriority=0 268 ) 269 else: 270 self._unlink = None 271 272 def accept(self): 273 s, self._last_accepted = self._socket.accept() 274 s.setblocking(True) 275 fd = duplicate(s.fileno()) 276 conn = _multiprocessing.Connection(fd) 277 s.close() 278 return conn 279 280 def close(self): 281 self._socket.close() 282 if self._unlink is not None: 283 self._unlink() 284 285 286 def SocketClient(address): 287 ''' 288 Return a connection object connected to the socket given by `address` 289 ''' 290 family = address_type(address) 291 s = socket.socket( getattr(socket, family) ) 292 s.setblocking(True) 293 t = _init_timeout() 294 295 while 1: 296 try: 297 s.connect(address) 298 except socket.error, e: 299 if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): 300 debug('failed to connect to address %s', address) 301 raise 302 time.sleep(0.01) 303 else: 304 break 305 else: 306 raise 307 308 fd = duplicate(s.fileno()) 309 conn = _multiprocessing.Connection(fd) 310 s.close() 311 return conn 312 313 # 314 # Definitions for connections based on named pipes 315 # 316 317 if sys.platform == 'win32': 318 319 class PipeListener(object): 320 ''' 321 Representation of a named pipe 322 ''' 323 def __init__(self, address, backlog=None): 324 self._address = address 325 handle = win32.CreateNamedPipe( 326 address, win32.PIPE_ACCESS_DUPLEX, 327 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | 328 win32.PIPE_WAIT, 329 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, 330 win32.NMPWAIT_WAIT_FOREVER, win32.NULL 331 ) 332 self._handle_queue = [handle] 333 self._last_accepted = None 334 335 sub_debug('listener created with address=%r', self._address) 336 337 self.close = Finalize( 338 self, PipeListener._finalize_pipe_listener, 339 args=(self._handle_queue, self._address), exitpriority=0 340 ) 341 342 def accept(self): 343 newhandle = win32.CreateNamedPipe( 344 self._address, win32.PIPE_ACCESS_DUPLEX, 345 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | 346 win32.PIPE_WAIT, 347 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, 348 win32.NMPWAIT_WAIT_FOREVER, win32.NULL 349 ) 350 self._handle_queue.append(newhandle) 351 handle = self._handle_queue.pop(0) 352 try: 353 win32.ConnectNamedPipe(handle, win32.NULL) 354 except WindowsError, e: 355 # ERROR_NO_DATA can occur if a client has already connected, 356 # written data and then disconnected -- see Issue 14725. 357 if e.args[0] not in (win32.ERROR_PIPE_CONNECTED, 358 win32.ERROR_NO_DATA): 359 raise 360 return _multiprocessing.PipeConnection(handle) 361 362 @staticmethod 363 def _finalize_pipe_listener(queue, address): 364 sub_debug('closing listener with address=%r', address) 365 for handle in queue: 366 close(handle) 367 368 def PipeClient(address): 369 ''' 370 Return a connection object connected to the pipe given by `address` 371 ''' 372 t = _init_timeout() 373 while 1: 374 try: 375 win32.WaitNamedPipe(address, 1000) 376 h = win32.CreateFile( 377 address, win32.GENERIC_READ | win32.GENERIC_WRITE, 378 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL 379 ) 380 except WindowsError, e: 381 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, 382 win32.ERROR_PIPE_BUSY) or _check_timeout(t): 383 raise 384 else: 385 break 386 else: 387 raise 388 389 win32.SetNamedPipeHandleState( 390 h, win32.PIPE_READMODE_MESSAGE, None, None 391 ) 392 return _multiprocessing.PipeConnection(h) 393 394 # 395 # Authentication stuff 396 # 397 398 MESSAGE_LENGTH = 20 399 400 CHALLENGE = b'#CHALLENGE#' 401 WELCOME = b'#WELCOME#' 402 FAILURE = b'#FAILURE#' 403 404 def deliver_challenge(connection, authkey): 405 import hmac 406 assert isinstance(authkey, bytes) 407 message = os.urandom(MESSAGE_LENGTH) 408 connection.send_bytes(CHALLENGE + message) 409 digest = hmac.new(authkey, message).digest() 410 response = connection.recv_bytes(256) # reject large message 411 if response == digest: 412 connection.send_bytes(WELCOME) 413 else: 414 connection.send_bytes(FAILURE) 415 raise AuthenticationError('digest received was wrong') 416 417 def answer_challenge(connection, authkey): 418 import hmac 419 assert isinstance(authkey, bytes) 420 message = connection.recv_bytes(256) # reject large message 421 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message 422 message = message[len(CHALLENGE):] 423 digest = hmac.new(authkey, message).digest() 424 connection.send_bytes(digest) 425 response = connection.recv_bytes(256) # reject large message 426 if response != WELCOME: 427 raise AuthenticationError('digest sent was rejected') 428 429 # 430 # Support for using xmlrpclib for serialization 431 # 432 433 class ConnectionWrapper(object): 434 def __init__(self, conn, dumps, loads): 435 self._conn = conn 436 self._dumps = dumps 437 self._loads = loads 438 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): 439 obj = getattr(conn, attr) 440 setattr(self, attr, obj) 441 def send(self, obj): 442 s = self._dumps(obj) 443 self._conn.send_bytes(s) 444 def recv(self): 445 s = self._conn.recv_bytes() 446 return self._loads(s) 447 448 def _xml_dumps(obj): 449 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') 450 451 def _xml_loads(s): 452 (obj,), method = xmlrpclib.loads(s.decode('utf8')) 453 return obj 454 455 class XmlListener(Listener): 456 def accept(self): 457 global xmlrpclib 458 import xmlrpclib 459 obj = Listener.accept(self) 460 return ConnectionWrapper(obj, _xml_dumps, _xml_loads) 461 462 def XmlClient(*args, **kwds): 463 global xmlrpclib 464 import xmlrpclib 465 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) 466