Home | History | Annotate | Download | only in Lib
      1 # -*- Mode: Python; tab-width: 4 -*-
      2 #       Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
      3 #       Author: Sam Rushing <rushing (at] nightmare.com>
      4 
      5 # ======================================================================
      6 # Copyright 1996 by Sam Rushing
      7 #
      8 #                         All Rights Reserved
      9 #
     10 # Permission to use, copy, modify, and distribute this software and
     11 # its documentation for any purpose and without fee is hereby
     12 # granted, provided that the above copyright notice appear in all
     13 # copies and that both that copyright notice and this permission
     14 # notice appear in supporting documentation, and that the name of Sam
     15 # Rushing not be used in advertising or publicity pertaining to
     16 # distribution of the software without specific, written prior
     17 # permission.
     18 #
     19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
     20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
     21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
     22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
     23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
     24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
     25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     26 # ======================================================================
     27 
     28 r"""A class supporting chat-style (command/response) protocols.
     29 
     30 This class adds support for 'chat' style protocols - where one side
     31 sends a 'command', and the other sends a response (examples would be
     32 the common internet protocols - SMTP, NNTP, FTP, etc.).
     33 
     34 The handle_read() method looks at the input stream for the current
     35 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
     36 for multi-line output), calling self.found_terminator() on its
     37 receipt.
     38 
     39 For example:
     40 Say you build an async nntp client using this class.  At the start
     41 of the connection, you'll have self.terminator set to '\r\n', in
     42 order to process the single-line greeting.  Just before issuing a
     43 'LIST' command you'll set it to '\r\n.\r\n'.  The output of the LIST
     44 command will be accumulated (using your own 'collect_incoming_data'
     45 method) up to the terminator, and then control will be returned to
     46 you - by calling your self.found_terminator() method.
     47 """
     48 
     49 import asyncore
     50 import errno
     51 import socket
     52 from collections import deque
     53 from sys import py3kwarning
     54 from warnings import filterwarnings, catch_warnings
     55 
     56 _BLOCKING_IO_ERRORS = (errno.EAGAIN, errno.EALREADY, errno.EINPROGRESS,
     57                        errno.EWOULDBLOCK)
     58 
     59 
     60 class async_chat (asyncore.dispatcher):
     61     """This is an abstract class.  You must derive from this class, and add
     62     the two methods collect_incoming_data() and found_terminator()"""
     63 
     64     # these are overridable defaults
     65 
     66     ac_in_buffer_size       = 4096
     67     ac_out_buffer_size      = 4096
     68 
     69     def __init__ (self, sock=None, map=None):
     70         # for string terminator matching
     71         self.ac_in_buffer = ''
     72 
     73         # we use a list here rather than cStringIO for a few reasons...
     74         # del lst[:] is faster than sio.truncate(0)
     75         # lst = [] is faster than sio.truncate(0)
     76         # cStringIO will be gaining unicode support in py3k, which
     77         # will negatively affect the performance of bytes compared to
     78         # a ''.join() equivalent
     79         self.incoming = []
     80 
     81         # we toss the use of the "simple producer" and replace it with
     82         # a pure deque, which the original fifo was a wrapping of
     83         self.producer_fifo = deque()
     84         asyncore.dispatcher.__init__ (self, sock, map)
     85 
     86     def collect_incoming_data(self, data):
     87         raise NotImplementedError("must be implemented in subclass")
     88 
     89     def _collect_incoming_data(self, data):
     90         self.incoming.append(data)
     91 
     92     def _get_data(self):
     93         d = ''.join(self.incoming)
     94         del self.incoming[:]
     95         return d
     96 
     97     def found_terminator(self):
     98         raise NotImplementedError("must be implemented in subclass")
     99 
    100     def set_terminator (self, term):
    101         "Set the input delimiter.  Can be a fixed string of any length, an integer, or None"
    102         self.terminator = term
    103 
    104     def get_terminator (self):
    105         return self.terminator
    106 
    107     # grab some more data from the socket,
    108     # throw it to the collector method,
    109     # check for the terminator,
    110     # if found, transition to the next state.
    111 
    112     def handle_read (self):
    113 
    114         try:
    115             data = self.recv (self.ac_in_buffer_size)
    116         except socket.error, why:
    117             if why.args[0] in _BLOCKING_IO_ERRORS:
    118                 return
    119             self.handle_error()
    120             return
    121 
    122         self.ac_in_buffer = self.ac_in_buffer + data
    123 
    124         # Continue to search for self.terminator in self.ac_in_buffer,
    125         # while calling self.collect_incoming_data.  The while loop
    126         # is necessary because we might read several data+terminator
    127         # combos with a single recv(4096).
    128 
    129         while self.ac_in_buffer:
    130             lb = len(self.ac_in_buffer)
    131             terminator = self.get_terminator()
    132             if not terminator:
    133                 # no terminator, collect it all
    134                 self.collect_incoming_data (self.ac_in_buffer)
    135                 self.ac_in_buffer = ''
    136             elif isinstance(terminator, int) or isinstance(terminator, long):
    137                 # numeric terminator
    138                 n = terminator
    139                 if lb < n:
    140                     self.collect_incoming_data (self.ac_in_buffer)
    141                     self.ac_in_buffer = ''
    142                     self.terminator = self.terminator - lb
    143                 else:
    144                     self.collect_incoming_data (self.ac_in_buffer[:n])
    145                     self.ac_in_buffer = self.ac_in_buffer[n:]
    146                     self.terminator = 0
    147                     self.found_terminator()
    148             else:
    149                 # 3 cases:
    150                 # 1) end of buffer matches terminator exactly:
    151                 #    collect data, transition
    152                 # 2) end of buffer matches some prefix:
    153                 #    collect data to the prefix
    154                 # 3) end of buffer does not match any prefix:
    155                 #    collect data
    156                 terminator_len = len(terminator)
    157                 index = self.ac_in_buffer.find(terminator)
    158                 if index != -1:
    159                     # we found the terminator
    160                     if index > 0:
    161                         # don't bother reporting the empty string (source of subtle bugs)
    162                         self.collect_incoming_data (self.ac_in_buffer[:index])
    163                     self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
    164                     # This does the Right Thing if the terminator is changed here.
    165                     self.found_terminator()
    166                 else:
    167                     # check for a prefix of the terminator
    168                     index = find_prefix_at_end (self.ac_in_buffer, terminator)
    169                     if index:
    170                         if index != lb:
    171                             # we found a prefix, collect up to the prefix
    172                             self.collect_incoming_data (self.ac_in_buffer[:-index])
    173                             self.ac_in_buffer = self.ac_in_buffer[-index:]
    174                         break
    175                     else:
    176                         # no prefix, collect it all
    177                         self.collect_incoming_data (self.ac_in_buffer)
    178                         self.ac_in_buffer = ''
    179 
    180     def handle_write (self):
    181         self.initiate_send()
    182 
    183     def handle_close (self):
    184         self.close()
    185 
    186     def push (self, data):
    187         sabs = self.ac_out_buffer_size
    188         if len(data) > sabs:
    189             for i in xrange(0, len(data), sabs):
    190                 self.producer_fifo.append(data[i:i+sabs])
    191         else:
    192             self.producer_fifo.append(data)
    193         self.initiate_send()
    194 
    195     def push_with_producer (self, producer):
    196         self.producer_fifo.append(producer)
    197         self.initiate_send()
    198 
    199     def readable (self):
    200         "predicate for inclusion in the readable for select()"
    201         # cannot use the old predicate, it violates the claim of the
    202         # set_terminator method.
    203 
    204         # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
    205         return 1
    206 
    207     def writable (self):
    208         "predicate for inclusion in the writable for select()"
    209         return self.producer_fifo or (not self.connected)
    210 
    211     def close_when_done (self):
    212         "automatically close this channel once the outgoing queue is empty"
    213         self.producer_fifo.append(None)
    214 
    215     def initiate_send(self):
    216         while self.producer_fifo and self.connected:
    217             first = self.producer_fifo[0]
    218             # handle empty string/buffer or None entry
    219             if not first:
    220                 del self.producer_fifo[0]
    221                 if first is None:
    222                     self.handle_close()
    223                     return
    224 
    225             # handle classic producer behavior
    226             obs = self.ac_out_buffer_size
    227             try:
    228                 with catch_warnings():
    229                     if py3kwarning:
    230                         filterwarnings("ignore", ".*buffer", DeprecationWarning)
    231                     data = buffer(first, 0, obs)
    232             except TypeError:
    233                 data = first.more()
    234                 if data:
    235                     self.producer_fifo.appendleft(data)
    236                 else:
    237                     del self.producer_fifo[0]
    238                 continue
    239 
    240             # send the data
    241             try:
    242                 num_sent = self.send(data)
    243             except socket.error:
    244                 self.handle_error()
    245                 return
    246 
    247             if num_sent:
    248                 if num_sent < len(data) or obs < len(first):
    249                     self.producer_fifo[0] = first[num_sent:]
    250                 else:
    251                     del self.producer_fifo[0]
    252             # we tried to send some actual data
    253             return
    254 
    255     def discard_buffers (self):
    256         # Emergencies only!
    257         self.ac_in_buffer = ''
    258         del self.incoming[:]
    259         self.producer_fifo.clear()
    260 
    261 class simple_producer:
    262 
    263     def __init__ (self, data, buffer_size=512):
    264         self.data = data
    265         self.buffer_size = buffer_size
    266 
    267     def more (self):
    268         if len (self.data) > self.buffer_size:
    269             result = self.data[:self.buffer_size]
    270             self.data = self.data[self.buffer_size:]
    271             return result
    272         else:
    273             result = self.data
    274             self.data = ''
    275             return result
    276 
    277 class fifo:
    278     def __init__ (self, list=None):
    279         if not list:
    280             self.list = deque()
    281         else:
    282             self.list = deque(list)
    283 
    284     def __len__ (self):
    285         return len(self.list)
    286 
    287     def is_empty (self):
    288         return not self.list
    289 
    290     def first (self):
    291         return self.list[0]
    292 
    293     def push (self, data):
    294         self.list.append(data)
    295 
    296     def pop (self):
    297         if self.list:
    298             return (1, self.list.popleft())
    299         else:
    300             return (0, None)
    301 
    302 # Given 'haystack', see if any prefix of 'needle' is at its end.  This
    303 # assumes an exact match has already been checked.  Return the number of
    304 # characters matched.
    305 # for example:
    306 # f_p_a_e ("qwerty\r", "\r\n") => 1
    307 # f_p_a_e ("qwertydkjf", "\r\n") => 0
    308 # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
    309 
    310 # this could maybe be made faster with a computed regex?
    311 # [answer: no; circa Python-2.0, Jan 2001]
    312 # new python:   28961/s
    313 # old python:   18307/s
    314 # re:        12820/s
    315 # regex:     14035/s
    316 
    317 def find_prefix_at_end (haystack, needle):
    318     l = len(needle) - 1
    319     while l and not haystack.endswith(needle[:l]):
    320         l -= 1
    321     return l
    322