Home | History | Annotate | Download | only in test
      1 import socket
      2 import selectors
      3 import telnetlib
      4 import contextlib
      5 
      6 from test import support
      7 import unittest
      8 threading = support.import_module('threading')
      9 
     10 HOST = support.HOST
     11 
     12 def server(evt, serv):
     13     serv.listen()
     14     evt.set()
     15     try:
     16         conn, addr = serv.accept()
     17         conn.close()
     18     except socket.timeout:
     19         pass
     20     finally:
     21         serv.close()
     22 
     23 class GeneralTests(unittest.TestCase):
     24 
     25     def setUp(self):
     26         self.evt = threading.Event()
     27         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     28         self.sock.settimeout(60)  # Safety net. Look issue 11812
     29         self.port = support.bind_port(self.sock)
     30         self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
     31         self.thread.setDaemon(True)
     32         self.thread.start()
     33         self.evt.wait()
     34 
     35     def tearDown(self):
     36         self.thread.join()
     37         del self.thread  # Clear out any dangling Thread objects.
     38 
     39     def testBasic(self):
     40         # connects
     41         telnet = telnetlib.Telnet(HOST, self.port)
     42         telnet.sock.close()
     43 
     44     def testContextManager(self):
     45         with telnetlib.Telnet(HOST, self.port) as tn:
     46             self.assertIsNotNone(tn.get_socket())
     47         self.assertIsNone(tn.get_socket())
     48 
     49     def testTimeoutDefault(self):
     50         self.assertTrue(socket.getdefaulttimeout() is None)
     51         socket.setdefaulttimeout(30)
     52         try:
     53             telnet = telnetlib.Telnet(HOST, self.port)
     54         finally:
     55             socket.setdefaulttimeout(None)
     56         self.assertEqual(telnet.sock.gettimeout(), 30)
     57         telnet.sock.close()
     58 
     59     def testTimeoutNone(self):
     60         # None, having other default
     61         self.assertTrue(socket.getdefaulttimeout() is None)
     62         socket.setdefaulttimeout(30)
     63         try:
     64             telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
     65         finally:
     66             socket.setdefaulttimeout(None)
     67         self.assertTrue(telnet.sock.gettimeout() is None)
     68         telnet.sock.close()
     69 
     70     def testTimeoutValue(self):
     71         telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
     72         self.assertEqual(telnet.sock.gettimeout(), 30)
     73         telnet.sock.close()
     74 
     75     def testTimeoutOpen(self):
     76         telnet = telnetlib.Telnet()
     77         telnet.open(HOST, self.port, timeout=30)
     78         self.assertEqual(telnet.sock.gettimeout(), 30)
     79         telnet.sock.close()
     80 
     81     def testGetters(self):
     82         # Test telnet getter methods
     83         telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
     84         t_sock = telnet.sock
     85         self.assertEqual(telnet.get_socket(), t_sock)
     86         self.assertEqual(telnet.fileno(), t_sock.fileno())
     87         telnet.sock.close()
     88 
     89 class SocketStub(object):
     90     ''' a socket proxy that re-defines sendall() '''
     91     def __init__(self, reads=()):
     92         self.reads = list(reads)  # Intentionally make a copy.
     93         self.writes = []
     94         self.block = False
     95     def sendall(self, data):
     96         self.writes.append(data)
     97     def recv(self, size):
     98         out = b''
     99         while self.reads and len(out) < size:
    100             out += self.reads.pop(0)
    101         if len(out) > size:
    102             self.reads.insert(0, out[size:])
    103             out = out[:size]
    104         return out
    105 
    106 class TelnetAlike(telnetlib.Telnet):
    107     def fileno(self):
    108         raise NotImplementedError()
    109     def close(self): pass
    110     def sock_avail(self):
    111         return (not self.sock.block)
    112     def msg(self, msg, *args):
    113         with support.captured_stdout() as out:
    114             telnetlib.Telnet.msg(self, msg, *args)
    115         self._messages += out.getvalue()
    116         return
    117 
    118 class MockSelector(selectors.BaseSelector):
    119 
    120     def __init__(self):
    121         self.keys = {}
    122 
    123     @property
    124     def resolution(self):
    125         return 1e-3
    126 
    127     def register(self, fileobj, events, data=None):
    128         key = selectors.SelectorKey(fileobj, 0, events, data)
    129         self.keys[fileobj] = key
    130         return key
    131 
    132     def unregister(self, fileobj):
    133         return self.keys.pop(fileobj)
    134 
    135     def select(self, timeout=None):
    136         block = False
    137         for fileobj in self.keys:
    138             if isinstance(fileobj, TelnetAlike):
    139                 block = fileobj.sock.block
    140                 break
    141         if block:
    142             return []
    143         else:
    144             return [(key, key.events) for key in self.keys.values()]
    145 
    146     def get_map(self):
    147         return self.keys
    148 
    149 
    150 @contextlib.contextmanager
    151 def test_socket(reads):
    152     def new_conn(*ignored):
    153         return SocketStub(reads)
    154     try:
    155         old_conn = socket.create_connection
    156         socket.create_connection = new_conn
    157         yield None
    158     finally:
    159         socket.create_connection = old_conn
    160     return
    161 
    162 def test_telnet(reads=(), cls=TelnetAlike):
    163     ''' return a telnetlib.Telnet object that uses a SocketStub with
    164         reads queued up to be read '''
    165     for x in reads:
    166         assert type(x) is bytes, x
    167     with test_socket(reads):
    168         telnet = cls('dummy', 0)
    169         telnet._messages = '' # debuglevel output
    170     return telnet
    171 
    172 class ExpectAndReadTestCase(unittest.TestCase):
    173     def setUp(self):
    174         self.old_selector = telnetlib._TelnetSelector
    175         telnetlib._TelnetSelector = MockSelector
    176     def tearDown(self):
    177         telnetlib._TelnetSelector = self.old_selector
    178 
    179 class ReadTests(ExpectAndReadTestCase):
    180     def test_read_until(self):
    181         """
    182         read_until(expected, timeout=None)
    183         test the blocking version of read_util
    184         """
    185         want = [b'xxxmatchyyy']
    186         telnet = test_telnet(want)
    187         data = telnet.read_until(b'match')
    188         self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))
    189 
    190         reads = [b'x' * 50, b'match', b'y' * 50]
    191         expect = b''.join(reads[:-1])
    192         telnet = test_telnet(reads)
    193         data = telnet.read_until(b'match')
    194         self.assertEqual(data, expect)
    195 
    196 
    197     def test_read_all(self):
    198         """
    199         read_all()
    200           Read all data until EOF; may block.
    201         """
    202         reads = [b'x' * 500, b'y' * 500, b'z' * 500]
    203         expect = b''.join(reads)
    204         telnet = test_telnet(reads)
    205         data = telnet.read_all()
    206         self.assertEqual(data, expect)
    207         return
    208 
    209     def test_read_some(self):
    210         """
    211         read_some()
    212           Read at least one byte or EOF; may block.
    213         """
    214         # test 'at least one byte'
    215         telnet = test_telnet([b'x' * 500])
    216         data = telnet.read_some()
    217         self.assertTrue(len(data) >= 1)
    218         # test EOF
    219         telnet = test_telnet()
    220         data = telnet.read_some()
    221         self.assertEqual(b'', data)
    222 
    223     def _read_eager(self, func_name):
    224         """
    225         read_*_eager()
    226           Read all data available already queued or on the socket,
    227           without blocking.
    228         """
    229         want = b'x' * 100
    230         telnet = test_telnet([want])
    231         func = getattr(telnet, func_name)
    232         telnet.sock.block = True
    233         self.assertEqual(b'', func())
    234         telnet.sock.block = False
    235         data = b''
    236         while True:
    237             try:
    238                 data += func()
    239             except EOFError:
    240                 break
    241         self.assertEqual(data, want)
    242 
    243     def test_read_eager(self):
    244         # read_eager and read_very_eager make the same guarantees
    245         # (they behave differently but we only test the guarantees)
    246         self._read_eager('read_eager')
    247         self._read_eager('read_very_eager')
    248         # NB -- we need to test the IAC block which is mentioned in the
    249         # docstring but not in the module docs
    250 
    251     def read_very_lazy(self):
    252         want = b'x' * 100
    253         telnet = test_telnet([want])
    254         self.assertEqual(b'', telnet.read_very_lazy())
    255         while telnet.sock.reads:
    256             telnet.fill_rawq()
    257         data = telnet.read_very_lazy()
    258         self.assertEqual(want, data)
    259         self.assertRaises(EOFError, telnet.read_very_lazy)
    260 
    261     def test_read_lazy(self):
    262         want = b'x' * 100
    263         telnet = test_telnet([want])
    264         self.assertEqual(b'', telnet.read_lazy())
    265         data = b''
    266         while True:
    267             try:
    268                 read_data = telnet.read_lazy()
    269                 data += read_data
    270                 if not read_data:
    271                     telnet.fill_rawq()
    272             except EOFError:
    273                 break
    274             self.assertTrue(want.startswith(data))
    275         self.assertEqual(data, want)
    276 
    277 class nego_collector(object):
    278     def __init__(self, sb_getter=None):
    279         self.seen = b''
    280         self.sb_getter = sb_getter
    281         self.sb_seen = b''
    282 
    283     def do_nego(self, sock, cmd, opt):
    284         self.seen += cmd + opt
    285         if cmd == tl.SE and self.sb_getter:
    286             sb_data = self.sb_getter()
    287             self.sb_seen += sb_data
    288 
# Short alias for the telnetlib module, used for its protocol constants
# (tl.IAC, tl.SE, ...) throughout this file.
tl = telnetlib
    290 
    291 class WriteTests(unittest.TestCase):
    292     '''The only thing that write does is replace each tl.IAC for
    293     tl.IAC+tl.IAC'''
    294 
    295     def test_write(self):
    296         data_sample = [b'data sample without IAC',
    297                        b'data sample with' + tl.IAC + b' one IAC',
    298                        b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
    299                        tl.IAC,
    300                        b'']
    301         for data in data_sample:
    302             telnet = test_telnet()
    303             telnet.write(data)
    304             written = b''.join(telnet.sock.writes)
    305             self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written)
    306 
    307 class OptionTests(unittest.TestCase):
    308     # RFC 854 commands
    309     cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
    310 
    311     def _test_command(self, data):
    312         """ helper for testing IAC + cmd """
    313         telnet = test_telnet(data)
    314         data_len = len(b''.join(data))
    315         nego = nego_collector()
    316         telnet.set_option_negotiation_callback(nego.do_nego)
    317         txt = telnet.read_all()
    318         cmd = nego.seen
    319         self.assertTrue(len(cmd) > 0) # we expect at least one command
    320         self.assertIn(cmd[:1], self.cmds)
    321         self.assertEqual(cmd[1:2], tl.NOOPT)
    322         self.assertEqual(data_len, len(txt + cmd))
    323         nego.sb_getter = None # break the nego => telnet cycle
    324 
    325     def test_IAC_commands(self):
    326         for cmd in self.cmds:
    327             self._test_command([tl.IAC, cmd])
    328             self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
    329             self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
    330         # all at once
    331         self._test_command([tl.IAC + cmd for (cmd) in self.cmds])
    332 
    333     def test_SB_commands(self):
    334         # RFC 855, subnegotiations portion
    335         send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
    336                 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    337                 tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
    338                 tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
    339                 tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
    340                ]
    341         telnet = test_telnet(send)
    342         nego = nego_collector(telnet.read_sb_data)
    343         telnet.set_option_negotiation_callback(nego.do_nego)
    344         txt = telnet.read_all()
    345         self.assertEqual(txt, b'')
    346         want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd'
    347         self.assertEqual(nego.sb_seen, want_sb_data)
    348         self.assertEqual(b'', telnet.read_sb_data())
    349         nego.sb_getter = None # break the nego => telnet cycle
    350 
    351     def test_debuglevel_reads(self):
    352         # test all the various places that self.msg(...) is called
    353         given_a_expect_b = [
    354             # Telnet.fill_rawq
    355             (b'a', ": recv b''\n"),
    356             # Telnet.process_rawq
    357             (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
    358             (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
    359             (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
    360             (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
    361             (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
    362            ]
    363         for a, b in given_a_expect_b:
    364             telnet = test_telnet([a])
    365             telnet.set_debuglevel(1)
    366             txt = telnet.read_all()
    367             self.assertIn(b, telnet._messages)
    368         return
    369 
    370     def test_debuglevel_write(self):
    371         telnet = test_telnet()
    372         telnet.set_debuglevel(1)
    373         telnet.write(b'xxx')
    374         expected = "send b'xxx'\n"
    375         self.assertIn(expected, telnet._messages)
    376 
    377     def test_debug_accepts_str_port(self):
    378         # Issue 10695
    379         with test_socket([]):
    380             telnet = TelnetAlike('dummy', '0')
    381             telnet._messages = ''
    382         telnet.set_debuglevel(1)
    383         telnet.msg('test')
    384         self.assertRegex(telnet._messages, r'0.*test')
    385 
    386 
    387 class ExpectTests(ExpectAndReadTestCase):
    388     def test_expect(self):
    389         """
    390         expect(expected, [timeout])
    391           Read until the expected string has been seen, or a timeout is
    392           hit (default is no timeout); may block.
    393         """
    394         want = [b'x' * 10, b'match', b'y' * 10]
    395         telnet = test_telnet(want)
    396         (_,_,data) = telnet.expect([b'match'])
    397         self.assertEqual(data, b''.join(want[:-1]))
    398 
    399 
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
    402