1 # Adapted from test_file.py by Daniel Stutzbach 2 3 import sys 4 import os 5 import io 6 import errno 7 import unittest 8 from array import array 9 from weakref import proxy 10 from functools import wraps 11 12 from test.support import (TESTFN, TESTFN_UNICODE, check_warnings, run_unittest, 13 make_bad_fd, cpython_only) 14 from collections import UserList 15 16 import _io # C implementation of io 17 import _pyio # Python implementation of io 18 19 20 class AutoFileTests: 21 # file tests for which a test file is automatically set up 22 23 def setUp(self): 24 self.f = self.FileIO(TESTFN, 'w') 25 26 def tearDown(self): 27 if self.f: 28 self.f.close() 29 os.remove(TESTFN) 30 31 def testWeakRefs(self): 32 # verify weak references 33 p = proxy(self.f) 34 p.write(bytes(range(10))) 35 self.assertEqual(self.f.tell(), p.tell()) 36 self.f.close() 37 self.f = None 38 self.assertRaises(ReferenceError, getattr, p, 'tell') 39 40 def testSeekTell(self): 41 self.f.write(bytes(range(20))) 42 self.assertEqual(self.f.tell(), 20) 43 self.f.seek(0) 44 self.assertEqual(self.f.tell(), 0) 45 self.f.seek(10) 46 self.assertEqual(self.f.tell(), 10) 47 self.f.seek(5, 1) 48 self.assertEqual(self.f.tell(), 15) 49 self.f.seek(-5, 1) 50 self.assertEqual(self.f.tell(), 10) 51 self.f.seek(-5, 2) 52 self.assertEqual(self.f.tell(), 15) 53 54 def testAttributes(self): 55 # verify expected attributes exist 56 f = self.f 57 58 self.assertEqual(f.mode, "wb") 59 self.assertEqual(f.closed, False) 60 61 # verify the attributes are readonly 62 for attr in 'mode', 'closed': 63 self.assertRaises((AttributeError, TypeError), 64 setattr, f, attr, 'oops') 65 66 def testBlksize(self): 67 # test private _blksize attribute 68 blksize = io.DEFAULT_BUFFER_SIZE 69 # try to get preferred blksize from stat.st_blksize, if available 70 if hasattr(os, 'fstat'): 71 fst = os.fstat(self.f.fileno()) 72 blksize = getattr(fst, 'st_blksize', blksize) 73 self.assertEqual(self.f._blksize, blksize) 74 75 # verify readinto 76 def testReadintoByteArray(self): 77 self.f.write(bytes([1, 2, 0, 255])) 78 self.f.close() 79 80 ba = bytearray(b'abcdefgh') 81 with self.FileIO(TESTFN, 'r') as f: 82 n = f.readinto(ba) 83 self.assertEqual(ba, b'\x01\x02\x00\xffefgh') 84 self.assertEqual(n, 4) 85 86 def _testReadintoMemoryview(self): 87 self.f.write(bytes([1, 2, 0, 255])) 88 self.f.close() 89 90 m = memoryview(bytearray(b'abcdefgh')) 91 with self.FileIO(TESTFN, 'r') as f: 92 n = f.readinto(m) 93 self.assertEqual(m, b'\x01\x02\x00\xffefgh') 94 self.assertEqual(n, 4) 95 96 m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2]) 97 with self.FileIO(TESTFN, 'r') as f: 98 n = f.readinto(m) 99 self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh') 100 self.assertEqual(n, 4) 101 102 def _testReadintoArray(self): 103 self.f.write(bytes([1, 2, 0, 255])) 104 self.f.close() 105 106 a = array('B', b'abcdefgh') 107 with self.FileIO(TESTFN, 'r') as f: 108 n = f.readinto(a) 109 self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104])) 110 self.assertEqual(n, 4) 111 112 a = array('b', b'abcdefgh') 113 with self.FileIO(TESTFN, 'r') as f: 114 n = f.readinto(a) 115 self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104])) 116 self.assertEqual(n, 4) 117 118 a = array('I', b'abcdefgh') 119 with self.FileIO(TESTFN, 'r') as f: 120 n = f.readinto(a) 121 self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh')) 122 self.assertEqual(n, 4) 123 124 def testWritelinesList(self): 125 l = [b'123', b'456'] 126 self.f.writelines(l) 127 self.f.close() 128 self.f = self.FileIO(TESTFN, 'rb') 129 buf = self.f.read() 130 self.assertEqual(buf, b'123456') 131 132 def testWritelinesUserList(self): 133 l = UserList([b'123', b'456']) 134 self.f.writelines(l) 135 self.f.close() 136 self.f = self.FileIO(TESTFN, 'rb') 137 buf = self.f.read() 138 self.assertEqual(buf, b'123456') 139 140 def testWritelinesError(self): 141 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 142 self.assertRaises(TypeError, self.f.writelines, None) 143 self.assertRaises(TypeError, self.f.writelines, "abc") 144 145 def test_none_args(self): 146 self.f.write(b"hi\nbye\nabc") 147 self.f.close() 148 self.f = self.FileIO(TESTFN, 'r') 149 self.assertEqual(self.f.read(None), b"hi\nbye\nabc") 150 self.f.seek(0) 151 self.assertEqual(self.f.readline(None), b"hi\n") 152 self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) 153 154 def test_reject(self): 155 self.assertRaises(TypeError, self.f.write, "Hello!") 156 157 def testRepr(self): 158 self.assertEqual(repr(self.f), 159 "<%s.FileIO name=%r mode=%r closefd=True>" % 160 (self.modulename, self.f.name, self.f.mode)) 161 del self.f.name 162 self.assertEqual(repr(self.f), 163 "<%s.FileIO fd=%r mode=%r closefd=True>" % 164 (self.modulename, self.f.fileno(), self.f.mode)) 165 self.f.close() 166 self.assertEqual(repr(self.f), 167 "<%s.FileIO [closed]>" % (self.modulename,)) 168 169 def testReprNoCloseFD(self): 170 fd = os.open(TESTFN, os.O_RDONLY) 171 try: 172 with self.FileIO(fd, 'r', closefd=False) as f: 173 self.assertEqual(repr(f), 174 "<%s.FileIO name=%r mode=%r closefd=False>" % 175 (self.modulename, f.name, f.mode)) 176 finally: 177 os.close(fd) 178 179 def testErrors(self): 180 f = self.f 181 self.assertFalse(f.isatty()) 182 self.assertFalse(f.closed) 183 #self.assertEqual(f.name, TESTFN) 184 self.assertRaises(ValueError, f.read, 10) # Open for reading 185 f.close() 186 self.assertTrue(f.closed) 187 f = self.FileIO(TESTFN, 'r') 188 self.assertRaises(TypeError, f.readinto, "") 189 self.assertFalse(f.closed) 190 f.close() 191 self.assertTrue(f.closed) 192 193 def testMethods(self): 194 methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable', 195 'read', 'readall', 'readline', 'readlines', 196 'tell', 'truncate', 'flush'] 197 198 self.f.close() 199 self.assertTrue(self.f.closed) 200 201 for methodname in methods: 202 method = getattr(self.f, methodname) 203 # should raise on closed file 204 self.assertRaises(ValueError, method) 205 206 self.assertRaises(TypeError, self.f.readinto) 207 self.assertRaises(ValueError, self.f.readinto, bytearray(1)) 208 self.assertRaises(TypeError, self.f.seek) 209 self.assertRaises(ValueError, self.f.seek, 0) 210 self.assertRaises(TypeError, self.f.write) 211 self.assertRaises(ValueError, self.f.write, b'') 212 self.assertRaises(TypeError, self.f.writelines) 213 self.assertRaises(ValueError, self.f.writelines, b'') 214 215 def testOpendir(self): 216 # Issue 3703: opening a directory should fill the errno 217 # Windows always returns "[Errno 13]: Permission denied 218 # Unix uses fstat and returns "[Errno 21]: Is a directory" 219 try: 220 self.FileIO('.', 'r') 221 except OSError as e: 222 self.assertNotEqual(e.errno, 0) 223 self.assertEqual(e.filename, ".") 224 else: 225 self.fail("Should have raised OSError") 226 227 @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") 228 def testOpenDirFD(self): 229 fd = os.open('.', os.O_RDONLY) 230 with self.assertRaises(OSError) as cm: 231 self.FileIO(fd, 'r') 232 os.close(fd) 233 self.assertEqual(cm.exception.errno, errno.EISDIR) 234 235 #A set of functions testing that we get expected behaviour if someone has 236 #manually closed the internal file descriptor. First, a decorator: 237 def ClosedFD(func): 238 @wraps(func) 239 def wrapper(self): 240 #forcibly close the fd before invoking the problem function 241 f = self.f 242 os.close(f.fileno()) 243 try: 244 func(self, f) 245 finally: 246 try: 247 self.f.close() 248 except OSError: 249 pass 250 return wrapper 251 252 def ClosedFDRaises(func): 253 @wraps(func) 254 def wrapper(self): 255 #forcibly close the fd before invoking the problem function 256 f = self.f 257 os.close(f.fileno()) 258 try: 259 func(self, f) 260 except OSError as e: 261 self.assertEqual(e.errno, errno.EBADF) 262 else: 263 self.fail("Should have raised OSError") 264 finally: 265 try: 266 self.f.close() 267 except OSError: 268 pass 269 return wrapper 270 271 @ClosedFDRaises 272 def testErrnoOnClose(self, f): 273 f.close() 274 275 @ClosedFDRaises 276 def testErrnoOnClosedWrite(self, f): 277 f.write(b'a') 278 279 @ClosedFDRaises 280 def testErrnoOnClosedSeek(self, f): 281 f.seek(0) 282 283 @ClosedFDRaises 284 def testErrnoOnClosedTell(self, f): 285 f.tell() 286 287 @ClosedFDRaises 288 def testErrnoOnClosedTruncate(self, f): 289 f.truncate(0) 290 291 @ClosedFD 292 def testErrnoOnClosedSeekable(self, f): 293 f.seekable() 294 295 @ClosedFD 296 def testErrnoOnClosedReadable(self, f): 297 f.readable() 298 299 @ClosedFD 300 def testErrnoOnClosedWritable(self, f): 301 f.writable() 302 303 @ClosedFD 304 def testErrnoOnClosedFileno(self, f): 305 f.fileno() 306 307 @ClosedFD 308 def testErrnoOnClosedIsatty(self, f): 309 self.assertEqual(f.isatty(), False) 310 311 def ReopenForRead(self): 312 try: 313 self.f.close() 314 except OSError: 315 pass 316 self.f = self.FileIO(TESTFN, 'r') 317 os.close(self.f.fileno()) 318 return self.f 319 320 @ClosedFDRaises 321 def testErrnoOnClosedRead(self, f): 322 f = self.ReopenForRead() 323 f.read(1) 324 325 @ClosedFDRaises 326 def testErrnoOnClosedReadall(self, f): 327 f = self.ReopenForRead() 328 f.readall() 329 330 @ClosedFDRaises 331 def testErrnoOnClosedReadinto(self, f): 332 f = self.ReopenForRead() 333 a = array('b', b'x'*10) 334 f.readinto(a) 335 336 class CAutoFileTests(AutoFileTests, unittest.TestCase): 337 FileIO = _io.FileIO 338 modulename = '_io' 339 340 class PyAutoFileTests(AutoFileTests, unittest.TestCase): 341 FileIO = _pyio.FileIO 342 modulename = '_pyio' 343 344 345 class OtherFileTests: 346 347 def testAbles(self): 348 try: 349 f = self.FileIO(TESTFN, "w") 350 self.assertEqual(f.readable(), False) 351 self.assertEqual(f.writable(), True) 352 self.assertEqual(f.seekable(), True) 353 f.close() 354 355 f = self.FileIO(TESTFN, "r") 356 self.assertEqual(f.readable(), True) 357 self.assertEqual(f.writable(), False) 358 self.assertEqual(f.seekable(), True) 359 f.close() 360 361 f = self.FileIO(TESTFN, "a+") 362 self.assertEqual(f.readable(), True) 363 self.assertEqual(f.writable(), True) 364 self.assertEqual(f.seekable(), True) 365 self.assertEqual(f.isatty(), False) 366 f.close() 367 368 if sys.platform != "win32": 369 try: 370 f = self.FileIO("/dev/tty", "a") 371 except OSError: 372 # When run in a cron job there just aren't any 373 # ttys, so skip the test. This also handles other 374 # OS'es that don't support /dev/tty. 375 pass 376 else: 377 self.assertEqual(f.readable(), False) 378 self.assertEqual(f.writable(), True) 379 if sys.platform != "darwin" and \ 380 'bsd' not in sys.platform and \ 381 not sys.platform.startswith(('sunos', 'aix')): 382 # Somehow /dev/tty appears seekable on some BSDs 383 self.assertEqual(f.seekable(), False) 384 self.assertEqual(f.isatty(), True) 385 f.close() 386 finally: 387 os.unlink(TESTFN) 388 389 def testInvalidModeStrings(self): 390 # check invalid mode strings 391 for mode in ("", "aU", "wU+", "rw", "rt"): 392 try: 393 f = self.FileIO(TESTFN, mode) 394 except ValueError: 395 pass 396 else: 397 f.close() 398 self.fail('%r is an invalid file mode' % mode) 399 400 def testModeStrings(self): 401 # test that the mode attribute is correct for various mode strings 402 # given as init args 403 try: 404 for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'), 405 ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'), 406 ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'), 407 ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]: 408 # read modes are last so that TESTFN will exist first 409 with self.FileIO(TESTFN, modes[0]) as f: 410 self.assertEqual(f.mode, modes[1]) 411 finally: 412 if os.path.exists(TESTFN): 413 os.unlink(TESTFN) 414 415 def testUnicodeOpen(self): 416 # verify repr works for unicode too 417 f = self.FileIO(str(TESTFN), "w") 418 f.close() 419 os.unlink(TESTFN) 420 421 def testBytesOpen(self): 422 # Opening a bytes filename 423 try: 424 fn = TESTFN.encode("ascii") 425 except UnicodeEncodeError: 426 self.skipTest('could not encode %r to ascii' % TESTFN) 427 f = self.FileIO(fn, "w") 428 try: 429 f.write(b"abc") 430 f.close() 431 with open(TESTFN, "rb") as f: 432 self.assertEqual(f.read(), b"abc") 433 finally: 434 os.unlink(TESTFN) 435 436 @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8', 437 "test only works for utf-8 filesystems") 438 def testUtf8BytesOpen(self): 439 # Opening a UTF-8 bytes filename 440 try: 441 fn = TESTFN_UNICODE.encode("utf-8") 442 except UnicodeEncodeError: 443 self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE) 444 f = self.FileIO(fn, "w") 445 try: 446 f.write(b"abc") 447 f.close() 448 with open(TESTFN_UNICODE, "rb") as f: 449 self.assertEqual(f.read(), b"abc") 450 finally: 451 os.unlink(TESTFN_UNICODE) 452 453 def testConstructorHandlesNULChars(self): 454 fn_with_NUL = 'foo\0bar' 455 self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w') 456 self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w') 457 458 def testInvalidFd(self): 459 self.assertRaises(ValueError, self.FileIO, -10) 460 self.assertRaises(OSError, self.FileIO, make_bad_fd()) 461 if sys.platform == 'win32': 462 import msvcrt 463 self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd()) 464 465 def testBadModeArgument(self): 466 # verify that we get a sensible error message for bad mode argument 467 bad_mode = "qwerty" 468 try: 469 f = self.FileIO(TESTFN, bad_mode) 470 except ValueError as msg: 471 if msg.args[0] != 0: 472 s = str(msg) 473 if TESTFN in s or bad_mode not in s: 474 self.fail("bad error message for invalid mode: %s" % s) 475 # if msg.args[0] == 0, we're probably on Windows where there may be 476 # no obvious way to discover why open() failed. 477 else: 478 f.close() 479 self.fail("no error for invalid mode: %s" % bad_mode) 480 481 def testTruncate(self): 482 f = self.FileIO(TESTFN, 'w') 483 f.write(bytes(bytearray(range(10)))) 484 self.assertEqual(f.tell(), 10) 485 f.truncate(5) 486 self.assertEqual(f.tell(), 10) 487 self.assertEqual(f.seek(0, io.SEEK_END), 5) 488 f.truncate(15) 489 self.assertEqual(f.tell(), 5) 490 self.assertEqual(f.seek(0, io.SEEK_END), 15) 491 f.close() 492 493 def testTruncateOnWindows(self): 494 def bug801631(): 495 # SF bug <http://www.python.org/sf/801631> 496 # "file.truncate fault on windows" 497 f = self.FileIO(TESTFN, 'w') 498 f.write(bytes(range(11))) 499 f.close() 500 501 f = self.FileIO(TESTFN,'r+') 502 data = f.read(5) 503 if data != bytes(range(5)): 504 self.fail("Read on file opened for update failed %r" % data) 505 if f.tell() != 5: 506 self.fail("File pos after read wrong %d" % f.tell()) 507 508 f.truncate() 509 if f.tell() != 5: 510 self.fail("File pos after ftruncate wrong %d" % f.tell()) 511 512 f.close() 513 size = os.path.getsize(TESTFN) 514 if size != 5: 515 self.fail("File size after ftruncate wrong %d" % size) 516 517 try: 518 bug801631() 519 finally: 520 os.unlink(TESTFN) 521 522 def testAppend(self): 523 try: 524 f = open(TESTFN, 'wb') 525 f.write(b'spam') 526 f.close() 527 f = open(TESTFN, 'ab') 528 f.write(b'eggs') 529 f.close() 530 f = open(TESTFN, 'rb') 531 d = f.read() 532 f.close() 533 self.assertEqual(d, b'spameggs') 534 finally: 535 try: 536 os.unlink(TESTFN) 537 except: 538 pass 539 540 def testInvalidInit(self): 541 self.assertRaises(TypeError, self.FileIO, "1", 0, 0) 542 543 def testWarnings(self): 544 with check_warnings(quiet=True) as w: 545 self.assertEqual(w.warnings, []) 546 self.assertRaises(TypeError, self.FileIO, []) 547 self.assertEqual(w.warnings, []) 548 self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt") 549 self.assertEqual(w.warnings, []) 550 551 def testUnclosedFDOnException(self): 552 class MyException(Exception): pass 553 class MyFileIO(self.FileIO): 554 def __setattr__(self, name, value): 555 if name == "name": 556 raise MyException("blocked setting name") 557 return super(MyFileIO, self).__setattr__(name, value) 558 fd = os.open(__file__, os.O_RDONLY) 559 self.assertRaises(MyException, MyFileIO, fd) 560 os.close(fd) # should not raise OSError(EBADF) 561 562 class COtherFileTests(OtherFileTests, unittest.TestCase): 563 FileIO = _io.FileIO 564 modulename = '_io' 565 566 @cpython_only 567 def testInvalidFd_overflow(self): 568 # Issue 15989 569 import _testcapi 570 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1) 571 self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1) 572 573 class PyOtherFileTests(OtherFileTests, unittest.TestCase): 574 FileIO = _pyio.FileIO 575 modulename = '_pyio' 576 577 578 def test_main(): 579 # Historically, these tests have been sloppy about removing TESTFN. 580 # So get rid of it no matter what. 581 try: 582 run_unittest(CAutoFileTests, PyAutoFileTests, 583 COtherFileTests, PyOtherFileTests) 584 finally: 585 if os.path.exists(TESTFN): 586 os.unlink(TESTFN) 587 588 if __name__ == '__main__': 589 test_main() 590