1 # Adapted from test_file.py by Daniel Stutzbach 2 3 from __future__ import unicode_literals 4 5 import sys 6 import os 7 import errno 8 import unittest 9 from array import array 10 from weakref import proxy 11 from functools import wraps 12 from UserList import UserList 13 import _testcapi 14 15 from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd 16 from test.test_support import py3k_bytes as bytes 17 from test.script_helper import run_python 18 19 from _io import FileIO as _FileIO 20 21 class AutoFileTests(unittest.TestCase): 22 # file tests for which a test file is automatically set up 23 24 def setUp(self): 25 self.f = _FileIO(TESTFN, 'w') 26 27 def tearDown(self): 28 if self.f: 29 self.f.close() 30 os.remove(TESTFN) 31 32 def testWeakRefs(self): 33 # verify weak references 34 p = proxy(self.f) 35 p.write(bytes(range(10))) 36 self.assertEqual(self.f.tell(), p.tell()) 37 self.f.close() 38 self.f = None 39 self.assertRaises(ReferenceError, getattr, p, 'tell') 40 41 def testSeekTell(self): 42 self.f.write(bytes(range(20))) 43 self.assertEqual(self.f.tell(), 20) 44 self.f.seek(0) 45 self.assertEqual(self.f.tell(), 0) 46 self.f.seek(10) 47 self.assertEqual(self.f.tell(), 10) 48 self.f.seek(5, 1) 49 self.assertEqual(self.f.tell(), 15) 50 self.f.seek(-5, 1) 51 self.assertEqual(self.f.tell(), 10) 52 self.f.seek(-5, 2) 53 self.assertEqual(self.f.tell(), 15) 54 55 def testAttributes(self): 56 # verify expected attributes exist 57 f = self.f 58 59 self.assertEqual(f.mode, "wb") 60 self.assertEqual(f.closed, False) 61 62 # verify the attributes are readonly 63 for attr in 'mode', 'closed': 64 self.assertRaises((AttributeError, TypeError), 65 setattr, f, attr, 'oops') 66 67 def testReadinto(self): 68 # verify readinto 69 self.f.write(b"\x01\x02") 70 self.f.close() 71 a = array(b'b', b'x'*10) 72 self.f = _FileIO(TESTFN, 'r') 73 n = self.f.readinto(a) 74 self.assertEqual(array(b'b', [1, 2]), a[:n]) 75 76 def testWritelinesList(self): 77 l = [b'123', b'456'] 78 self.f.writelines(l) 79 self.f.close() 80 self.f = _FileIO(TESTFN, 'rb') 81 buf = self.f.read() 82 self.assertEqual(buf, b'123456') 83 84 def testWritelinesUserList(self): 85 l = UserList([b'123', b'456']) 86 self.f.writelines(l) 87 self.f.close() 88 self.f = _FileIO(TESTFN, 'rb') 89 buf = self.f.read() 90 self.assertEqual(buf, b'123456') 91 92 def testWritelinesError(self): 93 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 94 self.assertRaises(TypeError, self.f.writelines, None) 95 96 def test_none_args(self): 97 self.f.write(b"hi\nbye\nabc") 98 self.f.close() 99 self.f = _FileIO(TESTFN, 'r') 100 self.assertEqual(self.f.read(None), b"hi\nbye\nabc") 101 self.f.seek(0) 102 self.assertEqual(self.f.readline(None), b"hi\n") 103 self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"]) 104 105 def testRepr(self): 106 self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>" 107 % (self.f.name, self.f.mode)) 108 del self.f.name 109 self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>" 110 % (self.f.fileno(), self.f.mode)) 111 self.f.close() 112 self.assertEqual(repr(self.f), "<_io.FileIO [closed]>") 113 114 def testErrors(self): 115 f = self.f 116 self.assertTrue(not f.isatty()) 117 self.assertTrue(not f.closed) 118 #self.assertEqual(f.name, TESTFN) 119 self.assertRaises(ValueError, f.read, 10) # Open for reading 120 f.close() 121 self.assertTrue(f.closed) 122 f = _FileIO(TESTFN, 'r') 123 self.assertRaises(TypeError, f.readinto, "") 124 self.assertTrue(not f.closed) 125 f.close() 126 self.assertTrue(f.closed) 127 128 def testMethods(self): 129 methods = ['fileno', 'isatty', 'read', 'readinto', 130 'seek', 'tell', 'truncate', 'write', 'seekable', 131 'readable', 'writable'] 132 if sys.platform.startswith('atheos'): 133 methods.remove('truncate') 134 135 self.f.close() 136 self.assertTrue(self.f.closed) 137 138 for methodname in methods: 139 method = getattr(self.f, methodname) 140 # should raise on closed file 141 self.assertRaises(ValueError, method) 142 143 def testOpendir(self): 144 # Issue 3703: opening a directory should fill the errno 145 # Windows always returns "[Errno 13]: Permission denied 146 # Unix calls dircheck() and returns "[Errno 21]: Is a directory" 147 try: 148 _FileIO('.', 'r') 149 except IOError as e: 150 self.assertNotEqual(e.errno, 0) 151 self.assertEqual(e.filename, ".") 152 else: 153 self.fail("Should have raised IOError") 154 155 @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system") 156 def testOpenDirFD(self): 157 fd = os.open('.', os.O_RDONLY) 158 with self.assertRaises(IOError) as cm: 159 _FileIO(fd, 'r') 160 os.close(fd) 161 self.assertEqual(cm.exception.errno, errno.EISDIR) 162 163 #A set of functions testing that we get expected behaviour if someone has 164 #manually closed the internal file descriptor. First, a decorator: 165 def ClosedFD(func): 166 @wraps(func) 167 def wrapper(self): 168 #forcibly close the fd before invoking the problem function 169 f = self.f 170 os.close(f.fileno()) 171 try: 172 func(self, f) 173 finally: 174 try: 175 self.f.close() 176 except IOError: 177 pass 178 return wrapper 179 180 def ClosedFDRaises(func): 181 @wraps(func) 182 def wrapper(self): 183 #forcibly close the fd before invoking the problem function 184 f = self.f 185 os.close(f.fileno()) 186 try: 187 func(self, f) 188 except IOError as e: 189 self.assertEqual(e.errno, errno.EBADF) 190 else: 191 self.fail("Should have raised IOError") 192 finally: 193 try: 194 self.f.close() 195 except IOError: 196 pass 197 return wrapper 198 199 @ClosedFDRaises 200 def testErrnoOnClose(self, f): 201 f.close() 202 203 @ClosedFDRaises 204 def testErrnoOnClosedWrite(self, f): 205 f.write('a') 206 207 @ClosedFDRaises 208 def testErrnoOnClosedSeek(self, f): 209 f.seek(0) 210 211 @ClosedFDRaises 212 def testErrnoOnClosedTell(self, f): 213 f.tell() 214 215 @ClosedFDRaises 216 def testErrnoOnClosedTruncate(self, f): 217 f.truncate(0) 218 219 @ClosedFD 220 def testErrnoOnClosedSeekable(self, f): 221 f.seekable() 222 223 @ClosedFD 224 def testErrnoOnClosedReadable(self, f): 225 f.readable() 226 227 @ClosedFD 228 def testErrnoOnClosedWritable(self, f): 229 f.writable() 230 231 @ClosedFD 232 def testErrnoOnClosedFileno(self, f): 233 f.fileno() 234 235 @ClosedFD 236 def testErrnoOnClosedIsatty(self, f): 237 self.assertEqual(f.isatty(), False) 238 239 def ReopenForRead(self): 240 try: 241 self.f.close() 242 except IOError: 243 pass 244 self.f = _FileIO(TESTFN, 'r') 245 os.close(self.f.fileno()) 246 return self.f 247 248 @ClosedFDRaises 249 def testErrnoOnClosedRead(self, f): 250 f = self.ReopenForRead() 251 f.read(1) 252 253 @ClosedFDRaises 254 def testErrnoOnClosedReadall(self, f): 255 f = self.ReopenForRead() 256 f.readall() 257 258 @ClosedFDRaises 259 def testErrnoOnClosedReadinto(self, f): 260 f = self.ReopenForRead() 261 a = array(b'b', b'x'*10) 262 f.readinto(a) 263 264 class OtherFileTests(unittest.TestCase): 265 266 def testAbles(self): 267 try: 268 f = _FileIO(TESTFN, "w") 269 self.assertEqual(f.readable(), False) 270 self.assertEqual(f.writable(), True) 271 self.assertEqual(f.seekable(), True) 272 f.close() 273 274 f = _FileIO(TESTFN, "r") 275 self.assertEqual(f.readable(), True) 276 self.assertEqual(f.writable(), False) 277 self.assertEqual(f.seekable(), True) 278 f.close() 279 280 f = _FileIO(TESTFN, "a+") 281 self.assertEqual(f.readable(), True) 282 self.assertEqual(f.writable(), True) 283 self.assertEqual(f.seekable(), True) 284 self.assertEqual(f.isatty(), False) 285 f.close() 286 287 if sys.platform != "win32": 288 try: 289 f = _FileIO("/dev/tty", "a") 290 except EnvironmentError: 291 # When run in a cron job there just aren't any 292 # ttys, so skip the test. This also handles other 293 # OS'es that don't support /dev/tty. 294 pass 295 else: 296 self.assertEqual(f.readable(), False) 297 self.assertEqual(f.writable(), True) 298 if sys.platform != "darwin" and \ 299 'bsd' not in sys.platform and \ 300 not sys.platform.startswith('sunos'): 301 # Somehow /dev/tty appears seekable on some BSDs 302 self.assertEqual(f.seekable(), False) 303 self.assertEqual(f.isatty(), True) 304 f.close() 305 finally: 306 os.unlink(TESTFN) 307 308 def testModeStrings(self): 309 # check invalid mode strings 310 for mode in ("", "aU", "wU+", "rw", "rt"): 311 try: 312 f = _FileIO(TESTFN, mode) 313 except ValueError: 314 pass 315 else: 316 f.close() 317 self.fail('%r is an invalid file mode' % mode) 318 319 def testUnicodeOpen(self): 320 # verify repr works for unicode too 321 f = _FileIO(str(TESTFN), "w") 322 f.close() 323 os.unlink(TESTFN) 324 325 def testBytesOpen(self): 326 # Opening a bytes filename 327 try: 328 fn = TESTFN.encode("ascii") 329 except UnicodeEncodeError: 330 # Skip test 331 return 332 f = _FileIO(fn, "w") 333 try: 334 f.write(b"abc") 335 f.close() 336 with open(TESTFN, "rb") as f: 337 self.assertEqual(f.read(), b"abc") 338 finally: 339 os.unlink(TESTFN) 340 341 def testInvalidFd(self): 342 self.assertRaises(ValueError, _FileIO, -10) 343 self.assertRaises(OSError, _FileIO, make_bad_fd()) 344 if sys.platform == 'win32': 345 import msvcrt 346 self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd()) 347 # Issue 15989 348 self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1) 349 self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1) 350 351 def testBadModeArgument(self): 352 # verify that we get a sensible error message for bad mode argument 353 bad_mode = "qwerty" 354 try: 355 f = _FileIO(TESTFN, bad_mode) 356 except ValueError as msg: 357 if msg.args[0] != 0: 358 s = str(msg) 359 if TESTFN in s or bad_mode not in s: 360 self.fail("bad error message for invalid mode: %s" % s) 361 # if msg.args[0] == 0, we're probably on Windows where there may be 362 # no obvious way to discover why open() failed. 363 else: 364 f.close() 365 self.fail("no error for invalid mode: %s" % bad_mode) 366 367 def testTruncate(self): 368 f = _FileIO(TESTFN, 'w') 369 f.write(bytes(bytearray(range(10)))) 370 self.assertEqual(f.tell(), 10) 371 f.truncate(5) 372 self.assertEqual(f.tell(), 10) 373 self.assertEqual(f.seek(0, os.SEEK_END), 5) 374 f.truncate(15) 375 self.assertEqual(f.tell(), 5) 376 self.assertEqual(f.seek(0, os.SEEK_END), 15) 377 f.close() 378 379 def testTruncateOnWindows(self): 380 def bug801631(): 381 # SF bug <http://www.python.org/sf/801631> 382 # "file.truncate fault on windows" 383 f = _FileIO(TESTFN, 'w') 384 f.write(bytes(range(11))) 385 f.close() 386 387 f = _FileIO(TESTFN,'r+') 388 data = f.read(5) 389 if data != bytes(range(5)): 390 self.fail("Read on file opened for update failed %r" % data) 391 if f.tell() != 5: 392 self.fail("File pos after read wrong %d" % f.tell()) 393 394 f.truncate() 395 if f.tell() != 5: 396 self.fail("File pos after ftruncate wrong %d" % f.tell()) 397 398 f.close() 399 size = os.path.getsize(TESTFN) 400 if size != 5: 401 self.fail("File size after ftruncate wrong %d" % size) 402 403 try: 404 bug801631() 405 finally: 406 os.unlink(TESTFN) 407 408 def testAppend(self): 409 try: 410 f = open(TESTFN, 'wb') 411 f.write(b'spam') 412 f.close() 413 f = open(TESTFN, 'ab') 414 f.write(b'eggs') 415 f.close() 416 f = open(TESTFN, 'rb') 417 d = f.read() 418 f.close() 419 self.assertEqual(d, b'spameggs') 420 finally: 421 try: 422 os.unlink(TESTFN) 423 except: 424 pass 425 426 def testInvalidInit(self): 427 self.assertRaises(TypeError, _FileIO, "1", 0, 0) 428 429 def testWarnings(self): 430 with check_warnings(quiet=True) as w: 431 self.assertEqual(w.warnings, []) 432 self.assertRaises(TypeError, _FileIO, []) 433 self.assertEqual(w.warnings, []) 434 self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt") 435 self.assertEqual(w.warnings, []) 436 437 def test_surrogates(self): 438 # Issue #8438: try to open a filename containing surrogates. 439 # It should either fail because the file doesn't exist or the filename 440 # can't be represented using the filesystem encoding, but not because 441 # of a LookupError for the error handler "surrogateescape". 442 filename = u'\udc80.txt' 443 try: 444 with _FileIO(filename): 445 pass 446 except (UnicodeEncodeError, IOError): 447 pass 448 # Spawn a separate Python process with a different "file system 449 # default encoding", to exercise this further. 450 env = dict(os.environ) 451 env[b'LC_CTYPE'] = b'C' 452 _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env) 453 if ('UnicodeEncodeError' not in out and not 454 ( ('IOError: [Errno 2] No such file or directory' in out) or 455 ('IOError: [Errno 22] Invalid argument' in out) ) ): 456 self.fail('Bad output: %r' % out) 457 458 def testUnclosedFDOnException(self): 459 class MyException(Exception): pass 460 class MyFileIO(_FileIO): 461 def __setattr__(self, name, value): 462 if name == "name": 463 raise MyException("blocked setting name") 464 return super(MyFileIO, self).__setattr__(name, value) 465 fd = os.open(__file__, os.O_RDONLY) 466 self.assertRaises(MyException, MyFileIO, fd) 467 os.close(fd) # should not raise OSError(EBADF) 468 469 def test_main(): 470 # Historically, these tests have been sloppy about removing TESTFN. 471 # So get rid of it no matter what. 472 try: 473 run_unittest(AutoFileTests, OtherFileTests) 474 finally: 475 if os.path.exists(TESTFN): 476 os.unlink(TESTFN) 477 478 if __name__ == '__main__': 479 test_main() 480