1 import sys 2 import os 3 import unittest 4 import itertools 5 import select 6 import signal 7 import subprocess 8 import time 9 from array import array 10 from weakref import proxy 11 try: 12 import threading 13 except ImportError: 14 threading = None 15 16 from test import test_support 17 from test.test_support import TESTFN, run_unittest 18 from UserList import UserList 19 20 class AutoFileTests(unittest.TestCase): 21 # file tests for which a test file is automatically set up 22 23 def setUp(self): 24 self.f = open(TESTFN, 'wb') 25 26 def tearDown(self): 27 if self.f: 28 self.f.close() 29 os.remove(TESTFN) 30 31 def testWeakRefs(self): 32 # verify weak references 33 p = proxy(self.f) 34 p.write('teststring') 35 self.assertEqual(self.f.tell(), p.tell()) 36 self.f.close() 37 self.f = None 38 self.assertRaises(ReferenceError, getattr, p, 'tell') 39 40 def testAttributes(self): 41 # verify expected attributes exist 42 f = self.f 43 with test_support.check_py3k_warnings(): 44 softspace = f.softspace 45 f.name # merely shouldn't blow up 46 f.mode # ditto 47 f.closed # ditto 48 49 with test_support.check_py3k_warnings(): 50 # verify softspace is writable 51 f.softspace = softspace # merely shouldn't blow up 52 53 # verify the others aren't 54 for attr in 'name', 'mode', 'closed': 55 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops') 56 57 def testReadinto(self): 58 # verify readinto 59 self.f.write('12') 60 self.f.close() 61 a = array('c', 'x'*10) 62 self.f = open(TESTFN, 'rb') 63 n = self.f.readinto(a) 64 self.assertEqual('12', a.tostring()[:n]) 65 66 def testWritelinesUserList(self): 67 # verify writelines with instance sequence 68 l = UserList(['1', '2']) 69 self.f.writelines(l) 70 self.f.close() 71 self.f = open(TESTFN, 'rb') 72 buf = self.f.read() 73 self.assertEqual(buf, '12') 74 75 def testWritelinesIntegers(self): 76 # verify writelines with integers 77 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 78 79 def testWritelinesIntegersUserList(self): 80 # verify writelines with integers in UserList 81 l = UserList([1,2,3]) 82 self.assertRaises(TypeError, self.f.writelines, l) 83 84 def testWritelinesNonString(self): 85 # verify writelines with non-string object 86 class NonString: 87 pass 88 89 self.assertRaises(TypeError, self.f.writelines, 90 [NonString(), NonString()]) 91 92 def testRepr(self): 93 # verify repr works 94 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN)) 95 # see issue #14161 96 # Windows doesn't like \r\n\t" in the file name, but ' is ok 97 fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx" 98 with open(fname, 'w') as f: 99 self.addCleanup(os.remove, fname) 100 self.assertTrue(repr(f).startswith( 101 "<open file %r, mode 'w' at" % fname)) 102 103 def testErrors(self): 104 self.f.close() 105 self.f = open(TESTFN, 'rb') 106 f = self.f 107 self.assertEqual(f.name, TESTFN) 108 self.assertTrue(not f.isatty()) 109 self.assertTrue(not f.closed) 110 111 self.assertRaises(TypeError, f.readinto, "") 112 f.close() 113 self.assertTrue(f.closed) 114 115 def testMethods(self): 116 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto', 117 'readline', 'readlines', 'seek', 'tell', 'truncate', 118 'write', '__iter__'] 119 deprecated_methods = ['xreadlines'] 120 if sys.platform.startswith('atheos'): 121 methods.remove('truncate') 122 123 # __exit__ should close the file 124 self.f.__exit__(None, None, None) 125 self.assertTrue(self.f.closed) 126 127 for methodname in methods: 128 method = getattr(self.f, methodname) 129 # should raise on closed file 130 self.assertRaises(ValueError, method) 131 with test_support.check_py3k_warnings(): 132 for methodname in deprecated_methods: 133 method = getattr(self.f, methodname) 134 self.assertRaises(ValueError, method) 135 self.assertRaises(ValueError, self.f.writelines, []) 136 137 # file is closed, __exit__ shouldn't do anything 138 self.assertEqual(self.f.__exit__(None, None, None), None) 139 # it must also return None if an exception was given 140 try: 141 1 // 0 142 except: 143 self.assertEqual(self.f.__exit__(*sys.exc_info()), None) 144 145 def testReadWhenWriting(self): 146 self.assertRaises(IOError, self.f.read) 147 148 def testNastyWritelinesGenerator(self): 149 def nasty(): 150 for i in range(5): 151 if i == 3: 152 self.f.close() 153 yield str(i) 154 self.assertRaises(ValueError, self.f.writelines, nasty()) 155 156 def testIssue5677(self): 157 # Remark: Do not perform more than one test per open file, 158 # since that does NOT catch the readline error on Windows. 159 data = 'xxx' 160 for mode in ['w', 'wb', 'a', 'ab']: 161 for attr in ['read', 'readline', 'readlines']: 162 self.f = open(TESTFN, mode) 163 self.f.write(data) 164 self.assertRaises(IOError, getattr(self.f, attr)) 165 self.f.close() 166 167 self.f = open(TESTFN, mode) 168 self.f.write(data) 169 self.assertRaises(IOError, lambda: [line for line in self.f]) 170 self.f.close() 171 172 self.f = open(TESTFN, mode) 173 self.f.write(data) 174 self.assertRaises(IOError, self.f.readinto, bytearray(len(data))) 175 self.f.close() 176 177 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']: 178 self.f = open(TESTFN, mode) 179 self.assertRaises(IOError, self.f.write, data) 180 self.f.close() 181 182 self.f = open(TESTFN, mode) 183 self.assertRaises(IOError, self.f.writelines, [data, data]) 184 self.f.close() 185 186 self.f = open(TESTFN, mode) 187 self.assertRaises(IOError, self.f.truncate) 188 self.f.close() 189 190 class OtherFileTests(unittest.TestCase): 191 192 def testOpenDir(self): 193 this_dir = os.path.dirname(__file__) or os.curdir 194 for mode in (None, "w"): 195 try: 196 if mode: 197 f = open(this_dir, mode) 198 else: 199 f = open(this_dir) 200 except IOError as e: 201 self.assertEqual(e.filename, this_dir) 202 else: 203 self.fail("opening a directory didn't raise an IOError") 204 205 def testModeStrings(self): 206 # check invalid mode strings 207 for mode in ("", "aU", "wU+"): 208 try: 209 f = open(TESTFN, mode) 210 except ValueError: 211 pass 212 else: 213 f.close() 214 self.fail('%r is an invalid file mode' % mode) 215 216 # Some invalid modes fail on Windows, but pass on Unix 217 # Issue3965: avoid a crash on Windows when filename is unicode 218 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')): 219 try: 220 f = open(name, "rr") 221 except (IOError, ValueError): 222 pass 223 else: 224 f.close() 225 226 def testStdin(self): 227 # This causes the interpreter to exit on OSF1 v5.1. 228 if sys.platform != 'osf1V5': 229 self.assertRaises(IOError, sys.stdin.seek, -1) 230 else: 231 print >>sys.__stdout__, ( 232 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.' 233 ' Test manually.') 234 self.assertRaises(IOError, sys.stdin.truncate) 235 236 def testUnicodeOpen(self): 237 # verify repr works for unicode too 238 f = open(unicode(TESTFN), "w") 239 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN)) 240 f.close() 241 os.unlink(TESTFN) 242 243 def testBadModeArgument(self): 244 # verify that we get a sensible error message for bad mode argument 245 bad_mode = "qwerty" 246 try: 247 f = open(TESTFN, bad_mode) 248 except ValueError, msg: 249 if msg.args[0] != 0: 250 s = str(msg) 251 if TESTFN in s or bad_mode not in s: 252 self.fail("bad error message for invalid mode: %s" % s) 253 # if msg.args[0] == 0, we're probably on Windows where there may 254 # be no obvious way to discover why open() failed. 255 else: 256 f.close() 257 self.fail("no error for invalid mode: %s" % bad_mode) 258 259 def testSetBufferSize(self): 260 # make sure that explicitly setting the buffer size doesn't cause 261 # misbehaviour especially with repeated close() calls 262 for s in (-1, 0, 1, 512): 263 try: 264 f = open(TESTFN, 'w', s) 265 f.write(str(s)) 266 f.close() 267 f.close() 268 f = open(TESTFN, 'r', s) 269 d = int(f.read()) 270 f.close() 271 f.close() 272 except IOError, msg: 273 self.fail('error setting buffer size %d: %s' % (s, str(msg))) 274 self.assertEqual(d, s) 275 276 def testTruncateOnWindows(self): 277 os.unlink(TESTFN) 278 279 def bug801631(): 280 # SF bug <http://www.python.org/sf/801631> 281 # "file.truncate fault on windows" 282 f = open(TESTFN, 'wb') 283 f.write('12345678901') # 11 bytes 284 f.close() 285 286 f = open(TESTFN,'rb+') 287 data = f.read(5) 288 if data != '12345': 289 self.fail("Read on file opened for update failed %r" % data) 290 if f.tell() != 5: 291 self.fail("File pos after read wrong %d" % f.tell()) 292 293 f.truncate() 294 if f.tell() != 5: 295 self.fail("File pos after ftruncate wrong %d" % f.tell()) 296 297 f.close() 298 size = os.path.getsize(TESTFN) 299 if size != 5: 300 self.fail("File size after ftruncate wrong %d" % size) 301 302 try: 303 bug801631() 304 finally: 305 os.unlink(TESTFN) 306 307 def testIteration(self): 308 # Test the complex interaction when mixing file-iteration and the 309 # various read* methods. Ostensibly, the mixture could just be tested 310 # to work when it should work according to the Python language, 311 # instead of fail when it should fail according to the current CPython 312 # implementation. People don't always program Python the way they 313 # should, though, and the implemenation might change in subtle ways, 314 # so we explicitly test for errors, too; the test will just have to 315 # be updated when the implementation changes. 316 dataoffset = 16384 317 filler = "ham\n" 318 assert not dataoffset % len(filler), \ 319 "dataoffset must be multiple of len(filler)" 320 nchunks = dataoffset // len(filler) 321 testlines = [ 322 "spam, spam and eggs\n", 323 "eggs, spam, ham and spam\n", 324 "saussages, spam, spam and eggs\n", 325 "spam, ham, spam and eggs\n", 326 "spam, spam, spam, spam, spam, ham, spam\n", 327 "wonderful spaaaaaam.\n" 328 ] 329 methods = [("readline", ()), ("read", ()), ("readlines", ()), 330 ("readinto", (array("c", " "*100),))] 331 332 try: 333 # Prepare the testfile 334 bag = open(TESTFN, "w") 335 bag.write(filler * nchunks) 336 bag.writelines(testlines) 337 bag.close() 338 # Test for appropriate errors mixing read* and iteration 339 for methodname, args in methods: 340 f = open(TESTFN) 341 if f.next() != filler: 342 self.fail, "Broken testfile" 343 meth = getattr(f, methodname) 344 try: 345 meth(*args) 346 except ValueError: 347 pass 348 else: 349 self.fail("%s%r after next() didn't raise ValueError" % 350 (methodname, args)) 351 f.close() 352 353 # Test to see if harmless (by accident) mixing of read* and 354 # iteration still works. This depends on the size of the internal 355 # iteration buffer (currently 8192,) but we can test it in a 356 # flexible manner. Each line in the bag o' ham is 4 bytes 357 # ("h", "a", "m", "\n"), so 4096 lines of that should get us 358 # exactly on the buffer boundary for any power-of-2 buffersize 359 # between 4 and 16384 (inclusive). 360 f = open(TESTFN) 361 for i in range(nchunks): 362 f.next() 363 testline = testlines.pop(0) 364 try: 365 line = f.readline() 366 except ValueError: 367 self.fail("readline() after next() with supposedly empty " 368 "iteration-buffer failed anyway") 369 if line != testline: 370 self.fail("readline() after next() with empty buffer " 371 "failed. Got %r, expected %r" % (line, testline)) 372 testline = testlines.pop(0) 373 buf = array("c", "\x00" * len(testline)) 374 try: 375 f.readinto(buf) 376 except ValueError: 377 self.fail("readinto() after next() with supposedly empty " 378 "iteration-buffer failed anyway") 379 line = buf.tostring() 380 if line != testline: 381 self.fail("readinto() after next() with empty buffer " 382 "failed. Got %r, expected %r" % (line, testline)) 383 384 testline = testlines.pop(0) 385 try: 386 line = f.read(len(testline)) 387 except ValueError: 388 self.fail("read() after next() with supposedly empty " 389 "iteration-buffer failed anyway") 390 if line != testline: 391 self.fail("read() after next() with empty buffer " 392 "failed. Got %r, expected %r" % (line, testline)) 393 try: 394 lines = f.readlines() 395 except ValueError: 396 self.fail("readlines() after next() with supposedly empty " 397 "iteration-buffer failed anyway") 398 if lines != testlines: 399 self.fail("readlines() after next() with empty buffer " 400 "failed. Got %r, expected %r" % (line, testline)) 401 # Reading after iteration hit EOF shouldn't hurt either 402 f = open(TESTFN) 403 try: 404 for line in f: 405 pass 406 try: 407 f.readline() 408 f.readinto(buf) 409 f.read() 410 f.readlines() 411 except ValueError: 412 self.fail("read* failed after next() consumed file") 413 finally: 414 f.close() 415 finally: 416 os.unlink(TESTFN) 417 418 class FileSubclassTests(unittest.TestCase): 419 420 def testExit(self): 421 # test that exiting with context calls subclass' close 422 class C(file): 423 def __init__(self, *args): 424 self.subclass_closed = False 425 file.__init__(self, *args) 426 def close(self): 427 self.subclass_closed = True 428 file.close(self) 429 430 with C(TESTFN, 'w') as f: 431 pass 432 self.assertTrue(f.subclass_closed) 433 434 435 @unittest.skipUnless(threading, 'Threading required for this test.') 436 class FileThreadingTests(unittest.TestCase): 437 # These tests check the ability to call various methods of file objects 438 # (including close()) concurrently without crashing the Python interpreter. 439 # See #815646, #595601 440 441 def setUp(self): 442 self._threads = test_support.threading_setup() 443 self.f = None 444 self.filename = TESTFN 445 with open(self.filename, "w") as f: 446 f.write("\n".join("0123456789")) 447 self._count_lock = threading.Lock() 448 self.close_count = 0 449 self.close_success_count = 0 450 self.use_buffering = False 451 452 def tearDown(self): 453 if self.f: 454 try: 455 self.f.close() 456 except (EnvironmentError, ValueError): 457 pass 458 try: 459 os.remove(self.filename) 460 except EnvironmentError: 461 pass 462 test_support.threading_cleanup(*self._threads) 463 464 def _create_file(self): 465 if self.use_buffering: 466 self.f = open(self.filename, "w+", buffering=1024*16) 467 else: 468 self.f = open(self.filename, "w+") 469 470 def _close_file(self): 471 with self._count_lock: 472 self.close_count += 1 473 self.f.close() 474 with self._count_lock: 475 self.close_success_count += 1 476 477 def _close_and_reopen_file(self): 478 self._close_file() 479 # if close raises an exception thats fine, self.f remains valid so 480 # we don't need to reopen. 481 self._create_file() 482 483 def _run_workers(self, func, nb_workers, duration=0.2): 484 with self._count_lock: 485 self.close_count = 0 486 self.close_success_count = 0 487 self.do_continue = True 488 threads = [] 489 try: 490 for i in range(nb_workers): 491 t = threading.Thread(target=func) 492 t.start() 493 threads.append(t) 494 for _ in xrange(100): 495 time.sleep(duration/100) 496 with self._count_lock: 497 if self.close_count-self.close_success_count > nb_workers+1: 498 if test_support.verbose: 499 print 'Q', 500 break 501 time.sleep(duration) 502 finally: 503 self.do_continue = False 504 for t in threads: 505 t.join() 506 507 def _test_close_open_io(self, io_func, nb_workers=5): 508 def worker(): 509 self._create_file() 510 funcs = itertools.cycle(( 511 lambda: io_func(), 512 lambda: self._close_and_reopen_file(), 513 )) 514 for f in funcs: 515 if not self.do_continue: 516 break 517 try: 518 f() 519 except (IOError, ValueError): 520 pass 521 self._run_workers(worker, nb_workers) 522 if test_support.verbose: 523 # Useful verbose statistics when tuning this test to take 524 # less time to run but still ensuring that its still useful. 525 # 526 # the percent of close calls that raised an error 527 percent = 100. - 100.*self.close_success_count/self.close_count 528 print self.close_count, ('%.4f ' % percent), 529 530 def test_close_open(self): 531 def io_func(): 532 pass 533 self._test_close_open_io(io_func) 534 535 def test_close_open_flush(self): 536 def io_func(): 537 self.f.flush() 538 self._test_close_open_io(io_func) 539 540 def test_close_open_iter(self): 541 def io_func(): 542 list(iter(self.f)) 543 self._test_close_open_io(io_func) 544 545 def test_close_open_isatty(self): 546 def io_func(): 547 self.f.isatty() 548 self._test_close_open_io(io_func) 549 550 def test_close_open_print(self): 551 def io_func(): 552 print >> self.f, '' 553 self._test_close_open_io(io_func) 554 555 def test_close_open_print_buffered(self): 556 self.use_buffering = True 557 def io_func(): 558 print >> self.f, '' 559 self._test_close_open_io(io_func) 560 561 def test_close_open_read(self): 562 def io_func(): 563 self.f.read(0) 564 self._test_close_open_io(io_func) 565 566 def test_close_open_readinto(self): 567 def io_func(): 568 a = array('c', 'xxxxx') 569 self.f.readinto(a) 570 self._test_close_open_io(io_func) 571 572 def test_close_open_readline(self): 573 def io_func(): 574 self.f.readline() 575 self._test_close_open_io(io_func) 576 577 def test_close_open_readlines(self): 578 def io_func(): 579 self.f.readlines() 580 self._test_close_open_io(io_func) 581 582 def test_close_open_seek(self): 583 def io_func(): 584 self.f.seek(0, 0) 585 self._test_close_open_io(io_func) 586 587 def test_close_open_tell(self): 588 def io_func(): 589 self.f.tell() 590 self._test_close_open_io(io_func) 591 592 def test_close_open_truncate(self): 593 def io_func(): 594 self.f.truncate() 595 self._test_close_open_io(io_func) 596 597 def test_close_open_write(self): 598 def io_func(): 599 self.f.write('') 600 self._test_close_open_io(io_func) 601 602 def test_close_open_writelines(self): 603 def io_func(): 604 self.f.writelines('') 605 self._test_close_open_io(io_func) 606 607 608 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.') 609 class TestFileSignalEINTR(unittest.TestCase): 610 def _test_reading(self, data_to_write, read_and_verify_code, method_name, 611 universal_newlines=False): 612 """Generic buffered read method test harness to verify EINTR behavior. 613 614 Also validates that Python signal handlers are run during the read. 615 616 Args: 617 data_to_write: String to write to the child process for reading 618 before sending it a signal, confirming the signal was handled, 619 writing a final newline char and closing the infile pipe. 620 read_and_verify_code: Single "line" of code to read from a file 621 object named 'infile' and validate the result. This will be 622 executed as part of a python subprocess fed data_to_write. 623 method_name: The name of the read method being tested, for use in 624 an error message on failure. 625 universal_newlines: If True, infile will be opened in universal 626 newline mode in the child process. 627 """ 628 if universal_newlines: 629 # Test the \r\n -> \n conversion while we're at it. 630 data_to_write = data_to_write.replace('\n', '\r\n') 631 infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")' 632 else: 633 infile_setup_code = 'infile = sys.stdin' 634 # Total pipe IO in this function is smaller than the minimum posix OS 635 # pipe buffer size of 512 bytes. No writer should block. 636 assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.' 637 638 child_code = ( 639 'import os, signal, sys ;' 640 'signal.signal(' 641 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;' 642 + infile_setup_code + ' ;' + 643 'assert isinstance(infile, file) ;' 644 'sys.stderr.write("Go.\\n") ;' 645 + read_and_verify_code) 646 reader_process = subprocess.Popen( 647 [sys.executable, '-c', child_code], 648 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 649 stderr=subprocess.PIPE) 650 # Wait for the signal handler to be installed. 651 go = reader_process.stderr.read(4) 652 if go != 'Go.\n': 653 reader_process.kill() 654 self.fail('Error from %s process while awaiting "Go":\n%s' % ( 655 method_name, go+reader_process.stderr.read())) 656 reader_process.stdin.write(data_to_write) 657 signals_sent = 0 658 rlist = [] 659 # We don't know when the read_and_verify_code in our child is actually 660 # executing within the read system call we want to interrupt. This 661 # loop waits for a bit before sending the first signal to increase 662 # the likelihood of that. Implementations without correct EINTR 663 # and signal handling usually fail this test. 664 while not rlist: 665 rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05) 666 reader_process.send_signal(signal.SIGINT) 667 # Give the subprocess time to handle it before we loop around and 668 # send another one. On OSX the second signal happening close to 669 # immediately after the first was causing the subprocess to crash 670 # via the OS's default SIGINT handler. 671 time.sleep(0.1) 672 signals_sent += 1 673 if signals_sent > 200: 674 reader_process.kill() 675 self.fail("failed to handle signal during %s." % method_name) 676 # This assumes anything unexpected that writes to stderr will also 677 # write a newline. That is true of the traceback printing code. 678 signal_line = reader_process.stderr.readline() 679 if signal_line != '$\n': 680 reader_process.kill() 681 self.fail('Error from %s process while awaiting signal:\n%s' % ( 682 method_name, signal_line+reader_process.stderr.read())) 683 # We append a newline to our input so that a readline call can 684 # end on its own before the EOF is seen. 685 stdout, stderr = reader_process.communicate(input='\n') 686 if reader_process.returncode != 0: 687 self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % ( 688 method_name, reader_process.returncode, stdout, stderr)) 689 690 def test_readline(self, universal_newlines=False): 691 """file.readline must handle signals and not lose data.""" 692 self._test_reading( 693 data_to_write='hello, world!', 694 read_and_verify_code=( 695 'line = infile.readline() ;' 696 'expected_line = "hello, world!\\n" ;' 697 'assert line == expected_line, (' 698 '"read %r expected %r" % (line, expected_line))' 699 ), 700 method_name='readline', 701 universal_newlines=universal_newlines) 702 703 def test_readline_with_universal_newlines(self): 704 self.test_readline(universal_newlines=True) 705 706 def test_readlines(self, universal_newlines=False): 707 """file.readlines must handle signals and not lose data.""" 708 self._test_reading( 709 data_to_write='hello\nworld!', 710 read_and_verify_code=( 711 'lines = infile.readlines() ;' 712 'expected_lines = ["hello\\n", "world!\\n"] ;' 713 'assert lines == expected_lines, (' 714 '"readlines returned wrong data.\\n" ' 715 '"got lines %r\\nexpected %r" ' 716 '% (lines, expected_lines))' 717 ), 718 method_name='readlines', 719 universal_newlines=universal_newlines) 720 721 def test_readlines_with_universal_newlines(self): 722 self.test_readlines(universal_newlines=True) 723 724 def test_readall(self): 725 """Unbounded file.read() must handle signals and not lose data.""" 726 self._test_reading( 727 data_to_write='hello, world!abcdefghijklm', 728 read_and_verify_code=( 729 'data = infile.read() ;' 730 'expected_data = "hello, world!abcdefghijklm\\n";' 731 'assert data == expected_data, (' 732 '"read %r expected %r" % (data, expected_data))' 733 ), 734 method_name='unbounded read') 735 736 def test_readinto(self): 737 """file.readinto must handle signals and not lose data.""" 738 self._test_reading( 739 data_to_write='hello, world!', 740 read_and_verify_code=( 741 'data = bytearray(50) ;' 742 'num_read = infile.readinto(data) ;' 743 'expected_data = "hello, world!\\n";' 744 'assert data[:num_read] == expected_data, (' 745 '"read %r expected %r" % (data, expected_data))' 746 ), 747 method_name='readinto') 748 749 750 class StdoutTests(unittest.TestCase): 751 752 def test_move_stdout_on_write(self): 753 # Issue 3242: sys.stdout can be replaced (and freed) during a 754 # print statement; prevent a segfault in this case 755 save_stdout = sys.stdout 756 757 class File: 758 def write(self, data): 759 if '\n' in data: 760 sys.stdout = save_stdout 761 762 try: 763 sys.stdout = File() 764 print "some text" 765 finally: 766 sys.stdout = save_stdout 767 768 def test_del_stdout_before_print(self): 769 # Issue 4597: 'print' with no argument wasn't reporting when 770 # sys.stdout was deleted. 771 save_stdout = sys.stdout 772 del sys.stdout 773 try: 774 print 775 except RuntimeError as e: 776 self.assertEqual(str(e), "lost sys.stdout") 777 else: 778 self.fail("Expected RuntimeError") 779 finally: 780 sys.stdout = save_stdout 781 782 def test_unicode(self): 783 import subprocess 784 785 def get_message(encoding, *code): 786 code = '\n'.join(code) 787 env = os.environ.copy() 788 env['PYTHONIOENCODING'] = encoding 789 process = subprocess.Popen([sys.executable, "-c", code], 790 stdout=subprocess.PIPE, env=env) 791 stdout, stderr = process.communicate() 792 self.assertEqual(process.returncode, 0) 793 return stdout 794 795 def check_message(text, encoding, expected): 796 stdout = get_message(encoding, 797 "import sys", 798 "sys.stdout.write(%r)" % text, 799 "sys.stdout.flush()") 800 self.assertEqual(stdout, expected) 801 802 # test the encoding 803 check_message(u'15\u20ac', "iso-8859-15", "15\xa4") 804 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac') 805 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20') 806 807 # test the error handler 808 check_message(u'15\u20ac', "iso-8859-1:ignore", "15") 809 check_message(u'15\u20ac', "iso-8859-1:replace", "15?") 810 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac") 811 812 # test the buffer API 813 for objtype in ('buffer', 'bytearray'): 814 stdout = get_message('ascii', 815 'import sys', 816 r'sys.stdout.write(%s("\xe9"))' % objtype, 817 'sys.stdout.flush()') 818 self.assertEqual(stdout, "\xe9") 819 820 821 def test_main(): 822 # Historically, these tests have been sloppy about removing TESTFN. 823 # So get rid of it no matter what. 824 try: 825 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests, 826 FileThreadingTests, TestFileSignalEINTR, StdoutTests) 827 finally: 828 if os.path.exists(TESTFN): 829 os.unlink(TESTFN) 830 831 if __name__ == '__main__': 832 test_main() 833