Home | History | Annotate | Download | only in test
      1 import socket
      2 import telnetlib
      3 import time
      4 import Queue
      5 
      6 from unittest import TestCase
      7 from test import test_support
      8 threading = test_support.import_module('threading')
      9 
     10 HOST = test_support.HOST
     11 EOF_sigil = object()
     12 
     13 def server(evt, serv, dataq=None):
     14     """ Open a tcp server in three steps
     15         1) set evt to true to let the parent know we are ready
     16         2) [optional] if is not False, write the list of data from dataq.get()
     17            to the socket.
     18         3) set evt to true to let the parent know we're done
     19     """
     20     serv.listen(5)
     21     evt.set()
     22     try:
     23         conn, addr = serv.accept()
     24         if dataq:
     25             data = ''
     26             new_data = dataq.get(True, 0.5)
     27             dataq.task_done()
     28             for item in new_data:
     29                 if item == EOF_sigil:
     30                     break
     31                 if type(item) in [int, float]:
     32                     time.sleep(item)
     33                 else:
     34                     data += item
     35                 written = conn.send(data)
     36                 data = data[written:]
     37     except socket.timeout:
     38         pass
     39     else:
     40         conn.close()
     41     finally:
     42         serv.close()
     43         evt.set()
     44 
     45 class GeneralTests(TestCase):
     46 
     47     def setUp(self):
     48         self.evt = threading.Event()
     49         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     50         self.sock.settimeout(3)
     51         self.port = test_support.bind_port(self.sock)
     52         self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
     53         self.thread.start()
     54         self.evt.wait()
     55         self.evt.clear()
     56         time.sleep(.1)
     57 
     58     def tearDown(self):
     59         self.evt.wait()
     60         self.thread.join()
     61 
     62     def testBasic(self):
     63         # connects

     64         telnet = telnetlib.Telnet(HOST, self.port)
     65         telnet.sock.close()
     66 
     67     def testTimeoutDefault(self):
     68         self.assertTrue(socket.getdefaulttimeout() is None)
     69         socket.setdefaulttimeout(30)
     70         try:
     71             telnet = telnetlib.Telnet("localhost", self.port)
     72         finally:
     73             socket.setdefaulttimeout(None)
     74         self.assertEqual(telnet.sock.gettimeout(), 30)
     75         telnet.sock.close()
     76 
     77     def testTimeoutNone(self):
     78         # None, having other default

     79         self.assertTrue(socket.getdefaulttimeout() is None)
     80         socket.setdefaulttimeout(30)
     81         try:
     82             telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
     83         finally:
     84             socket.setdefaulttimeout(None)
     85         self.assertTrue(telnet.sock.gettimeout() is None)
     86         telnet.sock.close()
     87 
     88     def testTimeoutValue(self):
     89         telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
     90         self.assertEqual(telnet.sock.gettimeout(), 30)
     91         telnet.sock.close()
     92 
     93     def testTimeoutOpen(self):
     94         telnet = telnetlib.Telnet()
     95         telnet.open("localhost", self.port, timeout=30)
     96         self.assertEqual(telnet.sock.gettimeout(), 30)
     97         telnet.sock.close()
     98 
     99 def _read_setUp(self):
    100     self.evt = threading.Event()
    101     self.dataq = Queue.Queue()
    102     self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    103     self.sock.settimeout(3)
    104     self.port = test_support.bind_port(self.sock)
    105     self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
    106     self.thread.start()
    107     self.evt.wait()
    108     self.evt.clear()
    109     time.sleep(.1)
    110 
    111 def _read_tearDown(self):
    112     self.evt.wait()
    113     self.thread.join()
    114 
    115 class ReadTests(TestCase):
    116     setUp = _read_setUp
    117     tearDown = _read_tearDown
    118 
    119     # use a similar approach to testing timeouts as test_timeout.py

    120     # these will never pass 100% but make the fuzz big enough that it is rare

    121     block_long = 0.6
    122     block_short = 0.3
    123     def test_read_until_A(self):
    124         """
    125         read_until(expected, [timeout])
    126           Read until the expected string has been seen, or a timeout is
    127           hit (default is no timeout); may block.
    128         """
    129         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
    130         self.dataq.put(want)
    131         telnet = telnetlib.Telnet(HOST, self.port)
    132         self.dataq.join()
    133         data = telnet.read_until('match')
    134         self.assertEqual(data, ''.join(want[:-2]))
    135 
    136     def test_read_until_B(self):
    137         # test the timeout - it does NOT raise socket.timeout

    138         want = ['hello', self.block_long, 'not seen', EOF_sigil]
    139         self.dataq.put(want)
    140         telnet = telnetlib.Telnet(HOST, self.port)
    141         self.dataq.join()
    142         data = telnet.read_until('not seen', self.block_short)
    143         self.assertEqual(data, want[0])
    144         self.assertEqual(telnet.read_all(), 'not seen')
    145 
    146     def test_read_all_A(self):
    147         """
    148         read_all()
    149           Read all data until EOF; may block.
    150         """
    151         want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
    152         self.dataq.put(want)
    153         telnet = telnetlib.Telnet(HOST, self.port)
    154         self.dataq.join()
    155         data = telnet.read_all()
    156         self.assertEqual(data, ''.join(want[:-1]))
    157         return
    158 
    159     def _test_blocking(self, func):
    160         self.dataq.put([self.block_long, EOF_sigil])
    161         self.dataq.join()
    162         start = time.time()
    163         data = func()
    164         self.assertTrue(self.block_short <= time.time() - start)
    165 
    166     def test_read_all_B(self):
    167         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
    168 
    169     def test_read_all_C(self):
    170         self.dataq.put([EOF_sigil])
    171         telnet = telnetlib.Telnet(HOST, self.port)
    172         self.dataq.join()
    173         telnet.read_all()
    174         telnet.read_all() # shouldn't raise

    175 
    176     def test_read_some_A(self):
    177         """
    178         read_some()
    179           Read at least one byte or EOF; may block.
    180         """
    181         # test 'at least one byte'

    182         want = ['x' * 500, EOF_sigil]
    183         self.dataq.put(want)
    184         telnet = telnetlib.Telnet(HOST, self.port)
    185         self.dataq.join()
    186         data = telnet.read_all()
    187         self.assertTrue(len(data) >= 1)
    188 
    189     def test_read_some_B(self):
    190         # test EOF

    191         self.dataq.put([EOF_sigil])
    192         telnet = telnetlib.Telnet(HOST, self.port)
    193         self.dataq.join()
    194         self.assertEqual('', telnet.read_some())
    195 
    196     def test_read_some_C(self):
    197         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
    198 
    199     def _test_read_any_eager_A(self, func_name):
    200         """
    201         read_very_eager()
    202           Read all data available already queued or on the socket,
    203           without blocking.
    204         """
    205         want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
    206         expects = want[1] + want[2]
    207         self.dataq.put(want)
    208         telnet = telnetlib.Telnet(HOST, self.port)
    209         self.dataq.join()
    210         func = getattr(telnet, func_name)
    211         data = ''
    212         while True:
    213             try:
    214                 data += func()
    215                 self.assertTrue(expects.startswith(data))
    216             except EOFError:
    217                 break
    218         self.assertEqual(expects, data)
    219 
    220     def _test_read_any_eager_B(self, func_name):
    221         # test EOF

    222         self.dataq.put([EOF_sigil])
    223         telnet = telnetlib.Telnet(HOST, self.port)
    224         self.dataq.join()
    225         time.sleep(self.block_short)
    226         func = getattr(telnet, func_name)
    227         self.assertRaises(EOFError, func)
    228 
    229     # read_eager and read_very_eager make the same gaurantees

    230     # (they behave differently but we only test the gaurantees)

    231     def test_read_very_eager_A(self):
    232         self._test_read_any_eager_A('read_very_eager')
    233     def test_read_very_eager_B(self):
    234         self._test_read_any_eager_B('read_very_eager')
    235     def test_read_eager_A(self):
    236         self._test_read_any_eager_A('read_eager')
    237     def test_read_eager_B(self):
    238         self._test_read_any_eager_B('read_eager')
    239     # NB -- we need to test the IAC block which is mentioned in the docstring

    240     # but not in the module docs

    241 
    242     def _test_read_any_lazy_B(self, func_name):
    243         self.dataq.put([EOF_sigil])
    244         telnet = telnetlib.Telnet(HOST, self.port)
    245         self.dataq.join()
    246         func = getattr(telnet, func_name)
    247         telnet.fill_rawq()
    248         self.assertRaises(EOFError, func)
    249 
    250     def test_read_lazy_A(self):
    251         want = ['x' * 100, EOF_sigil]
    252         self.dataq.put(want)
    253         telnet = telnetlib.Telnet(HOST, self.port)
    254         self.dataq.join()
    255         time.sleep(self.block_short)
    256         self.assertEqual('', telnet.read_lazy())
    257         data = ''
    258         while True:
    259             try:
    260                 read_data = telnet.read_lazy()
    261                 data += read_data
    262                 if not read_data:
    263                     telnet.fill_rawq()
    264             except EOFError:
    265                 break
    266             self.assertTrue(want[0].startswith(data))
    267         self.assertEqual(data, want[0])
    268 
    269     def test_read_lazy_B(self):
    270         self._test_read_any_lazy_B('read_lazy')
    271 
    272     def test_read_very_lazy_A(self):
    273         want = ['x' * 100, EOF_sigil]
    274         self.dataq.put(want)
    275         telnet = telnetlib.Telnet(HOST, self.port)
    276         self.dataq.join()
    277         time.sleep(self.block_short)
    278         self.assertEqual('', telnet.read_very_lazy())
    279         data = ''
    280         while True:
    281             try:
    282                 read_data = telnet.read_very_lazy()
    283             except EOFError:
    284                 break
    285             data += read_data
    286             if not read_data:
    287                 telnet.fill_rawq()
    288                 self.assertEqual('', telnet.cookedq)
    289                 telnet.process_rawq()
    290             self.assertTrue(want[0].startswith(data))
    291         self.assertEqual(data, want[0])
    292 
    293     def test_read_very_lazy_B(self):
    294         self._test_read_any_lazy_B('read_very_lazy')
    295 
    296 class nego_collector(object):
    297     def __init__(self, sb_getter=None):
    298         self.seen = ''
    299         self.sb_getter = sb_getter
    300         self.sb_seen = ''
    301 
    302     def do_nego(self, sock, cmd, opt):
    303         self.seen += cmd + opt
    304         if cmd == tl.SE and self.sb_getter:
    305             sb_data = self.sb_getter()
    306             self.sb_seen += sb_data
    307 
    308 tl = telnetlib
    309 class OptionTests(TestCase):
    310     setUp = _read_setUp
    311     tearDown = _read_tearDown
    312     # RFC 854 commands

    313     cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
    314 
    315     def _test_command(self, data):
    316         """ helper for testing IAC + cmd """
    317         self.setUp()
    318         self.dataq.put(data)
    319         telnet = telnetlib.Telnet(HOST, self.port)
    320         self.dataq.join()
    321         nego = nego_collector()
    322         telnet.set_option_negotiation_callback(nego.do_nego)
    323         txt = telnet.read_all()
    324         cmd = nego.seen
    325         self.assertTrue(len(cmd) > 0) # we expect at least one command

    326         self.assertIn(cmd[0], self.cmds)
    327         self.assertEqual(cmd[1], tl.NOOPT)
    328         self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
    329         nego.sb_getter = None # break the nego => telnet cycle

    330         self.tearDown()
    331 
    332     def test_IAC_commands(self):
    333         # reset our setup

    334         self.dataq.put([EOF_sigil])
    335         telnet = telnetlib.Telnet(HOST, self.port)
    336         self.dataq.join()
    337         self.tearDown()
    338 
    339         for cmd in self.cmds:
    340             self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
    341             self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
    342             self._test_command([tl.IAC + cmd, EOF_sigil])
    343         # all at once

    344         self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
    345         self.assertEqual('', telnet.read_sb_data())
    346 
    347     def test_SB_commands(self):
    348         # RFC 855, subnegotiations portion

    349         send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
    350                 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    351                 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
    352                 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    353                 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
    354                 EOF_sigil,
    355                ]
    356         self.dataq.put(send)
    357         telnet = telnetlib.Telnet(HOST, self.port)
    358         self.dataq.join()
    359         nego = nego_collector(telnet.read_sb_data)
    360         telnet.set_option_negotiation_callback(nego.do_nego)
    361         txt = telnet.read_all()
    362         self.assertEqual(txt, '')
    363         want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
    364         self.assertEqual(nego.sb_seen, want_sb_data)
    365         self.assertEqual('', telnet.read_sb_data())
    366         nego.sb_getter = None # break the nego => telnet cycle

    367 
    368 def test_main(verbose=None):
    369     test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
    370 
    371 if __name__ == '__main__':
    372     test_main()
    373