Home | History | Annotate | Download | only in test
      1 # test asynchat
      2 
      3 import errno
      4 import asyncore
      5 import asynchat
      6 import socket
      7 import time
      8 import unittest
      9 import sys
     10 from test import test_support
     11 try:
     12     import threading
     13 except ImportError:
     14     threading = None
     15 
# Host the echo client connects to (taken from test_support).
HOST = test_support.HOST
# Marker message: once the echo server finds it in the received data it
# stops reading, strips the marker and starts echoing the rest back.
SERVER_QUIT = 'QUIT\n'
     18 
     19 if threading:
     20     class echo_server(threading.Thread):
     21         # parameter to determine the number of bytes passed back to the
     22         # client each send
     23         chunk_size = 1
     24 
     25         def __init__(self, event):
     26             threading.Thread.__init__(self)
     27             self.event = event
     28             self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     29             self.port = test_support.bind_port(self.sock)
     30             # This will be set if the client wants us to wait before echoing data
     31             # back.
     32             self.start_resend_event = None
     33 
     34         def run(self):
     35             self.sock.listen(1)
     36             self.event.set()
     37             conn, client = self.sock.accept()
     38             self.buffer = ""
     39             # collect data until quit message is seen
     40             while SERVER_QUIT not in self.buffer:
     41                 data = conn.recv(1)
     42                 if not data:
     43                     break
     44                 self.buffer = self.buffer + data
     45 
     46             # remove the SERVER_QUIT message
     47             self.buffer = self.buffer.replace(SERVER_QUIT, '')
     48 
     49             if self.start_resend_event:
     50                 self.start_resend_event.wait()
     51 
     52             # re-send entire set of collected data
     53             try:
     54                 # this may fail on some tests, such as test_close_when_done, since
     55                 # the client closes the channel when it's done sending
     56                 while self.buffer:
     57                     n = conn.send(self.buffer[:self.chunk_size])
     58                     time.sleep(0.001)
     59                     self.buffer = self.buffer[n:]
     60             except:
     61                 pass
     62 
     63             conn.close()
     64             self.sock.close()
     65 
     66     class echo_client(asynchat.async_chat):
     67 
     68         def __init__(self, terminator, server_port):
     69             asynchat.async_chat.__init__(self)
     70             self.contents = []
     71             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
     72             self.connect((HOST, server_port))
     73             self.set_terminator(terminator)
     74             self.buffer = ''
     75 
     76         def handle_connect(self):
     77             pass
     78 
     79         if sys.platform == 'darwin':
     80             # select.poll returns a select.POLLHUP at the end of the tests
     81             # on darwin, so just ignore it
     82             def handle_expt(self):
     83                 pass
     84 
     85         def collect_incoming_data(self, data):
     86             self.buffer += data
     87 
     88         def found_terminator(self):
     89             self.contents.append(self.buffer)
     90             self.buffer = ""
     91 
     92 
     93     def start_echo_server():
     94         event = threading.Event()
     95         s = echo_server(event)
     96         s.start()
     97         event.wait()
     98         event.clear()
     99         time.sleep(0.01) # Give server time to start accepting.
    100         return s, event
    101 
    102 
    103 @unittest.skipUnless(threading, 'Threading required for this test.')
    104 class TestAsynchat(unittest.TestCase):
    105     usepoll = False
    106 
    107     def setUp (self):
    108         self._threads = test_support.threading_setup()
    109 
    110     def tearDown (self):
    111         test_support.threading_cleanup(*self._threads)
    112 
    113     def line_terminator_check(self, term, server_chunk):
    114         event = threading.Event()
    115         s = echo_server(event)
    116         s.chunk_size = server_chunk
    117         s.start()
    118         event.wait()
    119         event.clear()
    120         time.sleep(0.01) # Give server time to start accepting.
    121         c = echo_client(term, s.port)
    122         c.push("hello ")
    123         c.push("world%s" % term)
    124         c.push("I'm not dead yet!%s" % term)
    125         c.push(SERVER_QUIT)
    126         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    127         s.join()
    128 
    129         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
    130 
    131     # the line terminator tests below check receiving variously-sized
    132     # chunks back from the server in order to exercise all branches of
    133     # async_chat.handle_read
    134 
    135     def test_line_terminator1(self):
    136         # test one-character terminator
    137         for l in (1,2,3):
    138             self.line_terminator_check('\n', l)
    139 
    140     def test_line_terminator2(self):
    141         # test two-character terminator
    142         for l in (1,2,3):
    143             self.line_terminator_check('\r\n', l)
    144 
    145     def test_line_terminator3(self):
    146         # test three-character terminator
    147         for l in (1,2,3):
    148             self.line_terminator_check('qqq', l)
    149 
    150     def numeric_terminator_check(self, termlen):
    151         # Try reading a fixed number of bytes
    152         s, event = start_echo_server()
    153         c = echo_client(termlen, s.port)
    154         data = "hello world, I'm not dead yet!\n"
    155         c.push(data)
    156         c.push(SERVER_QUIT)
    157         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    158         s.join()
    159 
    160         self.assertEqual(c.contents, [data[:termlen]])
    161 
    162     def test_numeric_terminator1(self):
    163         # check that ints & longs both work (since type is
    164         # explicitly checked in async_chat.handle_read)
    165         self.numeric_terminator_check(1)
    166         self.numeric_terminator_check(1L)
    167 
    168     def test_numeric_terminator2(self):
    169         self.numeric_terminator_check(6L)
    170 
    171     def test_none_terminator(self):
    172         # Try reading a fixed number of bytes
    173         s, event = start_echo_server()
    174         c = echo_client(None, s.port)
    175         data = "hello world, I'm not dead yet!\n"
    176         c.push(data)
    177         c.push(SERVER_QUIT)
    178         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    179         s.join()
    180 
    181         self.assertEqual(c.contents, [])
    182         self.assertEqual(c.buffer, data)
    183 
    184     def test_simple_producer(self):
    185         s, event = start_echo_server()
    186         c = echo_client('\n', s.port)
    187         data = "hello world\nI'm not dead yet!\n"
    188         p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
    189         c.push_with_producer(p)
    190         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    191         s.join()
    192 
    193         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
    194 
    195     def test_string_producer(self):
    196         s, event = start_echo_server()
    197         c = echo_client('\n', s.port)
    198         data = "hello world\nI'm not dead yet!\n"
    199         c.push_with_producer(data+SERVER_QUIT)
    200         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    201         s.join()
    202 
    203         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
    204 
    205     def test_empty_line(self):
    206         # checks that empty lines are handled correctly
    207         s, event = start_echo_server()
    208         c = echo_client('\n', s.port)
    209         c.push("hello world\n\nI'm not dead yet!\n")
    210         c.push(SERVER_QUIT)
    211         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    212         s.join()
    213 
    214         self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
    215 
    216     def test_close_when_done(self):
    217         s, event = start_echo_server()
    218         s.start_resend_event = threading.Event()
    219         c = echo_client('\n', s.port)
    220         c.push("hello world\nI'm not dead yet!\n")
    221         c.push(SERVER_QUIT)
    222         c.close_when_done()
    223         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
    224 
    225         # Only allow the server to start echoing data back to the client after
    226         # the client has closed its connection.  This prevents a race condition
    227         # where the server echoes all of its data before we can check that it
    228         # got any down below.
    229         s.start_resend_event.set()
    230         s.join()
    231 
    232         self.assertEqual(c.contents, [])
    233         # the server might have been able to send a byte or two back, but this
    234         # at least checks that it received something and didn't just fail
    235         # (which could still result in the client not having received anything)
    236         self.assertTrue(len(s.buffer) > 0)
    237 
    238 
# Re-runs every inherited TestAsynchat test with usepoll = True, the value
# those tests hand to asyncore.loop() as use_poll.
class TestAsynchat_WithPoll(TestAsynchat):
    usepoll = True
    241 
    242 
    243 class TestAsynchatMocked(unittest.TestCase):
    244     def test_blockingioerror(self):
    245         # Issue #16133: handle_read() must ignore blocking I/O errors like
    246         # EAGAIN
    247         class fake_socket:
    248             def fileno(self):
    249                 return 0
    250 
    251             def recv(self, size):
    252                 raise socket.error(errno.EAGAIN, "EAGAIN")
    253 
    254         class MyChat(asynchat.async_chat):
    255             def handle_error(self):
    256                 raise Exception("error")
    257 
    258         sock = fake_socket()
    259         dispatcher = MyChat()
    260         dispatcher.set_socket(sock)
    261         self.addCleanup(dispatcher.del_channel)
    262 
    263         # must not call handle_error()
    264         dispatcher.handle_read()
    265 
    266 
    267 class TestHelperFunctions(unittest.TestCase):
    268     def test_find_prefix_at_end(self):
    269         self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
    270         self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
    271 
    272 class TestFifo(unittest.TestCase):
    273     def test_basic(self):
    274         f = asynchat.fifo()
    275         f.push(7)
    276         f.push('a')
    277         self.assertEqual(len(f), 2)
    278         self.assertEqual(f.first(), 7)
    279         self.assertEqual(f.pop(), (1, 7))
    280         self.assertEqual(len(f), 1)
    281         self.assertEqual(f.first(), 'a')
    282         self.assertEqual(f.is_empty(), False)
    283         self.assertEqual(f.pop(), (1, 'a'))
    284         self.assertEqual(len(f), 0)
    285         self.assertEqual(f.is_empty(), True)
    286         self.assertEqual(f.pop(), (0, None))
    287 
    288     def test_given_list(self):
    289         f = asynchat.fifo(['x', 17, 3])
    290         self.assertEqual(len(f), 3)
    291         self.assertEqual(f.pop(), (1, 'x'))
    292         self.assertEqual(f.pop(), (1, 17))
    293         self.assertEqual(f.pop(), (1, 3))
    294         self.assertEqual(f.pop(), (0, None))
    295 
    296 
    297 def test_main(verbose=None):
    298     test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
    299                               TestAsynchatMocked,
    300                               TestHelperFunctions, TestFifo)
    301 
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main(verbose=True)
    304