1 # Adapted from test_file.py by Daniel Stutzbach 2 3 from __future__ import unicode_literals 4 5 import sys 6 import os 7 import errno 8 import unittest 9 from array import array 10 from weakref import proxy 11 from functools import wraps 12 from UserList import UserList 13 14 from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd 15 from test.test_support import py3k_bytes as bytes, cpython_only 16 from test.script_helper import run_python 17 18 from _io import FileIO as _FileIO 19 20 class AutoFileTests(unittest.TestCase): 21 # file tests for which a test file is automatically set up 22 23 def setUp(self): 24 self.f = _FileIO(TESTFN, 'w') 25 26 def tearDown(self): 27 if self.f: 28 self.f.close() 29 os.remove(TESTFN) 30 31 def testWeakRefs(self): 32 # verify weak references 33 p = proxy(self.f) 34 p.write(bytes(range(10))) 35 self.assertEqual(self.f.tell(), p.tell()) 36 self.f.close() 37 self.f = None 38 self.assertRaises(ReferenceError, getattr, p, 'tell') 39 40 def testSeekTell(self): 41 self.f.write(bytes(range(20))) 42 self.assertEqual(self.f.tell(), 20) 43 self.f.seek(0) 44 self.assertEqual(self.f.tell(), 0) 45 self.f.seek(10) 46 self.assertEqual(self.f.tell(), 10) 47 self.f.seek(5, 1) 48 self.assertEqual(self.f.tell(), 15) 49 self.f.seek(-5, 1) 50 self.assertEqual(self.f.tell(), 10) 51 self.f.seek(-5, 2) 52 self.assertEqual(self.f.tell(), 15) 53 54 def testAttributes(self): 55 # verify expected attributes exist 56 f = self.f 57 58 self.assertEqual(f.mode, "wb") 59 self.assertEqual(f.closed, False) 60 61 # verify the attributes are readonly 62 for attr in 'mode', 'closed': 63 self.assertRaises((AttributeError, TypeError), 64 setattr, f, attr, 'oops') 65 66 def testReadinto(self): 67 # verify readinto 68 self.f.write(b"\x01\x02") 69 self.f.close() 70 a = array(b'b', b'x'*10) 71 self.f = _FileIO(TESTFN, 'r') 72 n = self.f.readinto(a) 73 self.assertEqual(array(b'b', [1, 2]), a[:n]) 74 75 def testWritelinesList(self): 76 l = [b'123', b'456'] 77 self.f.writelines(l) 78 self.f.close() 79 self.f = _FileIO(TESTFN, 'rb') 80 buf = self.f.read() 81 self.assertEqual(buf, b'123456') 82 83 def testWritelinesUserList(self): 84 l = UserList([b'123', b'456']) 85 self.f.writelines(l) 86 self.f.close() 87 self.f = _FileIO(TESTFN, 'rb') 88 buf = self.f.read() 89 self.assertEqual(buf, b'123456') 90 91 def testWritelinesError(self): 92 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 93 self.assertRaises(TypeError, self.f.writelines, None) 94 95 def test_none_args(self): 96 self.f.write(b"hi\nbye\nabc") 97 self.f.close() 98 self.f = _FileIO(TESTFN, 'r') 99 self.assertEqual(self.f.read(None), b"hi\nbye\nabc") 100 self.f.seek(0) 101 self.assertEqual(self.f.readline(None), b"hi\n") 102 self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) 103 104 def testRepr(self): 105 self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>" 106 % (self.f.name, self.f.mode)) 107 del self.f.name 108 self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>" 109 % (self.f.fileno(), self.f.mode)) 110 self.f.close() 111 self.assertEqual(repr(self.f), "<_io.FileIO [closed]>") 112 113 def testErrors(self): 114 f = self.f 115 self.assertFalse(f.isatty()) 116 self.assertFalse(f.closed) 117 #self.assertEqual(f.name, TESTFN) 118 self.assertRaises(ValueError, f.read, 10) # Open for reading 119 f.close() 120 self.assertTrue(f.closed) 121 f = _FileIO(TESTFN, 'r') 122 self.assertRaises(TypeError, f.readinto, "") 123 self.assertFalse(f.closed) 124 f.close() 125 self.assertTrue(f.closed) 126 127 def testMethods(self): 128 methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable', 129 'read', 'readall', 'readline', 'readlines', 130 'tell', 'truncate', 'flush'] 131 if sys.platform.startswith('atheos'): 132 methods.remove('truncate') 133 134 self.f.close() 135 self.assertTrue(self.f.closed) 136 137 for methodname in methods: 138 method = getattr(self.f, methodname) 139 # should raise on closed file 140 self.assertRaises(ValueError, method) 141 142 self.assertRaises(ValueError, self.f.readinto) # XXX should be TypeError? 143 self.assertRaises(ValueError, self.f.readinto, bytearray(1)) 144 self.assertRaises(ValueError, self.f.seek) 145 self.assertRaises(ValueError, self.f.seek, 0) 146 self.assertRaises(ValueError, self.f.write) 147 self.assertRaises(ValueError, self.f.write, b'') 148 self.assertRaises(TypeError, self.f.writelines) 149 self.assertRaises(ValueError, self.f.writelines, b'') 150 151 def testOpendir(self): 152 # Issue 3703: opening a directory should fill the errno 153 # Windows always returns "[Errno 13]: Permission denied 154 # Unix calls dircheck() and returns "[Errno 21]: Is a directory" 155 try: 156 _FileIO('.', 'r') 157 except IOError as e: 158 self.assertNotEqual(e.errno, 0) 159 self.assertEqual(e.filename, ".") 160 else: 161 self.fail("Should have raised IOError") 162 163 @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") 164 def testOpenDirFD(self): 165 fd = os.open('.', os.O_RDONLY) 166 with self.assertRaises(IOError) as cm: 167 _FileIO(fd, 'r') 168 os.close(fd) 169 self.assertEqual(cm.exception.errno, errno.EISDIR) 170 171 #A set of functions testing that we get expected behaviour if someone has 172 #manually closed the internal file descriptor. First, a decorator: 173 def ClosedFD(func): 174 @wraps(func) 175 def wrapper(self): 176 #forcibly close the fd before invoking the problem function 177 f = self.f 178 os.close(f.fileno()) 179 try: 180 func(self, f) 181 finally: 182 try: 183 self.f.close() 184 except IOError: 185 pass 186 return wrapper 187 188 def ClosedFDRaises(func): 189 @wraps(func) 190 def wrapper(self): 191 #forcibly close the fd before invoking the problem function 192 f = self.f 193 os.close(f.fileno()) 194 try: 195 func(self, f) 196 except IOError as e: 197 self.assertEqual(e.errno, errno.EBADF) 198 else: 199 self.fail("Should have raised IOError") 200 finally: 201 try: 202 self.f.close() 203 except IOError: 204 pass 205 return wrapper 206 207 @ClosedFDRaises 208 def testErrnoOnClose(self, f): 209 f.close() 210 211 @ClosedFDRaises 212 def testErrnoOnClosedWrite(self, f): 213 f.write('a') 214 215 @ClosedFDRaises 216 def testErrnoOnClosedSeek(self, f): 217 f.seek(0) 218 219 @ClosedFDRaises 220 def testErrnoOnClosedTell(self, f): 221 f.tell() 222 223 @ClosedFDRaises 224 def testErrnoOnClosedTruncate(self, f): 225 f.truncate(0) 226 227 @ClosedFD 228 def testErrnoOnClosedSeekable(self, f): 229 f.seekable() 230 231 @ClosedFD 232 def testErrnoOnClosedReadable(self, f): 233 f.readable() 234 235 @ClosedFD 236 def testErrnoOnClosedWritable(self, f): 237 f.writable() 238 239 @ClosedFD 240 def testErrnoOnClosedFileno(self, f): 241 f.fileno() 242 243 @ClosedFD 244 def testErrnoOnClosedIsatty(self, f): 245 self.assertEqual(f.isatty(), False) 246 247 def ReopenForRead(self): 248 try: 249 self.f.close() 250 except IOError: 251 pass 252 self.f = _FileIO(TESTFN, 'r') 253 os.close(self.f.fileno()) 254 return self.f 255 256 @ClosedFDRaises 257 def testErrnoOnClosedRead(self, f): 258 f = self.ReopenForRead() 259 f.read(1) 260 261 @ClosedFDRaises 262 def testErrnoOnClosedReadall(self, f): 263 f = self.ReopenForRead() 264 f.readall() 265 266 @ClosedFDRaises 267 def testErrnoOnClosedReadinto(self, f): 268 f = self.ReopenForRead() 269 a = array(b'b', b'x'*10) 270 f.readinto(a) 271 272 class OtherFileTests(unittest.TestCase): 273 274 def testAbles(self): 275 try: 276 f = _FileIO(TESTFN, "w") 277 self.assertEqual(f.readable(), False) 278 self.assertEqual(f.writable(), True) 279 self.assertEqual(f.seekable(), True) 280 f.close() 281 282 f = _FileIO(TESTFN, "r") 283 self.assertEqual(f.readable(), True) 284 self.assertEqual(f.writable(), False) 285 self.assertEqual(f.seekable(), True) 286 f.close() 287 288 f = _FileIO(TESTFN, "a+") 289 self.assertEqual(f.readable(), True) 290 self.assertEqual(f.writable(), True) 291 self.assertEqual(f.seekable(), True) 292 self.assertEqual(f.isatty(), False) 293 f.close() 294 finally: 295 os.unlink(TESTFN) 296 297 @unittest.skipIf(sys.platform == 'win32', 'no ttys on Windows') 298 def testAblesOnTTY(self): 299 try: 300 f = _FileIO("/dev/tty", "a") 301 except EnvironmentError: 302 # When run in a cron job there just aren't any 303 # ttys, so skip the test. This also handles other 304 # OS'es that don't support /dev/tty. 305 self.skipTest('need /dev/tty') 306 else: 307 self.assertEqual(f.readable(), False) 308 self.assertEqual(f.writable(), True) 309 if sys.platform != "darwin" and \ 310 'bsd' not in sys.platform and \ 311 not sys.platform.startswith(('sunos', 'aix')): 312 # Somehow /dev/tty appears seekable on some BSDs 313 self.assertEqual(f.seekable(), False) 314 self.assertEqual(f.isatty(), True) 315 f.close() 316 317 def testInvalidModeStrings(self): 318 # check invalid mode strings 319 for mode in ("", "aU", "wU+", "rw", "rt"): 320 try: 321 f = _FileIO(TESTFN, mode) 322 except ValueError: 323 pass 324 else: 325 f.close() 326 self.fail('%r is an invalid file mode' % mode) 327 328 def testModeStrings(self): 329 # test that the mode attribute is correct for various mode strings 330 # given as init args 331 try: 332 for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'), 333 ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'), 334 ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'), 335 ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]: 336 # read modes are last so that TESTFN will exist first 337 with _FileIO(TESTFN, modes[0]) as f: 338 self.assertEqual(f.mode, modes[1]) 339 finally: 340 if os.path.exists(TESTFN): 341 os.unlink(TESTFN) 342 343 def testUnicodeOpen(self): 344 # verify repr works for unicode too 345 f = _FileIO(str(TESTFN), "w") 346 f.close() 347 os.unlink(TESTFN) 348 349 def testBytesOpen(self): 350 # Opening a bytes filename 351 try: 352 fn = TESTFN.encode("ascii") 353 except UnicodeEncodeError: 354 self.skipTest('could not encode %r to ascii' % TESTFN) 355 f = _FileIO(fn, "w") 356 try: 357 f.write(b"abc") 358 f.close() 359 with open(TESTFN, "rb") as f: 360 self.assertEqual(f.read(), b"abc") 361 finally: 362 os.unlink(TESTFN) 363 364 def testConstructorHandlesNULChars(self): 365 fn_with_NUL = 'foo\0bar' 366 self.assertRaises(TypeError, _FileIO, fn_with_NUL, 'w') 367 self.assertRaises(TypeError, _FileIO, fn_with_NUL.encode('ascii'), 'w') 368 369 def testInvalidFd(self): 370 self.assertRaises(ValueError, _FileIO, -10) 371 self.assertRaises(OSError, _FileIO, make_bad_fd()) 372 if sys.platform == 'win32': 373 import msvcrt 374 self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd()) 375 376 @cpython_only 377 def testInvalidFd_overflow(self): 378 # Issue 15989 379 import _testcapi 380 self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1) 381 self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1) 382 383 def testBadModeArgument(self): 384 # verify that we get a sensible error message for bad mode argument 385 bad_mode = "qwerty" 386 try: 387 f = _FileIO(TESTFN, bad_mode) 388 except ValueError as msg: 389 if msg.args[0] != 0: 390 s = str(msg) 391 if TESTFN in s or bad_mode not in s: 392 self.fail("bad error message for invalid mode: %s" % s) 393 # if msg.args[0] == 0, we're probably on Windows where there may be 394 # no obvious way to discover why open() failed. 395 else: 396 f.close() 397 self.fail("no error for invalid mode: %s" % bad_mode) 398 399 def testTruncate(self): 400 f = _FileIO(TESTFN, 'w') 401 f.write(bytes(bytearray(range(10)))) 402 self.assertEqual(f.tell(), 10) 403 f.truncate(5) 404 self.assertEqual(f.tell(), 10) 405 self.assertEqual(f.seek(0, os.SEEK_END), 5) 406 f.truncate(15) 407 self.assertEqual(f.tell(), 5) 408 self.assertEqual(f.seek(0, os.SEEK_END), 15) 409 f.close() 410 411 def testTruncateOnWindows(self): 412 def bug801631(): 413 # SF bug <http://www.python.org/sf/801631> 414 # "file.truncate fault on windows" 415 f = _FileIO(TESTFN, 'w') 416 f.write(bytes(range(11))) 417 f.close() 418 419 f = _FileIO(TESTFN,'r+') 420 data = f.read(5) 421 if data != bytes(range(5)): 422 self.fail("Read on file opened for update failed %r" % data) 423 if f.tell() != 5: 424 self.fail("File pos after read wrong %d" % f.tell()) 425 426 f.truncate() 427 if f.tell() != 5: 428 self.fail("File pos after ftruncate wrong %d" % f.tell()) 429 430 f.close() 431 size = os.path.getsize(TESTFN) 432 if size != 5: 433 self.fail("File size after ftruncate wrong %d" % size) 434 435 try: 436 bug801631() 437 finally: 438 os.unlink(TESTFN) 439 440 def testAppend(self): 441 try: 442 f = open(TESTFN, 'wb') 443 f.write(b'spam') 444 f.close() 445 f = open(TESTFN, 'ab') 446 f.write(b'eggs') 447 f.close() 448 f = open(TESTFN, 'rb') 449 d = f.read() 450 f.close() 451 self.assertEqual(d, b'spameggs') 452 finally: 453 try: 454 os.unlink(TESTFN) 455 except: 456 pass 457 458 def testInvalidInit(self): 459 self.assertRaises(TypeError, _FileIO, "1", 0, 0) 460 461 def testWarnings(self): 462 with check_warnings(quiet=True) as w: 463 self.assertEqual(w.warnings, []) 464 self.assertRaises(TypeError, _FileIO, []) 465 self.assertEqual(w.warnings, []) 466 self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt") 467 self.assertEqual(w.warnings, []) 468 469 def test_surrogates(self): 470 # Issue #8438: try to open a filename containing surrogates. 471 # It should either fail because the file doesn't exist or the filename 472 # can't be represented using the filesystem encoding, but not because 473 # of a LookupError for the error handler "surrogateescape". 474 filename = u'\udc80.txt' 475 try: 476 with _FileIO(filename): 477 pass 478 except (UnicodeEncodeError, IOError): 479 pass 480 # Spawn a separate Python process with a different "file system 481 # default encoding", to exercise this further. 482 env = dict(os.environ) 483 env[b'LC_CTYPE'] = b'C' 484 _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env) 485 if ('UnicodeEncodeError' not in out and not 486 ( ('IOError: [Errno 2] No such file or directory' in out) or 487 ('IOError: [Errno 22] Invalid argument' in out) ) ): 488 self.fail('Bad output: %r' % out) 489 490 def testUnclosedFDOnException(self): 491 class MyException(Exception): pass 492 class MyFileIO(_FileIO): 493 def __setattr__(self, name, value): 494 if name == "name": 495 raise MyException("blocked setting name") 496 return super(MyFileIO, self).__setattr__(name, value) 497 fd = os.open(__file__, os.O_RDONLY) 498 self.assertRaises(MyException, MyFileIO, fd) 499 os.close(fd) # should not raise OSError(EBADF) 500 501 def test_main(): 502 # Historically, these tests have been sloppy about removing TESTFN. 503 # So get rid of it no matter what. 504 try: 505 run_unittest(AutoFileTests, OtherFileTests) 506 finally: 507 if os.path.exists(TESTFN): 508 os.unlink(TESTFN) 509 510 if __name__ == '__main__': 511 test_main() 512