1 import socket 2 import nntplib 3 import time 4 import unittest 5 6 try: 7 import threading 8 except ImportError: 9 threading = None 10 11 12 from unittest import TestCase 13 from test import test_support 14 15 HOST = test_support.HOST 16 17 18 def server(evt, serv, evil=False): 19 serv.listen(5) 20 try: 21 conn, addr = serv.accept() 22 except socket.timeout: 23 pass 24 else: 25 if evil: 26 conn.send("1 I'm too long response" * 3000 + "\n") 27 else: 28 conn.send("1 I'm OK response\n") 29 conn.close() 30 finally: 31 serv.close() 32 evt.set() 33 34 35 class BaseServerTest(TestCase): 36 def setUp(self): 37 self.evt = threading.Event() 38 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 39 self.sock.settimeout(3) 40 self.port = test_support.bind_port(self.sock) 41 threading.Thread( 42 target=server, 43 args=(self.evt, self.sock, self.evil)).start() 44 time.sleep(.1) 45 46 def tearDown(self): 47 self.evt.wait() 48 49 50 @unittest.skipUnless(threading, 'threading required') 51 class ServerTests(BaseServerTest): 52 evil = False 53 54 def test_basic_connect(self): 55 nntp = nntplib.NNTP('localhost', self.port) 56 nntp.sock.close() 57 58 59 @unittest.skipUnless(threading, 'threading required') 60 class EvilServerTests(BaseServerTest): 61 evil = True 62 63 def test_too_long_line(self): 64 self.assertRaises(nntplib.NNTPDataError, 65 nntplib.NNTP, 'localhost', self.port) 66 67 68 def test_main(verbose=None): 69 test_support.run_unittest(EvilServerTests) 70 test_support.run_unittest(ServerTests) 71 72 if __name__ == '__main__': 73 test_main() 74