Home | History | Annotate | Download | only in test
      1 # test asynchat
      2 
      3 import asyncore, asynchat, socket, time
      4 import unittest
      5 import sys
      6 from test import test_support
      7 try:
      8     import threading
      9 except ImportError:
     10     threading = None
     11 
# Host the echo client connects to (taken from test_support).
HOST = test_support.HOST
# Marker the client sends to tell the echo server to stop collecting data;
# the server strips it from the data before echoing.
SERVER_QUIT = 'QUIT\n'
     14 
     15 if threading:
     16     class echo_server(threading.Thread):
     17         # parameter to determine the number of bytes passed back to the
     18         # client each send
     19         chunk_size = 1
     20 
     21         def __init__(self, event):
     22             threading.Thread.__init__(self)
     23             self.event = event
     24             self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     25             self.port = test_support.bind_port(self.sock)
     26             # This will be set if the client wants us to wait before echoing data
     27             # back.
     28             self.start_resend_event = None
     29 
     30         def run(self):
     31             self.sock.listen(1)
     32             self.event.set()
     33             conn, client = self.sock.accept()
     34             self.buffer = ""
     35             # collect data until quit message is seen
     36             while SERVER_QUIT not in self.buffer:
     37                 data = conn.recv(1)
     38                 if not data:
     39                     break
     40                 self.buffer = self.buffer + data
     41 
     42             # remove the SERVER_QUIT message
     43             self.buffer = self.buffer.replace(SERVER_QUIT, '')
     44 
     45             if self.start_resend_event:
     46                 self.start_resend_event.wait()
     47 
     48             # re-send entire set of collected data
     49             try:
     50                 # this may fail on some tests, such as test_close_when_done, since
     51                 # the client closes the channel when it's done sending
     52                 while self.buffer:
     53                     n = conn.send(self.buffer[:self.chunk_size])
     54                     time.sleep(0.001)
     55                     self.buffer = self.buffer[n:]
     56             except:
     57                 pass
     58 
     59             conn.close()
     60             self.sock.close()
     61 
     62     class echo_client(asynchat.async_chat):
     63 
     64         def __init__(self, terminator, server_port):
     65             asynchat.async_chat.__init__(self)
     66             self.contents = []
     67             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
     68             self.connect((HOST, server_port))
     69             self.set_terminator(terminator)
     70             self.buffer = ''
     71 
     72         def handle_connect(self):
     73             pass
     74 
     75         if sys.platform == 'darwin':
     76             # select.poll returns a select.POLLHUP at the end of the tests
     77             # on darwin, so just ignore it
     78             def handle_expt(self):
     79                 pass
     80 
     81         def collect_incoming_data(self, data):
     82             self.buffer += data
     83 
     84         def found_terminator(self):
     85             self.contents.append(self.buffer)
     86             self.buffer = ""
     87 
     88 
     89     def start_echo_server():
     90         event = threading.Event()
     91         s = echo_server(event)
     92         s.start()
     93         event.wait()
     94         event.clear()
     95         time.sleep(0.01) # Give server time to start accepting.
     96         return s, event
     97 
     98 
     99 @unittest.skipUnless(threading, 'Threading required for this test.')
    100 class TestAsynchat(unittest.TestCase):
    101     usepoll = False
    102 
    103     def setUp (self):
    104         self._threads = test_support.threading_setup()
    105 
    106     def tearDown (self):
    107         test_support.threading_cleanup(*self._threads)
    108 
    109     def line_terminator_check(self, term, server_chunk):
    110         event = threading.Event()
    111         s = echo_server(event)
    112         s.chunk_size = server_chunk
    113         s.start()
    114         event.wait()
    115         event.clear()
    116         time.sleep(0.01) # Give server time to start accepting.
    117         c = echo_client(term, s.port)
    118         c.push("hello ")
    119         c.push("world%s" % term)
    120         c.push("I'm not dead yet!%s" % term)
    121         c.push(SERVER_QUIT)
    122         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    123         s.join()
    124 
    125         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
    126 
    127     # the line terminator tests below check receiving variously-sized
    128     # chunks back from the server in order to exercise all branches of
    129     # async_chat.handle_read
    130 
    131     def test_line_terminator1(self):
    132         # test one-character terminator
    133         for l in (1,2,3):
    134             self.line_terminator_check('\n', l)
    135 
    136     def test_line_terminator2(self):
    137         # test two-character terminator
    138         for l in (1,2,3):
    139             self.line_terminator_check('\r\n', l)
    140 
    141     def test_line_terminator3(self):
    142         # test three-character terminator
    143         for l in (1,2,3):
    144             self.line_terminator_check('qqq', l)
    145 
    146     def numeric_terminator_check(self, termlen):
    147         # Try reading a fixed number of bytes
    148         s, event = start_echo_server()
    149         c = echo_client(termlen, s.port)
    150         data = "hello world, I'm not dead yet!\n"
    151         c.push(data)
    152         c.push(SERVER_QUIT)
    153         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    154         s.join()
    155 
    156         self.assertEqual(c.contents, [data[:termlen]])
    157 
    158     def test_numeric_terminator1(self):
    159         # check that ints & longs both work (since type is
    160         # explicitly checked in async_chat.handle_read)
    161         self.numeric_terminator_check(1)
    162         self.numeric_terminator_check(1L)
    163 
    164     def test_numeric_terminator2(self):
    165         self.numeric_terminator_check(6L)
    166 
    167     def test_none_terminator(self):
    168         # Try reading a fixed number of bytes
    169         s, event = start_echo_server()
    170         c = echo_client(None, s.port)
    171         data = "hello world, I'm not dead yet!\n"
    172         c.push(data)
    173         c.push(SERVER_QUIT)
    174         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    175         s.join()
    176 
    177         self.assertEqual(c.contents, [])
    178         self.assertEqual(c.buffer, data)
    179 
    180     def test_simple_producer(self):
    181         s, event = start_echo_server()
    182         c = echo_client('\n', s.port)
    183         data = "hello world\nI'm not dead yet!\n"
    184         p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
    185         c.push_with_producer(p)
    186         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    187         s.join()
    188 
    189         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
    190 
    191     def test_string_producer(self):
    192         s, event = start_echo_server()
    193         c = echo_client('\n', s.port)
    194         data = "hello world\nI'm not dead yet!\n"
    195         c.push_with_producer(data+SERVER_QUIT)
    196         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    197         s.join()
    198 
    199         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
    200 
    201     def test_empty_line(self):
    202         # checks that empty lines are handled correctly
    203         s, event = start_echo_server()
    204         c = echo_client('\n', s.port)
    205         c.push("hello world\n\nI'm not dead yet!\n")
    206         c.push(SERVER_QUIT)
    207         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    208         s.join()
    209 
    210         self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
    211 
    212     def test_close_when_done(self):
    213         s, event = start_echo_server()
    214         s.start_resend_event = threading.Event()
    215         c = echo_client('\n', s.port)
    216         c.push("hello world\nI'm not dead yet!\n")
    217         c.push(SERVER_QUIT)
    218         c.close_when_done()
    219         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    220 
    221         # Only allow the server to start echoing data back to the client after
    222         # the client has closed its connection.  This prevents a race condition
    223         # where the server echoes all of its data before we can check that it
    224         # got any down below.
    225         s.start_resend_event.set()
    226         s.join()
    227 
    228         self.assertEqual(c.contents, [])
    229         # the server might have been able to send a byte or two back, but this
    230         # at least checks that it received something and didn't just fail
    231         # (which could still result in the client not having received anything)
    232         self.assertTrue(len(s.buffer) > 0)
    233 
    234 
    235 class TestAsynchat_WithPoll(TestAsynchat):
    236     usepoll = True
    237 
    238 class TestHelperFunctions(unittest.TestCase):
    239     def test_find_prefix_at_end(self):
    240         self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
    241         self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
    242 
    243 class TestFifo(unittest.TestCase):
    244     def test_basic(self):
    245         f = asynchat.fifo()
    246         f.push(7)
    247         f.push('a')
    248         self.assertEqual(len(f), 2)
    249         self.assertEqual(f.first(), 7)
    250         self.assertEqual(f.pop(), (1, 7))
    251         self.assertEqual(len(f), 1)
    252         self.assertEqual(f.first(), 'a')
    253         self.assertEqual(f.is_empty(), False)
    254         self.assertEqual(f.pop(), (1, 'a'))
    255         self.assertEqual(len(f), 0)
    256         self.assertEqual(f.is_empty(), True)
    257         self.assertEqual(f.pop(), (0, None))
    258 
    259     def test_given_list(self):
    260         f = asynchat.fifo(['x', 17, 3])
    261         self.assertEqual(len(f), 3)
    262         self.assertEqual(f.pop(), (1, 'x'))
    263         self.assertEqual(f.pop(), (1, 17))
    264         self.assertEqual(f.pop(), (1, 3))
    265         self.assertEqual(f.pop(), (0, None))
    266 
    267 
    268 def test_main(verbose=None):
    269     test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
    270                               TestHelperFunctions, TestFifo)
    271 
    272 if __name__ == "__main__":
    273     test_main(verbose=True)
    274