"""Test script for poplib module.

A dummy asyncore-based POP3 server runs in a background thread and the
poplib clients (plain, implicit SSL, and STLS-upgraded) are exercised
against it.
"""

# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
# a real test suite

import poplib
import asyncore
import asynchat
import socket
import os
import errno
import threading

from unittest import TestCase, skipUnless
from test import support as test_support

HOST = test_support.HOST
PORT = 0  # let the OS pick a free port; the server records the real one

SUPPORTS_SSL = False
if hasattr(poplib, 'POP3_SSL'):
    import ssl

    SUPPORTS_SSL = True
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")

requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')

# the dummy data returned by server when LIST and RETR commands are issued
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
# NOTE: this payload must stay exactly 116 bytes long (113 without the
# terminating ".\r\n" line); test_retr and test_top hard-code both numbers.
RETR_RESP = (b"From: postmaster@python.org\r\n"
             b"Content-Type: text/plain\r\n"
             b"MIME-Version: 1.0\r\n"
             b"Subject: Dummy\r\n"
             b"\r\n"
             b"line1\r\n"
             b"line2\r\n"
             b"line3\r\n"
             b".\r\n")


class DummyPOP3Handler(asynchat.async_chat):
    """Per-connection channel implementing a canned subset of POP3.

    Each received line is split into a command word and an argument and
    dispatched to the matching ``cmd_<name>`` method.  Replies are fixed
    strings; no mailbox state is kept.
    """

    CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
    # Tests flip this on the class to choose the reply to the UTF8 command.
    enable_UTF8 = False

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        self.push('+OK dummy pop3 server ready. <timestamp>')
        self.tls_active = False
        self.tls_starting = False

    def collect_incoming_data(self, data):
        # Accumulate raw chunks until found_terminator() sees a full line.
        self.in_buffer.append(data)

    def found_terminator(self):
        """Decode the buffered line and dispatch it to a ``cmd_*`` method."""
        line = str(b''.join(self.in_buffer), 'ISO-8859-1')
        self.in_buffer = []
        # Everything before the first space is the command word, everything
        # after it (possibly empty) is the argument.
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        method = getattr(self, 'cmd_' + cmd, None)
        if method is None:
            self.push('-ERR unrecognized POP3 command "%s".' % cmd)
        else:
            method(arg)

    def handle_error(self):
        # asyncore calls this from inside its own ``except`` block, so a
        # bare ``raise`` re-raises the exception currently being handled
        # instead of letting asyncore log and swallow it.
        raise

    def push(self, data):
        """Send the text line *data* to the client, appending CRLF."""
        asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        # Exactly one reply per command: a rejected user must not also
        # receive the "+OK" line.
        if arg != "guido":
            self.push("-ERR no such user")
        else:
            self.push('+OK password required')

    def cmd_pass(self, arg):
        # Exactly one reply per command (see cmd_user).
        if arg != "python":
            self.push("-ERR wrong password")
        else:
            self.push('+OK 10 messages')

    def cmd_stat(self, arg):
        self.push('+OK 10 100')

    def cmd_list(self, arg):
        if arg:
            # Single-line form: echo the argument as both fields.
            self.push('+OK %s %s' % (arg, arg))
        else:
            self.push('+OK')
            # LIST_RESP already carries its own CRLFs and the final ".",
            # so bypass our line-oriented push().
            asynchat.async_chat.push(self, LIST_RESP)

    cmd_uidl = cmd_list

    def cmd_retr(self, arg):
        self.push('+OK %s bytes' % len(RETR_RESP))
        # Raw push: RETR_RESP is already a complete multi-line response.
        asynchat.async_chat.push(self, RETR_RESP)

    cmd_top = cmd_retr

    def cmd_dele(self, arg):
        self.push('+OK message marked for deletion.')

    def cmd_noop(self, arg):
        self.push('+OK done nothing.')

    def cmd_rpop(self, arg):
        self.push('+OK done nothing.')

    def cmd_apop(self, arg):
        self.push('+OK done nothing.')

    def cmd_quit(self, arg):
        self.push('+OK closing.')
        self.close_when_done()

    def _get_capas(self):
        """Return a copy of CAPAS, plus STLS while TLS is not yet active."""
        _capas = dict(self.CAPAS)
        if not self.tls_active and SUPPORTS_SSL:
            _capas['STLS'] = []
        return _capas

    def cmd_capa(self, arg):
        self.push('+OK Capability list follows')
        # One line per capability: its name followed by any parameters.
        for cap, params in self._get_capas().items():
            self.push(' '.join([cap, *params]))
        self.push('.')

    def cmd_utf8(self, arg):
        self.push('+OK I know RFC6856'
                  if self.enable_UTF8
                  else '-ERR What is UTF8?!')

    if SUPPORTS_SSL:

        def cmd_stls(self, arg):
            """Acknowledge STLS and switch the channel over to TLS."""
            if self.tls_active is False:
                self.push('+OK Begin TLS negotiation')
                # Server-side context; calling SSLContext() with no
                # protocol argument is deprecated.
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                context.load_cert_chain(CERTFILE)
                tls_sock = context.wrap_socket(self.socket,
                                               server_side=True,
                                               do_handshake_on_connect=False,
                                               suppress_ragged_eofs=False)
                # Swap the plain socket for the wrapped one in asyncore's
                # socket map.
                self.del_channel()
                self.set_socket(tls_sock)
                self.tls_active = True
                self.tls_starting = True
                self.in_buffer = []
                self._do_tls_handshake()
            else:
                self.push('-ERR Command not permitted when TLS active')

        def _do_tls_handshake(self):
            """Advance the non-blocking TLS handshake by one step.

            Returns early while the handshake still needs more I/O, closes
            the channel on EOF, certificate alerts or ECONNABORTED, and
            clears ``tls_starting`` once the handshake has completed.
            """
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Not finished yet; handle_read() will call us again.
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                # TODO: SSLError does not expose alert information
                elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
                      "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
                    return self.handle_close()
                raise
            except OSError as err:
                # Only ECONNABORTED is acted upon; any other OSError is
                # ignored here and tls_starting stays set.
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self.tls_active = True
                self.tls_starting = False

        def handle_read(self):
            # While the handshake is in progress, readable data belongs to
            # the TLS layer rather than to the POP3 line parser.
            if self.tls_starting:
                self._do_tls_handshake()
            else:
                try:
                    asynchat.async_chat.handle_read(self)
                except ssl.SSLEOFError:
                    self.handle_close()


class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that drives the asyncore loop.

    Every accepted connection is handed to ``self.handler`` (a class that
    tests may replace before calling start()).
    """

    handler = DummyPOP3Handler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        # Record the address actually bound (the port was requested as 0).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        try:
            # Poll in short slices so that stop() is noticed promptly.
            while self.active and asyncore.socket_map:
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop in run() to exit and wait for the thread to end."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception asyncore is currently handling (see
        # DummyPOP3Handler.handle_error).
        raise


class TestPOP3Class(TestCase):
    """Exercise poplib.POP3 over a plain connection to the dummy server."""

    def assertOK(self, resp):
        self.assertTrue(resp.startswith(b"+OK"))

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(),
                         b'+OK dummy pop3 server ready. <timestamp>')

    def test_exceptions(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')

    def test_user(self):
        self.assertOK(self.client.user('guido'))
        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')

    def test_pass_(self):
        self.assertOK(self.client.pass_('python'))
        # A wrong password must be rejected by PASS itself.
        self.assertRaises(poplib.error_proto, self.client.pass_, 'invalid')

    def test_stat(self):
        self.assertEqual(self.client.stat(), (10, 100))

    def test_list(self):
        self.assertEqual(self.client.list()[1:],
                         ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
                          25))
        self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))

    def test_retr(self):
        # 116 == len(RETR_RESP); 113 is the same minus the final ".\r\n".
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy',
                     b'', b'line1', b'line2', b'line3'],
                    113)
        foo = self.client.retr('foo')
        self.assertEqual(foo, expected)

    def test_too_long_lines(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd,
                          'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))

    def test_dele(self):
        self.assertOK(self.client.dele('foo'))

    def test_noop(self):
        self.assertOK(self.client.noop())

    def test_rpop(self):
        self.assertOK(self.client.rpop('foo'))

    def test_apop_normal(self):
        self.assertOK(self.client.apop('foo', 'dummypassword'))

    def test_apop_REDOS(self):
        # Replace welcome with very long evil welcome.
        # NB The upper bound on welcome length is currently 2048.
        # At this length, evil input makes each apop call take
        # on the order of milliseconds instead of microseconds.
        evil_welcome = b'+OK' + (b'<' * 1000000)
        with test_support.swap_attr(self.client, 'welcome', evil_welcome):
            # The evil welcome is invalid, so apop should throw.
            self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')

    def test_top(self):
        # The dummy server answers TOP with the same payload as RETR.
        expected = (b'+OK 116 bytes',
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
                     b'MIME-Version: 1.0', b'Subject: Dummy', b'',
                     b'line1', b'line2', b'line3'],
                    113)
        self.assertEqual(self.client.top(1, 1), expected)

    def test_uidl(self):
        # Smoke test only: both forms must complete without raising.
        self.client.uidl()
        self.client.uidl('foo')

    def test_utf8_raises_if_unsupported(self):
        self.server.handler.enable_UTF8 = False
        self.assertRaises(poplib.error_proto, self.client.utf8)

    def test_utf8(self):
        self.server.handler.enable_UTF8 = True
        expected = b'+OK I know RFC6856'
        result = self.client.utf8()
        self.assertEqual(result, expected)

    def test_capa(self):
        capa = self.client.capa()
        self.assertIn('IMPLEMENTATION', capa)

    def test_quit(self):
        resp = self.client.quit()
        self.assertTrue(resp)
        self.assertIsNone(self.client.sock)
        self.assertIsNone(self.client.file)

    @requires_ssl
    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertIn('STLS', capa)

    @requires_ssl
    def test_stls(self):
        expected = b'+OK Begin TLS negotiation'
        resp = self.client.stls()
        self.assertEqual(resp, expected)

    @requires_ssl
    def test_stls_context(self):
        expected = b'+OK Begin TLS negotiation'
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(CAFILE)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        # The client from setUp() connected to self.server.host; with
        # hostname checking enabled this upgrade is expected to fail.
        with self.assertRaises(ssl.CertificateError):
            resp = self.client.stls(context=ctx)
        # Reconnect using the name "localhost"; now the upgrade succeeds.
        self.client = poplib.POP3("localhost", self.server.port, timeout=3)
        resp = self.client.stls(context=ctx)
        self.assertEqual(resp, expected)


if SUPPORTS_SSL:
    from test.test_ftplib import SSLConnection

    class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler):
        """Handler variant that secures the connection before greeting."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.secure_connection()
            self.set_terminator(b"\r\n")
            self.in_buffer = []
            self.push('+OK dummy pop3 server ready. <timestamp>')
            # TLS is on from the start, so STLS is neither advertised
            # nor accepted on this handler.
            self.tls_active = True
            self.tls_starting = False


@requires_ssl
class TestPOP3_SSLClass(TestPOP3Class):
    # repeat previous tests by using poplib.POP3_SSL

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.handler = DummyPOP3_SSLHandler
        self.server.start()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port)

    def test__all__(self):
        self.assertIn('POP3_SSL', poplib.__all__)

    def test_context(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        # Passing keyfile and/or certfile together with context must be
        # rejected with ValueError.
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, keyfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, certfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                          self.server.port, keyfile=CERTFILE,
                          certfile=CERTFILE, context=ctx)

        self.client.quit()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                      context=ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.assertIs(self.client.sock.context, ctx)
        self.assertTrue(self.client.noop().startswith(b'+OK'))

    def test_stls(self):
        # The connection is already encrypted, so STLS must be refused.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertNotIn('STLS', capa)


@requires_ssl
class TestPOP3_TLSClass(TestPOP3Class):
    # repeat previous tests by using poplib.POP3.stls()

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
        self.client.stls()

    def tearDown(self):
        if self.client.file is not None and self.client.sock is not None:
            try:
                self.client.quit()
            except poplib.error_proto:
                # happens in the test_too_long_lines case; the overlong
                # response will be treated as response to QUIT and raise
                # this exception
                self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_stls(self):
        # setUp() already upgraded the connection; a second STLS must fail.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        # capa() keys are str, so the lookup must use a str key as well;
        # a bytes key could never match and the check would be vacuous.
        self.assertNotIn('STLS', capa)


class TestTimeouts(TestCase):
    """Check how the POP3 constructor's timeout reaches the socket."""

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock))
        self.thread.daemon = True
        self.thread.start()
        # Wait until the server thread is listening before connecting.
        self.evt.wait()

    def tearDown(self):
        self.thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.thread = None

    def server(self, evt, serv):
        # One-shot server: accept a single connection, send a greeting
        # line, then close everything.
        serv.listen()
        evt.set()
        try:
            conn, addr = serv.accept()
            conn.send(b"+ Hola mundo\n")
            conn.close()
        except socket.timeout:
            pass
        finally:
            serv.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            pop = poplib.POP3(HOST, self.port)
        finally:
            # Always restore the global default for the tests that follow.
            socket.setdefaulttimeout(None)
        self.assertEqual(pop.sock.gettimeout(), 30)
        pop.close()

    def testTimeoutNone(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            pop = poplib.POP3(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        # An explicit timeout=None overrides the global default.
        self.assertIsNone(pop.sock.gettimeout())
        pop.close()

    def testTimeoutValue(self):
        pop = poplib.POP3(HOST, self.port, timeout=30)
        self.assertEqual(pop.sock.gettimeout(), 30)
        pop.close()


def test_main():
    """Run every test class, checking for threads left behind."""
    tests = [TestPOP3Class, TestTimeouts,
             TestPOP3_SSLClass, TestPOP3_TLSClass]
    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        test_support.threading_cleanup(*thread_info)


if __name__ == '__main__':
    test_main()