Home | History | Annotate | Download | only in io
      1 # This Python file uses the following encoding: utf-8
      2 # Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 # =============================================================================
     16 """Testing File IO operations in file_io.py."""
     17 
     18 from __future__ import absolute_import
     19 from __future__ import division
     20 from __future__ import print_function
     21 
     22 import os.path
     23 
     24 from tensorflow.python.framework import errors
     25 from tensorflow.python.lib.io import file_io
     26 from tensorflow.python.platform import test
     27 
     28 
     29 class FileIoTest(test.TestCase):
     30 
     31   def setUp(self):
     32     self._base_dir = os.path.join(self.get_temp_dir(), "base_dir")
     33     file_io.create_dir(self._base_dir)
     34 
     35   def tearDown(self):
     36     file_io.delete_recursively(self._base_dir)
     37 
     38   def testEmptyFilename(self):
     39     f = file_io.FileIO("", mode="r")
     40     with self.assertRaises(errors.NotFoundError):
     41       _ = f.read()
     42 
     43   def testFileDoesntExist(self):
     44     file_path = os.path.join(self._base_dir, "temp_file")
     45     self.assertFalse(file_io.file_exists(file_path))
     46     with self.assertRaises(errors.NotFoundError):
     47       _ = file_io.read_file_to_string(file_path)
     48 
     49   def testWriteToString(self):
     50     file_path = os.path.join(self._base_dir, "temp_file")
     51     file_io.write_string_to_file(file_path, "testing")
     52     self.assertTrue(file_io.file_exists(file_path))
     53     file_contents = file_io.read_file_to_string(file_path)
     54     self.assertEqual("testing", file_contents)
     55 
     56   def testAtomicWriteStringToFile(self):
     57     file_path = os.path.join(self._base_dir, "temp_file")
     58     file_io.atomic_write_string_to_file(file_path, "testing")
     59     self.assertTrue(file_io.file_exists(file_path))
     60     file_contents = file_io.read_file_to_string(file_path)
     61     self.assertEqual("testing", file_contents)
     62 
     63   def testAtomicWriteStringToFileOverwriteFalse(self):
     64     file_path = os.path.join(self._base_dir, "temp_file")
     65     file_io.atomic_write_string_to_file(file_path, "old", overwrite=False)
     66     with self.assertRaises(errors.AlreadyExistsError):
     67       file_io.atomic_write_string_to_file(file_path, "new", overwrite=False)
     68     file_contents = file_io.read_file_to_string(file_path)
     69     self.assertEqual("old", file_contents)
     70     file_io.delete_file(file_path)
     71     file_io.atomic_write_string_to_file(file_path, "new", overwrite=False)
     72     file_contents = file_io.read_file_to_string(file_path)
     73     self.assertEqual("new", file_contents)
     74 
     75   def testReadBinaryMode(self):
     76     file_path = os.path.join(self._base_dir, "temp_file")
     77     file_io.write_string_to_file(file_path, "testing")
     78     with file_io.FileIO(file_path, mode="rb") as f:
     79       self.assertEqual(b"testing", f.read())
     80 
     81   def testWriteBinaryMode(self):
     82     file_path = os.path.join(self._base_dir, "temp_file")
     83     file_io.FileIO(file_path, "wb").write("testing")
     84     with file_io.FileIO(file_path, mode="r") as f:
     85       self.assertEqual("testing", f.read())
     86 
     87   def testAppend(self):
     88     file_path = os.path.join(self._base_dir, "temp_file")
     89     with file_io.FileIO(file_path, mode="w") as f:
     90       f.write("begin\n")
     91     with file_io.FileIO(file_path, mode="a") as f:
     92       f.write("a1\n")
     93     with file_io.FileIO(file_path, mode="a") as f:
     94       f.write("a2\n")
     95     with file_io.FileIO(file_path, mode="r") as f:
     96       file_contents = f.read()
     97       self.assertEqual("begin\na1\na2\n", file_contents)
     98 
     99   def testMultipleFiles(self):
    100     file_prefix = os.path.join(self._base_dir, "temp_file")
    101     for i in range(5000):
    102       f = file_io.FileIO(file_prefix + str(i), mode="w+")
    103       f.write("testing")
    104       f.flush()
    105       self.assertEqual("testing", f.read())
    106       f.close()
    107 
    108   def testMultipleWrites(self):
    109     file_path = os.path.join(self._base_dir, "temp_file")
    110     with file_io.FileIO(file_path, mode="w") as f:
    111       f.write("line1\n")
    112       f.write("line2")
    113     file_contents = file_io.read_file_to_string(file_path)
    114     self.assertEqual("line1\nline2", file_contents)
    115 
    116   def testFileWriteBadMode(self):
    117     file_path = os.path.join(self._base_dir, "temp_file")
    118     with self.assertRaises(errors.PermissionDeniedError):
    119       file_io.FileIO(file_path, mode="r").write("testing")
    120 
    121   def testFileReadBadMode(self):
    122     file_path = os.path.join(self._base_dir, "temp_file")
    123     file_io.FileIO(file_path, mode="w").write("testing")
    124     self.assertTrue(file_io.file_exists(file_path))
    125     with self.assertRaises(errors.PermissionDeniedError):
    126       file_io.FileIO(file_path, mode="w").read()
    127 
    128   def testFileDelete(self):
    129     file_path = os.path.join(self._base_dir, "temp_file")
    130     file_io.FileIO(file_path, mode="w").write("testing")
    131     file_io.delete_file(file_path)
    132     self.assertFalse(file_io.file_exists(file_path))
    133 
    134   def testFileDeleteFail(self):
    135     file_path = os.path.join(self._base_dir, "temp_file")
    136     with self.assertRaises(errors.NotFoundError):
    137       file_io.delete_file(file_path)
    138 
    139   def testGetMatchingFiles(self):
    140     dir_path = os.path.join(self._base_dir, "temp_dir")
    141     file_io.create_dir(dir_path)
    142     files = ["file1.txt", "file2.txt", "file3.txt"]
    143     for name in files:
    144       file_path = os.path.join(dir_path, name)
    145       file_io.FileIO(file_path, mode="w").write("testing")
    146     expected_match = [os.path.join(dir_path, name) for name in files]
    147     self.assertItemsEqual(
    148         file_io.get_matching_files(os.path.join(dir_path, "file*.txt")),
    149         expected_match)
    150     self.assertItemsEqual(file_io.get_matching_files(tuple()), [])
    151     files_subset = [
    152         os.path.join(dir_path, files[0]), os.path.join(dir_path, files[2])
    153     ]
    154     self.assertItemsEqual(
    155         file_io.get_matching_files(files_subset), files_subset)
    156     file_io.delete_recursively(dir_path)
    157     self.assertFalse(file_io.file_exists(os.path.join(dir_path, "file3.txt")))
    158 
    159   def testCreateRecursiveDir(self):
    160     dir_path = os.path.join(self._base_dir, "temp_dir/temp_dir1/temp_dir2")
    161     file_io.recursive_create_dir(dir_path)
    162     file_io.recursive_create_dir(dir_path)  # repeat creation
    163     file_path = os.path.join(dir_path, "temp_file")
    164     file_io.FileIO(file_path, mode="w").write("testing")
    165     self.assertTrue(file_io.file_exists(file_path))
    166     file_io.delete_recursively(os.path.join(self._base_dir, "temp_dir"))
    167     self.assertFalse(file_io.file_exists(file_path))
    168 
    169   def testCopy(self):
    170     file_path = os.path.join(self._base_dir, "temp_file")
    171     file_io.FileIO(file_path, mode="w").write("testing")
    172     copy_path = os.path.join(self._base_dir, "copy_file")
    173     file_io.copy(file_path, copy_path)
    174     self.assertTrue(file_io.file_exists(copy_path))
    175     f = file_io.FileIO(file_path, mode="r")
    176     self.assertEqual("testing", f.read())
    177     self.assertEqual(7, f.tell())
    178 
    179   def testCopyOverwrite(self):
    180     file_path = os.path.join(self._base_dir, "temp_file")
    181     file_io.FileIO(file_path, mode="w").write("testing")
    182     copy_path = os.path.join(self._base_dir, "copy_file")
    183     file_io.FileIO(copy_path, mode="w").write("copy")
    184     file_io.copy(file_path, copy_path, overwrite=True)
    185     self.assertTrue(file_io.file_exists(copy_path))
    186     self.assertEqual("testing", file_io.FileIO(file_path, mode="r").read())
    187 
    188   def testCopyOverwriteFalse(self):
    189     file_path = os.path.join(self._base_dir, "temp_file")
    190     file_io.FileIO(file_path, mode="w").write("testing")
    191     copy_path = os.path.join(self._base_dir, "copy_file")
    192     file_io.FileIO(copy_path, mode="w").write("copy")
    193     with self.assertRaises(errors.AlreadyExistsError):
    194       file_io.copy(file_path, copy_path, overwrite=False)
    195 
    196   def testRename(self):
    197     file_path = os.path.join(self._base_dir, "temp_file")
    198     file_io.FileIO(file_path, mode="w").write("testing")
    199     rename_path = os.path.join(self._base_dir, "rename_file")
    200     file_io.rename(file_path, rename_path)
    201     self.assertTrue(file_io.file_exists(rename_path))
    202     self.assertFalse(file_io.file_exists(file_path))
    203 
    204   def testRenameOverwrite(self):
    205     file_path = os.path.join(self._base_dir, "temp_file")
    206     file_io.FileIO(file_path, mode="w").write("testing")
    207     rename_path = os.path.join(self._base_dir, "rename_file")
    208     file_io.FileIO(rename_path, mode="w").write("rename")
    209     file_io.rename(file_path, rename_path, overwrite=True)
    210     self.assertTrue(file_io.file_exists(rename_path))
    211     self.assertFalse(file_io.file_exists(file_path))
    212 
    213   def testRenameOverwriteFalse(self):
    214     file_path = os.path.join(self._base_dir, "temp_file")
    215     file_io.FileIO(file_path, mode="w").write("testing")
    216     rename_path = os.path.join(self._base_dir, "rename_file")
    217     file_io.FileIO(rename_path, mode="w").write("rename")
    218     with self.assertRaises(errors.AlreadyExistsError):
    219       file_io.rename(file_path, rename_path, overwrite=False)
    220     self.assertTrue(file_io.file_exists(rename_path))
    221     self.assertTrue(file_io.file_exists(file_path))
    222 
    223   def testDeleteRecursivelyFail(self):
    224     fake_dir_path = os.path.join(self._base_dir, "temp_dir")
    225     with self.assertRaises(errors.NotFoundError):
    226       file_io.delete_recursively(fake_dir_path)
    227 
    228   def testIsDirectory(self):
    229     dir_path = os.path.join(self._base_dir, "test_dir")
    230     # Failure for a non-existing dir.
    231     self.assertFalse(file_io.is_directory(dir_path))
    232     file_io.create_dir(dir_path)
    233     self.assertTrue(file_io.is_directory(dir_path))
    234     file_path = os.path.join(dir_path, "test_file")
    235     file_io.FileIO(file_path, mode="w").write("test")
    236     # False for a file.
    237     self.assertFalse(file_io.is_directory(file_path))
    238     # Test that the value returned from `stat()` has `is_directory` set.
    239     file_statistics = file_io.stat(dir_path)
    240     self.assertTrue(file_statistics.is_directory)
    241 
    242   def testListDirectory(self):
    243     dir_path = os.path.join(self._base_dir, "test_dir")
    244     file_io.create_dir(dir_path)
    245     files = ["file1.txt", "file2.txt", "file3.txt"]
    246     for name in files:
    247       file_path = os.path.join(dir_path, name)
    248       file_io.FileIO(file_path, mode="w").write("testing")
    249     subdir_path = os.path.join(dir_path, "sub_dir")
    250     file_io.create_dir(subdir_path)
    251     subdir_file_path = os.path.join(subdir_path, "file4.txt")
    252     file_io.FileIO(subdir_file_path, mode="w").write("testing")
    253     dir_list = file_io.list_directory(dir_path)
    254     self.assertItemsEqual(files + ["sub_dir"], dir_list)
    255 
    256   def testListDirectoryFailure(self):
    257     dir_path = os.path.join(self._base_dir, "test_dir")
    258     with self.assertRaises(errors.NotFoundError):
    259       file_io.list_directory(dir_path)
    260 
    261   def _setupWalkDirectories(self, dir_path):
    262     # Creating a file structure as follows
    263     # test_dir -> file: file1.txt; dirs: subdir1_1, subdir1_2, subdir1_3
    264     # subdir1_1 -> file: file3.txt
    265     # subdir1_2 -> dir: subdir2
    266     file_io.create_dir(dir_path)
    267     file_io.FileIO(
    268         os.path.join(dir_path, "file1.txt"), mode="w").write("testing")
    269     sub_dirs1 = ["subdir1_1", "subdir1_2", "subdir1_3"]
    270     for name in sub_dirs1:
    271       file_io.create_dir(os.path.join(dir_path, name))
    272     file_io.FileIO(
    273         os.path.join(dir_path, "subdir1_1/file2.txt"),
    274         mode="w").write("testing")
    275     file_io.create_dir(os.path.join(dir_path, "subdir1_2/subdir2"))
    276 
    277   def testWalkInOrder(self):
    278     dir_path = os.path.join(self._base_dir, "test_dir")
    279     self._setupWalkDirectories(dir_path)
    280     # Now test the walk (in_order = True)
    281     all_dirs = []
    282     all_subdirs = []
    283     all_files = []
    284     for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=True):
    285       all_dirs.append(w_dir)
    286       all_subdirs.append(w_subdirs)
    287       all_files.append(w_files)
    288     self.assertItemsEqual(all_dirs, [dir_path] + [
    289         os.path.join(dir_path, item)
    290         for item in
    291         ["subdir1_1", "subdir1_2", "subdir1_2/subdir2", "subdir1_3"]
    292     ])
    293     self.assertEqual(dir_path, all_dirs[0])
    294     self.assertLess(
    295         all_dirs.index(os.path.join(dir_path, "subdir1_2")),
    296         all_dirs.index(os.path.join(dir_path, "subdir1_2/subdir2")))
    297     self.assertItemsEqual(all_subdirs[1:5], [[], ["subdir2"], [], []])
    298     self.assertItemsEqual(all_subdirs[0],
    299                           ["subdir1_1", "subdir1_2", "subdir1_3"])
    300     self.assertItemsEqual(all_files, [["file1.txt"], ["file2.txt"], [], [], []])
    301     self.assertLess(
    302         all_files.index(["file1.txt"]), all_files.index(["file2.txt"]))
    303 
    304   def testWalkPostOrder(self):
    305     dir_path = os.path.join(self._base_dir, "test_dir")
    306     self._setupWalkDirectories(dir_path)
    307     # Now test the walk (in_order = False)
    308     all_dirs = []
    309     all_subdirs = []
    310     all_files = []
    311     for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False):
    312       all_dirs.append(w_dir)
    313       all_subdirs.append(w_subdirs)
    314       all_files.append(w_files)
    315     self.assertItemsEqual(all_dirs, [
    316         os.path.join(dir_path, item)
    317         for item in
    318         ["subdir1_1", "subdir1_2/subdir2", "subdir1_2", "subdir1_3"]
    319     ] + [dir_path])
    320     self.assertEqual(dir_path, all_dirs[4])
    321     self.assertLess(
    322         all_dirs.index(os.path.join(dir_path, "subdir1_2/subdir2")),
    323         all_dirs.index(os.path.join(dir_path, "subdir1_2")))
    324     self.assertItemsEqual(all_subdirs[0:4], [[], [], ["subdir2"], []])
    325     self.assertItemsEqual(all_subdirs[4],
    326                           ["subdir1_1", "subdir1_2", "subdir1_3"])
    327     self.assertItemsEqual(all_files, [["file2.txt"], [], [], [], ["file1.txt"]])
    328     self.assertLess(
    329         all_files.index(["file2.txt"]), all_files.index(["file1.txt"]))
    330 
    331   def testWalkFailure(self):
    332     dir_path = os.path.join(self._base_dir, "test_dir")
    333     # Try walking a directory that wasn't created.
    334     all_dirs = []
    335     all_subdirs = []
    336     all_files = []
    337     for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False):
    338       all_dirs.append(w_dir)
    339       all_subdirs.append(w_subdirs)
    340       all_files.append(w_files)
    341     self.assertItemsEqual(all_dirs, [])
    342     self.assertItemsEqual(all_subdirs, [])
    343     self.assertItemsEqual(all_files, [])
    344 
    345   def testStat(self):
    346     file_path = os.path.join(self._base_dir, "temp_file")
    347     file_io.FileIO(file_path, mode="w").write("testing")
    348     file_statistics = file_io.stat(file_path)
    349     os_statistics = os.stat(file_path)
    350     self.assertEqual(7, file_statistics.length)
    351     self.assertEqual(
    352         int(os_statistics.st_mtime), int(file_statistics.mtime_nsec / 1e9))
    353     self.assertFalse(file_statistics.is_directory)
    354 
    355   def testReadLine(self):
    356     file_path = os.path.join(self._base_dir, "temp_file")
    357     with file_io.FileIO(file_path, mode="r+") as f:
    358       f.write("testing1\ntesting2\ntesting3\n\ntesting5")
    359     self.assertEqual(36, f.size())
    360     self.assertEqual("testing1\n", f.readline())
    361     self.assertEqual("testing2\n", f.readline())
    362     self.assertEqual("testing3\n", f.readline())
    363     self.assertEqual("\n", f.readline())
    364     self.assertEqual("testing5", f.readline())
    365     self.assertEqual("", f.readline())
    366 
    367   def testRead(self):
    368     file_path = os.path.join(self._base_dir, "temp_file")
    369     with file_io.FileIO(file_path, mode="r+") as f:
    370       f.write("testing1\ntesting2\ntesting3\n\ntesting5")
    371     self.assertEqual(36, f.size())
    372     self.assertEqual("testing1\n", f.read(9))
    373     self.assertEqual("testing2\n", f.read(9))
    374     self.assertEqual("t", f.read(1))
    375     self.assertEqual("esting3\n\ntesting5", f.read())
    376 
    377   def testTell(self):
    378     file_path = os.path.join(self._base_dir, "temp_file")
    379     with file_io.FileIO(file_path, mode="r+") as f:
    380       f.write("testing1\ntesting2\ntesting3\n\ntesting5")
    381     self.assertEqual(0, f.tell())
    382     self.assertEqual("testing1\n", f.readline())
    383     self.assertEqual(9, f.tell())
    384     self.assertEqual("testing2\n", f.readline())
    385     self.assertEqual(18, f.tell())
    386     self.assertEqual("testing3\n", f.readline())
    387     self.assertEqual(27, f.tell())
    388     self.assertEqual("\n", f.readline())
    389     self.assertEqual(28, f.tell())
    390     self.assertEqual("testing5", f.readline())
    391     self.assertEqual(36, f.tell())
    392     self.assertEqual("", f.readline())
    393     self.assertEqual(36, f.tell())
    394 
    395   def testSeek(self):
    396     file_path = os.path.join(self._base_dir, "temp_file")
    397     with file_io.FileIO(file_path, mode="r+") as f:
    398       f.write("testing1\ntesting2\ntesting3\n\ntesting5")
    399     self.assertEqual("testing1\n", f.readline())
    400     self.assertEqual(9, f.tell())
    401 
    402     # Seek to 18
    403     f.seek(18)
    404     self.assertEqual(18, f.tell())
    405     self.assertEqual("testing3\n", f.readline())
    406 
    407     # Seek back to 9
    408     f.seek(9)
    409     self.assertEqual(9, f.tell())
    410     self.assertEqual("testing2\n", f.readline())
    411 
    412     f.seek(0)
    413     self.assertEqual(0, f.tell())
    414     self.assertEqual("testing1\n", f.readline())
    415 
    416     with self.assertRaises(errors.InvalidArgumentError):
    417       f.seek(-1)
    418 
    419     with self.assertRaises(TypeError):
    420       f.seek()
    421 
    422     # TODO(jhseu): Delete after position deprecation.
    423     with self.assertRaises(TypeError):
    424       f.seek(offset=0, position=0)
    425     f.seek(position=9)
    426     self.assertEqual(9, f.tell())
    427     self.assertEqual("testing2\n", f.readline())
    428 
    429   def testSeekFromWhat(self):
    430     file_path = os.path.join(self._base_dir, "temp_file")
    431     with file_io.FileIO(file_path, mode="r+") as f:
    432       f.write("testing1\ntesting2\ntesting3\n\ntesting5")
    433     self.assertEqual("testing1\n", f.readline())
    434     self.assertEqual(9, f.tell())
    435 
    436     # Seek to 18
    437     f.seek(9, 1)
    438     self.assertEqual(18, f.tell())
    439     self.assertEqual("testing3\n", f.readline())
    440 
    441     # Seek back to 9
    442     f.seek(9, 0)
    443     self.assertEqual(9, f.tell())
    444     self.assertEqual("testing2\n", f.readline())
    445 
    446     f.seek(-f.size(), 2)
    447     self.assertEqual(0, f.tell())
    448     self.assertEqual("testing1\n", f.readline())
    449 
    450     with self.assertRaises(errors.InvalidArgumentError):
    451       f.seek(0, 3)
    452 
    453   def testReadingIterator(self):
    454     file_path = os.path.join(self._base_dir, "temp_file")
    455     data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"]
    456     with file_io.FileIO(file_path, mode="r+") as f:
    457       f.write("".join(data))
    458     actual_data = []
    459     for line in f:
    460       actual_data.append(line)
    461     self.assertSequenceEqual(actual_data, data)
    462 
    463   def testReadlines(self):
    464     file_path = os.path.join(self._base_dir, "temp_file")
    465     data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"]
    466     f = file_io.FileIO(file_path, mode="r+")
    467     f.write("".join(data))
    468     f.flush()
    469     lines = f.readlines()
    470     self.assertSequenceEqual(lines, data)
    471 
    472   def testUTF8StringPath(self):
    473     file_path = os.path.join(self._base_dir, "UTF8_file")
    474     file_io.write_string_to_file(file_path, "testing")
    475     with file_io.FileIO(file_path, mode="rb") as f:
    476       self.assertEqual(b"testing", f.read())
    477 
    478   def testEof(self):
    479     """Test that reading past EOF does not raise an exception."""
    480 
    481     file_path = os.path.join(self._base_dir, "temp_file")
    482     f = file_io.FileIO(file_path, mode="r+")
    483     content = "testing"
    484     f.write(content)
    485     f.flush()
    486     self.assertEqual(content, f.read(len(content) + 1))
    487 
    488 
    489 if __name__ == "__main__":
    490   test.main()
    491