Home | History | Annotate | Download | only in test
      1 """Test script for ftplib module."""
      2 
      3 # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
      4 # environment
      5 
      6 import ftplib
      7 import asyncore
      8 import asynchat
      9 import socket
     10 import io
     11 import errno
     12 import os
     13 import threading
     14 import time
     15 try:
     16     import ssl
     17 except ImportError:
     18     ssl = None
     19 
     20 from unittest import TestCase, skipUnless
     21 from test import support
     22 from test.support import HOST, HOSTv6
     23 
     24 TIMEOUT = 3
     25 # the dummy data returned by server over the data channel when
     26 # RETR, LIST, NLST, MLSD commands are issued
     27 RETR_DATA = 'abcde12345\r\n' * 1000
     28 LIST_DATA = 'foo\r\nbar\r\n'
     29 NLST_DATA = 'foo\r\nbar\r\n'
     30 MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
     31              "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
     32              "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
     33              "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
     34              "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
     35              "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
     36              "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
     37              "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
     38              "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
     39              "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
     40              "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
     41              "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
     42              "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
     43              "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
     44              "type=file;perm=r;unique==keVO1+1G4; file4\r\n")
     45 
     46 
     47 class DummyDTPHandler(asynchat.async_chat):
     48     dtp_conn_closed = False
     49 
     50     def __init__(self, conn, baseclass):
     51         asynchat.async_chat.__init__(self, conn)
     52         self.baseclass = baseclass
     53         self.baseclass.last_received_data = ''
     54 
     55     def handle_read(self):
     56         self.baseclass.last_received_data += self.recv(1024).decode('ascii')
     57 
     58     def handle_close(self):
     59         # XXX: this method can be called many times in a row for a single
     60         # connection, including in clear-text (non-TLS) mode.
     61         # (behaviour witnessed with test_data_connection)
     62         if not self.dtp_conn_closed:
     63             self.baseclass.push('226 transfer complete')
     64             self.close()
     65             self.dtp_conn_closed = True
     66 
     67     def push(self, what):
     68         if self.baseclass.next_data is not None:
     69             what = self.baseclass.next_data
     70             self.baseclass.next_data = None
     71         if not what:
     72             return self.close_when_done()
     73         super(DummyDTPHandler, self).push(what.encode('ascii'))
     74 
     75     def handle_error(self):
     76         raise Exception
     77 
     78 
     79 class DummyFTPHandler(asynchat.async_chat):
     80 
     81     dtp_handler = DummyDTPHandler
     82 
     83     def __init__(self, conn):
     84         asynchat.async_chat.__init__(self, conn)
     85         # tells the socket to handle urgent data inline (ABOR command)
     86         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
     87         self.set_terminator(b"\r\n")
     88         self.in_buffer = []
     89         self.dtp = None
     90         self.last_received_cmd = None
     91         self.last_received_data = ''
     92         self.next_response = ''
     93         self.next_data = None
     94         self.rest = None
     95         self.next_retr_data = RETR_DATA
     96         self.push('220 welcome')
     97 
     98     def collect_incoming_data(self, data):
     99         self.in_buffer.append(data)
    100 
    101     def found_terminator(self):
    102         line = b''.join(self.in_buffer).decode('ascii')
    103         self.in_buffer = []
    104         if self.next_response:
    105             self.push(self.next_response)
    106             self.next_response = ''
    107         cmd = line.split(' ')[0].lower()
    108         self.last_received_cmd = cmd
    109         space = line.find(' ')
    110         if space != -1:
    111             arg = line[space + 1:]
    112         else:
    113             arg = ""
    114         if hasattr(self, 'cmd_' + cmd):
    115             method = getattr(self, 'cmd_' + cmd)
    116             method(arg)
    117         else:
    118             self.push('550 command "%s" not understood.' %cmd)
    119 
    120     def handle_error(self):
    121         raise Exception
    122 
    123     def push(self, data):
    124         asynchat.async_chat.push(self, data.encode('ascii') + b'\r\n')
    125 
    126     def cmd_port(self, arg):
    127         addr = list(map(int, arg.split(',')))
    128         ip = '%d.%d.%d.%d' %tuple(addr[:4])
    129         port = (addr[4] * 256) + addr[5]
    130         s = socket.create_connection((ip, port), timeout=TIMEOUT)
    131         self.dtp = self.dtp_handler(s, baseclass=self)
    132         self.push('200 active data connection established')
    133 
    134     def cmd_pasv(self, arg):
    135         with socket.socket() as sock:
    136             sock.bind((self.socket.getsockname()[0], 0))
    137             sock.listen()
    138             sock.settimeout(TIMEOUT)
    139             ip, port = sock.getsockname()[:2]
    140             ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
    141             self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
    142             conn, addr = sock.accept()
    143             self.dtp = self.dtp_handler(conn, baseclass=self)
    144 
    145     def cmd_eprt(self, arg):
    146         af, ip, port = arg.split(arg[0])[1:-1]
    147         port = int(port)
    148         s = socket.create_connection((ip, port), timeout=TIMEOUT)
    149         self.dtp = self.dtp_handler(s, baseclass=self)
    150         self.push('200 active data connection established')
    151 
    152     def cmd_epsv(self, arg):
    153         with socket.socket(socket.AF_INET6) as sock:
    154             sock.bind((self.socket.getsockname()[0], 0))
    155             sock.listen()
    156             sock.settimeout(TIMEOUT)
    157             port = sock.getsockname()[1]
    158             self.push('229 entering extended passive mode (|||%d|)' %port)
    159             conn, addr = sock.accept()
    160             self.dtp = self.dtp_handler(conn, baseclass=self)
    161 
    162     def cmd_echo(self, arg):
    163         # sends back the received string (used by the test suite)
    164         self.push(arg)
    165 
    166     def cmd_noop(self, arg):
    167         self.push('200 noop ok')
    168 
    169     def cmd_user(self, arg):
    170         self.push('331 username ok')
    171 
    172     def cmd_pass(self, arg):
    173         self.push('230 password ok')
    174 
    175     def cmd_acct(self, arg):
    176         self.push('230 acct ok')
    177 
    178     def cmd_rnfr(self, arg):
    179         self.push('350 rnfr ok')
    180 
    181     def cmd_rnto(self, arg):
    182         self.push('250 rnto ok')
    183 
    184     def cmd_dele(self, arg):
    185         self.push('250 dele ok')
    186 
    187     def cmd_cwd(self, arg):
    188         self.push('250 cwd ok')
    189 
    190     def cmd_size(self, arg):
    191         self.push('250 1000')
    192 
    193     def cmd_mkd(self, arg):
    194         self.push('257 "%s"' %arg)
    195 
    196     def cmd_rmd(self, arg):
    197         self.push('250 rmd ok')
    198 
    199     def cmd_pwd(self, arg):
    200         self.push('257 "pwd ok"')
    201 
    202     def cmd_type(self, arg):
    203         self.push('200 type ok')
    204 
    205     def cmd_quit(self, arg):
    206         self.push('221 quit ok')
    207         self.close()
    208 
    209     def cmd_abor(self, arg):
    210         self.push('226 abor ok')
    211 
    212     def cmd_stor(self, arg):
    213         self.push('125 stor ok')
    214 
    215     def cmd_rest(self, arg):
    216         self.rest = arg
    217         self.push('350 rest ok')
    218 
    219     def cmd_retr(self, arg):
    220         self.push('125 retr ok')
    221         if self.rest is not None:
    222             offset = int(self.rest)
    223         else:
    224             offset = 0
    225         self.dtp.push(self.next_retr_data[offset:])
    226         self.dtp.close_when_done()
    227         self.rest = None
    228 
    229     def cmd_list(self, arg):
    230         self.push('125 list ok')
    231         self.dtp.push(LIST_DATA)
    232         self.dtp.close_when_done()
    233 
    234     def cmd_nlst(self, arg):
    235         self.push('125 nlst ok')
    236         self.dtp.push(NLST_DATA)
    237         self.dtp.close_when_done()
    238 
    239     def cmd_opts(self, arg):
    240         self.push('200 opts ok')
    241 
    242     def cmd_mlsd(self, arg):
    243         self.push('125 mlsd ok')
    244         self.dtp.push(MLSD_DATA)
    245         self.dtp.close_when_done()
    246 
    247     def cmd_setlongretr(self, arg):
    248         # For testing. Next RETR will return long line.
    249         self.next_retr_data = 'x' * int(arg)
    250         self.push('125 setlongretr ok')
    251 
    252 
    253 class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    254 
    255     handler = DummyFTPHandler
    256 
    257     def __init__(self, address, af=socket.AF_INET):
    258         threading.Thread.__init__(self)
    259         asyncore.dispatcher.__init__(self)
    260         self.daemon = True
    261         self.create_socket(af, socket.SOCK_STREAM)
    262         self.bind(address)
    263         self.listen(5)
    264         self.active = False
    265         self.active_lock = threading.Lock()
    266         self.host, self.port = self.socket.getsockname()[:2]
    267         self.handler_instance = None
    268 
    269     def start(self):
    270         assert not self.active
    271         self.__flag = threading.Event()
    272         threading.Thread.start(self)
    273         self.__flag.wait()
    274 
    275     def run(self):
    276         self.active = True
    277         self.__flag.set()
    278         while self.active and asyncore.socket_map:
    279             self.active_lock.acquire()
    280             asyncore.loop(timeout=0.1, count=1)
    281             self.active_lock.release()
    282         asyncore.close_all(ignore_all=True)
    283 
    284     def stop(self):
    285         assert self.active
    286         self.active = False
    287         self.join()
    288 
    289     def handle_accepted(self, conn, addr):
    290         self.handler_instance = self.handler(conn)
    291 
    292     def handle_connect(self):
    293         self.close()
    294     handle_read = handle_connect
    295 
    296     def writable(self):
    297         return 0
    298 
    299     def handle_error(self):
    300         raise Exception
    301 
    302 
    303 if ssl is not None:
    304 
    305     CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    306     CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
    307 
    308     class SSLConnection(asyncore.dispatcher):
    309         """An asyncore.dispatcher subclass supporting TLS/SSL."""
    310 
    311         _ssl_accepting = False
    312         _ssl_closing = False
    313 
    314         def secure_connection(self):
    315             context = ssl.SSLContext()
    316             context.load_cert_chain(CERTFILE)
    317             socket = context.wrap_socket(self.socket,
    318                                          suppress_ragged_eofs=False,
    319                                          server_side=True,
    320                                          do_handshake_on_connect=False)
    321             self.del_channel()
    322             self.set_socket(socket)
    323             self._ssl_accepting = True
    324 
    325         def _do_ssl_handshake(self):
    326             try:
    327                 self.socket.do_handshake()
    328             except ssl.SSLError as err:
    329                 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
    330                                    ssl.SSL_ERROR_WANT_WRITE):
    331                     return
    332                 elif err.args[0] == ssl.SSL_ERROR_EOF:
    333                     return self.handle_close()
    334                 # TODO: SSLError does not expose alert information
    335                 elif "SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1]:
    336                     return self.handle_close()
    337                 raise
    338             except OSError as err:
    339                 if err.args[0] == errno.ECONNABORTED:
    340                     return self.handle_close()
    341             else:
    342                 self._ssl_accepting = False
    343 
    344         def _do_ssl_shutdown(self):
    345             self._ssl_closing = True
    346             try:
    347                 self.socket = self.socket.unwrap()
    348             except ssl.SSLError as err:
    349                 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
    350                                    ssl.SSL_ERROR_WANT_WRITE):
    351                     return
    352             except OSError as err:
    353                 # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
    354                 # from OpenSSL's SSL_shutdown(), corresponding to a
    355                 # closed socket condition. See also:
    356                 # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
    357                 pass
    358             self._ssl_closing = False
    359             if getattr(self, '_ccc', False) is False:
    360                 super(SSLConnection, self).close()
    361             else:
    362                 pass
    363 
    364         def handle_read_event(self):
    365             if self._ssl_accepting:
    366                 self._do_ssl_handshake()
    367             elif self._ssl_closing:
    368                 self._do_ssl_shutdown()
    369             else:
    370                 super(SSLConnection, self).handle_read_event()
    371 
    372         def handle_write_event(self):
    373             if self._ssl_accepting:
    374                 self._do_ssl_handshake()
    375             elif self._ssl_closing:
    376                 self._do_ssl_shutdown()
    377             else:
    378                 super(SSLConnection, self).handle_write_event()
    379 
    380         def send(self, data):
    381             try:
    382                 return super(SSLConnection, self).send(data)
    383             except ssl.SSLError as err:
    384                 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
    385                                    ssl.SSL_ERROR_WANT_READ,
    386                                    ssl.SSL_ERROR_WANT_WRITE):
    387                     return 0
    388                 raise
    389 
    390         def recv(self, buffer_size):
    391             try:
    392                 return super(SSLConnection, self).recv(buffer_size)
    393             except ssl.SSLError as err:
    394                 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
    395                                    ssl.SSL_ERROR_WANT_WRITE):
    396                     return b''
    397                 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
    398                     self.handle_close()
    399                     return b''
    400                 raise
    401 
    402         def handle_error(self):
    403             raise Exception
    404 
    405         def close(self):
    406             if (isinstance(self.socket, ssl.SSLSocket) and
    407                     self.socket._sslobj is not None):
    408                 self._do_ssl_shutdown()
    409             else:
    410                 super(SSLConnection, self).close()
    411 
    412 
    413     class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
    414         """A DummyDTPHandler subclass supporting TLS/SSL."""
    415 
    416         def __init__(self, conn, baseclass):
    417             DummyDTPHandler.__init__(self, conn, baseclass)
    418             if self.baseclass.secure_data_channel:
    419                 self.secure_connection()
    420 
    421 
    422     class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
    423         """A DummyFTPHandler subclass supporting TLS/SSL."""
    424 
    425         dtp_handler = DummyTLS_DTPHandler
    426 
    427         def __init__(self, conn):
    428             DummyFTPHandler.__init__(self, conn)
    429             self.secure_data_channel = False
    430             self._ccc = False
    431 
    432         def cmd_auth(self, line):
    433             """Set up secure control channel."""
    434             self.push('234 AUTH TLS successful')
    435             self.secure_connection()
    436 
    437         def cmd_ccc(self, line):
    438             self.push('220 Reverting back to clear-text')
    439             self._ccc = True
    440             self._do_ssl_shutdown()
    441 
    442         def cmd_pbsz(self, line):
    443             """Negotiate size of buffer for secure data transfer.
    444             For TLS/SSL the only valid value for the parameter is '0'.
    445             Any other value is accepted but ignored.
    446             """
    447             self.push('200 PBSZ=0 successful.')
    448 
    449         def cmd_prot(self, line):
    450             """Setup un/secure data channel."""
    451             arg = line.upper()
    452             if arg == 'C':
    453                 self.push('200 Protection set to Clear')
    454                 self.secure_data_channel = False
    455             elif arg == 'P':
    456                 self.push('200 Protection set to Private')
    457                 self.secure_data_channel = True
    458             else:
    459                 self.push("502 Unrecognized PROT type (use C or P).")
    460 
    461 
    462     class DummyTLS_FTPServer(DummyFTPServer):
    463         handler = DummyTLS_FTPHandler
    464 
    465 
    466 class TestFTPClass(TestCase):
    467 
    468     def setUp(self):
    469         self.server = DummyFTPServer((HOST, 0))
    470         self.server.start()
    471         self.client = ftplib.FTP(timeout=TIMEOUT)
    472         self.client.connect(self.server.host, self.server.port)
    473 
    474     def tearDown(self):
    475         self.client.close()
    476         self.server.stop()
    477         # Explicitly clear the attribute to prevent dangling thread
    478         self.server = None
    479         asyncore.close_all(ignore_all=True)
    480 
    481     def check_data(self, received, expected):
    482         self.assertEqual(len(received), len(expected))
    483         self.assertEqual(received, expected)
    484 
    485     def test_getwelcome(self):
    486         self.assertEqual(self.client.getwelcome(), '220 welcome')
    487 
    488     def test_sanitize(self):
    489         self.assertEqual(self.client.sanitize('foo'), repr('foo'))
    490         self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
    491         self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
    492 
    493     def test_exceptions(self):
    494         self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r\n0')
    495         self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\n0')
    496         self.assertRaises(ValueError, self.client.sendcmd, 'echo 40\r0')
    497         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
    498         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
    499         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
    500         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
    501         self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
    502 
    503     def test_all_errors(self):
    504         exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
    505                       ftplib.error_proto, ftplib.Error, OSError,
    506                       EOFError)
    507         for x in exceptions:
    508             try:
    509                 raise x('exception not included in all_errors set')
    510             except ftplib.all_errors:
    511                 pass
    512 
    513     def test_set_pasv(self):
    514         # passive mode is supposed to be enabled by default
    515         self.assertTrue(self.client.passiveserver)
    516         self.client.set_pasv(True)
    517         self.assertTrue(self.client.passiveserver)
    518         self.client.set_pasv(False)
    519         self.assertFalse(self.client.passiveserver)
    520 
    521     def test_voidcmd(self):
    522         self.client.voidcmd('echo 200')
    523         self.client.voidcmd('echo 299')
    524         self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
    525         self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
    526 
    527     def test_login(self):
    528         self.client.login()
    529 
    530     def test_acct(self):
    531         self.client.acct('passwd')
    532 
    533     def test_rename(self):
    534         self.client.rename('a', 'b')
    535         self.server.handler_instance.next_response = '200'
    536         self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
    537 
    538     def test_delete(self):
    539         self.client.delete('foo')
    540         self.server.handler_instance.next_response = '199'
    541         self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
    542 
    543     def test_size(self):
    544         self.client.size('foo')
    545 
    546     def test_mkd(self):
    547         dir = self.client.mkd('/foo')
    548         self.assertEqual(dir, '/foo')
    549 
    550     def test_rmd(self):
    551         self.client.rmd('foo')
    552 
    553     def test_cwd(self):
    554         dir = self.client.cwd('/foo')
    555         self.assertEqual(dir, '250 cwd ok')
    556 
    557     def test_pwd(self):
    558         dir = self.client.pwd()
    559         self.assertEqual(dir, 'pwd ok')
    560 
    561     def test_quit(self):
    562         self.assertEqual(self.client.quit(), '221 quit ok')
    563         # Ensure the connection gets closed; sock attribute should be None
    564         self.assertEqual(self.client.sock, None)
    565 
    566     def test_abort(self):
    567         self.client.abort()
    568 
    569     def test_retrbinary(self):
    570         def callback(data):
    571             received.append(data.decode('ascii'))
    572         received = []
    573         self.client.retrbinary('retr', callback)
    574         self.check_data(''.join(received), RETR_DATA)
    575 
    576     def test_retrbinary_rest(self):
    577         def callback(data):
    578             received.append(data.decode('ascii'))
    579         for rest in (0, 10, 20):
    580             received = []
    581             self.client.retrbinary('retr', callback, rest=rest)
    582             self.check_data(''.join(received), RETR_DATA[rest:])
    583 
    584     def test_retrlines(self):
    585         received = []
    586         self.client.retrlines('retr', received.append)
    587         self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
    588 
    589     def test_storbinary(self):
    590         f = io.BytesIO(RETR_DATA.encode('ascii'))
    591         self.client.storbinary('stor', f)
    592         self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
    593         # test new callback arg
    594         flag = []
    595         f.seek(0)
    596         self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
    597         self.assertTrue(flag)
    598 
    599     def test_storbinary_rest(self):
    600         f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
    601         for r in (30, '30'):
    602             f.seek(0)
    603             self.client.storbinary('stor', f, rest=r)
    604             self.assertEqual(self.server.handler_instance.rest, str(r))
    605 
    606     def test_storlines(self):
    607         f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
    608         self.client.storlines('stor', f)
    609         self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
    610         # test new callback arg
    611         flag = []
    612         f.seek(0)
    613         self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
    614         self.assertTrue(flag)
    615 
    616         f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
    617         # storlines() expects a binary file, not a text file
    618         with support.check_warnings(('', BytesWarning), quiet=True):
    619             self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
    620 
    621     def test_nlst(self):
    622         self.client.nlst()
    623         self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
    624 
    625     def test_dir(self):
    626         l = []
    627         self.client.dir(lambda x: l.append(x))
    628         self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
    629 
    630     def test_mlsd(self):
    631         list(self.client.mlsd())
    632         list(self.client.mlsd(path='/'))
    633         list(self.client.mlsd(path='/', facts=['size', 'type']))
    634 
    635         ls = list(self.client.mlsd())
    636         for name, facts in ls:
    637             self.assertIsInstance(name, str)
    638             self.assertIsInstance(facts, dict)
    639             self.assertTrue(name)
    640             self.assertIn('type', facts)
    641             self.assertIn('perm', facts)
    642             self.assertIn('unique', facts)
    643 
    644         def set_data(data):
    645             self.server.handler_instance.next_data = data
    646 
    647         def test_entry(line, type=None, perm=None, unique=None, name=None):
    648             type = 'type' if type is None else type
    649             perm = 'perm' if perm is None else perm
    650             unique = 'unique' if unique is None else unique
    651             name = 'name' if name is None else name
    652             set_data(line)
    653             _name, facts = next(self.client.mlsd())
    654             self.assertEqual(_name, name)
    655             self.assertEqual(facts['type'], type)
    656             self.assertEqual(facts['perm'], perm)
    657             self.assertEqual(facts['unique'], unique)
    658 
    659         # plain
    660         test_entry('type=type;perm=perm;unique=unique; name\r\n')
    661         # "=" in fact value
    662         test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
    663         test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
    664         test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
    665         test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
    666         # spaces in name
    667         test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
    668         test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
    669         test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
    670         test_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
    671         # ";" in name
    672         test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
    673         test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
    674         test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
    675         test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
    676         # case sensitiveness
    677         set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
    678         _name, facts = next(self.client.mlsd())
    679         for x in facts:
    680             self.assertTrue(x.islower())
    681         # no data (directory empty)
    682         set_data('')
    683         self.assertRaises(StopIteration, next, self.client.mlsd())
    684         set_data('')
    685         for x in self.client.mlsd():
    686             self.fail("unexpected data %s" % x)
    687 
    688     def test_makeport(self):
    689         with self.client.makeport():
    690             # IPv4 is in use, just make sure send_eprt has not been used
    691             self.assertEqual(self.server.handler_instance.last_received_cmd,
    692                                 'port')
    693 
    694     def test_makepasv(self):
    695         host, port = self.client.makepasv()
    696         conn = socket.create_connection((host, port), timeout=TIMEOUT)
    697         conn.close()
    698         # IPv4 is in use, just make sure send_epsv has not been used
    699         self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
    700 
    701     def test_with_statement(self):
    702         self.client.quit()
    703 
    704         def is_client_connected():
    705             if self.client.sock is None:
    706                 return False
    707             try:
    708                 self.client.sendcmd('noop')
    709             except (OSError, EOFError):
    710                 return False
    711             return True
    712 
    713         # base test
    714         with ftplib.FTP(timeout=TIMEOUT) as self.client:
    715             self.client.connect(self.server.host, self.server.port)
    716             self.client.sendcmd('noop')
    717             self.assertTrue(is_client_connected())
    718         self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
    719         self.assertFalse(is_client_connected())
    720 
    721         # QUIT sent inside the with block
    722         with ftplib.FTP(timeout=TIMEOUT) as self.client:
    723             self.client.connect(self.server.host, self.server.port)
    724             self.client.sendcmd('noop')
    725             self.client.quit()
    726         self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
    727         self.assertFalse(is_client_connected())
    728 
    729         # force a wrong response code to be sent on QUIT: error_perm
    730         # is expected and the connection is supposed to be closed
    731         try:
    732             with ftplib.FTP(timeout=TIMEOUT) as self.client:
    733                 self.client.connect(self.server.host, self.server.port)
    734                 self.client.sendcmd('noop')
    735                 self.server.handler_instance.next_response = '550 error on quit'
    736         except ftplib.error_perm as err:
    737             self.assertEqual(str(err), '550 error on quit')
    738         else:
    739             self.fail('Exception not raised')
    740         # needed to give the threaded server some time to set the attribute
    741         # which otherwise would still be == 'noop'
    742         time.sleep(0.1)
    743         self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
    744         self.assertFalse(is_client_connected())
    745 
    746     def test_source_address(self):
    747         self.client.quit()
    748         port = support.find_unused_port()
    749         try:
    750             self.client.connect(self.server.host, self.server.port,
    751                                 source_address=(HOST, port))
    752             self.assertEqual(self.client.sock.getsockname()[1], port)
    753             self.client.quit()
    754         except OSError as e:
    755             if e.errno == errno.EADDRINUSE:
    756                 self.skipTest("couldn't bind to port %d" % port)
    757             raise
    758 
    759     def test_source_address_passive_connection(self):
    760         port = support.find_unused_port()
    761         self.client.source_address = (HOST, port)
    762         try:
    763             with self.client.transfercmd('list') as sock:
    764                 self.assertEqual(sock.getsockname()[1], port)
    765         except OSError as e:
    766             if e.errno == errno.EADDRINUSE:
    767                 self.skipTest("couldn't bind to port %d" % port)
    768             raise
    769 
    770     def test_parse257(self):
    771         self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
    772         self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
    773         self.assertEqual(ftplib.parse257('257 ""'), '')
    774         self.assertEqual(ftplib.parse257('257 "" created'), '')
    775         self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
    776         # The 257 response is supposed to include the directory
    777         # name and in case it contains embedded double-quotes
    778         # they must be doubled (see RFC-959, chapter 7, appendix 2).
    779         self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
    780         self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
    781 
    782     def test_line_too_long(self):
    783         self.assertRaises(ftplib.Error, self.client.sendcmd,
    784                           'x' * self.client.maxline * 2)
    785 
    786     def test_retrlines_too_long(self):
    787         self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
    788         received = []
    789         self.assertRaises(ftplib.Error,
    790                           self.client.retrlines, 'retr', received.append)
    791 
    792     def test_storlines_too_long(self):
    793         f = io.BytesIO(b'x' * self.client.maxline * 2)
    794         self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
    795 
    796 
    797 @skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
    798 class TestIPv6Environment(TestCase):
    799 
    800     def setUp(self):
    801         self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
    802         self.server.start()
    803         self.client = ftplib.FTP(timeout=TIMEOUT)
    804         self.client.connect(self.server.host, self.server.port)
    805 
    806     def tearDown(self):
    807         self.client.close()
    808         self.server.stop()
    809         # Explicitly clear the attribute to prevent dangling thread
    810         self.server = None
    811         asyncore.close_all(ignore_all=True)
    812 
    813     def test_af(self):
    814         self.assertEqual(self.client.af, socket.AF_INET6)
    815 
    816     def test_makeport(self):
    817         with self.client.makeport():
    818             self.assertEqual(self.server.handler_instance.last_received_cmd,
    819                                 'eprt')
    820 
    821     def test_makepasv(self):
    822         host, port = self.client.makepasv()
    823         conn = socket.create_connection((host, port), timeout=TIMEOUT)
    824         conn.close()
    825         self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')
    826 
    827     def test_transfer(self):
    828         def retr():
    829             def callback(data):
    830                 received.append(data.decode('ascii'))
    831             received = []
    832             self.client.retrbinary('retr', callback)
    833             self.assertEqual(len(''.join(received)), len(RETR_DATA))
    834             self.assertEqual(''.join(received), RETR_DATA)
    835         self.client.set_pasv(True)
    836         retr()
    837         self.client.set_pasv(False)
    838         retr()
    839 
    840 
    841 @skipUnless(ssl, "SSL not available")
    842 class TestTLS_FTPClassMixin(TestFTPClass):
    843     """Repeat TestFTPClass tests starting the TLS layer for both control
    844     and data connections first.
    845     """
    846 
    847     def setUp(self):
    848         self.server = DummyTLS_FTPServer((HOST, 0))
    849         self.server.start()
    850         self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    851         self.client.connect(self.server.host, self.server.port)
    852         # enable TLS
    853         self.client.auth()
    854         self.client.prot_p()
    855 
    856 
    857 @skipUnless(ssl, "SSL not available")
    858 class TestTLS_FTPClass(TestCase):
    859     """Specific TLS_FTP class tests."""
    860 
    861     def setUp(self):
    862         self.server = DummyTLS_FTPServer((HOST, 0))
    863         self.server.start()
    864         self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    865         self.client.connect(self.server.host, self.server.port)
    866 
    867     def tearDown(self):
    868         self.client.close()
    869         self.server.stop()
    870         # Explicitly clear the attribute to prevent dangling thread
    871         self.server = None
    872         asyncore.close_all(ignore_all=True)
    873 
    874     def test_control_connection(self):
    875         self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    876         self.client.auth()
    877         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    878 
    879     def test_data_connection(self):
    880         # clear text
    881         with self.client.transfercmd('list') as sock:
    882             self.assertNotIsInstance(sock, ssl.SSLSocket)
    883             self.assertEqual(sock.recv(1024), LIST_DATA.encode('ascii'))
    884         self.assertEqual(self.client.voidresp(), "226 transfer complete")
    885 
    886         # secured, after PROT P
    887         self.client.prot_p()
    888         with self.client.transfercmd('list') as sock:
    889             self.assertIsInstance(sock, ssl.SSLSocket)
    890             # consume from SSL socket to finalize handshake and avoid
    891             # "SSLError [SSL] shutdown while in init"
    892             self.assertEqual(sock.recv(1024), LIST_DATA.encode('ascii'))
    893         self.assertEqual(self.client.voidresp(), "226 transfer complete")
    894 
    895         # PROT C is issued, the connection must be in cleartext again
    896         self.client.prot_c()
    897         with self.client.transfercmd('list') as sock:
    898             self.assertNotIsInstance(sock, ssl.SSLSocket)
    899             self.assertEqual(sock.recv(1024), LIST_DATA.encode('ascii'))
    900         self.assertEqual(self.client.voidresp(), "226 transfer complete")
    901 
    902     def test_login(self):
    903         # login() is supposed to implicitly secure the control connection
    904         self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    905         self.client.login()
    906         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    907         # make sure that AUTH TLS doesn't get issued again
    908         self.client.login()
    909 
    910     def test_auth_issued_twice(self):
    911         self.client.auth()
    912         self.assertRaises(ValueError, self.client.auth)
    913 
    914     def test_context(self):
    915         self.client.quit()
    916         ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    917         ctx.check_hostname = False
    918         ctx.verify_mode = ssl.CERT_NONE
    919         self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
    920                           context=ctx)
    921         self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
    922                           context=ctx)
    923         self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
    924                           keyfile=CERTFILE, context=ctx)
    925 
    926         self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    927         self.client.connect(self.server.host, self.server.port)
    928         self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    929         self.client.auth()
    930         self.assertIs(self.client.sock.context, ctx)
    931         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    932 
    933         self.client.prot_p()
    934         with self.client.transfercmd('list') as sock:
    935             self.assertIs(sock.context, ctx)
    936             self.assertIsInstance(sock, ssl.SSLSocket)
    937 
    938     def test_ccc(self):
    939         self.assertRaises(ValueError, self.client.ccc)
    940         self.client.login(secure=True)
    941         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    942         self.client.ccc()
    943         self.assertRaises(ValueError, self.client.sock.unwrap)
    944 
    945     @skipUnless(False, "FIXME: bpo-32706")
    946     def test_check_hostname(self):
    947         self.client.quit()
    948         ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    949         self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    950         self.assertEqual(ctx.check_hostname, True)
    951         ctx.load_verify_locations(CAFILE)
    952         self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    953 
    954         # 127.0.0.1 doesn't match SAN
    955         self.client.connect(self.server.host, self.server.port)
    956         with self.assertRaises(ssl.CertificateError):
    957             self.client.auth()
    958         # exception quits connection
    959 
    960         self.client.connect(self.server.host, self.server.port)
    961         self.client.prot_p()
    962         with self.assertRaises(ssl.CertificateError):
    963             with self.client.transfercmd("list") as sock:
    964                 pass
    965         self.client.quit()
    966 
    967         self.client.connect("localhost", self.server.port)
    968         self.client.auth()
    969         self.client.quit()
    970 
    971         self.client.connect("localhost", self.server.port)
    972         self.client.prot_p()
    973         with self.client.transfercmd("list") as sock:
    974             pass
    975 
    976 
    977 class TestTimeouts(TestCase):
    978 
    979     def setUp(self):
    980         self.evt = threading.Event()
    981         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    982         self.sock.settimeout(20)
    983         self.port = support.bind_port(self.sock)
    984         self.server_thread = threading.Thread(target=self.server)
    985         self.server_thread.daemon = True
    986         self.server_thread.start()
    987         # Wait for the server to be ready.
    988         self.evt.wait()
    989         self.evt.clear()
    990         self.old_port = ftplib.FTP.port
    991         ftplib.FTP.port = self.port
    992 
    993     def tearDown(self):
    994         ftplib.FTP.port = self.old_port
    995         self.server_thread.join()
    996         # Explicitly clear the attribute to prevent dangling thread
    997         self.server_thread = None
    998 
    999     def server(self):
   1000         # This method sets the evt 3 times:
   1001         #  1) when the connection is ready to be accepted.
   1002         #  2) when it is safe for the caller to close the connection
   1003         #  3) when we have closed the socket
   1004         self.sock.listen()
   1005         # (1) Signal the caller that we are ready to accept the connection.
   1006         self.evt.set()
   1007         try:
   1008             conn, addr = self.sock.accept()
   1009         except socket.timeout:
   1010             pass
   1011         else:
   1012             conn.sendall(b"1 Hola mundo\n")
   1013             conn.shutdown(socket.SHUT_WR)
   1014             # (2) Signal the caller that it is safe to close the socket.
   1015             self.evt.set()
   1016             conn.close()
   1017         finally:
   1018             self.sock.close()
   1019 
   1020     def testTimeoutDefault(self):
   1021         # default -- use global socket timeout
   1022         self.assertIsNone(socket.getdefaulttimeout())
   1023         socket.setdefaulttimeout(30)
   1024         try:
   1025             ftp = ftplib.FTP(HOST)
   1026         finally:
   1027             socket.setdefaulttimeout(None)
   1028         self.assertEqual(ftp.sock.gettimeout(), 30)
   1029         self.evt.wait()
   1030         ftp.close()
   1031 
   1032     def testTimeoutNone(self):
   1033         # no timeout -- do not use global socket timeout
   1034         self.assertIsNone(socket.getdefaulttimeout())
   1035         socket.setdefaulttimeout(30)
   1036         try:
   1037             ftp = ftplib.FTP(HOST, timeout=None)
   1038         finally:
   1039             socket.setdefaulttimeout(None)
   1040         self.assertIsNone(ftp.sock.gettimeout())
   1041         self.evt.wait()
   1042         ftp.close()
   1043 
   1044     def testTimeoutValue(self):
   1045         # a value
   1046         ftp = ftplib.FTP(HOST, timeout=30)
   1047         self.assertEqual(ftp.sock.gettimeout(), 30)
   1048         self.evt.wait()
   1049         ftp.close()
   1050 
   1051     def testTimeoutConnect(self):
   1052         ftp = ftplib.FTP()
   1053         ftp.connect(HOST, timeout=30)
   1054         self.assertEqual(ftp.sock.gettimeout(), 30)
   1055         self.evt.wait()
   1056         ftp.close()
   1057 
   1058     def testTimeoutDifferentOrder(self):
   1059         ftp = ftplib.FTP(timeout=30)
   1060         ftp.connect(HOST)
   1061         self.assertEqual(ftp.sock.gettimeout(), 30)
   1062         self.evt.wait()
   1063         ftp.close()
   1064 
   1065     def testTimeoutDirectAccess(self):
   1066         ftp = ftplib.FTP()
   1067         ftp.timeout = 30
   1068         ftp.connect(HOST)
   1069         self.assertEqual(ftp.sock.gettimeout(), 30)
   1070         self.evt.wait()
   1071         ftp.close()
   1072 
   1073 
   1074 class MiscTestCase(TestCase):
   1075     def test__all__(self):
   1076         blacklist = {'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF',
   1077                      'Error', 'parse150', 'parse227', 'parse229', 'parse257',
   1078                      'print_line', 'ftpcp', 'test'}
   1079         support.check__all__(self, ftplib, blacklist=blacklist)
   1080 
   1081 
   1082 def test_main():
   1083     tests = [TestFTPClass, TestTimeouts,
   1084              TestIPv6Environment,
   1085              TestTLS_FTPClassMixin, TestTLS_FTPClass,
   1086              MiscTestCase]
   1087 
   1088     thread_info = support.threading_setup()
   1089     try:
   1090         support.run_unittest(*tests)
   1091     finally:
   1092         support.threading_cleanup(*thread_info)
   1093 
   1094 
   1095 if __name__ == '__main__':
   1096     test_main()
   1097