1 # This Python file uses the following encoding: utf-8 2 # Copyright 2015 The TensorFlow Authors. All Rights Reserved. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 # ============================================================================= 16 """Testing File IO operations in file_io.py.""" 17 18 from __future__ import absolute_import 19 from __future__ import division 20 from __future__ import print_function 21 22 import os.path 23 24 from tensorflow.python.framework import errors 25 from tensorflow.python.lib.io import file_io 26 from tensorflow.python.platform import test 27 28 29 class FileIoTest(test.TestCase): 30 31 def setUp(self): 32 self._base_dir = os.path.join(self.get_temp_dir(), "base_dir") 33 file_io.create_dir(self._base_dir) 34 35 def tearDown(self): 36 file_io.delete_recursively(self._base_dir) 37 38 def testEmptyFilename(self): 39 f = file_io.FileIO("", mode="r") 40 with self.assertRaises(errors.NotFoundError): 41 _ = f.read() 42 43 def testFileDoesntExist(self): 44 file_path = os.path.join(self._base_dir, "temp_file") 45 self.assertFalse(file_io.file_exists(file_path)) 46 with self.assertRaises(errors.NotFoundError): 47 _ = file_io.read_file_to_string(file_path) 48 49 def testWriteToString(self): 50 file_path = os.path.join(self._base_dir, "temp_file") 51 file_io.write_string_to_file(file_path, "testing") 52 self.assertTrue(file_io.file_exists(file_path)) 53 file_contents = file_io.read_file_to_string(file_path) 54 self.assertEqual("testing", file_contents) 55 56 def testAtomicWriteStringToFile(self): 57 file_path = os.path.join(self._base_dir, "temp_file") 58 file_io.atomic_write_string_to_file(file_path, "testing") 59 self.assertTrue(file_io.file_exists(file_path)) 60 file_contents = file_io.read_file_to_string(file_path) 61 self.assertEqual("testing", file_contents) 62 63 def testAtomicWriteStringToFileOverwriteFalse(self): 64 file_path = os.path.join(self._base_dir, "temp_file") 65 file_io.atomic_write_string_to_file(file_path, "old", overwrite=False) 66 with self.assertRaises(errors.AlreadyExistsError): 67 file_io.atomic_write_string_to_file(file_path, "new", overwrite=False) 68 file_contents = file_io.read_file_to_string(file_path) 69 self.assertEqual("old", file_contents) 70 file_io.delete_file(file_path) 71 file_io.atomic_write_string_to_file(file_path, "new", overwrite=False) 72 file_contents = file_io.read_file_to_string(file_path) 73 self.assertEqual("new", file_contents) 74 75 def testReadBinaryMode(self): 76 file_path = os.path.join(self._base_dir, "temp_file") 77 file_io.write_string_to_file(file_path, "testing") 78 with file_io.FileIO(file_path, mode="rb") as f: 79 self.assertEqual(b"testing", f.read()) 80 81 def testWriteBinaryMode(self): 82 file_path = os.path.join(self._base_dir, "temp_file") 83 file_io.FileIO(file_path, "wb").write("testing") 84 with file_io.FileIO(file_path, mode="r") as f: 85 self.assertEqual("testing", f.read()) 86 87 def testAppend(self): 88 file_path = os.path.join(self._base_dir, "temp_file") 89 with file_io.FileIO(file_path, mode="w") as f: 90 f.write("begin\n") 91 with file_io.FileIO(file_path, mode="a") as f: 92 f.write("a1\n") 93 with file_io.FileIO(file_path, mode="a") as f: 94 f.write("a2\n") 95 with file_io.FileIO(file_path, mode="r") as f: 96 file_contents = f.read() 97 self.assertEqual("begin\na1\na2\n", file_contents) 98 99 def testMultipleFiles(self): 100 file_prefix = os.path.join(self._base_dir, "temp_file") 101 for i in range(5000): 102 f = file_io.FileIO(file_prefix + str(i), mode="w+") 103 f.write("testing") 104 f.flush() 105 self.assertEqual("testing", f.read()) 106 f.close() 107 108 def testMultipleWrites(self): 109 file_path = os.path.join(self._base_dir, "temp_file") 110 with file_io.FileIO(file_path, mode="w") as f: 111 f.write("line1\n") 112 f.write("line2") 113 file_contents = file_io.read_file_to_string(file_path) 114 self.assertEqual("line1\nline2", file_contents) 115 116 def testFileWriteBadMode(self): 117 file_path = os.path.join(self._base_dir, "temp_file") 118 with self.assertRaises(errors.PermissionDeniedError): 119 file_io.FileIO(file_path, mode="r").write("testing") 120 121 def testFileReadBadMode(self): 122 file_path = os.path.join(self._base_dir, "temp_file") 123 file_io.FileIO(file_path, mode="w").write("testing") 124 self.assertTrue(file_io.file_exists(file_path)) 125 with self.assertRaises(errors.PermissionDeniedError): 126 file_io.FileIO(file_path, mode="w").read() 127 128 def testFileDelete(self): 129 file_path = os.path.join(self._base_dir, "temp_file") 130 file_io.FileIO(file_path, mode="w").write("testing") 131 file_io.delete_file(file_path) 132 self.assertFalse(file_io.file_exists(file_path)) 133 134 def testFileDeleteFail(self): 135 file_path = os.path.join(self._base_dir, "temp_file") 136 with self.assertRaises(errors.NotFoundError): 137 file_io.delete_file(file_path) 138 139 def testGetMatchingFiles(self): 140 dir_path = os.path.join(self._base_dir, "temp_dir") 141 file_io.create_dir(dir_path) 142 files = ["file1.txt", "file2.txt", "file3.txt"] 143 for name in files: 144 file_path = os.path.join(dir_path, name) 145 file_io.FileIO(file_path, mode="w").write("testing") 146 expected_match = [os.path.join(dir_path, name) for name in files] 147 self.assertItemsEqual( 148 file_io.get_matching_files(os.path.join(dir_path, "file*.txt")), 149 expected_match) 150 self.assertItemsEqual(file_io.get_matching_files(tuple()), []) 151 files_subset = [ 152 os.path.join(dir_path, files[0]), os.path.join(dir_path, files[2]) 153 ] 154 self.assertItemsEqual( 155 file_io.get_matching_files(files_subset), files_subset) 156 file_io.delete_recursively(dir_path) 157 self.assertFalse(file_io.file_exists(os.path.join(dir_path, "file3.txt"))) 158 159 def testCreateRecursiveDir(self): 160 dir_path = os.path.join(self._base_dir, "temp_dir/temp_dir1/temp_dir2") 161 file_io.recursive_create_dir(dir_path) 162 file_io.recursive_create_dir(dir_path) # repeat creation 163 file_path = os.path.join(dir_path, "temp_file") 164 file_io.FileIO(file_path, mode="w").write("testing") 165 self.assertTrue(file_io.file_exists(file_path)) 166 file_io.delete_recursively(os.path.join(self._base_dir, "temp_dir")) 167 self.assertFalse(file_io.file_exists(file_path)) 168 169 def testCopy(self): 170 file_path = os.path.join(self._base_dir, "temp_file") 171 file_io.FileIO(file_path, mode="w").write("testing") 172 copy_path = os.path.join(self._base_dir, "copy_file") 173 file_io.copy(file_path, copy_path) 174 self.assertTrue(file_io.file_exists(copy_path)) 175 f = file_io.FileIO(file_path, mode="r") 176 self.assertEqual("testing", f.read()) 177 self.assertEqual(7, f.tell()) 178 179 def testCopyOverwrite(self): 180 file_path = os.path.join(self._base_dir, "temp_file") 181 file_io.FileIO(file_path, mode="w").write("testing") 182 copy_path = os.path.join(self._base_dir, "copy_file") 183 file_io.FileIO(copy_path, mode="w").write("copy") 184 file_io.copy(file_path, copy_path, overwrite=True) 185 self.assertTrue(file_io.file_exists(copy_path)) 186 self.assertEqual("testing", file_io.FileIO(file_path, mode="r").read()) 187 188 def testCopyOverwriteFalse(self): 189 file_path = os.path.join(self._base_dir, "temp_file") 190 file_io.FileIO(file_path, mode="w").write("testing") 191 copy_path = os.path.join(self._base_dir, "copy_file") 192 file_io.FileIO(copy_path, mode="w").write("copy") 193 with self.assertRaises(errors.AlreadyExistsError): 194 file_io.copy(file_path, copy_path, overwrite=False) 195 196 def testRename(self): 197 file_path = os.path.join(self._base_dir, "temp_file") 198 file_io.FileIO(file_path, mode="w").write("testing") 199 rename_path = os.path.join(self._base_dir, "rename_file") 200 file_io.rename(file_path, rename_path) 201 self.assertTrue(file_io.file_exists(rename_path)) 202 self.assertFalse(file_io.file_exists(file_path)) 203 204 def testRenameOverwrite(self): 205 file_path = os.path.join(self._base_dir, "temp_file") 206 file_io.FileIO(file_path, mode="w").write("testing") 207 rename_path = os.path.join(self._base_dir, "rename_file") 208 file_io.FileIO(rename_path, mode="w").write("rename") 209 file_io.rename(file_path, rename_path, overwrite=True) 210 self.assertTrue(file_io.file_exists(rename_path)) 211 self.assertFalse(file_io.file_exists(file_path)) 212 213 def testRenameOverwriteFalse(self): 214 file_path = os.path.join(self._base_dir, "temp_file") 215 file_io.FileIO(file_path, mode="w").write("testing") 216 rename_path = os.path.join(self._base_dir, "rename_file") 217 file_io.FileIO(rename_path, mode="w").write("rename") 218 with self.assertRaises(errors.AlreadyExistsError): 219 file_io.rename(file_path, rename_path, overwrite=False) 220 self.assertTrue(file_io.file_exists(rename_path)) 221 self.assertTrue(file_io.file_exists(file_path)) 222 223 def testDeleteRecursivelyFail(self): 224 fake_dir_path = os.path.join(self._base_dir, "temp_dir") 225 with self.assertRaises(errors.NotFoundError): 226 file_io.delete_recursively(fake_dir_path) 227 228 def testIsDirectory(self): 229 dir_path = os.path.join(self._base_dir, "test_dir") 230 # Failure for a non-existing dir. 231 self.assertFalse(file_io.is_directory(dir_path)) 232 file_io.create_dir(dir_path) 233 self.assertTrue(file_io.is_directory(dir_path)) 234 file_path = os.path.join(dir_path, "test_file") 235 file_io.FileIO(file_path, mode="w").write("test") 236 # False for a file. 237 self.assertFalse(file_io.is_directory(file_path)) 238 # Test that the value returned from `stat()` has `is_directory` set. 239 file_statistics = file_io.stat(dir_path) 240 self.assertTrue(file_statistics.is_directory) 241 242 def testListDirectory(self): 243 dir_path = os.path.join(self._base_dir, "test_dir") 244 file_io.create_dir(dir_path) 245 files = ["file1.txt", "file2.txt", "file3.txt"] 246 for name in files: 247 file_path = os.path.join(dir_path, name) 248 file_io.FileIO(file_path, mode="w").write("testing") 249 subdir_path = os.path.join(dir_path, "sub_dir") 250 file_io.create_dir(subdir_path) 251 subdir_file_path = os.path.join(subdir_path, "file4.txt") 252 file_io.FileIO(subdir_file_path, mode="w").write("testing") 253 dir_list = file_io.list_directory(dir_path) 254 self.assertItemsEqual(files + ["sub_dir"], dir_list) 255 256 def testListDirectoryFailure(self): 257 dir_path = os.path.join(self._base_dir, "test_dir") 258 with self.assertRaises(errors.NotFoundError): 259 file_io.list_directory(dir_path) 260 261 def _setupWalkDirectories(self, dir_path): 262 # Creating a file structure as follows 263 # test_dir -> file: file1.txt; dirs: subdir1_1, subdir1_2, subdir1_3 264 # subdir1_1 -> file: file3.txt 265 # subdir1_2 -> dir: subdir2 266 file_io.create_dir(dir_path) 267 file_io.FileIO( 268 os.path.join(dir_path, "file1.txt"), mode="w").write("testing") 269 sub_dirs1 = ["subdir1_1", "subdir1_2", "subdir1_3"] 270 for name in sub_dirs1: 271 file_io.create_dir(os.path.join(dir_path, name)) 272 file_io.FileIO( 273 os.path.join(dir_path, "subdir1_1/file2.txt"), 274 mode="w").write("testing") 275 file_io.create_dir(os.path.join(dir_path, "subdir1_2/subdir2")) 276 277 def testWalkInOrder(self): 278 dir_path = os.path.join(self._base_dir, "test_dir") 279 self._setupWalkDirectories(dir_path) 280 # Now test the walk (in_order = True) 281 all_dirs = [] 282 all_subdirs = [] 283 all_files = [] 284 for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=True): 285 all_dirs.append(w_dir) 286 all_subdirs.append(w_subdirs) 287 all_files.append(w_files) 288 self.assertItemsEqual(all_dirs, [dir_path] + [ 289 os.path.join(dir_path, item) 290 for item in 291 ["subdir1_1", "subdir1_2", "subdir1_2/subdir2", "subdir1_3"] 292 ]) 293 self.assertEqual(dir_path, all_dirs[0]) 294 self.assertLess( 295 all_dirs.index(os.path.join(dir_path, "subdir1_2")), 296 all_dirs.index(os.path.join(dir_path, "subdir1_2/subdir2"))) 297 self.assertItemsEqual(all_subdirs[1:5], [[], ["subdir2"], [], []]) 298 self.assertItemsEqual(all_subdirs[0], 299 ["subdir1_1", "subdir1_2", "subdir1_3"]) 300 self.assertItemsEqual(all_files, [["file1.txt"], ["file2.txt"], [], [], []]) 301 self.assertLess( 302 all_files.index(["file1.txt"]), all_files.index(["file2.txt"])) 303 304 def testWalkPostOrder(self): 305 dir_path = os.path.join(self._base_dir, "test_dir") 306 self._setupWalkDirectories(dir_path) 307 # Now test the walk (in_order = False) 308 all_dirs = [] 309 all_subdirs = [] 310 all_files = [] 311 for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False): 312 all_dirs.append(w_dir) 313 all_subdirs.append(w_subdirs) 314 all_files.append(w_files) 315 self.assertItemsEqual(all_dirs, [ 316 os.path.join(dir_path, item) 317 for item in 318 ["subdir1_1", "subdir1_2/subdir2", "subdir1_2", "subdir1_3"] 319 ] + [dir_path]) 320 self.assertEqual(dir_path, all_dirs[4]) 321 self.assertLess( 322 all_dirs.index(os.path.join(dir_path, "subdir1_2/subdir2")), 323 all_dirs.index(os.path.join(dir_path, "subdir1_2"))) 324 self.assertItemsEqual(all_subdirs[0:4], [[], [], ["subdir2"], []]) 325 self.assertItemsEqual(all_subdirs[4], 326 ["subdir1_1", "subdir1_2", "subdir1_3"]) 327 self.assertItemsEqual(all_files, [["file2.txt"], [], [], [], ["file1.txt"]]) 328 self.assertLess( 329 all_files.index(["file2.txt"]), all_files.index(["file1.txt"])) 330 331 def testWalkFailure(self): 332 dir_path = os.path.join(self._base_dir, "test_dir") 333 # Try walking a directory that wasn't created. 334 all_dirs = [] 335 all_subdirs = [] 336 all_files = [] 337 for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False): 338 all_dirs.append(w_dir) 339 all_subdirs.append(w_subdirs) 340 all_files.append(w_files) 341 self.assertItemsEqual(all_dirs, []) 342 self.assertItemsEqual(all_subdirs, []) 343 self.assertItemsEqual(all_files, []) 344 345 def testStat(self): 346 file_path = os.path.join(self._base_dir, "temp_file") 347 file_io.FileIO(file_path, mode="w").write("testing") 348 file_statistics = file_io.stat(file_path) 349 os_statistics = os.stat(file_path) 350 self.assertEqual(7, file_statistics.length) 351 self.assertEqual( 352 int(os_statistics.st_mtime), int(file_statistics.mtime_nsec / 1e9)) 353 self.assertFalse(file_statistics.is_directory) 354 355 def testReadLine(self): 356 file_path = os.path.join(self._base_dir, "temp_file") 357 with file_io.FileIO(file_path, mode="r+") as f: 358 f.write("testing1\ntesting2\ntesting3\n\ntesting5") 359 self.assertEqual(36, f.size()) 360 self.assertEqual("testing1\n", f.readline()) 361 self.assertEqual("testing2\n", f.readline()) 362 self.assertEqual("testing3\n", f.readline()) 363 self.assertEqual("\n", f.readline()) 364 self.assertEqual("testing5", f.readline()) 365 self.assertEqual("", f.readline()) 366 367 def testRead(self): 368 file_path = os.path.join(self._base_dir, "temp_file") 369 with file_io.FileIO(file_path, mode="r+") as f: 370 f.write("testing1\ntesting2\ntesting3\n\ntesting5") 371 self.assertEqual(36, f.size()) 372 self.assertEqual("testing1\n", f.read(9)) 373 self.assertEqual("testing2\n", f.read(9)) 374 self.assertEqual("t", f.read(1)) 375 self.assertEqual("esting3\n\ntesting5", f.read()) 376 377 def testTell(self): 378 file_path = os.path.join(self._base_dir, "temp_file") 379 with file_io.FileIO(file_path, mode="r+") as f: 380 f.write("testing1\ntesting2\ntesting3\n\ntesting5") 381 self.assertEqual(0, f.tell()) 382 self.assertEqual("testing1\n", f.readline()) 383 self.assertEqual(9, f.tell()) 384 self.assertEqual("testing2\n", f.readline()) 385 self.assertEqual(18, f.tell()) 386 self.assertEqual("testing3\n", f.readline()) 387 self.assertEqual(27, f.tell()) 388 self.assertEqual("\n", f.readline()) 389 self.assertEqual(28, f.tell()) 390 self.assertEqual("testing5", f.readline()) 391 self.assertEqual(36, f.tell()) 392 self.assertEqual("", f.readline()) 393 self.assertEqual(36, f.tell()) 394 395 def testSeek(self): 396 file_path = os.path.join(self._base_dir, "temp_file") 397 with file_io.FileIO(file_path, mode="r+") as f: 398 f.write("testing1\ntesting2\ntesting3\n\ntesting5") 399 self.assertEqual("testing1\n", f.readline()) 400 self.assertEqual(9, f.tell()) 401 402 # Seek to 18 403 f.seek(18) 404 self.assertEqual(18, f.tell()) 405 self.assertEqual("testing3\n", f.readline()) 406 407 # Seek back to 9 408 f.seek(9) 409 self.assertEqual(9, f.tell()) 410 self.assertEqual("testing2\n", f.readline()) 411 412 f.seek(0) 413 self.assertEqual(0, f.tell()) 414 self.assertEqual("testing1\n", f.readline()) 415 416 with self.assertRaises(errors.InvalidArgumentError): 417 f.seek(-1) 418 419 with self.assertRaises(TypeError): 420 f.seek() 421 422 # TODO(jhseu): Delete after position deprecation. 423 with self.assertRaises(TypeError): 424 f.seek(offset=0, position=0) 425 f.seek(position=9) 426 self.assertEqual(9, f.tell()) 427 self.assertEqual("testing2\n", f.readline()) 428 429 def testSeekFromWhat(self): 430 file_path = os.path.join(self._base_dir, "temp_file") 431 with file_io.FileIO(file_path, mode="r+") as f: 432 f.write("testing1\ntesting2\ntesting3\n\ntesting5") 433 self.assertEqual("testing1\n", f.readline()) 434 self.assertEqual(9, f.tell()) 435 436 # Seek to 18 437 f.seek(9, 1) 438 self.assertEqual(18, f.tell()) 439 self.assertEqual("testing3\n", f.readline()) 440 441 # Seek back to 9 442 f.seek(9, 0) 443 self.assertEqual(9, f.tell()) 444 self.assertEqual("testing2\n", f.readline()) 445 446 f.seek(-f.size(), 2) 447 self.assertEqual(0, f.tell()) 448 self.assertEqual("testing1\n", f.readline()) 449 450 with self.assertRaises(errors.InvalidArgumentError): 451 f.seek(0, 3) 452 453 def testReadingIterator(self): 454 file_path = os.path.join(self._base_dir, "temp_file") 455 data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"] 456 with file_io.FileIO(file_path, mode="r+") as f: 457 f.write("".join(data)) 458 actual_data = [] 459 for line in f: 460 actual_data.append(line) 461 self.assertSequenceEqual(actual_data, data) 462 463 def testReadlines(self): 464 file_path = os.path.join(self._base_dir, "temp_file") 465 data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"] 466 f = file_io.FileIO(file_path, mode="r+") 467 f.write("".join(data)) 468 f.flush() 469 lines = f.readlines() 470 self.assertSequenceEqual(lines, data) 471 472 def testUTF8StringPath(self): 473 file_path = os.path.join(self._base_dir, "UTF8_file") 474 file_io.write_string_to_file(file_path, "testing") 475 with file_io.FileIO(file_path, mode="rb") as f: 476 self.assertEqual(b"testing", f.read()) 477 478 def testEof(self): 479 """Test that reading past EOF does not raise an exception.""" 480 481 file_path = os.path.join(self._base_dir, "temp_file") 482 f = file_io.FileIO(file_path, mode="r+") 483 content = "testing" 484 f.write(content) 485 f.flush() 486 self.assertEqual(content, f.read(len(content) + 1)) 487 488 489 if __name__ == "__main__": 490 test.main() 491