1 """Unit tests for the io module.""" 2 3 # Tests of io are scattered over the test suite: 4 # * test_bufio - tests file buffering 5 # * test_memoryio - tests BytesIO and StringIO 6 # * test_fileio - tests FileIO 7 # * test_file - tests the file interface 8 # * test_io - tests everything else in the io module 9 # * test_univnewlines - tests universal newline support 10 # * test_largefile - tests operations on a file greater than 2**32 bytes 11 # (only enabled with -ulargefile) 12 13 ################################################################################ 14 # ATTENTION TEST WRITERS!!! 15 ################################################################################ 16 # When writing tests for io, it's important to test both the C and Python 17 # implementations. This is usually done by writing a base test that refers to 18 # the type it is testing as an attribute. Then it provides custom subclasses to 19 # test both implementations. This file has lots of examples. 20 ################################################################################ 21 22 import abc 23 import array 24 import errno 25 import locale 26 import os 27 import pickle 28 import random 29 import signal 30 import sys 31 import time 32 import unittest 33 import warnings 34 import weakref 35 from collections import deque, UserList 36 from itertools import cycle, count 37 from test import support 38 from test.support.script_helper import assert_python_ok, run_python_until_end 39 40 import codecs 41 import io # C implementation of io 42 import _pyio as pyio # Python implementation of io 43 try: 44 import threading 45 except ImportError: 46 threading = None 47 48 try: 49 import ctypes 50 except ImportError: 51 def byteslike(*pos, **kw): 52 return array.array("b", bytes(*pos, **kw)) 53 else: 54 def byteslike(*pos, **kw): 55 """Create a bytes-like object having no string or sequence methods""" 56 data = bytes(*pos, **kw) 57 obj = EmptyStruct() 58 ctypes.resize(obj, len(data)) 59 
memoryview(obj).cast("B")[:] = data 60 return obj 61 class EmptyStruct(ctypes.Structure): 62 pass 63 64 def _default_chunk_size(): 65 """Get the default TextIOWrapper chunk size""" 66 with open(__file__, "r", encoding="latin-1") as f: 67 return f._CHUNK_SIZE 68 69 70 class MockRawIOWithoutRead: 71 """A RawIO implementation without read(), so as to exercise the default 72 RawIO.read() which calls readinto().""" 73 74 def __init__(self, read_stack=()): 75 self._read_stack = list(read_stack) 76 self._write_stack = [] 77 self._reads = 0 78 self._extraneous_reads = 0 79 80 def write(self, b): 81 self._write_stack.append(bytes(b)) 82 return len(b) 83 84 def writable(self): 85 return True 86 87 def fileno(self): 88 return 42 89 90 def readable(self): 91 return True 92 93 def seekable(self): 94 return True 95 96 def seek(self, pos, whence): 97 return 0 # wrong but we gotta return something 98 99 def tell(self): 100 return 0 # same comment as above 101 102 def readinto(self, buf): 103 self._reads += 1 104 max_len = len(buf) 105 try: 106 data = self._read_stack[0] 107 except IndexError: 108 self._extraneous_reads += 1 109 return 0 110 if data is None: 111 del self._read_stack[0] 112 return None 113 n = len(data) 114 if len(data) <= max_len: 115 del self._read_stack[0] 116 buf[:n] = data 117 return n 118 else: 119 buf[:] = data[:max_len] 120 self._read_stack[0] = data[max_len:] 121 return max_len 122 123 def truncate(self, pos=None): 124 return pos 125 126 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 127 pass 128 129 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 130 pass 131 132 133 class MockRawIO(MockRawIOWithoutRead): 134 135 def read(self, n=None): 136 self._reads += 1 137 try: 138 return self._read_stack.pop(0) 139 except: 140 self._extraneous_reads += 1 141 return b"" 142 143 class CMockRawIO(MockRawIO, io.RawIOBase): 144 pass 145 146 class PyMockRawIO(MockRawIO, pyio.RawIOBase): 147 pass 148 149 150 class 
MisbehavedRawIO(MockRawIO): 151 def write(self, b): 152 return super().write(b) * 2 153 154 def read(self, n=None): 155 return super().read(n) * 2 156 157 def seek(self, pos, whence): 158 return -123 159 160 def tell(self): 161 return -456 162 163 def readinto(self, buf): 164 super().readinto(buf) 165 return len(buf) * 5 166 167 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 168 pass 169 170 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 171 pass 172 173 174 class CloseFailureIO(MockRawIO): 175 closed = 0 176 177 def close(self): 178 if not self.closed: 179 self.closed = 1 180 raise OSError 181 182 class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 183 pass 184 185 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 186 pass 187 188 189 class MockFileIO: 190 191 def __init__(self, data): 192 self.read_history = [] 193 super().__init__(data) 194 195 def read(self, n=None): 196 res = super().read(n) 197 self.read_history.append(None if res is None else len(res)) 198 return res 199 200 def readinto(self, b): 201 res = super().readinto(b) 202 self.read_history.append(res) 203 return res 204 205 class CMockFileIO(MockFileIO, io.BytesIO): 206 pass 207 208 class PyMockFileIO(MockFileIO, pyio.BytesIO): 209 pass 210 211 212 class MockUnseekableIO: 213 def seekable(self): 214 return False 215 216 def seek(self, *args): 217 raise self.UnsupportedOperation("not seekable") 218 219 def tell(self, *args): 220 raise self.UnsupportedOperation("not seekable") 221 222 def truncate(self, *args): 223 raise self.UnsupportedOperation("not seekable") 224 225 class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 226 UnsupportedOperation = io.UnsupportedOperation 227 228 class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 229 UnsupportedOperation = pyio.UnsupportedOperation 230 231 232 class MockNonBlockWriterIO: 233 234 def __init__(self): 235 self._write_stack = [] 236 self._blocker_char = None 237 238 def pop_written(self): 239 s = 
b"".join(self._write_stack) 240 self._write_stack[:] = [] 241 return s 242 243 def block_on(self, char): 244 """Block when a given char is encountered.""" 245 self._blocker_char = char 246 247 def readable(self): 248 return True 249 250 def seekable(self): 251 return True 252 253 def writable(self): 254 return True 255 256 def write(self, b): 257 b = bytes(b) 258 n = -1 259 if self._blocker_char: 260 try: 261 n = b.index(self._blocker_char) 262 except ValueError: 263 pass 264 else: 265 if n > 0: 266 # write data up to the first blocker 267 self._write_stack.append(b[:n]) 268 return n 269 else: 270 # cancel blocker and indicate would block 271 self._blocker_char = None 272 return None 273 self._write_stack.append(b) 274 return len(b) 275 276 class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 277 BlockingIOError = io.BlockingIOError 278 279 class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 280 BlockingIOError = pyio.BlockingIOError 281 282 283 class IOTest(unittest.TestCase): 284 285 def setUp(self): 286 support.unlink(support.TESTFN) 287 288 def tearDown(self): 289 support.unlink(support.TESTFN) 290 291 def write_ops(self, f): 292 self.assertEqual(f.write(b"blah."), 5) 293 f.truncate(0) 294 self.assertEqual(f.tell(), 5) 295 f.seek(0) 296 297 self.assertEqual(f.write(b"blah."), 5) 298 self.assertEqual(f.seek(0), 0) 299 self.assertEqual(f.write(b"Hello."), 6) 300 self.assertEqual(f.tell(), 6) 301 self.assertEqual(f.seek(-1, 1), 5) 302 self.assertEqual(f.tell(), 5) 303 buffer = bytearray(b" world\n\n\n") 304 self.assertEqual(f.write(buffer), 9) 305 buffer[:] = b"*" * 9 # Overwrite our copy of the data 306 self.assertEqual(f.seek(0), 0) 307 self.assertEqual(f.write(b"h"), 1) 308 self.assertEqual(f.seek(-1, 2), 13) 309 self.assertEqual(f.tell(), 13) 310 311 self.assertEqual(f.truncate(12), 12) 312 self.assertEqual(f.tell(), 13) 313 self.assertRaises(TypeError, f.seek, 0.0) 314 315 def read_ops(self, f, buffered=False): 316 data = f.read(5) 
317 self.assertEqual(data, b"hello") 318 data = byteslike(data) 319 self.assertEqual(f.readinto(data), 5) 320 self.assertEqual(bytes(data), b" worl") 321 data = bytearray(5) 322 self.assertEqual(f.readinto(data), 2) 323 self.assertEqual(len(data), 5) 324 self.assertEqual(data[:2], b"d\n") 325 self.assertEqual(f.seek(0), 0) 326 self.assertEqual(f.read(20), b"hello world\n") 327 self.assertEqual(f.read(1), b"") 328 self.assertEqual(f.readinto(byteslike(b"x")), 0) 329 self.assertEqual(f.seek(-6, 2), 6) 330 self.assertEqual(f.read(5), b"world") 331 self.assertEqual(f.read(0), b"") 332 self.assertEqual(f.readinto(byteslike()), 0) 333 self.assertEqual(f.seek(-6, 1), 5) 334 self.assertEqual(f.read(5), b" worl") 335 self.assertEqual(f.tell(), 10) 336 self.assertRaises(TypeError, f.seek, 0.0) 337 if buffered: 338 f.seek(0) 339 self.assertEqual(f.read(), b"hello world\n") 340 f.seek(6) 341 self.assertEqual(f.read(), b"world\n") 342 self.assertEqual(f.read(), b"") 343 f.seek(0) 344 data = byteslike(5) 345 self.assertEqual(f.readinto1(data), 5) 346 self.assertEqual(bytes(data), b"hello") 347 348 LARGE = 2**31 349 350 def large_file_ops(self, f): 351 assert f.readable() 352 assert f.writable() 353 try: 354 self.assertEqual(f.seek(self.LARGE), self.LARGE) 355 except (OverflowError, ValueError): 356 self.skipTest("no largefile support") 357 self.assertEqual(f.tell(), self.LARGE) 358 self.assertEqual(f.write(b"xxx"), 3) 359 self.assertEqual(f.tell(), self.LARGE + 3) 360 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 361 self.assertEqual(f.truncate(), self.LARGE + 2) 362 self.assertEqual(f.tell(), self.LARGE + 2) 363 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 364 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 365 self.assertEqual(f.tell(), self.LARGE + 2) 366 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 367 self.assertEqual(f.seek(-1, 2), self.LARGE) 368 self.assertEqual(f.read(2), b"x") 369 370 def test_invalid_operations(self): 371 # Try writing on a file 
opened in read mode and vice-versa. 372 exc = self.UnsupportedOperation 373 for mode in ("w", "wb"): 374 with self.open(support.TESTFN, mode) as fp: 375 self.assertRaises(exc, fp.read) 376 self.assertRaises(exc, fp.readline) 377 with self.open(support.TESTFN, "wb", buffering=0) as fp: 378 self.assertRaises(exc, fp.read) 379 self.assertRaises(exc, fp.readline) 380 with self.open(support.TESTFN, "rb", buffering=0) as fp: 381 self.assertRaises(exc, fp.write, b"blah") 382 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 383 with self.open(support.TESTFN, "rb") as fp: 384 self.assertRaises(exc, fp.write, b"blah") 385 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 386 with self.open(support.TESTFN, "r") as fp: 387 self.assertRaises(exc, fp.write, "blah") 388 self.assertRaises(exc, fp.writelines, ["blah\n"]) 389 # Non-zero seeking from current or end pos 390 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 391 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 392 393 def test_optional_abilities(self): 394 # Test for OSError when optional APIs are not supported 395 # The purpose of this test is to try fileno(), reading, writing and 396 # seeking operations with various objects that indicate they do not 397 # support these operations. 
398 399 def pipe_reader(): 400 [r, w] = os.pipe() 401 os.close(w) # So that read() is harmless 402 return self.FileIO(r, "r") 403 404 def pipe_writer(): 405 [r, w] = os.pipe() 406 self.addCleanup(os.close, r) 407 # Guarantee that we can write into the pipe without blocking 408 thread = threading.Thread(target=os.read, args=(r, 100)) 409 thread.start() 410 self.addCleanup(thread.join) 411 return self.FileIO(w, "w") 412 413 def buffered_reader(): 414 return self.BufferedReader(self.MockUnseekableIO()) 415 416 def buffered_writer(): 417 return self.BufferedWriter(self.MockUnseekableIO()) 418 419 def buffered_random(): 420 return self.BufferedRandom(self.BytesIO()) 421 422 def buffered_rw_pair(): 423 return self.BufferedRWPair(self.MockUnseekableIO(), 424 self.MockUnseekableIO()) 425 426 def text_reader(): 427 class UnseekableReader(self.MockUnseekableIO): 428 writable = self.BufferedIOBase.writable 429 write = self.BufferedIOBase.write 430 return self.TextIOWrapper(UnseekableReader(), "ascii") 431 432 def text_writer(): 433 class UnseekableWriter(self.MockUnseekableIO): 434 readable = self.BufferedIOBase.readable 435 read = self.BufferedIOBase.read 436 return self.TextIOWrapper(UnseekableWriter(), "ascii") 437 438 tests = ( 439 (pipe_reader, "fr"), (pipe_writer, "fw"), 440 (buffered_reader, "r"), (buffered_writer, "w"), 441 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 442 (text_reader, "r"), (text_writer, "w"), 443 (self.BytesIO, "rws"), (self.StringIO, "rws"), 444 ) 445 for [test, abilities] in tests: 446 if test is pipe_writer and not threading: 447 continue # Skip subtest that uses a background thread 448 with self.subTest(test), test() as obj: 449 readable = "r" in abilities 450 self.assertEqual(obj.readable(), readable) 451 writable = "w" in abilities 452 self.assertEqual(obj.writable(), writable) 453 454 if isinstance(obj, self.TextIOBase): 455 data = "3" 456 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 457 data = b"3" 458 else: 459 
self.fail("Unknown base class") 460 461 if "f" in abilities: 462 obj.fileno() 463 else: 464 self.assertRaises(OSError, obj.fileno) 465 466 if readable: 467 obj.read(1) 468 obj.read() 469 else: 470 self.assertRaises(OSError, obj.read, 1) 471 self.assertRaises(OSError, obj.read) 472 473 if writable: 474 obj.write(data) 475 else: 476 self.assertRaises(OSError, obj.write, data) 477 478 if sys.platform.startswith("win") and test in ( 479 pipe_reader, pipe_writer): 480 # Pipes seem to appear as seekable on Windows 481 continue 482 seekable = "s" in abilities 483 self.assertEqual(obj.seekable(), seekable) 484 485 if seekable: 486 obj.tell() 487 obj.seek(0) 488 else: 489 self.assertRaises(OSError, obj.tell) 490 self.assertRaises(OSError, obj.seek, 0) 491 492 if writable and seekable: 493 obj.truncate() 494 obj.truncate(0) 495 else: 496 self.assertRaises(OSError, obj.truncate) 497 self.assertRaises(OSError, obj.truncate, 0) 498 499 def test_open_handles_NUL_chars(self): 500 fn_with_NUL = 'foo\0bar' 501 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') 502 503 bytes_fn = bytes(fn_with_NUL, 'ascii') 504 with warnings.catch_warnings(): 505 warnings.simplefilter("ignore", DeprecationWarning) 506 self.assertRaises(ValueError, self.open, bytes_fn, 'w') 507 508 def test_raw_file_io(self): 509 with self.open(support.TESTFN, "wb", buffering=0) as f: 510 self.assertEqual(f.readable(), False) 511 self.assertEqual(f.writable(), True) 512 self.assertEqual(f.seekable(), True) 513 self.write_ops(f) 514 with self.open(support.TESTFN, "rb", buffering=0) as f: 515 self.assertEqual(f.readable(), True) 516 self.assertEqual(f.writable(), False) 517 self.assertEqual(f.seekable(), True) 518 self.read_ops(f) 519 520 def test_buffered_file_io(self): 521 with self.open(support.TESTFN, "wb") as f: 522 self.assertEqual(f.readable(), False) 523 self.assertEqual(f.writable(), True) 524 self.assertEqual(f.seekable(), True) 525 self.write_ops(f) 526 with self.open(support.TESTFN, "rb") as f: 527 
self.assertEqual(f.readable(), True) 528 self.assertEqual(f.writable(), False) 529 self.assertEqual(f.seekable(), True) 530 self.read_ops(f, True) 531 532 def test_readline(self): 533 with self.open(support.TESTFN, "wb") as f: 534 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 535 with self.open(support.TESTFN, "rb") as f: 536 self.assertEqual(f.readline(), b"abc\n") 537 self.assertEqual(f.readline(10), b"def\n") 538 self.assertEqual(f.readline(2), b"xy") 539 self.assertEqual(f.readline(4), b"zzy\n") 540 self.assertEqual(f.readline(), b"foo\x00bar\n") 541 self.assertEqual(f.readline(None), b"another line") 542 self.assertRaises(TypeError, f.readline, 5.3) 543 with self.open(support.TESTFN, "r") as f: 544 self.assertRaises(TypeError, f.readline, 5.3) 545 546 def test_raw_bytes_io(self): 547 f = self.BytesIO() 548 self.write_ops(f) 549 data = f.getvalue() 550 self.assertEqual(data, b"hello world\n") 551 f = self.BytesIO(data) 552 self.read_ops(f, True) 553 554 def test_large_file_ops(self): 555 # On Windows and Mac OSX this test comsumes large resources; It takes 556 # a long time to build the >2GB file and takes >2GB of disk space 557 # therefore the resource must be enabled to run this test. 
558 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 559 support.requires( 560 'largefile', 561 'test requires %s bytes and a long time to run' % self.LARGE) 562 with self.open(support.TESTFN, "w+b", 0) as f: 563 self.large_file_ops(f) 564 with self.open(support.TESTFN, "w+b") as f: 565 self.large_file_ops(f) 566 567 def test_with_open(self): 568 for bufsize in (0, 1, 100): 569 f = None 570 with self.open(support.TESTFN, "wb", bufsize) as f: 571 f.write(b"xxx") 572 self.assertEqual(f.closed, True) 573 f = None 574 try: 575 with self.open(support.TESTFN, "wb", bufsize) as f: 576 1/0 577 except ZeroDivisionError: 578 self.assertEqual(f.closed, True) 579 else: 580 self.fail("1/0 didn't raise an exception") 581 582 # issue 5008 583 def test_append_mode_tell(self): 584 with self.open(support.TESTFN, "wb") as f: 585 f.write(b"xxx") 586 with self.open(support.TESTFN, "ab", buffering=0) as f: 587 self.assertEqual(f.tell(), 3) 588 with self.open(support.TESTFN, "ab") as f: 589 self.assertEqual(f.tell(), 3) 590 with self.open(support.TESTFN, "a") as f: 591 self.assertGreater(f.tell(), 0) 592 593 def test_destructor(self): 594 record = [] 595 class MyFileIO(self.FileIO): 596 def __del__(self): 597 record.append(1) 598 try: 599 f = super().__del__ 600 except AttributeError: 601 pass 602 else: 603 f() 604 def close(self): 605 record.append(2) 606 super().close() 607 def flush(self): 608 record.append(3) 609 super().flush() 610 with support.check_warnings(('', ResourceWarning)): 611 f = MyFileIO(support.TESTFN, "wb") 612 f.write(b"xxx") 613 del f 614 support.gc_collect() 615 self.assertEqual(record, [1, 2, 3]) 616 with self.open(support.TESTFN, "rb") as f: 617 self.assertEqual(f.read(), b"xxx") 618 619 def _check_base_destructor(self, base): 620 record = [] 621 class MyIO(base): 622 def __init__(self): 623 # This exercises the availability of attributes on object 624 # destruction. 
625 # (in the C version, close() is called by the tp_dealloc 626 # function, not by __del__) 627 self.on_del = 1 628 self.on_close = 2 629 self.on_flush = 3 630 def __del__(self): 631 record.append(self.on_del) 632 try: 633 f = super().__del__ 634 except AttributeError: 635 pass 636 else: 637 f() 638 def close(self): 639 record.append(self.on_close) 640 super().close() 641 def flush(self): 642 record.append(self.on_flush) 643 super().flush() 644 f = MyIO() 645 del f 646 support.gc_collect() 647 self.assertEqual(record, [1, 2, 3]) 648 649 def test_IOBase_destructor(self): 650 self._check_base_destructor(self.IOBase) 651 652 def test_RawIOBase_destructor(self): 653 self._check_base_destructor(self.RawIOBase) 654 655 def test_BufferedIOBase_destructor(self): 656 self._check_base_destructor(self.BufferedIOBase) 657 658 def test_TextIOBase_destructor(self): 659 self._check_base_destructor(self.TextIOBase) 660 661 def test_close_flushes(self): 662 with self.open(support.TESTFN, "wb") as f: 663 f.write(b"xxx") 664 with self.open(support.TESTFN, "rb") as f: 665 self.assertEqual(f.read(), b"xxx") 666 667 def test_array_writes(self): 668 a = array.array('i', range(10)) 669 n = len(a.tobytes()) 670 def check(f): 671 with f: 672 self.assertEqual(f.write(a), n) 673 f.writelines((a,)) 674 check(self.BytesIO()) 675 check(self.FileIO(support.TESTFN, "w")) 676 check(self.BufferedWriter(self.MockRawIO())) 677 check(self.BufferedRandom(self.MockRawIO())) 678 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 679 680 def test_closefd(self): 681 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 682 closefd=False) 683 684 def test_read_closed(self): 685 with self.open(support.TESTFN, "w") as f: 686 f.write("egg\n") 687 with self.open(support.TESTFN, "r") as f: 688 file = self.open(f.fileno(), "r", closefd=False) 689 self.assertEqual(file.read(), "egg\n") 690 file.seek(0) 691 file.close() 692 self.assertRaises(ValueError, file.read) 693 694 def 
test_no_closefd_with_filename(self): 695 # can't use closefd in combination with a file name 696 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 697 698 def test_closefd_attr(self): 699 with self.open(support.TESTFN, "wb") as f: 700 f.write(b"egg\n") 701 with self.open(support.TESTFN, "r") as f: 702 self.assertEqual(f.buffer.raw.closefd, True) 703 file = self.open(f.fileno(), "r", closefd=False) 704 self.assertEqual(file.buffer.raw.closefd, False) 705 706 def test_garbage_collection(self): 707 # FileIO objects are collected, and collecting them flushes 708 # all data to disk. 709 with support.check_warnings(('', ResourceWarning)): 710 f = self.FileIO(support.TESTFN, "wb") 711 f.write(b"abcxxx") 712 f.f = f 713 wr = weakref.ref(f) 714 del f 715 support.gc_collect() 716 self.assertIsNone(wr(), wr) 717 with self.open(support.TESTFN, "rb") as f: 718 self.assertEqual(f.read(), b"abcxxx") 719 720 def test_unbounded_file(self): 721 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 722 zero = "/dev/zero" 723 if not os.path.exists(zero): 724 self.skipTest("{0} does not exist".format(zero)) 725 if sys.maxsize > 0x7FFFFFFF: 726 self.skipTest("test can only run in a 32-bit address space") 727 if support.real_max_memuse < support._2G: 728 self.skipTest("test requires at least 2GB of memory") 729 with self.open(zero, "rb", buffering=0) as f: 730 self.assertRaises(OverflowError, f.read) 731 with self.open(zero, "rb") as f: 732 self.assertRaises(OverflowError, f.read) 733 with self.open(zero, "r") as f: 734 self.assertRaises(OverflowError, f.read) 735 736 def check_flush_error_on_close(self, *args, **kwargs): 737 # Test that the file is closed despite failed flush 738 # and that flush() is called before file closed. 
739 f = self.open(*args, **kwargs) 740 closed = [] 741 def bad_flush(): 742 closed[:] = [f.closed] 743 raise OSError() 744 f.flush = bad_flush 745 self.assertRaises(OSError, f.close) # exception not swallowed 746 self.assertTrue(f.closed) 747 self.assertTrue(closed) # flush() called 748 self.assertFalse(closed[0]) # flush() called before file closed 749 f.flush = lambda: None # break reference loop 750 751 def test_flush_error_on_close(self): 752 # raw file 753 # Issue #5700: io.FileIO calls flush() after file closed 754 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 755 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 756 self.check_flush_error_on_close(fd, 'wb', buffering=0) 757 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 758 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 759 os.close(fd) 760 # buffered io 761 self.check_flush_error_on_close(support.TESTFN, 'wb') 762 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 763 self.check_flush_error_on_close(fd, 'wb') 764 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 765 self.check_flush_error_on_close(fd, 'wb', closefd=False) 766 os.close(fd) 767 # text io 768 self.check_flush_error_on_close(support.TESTFN, 'w') 769 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 770 self.check_flush_error_on_close(fd, 'w') 771 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 772 self.check_flush_error_on_close(fd, 'w', closefd=False) 773 os.close(fd) 774 775 def test_multi_close(self): 776 f = self.open(support.TESTFN, "wb", buffering=0) 777 f.close() 778 f.close() 779 f.close() 780 self.assertRaises(ValueError, f.flush) 781 782 def test_RawIOBase_read(self): 783 # Exercise the default RawIOBase.read() implementation (which calls 784 # readinto() internally). 
785 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 786 self.assertEqual(rawio.read(2), b"ab") 787 self.assertEqual(rawio.read(2), b"c") 788 self.assertEqual(rawio.read(2), b"d") 789 self.assertEqual(rawio.read(2), None) 790 self.assertEqual(rawio.read(2), b"ef") 791 self.assertEqual(rawio.read(2), b"g") 792 self.assertEqual(rawio.read(2), None) 793 self.assertEqual(rawio.read(2), b"") 794 795 def test_types_have_dict(self): 796 test = ( 797 self.IOBase(), 798 self.RawIOBase(), 799 self.TextIOBase(), 800 self.StringIO(), 801 self.BytesIO() 802 ) 803 for obj in test: 804 self.assertTrue(hasattr(obj, "__dict__")) 805 806 def test_opener(self): 807 with self.open(support.TESTFN, "w") as f: 808 f.write("egg\n") 809 fd = os.open(support.TESTFN, os.O_RDONLY) 810 def opener(path, flags): 811 return fd 812 with self.open("non-existent", "r", opener=opener) as f: 813 self.assertEqual(f.read(), "egg\n") 814 815 def test_bad_opener_negative_1(self): 816 # Issue #27066. 817 def badopener(fname, flags): 818 return -1 819 with self.assertRaises(ValueError) as cm: 820 open('non-existent', 'r', opener=badopener) 821 self.assertEqual(str(cm.exception), 'opener returned -1') 822 823 def test_bad_opener_other_negative(self): 824 # Issue #27066. 
825 def badopener(fname, flags): 826 return -2 827 with self.assertRaises(ValueError) as cm: 828 open('non-existent', 'r', opener=badopener) 829 self.assertEqual(str(cm.exception), 'opener returned -2') 830 831 def test_fileio_closefd(self): 832 # Issue #4841 833 with self.open(__file__, 'rb') as f1, \ 834 self.open(__file__, 'rb') as f2: 835 fileio = self.FileIO(f1.fileno(), closefd=False) 836 # .__init__() must not close f1 837 fileio.__init__(f2.fileno(), closefd=False) 838 f1.readline() 839 # .close() must not close f2 840 fileio.close() 841 f2.readline() 842 843 def test_nonbuffered_textio(self): 844 with support.check_no_resource_warning(self): 845 with self.assertRaises(ValueError): 846 self.open(support.TESTFN, 'w', buffering=0) 847 848 def test_invalid_newline(self): 849 with support.check_no_resource_warning(self): 850 with self.assertRaises(ValueError): 851 self.open(support.TESTFN, 'w', newline='invalid') 852 853 def test_buffered_readinto_mixin(self): 854 # Test the implementation provided by BufferedIOBase 855 class Stream(self.BufferedIOBase): 856 def read(self, size): 857 return b"12345" 858 read1 = read 859 stream = Stream() 860 for method in ("readinto", "readinto1"): 861 with self.subTest(method): 862 buffer = byteslike(5) 863 self.assertEqual(getattr(stream, method)(buffer), 5) 864 self.assertEqual(bytes(buffer), b"12345") 865 866 def test_fspath_support(self): 867 class PathLike: 868 def __init__(self, path): 869 self.path = path 870 871 def __fspath__(self): 872 return self.path 873 874 def check_path_succeeds(path): 875 with self.open(path, "w") as f: 876 f.write("egg\n") 877 878 with self.open(path, "r") as f: 879 self.assertEqual(f.read(), "egg\n") 880 881 check_path_succeeds(PathLike(support.TESTFN)) 882 check_path_succeeds(PathLike(support.TESTFN.encode('utf-8'))) 883 884 bad_path = PathLike(TypeError) 885 with self.assertRaises(TypeError): 886 self.open(bad_path, 'w') 887 888 # ensure that refcounting is correct with some error 
conditions 889 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 890 self.open(PathLike(support.TESTFN), 'rwxa') 891 892 893 class CIOTest(IOTest): 894 895 def test_IOBase_finalize(self): 896 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 897 # class which inherits IOBase and an object of this class are caught 898 # in a reference cycle and close() is already in the method cache. 899 class MyIO(self.IOBase): 900 def close(self): 901 pass 902 903 # create an instance to populate the method cache 904 MyIO() 905 obj = MyIO() 906 obj.obj = obj 907 wr = weakref.ref(obj) 908 del MyIO 909 del obj 910 support.gc_collect() 911 self.assertIsNone(wr(), wr) 912 913 class PyIOTest(IOTest): 914 pass 915 916 917 @support.cpython_only 918 class APIMismatchTest(unittest.TestCase): 919 920 def test_RawIOBase_io_in_pyio_match(self): 921 """Test that pyio RawIOBase class has all c RawIOBase methods""" 922 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, 923 ignore=('__weakref__',)) 924 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') 925 926 def test_RawIOBase_pyio_in_io_match(self): 927 """Test that c RawIOBase class has all pyio RawIOBase methods""" 928 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase) 929 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods') 930 931 932 class CommonBufferedTests: 933 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 934 935 def test_detach(self): 936 raw = self.MockRawIO() 937 buf = self.tp(raw) 938 self.assertIs(buf.detach(), raw) 939 self.assertRaises(ValueError, buf.detach) 940 941 repr(buf) # Should still work 942 943 def test_fileno(self): 944 rawio = self.MockRawIO() 945 bufio = self.tp(rawio) 946 947 self.assertEqual(42, bufio.fileno()) 948 949 def test_invalid_args(self): 950 rawio = self.MockRawIO() 951 bufio = self.tp(rawio) 952 # Invalid whence 953 
self.assertRaises(ValueError, bufio.seek, 0, -1) 954 self.assertRaises(ValueError, bufio.seek, 0, 9) 955 956 def test_override_destructor(self): 957 tp = self.tp 958 record = [] 959 class MyBufferedIO(tp): 960 def __del__(self): 961 record.append(1) 962 try: 963 f = super().__del__ 964 except AttributeError: 965 pass 966 else: 967 f() 968 def close(self): 969 record.append(2) 970 super().close() 971 def flush(self): 972 record.append(3) 973 super().flush() 974 rawio = self.MockRawIO() 975 bufio = MyBufferedIO(rawio) 976 del bufio 977 support.gc_collect() 978 self.assertEqual(record, [1, 2, 3]) 979 980 def test_context_manager(self): 981 # Test usability as a context manager 982 rawio = self.MockRawIO() 983 bufio = self.tp(rawio) 984 def _with(): 985 with bufio: 986 pass 987 _with() 988 # bufio should now be closed, and using it a second time should raise 989 # a ValueError. 990 self.assertRaises(ValueError, _with) 991 992 def test_error_through_destructor(self): 993 # Test that the exception state is not modified by a destructor, 994 # even if close() fails. 
995 rawio = self.CloseFailureIO() 996 def f(): 997 self.tp(rawio).xyzzy 998 with support.captured_output("stderr") as s: 999 self.assertRaises(AttributeError, f) 1000 s = s.getvalue().strip() 1001 if s: 1002 # The destructor *may* have printed an unraisable error, check it 1003 self.assertEqual(len(s.splitlines()), 1) 1004 self.assertTrue(s.startswith("Exception OSError: "), s) 1005 self.assertTrue(s.endswith(" ignored"), s) 1006 1007 def test_repr(self): 1008 raw = self.MockRawIO() 1009 b = self.tp(raw) 1010 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__) 1011 self.assertEqual(repr(b), "<%s>" % clsname) 1012 raw.name = "dummy" 1013 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) 1014 raw.name = b"dummy" 1015 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname) 1016 1017 def test_flush_error_on_close(self): 1018 # Test that buffered file is closed despite failed flush 1019 # and that flush() is called before file closed. 1020 raw = self.MockRawIO() 1021 closed = [] 1022 def bad_flush(): 1023 closed[:] = [b.closed, raw.closed] 1024 raise OSError() 1025 raw.flush = bad_flush 1026 b = self.tp(raw) 1027 self.assertRaises(OSError, b.close) # exception not swallowed 1028 self.assertTrue(b.closed) 1029 self.assertTrue(raw.closed) 1030 self.assertTrue(closed) # flush() called 1031 self.assertFalse(closed[0]) # flush() called before file closed 1032 self.assertFalse(closed[1]) 1033 raw.flush = lambda: None # break reference loop 1034 1035 def test_close_error_on_close(self): 1036 raw = self.MockRawIO() 1037 def bad_flush(): 1038 raise OSError('flush') 1039 def bad_close(): 1040 raise OSError('close') 1041 raw.close = bad_close 1042 b = self.tp(raw) 1043 b.flush = bad_flush 1044 with self.assertRaises(OSError) as err: # exception not swallowed 1045 b.close() 1046 self.assertEqual(err.exception.args, ('close',)) 1047 self.assertIsInstance(err.exception.__context__, OSError) 1048 self.assertEqual(err.exception.__context__.args, ('flush',)) 1049 
# Continues test_close_error_on_close: the final assertion of that method.
        self.assertFalse(b.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same shape as the OSError case, but both failures are NameErrors
        # produced by raising undefined names.
        raw = self.MockRawIO()
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

    def test_multi_close(self):
        # Repeated close() calls are accepted; flush() afterwards raises.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw object raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The .raw attribute cannot be reassigned.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x


class SizeofTest:
    # Mixin with sys.getsizeof() checks; relies on self.tp and self.MockRawIO
    # being supplied by the class it is combined with.

    @support.cpython_only
    def test_sizeof(self):
        # The size with the buffer subtracted must be the same for two
        # different buffer sizes.
        bufsize1 = 4096
        bufsize2 = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize1)
        size = sys.getsizeof(bufio) - bufsize1
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize2)
        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close(), getsizeof() no longer includes the buffer bytes.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        size = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size)

class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Mode string used when test_threads opens the test file for reading.
    read_mode = "rb"

# Methods of BufferedReaderTest; the class header precedes this span.
    def test_constructor(self):
        # Re-running __init__ with valid buffer sizes keeps the reader
        # usable; zero or negative sizes raise ValueError, after which a
        # plain re-init makes it readable again.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object made by __new__ alone can be deleted, rejects read()
        # with ValueError or AttributeError, and works once __init__ runs.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read(None) and read(7) both return all seven bytes of the mock.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is checked after each call to pin how many raw reads
        # it cost: buffered data is served without a new raw read.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
self.assertEqual(bufio.readinto(b), 2) 1168 self.assertEqual(b, b"ab") 1169 self.assertEqual(bufio.readinto(b), 2) 1170 self.assertEqual(b, b"cd") 1171 self.assertEqual(bufio.readinto(b), 2) 1172 self.assertEqual(b, b"ef") 1173 self.assertEqual(bufio.readinto(b), 1) 1174 self.assertEqual(b, b"gf") 1175 self.assertEqual(bufio.readinto(b), 0) 1176 self.assertEqual(b, b"gf") 1177 rawio = self.MockRawIO((b"abc", None)) 1178 bufio = self.tp(rawio) 1179 self.assertEqual(bufio.readinto(b), 2) 1180 self.assertEqual(b, b"ab") 1181 self.assertEqual(bufio.readinto(b), 1) 1182 self.assertEqual(b, b"cb") 1183 1184 def test_readinto1(self): 1185 buffer_size = 10 1186 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) 1187 bufio = self.tp(rawio, buffer_size=buffer_size) 1188 b = bytearray(2) 1189 self.assertEqual(bufio.peek(3), b'abc') 1190 self.assertEqual(rawio._reads, 1) 1191 self.assertEqual(bufio.readinto1(b), 2) 1192 self.assertEqual(b, b"ab") 1193 self.assertEqual(rawio._reads, 1) 1194 self.assertEqual(bufio.readinto1(b), 1) 1195 self.assertEqual(b[:1], b"c") 1196 self.assertEqual(rawio._reads, 1) 1197 self.assertEqual(bufio.readinto1(b), 2) 1198 self.assertEqual(b, b"de") 1199 self.assertEqual(rawio._reads, 2) 1200 b = bytearray(2*buffer_size) 1201 self.assertEqual(bufio.peek(3), b'fgh') 1202 self.assertEqual(rawio._reads, 3) 1203 self.assertEqual(bufio.readinto1(b), 6) 1204 self.assertEqual(b[:6], b"fghjkl") 1205 self.assertEqual(rawio._reads, 4) 1206 1207 def test_readinto_array(self): 1208 buffer_size = 60 1209 data = b"a" * 26 1210 rawio = self.MockRawIO((data,)) 1211 bufio = self.tp(rawio, buffer_size=buffer_size) 1212 1213 # Create an array with element size > 1 byte 1214 b = array.array('i', b'x' * 32) 1215 assert len(b) != 16 1216 1217 # Read into it. 
We should get as many *bytes* as we can fit into b 1218 # (which is more than the number of elements) 1219 n = bufio.readinto(b) 1220 self.assertGreater(n, len(b)) 1221 1222 # Check that old contents of b are preserved 1223 bm = memoryview(b).cast('B') 1224 self.assertLess(n, len(bm)) 1225 self.assertEqual(bm[:n], data[:n]) 1226 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1227 1228 def test_readinto1_array(self): 1229 buffer_size = 60 1230 data = b"a" * 26 1231 rawio = self.MockRawIO((data,)) 1232 bufio = self.tp(rawio, buffer_size=buffer_size) 1233 1234 # Create an array with element size > 1 byte 1235 b = array.array('i', b'x' * 32) 1236 assert len(b) != 16 1237 1238 # Read into it. We should get as many *bytes* as we can fit into b 1239 # (which is more than the number of elements) 1240 n = bufio.readinto1(b) 1241 self.assertGreater(n, len(b)) 1242 1243 # Check that old contents of b are preserved 1244 bm = memoryview(b).cast('B') 1245 self.assertLess(n, len(bm)) 1246 self.assertEqual(bm[:n], data[:n]) 1247 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1248 1249 def test_readlines(self): 1250 def bufio(): 1251 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 1252 return self.tp(rawio) 1253 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 1254 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 1255 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 1256 1257 def test_buffering(self): 1258 data = b"abcdefghi" 1259 dlen = len(data) 1260 1261 tests = [ 1262 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 1263 [ 100, [ 3, 3, 3], [ dlen ] ], 1264 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 1265 ] 1266 1267 for bufsize, buf_read_sizes, raw_read_sizes in tests: 1268 rawio = self.MockFileIO(data) 1269 bufio = self.tp(rawio, buffer_size=bufsize) 1270 pos = 0 1271 for nbytes in buf_read_sizes: 1272 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 1273 pos += nbytes 1274 # this is mildly implementation-dependent 1275 
# Continues test_buffering: last statement of its outer for-loop body.
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The same pattern checked directly on the raw object's readall().
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for far more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() with no argument returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
# Continues test_threads (reader), inside the try: opened before this span.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    # Thread body: read until EOF, collecting every chunk.
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must show up exactly N times overall.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() raise UnsupportedOperation both before and after
        # a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() over MisbehavedRawIO must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
# Continues test_no_extraneous_read, whose def line precedes this span.
        bufsize = 16
        # Request sizes below, at and above the buffer size.
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): built from io.BufferedReader directly rather than
        # self.tp, so only the C type is exercised here - confirm intended.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)


class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against the C implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
# Continues CBufferedReaderTest.test_constructor, started before this span.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # A buffer_size of sys.maxsize must be rejected with one of
            # these exceptions.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__ (invalid buffer_size), read() must
        # itself raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
1420 # The Python version has __del__, so it ends into gc.garbage instead 1421 with support.check_warnings(('', ResourceWarning)): 1422 rawio = self.FileIO(support.TESTFN, "w+b") 1423 f = self.tp(rawio) 1424 f.f = f 1425 wr = weakref.ref(f) 1426 del f 1427 support.gc_collect() 1428 self.assertIsNone(wr(), wr) 1429 1430 def test_args_error(self): 1431 # Issue #17275 1432 with self.assertRaisesRegex(TypeError, "BufferedReader"): 1433 self.tp(io.BytesIO(), 1024, 1024, 1024) 1434 1435 1436 class PyBufferedReaderTest(BufferedReaderTest): 1437 tp = pyio.BufferedReader 1438 1439 1440 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1441 write_mode = "wb" 1442 1443 def test_constructor(self): 1444 rawio = self.MockRawIO() 1445 bufio = self.tp(rawio) 1446 bufio.__init__(rawio) 1447 bufio.__init__(rawio, buffer_size=1024) 1448 bufio.__init__(rawio, buffer_size=16) 1449 self.assertEqual(3, bufio.write(b"abc")) 1450 bufio.flush() 1451 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1452 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1453 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1454 bufio.__init__(rawio) 1455 self.assertEqual(3, bufio.write(b"ghi")) 1456 bufio.flush() 1457 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1458 1459 def test_uninitialized(self): 1460 bufio = self.tp.__new__(self.tp) 1461 del bufio 1462 bufio = self.tp.__new__(self.tp) 1463 self.assertRaisesRegex((ValueError, AttributeError), 1464 'uninitialized|has no attribute', 1465 bufio.write, b'') 1466 bufio.__init__(self.MockRawIO()) 1467 self.assertEqual(bufio.write(b''), 0) 1468 1469 def test_detach_flush(self): 1470 raw = self.MockRawIO() 1471 buf = self.tp(raw) 1472 buf.write(b"howdy!") 1473 self.assertFalse(raw._write_stack) 1474 buf.detach() 1475 self.assertEqual(raw._write_stack, [b"howdy!"]) 1476 1477 def test_write(self): 1478 # Write to the buffered IO but don't overflow the buffer. 
1479 writer = self.MockRawIO() 1480 bufio = self.tp(writer, 8) 1481 bufio.write(b"abc") 1482 self.assertFalse(writer._write_stack) 1483 buffer = bytearray(b"def") 1484 bufio.write(buffer) 1485 buffer[:] = b"***" # Overwrite our copy of the data 1486 bufio.flush() 1487 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1488 1489 def test_write_overflow(self): 1490 writer = self.MockRawIO() 1491 bufio = self.tp(writer, 8) 1492 contents = b"abcdefghijklmnop" 1493 for n in range(0, len(contents), 3): 1494 bufio.write(contents[n:n+3]) 1495 flushed = b"".join(writer._write_stack) 1496 # At least (total - 8) bytes were implicitly flushed, perhaps more 1497 # depending on the implementation. 1498 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1499 1500 def check_writes(self, intermediate_func): 1501 # Lots of writes, test the flushed output is as expected. 1502 contents = bytes(range(256)) * 1000 1503 n = 0 1504 writer = self.MockRawIO() 1505 bufio = self.tp(writer, 13) 1506 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1507 def gen_sizes(): 1508 for size in count(1): 1509 for i in range(15): 1510 yield size 1511 sizes = gen_sizes() 1512 while n < len(contents): 1513 size = min(next(sizes), len(contents) - n) 1514 self.assertEqual(bufio.write(contents[n:n+size]), size) 1515 intermediate_func(bufio) 1516 n += size 1517 bufio.flush() 1518 self.assertEqual(contents, b"".join(writer._write_stack)) 1519 1520 def test_writes(self): 1521 self.check_writes(lambda bufio: None) 1522 1523 def test_writes_and_flushes(self): 1524 self.check_writes(lambda bufio: bufio.flush()) 1525 1526 def test_writes_and_seeks(self): 1527 def _seekabs(bufio): 1528 pos = bufio.tell() 1529 bufio.seek(pos + 1, 0) 1530 bufio.seek(pos - 1, 0) 1531 bufio.seek(pos, 0) 1532 self.check_writes(_seekabs) 1533 def _seekrel(bufio): 1534 pos = bufio.seek(0, 1) 1535 bufio.seek(+1, 1) 1536 bufio.seek(-1, 1) 1537 bufio.seek(pos, 0) 1538 self.check_writes(_seekrel) 1539 
# Methods of BufferedWriterTest; the class header precedes this span.
    def test_writes_and_truncates(self):
        # truncate() at the current position between writes.
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Write, seek back, overwrite the start, seek forward and append;
        # raw.getvalue() is checked after the seek and after the flush.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() pushes the buffered bytes to the raw object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        # writelines() with a plain list of bytes objects.
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        l = UserList([b'ab', b'cd',
b'ef']) 1600 writer = self.MockRawIO() 1601 bufio = self.tp(writer, 8) 1602 bufio.writelines(l) 1603 bufio.flush() 1604 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1605 1606 def test_writelines_error(self): 1607 writer = self.MockRawIO() 1608 bufio = self.tp(writer, 8) 1609 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1610 self.assertRaises(TypeError, bufio.writelines, None) 1611 self.assertRaises(TypeError, bufio.writelines, 'abc') 1612 1613 def test_destructor(self): 1614 writer = self.MockRawIO() 1615 bufio = self.tp(writer, 8) 1616 bufio.write(b"abc") 1617 del bufio 1618 support.gc_collect() 1619 self.assertEqual(b"abc", writer._write_stack[0]) 1620 1621 def test_truncate(self): 1622 # Truncate implicitly flushes the buffer. 1623 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1624 bufio = self.tp(raw, 8) 1625 bufio.write(b"abcdef") 1626 self.assertEqual(bufio.truncate(3), 3) 1627 self.assertEqual(bufio.tell(), 6) 1628 with self.open(support.TESTFN, "rb", buffering=0) as f: 1629 self.assertEqual(f.read(), b"abc") 1630 1631 @unittest.skipUnless(threading, 'Threading required for this test.') 1632 @support.requires_resource('cpu') 1633 def test_threads(self): 1634 try: 1635 # Write out many bytes from many threads and test they were 1636 # all flushed. 1637 N = 1000 1638 contents = bytes(range(256)) * N 1639 sizes = cycle([1, 19]) 1640 n = 0 1641 queue = deque() 1642 while n < len(contents): 1643 size = next(sizes) 1644 queue.append(contents[n:n+size]) 1645 n += size 1646 del contents 1647 # We use a real file object because it allows us to 1648 # exercise situations where the GIL is released before 1649 # writing the buffer to the raw streams. This is in addition 1650 # to concurrency issues due to switching threads in the middle 1651 # of Python code. 
# Continues test_threads (writer), inside the try: opened before this span.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    # Thread body: drain the shared queue into bufio.
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            # Every byte value must appear exactly N times in the file.
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-sized write() over MisbehavedRawIO
        # must raise OSError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A raw write() failure during close() propagates, and the writer
        # still ends up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)


class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests against the C implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
1708 if sys.maxsize > 0x7FFFFFFF: 1709 rawio = self.MockRawIO() 1710 bufio = self.tp(rawio) 1711 self.assertRaises((OverflowError, MemoryError, ValueError), 1712 bufio.__init__, rawio, sys.maxsize) 1713 1714 def test_initialization(self): 1715 rawio = self.MockRawIO() 1716 bufio = self.tp(rawio) 1717 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1718 self.assertRaises(ValueError, bufio.write, b"def") 1719 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1720 self.assertRaises(ValueError, bufio.write, b"def") 1721 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1722 self.assertRaises(ValueError, bufio.write, b"def") 1723 1724 def test_garbage_collection(self): 1725 # C BufferedWriter objects are collected, and collecting them flushes 1726 # all data to disk. 1727 # The Python version has __del__, so it ends into gc.garbage instead 1728 with support.check_warnings(('', ResourceWarning)): 1729 rawio = self.FileIO(support.TESTFN, "w+b") 1730 f = self.tp(rawio) 1731 f.write(b"123xxx") 1732 f.x = f 1733 wr = weakref.ref(f) 1734 del f 1735 support.gc_collect() 1736 self.assertIsNone(wr(), wr) 1737 with self.open(support.TESTFN, "rb") as f: 1738 self.assertEqual(f.read(), b"123xxx") 1739 1740 def test_args_error(self): 1741 # Issue #17275 1742 with self.assertRaisesRegex(TypeError, "BufferedWriter"): 1743 self.tp(io.BytesIO(), 1024, 1024, 1024) 1744 1745 1746 class PyBufferedWriterTest(BufferedWriterTest): 1747 tp = pyio.BufferedWriter 1748 1749 class BufferedRWPairTest(unittest.TestCase): 1750 1751 def test_constructor(self): 1752 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1753 self.assertFalse(pair.closed) 1754 1755 def test_uninitialized(self): 1756 pair = self.tp.__new__(self.tp) 1757 del pair 1758 pair = self.tp.__new__(self.tp) 1759 self.assertRaisesRegex((ValueError, AttributeError), 1760 'uninitialized|has no attribute', 1761 pair.read, 0) 1762 self.assertRaisesRegex((ValueError, 
# Continues BufferedRWPairTest.test_uninitialized: rest of the assertRaisesRegex call opened before this span.
                                AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        # Once __init__ has run, zero-length read and write both work.
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        # detach() is not supported on a pair.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # The reader argument must report itself as readable.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        # The writer argument must report itself as writable.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        # Sized reads, read() with no argument, and read(None).
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each call builds a fresh pair over the same data.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
# Continues BufferedRWPairTest.test_read1, whose def line precedes this span.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        # Runs the same check for readinto and readinto1, each as a subTest,
        # reading into a 5-byte object produced by byteslike().
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        # Each write+flush lands as its own entry on the writer's stack;
        # mutating the source bytearray afterwards must not change it.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***" # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() does not consume: the following read() sees the same bytes.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.
1856 1857 def test_close_and_closed(self): 1858 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1859 self.assertFalse(pair.closed) 1860 pair.close() 1861 self.assertTrue(pair.closed) 1862 1863 def test_reader_close_error_on_close(self): 1864 def reader_close(): 1865 reader_non_existing 1866 reader = self.MockRawIO() 1867 reader.close = reader_close 1868 writer = self.MockRawIO() 1869 pair = self.tp(reader, writer) 1870 with self.assertRaises(NameError) as err: 1871 pair.close() 1872 self.assertIn('reader_non_existing', str(err.exception)) 1873 self.assertTrue(pair.closed) 1874 self.assertFalse(reader.closed) 1875 self.assertTrue(writer.closed) 1876 1877 def test_writer_close_error_on_close(self): 1878 def writer_close(): 1879 writer_non_existing 1880 reader = self.MockRawIO() 1881 writer = self.MockRawIO() 1882 writer.close = writer_close 1883 pair = self.tp(reader, writer) 1884 with self.assertRaises(NameError) as err: 1885 pair.close() 1886 self.assertIn('writer_non_existing', str(err.exception)) 1887 self.assertFalse(pair.closed) 1888 self.assertTrue(reader.closed) 1889 self.assertFalse(writer.closed) 1890 1891 def test_reader_writer_close_error_on_close(self): 1892 def reader_close(): 1893 reader_non_existing 1894 def writer_close(): 1895 writer_non_existing 1896 reader = self.MockRawIO() 1897 reader.close = reader_close 1898 writer = self.MockRawIO() 1899 writer.close = writer_close 1900 pair = self.tp(reader, writer) 1901 with self.assertRaises(NameError) as err: 1902 pair.close() 1903 self.assertIn('reader_non_existing', str(err.exception)) 1904 self.assertIsInstance(err.exception.__context__, NameError) 1905 self.assertIn('writer_non_existing', str(err.exception.__context__)) 1906 self.assertFalse(pair.closed) 1907 self.assertFalse(reader.closed) 1908 self.assertFalse(writer.closed) 1909 1910 def test_isatty(self): 1911 class SelectableIsAtty(MockRawIO): 1912 def __init__(self, isatty): 1913 MockRawIO.__init__(self) 1914 self._isatty = isatty 1915 1916 
# Concrete bindings of the BufferedRWPairTest cases: each subclass only sets
# ``tp``, the type the inherited tests instantiate.

# Runs the cases against the ``io`` module's BufferedRWPair.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair

# Runs the same cases against the ``_pyio`` (imported as ``pyio``)
# BufferedRWPair.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
# Tests for a buffered object that is read and written through one handle.
# Combines the reader and writer test bases; ``tp`` (the type under test) is
# set by the concrete subclasses.
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited constructor tests against the same type.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # The following read() must push the buffered writes to the raw
        # object as one combined chunk.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes at the current position (4), then re-read
        # the whole stream.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end; whence=1: relative to current.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests below.
        # ``read_func(bufio[, n])`` must consume and return bytes from
        # ``bufio``.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no explicit size, use a 9999-byte scratch buffer.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so consume explicitly.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes() is defined outside this class body; it is called
        # here with a callback taking the buffered object.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the
        # position.  The name _peek is deliberately rebound.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Position i is written last, so when i == j the byte
                # written at i (0x01) is the expected one.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() with no argument must return the logical position
        # (2, then 4), not the raw stream's position.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each kind of read; every read must
        # return the byte right after the one just written.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Same idea with readline(): each call returns the rest of the
        # current line after the single byte just written.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    # (Setting the name to None here overrides any inherited test_unseekable.)
    test_unseekable = None
# Runs the BufferedRandomTest cases (plus SizeofTest) against
# io.BufferedRandom, and adds a few checks only made for this binding.
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            # Re-initialising with a sys.maxsize buffer must fail with one
            # of the listed exceptions.
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the garbage-collection tests of the C reader and writer
        # test classes.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must mention
        # "BufferedRandom".
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)


# Runs the BufferedRandomTest cases against the ``_pyio`` (``pyio``)
# BufferedRandom.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I applies from the next word on
    (the word that set it has already been consumed).  EOF also terminates
    the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)``.

        ``flags`` packs I and O as ``(I ^ 1) * 100 + (O ^ 1)``; the XOR
        makes the flags 0 right after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a ``(buffered_bytes, flags)`` pair made by getstate()."""
        # The local is called ``flags`` so that it does not shadow the
        # ``io`` module imported by this file.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text completed so far.

        With *final* true, whatever is still buffered is flushed as a
        last word.
        """
        # Collect the pieces and join once instead of growing a string.
        pieces = []
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return its output ('' for i/o words).

        Callers only invoke this with a non-empty buffer.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch read by lookupTestDecoder(); off unless a test turns it on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled.

        Returns a CodecInfo that encodes like latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is something else.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, value passed as ``final``, expected text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')

# Tests for the text layer.  ``BytesIO``, ``BufferedReader``,
# ``BufferedWriter``, ``TextIOWrapper``, ``MockRawIO`` and ``open`` are read
# from ``self`` and supplied outside this chunk.  The class body continues
# past the methods reproduced here.
class TextIOWrapperTest(unittest.TestCase):

    def setUp(self):
        # ``normalized`` is ``testdata`` with every line ending written as
        # "\n", as text.
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_constructor(self):
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        # Calling __init__ again must reconfigure the existing object.
        t.__init__(b, encoding="latin-1", newline="\r\n")
        self.assertEqual(t.encoding, "latin-1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf-8", line_buffering=True)
        self.assertEqual(t.encoding, "utf-8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        # Bad ``newline`` values: wrong type, then unsupported string.
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')

    def test_uninitialized(self):
        # An object made with __new__ only must be deletable, must raise
        # (not crash) when used, and must work once __init__ is called.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        del t
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               t.read, 0)
        t.__init__(self.MockRawIO())
        self.assertEqual(t.read(0), '')

    def test_non_text_encoding_codecs_are_rejected(self):
        # Ensure the constructor complains if passed a codec that isn't
        # marked as a text encoding
        # http://bugs.python.org/issue20404
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
            self.TextIOWrapper(b, encoding="hex")
# Methods of TextIOWrapperTest (class header outside this block).

def test_detach(self):
    r = self.BytesIO()
    b = self.BufferedWriter(r)
    t = self.TextIOWrapper(b)
    # detach() hands back the underlying buffer object itself.
    self.assertIs(t.detach(), b)

    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("howdy")
    self.assertFalse(r.getvalue())
    # Detaching must flush the pending text down to the raw stream.
    t.detach()
    self.assertEqual(r.getvalue(), b"howdy")
    # A second detach() is an error.
    self.assertRaises(ValueError, t.detach)

    # Operations independent of the detached stream should still work
    repr(t)
    self.assertEqual(t.encoding, "ascii")
    self.assertEqual(t.errors, "strict")
    self.assertFalse(t.line_buffering)

def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    b = self.BufferedReader(raw)
    t = self.TextIOWrapper(b, encoding="utf-8")
    modname = self.TextIOWrapper.__module__
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper encoding='utf-8'>" % modname)
    # ``name`` set on the raw stream and ``mode`` set on the wrapper must
    # both appear in the repr.
    raw.name = "dummy"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
    t.mode = "r"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
    raw.name = b"dummy"
    self.assertEqual(repr(t),
                     "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)

    t.buffer.detach()
    repr(t) # Should not raise an exception

def test_line_buffering(self):
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"") # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
    # A write containing "\r" is flushed as well.
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")

def test_default_encoding(self):
    # The environment is snapshotted here and restored in ``finally``.
    old_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            if key in os.environ:
                del os.environ[key]

        current_locale_encoding = locale.getpreferredencoding(False)
        b = self.BytesIO()
        t = self.TextIOWrapper(b)
        self.assertEqual(t.encoding, current_locale_encoding)
    finally:
        os.environ.clear()
        os.environ.update(old_environ)
# Methods of TextIOWrapperTest (class header outside this block): encoding
# selection and the ``errors`` policy for reading and writing.

@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    # A fileno() one past INT_MAX, then one past UINT_MAX, must make the
    # constructor raise OverflowError.
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda: limit + 1
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)

def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # The default must name a codec that can be looked up.
    codecs.lookup(implicit.encoding)

def test_encoding_errors_reading(self):
    undecodable = b"abc\n\xff\n"
    # (1) default, (2) explicit strict
    for options in ({}, {"errors": "strict"}):
        reader = self.TextIOWrapper(self.BytesIO(undecodable),
                                    encoding="ascii", **options)
        self.assertRaises(UnicodeError, reader.read)
    # (3) ignore, (4) replace
    lenient = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for policy, decoded in lenient:
        reader = self.TextIOWrapper(self.BytesIO(undecodable),
                                    encoding="ascii", errors=policy)
        self.assertEqual(reader.read(), decoded)

def test_encoding_errors_writing(self):
    # (1) default, (2) explicit strict
    for options in ({}, {"errors": "strict"}):
        writer = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                                    **options)
        self.assertRaises(UnicodeError, writer.write, "\xff")
    # (3) ignore, (4) replace
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for policy, encoded in lenient:
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=policy,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), encoded)
2498 # (2) explicit strict 2499 b = self.BytesIO() 2500 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2501 self.assertRaises(UnicodeError, t.write, "\xff") 2502 # (3) ignore 2503 b = self.BytesIO() 2504 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2505 newline="\n") 2506 t.write("abc\xffdef\n") 2507 t.flush() 2508 self.assertEqual(b.getvalue(), b"abcdef\n") 2509 # (4) replace 2510 b = self.BytesIO() 2511 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2512 newline="\n") 2513 t.write("abc\xffdef\n") 2514 t.flush() 2515 self.assertEqual(b.getvalue(), b"abc?def\n") 2516 2517 def test_newlines(self): 2518 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2519 2520 tests = [ 2521 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2522 [ '', input_lines ], 2523 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2524 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2525 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2526 ] 2527 encodings = ( 2528 'utf-8', 'latin-1', 2529 'utf-16', 'utf-16-le', 'utf-16-be', 2530 'utf-32', 'utf-32-le', 'utf-32-be', 2531 ) 2532 2533 # Try a range of buffer sizes to test the case where \r is the last 2534 # character in TextIOWrapper._pending_line. 
# Methods of TextIOWrapperTest (class header outside this block).

def test_newlines_input(self):
    # For each ``newline`` setting, readlines() must return the expected
    # list and read() (after seek(0)) the same lines joined.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    for newline, expected in [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
        ]:
        buf = self.BytesIO(testdata)
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))

def test_newlines_output(self):
    # Maps a ``newline`` argument to the bytes expected after writing
    # "AAA\nBBB\nCCC\nX\rY\r\nZ" in three pieces.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        txt.write("AAA\nB")
        txt.write("BB\nCCC\n")
        txt.write("X\rY\r\nZ")
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
# Methods of TextIOWrapperTest (class header outside this block): behaviour
# when a wrapper is garbage-collected.

def test_destructor(self):
    # Dropping the last reference must close the underlying stream, with
    # the written text already in it when close() runs.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record what the stream holds at close time.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)

def test_override_destructor(self):
    # Overridden __del__, close and flush must run in that order (1, 2, 3)
    # when the object is collected.
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])

def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def f():
        # ``xyzzy`` does not exist: the AttributeError is the exception
        # that must survive the temporary wrapper being destroyed.
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as s:
        self.assertRaises(AttributeError, f)
    s = s.getvalue().strip()
    if s:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(s.splitlines()), 1)
        self.assertTrue(s.startswith("Exception OSError: "), s)
        self.assertTrue(s.endswith(" ignored"), s)

# Systematic tests of the text I/O API
# Methods of TextIOWrapperTest (class header outside this block).

def test_basic_io(self):
    # Write, read back, seek and append through a text file for several
    # chunk sizes and encodings.  The files are opened with ``with`` so
    # that a failing assertion cannot leave a handle open on TESTFN.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Remember the end-of-data position as an opaque cookie.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)

def multi_line_test(self, f, enc):
    # Helper for test_basic_io: empty *f*, write lines of various lengths
    # while recording tell() before each one, then check that reading the
    # file back yields the same (position, line) pairs.  *enc* is unused.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build ``size`` characters.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
# Methods of TextIOWrapperTest (class header outside this block): tell()/seek()
# on text files.  Files are opened with ``with`` throughout so a failing
# assertion cannot leave a handle open on TESTFN.

def test_telling(self):
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        # tell() must raise while the file is being iterated, and work
        # again once the iteration is over.
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)

def test_seeking(self):
    # Put a multi-byte character just before the default chunk boundary,
    # then read up to it and tell().
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)

def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()

def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the rest.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
2748 # Make test faster by doing smaller seeks 2749 CHUNK_SIZE = 128 2750 2751 def test_seek_and_tell_with_data(data, min_pos=0): 2752 """Tell/seek to various points within a data stream and ensure 2753 that the decoded data returned by read() is consistent.""" 2754 f = self.open(support.TESTFN, 'wb') 2755 f.write(data) 2756 f.close() 2757 f = self.open(support.TESTFN, encoding='test_decoder') 2758 f._CHUNK_SIZE = CHUNK_SIZE 2759 decoded = f.read() 2760 f.close() 2761 2762 for i in range(min_pos, len(decoded) + 1): # seek positions 2763 for j in [1, 5, len(decoded) - i]: # read lengths 2764 f = self.open(support.TESTFN, encoding='test_decoder') 2765 self.assertEqual(f.read(i), decoded[:i]) 2766 cookie = f.tell() 2767 self.assertEqual(f.read(j), decoded[i:i + j]) 2768 f.seek(cookie) 2769 self.assertEqual(f.read(), decoded[i:]) 2770 f.close() 2771 2772 # Enable the test decoder. 2773 StatefulIncrementalDecoder.codecEnabled = 1 2774 2775 # Run the tests. 2776 try: 2777 # Try each test case. 2778 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2779 test_seek_and_tell_with_data(input) 2780 2781 # Position each test case so that it crosses a chunk boundary. 2782 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2783 offset = CHUNK_SIZE - len(input)//2 2784 prefix = b'.'*offset 2785 # Don't bother seeking into the prefix (takes too long). 2786 min_pos = offset*2 2787 test_seek_and_tell_with_data(prefix + input, min_pos) 2788 2789 # Ensure our test decoder won't interfere with subsequent tests. 2790 finally: 2791 StatefulIncrementalDecoder.codecEnabled = 0 2792 2793 def test_encoded_writes(self): 2794 data = "1234567890" 2795 tests = ("utf-16", 2796 "utf-16-le", 2797 "utf-16-be", 2798 "utf-32", 2799 "utf-32-le", 2800 "utf-32-be") 2801 for encoding in tests: 2802 buf = self.BytesIO() 2803 f = self.TextIOWrapper(buf, encoding=encoding) 2804 # Check if the BOM is written only once (see issue1753). 
# Methods of TextIOWrapperTest (class header outside this block): basic
# reading patterns.

def test_unreadable(self):
    # read() on a wrapper over a stream that says it is not readable must
    # raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    self.assertRaises(OSError, wrapper.read)

def test_read_one_by_one(self):
    # Read one character per call until read() returns the empty string.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")

def test_readlines(self):
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    everything = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), everything)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), everything)
    wrapper.seek(0)
    # With a hint of 5 only the first two lines come back.
    self.assertEqual(wrapper.readlines(5), ["AA\n", "BB\n"])

# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
# Methods of TextIOWrapperTest (class header outside this block):
# writelines() and the issue 1395 newline-translation read patterns.

def test_writelines(self):
    pieces = ['ab', 'cd', 'ef']
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(pieces)
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')

def test_writelines_userlist(self):
    # Same as above, but the lines come in a UserList.
    pieces = UserList(['ab', 'cd', 'ef'])
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(pieces)
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')

def test_writelines_error(self):
    # Ints, None and a bytes object are all rejected with TypeError.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, wrapper.writelines, bad_lines)

def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)

def test_issue1395_2(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # read four chars at a time, with a chunk size of four as well
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)

def test_issue1395_3(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # two sized reads followed by three readline() calls
    collected = wrapper.read(4)
    collected += wrapper.read(4)
    for _ in range(3):
        collected += wrapper.readline()
    self.assertEqual(collected, self.normalized)

def test_issue1395_4(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # one sized read, then everything that is left
    head = wrapper.read(4)
    tail = wrapper.read()
    self.assertEqual(head + tail, self.normalized)
txt.read(4) 2912 reads += txt.read() 2913 self.assertEqual(reads, self.normalized) 2914 2915 def test_issue1395_5(self): 2916 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2917 txt._CHUNK_SIZE = 4 2918 2919 reads = txt.read(4) 2920 pos = txt.tell() 2921 txt.seek(0) 2922 txt.seek(pos) 2923 self.assertEqual(txt.read(4), "BBB\n") 2924 2925 def test_issue2282(self): 2926 buffer = self.BytesIO(self.testdata) 2927 txt = self.TextIOWrapper(buffer, encoding="ascii") 2928 2929 self.assertEqual(buffer.seekable(), txt.seekable()) 2930 2931 def test_append_bom(self): 2932 # The BOM is not written again when appending to a non-empty file 2933 filename = support.TESTFN 2934 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2935 with self.open(filename, 'w', encoding=charset) as f: 2936 f.write('aaa') 2937 pos = f.tell() 2938 with self.open(filename, 'rb') as f: 2939 self.assertEqual(f.read(), 'aaa'.encode(charset)) 2940 2941 with self.open(filename, 'a', encoding=charset) as f: 2942 f.write('xxx') 2943 with self.open(filename, 'rb') as f: 2944 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 2945 2946 def test_seek_bom(self): 2947 # Same test, but when seeking manually 2948 filename = support.TESTFN 2949 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2950 with self.open(filename, 'w', encoding=charset) as f: 2951 f.write('aaa') 2952 pos = f.tell() 2953 with self.open(filename, 'r+', encoding=charset) as f: 2954 f.seek(pos) 2955 f.write('zzz') 2956 f.seek(0) 2957 f.write('bbb') 2958 with self.open(filename, 'rb') as f: 2959 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 2960 2961 def test_seek_append_bom(self): 2962 # Same test, but first seek to the start and then to the end 2963 filename = support.TESTFN 2964 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2965 with self.open(filename, 'w', encoding=charset) as f: 2966 f.write('aaa') 2967 with self.open(filename, 'a', encoding=charset) as f: 2968 f.seek(0) 2969 f.seek(0, self.SEEK_END) 
2970 f.write('xxx') 2971 with self.open(filename, 'rb') as f: 2972 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 2973 2974 def test_errors_property(self): 2975 with self.open(support.TESTFN, "w") as f: 2976 self.assertEqual(f.errors, "strict") 2977 with self.open(support.TESTFN, "w", errors="replace") as f: 2978 self.assertEqual(f.errors, "replace") 2979 2980 @support.no_tracing 2981 @unittest.skipUnless(threading, 'Threading required for this test.') 2982 def test_threads_write(self): 2983 # Issue6750: concurrent writes could duplicate data 2984 event = threading.Event() 2985 with self.open(support.TESTFN, "w", buffering=1) as f: 2986 def run(n): 2987 text = "Thread%03d\n" % n 2988 event.wait() 2989 f.write(text) 2990 threads = [threading.Thread(target=run, args=(x,)) 2991 for x in range(20)] 2992 with support.start_threads(threads, event.set): 2993 time.sleep(0.02) 2994 with self.open(support.TESTFN) as f: 2995 content = f.read() 2996 for n in range(20): 2997 self.assertEqual(content.count("Thread%03d\n" % n), 1) 2998 2999 def test_flush_error_on_close(self): 3000 # Test that text file is closed despite failed flush 3001 # and that flush() is called before file closed. 
3002 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3003 closed = [] 3004 def bad_flush(): 3005 closed[:] = [txt.closed, txt.buffer.closed] 3006 raise OSError() 3007 txt.flush = bad_flush 3008 self.assertRaises(OSError, txt.close) # exception not swallowed 3009 self.assertTrue(txt.closed) 3010 self.assertTrue(txt.buffer.closed) 3011 self.assertTrue(closed) # flush() called 3012 self.assertFalse(closed[0]) # flush() called before file closed 3013 self.assertFalse(closed[1]) 3014 txt.flush = lambda: None # break reference loop 3015 3016 def test_close_error_on_close(self): 3017 buffer = self.BytesIO(self.testdata) 3018 def bad_flush(): 3019 raise OSError('flush') 3020 def bad_close(): 3021 raise OSError('close') 3022 buffer.close = bad_close 3023 txt = self.TextIOWrapper(buffer, encoding="ascii") 3024 txt.flush = bad_flush 3025 with self.assertRaises(OSError) as err: # exception not swallowed 3026 txt.close() 3027 self.assertEqual(err.exception.args, ('close',)) 3028 self.assertIsInstance(err.exception.__context__, OSError) 3029 self.assertEqual(err.exception.__context__.args, ('flush',)) 3030 self.assertFalse(txt.closed) 3031 3032 def test_nonnormalized_close_error_on_close(self): 3033 # Issue #21677 3034 buffer = self.BytesIO(self.testdata) 3035 def bad_flush(): 3036 raise non_existing_flush 3037 def bad_close(): 3038 raise non_existing_close 3039 buffer.close = bad_close 3040 txt = self.TextIOWrapper(buffer, encoding="ascii") 3041 txt.flush = bad_flush 3042 with self.assertRaises(NameError) as err: # exception not swallowed 3043 txt.close() 3044 self.assertIn('non_existing_close', str(err.exception)) 3045 self.assertIsInstance(err.exception.__context__, NameError) 3046 self.assertIn('non_existing_flush', str(err.exception.__context__)) 3047 self.assertFalse(txt.closed) 3048 3049 def test_multi_close(self): 3050 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3051 txt.close() 3052 txt.close() 3053 txt.close() 3054 
self.assertRaises(ValueError, txt.flush) 3055 3056 def test_unseekable(self): 3057 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) 3058 self.assertRaises(self.UnsupportedOperation, txt.tell) 3059 self.assertRaises(self.UnsupportedOperation, txt.seek, 0) 3060 3061 def test_readonly_attributes(self): 3062 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3063 buf = self.BytesIO(self.testdata) 3064 with self.assertRaises(AttributeError): 3065 txt.buffer = buf 3066 3067 def test_rawio(self): 3068 # Issue #12591: TextIOWrapper must work with raw I/O objects, so 3069 # that subprocess.Popen() can have the required unbuffered 3070 # semantics with universal_newlines=True. 3071 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3072 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3073 # Reads 3074 self.assertEqual(txt.read(4), 'abcd') 3075 self.assertEqual(txt.readline(), 'efghi\n') 3076 self.assertEqual(list(txt), ['jkl\n', 'opq\n']) 3077 3078 def test_rawio_write_through(self): 3079 # Issue #12591: with write_through=True, writes don't need a flush 3080 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3081 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', 3082 write_through=True) 3083 txt.write('1') 3084 txt.write('23\n4') 3085 txt.write('5') 3086 self.assertEqual(b''.join(raw._write_stack), b'123\n45') 3087 3088 def test_bufio_write_through(self): 3089 # Issue #21396: write_through=True doesn't force a flush() 3090 # on the underlying binary buffered object. 
3091 flush_called, write_called = [], [] 3092 class BufferedWriter(self.BufferedWriter): 3093 def flush(self, *args, **kwargs): 3094 flush_called.append(True) 3095 return super().flush(*args, **kwargs) 3096 def write(self, *args, **kwargs): 3097 write_called.append(True) 3098 return super().write(*args, **kwargs) 3099 3100 rawio = self.BytesIO() 3101 data = b"a" 3102 bufio = BufferedWriter(rawio, len(data)*2) 3103 textio = self.TextIOWrapper(bufio, encoding='ascii', 3104 write_through=True) 3105 # write to the buffered io but don't overflow the buffer 3106 text = data.decode('ascii') 3107 textio.write(text) 3108 3109 # buffer.flush is not called with write_through=True 3110 self.assertFalse(flush_called) 3111 # buffer.write *is* called with write_through=True 3112 self.assertTrue(write_called) 3113 self.assertEqual(rawio.getvalue(), b"") # no flush 3114 3115 write_called = [] # reset 3116 textio.write(text * 10) # total content is larger than bufio buffer 3117 self.assertTrue(write_called) 3118 self.assertEqual(rawio.getvalue(), data * 11) # all flushed 3119 3120 def test_read_nonbytes(self): 3121 # Issue #17106 3122 # Crash when underlying read() returns non-bytes 3123 t = self.TextIOWrapper(self.StringIO('a')) 3124 self.assertRaises(TypeError, t.read, 1) 3125 t = self.TextIOWrapper(self.StringIO('a')) 3126 self.assertRaises(TypeError, t.readline) 3127 t = self.TextIOWrapper(self.StringIO('a')) 3128 self.assertRaises(TypeError, t.read) 3129 3130 def test_illegal_decoder(self): 3131 # Issue #17106 3132 # Bypass the early encoding check added in issue 20404 3133 def _make_illegal_wrapper(): 3134 quopri = codecs.lookup("quopri") 3135 quopri._is_text_encoding = True 3136 try: 3137 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 3138 newline='\n', encoding="quopri") 3139 finally: 3140 quopri._is_text_encoding = False 3141 return t 3142 # Crash when decoder returns non-string 3143 t = _make_illegal_wrapper() 3144 self.assertRaises(TypeError, t.read, 1) 3145 t = 
_make_illegal_wrapper() 3146 self.assertRaises(TypeError, t.readline) 3147 t = _make_illegal_wrapper() 3148 self.assertRaises(TypeError, t.read) 3149 3150 def _check_create_at_shutdown(self, **kwargs): 3151 # Issue #20037: creating a TextIOWrapper at shutdown 3152 # shouldn't crash the interpreter. 3153 iomod = self.io.__name__ 3154 code = """if 1: 3155 import codecs 3156 import {iomod} as io 3157 3158 # Avoid looking up codecs at shutdown 3159 codecs.lookup('utf-8') 3160 3161 class C: 3162 def __init__(self): 3163 self.buf = io.BytesIO() 3164 def __del__(self): 3165 io.TextIOWrapper(self.buf, **{kwargs}) 3166 print("ok") 3167 c = C() 3168 """.format(iomod=iomod, kwargs=kwargs) 3169 return assert_python_ok("-c", code) 3170 3171 @support.requires_type_collecting 3172 def test_create_at_shutdown_without_encoding(self): 3173 rc, out, err = self._check_create_at_shutdown() 3174 if err: 3175 # Can error out with a RuntimeError if the module state 3176 # isn't found. 3177 self.assertIn(self.shutdown_error, err.decode()) 3178 else: 3179 self.assertEqual("ok", out.decode().strip()) 3180 3181 @support.requires_type_collecting 3182 def test_create_at_shutdown_with_encoding(self): 3183 rc, out, err = self._check_create_at_shutdown(encoding='utf-8', 3184 errors='strict') 3185 self.assertFalse(err) 3186 self.assertEqual("ok", out.decode().strip()) 3187 3188 def test_read_byteslike(self): 3189 r = MemviewBytesIO(b'Just some random string\n') 3190 t = self.TextIOWrapper(r, 'utf-8') 3191 3192 # TextIOwrapper will not read the full string, because 3193 # we truncate it to a multiple of the native int size 3194 # so that we can construct a more complex memoryview. 
3195 bytes_val = _to_memoryview(r.getvalue()).tobytes() 3196 3197 self.assertEqual(t.read(200), bytes_val.decode('utf-8')) 3198 3199 def test_issue22849(self): 3200 class F(object): 3201 def readable(self): return True 3202 def writable(self): return True 3203 def seekable(self): return True 3204 3205 for i in range(10): 3206 try: 3207 self.TextIOWrapper(F(), encoding='utf-8') 3208 except Exception: 3209 pass 3210 3211 F.tell = lambda x: 0 3212 t = self.TextIOWrapper(F(), encoding='utf-8') 3213 3214 3215 class MemviewBytesIO(io.BytesIO): 3216 '''A BytesIO object whose read method returns memoryviews 3217 rather than bytes''' 3218 3219 def read1(self, len_): 3220 return _to_memoryview(super().read1(len_)) 3221 3222 def read(self, len_): 3223 return _to_memoryview(super().read(len_)) 3224 3225 def _to_memoryview(buf): 3226 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3227 3228 arr = array.array('i') 3229 idx = len(buf) - len(buf) % arr.itemsize 3230 arr.frombytes(buf[:idx]) 3231 return memoryview(arr) 3232 3233 3234 class CTextIOWrapperTest(TextIOWrapperTest): 3235 io = io 3236 shutdown_error = "RuntimeError: could not find io module state" 3237 3238 def test_initialization(self): 3239 r = self.BytesIO(b"\xc3\xa9\n\n") 3240 b = self.BufferedReader(r, 1000) 3241 t = self.TextIOWrapper(b) 3242 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 3243 self.assertRaises(ValueError, t.read) 3244 3245 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3246 self.assertRaises(Exception, repr, t) 3247 3248 def test_garbage_collection(self): 3249 # C TextIOWrapper objects are collected, and collecting them flushes 3250 # all data to disk. 3251 # The Python version has __del__, so it ends in gc.garbage instead. 
3252 with support.check_warnings(('', ResourceWarning)): 3253 rawio = io.FileIO(support.TESTFN, "wb") 3254 b = self.BufferedWriter(rawio) 3255 t = self.TextIOWrapper(b, encoding="ascii") 3256 t.write("456def") 3257 t.x = t 3258 wr = weakref.ref(t) 3259 del t 3260 support.gc_collect() 3261 self.assertIsNone(wr(), wr) 3262 with self.open(support.TESTFN, "rb") as f: 3263 self.assertEqual(f.read(), b"456def") 3264 3265 def test_rwpair_cleared_before_textio(self): 3266 # Issue 13070: TextIOWrapper's finalization would crash when called 3267 # after the reference to the underlying BufferedRWPair's writer got 3268 # cleared by the GC. 3269 for i in range(1000): 3270 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3271 t1 = self.TextIOWrapper(b1, encoding="ascii") 3272 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3273 t2 = self.TextIOWrapper(b2, encoding="ascii") 3274 # circular references 3275 t1.buddy = t2 3276 t2.buddy = t1 3277 support.gc_collect() 3278 3279 3280 class PyTextIOWrapperTest(TextIOWrapperTest): 3281 io = pyio 3282 shutdown_error = "LookupError: unknown encoding: ascii" 3283 3284 3285 class IncrementalNewlineDecoderTest(unittest.TestCase): 3286 3287 def check_newline_decoding_utf8(self, decoder): 3288 # UTF-8 specific tests for a newline decoder 3289 def _check_decode(b, s, **kwargs): 3290 # We exercise getstate() / setstate() as well as decode() 3291 state = decoder.getstate() 3292 self.assertEqual(decoder.decode(b, **kwargs), s) 3293 decoder.setstate(state) 3294 self.assertEqual(decoder.decode(b, **kwargs), s) 3295 3296 _check_decode(b'\xe8\xa2\x88', "\u8888") 3297 3298 _check_decode(b'\xe8', "") 3299 _check_decode(b'\xa2', "") 3300 _check_decode(b'\x88', "\u8888") 3301 3302 _check_decode(b'\xe8', "") 3303 _check_decode(b'\xa2', "") 3304 _check_decode(b'\x88', "\u8888") 3305 3306 _check_decode(b'\xe8', "") 3307 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 3308 3309 decoder.reset() 3310 
_check_decode(b'\n', "\n") 3311 _check_decode(b'\r', "") 3312 _check_decode(b'', "\n", final=True) 3313 _check_decode(b'\r', "\n", final=True) 3314 3315 _check_decode(b'\r', "") 3316 _check_decode(b'a', "\na") 3317 3318 _check_decode(b'\r\r\n', "\n\n") 3319 _check_decode(b'\r', "") 3320 _check_decode(b'\r', "\n") 3321 _check_decode(b'\na', "\na") 3322 3323 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 3324 _check_decode(b'\xe8\xa2\x88', "\u8888") 3325 _check_decode(b'\n', "\n") 3326 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 3327 _check_decode(b'\n', "\n") 3328 3329 def check_newline_decoding(self, decoder, encoding): 3330 result = [] 3331 if encoding is not None: 3332 encoder = codecs.getincrementalencoder(encoding)() 3333 def _decode_bytewise(s): 3334 # Decode one byte at a time 3335 for b in encoder.encode(s): 3336 result.append(decoder.decode(bytes([b]))) 3337 else: 3338 encoder = None 3339 def _decode_bytewise(s): 3340 # Decode one char at a time 3341 for c in s: 3342 result.append(decoder.decode(c)) 3343 self.assertEqual(decoder.newlines, None) 3344 _decode_bytewise("abc\n\r") 3345 self.assertEqual(decoder.newlines, '\n') 3346 _decode_bytewise("\nabc") 3347 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3348 _decode_bytewise("abc\r") 3349 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3350 _decode_bytewise("abc") 3351 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 3352 _decode_bytewise("abc\r") 3353 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 3354 decoder.reset() 3355 input = "abc" 3356 if encoder is not None: 3357 encoder.reset() 3358 input = encoder.encode(input) 3359 self.assertEqual(decoder.decode(input), "abc") 3360 self.assertEqual(decoder.newlines, None) 3361 3362 def test_newline_decoder(self): 3363 encodings = ( 3364 # None meaning the IncrementalNewlineDecoder takes unicode input 3365 # rather than bytes input 3366 None, 'utf-8', 'latin-1', 3367 'utf-16', 'utf-16-le', 'utf-16-be', 3368 'utf-32', 'utf-32-le', 
'utf-32-be', 3369 ) 3370 for enc in encodings: 3371 decoder = enc and codecs.getincrementaldecoder(enc)() 3372 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3373 self.check_newline_decoding(decoder, enc) 3374 decoder = codecs.getincrementaldecoder("utf-8")() 3375 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3376 self.check_newline_decoding_utf8(decoder) 3377 3378 def test_newline_bytes(self): 3379 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 3380 def _check(dec): 3381 self.assertEqual(dec.newlines, None) 3382 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 3383 self.assertEqual(dec.newlines, None) 3384 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3385 self.assertEqual(dec.newlines, None) 3386 dec = self.IncrementalNewlineDecoder(None, translate=False) 3387 _check(dec) 3388 dec = self.IncrementalNewlineDecoder(None, translate=True) 3389 _check(dec) 3390 3391 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3392 pass 3393 3394 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3395 pass 3396 3397 3398 # XXX Tests for open() 3399 3400 class MiscIOTest(unittest.TestCase): 3401 3402 def tearDown(self): 3403 support.unlink(support.TESTFN) 3404 3405 def test___all__(self): 3406 for name in self.io.__all__: 3407 obj = getattr(self.io, name, None) 3408 self.assertIsNotNone(obj, name) 3409 if name == "open": 3410 continue 3411 elif "error" in name.lower() or name == "UnsupportedOperation": 3412 self.assertTrue(issubclass(obj, Exception), name) 3413 elif not name.startswith("SEEK_"): 3414 self.assertTrue(issubclass(obj, self.IOBase)) 3415 3416 def test_attributes(self): 3417 f = self.open(support.TESTFN, "wb", buffering=0) 3418 self.assertEqual(f.mode, "wb") 3419 f.close() 3420 3421 with support.check_warnings(('', DeprecationWarning)): 3422 f = self.open(support.TESTFN, "U") 3423 self.assertEqual(f.name, support.TESTFN) 3424 self.assertEqual(f.buffer.name, 
support.TESTFN) 3425 self.assertEqual(f.buffer.raw.name, support.TESTFN) 3426 self.assertEqual(f.mode, "U") 3427 self.assertEqual(f.buffer.mode, "rb") 3428 self.assertEqual(f.buffer.raw.mode, "rb") 3429 f.close() 3430 3431 f = self.open(support.TESTFN, "w+") 3432 self.assertEqual(f.mode, "w+") 3433 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 3434 self.assertEqual(f.buffer.raw.mode, "rb+") 3435 3436 g = self.open(f.fileno(), "wb", closefd=False) 3437 self.assertEqual(g.mode, "wb") 3438 self.assertEqual(g.raw.mode, "wb") 3439 self.assertEqual(g.name, f.fileno()) 3440 self.assertEqual(g.raw.name, f.fileno()) 3441 f.close() 3442 g.close() 3443 3444 def test_io_after_close(self): 3445 for kwargs in [ 3446 {"mode": "w"}, 3447 {"mode": "wb"}, 3448 {"mode": "w", "buffering": 1}, 3449 {"mode": "w", "buffering": 2}, 3450 {"mode": "wb", "buffering": 0}, 3451 {"mode": "r"}, 3452 {"mode": "rb"}, 3453 {"mode": "r", "buffering": 1}, 3454 {"mode": "r", "buffering": 2}, 3455 {"mode": "rb", "buffering": 0}, 3456 {"mode": "w+"}, 3457 {"mode": "w+b"}, 3458 {"mode": "w+", "buffering": 1}, 3459 {"mode": "w+", "buffering": 2}, 3460 {"mode": "w+b", "buffering": 0}, 3461 ]: 3462 f = self.open(support.TESTFN, **kwargs) 3463 f.close() 3464 self.assertRaises(ValueError, f.flush) 3465 self.assertRaises(ValueError, f.fileno) 3466 self.assertRaises(ValueError, f.isatty) 3467 self.assertRaises(ValueError, f.__iter__) 3468 if hasattr(f, "peek"): 3469 self.assertRaises(ValueError, f.peek, 1) 3470 self.assertRaises(ValueError, f.read) 3471 if hasattr(f, "read1"): 3472 self.assertRaises(ValueError, f.read1, 1024) 3473 if hasattr(f, "readall"): 3474 self.assertRaises(ValueError, f.readall) 3475 if hasattr(f, "readinto"): 3476 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 3477 if hasattr(f, "readinto1"): 3478 self.assertRaises(ValueError, f.readinto1, bytearray(1024)) 3479 self.assertRaises(ValueError, f.readline) 3480 self.assertRaises(ValueError, f.readlines) 3481 
self.assertRaises(ValueError, f.seek, 0) 3482 self.assertRaises(ValueError, f.tell) 3483 self.assertRaises(ValueError, f.truncate) 3484 self.assertRaises(ValueError, f.write, 3485 b"" if "b" in kwargs['mode'] else "") 3486 self.assertRaises(ValueError, f.writelines, []) 3487 self.assertRaises(ValueError, next, f) 3488 3489 def test_blockingioerror(self): 3490 # Various BlockingIOError issues 3491 class C(str): 3492 pass 3493 c = C("") 3494 b = self.BlockingIOError(1, c) 3495 c.b = b 3496 b.c = c 3497 wr = weakref.ref(c) 3498 del c, b 3499 support.gc_collect() 3500 self.assertIsNone(wr(), wr) 3501 3502 def test_abcs(self): 3503 # Test the visible base classes are ABCs. 3504 self.assertIsInstance(self.IOBase, abc.ABCMeta) 3505 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 3506 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 3507 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 3508 3509 def _check_abc_inheritance(self, abcmodule): 3510 with self.open(support.TESTFN, "wb", buffering=0) as f: 3511 self.assertIsInstance(f, abcmodule.IOBase) 3512 self.assertIsInstance(f, abcmodule.RawIOBase) 3513 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3514 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3515 with self.open(support.TESTFN, "wb") as f: 3516 self.assertIsInstance(f, abcmodule.IOBase) 3517 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3518 self.assertIsInstance(f, abcmodule.BufferedIOBase) 3519 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3520 with self.open(support.TESTFN, "w") as f: 3521 self.assertIsInstance(f, abcmodule.IOBase) 3522 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3523 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3524 self.assertIsInstance(f, abcmodule.TextIOBase) 3525 3526 def test_abc_inheritance(self): 3527 # Test implementations inherit from their respective ABCs 3528 self._check_abc_inheritance(self) 3529 3530 def test_abc_inheritance_official(self): 3531 # Test implementations inherit from the 
official ABCs of the 3532 # baseline "io" module. 3533 self._check_abc_inheritance(io) 3534 3535 def _check_warn_on_dealloc(self, *args, **kwargs): 3536 f = open(*args, **kwargs) 3537 r = repr(f) 3538 with self.assertWarns(ResourceWarning) as cm: 3539 f = None 3540 support.gc_collect() 3541 self.assertIn(r, str(cm.warning.args[0])) 3542 3543 def test_warn_on_dealloc(self): 3544 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0) 3545 self._check_warn_on_dealloc(support.TESTFN, "wb") 3546 self._check_warn_on_dealloc(support.TESTFN, "w") 3547 3548 def _check_warn_on_dealloc_fd(self, *args, **kwargs): 3549 fds = [] 3550 def cleanup_fds(): 3551 for fd in fds: 3552 try: 3553 os.close(fd) 3554 except OSError as e: 3555 if e.errno != errno.EBADF: 3556 raise 3557 self.addCleanup(cleanup_fds) 3558 r, w = os.pipe() 3559 fds += r, w 3560 self._check_warn_on_dealloc(r, *args, **kwargs) 3561 # When using closefd=False, there's no warning 3562 r, w = os.pipe() 3563 fds += r, w 3564 with support.check_no_resource_warning(self): 3565 open(r, *args, closefd=False, **kwargs) 3566 3567 def test_warn_on_dealloc_fd(self): 3568 self._check_warn_on_dealloc_fd("rb", buffering=0) 3569 self._check_warn_on_dealloc_fd("rb") 3570 self._check_warn_on_dealloc_fd("r") 3571 3572 3573 def test_pickling(self): 3574 # Pickling file objects is forbidden 3575 for kwargs in [ 3576 {"mode": "w"}, 3577 {"mode": "wb"}, 3578 {"mode": "wb", "buffering": 0}, 3579 {"mode": "r"}, 3580 {"mode": "rb"}, 3581 {"mode": "rb", "buffering": 0}, 3582 {"mode": "w+"}, 3583 {"mode": "w+b"}, 3584 {"mode": "w+b", "buffering": 0}, 3585 ]: 3586 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 3587 with self.open(support.TESTFN, **kwargs) as f: 3588 self.assertRaises(TypeError, pickle.dumps, f, protocol) 3589 3590 def test_nonblock_pipe_write_bigbuf(self): 3591 self._test_nonblock_pipe_write(16*1024) 3592 3593 def test_nonblock_pipe_write_smallbuf(self): 3594 self._test_nonblock_pipe_write(1024) 3595 3596 
@unittest.skipUnless(hasattr(os, 'set_blocking'), 3597 'os.set_blocking() required for this test') 3598 def _test_nonblock_pipe_write(self, bufsize): 3599 sent = [] 3600 received = [] 3601 r, w = os.pipe() 3602 os.set_blocking(r, False) 3603 os.set_blocking(w, False) 3604 3605 # To exercise all code paths in the C implementation we need 3606 # to play with buffer sizes. For instance, if we choose a 3607 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 3608 # then we will never get a partial write of the buffer. 3609 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 3610 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 3611 3612 with rf, wf: 3613 for N in 9999, 73, 7574: 3614 try: 3615 i = 0 3616 while True: 3617 msg = bytes([i % 26 + 97]) * N 3618 sent.append(msg) 3619 wf.write(msg) 3620 i += 1 3621 3622 except self.BlockingIOError as e: 3623 self.assertEqual(e.args[0], errno.EAGAIN) 3624 self.assertEqual(e.args[2], e.characters_written) 3625 sent[-1] = sent[-1][:e.characters_written] 3626 received.append(rf.read()) 3627 msg = b'BLOCKED' 3628 wf.write(msg) 3629 sent.append(msg) 3630 3631 while True: 3632 try: 3633 wf.flush() 3634 break 3635 except self.BlockingIOError as e: 3636 self.assertEqual(e.args[0], errno.EAGAIN) 3637 self.assertEqual(e.args[2], e.characters_written) 3638 self.assertEqual(e.characters_written, 0) 3639 received.append(rf.read()) 3640 3641 received += iter(rf.read, None) 3642 3643 sent, received = b''.join(sent), b''.join(received) 3644 self.assertEqual(sent, received) 3645 self.assertTrue(wf.closed) 3646 self.assertTrue(rf.closed) 3647 3648 def test_create_fail(self): 3649 # 'x' mode fails if file is existing 3650 with self.open(support.TESTFN, 'w'): 3651 pass 3652 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') 3653 3654 def test_create_writes(self): 3655 # 'x' mode opens for writing 3656 with self.open(support.TESTFN, 'xb') as f: 3657 f.write(b"spam") 3658 with 
self.open(support.TESTFN, 'rb') as f: 3659 self.assertEqual(b"spam", f.read()) 3660 3661 def test_open_allargs(self): 3662 # there used to be a buffer overflow in the parser for rawmode 3663 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+') 3664 3665 3666 class CMiscIOTest(MiscIOTest): 3667 io = io 3668 3669 def test_readinto_buffer_overflow(self): 3670 # Issue #18025 3671 class BadReader(self.io.BufferedIOBase): 3672 def read(self, n=-1): 3673 return b'x' * 10**6 3674 bufio = BadReader() 3675 b = bytearray(2) 3676 self.assertRaises(ValueError, bufio.readinto, b) 3677 3678 @unittest.skipUnless(threading, 'Threading required for this test.') 3679 def check_daemon_threads_shutdown_deadlock(self, stream_name): 3680 # Issue #23309: deadlocks at shutdown should be avoided when a 3681 # daemon thread and the main thread both write to a file. 3682 code = """if 1: 3683 import sys 3684 import time 3685 import threading 3686 3687 file = sys.{stream_name} 3688 3689 def run(): 3690 while True: 3691 file.write('.') 3692 file.flush() 3693 3694 thread = threading.Thread(target=run) 3695 thread.daemon = True 3696 thread.start() 3697 3698 time.sleep(0.5) 3699 file.write('!') 3700 file.flush() 3701 """.format_map(locals()) 3702 res, _ = run_python_until_end("-c", code) 3703 err = res.err.decode() 3704 if res.rc != 0: 3705 # Failure: should be a fatal error 3706 self.assertIn("Fatal Python error: could not acquire lock " 3707 "for <_io.BufferedWriter name='<{stream_name}>'> " 3708 "at interpreter shutdown, possibly due to " 3709 "daemon threads".format_map(locals()), 3710 err) 3711 else: 3712 self.assertFalse(err.strip('.!')) 3713 3714 def test_daemon_threads_shutdown_stdout_deadlock(self): 3715 self.check_daemon_threads_shutdown_deadlock('stdout') 3716 3717 def test_daemon_threads_shutdown_stderr_deadlock(self): 3718 self.check_daemon_threads_shutdown_deadlock('stderr') 3719 3720 3721 class PyMiscIOTest(MiscIOTest): 3722 io = pyio 3723 3724 3725 
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests of io behaviour when SIGALRM interrupts a read or a write.

    Subclasses provide the implementation under test as the ``io`` class
    attribute.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from inside the handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below does not
        # fail with an unbound name if open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check that a write re-entered from a signal handler either fails
        with RuntimeError("reentrant call ...") or lets the handler's
        ZeroDivisionError propagate."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case we got here before it fired;
            # otherwise on_alarm() would run later against a closed file.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below does not
        # fail with an unbound name if open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case we got here before it fired;
            # otherwise alarm_handler() would later write to a closed fd.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below does not
        # fail with an unbound name if open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm that has not fired yet so that neither
            # handler runs after the pipe has been closed.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")


class CSignalsTest(SignalsTest):
    io = io

class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None


def load_tests(*args):
    """Build and return the test suite for this module.

    Before the suite is assembled, every test class whose name starts with
    "C" or "Py" gets the members of the matching io implementation, and the
    matching mock classes, set as class attributes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated; the default loader collects the
    # same "test*" methods.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite

if __name__ == "__main__":
    unittest.main()