Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # A higher level module for using sockets (or Windows named pipes)
      3 #
      4 # multiprocessing/connection.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # All rights reserved.
      8 #
      9 # Redistribution and use in source and binary forms, with or without
     10 # modification, are permitted provided that the following conditions
     11 # are met:
     12 #
     13 # 1. Redistributions of source code must retain the above copyright
     14 #    notice, this list of conditions and the following disclaimer.
     15 # 2. Redistributions in binary form must reproduce the above copyright
     16 #    notice, this list of conditions and the following disclaimer in the
     17 #    documentation and/or other materials provided with the distribution.
     18 # 3. Neither the name of author nor the names of any contributors may be
     19 #    used to endorse or promote products derived from this software
     20 #    without specific prior written permission.
     21 #
     22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     25 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     32 # SUCH DAMAGE.
     33 #
     34 
     35 __all__ = [ 'Client', 'Listener', 'Pipe' ]
     36 
     37 import os
     38 import sys
     39 import socket
     40 import errno
     41 import time
     42 import tempfile
     43 import itertools
     44 
     45 import _multiprocessing
     46 from multiprocessing import current_process, AuthenticationError
     47 from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
     48 from multiprocessing.forking import duplicate, close
     49 
     50 
     51 #
     52 #
     53 #
     54 
     55 BUFSIZE = 8192
     56 # A very generous timeout when it comes to local connections...
     57 CONNECTION_TIMEOUT = 20.
     58 
     59 _mmap_counter = itertools.count()
     60 
     61 default_family = 'AF_INET'
     62 families = ['AF_INET']
     63 
     64 if hasattr(socket, 'AF_UNIX'):
     65     default_family = 'AF_UNIX'
     66     families += ['AF_UNIX']
     67 
     68 if sys.platform == 'win32':
     69     default_family = 'AF_PIPE'
     70     families += ['AF_PIPE']
     71 
     72 
     73 def _init_timeout(timeout=CONNECTION_TIMEOUT):
     74     return time.time() + timeout
     75 
     76 def _check_timeout(t):
     77     return time.time() > t
     78 
     79 #
     80 #
     81 #
     82 
     83 def arbitrary_address(family):
     84     '''
     85     Return an arbitrary free address for the given family
     86     '''
     87     if family == 'AF_INET':
     88         return ('localhost', 0)
     89     elif family == 'AF_UNIX':
     90         return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
     91     elif family == 'AF_PIPE':
     92         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
     93                                (os.getpid(), _mmap_counter.next()))
     94     else:
     95         raise ValueError('unrecognized family')
     96 
     97 
     98 def address_type(address):
     99     '''
    100     Return the types of the address
    101 
    102     This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
    103     '''
    104     if type(address) == tuple:
    105         return 'AF_INET'
    106     elif type(address) is str and address.startswith('\\\\'):
    107         return 'AF_PIPE'
    108     elif type(address) is str:
    109         return 'AF_UNIX'
    110     else:
    111         raise ValueError('address type of %r unrecognized' % address)
    112 
    113 #
    114 # Public functions
    115 #
    116 
    117 class Listener(object):
    118     '''
    119     Returns a listener object.
    120 
    121     This is a wrapper for a bound socket which is 'listening' for
    122     connections, or for a Windows named pipe.
    123     '''
    124     def __init__(self, address=None, family=None, backlog=1, authkey=None):
    125         family = family or (address and address_type(address)) \
    126                  or default_family
    127         address = address or arbitrary_address(family)
    128 
    129         if family == 'AF_PIPE':
    130             self._listener = PipeListener(address, backlog)
    131         else:
    132             self._listener = SocketListener(address, family, backlog)
    133 
    134         if authkey is not None and not isinstance(authkey, bytes):
    135             raise TypeError, 'authkey should be a byte string'
    136 
    137         self._authkey = authkey
    138 
    139     def accept(self):
    140         '''
    141         Accept a connection on the bound socket or named pipe of `self`.
    142 
    143         Returns a `Connection` object.
    144         '''
    145         c = self._listener.accept()
    146         if self._authkey:
    147             deliver_challenge(c, self._authkey)
    148             answer_challenge(c, self._authkey)
    149         return c
    150 
    151     def close(self):
    152         '''
    153         Close the bound socket or named pipe of `self`.
    154         '''
    155         return self._listener.close()
    156 
    157     address = property(lambda self: self._listener._address)
    158     last_accepted = property(lambda self: self._listener._last_accepted)
    159 
    160 
    161 def Client(address, family=None, authkey=None):
    162     '''
    163     Returns a connection to the address of a `Listener`
    164     '''
    165     family = family or address_type(address)
    166     if family == 'AF_PIPE':
    167         c = PipeClient(address)
    168     else:
    169         c = SocketClient(address)
    170 
    171     if authkey is not None and not isinstance(authkey, bytes):
    172         raise TypeError, 'authkey should be a byte string'
    173 
    174     if authkey is not None:
    175         answer_challenge(c, authkey)
    176         deliver_challenge(c, authkey)
    177 
    178     return c
    179 
    180 
    181 if sys.platform != 'win32':
    182 
    183     def Pipe(duplex=True):
    184         '''
    185         Returns pair of connection objects at either end of a pipe
    186         '''
    187         if duplex:
    188             s1, s2 = socket.socketpair()
    189             s1.setblocking(True)
    190             s2.setblocking(True)
    191             c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
    192             c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
    193             s1.close()
    194             s2.close()
    195         else:
    196             fd1, fd2 = os.pipe()
    197             c1 = _multiprocessing.Connection(fd1, writable=False)
    198             c2 = _multiprocessing.Connection(fd2, readable=False)
    199 
    200         return c1, c2
    201 
    202 else:
    203     from _multiprocessing import win32
    204 
    205     def Pipe(duplex=True):
    206         '''
    207         Returns pair of connection objects at either end of a pipe
    208         '''
    209         address = arbitrary_address('AF_PIPE')
    210         if duplex:
    211             openmode = win32.PIPE_ACCESS_DUPLEX
    212             access = win32.GENERIC_READ | win32.GENERIC_WRITE
    213             obsize, ibsize = BUFSIZE, BUFSIZE
    214         else:
    215             openmode = win32.PIPE_ACCESS_INBOUND
    216             access = win32.GENERIC_WRITE
    217             obsize, ibsize = 0, BUFSIZE
    218 
    219         h1 = win32.CreateNamedPipe(
    220             address, openmode,
    221             win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
    222             win32.PIPE_WAIT,
    223             1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
    224             )
    225         h2 = win32.CreateFile(
    226             address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
    227             )
    228         win32.SetNamedPipeHandleState(
    229             h2, win32.PIPE_READMODE_MESSAGE, None, None
    230             )
    231 
    232         try:
    233             win32.ConnectNamedPipe(h1, win32.NULL)
    234         except WindowsError, e:
    235             if e.args[0] != win32.ERROR_PIPE_CONNECTED:
    236                 raise
    237 
    238         c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
    239         c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
    240 
    241         return c1, c2
    242 
    243 #
    244 # Definitions for connections based on sockets
    245 #
    246 
    247 class SocketListener(object):
    248     '''
    249     Representation of a socket which is bound to an address and listening
    250     '''
    251     def __init__(self, address, family, backlog=1):
    252         self._socket = socket.socket(getattr(socket, family))
    253         try:
    254             self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    255             self._socket.setblocking(True)
    256             self._socket.bind(address)
    257             self._socket.listen(backlog)
    258             self._address = self._socket.getsockname()
    259         except socket.error:
    260             self._socket.close()
    261             raise
    262         self._family = family
    263         self._last_accepted = None
    264 
    265         if family == 'AF_UNIX':
    266             self._unlink = Finalize(
    267                 self, os.unlink, args=(address,), exitpriority=0
    268                 )
    269         else:
    270             self._unlink = None
    271 
    272     def accept(self):
    273         s, self._last_accepted = self._socket.accept()
    274         s.setblocking(True)
    275         fd = duplicate(s.fileno())
    276         conn = _multiprocessing.Connection(fd)
    277         s.close()
    278         return conn
    279 
    280     def close(self):
    281         self._socket.close()
    282         if self._unlink is not None:
    283             self._unlink()
    284 
    285 
    286 def SocketClient(address):
    287     '''
    288     Return a connection object connected to the socket given by `address`
    289     '''
    290     family = address_type(address)
    291     s = socket.socket( getattr(socket, family) )
    292     s.setblocking(True)
    293     t = _init_timeout()
    294 
    295     while 1:
    296         try:
    297             s.connect(address)
    298         except socket.error, e:
    299             if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
    300                 debug('failed to connect to address %s', address)
    301                 raise
    302             time.sleep(0.01)
    303         else:
    304             break
    305     else:
    306         raise
    307 
    308     fd = duplicate(s.fileno())
    309     conn = _multiprocessing.Connection(fd)
    310     s.close()
    311     return conn
    312 
    313 #
    314 # Definitions for connections based on named pipes
    315 #
    316 
    317 if sys.platform == 'win32':
    318 
    319     class PipeListener(object):
    320         '''
    321         Representation of a named pipe
    322         '''
    323         def __init__(self, address, backlog=None):
    324             self._address = address
    325             handle = win32.CreateNamedPipe(
    326                 address, win32.PIPE_ACCESS_DUPLEX,
    327                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
    328                 win32.PIPE_WAIT,
    329                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
    330                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
    331                 )
    332             self._handle_queue = [handle]
    333             self._last_accepted = None
    334 
    335             sub_debug('listener created with address=%r', self._address)
    336 
    337             self.close = Finalize(
    338                 self, PipeListener._finalize_pipe_listener,
    339                 args=(self._handle_queue, self._address), exitpriority=0
    340                 )
    341 
    342         def accept(self):
    343             newhandle = win32.CreateNamedPipe(
    344                 self._address, win32.PIPE_ACCESS_DUPLEX,
    345                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
    346                 win32.PIPE_WAIT,
    347                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
    348                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
    349                 )
    350             self._handle_queue.append(newhandle)
    351             handle = self._handle_queue.pop(0)
    352             try:
    353                 win32.ConnectNamedPipe(handle, win32.NULL)
    354             except WindowsError, e:
    355                 # ERROR_NO_DATA can occur if a client has already connected,
    356                 # written data and then disconnected -- see Issue 14725.
    357                 if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
    358                                      win32.ERROR_NO_DATA):
    359                     raise
    360             return _multiprocessing.PipeConnection(handle)
    361 
    362         @staticmethod
    363         def _finalize_pipe_listener(queue, address):
    364             sub_debug('closing listener with address=%r', address)
    365             for handle in queue:
    366                 close(handle)
    367 
    368     def PipeClient(address):
    369         '''
    370         Return a connection object connected to the pipe given by `address`
    371         '''
    372         t = _init_timeout()
    373         while 1:
    374             try:
    375                 win32.WaitNamedPipe(address, 1000)
    376                 h = win32.CreateFile(
    377                     address, win32.GENERIC_READ | win32.GENERIC_WRITE,
    378                     0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
    379                     )
    380             except WindowsError, e:
    381                 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
    382                                      win32.ERROR_PIPE_BUSY) or _check_timeout(t):
    383                     raise
    384             else:
    385                 break
    386         else:
    387             raise
    388 
    389         win32.SetNamedPipeHandleState(
    390             h, win32.PIPE_READMODE_MESSAGE, None, None
    391             )
    392         return _multiprocessing.PipeConnection(h)
    393 
    394 #
    395 # Authentication stuff
    396 #
    397 
    398 MESSAGE_LENGTH = 20
    399 
    400 CHALLENGE = b'#CHALLENGE#'
    401 WELCOME = b'#WELCOME#'
    402 FAILURE = b'#FAILURE#'
    403 
    404 def deliver_challenge(connection, authkey):
    405     import hmac
    406     assert isinstance(authkey, bytes)
    407     message = os.urandom(MESSAGE_LENGTH)
    408     connection.send_bytes(CHALLENGE + message)
    409     digest = hmac.new(authkey, message).digest()
    410     response = connection.recv_bytes(256)        # reject large message
    411     if response == digest:
    412         connection.send_bytes(WELCOME)
    413     else:
    414         connection.send_bytes(FAILURE)
    415         raise AuthenticationError('digest received was wrong')
    416 
    417 def answer_challenge(connection, authkey):
    418     import hmac
    419     assert isinstance(authkey, bytes)
    420     message = connection.recv_bytes(256)         # reject large message
    421     assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
    422     message = message[len(CHALLENGE):]
    423     digest = hmac.new(authkey, message).digest()
    424     connection.send_bytes(digest)
    425     response = connection.recv_bytes(256)        # reject large message
    426     if response != WELCOME:
    427         raise AuthenticationError('digest sent was rejected')
    428 
    429 #
    430 # Support for using xmlrpclib for serialization
    431 #
    432 
    433 class ConnectionWrapper(object):
    434     def __init__(self, conn, dumps, loads):
    435         self._conn = conn
    436         self._dumps = dumps
    437         self._loads = loads
    438         for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
    439             obj = getattr(conn, attr)
    440             setattr(self, attr, obj)
    441     def send(self, obj):
    442         s = self._dumps(obj)
    443         self._conn.send_bytes(s)
    444     def recv(self):
    445         s = self._conn.recv_bytes()
    446         return self._loads(s)
    447 
    448 def _xml_dumps(obj):
    449     return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
    450 
    451 def _xml_loads(s):
    452     (obj,), method = xmlrpclib.loads(s.decode('utf8'))
    453     return obj
    454 
    455 class XmlListener(Listener):
    456     def accept(self):
    457         global xmlrpclib
    458         import xmlrpclib
    459         obj = Listener.accept(self)
    460         return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
    461 
    462 def XmlClient(*args, **kwds):
    463     global xmlrpclib
    464     import xmlrpclib
    465     return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
    466