"""Test script for ftplib module.

The module spins up small asyncore-based dummy FTP servers (clear-text and,
when the ssl module is available, TLS-capable) in a background thread and
exercises ftplib.FTP / ftplib.FTP_TLS against them.
"""

# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
# environment

import ftplib
import asyncore
import asynchat
import socket
import StringIO
import errno
import os
try:
    import ssl
except ImportError:
    ssl = None

from unittest import TestCase
from test import test_support
from test.test_support import HOST
threading = test_support.import_module('threading')


# the dummy data returned by server over the data channel when
# RETR, LIST and NLST commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'


class DummyDTPHandler(asynchat.async_chat):
    """Data-channel handler: stores received bytes on the control handler."""

    # set once the "226" reply has been pushed, see handle_close()
    dtp_conn_closed = False

    def __init__(self, conn, baseclass):
        """Wrap *conn*; *baseclass* is the owning control-channel handler."""
        asynchat.async_chat.__init__(self, conn)
        self.baseclass = baseclass
        # every new data connection starts with an empty receive buffer
        self.baseclass.last_received_data = ''

    def handle_read(self):
        self.baseclass.last_received_data += self.recv(1024)

    def handle_close(self):
        # XXX: this method can be called many times in a row for a single
        # connection, including in clear-text (non-TLS) mode.
        # (behaviour witnessed with test_data_connection)
        if not self.dtp_conn_closed:
            self.baseclass.push('226 transfer complete')
            self.close()
            self.dtp_conn_closed = True

    def handle_error(self):
        # propagate the exception currently being handled
        raise


class DummyFTPHandler(asynchat.async_chat):
    """Control-channel handler implementing a minimal set of FTP commands.

    A received line "XXXX arg" is dispatched to the method ``cmd_xxxx(arg)``;
    unknown commands are answered with a 550 reply.
    """

    dtp_handler = DummyDTPHandler

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator("\r\n")
        self.in_buffer = []
        self.dtp = None
        self.last_received_cmd = None
        self.last_received_data = ''
        # when non-empty, pushed once before the next command is handled
        self.next_response = ''
        self.rest = None
        self.push('220 welcome')

    def collect_incoming_data(self, data):
        self.in_buffer.append(data)

    def found_terminator(self):
        """Handle one complete command line received from the client."""
        line = ''.join(self.in_buffer)
        self.in_buffer = []
        if self.next_response:
            self.push(self.next_response)
            self.next_response = ''
        # split "CMD arg ..." on the first space only
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        self.last_received_cmd = cmd
        method = getattr(self, 'cmd_' + cmd, None)
        if method is not None:
            method(arg)
        else:
            self.push('550 command "%s" not understood.' % cmd)

    def handle_error(self):
        # propagate the exception currently being handled
        raise

    def push(self, data):
        """Send *data* followed by the CRLF line terminator."""
        asynchat.async_chat.push(self, data + '\r\n')

    def cmd_port(self, arg):
        """Active mode: connect back to the "h1,h2,h3,h4,p1,p2" address."""
        addr = [int(x) for x in arg.split(',')]
        ip = '%d.%d.%d.%d' % tuple(addr[:4])
        port = (addr[4] * 256) + addr[5]
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_pasv(self, arg):
        """Passive mode: listen on an ephemeral port and accept one peer."""
        sock = socket.socket()
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            ip, port = sock.getsockname()[:2]
            ip = ip.replace('.', ',')
            p1, p2 = divmod(port, 256)
            self.push('227 entering passive mode (%s,%d,%d)' % (ip, p1, p2))
            conn, addr = sock.accept()
        finally:
            # the listening socket is only needed for this single accept();
            # close it so it is not leaked
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_eprt(self, arg):
        """Extended active mode: arg is "<d>af<d>ip<d>port<d>"."""
        # the first character of the argument is the field delimiter
        af, ip, port = arg.split(arg[0])[1:-1]
        port = int(port)
        s = socket.create_connection((ip, port), timeout=10)
        self.dtp = self.dtp_handler(s, baseclass=self)
        self.push('200 active data connection established')

    def cmd_epsv(self, arg):
        """Extended passive mode over an AF_INET6 listening socket."""
        sock = socket.socket(socket.AF_INET6)
        try:
            sock.bind((self.socket.getsockname()[0], 0))
            sock.listen(5)
            sock.settimeout(10)
            port = sock.getsockname()[1]
            self.push('229 entering extended passive mode (|||%d|)' % port)
            conn, addr = sock.accept()
        finally:
            # the listening socket is only needed for this single accept();
            # close it so it is not leaked
            sock.close()
        self.dtp = self.dtp_handler(conn, baseclass=self)

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        self.push('331 username ok')

    def cmd_pass(self, arg):
        self.push('230 password ok')

    def cmd_acct(self, arg):
        self.push('230 acct ok')

    def cmd_rnfr(self, arg):
        self.push('350 rnfr ok')

    def cmd_rnto(self, arg):
        self.push('250 rnto ok')

    def cmd_dele(self, arg):
        self.push('250 dele ok')

    def cmd_cwd(self, arg):
        self.push('250 cwd ok')

    def cmd_size(self, arg):
        self.push('250 1000')

    def cmd_mkd(self, arg):
        self.push('257 "%s"' % arg)

    def cmd_rmd(self, arg):
        self.push('250 rmd ok')

    def cmd_pwd(self, arg):
        self.push('257 "pwd ok"')

    def cmd_type(self, arg):
        self.push('200 type ok')

    def cmd_quit(self, arg):
        self.push('221 quit ok')
        self.close()

    def cmd_stor(self, arg):
        self.push('125 stor ok')

    def cmd_rest(self, arg):
        # remembered until the next RETR, which consumes it
        self.rest = arg
        self.push('350 rest ok')

    def cmd_retr(self, arg):
        """Send RETR_DATA, starting at the offset of a preceding REST."""
        self.push('125 retr ok')
        if self.rest is not None:
            offset = int(self.rest)
        else:
            offset = 0
        self.dtp.push(RETR_DATA[offset:])
        self.dtp.close_when_done()
        self.rest = None

    def cmd_list(self, arg):
        self.push('125 list ok')
        self.dtp.push(LIST_DATA)
        self.dtp.close_when_done()

    def cmd_nlst(self, arg):
        self.push('125 nlst ok')
        self.dtp.push(NLST_DATA)
        self.dtp.close_when_done()


class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    """Listening socket plus the thread that runs the asyncore loop.

    Once a client has been accepted, the ``handler`` attribute of the
    instance is rebound from the handler class to the handler instance
    serving that client, so tests can inspect it via ``server.handler``.
    """

    handler = DummyFTPHandler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.active = False
        self.active_lock = threading.Lock()
        self.host, self.port = self.socket.getsockname()[:2]

    def start(self):
        """Start the thread and block until run() has begun executing."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            # "with" guarantees the lock is released even if a handler
            # re-raises from within the loop
            with self.active_lock:
                asyncore.loop(timeout=0.1, count=1)
        asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask the loop to terminate and wait for the thread to exit."""
        assert self.active
        self.active = False
        self.join()

    def handle_accept(self):
        conn, addr = self.accept()
        # rebinds the class-level handler *class* to a handler *instance*
        self.handler = self.handler(conn)
        # only a single client is served: stop listening
        self.close()

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # propagate the exception currently being handled
        raise


if ssl is not None:

    CERTFILE = os.path.join(os.path.dirname(__file__), "keycert.pem")

    class SSLConnection(object, asyncore.dispatcher):
        """An asyncore.dispatcher subclass supporting TLS/SSL."""

        # True while the (non-blocking) TLS handshake is in progress
        _ssl_accepting = False
        # True while the (non-blocking) TLS shutdown is in progress
        _ssl_closing = False

        def secure_connection(self):
            """Wrap the socket in a server-side SSL socket.

            The handshake is not performed here; it is driven from
            handle_read_event() / handle_write_event().
            """
            self.socket = ssl.wrap_socket(self.socket,
                                          suppress_ragged_eofs=False,
                                          certfile=CERTFILE,
                                          server_side=True,
                                          do_handshake_on_connect=False,
                                          ssl_version=ssl.PROTOCOL_SSLv23)
            self._ssl_accepting = True

        def _do_ssl_handshake(self):
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # not finished yet: retried on the next I/O event
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                raise
            except socket.error as err:
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self._ssl_accepting = False

        def _do_ssl_shutdown(self):
            self._ssl_closing = True
            try:
                self.socket = self.socket.unwrap()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # not finished yet: retried on the next I/O event
                    return
            except socket.error:
                # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
                # from OpenSSL's SSL_shutdown(), corresponding to a
                # closed socket condition. See also:
                # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
                pass
            self._ssl_closing = False
            super(SSLConnection, self).close()

        def handle_read_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_read_event()

        def handle_write_event(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
            elif self._ssl_closing:
                self._do_ssl_shutdown()
            else:
                super(SSLConnection, self).handle_write_event()

        def send(self, data):
            try:
                return super(SSLConnection, self).send(data)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_EOF,
                                   ssl.SSL_ERROR_ZERO_RETURN,
                                   ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # nothing was sent
                    return 0
                raise

        def recv(self, buffer_size):
            try:
                return super(SSLConnection, self).recv(buffer_size)
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    return ''
                if err.args[0] in (ssl.SSL_ERROR_EOF,
                                   ssl.SSL_ERROR_ZERO_RETURN):
                    self.handle_close()
                    return ''
                raise

        def handle_error(self):
            # propagate the exception currently being handled
            raise

        def close(self):
            """Close the connection, shutting TLS down first when active."""
            if (isinstance(self.socket, ssl.SSLSocket) and
                    self.socket._sslobj is not None):
                self._do_ssl_shutdown()
            else:
                # clear-text connection: without this branch the socket
                # would never be closed
                super(SSLConnection, self).close()

    class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
        """A DummyDTPHandler subclass supporting TLS/SSL."""

        def __init__(self, conn, baseclass):
            DummyDTPHandler.__init__(self, conn, baseclass)
            if self.baseclass.secure_data_channel:
                self.secure_connection()

    class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
        """A DummyFTPHandler subclass supporting TLS/SSL."""

        dtp_handler = DummyTLS_DTPHandler

        def __init__(self, conn):
            DummyFTPHandler.__init__(self, conn)
            self.secure_data_channel = False

        def cmd_auth(self, line):
            """Set up secure control channel."""
            self.push('234 AUTH TLS successful')
            self.secure_connection()

        def cmd_pbsz(self, line):
            """Negotiate size of buffer for secure data transfer.
            For TLS/SSL the only valid value for the parameter is '0'.
            Any other value is accepted but ignored.
            """
            self.push('200 PBSZ=0 successful.')

        def cmd_prot(self, line):
            """Setup un/secure data channel."""
            arg = line.upper()
            if arg == 'C':
                self.push('200 Protection set to Clear')
                self.secure_data_channel = False
            elif arg == 'P':
                self.push('200 Protection set to Private')
                self.secure_data_channel = True
            else:
                self.push("502 Unrecognized PROT type (use C or P).")

    class DummyTLS_FTPServer(DummyFTPServer):
        handler = DummyTLS_FTPHandler


class TestFTPClass(TestCase):

    def setUp(self):
        self.server = DummyFTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP(timeout=10)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(), '220 welcome')

    def test_sanitize(self):
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
        self.assertEqual(self.client.sanitize('pass 12345'),
                         repr('pass *****'))
        self.assertEqual(self.client.sanitize('PASS 12345'),
                         repr('PASS *****'))

    def test_exceptions(self):
        sendcmd = self.client.sendcmd
        self.assertRaises(ftplib.error_temp, sendcmd, 'echo 400')
        self.assertRaises(ftplib.error_temp, sendcmd, 'echo 499')
        self.assertRaises(ftplib.error_perm, sendcmd, 'echo 500')
        self.assertRaises(ftplib.error_perm, sendcmd, 'echo 599')
        self.assertRaises(ftplib.error_proto, sendcmd, 'echo 999')

    def test_all_errors(self):
        exceptions = (ftplib.error_reply, ftplib.error_temp,
                      ftplib.error_perm, ftplib.error_proto, ftplib.Error,
                      IOError, EOFError)
        for x in exceptions:
            # an exception not covered by all_errors escapes the handler
            # and makes the test error out
            try:
                raise x('exception not included in all_errors set')
            except ftplib.all_errors:
                pass

    def test_set_pasv(self):
        # passive mode is supposed to be enabled by default
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(True)
        self.assertTrue(self.client.passiveserver)
        self.client.set_pasv(False)
        self.assertFalse(self.client.passiveserver)

    def test_voidcmd(self):
        self.client.voidcmd('echo 200')
        self.client.voidcmd('echo 299')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd,
                          'echo 199')
        self.assertRaises(ftplib.error_reply, self.client.voidcmd,
                          'echo 300')

    def test_login(self):
        self.client.login()

    def test_acct(self):
        self.client.acct('passwd')

    def test_rename(self):
        self.client.rename('a', 'b')
        self.server.handler.next_response = '200'
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')

    def test_delete(self):
        self.client.delete('foo')
        self.server.handler.next_response = '199'
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')

    def test_size(self):
        self.client.size('foo')

    def test_mkd(self):
        dirname = self.client.mkd('/foo')
        self.assertEqual(dirname, '/foo')

    def test_rmd(self):
        self.client.rmd('foo')

    def test_pwd(self):
        dirname = self.client.pwd()
        self.assertEqual(dirname, 'pwd ok')

    def test_quit(self):
        self.assertEqual(self.client.quit(), '221 quit ok')
        # Ensure the connection gets closed; sock attribute should be None
        self.assertIsNone(self.client.sock)

    def test_retrbinary(self):
        received = []
        self.client.retrbinary('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA)

    def test_retrbinary_rest(self):
        for rest in (0, 10, 20):
            received = []
            self.client.retrbinary('retr', received.append, rest=rest)
            self.assertEqual(''.join(received), RETR_DATA[rest:],
                             msg='rest test case %d %d %d' % (
                                 rest,
                                 len(''.join(received)),
                                 len(RETR_DATA[rest:])))

    def test_retrlines(self):
        received = []
        self.client.retrlines('retr', received.append)
        self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))

    def test_storbinary(self):
        f = StringIO.StringIO(RETR_DATA)
        self.client.storbinary('stor', f)
        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storbinary('stor', f,
                               callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_storbinary_rest(self):
        f = StringIO.StringIO(RETR_DATA)
        for r in (30, '30'):
            f.seek(0)
            self.client.storbinary('stor', f, rest=r)
            self.assertEqual(self.server.handler.rest, str(r))

    def test_storlines(self):
        f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
        self.client.storlines('stor', f)
        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
        # test new callback arg
        flag = []
        f.seek(0)
        self.client.storlines('stor foo', f,
                              callback=lambda x: flag.append(None))
        self.assertTrue(flag)

    def test_nlst(self):
        self.client.nlst()
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])

    def test_dir(self):
        l = []
        self.client.dir(lambda x: l.append(x))
        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))

    def test_makeport(self):
        self.client.makeport()
        # IPv4 is in use, just make sure send_eprt has not been used
        self.assertEqual(self.server.handler.last_received_cmd, 'port')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        # IPv4 is in use, just make sure send_epsv has not been used
        self.assertEqual(self.server.handler.last_received_cmd, 'pasv')


class TestIPv6Environment(TestCase):

    def setUp(self):
        self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
        self.server.start()
        self.client = ftplib.FTP()
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_af(self):
        self.assertEqual(self.client.af, socket.AF_INET6)

    def test_makeport(self):
        self.client.makeport()
        self.assertEqual(self.server.handler.last_received_cmd, 'eprt')

    def test_makepasv(self):
        host, port = self.client.makepasv()
        conn = socket.create_connection((host, port), 10)
        conn.close()
        self.assertEqual(self.server.handler.last_received_cmd, 'epsv')

    def test_transfer(self):
        def retr():
            received = []
            self.client.retrbinary('retr', received.append)
            self.assertEqual(''.join(received), RETR_DATA)
        self.client.set_pasv(True)
        retr()
        self.client.set_pasv(False)
        retr()


class TestTLS_FTPClassMixin(TestFTPClass):
    """Repeat TestFTPClass tests starting the TLS layer for both control
    and data connections first.
    """

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()


class TestTLS_FTPClass(TestCase):
    """Specific TLS_FTP class tests."""

    def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)

    def tearDown(self):
        self.client.close()
        self.server.stop()

    def test_control_connection(self):
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    def test_data_connection(self):
        # clear text
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        sock = self.client.transfercmd('list')
        self.assertIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    def test_login(self):
        # login() is supposed to implicitly secure the control connection
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.login()
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        # make sure that AUTH TLS doesn't get issued again
        self.client.login()

    def test_auth_issued_twice(self):
        self.client.auth()
        self.assertRaises(ValueError, self.client.auth)

    def test_auth_ssl(self):
        try:
            self.client.ssl_version = ssl.PROTOCOL_SSLv3
            self.client.auth()
            self.assertRaises(ValueError, self.client.auth)
        finally:
            self.client.ssl_version = ssl.PROTOCOL_TLSv1


class TestTimeouts(TestCase):

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(10)
        self.port = test_support.bind_port(self.sock)
        threading.Thread(target=self.server,
                         args=(self.evt, self.sock)).start()
        # Wait for the server to be ready.
        self.evt.wait()
        self.evt.clear()
        # FTP.port is a class attribute: remember the original value so
        # tearDown() can put it back and other tests are not affected
        self.old_port = ftplib.FTP.port
        ftplib.FTP.port = self.port

    def tearDown(self):
        ftplib.FTP.port = self.old_port
        self.evt.wait()

    def server(self, evt, serv):
        # This method sets the evt 3 times:
        #  1) when the connection is ready to be accepted.
        #  2) when it is safe for the caller to close the connection
        #  3) when we have closed the socket
        serv.listen(5)
        # (1) Signal the caller that we are ready to accept the connection.
        evt.set()
        try:
            conn, addr = serv.accept()
        except socket.timeout:
            pass
        else:
            conn.send("1 Hola mundo\n")
            # (2) Signal the caller that it is safe to close the socket.
            evt.set()
            conn.close()
        finally:
            serv.close()
            # (3) Signal the caller that we are done.
            evt.set()

    def testTimeoutDefault(self):
        # default -- use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutNone(self):
        # no timeout -- do not use global socket timeout
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            ftp = ftplib.FTP(HOST, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(ftp.sock.gettimeout())
        self.evt.wait()
        ftp.close()

    def testTimeoutValue(self):
        # a value
        ftp = ftplib.FTP(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutConnect(self):
        ftp = ftplib.FTP()
        ftp.connect(HOST, timeout=30)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDifferentOrder(self):
        ftp = ftplib.FTP(timeout=30)
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()

    def testTimeoutDirectAccess(self):
        ftp = ftplib.FTP()
        ftp.timeout = 30
        ftp.connect(HOST)
        self.assertEqual(ftp.sock.gettimeout(), 30)
        self.evt.wait()
        ftp.close()


def test_main():
    """Run the test cases that are supported on this platform."""
    tests = [TestFTPClass, TestTimeouts]
    if socket.has_ipv6:
        # probe whether an IPv6 server socket can actually be bound
        try:
            probe = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
        except socket.error:
            pass
        else:
            # the probe is never started: close it so its listening
            # socket does not linger in asyncore.socket_map
            probe.close()
            tests.append(TestIPv6Environment)

    if ssl is not None:
        tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        test_support.threading_cleanup(*thread_info)


if __name__ == '__main__':
    test_main()