Home | History | Annotate | Download | only in Lib
      1 # -*- Mode: Python; tab-width: 4 -*-
      2 #       Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
      3 #       Author: Sam Rushing <rushing@nightmare.com>
      4 
      5 # ======================================================================
      6 # Copyright 1996 by Sam Rushing
      7 #
      8 #                         All Rights Reserved
      9 #
     10 # Permission to use, copy, modify, and distribute this software and
     11 # its documentation for any purpose and without fee is hereby
     12 # granted, provided that the above copyright notice appear in all
     13 # copies and that both that copyright notice and this permission
     14 # notice appear in supporting documentation, and that the name of Sam
     15 # Rushing not be used in advertising or publicity pertaining to
     16 # distribution of the software without specific, written prior
     17 # permission.
     18 #
     19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
     20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
     21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
     22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
     23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
     24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
     25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     26 # ======================================================================
     27 
     28 r"""A class supporting chat-style (command/response) protocols.
     29 
     30 This class adds support for 'chat' style protocols - where one side
     31 sends a 'command', and the other sends a response (examples would be
     32 the common internet protocols - smtp, nntp, ftp, etc..).
     33 
     34 The handle_read() method looks at the input stream for the current
     35 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
     36 for multi-line output), calling self.found_terminator() on its
     37 receipt.
     38 
     39 for example:
     40 Say you build an async nntp client using this class.  At the start
     41 of the connection, you'll have self.terminator set to '\r\n', in
     42 order to process the single-line greeting.  Just before issuing a
     43 'LIST' command you'll set it to '\r\n.\r\n'.  The output of the LIST
     44 command will be accumulated (using your own 'collect_incoming_data'
     45 method) up to the terminator, and then control will be returned to
     46 you - by calling your self.found_terminator() method.
     47 """
     48 import asyncore
     49 from collections import deque
     50 
     51 
     52 class async_chat(asyncore.dispatcher):
     53     """This is an abstract class.  You must derive from this class, and add
     54     the two methods collect_incoming_data() and found_terminator()"""
     55 
     56     # these are overridable defaults
     57 
     58     ac_in_buffer_size = 65536
     59     ac_out_buffer_size = 65536
     60 
     61     # we don't want to enable the use of encoding by default, because that is a
     62     # sign of an application bug that we don't want to pass silently
     63 
     64     use_encoding = 0
     65     encoding = 'latin-1'
     66 
     67     def __init__(self, sock=None, map=None):
     68         # for string terminator matching
     69         self.ac_in_buffer = b''
     70 
     71         # we use a list here rather than io.BytesIO for a few reasons...
     72         # del lst[:] is faster than bio.truncate(0)
     73         # lst = [] is faster than bio.truncate(0)
     74         self.incoming = []
     75 
     76         # we toss the use of the "simple producer" and replace it with
     77         # a pure deque, which the original fifo was a wrapping of
     78         self.producer_fifo = deque()
     79         asyncore.dispatcher.__init__(self, sock, map)
     80 
     81     def collect_incoming_data(self, data):
     82         raise NotImplementedError("must be implemented in subclass")
     83 
     84     def _collect_incoming_data(self, data):
     85         self.incoming.append(data)
     86 
     87     def _get_data(self):
     88         d = b''.join(self.incoming)
     89         del self.incoming[:]
     90         return d
     91 
     92     def found_terminator(self):
     93         raise NotImplementedError("must be implemented in subclass")
     94 
     95     def set_terminator(self, term):
     96         """Set the input delimiter.
     97 
     98         Can be a fixed string of any length, an integer, or None.
     99         """
    100         if isinstance(term, str) and self.use_encoding:
    101             term = bytes(term, self.encoding)
    102         elif isinstance(term, int) and term < 0:
    103             raise ValueError('the number of received bytes must be positive')
    104         self.terminator = term
    105 
    106     def get_terminator(self):
    107         return self.terminator
    108 
    109     # grab some more data from the socket,
    110     # throw it to the collector method,
    111     # check for the terminator,
    112     # if found, transition to the next state.
    113 
    114     def handle_read(self):
    115 
    116         try:
    117             data = self.recv(self.ac_in_buffer_size)
    118         except BlockingIOError:
    119             return
    120         except OSError as why:
    121             self.handle_error()
    122             return
    123 
    124         if isinstance(data, str) and self.use_encoding:
    125             data = bytes(str, self.encoding)
    126         self.ac_in_buffer = self.ac_in_buffer + data
    127 
    128         # Continue to search for self.terminator in self.ac_in_buffer,
    129         # while calling self.collect_incoming_data.  The while loop
    130         # is necessary because we might read several data+terminator
    131         # combos with a single recv(4096).
    132 
    133         while self.ac_in_buffer:
    134             lb = len(self.ac_in_buffer)
    135             terminator = self.get_terminator()
    136             if not terminator:
    137                 # no terminator, collect it all
    138                 self.collect_incoming_data(self.ac_in_buffer)
    139                 self.ac_in_buffer = b''
    140             elif isinstance(terminator, int):
    141                 # numeric terminator
    142                 n = terminator
    143                 if lb < n:
    144                     self.collect_incoming_data(self.ac_in_buffer)
    145                     self.ac_in_buffer = b''
    146                     self.terminator = self.terminator - lb
    147                 else:
    148                     self.collect_incoming_data(self.ac_in_buffer[:n])
    149                     self.ac_in_buffer = self.ac_in_buffer[n:]
    150                     self.terminator = 0
    151                     self.found_terminator()
    152             else:
    153                 # 3 cases:
    154                 # 1) end of buffer matches terminator exactly:
    155                 #    collect data, transition
    156                 # 2) end of buffer matches some prefix:
    157                 #    collect data to the prefix
    158                 # 3) end of buffer does not match any prefix:
    159                 #    collect data
    160                 terminator_len = len(terminator)
    161                 index = self.ac_in_buffer.find(terminator)
    162                 if index != -1:
    163                     # we found the terminator
    164                     if index > 0:
    165                         # don't bother reporting the empty string
    166                         # (source of subtle bugs)
    167                         self.collect_incoming_data(self.ac_in_buffer[:index])
    168                     self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
    169                     # This does the Right Thing if the terminator
    170                     # is changed here.
    171                     self.found_terminator()
    172                 else:
    173                     # check for a prefix of the terminator
    174                     index = find_prefix_at_end(self.ac_in_buffer, terminator)
    175                     if index:
    176                         if index != lb:
    177                             # we found a prefix, collect up to the prefix
    178                             self.collect_incoming_data(self.ac_in_buffer[:-index])
    179                             self.ac_in_buffer = self.ac_in_buffer[-index:]
    180                         break
    181                     else:
    182                         # no prefix, collect it all
    183                         self.collect_incoming_data(self.ac_in_buffer)
    184                         self.ac_in_buffer = b''
    185 
    186     def handle_write(self):
    187         self.initiate_send()
    188 
    189     def handle_close(self):
    190         self.close()
    191 
    192     def push(self, data):
    193         if not isinstance(data, (bytes, bytearray, memoryview)):
    194             raise TypeError('data argument must be byte-ish (%r)',
    195                             type(data))
    196         sabs = self.ac_out_buffer_size
    197         if len(data) > sabs:
    198             for i in range(0, len(data), sabs):
    199                 self.producer_fifo.append(data[i:i+sabs])
    200         else:
    201             self.producer_fifo.append(data)
    202         self.initiate_send()
    203 
    204     def push_with_producer(self, producer):
    205         self.producer_fifo.append(producer)
    206         self.initiate_send()
    207 
    208     def readable(self):
    209         "predicate for inclusion in the readable for select()"
    210         # cannot use the old predicate, it violates the claim of the
    211         # set_terminator method.
    212 
    213         # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
    214         return 1
    215 
    216     def writable(self):
    217         "predicate for inclusion in the writable for select()"
    218         return self.producer_fifo or (not self.connected)
    219 
    220     def close_when_done(self):
    221         "automatically close this channel once the outgoing queue is empty"
    222         self.producer_fifo.append(None)
    223 
    224     def initiate_send(self):
    225         while self.producer_fifo and self.connected:
    226             first = self.producer_fifo[0]
    227             # handle empty string/buffer or None entry
    228             if not first:
    229                 del self.producer_fifo[0]
    230                 if first is None:
    231                     self.handle_close()
    232                     return
    233 
    234             # handle classic producer behavior
    235             obs = self.ac_out_buffer_size
    236             try:
    237                 data = first[:obs]
    238             except TypeError:
    239                 data = first.more()
    240                 if data:
    241                     self.producer_fifo.appendleft(data)
    242                 else:
    243                     del self.producer_fifo[0]
    244                 continue
    245 
    246             if isinstance(data, str) and self.use_encoding:
    247                 data = bytes(data, self.encoding)
    248 
    249             # send the data
    250             try:
    251                 num_sent = self.send(data)
    252             except OSError:
    253                 self.handle_error()
    254                 return
    255 
    256             if num_sent:
    257                 if num_sent < len(data) or obs < len(first):
    258                     self.producer_fifo[0] = first[num_sent:]
    259                 else:
    260                     del self.producer_fifo[0]
    261             # we tried to send some actual data
    262             return
    263 
    264     def discard_buffers(self):
    265         # Emergencies only!
    266         self.ac_in_buffer = b''
    267         del self.incoming[:]
    268         self.producer_fifo.clear()
    269 
    270 
    271 class simple_producer:
    272 
    273     def __init__(self, data, buffer_size=512):
    274         self.data = data
    275         self.buffer_size = buffer_size
    276 
    277     def more(self):
    278         if len(self.data) > self.buffer_size:
    279             result = self.data[:self.buffer_size]
    280             self.data = self.data[self.buffer_size:]
    281             return result
    282         else:
    283             result = self.data
    284             self.data = b''
    285             return result
    286 
    287 
    288 # Given 'haystack', see if any prefix of 'needle' is at its end.  This
    289 # assumes an exact match has already been checked.  Return the number of
    290 # characters matched.
    291 # for example:
    292 # f_p_a_e("qwerty\r", "\r\n") => 1
    293 # f_p_a_e("qwertydkjf", "\r\n") => 0
    294 # f_p_a_e("qwerty\r\n", "\r\n") => <undefined>
    295 
    296 # this could maybe be made faster with a computed regex?
    297 # [answer: no; circa Python-2.0, Jan 2001]
    298 # new python:   28961/s
    299 # old python:   18307/s
    300 # re:        12820/s
    301 # regex:     14035/s
    302 
    303 def find_prefix_at_end(haystack, needle):
    304     l = len(needle) - 1
    305     while l and not haystack.endswith(needle[:l]):
    306         l -= 1
    307     return l
    308