Home | History | Annotate | Download | only in test
      1 # Adapted from test_file.py by Daniel Stutzbach

      2 
      3 from __future__ import unicode_literals
      4 
      5 import sys
      6 import os
      7 import errno
      8 import unittest
      9 from array import array
     10 from weakref import proxy
     11 from functools import wraps
     12 
     13 from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
     14 from test.test_support import py3k_bytes as bytes
     15 from test.script_helper import run_python
     16 
     17 from _io import FileIO as _FileIO
     18 
     19 class AutoFileTests(unittest.TestCase):
     20     # file tests for which a test file is automatically set up

     21 
     22     def setUp(self):
     23         self.f = _FileIO(TESTFN, 'w')
     24 
     25     def tearDown(self):
     26         if self.f:
     27             self.f.close()
     28         os.remove(TESTFN)
     29 
     30     def testWeakRefs(self):
     31         # verify weak references

     32         p = proxy(self.f)
     33         p.write(bytes(range(10)))
     34         self.assertEqual(self.f.tell(), p.tell())
     35         self.f.close()
     36         self.f = None
     37         self.assertRaises(ReferenceError, getattr, p, 'tell')
     38 
     39     def testSeekTell(self):
     40         self.f.write(bytes(range(20)))
     41         self.assertEqual(self.f.tell(), 20)
     42         self.f.seek(0)
     43         self.assertEqual(self.f.tell(), 0)
     44         self.f.seek(10)
     45         self.assertEqual(self.f.tell(), 10)
     46         self.f.seek(5, 1)
     47         self.assertEqual(self.f.tell(), 15)
     48         self.f.seek(-5, 1)
     49         self.assertEqual(self.f.tell(), 10)
     50         self.f.seek(-5, 2)
     51         self.assertEqual(self.f.tell(), 15)
     52 
     53     def testAttributes(self):
     54         # verify expected attributes exist

     55         f = self.f
     56 
     57         self.assertEqual(f.mode, "wb")
     58         self.assertEqual(f.closed, False)
     59 
     60         # verify the attributes are readonly

     61         for attr in 'mode', 'closed':
     62             self.assertRaises((AttributeError, TypeError),
     63                               setattr, f, attr, 'oops')
     64 
     65     def testReadinto(self):
     66         # verify readinto

     67         self.f.write(b"\x01\x02")
     68         self.f.close()
     69         a = array(b'b', b'x'*10)
     70         self.f = _FileIO(TESTFN, 'r')
     71         n = self.f.readinto(a)
     72         self.assertEqual(array(b'b', [1, 2]), a[:n])
     73 
     74     def test_none_args(self):
     75         self.f.write(b"hi\nbye\nabc")
     76         self.f.close()
     77         self.f = _FileIO(TESTFN, 'r')
     78         self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
     79         self.f.seek(0)
     80         self.assertEqual(self.f.readline(None), b"hi\n")
     81         self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
     82 
     83     def testRepr(self):
     84         self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
     85                                        % (self.f.name, self.f.mode))
     86         del self.f.name
     87         self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
     88                                        % (self.f.fileno(), self.f.mode))
     89         self.f.close()
     90         self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
     91 
     92     def testErrors(self):
     93         f = self.f
     94         self.assertTrue(not f.isatty())
     95         self.assertTrue(not f.closed)
     96         #self.assertEqual(f.name, TESTFN)

     97         self.assertRaises(ValueError, f.read, 10) # Open for reading

     98         f.close()
     99         self.assertTrue(f.closed)
    100         f = _FileIO(TESTFN, 'r')
    101         self.assertRaises(TypeError, f.readinto, "")
    102         self.assertTrue(not f.closed)
    103         f.close()
    104         self.assertTrue(f.closed)
    105 
    106     def testMethods(self):
    107         methods = ['fileno', 'isatty', 'read', 'readinto',
    108                    'seek', 'tell', 'truncate', 'write', 'seekable',
    109                    'readable', 'writable']
    110         if sys.platform.startswith('atheos'):
    111             methods.remove('truncate')
    112 
    113         self.f.close()
    114         self.assertTrue(self.f.closed)
    115 
    116         for methodname in methods:
    117             method = getattr(self.f, methodname)
    118             # should raise on closed file

    119             self.assertRaises(ValueError, method)
    120 
    121     def testOpendir(self):
    122         # Issue 3703: opening a directory should fill the errno

    123         # Windows always returns "[Errno 13]: Permission denied

    124         # Unix calls dircheck() and returns "[Errno 21]: Is a directory"

    125         try:
    126             _FileIO('.', 'r')
    127         except IOError as e:
    128             self.assertNotEqual(e.errno, 0)
    129             self.assertEqual(e.filename, ".")
    130         else:
    131             self.fail("Should have raised IOError")
    132 
    133     #A set of functions testing that we get expected behaviour if someone has

    134     #manually closed the internal file descriptor.  First, a decorator:

    135     def ClosedFD(func):
    136         @wraps(func)
    137         def wrapper(self):
    138             #forcibly close the fd before invoking the problem function

    139             f = self.f
    140             os.close(f.fileno())
    141             try:
    142                 func(self, f)
    143             finally:
    144                 try:
    145                     self.f.close()
    146                 except IOError:
    147                     pass
    148         return wrapper
    149 
    150     def ClosedFDRaises(func):
    151         @wraps(func)
    152         def wrapper(self):
    153             #forcibly close the fd before invoking the problem function

    154             f = self.f
    155             os.close(f.fileno())
    156             try:
    157                 func(self, f)
    158             except IOError as e:
    159                 self.assertEqual(e.errno, errno.EBADF)
    160             else:
    161                 self.fail("Should have raised IOError")
    162             finally:
    163                 try:
    164                     self.f.close()
    165                 except IOError:
    166                     pass
    167         return wrapper
    168 
    169     @ClosedFDRaises
    170     def testErrnoOnClose(self, f):
    171         f.close()
    172 
    173     @ClosedFDRaises
    174     def testErrnoOnClosedWrite(self, f):
    175         f.write('a')
    176 
    177     @ClosedFDRaises
    178     def testErrnoOnClosedSeek(self, f):
    179         f.seek(0)
    180 
    181     @ClosedFDRaises
    182     def testErrnoOnClosedTell(self, f):
    183         f.tell()
    184 
    185     @ClosedFDRaises
    186     def testErrnoOnClosedTruncate(self, f):
    187         f.truncate(0)
    188 
    189     @ClosedFD
    190     def testErrnoOnClosedSeekable(self, f):
    191         f.seekable()
    192 
    193     @ClosedFD
    194     def testErrnoOnClosedReadable(self, f):
    195         f.readable()
    196 
    197     @ClosedFD
    198     def testErrnoOnClosedWritable(self, f):
    199         f.writable()
    200 
    201     @ClosedFD
    202     def testErrnoOnClosedFileno(self, f):
    203         f.fileno()
    204 
    205     @ClosedFD
    206     def testErrnoOnClosedIsatty(self, f):
    207         self.assertEqual(f.isatty(), False)
    208 
    209     def ReopenForRead(self):
    210         try:
    211             self.f.close()
    212         except IOError:
    213             pass
    214         self.f = _FileIO(TESTFN, 'r')
    215         os.close(self.f.fileno())
    216         return self.f
    217 
    218     @ClosedFDRaises
    219     def testErrnoOnClosedRead(self, f):
    220         f = self.ReopenForRead()
    221         f.read(1)
    222 
    223     @ClosedFDRaises
    224     def testErrnoOnClosedReadall(self, f):
    225         f = self.ReopenForRead()
    226         f.readall()
    227 
    228     @ClosedFDRaises
    229     def testErrnoOnClosedReadinto(self, f):
    230         f = self.ReopenForRead()
    231         a = array(b'b', b'x'*10)
    232         f.readinto(a)
    233 
    234 class OtherFileTests(unittest.TestCase):
    235 
    236     def testAbles(self):
    237         try:
    238             f = _FileIO(TESTFN, "w")
    239             self.assertEqual(f.readable(), False)
    240             self.assertEqual(f.writable(), True)
    241             self.assertEqual(f.seekable(), True)
    242             f.close()
    243 
    244             f = _FileIO(TESTFN, "r")
    245             self.assertEqual(f.readable(), True)
    246             self.assertEqual(f.writable(), False)
    247             self.assertEqual(f.seekable(), True)
    248             f.close()
    249 
    250             f = _FileIO(TESTFN, "a+")
    251             self.assertEqual(f.readable(), True)
    252             self.assertEqual(f.writable(), True)
    253             self.assertEqual(f.seekable(), True)
    254             self.assertEqual(f.isatty(), False)
    255             f.close()
    256 
    257             if sys.platform != "win32":
    258                 try:
    259                     f = _FileIO("/dev/tty", "a")
    260                 except EnvironmentError:
    261                     # When run in a cron job there just aren't any

    262                     # ttys, so skip the test.  This also handles other

    263                     # OS'es that don't support /dev/tty.

    264                     pass
    265                 else:
    266                     self.assertEqual(f.readable(), False)
    267                     self.assertEqual(f.writable(), True)
    268                     if sys.platform != "darwin" and \
    269                        'bsd' not in sys.platform and \
    270                        not sys.platform.startswith('sunos'):
    271                         # Somehow /dev/tty appears seekable on some BSDs

    272                         self.assertEqual(f.seekable(), False)
    273                     self.assertEqual(f.isatty(), True)
    274                     f.close()
    275         finally:
    276             os.unlink(TESTFN)
    277 
    278     def testModeStrings(self):
    279         # check invalid mode strings

    280         for mode in ("", "aU", "wU+", "rw", "rt"):
    281             try:
    282                 f = _FileIO(TESTFN, mode)
    283             except ValueError:
    284                 pass
    285             else:
    286                 f.close()
    287                 self.fail('%r is an invalid file mode' % mode)
    288 
    289     def testUnicodeOpen(self):
    290         # verify repr works for unicode too

    291         f = _FileIO(str(TESTFN), "w")
    292         f.close()
    293         os.unlink(TESTFN)
    294 
    295     def testBytesOpen(self):
    296         # Opening a bytes filename

    297         try:
    298             fn = TESTFN.encode("ascii")
    299         except UnicodeEncodeError:
    300             # Skip test

    301             return
    302         f = _FileIO(fn, "w")
    303         try:
    304             f.write(b"abc")
    305             f.close()
    306             with open(TESTFN, "rb") as f:
    307                 self.assertEqual(f.read(), b"abc")
    308         finally:
    309             os.unlink(TESTFN)
    310 
    311     def testInvalidFd(self):
    312         self.assertRaises(ValueError, _FileIO, -10)
    313         self.assertRaises(OSError, _FileIO, make_bad_fd())
    314         if sys.platform == 'win32':
    315             import msvcrt
    316             self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
    317 
    318     def testBadModeArgument(self):
    319         # verify that we get a sensible error message for bad mode argument

    320         bad_mode = "qwerty"
    321         try:
    322             f = _FileIO(TESTFN, bad_mode)
    323         except ValueError as msg:
    324             if msg.args[0] != 0:
    325                 s = str(msg)
    326                 if TESTFN in s or bad_mode not in s:
    327                     self.fail("bad error message for invalid mode: %s" % s)
    328             # if msg.args[0] == 0, we're probably on Windows where there may be

    329             # no obvious way to discover why open() failed.

    330         else:
    331             f.close()
    332             self.fail("no error for invalid mode: %s" % bad_mode)
    333 
    334     def testTruncate(self):
    335         f = _FileIO(TESTFN, 'w')
    336         f.write(bytes(bytearray(range(10))))
    337         self.assertEqual(f.tell(), 10)
    338         f.truncate(5)
    339         self.assertEqual(f.tell(), 10)
    340         self.assertEqual(f.seek(0, os.SEEK_END), 5)
    341         f.truncate(15)
    342         self.assertEqual(f.tell(), 5)
    343         self.assertEqual(f.seek(0, os.SEEK_END), 15)
    344         f.close()
    345 
    346     def testTruncateOnWindows(self):
    347         def bug801631():
    348             # SF bug <http://www.python.org/sf/801631>

    349             # "file.truncate fault on windows"

    350             f = _FileIO(TESTFN, 'w')
    351             f.write(bytes(range(11)))
    352             f.close()
    353 
    354             f = _FileIO(TESTFN,'r+')
    355             data = f.read(5)
    356             if data != bytes(range(5)):
    357                 self.fail("Read on file opened for update failed %r" % data)
    358             if f.tell() != 5:
    359                 self.fail("File pos after read wrong %d" % f.tell())
    360 
    361             f.truncate()
    362             if f.tell() != 5:
    363                 self.fail("File pos after ftruncate wrong %d" % f.tell())
    364 
    365             f.close()
    366             size = os.path.getsize(TESTFN)
    367             if size != 5:
    368                 self.fail("File size after ftruncate wrong %d" % size)
    369 
    370         try:
    371             bug801631()
    372         finally:
    373             os.unlink(TESTFN)
    374 
    375     def testAppend(self):
    376         try:
    377             f = open(TESTFN, 'wb')
    378             f.write(b'spam')
    379             f.close()
    380             f = open(TESTFN, 'ab')
    381             f.write(b'eggs')
    382             f.close()
    383             f = open(TESTFN, 'rb')
    384             d = f.read()
    385             f.close()
    386             self.assertEqual(d, b'spameggs')
    387         finally:
    388             try:
    389                 os.unlink(TESTFN)
    390             except:
    391                 pass
    392 
    393     def testInvalidInit(self):
    394         self.assertRaises(TypeError, _FileIO, "1", 0, 0)
    395 
    396     def testWarnings(self):
    397         with check_warnings(quiet=True) as w:
    398             self.assertEqual(w.warnings, [])
    399             self.assertRaises(TypeError, _FileIO, [])
    400             self.assertEqual(w.warnings, [])
    401             self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
    402             self.assertEqual(w.warnings, [])
    403 
    404     def test_surrogates(self):
    405         # Issue #8438: try to open a filename containing surrogates.

    406         # It should either fail because the file doesn't exist or the filename

    407         # can't be represented using the filesystem encoding, but not because

    408         # of a LookupError for the error handler "surrogateescape".

    409         filename = u'\udc80.txt'
    410         try:
    411             with _FileIO(filename):
    412                 pass
    413         except (UnicodeEncodeError, IOError):
    414             pass
    415         # Spawn a separate Python process with a different "file system

    416         # default encoding", to exercise this further.

    417         env = dict(os.environ)
    418         env[b'LC_CTYPE'] = b'C'
    419         _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
    420         if ('UnicodeEncodeError' not in out and
    421             'IOError: [Errno 2] No such file or directory' not in out):
    422             self.fail('Bad output: %r' % out)
    423 
    424 def test_main():
    425     # Historically, these tests have been sloppy about removing TESTFN.

    426     # So get rid of it no matter what.

    427     try:
    428         run_unittest(AutoFileTests, OtherFileTests)
    429     finally:
    430         if os.path.exists(TESTFN):
    431             os.unlink(TESTFN)
    432 
    433 if __name__ == '__main__':
    434     test_main()
    435