Home | History | Annotate | Download | only in python2.7
      1 # -*- Mode: Python; tab-width: 4 -*-
      2 #       Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
      3 #       Author: Sam Rushing <rushing (at] nightmare.com>
      4 
      5 # ======================================================================
      6 # Copyright 1996 by Sam Rushing
      7 #
      8 #                         All Rights Reserved
      9 #
     10 # Permission to use, copy, modify, and distribute this software and
     11 # its documentation for any purpose and without fee is hereby
     12 # granted, provided that the above copyright notice appear in all
     13 # copies and that both that copyright notice and this permission
     14 # notice appear in supporting documentation, and that the name of Sam
     15 # Rushing not be used in advertising or publicity pertaining to
     16 # distribution of the software without specific, written prior
     17 # permission.
     18 #
     19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
     20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
     21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
     22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
     23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
     24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
     25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     26 # ======================================================================
     27 
     28 r"""A class supporting chat-style (command/response) protocols.
     29 
     30 This class adds support for 'chat' style protocols - where one side
     31 sends a 'command', and the other sends a response (examples would be
     32 the common internet protocols - smtp, nntp, ftp, etc.).
     33 
     34 The handle_read() method looks at the input stream for the current
     35 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
     36 for multi-line output), calling self.found_terminator() on its
     37 receipt.
     38 
     39 for example:
     40 Say you build an async nntp client using this class.  At the start
     41 of the connection, you'll have self.terminator set to '\r\n', in
     42 order to process the single-line greeting.  Just before issuing a
     43 'LIST' command you'll set it to '\r\n.\r\n'.  The output of the LIST
     44 command will be accumulated (using your own 'collect_incoming_data'
     45 method) up to the terminator, and then control will be returned to
     46 you - by calling your self.found_terminator() method.
     47 """
     48 
     49 import socket
     50 import asyncore
     51 from collections import deque
     52 from sys import py3kwarning
     53 from warnings import filterwarnings, catch_warnings
     54 
     55 class async_chat (asyncore.dispatcher):
     56     """This is an abstract class.  You must derive from this class, and add
     57     the two methods collect_incoming_data() and found_terminator()"""
     58 
     59     # these are overridable defaults
     60 
     61     ac_in_buffer_size       = 4096
     62     ac_out_buffer_size      = 4096
     63 
     64     def __init__ (self, sock=None, map=None):
     65         # for string terminator matching
     66         self.ac_in_buffer = ''
     67 
     68         # we use a list here rather than cStringIO for a few reasons...
     69         # del lst[:] is faster than sio.truncate(0)
     70         # lst = [] is faster than sio.truncate(0)
     71         # cStringIO will be gaining unicode support in py3k, which
     72         # will negatively affect the performance of bytes compared to
     73         # a ''.join() equivalent
     74         self.incoming = []
     75 
     76         # we toss the use of the "simple producer" and replace it with
     77         # a pure deque, which the original fifo was a wrapping of
     78         self.producer_fifo = deque()
     79         asyncore.dispatcher.__init__ (self, sock, map)
     80 
     81     def collect_incoming_data(self, data):
     82         raise NotImplementedError("must be implemented in subclass")
     83 
     84     def _collect_incoming_data(self, data):
     85         self.incoming.append(data)
     86 
     87     def _get_data(self):
     88         d = ''.join(self.incoming)
     89         del self.incoming[:]
     90         return d
     91 
     92     def found_terminator(self):
     93         raise NotImplementedError("must be implemented in subclass")
     94 
     95     def set_terminator (self, term):
     96         "Set the input delimiter.  Can be a fixed string of any length, an integer, or None"
     97         self.terminator = term
     98 
     99     def get_terminator (self):
    100         return self.terminator
    101 
    102     # grab some more data from the socket,
    103     # throw it to the collector method,
    104     # check for the terminator,
    105     # if found, transition to the next state.
    106 
    107     def handle_read (self):
    108 
    109         try:
    110             data = self.recv (self.ac_in_buffer_size)
    111         except socket.error, why:
    112             self.handle_error()
    113             return
    114 
    115         self.ac_in_buffer = self.ac_in_buffer + data
    116 
    117         # Continue to search for self.terminator in self.ac_in_buffer,
    118         # while calling self.collect_incoming_data.  The while loop
    119         # is necessary because we might read several data+terminator
    120         # combos with a single recv(4096).
    121 
    122         while self.ac_in_buffer:
    123             lb = len(self.ac_in_buffer)
    124             terminator = self.get_terminator()
    125             if not terminator:
    126                 # no terminator, collect it all
    127                 self.collect_incoming_data (self.ac_in_buffer)
    128                 self.ac_in_buffer = ''
    129             elif isinstance(terminator, int) or isinstance(terminator, long):
    130                 # numeric terminator
    131                 n = terminator
    132                 if lb < n:
    133                     self.collect_incoming_data (self.ac_in_buffer)
    134                     self.ac_in_buffer = ''
    135                     self.terminator = self.terminator - lb
    136                 else:
    137                     self.collect_incoming_data (self.ac_in_buffer[:n])
    138                     self.ac_in_buffer = self.ac_in_buffer[n:]
    139                     self.terminator = 0
    140                     self.found_terminator()
    141             else:
    142                 # 3 cases:
    143                 # 1) end of buffer matches terminator exactly:
    144                 #    collect data, transition
    145                 # 2) end of buffer matches some prefix:
    146                 #    collect data to the prefix
    147                 # 3) end of buffer does not match any prefix:
    148                 #    collect data
    149                 terminator_len = len(terminator)
    150                 index = self.ac_in_buffer.find(terminator)
    151                 if index != -1:
    152                     # we found the terminator
    153                     if index > 0:
    154                         # don't bother reporting the empty string (source of subtle bugs)
    155                         self.collect_incoming_data (self.ac_in_buffer[:index])
    156                     self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
    157                     # This does the Right Thing if the terminator is changed here.
    158                     self.found_terminator()
    159                 else:
    160                     # check for a prefix of the terminator
    161                     index = find_prefix_at_end (self.ac_in_buffer, terminator)
    162                     if index:
    163                         if index != lb:
    164                             # we found a prefix, collect up to the prefix
    165                             self.collect_incoming_data (self.ac_in_buffer[:-index])
    166                             self.ac_in_buffer = self.ac_in_buffer[-index:]
    167                         break
    168                     else:
    169                         # no prefix, collect it all
    170                         self.collect_incoming_data (self.ac_in_buffer)
    171                         self.ac_in_buffer = ''
    172 
    173     def handle_write (self):
    174         self.initiate_send()
    175 
    176     def handle_close (self):
    177         self.close()
    178 
    179     def push (self, data):
    180         sabs = self.ac_out_buffer_size
    181         if len(data) > sabs:
    182             for i in xrange(0, len(data), sabs):
    183                 self.producer_fifo.append(data[i:i+sabs])
    184         else:
    185             self.producer_fifo.append(data)
    186         self.initiate_send()
    187 
    188     def push_with_producer (self, producer):
    189         self.producer_fifo.append(producer)
    190         self.initiate_send()
    191 
    192     def readable (self):
    193         "predicate for inclusion in the readable for select()"
    194         # cannot use the old predicate, it violates the claim of the
    195         # set_terminator method.
    196 
    197         # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
    198         return 1
    199 
    200     def writable (self):
    201         "predicate for inclusion in the writable for select()"
    202         return self.producer_fifo or (not self.connected)
    203 
    204     def close_when_done (self):
    205         "automatically close this channel once the outgoing queue is empty"
    206         self.producer_fifo.append(None)
    207 
    208     def initiate_send(self):
    209         while self.producer_fifo and self.connected:
    210             first = self.producer_fifo[0]
    211             # handle empty string/buffer or None entry
    212             if not first:
    213                 del self.producer_fifo[0]
    214                 if first is None:
    215                     self.handle_close()
    216                     return
    217 
    218             # handle classic producer behavior
    219             obs = self.ac_out_buffer_size
    220             try:
    221                 with catch_warnings():
    222                     if py3kwarning:
    223                         filterwarnings("ignore", ".*buffer", DeprecationWarning)
    224                     data = buffer(first, 0, obs)
    225             except TypeError:
    226                 data = first.more()
    227                 if data:
    228                     self.producer_fifo.appendleft(data)
    229                 else:
    230                     del self.producer_fifo[0]
    231                 continue
    232 
    233             # send the data
    234             try:
    235                 num_sent = self.send(data)
    236             except socket.error:
    237                 self.handle_error()
    238                 return
    239 
    240             if num_sent:
    241                 if num_sent < len(data) or obs < len(first):
    242                     self.producer_fifo[0] = first[num_sent:]
    243                 else:
    244                     del self.producer_fifo[0]
    245             # we tried to send some actual data
    246             return
    247 
    248     def discard_buffers (self):
    249         # Emergencies only!
    250         self.ac_in_buffer = ''
    251         del self.incoming[:]
    252         self.producer_fifo.clear()
    253 
    254 class simple_producer:
    255 
    256     def __init__ (self, data, buffer_size=512):
    257         self.data = data
    258         self.buffer_size = buffer_size
    259 
    260     def more (self):
    261         if len (self.data) > self.buffer_size:
    262             result = self.data[:self.buffer_size]
    263             self.data = self.data[self.buffer_size:]
    264             return result
    265         else:
    266             result = self.data
    267             self.data = ''
    268             return result
    269 
    270 class fifo:
    271     def __init__ (self, list=None):
    272         if not list:
    273             self.list = deque()
    274         else:
    275             self.list = deque(list)
    276 
    277     def __len__ (self):
    278         return len(self.list)
    279 
    280     def is_empty (self):
    281         return not self.list
    282 
    283     def first (self):
    284         return self.list[0]
    285 
    286     def push (self, data):
    287         self.list.append(data)
    288 
    289     def pop (self):
    290         if self.list:
    291             return (1, self.list.popleft())
    292         else:
    293             return (0, None)
    294 
    295 # Given 'haystack', see if any prefix of 'needle' is at its end.  This
    296 # assumes an exact match has already been checked.  Return the number of
    297 # characters matched.
    298 # for example:
    299 # f_p_a_e ("qwerty\r", "\r\n") => 1
    300 # f_p_a_e ("qwertydkjf", "\r\n") => 0
    301 # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
    302 
    303 # this could maybe be made faster with a computed regex?
    304 # [answer: no; circa Python-2.0, Jan 2001]
    305 # new python:   28961/s
    306 # old python:   18307/s
    307 # re:        12820/s
    308 # regex:     14035/s
    309 
    310 def find_prefix_at_end (haystack, needle):
    311     l = len(needle) - 1
    312     while l and not haystack.endswith(needle[:l]):
    313         l -= 1
    314     return l
    315