Home | History | Annotate | Download | only in test
      1 """Test script for ftplib module."""
      2 
      3 # Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
      4 # environment
      5 
      6 import ftplib
      7 import asyncore
      8 import asynchat
      9 import socket
     10 import io
     11 import errno
     12 import os
     13 import time
     14 try:
     15     import ssl
     16 except ImportError:
     17     ssl = None
     18 
     19 from unittest import TestCase, skipUnless
     20 from test import support
     21 from test.support import HOST, HOSTv6
     22 threading = support.import_module('threading')
     23 
     24 TIMEOUT = 3
     25 # the dummy data returned by server over the data channel when
     26 # RETR, LIST, NLST, MLSD commands are issued
     27 RETR_DATA = 'abcde12345\r\n' * 1000
     28 LIST_DATA = 'foo\r\nbar\r\n'
     29 NLST_DATA = 'foo\r\nbar\r\n'
     30 MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
     31              "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
     32              "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
     33              "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
     34              "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
     35              "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
     36              "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
     37              "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
     38              "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
     39              "type=file;perm=r;unique==keVO1+IH4;  leading space\r\n"
     40              "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
     41              "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
     42              "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
     43              "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
     44              "type=file;perm=r;unique==keVO1+1G4; file4\r\n")
     45 
     46 
     47 class DummyDTPHandler(asynchat.async_chat):
     48     dtp_conn_closed = False
     49 
     50     def __init__(self, conn, baseclass):
     51         asynchat.async_chat.__init__(self, conn)
     52         self.baseclass = baseclass
     53         self.baseclass.last_received_data = ''
     54 
     55     def handle_read(self):
     56         self.baseclass.last_received_data += self.recv(1024).decode('ascii')
     57 
     58     def handle_close(self):
     59         # XXX: this method can be called many times in a row for a single
     60         # connection, including in clear-text (non-TLS) mode.
     61         # (behaviour witnessed with test_data_connection)
     62         if not self.dtp_conn_closed:
     63             self.baseclass.push('226 transfer complete')
     64             self.close()
     65             self.dtp_conn_closed = True
     66 
     67     def push(self, what):
     68         if self.baseclass.next_data is not None:
     69             what = self.baseclass.next_data
     70             self.baseclass.next_data = None
     71         if not what:
     72             return self.close_when_done()
     73         super(DummyDTPHandler, self).push(what.encode('ascii'))
     74 
     75     def handle_error(self):
     76         raise Exception
     77 
     78 
     79 class DummyFTPHandler(asynchat.async_chat):
     80 
     81     dtp_handler = DummyDTPHandler
     82 
     83     def __init__(self, conn):
     84         asynchat.async_chat.__init__(self, conn)
     85         # tells the socket to handle urgent data inline (ABOR command)
     86         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
     87         self.set_terminator(b"\r\n")
     88         self.in_buffer = []
     89         self.dtp = None
     90         self.last_received_cmd = None
     91         self.last_received_data = ''
     92         self.next_response = ''
     93         self.next_data = None
     94         self.rest = None
     95         self.next_retr_data = RETR_DATA
     96         self.push('220 welcome')
     97 
     98     def collect_incoming_data(self, data):
     99         self.in_buffer.append(data)
    100 
    101     def found_terminator(self):
    102         line = b''.join(self.in_buffer).decode('ascii')
    103         self.in_buffer = []
    104         if self.next_response:
    105             self.push(self.next_response)
    106             self.next_response = ''
    107         cmd = line.split(' ')[0].lower()
    108         self.last_received_cmd = cmd
    109         space = line.find(' ')
    110         if space != -1:
    111             arg = line[space + 1:]
    112         else:
    113             arg = ""
    114         if hasattr(self, 'cmd_' + cmd):
    115             method = getattr(self, 'cmd_' + cmd)
    116             method(arg)
    117         else:
    118             self.push('550 command "%s" not understood.' %cmd)
    119 
    120     def handle_error(self):
    121         raise Exception
    122 
    123     def push(self, data):
    124         asynchat.async_chat.push(self, data.encode('ascii') + b'\r\n')
    125 
    126     def cmd_port(self, arg):
    127         addr = list(map(int, arg.split(',')))
    128         ip = '%d.%d.%d.%d' %tuple(addr[:4])
    129         port = (addr[4] * 256) + addr[5]
    130         s = socket.create_connection((ip, port), timeout=TIMEOUT)
    131         self.dtp = self.dtp_handler(s, baseclass=self)
    132         self.push('200 active data connection established')
    133 
    134     def cmd_pasv(self, arg):
    135         with socket.socket() as sock:
    136             sock.bind((self.socket.getsockname()[0], 0))
    137             sock.listen()
    138             sock.settimeout(TIMEOUT)
    139             ip, port = sock.getsockname()[:2]
    140             ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
    141             self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
    142             conn, addr = sock.accept()
    143             self.dtp = self.dtp_handler(conn, baseclass=self)
    144 
    145     def cmd_eprt(self, arg):
    146         af, ip, port = arg.split(arg[0])[1:-1]
    147         port = int(port)
    148         s = socket.create_connection((ip, port), timeout=TIMEOUT)
    149         self.dtp = self.dtp_handler(s, baseclass=self)
    150         self.push('200 active data connection established')
    151 
    152     def cmd_epsv(self, arg):
    153         with socket.socket(socket.AF_INET6) as sock:
    154             sock.bind((self.socket.getsockname()[0], 0))
    155             sock.listen()
    156             sock.settimeout(TIMEOUT)
    157             port = sock.getsockname()[1]
    158             self.push('229 entering extended passive mode (|||%d|)' %port)
    159             conn, addr = sock.accept()
    160             self.dtp = self.dtp_handler(conn, baseclass=self)
    161 
    162     def cmd_echo(self, arg):
    163         # sends back the received string (used by the test suite)
    164         self.push(arg)
    165 
    166     def cmd_noop(self, arg):
    167         self.push('200 noop ok')
    168 
    169     def cmd_user(self, arg):
    170         self.push('331 username ok')
    171 
    172     def cmd_pass(self, arg):
    173         self.push('230 password ok')
    174 
    175     def cmd_acct(self, arg):
    176         self.push('230 acct ok')
    177 
    178     def cmd_rnfr(self, arg):
    179         self.push('350 rnfr ok')
    180 
    181     def cmd_rnto(self, arg):
    182         self.push('250 rnto ok')
    183 
    184     def cmd_dele(self, arg):
    185         self.push('250 dele ok')
    186 
    187     def cmd_cwd(self, arg):
    188         self.push('250 cwd ok')
    189 
    190     def cmd_size(self, arg):
    191         self.push('250 1000')
    192 
    193     def cmd_mkd(self, arg):
    194         self.push('257 "%s"' %arg)
    195 
    196     def cmd_rmd(self, arg):
    197         self.push('250 rmd ok')
    198 
    199     def cmd_pwd(self, arg):
    200         self.push('257 "pwd ok"')
    201 
    202     def cmd_type(self, arg):
    203         self.push('200 type ok')
    204 
    205     def cmd_quit(self, arg):
    206         self.push('221 quit ok')
    207         self.close()
    208 
    209     def cmd_abor(self, arg):
    210         self.push('226 abor ok')
    211 
    212     def cmd_stor(self, arg):
    213         self.push('125 stor ok')
    214 
    215     def cmd_rest(self, arg):
    216         self.rest = arg
    217         self.push('350 rest ok')
    218 
    219     def cmd_retr(self, arg):
    220         self.push('125 retr ok')
    221         if self.rest is not None:
    222             offset = int(self.rest)
    223         else:
    224             offset = 0
    225         self.dtp.push(self.next_retr_data[offset:])
    226         self.dtp.close_when_done()
    227         self.rest = None
    228 
    229     def cmd_list(self, arg):
    230         self.push('125 list ok')
    231         self.dtp.push(LIST_DATA)
    232         self.dtp.close_when_done()
    233 
    234     def cmd_nlst(self, arg):
    235         self.push('125 nlst ok')
    236         self.dtp.push(NLST_DATA)
    237         self.dtp.close_when_done()
    238 
    239     def cmd_opts(self, arg):
    240         self.push('200 opts ok')
    241 
    242     def cmd_mlsd(self, arg):
    243         self.push('125 mlsd ok')
    244         self.dtp.push(MLSD_DATA)
    245         self.dtp.close_when_done()
    246 
    247     def cmd_setlongretr(self, arg):
    248         # For testing. Next RETR will return long line.
    249         self.next_retr_data = 'x' * int(arg)
    250         self.push('125 setlongretr ok')
    251 
    252 
    253 class DummyFTPServer(asyncore.dispatcher, threading.Thread):
    254 
    255     handler = DummyFTPHandler
    256 
    257     def __init__(self, address, af=socket.AF_INET):
    258         threading.Thread.__init__(self)
    259         asyncore.dispatcher.__init__(self)
    260         self.create_socket(af, socket.SOCK_STREAM)
    261         self.bind(address)
    262         self.listen(5)
    263         self.active = False
    264         self.active_lock = threading.Lock()
    265         self.host, self.port = self.socket.getsockname()[:2]
    266         self.handler_instance = None
    267 
    268     def start(self):
    269         assert not self.active
    270         self.__flag = threading.Event()
    271         threading.Thread.start(self)
    272         self.__flag.wait()
    273 
    274     def run(self):
    275         self.active = True
    276         self.__flag.set()
    277         while self.active and asyncore.socket_map:
    278             self.active_lock.acquire()
    279             asyncore.loop(timeout=0.1, count=1)
    280             self.active_lock.release()
    281         asyncore.close_all(ignore_all=True)
    282 
    283     def stop(self):
    284         assert self.active
    285         self.active = False
    286         self.join()
    287 
    288     def handle_accepted(self, conn, addr):
    289         self.handler_instance = self.handler(conn)
    290 
    291     def handle_connect(self):
    292         self.close()
    293     handle_read = handle_connect
    294 
    295     def writable(self):
    296         return 0
    297 
    298     def handle_error(self):
    299         raise Exception
    300 
    301 
    302 if ssl is not None:
    303 
    304     CERTFILE = os.path.join(os.path.dirname(__file__), "keycert3.pem")
    305     CAFILE = os.path.join(os.path.dirname(__file__), "pycacert.pem")
    306 
    307     class SSLConnection(asyncore.dispatcher):
    308         """An asyncore.dispatcher subclass supporting TLS/SSL."""
    309 
    310         _ssl_accepting = False
    311         _ssl_closing = False
    312 
    313         def secure_connection(self):
    314             context = ssl.SSLContext()
    315             context.load_cert_chain(CERTFILE)
    316             socket = context.wrap_socket(self.socket,
    317                                          suppress_ragged_eofs=False,
    318                                          server_side=True,
    319                                          do_handshake_on_connect=False)
    320             self.del_channel()
    321             self.set_socket(socket)
    322             self._ssl_accepting = True
    323 
    324         def _do_ssl_handshake(self):
    325             try:
    326                 self.socket.do_handshake()
    327             except ssl.SSLError as err:
    328                 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
    329                                    ssl.SSL_ERROR_WANT_WRITE):
    330                     return
    331                 elif err.args[0] == ssl.SSL_ERROR_EOF:
    332                     return self.handle_close()
    333                 raise
    334             except OSError as err:
    335                 if err.args[0] == errno.ECONNABORTED:
    336                     return self.handle_close()
    337             else:
    338                 self._ssl_accepting = False
    339 
    340         def _do_ssl_shutdown(self):
    341             self._ssl_closing = True
    342             try:
    343                 self.socket = self.socket.unwrap()
    344             except ssl.SSLError as err:
    345                 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
    346                                    ssl.SSL_ERROR_WANT_WRITE):
    347                     return
    348             except OSError as err:
    349                 # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
    350                 # from OpenSSL's SSL_shutdown(), corresponding to a
    351                 # closed socket condition. See also:
    352                 # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
    353                 pass
    354             self._ssl_closing = False
    355             if getattr(self, '_ccc', False) is False:
    356                 super(SSLConnection, self).close()
    357             else:
    358                 pass
    359 
    360         def handle_read_event(self):
    361             if self._ssl_accepting:
    362                 self._do_ssl_handshake()
    363             elif self._ssl_closing:
    364                 self._do_ssl_shutdown()
    365             else:
    366                 super(SSLConnection, self).handle_read_event()
    367 
    368         def handle_write_event(self):
    369             if self._ssl_accepting:
    370                 self._do_ssl_handshake()
    371             elif self._ssl_closing:
    372                 self._do_ssl_shutdown()
    373             else:
    374                 super(SSLConnection, self).handle_write_event()
    375 
    376         def send(self, data):
    377             try:
    378                 return super(SSLConnection, self).send(data)
    379             except ssl.SSLError as err:
    380                 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN,
    381                                    ssl.SSL_ERROR_WANT_READ,
    382                                    ssl.SSL_ERROR_WANT_WRITE):
    383                     return 0
    384                 raise
    385 
    386         def recv(self, buffer_size):
    387             try:
    388                 return super(SSLConnection, self).recv(buffer_size)
    389             except ssl.SSLError as err:
    390                 if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
    391                                    ssl.SSL_ERROR_WANT_WRITE):
    392                     return b''
    393                 if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN):
    394                     self.handle_close()
    395                     return b''
    396                 raise
    397 
    398         def handle_error(self):
    399             raise Exception
    400 
    401         def close(self):
    402             if (isinstance(self.socket, ssl.SSLSocket) and
    403                 self.socket._sslobj is not None):
    404                 self._do_ssl_shutdown()
    405             else:
    406                 super(SSLConnection, self).close()
    407 
    408 
    409     class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler):
    410         """A DummyDTPHandler subclass supporting TLS/SSL."""
    411 
    412         def __init__(self, conn, baseclass):
    413             DummyDTPHandler.__init__(self, conn, baseclass)
    414             if self.baseclass.secure_data_channel:
    415                 self.secure_connection()
    416 
    417 
    418     class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler):
    419         """A DummyFTPHandler subclass supporting TLS/SSL."""
    420 
    421         dtp_handler = DummyTLS_DTPHandler
    422 
    423         def __init__(self, conn):
    424             DummyFTPHandler.__init__(self, conn)
    425             self.secure_data_channel = False
    426             self._ccc = False
    427 
    428         def cmd_auth(self, line):
    429             """Set up secure control channel."""
    430             self.push('234 AUTH TLS successful')
    431             self.secure_connection()
    432 
    433         def cmd_ccc(self, line):
    434             self.push('220 Reverting back to clear-text')
    435             self._ccc = True
    436             self._do_ssl_shutdown()
    437 
    438         def cmd_pbsz(self, line):
    439             """Negotiate size of buffer for secure data transfer.
    440             For TLS/SSL the only valid value for the parameter is '0'.
    441             Any other value is accepted but ignored.
    442             """
    443             self.push('200 PBSZ=0 successful.')
    444 
    445         def cmd_prot(self, line):
    446             """Setup un/secure data channel."""
    447             arg = line.upper()
    448             if arg == 'C':
    449                 self.push('200 Protection set to Clear')
    450                 self.secure_data_channel = False
    451             elif arg == 'P':
    452                 self.push('200 Protection set to Private')
    453                 self.secure_data_channel = True
    454             else:
    455                 self.push("502 Unrecognized PROT type (use C or P).")
    456 
    457 
    458     class DummyTLS_FTPServer(DummyFTPServer):
    459         handler = DummyTLS_FTPHandler
    460 
    461 
    462 class TestFTPClass(TestCase):
    463 
    464     def setUp(self):
    465         self.server = DummyFTPServer((HOST, 0))
    466         self.server.start()
    467         self.client = ftplib.FTP(timeout=TIMEOUT)
    468         self.client.connect(self.server.host, self.server.port)
    469 
    470     def tearDown(self):
    471         self.client.close()
    472         self.server.stop()
    473 
    474     def check_data(self, received, expected):
    475         self.assertEqual(len(received), len(expected))
    476         self.assertEqual(received, expected)
    477 
    478     def test_getwelcome(self):
    479         self.assertEqual(self.client.getwelcome(), '220 welcome')
    480 
    481     def test_sanitize(self):
    482         self.assertEqual(self.client.sanitize('foo'), repr('foo'))
    483         self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
    484         self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
    485 
    486     def test_exceptions(self):
    487         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
    488         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
    489         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
    490         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
    491         self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
    492 
    493     def test_all_errors(self):
    494         exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
    495                       ftplib.error_proto, ftplib.Error, OSError, EOFError)
    496         for x in exceptions:
    497             try:
    498                 raise x('exception not included in all_errors set')
    499             except ftplib.all_errors:
    500                 pass
    501 
    502     def test_set_pasv(self):
    503         # passive mode is supposed to be enabled by default
    504         self.assertTrue(self.client.passiveserver)
    505         self.client.set_pasv(True)
    506         self.assertTrue(self.client.passiveserver)
    507         self.client.set_pasv(False)
    508         self.assertFalse(self.client.passiveserver)
    509 
    510     def test_voidcmd(self):
    511         self.client.voidcmd('echo 200')
    512         self.client.voidcmd('echo 299')
    513         self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
    514         self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
    515 
    516     def test_login(self):
    517         self.client.login()
    518 
    519     def test_acct(self):
    520         self.client.acct('passwd')
    521 
    522     def test_rename(self):
    523         self.client.rename('a', 'b')
    524         self.server.handler_instance.next_response = '200'
    525         self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
    526 
    527     def test_delete(self):
    528         self.client.delete('foo')
    529         self.server.handler_instance.next_response = '199'
    530         self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
    531 
    532     def test_size(self):
    533         self.client.size('foo')
    534 
    535     def test_mkd(self):
    536         dir = self.client.mkd('/foo')
    537         self.assertEqual(dir, '/foo')
    538 
    539     def test_rmd(self):
    540         self.client.rmd('foo')
    541 
    542     def test_cwd(self):
    543         dir = self.client.cwd('/foo')
    544         self.assertEqual(dir, '250 cwd ok')
    545 
    546     def test_pwd(self):
    547         dir = self.client.pwd()
    548         self.assertEqual(dir, 'pwd ok')
    549 
    550     def test_quit(self):
    551         self.assertEqual(self.client.quit(), '221 quit ok')
    552         # Ensure the connection gets closed; sock attribute should be None
    553         self.assertEqual(self.client.sock, None)
    554 
    555     def test_abort(self):
    556         self.client.abort()
    557 
    558     def test_retrbinary(self):
    559         def callback(data):
    560             received.append(data.decode('ascii'))
    561         received = []
    562         self.client.retrbinary('retr', callback)
    563         self.check_data(''.join(received), RETR_DATA)
    564 
    565     def test_retrbinary_rest(self):
    566         def callback(data):
    567             received.append(data.decode('ascii'))
    568         for rest in (0, 10, 20):
    569             received = []
    570             self.client.retrbinary('retr', callback, rest=rest)
    571             self.check_data(''.join(received), RETR_DATA[rest:])
    572 
    573     def test_retrlines(self):
    574         received = []
    575         self.client.retrlines('retr', received.append)
    576         self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
    577 
    578     def test_storbinary(self):
    579         f = io.BytesIO(RETR_DATA.encode('ascii'))
    580         self.client.storbinary('stor', f)
    581         self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
    582         # test new callback arg
    583         flag = []
    584         f.seek(0)
    585         self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
    586         self.assertTrue(flag)
    587 
    588     def test_storbinary_rest(self):
    589         f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
    590         for r in (30, '30'):
    591             f.seek(0)
    592             self.client.storbinary('stor', f, rest=r)
    593             self.assertEqual(self.server.handler_instance.rest, str(r))
    594 
    595     def test_storlines(self):
    596         f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
    597         self.client.storlines('stor', f)
    598         self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
    599         # test new callback arg
    600         flag = []
    601         f.seek(0)
    602         self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
    603         self.assertTrue(flag)
    604 
    605         f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
    606         # storlines() expects a binary file, not a text file
    607         with support.check_warnings(('', BytesWarning), quiet=True):
    608             self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
    609 
    610     def test_nlst(self):
    611         self.client.nlst()
    612         self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
    613 
    614     def test_dir(self):
    615         l = []
    616         self.client.dir(lambda x: l.append(x))
    617         self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
    618 
    619     def test_mlsd(self):
    620         list(self.client.mlsd())
    621         list(self.client.mlsd(path='/'))
    622         list(self.client.mlsd(path='/', facts=['size', 'type']))
    623 
    624         ls = list(self.client.mlsd())
    625         for name, facts in ls:
    626             self.assertIsInstance(name, str)
    627             self.assertIsInstance(facts, dict)
    628             self.assertTrue(name)
    629             self.assertIn('type', facts)
    630             self.assertIn('perm', facts)
    631             self.assertIn('unique', facts)
    632 
    633         def set_data(data):
    634             self.server.handler_instance.next_data = data
    635 
    636         def test_entry(line, type=None, perm=None, unique=None, name=None):
    637             type = 'type' if type is None else type
    638             perm = 'perm' if perm is None else perm
    639             unique = 'unique' if unique is None else unique
    640             name = 'name' if name is None else name
    641             set_data(line)
    642             _name, facts = next(self.client.mlsd())
    643             self.assertEqual(_name, name)
    644             self.assertEqual(facts['type'], type)
    645             self.assertEqual(facts['perm'], perm)
    646             self.assertEqual(facts['unique'], unique)
    647 
    648         # plain
    649         test_entry('type=type;perm=perm;unique=unique; name\r\n')
    650         # "=" in fact value
    651         test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
    652         test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
    653         test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
    654         test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
    655         # spaces in name
    656         test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
    657         test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
    658         test_entry('type=type;perm=perm;unique=unique;  name\r\n', name=" name")
    659         test_entry('type=type;perm=perm;unique=unique; n am  e\r\n', name="n am  e")
    660         # ";" in name
    661         test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
    662         test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
    663         test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
    664         test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
    665         # case sensitiveness
    666         set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
    667         _name, facts = next(self.client.mlsd())
    668         for x in facts:
    669             self.assertTrue(x.islower())
    670         # no data (directory empty)
    671         set_data('')
    672         self.assertRaises(StopIteration, next, self.client.mlsd())
    673         set_data('')
    674         for x in self.client.mlsd():
    675             self.fail("unexpected data %s" % x)
    676 
    677     def test_makeport(self):
    678         with self.client.makeport():
    679             # IPv4 is in use, just make sure send_eprt has not been used
    680             self.assertEqual(self.server.handler_instance.last_received_cmd,
    681                                 'port')
    682 
    683     def test_makepasv(self):
    684         host, port = self.client.makepasv()
    685         conn = socket.create_connection((host, port), timeout=TIMEOUT)
    686         conn.close()
    687         # IPv4 is in use, just make sure send_epsv has not been used
    688         self.assertEqual(self.server.handler_instance.last_received_cmd, 'pasv')
    689 
    690     def test_with_statement(self):
    691         self.client.quit()
    692 
    693         def is_client_connected():
    694             if self.client.sock is None:
    695                 return False
    696             try:
    697                 self.client.sendcmd('noop')
    698             except (OSError, EOFError):
    699                 return False
    700             return True
    701 
    702         # base test
    703         with ftplib.FTP(timeout=TIMEOUT) as self.client:
    704             self.client.connect(self.server.host, self.server.port)
    705             self.client.sendcmd('noop')
    706             self.assertTrue(is_client_connected())
    707         self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
    708         self.assertFalse(is_client_connected())
    709 
    710         # QUIT sent inside the with block
    711         with ftplib.FTP(timeout=TIMEOUT) as self.client:
    712             self.client.connect(self.server.host, self.server.port)
    713             self.client.sendcmd('noop')
    714             self.client.quit()
    715         self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
    716         self.assertFalse(is_client_connected())
    717 
    718         # force a wrong response code to be sent on QUIT: error_perm
    719         # is expected and the connection is supposed to be closed
    720         try:
    721             with ftplib.FTP(timeout=TIMEOUT) as self.client:
    722                 self.client.connect(self.server.host, self.server.port)
    723                 self.client.sendcmd('noop')
    724                 self.server.handler_instance.next_response = '550 error on quit'
    725         except ftplib.error_perm as err:
    726             self.assertEqual(str(err), '550 error on quit')
    727         else:
    728             self.fail('Exception not raised')
    729         # needed to give the threaded server some time to set the attribute
    730         # which otherwise would still be == 'noop'
    731         time.sleep(0.1)
    732         self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
    733         self.assertFalse(is_client_connected())
    734 
    735     def test_source_address(self):
    736         self.client.quit()
    737         port = support.find_unused_port()
    738         try:
    739             self.client.connect(self.server.host, self.server.port,
    740                                 source_address=(HOST, port))
    741             self.assertEqual(self.client.sock.getsockname()[1], port)
    742             self.client.quit()
    743         except OSError as e:
    744             if e.errno == errno.EADDRINUSE:
    745                 self.skipTest("couldn't bind to port %d" % port)
    746             raise
    747 
    748     def test_source_address_passive_connection(self):
    749         port = support.find_unused_port()
    750         self.client.source_address = (HOST, port)
    751         try:
    752             with self.client.transfercmd('list') as sock:
    753                 self.assertEqual(sock.getsockname()[1], port)
    754         except OSError as e:
    755             if e.errno == errno.EADDRINUSE:
    756                 self.skipTest("couldn't bind to port %d" % port)
    757             raise
    758 
    759     def test_parse257(self):
    760         self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
    761         self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
    762         self.assertEqual(ftplib.parse257('257 ""'), '')
    763         self.assertEqual(ftplib.parse257('257 "" created'), '')
    764         self.assertRaises(ftplib.error_reply, ftplib.parse257, '250 "/foo/bar"')
    765         # The 257 response is supposed to include the directory
    766         # name and in case it contains embedded double-quotes
    767         # they must be doubled (see RFC-959, chapter 7, appendix 2).
    768         self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
    769         self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
    770 
    771     def test_line_too_long(self):
    772         self.assertRaises(ftplib.Error, self.client.sendcmd,
    773                           'x' * self.client.maxline * 2)
    774 
    775     def test_retrlines_too_long(self):
    776         self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
    777         received = []
    778         self.assertRaises(ftplib.Error,
    779                           self.client.retrlines, 'retr', received.append)
    780 
    781     def test_storlines_too_long(self):
    782         f = io.BytesIO(b'x' * self.client.maxline * 2)
    783         self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
    784 
    785 
    786 @skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
    787 class TestIPv6Environment(TestCase):
    788 
    789     def setUp(self):
    790         self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
    791         self.server.start()
    792         self.client = ftplib.FTP(timeout=TIMEOUT)
    793         self.client.connect(self.server.host, self.server.port)
    794 
    795     def tearDown(self):
    796         self.client.close()
    797         self.server.stop()
    798 
    799     def test_af(self):
    800         self.assertEqual(self.client.af, socket.AF_INET6)
    801 
    802     def test_makeport(self):
    803         with self.client.makeport():
    804             self.assertEqual(self.server.handler_instance.last_received_cmd,
    805                                 'eprt')
    806 
    807     def test_makepasv(self):
    808         host, port = self.client.makepasv()
    809         conn = socket.create_connection((host, port), timeout=TIMEOUT)
    810         conn.close()
    811         self.assertEqual(self.server.handler_instance.last_received_cmd, 'epsv')
    812 
    813     def test_transfer(self):
    814         def retr():
    815             def callback(data):
    816                 received.append(data.decode('ascii'))
    817             received = []
    818             self.client.retrbinary('retr', callback)
    819             self.assertEqual(len(''.join(received)), len(RETR_DATA))
    820             self.assertEqual(''.join(received), RETR_DATA)
    821         self.client.set_pasv(True)
    822         retr()
    823         self.client.set_pasv(False)
    824         retr()
    825 
    826 
    827 @skipUnless(ssl, "SSL not available")
    828 class TestTLS_FTPClassMixin(TestFTPClass):
    829     """Repeat TestFTPClass tests starting the TLS layer for both control
    830     and data connections first.
    831     """
    832 
    833     def setUp(self):
    834         self.server = DummyTLS_FTPServer((HOST, 0))
    835         self.server.start()
    836         self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    837         self.client.connect(self.server.host, self.server.port)
    838         # enable TLS
    839         self.client.auth()
    840         self.client.prot_p()
    841 
    842 
    843 @skipUnless(ssl, "SSL not available")
    844 class TestTLS_FTPClass(TestCase):
    845     """Specific TLS_FTP class tests."""
    846 
    847     def setUp(self):
    848         self.server = DummyTLS_FTPServer((HOST, 0))
    849         self.server.start()
    850         self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    851         self.client.connect(self.server.host, self.server.port)
    852 
    853     def tearDown(self):
    854         self.client.close()
    855         self.server.stop()
    856 
    857     def test_control_connection(self):
    858         self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    859         self.client.auth()
    860         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    861 
    862     def test_data_connection(self):
    863         # clear text
    864         with self.client.transfercmd('list') as sock:
    865             self.assertNotIsInstance(sock, ssl.SSLSocket)
    866         self.assertEqual(self.client.voidresp(), "226 transfer complete")
    867 
    868         # secured, after PROT P
    869         self.client.prot_p()
    870         with self.client.transfercmd('list') as sock:
    871             self.assertIsInstance(sock, ssl.SSLSocket)
    872         self.assertEqual(self.client.voidresp(), "226 transfer complete")
    873 
    874         # PROT C is issued, the connection must be in cleartext again
    875         self.client.prot_c()
    876         with self.client.transfercmd('list') as sock:
    877             self.assertNotIsInstance(sock, ssl.SSLSocket)
    878         self.assertEqual(self.client.voidresp(), "226 transfer complete")
    879 
    880     def test_login(self):
    881         # login() is supposed to implicitly secure the control connection
    882         self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    883         self.client.login()
    884         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    885         # make sure that AUTH TLS doesn't get issued again
    886         self.client.login()
    887 
    888     def test_auth_issued_twice(self):
    889         self.client.auth()
    890         self.assertRaises(ValueError, self.client.auth)
    891 
    892     def test_auth_ssl(self):
    893         try:
    894             self.client.ssl_version = ssl.PROTOCOL_SSLv23
    895             self.client.auth()
    896             self.assertRaises(ValueError, self.client.auth)
    897         finally:
    898             self.client.ssl_version = ssl.PROTOCOL_TLSv1
    899 
    900     def test_context(self):
    901         self.client.quit()
    902         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    903         self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
    904                           context=ctx)
    905         self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
    906                           context=ctx)
    907         self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
    908                           keyfile=CERTFILE, context=ctx)
    909 
    910         self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    911         self.client.connect(self.server.host, self.server.port)
    912         self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    913         self.client.auth()
    914         self.assertIs(self.client.sock.context, ctx)
    915         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    916 
    917         self.client.prot_p()
    918         with self.client.transfercmd('list') as sock:
    919             self.assertIs(sock.context, ctx)
    920             self.assertIsInstance(sock, ssl.SSLSocket)
    921 
    922     def test_ccc(self):
    923         self.assertRaises(ValueError, self.client.ccc)
    924         self.client.login(secure=True)
    925         self.assertIsInstance(self.client.sock, ssl.SSLSocket)
    926         self.client.ccc()
    927         self.assertRaises(ValueError, self.client.sock.unwrap)
    928 
    929     def test_check_hostname(self):
    930         self.client.quit()
    931         ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    932         ctx.verify_mode = ssl.CERT_REQUIRED
    933         ctx.check_hostname = True
    934         ctx.load_verify_locations(CAFILE)
    935         self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
    936 
    937         # 127.0.0.1 doesn't match SAN
    938         self.client.connect(self.server.host, self.server.port)
    939         with self.assertRaises(ssl.CertificateError):
    940             self.client.auth()
    941         # exception quits connection
    942 
    943         self.client.connect(self.server.host, self.server.port)
    944         self.client.prot_p()
    945         with self.assertRaises(ssl.CertificateError):
    946             with self.client.transfercmd("list") as sock:
    947                 pass
    948         self.client.quit()
    949 
    950         self.client.connect("localhost", self.server.port)
    951         self.client.auth()
    952         self.client.quit()
    953 
    954         self.client.connect("localhost", self.server.port)
    955         self.client.prot_p()
    956         with self.client.transfercmd("list") as sock:
    957             pass
    958 
    959 
    960 class TestTimeouts(TestCase):
    961 
    962     def setUp(self):
    963         self.evt = threading.Event()
    964         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    965         self.sock.settimeout(20)
    966         self.port = support.bind_port(self.sock)
    967         self.server_thread = threading.Thread(target=self.server)
    968         self.server_thread.start()
    969         # Wait for the server to be ready.
    970         self.evt.wait()
    971         self.evt.clear()
    972         self.old_port = ftplib.FTP.port
    973         ftplib.FTP.port = self.port
    974 
    975     def tearDown(self):
    976         ftplib.FTP.port = self.old_port
    977         self.server_thread.join()
    978 
    979     def server(self):
    980         # This method sets the evt 3 times:
    981         #  1) when the connection is ready to be accepted.
    982         #  2) when it is safe for the caller to close the connection
    983         #  3) when we have closed the socket
    984         self.sock.listen()
    985         # (1) Signal the caller that we are ready to accept the connection.
    986         self.evt.set()
    987         try:
    988             conn, addr = self.sock.accept()
    989         except socket.timeout:
    990             pass
    991         else:
    992             conn.sendall(b"1 Hola mundo\n")
    993             conn.shutdown(socket.SHUT_WR)
    994             # (2) Signal the caller that it is safe to close the socket.
    995             self.evt.set()
    996             conn.close()
    997         finally:
    998             self.sock.close()
    999 
   1000     def testTimeoutDefault(self):
   1001         # default -- use global socket timeout
   1002         self.assertIsNone(socket.getdefaulttimeout())
   1003         socket.setdefaulttimeout(30)
   1004         try:
   1005             ftp = ftplib.FTP(HOST)
   1006         finally:
   1007             socket.setdefaulttimeout(None)
   1008         self.assertEqual(ftp.sock.gettimeout(), 30)
   1009         self.evt.wait()
   1010         ftp.close()
   1011 
   1012     def testTimeoutNone(self):
   1013         # no timeout -- do not use global socket timeout
   1014         self.assertIsNone(socket.getdefaulttimeout())
   1015         socket.setdefaulttimeout(30)
   1016         try:
   1017             ftp = ftplib.FTP(HOST, timeout=None)
   1018         finally:
   1019             socket.setdefaulttimeout(None)
   1020         self.assertIsNone(ftp.sock.gettimeout())
   1021         self.evt.wait()
   1022         ftp.close()
   1023 
   1024     def testTimeoutValue(self):
   1025         # a value
   1026         ftp = ftplib.FTP(HOST, timeout=30)
   1027         self.assertEqual(ftp.sock.gettimeout(), 30)
   1028         self.evt.wait()
   1029         ftp.close()
   1030 
   1031     def testTimeoutConnect(self):
   1032         ftp = ftplib.FTP()
   1033         ftp.connect(HOST, timeout=30)
   1034         self.assertEqual(ftp.sock.gettimeout(), 30)
   1035         self.evt.wait()
   1036         ftp.close()
   1037 
   1038     def testTimeoutDifferentOrder(self):
   1039         ftp = ftplib.FTP(timeout=30)
   1040         ftp.connect(HOST)
   1041         self.assertEqual(ftp.sock.gettimeout(), 30)
   1042         self.evt.wait()
   1043         ftp.close()
   1044 
   1045     def testTimeoutDirectAccess(self):
   1046         ftp = ftplib.FTP()
   1047         ftp.timeout = 30
   1048         ftp.connect(HOST)
   1049         self.assertEqual(ftp.sock.gettimeout(), 30)
   1050         self.evt.wait()
   1051         ftp.close()
   1052 
   1053 
   1054 class MiscTestCase(TestCase):
   1055     def test__all__(self):
   1056         blacklist = {'MSG_OOB', 'FTP_PORT', 'MAXLINE', 'CRLF', 'B_CRLF',
   1057                      'Error', 'parse150', 'parse227', 'parse229', 'parse257',
   1058                      'print_line', 'ftpcp', 'test'}
   1059         support.check__all__(self, ftplib, blacklist=blacklist)
   1060 
   1061 
   1062 def test_main():
   1063     tests = [TestFTPClass, TestTimeouts,
   1064              TestIPv6Environment,
   1065              TestTLS_FTPClassMixin, TestTLS_FTPClass,
   1066              MiscTestCase]
   1067 
   1068     thread_info = support.threading_setup()
   1069     try:
   1070         support.run_unittest(*tests)
   1071     finally:
   1072         support.threading_cleanup(*thread_info)
   1073 
   1074 
   1075 if __name__ == '__main__':
   1076     test_main()
   1077