Home | History | Annotate | Download | only in test
      1 # Adapted from test_file.py by Daniel Stutzbach
      2 
      3 import sys
      4 import os
      5 import io
      6 import errno
      7 import unittest
      8 from array import array
      9 from weakref import proxy
     10 from functools import wraps
     11 
     12 from test.support import (TESTFN, TESTFN_UNICODE, check_warnings, run_unittest,
     13                           make_bad_fd, cpython_only)
     14 from collections import UserList
     15 
     16 import _io  # C implementation of io
     17 import _pyio # Python implementation of io
     18 
     19 
     20 class AutoFileTests:
     21     # file tests for which a test file is automatically set up
     22 
     23     def setUp(self):
     24         self.f = self.FileIO(TESTFN, 'w')
     25 
     26     def tearDown(self):
     27         if self.f:
     28             self.f.close()
     29         os.remove(TESTFN)
     30 
     31     def testWeakRefs(self):
     32         # verify weak references
     33         p = proxy(self.f)
     34         p.write(bytes(range(10)))
     35         self.assertEqual(self.f.tell(), p.tell())
     36         self.f.close()
     37         self.f = None
     38         self.assertRaises(ReferenceError, getattr, p, 'tell')
     39 
     40     def testSeekTell(self):
     41         self.f.write(bytes(range(20)))
     42         self.assertEqual(self.f.tell(), 20)
     43         self.f.seek(0)
     44         self.assertEqual(self.f.tell(), 0)
     45         self.f.seek(10)
     46         self.assertEqual(self.f.tell(), 10)
     47         self.f.seek(5, 1)
     48         self.assertEqual(self.f.tell(), 15)
     49         self.f.seek(-5, 1)
     50         self.assertEqual(self.f.tell(), 10)
     51         self.f.seek(-5, 2)
     52         self.assertEqual(self.f.tell(), 15)
     53 
     54     def testAttributes(self):
     55         # verify expected attributes exist
     56         f = self.f
     57 
     58         self.assertEqual(f.mode, "wb")
     59         self.assertEqual(f.closed, False)
     60 
     61         # verify the attributes are readonly
     62         for attr in 'mode', 'closed':
     63             self.assertRaises((AttributeError, TypeError),
     64                               setattr, f, attr, 'oops')
     65 
     66     def testBlksize(self):
     67         # test private _blksize attribute
     68         blksize = io.DEFAULT_BUFFER_SIZE
     69         # try to get preferred blksize from stat.st_blksize, if available
     70         if hasattr(os, 'fstat'):
     71             fst = os.fstat(self.f.fileno())
     72             blksize = getattr(fst, 'st_blksize', blksize)
     73         self.assertEqual(self.f._blksize, blksize)
     74 
     75     # verify readinto
     76     def testReadintoByteArray(self):
     77         self.f.write(bytes([1, 2, 0, 255]))
     78         self.f.close()
     79 
     80         ba = bytearray(b'abcdefgh')
     81         with self.FileIO(TESTFN, 'r') as f:
     82             n = f.readinto(ba)
     83         self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
     84         self.assertEqual(n, 4)
     85 
     86     def _testReadintoMemoryview(self):
     87         self.f.write(bytes([1, 2, 0, 255]))
     88         self.f.close()
     89 
     90         m = memoryview(bytearray(b'abcdefgh'))
     91         with self.FileIO(TESTFN, 'r') as f:
     92             n = f.readinto(m)
     93         self.assertEqual(m, b'\x01\x02\x00\xffefgh')
     94         self.assertEqual(n, 4)
     95 
     96         m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
     97         with self.FileIO(TESTFN, 'r') as f:
     98             n = f.readinto(m)
     99         self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
    100         self.assertEqual(n, 4)
    101 
    102     def _testReadintoArray(self):
    103         self.f.write(bytes([1, 2, 0, 255]))
    104         self.f.close()
    105 
    106         a = array('B', b'abcdefgh')
    107         with self.FileIO(TESTFN, 'r') as f:
    108             n = f.readinto(a)
    109         self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
    110         self.assertEqual(n, 4)
    111 
    112         a = array('b', b'abcdefgh')
    113         with self.FileIO(TESTFN, 'r') as f:
    114             n = f.readinto(a)
    115         self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
    116         self.assertEqual(n, 4)
    117 
    118         a = array('I', b'abcdefgh')
    119         with self.FileIO(TESTFN, 'r') as f:
    120             n = f.readinto(a)
    121         self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
    122         self.assertEqual(n, 4)
    123 
    124     def testWritelinesList(self):
    125         l = [b'123', b'456']
    126         self.f.writelines(l)
    127         self.f.close()
    128         self.f = self.FileIO(TESTFN, 'rb')
    129         buf = self.f.read()
    130         self.assertEqual(buf, b'123456')
    131 
    132     def testWritelinesUserList(self):
    133         l = UserList([b'123', b'456'])
    134         self.f.writelines(l)
    135         self.f.close()
    136         self.f = self.FileIO(TESTFN, 'rb')
    137         buf = self.f.read()
    138         self.assertEqual(buf, b'123456')
    139 
    140     def testWritelinesError(self):
    141         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
    142         self.assertRaises(TypeError, self.f.writelines, None)
    143         self.assertRaises(TypeError, self.f.writelines, "abc")
    144 
    145     def test_none_args(self):
    146         self.f.write(b"hi\nbye\nabc")
    147         self.f.close()
    148         self.f = self.FileIO(TESTFN, 'r')
    149         self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
    150         self.f.seek(0)
    151         self.assertEqual(self.f.readline(None), b"hi\n")
    152         self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
    153 
    154     def test_reject(self):
    155         self.assertRaises(TypeError, self.f.write, "Hello!")
    156 
    157     def testRepr(self):
    158         self.assertEqual(repr(self.f),
    159                          "<%s.FileIO name=%r mode=%r closefd=True>" %
    160                          (self.modulename, self.f.name, self.f.mode))
    161         del self.f.name
    162         self.assertEqual(repr(self.f),
    163                          "<%s.FileIO fd=%r mode=%r closefd=True>" %
    164                          (self.modulename, self.f.fileno(), self.f.mode))
    165         self.f.close()
    166         self.assertEqual(repr(self.f),
    167                          "<%s.FileIO [closed]>" % (self.modulename,))
    168 
    169     def testReprNoCloseFD(self):
    170         fd = os.open(TESTFN, os.O_RDONLY)
    171         try:
    172             with self.FileIO(fd, 'r', closefd=False) as f:
    173                 self.assertEqual(repr(f),
    174                                  "<%s.FileIO name=%r mode=%r closefd=False>" %
    175                                  (self.modulename, f.name, f.mode))
    176         finally:
    177             os.close(fd)
    178 
    179     def testErrors(self):
    180         f = self.f
    181         self.assertFalse(f.isatty())
    182         self.assertFalse(f.closed)
    183         #self.assertEqual(f.name, TESTFN)
    184         self.assertRaises(ValueError, f.read, 10) # Open for reading
    185         f.close()
    186         self.assertTrue(f.closed)
    187         f = self.FileIO(TESTFN, 'r')
    188         self.assertRaises(TypeError, f.readinto, "")
    189         self.assertFalse(f.closed)
    190         f.close()
    191         self.assertTrue(f.closed)
    192 
    193     def testMethods(self):
    194         methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
    195                    'read', 'readall', 'readline', 'readlines',
    196                    'tell', 'truncate', 'flush']
    197 
    198         self.f.close()
    199         self.assertTrue(self.f.closed)
    200 
    201         for methodname in methods:
    202             method = getattr(self.f, methodname)
    203             # should raise on closed file
    204             self.assertRaises(ValueError, method)
    205 
    206         self.assertRaises(TypeError, self.f.readinto)
    207         self.assertRaises(ValueError, self.f.readinto, bytearray(1))
    208         self.assertRaises(TypeError, self.f.seek)
    209         self.assertRaises(ValueError, self.f.seek, 0)
    210         self.assertRaises(TypeError, self.f.write)
    211         self.assertRaises(ValueError, self.f.write, b'')
    212         self.assertRaises(TypeError, self.f.writelines)
    213         self.assertRaises(ValueError, self.f.writelines, b'')
    214 
    215     def testOpendir(self):
    216         # Issue 3703: opening a directory should fill the errno
    217         # Windows always returns "[Errno 13]: Permission denied
    218         # Unix uses fstat and returns "[Errno 21]: Is a directory"
    219         try:
    220             self.FileIO('.', 'r')
    221         except OSError as e:
    222             self.assertNotEqual(e.errno, 0)
    223             self.assertEqual(e.filename, ".")
    224         else:
    225             self.fail("Should have raised OSError")
    226 
    227     @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
    228     def testOpenDirFD(self):
    229         fd = os.open('.', os.O_RDONLY)
    230         with self.assertRaises(OSError) as cm:
    231             self.FileIO(fd, 'r')
    232         os.close(fd)
    233         self.assertEqual(cm.exception.errno, errno.EISDIR)
    234 
    235     #A set of functions testing that we get expected behaviour if someone has
    236     #manually closed the internal file descriptor.  First, a decorator:
    237     def ClosedFD(func):
    238         @wraps(func)
    239         def wrapper(self):
    240             #forcibly close the fd before invoking the problem function
    241             f = self.f
    242             os.close(f.fileno())
    243             try:
    244                 func(self, f)
    245             finally:
    246                 try:
    247                     self.f.close()
    248                 except OSError:
    249                     pass
    250         return wrapper
    251 
    252     def ClosedFDRaises(func):
    253         @wraps(func)
    254         def wrapper(self):
    255             #forcibly close the fd before invoking the problem function
    256             f = self.f
    257             os.close(f.fileno())
    258             try:
    259                 func(self, f)
    260             except OSError as e:
    261                 self.assertEqual(e.errno, errno.EBADF)
    262             else:
    263                 self.fail("Should have raised OSError")
    264             finally:
    265                 try:
    266                     self.f.close()
    267                 except OSError:
    268                     pass
    269         return wrapper
    270 
    271     @ClosedFDRaises
    272     def testErrnoOnClose(self, f):
    273         f.close()
    274 
    275     @ClosedFDRaises
    276     def testErrnoOnClosedWrite(self, f):
    277         f.write(b'a')
    278 
    279     @ClosedFDRaises
    280     def testErrnoOnClosedSeek(self, f):
    281         f.seek(0)
    282 
    283     @ClosedFDRaises
    284     def testErrnoOnClosedTell(self, f):
    285         f.tell()
    286 
    287     @ClosedFDRaises
    288     def testErrnoOnClosedTruncate(self, f):
    289         f.truncate(0)
    290 
    291     @ClosedFD
    292     def testErrnoOnClosedSeekable(self, f):
    293         f.seekable()
    294 
    295     @ClosedFD
    296     def testErrnoOnClosedReadable(self, f):
    297         f.readable()
    298 
    299     @ClosedFD
    300     def testErrnoOnClosedWritable(self, f):
    301         f.writable()
    302 
    303     @ClosedFD
    304     def testErrnoOnClosedFileno(self, f):
    305         f.fileno()
    306 
    307     @ClosedFD
    308     def testErrnoOnClosedIsatty(self, f):
    309         self.assertEqual(f.isatty(), False)
    310 
    311     def ReopenForRead(self):
    312         try:
    313             self.f.close()
    314         except OSError:
    315             pass
    316         self.f = self.FileIO(TESTFN, 'r')
    317         os.close(self.f.fileno())
    318         return self.f
    319 
    320     @ClosedFDRaises
    321     def testErrnoOnClosedRead(self, f):
    322         f = self.ReopenForRead()
    323         f.read(1)
    324 
    325     @ClosedFDRaises
    326     def testErrnoOnClosedReadall(self, f):
    327         f = self.ReopenForRead()
    328         f.readall()
    329 
    330     @ClosedFDRaises
    331     def testErrnoOnClosedReadinto(self, f):
    332         f = self.ReopenForRead()
    333         a = array('b', b'x'*10)
    334         f.readinto(a)
    335 
    336 class CAutoFileTests(AutoFileTests, unittest.TestCase):
    337     FileIO = _io.FileIO
    338     modulename = '_io'
    339 
    340 class PyAutoFileTests(AutoFileTests, unittest.TestCase):
    341     FileIO = _pyio.FileIO
    342     modulename = '_pyio'
    343 
    344 
    345 class OtherFileTests:
    346 
    347     def testAbles(self):
    348         try:
    349             f = self.FileIO(TESTFN, "w")
    350             self.assertEqual(f.readable(), False)
    351             self.assertEqual(f.writable(), True)
    352             self.assertEqual(f.seekable(), True)
    353             f.close()
    354 
    355             f = self.FileIO(TESTFN, "r")
    356             self.assertEqual(f.readable(), True)
    357             self.assertEqual(f.writable(), False)
    358             self.assertEqual(f.seekable(), True)
    359             f.close()
    360 
    361             f = self.FileIO(TESTFN, "a+")
    362             self.assertEqual(f.readable(), True)
    363             self.assertEqual(f.writable(), True)
    364             self.assertEqual(f.seekable(), True)
    365             self.assertEqual(f.isatty(), False)
    366             f.close()
    367 
    368             if sys.platform != "win32":
    369                 try:
    370                     f = self.FileIO("/dev/tty", "a")
    371                 except OSError:
    372                     # When run in a cron job there just aren't any
    373                     # ttys, so skip the test.  This also handles other
    374                     # OS'es that don't support /dev/tty.
    375                     pass
    376                 else:
    377                     self.assertEqual(f.readable(), False)
    378                     self.assertEqual(f.writable(), True)
    379                     if sys.platform != "darwin" and \
    380                        'bsd' not in sys.platform and \
    381                        not sys.platform.startswith(('sunos', 'aix')):
    382                         # Somehow /dev/tty appears seekable on some BSDs
    383                         self.assertEqual(f.seekable(), False)
    384                     self.assertEqual(f.isatty(), True)
    385                     f.close()
    386         finally:
    387             os.unlink(TESTFN)
    388 
    389     def testInvalidModeStrings(self):
    390         # check invalid mode strings
    391         for mode in ("", "aU", "wU+", "rw", "rt"):
    392             try:
    393                 f = self.FileIO(TESTFN, mode)
    394             except ValueError:
    395                 pass
    396             else:
    397                 f.close()
    398                 self.fail('%r is an invalid file mode' % mode)
    399 
    400     def testModeStrings(self):
    401         # test that the mode attribute is correct for various mode strings
    402         # given as init args
    403         try:
    404             for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
    405                           ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
    406                           ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
    407                           ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
    408                 # read modes are last so that TESTFN will exist first
    409                 with self.FileIO(TESTFN, modes[0]) as f:
    410                     self.assertEqual(f.mode, modes[1])
    411         finally:
    412             if os.path.exists(TESTFN):
    413                 os.unlink(TESTFN)
    414 
    415     def testUnicodeOpen(self):
    416         # verify repr works for unicode too
    417         f = self.FileIO(str(TESTFN), "w")
    418         f.close()
    419         os.unlink(TESTFN)
    420 
    421     def testBytesOpen(self):
    422         # Opening a bytes filename
    423         try:
    424             fn = TESTFN.encode("ascii")
    425         except UnicodeEncodeError:
    426             self.skipTest('could not encode %r to ascii' % TESTFN)
    427         f = self.FileIO(fn, "w")
    428         try:
    429             f.write(b"abc")
    430             f.close()
    431             with open(TESTFN, "rb") as f:
    432                 self.assertEqual(f.read(), b"abc")
    433         finally:
    434             os.unlink(TESTFN)
    435 
    436     @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8',
    437                      "test only works for utf-8 filesystems")
    438     def testUtf8BytesOpen(self):
    439         # Opening a UTF-8 bytes filename
    440         try:
    441             fn = TESTFN_UNICODE.encode("utf-8")
    442         except UnicodeEncodeError:
    443             self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE)
    444         f = self.FileIO(fn, "w")
    445         try:
    446             f.write(b"abc")
    447             f.close()
    448             with open(TESTFN_UNICODE, "rb") as f:
    449                 self.assertEqual(f.read(), b"abc")
    450         finally:
    451             os.unlink(TESTFN_UNICODE)
    452 
    453     def testConstructorHandlesNULChars(self):
    454         fn_with_NUL = 'foo\0bar'
    455         self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
    456         self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
    457 
    458     def testInvalidFd(self):
    459         self.assertRaises(ValueError, self.FileIO, -10)
    460         self.assertRaises(OSError, self.FileIO, make_bad_fd())
    461         if sys.platform == 'win32':
    462             import msvcrt
    463             self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
    464 
    465     def testBadModeArgument(self):
    466         # verify that we get a sensible error message for bad mode argument
    467         bad_mode = "qwerty"
    468         try:
    469             f = self.FileIO(TESTFN, bad_mode)
    470         except ValueError as msg:
    471             if msg.args[0] != 0:
    472                 s = str(msg)
    473                 if TESTFN in s or bad_mode not in s:
    474                     self.fail("bad error message for invalid mode: %s" % s)
    475             # if msg.args[0] == 0, we're probably on Windows where there may be
    476             # no obvious way to discover why open() failed.
    477         else:
    478             f.close()
    479             self.fail("no error for invalid mode: %s" % bad_mode)
    480 
    481     def testTruncate(self):
    482         f = self.FileIO(TESTFN, 'w')
    483         f.write(bytes(bytearray(range(10))))
    484         self.assertEqual(f.tell(), 10)
    485         f.truncate(5)
    486         self.assertEqual(f.tell(), 10)
    487         self.assertEqual(f.seek(0, io.SEEK_END), 5)
    488         f.truncate(15)
    489         self.assertEqual(f.tell(), 5)
    490         self.assertEqual(f.seek(0, io.SEEK_END), 15)
    491         f.close()
    492 
    493     def testTruncateOnWindows(self):
    494         def bug801631():
    495             # SF bug <http://www.python.org/sf/801631>
    496             # "file.truncate fault on windows"
    497             f = self.FileIO(TESTFN, 'w')
    498             f.write(bytes(range(11)))
    499             f.close()
    500 
    501             f = self.FileIO(TESTFN,'r+')
    502             data = f.read(5)
    503             if data != bytes(range(5)):
    504                 self.fail("Read on file opened for update failed %r" % data)
    505             if f.tell() != 5:
    506                 self.fail("File pos after read wrong %d" % f.tell())
    507 
    508             f.truncate()
    509             if f.tell() != 5:
    510                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    511 
    512             f.close()
    513             size = os.path.getsize(TESTFN)
    514             if size != 5:
    515                 self.fail("File size after ftruncate wrong %d" % size)
    516 
    517         try:
    518             bug801631()
    519         finally:
    520             os.unlink(TESTFN)
    521 
    522     def testAppend(self):
    523         try:
    524             f = open(TESTFN, 'wb')
    525             f.write(b'spam')
    526             f.close()
    527             f = open(TESTFN, 'ab')
    528             f.write(b'eggs')
    529             f.close()
    530             f = open(TESTFN, 'rb')
    531             d = f.read()
    532             f.close()
    533             self.assertEqual(d, b'spameggs')
    534         finally:
    535             try:
    536                 os.unlink(TESTFN)
    537             except:
    538                 pass
    539 
    540     def testInvalidInit(self):
    541         self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
    542 
    543     def testWarnings(self):
    544         with check_warnings(quiet=True) as w:
    545             self.assertEqual(w.warnings, [])
    546             self.assertRaises(TypeError, self.FileIO, [])
    547             self.assertEqual(w.warnings, [])
    548             self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
    549             self.assertEqual(w.warnings, [])
    550 
    551     def testUnclosedFDOnException(self):
    552         class MyException(Exception): pass
    553         class MyFileIO(self.FileIO):
    554             def __setattr__(self, name, value):
    555                 if name == "name":
    556                     raise MyException("blocked setting name")
    557                 return super(MyFileIO, self).__setattr__(name, value)
    558         fd = os.open(__file__, os.O_RDONLY)
    559         self.assertRaises(MyException, MyFileIO, fd)
    560         os.close(fd)  # should not raise OSError(EBADF)
    561 
    562 class COtherFileTests(OtherFileTests, unittest.TestCase):
    563     FileIO = _io.FileIO
    564     modulename = '_io'
    565 
    566     @cpython_only
    567     def testInvalidFd_overflow(self):
    568         # Issue 15989
    569         import _testcapi
    570         self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
    571         self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
    572 
    573 class PyOtherFileTests(OtherFileTests, unittest.TestCase):
    574     FileIO = _pyio.FileIO
    575     modulename = '_pyio'
    576 
    577 
    578 def test_main():
    579     # Historically, these tests have been sloppy about removing TESTFN.
    580     # So get rid of it no matter what.
    581     try:
    582         run_unittest(CAutoFileTests, PyAutoFileTests,
    583                      COtherFileTests, PyOtherFileTests)
    584     finally:
    585         if os.path.exists(TESTFN):
    586             os.unlink(TESTFN)
    587 
    588 if __name__ == '__main__':
    589     test_main()
    590