Home | History | Annotate | Download | only in test
      1 """Test script for the gzip module.
      2 """
      3 
      4 import unittest
      5 from test import test_support
      6 import os
      7 import io
      8 import struct
      9 gzip = test_support.import_module('gzip')
     10 
# Main payload used by the tests: a short snippet of C source.
# test_metadata compares the stored CRC32 against a hard-coded value computed
# from this exact text and the stored size against len(data1), so the literal
# must stay byte-for-byte unchanged (leading spaces and the trailing blank
# line included).
data1 = """  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# Second payload, distinct from data1.  test_append writes it after data1 and
# checks that both come back, in order, from a single read().
data2 = """/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""
     22 
     23 
     24 class TestGzip(unittest.TestCase):
     25     filename = test_support.TESTFN
     26 
     27     def setUp(self):
     28         test_support.unlink(self.filename)
     29 
     30     def tearDown(self):
     31         test_support.unlink(self.filename)
     32 
     33     def write_and_read_back(self, data, mode='b'):
     34         b_data = memoryview(data).tobytes()
     35         with gzip.GzipFile(self.filename, 'w'+mode) as f:
     36             l = f.write(data)
     37         self.assertEqual(l, len(b_data))
     38         with gzip.GzipFile(self.filename, 'r'+mode) as f:
     39             self.assertEqual(f.read(), b_data)
     40 
     41     @test_support.requires_unicode
     42     def test_unicode_filename(self):
     43         unicode_filename = test_support.TESTFN_UNICODE
     44         try:
     45             unicode_filename.encode(test_support.TESTFN_ENCODING)
     46         except (UnicodeError, TypeError):
     47             self.skipTest("Requires unicode filenames support")
     48         self.filename = unicode_filename
     49         with gzip.GzipFile(unicode_filename, "wb") as f:
     50             f.write(data1 * 50)
     51         with gzip.GzipFile(unicode_filename, "rb") as f:
     52             self.assertEqual(f.read(), data1 * 50)
     53         # Sanity check that we are actually operating on the right file.
     54         with open(unicode_filename, 'rb') as fobj, \
     55              gzip.GzipFile(fileobj=fobj, mode="rb") as f:
     56             self.assertEqual(f.read(), data1 * 50)
     57 
     58     def test_write(self):
     59         with gzip.GzipFile(self.filename, 'wb') as f:
     60             f.write(data1 * 50)
     61 
     62             # Try flush and fileno.
     63             f.flush()
     64             f.fileno()
     65             if hasattr(os, 'fsync'):
     66                 os.fsync(f.fileno())
     67             f.close()
     68 
     69         # Test multiple close() calls.
     70         f.close()
     71 
     72     # The following test_write_xy methods test that write accepts
     73     # the corresponding bytes-like object type as input
     74     # and that the data written equals bytes(xy) in all cases.
     75     def test_write_memoryview(self):
     76         self.write_and_read_back(memoryview(data1 * 50))
     77 
     78     def test_write_incompatible_type(self):
     79         # Test that non-bytes-like types raise TypeError.
     80         # Issue #21560: attempts to write incompatible types
     81         # should not affect the state of the fileobject
     82         with gzip.GzipFile(self.filename, 'wb') as f:
     83             with self.assertRaises(UnicodeEncodeError):
     84                 f.write(u'\xff')
     85             with self.assertRaises(TypeError):
     86                 f.write([1])
     87             f.write(data1)
     88         with gzip.GzipFile(self.filename, 'rb') as f:
     89             self.assertEqual(f.read(), data1)
     90 
     91     def test_read(self):
     92         self.test_write()
     93         # Try reading.
     94         with gzip.GzipFile(self.filename, 'r') as f:
     95             d = f.read()
     96         self.assertEqual(d, data1*50)
     97 
     98     def test_read_universal_newlines(self):
     99         # Issue #5148: Reading breaks when mode contains 'U'.
    100         self.test_write()
    101         with gzip.GzipFile(self.filename, 'rU') as f:
    102             d = f.read()
    103         self.assertEqual(d, data1*50)
    104 
    105     def test_io_on_closed_object(self):
    106         # Test that I/O operations on closed GzipFile objects raise a
    107         # ValueError, just like the corresponding functions on file objects.
    108 
    109         # Write to a file, open it for reading, then close it.
    110         self.test_write()
    111         f = gzip.GzipFile(self.filename, 'r')
    112         f.close()
    113         with self.assertRaises(ValueError):
    114             f.read(1)
    115         with self.assertRaises(ValueError):
    116             f.seek(0)
    117         with self.assertRaises(ValueError):
    118             f.tell()
    119         # Open the file for writing, then close it.
    120         f = gzip.GzipFile(self.filename, 'w')
    121         f.close()
    122         with self.assertRaises(ValueError):
    123             f.write('')
    124         with self.assertRaises(ValueError):
    125             f.flush()
    126 
    127     def test_append(self):
    128         self.test_write()
    129         # Append to the previous file
    130         with gzip.GzipFile(self.filename, 'ab') as f:
    131             f.write(data2 * 15)
    132 
    133         with gzip.GzipFile(self.filename, 'rb') as f:
    134             d = f.read()
    135         self.assertEqual(d, (data1*50) + (data2*15))
    136 
    137     def test_many_append(self):
    138         # Bug #1074261 was triggered when reading a file that contained
    139         # many, many members.  Create such a file and verify that reading it
    140         # works.
    141         with gzip.open(self.filename, 'wb', 9) as f:
    142             f.write('a')
    143         for i in range(0, 200):
    144             with gzip.open(self.filename, "ab", 9) as f: # append
    145                 f.write('a')
    146 
    147         # Try reading the file
    148         with gzip.open(self.filename, "rb") as zgfile:
    149             contents = ""
    150             while 1:
    151                 ztxt = zgfile.read(8192)
    152                 contents += ztxt
    153                 if not ztxt: break
    154         self.assertEqual(contents, 'a'*201)
    155 
    156     def test_buffered_reader(self):
    157         # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
    158         # performance.
    159         self.test_write()
    160 
    161         with gzip.GzipFile(self.filename, 'rb') as f:
    162             with io.BufferedReader(f) as r:
    163                 lines = [line for line in r]
    164 
    165         self.assertEqual(lines, 50 * data1.splitlines(True))
    166 
    167     def test_readline(self):
    168         self.test_write()
    169         # Try .readline() with varying line lengths
    170 
    171         with gzip.GzipFile(self.filename, 'rb') as f:
    172             line_length = 0
    173             while 1:
    174                 L = f.readline(line_length)
    175                 if not L and line_length != 0: break
    176                 self.assertTrue(len(L) <= line_length)
    177                 line_length = (line_length + 1) % 50
    178 
    179     def test_readlines(self):
    180         self.test_write()
    181         # Try .readlines()
    182 
    183         with gzip.GzipFile(self.filename, 'rb') as f:
    184             L = f.readlines()
    185 
    186         with gzip.GzipFile(self.filename, 'rb') as f:
    187             while 1:
    188                 L = f.readlines(150)
    189                 if L == []: break
    190 
    191     def test_seek_read(self):
    192         self.test_write()
    193         # Try seek, read test
    194 
    195         with gzip.GzipFile(self.filename) as f:
    196             while 1:
    197                 oldpos = f.tell()
    198                 line1 = f.readline()
    199                 if not line1: break
    200                 newpos = f.tell()
    201                 f.seek(oldpos)  # negative seek
    202                 if len(line1)>10:
    203                     amount = 10
    204                 else:
    205                     amount = len(line1)
    206                 line2 = f.read(amount)
    207                 self.assertEqual(line1[:amount], line2)
    208                 f.seek(newpos)  # positive seek
    209 
    210     def test_seek_whence(self):
    211         self.test_write()
    212         # Try seek(whence=1), read test
    213 
    214         with gzip.GzipFile(self.filename) as f:
    215             f.read(10)
    216             f.seek(10, whence=1)
    217             y = f.read(10)
    218         self.assertEqual(y, data1[20:30])
    219 
    220     def test_seek_write(self):
    221         # Try seek, write test
    222         with gzip.GzipFile(self.filename, 'w') as f:
    223             for pos in range(0, 256, 16):
    224                 f.seek(pos)
    225                 f.write('GZ\n')
    226 
    227     def test_mode(self):
    228         self.test_write()
    229         with gzip.GzipFile(self.filename, 'r') as f:
    230             self.assertEqual(f.myfileobj.mode, 'rb')
    231 
    232     def test_1647484(self):
    233         for mode in ('wb', 'rb'):
    234             with gzip.GzipFile(self.filename, mode) as f:
    235                 self.assertTrue(hasattr(f, "name"))
    236                 self.assertEqual(f.name, self.filename)
    237 
    238     def test_mtime(self):
    239         mtime = 123456789
    240         with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
    241             fWrite.write(data1)
    242         with gzip.GzipFile(self.filename) as fRead:
    243             dataRead = fRead.read()
    244             self.assertEqual(dataRead, data1)
    245             self.assertTrue(hasattr(fRead, 'mtime'))
    246             self.assertEqual(fRead.mtime, mtime)
    247 
    248     def test_metadata(self):
    249         mtime = 123456789
    250 
    251         with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
    252             fWrite.write(data1)
    253 
    254         with open(self.filename, 'rb') as fRead:
    255             # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
    256 
    257             idBytes = fRead.read(2)
    258             self.assertEqual(idBytes, '\x1f\x8b') # gzip ID
    259 
    260             cmByte = fRead.read(1)
    261             self.assertEqual(cmByte, '\x08') # deflate
    262 
    263             flagsByte = fRead.read(1)
    264             self.assertEqual(flagsByte, '\x08') # only the FNAME flag is set
    265 
    266             mtimeBytes = fRead.read(4)
    267             self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
    268 
    269             xflByte = fRead.read(1)
    270             self.assertEqual(xflByte, '\x02') # maximum compression
    271 
    272             osByte = fRead.read(1)
    273             self.assertEqual(osByte, '\xff') # OS "unknown" (OS-independent)
    274 
    275             # Since the FNAME flag is set, the zero-terminated filename follows.
    276             # RFC 1952 specifies that this is the name of the input file, if any.
    277             # However, the gzip module defaults to storing the name of the output
    278             # file in this field.
    279             expected = self.filename.encode('Latin-1') + '\x00'
    280             nameBytes = fRead.read(len(expected))
    281             self.assertEqual(nameBytes, expected)
    282 
    283             # Since no other flags were set, the header ends here.
    284             # Rather than process the compressed data, let's seek to the trailer.
    285             fRead.seek(os.stat(self.filename).st_size - 8)
    286 
    287             crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
    288             self.assertEqual(crc32Bytes, '\xaf\xd7d\x83')
    289 
    290             isizeBytes = fRead.read(4)
    291             self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
    292 
    293     def test_with_open(self):
    294         # GzipFile supports the context management protocol
    295         with gzip.GzipFile(self.filename, "wb") as f:
    296             f.write(b"xxx")
    297         f = gzip.GzipFile(self.filename, "rb")
    298         f.close()
    299         try:
    300             with f:
    301                 pass
    302         except ValueError:
    303             pass
    304         else:
    305             self.fail("__enter__ on a closed file didn't raise an exception")
    306         try:
    307             with gzip.GzipFile(self.filename, "wb") as f:
    308                 1 // 0
    309         except ZeroDivisionError:
    310             pass
    311         else:
    312             self.fail("1 // 0 didn't raise an exception")
    313 
    314     def test_zero_padded_file(self):
    315         with gzip.GzipFile(self.filename, "wb") as f:
    316             f.write(data1 * 50)
    317 
    318         # Pad the file with zeroes
    319         with open(self.filename, "ab") as f:
    320             f.write("\x00" * 50)
    321 
    322         with gzip.GzipFile(self.filename, "rb") as f:
    323             d = f.read()
    324             self.assertEqual(d, data1 * 50, "Incorrect data in file")
    325 
    326     def test_fileobj_from_fdopen(self):
    327         # Issue #13781: Creating a GzipFile using a fileobj from os.fdopen()
    328         # should not embed the fake filename "<fdopen>" in the output file.
    329         fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
    330         with os.fdopen(fd, "wb") as f:
    331             with gzip.GzipFile(fileobj=f, mode="w") as g:
    332                 self.assertEqual(g.name, "")
    333 
    334     def test_read_with_extra(self):
    335         # Gzip data with an extra field
    336         gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
    337                   b'\x05\x00Extra'
    338                   b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
    339         with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
    340             self.assertEqual(f.read(), b'Test')
    341 
    342 def test_main(verbose=None):
    343     test_support.run_unittest(TestGzip)
    344 
# Script entry point: running this file directly executes the suite through
# test_main(), passing verbose=True.
if __name__ == "__main__":
    test_main(verbose=True)
    347