Home | History | Annotate | Download | only in test
      1 # Adapted from test_file.py by Daniel Stutzbach
      2 
      3 from __future__ import unicode_literals
      4 
      5 import sys
      6 import os
      7 import errno
      8 import unittest
      9 from array import array
     10 from weakref import proxy
     11 from functools import wraps
     12 from UserList import UserList
     13 
     14 from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
     15 from test.test_support import py3k_bytes as bytes, cpython_only
     16 from test.script_helper import run_python
     17 
     18 from _io import FileIO as _FileIO
     19 
     20 class AutoFileTests(unittest.TestCase):
     21     # file tests for which a test file is automatically set up
     22 
     23     def setUp(self):
     24         self.f = _FileIO(TESTFN, 'w')
     25 
     26     def tearDown(self):
     27         if self.f:
     28             self.f.close()
     29         os.remove(TESTFN)
     30 
     31     def testWeakRefs(self):
     32         # verify weak references
     33         p = proxy(self.f)
     34         p.write(bytes(range(10)))
     35         self.assertEqual(self.f.tell(), p.tell())
     36         self.f.close()
     37         self.f = None
     38         self.assertRaises(ReferenceError, getattr, p, 'tell')
     39 
     40     def testSeekTell(self):
     41         self.f.write(bytes(range(20)))
     42         self.assertEqual(self.f.tell(), 20)
     43         self.f.seek(0)
     44         self.assertEqual(self.f.tell(), 0)
     45         self.f.seek(10)
     46         self.assertEqual(self.f.tell(), 10)
     47         self.f.seek(5, 1)
     48         self.assertEqual(self.f.tell(), 15)
     49         self.f.seek(-5, 1)
     50         self.assertEqual(self.f.tell(), 10)
     51         self.f.seek(-5, 2)
     52         self.assertEqual(self.f.tell(), 15)
     53 
     54     def testAttributes(self):
     55         # verify expected attributes exist
     56         f = self.f
     57 
     58         self.assertEqual(f.mode, "wb")
     59         self.assertEqual(f.closed, False)
     60 
     61         # verify the attributes are readonly
     62         for attr in 'mode', 'closed':
     63             self.assertRaises((AttributeError, TypeError),
     64                               setattr, f, attr, 'oops')
     65 
     66     def testReadinto(self):
     67         # verify readinto
     68         self.f.write(b"\x01\x02")
     69         self.f.close()
     70         a = array(b'b', b'x'*10)
     71         self.f = _FileIO(TESTFN, 'r')
     72         n = self.f.readinto(a)
     73         self.assertEqual(array(b'b', [1, 2]), a[:n])
     74 
     75     def testWritelinesList(self):
     76         l = [b'123', b'456']
     77         self.f.writelines(l)
     78         self.f.close()
     79         self.f = _FileIO(TESTFN, 'rb')
     80         buf = self.f.read()
     81         self.assertEqual(buf, b'123456')
     82 
     83     def testWritelinesUserList(self):
     84         l = UserList([b'123', b'456'])
     85         self.f.writelines(l)
     86         self.f.close()
     87         self.f = _FileIO(TESTFN, 'rb')
     88         buf = self.f.read()
     89         self.assertEqual(buf, b'123456')
     90 
     91     def testWritelinesError(self):
     92         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
     93         self.assertRaises(TypeError, self.f.writelines, None)
     94 
     95     def test_none_args(self):
     96         self.f.write(b"hi\nbye\nabc")
     97         self.f.close()
     98         self.f = _FileIO(TESTFN, 'r')
     99         self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
    100         self.f.seek(0)
    101         self.assertEqual(self.f.readline(None), b"hi\n")
    102         self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
    103 
    104     def testRepr(self):
    105         self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
    106                                        % (self.f.name, self.f.mode))
    107         del self.f.name
    108         self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
    109                                        % (self.f.fileno(), self.f.mode))
    110         self.f.close()
    111         self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
    112 
    113     def testErrors(self):
    114         f = self.f
    115         self.assertFalse(f.isatty())
    116         self.assertFalse(f.closed)
    117         #self.assertEqual(f.name, TESTFN)
    118         self.assertRaises(ValueError, f.read, 10) # Open for reading
    119         f.close()
    120         self.assertTrue(f.closed)
    121         f = _FileIO(TESTFN, 'r')
    122         self.assertRaises(TypeError, f.readinto, "")
    123         self.assertFalse(f.closed)
    124         f.close()
    125         self.assertTrue(f.closed)
    126 
    127     def testMethods(self):
    128         methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
    129                    'read', 'readall', 'readline', 'readlines',
    130                    'tell', 'truncate', 'flush']
    131         if sys.platform.startswith('atheos'):
    132             methods.remove('truncate')
    133 
    134         self.f.close()
    135         self.assertTrue(self.f.closed)
    136 
    137         for methodname in methods:
    138             method = getattr(self.f, methodname)
    139             # should raise on closed file
    140             self.assertRaises(ValueError, method)
    141 
    142         self.assertRaises(ValueError, self.f.readinto) # XXX should be TypeError?
    143         self.assertRaises(ValueError, self.f.readinto, bytearray(1))
    144         self.assertRaises(ValueError, self.f.seek)
    145         self.assertRaises(ValueError, self.f.seek, 0)
    146         self.assertRaises(ValueError, self.f.write)
    147         self.assertRaises(ValueError, self.f.write, b'')
    148         self.assertRaises(TypeError, self.f.writelines)
    149         self.assertRaises(ValueError, self.f.writelines, b'')
    150 
    151     def testOpendir(self):
    152         # Issue 3703: opening a directory should fill the errno
    153         # Windows always returns "[Errno 13]: Permission denied
    154         # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
    155         try:
    156             _FileIO('.', 'r')
    157         except IOError as e:
    158             self.assertNotEqual(e.errno, 0)
    159             self.assertEqual(e.filename, ".")
    160         else:
    161             self.fail("Should have raised IOError")
    162 
    163     @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
    164     def testOpenDirFD(self):
    165         fd = os.open('.', os.O_RDONLY)
    166         with self.assertRaises(IOError) as cm:
    167             _FileIO(fd, 'r')
    168         os.close(fd)
    169         self.assertEqual(cm.exception.errno, errno.EISDIR)
    170 
    171     #A set of functions testing that we get expected behaviour if someone has
    172     #manually closed the internal file descriptor.  First, a decorator:
    173     def ClosedFD(func):
    174         @wraps(func)
    175         def wrapper(self):
    176             #forcibly close the fd before invoking the problem function
    177             f = self.f
    178             os.close(f.fileno())
    179             try:
    180                 func(self, f)
    181             finally:
    182                 try:
    183                     self.f.close()
    184                 except IOError:
    185                     pass
    186         return wrapper
    187 
    188     def ClosedFDRaises(func):
    189         @wraps(func)
    190         def wrapper(self):
    191             #forcibly close the fd before invoking the problem function
    192             f = self.f
    193             os.close(f.fileno())
    194             try:
    195                 func(self, f)
    196             except IOError as e:
    197                 self.assertEqual(e.errno, errno.EBADF)
    198             else:
    199                 self.fail("Should have raised IOError")
    200             finally:
    201                 try:
    202                     self.f.close()
    203                 except IOError:
    204                     pass
    205         return wrapper
    206 
    207     @ClosedFDRaises
    208     def testErrnoOnClose(self, f):
    209         f.close()
    210 
    211     @ClosedFDRaises
    212     def testErrnoOnClosedWrite(self, f):
    213         f.write('a')
    214 
    215     @ClosedFDRaises
    216     def testErrnoOnClosedSeek(self, f):
    217         f.seek(0)
    218 
    219     @ClosedFDRaises
    220     def testErrnoOnClosedTell(self, f):
    221         f.tell()
    222 
    223     @ClosedFDRaises
    224     def testErrnoOnClosedTruncate(self, f):
    225         f.truncate(0)
    226 
    227     @ClosedFD
    228     def testErrnoOnClosedSeekable(self, f):
    229         f.seekable()
    230 
    231     @ClosedFD
    232     def testErrnoOnClosedReadable(self, f):
    233         f.readable()
    234 
    235     @ClosedFD
    236     def testErrnoOnClosedWritable(self, f):
    237         f.writable()
    238 
    239     @ClosedFD
    240     def testErrnoOnClosedFileno(self, f):
    241         f.fileno()
    242 
    243     @ClosedFD
    244     def testErrnoOnClosedIsatty(self, f):
    245         self.assertEqual(f.isatty(), False)
    246 
    247     def ReopenForRead(self):
    248         try:
    249             self.f.close()
    250         except IOError:
    251             pass
    252         self.f = _FileIO(TESTFN, 'r')
    253         os.close(self.f.fileno())
    254         return self.f
    255 
    256     @ClosedFDRaises
    257     def testErrnoOnClosedRead(self, f):
    258         f = self.ReopenForRead()
    259         f.read(1)
    260 
    261     @ClosedFDRaises
    262     def testErrnoOnClosedReadall(self, f):
    263         f = self.ReopenForRead()
    264         f.readall()
    265 
    266     @ClosedFDRaises
    267     def testErrnoOnClosedReadinto(self, f):
    268         f = self.ReopenForRead()
    269         a = array(b'b', b'x'*10)
    270         f.readinto(a)
    271 
    272 class OtherFileTests(unittest.TestCase):
    273 
    274     def testAbles(self):
    275         try:
    276             f = _FileIO(TESTFN, "w")
    277             self.assertEqual(f.readable(), False)
    278             self.assertEqual(f.writable(), True)
    279             self.assertEqual(f.seekable(), True)
    280             f.close()
    281 
    282             f = _FileIO(TESTFN, "r")
    283             self.assertEqual(f.readable(), True)
    284             self.assertEqual(f.writable(), False)
    285             self.assertEqual(f.seekable(), True)
    286             f.close()
    287 
    288             f = _FileIO(TESTFN, "a+")
    289             self.assertEqual(f.readable(), True)
    290             self.assertEqual(f.writable(), True)
    291             self.assertEqual(f.seekable(), True)
    292             self.assertEqual(f.isatty(), False)
    293             f.close()
    294         finally:
    295             os.unlink(TESTFN)
    296 
    297     @unittest.skipIf(sys.platform == 'win32', 'no ttys on Windows')
    298     def testAblesOnTTY(self):
    299         try:
    300             f = _FileIO("/dev/tty", "a")
    301         except EnvironmentError:
    302             # When run in a cron job there just aren't any
    303             # ttys, so skip the test.  This also handles other
    304             # OS'es that don't support /dev/tty.
    305             self.skipTest('need /dev/tty')
    306         else:
    307             self.assertEqual(f.readable(), False)
    308             self.assertEqual(f.writable(), True)
    309             if sys.platform != "darwin" and \
    310                'bsd' not in sys.platform and \
    311                not sys.platform.startswith(('sunos', 'aix')):
    312                 # Somehow /dev/tty appears seekable on some BSDs
    313                 self.assertEqual(f.seekable(), False)
    314             self.assertEqual(f.isatty(), True)
    315             f.close()
    316 
    317     def testInvalidModeStrings(self):
    318         # check invalid mode strings
    319         for mode in ("", "aU", "wU+", "rw", "rt"):
    320             try:
    321                 f = _FileIO(TESTFN, mode)
    322             except ValueError:
    323                 pass
    324             else:
    325                 f.close()
    326                 self.fail('%r is an invalid file mode' % mode)
    327 
    328     def testModeStrings(self):
    329         # test that the mode attribute is correct for various mode strings
    330         # given as init args
    331         try:
    332             for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
    333                           ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
    334                           ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
    335                           ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
    336                 # read modes are last so that TESTFN will exist first
    337                 with _FileIO(TESTFN, modes[0]) as f:
    338                     self.assertEqual(f.mode, modes[1])
    339         finally:
    340             if os.path.exists(TESTFN):
    341                 os.unlink(TESTFN)
    342 
    343     def testUnicodeOpen(self):
    344         # verify repr works for unicode too
    345         f = _FileIO(str(TESTFN), "w")
    346         f.close()
    347         os.unlink(TESTFN)
    348 
    349     def testBytesOpen(self):
    350         # Opening a bytes filename
    351         try:
    352             fn = TESTFN.encode("ascii")
    353         except UnicodeEncodeError:
    354             self.skipTest('could not encode %r to ascii' % TESTFN)
    355         f = _FileIO(fn, "w")
    356         try:
    357             f.write(b"abc")
    358             f.close()
    359             with open(TESTFN, "rb") as f:
    360                 self.assertEqual(f.read(), b"abc")
    361         finally:
    362             os.unlink(TESTFN)
    363 
    364     def testConstructorHandlesNULChars(self):
    365         fn_with_NUL = 'foo\0bar'
    366         self.assertRaises(TypeError, _FileIO, fn_with_NUL, 'w')
    367         self.assertRaises(TypeError, _FileIO, fn_with_NUL.encode('ascii'), 'w')
    368 
    369     def testInvalidFd(self):
    370         self.assertRaises(ValueError, _FileIO, -10)
    371         self.assertRaises(OSError, _FileIO, make_bad_fd())
    372         if sys.platform == 'win32':
    373             import msvcrt
    374             self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
    375 
    376     @cpython_only
    377     def testInvalidFd_overflow(self):
    378         # Issue 15989
    379         import _testcapi
    380         self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
    381         self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
    382 
    383     def testBadModeArgument(self):
    384         # verify that we get a sensible error message for bad mode argument
    385         bad_mode = "qwerty"
    386         try:
    387             f = _FileIO(TESTFN, bad_mode)
    388         except ValueError as msg:
    389             if msg.args[0] != 0:
    390                 s = str(msg)
    391                 if TESTFN in s or bad_mode not in s:
    392                     self.fail("bad error message for invalid mode: %s" % s)
    393             # if msg.args[0] == 0, we're probably on Windows where there may be
    394             # no obvious way to discover why open() failed.
    395         else:
    396             f.close()
    397             self.fail("no error for invalid mode: %s" % bad_mode)
    398 
    399     def testTruncate(self):
    400         f = _FileIO(TESTFN, 'w')
    401         f.write(bytes(bytearray(range(10))))
    402         self.assertEqual(f.tell(), 10)
    403         f.truncate(5)
    404         self.assertEqual(f.tell(), 10)
    405         self.assertEqual(f.seek(0, os.SEEK_END), 5)
    406         f.truncate(15)
    407         self.assertEqual(f.tell(), 5)
    408         self.assertEqual(f.seek(0, os.SEEK_END), 15)
    409         f.close()
    410 
    411     def testTruncateOnWindows(self):
    412         def bug801631():
    413             # SF bug <http://www.python.org/sf/801631>
    414             # "file.truncate fault on windows"
    415             f = _FileIO(TESTFN, 'w')
    416             f.write(bytes(range(11)))
    417             f.close()
    418 
    419             f = _FileIO(TESTFN,'r+')
    420             data = f.read(5)
    421             if data != bytes(range(5)):
    422                 self.fail("Read on file opened for update failed %r" % data)
    423             if f.tell() != 5:
    424                 self.fail("File pos after read wrong %d" % f.tell())
    425 
    426             f.truncate()
    427             if f.tell() != 5:
    428                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    429 
    430             f.close()
    431             size = os.path.getsize(TESTFN)
    432             if size != 5:
    433                 self.fail("File size after ftruncate wrong %d" % size)
    434 
    435         try:
    436             bug801631()
    437         finally:
    438             os.unlink(TESTFN)
    439 
    440     def testAppend(self):
    441         try:
    442             f = open(TESTFN, 'wb')
    443             f.write(b'spam')
    444             f.close()
    445             f = open(TESTFN, 'ab')
    446             f.write(b'eggs')
    447             f.close()
    448             f = open(TESTFN, 'rb')
    449             d = f.read()
    450             f.close()
    451             self.assertEqual(d, b'spameggs')
    452         finally:
    453             try:
    454                 os.unlink(TESTFN)
    455             except:
    456                 pass
    457 
    458     def testInvalidInit(self):
    459         self.assertRaises(TypeError, _FileIO, "1", 0, 0)
    460 
    461     def testWarnings(self):
    462         with check_warnings(quiet=True) as w:
    463             self.assertEqual(w.warnings, [])
    464             self.assertRaises(TypeError, _FileIO, [])
    465             self.assertEqual(w.warnings, [])
    466             self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
    467             self.assertEqual(w.warnings, [])
    468 
    469     def test_surrogates(self):
    470         # Issue #8438: try to open a filename containing surrogates.
    471         # It should either fail because the file doesn't exist or the filename
    472         # can't be represented using the filesystem encoding, but not because
    473         # of a LookupError for the error handler "surrogateescape".
    474         filename = u'\udc80.txt'
    475         try:
    476             with _FileIO(filename):
    477                 pass
    478         except (UnicodeEncodeError, IOError):
    479             pass
    480         # Spawn a separate Python process with a different "file system
    481         # default encoding", to exercise this further.
    482         env = dict(os.environ)
    483         env[b'LC_CTYPE'] = b'C'
    484         _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
    485         if ('UnicodeEncodeError' not in out and not
    486                 ( ('IOError: [Errno 2] No such file or directory' in out) or
    487                   ('IOError: [Errno 22] Invalid argument' in out) ) ):
    488             self.fail('Bad output: %r' % out)
    489 
    490     def testUnclosedFDOnException(self):
    491         class MyException(Exception): pass
    492         class MyFileIO(_FileIO):
    493             def __setattr__(self, name, value):
    494                 if name == "name":
    495                     raise MyException("blocked setting name")
    496                 return super(MyFileIO, self).__setattr__(name, value)
    497         fd = os.open(__file__, os.O_RDONLY)
    498         self.assertRaises(MyException, MyFileIO, fd)
    499         os.close(fd)  # should not raise OSError(EBADF)
    500 
    501 def test_main():
    502     # Historically, these tests have been sloppy about removing TESTFN.
    503     # So get rid of it no matter what.
    504     try:
    505         run_unittest(AutoFileTests, OtherFileTests)
    506     finally:
    507         if os.path.exists(TESTFN):
    508             os.unlink(TESTFN)
    509 
    510 if __name__ == '__main__':
    511     test_main()
    512