Home | History | Annotate | Download | only in test
      1 """Test largefile support on system where this makes sense.
      2 """
      3 
      4 from __future__ import print_function
      5 
      6 import os
      7 import stat
      8 import sys
      9 import unittest
     10 from test.test_support import run_unittest, TESTFN, verbose, requires, \
     11                               unlink
     12 import io  # C implementation of io
     13 import _pyio as pyio # Python implementation of io
     14 
try:
    import signal
    # The default handler for SIGXFSZ is to abort the process.
    # By ignoring it, system calls exceeding the file size resource
    # limit will raise IOError instead of crashing the interpreter.
    # The value returned by signal.signal() is kept in oldhandler.
    oldhandler = signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
except (ImportError, AttributeError):
    # Best effort: the signal module could not be imported, or an
    # attribute used above is missing, so the handler is left alone.
    pass

# create >2GB file (2GB = 2147483648 bytes)
# The tests seek to this offset and write one byte there, so the file
# they work on is size + 1 bytes long.
size = 2500000000
     26 
     27 
     28 class LargeFileTest(unittest.TestCase):
     29     """Test that each file function works as expected for a large
     30     (i.e. > 2GB, do  we have to check > 4GB) files.
     31 
     32     NOTE: the order of execution of the test methods is important! test_seek
     33     must run first to create the test file. File cleanup must also be handled
     34     outside the test instances because of this.
     35 
     36     """
     37 
     38     def test_seek(self):
     39         if verbose:
     40             print('create large file via seek (may be sparse file) ...')
     41         with self.open(TESTFN, 'wb') as f:
     42             f.write(b'z')
     43             f.seek(0)
     44             f.seek(size)
     45             f.write(b'a')
     46             f.flush()
     47             if verbose:
     48                 print('check file size with os.fstat')
     49             self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1)
     50 
     51     def test_osstat(self):
     52         if verbose:
     53             print('check file size with os.stat')
     54         self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1)
     55 
     56     def test_seek_read(self):
     57         if verbose:
     58             print('play around with seek() and read() with the built largefile')
     59         with self.open(TESTFN, 'rb') as f:
     60             self.assertEqual(f.tell(), 0)
     61             self.assertEqual(f.read(1), b'z')
     62             self.assertEqual(f.tell(), 1)
     63             f.seek(0)
     64             self.assertEqual(f.tell(), 0)
     65             f.seek(0, 0)
     66             self.assertEqual(f.tell(), 0)
     67             f.seek(42)
     68             self.assertEqual(f.tell(), 42)
     69             f.seek(42, 0)
     70             self.assertEqual(f.tell(), 42)
     71             f.seek(42, 1)
     72             self.assertEqual(f.tell(), 84)
     73             f.seek(0, 1)
     74             self.assertEqual(f.tell(), 84)
     75             f.seek(0, 2)  # seek from the end
     76             self.assertEqual(f.tell(), size + 1 + 0)
     77             f.seek(-10, 2)
     78             self.assertEqual(f.tell(), size + 1 - 10)
     79             f.seek(-size-1, 2)
     80             self.assertEqual(f.tell(), 0)
     81             f.seek(size)
     82             self.assertEqual(f.tell(), size)
     83             # the 'a' that was written at the end of file above
     84             self.assertEqual(f.read(1), b'a')
     85             f.seek(-size-1, 1)
     86             self.assertEqual(f.read(1), b'z')
     87             self.assertEqual(f.tell(), 1)
     88 
     89     def test_lseek(self):
     90         if verbose:
     91             print('play around with os.lseek() with the built largefile')
     92         with self.open(TESTFN, 'rb') as f:
     93             self.assertEqual(os.lseek(f.fileno(), 0, 0), 0)
     94             self.assertEqual(os.lseek(f.fileno(), 42, 0), 42)
     95             self.assertEqual(os.lseek(f.fileno(), 42, 1), 84)
     96             self.assertEqual(os.lseek(f.fileno(), 0, 1), 84)
     97             self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0)
     98             self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10)
     99             self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0)
    100             self.assertEqual(os.lseek(f.fileno(), size, 0), size)
    101             # the 'a' that was written at the end of file above
    102             self.assertEqual(f.read(1), b'a')
    103 
    104     def test_truncate(self):
    105         if verbose:
    106             print('try truncate')
    107         with self.open(TESTFN, 'r+b') as f:
    108             # this is already decided before start running the test suite
    109             # but we do it anyway for extra protection
    110             if not hasattr(f, 'truncate'):
    111                 raise unittest.SkipTest("open().truncate() not available on this system")
    112             f.seek(0, 2)
    113             # else we've lost track of the true size
    114             self.assertEqual(f.tell(), size+1)
    115             # Cut it back via seek + truncate with no argument.
    116             newsize = size - 10
    117             f.seek(newsize)
    118             f.truncate()
    119             self.assertEqual(f.tell(), newsize)  # else pointer moved
    120             f.seek(0, 2)
    121             self.assertEqual(f.tell(), newsize)  # else wasn't truncated
    122             # Ensure that truncate(smaller than true size) shrinks
    123             # the file.
    124             newsize -= 1
    125             f.seek(42)
    126             f.truncate(newsize)
    127             if self.new_io:
    128                 self.assertEqual(f.tell(), 42)
    129             f.seek(0, 2)
    130             self.assertEqual(f.tell(), newsize)
    131             # XXX truncate(larger than true size) is ill-defined
    132             # across platform; cut it waaaaay back
    133             f.seek(0)
    134             f.truncate(1)
    135             if self.new_io:
    136                 self.assertEqual(f.tell(), 0)       # else pointer moved
    137             f.seek(0)
    138             self.assertEqual(len(f.read()), 1)  # else wasn't truncated
    139 
    140     def test_seekable(self):
    141         # Issue #5016; seekable() can return False when the current position
    142         # is negative when truncated to an int.
    143         if not self.new_io:
    144             self.skipTest("builtin file doesn't have seekable()")
    145         for pos in (2**31-1, 2**31, 2**31+1):
    146             with self.open(TESTFN, 'rb') as f:
    147                 f.seek(pos)
    148                 self.assertTrue(f.seekable())
    149 
    150 
    151 def test_main():
    152     # On Windows and Mac OSX this test comsumes large resources; It
    153     # takes a long time to build the >2GB file and takes >2GB of disk
    154     # space therefore the resource must be enabled to run this test.
    155     # If not, nothing after this line stanza will be executed.
    156     if sys.platform[:3] == 'win' or sys.platform == 'darwin':
    157         requires('largefile',
    158                  'test requires %s bytes and a long time to run' % str(size))
    159     else:
    160         # Only run if the current filesystem supports large files.
    161         # (Skip this test on Windows, since we now always support
    162         # large files.)
    163         f = open(TESTFN, 'wb', buffering=0)
    164         try:
    165             # 2**31 == 2147483648
    166             f.seek(2147483649)
    167             # Seeking is not enough of a test: you must write and
    168             # flush, too!
    169             f.write(b'x')
    170             f.flush()
    171         except (IOError, OverflowError):
    172             f.close()
    173             unlink(TESTFN)
    174             raise unittest.SkipTest("filesystem does not have largefile support")
    175         else:
    176             f.close()
    177     suite = unittest.TestSuite()
    178     for _open, prefix in [(io.open, 'C'), (pyio.open, 'Py'),
    179                           (open, 'Builtin')]:
    180         class TestCase(LargeFileTest):
    181             pass
    182         TestCase.open = staticmethod(_open)
    183         TestCase.new_io = _open is not open
    184         TestCase.__name__ = prefix + LargeFileTest.__name__
    185         suite.addTest(TestCase('test_seek'))
    186         suite.addTest(TestCase('test_osstat'))
    187         suite.addTest(TestCase('test_seek_read'))
    188         suite.addTest(TestCase('test_lseek'))
    189         with _open(TESTFN, 'wb') as f:
    190             if hasattr(f, 'truncate'):
    191                 suite.addTest(TestCase('test_truncate'))
    192         suite.addTest(TestCase('test_seekable'))
    193         unlink(TESTFN)
    194     try:
    195         run_unittest(suite)
    196     finally:
    197         unlink(TESTFN)
    198 
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
    201