1 """Unit tests for the io module.""" 2 3 # Tests of io are scattered over the test suite: 4 # * test_bufio - tests file buffering 5 # * test_memoryio - tests BytesIO and StringIO 6 # * test_fileio - tests FileIO 7 # * test_file - tests the file interface 8 # * test_io - tests everything else in the io module 9 # * test_univnewlines - tests universal newline support 10 # * test_largefile - tests operations on a file greater than 2**32 bytes 11 # (only enabled with -ulargefile) 12 13 ################################################################################ 14 # ATTENTION TEST WRITERS!!! 15 ################################################################################ 16 # When writing tests for io, it's important to test both the C and Python 17 # implementations. This is usually done by writing a base test that refers to 18 # the type it is testing as a attribute. Then it provides custom subclasses to 19 # test both implementations. This file has lots of examples. 20 ################################################################################ 21 22 from __future__ import print_function 23 from __future__ import unicode_literals 24 25 import os 26 import sys 27 import time 28 import array 29 import random 30 import unittest 31 import weakref 32 import abc 33 import signal 34 import errno 35 from itertools import cycle, count 36 from collections import deque 37 from test import test_support as support 38 39 import codecs 40 import io # C implementation of io 41 import _pyio as pyio # Python implementation of io 42 try: 43 import threading 44 except ImportError: 45 threading = None 46 47 __metaclass__ = type 48 bytes = support.py3k_bytes 49 50 def _default_chunk_size(): 51 """Get the default TextIOWrapper chunk size""" 52 with io.open(__file__, "r", encoding="latin1") as f: 53 return f._CHUNK_SIZE 54 55 56 class MockRawIOWithoutRead: 57 """A RawIO implementation without read(), so as to exercise the default 58 RawIO.read() which calls readinto().""" 59 60 def __init__(self, read_stack=()): 61 self._read_stack = list(read_stack) 62 self._write_stack = [] 63 self._reads = 0 64 self._extraneous_reads = 0 65 66 def write(self, b): 67 self._write_stack.append(bytes(b)) 68 return len(b) 69 70 def writable(self): 71 return True 72 73 def fileno(self): 74 return 42 75 76 def readable(self): 77 return True 78 79 def seekable(self): 80 return True 81 82 def seek(self, pos, whence): 83 return 0 # wrong but we gotta return something 84 85 def tell(self): 86 return 0 # same comment as above 87 88 def readinto(self, buf): 89 self._reads += 1 90 max_len = len(buf) 91 try: 92 data = self._read_stack[0] 93 except IndexError: 94 self._extraneous_reads += 1 95 return 0 96 if data is None: 97 del self._read_stack[0] 98 return None 99 n = len(data) 100 if len(data) <= max_len: 101 del self._read_stack[0] 102 buf[:n] = data 103 return n 104 else: 105 buf[:] = data[:max_len] 106 self._read_stack[0] = data[max_len:] 107 return max_len 108 109 def truncate(self, pos=None): 110 return pos 111 112 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 113 pass 114 115 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 116 pass 117 118 119 class MockRawIO(MockRawIOWithoutRead): 120 121 def read(self, n=None): 122 self._reads += 1 123 try: 124 return self._read_stack.pop(0) 125 except: 126 self._extraneous_reads += 1 127 return b"" 128 129 class CMockRawIO(MockRawIO, io.RawIOBase): 130 pass 131 132 class PyMockRawIO(MockRawIO, pyio.RawIOBase): 133 pass 134 135 136 class MisbehavedRawIO(MockRawIO): 137 def write(self, b): 138 return MockRawIO.write(self, b) * 2 139 140 def read(self, n=None): 141 return MockRawIO.read(self, n) * 2 142 143 def seek(self, pos, whence): 144 return -123 145 146 def tell(self): 147 return -456 148 149 def readinto(self, buf): 150 MockRawIO.readinto(self, buf) 151 return len(buf) * 5 152 153 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 154 pass 155 156 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 157 pass 158 159 160 class CloseFailureIO(MockRawIO): 161 closed = 0 162 163 def close(self): 164 if not self.closed: 165 self.closed = 1 166 raise IOError 167 168 class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 169 pass 170 171 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 172 pass 173 174 175 class MockFileIO: 176 177 def __init__(self, data): 178 self.read_history = [] 179 super(MockFileIO, self).__init__(data) 180 181 def read(self, n=None): 182 res = super(MockFileIO, self).read(n) 183 self.read_history.append(None if res is None else len(res)) 184 return res 185 186 def readinto(self, b): 187 res = super(MockFileIO, self).readinto(b) 188 self.read_history.append(res) 189 return res 190 191 class CMockFileIO(MockFileIO, io.BytesIO): 192 pass 193 194 class PyMockFileIO(MockFileIO, pyio.BytesIO): 195 pass 196 197 198 class MockNonBlockWriterIO: 199 200 def __init__(self): 201 self._write_stack = [] 202 self._blocker_char = None 203 204 def pop_written(self): 205 s = b"".join(self._write_stack) 206 self._write_stack[:] = [] 207 return s 208 209 def block_on(self, char): 210 """Block when a given char is encountered.""" 211 self._blocker_char = char 212 213 def readable(self): 214 return True 215 216 def seekable(self): 217 return True 218 219 def writable(self): 220 return True 221 222 def write(self, b): 223 b = bytes(b) 224 n = -1 225 if self._blocker_char: 226 try: 227 n = b.index(self._blocker_char) 228 except ValueError: 229 pass 230 else: 231 self._blocker_char = None 232 self._write_stack.append(b[:n]) 233 raise self.BlockingIOError(0, "test blocking", n) 234 self._write_stack.append(b) 235 return len(b) 236 237 class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 238 BlockingIOError = io.BlockingIOError 239 240 class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 241 BlockingIOError = pyio.BlockingIOError 242 243 244 class IOTest(unittest.TestCase): 245 246 def setUp(self): 247 support.unlink(support.TESTFN) 248 249 def tearDown(self): 250 support.unlink(support.TESTFN) 251 252 def write_ops(self, f): 253 self.assertEqual(f.write(b"blah."), 5) 254 f.truncate(0) 255 self.assertEqual(f.tell(), 5) 256 f.seek(0) 257 258 self.assertEqual(f.write(b"blah."), 5) 259 self.assertEqual(f.seek(0), 0) 260 self.assertEqual(f.write(b"Hello."), 6) 261 self.assertEqual(f.tell(), 6) 262 self.assertEqual(f.seek(-1, 1), 5) 263 self.assertEqual(f.tell(), 5) 264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9) 265 self.assertEqual(f.seek(0), 0) 266 self.assertEqual(f.write(b"h"), 1) 267 self.assertEqual(f.seek(-1, 2), 13) 268 self.assertEqual(f.tell(), 13) 269 270 self.assertEqual(f.truncate(12), 12) 271 self.assertEqual(f.tell(), 13) 272 self.assertRaises(TypeError, f.seek, 0.0) 273 274 def read_ops(self, f, buffered=False): 275 data = f.read(5) 276 self.assertEqual(data, b"hello") 277 data = bytearray(data) 278 self.assertEqual(f.readinto(data), 5) 279 self.assertEqual(data, b" worl") 280 self.assertEqual(f.readinto(data), 2) 281 self.assertEqual(len(data), 5) 282 self.assertEqual(data[:2], b"d\n") 283 self.assertEqual(f.seek(0), 0) 284 self.assertEqual(f.read(20), b"hello world\n") 285 self.assertEqual(f.read(1), b"") 286 self.assertEqual(f.readinto(bytearray(b"x")), 0) 287 self.assertEqual(f.seek(-6, 2), 6) 288 self.assertEqual(f.read(5), b"world") 289 self.assertEqual(f.read(0), b"") 290 self.assertEqual(f.readinto(bytearray()), 0) 291 self.assertEqual(f.seek(-6, 1), 5) 292 self.assertEqual(f.read(5), b" worl") 293 self.assertEqual(f.tell(), 10) 294 self.assertRaises(TypeError, f.seek, 0.0) 295 if buffered: 296 f.seek(0) 297 self.assertEqual(f.read(), b"hello world\n") 298 f.seek(6) 299 self.assertEqual(f.read(), b"world\n") 300 self.assertEqual(f.read(), b"") 301 302 LARGE = 2**31 303 304 def large_file_ops(self, f): 305 assert f.readable() 306 assert f.writable() 307 self.assertEqual(f.seek(self.LARGE), self.LARGE) 308 self.assertEqual(f.tell(), self.LARGE) 309 self.assertEqual(f.write(b"xxx"), 3) 310 self.assertEqual(f.tell(), self.LARGE + 3) 311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 312 self.assertEqual(f.truncate(), self.LARGE + 2) 313 self.assertEqual(f.tell(), self.LARGE + 2) 314 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 316 self.assertEqual(f.tell(), self.LARGE + 2) 317 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 318 self.assertEqual(f.seek(-1, 2), self.LARGE) 319 self.assertEqual(f.read(2), b"x") 320 321 def test_invalid_operations(self): 322 # Try writing on a file opened in read mode and vice-versa. 323 for mode in ("w", "wb"): 324 with self.open(support.TESTFN, mode) as fp: 325 self.assertRaises(IOError, fp.read) 326 self.assertRaises(IOError, fp.readline) 327 with self.open(support.TESTFN, "rb") as fp: 328 self.assertRaises(IOError, fp.write, b"blah") 329 self.assertRaises(IOError, fp.writelines, [b"blah\n"]) 330 with self.open(support.TESTFN, "r") as fp: 331 self.assertRaises(IOError, fp.write, "blah") 332 self.assertRaises(IOError, fp.writelines, ["blah\n"]) 333 334 def test_raw_file_io(self): 335 with self.open(support.TESTFN, "wb", buffering=0) as f: 336 self.assertEqual(f.readable(), False) 337 self.assertEqual(f.writable(), True) 338 self.assertEqual(f.seekable(), True) 339 self.write_ops(f) 340 with self.open(support.TESTFN, "rb", buffering=0) as f: 341 self.assertEqual(f.readable(), True) 342 self.assertEqual(f.writable(), False) 343 self.assertEqual(f.seekable(), True) 344 self.read_ops(f) 345 346 def test_buffered_file_io(self): 347 with self.open(support.TESTFN, "wb") as f: 348 self.assertEqual(f.readable(), False) 349 self.assertEqual(f.writable(), True) 350 self.assertEqual(f.seekable(), True) 351 self.write_ops(f) 352 with self.open(support.TESTFN, "rb") as f: 353 self.assertEqual(f.readable(), True) 354 self.assertEqual(f.writable(), False) 355 self.assertEqual(f.seekable(), True) 356 self.read_ops(f, True) 357 358 def test_readline(self): 359 with self.open(support.TESTFN, "wb") as f: 360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 361 with self.open(support.TESTFN, "rb") as f: 362 self.assertEqual(f.readline(), b"abc\n") 363 self.assertEqual(f.readline(10), b"def\n") 364 self.assertEqual(f.readline(2), b"xy") 365 self.assertEqual(f.readline(4), b"zzy\n") 366 self.assertEqual(f.readline(), b"foo\x00bar\n") 367 self.assertEqual(f.readline(None), b"another line") 368 self.assertRaises(TypeError, f.readline, 5.3) 369 with self.open(support.TESTFN, "r") as f: 370 self.assertRaises(TypeError, f.readline, 5.3) 371 372 def test_raw_bytes_io(self): 373 f = self.BytesIO() 374 self.write_ops(f) 375 data = f.getvalue() 376 self.assertEqual(data, b"hello world\n") 377 f = self.BytesIO(data) 378 self.read_ops(f, True) 379 380 def test_large_file_ops(self): 381 # On Windows and Mac OSX this test comsumes large resources; It takes 382 # a long time to build the >2GB file and takes >2GB of disk space 383 # therefore the resource must be enabled to run this test. 384 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 385 if not support.is_resource_enabled("largefile"): 386 print("\nTesting large file ops skipped on %s." % sys.platform, 387 file=sys.stderr) 388 print("It requires %d bytes and a long time." % self.LARGE, 389 file=sys.stderr) 390 print("Use 'regrtest.py -u largefile test_io' to run it.", 391 file=sys.stderr) 392 return 393 with self.open(support.TESTFN, "w+b", 0) as f: 394 self.large_file_ops(f) 395 with self.open(support.TESTFN, "w+b") as f: 396 self.large_file_ops(f) 397 398 def test_with_open(self): 399 for bufsize in (0, 1, 100): 400 f = None 401 with self.open(support.TESTFN, "wb", bufsize) as f: 402 f.write(b"xxx") 403 self.assertEqual(f.closed, True) 404 f = None 405 try: 406 with self.open(support.TESTFN, "wb", bufsize) as f: 407 1 // 0 408 except ZeroDivisionError: 409 self.assertEqual(f.closed, True) 410 else: 411 self.fail("1 // 0 didn't raise an exception") 412 413 # issue 5008 414 def test_append_mode_tell(self): 415 with self.open(support.TESTFN, "wb") as f: 416 f.write(b"xxx") 417 with self.open(support.TESTFN, "ab", buffering=0) as f: 418 self.assertEqual(f.tell(), 3) 419 with self.open(support.TESTFN, "ab") as f: 420 self.assertEqual(f.tell(), 3) 421 with self.open(support.TESTFN, "a") as f: 422 self.assertTrue(f.tell() > 0) 423 424 def test_destructor(self): 425 record = [] 426 class MyFileIO(self.FileIO): 427 def __del__(self): 428 record.append(1) 429 try: 430 f = super(MyFileIO, self).__del__ 431 except AttributeError: 432 pass 433 else: 434 f() 435 def close(self): 436 record.append(2) 437 super(MyFileIO, self).close() 438 def flush(self): 439 record.append(3) 440 super(MyFileIO, self).flush() 441 f = MyFileIO(support.TESTFN, "wb") 442 f.write(b"xxx") 443 del f 444 support.gc_collect() 445 self.assertEqual(record, [1, 2, 3]) 446 with self.open(support.TESTFN, "rb") as f: 447 self.assertEqual(f.read(), b"xxx") 448 449 def _check_base_destructor(self, base): 450 record = [] 451 class MyIO(base): 452 def __init__(self): 453 # This exercises the availability of attributes on object 454 # destruction. 455 # (in the C version, close() is called by the tp_dealloc 456 # function, not by __del__) 457 self.on_del = 1 458 self.on_close = 2 459 self.on_flush = 3 460 def __del__(self): 461 record.append(self.on_del) 462 try: 463 f = super(MyIO, self).__del__ 464 except AttributeError: 465 pass 466 else: 467 f() 468 def close(self): 469 record.append(self.on_close) 470 super(MyIO, self).close() 471 def flush(self): 472 record.append(self.on_flush) 473 super(MyIO, self).flush() 474 f = MyIO() 475 del f 476 support.gc_collect() 477 self.assertEqual(record, [1, 2, 3]) 478 479 def test_IOBase_destructor(self): 480 self._check_base_destructor(self.IOBase) 481 482 def test_RawIOBase_destructor(self): 483 self._check_base_destructor(self.RawIOBase) 484 485 def test_BufferedIOBase_destructor(self): 486 self._check_base_destructor(self.BufferedIOBase) 487 488 def test_TextIOBase_destructor(self): 489 self._check_base_destructor(self.TextIOBase) 490 491 def test_close_flushes(self): 492 with self.open(support.TESTFN, "wb") as f: 493 f.write(b"xxx") 494 with self.open(support.TESTFN, "rb") as f: 495 self.assertEqual(f.read(), b"xxx") 496 497 def test_array_writes(self): 498 a = array.array(b'i', range(10)) 499 n = len(a.tostring()) 500 with self.open(support.TESTFN, "wb", 0) as f: 501 self.assertEqual(f.write(a), n) 502 with self.open(support.TESTFN, "wb") as f: 503 self.assertEqual(f.write(a), n) 504 505 def test_closefd(self): 506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 507 closefd=False) 508 509 def test_read_closed(self): 510 with self.open(support.TESTFN, "w") as f: 511 f.write("egg\n") 512 with self.open(support.TESTFN, "r") as f: 513 file = self.open(f.fileno(), "r", closefd=False) 514 self.assertEqual(file.read(), "egg\n") 515 file.seek(0) 516 file.close() 517 self.assertRaises(ValueError, file.read) 518 519 def test_no_closefd_with_filename(self): 520 # can't use closefd in combination with a file name 521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 522 523 def test_closefd_attr(self): 524 with self.open(support.TESTFN, "wb") as f: 525 f.write(b"egg\n") 526 with self.open(support.TESTFN, "r") as f: 527 self.assertEqual(f.buffer.raw.closefd, True) 528 file = self.open(f.fileno(), "r", closefd=False) 529 self.assertEqual(file.buffer.raw.closefd, False) 530 531 def test_garbage_collection(self): 532 # FileIO objects are collected, and collecting them flushes 533 # all data to disk. 534 f = self.FileIO(support.TESTFN, "wb") 535 f.write(b"abcxxx") 536 f.f = f 537 wr = weakref.ref(f) 538 del f 539 support.gc_collect() 540 self.assertTrue(wr() is None, wr) 541 with self.open(support.TESTFN, "rb") as f: 542 self.assertEqual(f.read(), b"abcxxx") 543 544 def test_unbounded_file(self): 545 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 546 zero = "/dev/zero" 547 if not os.path.exists(zero): 548 self.skipTest("{0} does not exist".format(zero)) 549 if sys.maxsize > 0x7FFFFFFF: 550 self.skipTest("test can only run in a 32-bit address space") 551 if support.real_max_memuse < support._2G: 552 self.skipTest("test requires at least 2GB of memory") 553 with self.open(zero, "rb", buffering=0) as f: 554 self.assertRaises(OverflowError, f.read) 555 with self.open(zero, "rb") as f: 556 self.assertRaises(OverflowError, f.read) 557 with self.open(zero, "r") as f: 558 self.assertRaises(OverflowError, f.read) 559 560 def test_flush_error_on_close(self): 561 f = self.open(support.TESTFN, "wb", buffering=0) 562 def bad_flush(): 563 raise IOError() 564 f.flush = bad_flush 565 self.assertRaises(IOError, f.close) # exception not swallowed 566 567 def test_multi_close(self): 568 f = self.open(support.TESTFN, "wb", buffering=0) 569 f.close() 570 f.close() 571 f.close() 572 self.assertRaises(ValueError, f.flush) 573 574 def test_RawIOBase_read(self): 575 # Exercise the default RawIOBase.read() implementation (which calls 576 # readinto() internally). 577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 578 self.assertEqual(rawio.read(2), b"ab") 579 self.assertEqual(rawio.read(2), b"c") 580 self.assertEqual(rawio.read(2), b"d") 581 self.assertEqual(rawio.read(2), None) 582 self.assertEqual(rawio.read(2), b"ef") 583 self.assertEqual(rawio.read(2), b"g") 584 self.assertEqual(rawio.read(2), None) 585 self.assertEqual(rawio.read(2), b"") 586 587 class CIOTest(IOTest): 588 pass 589 590 class PyIOTest(IOTest): 591 test_array_writes = unittest.skip( 592 "len(array.array) returns number of elements rather than bytelength" 593 )(IOTest.test_array_writes) 594 595 596 class CommonBufferedTests: 597 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 598 599 def test_detach(self): 600 raw = self.MockRawIO() 601 buf = self.tp(raw) 602 self.assertIs(buf.detach(), raw) 603 self.assertRaises(ValueError, buf.detach) 604 605 def test_fileno(self): 606 rawio = self.MockRawIO() 607 bufio = self.tp(rawio) 608 609 self.assertEqual(42, bufio.fileno()) 610 611 def test_no_fileno(self): 612 # XXX will we always have fileno() function? If so, kill 613 # this test. Else, write it. 614 pass 615 616 def test_invalid_args(self): 617 rawio = self.MockRawIO() 618 bufio = self.tp(rawio) 619 # Invalid whence 620 self.assertRaises(ValueError, bufio.seek, 0, -1) 621 self.assertRaises(ValueError, bufio.seek, 0, 3) 622 623 def test_override_destructor(self): 624 tp = self.tp 625 record = [] 626 class MyBufferedIO(tp): 627 def __del__(self): 628 record.append(1) 629 try: 630 f = super(MyBufferedIO, self).__del__ 631 except AttributeError: 632 pass 633 else: 634 f() 635 def close(self): 636 record.append(2) 637 super(MyBufferedIO, self).close() 638 def flush(self): 639 record.append(3) 640 super(MyBufferedIO, self).flush() 641 rawio = self.MockRawIO() 642 bufio = MyBufferedIO(rawio) 643 writable = bufio.writable() 644 del bufio 645 support.gc_collect() 646 if writable: 647 self.assertEqual(record, [1, 2, 3]) 648 else: 649 self.assertEqual(record, [1, 2]) 650 651 def test_context_manager(self): 652 # Test usability as a context manager 653 rawio = self.MockRawIO() 654 bufio = self.tp(rawio) 655 def _with(): 656 with bufio: 657 pass 658 _with() 659 # bufio should now be closed, and using it a second time should raise 660 # a ValueError. 661 self.assertRaises(ValueError, _with) 662 663 def test_error_through_destructor(self): 664 # Test that the exception state is not modified by a destructor, 665 # even if close() fails. 666 rawio = self.CloseFailureIO() 667 def f(): 668 self.tp(rawio).xyzzy 669 with support.captured_output("stderr") as s: 670 self.assertRaises(AttributeError, f) 671 s = s.getvalue().strip() 672 if s: 673 # The destructor *may* have printed an unraisable error, check it 674 self.assertEqual(len(s.splitlines()), 1) 675 self.assertTrue(s.startswith("Exception IOError: "), s) 676 self.assertTrue(s.endswith(" ignored"), s) 677 678 def test_repr(self): 679 raw = self.MockRawIO() 680 b = self.tp(raw) 681 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) 682 self.assertEqual(repr(b), "<%s>" % clsname) 683 raw.name = "dummy" 684 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) 685 raw.name = b"dummy" 686 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) 687 688 def test_flush_error_on_close(self): 689 raw = self.MockRawIO() 690 def bad_flush(): 691 raise IOError() 692 raw.flush = bad_flush 693 b = self.tp(raw) 694 self.assertRaises(IOError, b.close) # exception not swallowed 695 696 def test_multi_close(self): 697 raw = self.MockRawIO() 698 b = self.tp(raw) 699 b.close() 700 b.close() 701 b.close() 702 self.assertRaises(ValueError, b.flush) 703 704 def test_readonly_attributes(self): 705 raw = self.MockRawIO() 706 buf = self.tp(raw) 707 x = self.MockRawIO() 708 with self.assertRaises((AttributeError, TypeError)): 709 buf.raw = x 710 711 712 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 713 read_mode = "rb" 714 715 def test_constructor(self): 716 rawio = self.MockRawIO([b"abc"]) 717 bufio = self.tp(rawio) 718 bufio.__init__(rawio) 719 bufio.__init__(rawio, buffer_size=1024) 720 bufio.__init__(rawio, buffer_size=16) 721 self.assertEqual(b"abc", bufio.read()) 722 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 723 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 724 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 725 rawio = self.MockRawIO([b"abc"]) 726 bufio.__init__(rawio) 727 self.assertEqual(b"abc", bufio.read()) 728 729 def test_read(self): 730 for arg in (None, 7): 731 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 732 bufio = self.tp(rawio) 733 self.assertEqual(b"abcdefg", bufio.read(arg)) 734 # Invalid args 735 self.assertRaises(ValueError, bufio.read, -2) 736 737 def test_read1(self): 738 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 739 bufio = self.tp(rawio) 740 self.assertEqual(b"a", bufio.read(1)) 741 self.assertEqual(b"b", bufio.read1(1)) 742 self.assertEqual(rawio._reads, 1) 743 self.assertEqual(b"c", bufio.read1(100)) 744 self.assertEqual(rawio._reads, 1) 745 self.assertEqual(b"d", bufio.read1(100)) 746 self.assertEqual(rawio._reads, 2) 747 self.assertEqual(b"efg", bufio.read1(100)) 748 self.assertEqual(rawio._reads, 3) 749 self.assertEqual(b"", bufio.read1(100)) 750 self.assertEqual(rawio._reads, 4) 751 # Invalid args 752 self.assertRaises(ValueError, bufio.read1, -1) 753 754 def test_readinto(self): 755 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 756 bufio = self.tp(rawio) 757 b = bytearray(2) 758 self.assertEqual(bufio.readinto(b), 2) 759 self.assertEqual(b, b"ab") 760 self.assertEqual(bufio.readinto(b), 2) 761 self.assertEqual(b, b"cd") 762 self.assertEqual(bufio.readinto(b), 2) 763 self.assertEqual(b, b"ef") 764 self.assertEqual(bufio.readinto(b), 1) 765 self.assertEqual(b, b"gf") 766 self.assertEqual(bufio.readinto(b), 0) 767 self.assertEqual(b, b"gf") 768 769 def test_readlines(self): 770 def bufio(): 771 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 772 return self.tp(rawio) 773 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 774 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 775 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 776 777 def test_buffering(self): 778 data = b"abcdefghi" 779 dlen = len(data) 780 781 tests = [ 782 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 783 [ 100, [ 3, 3, 3], [ dlen ] ], 784 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 785 ] 786 787 for bufsize, buf_read_sizes, raw_read_sizes in tests: 788 rawio = self.MockFileIO(data) 789 bufio = self.tp(rawio, buffer_size=bufsize) 790 pos = 0 791 for nbytes in buf_read_sizes: 792 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 793 pos += nbytes 794 # this is mildly implementation-dependent 795 self.assertEqual(rawio.read_history, raw_read_sizes) 796 797 def test_read_non_blocking(self): 798 # Inject some None's in there to simulate EWOULDBLOCK 799 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 800 bufio = self.tp(rawio) 801 self.assertEqual(b"abcd", bufio.read(6)) 802 self.assertEqual(b"e", bufio.read(1)) 803 self.assertEqual(b"fg", bufio.read()) 804 self.assertEqual(b"", bufio.peek(1)) 805 self.assertIsNone(bufio.read()) 806 self.assertEqual(b"", bufio.read()) 807 808 rawio = self.MockRawIO((b"a", None, None)) 809 self.assertEqual(b"a", rawio.readall()) 810 self.assertIsNone(rawio.readall()) 811 812 def test_read_past_eof(self): 813 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 814 bufio = self.tp(rawio) 815 816 self.assertEqual(b"abcdefg", bufio.read(9000)) 817 818 def test_read_all(self): 819 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 820 bufio = self.tp(rawio) 821 822 self.assertEqual(b"abcdefg", bufio.read()) 823 824 @unittest.skipUnless(threading, 'Threading required for this test.') 825 @support.requires_resource('cpu') 826 def test_threads(self): 827 try: 828 # Write out many bytes with exactly the same number of 0's, 829 # 1's... 255's. This will help us check that concurrent reading 830 # doesn't duplicate or forget contents. 831 N = 1000 832 l = list(range(256)) * N 833 random.shuffle(l) 834 s = bytes(bytearray(l)) 835 with self.open(support.TESTFN, "wb") as f: 836 f.write(s) 837 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 838 bufio = self.tp(raw, 8) 839 errors = [] 840 results = [] 841 def f(): 842 try: 843 # Intra-buffer read then buffer-flushing read 844 for n in cycle([1, 19]): 845 s = bufio.read(n) 846 if not s: 847 break 848 # list.append() is atomic 849 results.append(s) 850 except Exception as e: 851 errors.append(e) 852 raise 853 threads = [threading.Thread(target=f) for x in range(20)] 854 for t in threads: 855 t.start() 856 time.sleep(0.02) # yield 857 for t in threads: 858 t.join() 859 self.assertFalse(errors, 860 "the following exceptions were caught: %r" % errors) 861 s = b''.join(results) 862 for i in range(256): 863 c = bytes(bytearray([i])) 864 self.assertEqual(s.count(c), N) 865 finally: 866 support.unlink(support.TESTFN) 867 868 def test_misbehaved_io(self): 869 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 870 bufio = self.tp(rawio) 871 self.assertRaises(IOError, bufio.seek, 0) 872 self.assertRaises(IOError, bufio.tell) 873 874 def test_no_extraneous_read(self): 875 # Issue #9550; when the raw IO object has satisfied the read request, 876 # we should not issue any additional reads, otherwise it may block 877 # (e.g. socket). 878 bufsize = 16 879 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 880 rawio = self.MockRawIO([b"x" * n]) 881 bufio = self.tp(rawio, bufsize) 882 self.assertEqual(bufio.read(n), b"x" * n) 883 # Simple case: one raw read is enough to satisfy the request. 884 self.assertEqual(rawio._extraneous_reads, 0, 885 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 886 # A more complex case where two raw reads are needed to satisfy 887 # the request. 888 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 889 bufio = self.tp(rawio, bufsize) 890 self.assertEqual(bufio.read(n), b"x" * n) 891 self.assertEqual(rawio._extraneous_reads, 0, 892 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 893 894 895 class CBufferedReaderTest(BufferedReaderTest): 896 tp = io.BufferedReader 897 898 def test_constructor(self): 899 BufferedReaderTest.test_constructor(self) 900 # The allocation can succeed on 32-bit builds, e.g. with more 901 # than 2GB RAM and a 64-bit kernel. 902 if sys.maxsize > 0x7FFFFFFF: 903 rawio = self.MockRawIO() 904 bufio = self.tp(rawio) 905 self.assertRaises((OverflowError, MemoryError, ValueError), 906 bufio.__init__, rawio, sys.maxsize) 907 908 def test_initialization(self): 909 rawio = self.MockRawIO([b"abc"]) 910 bufio = self.tp(rawio) 911 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 912 self.assertRaises(ValueError, bufio.read) 913 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 914 self.assertRaises(ValueError, bufio.read) 915 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 916 self.assertRaises(ValueError, bufio.read) 917 918 def test_misbehaved_io_read(self): 919 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 920 bufio = self.tp(rawio) 921 # _pyio.BufferedReader seems to implement reading different, so that 922 # checking this is not so easy. 923 self.assertRaises(IOError, bufio.read, 10) 924 925 def test_garbage_collection(self): 926 # C BufferedReader objects are collected. 927 # The Python version has __del__, so it ends into gc.garbage instead 928 rawio = self.FileIO(support.TESTFN, "w+b") 929 f = self.tp(rawio) 930 f.f = f 931 wr = weakref.ref(f) 932 del f 933 support.gc_collect() 934 self.assertTrue(wr() is None, wr) 935 936 class PyBufferedReaderTest(BufferedReaderTest): 937 tp = pyio.BufferedReader 938 939 940 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 941 write_mode = "wb" 942 943 def test_constructor(self): 944 rawio = self.MockRawIO() 945 bufio = self.tp(rawio) 946 bufio.__init__(rawio) 947 bufio.__init__(rawio, buffer_size=1024) 948 bufio.__init__(rawio, buffer_size=16) 949 self.assertEqual(3, bufio.write(b"abc")) 950 bufio.flush() 951 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 952 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 953 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 954 bufio.__init__(rawio) 955 self.assertEqual(3, bufio.write(b"ghi")) 956 bufio.flush() 957 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 958 959 def test_detach_flush(self): 960 raw = self.MockRawIO() 961 buf = self.tp(raw) 962 buf.write(b"howdy!") 963 self.assertFalse(raw._write_stack) 964 buf.detach() 965 self.assertEqual(raw._write_stack, [b"howdy!"]) 966 967 def test_write(self): 968 # Write to the buffered IO but don't overflow the buffer. 969 writer = self.MockRawIO() 970 bufio = self.tp(writer, 8) 971 bufio.write(b"abc") 972 self.assertFalse(writer._write_stack) 973 974 def test_write_overflow(self): 975 writer = self.MockRawIO() 976 bufio = self.tp(writer, 8) 977 contents = b"abcdefghijklmnop" 978 for n in range(0, len(contents), 3): 979 bufio.write(contents[n:n+3]) 980 flushed = b"".join(writer._write_stack) 981 # At least (total - 8) bytes were implicitly flushed, perhaps more 982 # depending on the implementation. 983 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 984 985 def check_writes(self, intermediate_func): 986 # Lots of writes, test the flushed output is as expected. 987 contents = bytes(range(256)) * 1000 988 n = 0 989 writer = self.MockRawIO() 990 bufio = self.tp(writer, 13) 991 # Generator of write sizes: repeat each N 15 times then proceed to N+1 992 def gen_sizes(): 993 for size in count(1): 994 for i in range(15): 995 yield size 996 sizes = gen_sizes() 997 while n < len(contents): 998 size = min(next(sizes), len(contents) - n) 999 self.assertEqual(bufio.write(contents[n:n+size]), size) 1000 intermediate_func(bufio) 1001 n += size 1002 bufio.flush() 1003 self.assertEqual(contents, 1004 b"".join(writer._write_stack)) 1005 1006 def test_writes(self): 1007 self.check_writes(lambda bufio: None) 1008 1009 def test_writes_and_flushes(self): 1010 self.check_writes(lambda bufio: bufio.flush()) 1011 1012 def test_writes_and_seeks(self): 1013 def _seekabs(bufio): 1014 pos = bufio.tell() 1015 bufio.seek(pos + 1, 0) 1016 bufio.seek(pos - 1, 0) 1017 bufio.seek(pos, 0) 1018 self.check_writes(_seekabs) 1019 def _seekrel(bufio): 1020 pos = bufio.seek(0, 1) 1021 bufio.seek(+1, 1) 1022 bufio.seek(-1, 1) 1023 bufio.seek(pos, 0) 1024 self.check_writes(_seekrel) 1025 1026 def test_writes_and_truncates(self): 1027 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1028 1029 def test_write_non_blocking(self): 1030 raw = self.MockNonBlockWriterIO() 1031 bufio = self.tp(raw, 8) 1032 1033 self.assertEqual(bufio.write(b"abcd"), 4) 1034 self.assertEqual(bufio.write(b"efghi"), 5) 1035 # 1 byte will be written, the rest will be buffered 1036 raw.block_on(b"k") 1037 self.assertEqual(bufio.write(b"jklmn"), 5) 1038 1039 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1040 raw.block_on(b"0") 1041 try: 1042 bufio.write(b"opqrwxyz0123456789") 1043 except self.BlockingIOError as e: 1044 written = e.characters_written 1045 else: 1046 self.fail("BlockingIOError should have been raised") 1047 self.assertEqual(written, 16) 1048 self.assertEqual(raw.pop_written(), 1049 b"abcdefghijklmnopqrwxyz") 1050 1051 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1052 s = raw.pop_written() 1053 # Previously buffered bytes were flushed 1054 self.assertTrue(s.startswith(b"01234567A"), s) 1055 1056 def test_write_and_rewind(self): 1057 raw = io.BytesIO() 1058 bufio = self.tp(raw, 4) 1059 self.assertEqual(bufio.write(b"abcdef"), 6) 1060 self.assertEqual(bufio.tell(), 6) 1061 bufio.seek(0, 0) 1062 self.assertEqual(bufio.write(b"XY"), 2) 1063 bufio.seek(6, 0) 1064 self.assertEqual(raw.getvalue(), b"XYcdef") 1065 self.assertEqual(bufio.write(b"123456"), 6) 1066 bufio.flush() 1067 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1068 1069 def test_flush(self): 1070 writer = self.MockRawIO() 1071 bufio = self.tp(writer, 8) 1072 bufio.write(b"abc") 1073 bufio.flush() 1074 self.assertEqual(b"abc", writer._write_stack[0]) 1075 1076 def test_destructor(self): 1077 writer = self.MockRawIO() 1078 bufio = self.tp(writer, 8) 1079 bufio.write(b"abc") 1080 del bufio 1081 support.gc_collect() 1082 self.assertEqual(b"abc", writer._write_stack[0]) 1083 1084 def test_truncate(self): 1085 # Truncate implicitly flushes the buffer. 1086 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1087 bufio = self.tp(raw, 8) 1088 bufio.write(b"abcdef") 1089 self.assertEqual(bufio.truncate(3), 3) 1090 self.assertEqual(bufio.tell(), 6) 1091 with self.open(support.TESTFN, "rb", buffering=0) as f: 1092 self.assertEqual(f.read(), b"abc") 1093 1094 @unittest.skipUnless(threading, 'Threading required for this test.') 1095 @support.requires_resource('cpu') 1096 def test_threads(self): 1097 try: 1098 # Write out many bytes from many threads and test they were 1099 # all flushed. 1100 N = 1000 1101 contents = bytes(range(256)) * N 1102 sizes = cycle([1, 19]) 1103 n = 0 1104 queue = deque() 1105 while n < len(contents): 1106 size = next(sizes) 1107 queue.append(contents[n:n+size]) 1108 n += size 1109 del contents 1110 # We use a real file object because it allows us to 1111 # exercise situations where the GIL is released before 1112 # writing the buffer to the raw streams. This is in addition 1113 # to concurrency issues due to switching threads in the middle 1114 # of Python code. 1115 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1116 bufio = self.tp(raw, 8) 1117 errors = [] 1118 def f(): 1119 try: 1120 while True: 1121 try: 1122 s = queue.popleft() 1123 except IndexError: 1124 return 1125 bufio.write(s) 1126 except Exception as e: 1127 errors.append(e) 1128 raise 1129 threads = [threading.Thread(target=f) for x in range(20)] 1130 for t in threads: 1131 t.start() 1132 time.sleep(0.02) # yield 1133 for t in threads: 1134 t.join() 1135 self.assertFalse(errors, 1136 "the following exceptions were caught: %r" % errors) 1137 bufio.close() 1138 with self.open(support.TESTFN, "rb") as f: 1139 s = f.read() 1140 for i in range(256): 1141 self.assertEqual(s.count(bytes([i])), N) 1142 finally: 1143 support.unlink(support.TESTFN) 1144 1145 def test_misbehaved_io(self): 1146 rawio = self.MisbehavedRawIO() 1147 bufio = self.tp(rawio, 5) 1148 self.assertRaises(IOError, bufio.seek, 0) 1149 self.assertRaises(IOError, bufio.tell) 1150 self.assertRaises(IOError, bufio.write, b"abcdef") 1151 1152 def test_max_buffer_size_deprecation(self): 1153 with support.check_warnings(("max_buffer_size is deprecated", 1154 DeprecationWarning)): 1155 self.tp(self.MockRawIO(), 8, 12) 1156 1157 1158 class CBufferedWriterTest(BufferedWriterTest): 1159 tp = io.BufferedWriter 1160 1161 def test_constructor(self): 1162 BufferedWriterTest.test_constructor(self) 1163 # The allocation can succeed on 32-bit builds, e.g. with more 1164 # than 2GB RAM and a 64-bit kernel. 1165 if sys.maxsize > 0x7FFFFFFF: 1166 rawio = self.MockRawIO() 1167 bufio = self.tp(rawio) 1168 self.assertRaises((OverflowError, MemoryError, ValueError), 1169 bufio.__init__, rawio, sys.maxsize) 1170 1171 def test_initialization(self): 1172 rawio = self.MockRawIO() 1173 bufio = self.tp(rawio) 1174 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1175 self.assertRaises(ValueError, bufio.write, b"def") 1176 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1177 self.assertRaises(ValueError, bufio.write, b"def") 1178 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1179 self.assertRaises(ValueError, bufio.write, b"def") 1180 1181 def test_garbage_collection(self): 1182 # C BufferedWriter objects are collected, and collecting them flushes 1183 # all data to disk. 1184 # The Python version has __del__, so it ends into gc.garbage instead 1185 rawio = self.FileIO(support.TESTFN, "w+b") 1186 f = self.tp(rawio) 1187 f.write(b"123xxx") 1188 f.x = f 1189 wr = weakref.ref(f) 1190 del f 1191 support.gc_collect() 1192 self.assertTrue(wr() is None, wr) 1193 with self.open(support.TESTFN, "rb") as f: 1194 self.assertEqual(f.read(), b"123xxx") 1195 1196 1197 class PyBufferedWriterTest(BufferedWriterTest): 1198 tp = pyio.BufferedWriter 1199 1200 class BufferedRWPairTest(unittest.TestCase): 1201 1202 def test_constructor(self): 1203 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1204 self.assertFalse(pair.closed) 1205 1206 def test_detach(self): 1207 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1208 self.assertRaises(self.UnsupportedOperation, pair.detach) 1209 1210 def test_constructor_max_buffer_size_deprecation(self): 1211 with support.check_warnings(("max_buffer_size is deprecated", 1212 DeprecationWarning)): 1213 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1214 1215 def test_constructor_with_not_readable(self): 1216 class NotReadable(MockRawIO): 1217 def readable(self): 1218 return False 1219 1220 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) 1221 1222 def test_constructor_with_not_writeable(self): 1223 class NotWriteable(MockRawIO): 1224 def writable(self): 1225 return False 1226 1227 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) 1228 1229 def test_read(self): 1230 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1231 1232 self.assertEqual(pair.read(3), b"abc") 1233 self.assertEqual(pair.read(1), b"d") 1234 self.assertEqual(pair.read(), b"ef") 1235 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1236 self.assertEqual(pair.read(None), b"abc") 1237 1238 def test_readlines(self): 1239 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1240 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1241 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1242 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1243 1244 def test_read1(self): 1245 # .read1() is delegated to the underlying reader object, so this test 1246 # can be shallow. 1247 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1248 1249 self.assertEqual(pair.read1(3), b"abc") 1250 1251 def test_readinto(self): 1252 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1253 1254 data = bytearray(5) 1255 self.assertEqual(pair.readinto(data), 5) 1256 self.assertEqual(data, b"abcde") 1257 1258 def test_write(self): 1259 w = self.MockRawIO() 1260 pair = self.tp(self.MockRawIO(), w) 1261 1262 pair.write(b"abc") 1263 pair.flush() 1264 pair.write(b"def") 1265 pair.flush() 1266 self.assertEqual(w._write_stack, [b"abc", b"def"]) 1267 1268 def test_peek(self): 1269 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1270 1271 self.assertTrue(pair.peek(3).startswith(b"abc")) 1272 self.assertEqual(pair.read(3), b"abc") 1273 1274 def test_readable(self): 1275 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1276 self.assertTrue(pair.readable()) 1277 1278 def test_writeable(self): 1279 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1280 self.assertTrue(pair.writable()) 1281 1282 def test_seekable(self): 1283 # BufferedRWPairs are never seekable, even if their readers and writers 1284 # are. 1285 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1286 self.assertFalse(pair.seekable()) 1287 1288 # .flush() is delegated to the underlying writer object and has been 1289 # tested in the test_write method. 1290 1291 def test_close_and_closed(self): 1292 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1293 self.assertFalse(pair.closed) 1294 pair.close() 1295 self.assertTrue(pair.closed) 1296 1297 def test_isatty(self): 1298 class SelectableIsAtty(MockRawIO): 1299 def __init__(self, isatty): 1300 MockRawIO.__init__(self) 1301 self._isatty = isatty 1302 1303 def isatty(self): 1304 return self._isatty 1305 1306 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 1307 self.assertFalse(pair.isatty()) 1308 1309 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 1310 self.assertTrue(pair.isatty()) 1311 1312 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 1313 self.assertTrue(pair.isatty()) 1314 1315 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 1316 self.assertTrue(pair.isatty()) 1317 1318 class CBufferedRWPairTest(BufferedRWPairTest): 1319 tp = io.BufferedRWPair 1320 1321 class PyBufferedRWPairTest(BufferedRWPairTest): 1322 tp = pyio.BufferedRWPair 1323 1324 1325 class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 1326 read_mode = "rb+" 1327 write_mode = "wb+" 1328 1329 def test_constructor(self): 1330 BufferedReaderTest.test_constructor(self) 1331 BufferedWriterTest.test_constructor(self) 1332 1333 def test_read_and_write(self): 1334 raw = self.MockRawIO((b"asdf", b"ghjk")) 1335 rw = self.tp(raw, 8) 1336 1337 self.assertEqual(b"as", rw.read(2)) 1338 rw.write(b"ddd") 1339 rw.write(b"eee") 1340 self.assertFalse(raw._write_stack) # Buffer writes 1341 self.assertEqual(b"ghjk", rw.read()) 1342 self.assertEqual(b"dddeee", raw._write_stack[0]) 1343 1344 def test_seek_and_tell(self): 1345 raw = self.BytesIO(b"asdfghjkl") 1346 rw = self.tp(raw) 1347 1348 self.assertEqual(b"as", rw.read(2)) 1349 self.assertEqual(2, rw.tell()) 1350 rw.seek(0, 0) 1351 self.assertEqual(b"asdf", rw.read(4)) 1352 1353 rw.write(b"asdf") 1354 rw.seek(0, 0) 1355 self.assertEqual(b"asdfasdfl", rw.read()) 1356 self.assertEqual(9, rw.tell()) 1357 rw.seek(-4, 2) 1358 self.assertEqual(5, rw.tell()) 1359 rw.seek(2, 1) 1360 self.assertEqual(7, rw.tell()) 1361 self.assertEqual(b"fl", rw.read(11)) 1362 self.assertRaises(TypeError, rw.seek, 0.0) 1363 1364 def check_flush_and_read(self, read_func): 1365 raw = self.BytesIO(b"abcdefghi") 1366 bufio = self.tp(raw) 1367 1368 self.assertEqual(b"ab", read_func(bufio, 2)) 1369 bufio.write(b"12") 1370 self.assertEqual(b"ef", read_func(bufio, 2)) 1371 self.assertEqual(6, bufio.tell()) 1372 bufio.flush() 1373 self.assertEqual(6, bufio.tell()) 1374 self.assertEqual(b"ghi", read_func(bufio)) 1375 raw.seek(0, 0) 1376 raw.write(b"XYZ") 1377 # flush() resets the read buffer 1378 bufio.flush() 1379 bufio.seek(0, 0) 1380 self.assertEqual(b"XYZ", read_func(bufio, 3)) 1381 1382 def test_flush_and_read(self): 1383 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 1384 1385 def test_flush_and_readinto(self): 1386 def _readinto(bufio, n=-1): 1387 b = bytearray(n if n >= 0 else 9999) 1388 n = bufio.readinto(b) 1389 return bytes(b[:n]) 1390 self.check_flush_and_read(_readinto) 1391 1392 def test_flush_and_peek(self): 1393 def _peek(bufio, n=-1): 1394 # This relies on the fact that the buffer can contain the whole 1395 # raw stream, otherwise peek() can return less. 1396 b = bufio.peek(n) 1397 if n != -1: 1398 b = b[:n] 1399 bufio.seek(len(b), 1) 1400 return b 1401 self.check_flush_and_read(_peek) 1402 1403 def test_flush_and_write(self): 1404 raw = self.BytesIO(b"abcdefghi") 1405 bufio = self.tp(raw) 1406 1407 bufio.write(b"123") 1408 bufio.flush() 1409 bufio.write(b"45") 1410 bufio.flush() 1411 bufio.seek(0, 0) 1412 self.assertEqual(b"12345fghi", raw.getvalue()) 1413 self.assertEqual(b"12345fghi", bufio.read()) 1414 1415 def test_threads(self): 1416 BufferedReaderTest.test_threads(self) 1417 BufferedWriterTest.test_threads(self) 1418 1419 def test_writes_and_peek(self): 1420 def _peek(bufio): 1421 bufio.peek(1) 1422 self.check_writes(_peek) 1423 def _peek(bufio): 1424 pos = bufio.tell() 1425 bufio.seek(-1, 1) 1426 bufio.peek(1) 1427 bufio.seek(pos, 0) 1428 self.check_writes(_peek) 1429 1430 def test_writes_and_reads(self): 1431 def _read(bufio): 1432 bufio.seek(-1, 1) 1433 bufio.read(1) 1434 self.check_writes(_read) 1435 1436 def test_writes_and_read1s(self): 1437 def _read1(bufio): 1438 bufio.seek(-1, 1) 1439 bufio.read1(1) 1440 self.check_writes(_read1) 1441 1442 def test_writes_and_readintos(self): 1443 def _read(bufio): 1444 bufio.seek(-1, 1) 1445 bufio.readinto(bytearray(1)) 1446 self.check_writes(_read) 1447 1448 def test_write_after_readahead(self): 1449 # Issue #6629: writing after the buffer was filled by readahead should 1450 # first rewind the raw stream. 1451 for overwrite_size in [1, 5]: 1452 raw = self.BytesIO(b"A" * 10) 1453 bufio = self.tp(raw, 4) 1454 # Trigger readahead 1455 self.assertEqual(bufio.read(1), b"A") 1456 self.assertEqual(bufio.tell(), 1) 1457 # Overwriting should rewind the raw stream if it needs so 1458 bufio.write(b"B" * overwrite_size) 1459 self.assertEqual(bufio.tell(), overwrite_size + 1) 1460 # If the write size was smaller than the buffer size, flush() and 1461 # check that rewind happens. 1462 bufio.flush() 1463 self.assertEqual(bufio.tell(), overwrite_size + 1) 1464 s = raw.getvalue() 1465 self.assertEqual(s, 1466 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 1467 1468 def test_write_rewind_write(self): 1469 # Various combinations of reading / writing / seeking backwards / writing again 1470 def mutate(bufio, pos1, pos2): 1471 assert pos2 >= pos1 1472 # Fill the buffer 1473 bufio.seek(pos1) 1474 bufio.read(pos2 - pos1) 1475 bufio.write(b'\x02') 1476 # This writes earlier than the previous write, but still inside 1477 # the buffer. 1478 bufio.seek(pos1) 1479 bufio.write(b'\x01') 1480 1481 b = b"\x80\x81\x82\x83\x84" 1482 for i in range(0, len(b)): 1483 for j in range(i, len(b)): 1484 raw = self.BytesIO(b) 1485 bufio = self.tp(raw, 100) 1486 mutate(bufio, i, j) 1487 bufio.flush() 1488 expected = bytearray(b) 1489 expected[j] = 2 1490 expected[i] = 1 1491 self.assertEqual(raw.getvalue(), expected, 1492 "failed result for i=%d, j=%d" % (i, j)) 1493 1494 def test_truncate_after_read_or_write(self): 1495 raw = self.BytesIO(b"A" * 10) 1496 bufio = self.tp(raw, 100) 1497 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 1498 self.assertEqual(bufio.truncate(), 2) 1499 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 1500 self.assertEqual(bufio.truncate(), 4) 1501 1502 def test_misbehaved_io(self): 1503 BufferedReaderTest.test_misbehaved_io(self) 1504 BufferedWriterTest.test_misbehaved_io(self) 1505 1506 class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest): 1507 tp = io.BufferedRandom 1508 1509 def test_constructor(self): 1510 BufferedRandomTest.test_constructor(self) 1511 # The allocation can succeed on 32-bit builds, e.g. with more 1512 # than 2GB RAM and a 64-bit kernel. 1513 if sys.maxsize > 0x7FFFFFFF: 1514 rawio = self.MockRawIO() 1515 bufio = self.tp(rawio) 1516 self.assertRaises((OverflowError, MemoryError, ValueError), 1517 bufio.__init__, rawio, sys.maxsize) 1518 1519 def test_garbage_collection(self): 1520 CBufferedReaderTest.test_garbage_collection(self) 1521 CBufferedWriterTest.test_garbage_collection(self) 1522 1523 class PyBufferedRandomTest(BufferedRandomTest): 1524 tp = pyio.BufferedRandom 1525 1526 1527 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these 1528 # properties: 1529 # - A single output character can correspond to many bytes of input. 1530 # - The number of input bytes to complete the character can be 1531 # undetermined until the last input byte is received. 1532 # - The number of input bytes can vary depending on previous input. 1533 # - A single input byte can correspond to many characters of output. 1534 # - The number of output characters can be undetermined until the 1535 # last input byte is received. 1536 # - The number of output characters can vary depending on previous input. 1537 1538 class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 1539 """ 1540 For testing seek/tell behavior with a stateful, buffering decoder. 1541 1542 Input is a sequence of words. Words may be fixed-length (length set 1543 by input) or variable-length (period-terminated). In variable-length 1544 mode, extra periods are ignored. Possible words are: 1545 - 'i' followed by a number sets the input length, I (maximum 99). 1546 When I is set to 0, words are space-terminated. 1547 - 'o' followed by a number sets the output length, O (maximum 99). 1548 - Any other word is converted into a word followed by a period on 1549 the output. The output word consists of the input word truncated 1550 or padded out with hyphens to make its length equal to O. If O 1551 is 0, the word is output verbatim without truncating or padding. 1552 I and O are initially set to 1. When I changes, any buffered input is 1553 re-scanned according to the new I. EOF also terminates the last word. 1554 """ 1555 1556 def __init__(self, errors='strict'): 1557 codecs.IncrementalDecoder.__init__(self, errors) 1558 self.reset() 1559 1560 def __repr__(self): 1561 return '<SID %x>' % id(self) 1562 1563 def reset(self): 1564 self.i = 1 1565 self.o = 1 1566 self.buffer = bytearray() 1567 1568 def getstate(self): 1569 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 1570 return bytes(self.buffer), i*100 + o 1571 1572 def setstate(self, state): 1573 buffer, io = state 1574 self.buffer = bytearray(buffer) 1575 i, o = divmod(io, 100) 1576 self.i, self.o = i ^ 1, o ^ 1 1577 1578 def decode(self, input, final=False): 1579 output = '' 1580 for b in input: 1581 if self.i == 0: # variable-length, terminated with period 1582 if b == '.': 1583 if self.buffer: 1584 output += self.process_word() 1585 else: 1586 self.buffer.append(b) 1587 else: # fixed-length, terminate after self.i bytes 1588 self.buffer.append(b) 1589 if len(self.buffer) == self.i: 1590 output += self.process_word() 1591 if final and self.buffer: # EOF terminates the last word 1592 output += self.process_word() 1593 return output 1594 1595 def process_word(self): 1596 output = '' 1597 if self.buffer[0] == ord('i'): 1598 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 1599 elif self.buffer[0] == ord('o'): 1600 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 1601 else: 1602 output = self.buffer.decode('ascii') 1603 if len(output) < self.o: 1604 output += '-'*self.o # pad out with hyphens 1605 if self.o: 1606 output = output[:self.o] # truncate to output length 1607 output += '.' 1608 self.buffer = bytearray() 1609 return output 1610 1611 codecEnabled = False 1612 1613 @classmethod 1614 def lookupTestDecoder(cls, name): 1615 if cls.codecEnabled and name == 'test_decoder': 1616 latin1 = codecs.lookup('latin-1') 1617 return codecs.CodecInfo( 1618 name='test_decoder', encode=latin1.encode, decode=None, 1619 incrementalencoder=None, 1620 streamreader=None, streamwriter=None, 1621 incrementaldecoder=cls) 1622 1623 # Register the previous decoder for testing. 1624 # Disabled by default, tests will enable it. 1625 codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 1626 1627 1628 class StatefulIncrementalDecoderTest(unittest.TestCase): 1629 """ 1630 Make sure the StatefulIncrementalDecoder actually works. 1631 """ 1632 1633 test_cases = [ 1634 # I=1, O=1 (fixed-length input == fixed-length output) 1635 (b'abcd', False, 'a.b.c.d.'), 1636 # I=0, O=0 (variable-length input, variable-length output) 1637 (b'oiabcd', True, 'abcd.'), 1638 # I=0, O=0 (should ignore extra periods) 1639 (b'oi...abcd...', True, 'abcd.'), 1640 # I=0, O=6 (variable-length input, fixed-length output) 1641 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 1642 # I=2, O=6 (fixed-length input < fixed-length output) 1643 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 1644 # I=6, O=3 (fixed-length input > fixed-length output) 1645 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 1646 # I=0, then 3; O=29, then 15 (with longer output) 1647 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 1648 'a----------------------------.' + 1649 'b----------------------------.' + 1650 'cde--------------------------.' + 1651 'abcdefghijabcde.' + 1652 'a.b------------.' + 1653 '.c.------------.' + 1654 'd.e------------.' + 1655 'k--------------.' + 1656 'l--------------.' + 1657 'm--------------.') 1658 ] 1659 1660 def test_decoder(self): 1661 # Try a few one-shot test cases. 1662 for input, eof, output in self.test_cases: 1663 d = StatefulIncrementalDecoder() 1664 self.assertEqual(d.decode(input, eof), output) 1665 1666 # Also test an unfinished decode, followed by forcing EOF. 1667 d = StatefulIncrementalDecoder() 1668 self.assertEqual(d.decode(b'oiabcd'), '') 1669 self.assertEqual(d.decode(b'', 1), 'abcd.') 1670 1671 class TextIOWrapperTest(unittest.TestCase): 1672 1673 def setUp(self): 1674 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 1675 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 1676 support.unlink(support.TESTFN) 1677 1678 def tearDown(self): 1679 support.unlink(support.TESTFN) 1680 1681 def test_constructor(self): 1682 r = self.BytesIO(b"\xc3\xa9\n\n") 1683 b = self.BufferedReader(r, 1000) 1684 t = self.TextIOWrapper(b) 1685 t.__init__(b, encoding="latin1", newline="\r\n") 1686 self.assertEqual(t.encoding, "latin1") 1687 self.assertEqual(t.line_buffering, False) 1688 t.__init__(b, encoding="utf8", line_buffering=True) 1689 self.assertEqual(t.encoding, "utf8") 1690 self.assertEqual(t.line_buffering, True) 1691 self.assertEqual("\xe9\n", t.readline()) 1692 self.assertRaises(TypeError, t.__init__, b, newline=42) 1693 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 1694 1695 def test_detach(self): 1696 r = self.BytesIO() 1697 b = self.BufferedWriter(r) 1698 t = self.TextIOWrapper(b) 1699 self.assertIs(t.detach(), b) 1700 1701 t = self.TextIOWrapper(b, encoding="ascii") 1702 t.write("howdy") 1703 self.assertFalse(r.getvalue()) 1704 t.detach() 1705 self.assertEqual(r.getvalue(), b"howdy") 1706 self.assertRaises(ValueError, t.detach) 1707 1708 def test_repr(self): 1709 raw = self.BytesIO("hello".encode("utf-8")) 1710 b = self.BufferedReader(raw) 1711 t = self.TextIOWrapper(b, encoding="utf-8") 1712 modname = self.TextIOWrapper.__module__ 1713 self.assertEqual(repr(t), 1714 "<%s.TextIOWrapper encoding='utf-8'>" % modname) 1715 raw.name = "dummy" 1716 self.assertEqual(repr(t), 1717 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname) 1718 raw.name = b"dummy" 1719 self.assertEqual(repr(t), 1720 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 1721 1722 def test_line_buffering(self): 1723 r = self.BytesIO() 1724 b = self.BufferedWriter(r, 1000) 1725 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 1726 t.write("X") 1727 self.assertEqual(r.getvalue(), b"") # No flush happened 1728 t.write("Y\nZ") 1729 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 1730 t.write("A\rB") 1731 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 1732 1733 def test_encoding(self): 1734 # Check the encoding attribute is always set, and valid 1735 b = self.BytesIO() 1736 t = self.TextIOWrapper(b, encoding="utf8") 1737 self.assertEqual(t.encoding, "utf8") 1738 t = self.TextIOWrapper(b) 1739 self.assertTrue(t.encoding is not None) 1740 codecs.lookup(t.encoding) 1741 1742 def test_encoding_errors_reading(self): 1743 # (1) default 1744 b = self.BytesIO(b"abc\n\xff\n") 1745 t = self.TextIOWrapper(b, encoding="ascii") 1746 self.assertRaises(UnicodeError, t.read) 1747 # (2) explicit strict 1748 b = self.BytesIO(b"abc\n\xff\n") 1749 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 1750 self.assertRaises(UnicodeError, t.read) 1751 # (3) ignore 1752 b = self.BytesIO(b"abc\n\xff\n") 1753 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 1754 self.assertEqual(t.read(), "abc\n\n") 1755 # (4) replace 1756 b = self.BytesIO(b"abc\n\xff\n") 1757 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 1758 self.assertEqual(t.read(), "abc\n\ufffd\n") 1759 1760 def test_encoding_errors_writing(self): 1761 # (1) default 1762 b = self.BytesIO() 1763 t = self.TextIOWrapper(b, encoding="ascii") 1764 self.assertRaises(UnicodeError, t.write, "\xff") 1765 # (2) explicit strict 1766 b = self.BytesIO() 1767 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 1768 self.assertRaises(UnicodeError, t.write, "\xff") 1769 # (3) ignore 1770 b = self.BytesIO() 1771 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 1772 newline="\n") 1773 t.write("abc\xffdef\n") 1774 t.flush() 1775 self.assertEqual(b.getvalue(), b"abcdef\n") 1776 # (4) replace 1777 b = self.BytesIO() 1778 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 1779 newline="\n") 1780 t.write("abc\xffdef\n") 1781 t.flush() 1782 self.assertEqual(b.getvalue(), b"abc?def\n") 1783 1784 def test_newlines(self): 1785 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 1786 1787 tests = [ 1788 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 1789 [ '', input_lines ], 1790 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 1791 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 1792 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 1793 ] 1794 encodings = ( 1795 'utf-8', 'latin-1', 1796 'utf-16', 'utf-16-le', 'utf-16-be', 1797 'utf-32', 'utf-32-le', 'utf-32-be', 1798 ) 1799 1800 # Try a range of buffer sizes to test the case where \r is the last 1801 # character in TextIOWrapper._pending_line. 1802 for encoding in encodings: 1803 # XXX: str.encode() should return bytes 1804 data = bytes(''.join(input_lines).encode(encoding)) 1805 for do_reads in (False, True): 1806 for bufsize in range(1, 10): 1807 for newline, exp_lines in tests: 1808 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 1809 textio = self.TextIOWrapper(bufio, newline=newline, 1810 encoding=encoding) 1811 if do_reads: 1812 got_lines = [] 1813 while True: 1814 c2 = textio.read(2) 1815 if c2 == '': 1816 break 1817 self.assertEqual(len(c2), 2) 1818 got_lines.append(c2 + textio.readline()) 1819 else: 1820 got_lines = list(textio) 1821 1822 for got_line, exp_line in zip(got_lines, exp_lines): 1823 self.assertEqual(got_line, exp_line) 1824 self.assertEqual(len(got_lines), len(exp_lines)) 1825 1826 def test_newlines_input(self): 1827 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 1828 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 1829 for newline, expected in [ 1830 (None, normalized.decode("ascii").splitlines(True)), 1831 ("", testdata.decode("ascii").splitlines(True)), 1832 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 1833 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 1834 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 1835 ]: 1836 buf = self.BytesIO(testdata) 1837 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 1838 self.assertEqual(txt.readlines(), expected) 1839 txt.seek(0) 1840 self.assertEqual(txt.read(), "".join(expected)) 1841 1842 def test_newlines_output(self): 1843 testdict = { 1844 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 1845 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 1846 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 1847 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 1848 } 1849 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 1850 for newline, expected in tests: 1851 buf = self.BytesIO() 1852 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 1853 txt.write("AAA\nB") 1854 txt.write("BB\nCCC\n") 1855 txt.write("X\rY\r\nZ") 1856 txt.flush() 1857 self.assertEqual(buf.closed, False) 1858 self.assertEqual(buf.getvalue(), expected) 1859 1860 def test_destructor(self): 1861 l = [] 1862 base = self.BytesIO 1863 class MyBytesIO(base): 1864 def close(self): 1865 l.append(self.getvalue()) 1866 base.close(self) 1867 b = MyBytesIO() 1868 t = self.TextIOWrapper(b, encoding="ascii") 1869 t.write("abc") 1870 del t 1871 support.gc_collect() 1872 self.assertEqual([b"abc"], l) 1873 1874 def test_override_destructor(self): 1875 record = [] 1876 class MyTextIO(self.TextIOWrapper): 1877 def __del__(self): 1878 record.append(1) 1879 try: 1880 f = super(MyTextIO, self).__del__ 1881 except AttributeError: 1882 pass 1883 else: 1884 f() 1885 def close(self): 1886 record.append(2) 1887 super(MyTextIO, self).close() 1888 def flush(self): 1889 record.append(3) 1890 super(MyTextIO, self).flush() 1891 b = self.BytesIO() 1892 t = MyTextIO(b, encoding="ascii") 1893 del t 1894 support.gc_collect() 1895 self.assertEqual(record, [1, 2, 3]) 1896 1897 def test_error_through_destructor(self): 1898 # Test that the exception state is not modified by a destructor, 1899 # even if close() fails. 1900 rawio = self.CloseFailureIO() 1901 def f(): 1902 self.TextIOWrapper(rawio).xyzzy 1903 with support.captured_output("stderr") as s: 1904 self.assertRaises(AttributeError, f) 1905 s = s.getvalue().strip() 1906 if s: 1907 # The destructor *may* have printed an unraisable error, check it 1908 self.assertEqual(len(s.splitlines()), 1) 1909 self.assertTrue(s.startswith("Exception IOError: "), s) 1910 self.assertTrue(s.endswith(" ignored"), s) 1911 1912 # Systematic tests of the text I/O API 1913 1914 def test_basic_io(self): 1915 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 1916 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": 1917 f = self.open(support.TESTFN, "w+", encoding=enc) 1918 f._CHUNK_SIZE = chunksize 1919 self.assertEqual(f.write("abc"), 3) 1920 f.close() 1921 f = self.open(support.TESTFN, "r+", encoding=enc) 1922 f._CHUNK_SIZE = chunksize 1923 self.assertEqual(f.tell(), 0) 1924 self.assertEqual(f.read(), "abc") 1925 cookie = f.tell() 1926 self.assertEqual(f.seek(0), 0) 1927 self.assertEqual(f.read(None), "abc") 1928 f.seek(0) 1929 self.assertEqual(f.read(2), "ab") 1930 self.assertEqual(f.read(1), "c") 1931 self.assertEqual(f.read(1), "") 1932 self.assertEqual(f.read(), "") 1933 self.assertEqual(f.tell(), cookie) 1934 self.assertEqual(f.seek(0), 0) 1935 self.assertEqual(f.seek(0, 2), cookie) 1936 self.assertEqual(f.write("def"), 3) 1937 self.assertEqual(f.seek(cookie), cookie) 1938 self.assertEqual(f.read(), "def") 1939 if enc.startswith("utf"): 1940 self.multi_line_test(f, enc) 1941 f.close() 1942 1943 def multi_line_test(self, f, enc): 1944 f.seek(0) 1945 f.truncate() 1946 sample = "s\xff\u0fff\uffff" 1947 wlines = [] 1948 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 1949 chars = [] 1950 for i in range(size): 1951 chars.append(sample[i % len(sample)]) 1952 line = "".join(chars) + "\n" 1953 wlines.append((f.tell(), line)) 1954 f.write(line) 1955 f.seek(0) 1956 rlines = [] 1957 while True: 1958 pos = f.tell() 1959 line = f.readline() 1960 if not line: 1961 break 1962 rlines.append((pos, line)) 1963 self.assertEqual(rlines, wlines) 1964 1965 def test_telling(self): 1966 f = self.open(support.TESTFN, "w+", encoding="utf8") 1967 p0 = f.tell() 1968 f.write("\xff\n") 1969 p1 = f.tell() 1970 f.write("\xff\n") 1971 p2 = f.tell() 1972 f.seek(0) 1973 self.assertEqual(f.tell(), p0) 1974 self.assertEqual(f.readline(), "\xff\n") 1975 self.assertEqual(f.tell(), p1) 1976 self.assertEqual(f.readline(), "\xff\n") 1977 self.assertEqual(f.tell(), p2) 1978 f.seek(0) 1979 for line in f: 1980 self.assertEqual(line, "\xff\n") 1981 self.assertRaises(IOError, f.tell) 1982 self.assertEqual(f.tell(), p2) 1983 f.close() 1984 1985 def test_seeking(self): 1986 chunk_size = _default_chunk_size() 1987 prefix_size = chunk_size - 2 1988 u_prefix = "a" * prefix_size 1989 prefix = bytes(u_prefix.encode("utf-8")) 1990 self.assertEqual(len(u_prefix), len(prefix)) 1991 u_suffix = "\u8888\n" 1992 suffix = bytes(u_suffix.encode("utf-8")) 1993 line = prefix + suffix 1994 f = self.open(support.TESTFN, "wb") 1995 f.write(line*2) 1996 f.close() 1997 f = self.open(support.TESTFN, "r", encoding="utf-8") 1998 s = f.read(prefix_size) 1999 self.assertEqual(s, prefix.decode("ascii")) 2000 self.assertEqual(f.tell(), prefix_size) 2001 self.assertEqual(f.readline(), u_suffix) 2002 2003 def test_seeking_too(self): 2004 # Regression test for a specific bug 2005 data = b'\xe0\xbf\xbf\n' 2006 f = self.open(support.TESTFN, "wb") 2007 f.write(data) 2008 f.close() 2009 f = self.open(support.TESTFN, "r", encoding="utf-8") 2010 f._CHUNK_SIZE # Just test that it exists 2011 f._CHUNK_SIZE = 2 2012 f.readline() 2013 f.tell() 2014 2015 def test_seek_and_tell(self): 2016 #Test seek/tell using the StatefulIncrementalDecoder. 2017 # Make test faster by doing smaller seeks 2018 CHUNK_SIZE = 128 2019 2020 def test_seek_and_tell_with_data(data, min_pos=0): 2021 """Tell/seek to various points within a data stream and ensure 2022 that the decoded data returned by read() is consistent.""" 2023 f = self.open(support.TESTFN, 'wb') 2024 f.write(data) 2025 f.close() 2026 f = self.open(support.TESTFN, encoding='test_decoder') 2027 f._CHUNK_SIZE = CHUNK_SIZE 2028 decoded = f.read() 2029 f.close() 2030 2031 for i in range(min_pos, len(decoded) + 1): # seek positions 2032 for j in [1, 5, len(decoded) - i]: # read lengths 2033 f = self.open(support.TESTFN, encoding='test_decoder') 2034 self.assertEqual(f.read(i), decoded[:i]) 2035 cookie = f.tell() 2036 self.assertEqual(f.read(j), decoded[i:i + j]) 2037 f.seek(cookie) 2038 self.assertEqual(f.read(), decoded[i:]) 2039 f.close() 2040 2041 # Enable the test decoder. 2042 StatefulIncrementalDecoder.codecEnabled = 1 2043 2044 # Run the tests. 2045 try: 2046 # Try each test case. 2047 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2048 test_seek_and_tell_with_data(input) 2049 2050 # Position each test case so that it crosses a chunk boundary. 2051 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2052 offset = CHUNK_SIZE - len(input)//2 2053 prefix = b'.'*offset 2054 # Don't bother seeking into the prefix (takes too long). 2055 min_pos = offset*2 2056 test_seek_and_tell_with_data(prefix + input, min_pos) 2057 2058 # Ensure our test decoder won't interfere with subsequent tests. 2059 finally: 2060 StatefulIncrementalDecoder.codecEnabled = 0 2061 2062 def test_encoded_writes(self): 2063 data = "1234567890" 2064 tests = ("utf-16", 2065 "utf-16-le", 2066 "utf-16-be", 2067 "utf-32", 2068 "utf-32-le", 2069 "utf-32-be") 2070 for encoding in tests: 2071 buf = self.BytesIO() 2072 f = self.TextIOWrapper(buf, encoding=encoding) 2073 # Check if the BOM is written only once (see issue1753). 2074 f.write(data) 2075 f.write(data) 2076 f.seek(0) 2077 self.assertEqual(f.read(), data * 2) 2078 f.seek(0) 2079 self.assertEqual(f.read(), data * 2) 2080 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 2081 2082 def test_unreadable(self): 2083 class UnReadable(self.BytesIO): 2084 def readable(self): 2085 return False 2086 txt = self.TextIOWrapper(UnReadable()) 2087 self.assertRaises(IOError, txt.read) 2088 2089 def test_read_one_by_one(self): 2090 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 2091 reads = "" 2092 while True: 2093 c = txt.read(1) 2094 if not c: 2095 break 2096 reads += c 2097 self.assertEqual(reads, "AA\nBB") 2098 2099 def test_readlines(self): 2100 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 2101 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 2102 txt.seek(0) 2103 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 2104 txt.seek(0) 2105 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 2106 2107 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 2108 def test_read_by_chunk(self): 2109 # make sure "\r\n" straddles 128 char boundary. 2110 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 2111 reads = "" 2112 while True: 2113 c = txt.read(128) 2114 if not c: 2115 break 2116 reads += c 2117 self.assertEqual(reads, "A"*127+"\nB") 2118 2119 def test_issue1395_1(self): 2120 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2121 2122 # read one char at a time 2123 reads = "" 2124 while True: 2125 c = txt.read(1) 2126 if not c: 2127 break 2128 reads += c 2129 self.assertEqual(reads, self.normalized) 2130 2131 def test_issue1395_2(self): 2132 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2133 txt._CHUNK_SIZE = 4 2134 2135 reads = "" 2136 while True: 2137 c = txt.read(4) 2138 if not c: 2139 break 2140 reads += c 2141 self.assertEqual(reads, self.normalized) 2142 2143 def test_issue1395_3(self): 2144 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2145 txt._CHUNK_SIZE = 4 2146 2147 reads = txt.read(4) 2148 reads += txt.read(4) 2149 reads += txt.readline() 2150 reads += txt.readline() 2151 reads += txt.readline() 2152 self.assertEqual(reads, self.normalized) 2153 2154 def test_issue1395_4(self): 2155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2156 txt._CHUNK_SIZE = 4 2157 2158 reads = txt.read(4) 2159 reads += txt.read() 2160 self.assertEqual(reads, self.normalized) 2161 2162 def test_issue1395_5(self): 2163 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2164 txt._CHUNK_SIZE = 4 2165 2166 reads = txt.read(4) 2167 pos = txt.tell() 2168 txt.seek(0) 2169 txt.seek(pos) 2170 self.assertEqual(txt.read(4), "BBB\n") 2171 2172 def test_issue2282(self): 2173 buffer = self.BytesIO(self.testdata) 2174 txt = self.TextIOWrapper(buffer, encoding="ascii") 2175 2176 self.assertEqual(buffer.seekable(), txt.seekable()) 2177 2178 def test_append_bom(self): 2179 # The BOM is not written again when appending to a non-empty file 2180 filename = support.TESTFN 2181 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2182 with self.open(filename, 'w', encoding=charset) as f: 2183 f.write('aaa') 2184 pos = f.tell() 2185 with self.open(filename, 'rb') as f: 2186 self.assertEqual(f.read(), 'aaa'.encode(charset)) 2187 2188 with self.open(filename, 'a', encoding=charset) as f: 2189 f.write('xxx') 2190 with self.open(filename, 'rb') as f: 2191 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 2192 2193 def test_seek_bom(self): 2194 # Same test, but when seeking manually 2195 filename = support.TESTFN 2196 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2197 with self.open(filename, 'w', encoding=charset) as f: 2198 f.write('aaa') 2199 pos = f.tell() 2200 with self.open(filename, 'r+', encoding=charset) as f: 2201 f.seek(pos) 2202 f.write('zzz') 2203 f.seek(0) 2204 f.write('bbb') 2205 with self.open(filename, 'rb') as f: 2206 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 2207 2208 def test_errors_property(self): 2209 with self.open(support.TESTFN, "w") as f: 2210 self.assertEqual(f.errors, "strict") 2211 with self.open(support.TESTFN, "w", errors="replace") as f: 2212 self.assertEqual(f.errors, "replace") 2213 2214 @unittest.skipUnless(threading, 'Threading required for this test.') 2215 def test_threads_write(self): 2216 # Issue6750: concurrent writes could duplicate data 2217 event = threading.Event() 2218 with self.open(support.TESTFN, "w", buffering=1) as f: 2219 def run(n): 2220 text = "Thread%03d\n" % n 2221 event.wait() 2222 f.write(text) 2223 threads = [threading.Thread(target=lambda n=x: run(n)) 2224 for x in range(20)] 2225 for t in threads: 2226 t.start() 2227 time.sleep(0.02) 2228 event.set() 2229 for t in threads: 2230 t.join() 2231 with self.open(support.TESTFN) as f: 2232 content = f.read() 2233 for n in range(20): 2234 self.assertEqual(content.count("Thread%03d\n" % n), 1) 2235 2236 def test_flush_error_on_close(self): 2237 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2238 def bad_flush(): 2239 raise IOError() 2240 txt.flush = bad_flush 2241 self.assertRaises(IOError, txt.close) # exception not swallowed 2242 2243 def test_multi_close(self): 2244 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2245 txt.close() 2246 txt.close() 2247 txt.close() 2248 self.assertRaises(ValueError, txt.flush) 2249 2250 def test_readonly_attributes(self): 2251 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2252 buf = self.BytesIO(self.testdata) 2253 with self.assertRaises((AttributeError, TypeError)): 2254 txt.buffer = buf 2255 2256 class CTextIOWrapperTest(TextIOWrapperTest): 2257 2258 def test_initialization(self): 2259 r = self.BytesIO(b"\xc3\xa9\n\n") 2260 b = self.BufferedReader(r, 1000) 2261 t = self.TextIOWrapper(b) 2262 self.assertRaises(TypeError, t.__init__, b, newline=42) 2263 self.assertRaises(ValueError, t.read) 2264 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2265 self.assertRaises(ValueError, t.read) 2266 2267 def test_garbage_collection(self): 2268 # C TextIOWrapper objects are collected, and collecting them flushes 2269 # all data to disk. 2270 # The Python version has __del__, so it ends in gc.garbage instead. 2271 rawio = io.FileIO(support.TESTFN, "wb") 2272 b = self.BufferedWriter(rawio) 2273 t = self.TextIOWrapper(b, encoding="ascii") 2274 t.write("456def") 2275 t.x = t 2276 wr = weakref.ref(t) 2277 del t 2278 support.gc_collect() 2279 self.assertTrue(wr() is None, wr) 2280 with self.open(support.TESTFN, "rb") as f: 2281 self.assertEqual(f.read(), b"456def") 2282 2283 class PyTextIOWrapperTest(TextIOWrapperTest): 2284 pass 2285 2286 2287 class IncrementalNewlineDecoderTest(unittest.TestCase): 2288 2289 def check_newline_decoding_utf8(self, decoder): 2290 # UTF-8 specific tests for a newline decoder 2291 def _check_decode(b, s, **kwargs): 2292 # We exercise getstate() / setstate() as well as decode() 2293 state = decoder.getstate() 2294 self.assertEqual(decoder.decode(b, **kwargs), s) 2295 decoder.setstate(state) 2296 self.assertEqual(decoder.decode(b, **kwargs), s) 2297 2298 _check_decode(b'\xe8\xa2\x88', "\u8888") 2299 2300 _check_decode(b'\xe8', "") 2301 _check_decode(b'\xa2', "") 2302 _check_decode(b'\x88', "\u8888") 2303 2304 _check_decode(b'\xe8', "") 2305 _check_decode(b'\xa2', "") 2306 _check_decode(b'\x88', "\u8888") 2307 2308 _check_decode(b'\xe8', "") 2309 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 2310 2311 decoder.reset() 2312 _check_decode(b'\n', "\n") 2313 _check_decode(b'\r', "") 2314 _check_decode(b'', "\n", final=True) 2315 _check_decode(b'\r', "\n", final=True) 2316 2317 _check_decode(b'\r', "") 2318 _check_decode(b'a', "\na") 2319 2320 _check_decode(b'\r\r\n', "\n\n") 2321 _check_decode(b'\r', "") 2322 _check_decode(b'\r', "\n") 2323 _check_decode(b'\na', "\na") 2324 2325 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 2326 _check_decode(b'\xe8\xa2\x88', "\u8888") 2327 _check_decode(b'\n', "\n") 2328 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 2329 _check_decode(b'\n', "\n") 2330 2331 def check_newline_decoding(self, decoder, encoding): 2332 result = [] 2333 if encoding is not None: 2334 encoder = codecs.getincrementalencoder(encoding)() 2335 def _decode_bytewise(s): 2336 # Decode one byte at a time 2337 for b in encoder.encode(s): 2338 result.append(decoder.decode(b)) 2339 else: 2340 encoder = None 2341 def _decode_bytewise(s): 2342 # Decode one char at a time 2343 for c in s: 2344 result.append(decoder.decode(c)) 2345 self.assertEqual(decoder.newlines, None) 2346 _decode_bytewise("abc\n\r") 2347 self.assertEqual(decoder.newlines, '\n') 2348 _decode_bytewise("\nabc") 2349 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 2350 _decode_bytewise("abc\r") 2351 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 2352 _decode_bytewise("abc") 2353 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 2354 _decode_bytewise("abc\r") 2355 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 2356 decoder.reset() 2357 input = "abc" 2358 if encoder is not None: 2359 encoder.reset() 2360 input = encoder.encode(input) 2361 self.assertEqual(decoder.decode(input), "abc") 2362 self.assertEqual(decoder.newlines, None) 2363 2364 def test_newline_decoder(self): 2365 encodings = ( 2366 # None meaning the IncrementalNewlineDecoder takes unicode input 2367 # rather than bytes input 2368 None, 'utf-8', 'latin-1', 2369 'utf-16', 'utf-16-le', 'utf-16-be', 2370 'utf-32', 'utf-32-le', 'utf-32-be', 2371 ) 2372 for enc in encodings: 2373 decoder = enc and codecs.getincrementaldecoder(enc)() 2374 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 2375 self.check_newline_decoding(decoder, enc) 2376 decoder = codecs.getincrementaldecoder("utf-8")() 2377 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 2378 self.check_newline_decoding_utf8(decoder) 2379 2380 def test_newline_bytes(self): 2381 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 2382 def _check(dec): 2383 self.assertEqual(dec.newlines, None) 2384 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 2385 self.assertEqual(dec.newlines, None) 2386 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 2387 self.assertEqual(dec.newlines, None) 2388 dec = self.IncrementalNewlineDecoder(None, translate=False) 2389 _check(dec) 2390 dec = self.IncrementalNewlineDecoder(None, translate=True) 2391 _check(dec) 2392 2393 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 2394 pass 2395 2396 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 2397 pass 2398 2399 2400 # XXX Tests for open() 2401 2402 class MiscIOTest(unittest.TestCase): 2403 2404 def tearDown(self): 2405 support.unlink(support.TESTFN) 2406 2407 def test___all__(self): 2408 for name in self.io.__all__: 2409 obj = getattr(self.io, name, None) 2410 self.assertTrue(obj is not None, name) 2411 if name == "open": 2412 continue 2413 elif "error" in name.lower() or name == "UnsupportedOperation": 2414 self.assertTrue(issubclass(obj, Exception), name) 2415 elif not name.startswith("SEEK_"): 2416 self.assertTrue(issubclass(obj, self.IOBase)) 2417 2418 def test_attributes(self): 2419 f = self.open(support.TESTFN, "wb", buffering=0) 2420 self.assertEqual(f.mode, "wb") 2421 f.close() 2422 2423 f = self.open(support.TESTFN, "U") 2424 self.assertEqual(f.name, support.TESTFN) 2425 self.assertEqual(f.buffer.name, support.TESTFN) 2426 self.assertEqual(f.buffer.raw.name, support.TESTFN) 2427 self.assertEqual(f.mode, "U") 2428 self.assertEqual(f.buffer.mode, "rb") 2429 self.assertEqual(f.buffer.raw.mode, "rb") 2430 f.close() 2431 2432 f = self.open(support.TESTFN, "w+") 2433 self.assertEqual(f.mode, "w+") 2434 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 2435 self.assertEqual(f.buffer.raw.mode, "rb+") 2436 2437 g = self.open(f.fileno(), "wb", closefd=False) 2438 self.assertEqual(g.mode, "wb") 2439 self.assertEqual(g.raw.mode, "wb") 2440 self.assertEqual(g.name, f.fileno()) 2441 self.assertEqual(g.raw.name, f.fileno()) 2442 f.close() 2443 g.close() 2444 2445 def test_io_after_close(self): 2446 for kwargs in [ 2447 {"mode": "w"}, 2448 {"mode": "wb"}, 2449 {"mode": "w", "buffering": 1}, 2450 {"mode": "w", "buffering": 2}, 2451 {"mode": "wb", "buffering": 0}, 2452 {"mode": "r"}, 2453 {"mode": "rb"}, 2454 {"mode": "r", "buffering": 1}, 2455 {"mode": "r", "buffering": 2}, 2456 {"mode": "rb", "buffering": 0}, 2457 {"mode": "w+"}, 2458 {"mode": "w+b"}, 2459 {"mode": "w+", "buffering": 1}, 2460 {"mode": "w+", "buffering": 2}, 2461 {"mode": "w+b", "buffering": 0}, 2462 ]: 2463 f = self.open(support.TESTFN, **kwargs) 2464 f.close() 2465 self.assertRaises(ValueError, f.flush) 2466 self.assertRaises(ValueError, f.fileno) 2467 self.assertRaises(ValueError, f.isatty) 2468 self.assertRaises(ValueError, f.__iter__) 2469 if hasattr(f, "peek"): 2470 self.assertRaises(ValueError, f.peek, 1) 2471 self.assertRaises(ValueError, f.read) 2472 if hasattr(f, "read1"): 2473 self.assertRaises(ValueError, f.read1, 1024) 2474 if hasattr(f, "readall"): 2475 self.assertRaises(ValueError, f.readall) 2476 if hasattr(f, "readinto"): 2477 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 2478 self.assertRaises(ValueError, f.readline) 2479 self.assertRaises(ValueError, f.readlines) 2480 self.assertRaises(ValueError, f.seek, 0) 2481 self.assertRaises(ValueError, f.tell) 2482 self.assertRaises(ValueError, f.truncate) 2483 self.assertRaises(ValueError, f.write, 2484 b"" if "b" in kwargs['mode'] else "") 2485 self.assertRaises(ValueError, f.writelines, []) 2486 self.assertRaises(ValueError, next, f) 2487 2488 def test_blockingioerror(self): 2489 # Various BlockingIOError issues 2490 self.assertRaises(TypeError, self.BlockingIOError) 2491 self.assertRaises(TypeError, self.BlockingIOError, 1) 2492 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) 2493 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) 2494 b = self.BlockingIOError(1, "") 2495 self.assertEqual(b.characters_written, 0) 2496 class C(unicode): 2497 pass 2498 c = C("") 2499 b = self.BlockingIOError(1, c) 2500 c.b = b 2501 b.c = c 2502 wr = weakref.ref(c) 2503 del c, b 2504 support.gc_collect() 2505 self.assertTrue(wr() is None, wr) 2506 2507 def test_abcs(self): 2508 # Test the visible base classes are ABCs. 2509 self.assertIsInstance(self.IOBase, abc.ABCMeta) 2510 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 2511 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 2512 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 2513 2514 def _check_abc_inheritance(self, abcmodule): 2515 with self.open(support.TESTFN, "wb", buffering=0) as f: 2516 self.assertIsInstance(f, abcmodule.IOBase) 2517 self.assertIsInstance(f, abcmodule.RawIOBase) 2518 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 2519 self.assertNotIsInstance(f, abcmodule.TextIOBase) 2520 with self.open(support.TESTFN, "wb") as f: 2521 self.assertIsInstance(f, abcmodule.IOBase) 2522 self.assertNotIsInstance(f, abcmodule.RawIOBase) 2523 self.assertIsInstance(f, abcmodule.BufferedIOBase) 2524 self.assertNotIsInstance(f, abcmodule.TextIOBase) 2525 with self.open(support.TESTFN, "w") as f: 2526 self.assertIsInstance(f, abcmodule.IOBase) 2527 self.assertNotIsInstance(f, abcmodule.RawIOBase) 2528 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 2529 self.assertIsInstance(f, abcmodule.TextIOBase) 2530 2531 def test_abc_inheritance(self): 2532 # Test implementations inherit from their respective ABCs 2533 self._check_abc_inheritance(self) 2534 2535 def test_abc_inheritance_official(self): 2536 # Test implementations inherit from the official ABCs of the 2537 # baseline "io" module. 2538 self._check_abc_inheritance(io) 2539 2540 class CMiscIOTest(MiscIOTest): 2541 io = io 2542 2543 class PyMiscIOTest(MiscIOTest): 2544 io = pyio 2545 2546 2547 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 2548 class SignalsTest(unittest.TestCase): 2549 2550 def setUp(self): 2551 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 2552 2553 def tearDown(self): 2554 signal.signal(signal.SIGALRM, self.oldalrm) 2555 2556 def alarm_interrupt(self, sig, frame): 2557 1 // 0 2558 2559 @unittest.skipUnless(threading, 'Threading required for this test.') 2560 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 2561 """Check that a partial write, when it gets interrupted, properly 2562 invokes the signal handler, and bubbles up the exception raised 2563 in the latter.""" 2564 read_results = [] 2565 def _read(): 2566 s = os.read(r, 1) 2567 read_results.append(s) 2568 t = threading.Thread(target=_read) 2569 t.daemon = True 2570 r, w = os.pipe() 2571 try: 2572 wio = self.io.open(w, **fdopen_kwargs) 2573 t.start() 2574 signal.alarm(1) 2575 # Fill the pipe enough that the write will be blocking. 2576 # It will be interrupted by the timer armed above. Since the 2577 # other thread has read one byte, the low-level write will 2578 # return with a successful (partial) result rather than an EINTR. 2579 # The buffered IO layer must check for pending signal 2580 # handlers, which in this case will invoke alarm_interrupt(). 2581 self.assertRaises(ZeroDivisionError, 2582 wio.write, item * (1024 * 1024)) 2583 t.join() 2584 # We got one byte, get another one and check that it isn't a 2585 # repeat of the first one. 2586 read_results.append(os.read(r, 1)) 2587 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 2588 finally: 2589 os.close(w) 2590 os.close(r) 2591 # This is deliberate. If we didn't close the file descriptor 2592 # before closing wio, wio would try to flush its internal 2593 # buffer, and block again. 2594 try: 2595 wio.close() 2596 except IOError as e: 2597 if e.errno != errno.EBADF: 2598 raise 2599 2600 def test_interrupted_write_unbuffered(self): 2601 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 2602 2603 def test_interrupted_write_buffered(self): 2604 self.check_interrupted_write(b"xy", b"xy", mode="wb") 2605 2606 def test_interrupted_write_text(self): 2607 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 2608 2609 def check_reentrant_write(self, data, **fdopen_kwargs): 2610 def on_alarm(*args): 2611 # Will be called reentrantly from the same thread 2612 wio.write(data) 2613 1/0 2614 signal.signal(signal.SIGALRM, on_alarm) 2615 r, w = os.pipe() 2616 wio = self.io.open(w, **fdopen_kwargs) 2617 try: 2618 signal.alarm(1) 2619 # Either the reentrant call to wio.write() fails with RuntimeError, 2620 # or the signal handler raises ZeroDivisionError. 2621 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 2622 while 1: 2623 for i in range(100): 2624 wio.write(data) 2625 wio.flush() 2626 # Make sure the buffer doesn't fill up and block further writes 2627 os.read(r, len(data) * 100) 2628 exc = cm.exception 2629 if isinstance(exc, RuntimeError): 2630 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 2631 finally: 2632 wio.close() 2633 os.close(r) 2634 2635 def test_reentrant_write_buffered(self): 2636 self.check_reentrant_write(b"xy", mode="wb") 2637 2638 def test_reentrant_write_text(self): 2639 self.check_reentrant_write("xy", mode="w", encoding="ascii") 2640 2641 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 2642 """Check that a buffered read, when it gets interrupted (either 2643 returning a partial result or EINTR), properly invokes the signal 2644 handler and retries if the latter returned successfully.""" 2645 r, w = os.pipe() 2646 fdopen_kwargs["closefd"] = False 2647 def alarm_handler(sig, frame): 2648 os.write(w, b"bar") 2649 signal.signal(signal.SIGALRM, alarm_handler) 2650 try: 2651 rio = self.io.open(r, **fdopen_kwargs) 2652 os.write(w, b"foo") 2653 signal.alarm(1) 2654 # Expected behaviour: 2655 # - first raw read() returns partial b"foo" 2656 # - second raw read() returns EINTR 2657 # - third raw read() returns b"bar" 2658 self.assertEqual(decode(rio.read(6)), "foobar") 2659 finally: 2660 rio.close() 2661 os.close(w) 2662 os.close(r) 2663 2664 def test_interrupterd_read_retry_buffered(self): 2665 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 2666 mode="rb") 2667 2668 def test_interrupterd_read_retry_text(self): 2669 self.check_interrupted_read_retry(lambda x: x, 2670 mode="r") 2671 2672 @unittest.skipUnless(threading, 'Threading required for this test.') 2673 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 2674 """Check that a buffered write, when it gets interrupted (either 2675 returning a partial result or EINTR), properly invokes the signal 2676 handler and retries if the latter returned successfully.""" 2677 select = support.import_module("select") 2678 # A quantity that exceeds the buffer size of an anonymous pipe's 2679 # write end. 2680 N = 1024 * 1024 2681 r, w = os.pipe() 2682 fdopen_kwargs["closefd"] = False 2683 # We need a separate thread to read from the pipe and allow the 2684 # write() to finish. This thread is started after the SIGALRM is 2685 # received (forcing a first EINTR in write()). 2686 read_results = [] 2687 write_finished = False 2688 def _read(): 2689 while not write_finished: 2690 while r in select.select([r], [], [], 1.0)[0]: 2691 s = os.read(r, 1024) 2692 read_results.append(s) 2693 t = threading.Thread(target=_read) 2694 t.daemon = True 2695 def alarm1(sig, frame): 2696 signal.signal(signal.SIGALRM, alarm2) 2697 signal.alarm(1) 2698 def alarm2(sig, frame): 2699 t.start() 2700 signal.signal(signal.SIGALRM, alarm1) 2701 try: 2702 wio = self.io.open(w, **fdopen_kwargs) 2703 signal.alarm(1) 2704 # Expected behaviour: 2705 # - first raw write() is partial (because of the limited pipe buffer 2706 # and the first alarm) 2707 # - second raw write() returns EINTR (because of the second alarm) 2708 # - subsequent write()s are successful (either partial or complete) 2709 self.assertEqual(N, wio.write(item * N)) 2710 wio.flush() 2711 write_finished = True 2712 t.join() 2713 self.assertEqual(N, sum(len(x) for x in read_results)) 2714 finally: 2715 write_finished = True 2716 os.close(w) 2717 os.close(r) 2718 # This is deliberate. If we didn't close the file descriptor 2719 # before closing wio, wio would try to flush its internal 2720 # buffer, and could block (in case of failure). 2721 try: 2722 wio.close() 2723 except IOError as e: 2724 if e.errno != errno.EBADF: 2725 raise 2726 2727 def test_interrupterd_write_retry_buffered(self): 2728 self.check_interrupted_write_retry(b"x", mode="wb") 2729 2730 def test_interrupterd_write_retry_text(self): 2731 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 2732 2733 2734 class CSignalsTest(SignalsTest): 2735 io = io 2736 2737 class PySignalsTest(SignalsTest): 2738 io = pyio 2739 2740 # Handling reentrancy issues would slow down _pyio even more, so the 2741 # tests are disabled. 2742 test_reentrant_write_buffered = None 2743 test_reentrant_write_text = None 2744 2745 2746 def test_main(): 2747 tests = (CIOTest, PyIOTest, 2748 CBufferedReaderTest, PyBufferedReaderTest, 2749 CBufferedWriterTest, PyBufferedWriterTest, 2750 CBufferedRWPairTest, PyBufferedRWPairTest, 2751 CBufferedRandomTest, PyBufferedRandomTest, 2752 StatefulIncrementalDecoderTest, 2753 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 2754 CTextIOWrapperTest, PyTextIOWrapperTest, 2755 CMiscIOTest, PyMiscIOTest, 2756 CSignalsTest, PySignalsTest, 2757 ) 2758 2759 # Put the namespaces of the IO module we are testing and some useful mock 2760 # classes in the __dict__ of each test. 2761 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 2762 MockNonBlockWriterIO, MockRawIOWithoutRead) 2763 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 2764 c_io_ns = dict((name, getattr(io, name)) for name in all_members) 2765 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) 2766 globs = globals() 2767 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 2768 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 2769 # Avoid turning open into a bound method. 2770 py_io_ns["open"] = pyio.OpenWrapper 2771 for test in tests: 2772 if test.__name__.startswith("C"): 2773 for name, obj in c_io_ns.items(): 2774 setattr(test, name, obj) 2775 elif test.__name__.startswith("Py"): 2776 for name, obj in py_io_ns.items(): 2777 setattr(test, name, obj) 2778 2779 support.run_unittest(*tests) 2780 2781 if __name__ == "__main__": 2782 test_main() 2783