Home | History | Annotate | Download | only in test
      1 import socket
      2 import telnetlib
      3 import time
      4 import Queue
      5 
      6 import unittest
      7 from unittest import TestCase
      8 from test import test_support
      9 threading = test_support.import_module('threading')
     10 
     11 HOST = test_support.HOST
     12 EOF_sigil = object()
     13 
     14 def server(evt, serv, dataq=None):
     15     """ Open a tcp server in three steps
     16         1) set evt to true to let the parent know we are ready
     17         2) [optional] if is not False, write the list of data from dataq.get()
     18            to the socket.
     19     """
     20     serv.listen(5)
     21     evt.set()
     22     try:
     23         conn, addr = serv.accept()
     24         if dataq:
     25             data = ''
     26             new_data = dataq.get(True, 0.5)
     27             dataq.task_done()
     28             for item in new_data:
     29                 if item == EOF_sigil:
     30                     break
     31                 if type(item) in [int, float]:
     32                     time.sleep(item)
     33                 else:
     34                     data += item
     35                 written = conn.send(data)
     36                 data = data[written:]
     37         conn.close()
     38     except socket.timeout:
     39         pass
     40     finally:
     41         serv.close()
     42 
     43 class GeneralTests(TestCase):
     44 
     45     def setUp(self):
     46         self.evt = threading.Event()
     47         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     48         self.sock.settimeout(60)  # Safety net. Look issue 11812
     49         self.port = test_support.bind_port(self.sock)
     50         self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
     51         self.thread.setDaemon(True)
     52         self.thread.start()
     53         self.evt.wait()
     54 
     55     def tearDown(self):
     56         self.thread.join()
     57 
     58     def testBasic(self):
     59         # connects
     60         telnet = telnetlib.Telnet(HOST, self.port)
     61         telnet.sock.close()
     62 
     63     def testTimeoutDefault(self):
     64         self.assertTrue(socket.getdefaulttimeout() is None)
     65         socket.setdefaulttimeout(30)
     66         try:
     67             telnet = telnetlib.Telnet(HOST, self.port)
     68         finally:
     69             socket.setdefaulttimeout(None)
     70         self.assertEqual(telnet.sock.gettimeout(), 30)
     71         telnet.sock.close()
     72 
     73     def testTimeoutNone(self):
     74         # None, having other default
     75         self.assertTrue(socket.getdefaulttimeout() is None)
     76         socket.setdefaulttimeout(30)
     77         try:
     78             telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
     79         finally:
     80             socket.setdefaulttimeout(None)
     81         self.assertTrue(telnet.sock.gettimeout() is None)
     82         telnet.sock.close()
     83 
     84     def testTimeoutValue(self):
     85         telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
     86         self.assertEqual(telnet.sock.gettimeout(), 30)
     87         telnet.sock.close()
     88 
     89     def testTimeoutOpen(self):
     90         telnet = telnetlib.Telnet()
     91         telnet.open(HOST, self.port, timeout=30)
     92         self.assertEqual(telnet.sock.gettimeout(), 30)
     93         telnet.sock.close()
     94 
     95 def _read_setUp(self):
     96     self.evt = threading.Event()
     97     self.dataq = Queue.Queue()
     98     self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     99     self.sock.settimeout(10)
    100     self.port = test_support.bind_port(self.sock)
    101     self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
    102     self.thread.start()
    103     self.evt.wait()
    104 
    105 def _read_tearDown(self):
    106     self.thread.join()
    107 
    108 class ReadTests(TestCase):
    109     setUp = _read_setUp
    110     tearDown = _read_tearDown
    111 
    112     # use a similar approach to testing timeouts as test_timeout.py
    113     # these will never pass 100% but make the fuzz big enough that it is rare
    114     block_long = 0.6
    115     block_short = 0.3
    116     def test_read_until_A(self):
    117         """
    118         read_until(expected, [timeout])
    119           Read until the expected string has been seen, or a timeout is
    120           hit (default is no timeout); may block.
    121         """
    122         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    123         self.dataq.put(want)
    124         telnet = telnetlib.Telnet(HOST, self.port)
    125         self.dataq.join()
    126         data = telnet.read_until('match')
    127         self.assertEqual(data, ''.join(want[:-2]))
    128 
    129     def test_read_until_B(self):
    130         # test the timeout - it does NOT raise socket.timeout
    131         want = ['hello', self.block_long, 'not seen', EOF_sigil]
    132         self.dataq.put(want)
    133         telnet = telnetlib.Telnet(HOST, self.port)
    134         self.dataq.join()
    135         data = telnet.read_until('not seen', self.block_short)
    136         self.assertEqual(data, want[0])
    137         self.assertEqual(telnet.read_all(), 'not seen')
    138 
    139     def test_read_until_with_poll(self):
    140         """Use select.poll() to implement telnet.read_until()."""
    141         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    142         self.dataq.put(want)
    143         telnet = telnetlib.Telnet(HOST, self.port)
    144         if not telnet._has_poll:
    145             raise unittest.SkipTest('select.poll() is required')
    146         telnet._has_poll = True
    147         self.dataq.join()
    148         data = telnet.read_until('match')
    149         self.assertEqual(data, ''.join(want[:-2]))
    150 
    151     def test_read_until_with_select(self):
    152         """Use select.select() to implement telnet.read_until()."""
    153         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    154         self.dataq.put(want)
    155         telnet = telnetlib.Telnet(HOST, self.port)
    156         telnet._has_poll = False
    157         self.dataq.join()
    158         data = telnet.read_until('match')
    159         self.assertEqual(data, ''.join(want[:-2]))
    160 
    161     def test_read_all_A(self):
    162         """
    163         read_all()
    164           Read all data until EOF; may block.
    165         """
    166         want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
    167         self.dataq.put(want)
    168         telnet = telnetlib.Telnet(HOST, self.port)
    169         self.dataq.join()
    170         data = telnet.read_all()
    171         self.assertEqual(data, ''.join(want[:-1]))
    172         return
    173 
    174     def _test_blocking(self, func):
    175         self.dataq.put([self.block_long, EOF_sigil])
    176         self.dataq.join()
    177         start = time.time()
    178         data = func()
    179         self.assertTrue(self.block_short <= time.time() - start)
    180 
    181     def test_read_all_B(self):
    182         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
    183 
    184     def test_read_all_C(self):
    185         self.dataq.put([EOF_sigil])
    186         telnet = telnetlib.Telnet(HOST, self.port)
    187         self.dataq.join()
    188         telnet.read_all()
    189         telnet.read_all() # shouldn't raise
    190 
    191     def test_read_some_A(self):
    192         """
    193         read_some()
    194           Read at least one byte or EOF; may block.
    195         """
    196         # test 'at least one byte'
    197         want = ['x' * 500, EOF_sigil]
    198         self.dataq.put(want)
    199         telnet = telnetlib.Telnet(HOST, self.port)
    200         self.dataq.join()
    201         data = telnet.read_all()
    202         self.assertTrue(len(data) >= 1)
    203 
    204     def test_read_some_B(self):
    205         # test EOF
    206         self.dataq.put([EOF_sigil])
    207         telnet = telnetlib.Telnet(HOST, self.port)
    208         self.dataq.join()
    209         self.assertEqual('', telnet.read_some())
    210 
    211     def test_read_some_C(self):
    212         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
    213 
    214     def _test_read_any_eager_A(self, func_name):
    215         """
    216         read_very_eager()
    217           Read all data available already queued or on the socket,
    218           without blocking.
    219         """
    220         want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
    221         expects = want[1] + want[2]
    222         self.dataq.put(want)
    223         telnet = telnetlib.Telnet(HOST, self.port)
    224         self.dataq.join()
    225         func = getattr(telnet, func_name)
    226         data = ''
    227         while True:
    228             try:
    229                 data += func()
    230                 self.assertTrue(expects.startswith(data))
    231             except EOFError:
    232                 break
    233         self.assertEqual(expects, data)
    234 
    235     def _test_read_any_eager_B(self, func_name):
    236         # test EOF
    237         self.dataq.put([EOF_sigil])
    238         telnet = telnetlib.Telnet(HOST, self.port)
    239         self.dataq.join()
    240         time.sleep(self.block_short)
    241         func = getattr(telnet, func_name)
    242         self.assertRaises(EOFError, func)
    243 
    244     # read_eager and read_very_eager make the same gaurantees
    245     # (they behave differently but we only test the gaurantees)
    246     def test_read_very_eager_A(self):
    247         self._test_read_any_eager_A('read_very_eager')
    248     def test_read_very_eager_B(self):
    249         self._test_read_any_eager_B('read_very_eager')
    250     def test_read_eager_A(self):
    251         self._test_read_any_eager_A('read_eager')
    252     def test_read_eager_B(self):
    253         self._test_read_any_eager_B('read_eager')
    254     # NB -- we need to test the IAC block which is mentioned in the docstring
    255     # but not in the module docs
    256 
    257     def _test_read_any_lazy_B(self, func_name):
    258         self.dataq.put([EOF_sigil])
    259         telnet = telnetlib.Telnet(HOST, self.port)
    260         self.dataq.join()
    261         func = getattr(telnet, func_name)
    262         telnet.fill_rawq()
    263         self.assertRaises(EOFError, func)
    264 
    265     def test_read_lazy_A(self):
    266         want = ['x' * 100, EOF_sigil]
    267         self.dataq.put(want)
    268         telnet = telnetlib.Telnet(HOST, self.port)
    269         self.dataq.join()
    270         time.sleep(self.block_short)
    271         self.assertEqual('', telnet.read_lazy())
    272         data = ''
    273         while True:
    274             try:
    275                 read_data = telnet.read_lazy()
    276                 data += read_data
    277                 if not read_data:
    278                     telnet.fill_rawq()
    279             except EOFError:
    280                 break
    281             self.assertTrue(want[0].startswith(data))
    282         self.assertEqual(data, want[0])
    283 
    284     def test_read_lazy_B(self):
    285         self._test_read_any_lazy_B('read_lazy')
    286 
    287     def test_read_very_lazy_A(self):
    288         want = ['x' * 100, EOF_sigil]
    289         self.dataq.put(want)
    290         telnet = telnetlib.Telnet(HOST, self.port)
    291         self.dataq.join()
    292         time.sleep(self.block_short)
    293         self.assertEqual('', telnet.read_very_lazy())
    294         data = ''
    295         while True:
    296             try:
    297                 read_data = telnet.read_very_lazy()
    298             except EOFError:
    299                 break
    300             data += read_data
    301             if not read_data:
    302                 telnet.fill_rawq()
    303                 self.assertEqual('', telnet.cookedq)
    304                 telnet.process_rawq()
    305             self.assertTrue(want[0].startswith(data))
    306         self.assertEqual(data, want[0])
    307 
    308     def test_read_very_lazy_B(self):
    309         self._test_read_any_lazy_B('read_very_lazy')
    310 
    311 class nego_collector(object):
    312     def __init__(self, sb_getter=None):
    313         self.seen = ''
    314         self.sb_getter = sb_getter
    315         self.sb_seen = ''
    316 
    317     def do_nego(self, sock, cmd, opt):
    318         self.seen += cmd + opt
    319         if cmd == tl.SE and self.sb_getter:
    320             sb_data = self.sb_getter()
    321             self.sb_seen += sb_data
    322 
    323 tl = telnetlib
    324 class OptionTests(TestCase):
    325     setUp = _read_setUp
    326     tearDown = _read_tearDown
    327     # RFC 854 commands
    328     cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
    329 
    330     def _test_command(self, data):
    331         """ helper for testing IAC + cmd """
    332         self.setUp()
    333         self.dataq.put(data)
    334         telnet = telnetlib.Telnet(HOST, self.port)
    335         self.dataq.join()
    336         nego = nego_collector()
    337         telnet.set_option_negotiation_callback(nego.do_nego)
    338         txt = telnet.read_all()
    339         cmd = nego.seen
    340         self.assertTrue(len(cmd) > 0) # we expect at least one command
    341         self.assertIn(cmd[0], self.cmds)
    342         self.assertEqual(cmd[1], tl.NOOPT)
    343         self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
    344         nego.sb_getter = None # break the nego => telnet cycle
    345         self.tearDown()
    346 
    347     def test_IAC_commands(self):
    348         # reset our setup
    349         self.dataq.put([EOF_sigil])
    350         telnet = telnetlib.Telnet(HOST, self.port)
    351         self.dataq.join()
    352         self.tearDown()
    353 
    354         for cmd in self.cmds:
    355             self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
    356             self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
    357             self._test_command([tl.IAC + cmd, EOF_sigil])
    358         # all at once
    359         self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
    360         self.assertEqual('', telnet.read_sb_data())
    361 
    362     def test_SB_commands(self):
    363         # RFC 855, subnegotiations portion
    364         send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
    365                 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    366                 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
    367                 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    368                 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
    369                 EOF_sigil,
    370                ]
    371         self.dataq.put(send)
    372         telnet = telnetlib.Telnet(HOST, self.port)
    373         self.dataq.join()
    374         nego = nego_collector(telnet.read_sb_data)
    375         telnet.set_option_negotiation_callback(nego.do_nego)
    376         txt = telnet.read_all()
    377         self.assertEqual(txt, '')
    378         want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
    379         self.assertEqual(nego.sb_seen, want_sb_data)
    380         self.assertEqual('', telnet.read_sb_data())
    381         nego.sb_getter = None # break the nego => telnet cycle
    382 
    383 
    384 class ExpectTests(TestCase):
    385     def setUp(self):
    386         self.evt = threading.Event()
    387         self.dataq = Queue.Queue()
    388         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    389         self.sock.settimeout(10)
    390         self.port = test_support.bind_port(self.sock)
    391         self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
    392                                                             self.dataq))
    393         self.thread.start()
    394         self.evt.wait()
    395 
    396     def tearDown(self):
    397         self.thread.join()
    398 
    399     # use a similar approach to testing timeouts as test_timeout.py
    400     # these will never pass 100% but make the fuzz big enough that it is rare
    401     block_long = 0.6
    402     block_short = 0.3
    403     def test_expect_A(self):
    404         """
    405         expect(expected, [timeout])
    406           Read until the expected string has been seen, or a timeout is
    407           hit (default is no timeout); may block.
    408         """
    409         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    410         self.dataq.put(want)
    411         telnet = telnetlib.Telnet(HOST, self.port)
    412         self.dataq.join()
    413         (_,_,data) = telnet.expect(['match'])
    414         self.assertEqual(data, ''.join(want[:-2]))
    415 
    416     def test_expect_B(self):
    417         # test the timeout - it does NOT raise socket.timeout
    418         want = ['hello', self.block_long, 'not seen', EOF_sigil]
    419         self.dataq.put(want)
    420         telnet = telnetlib.Telnet(HOST, self.port)
    421         self.dataq.join()
    422         (_,_,data) = telnet.expect(['not seen'], self.block_short)
    423         self.assertEqual(data, want[0])
    424         self.assertEqual(telnet.read_all(), 'not seen')
    425 
    426     def test_expect_with_poll(self):
    427         """Use select.poll() to implement telnet.expect()."""
    428         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    429         self.dataq.put(want)
    430         telnet = telnetlib.Telnet(HOST, self.port)
    431         if not telnet._has_poll:
    432             raise unittest.SkipTest('select.poll() is required')
    433         telnet._has_poll = True
    434         self.dataq.join()
    435         (_,_,data) = telnet.expect(['match'])
    436         self.assertEqual(data, ''.join(want[:-2]))
    437 
    438     def test_expect_with_select(self):
    439         """Use select.select() to implement telnet.expect()."""
    440         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    441         self.dataq.put(want)
    442         telnet = telnetlib.Telnet(HOST, self.port)
    443         telnet._has_poll = False
    444         self.dataq.join()
    445         (_,_,data) = telnet.expect(['match'])
    446         self.assertEqual(data, ''.join(want[:-2]))
    447 
    448 
    449 def test_main(verbose=None):
    450     test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
    451                               ExpectTests)
    452 
    453 if __name__ == '__main__':
    454     test_main()
    455