Home | History | Annotate | Download | only in test
      1 import socket
      2 import telnetlib
      3 import time
      4 import Queue
      5 
      6 import unittest
      7 from unittest import TestCase
      8 from test import test_support
      9 threading = test_support.import_module('threading')
     10 
     11 HOST = test_support.HOST
     12 EOF_sigil = object()
     13 
     14 def server(evt, serv, dataq=None):
     15     """ Open a tcp server in three steps
     16         1) set evt to true to let the parent know we are ready
     17         2) [optional] if is not False, write the list of data from dataq.get()
     18            to the socket.
     19     """
     20     serv.listen(5)
     21     evt.set()
     22     try:
     23         conn, addr = serv.accept()
     24         if dataq:
     25             data = ''
     26             new_data = dataq.get(True, 0.5)
     27             dataq.task_done()
     28             for item in new_data:
     29                 if item == EOF_sigil:
     30                     break
     31                 if type(item) in [int, float]:
     32                     time.sleep(item)
     33                 else:
     34                     data += item
     35                 written = conn.send(data)
     36                 data = data[written:]
     37         conn.close()
     38     except socket.timeout:
     39         pass
     40     finally:
     41         serv.close()
     42 
     43 class GeneralTests(TestCase):
     44 
     45     def setUp(self):
     46         self.evt = threading.Event()
     47         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     48         self.sock.settimeout(60)  # Safety net. Look issue 11812
     49         self.port = test_support.bind_port(self.sock)
     50         self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
     51         self.thread.setDaemon(True)
     52         self.thread.start()
     53         self.evt.wait()
     54 
     55     def tearDown(self):
     56         self.thread.join()
     57 
     58     def testBasic(self):
     59         # connects
     60         telnet = telnetlib.Telnet(HOST, self.port)
     61         telnet.sock.close()
     62 
     63     def testTimeoutDefault(self):
     64         self.assertTrue(socket.getdefaulttimeout() is None)
     65         socket.setdefaulttimeout(30)
     66         try:
     67             telnet = telnetlib.Telnet(HOST, self.port)
     68         finally:
     69             socket.setdefaulttimeout(None)
     70         self.assertEqual(telnet.sock.gettimeout(), 30)
     71         telnet.sock.close()
     72 
     73     def testTimeoutNone(self):
     74         # None, having other default
     75         self.assertTrue(socket.getdefaulttimeout() is None)
     76         socket.setdefaulttimeout(30)
     77         try:
     78             telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
     79         finally:
     80             socket.setdefaulttimeout(None)
     81         self.assertTrue(telnet.sock.gettimeout() is None)
     82         telnet.sock.close()
     83 
     84     def testTimeoutValue(self):
     85         telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
     86         self.assertEqual(telnet.sock.gettimeout(), 30)
     87         telnet.sock.close()
     88 
     89     def testTimeoutOpen(self):
     90         telnet = telnetlib.Telnet()
     91         telnet.open(HOST, self.port, timeout=30)
     92         self.assertEqual(telnet.sock.gettimeout(), 30)
     93         telnet.sock.close()
     94 
     95     def testGetters(self):
     96         # Test telnet getter methods
     97         telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
     98         t_sock = telnet.sock
     99         self.assertEqual(telnet.get_socket(), t_sock)
    100         self.assertEqual(telnet.fileno(), t_sock.fileno())
    101         telnet.sock.close()
    102 
    103 def _read_setUp(self):
    104     self.evt = threading.Event()
    105     self.dataq = Queue.Queue()
    106     self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    107     self.sock.settimeout(10)
    108     self.port = test_support.bind_port(self.sock)
    109     self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
    110     self.thread.start()
    111     self.evt.wait()
    112 
    113 def _read_tearDown(self):
    114     self.thread.join()
    115 
    116 class ReadTests(TestCase):
    117     setUp = _read_setUp
    118     tearDown = _read_tearDown
    119 
    120     # use a similar approach to testing timeouts as test_timeout.py
    121     # these will never pass 100% but make the fuzz big enough that it is rare
    122     block_long = 0.6
    123     block_short = 0.3
    124     def test_read_until_A(self):
    125         """
    126         read_until(expected, [timeout])
    127           Read until the expected string has been seen, or a timeout is
    128           hit (default is no timeout); may block.
    129         """
    130         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    131         self.dataq.put(want)
    132         telnet = telnetlib.Telnet(HOST, self.port)
    133         self.dataq.join()
    134         data = telnet.read_until('match')
    135         self.assertEqual(data, ''.join(want[:-2]))
    136 
    137     def test_read_until_B(self):
    138         # test the timeout - it does NOT raise socket.timeout
    139         want = ['hello', self.block_long, 'not seen', EOF_sigil]
    140         self.dataq.put(want)
    141         telnet = telnetlib.Telnet(HOST, self.port)
    142         self.dataq.join()
    143         data = telnet.read_until('not seen', self.block_short)
    144         self.assertEqual(data, want[0])
    145         self.assertEqual(telnet.read_all(), 'not seen')
    146 
    147     def test_read_until_with_poll(self):
    148         """Use select.poll() to implement telnet.read_until()."""
    149         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    150         self.dataq.put(want)
    151         telnet = telnetlib.Telnet(HOST, self.port)
    152         if not telnet._has_poll:
    153             raise unittest.SkipTest('select.poll() is required')
    154         telnet._has_poll = True
    155         self.dataq.join()
    156         data = telnet.read_until('match')
    157         self.assertEqual(data, ''.join(want[:-2]))
    158 
    159     def test_read_until_with_select(self):
    160         """Use select.select() to implement telnet.read_until()."""
    161         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    162         self.dataq.put(want)
    163         telnet = telnetlib.Telnet(HOST, self.port)
    164         telnet._has_poll = False
    165         self.dataq.join()
    166         data = telnet.read_until('match')
    167         self.assertEqual(data, ''.join(want[:-2]))
    168 
    169     def test_read_all_A(self):
    170         """
    171         read_all()
    172           Read all data until EOF; may block.
    173         """
    174         want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
    175         self.dataq.put(want)
    176         telnet = telnetlib.Telnet(HOST, self.port)
    177         self.dataq.join()
    178         data = telnet.read_all()
    179         self.assertEqual(data, ''.join(want[:-1]))
    180 
    181     def _test_blocking(self, func):
    182         self.dataq.put([self.block_long, EOF_sigil])
    183         self.dataq.join()
    184         start = time.time()
    185         data = func()
    186         self.assertTrue(self.block_short <= time.time() - start)
    187 
    188     def test_read_all_B(self):
    189         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
    190 
    191     def test_read_all_C(self):
    192         self.dataq.put([EOF_sigil])
    193         telnet = telnetlib.Telnet(HOST, self.port)
    194         self.dataq.join()
    195         telnet.read_all()
    196         telnet.read_all() # shouldn't raise
    197 
    198     def test_read_some_A(self):
    199         """
    200         read_some()
    201           Read at least one byte or EOF; may block.
    202         """
    203         # test 'at least one byte'
    204         want = ['x' * 500, EOF_sigil]
    205         self.dataq.put(want)
    206         telnet = telnetlib.Telnet(HOST, self.port)
    207         self.dataq.join()
    208         data = telnet.read_all()
    209         self.assertTrue(len(data) >= 1)
    210 
    211     def test_read_some_B(self):
    212         # test EOF
    213         self.dataq.put([EOF_sigil])
    214         telnet = telnetlib.Telnet(HOST, self.port)
    215         self.dataq.join()
    216         self.assertEqual('', telnet.read_some())
    217 
    218     def test_read_some_C(self):
    219         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
    220 
    221     def _test_read_any_eager_A(self, func_name):
    222         """
    223         read_very_eager()
    224           Read all data available already queued or on the socket,
    225           without blocking.
    226         """
    227         want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
    228         expects = want[1] + want[2]
    229         self.dataq.put(want)
    230         telnet = telnetlib.Telnet(HOST, self.port)
    231         self.dataq.join()
    232         func = getattr(telnet, func_name)
    233         data = ''
    234         while True:
    235             try:
    236                 data += func()
    237                 self.assertTrue(expects.startswith(data))
    238             except EOFError:
    239                 break
    240         self.assertEqual(expects, data)
    241 
    242     def _test_read_any_eager_B(self, func_name):
    243         # test EOF
    244         self.dataq.put([EOF_sigil])
    245         telnet = telnetlib.Telnet(HOST, self.port)
    246         self.dataq.join()
    247         time.sleep(self.block_short)
    248         func = getattr(telnet, func_name)
    249         self.assertRaises(EOFError, func)
    250 
    251     # read_eager and read_very_eager make the same guarantees
    252     # (they behave differently but we only test the guarantees)
    253     def test_read_very_eager_A(self):
    254         self._test_read_any_eager_A('read_very_eager')
    255     def test_read_very_eager_B(self):
    256         self._test_read_any_eager_B('read_very_eager')
    257     def test_read_eager_A(self):
    258         self._test_read_any_eager_A('read_eager')
    259     def test_read_eager_B(self):
    260         self._test_read_any_eager_B('read_eager')
    261     # NB -- we need to test the IAC block which is mentioned in the docstring
    262     # but not in the module docs
    263 
    264     def _test_read_any_lazy_B(self, func_name):
    265         self.dataq.put([EOF_sigil])
    266         telnet = telnetlib.Telnet(HOST, self.port)
    267         self.dataq.join()
    268         func = getattr(telnet, func_name)
    269         telnet.fill_rawq()
    270         self.assertRaises(EOFError, func)
    271 
    272     def test_read_lazy_A(self):
    273         want = ['x' * 100, EOF_sigil]
    274         self.dataq.put(want)
    275         telnet = telnetlib.Telnet(HOST, self.port)
    276         self.dataq.join()
    277         time.sleep(self.block_short)
    278         self.assertEqual('', telnet.read_lazy())
    279         data = ''
    280         while True:
    281             try:
    282                 read_data = telnet.read_lazy()
    283                 data += read_data
    284                 if not read_data:
    285                     telnet.fill_rawq()
    286             except EOFError:
    287                 break
    288             self.assertTrue(want[0].startswith(data))
    289         self.assertEqual(data, want[0])
    290 
    291     def test_read_lazy_B(self):
    292         self._test_read_any_lazy_B('read_lazy')
    293 
    294     def test_read_very_lazy_A(self):
    295         want = ['x' * 100, EOF_sigil]
    296         self.dataq.put(want)
    297         telnet = telnetlib.Telnet(HOST, self.port)
    298         self.dataq.join()
    299         time.sleep(self.block_short)
    300         self.assertEqual('', telnet.read_very_lazy())
    301         data = ''
    302         while True:
    303             try:
    304                 read_data = telnet.read_very_lazy()
    305             except EOFError:
    306                 break
    307             data += read_data
    308             if not read_data:
    309                 telnet.fill_rawq()
    310                 self.assertEqual('', telnet.cookedq)
    311                 telnet.process_rawq()
    312             self.assertTrue(want[0].startswith(data))
    313         self.assertEqual(data, want[0])
    314 
    315     def test_read_very_lazy_B(self):
    316         self._test_read_any_lazy_B('read_very_lazy')
    317 
    318 class nego_collector(object):
    319     def __init__(self, sb_getter=None):
    320         self.seen = ''
    321         self.sb_getter = sb_getter
    322         self.sb_seen = ''
    323 
    324     def do_nego(self, sock, cmd, opt):
    325         self.seen += cmd + opt
    326         if cmd == tl.SE and self.sb_getter:
    327             sb_data = self.sb_getter()
    328             self.sb_seen += sb_data
    329 
    330 tl = telnetlib
    331 class OptionTests(TestCase):
    332     setUp = _read_setUp
    333     tearDown = _read_tearDown
    334     # RFC 854 commands
    335     cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
    336 
    337     def _test_command(self, data):
    338         """ helper for testing IAC + cmd """
    339         self.setUp()
    340         self.dataq.put(data)
    341         telnet = telnetlib.Telnet(HOST, self.port)
    342         self.dataq.join()
    343         nego = nego_collector()
    344         telnet.set_option_negotiation_callback(nego.do_nego)
    345         txt = telnet.read_all()
    346         cmd = nego.seen
    347         self.assertTrue(len(cmd) > 0) # we expect at least one command
    348         self.assertIn(cmd[0], self.cmds)
    349         self.assertEqual(cmd[1], tl.NOOPT)
    350         self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
    351         nego.sb_getter = None # break the nego => telnet cycle
    352         self.tearDown()
    353 
    354     def test_IAC_commands(self):
    355         # reset our setup
    356         self.dataq.put([EOF_sigil])
    357         telnet = telnetlib.Telnet(HOST, self.port)
    358         self.dataq.join()
    359         self.tearDown()
    360 
    361         for cmd in self.cmds:
    362             self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
    363             self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
    364             self._test_command([tl.IAC + cmd, EOF_sigil])
    365         # all at once
    366         self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
    367         self.assertEqual('', telnet.read_sb_data())
    368 
    369     def test_SB_commands(self):
    370         # RFC 855, subnegotiations portion
    371         send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
    372                 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    373                 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
    374                 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    375                 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
    376                 EOF_sigil,
    377                ]
    378         self.dataq.put(send)
    379         telnet = telnetlib.Telnet(HOST, self.port)
    380         self.dataq.join()
    381         nego = nego_collector(telnet.read_sb_data)
    382         telnet.set_option_negotiation_callback(nego.do_nego)
    383         txt = telnet.read_all()
    384         self.assertEqual(txt, '')
    385         want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
    386         self.assertEqual(nego.sb_seen, want_sb_data)
    387         self.assertEqual('', telnet.read_sb_data())
    388         nego.sb_getter = None # break the nego => telnet cycle
    389 
    390 
    391 class ExpectTests(TestCase):
    392     def setUp(self):
    393         self.evt = threading.Event()
    394         self.dataq = Queue.Queue()
    395         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    396         self.sock.settimeout(10)
    397         self.port = test_support.bind_port(self.sock)
    398         self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
    399                                                             self.dataq))
    400         self.thread.start()
    401         self.evt.wait()
    402 
    403     def tearDown(self):
    404         self.thread.join()
    405 
    406     # use a similar approach to testing timeouts as test_timeout.py
    407     # these will never pass 100% but make the fuzz big enough that it is rare
    408     block_long = 0.6
    409     block_short = 0.3
    410     def test_expect_A(self):
    411         """
    412         expect(expected, [timeout])
    413           Read until the expected string has been seen, or a timeout is
    414           hit (default is no timeout); may block.
    415         """
    416         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    417         self.dataq.put(want)
    418         telnet = telnetlib.Telnet(HOST, self.port)
    419         self.dataq.join()
    420         (_,_,data) = telnet.expect(['match'])
    421         self.assertEqual(data, ''.join(want[:-2]))
    422 
    423     def test_expect_B(self):
    424         # test the timeout - it does NOT raise socket.timeout
    425         want = ['hello', self.block_long, 'not seen', EOF_sigil]
    426         self.dataq.put(want)
    427         telnet = telnetlib.Telnet(HOST, self.port)
    428         self.dataq.join()
    429         (_,_,data) = telnet.expect(['not seen'], self.block_short)
    430         self.assertEqual(data, want[0])
    431         self.assertEqual(telnet.read_all(), 'not seen')
    432 
    433     def test_expect_with_poll(self):
    434         """Use select.poll() to implement telnet.expect()."""
    435         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    436         self.dataq.put(want)
    437         telnet = telnetlib.Telnet(HOST, self.port)
    438         if not telnet._has_poll:
    439             raise unittest.SkipTest('select.poll() is required')
    440         telnet._has_poll = True
    441         self.dataq.join()
    442         (_,_,data) = telnet.expect(['match'])
    443         self.assertEqual(data, ''.join(want[:-2]))
    444 
    445     def test_expect_with_select(self):
    446         """Use select.select() to implement telnet.expect()."""
    447         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    448         self.dataq.put(want)
    449         telnet = telnetlib.Telnet(HOST, self.port)
    450         telnet._has_poll = False
    451         self.dataq.join()
    452         (_,_,data) = telnet.expect(['match'])
    453         self.assertEqual(data, ''.join(want[:-2]))
    454 
    455 
    456 def test_main(verbose=None):
    457     test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
    458                               ExpectTests)
    459 
    460 if __name__ == '__main__':
    461     test_main()
    462