Home | History | Annotate | Download | only in Lib
      1 # -*- Mode: Python -*-
      2 #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
      3 #   Author: Sam Rushing <rushing (at] nightmare.com>
      4 
      5 # ======================================================================
      6 # Copyright 1996 by Sam Rushing
      7 #
      8 #                         All Rights Reserved
      9 #
     10 # Permission to use, copy, modify, and distribute this software and
     11 # its documentation for any purpose and without fee is hereby
     12 # granted, provided that the above copyright notice appear in all
     13 # copies and that both that copyright notice and this permission
     14 # notice appear in supporting documentation, and that the name of Sam
     15 # Rushing not be used in advertising or publicity pertaining to
     16 # distribution of the software without specific, written prior
     17 # permission.
     18 #
     19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
     20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
     21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
     22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
     23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
     24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
     25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     26 # ======================================================================
     27 
     28 """Basic infrastructure for asynchronous socket service clients and servers.
     29 
     30 There are only two ways to have a program on a single processor do "more
     31 than one thing at a time".  Multi-threaded programming is the simplest and
     32 most popular way to do it, but there is another very different technique,
     33 that lets you have nearly all the advantages of multi-threading, without
     34 actually using multiple threads. it's really only practical if your program
     35 is largely I/O bound. If your program is CPU bound, then pre-emptive
     36 scheduled threads are probably what you really need. Network servers are
     37 rarely CPU-bound, however.
     38 
     39 If your operating system supports the select() system call in its I/O
     40 library (and nearly all do), then you can use it to juggle multiple
     41 communication channels at once; doing other work while your I/O is taking
     42 place in the "background."  Although this strategy can seem strange and
     43 complex, especially at first, it is in many ways easier to understand and
     44 control than multi-threaded programming. The module documented here solves
     45 many of the difficult problems for you, making the task of building
     46 sophisticated high-performance network servers and clients a snap.
     47 """
     48 
     49 import select
     50 import socket
     51 import sys
     52 import time
     53 import warnings
     54 
     55 import os
     56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
     57      ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
     58      errorcode
     59 
     60 _DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
     61                            EBADF))
     62 
     63 try:
     64     socket_map
     65 except NameError:
     66     socket_map = {}
     67 
     68 def _strerror(err):
     69     try:
     70         return os.strerror(err)
     71     except (ValueError, OverflowError, NameError):
     72         if err in errorcode:
     73             return errorcode[err]
     74         return "Unknown error %s" %err
     75 
     76 class ExitNow(Exception):
     77     pass
     78 
     79 _reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
     80 
     81 def read(obj):
     82     try:
     83         obj.handle_read_event()
     84     except _reraised_exceptions:
     85         raise
     86     except:
     87         obj.handle_error()
     88 
     89 def write(obj):
     90     try:
     91         obj.handle_write_event()
     92     except _reraised_exceptions:
     93         raise
     94     except:
     95         obj.handle_error()
     96 
     97 def _exception(obj):
     98     try:
     99         obj.handle_expt_event()
    100     except _reraised_exceptions:
    101         raise
    102     except:
    103         obj.handle_error()
    104 
    105 def readwrite(obj, flags):
    106     try:
    107         if flags & select.POLLIN:
    108             obj.handle_read_event()
    109         if flags & select.POLLOUT:
    110             obj.handle_write_event()
    111         if flags & select.POLLPRI:
    112             obj.handle_expt_event()
    113         if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
    114             obj.handle_close()
    115     except socket.error, e:
    116         if e.args[0] not in _DISCONNECTED:
    117             obj.handle_error()
    118         else:
    119             obj.handle_close()
    120     except _reraised_exceptions:
    121         raise
    122     except:
    123         obj.handle_error()
    124 
    125 def poll(timeout=0.0, map=None):
    126     if map is None:
    127         map = socket_map
    128     if map:
    129         r = []; w = []; e = []
    130         for fd, obj in map.items():
    131             is_r = obj.readable()
    132             is_w = obj.writable()
    133             if is_r:
    134                 r.append(fd)
    135             # accepting sockets should not be writable
    136             if is_w and not obj.accepting:
    137                 w.append(fd)
    138             if is_r or is_w:
    139                 e.append(fd)
    140         if [] == r == w == e:
    141             time.sleep(timeout)
    142             return
    143 
    144         try:
    145             r, w, e = select.select(r, w, e, timeout)
    146         except select.error, err:
    147             if err.args[0] != EINTR:
    148                 raise
    149             else:
    150                 return
    151 
    152         for fd in r:
    153             obj = map.get(fd)
    154             if obj is None:
    155                 continue
    156             read(obj)
    157 
    158         for fd in w:
    159             obj = map.get(fd)
    160             if obj is None:
    161                 continue
    162             write(obj)
    163 
    164         for fd in e:
    165             obj = map.get(fd)
    166             if obj is None:
    167                 continue
    168             _exception(obj)
    169 
    170 def poll2(timeout=0.0, map=None):
    171     # Use the poll() support added to the select module in Python 2.0
    172     if map is None:
    173         map = socket_map
    174     if timeout is not None:
    175         # timeout is in milliseconds
    176         timeout = int(timeout*1000)
    177     pollster = select.poll()
    178     if map:
    179         for fd, obj in map.items():
    180             flags = 0
    181             if obj.readable():
    182                 flags |= select.POLLIN | select.POLLPRI
    183             # accepting sockets should not be writable
    184             if obj.writable() and not obj.accepting:
    185                 flags |= select.POLLOUT
    186             if flags:
    187                 # Only check for exceptions if object was either readable
    188                 # or writable.
    189                 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
    190                 pollster.register(fd, flags)
    191         try:
    192             r = pollster.poll(timeout)
    193         except select.error, err:
    194             if err.args[0] != EINTR:
    195                 raise
    196             r = []
    197         for fd, flags in r:
    198             obj = map.get(fd)
    199             if obj is None:
    200                 continue
    201             readwrite(obj, flags)
    202 
    203 poll3 = poll2                           # Alias for backward compatibility
    204 
    205 def loop(timeout=30.0, use_poll=False, map=None, count=None):
    206     if map is None:
    207         map = socket_map
    208 
    209     if use_poll and hasattr(select, 'poll'):
    210         poll_fun = poll2
    211     else:
    212         poll_fun = poll
    213 
    214     if count is None:
    215         while map:
    216             poll_fun(timeout, map)
    217 
    218     else:
    219         while map and count > 0:
    220             poll_fun(timeout, map)
    221             count = count - 1
    222 
    223 class dispatcher:
    224 
    225     debug = False
    226     connected = False
    227     accepting = False
    228     connecting = False
    229     closing = False
    230     addr = None
    231     ignore_log_types = frozenset(['warning'])
    232 
    233     def __init__(self, sock=None, map=None):
    234         if map is None:
    235             self._map = socket_map
    236         else:
    237             self._map = map
    238 
    239         self._fileno = None
    240 
    241         if sock:
    242             # Set to nonblocking just to make sure for cases where we
    243             # get a socket from a blocking source.
    244             sock.setblocking(0)
    245             self.set_socket(sock, map)
    246             self.connected = True
    247             # The constructor no longer requires that the socket
    248             # passed be connected.
    249             try:
    250                 self.addr = sock.getpeername()
    251             except socket.error, err:
    252                 if err.args[0] in (ENOTCONN, EINVAL):
    253                     # To handle the case where we got an unconnected
    254                     # socket.
    255                     self.connected = False
    256                 else:
    257                     # The socket is broken in some unknown way, alert
    258                     # the user and remove it from the map (to prevent
    259                     # polling of broken sockets).
    260                     self.del_channel(map)
    261                     raise
    262         else:
    263             self.socket = None
    264 
    265     def __repr__(self):
    266         status = [self.__class__.__module__+"."+self.__class__.__name__]
    267         if self.accepting and self.addr:
    268             status.append('listening')
    269         elif self.connected:
    270             status.append('connected')
    271         if self.addr is not None:
    272             try:
    273                 status.append('%s:%d' % self.addr)
    274             except TypeError:
    275                 status.append(repr(self.addr))
    276         return '<%s at %#x>' % (' '.join(status), id(self))
    277 
    278     __str__ = __repr__
    279 
    280     def add_channel(self, map=None):
    281         #self.log_info('adding channel %s' % self)
    282         if map is None:
    283             map = self._map
    284         map[self._fileno] = self
    285 
    286     def del_channel(self, map=None):
    287         fd = self._fileno
    288         if map is None:
    289             map = self._map
    290         if fd in map:
    291             #self.log_info('closing channel %d:%s' % (fd, self))
    292             del map[fd]
    293         self._fileno = None
    294 
    295     def create_socket(self, family, type):
    296         self.family_and_type = family, type
    297         sock = socket.socket(family, type)
    298         sock.setblocking(0)
    299         self.set_socket(sock)
    300 
    301     def set_socket(self, sock, map=None):
    302         self.socket = sock
    303 ##        self.__dict__['socket'] = sock
    304         self._fileno = sock.fileno()
    305         self.add_channel(map)
    306 
    307     def set_reuse_addr(self):
    308         # try to re-use a server port if possible
    309         try:
    310             self.socket.setsockopt(
    311                 socket.SOL_SOCKET, socket.SO_REUSEADDR,
    312                 self.socket.getsockopt(socket.SOL_SOCKET,
    313                                        socket.SO_REUSEADDR) | 1
    314                 )
    315         except socket.error:
    316             pass
    317 
    318     # ==================================================
    319     # predicates for select()
    320     # these are used as filters for the lists of sockets
    321     # to pass to select().
    322     # ==================================================
    323 
    324     def readable(self):
    325         return True
    326 
    327     def writable(self):
    328         return True
    329 
    330     # ==================================================
    331     # socket object methods.
    332     # ==================================================
    333 
    334     def listen(self, num):
    335         self.accepting = True
    336         if os.name == 'nt' and num > 5:
    337             num = 5
    338         return self.socket.listen(num)
    339 
    340     def bind(self, addr):
    341         self.addr = addr
    342         return self.socket.bind(addr)
    343 
    344     def connect(self, address):
    345         self.connected = False
    346         self.connecting = True
    347         err = self.socket.connect_ex(address)
    348         if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
    349         or err == EINVAL and os.name in ('nt', 'ce'):
    350             self.addr = address
    351             return
    352         if err in (0, EISCONN):
    353             self.addr = address
    354             self.handle_connect_event()
    355         else:
    356             raise socket.error(err, errorcode[err])
    357 
    358     def accept(self):
    359         # XXX can return either an address pair or None
    360         try:
    361             conn, addr = self.socket.accept()
    362         except TypeError:
    363             return None
    364         except socket.error as why:
    365             if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
    366                 return None
    367             else:
    368                 raise
    369         else:
    370             return conn, addr
    371 
    372     def send(self, data):
    373         try:
    374             result = self.socket.send(data)
    375             return result
    376         except socket.error, why:
    377             if why.args[0] == EWOULDBLOCK:
    378                 return 0
    379             elif why.args[0] in _DISCONNECTED:
    380                 self.handle_close()
    381                 return 0
    382             else:
    383                 raise
    384 
    385     def recv(self, buffer_size):
    386         try:
    387             data = self.socket.recv(buffer_size)
    388             if not data:
    389                 # a closed connection is indicated by signaling
    390                 # a read condition, and having recv() return 0.
    391                 self.handle_close()
    392                 return ''
    393             else:
    394                 return data
    395         except socket.error, why:
    396             # winsock sometimes raises ENOTCONN
    397             if why.args[0] in _DISCONNECTED:
    398                 self.handle_close()
    399                 return ''
    400             else:
    401                 raise
    402 
    403     def close(self):
    404         self.connected = False
    405         self.accepting = False
    406         self.connecting = False
    407         self.del_channel()
    408         try:
    409             self.socket.close()
    410         except socket.error, why:
    411             if why.args[0] not in (ENOTCONN, EBADF):
    412                 raise
    413 
    414     # cheap inheritance, used to pass all other attribute
    415     # references to the underlying socket object.
    416     def __getattr__(self, attr):
    417         try:
    418             retattr = getattr(self.socket, attr)
    419         except AttributeError:
    420             raise AttributeError("%s instance has no attribute '%s'"
    421                                  %(self.__class__.__name__, attr))
    422         else:
    423             msg = "%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s " \
    424                   "instead." % {'me': self.__class__.__name__, 'attr':attr}
    425             warnings.warn(msg, DeprecationWarning, stacklevel=2)
    426             return retattr
    427 
    428     # log and log_info may be overridden to provide more sophisticated
    429     # logging and warning methods. In general, log is for 'hit' logging
    430     # and 'log_info' is for informational, warning and error logging.
    431 
    432     def log(self, message):
    433         sys.stderr.write('log: %s\n' % str(message))
    434 
    435     def log_info(self, message, type='info'):
    436         if type not in self.ignore_log_types:
    437             print '%s: %s' % (type, message)
    438 
    439     def handle_read_event(self):
    440         if self.accepting:
    441             # accepting sockets are never connected, they "spawn" new
    442             # sockets that are connected
    443             self.handle_accept()
    444         elif not self.connected:
    445             if self.connecting:
    446                 self.handle_connect_event()
    447             self.handle_read()
    448         else:
    449             self.handle_read()
    450 
    451     def handle_connect_event(self):
    452         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    453         if err != 0:
    454             raise socket.error(err, _strerror(err))
    455         self.handle_connect()
    456         self.connected = True
    457         self.connecting = False
    458 
    459     def handle_write_event(self):
    460         if self.accepting:
    461             # Accepting sockets shouldn't get a write event.
    462             # We will pretend it didn't happen.
    463             return
    464 
    465         if not self.connected:
    466             if self.connecting:
    467                 self.handle_connect_event()
    468         self.handle_write()
    469 
    470     def handle_expt_event(self):
    471         # handle_expt_event() is called if there might be an error on the
    472         # socket, or if there is OOB data
    473         # check for the error condition first
    474         err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    475         if err != 0:
    476             # we can get here when select.select() says that there is an
    477             # exceptional condition on the socket
    478             # since there is an error, we'll go ahead and close the socket
    479             # like we would in a subclassed handle_read() that received no
    480             # data
    481             self.handle_close()
    482         else:
    483             self.handle_expt()
    484 
    485     def handle_error(self):
    486         nil, t, v, tbinfo = compact_traceback()
    487 
    488         # sometimes a user repr method will crash.
    489         try:
    490             self_repr = repr(self)
    491         except:
    492             self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
    493 
    494         self.log_info(
    495             'uncaptured python exception, closing channel %s (%s:%s %s)' % (
    496                 self_repr,
    497                 t,
    498                 v,
    499                 tbinfo
    500                 ),
    501             'error'
    502             )
    503         self.handle_close()
    504 
    505     def handle_expt(self):
    506         self.log_info('unhandled incoming priority event', 'warning')
    507 
    508     def handle_read(self):
    509         self.log_info('unhandled read event', 'warning')
    510 
    511     def handle_write(self):
    512         self.log_info('unhandled write event', 'warning')
    513 
    514     def handle_connect(self):
    515         self.log_info('unhandled connect event', 'warning')
    516 
    517     def handle_accept(self):
    518         self.log_info('unhandled accept event', 'warning')
    519 
    520     def handle_close(self):
    521         self.log_info('unhandled close event', 'warning')
    522         self.close()
    523 
    524 # ---------------------------------------------------------------------------
    525 # adds simple buffered output capability, useful for simple clients.
    526 # [for more sophisticated usage use asynchat.async_chat]
    527 # ---------------------------------------------------------------------------
    528 
    529 class dispatcher_with_send(dispatcher):
    530 
    531     def __init__(self, sock=None, map=None):
    532         dispatcher.__init__(self, sock, map)
    533         self.out_buffer = ''
    534 
    535     def initiate_send(self):
    536         num_sent = 0
    537         num_sent = dispatcher.send(self, self.out_buffer[:512])
    538         self.out_buffer = self.out_buffer[num_sent:]
    539 
    540     def handle_write(self):
    541         self.initiate_send()
    542 
    543     def writable(self):
    544         return (not self.connected) or len(self.out_buffer)
    545 
    546     def send(self, data):
    547         if self.debug:
    548             self.log_info('sending %s' % repr(data))
    549         self.out_buffer = self.out_buffer + data
    550         self.initiate_send()
    551 
    552 # ---------------------------------------------------------------------------
    553 # used for debugging.
    554 # ---------------------------------------------------------------------------
    555 
    556 def compact_traceback():
    557     t, v, tb = sys.exc_info()
    558     tbinfo = []
    559     if not tb: # Must have a traceback
    560         raise AssertionError("traceback does not exist")
    561     while tb:
    562         tbinfo.append((
    563             tb.tb_frame.f_code.co_filename,
    564             tb.tb_frame.f_code.co_name,
    565             str(tb.tb_lineno)
    566             ))
    567         tb = tb.tb_next
    568 
    569     # just to be safe
    570     del tb
    571 
    572     file, function, line = tbinfo[-1]
    573     info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
    574     return (file, function, line), t, v, info
    575 
    576 def close_all(map=None, ignore_all=False):
    577     if map is None:
    578         map = socket_map
    579     for x in map.values():
    580         try:
    581             x.close()
    582         except OSError, x:
    583             if x.args[0] == EBADF:
    584                 pass
    585             elif not ignore_all:
    586                 raise
    587         except _reraised_exceptions:
    588             raise
    589         except:
    590             if not ignore_all:
    591                 raise
    592     map.clear()
    593 
    594 # Asynchronous File I/O:
    595 #
    596 # After a little research (reading man pages on various unixen, and
    597 # digging through the linux kernel), I've determined that select()
    598 # isn't meant for doing asynchronous file i/o.
    599 # Heartening, though - reading linux/mm/filemap.c shows that linux
    600 # supports asynchronous read-ahead.  So _MOST_ of the time, the data
    601 # will be sitting in memory for us already when we go to read it.
    602 #
    603 # What other OS's (besides NT) support async file i/o?  [VMS?]
    604 #
    605 # Regardless, this is useful for pipes, and stdin/stdout...
    606 
    607 if os.name == 'posix':
    608     import fcntl
    609 
    610     class file_wrapper:
    611         # Here we override just enough to make a file
    612         # look like a socket for the purposes of asyncore.
    613         # The passed fd is automatically os.dup()'d
    614 
    615         def __init__(self, fd):
    616             self.fd = os.dup(fd)
    617 
    618         def recv(self, *args):
    619             return os.read(self.fd, *args)
    620 
    621         def send(self, *args):
    622             return os.write(self.fd, *args)
    623 
    624         def getsockopt(self, level, optname, buflen=None):
    625             if (level == socket.SOL_SOCKET and
    626                 optname == socket.SO_ERROR and
    627                 not buflen):
    628                 return 0
    629             raise NotImplementedError("Only asyncore specific behaviour "
    630                                       "implemented.")
    631 
    632         read = recv
    633         write = send
    634 
    635         def close(self):
    636             if self.fd < 0:
    637                 return
    638             fd = self.fd
    639             self.fd = -1
    640             os.close(fd)
    641 
    642         def fileno(self):
    643             return self.fd
    644 
    645     class file_dispatcher(dispatcher):
    646 
    647         def __init__(self, fd, map=None):
    648             dispatcher.__init__(self, None, map)
    649             self.connected = True
    650             try:
    651                 fd = fd.fileno()
    652             except AttributeError:
    653                 pass
    654             self.set_file(fd)
    655             # set it to non-blocking mode
    656             flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
    657             flags = flags | os.O_NONBLOCK
    658             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
    659 
    660         def set_file(self, fd):
    661             self.socket = file_wrapper(fd)
    662             self._fileno = self.socket.fileno()
    663             self.add_channel()
    664