Home | History | Annotate | Download | only in test
      1 # Adapted from test_file.py by Daniel Stutzbach
      2 
      3 from __future__ import unicode_literals
      4 
      5 import sys
      6 import os
      7 import errno
      8 import unittest
      9 from array import array
     10 from weakref import proxy
     11 from functools import wraps
     12 from UserList import UserList
     13 import _testcapi
     14 
     15 from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
     16 from test.test_support import py3k_bytes as bytes
     17 from test.script_helper import run_python
     18 
     19 from _io import FileIO as _FileIO
     20 
     21 class AutoFileTests(unittest.TestCase):
     22     # file tests for which a test file is automatically set up
     23 
     24     def setUp(self):
     25         self.f = _FileIO(TESTFN, 'w')
     26 
     27     def tearDown(self):
     28         if self.f:
     29             self.f.close()
     30         os.remove(TESTFN)
     31 
     32     def testWeakRefs(self):
     33         # verify weak references
     34         p = proxy(self.f)
     35         p.write(bytes(range(10)))
     36         self.assertEqual(self.f.tell(), p.tell())
     37         self.f.close()
     38         self.f = None
     39         self.assertRaises(ReferenceError, getattr, p, 'tell')
     40 
     41     def testSeekTell(self):
     42         self.f.write(bytes(range(20)))
     43         self.assertEqual(self.f.tell(), 20)
     44         self.f.seek(0)
     45         self.assertEqual(self.f.tell(), 0)
     46         self.f.seek(10)
     47         self.assertEqual(self.f.tell(), 10)
     48         self.f.seek(5, 1)
     49         self.assertEqual(self.f.tell(), 15)
     50         self.f.seek(-5, 1)
     51         self.assertEqual(self.f.tell(), 10)
     52         self.f.seek(-5, 2)
     53         self.assertEqual(self.f.tell(), 15)
     54 
     55     def testAttributes(self):
     56         # verify expected attributes exist
     57         f = self.f
     58 
     59         self.assertEqual(f.mode, "wb")
     60         self.assertEqual(f.closed, False)
     61 
     62         # verify the attributes are readonly
     63         for attr in 'mode', 'closed':
     64             self.assertRaises((AttributeError, TypeError),
     65                               setattr, f, attr, 'oops')
     66 
     67     def testReadinto(self):
     68         # verify readinto
     69         self.f.write(b"\x01\x02")
     70         self.f.close()
     71         a = array(b'b', b'x'*10)
     72         self.f = _FileIO(TESTFN, 'r')
     73         n = self.f.readinto(a)
     74         self.assertEqual(array(b'b', [1, 2]), a[:n])
     75 
     76     def testWritelinesList(self):
     77         l = [b'123', b'456']
     78         self.f.writelines(l)
     79         self.f.close()
     80         self.f = _FileIO(TESTFN, 'rb')
     81         buf = self.f.read()
     82         self.assertEqual(buf, b'123456')
     83 
     84     def testWritelinesUserList(self):
     85         l = UserList([b'123', b'456'])
     86         self.f.writelines(l)
     87         self.f.close()
     88         self.f = _FileIO(TESTFN, 'rb')
     89         buf = self.f.read()
     90         self.assertEqual(buf, b'123456')
     91 
     92     def testWritelinesError(self):
     93         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
     94         self.assertRaises(TypeError, self.f.writelines, None)
     95 
     96     def test_none_args(self):
     97         self.f.write(b"hi\nbye\nabc")
     98         self.f.close()
     99         self.f = _FileIO(TESTFN, 'r')
    100         self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
    101         self.f.seek(0)
    102         self.assertEqual(self.f.readline(None), b"hi\n")
    103         self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
    104 
    105     def testRepr(self):
    106         self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
    107                                        % (self.f.name, self.f.mode))
    108         del self.f.name
    109         self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
    110                                        % (self.f.fileno(), self.f.mode))
    111         self.f.close()
    112         self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
    113 
    114     def testErrors(self):
    115         f = self.f
    116         self.assertTrue(not f.isatty())
    117         self.assertTrue(not f.closed)
    118         #self.assertEqual(f.name, TESTFN)
    119         self.assertRaises(ValueError, f.read, 10) # Open for reading
    120         f.close()
    121         self.assertTrue(f.closed)
    122         f = _FileIO(TESTFN, 'r')
    123         self.assertRaises(TypeError, f.readinto, "")
    124         self.assertTrue(not f.closed)
    125         f.close()
    126         self.assertTrue(f.closed)
    127 
    128     def testMethods(self):
    129         methods = ['fileno', 'isatty', 'read', 'readinto',
    130                    'seek', 'tell', 'truncate', 'write', 'seekable',
    131                    'readable', 'writable']
    132         if sys.platform.startswith('atheos'):
    133             methods.remove('truncate')
    134 
    135         self.f.close()
    136         self.assertTrue(self.f.closed)
    137 
    138         for methodname in methods:
    139             method = getattr(self.f, methodname)
    140             # should raise on closed file
    141             self.assertRaises(ValueError, method)
    142 
    143     def testOpendir(self):
    144         # Issue 3703: opening a directory should fill the errno
    145         # Windows always returns "[Errno 13]: Permission denied
    146         # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
    147         try:
    148             _FileIO('.', 'r')
    149         except IOError as e:
    150             self.assertNotEqual(e.errno, 0)
    151             self.assertEqual(e.filename, ".")
    152         else:
    153             self.fail("Should have raised IOError")
    154 
    155     @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
    156     def testOpenDirFD(self):
    157         fd = os.open('.', os.O_RDONLY)
    158         with self.assertRaises(IOError) as cm:
    159             _FileIO(fd, 'r')
    160         os.close(fd)
    161         self.assertEqual(cm.exception.errno, errno.EISDIR)
    162 
    163     #A set of functions testing that we get expected behaviour if someone has
    164     #manually closed the internal file descriptor.  First, a decorator:
    165     def ClosedFD(func):
    166         @wraps(func)
    167         def wrapper(self):
    168             #forcibly close the fd before invoking the problem function
    169             f = self.f
    170             os.close(f.fileno())
    171             try:
    172                 func(self, f)
    173             finally:
    174                 try:
    175                     self.f.close()
    176                 except IOError:
    177                     pass
    178         return wrapper
    179 
    180     def ClosedFDRaises(func):
    181         @wraps(func)
    182         def wrapper(self):
    183             #forcibly close the fd before invoking the problem function
    184             f = self.f
    185             os.close(f.fileno())
    186             try:
    187                 func(self, f)
    188             except IOError as e:
    189                 self.assertEqual(e.errno, errno.EBADF)
    190             else:
    191                 self.fail("Should have raised IOError")
    192             finally:
    193                 try:
    194                     self.f.close()
    195                 except IOError:
    196                     pass
    197         return wrapper
    198 
    199     @ClosedFDRaises
    200     def testErrnoOnClose(self, f):
    201         f.close()
    202 
    203     @ClosedFDRaises
    204     def testErrnoOnClosedWrite(self, f):
    205         f.write('a')
    206 
    207     @ClosedFDRaises
    208     def testErrnoOnClosedSeek(self, f):
    209         f.seek(0)
    210 
    211     @ClosedFDRaises
    212     def testErrnoOnClosedTell(self, f):
    213         f.tell()
    214 
    215     @ClosedFDRaises
    216     def testErrnoOnClosedTruncate(self, f):
    217         f.truncate(0)
    218 
    219     @ClosedFD
    220     def testErrnoOnClosedSeekable(self, f):
    221         f.seekable()
    222 
    223     @ClosedFD
    224     def testErrnoOnClosedReadable(self, f):
    225         f.readable()
    226 
    227     @ClosedFD
    228     def testErrnoOnClosedWritable(self, f):
    229         f.writable()
    230 
    231     @ClosedFD
    232     def testErrnoOnClosedFileno(self, f):
    233         f.fileno()
    234 
    235     @ClosedFD
    236     def testErrnoOnClosedIsatty(self, f):
    237         self.assertEqual(f.isatty(), False)
    238 
    239     def ReopenForRead(self):
    240         try:
    241             self.f.close()
    242         except IOError:
    243             pass
    244         self.f = _FileIO(TESTFN, 'r')
    245         os.close(self.f.fileno())
    246         return self.f
    247 
    248     @ClosedFDRaises
    249     def testErrnoOnClosedRead(self, f):
    250         f = self.ReopenForRead()
    251         f.read(1)
    252 
    253     @ClosedFDRaises
    254     def testErrnoOnClosedReadall(self, f):
    255         f = self.ReopenForRead()
    256         f.readall()
    257 
    258     @ClosedFDRaises
    259     def testErrnoOnClosedReadinto(self, f):
    260         f = self.ReopenForRead()
    261         a = array(b'b', b'x'*10)
    262         f.readinto(a)
    263 
    264 class OtherFileTests(unittest.TestCase):
    265 
    266     def testAbles(self):
    267         try:
    268             f = _FileIO(TESTFN, "w")
    269             self.assertEqual(f.readable(), False)
    270             self.assertEqual(f.writable(), True)
    271             self.assertEqual(f.seekable(), True)
    272             f.close()
    273 
    274             f = _FileIO(TESTFN, "r")
    275             self.assertEqual(f.readable(), True)
    276             self.assertEqual(f.writable(), False)
    277             self.assertEqual(f.seekable(), True)
    278             f.close()
    279 
    280             f = _FileIO(TESTFN, "a+")
    281             self.assertEqual(f.readable(), True)
    282             self.assertEqual(f.writable(), True)
    283             self.assertEqual(f.seekable(), True)
    284             self.assertEqual(f.isatty(), False)
    285             f.close()
    286 
    287             if sys.platform != "win32":
    288                 try:
    289                     f = _FileIO("/dev/tty", "a")
    290                 except EnvironmentError:
    291                     # When run in a cron job there just aren't any
    292                     # ttys, so skip the test.  This also handles other
    293                     # OS'es that don't support /dev/tty.
    294                     pass
    295                 else:
    296                     self.assertEqual(f.readable(), False)
    297                     self.assertEqual(f.writable(), True)
    298                     if sys.platform != "darwin" and \
    299                        'bsd' not in sys.platform and \
    300                        not sys.platform.startswith('sunos'):
    301                         # Somehow /dev/tty appears seekable on some BSDs
    302                         self.assertEqual(f.seekable(), False)
    303                     self.assertEqual(f.isatty(), True)
    304                     f.close()
    305         finally:
    306             os.unlink(TESTFN)
    307 
    308     def testModeStrings(self):
    309         # check invalid mode strings
    310         for mode in ("", "aU", "wU+", "rw", "rt"):
    311             try:
    312                 f = _FileIO(TESTFN, mode)
    313             except ValueError:
    314                 pass
    315             else:
    316                 f.close()
    317                 self.fail('%r is an invalid file mode' % mode)
    318 
    319     def testUnicodeOpen(self):
    320         # verify repr works for unicode too
    321         f = _FileIO(str(TESTFN), "w")
    322         f.close()
    323         os.unlink(TESTFN)
    324 
    325     def testBytesOpen(self):
    326         # Opening a bytes filename
    327         try:
    328             fn = TESTFN.encode("ascii")
    329         except UnicodeEncodeError:
    330             # Skip test
    331             return
    332         f = _FileIO(fn, "w")
    333         try:
    334             f.write(b"abc")
    335             f.close()
    336             with open(TESTFN, "rb") as f:
    337                 self.assertEqual(f.read(), b"abc")
    338         finally:
    339             os.unlink(TESTFN)
    340 
    341     def testInvalidFd(self):
    342         self.assertRaises(ValueError, _FileIO, -10)
    343         self.assertRaises(OSError, _FileIO, make_bad_fd())
    344         if sys.platform == 'win32':
    345             import msvcrt
    346             self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
    347         # Issue 15989
    348         self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
    349         self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
    350 
    351     def testBadModeArgument(self):
    352         # verify that we get a sensible error message for bad mode argument
    353         bad_mode = "qwerty"
    354         try:
    355             f = _FileIO(TESTFN, bad_mode)
    356         except ValueError as msg:
    357             if msg.args[0] != 0:
    358                 s = str(msg)
    359                 if TESTFN in s or bad_mode not in s:
    360                     self.fail("bad error message for invalid mode: %s" % s)
    361             # if msg.args[0] == 0, we're probably on Windows where there may be
    362             # no obvious way to discover why open() failed.
    363         else:
    364             f.close()
    365             self.fail("no error for invalid mode: %s" % bad_mode)
    366 
    367     def testTruncate(self):
    368         f = _FileIO(TESTFN, 'w')
    369         f.write(bytes(bytearray(range(10))))
    370         self.assertEqual(f.tell(), 10)
    371         f.truncate(5)
    372         self.assertEqual(f.tell(), 10)
    373         self.assertEqual(f.seek(0, os.SEEK_END), 5)
    374         f.truncate(15)
    375         self.assertEqual(f.tell(), 5)
    376         self.assertEqual(f.seek(0, os.SEEK_END), 15)
    377         f.close()
    378 
    379     def testTruncateOnWindows(self):
    380         def bug801631():
    381             # SF bug <http://www.python.org/sf/801631>
    382             # "file.truncate fault on windows"
    383             f = _FileIO(TESTFN, 'w')
    384             f.write(bytes(range(11)))
    385             f.close()
    386 
    387             f = _FileIO(TESTFN,'r+')
    388             data = f.read(5)
    389             if data != bytes(range(5)):
    390                 self.fail("Read on file opened for update failed %r" % data)
    391             if f.tell() != 5:
    392                 self.fail("File pos after read wrong %d" % f.tell())
    393 
    394             f.truncate()
    395             if f.tell() != 5:
    396                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    397 
    398             f.close()
    399             size = os.path.getsize(TESTFN)
    400             if size != 5:
    401                 self.fail("File size after ftruncate wrong %d" % size)
    402 
    403         try:
    404             bug801631()
    405         finally:
    406             os.unlink(TESTFN)
    407 
    408     def testAppend(self):
    409         try:
    410             f = open(TESTFN, 'wb')
    411             f.write(b'spam')
    412             f.close()
    413             f = open(TESTFN, 'ab')
    414             f.write(b'eggs')
    415             f.close()
    416             f = open(TESTFN, 'rb')
    417             d = f.read()
    418             f.close()
    419             self.assertEqual(d, b'spameggs')
    420         finally:
    421             try:
    422                 os.unlink(TESTFN)
    423             except:
    424                 pass
    425 
    426     def testInvalidInit(self):
    427         self.assertRaises(TypeError, _FileIO, "1", 0, 0)
    428 
    429     def testWarnings(self):
    430         with check_warnings(quiet=True) as w:
    431             self.assertEqual(w.warnings, [])
    432             self.assertRaises(TypeError, _FileIO, [])
    433             self.assertEqual(w.warnings, [])
    434             self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
    435             self.assertEqual(w.warnings, [])
    436 
    437     def test_surrogates(self):
    438         # Issue #8438: try to open a filename containing surrogates.
    439         # It should either fail because the file doesn't exist or the filename
    440         # can't be represented using the filesystem encoding, but not because
    441         # of a LookupError for the error handler "surrogateescape".
    442         filename = u'\udc80.txt'
    443         try:
    444             with _FileIO(filename):
    445                 pass
    446         except (UnicodeEncodeError, IOError):
    447             pass
    448         # Spawn a separate Python process with a different "file system
    449         # default encoding", to exercise this further.
    450         env = dict(os.environ)
    451         env[b'LC_CTYPE'] = b'C'
    452         _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
    453         if ('UnicodeEncodeError' not in out and not
    454                 ( ('IOError: [Errno 2] No such file or directory' in out) or
    455                   ('IOError: [Errno 22] Invalid argument' in out) ) ):
    456             self.fail('Bad output: %r' % out)
    457 
    458     def testUnclosedFDOnException(self):
    459         class MyException(Exception): pass
    460         class MyFileIO(_FileIO):
    461             def __setattr__(self, name, value):
    462                 if name == "name":
    463                     raise MyException("blocked setting name")
    464                 return super(MyFileIO, self).__setattr__(name, value)
    465         fd = os.open(__file__, os.O_RDONLY)
    466         self.assertRaises(MyException, MyFileIO, fd)
    467         os.close(fd)  # should not raise OSError(EBADF)
    468 
    469 def test_main():
    470     # Historically, these tests have been sloppy about removing TESTFN.
    471     # So get rid of it no matter what.
    472     try:
    473         run_unittest(AutoFileTests, OtherFileTests)
    474     finally:
    475         if os.path.exists(TESTFN):
    476             os.unlink(TESTFN)
    477 
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
    480