Home | History | Annotate | Download | only in Lib
      1 # -*- Mode: Python -*-
      2 #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
      3 #   Author: Sam Rushing <rushing (at] nightmare.com>
      4 
      5 # ======================================================================
      6 # Copyright 1996 by Sam Rushing
      7 #
      8 #                         All Rights Reserved
      9 #
     10 # Permission to use, copy, modify, and distribute this software and
     11 # its documentation for any purpose and without fee is hereby
     12 # granted, provided that the above copyright notice appear in all
     13 # copies and that both that copyright notice and this permission
     14 # notice appear in supporting documentation, and that the name of Sam
     15 # Rushing not be used in advertising or publicity pertaining to
     16 # distribution of the software without specific, written prior
     17 # permission.
     18 #
     19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
     20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
     21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
     22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
     23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
     24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
     25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     26 # ======================================================================
     27 
     28 """Basic infrastructure for asynchronous socket service clients and servers.
     29 
     30 There are only two ways to have a program on a single processor do "more
     31 than one thing at a time".  Multi-threaded programming is the simplest and
     32 most popular way to do it, but there is another very different technique,
     33 that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. It's really only practical if your program
     35 is largely I/O bound. If your program is CPU bound, then pre-emptive
     36 scheduled threads are probably what you really need. Network servers are
     37 rarely CPU-bound, however.
     38 
     39 If your operating system supports the select() system call in its I/O
     40 library (and nearly all do), then you can use it to juggle multiple
     41 communication channels at once; doing other work while your I/O is taking
     42 place in the "background."  Although this strategy can seem strange and
     43 complex, especially at first, it is in many ways easier to understand and
     44 control than multi-threaded programming. The module documented here solves
     45 many of the difficult problems for you, making the task of building
     46 sophisticated high-performance network servers and clients a snap.
     47 """
     48 
     49 import select
     50 import socket
     51 import sys
     52 import time
     53 import warnings
     54 
     55 import os
     56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
     57      ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
     58      errorcode
     59 
# errno values that are treated as "the connection is gone": readwrite(),
# dispatcher.send() and dispatcher.recv() answer them with handle_close()
# instead of propagating the error.
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF})

# Default channel map (file descriptor -> dispatcher) used whenever a caller
# does not pass its own map.  The NameError probe only creates the dict when
# the name does not exist yet -- presumably so that re-executing this module
# in the same namespace keeps the already registered channels (TODO confirm).
try:
    socket_map
except NameError:
    socket_map = {}
     67 
     68 def _strerror(err):
     69     try:
     70         return os.strerror(err)
     71     except (ValueError, OverflowError, NameError):
     72         if err in errorcode:
     73             return errorcode[err]
     74         return "Unknown error %s" %err
     75 
     76 class ExitNow(Exception):
     77     pass
     78 
     79 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
     80 
     81 def read(obj):
     82     try:
     83         obj.handle_read_event()
     84     except _reraised_exceptions:
     85         raise
     86     except:
     87         obj.handle_error()
     88 
     89 def write(obj):
     90     try:
     91         obj.handle_write_event()
     92     except _reraised_exceptions:
     93         raise
     94     except:
     95         obj.handle_error()
     96 
     97 def _exception(obj):
     98     try:
     99         obj.handle_expt_event()
    100     except _reraised_exceptions:
    101         raise
    102     except:
    103         obj.handle_error()
    104 
    105 def readwrite(obj, flags):
    106     try:
    107         if flags & select.POLLIN:
    108             obj.handle_read_event()
    109         if flags & select.POLLOUT:
    110             obj.handle_write_event()
    111         if flags & select.POLLPRI:
    112             obj.handle_expt_event()
    113         if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
    114             obj.handle_close()
    115     except OSError as e:
    116         if e.args[0] not in _DISCONNECTED:
    117             obj.handle_error()
    118         else:
    119             obj.handle_close()
    120     except _reraised_exceptions:
    121         raise
    122     except:
    123         obj.handle_error()
    124 
    125 def poll(timeout=0.0, map=None):
    126     if map is None:
    127         map = socket_map
    128     if map:
    129         r = []; w = []; e = []
    130         for fd, obj in list(map.items()):
    131             is_r = obj.readable()
    132             is_w = obj.writable()
    133             if is_r:
    134                 r.append(fd)
    135             # accepting sockets should not be writable
    136             if is_w and not obj.accepting:
    137                 w.append(fd)
    138             if is_r or is_w:
    139                 e.append(fd)
    140         if [] == r == w == e:
    141             time.sleep(timeout)
    142             return
    143 
    144         r, w, e = select.select(r, w, e, timeout)
    145 
    146         for fd in r:
    147             obj = map.get(fd)
    148             if obj is None:
    149                 continue
    150             read(obj)
    151 
    152         for fd in w:
    153             obj = map.get(fd)
    154             if obj is None:
    155                 continue
    156             write(obj)
    157 
    158         for fd in e:
    159             obj = map.get(fd)
    160             if obj is None:
    161                 continue
    162             _exception(obj)
    163 
    164 def poll2(timeout=0.0, map=None):
    165     # Use the poll() support added to the select module in Python 2.0
    166     if map is None:
    167         map = socket_map
    168     if timeout is not None:
    169         # timeout is in milliseconds
    170         timeout = int(timeout*1000)
    171     pollster = select.poll()
    172     if map:
    173         for fd, obj in list(map.items()):
    174             flags = 0
    175             if obj.readable():
    176                 flags |= select.POLLIN | select.POLLPRI
    177             # accepting sockets should not be writable
    178             if obj.writable() and not obj.accepting:
    179                 flags |= select.POLLOUT
    180             if flags:
    181                 pollster.register(fd, flags)
    182 
    183         r = pollster.poll(timeout)
    184         for fd, flags in r:
    185             obj = map.get(fd)
    186             if obj is None:
    187                 continue
    188             readwrite(obj, flags)
    189 
    190 poll3 = poll2                           # Alias for backward compatibility
    191 
    192 def loop(timeout=30.0, use_poll=False, map=None, count=None):
    193     if map is None:
    194         map = socket_map
    195 
    196     if use_poll and hasattr(select, 'poll'):
    197         poll_fun = poll2
    198     else:
    199         poll_fun = poll
    200 
    201     if count is None:
    202         while map:
    203             poll_fun(timeout, map)
    204 
    205     else:
    206         while map and count > 0:
    207             poll_fun(timeout, map)
    208             count = count - 1
    209 
    210 class dispatcher:
    211 
    212     debug = False
    213     connected = False
    214     accepting = False
    215     connecting = False
    216     closing = False
    217     addr = None
    218     ignore_log_types = frozenset({'warning'})
    219 
    220     def __init__(self, sock=None, map=None):
    221         if map is None:
    222             self._map = socket_map
    223         else:
    224             self._map = map
    225 
    226         self._fileno = None
    227 
    228         if sock:
    229             # Set to nonblocking just to make sure for cases where we
    230             # get a socket from a blocking source.
    231             sock.setblocking(0)
    232             self.set_socket(sock, map)
    233             self.connected = True
    234             # The constructor no longer requires that the socket
    235             # passed be connected.
    236             try:
    237                 self.addr = sock.getpeername()
    238             except OSError as err:
    239                 if err.args[0] in (ENOTCONN, EINVAL):
    240                     # To handle the case where we got an unconnected
    241                     # socket.
    242                     self.connected = False
    243                 else:
    244                     # The socket is broken in some unknown way, alert
    245                     # the user and remove it from the map (to prevent
    246                     # polling of broken sockets).
    247                     self.del_channel(map)
    248                     raise
    249         else:
    250             self.socket = None
    251 
    252     def __repr__(self):
    253         status = [self.__class__.__module__+"."+self.__class__.__qualname__]
    254         if self.accepting and self.addr:
    255             status.append('listening')
    256         elif self.connected:
    257             status.append('connected')
    258         if self.addr is not None:
    259             try:
    260                 status.append('%s:%d' % self.addr)
    261             except TypeError:
    262                 status.append(repr(self.addr))
    263         return '<%s at %#x>' % (' '.join(status), id(self))
    264 
    265     __str__ = __repr__
    266 
    267     def add_channel(self, map=None):
    268         #self.log_info('adding channel %s' % self)
    269         if map is None:
    270             map = self._map
    271         map[self._fileno] = self
    272 
    273     def del_channel(self, map=None):
    274         fd = self._fileno
    275         if map is None:
    276             map = self._map
    277         if fd in map:
    278             #self.log_info('closing channel %d:%s' % (fd, self))
    279             del map[fd]
    280         self._fileno = None
    281 
    282     def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
    283         self.family_and_type = family, type
    284         sock = socket.socket(family, type)
    285         sock.setblocking(0)
    286         self.set_socket(sock)
    287 
    288     def set_socket(self, sock, map=None):
    289         self.socket = sock
    290 ##        self.__dict__['socket'] = sock
    291         self._fileno = sock.fileno()
    292         self.add_channel(map)
    293 
    294     def set_reuse_addr(self):
    295         # try to re-use a server port if possible
    296         try:
    297             self.socket.setsockopt(
    298                 socket.SOL_SOCKET, socket.SO_REUSEADDR,
    299                 self.socket.getsockopt(socket.SOL_SOCKET,
    300                                        socket.SO_REUSEADDR) | 1
    301                 )
    302         except OSError:
    303             pass
    304 
    305     # ==================================================
    306     # predicates for select()
    307     # these are used as filters for the lists of sockets
    308     # to pass to select().
    309     # ==================================================
    310 
    311     def readable(self):
    312         return True
    313 
    314     def writable(self):
    315         return True
    316 
    317     # ==================================================
    318     # socket object methods.
    319     # ==================================================
    320 
    321     def listen(self, num):
    322         self.accepting = True
    323         if os.name == 'nt' and num > 5:
    324             num = 5
    325         return self.socket.listen(num)
    326 
    327     def bind(self, addr):
    328         self.addr = addr
    329         return self.socket.bind(addr)
    330 
    331     def connect(self, address):
    332         self.connected = False
    333         self.connecting = True
    334         err = self.socket.connect_ex(address)
    335         if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
    336         or err == EINVAL and os.name == 'nt':
    337             self.addr = address
    338             return
    339         if err in (0, EISCONN):
    340             self.addr = address
    341             self.handle_connect_event()
    342         else:
    343             raise OSError(err, errorcode[err])
    344 
    345     def accept(self):
    346         # XXX can return either an address pair or None
    347         try:
    348             conn, addr = self.socket.accept()
    349         except TypeError:
    350             return None
    351         except OSError as why:
    352             if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
    353                 return None
    354             else:
    355                 raise
    356         else:
    357             return conn, addr
    358 
    359     def send(self, data):
    360         try:
    361             result = self.socket.send(data)
    362             return result
    363         except OSError as why:
    364             if why.args[0] == EWOULDBLOCK:
    365                 return 0
    366             elif why.args[0] in _DISCONNECTED:
    367                 self.handle_close()
    368                 return 0
    369             else:
    370                 raise
    371 
    372     def recv(self, buffer_size):
    373         try:
    374             data = self.socket.recv(buffer_size)
    375             if not data:
    376                 # a closed connection is indicated by signaling
    377                 # a read condition, and having recv() return 0.
    378                 self.handle_close()
    379                 return b''
    380             else:
    381                 return data
    382         except OSError as why:
    383             # winsock sometimes raises ENOTCONN
    384             if why.args[0] in _DISCONNECTED:
    385                 self.handle_close()
    386                 return b''
    387             else:
    388                 raise
    389 
    390     def close(self):
    391         self.connected = False
    392         self.accepting = False
    393         self.connecting = False
    394         self.del_channel()
    395         if self.socket is not None:
    396             try:
    397                 self.socket.close()
    398             except OSError as why:
    399                 if why.args[0] not in (ENOTCONN, EBADF):
    400                     raise
    401 
    402     # log and log_info may be overridden to provide more sophisticated
    403     # logging and warning methods. In general, log is for 'hit' logging
    404     # and 'log_info' is for informational, warning and error logging.
    405 
    406     def log(self, message):
    407         sys.stderr.write('log: %s\n' % str(message))
    408 
    409     def log_info(self, message, type='info'):
    410         if type not in self.ignore_log_types:
    411             print('%s: %s' % (type, message))
    412 
    413     def handle_read_event(self):
    414         if self.accepting:
    415             # accepting sockets are never connected, they "spawn" new
    416             # sockets that are connected
    417             self.handle_accept()
    418         elif not self.connected:
    419             if self.connecting:
    420                 self.handle_connect_event()
    421             self.handle_read()
    422         else:
    423             self.handle_read()
    424 
    425     def handle_connect_event(self):
    426         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    427         if err != 0:
    428             raise OSError(err, _strerror(err))
    429         self.handle_connect()
    430         self.connected = True
    431         self.connecting = False
    432 
    433     def handle_write_event(self):
    434         if self.accepting:
    435             # Accepting sockets shouldn't get a write event.
    436             # We will pretend it didn't happen.
    437             return
    438 
    439         if not self.connected:
    440             if self.connecting:
    441                 self.handle_connect_event()
    442         self.handle_write()
    443 
    444     def handle_expt_event(self):
    445         # handle_expt_event() is called if there might be an error on the
    446         # socket, or if there is OOB data
    447         # check for the error condition first
    448         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    449         if err != 0:
    450             # we can get here when select.select() says that there is an
    451             # exceptional condition on the socket
    452             # since there is an error, we'll go ahead and close the socket
    453             # like we would in a subclassed handle_read() that received no
    454             # data
    455             self.handle_close()
    456         else:
    457             self.handle_expt()
    458 
    459     def handle_error(self):
    460         nil, t, v, tbinfo = compact_traceback()
    461 
    462         # sometimes a user repr method will crash.
    463         try:
    464             self_repr = repr(self)
    465         except:
    466             self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
    467 
    468         self.log_info(
    469             'uncaptured python exception, closing channel %s (%s:%s %s)' % (
    470                 self_repr,
    471                 t,
    472                 v,
    473                 tbinfo
    474                 ),
    475             'error'
    476             )
    477         self.handle_close()
    478 
    479     def handle_expt(self):
    480         self.log_info('unhandled incoming priority event', 'warning')
    481 
    482     def handle_read(self):
    483         self.log_info('unhandled read event', 'warning')
    484 
    485     def handle_write(self):
    486         self.log_info('unhandled write event', 'warning')
    487 
    488     def handle_connect(self):
    489         self.log_info('unhandled connect event', 'warning')
    490 
    491     def handle_accept(self):
    492         pair = self.accept()
    493         if pair is not None:
    494             self.handle_accepted(*pair)
    495 
    496     def handle_accepted(self, sock, addr):
    497         sock.close()
    498         self.log_info('unhandled accepted event', 'warning')
    499 
    500     def handle_close(self):
    501         self.log_info('unhandled close event', 'warning')
    502         self.close()
    503 
    504 # ---------------------------------------------------------------------------
    505 # adds simple buffered output capability, useful for simple clients.
    506 # [for more sophisticated usage use asynchat.async_chat]
    507 # ---------------------------------------------------------------------------
    508 
    509 class dispatcher_with_send(dispatcher):
    510 
    511     def __init__(self, sock=None, map=None):
    512         dispatcher.__init__(self, sock, map)
    513         self.out_buffer = b''
    514 
    515     def initiate_send(self):
    516         num_sent = 0
    517         num_sent = dispatcher.send(self, self.out_buffer[:65536])
    518         self.out_buffer = self.out_buffer[num_sent:]
    519 
    520     def handle_write(self):
    521         self.initiate_send()
    522 
    523     def writable(self):
    524         return (not self.connected) or len(self.out_buffer)
    525 
    526     def send(self, data):
    527         if self.debug:
    528             self.log_info('sending %s' % repr(data))
    529         self.out_buffer = self.out_buffer + data
    530         self.initiate_send()
    531 
    532 # ---------------------------------------------------------------------------
    533 # used for debugging.
    534 # ---------------------------------------------------------------------------
    535 
    536 def compact_traceback():
    537     t, v, tb = sys.exc_info()
    538     tbinfo = []
    539     if not tb: # Must have a traceback
    540         raise AssertionError("traceback does not exist")
    541     while tb:
    542         tbinfo.append((
    543             tb.tb_frame.f_code.co_filename,
    544             tb.tb_frame.f_code.co_name,
    545             str(tb.tb_lineno)
    546             ))
    547         tb = tb.tb_next
    548 
    549     # just to be safe
    550     del tb
    551 
    552     file, function, line = tbinfo[-1]
    553     info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
    554     return (file, function, line), t, v, info
    555 
    556 def close_all(map=None, ignore_all=False):
    557     if map is None:
    558         map = socket_map
    559     for x in list(map.values()):
    560         try:
    561             x.close()
    562         except OSError as x:
    563             if x.args[0] == EBADF:
    564                 pass
    565             elif not ignore_all:
    566                 raise
    567         except _reraised_exceptions:
    568             raise
    569         except:
    570             if not ignore_all:
    571                 raise
    572     map.clear()
    573 
    574 # Asynchronous File I/O:
    575 #
    576 # After a little research (reading man pages on various unixen, and
    577 # digging through the linux kernel), I've determined that select()
    578 # isn't meant for doing asynchronous file i/o.
    579 # Heartening, though - reading linux/mm/filemap.c shows that linux
    580 # supports asynchronous read-ahead.  So _MOST_ of the time, the data
    581 # will be sitting in memory for us already when we go to read it.
    582 #
    583 # What other OS's (besides NT) support async file i/o?  [VMS?]
    584 #
    585 # Regardless, this is useful for pipes, and stdin/stdout...
    586 
    587 if os.name == 'posix':
    588     class file_wrapper:
    589         # Here we override just enough to make a file
    590         # look like a socket for the purposes of asyncore.
    591         # The passed fd is automatically os.dup()'d
    592 
    593         def __init__(self, fd):
    594             self.fd = os.dup(fd)
    595 
    596         def __del__(self):
    597             if self.fd >= 0:
    598                 warnings.warn("unclosed file %r" % self, ResourceWarning,
    599                               source=self)
    600             self.close()
    601 
    602         def recv(self, *args):
    603             return os.read(self.fd, *args)
    604 
    605         def send(self, *args):
    606             return os.write(self.fd, *args)
    607 
    608         def getsockopt(self, level, optname, buflen=None):
    609             if (level == socket.SOL_SOCKET and
    610                 optname == socket.SO_ERROR and
    611                 not buflen):
    612                 return 0
    613             raise NotImplementedError("Only asyncore specific behaviour "
    614                                       "implemented.")
    615 
    616         read = recv
    617         write = send
    618 
    619         def close(self):
    620             if self.fd < 0:
    621                 return
    622             os.close(self.fd)
    623             self.fd = -1
    624 
    625         def fileno(self):
    626             return self.fd
    627 
    class file_dispatcher(dispatcher):
        """dispatcher for a file descriptor instead of a socket."""

        def __init__(self, fd, map=None):
            """Wrap *fd*, which is either a descriptor or has fileno().

            The channel is marked connected, the descriptor is wrapped via
            set_file() and *fd* itself is switched to non-blocking mode.
            """
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                # Not a file-like object: use fd as given.
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            os.set_blocking(fd, False)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and register the channel."""
            wrapper = file_wrapper(fd)
            self.socket = wrapper
            self._fileno = wrapper.fileno()
            self.add_channel()
    645