1 import sys 2 import os 3 import unittest 4 import itertools 5 import time 6 from array import array 7 from weakref import proxy 8 try: 9 import threading 10 except ImportError: 11 threading = None 12 13 from test import test_support 14 from test.test_support import TESTFN, run_unittest 15 from UserList import UserList 16 17 class AutoFileTests(unittest.TestCase): 18 # file tests for which a test file is automatically set up 19 20 def setUp(self): 21 self.f = open(TESTFN, 'wb') 22 23 def tearDown(self): 24 if self.f: 25 self.f.close() 26 os.remove(TESTFN) 27 28 def testWeakRefs(self): 29 # verify weak references 30 p = proxy(self.f) 31 p.write('teststring') 32 self.assertEqual(self.f.tell(), p.tell()) 33 self.f.close() 34 self.f = None 35 self.assertRaises(ReferenceError, getattr, p, 'tell') 36 37 def testAttributes(self): 38 # verify expected attributes exist 39 f = self.f 40 with test_support.check_py3k_warnings(): 41 softspace = f.softspace 42 f.name # merely shouldn't blow up 43 f.mode # ditto 44 f.closed # ditto 45 46 with test_support.check_py3k_warnings(): 47 # verify softspace is writable 48 f.softspace = softspace # merely shouldn't blow up 49 50 # verify the others aren't 51 for attr in 'name', 'mode', 'closed': 52 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops') 53 54 def testReadinto(self): 55 # verify readinto 56 self.f.write('12') 57 self.f.close() 58 a = array('c', 'x'*10) 59 self.f = open(TESTFN, 'rb') 60 n = self.f.readinto(a) 61 self.assertEqual('12', a.tostring()[:n]) 62 63 def testWritelinesUserList(self): 64 # verify writelines with instance sequence 65 l = UserList(['1', '2']) 66 self.f.writelines(l) 67 self.f.close() 68 self.f = open(TESTFN, 'rb') 69 buf = self.f.read() 70 self.assertEqual(buf, '12') 71 72 def testWritelinesIntegers(self): 73 # verify writelines with integers 74 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 75 76 def testWritelinesIntegersUserList(self): 77 # verify writelines with integers in UserList 78 l = UserList([1,2,3]) 79 self.assertRaises(TypeError, self.f.writelines, l) 80 81 def testWritelinesNonString(self): 82 # verify writelines with non-string object 83 class NonString: 84 pass 85 86 self.assertRaises(TypeError, self.f.writelines, 87 [NonString(), NonString()]) 88 89 def testRepr(self): 90 # verify repr works 91 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN)) 92 93 def testErrors(self): 94 self.f.close() 95 self.f = open(TESTFN, 'rb') 96 f = self.f 97 self.assertEqual(f.name, TESTFN) 98 self.assertTrue(not f.isatty()) 99 self.assertTrue(not f.closed) 100 101 self.assertRaises(TypeError, f.readinto, "") 102 f.close() 103 self.assertTrue(f.closed) 104 105 def testMethods(self): 106 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto', 107 'readline', 'readlines', 'seek', 'tell', 'truncate', 108 'write', '__iter__'] 109 deprecated_methods = ['xreadlines'] 110 if sys.platform.startswith('atheos'): 111 methods.remove('truncate') 112 113 # __exit__ should close the file 114 self.f.__exit__(None, None, None) 115 self.assertTrue(self.f.closed) 116 117 for methodname in methods: 118 method = getattr(self.f, methodname) 119 # should raise on closed file 120 self.assertRaises(ValueError, method) 121 with test_support.check_py3k_warnings(): 122 for methodname in deprecated_methods: 123 method = getattr(self.f, methodname) 124 self.assertRaises(ValueError, method) 125 self.assertRaises(ValueError, self.f.writelines, []) 126 127 # file is closed, __exit__ shouldn't do anything 128 self.assertEqual(self.f.__exit__(None, None, None), None) 129 # it must also return None if an exception was given 130 try: 131 1 // 0 132 except: 133 self.assertEqual(self.f.__exit__(*sys.exc_info()), None) 134 135 def testReadWhenWriting(self): 136 self.assertRaises(IOError, self.f.read) 137 138 def testNastyWritelinesGenerator(self): 139 def nasty(): 140 for i in range(5): 141 if i == 3: 142 self.f.close() 143 yield str(i) 144 self.assertRaises(ValueError, self.f.writelines, nasty()) 145 146 def testIssue5677(self): 147 # Remark: Do not perform more than one test per open file, 148 # since that does NOT catch the readline error on Windows. 149 data = 'xxx' 150 for mode in ['w', 'wb', 'a', 'ab']: 151 for attr in ['read', 'readline', 'readlines']: 152 self.f = open(TESTFN, mode) 153 self.f.write(data) 154 self.assertRaises(IOError, getattr(self.f, attr)) 155 self.f.close() 156 157 self.f = open(TESTFN, mode) 158 self.f.write(data) 159 self.assertRaises(IOError, lambda: [line for line in self.f]) 160 self.f.close() 161 162 self.f = open(TESTFN, mode) 163 self.f.write(data) 164 self.assertRaises(IOError, self.f.readinto, bytearray(len(data))) 165 self.f.close() 166 167 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']: 168 self.f = open(TESTFN, mode) 169 self.assertRaises(IOError, self.f.write, data) 170 self.f.close() 171 172 self.f = open(TESTFN, mode) 173 self.assertRaises(IOError, self.f.writelines, [data, data]) 174 self.f.close() 175 176 self.f = open(TESTFN, mode) 177 self.assertRaises(IOError, self.f.truncate) 178 self.f.close() 179 180 class OtherFileTests(unittest.TestCase): 181 182 def testOpenDir(self): 183 this_dir = os.path.dirname(__file__) or os.curdir 184 for mode in (None, "w"): 185 try: 186 if mode: 187 f = open(this_dir, mode) 188 else: 189 f = open(this_dir) 190 except IOError as e: 191 self.assertEqual(e.filename, this_dir) 192 else: 193 self.fail("opening a directory didn't raise an IOError") 194 195 def testModeStrings(self): 196 # check invalid mode strings 197 for mode in ("", "aU", "wU+"): 198 try: 199 f = open(TESTFN, mode) 200 except ValueError: 201 pass 202 else: 203 f.close() 204 self.fail('%r is an invalid file mode' % mode) 205 206 # Some invalid modes fail on Windows, but pass on Unix 207 # Issue3965: avoid a crash on Windows when filename is unicode 208 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')): 209 try: 210 f = open(name, "rr") 211 except (IOError, ValueError): 212 pass 213 else: 214 f.close() 215 216 def testStdin(self): 217 # This causes the interpreter to exit on OSF1 v5.1. 218 if sys.platform != 'osf1V5': 219 self.assertRaises(IOError, sys.stdin.seek, -1) 220 else: 221 print >>sys.__stdout__, ( 222 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.' 223 ' Test manually.') 224 self.assertRaises(IOError, sys.stdin.truncate) 225 226 def testUnicodeOpen(self): 227 # verify repr works for unicode too 228 f = open(unicode(TESTFN), "w") 229 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN)) 230 f.close() 231 os.unlink(TESTFN) 232 233 def testBadModeArgument(self): 234 # verify that we get a sensible error message for bad mode argument 235 bad_mode = "qwerty" 236 try: 237 f = open(TESTFN, bad_mode) 238 except ValueError, msg: 239 if msg.args[0] != 0: 240 s = str(msg) 241 if TESTFN in s or bad_mode not in s: 242 self.fail("bad error message for invalid mode: %s" % s) 243 # if msg.args[0] == 0, we're probably on Windows where there may 244 # be no obvious way to discover why open() failed. 245 else: 246 f.close() 247 self.fail("no error for invalid mode: %s" % bad_mode) 248 249 def testSetBufferSize(self): 250 # make sure that explicitly setting the buffer size doesn't cause 251 # misbehaviour especially with repeated close() calls 252 for s in (-1, 0, 1, 512): 253 try: 254 f = open(TESTFN, 'w', s) 255 f.write(str(s)) 256 f.close() 257 f.close() 258 f = open(TESTFN, 'r', s) 259 d = int(f.read()) 260 f.close() 261 f.close() 262 except IOError, msg: 263 self.fail('error setting buffer size %d: %s' % (s, str(msg))) 264 self.assertEqual(d, s) 265 266 def testTruncateOnWindows(self): 267 os.unlink(TESTFN) 268 269 def bug801631(): 270 # SF bug <http://www.python.org/sf/801631> 271 # "file.truncate fault on windows" 272 f = open(TESTFN, 'wb') 273 f.write('12345678901') # 11 bytes 274 f.close() 275 276 f = open(TESTFN,'rb+') 277 data = f.read(5) 278 if data != '12345': 279 self.fail("Read on file opened for update failed %r" % data) 280 if f.tell() != 5: 281 self.fail("File pos after read wrong %d" % f.tell()) 282 283 f.truncate() 284 if f.tell() != 5: 285 self.fail("File pos after ftruncate wrong %d" % f.tell()) 286 287 f.close() 288 size = os.path.getsize(TESTFN) 289 if size != 5: 290 self.fail("File size after ftruncate wrong %d" % size) 291 292 try: 293 bug801631() 294 finally: 295 os.unlink(TESTFN) 296 297 def testIteration(self): 298 # Test the complex interaction when mixing file-iteration and the 299 # various read* methods. Ostensibly, the mixture could just be tested 300 # to work when it should work according to the Python language, 301 # instead of fail when it should fail according to the current CPython 302 # implementation. People don't always program Python the way they 303 # should, though, and the implemenation might change in subtle ways, 304 # so we explicitly test for errors, too; the test will just have to 305 # be updated when the implementation changes. 306 dataoffset = 16384 307 filler = "ham\n" 308 assert not dataoffset % len(filler), \ 309 "dataoffset must be multiple of len(filler)" 310 nchunks = dataoffset // len(filler) 311 testlines = [ 312 "spam, spam and eggs\n", 313 "eggs, spam, ham and spam\n", 314 "saussages, spam, spam and eggs\n", 315 "spam, ham, spam and eggs\n", 316 "spam, spam, spam, spam, spam, ham, spam\n", 317 "wonderful spaaaaaam.\n" 318 ] 319 methods = [("readline", ()), ("read", ()), ("readlines", ()), 320 ("readinto", (array("c", " "*100),))] 321 322 try: 323 # Prepare the testfile 324 bag = open(TESTFN, "w") 325 bag.write(filler * nchunks) 326 bag.writelines(testlines) 327 bag.close() 328 # Test for appropriate errors mixing read* and iteration 329 for methodname, args in methods: 330 f = open(TESTFN) 331 if f.next() != filler: 332 self.fail, "Broken testfile" 333 meth = getattr(f, methodname) 334 try: 335 meth(*args) 336 except ValueError: 337 pass 338 else: 339 self.fail("%s%r after next() didn't raise ValueError" % 340 (methodname, args)) 341 f.close() 342 343 # Test to see if harmless (by accident) mixing of read* and 344 # iteration still works. This depends on the size of the internal 345 # iteration buffer (currently 8192,) but we can test it in a 346 # flexible manner. Each line in the bag o' ham is 4 bytes 347 # ("h", "a", "m", "\n"), so 4096 lines of that should get us 348 # exactly on the buffer boundary for any power-of-2 buffersize 349 # between 4 and 16384 (inclusive). 350 f = open(TESTFN) 351 for i in range(nchunks): 352 f.next() 353 testline = testlines.pop(0) 354 try: 355 line = f.readline() 356 except ValueError: 357 self.fail("readline() after next() with supposedly empty " 358 "iteration-buffer failed anyway") 359 if line != testline: 360 self.fail("readline() after next() with empty buffer " 361 "failed. Got %r, expected %r" % (line, testline)) 362 testline = testlines.pop(0) 363 buf = array("c", "\x00" * len(testline)) 364 try: 365 f.readinto(buf) 366 except ValueError: 367 self.fail("readinto() after next() with supposedly empty " 368 "iteration-buffer failed anyway") 369 line = buf.tostring() 370 if line != testline: 371 self.fail("readinto() after next() with empty buffer " 372 "failed. Got %r, expected %r" % (line, testline)) 373 374 testline = testlines.pop(0) 375 try: 376 line = f.read(len(testline)) 377 except ValueError: 378 self.fail("read() after next() with supposedly empty " 379 "iteration-buffer failed anyway") 380 if line != testline: 381 self.fail("read() after next() with empty buffer " 382 "failed. Got %r, expected %r" % (line, testline)) 383 try: 384 lines = f.readlines() 385 except ValueError: 386 self.fail("readlines() after next() with supposedly empty " 387 "iteration-buffer failed anyway") 388 if lines != testlines: 389 self.fail("readlines() after next() with empty buffer " 390 "failed. Got %r, expected %r" % (line, testline)) 391 # Reading after iteration hit EOF shouldn't hurt either 392 f = open(TESTFN) 393 try: 394 for line in f: 395 pass 396 try: 397 f.readline() 398 f.readinto(buf) 399 f.read() 400 f.readlines() 401 except ValueError: 402 self.fail("read* failed after next() consumed file") 403 finally: 404 f.close() 405 finally: 406 os.unlink(TESTFN) 407 408 class FileSubclassTests(unittest.TestCase): 409 410 def testExit(self): 411 # test that exiting with context calls subclass' close 412 class C(file): 413 def __init__(self, *args): 414 self.subclass_closed = False 415 file.__init__(self, *args) 416 def close(self): 417 self.subclass_closed = True 418 file.close(self) 419 420 with C(TESTFN, 'w') as f: 421 pass 422 self.assertTrue(f.subclass_closed) 423 424 425 @unittest.skipUnless(threading, 'Threading required for this test.') 426 class FileThreadingTests(unittest.TestCase): 427 # These tests check the ability to call various methods of file objects 428 # (including close()) concurrently without crashing the Python interpreter. 429 # See #815646, #595601 430 431 def setUp(self): 432 self._threads = test_support.threading_setup() 433 self.f = None 434 self.filename = TESTFN 435 with open(self.filename, "w") as f: 436 f.write("\n".join("0123456789")) 437 self._count_lock = threading.Lock() 438 self.close_count = 0 439 self.close_success_count = 0 440 self.use_buffering = False 441 442 def tearDown(self): 443 if self.f: 444 try: 445 self.f.close() 446 except (EnvironmentError, ValueError): 447 pass 448 try: 449 os.remove(self.filename) 450 except EnvironmentError: 451 pass 452 test_support.threading_cleanup(*self._threads) 453 454 def _create_file(self): 455 if self.use_buffering: 456 self.f = open(self.filename, "w+", buffering=1024*16) 457 else: 458 self.f = open(self.filename, "w+") 459 460 def _close_file(self): 461 with self._count_lock: 462 self.close_count += 1 463 self.f.close() 464 with self._count_lock: 465 self.close_success_count += 1 466 467 def _close_and_reopen_file(self): 468 self._close_file() 469 # if close raises an exception thats fine, self.f remains valid so 470 # we don't need to reopen. 471 self._create_file() 472 473 def _run_workers(self, func, nb_workers, duration=0.2): 474 with self._count_lock: 475 self.close_count = 0 476 self.close_success_count = 0 477 self.do_continue = True 478 threads = [] 479 try: 480 for i in range(nb_workers): 481 t = threading.Thread(target=func) 482 t.start() 483 threads.append(t) 484 for _ in xrange(100): 485 time.sleep(duration/100) 486 with self._count_lock: 487 if self.close_count-self.close_success_count > nb_workers+1: 488 if test_support.verbose: 489 print 'Q', 490 break 491 time.sleep(duration) 492 finally: 493 self.do_continue = False 494 for t in threads: 495 t.join() 496 497 def _test_close_open_io(self, io_func, nb_workers=5): 498 def worker(): 499 self._create_file() 500 funcs = itertools.cycle(( 501 lambda: io_func(), 502 lambda: self._close_and_reopen_file(), 503 )) 504 for f in funcs: 505 if not self.do_continue: 506 break 507 try: 508 f() 509 except (IOError, ValueError): 510 pass 511 self._run_workers(worker, nb_workers) 512 if test_support.verbose: 513 # Useful verbose statistics when tuning this test to take 514 # less time to run but still ensuring that its still useful. 515 # 516 # the percent of close calls that raised an error 517 percent = 100. - 100.*self.close_success_count/self.close_count 518 print self.close_count, ('%.4f ' % percent), 519 520 def test_close_open(self): 521 def io_func(): 522 pass 523 self._test_close_open_io(io_func) 524 525 def test_close_open_flush(self): 526 def io_func(): 527 self.f.flush() 528 self._test_close_open_io(io_func) 529 530 def test_close_open_iter(self): 531 def io_func(): 532 list(iter(self.f)) 533 self._test_close_open_io(io_func) 534 535 def test_close_open_isatty(self): 536 def io_func(): 537 self.f.isatty() 538 self._test_close_open_io(io_func) 539 540 def test_close_open_print(self): 541 def io_func(): 542 print >> self.f, '' 543 self._test_close_open_io(io_func) 544 545 def test_close_open_print_buffered(self): 546 self.use_buffering = True 547 def io_func(): 548 print >> self.f, '' 549 self._test_close_open_io(io_func) 550 551 def test_close_open_read(self): 552 def io_func(): 553 self.f.read(0) 554 self._test_close_open_io(io_func) 555 556 def test_close_open_readinto(self): 557 def io_func(): 558 a = array('c', 'xxxxx') 559 self.f.readinto(a) 560 self._test_close_open_io(io_func) 561 562 def test_close_open_readline(self): 563 def io_func(): 564 self.f.readline() 565 self._test_close_open_io(io_func) 566 567 def test_close_open_readlines(self): 568 def io_func(): 569 self.f.readlines() 570 self._test_close_open_io(io_func) 571 572 def test_close_open_seek(self): 573 def io_func(): 574 self.f.seek(0, 0) 575 self._test_close_open_io(io_func) 576 577 def test_close_open_tell(self): 578 def io_func(): 579 self.f.tell() 580 self._test_close_open_io(io_func) 581 582 def test_close_open_truncate(self): 583 def io_func(): 584 self.f.truncate() 585 self._test_close_open_io(io_func) 586 587 def test_close_open_write(self): 588 def io_func(): 589 self.f.write('') 590 self._test_close_open_io(io_func) 591 592 def test_close_open_writelines(self): 593 def io_func(): 594 self.f.writelines('') 595 self._test_close_open_io(io_func) 596 597 598 class StdoutTests(unittest.TestCase): 599 600 def test_move_stdout_on_write(self): 601 # Issue 3242: sys.stdout can be replaced (and freed) during a 602 # print statement; prevent a segfault in this case 603 save_stdout = sys.stdout 604 605 class File: 606 def write(self, data): 607 if '\n' in data: 608 sys.stdout = save_stdout 609 610 try: 611 sys.stdout = File() 612 print "some text" 613 finally: 614 sys.stdout = save_stdout 615 616 def test_del_stdout_before_print(self): 617 # Issue 4597: 'print' with no argument wasn't reporting when 618 # sys.stdout was deleted. 619 save_stdout = sys.stdout 620 del sys.stdout 621 try: 622 print 623 except RuntimeError as e: 624 self.assertEqual(str(e), "lost sys.stdout") 625 else: 626 self.fail("Expected RuntimeError") 627 finally: 628 sys.stdout = save_stdout 629 630 def test_unicode(self): 631 import subprocess 632 633 def get_message(encoding, *code): 634 code = '\n'.join(code) 635 env = os.environ.copy() 636 env['PYTHONIOENCODING'] = encoding 637 process = subprocess.Popen([sys.executable, "-c", code], 638 stdout=subprocess.PIPE, env=env) 639 stdout, stderr = process.communicate() 640 self.assertEqual(process.returncode, 0) 641 return stdout 642 643 def check_message(text, encoding, expected): 644 stdout = get_message(encoding, 645 "import sys", 646 "sys.stdout.write(%r)" % text, 647 "sys.stdout.flush()") 648 self.assertEqual(stdout, expected) 649 650 # test the encoding 651 check_message(u'15\u20ac', "iso-8859-15", "15\xa4") 652 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac') 653 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20') 654 655 # test the error handler 656 check_message(u'15\u20ac', "iso-8859-1:ignore", "15") 657 check_message(u'15\u20ac', "iso-8859-1:replace", "15?") 658 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac") 659 660 # test the buffer API 661 for objtype in ('buffer', 'bytearray'): 662 stdout = get_message('ascii', 663 'import sys', 664 r'sys.stdout.write(%s("\xe9"))' % objtype, 665 'sys.stdout.flush()') 666 self.assertEqual(stdout, "\xe9") 667 668 669 def test_main(): 670 # Historically, these tests have been sloppy about removing TESTFN. 671 # So get rid of it no matter what. 672 try: 673 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests, 674 FileThreadingTests, StdoutTests) 675 finally: 676 if os.path.exists(TESTFN): 677 os.unlink(TESTFN) 678 679 if __name__ == '__main__': 680 test_main() 681