import socket
import telnetlib
import time
import Queue

from unittest import TestCase
from test import test_support
threading = test_support.import_module('threading')

HOST = test_support.HOST
# Unique marker placed at the end of a data script to tell server() to stop.
EOF_sigil = object()


def server(evt, serv, dataq=None):
    """Run a one-shot TCP server on the already-bound socket `serv`.

    The server works in three steps:
      1) set evt to let the parent know we are ready to accept
      2) [optional] if dataq is given, take one list of items from
         dataq.get() and replay it on the accepted connection: strings
         are written to the socket, ints/floats are slept for that many
         seconds, and EOF_sigil ends the replay
      3) set evt again to let the parent know we're done

    A socket.timeout raised while accepting or sending is swallowed.  The
    accepted connection (if any) and the listening socket are always
    closed before returning.
    """
    serv.listen(5)
    evt.set()
    try:
        conn, addr = serv.accept()
        try:
            if dataq:
                new_data = dataq.get(True, 0.5)
                # Acknowledge immediately: the tests call dataq.join() to
                # wait until the script was picked up, not until it was sent.
                dataq.task_done()
                for item in new_data:
                    if item is EOF_sigil:
                        break
                    if isinstance(item, (int, float)):
                        time.sleep(item)
                    else:
                        # sendall() so that a short write can never drop
                        # the tail of the scripted data.
                        conn.sendall(item)
        finally:
            # Close on every path so the client always sees EOF and the
            # socket is not leaked when sending fails.
            conn.close()
    except socket.timeout:
        pass
    finally:
        serv.close()
        evt.set()


class GeneralTests(TestCase):

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(3)
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.start()
        # Wait for the "ready" signal, then re-arm the event so that
        # tearDown() can wait for the "done" signal.
        self.evt.wait()
        self.evt.clear()
        time.sleep(.1)

    def tearDown(self):
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.sock.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(telnet.sock.gettimeout(), 30)
        telnet.sock.close()

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(telnet.sock.gettimeout())
        telnet.sock.close()

    def testTimeoutValue(self):
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        self.assertEqual(telnet.sock.gettimeout(), 30)
        telnet.sock.close()

    def testTimeoutOpen(self):
        telnet = telnetlib.Telnet()
        telnet.open(HOST, self.port, timeout=30)
        self.assertEqual(telnet.sock.gettimeout(), 30)
        telnet.sock.close()


def _read_setUp(self):
    # Shared setUp for the test cases that script the server through a
    # queue: same as GeneralTests.setUp, plus self.dataq handed to server().
    self.evt = threading.Event()
    self.dataq = Queue.Queue()
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.settimeout(3)
    self.port = test_support.bind_port(self.sock)
    self.thread = threading.Thread(target=server,
                                   args=(self.evt, self.sock, self.dataq))
    self.thread.start()
    # Wait for "ready", then re-arm the event for the "done" signal.
    self.evt.wait()
    self.evt.clear()
    time.sleep(.1)


def _read_tearDown(self):
    # Shared tearDown: wait for the server's "done" signal and its thread.
    self.evt.wait()
    self.thread.join()


class ReadTests(TestCase):
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, ''.join(want[:-1]))

    def _test_blocking(self, func):
        # The server sleeps block_long before closing the connection, so
        # func() must take at least block_short to return.
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertTrue(self.block_short <= time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        telnet.read_all()
        telnet.read_all() # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = ['x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # Exercise read_some() itself; calling read_all() here would leave
        # the documented read_some() guarantee untested.
        data = telnet.read_some()
        self.assertTrue(len(data) >= 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.assertEqual('', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        # Poll until EOF; everything read so far must be a prefix of the
        # scripted payload.
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')

    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        # test EOF: after fill_rawq() has hit the closed connection, the
        # lazy reader named by func_name must raise EOFError.
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # Nothing has been pulled into the raw queue yet.
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # Nothing has been cooked yet.
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # Feed the cooked queue by hand: fill the raw queue, check
                # that nothing was cooked as a side effect, then process it.
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')


tl = telnetlib


class nego_collector(object):
    # Records what an option negotiation callback is called with.
    #   seen      -- concatenation of cmd + opt for every callback call
    #   sb_getter -- optional callable returning pending subnegotiation data
    #   sb_seen   -- concatenation of everything sb_getter returned

    def __init__(self, sb_getter=None):
        self.seen = ''
        self.sb_getter = sb_getter
        self.sb_seen = ''

    def do_nego(self, sock, cmd, opt):
        self.seen += cmd + opt
        # On SE, collect the subnegotiation data if a getter was supplied.
        if cmd == tl.SE and self.sb_getter:
            sb_data = self.sb_getter()
            self.sb_seen += sb_data


class OptionTests(TestCase):
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        # Each call needs its own one-shot server, so the fixture is run
        # by hand around the body.
        self.setUp()
        self.dataq.put(data)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertTrue(len(cmd) > 0) # we expect at least one command
        self.assertIn(cmd[0], self.cmds)
        self.assertEqual(cmd[1], tl.NOOPT)
        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
        nego.sb_getter = None # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.tearDown()

        for cmd in self.cmds:
            self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
            self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
            self._test_command([tl.IAC + cmd, EOF_sigil])
        # all at once
        self._test_command([tl.IAC + c for c in self.cmds] + [EOF_sigil])
        self.assertEqual('', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
                tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
                EOF_sigil,
               ]
        self.dataq.put(send)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector(telnet.read_sb_data)
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        self.assertEqual(txt, '')
        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
        self.assertEqual(nego.sb_seen, want_sb_data)
        self.assertEqual('', telnet.read_sb_data())
        nego.sb_getter = None # break the nego => telnet cycle


def test_main(verbose=None):
    test_support.run_unittest(GeneralTests, ReadTests, OptionTests)

if __name__ == '__main__':
    test_main()