Home | History | Annotate | Download | only in test
      1 """Unit tests for the io module."""
      2 
      3 # Tests of io are scattered over the test suite:
      4 # * test_bufio - tests file buffering
      5 # * test_memoryio - tests BytesIO and StringIO
      6 # * test_fileio - tests FileIO
      7 # * test_file - tests the file interface
      8 # * test_io - tests everything else in the io module
      9 # * test_univnewlines - tests universal newline support
     10 # * test_largefile - tests operations on a file greater than 2**32 bytes
     11 #     (only enabled with -ulargefile)
     12 
     13 ################################################################################
     14 # ATTENTION TEST WRITERS!!!
     15 ################################################################################
     16 # When writing tests for io, it's important to test both the C and Python
     17 # implementations. This is usually done by writing a base test that refers to
     18 # the type it is testing as an attribute. Then it provides custom subclasses to
     19 # test both implementations. This file has lots of examples.
     20 ################################################################################
     21 
     22 import abc
     23 import array
     24 import errno
     25 import locale
     26 import os
     27 import pickle
     28 import random
     29 import signal
     30 import sys
     31 import time
     32 import unittest
     33 import warnings
     34 import weakref
     35 from collections import deque, UserList
     36 from itertools import cycle, count
     37 from test import support
     38 from test.support.script_helper import assert_python_ok, run_python_until_end
     39 
     40 import codecs
     41 import io  # C implementation of io
     42 import _pyio as pyio # Python implementation of io
     43 try:
     44     import threading
     45 except ImportError:
     46     threading = None
     47 
     48 try:
     49     import ctypes
     50 except ImportError:
     51     def byteslike(*pos, **kw):
     52         return array.array("b", bytes(*pos, **kw))
     53 else:
     54     def byteslike(*pos, **kw):
     55         """Create a bytes-like object having no string or sequence methods"""
     56         data = bytes(*pos, **kw)
     57         obj = EmptyStruct()
     58         ctypes.resize(obj, len(data))
     59         memoryview(obj).cast("B")[:] = data
     60         return obj
     61     class EmptyStruct(ctypes.Structure):
     62         pass
     63 
     64 def _default_chunk_size():
     65     """Get the default TextIOWrapper chunk size"""
     66     with open(__file__, "r", encoding="latin-1") as f:
     67         return f._CHUNK_SIZE
     68 
     69 
     70 class MockRawIOWithoutRead:
     71     """A RawIO implementation without read(), so as to exercise the default
     72     RawIO.read() which calls readinto()."""
     73 
     74     def __init__(self, read_stack=()):
     75         self._read_stack = list(read_stack)
     76         self._write_stack = []
     77         self._reads = 0
     78         self._extraneous_reads = 0
     79 
     80     def write(self, b):
     81         self._write_stack.append(bytes(b))
     82         return len(b)
     83 
     84     def writable(self):
     85         return True
     86 
     87     def fileno(self):
     88         return 42
     89 
     90     def readable(self):
     91         return True
     92 
     93     def seekable(self):
     94         return True
     95 
     96     def seek(self, pos, whence):
     97         return 0   # wrong but we gotta return something
     98 
     99     def tell(self):
    100         return 0   # same comment as above
    101 
    102     def readinto(self, buf):
    103         self._reads += 1
    104         max_len = len(buf)
    105         try:
    106             data = self._read_stack[0]
    107         except IndexError:
    108             self._extraneous_reads += 1
    109             return 0
    110         if data is None:
    111             del self._read_stack[0]
    112             return None
    113         n = len(data)
    114         if len(data) <= max_len:
    115             del self._read_stack[0]
    116             buf[:n] = data
    117             return n
    118         else:
    119             buf[:] = data[:max_len]
    120             self._read_stack[0] = data[max_len:]
    121             return max_len
    122 
    123     def truncate(self, pos=None):
    124         return pos
    125 
    126 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    127     pass
    128 
    129 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    130     pass
    131 
    132 
    133 class MockRawIO(MockRawIOWithoutRead):
    134 
    135     def read(self, n=None):
    136         self._reads += 1
    137         try:
    138             return self._read_stack.pop(0)
    139         except:
    140             self._extraneous_reads += 1
    141             return b""
    142 
    143 class CMockRawIO(MockRawIO, io.RawIOBase):
    144     pass
    145 
    146 class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    147     pass
    148 
    149 
    150 class MisbehavedRawIO(MockRawIO):
    151     def write(self, b):
    152         return super().write(b) * 2
    153 
    154     def read(self, n=None):
    155         return super().read(n) * 2
    156 
    157     def seek(self, pos, whence):
    158         return -123
    159 
    160     def tell(self):
    161         return -456
    162 
    163     def readinto(self, buf):
    164         super().readinto(buf)
    165         return len(buf) * 5
    166 
    167 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    168     pass
    169 
    170 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    171     pass
    172 
    173 
    174 class CloseFailureIO(MockRawIO):
    175     closed = 0
    176 
    177     def close(self):
    178         if not self.closed:
    179             self.closed = 1
    180             raise OSError
    181 
    182 class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    183     pass
    184 
    185 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    186     pass
    187 
    188 
    189 class MockFileIO:
    190 
    191     def __init__(self, data):
    192         self.read_history = []
    193         super().__init__(data)
    194 
    195     def read(self, n=None):
    196         res = super().read(n)
    197         self.read_history.append(None if res is None else len(res))
    198         return res
    199 
    200     def readinto(self, b):
    201         res = super().readinto(b)
    202         self.read_history.append(res)
    203         return res
    204 
    205 class CMockFileIO(MockFileIO, io.BytesIO):
    206     pass
    207 
    208 class PyMockFileIO(MockFileIO, pyio.BytesIO):
    209     pass
    210 
    211 
    212 class MockUnseekableIO:
    213     def seekable(self):
    214         return False
    215 
    216     def seek(self, *args):
    217         raise self.UnsupportedOperation("not seekable")
    218 
    219     def tell(self, *args):
    220         raise self.UnsupportedOperation("not seekable")
    221 
    222     def truncate(self, *args):
    223         raise self.UnsupportedOperation("not seekable")
    224 
    225 class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    226     UnsupportedOperation = io.UnsupportedOperation
    227 
    228 class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    229     UnsupportedOperation = pyio.UnsupportedOperation
    230 
    231 
    232 class MockNonBlockWriterIO:
    233 
    234     def __init__(self):
    235         self._write_stack = []
    236         self._blocker_char = None
    237 
    238     def pop_written(self):
    239         s = b"".join(self._write_stack)
    240         self._write_stack[:] = []
    241         return s
    242 
    243     def block_on(self, char):
    244         """Block when a given char is encountered."""
    245         self._blocker_char = char
    246 
    247     def readable(self):
    248         return True
    249 
    250     def seekable(self):
    251         return True
    252 
    253     def writable(self):
    254         return True
    255 
    256     def write(self, b):
    257         b = bytes(b)
    258         n = -1
    259         if self._blocker_char:
    260             try:
    261                 n = b.index(self._blocker_char)
    262             except ValueError:
    263                 pass
    264             else:
    265                 if n > 0:
    266                     # write data up to the first blocker
    267                     self._write_stack.append(b[:n])
    268                     return n
    269                 else:
    270                     # cancel blocker and indicate would block
    271                     self._blocker_char = None
    272                     return None
    273         self._write_stack.append(b)
    274         return len(b)
    275 
    276 class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    277     BlockingIOError = io.BlockingIOError
    278 
    279 class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    280     BlockingIOError = pyio.BlockingIOError
    281 
    282 
    283 class IOTest(unittest.TestCase):
    284 
    285     def setUp(self):
    286         support.unlink(support.TESTFN)
    287 
    288     def tearDown(self):
    289         support.unlink(support.TESTFN)
    290 
    291     def write_ops(self, f):
    292         self.assertEqual(f.write(b"blah."), 5)
    293         f.truncate(0)
    294         self.assertEqual(f.tell(), 5)
    295         f.seek(0)
    296 
    297         self.assertEqual(f.write(b"blah."), 5)
    298         self.assertEqual(f.seek(0), 0)
    299         self.assertEqual(f.write(b"Hello."), 6)
    300         self.assertEqual(f.tell(), 6)
    301         self.assertEqual(f.seek(-1, 1), 5)
    302         self.assertEqual(f.tell(), 5)
    303         buffer = bytearray(b" world\n\n\n")
    304         self.assertEqual(f.write(buffer), 9)
    305         buffer[:] = b"*" * 9  # Overwrite our copy of the data
    306         self.assertEqual(f.seek(0), 0)
    307         self.assertEqual(f.write(b"h"), 1)
    308         self.assertEqual(f.seek(-1, 2), 13)
    309         self.assertEqual(f.tell(), 13)
    310 
    311         self.assertEqual(f.truncate(12), 12)
    312         self.assertEqual(f.tell(), 13)
    313         self.assertRaises(TypeError, f.seek, 0.0)
    314 
    315     def read_ops(self, f, buffered=False):
    316         data = f.read(5)
    317         self.assertEqual(data, b"hello")
    318         data = byteslike(data)
    319         self.assertEqual(f.readinto(data), 5)
    320         self.assertEqual(bytes(data), b" worl")
    321         data = bytearray(5)
    322         self.assertEqual(f.readinto(data), 2)
    323         self.assertEqual(len(data), 5)
    324         self.assertEqual(data[:2], b"d\n")
    325         self.assertEqual(f.seek(0), 0)
    326         self.assertEqual(f.read(20), b"hello world\n")
    327         self.assertEqual(f.read(1), b"")
    328         self.assertEqual(f.readinto(byteslike(b"x")), 0)
    329         self.assertEqual(f.seek(-6, 2), 6)
    330         self.assertEqual(f.read(5), b"world")
    331         self.assertEqual(f.read(0), b"")
    332         self.assertEqual(f.readinto(byteslike()), 0)
    333         self.assertEqual(f.seek(-6, 1), 5)
    334         self.assertEqual(f.read(5), b" worl")
    335         self.assertEqual(f.tell(), 10)
    336         self.assertRaises(TypeError, f.seek, 0.0)
    337         if buffered:
    338             f.seek(0)
    339             self.assertEqual(f.read(), b"hello world\n")
    340             f.seek(6)
    341             self.assertEqual(f.read(), b"world\n")
    342             self.assertEqual(f.read(), b"")
    343             f.seek(0)
    344             data = byteslike(5)
    345             self.assertEqual(f.readinto1(data), 5)
    346             self.assertEqual(bytes(data), b"hello")
    347 
    348     LARGE = 2**31
    349 
    350     def large_file_ops(self, f):
    351         assert f.readable()
    352         assert f.writable()
    353         try:
    354             self.assertEqual(f.seek(self.LARGE), self.LARGE)
    355         except (OverflowError, ValueError):
    356             self.skipTest("no largefile support")
    357         self.assertEqual(f.tell(), self.LARGE)
    358         self.assertEqual(f.write(b"xxx"), 3)
    359         self.assertEqual(f.tell(), self.LARGE + 3)
    360         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    361         self.assertEqual(f.truncate(), self.LARGE + 2)
    362         self.assertEqual(f.tell(), self.LARGE + 2)
    363         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    364         self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    365         self.assertEqual(f.tell(), self.LARGE + 2)
    366         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    367         self.assertEqual(f.seek(-1, 2), self.LARGE)
    368         self.assertEqual(f.read(2), b"x")
    369 
    370     def test_invalid_operations(self):
    371         # Try writing on a file opened in read mode and vice-versa.
    372         exc = self.UnsupportedOperation
    373         for mode in ("w", "wb"):
    374             with self.open(support.TESTFN, mode) as fp:
    375                 self.assertRaises(exc, fp.read)
    376                 self.assertRaises(exc, fp.readline)
    377         with self.open(support.TESTFN, "wb", buffering=0) as fp:
    378             self.assertRaises(exc, fp.read)
    379             self.assertRaises(exc, fp.readline)
    380         with self.open(support.TESTFN, "rb", buffering=0) as fp:
    381             self.assertRaises(exc, fp.write, b"blah")
    382             self.assertRaises(exc, fp.writelines, [b"blah\n"])
    383         with self.open(support.TESTFN, "rb") as fp:
    384             self.assertRaises(exc, fp.write, b"blah")
    385             self.assertRaises(exc, fp.writelines, [b"blah\n"])
    386         with self.open(support.TESTFN, "r") as fp:
    387             self.assertRaises(exc, fp.write, "blah")
    388             self.assertRaises(exc, fp.writelines, ["blah\n"])
    389             # Non-zero seeking from current or end pos
    390             self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
    391             self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
    392 
    393     def test_optional_abilities(self):
    394         # Test for OSError when optional APIs are not supported
    395         # The purpose of this test is to try fileno(), reading, writing and
    396         # seeking operations with various objects that indicate they do not
    397         # support these operations.
    398 
    399         def pipe_reader():
    400             [r, w] = os.pipe()
    401             os.close(w)  # So that read() is harmless
    402             return self.FileIO(r, "r")
    403 
    404         def pipe_writer():
    405             [r, w] = os.pipe()
    406             self.addCleanup(os.close, r)
    407             # Guarantee that we can write into the pipe without blocking
    408             thread = threading.Thread(target=os.read, args=(r, 100))
    409             thread.start()
    410             self.addCleanup(thread.join)
    411             return self.FileIO(w, "w")
    412 
    413         def buffered_reader():
    414             return self.BufferedReader(self.MockUnseekableIO())
    415 
    416         def buffered_writer():
    417             return self.BufferedWriter(self.MockUnseekableIO())
    418 
    419         def buffered_random():
    420             return self.BufferedRandom(self.BytesIO())
    421 
    422         def buffered_rw_pair():
    423             return self.BufferedRWPair(self.MockUnseekableIO(),
    424                 self.MockUnseekableIO())
    425 
    426         def text_reader():
    427             class UnseekableReader(self.MockUnseekableIO):
    428                 writable = self.BufferedIOBase.writable
    429                 write = self.BufferedIOBase.write
    430             return self.TextIOWrapper(UnseekableReader(), "ascii")
    431 
    432         def text_writer():
    433             class UnseekableWriter(self.MockUnseekableIO):
    434                 readable = self.BufferedIOBase.readable
    435                 read = self.BufferedIOBase.read
    436             return self.TextIOWrapper(UnseekableWriter(), "ascii")
    437 
    438         tests = (
    439             (pipe_reader, "fr"), (pipe_writer, "fw"),
    440             (buffered_reader, "r"), (buffered_writer, "w"),
    441             (buffered_random, "rws"), (buffered_rw_pair, "rw"),
    442             (text_reader, "r"), (text_writer, "w"),
    443             (self.BytesIO, "rws"), (self.StringIO, "rws"),
    444         )
    445         for [test, abilities] in tests:
    446             if test is pipe_writer and not threading:
    447                 continue  # Skip subtest that uses a background thread
    448             with self.subTest(test), test() as obj:
    449                 readable = "r" in abilities
    450                 self.assertEqual(obj.readable(), readable)
    451                 writable = "w" in abilities
    452                 self.assertEqual(obj.writable(), writable)
    453 
    454                 if isinstance(obj, self.TextIOBase):
    455                     data = "3"
    456                 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
    457                     data = b"3"
    458                 else:
    459                     self.fail("Unknown base class")
    460 
    461                 if "f" in abilities:
    462                     obj.fileno()
    463                 else:
    464                     self.assertRaises(OSError, obj.fileno)
    465 
    466                 if readable:
    467                     obj.read(1)
    468                     obj.read()
    469                 else:
    470                     self.assertRaises(OSError, obj.read, 1)
    471                     self.assertRaises(OSError, obj.read)
    472 
    473                 if writable:
    474                     obj.write(data)
    475                 else:
    476                     self.assertRaises(OSError, obj.write, data)
    477 
    478                 if sys.platform.startswith("win") and test in (
    479                         pipe_reader, pipe_writer):
    480                     # Pipes seem to appear as seekable on Windows
    481                     continue
    482                 seekable = "s" in abilities
    483                 self.assertEqual(obj.seekable(), seekable)
    484 
    485                 if seekable:
    486                     obj.tell()
    487                     obj.seek(0)
    488                 else:
    489                     self.assertRaises(OSError, obj.tell)
    490                     self.assertRaises(OSError, obj.seek, 0)
    491 
    492                 if writable and seekable:
    493                     obj.truncate()
    494                     obj.truncate(0)
    495                 else:
    496                     self.assertRaises(OSError, obj.truncate)
    497                     self.assertRaises(OSError, obj.truncate, 0)
    498 
    499     def test_open_handles_NUL_chars(self):
    500         fn_with_NUL = 'foo\0bar'
    501         self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
    502 
    503         bytes_fn = bytes(fn_with_NUL, 'ascii')
    504         with warnings.catch_warnings():
    505             warnings.simplefilter("ignore", DeprecationWarning)
    506             self.assertRaises(ValueError, self.open, bytes_fn, 'w')
    507 
    508     def test_raw_file_io(self):
    509         with self.open(support.TESTFN, "wb", buffering=0) as f:
    510             self.assertEqual(f.readable(), False)
    511             self.assertEqual(f.writable(), True)
    512             self.assertEqual(f.seekable(), True)
    513             self.write_ops(f)
    514         with self.open(support.TESTFN, "rb", buffering=0) as f:
    515             self.assertEqual(f.readable(), True)
    516             self.assertEqual(f.writable(), False)
    517             self.assertEqual(f.seekable(), True)
    518             self.read_ops(f)
    519 
    520     def test_buffered_file_io(self):
    521         with self.open(support.TESTFN, "wb") as f:
    522             self.assertEqual(f.readable(), False)
    523             self.assertEqual(f.writable(), True)
    524             self.assertEqual(f.seekable(), True)
    525             self.write_ops(f)
    526         with self.open(support.TESTFN, "rb") as f:
    527             self.assertEqual(f.readable(), True)
    528             self.assertEqual(f.writable(), False)
    529             self.assertEqual(f.seekable(), True)
    530             self.read_ops(f, True)
    531 
    532     def test_readline(self):
    533         with self.open(support.TESTFN, "wb") as f:
    534             f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    535         with self.open(support.TESTFN, "rb") as f:
    536             self.assertEqual(f.readline(), b"abc\n")
    537             self.assertEqual(f.readline(10), b"def\n")
    538             self.assertEqual(f.readline(2), b"xy")
    539             self.assertEqual(f.readline(4), b"zzy\n")
    540             self.assertEqual(f.readline(), b"foo\x00bar\n")
    541             self.assertEqual(f.readline(None), b"another line")
    542             self.assertRaises(TypeError, f.readline, 5.3)
    543         with self.open(support.TESTFN, "r") as f:
    544             self.assertRaises(TypeError, f.readline, 5.3)
    545 
    546     def test_raw_bytes_io(self):
    547         f = self.BytesIO()
    548         self.write_ops(f)
    549         data = f.getvalue()
    550         self.assertEqual(data, b"hello world\n")
    551         f = self.BytesIO(data)
    552         self.read_ops(f, True)
    553 
    554     def test_large_file_ops(self):
    555         # On Windows and Mac OSX this test comsumes large resources; It takes
    556         # a long time to build the >2GB file and takes >2GB of disk space
    557         # therefore the resource must be enabled to run this test.
    558         if sys.platform[:3] == 'win' or sys.platform == 'darwin':
    559             support.requires(
    560                 'largefile',
    561                 'test requires %s bytes and a long time to run' % self.LARGE)
    562         with self.open(support.TESTFN, "w+b", 0) as f:
    563             self.large_file_ops(f)
    564         with self.open(support.TESTFN, "w+b") as f:
    565             self.large_file_ops(f)
    566 
    567     def test_with_open(self):
    568         for bufsize in (0, 1, 100):
    569             f = None
    570             with self.open(support.TESTFN, "wb", bufsize) as f:
    571                 f.write(b"xxx")
    572             self.assertEqual(f.closed, True)
    573             f = None
    574             try:
    575                 with self.open(support.TESTFN, "wb", bufsize) as f:
    576                     1/0
    577             except ZeroDivisionError:
    578                 self.assertEqual(f.closed, True)
    579             else:
    580                 self.fail("1/0 didn't raise an exception")
    581 
    582     # issue 5008
    583     def test_append_mode_tell(self):
    584         with self.open(support.TESTFN, "wb") as f:
    585             f.write(b"xxx")
    586         with self.open(support.TESTFN, "ab", buffering=0) as f:
    587             self.assertEqual(f.tell(), 3)
    588         with self.open(support.TESTFN, "ab") as f:
    589             self.assertEqual(f.tell(), 3)
    590         with self.open(support.TESTFN, "a") as f:
    591             self.assertGreater(f.tell(), 0)
    592 
    593     def test_destructor(self):
    594         record = []
    595         class MyFileIO(self.FileIO):
    596             def __del__(self):
    597                 record.append(1)
    598                 try:
    599                     f = super().__del__
    600                 except AttributeError:
    601                     pass
    602                 else:
    603                     f()
    604             def close(self):
    605                 record.append(2)
    606                 super().close()
    607             def flush(self):
    608                 record.append(3)
    609                 super().flush()
    610         with support.check_warnings(('', ResourceWarning)):
    611             f = MyFileIO(support.TESTFN, "wb")
    612             f.write(b"xxx")
    613             del f
    614             support.gc_collect()
    615             self.assertEqual(record, [1, 2, 3])
    616             with self.open(support.TESTFN, "rb") as f:
    617                 self.assertEqual(f.read(), b"xxx")
    618 
    619     def _check_base_destructor(self, base):
    620         record = []
    621         class MyIO(base):
    622             def __init__(self):
    623                 # This exercises the availability of attributes on object
    624                 # destruction.
    625                 # (in the C version, close() is called by the tp_dealloc
    626                 # function, not by __del__)
    627                 self.on_del = 1
    628                 self.on_close = 2
    629                 self.on_flush = 3
    630             def __del__(self):
    631                 record.append(self.on_del)
    632                 try:
    633                     f = super().__del__
    634                 except AttributeError:
    635                     pass
    636                 else:
    637                     f()
    638             def close(self):
    639                 record.append(self.on_close)
    640                 super().close()
    641             def flush(self):
    642                 record.append(self.on_flush)
    643                 super().flush()
    644         f = MyIO()
    645         del f
    646         support.gc_collect()
    647         self.assertEqual(record, [1, 2, 3])
    648 
    649     def test_IOBase_destructor(self):
    650         self._check_base_destructor(self.IOBase)
    651 
    652     def test_RawIOBase_destructor(self):
    653         self._check_base_destructor(self.RawIOBase)
    654 
    655     def test_BufferedIOBase_destructor(self):
    656         self._check_base_destructor(self.BufferedIOBase)
    657 
    658     def test_TextIOBase_destructor(self):
    659         self._check_base_destructor(self.TextIOBase)
    660 
    661     def test_close_flushes(self):
    662         with self.open(support.TESTFN, "wb") as f:
    663             f.write(b"xxx")
    664         with self.open(support.TESTFN, "rb") as f:
    665             self.assertEqual(f.read(), b"xxx")
    666 
    667     def test_array_writes(self):
    668         a = array.array('i', range(10))
    669         n = len(a.tobytes())
    670         def check(f):
    671             with f:
    672                 self.assertEqual(f.write(a), n)
    673                 f.writelines((a,))
    674         check(self.BytesIO())
    675         check(self.FileIO(support.TESTFN, "w"))
    676         check(self.BufferedWriter(self.MockRawIO()))
    677         check(self.BufferedRandom(self.MockRawIO()))
    678         check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
    679 
    680     def test_closefd(self):
    681         self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
    682                           closefd=False)
    683 
    684     def test_read_closed(self):
    685         with self.open(support.TESTFN, "w") as f:
    686             f.write("egg\n")
    687         with self.open(support.TESTFN, "r") as f:
    688             file = self.open(f.fileno(), "r", closefd=False)
    689             self.assertEqual(file.read(), "egg\n")
    690             file.seek(0)
    691             file.close()
    692             self.assertRaises(ValueError, file.read)
    693 
    694     def test_no_closefd_with_filename(self):
    695         # can't use closefd in combination with a file name
    696         self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
    697 
    698     def test_closefd_attr(self):
    699         with self.open(support.TESTFN, "wb") as f:
    700             f.write(b"egg\n")
    701         with self.open(support.TESTFN, "r") as f:
    702             self.assertEqual(f.buffer.raw.closefd, True)
    703             file = self.open(f.fileno(), "r", closefd=False)
    704             self.assertEqual(file.buffer.raw.closefd, False)
    705 
    706     def test_garbage_collection(self):
    707         # FileIO objects are collected, and collecting them flushes
    708         # all data to disk.
    709         with support.check_warnings(('', ResourceWarning)):
    710             f = self.FileIO(support.TESTFN, "wb")
    711             f.write(b"abcxxx")
    712             f.f = f
    713             wr = weakref.ref(f)
    714             del f
    715             support.gc_collect()
    716         self.assertIsNone(wr(), wr)
    717         with self.open(support.TESTFN, "rb") as f:
    718             self.assertEqual(f.read(), b"abcxxx")
    719 
    720     def test_unbounded_file(self):
    721         # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    722         zero = "/dev/zero"
    723         if not os.path.exists(zero):
    724             self.skipTest("{0} does not exist".format(zero))
    725         if sys.maxsize > 0x7FFFFFFF:
    726             self.skipTest("test can only run in a 32-bit address space")
    727         if support.real_max_memuse < support._2G:
    728             self.skipTest("test requires at least 2GB of memory")
    729         with self.open(zero, "rb", buffering=0) as f:
    730             self.assertRaises(OverflowError, f.read)
    731         with self.open(zero, "rb") as f:
    732             self.assertRaises(OverflowError, f.read)
    733         with self.open(zero, "r") as f:
    734             self.assertRaises(OverflowError, f.read)
    735 
    736     def check_flush_error_on_close(self, *args, **kwargs):
    737         # Test that the file is closed despite failed flush
    738         # and that flush() is called before file closed.
    739         f = self.open(*args, **kwargs)
    740         closed = []
    741         def bad_flush():
    742             closed[:] = [f.closed]
    743             raise OSError()
    744         f.flush = bad_flush
    745         self.assertRaises(OSError, f.close) # exception not swallowed
    746         self.assertTrue(f.closed)
    747         self.assertTrue(closed)      # flush() called
    748         self.assertFalse(closed[0])  # flush() called before file closed
    749         f.flush = lambda: None  # break reference loop
    750 
    751     def test_flush_error_on_close(self):
    752         # raw file
    753         # Issue #5700: io.FileIO calls flush() after file closed
    754         self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
    755         fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    756         self.check_flush_error_on_close(fd, 'wb', buffering=0)
    757         fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    758         self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
    759         os.close(fd)
    760         # buffered io
    761         self.check_flush_error_on_close(support.TESTFN, 'wb')
    762         fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    763         self.check_flush_error_on_close(fd, 'wb')
    764         fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    765         self.check_flush_error_on_close(fd, 'wb', closefd=False)
    766         os.close(fd)
    767         # text io
    768         self.check_flush_error_on_close(support.TESTFN, 'w')
    769         fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    770         self.check_flush_error_on_close(fd, 'w')
    771         fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
    772         self.check_flush_error_on_close(fd, 'w', closefd=False)
    773         os.close(fd)
    774 
    775     def test_multi_close(self):
    776         f = self.open(support.TESTFN, "wb", buffering=0)
    777         f.close()
    778         f.close()
    779         f.close()
    780         self.assertRaises(ValueError, f.flush)
    781 
    782     def test_RawIOBase_read(self):
    783         # Exercise the default RawIOBase.read() implementation (which calls
    784         # readinto() internally).
    785         rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    786         self.assertEqual(rawio.read(2), b"ab")
    787         self.assertEqual(rawio.read(2), b"c")
    788         self.assertEqual(rawio.read(2), b"d")
    789         self.assertEqual(rawio.read(2), None)
    790         self.assertEqual(rawio.read(2), b"ef")
    791         self.assertEqual(rawio.read(2), b"g")
    792         self.assertEqual(rawio.read(2), None)
    793         self.assertEqual(rawio.read(2), b"")
    794 
    795     def test_types_have_dict(self):
    796         test = (
    797             self.IOBase(),
    798             self.RawIOBase(),
    799             self.TextIOBase(),
    800             self.StringIO(),
    801             self.BytesIO()
    802         )
    803         for obj in test:
    804             self.assertTrue(hasattr(obj, "__dict__"))
    805 
    806     def test_opener(self):
    807         with self.open(support.TESTFN, "w") as f:
    808             f.write("egg\n")
    809         fd = os.open(support.TESTFN, os.O_RDONLY)
    810         def opener(path, flags):
    811             return fd
    812         with self.open("non-existent", "r", opener=opener) as f:
    813             self.assertEqual(f.read(), "egg\n")
    814 
    815     def test_bad_opener_negative_1(self):
    816         # Issue #27066.
    817         def badopener(fname, flags):
    818             return -1
    819         with self.assertRaises(ValueError) as cm:
    820             open('non-existent', 'r', opener=badopener)
    821         self.assertEqual(str(cm.exception), 'opener returned -1')
    822 
    823     def test_bad_opener_other_negative(self):
    824         # Issue #27066.
    825         def badopener(fname, flags):
    826             return -2
    827         with self.assertRaises(ValueError) as cm:
    828             open('non-existent', 'r', opener=badopener)
    829         self.assertEqual(str(cm.exception), 'opener returned -2')
    830 
    831     def test_fileio_closefd(self):
    832         # Issue #4841
    833         with self.open(__file__, 'rb') as f1, \
    834              self.open(__file__, 'rb') as f2:
    835             fileio = self.FileIO(f1.fileno(), closefd=False)
    836             # .__init__() must not close f1
    837             fileio.__init__(f2.fileno(), closefd=False)
    838             f1.readline()
    839             # .close() must not close f2
    840             fileio.close()
    841             f2.readline()
    842 
    843     def test_nonbuffered_textio(self):
    844         with support.check_no_resource_warning(self):
    845             with self.assertRaises(ValueError):
    846                 self.open(support.TESTFN, 'w', buffering=0)
    847 
    848     def test_invalid_newline(self):
    849         with support.check_no_resource_warning(self):
    850             with self.assertRaises(ValueError):
    851                 self.open(support.TESTFN, 'w', newline='invalid')
    852 
    853     def test_buffered_readinto_mixin(self):
    854         # Test the implementation provided by BufferedIOBase
    855         class Stream(self.BufferedIOBase):
    856             def read(self, size):
    857                 return b"12345"
    858             read1 = read
    859         stream = Stream()
    860         for method in ("readinto", "readinto1"):
    861             with self.subTest(method):
    862                 buffer = byteslike(5)
    863                 self.assertEqual(getattr(stream, method)(buffer), 5)
    864                 self.assertEqual(bytes(buffer), b"12345")
    865 
    866     def test_fspath_support(self):
    867         class PathLike:
    868             def __init__(self, path):
    869                 self.path = path
    870 
    871             def __fspath__(self):
    872                 return self.path
    873 
    874         def check_path_succeeds(path):
    875             with self.open(path, "w") as f:
    876                 f.write("egg\n")
    877 
    878             with self.open(path, "r") as f:
    879                 self.assertEqual(f.read(), "egg\n")
    880 
    881         check_path_succeeds(PathLike(support.TESTFN))
    882         check_path_succeeds(PathLike(support.TESTFN.encode('utf-8')))
    883 
    884         bad_path = PathLike(TypeError)
    885         with self.assertRaises(TypeError):
    886             self.open(bad_path, 'w')
    887 
    888         # ensure that refcounting is correct with some error conditions
    889         with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
    890             self.open(PathLike(support.TESTFN), 'rwxa')
    891 
    892 
    893 class CIOTest(IOTest):
    894 
    895     def test_IOBase_finalize(self):
    896         # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
    897         # class which inherits IOBase and an object of this class are caught
    898         # in a reference cycle and close() is already in the method cache.
    899         class MyIO(self.IOBase):
    900             def close(self):
    901                 pass
    902 
    903         # create an instance to populate the method cache
    904         MyIO()
    905         obj = MyIO()
    906         obj.obj = obj
    907         wr = weakref.ref(obj)
    908         del MyIO
    909         del obj
    910         support.gc_collect()
    911         self.assertIsNone(wr(), wr)
    912 
    913 class PyIOTest(IOTest):
    914     pass
    915 
    916 
    917 @support.cpython_only
    918 class APIMismatchTest(unittest.TestCase):
    919 
    920     def test_RawIOBase_io_in_pyio_match(self):
    921         """Test that pyio RawIOBase class has all c RawIOBase methods"""
    922         mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
    923                                                ignore=('__weakref__',))
    924         self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
    925 
    926     def test_RawIOBase_pyio_in_io_match(self):
    927         """Test that c RawIOBase class has all pyio RawIOBase methods"""
    928         mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
    929         self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
    930 
    931 
    932 class CommonBufferedTests:
    933     # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    934 
    935     def test_detach(self):
    936         raw = self.MockRawIO()
    937         buf = self.tp(raw)
    938         self.assertIs(buf.detach(), raw)
    939         self.assertRaises(ValueError, buf.detach)
    940 
    941         repr(buf)  # Should still work
    942 
    943     def test_fileno(self):
    944         rawio = self.MockRawIO()
    945         bufio = self.tp(rawio)
    946 
    947         self.assertEqual(42, bufio.fileno())
    948 
    949     def test_invalid_args(self):
    950         rawio = self.MockRawIO()
    951         bufio = self.tp(rawio)
    952         # Invalid whence
    953         self.assertRaises(ValueError, bufio.seek, 0, -1)
    954         self.assertRaises(ValueError, bufio.seek, 0, 9)
    955 
    956     def test_override_destructor(self):
    957         tp = self.tp
    958         record = []
    959         class MyBufferedIO(tp):
    960             def __del__(self):
    961                 record.append(1)
    962                 try:
    963                     f = super().__del__
    964                 except AttributeError:
    965                     pass
    966                 else:
    967                     f()
    968             def close(self):
    969                 record.append(2)
    970                 super().close()
    971             def flush(self):
    972                 record.append(3)
    973                 super().flush()
    974         rawio = self.MockRawIO()
    975         bufio = MyBufferedIO(rawio)
    976         del bufio
    977         support.gc_collect()
    978         self.assertEqual(record, [1, 2, 3])
    979 
    980     def test_context_manager(self):
    981         # Test usability as a context manager
    982         rawio = self.MockRawIO()
    983         bufio = self.tp(rawio)
    984         def _with():
    985             with bufio:
    986                 pass
    987         _with()
    988         # bufio should now be closed, and using it a second time should raise
    989         # a ValueError.
    990         self.assertRaises(ValueError, _with)
    991 
    992     def test_error_through_destructor(self):
    993         # Test that the exception state is not modified by a destructor,
    994         # even if close() fails.
    995         rawio = self.CloseFailureIO()
    996         def f():
    997             self.tp(rawio).xyzzy
    998         with support.captured_output("stderr") as s:
    999             self.assertRaises(AttributeError, f)
   1000         s = s.getvalue().strip()
   1001         if s:
   1002             # The destructor *may* have printed an unraisable error, check it
   1003             self.assertEqual(len(s.splitlines()), 1)
   1004             self.assertTrue(s.startswith("Exception OSError: "), s)
   1005             self.assertTrue(s.endswith(" ignored"), s)
   1006 
   1007     def test_repr(self):
   1008         raw = self.MockRawIO()
   1009         b = self.tp(raw)
   1010         clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
   1011         self.assertEqual(repr(b), "<%s>" % clsname)
   1012         raw.name = "dummy"
   1013         self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
   1014         raw.name = b"dummy"
   1015         self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
   1016 
   1017     def test_flush_error_on_close(self):
   1018         # Test that buffered file is closed despite failed flush
   1019         # and that flush() is called before file closed.
   1020         raw = self.MockRawIO()
   1021         closed = []
   1022         def bad_flush():
   1023             closed[:] = [b.closed, raw.closed]
   1024             raise OSError()
   1025         raw.flush = bad_flush
   1026         b = self.tp(raw)
   1027         self.assertRaises(OSError, b.close) # exception not swallowed
   1028         self.assertTrue(b.closed)
   1029         self.assertTrue(raw.closed)
   1030         self.assertTrue(closed)      # flush() called
   1031         self.assertFalse(closed[0])  # flush() called before file closed
   1032         self.assertFalse(closed[1])
   1033         raw.flush = lambda: None  # break reference loop
   1034 
   1035     def test_close_error_on_close(self):
   1036         raw = self.MockRawIO()
   1037         def bad_flush():
   1038             raise OSError('flush')
   1039         def bad_close():
   1040             raise OSError('close')
   1041         raw.close = bad_close
   1042         b = self.tp(raw)
   1043         b.flush = bad_flush
   1044         with self.assertRaises(OSError) as err: # exception not swallowed
   1045             b.close()
   1046         self.assertEqual(err.exception.args, ('close',))
   1047         self.assertIsInstance(err.exception.__context__, OSError)
   1048         self.assertEqual(err.exception.__context__.args, ('flush',))
   1049         self.assertFalse(b.closed)
   1050 
   1051     def test_nonnormalized_close_error_on_close(self):
   1052         # Issue #21677
   1053         raw = self.MockRawIO()
   1054         def bad_flush():
   1055             raise non_existing_flush
   1056         def bad_close():
   1057             raise non_existing_close
   1058         raw.close = bad_close
   1059         b = self.tp(raw)
   1060         b.flush = bad_flush
   1061         with self.assertRaises(NameError) as err: # exception not swallowed
   1062             b.close()
   1063         self.assertIn('non_existing_close', str(err.exception))
   1064         self.assertIsInstance(err.exception.__context__, NameError)
   1065         self.assertIn('non_existing_flush', str(err.exception.__context__))
   1066         self.assertFalse(b.closed)
   1067 
   1068     def test_multi_close(self):
   1069         raw = self.MockRawIO()
   1070         b = self.tp(raw)
   1071         b.close()
   1072         b.close()
   1073         b.close()
   1074         self.assertRaises(ValueError, b.flush)
   1075 
   1076     def test_unseekable(self):
   1077         bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
   1078         self.assertRaises(self.UnsupportedOperation, bufio.tell)
   1079         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
   1080 
   1081     def test_readonly_attributes(self):
   1082         raw = self.MockRawIO()
   1083         buf = self.tp(raw)
   1084         x = self.MockRawIO()
   1085         with self.assertRaises(AttributeError):
   1086             buf.raw = x
   1087 
   1088 
   1089 class SizeofTest:
   1090 
   1091     @support.cpython_only
   1092     def test_sizeof(self):
   1093         bufsize1 = 4096
   1094         bufsize2 = 8192
   1095         rawio = self.MockRawIO()
   1096         bufio = self.tp(rawio, buffer_size=bufsize1)
   1097         size = sys.getsizeof(bufio) - bufsize1
   1098         rawio = self.MockRawIO()
   1099         bufio = self.tp(rawio, buffer_size=bufsize2)
   1100         self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
   1101 
   1102     @support.cpython_only
   1103     def test_buffer_freeing(self) :
   1104         bufsize = 4096
   1105         rawio = self.MockRawIO()
   1106         bufio = self.tp(rawio, buffer_size=bufsize)
   1107         size = sys.getsizeof(bufio) - bufsize
   1108         bufio.close()
   1109         self.assertEqual(sys.getsizeof(bufio), size)
   1110 
   1111 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
   1112     read_mode = "rb"
   1113 
   1114     def test_constructor(self):
   1115         rawio = self.MockRawIO([b"abc"])
   1116         bufio = self.tp(rawio)
   1117         bufio.__init__(rawio)
   1118         bufio.__init__(rawio, buffer_size=1024)
   1119         bufio.__init__(rawio, buffer_size=16)
   1120         self.assertEqual(b"abc", bufio.read())
   1121         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1122         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1123         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1124         rawio = self.MockRawIO([b"abc"])
   1125         bufio.__init__(rawio)
   1126         self.assertEqual(b"abc", bufio.read())
   1127 
   1128     def test_uninitialized(self):
   1129         bufio = self.tp.__new__(self.tp)
   1130         del bufio
   1131         bufio = self.tp.__new__(self.tp)
   1132         self.assertRaisesRegex((ValueError, AttributeError),
   1133                                'uninitialized|has no attribute',
   1134                                bufio.read, 0)
   1135         bufio.__init__(self.MockRawIO())
   1136         self.assertEqual(bufio.read(0), b'')
   1137 
   1138     def test_read(self):
   1139         for arg in (None, 7):
   1140             rawio = self.MockRawIO((b"abc", b"d", b"efg"))
   1141             bufio = self.tp(rawio)
   1142             self.assertEqual(b"abcdefg", bufio.read(arg))
   1143         # Invalid args
   1144         self.assertRaises(ValueError, bufio.read, -2)
   1145 
   1146     def test_read1(self):
   1147         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
   1148         bufio = self.tp(rawio)
   1149         self.assertEqual(b"a", bufio.read(1))
   1150         self.assertEqual(b"b", bufio.read1(1))
   1151         self.assertEqual(rawio._reads, 1)
   1152         self.assertEqual(b"c", bufio.read1(100))
   1153         self.assertEqual(rawio._reads, 1)
   1154         self.assertEqual(b"d", bufio.read1(100))
   1155         self.assertEqual(rawio._reads, 2)
   1156         self.assertEqual(b"efg", bufio.read1(100))
   1157         self.assertEqual(rawio._reads, 3)
   1158         self.assertEqual(b"", bufio.read1(100))
   1159         self.assertEqual(rawio._reads, 4)
   1160         # Invalid args
   1161         self.assertRaises(ValueError, bufio.read1, -1)
   1162 
   1163     def test_readinto(self):
   1164         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
   1165         bufio = self.tp(rawio)
   1166         b = bytearray(2)
   1167         self.assertEqual(bufio.readinto(b), 2)
   1168         self.assertEqual(b, b"ab")
   1169         self.assertEqual(bufio.readinto(b), 2)
   1170         self.assertEqual(b, b"cd")
   1171         self.assertEqual(bufio.readinto(b), 2)
   1172         self.assertEqual(b, b"ef")
   1173         self.assertEqual(bufio.readinto(b), 1)
   1174         self.assertEqual(b, b"gf")
   1175         self.assertEqual(bufio.readinto(b), 0)
   1176         self.assertEqual(b, b"gf")
   1177         rawio = self.MockRawIO((b"abc", None))
   1178         bufio = self.tp(rawio)
   1179         self.assertEqual(bufio.readinto(b), 2)
   1180         self.assertEqual(b, b"ab")
   1181         self.assertEqual(bufio.readinto(b), 1)
   1182         self.assertEqual(b, b"cb")
   1183 
   1184     def test_readinto1(self):
   1185         buffer_size = 10
   1186         rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
   1187         bufio = self.tp(rawio, buffer_size=buffer_size)
   1188         b = bytearray(2)
   1189         self.assertEqual(bufio.peek(3), b'abc')
   1190         self.assertEqual(rawio._reads, 1)
   1191         self.assertEqual(bufio.readinto1(b), 2)
   1192         self.assertEqual(b, b"ab")
   1193         self.assertEqual(rawio._reads, 1)
   1194         self.assertEqual(bufio.readinto1(b), 1)
   1195         self.assertEqual(b[:1], b"c")
   1196         self.assertEqual(rawio._reads, 1)
   1197         self.assertEqual(bufio.readinto1(b), 2)
   1198         self.assertEqual(b, b"de")
   1199         self.assertEqual(rawio._reads, 2)
   1200         b = bytearray(2*buffer_size)
   1201         self.assertEqual(bufio.peek(3), b'fgh')
   1202         self.assertEqual(rawio._reads, 3)
   1203         self.assertEqual(bufio.readinto1(b), 6)
   1204         self.assertEqual(b[:6], b"fghjkl")
   1205         self.assertEqual(rawio._reads, 4)
   1206 
   1207     def test_readinto_array(self):
   1208         buffer_size = 60
   1209         data = b"a" * 26
   1210         rawio = self.MockRawIO((data,))
   1211         bufio = self.tp(rawio, buffer_size=buffer_size)
   1212 
   1213         # Create an array with element size > 1 byte
   1214         b = array.array('i', b'x' * 32)
   1215         assert len(b) != 16
   1216 
   1217         # Read into it. We should get as many *bytes* as we can fit into b
   1218         # (which is more than the number of elements)
   1219         n = bufio.readinto(b)
   1220         self.assertGreater(n, len(b))
   1221 
   1222         # Check that old contents of b are preserved
   1223         bm = memoryview(b).cast('B')
   1224         self.assertLess(n, len(bm))
   1225         self.assertEqual(bm[:n], data[:n])
   1226         self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
   1227 
   1228     def test_readinto1_array(self):
   1229         buffer_size = 60
   1230         data = b"a" * 26
   1231         rawio = self.MockRawIO((data,))
   1232         bufio = self.tp(rawio, buffer_size=buffer_size)
   1233 
   1234         # Create an array with element size > 1 byte
   1235         b = array.array('i', b'x' * 32)
   1236         assert len(b) != 16
   1237 
   1238         # Read into it. We should get as many *bytes* as we can fit into b
   1239         # (which is more than the number of elements)
   1240         n = bufio.readinto1(b)
   1241         self.assertGreater(n, len(b))
   1242 
   1243         # Check that old contents of b are preserved
   1244         bm = memoryview(b).cast('B')
   1245         self.assertLess(n, len(bm))
   1246         self.assertEqual(bm[:n], data[:n])
   1247         self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
   1248 
   1249     def test_readlines(self):
   1250         def bufio():
   1251             rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
   1252             return self.tp(rawio)
   1253         self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
   1254         self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
   1255         self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
   1256 
   1257     def test_buffering(self):
   1258         data = b"abcdefghi"
   1259         dlen = len(data)
   1260 
   1261         tests = [
   1262             [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
   1263             [ 100, [ 3, 3, 3],     [ dlen ]    ],
   1264             [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
   1265         ]
   1266 
   1267         for bufsize, buf_read_sizes, raw_read_sizes in tests:
   1268             rawio = self.MockFileIO(data)
   1269             bufio = self.tp(rawio, buffer_size=bufsize)
   1270             pos = 0
   1271             for nbytes in buf_read_sizes:
   1272                 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
   1273                 pos += nbytes
   1274             # this is mildly implementation-dependent
   1275             self.assertEqual(rawio.read_history, raw_read_sizes)
   1276 
   1277     def test_read_non_blocking(self):
   1278         # Inject some None's in there to simulate EWOULDBLOCK
   1279         rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
   1280         bufio = self.tp(rawio)
   1281         self.assertEqual(b"abcd", bufio.read(6))
   1282         self.assertEqual(b"e", bufio.read(1))
   1283         self.assertEqual(b"fg", bufio.read())
   1284         self.assertEqual(b"", bufio.peek(1))
   1285         self.assertIsNone(bufio.read())
   1286         self.assertEqual(b"", bufio.read())
   1287 
   1288         rawio = self.MockRawIO((b"a", None, None))
   1289         self.assertEqual(b"a", rawio.readall())
   1290         self.assertIsNone(rawio.readall())
   1291 
   1292     def test_read_past_eof(self):
   1293         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
   1294         bufio = self.tp(rawio)
   1295 
   1296         self.assertEqual(b"abcdefg", bufio.read(9000))
   1297 
   1298     def test_read_all(self):
   1299         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
   1300         bufio = self.tp(rawio)
   1301 
   1302         self.assertEqual(b"abcdefg", bufio.read())
   1303 
   1304     @unittest.skipUnless(threading, 'Threading required for this test.')
   1305     @support.requires_resource('cpu')
   1306     def test_threads(self):
   1307         try:
   1308             # Write out many bytes with exactly the same number of 0's,
   1309             # 1's... 255's. This will help us check that concurrent reading
   1310             # doesn't duplicate or forget contents.
   1311             N = 1000
   1312             l = list(range(256)) * N
   1313             random.shuffle(l)
   1314             s = bytes(bytearray(l))
   1315             with self.open(support.TESTFN, "wb") as f:
   1316                 f.write(s)
   1317             with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
   1318                 bufio = self.tp(raw, 8)
   1319                 errors = []
   1320                 results = []
   1321                 def f():
   1322                     try:
   1323                         # Intra-buffer read then buffer-flushing read
   1324                         for n in cycle([1, 19]):
   1325                             s = bufio.read(n)
   1326                             if not s:
   1327                                 break
   1328                             # list.append() is atomic
   1329                             results.append(s)
   1330                     except Exception as e:
   1331                         errors.append(e)
   1332                         raise
   1333                 threads = [threading.Thread(target=f) for x in range(20)]
   1334                 with support.start_threads(threads):
   1335                     time.sleep(0.02) # yield
   1336                 self.assertFalse(errors,
   1337                     "the following exceptions were caught: %r" % errors)
   1338                 s = b''.join(results)
   1339                 for i in range(256):
   1340                     c = bytes(bytearray([i]))
   1341                     self.assertEqual(s.count(c), N)
   1342         finally:
   1343             support.unlink(support.TESTFN)
   1344 
   1345     def test_unseekable(self):
   1346         bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
   1347         self.assertRaises(self.UnsupportedOperation, bufio.tell)
   1348         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
   1349         bufio.read(1)
   1350         self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
   1351         self.assertRaises(self.UnsupportedOperation, bufio.tell)
   1352 
   1353     def test_misbehaved_io(self):
   1354         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
   1355         bufio = self.tp(rawio)
   1356         self.assertRaises(OSError, bufio.seek, 0)
   1357         self.assertRaises(OSError, bufio.tell)
   1358 
   1359     def test_no_extraneous_read(self):
   1360         # Issue #9550; when the raw IO object has satisfied the read request,
   1361         # we should not issue any additional reads, otherwise it may block
   1362         # (e.g. socket).
   1363         bufsize = 16
   1364         for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
   1365             rawio = self.MockRawIO([b"x" * n])
   1366             bufio = self.tp(rawio, bufsize)
   1367             self.assertEqual(bufio.read(n), b"x" * n)
   1368             # Simple case: one raw read is enough to satisfy the request.
   1369             self.assertEqual(rawio._extraneous_reads, 0,
   1370                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
   1371             # A more complex case where two raw reads are needed to satisfy
   1372             # the request.
   1373             rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
   1374             bufio = self.tp(rawio, bufsize)
   1375             self.assertEqual(bufio.read(n), b"x" * n)
   1376             self.assertEqual(rawio._extraneous_reads, 0,
   1377                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
   1378 
   1379     def test_read_on_closed(self):
   1380         # Issue #23796
   1381         b = io.BufferedReader(io.BytesIO(b"12"))
   1382         b.read(1)
   1383         b.close()
   1384         self.assertRaises(ValueError, b.peek)
   1385         self.assertRaises(ValueError, b.read1, 1)
   1386 
   1387 
   1388 class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
   1389     tp = io.BufferedReader
   1390 
   1391     def test_constructor(self):
   1392         BufferedReaderTest.test_constructor(self)
   1393         # The allocation can succeed on 32-bit builds, e.g. with more
   1394         # than 2GB RAM and a 64-bit kernel.
   1395         if sys.maxsize > 0x7FFFFFFF:
   1396             rawio = self.MockRawIO()
   1397             bufio = self.tp(rawio)
   1398             self.assertRaises((OverflowError, MemoryError, ValueError),
   1399                 bufio.__init__, rawio, sys.maxsize)
   1400 
   1401     def test_initialization(self):
   1402         rawio = self.MockRawIO([b"abc"])
   1403         bufio = self.tp(rawio)
   1404         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1405         self.assertRaises(ValueError, bufio.read)
   1406         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1407         self.assertRaises(ValueError, bufio.read)
   1408         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1409         self.assertRaises(ValueError, bufio.read)
   1410 
   1411     def test_misbehaved_io_read(self):
   1412         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
   1413         bufio = self.tp(rawio)
   1414         # _pyio.BufferedReader seems to implement reading different, so that
   1415         # checking this is not so easy.
   1416         self.assertRaises(OSError, bufio.read, 10)
   1417 
   1418     def test_garbage_collection(self):
   1419         # C BufferedReader objects are collected.
   1420         # The Python version has __del__, so it ends into gc.garbage instead
   1421         with support.check_warnings(('', ResourceWarning)):
   1422             rawio = self.FileIO(support.TESTFN, "w+b")
   1423             f = self.tp(rawio)
   1424             f.f = f
   1425             wr = weakref.ref(f)
   1426             del f
   1427             support.gc_collect()
   1428         self.assertIsNone(wr(), wr)
   1429 
   1430     def test_args_error(self):
   1431         # Issue #17275
   1432         with self.assertRaisesRegex(TypeError, "BufferedReader"):
   1433             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1434 
   1435 
   1436 class PyBufferedReaderTest(BufferedReaderTest):
   1437     tp = pyio.BufferedReader
   1438 
   1439 
   1440 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
   1441     write_mode = "wb"
   1442 
   1443     def test_constructor(self):
   1444         rawio = self.MockRawIO()
   1445         bufio = self.tp(rawio)
   1446         bufio.__init__(rawio)
   1447         bufio.__init__(rawio, buffer_size=1024)
   1448         bufio.__init__(rawio, buffer_size=16)
   1449         self.assertEqual(3, bufio.write(b"abc"))
   1450         bufio.flush()
   1451         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1452         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1453         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1454         bufio.__init__(rawio)
   1455         self.assertEqual(3, bufio.write(b"ghi"))
   1456         bufio.flush()
   1457         self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
   1458 
   1459     def test_uninitialized(self):
   1460         bufio = self.tp.__new__(self.tp)
   1461         del bufio
   1462         bufio = self.tp.__new__(self.tp)
   1463         self.assertRaisesRegex((ValueError, AttributeError),
   1464                                'uninitialized|has no attribute',
   1465                                bufio.write, b'')
   1466         bufio.__init__(self.MockRawIO())
   1467         self.assertEqual(bufio.write(b''), 0)
   1468 
   1469     def test_detach_flush(self):
   1470         raw = self.MockRawIO()
   1471         buf = self.tp(raw)
   1472         buf.write(b"howdy!")
   1473         self.assertFalse(raw._write_stack)
   1474         buf.detach()
   1475         self.assertEqual(raw._write_stack, [b"howdy!"])
   1476 
   1477     def test_write(self):
   1478         # Write to the buffered IO but don't overflow the buffer.
   1479         writer = self.MockRawIO()
   1480         bufio = self.tp(writer, 8)
   1481         bufio.write(b"abc")
   1482         self.assertFalse(writer._write_stack)
   1483         buffer = bytearray(b"def")
   1484         bufio.write(buffer)
   1485         buffer[:] = b"***"  # Overwrite our copy of the data
   1486         bufio.flush()
   1487         self.assertEqual(b"".join(writer._write_stack), b"abcdef")
   1488 
   1489     def test_write_overflow(self):
   1490         writer = self.MockRawIO()
   1491         bufio = self.tp(writer, 8)
   1492         contents = b"abcdefghijklmnop"
   1493         for n in range(0, len(contents), 3):
   1494             bufio.write(contents[n:n+3])
   1495         flushed = b"".join(writer._write_stack)
   1496         # At least (total - 8) bytes were implicitly flushed, perhaps more
   1497         # depending on the implementation.
   1498         self.assertTrue(flushed.startswith(contents[:-8]), flushed)
   1499 
   1500     def check_writes(self, intermediate_func):
   1501         # Lots of writes, test the flushed output is as expected.
   1502         contents = bytes(range(256)) * 1000
   1503         n = 0
   1504         writer = self.MockRawIO()
   1505         bufio = self.tp(writer, 13)
   1506         # Generator of write sizes: repeat each N 15 times then proceed to N+1
   1507         def gen_sizes():
   1508             for size in count(1):
   1509                 for i in range(15):
   1510                     yield size
   1511         sizes = gen_sizes()
   1512         while n < len(contents):
   1513             size = min(next(sizes), len(contents) - n)
   1514             self.assertEqual(bufio.write(contents[n:n+size]), size)
   1515             intermediate_func(bufio)
   1516             n += size
   1517         bufio.flush()
   1518         self.assertEqual(contents, b"".join(writer._write_stack))
   1519 
   1520     def test_writes(self):
   1521         self.check_writes(lambda bufio: None)
   1522 
   1523     def test_writes_and_flushes(self):
   1524         self.check_writes(lambda bufio: bufio.flush())
   1525 
   1526     def test_writes_and_seeks(self):
   1527         def _seekabs(bufio):
   1528             pos = bufio.tell()
   1529             bufio.seek(pos + 1, 0)
   1530             bufio.seek(pos - 1, 0)
   1531             bufio.seek(pos, 0)
   1532         self.check_writes(_seekabs)
   1533         def _seekrel(bufio):
   1534             pos = bufio.seek(0, 1)
   1535             bufio.seek(+1, 1)
   1536             bufio.seek(-1, 1)
   1537             bufio.seek(pos, 0)
   1538         self.check_writes(_seekrel)
   1539 
   1540     def test_writes_and_truncates(self):
   1541         self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
   1542 
   1543     def test_write_non_blocking(self):
   1544         raw = self.MockNonBlockWriterIO()
   1545         bufio = self.tp(raw, 8)
   1546 
   1547         self.assertEqual(bufio.write(b"abcd"), 4)
   1548         self.assertEqual(bufio.write(b"efghi"), 5)
   1549         # 1 byte will be written, the rest will be buffered
   1550         raw.block_on(b"k")
   1551         self.assertEqual(bufio.write(b"jklmn"), 5)
   1552 
   1553         # 8 bytes will be written, 8 will be buffered and the rest will be lost
   1554         raw.block_on(b"0")
   1555         try:
   1556             bufio.write(b"opqrwxyz0123456789")
   1557         except self.BlockingIOError as e:
   1558             written = e.characters_written
   1559         else:
   1560             self.fail("BlockingIOError should have been raised")
   1561         self.assertEqual(written, 16)
   1562         self.assertEqual(raw.pop_written(),
   1563             b"abcdefghijklmnopqrwxyz")
   1564 
   1565         self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
   1566         s = raw.pop_written()
   1567         # Previously buffered bytes were flushed
   1568         self.assertTrue(s.startswith(b"01234567A"), s)
   1569 
   1570     def test_write_and_rewind(self):
   1571         raw = io.BytesIO()
   1572         bufio = self.tp(raw, 4)
   1573         self.assertEqual(bufio.write(b"abcdef"), 6)
   1574         self.assertEqual(bufio.tell(), 6)
   1575         bufio.seek(0, 0)
   1576         self.assertEqual(bufio.write(b"XY"), 2)
   1577         bufio.seek(6, 0)
   1578         self.assertEqual(raw.getvalue(), b"XYcdef")
   1579         self.assertEqual(bufio.write(b"123456"), 6)
   1580         bufio.flush()
   1581         self.assertEqual(raw.getvalue(), b"XYcdef123456")
   1582 
   1583     def test_flush(self):
   1584         writer = self.MockRawIO()
   1585         bufio = self.tp(writer, 8)
   1586         bufio.write(b"abc")
   1587         bufio.flush()
   1588         self.assertEqual(b"abc", writer._write_stack[0])
   1589 
   1590     def test_writelines(self):
   1591         l = [b'ab', b'cd', b'ef']
   1592         writer = self.MockRawIO()
   1593         bufio = self.tp(writer, 8)
   1594         bufio.writelines(l)
   1595         bufio.flush()
   1596         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
   1597 
   1598     def test_writelines_userlist(self):
   1599         l = UserList([b'ab', b'cd', b'ef'])
   1600         writer = self.MockRawIO()
   1601         bufio = self.tp(writer, 8)
   1602         bufio.writelines(l)
   1603         bufio.flush()
   1604         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
   1605 
   1606     def test_writelines_error(self):
   1607         writer = self.MockRawIO()
   1608         bufio = self.tp(writer, 8)
   1609         self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
   1610         self.assertRaises(TypeError, bufio.writelines, None)
   1611         self.assertRaises(TypeError, bufio.writelines, 'abc')
   1612 
   1613     def test_destructor(self):
   1614         writer = self.MockRawIO()
   1615         bufio = self.tp(writer, 8)
   1616         bufio.write(b"abc")
   1617         del bufio
   1618         support.gc_collect()
   1619         self.assertEqual(b"abc", writer._write_stack[0])
   1620 
   1621     def test_truncate(self):
   1622         # Truncate implicitly flushes the buffer.
   1623         with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
   1624             bufio = self.tp(raw, 8)
   1625             bufio.write(b"abcdef")
   1626             self.assertEqual(bufio.truncate(3), 3)
   1627             self.assertEqual(bufio.tell(), 6)
   1628         with self.open(support.TESTFN, "rb", buffering=0) as f:
   1629             self.assertEqual(f.read(), b"abc")
   1630 
   1631     @unittest.skipUnless(threading, 'Threading required for this test.')
   1632     @support.requires_resource('cpu')
   1633     def test_threads(self):
   1634         try:
   1635             # Write out many bytes from many threads and test they were
   1636             # all flushed.
   1637             N = 1000
   1638             contents = bytes(range(256)) * N
   1639             sizes = cycle([1, 19])
   1640             n = 0
   1641             queue = deque()
   1642             while n < len(contents):
   1643                 size = next(sizes)
   1644                 queue.append(contents[n:n+size])
   1645                 n += size
   1646             del contents
   1647             # We use a real file object because it allows us to
   1648             # exercise situations where the GIL is released before
   1649             # writing the buffer to the raw streams. This is in addition
   1650             # to concurrency issues due to switching threads in the middle
   1651             # of Python code.
   1652             with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
   1653                 bufio = self.tp(raw, 8)
   1654                 errors = []
   1655                 def f():
   1656                     try:
   1657                         while True:
   1658                             try:
   1659                                 s = queue.popleft()
   1660                             except IndexError:
   1661                                 return
   1662                             bufio.write(s)
   1663                     except Exception as e:
   1664                         errors.append(e)
   1665                         raise
   1666                 threads = [threading.Thread(target=f) for x in range(20)]
   1667                 with support.start_threads(threads):
   1668                     time.sleep(0.02) # yield
   1669                 self.assertFalse(errors,
   1670                     "the following exceptions were caught: %r" % errors)
   1671                 bufio.close()
   1672             with self.open(support.TESTFN, "rb") as f:
   1673                 s = f.read()
   1674             for i in range(256):
   1675                 self.assertEqual(s.count(bytes([i])), N)
   1676         finally:
   1677             support.unlink(support.TESTFN)
   1678 
   1679     def test_misbehaved_io(self):
   1680         rawio = self.MisbehavedRawIO()
   1681         bufio = self.tp(rawio, 5)
   1682         self.assertRaises(OSError, bufio.seek, 0)
   1683         self.assertRaises(OSError, bufio.tell)
   1684         self.assertRaises(OSError, bufio.write, b"abcdef")
   1685 
   1686     def test_max_buffer_size_removal(self):
   1687         with self.assertRaises(TypeError):
   1688             self.tp(self.MockRawIO(), 8, 12)
   1689 
   1690     def test_write_error_on_close(self):
   1691         raw = self.MockRawIO()
   1692         def bad_write(b):
   1693             raise OSError()
   1694         raw.write = bad_write
   1695         b = self.tp(raw)
   1696         b.write(b'spam')
   1697         self.assertRaises(OSError, b.close) # exception not swallowed
   1698         self.assertTrue(b.closed)
   1699 
   1700 
   1701 class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
   1702     tp = io.BufferedWriter
   1703 
   1704     def test_constructor(self):
   1705         BufferedWriterTest.test_constructor(self)
   1706         # The allocation can succeed on 32-bit builds, e.g. with more
   1707         # than 2GB RAM and a 64-bit kernel.
   1708         if sys.maxsize > 0x7FFFFFFF:
   1709             rawio = self.MockRawIO()
   1710             bufio = self.tp(rawio)
   1711             self.assertRaises((OverflowError, MemoryError, ValueError),
   1712                 bufio.__init__, rawio, sys.maxsize)
   1713 
   1714     def test_initialization(self):
   1715         rawio = self.MockRawIO()
   1716         bufio = self.tp(rawio)
   1717         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1718         self.assertRaises(ValueError, bufio.write, b"def")
   1719         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1720         self.assertRaises(ValueError, bufio.write, b"def")
   1721         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1722         self.assertRaises(ValueError, bufio.write, b"def")
   1723 
   1724     def test_garbage_collection(self):
   1725         # C BufferedWriter objects are collected, and collecting them flushes
   1726         # all data to disk.
   1727         # The Python version has __del__, so it ends into gc.garbage instead
   1728         with support.check_warnings(('', ResourceWarning)):
   1729             rawio = self.FileIO(support.TESTFN, "w+b")
   1730             f = self.tp(rawio)
   1731             f.write(b"123xxx")
   1732             f.x = f
   1733             wr = weakref.ref(f)
   1734             del f
   1735             support.gc_collect()
   1736         self.assertIsNone(wr(), wr)
   1737         with self.open(support.TESTFN, "rb") as f:
   1738             self.assertEqual(f.read(), b"123xxx")
   1739 
   1740     def test_args_error(self):
   1741         # Issue #17275
   1742         with self.assertRaisesRegex(TypeError, "BufferedWriter"):
   1743             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1744 
   1745 
   1746 class PyBufferedWriterTest(BufferedWriterTest):
   1747     tp = pyio.BufferedWriter
   1748 
   1749 class BufferedRWPairTest(unittest.TestCase):
   1750 
   1751     def test_constructor(self):
   1752         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1753         self.assertFalse(pair.closed)
   1754 
   1755     def test_uninitialized(self):
   1756         pair = self.tp.__new__(self.tp)
   1757         del pair
   1758         pair = self.tp.__new__(self.tp)
   1759         self.assertRaisesRegex((ValueError, AttributeError),
   1760                                'uninitialized|has no attribute',
   1761                                pair.read, 0)
   1762         self.assertRaisesRegex((ValueError, AttributeError),
   1763                                'uninitialized|has no attribute',
   1764                                pair.write, b'')
   1765         pair.__init__(self.MockRawIO(), self.MockRawIO())
   1766         self.assertEqual(pair.read(0), b'')
   1767         self.assertEqual(pair.write(b''), 0)
   1768 
   1769     def test_detach(self):
   1770         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1771         self.assertRaises(self.UnsupportedOperation, pair.detach)
   1772 
   1773     def test_constructor_max_buffer_size_removal(self):
   1774         with self.assertRaises(TypeError):
   1775             self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
   1776 
   1777     def test_constructor_with_not_readable(self):
   1778         class NotReadable(MockRawIO):
   1779             def readable(self):
   1780                 return False
   1781 
   1782         self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
   1783 
   1784     def test_constructor_with_not_writeable(self):
   1785         class NotWriteable(MockRawIO):
   1786             def writable(self):
   1787                 return False
   1788 
   1789         self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
   1790 
   1791     def test_read(self):
   1792         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1793 
   1794         self.assertEqual(pair.read(3), b"abc")
   1795         self.assertEqual(pair.read(1), b"d")
   1796         self.assertEqual(pair.read(), b"ef")
   1797         pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
   1798         self.assertEqual(pair.read(None), b"abc")
   1799 
   1800     def test_readlines(self):
   1801         pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
   1802         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
   1803         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
   1804         self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
   1805 
   1806     def test_read1(self):
   1807         # .read1() is delegated to the underlying reader object, so this test
   1808         # can be shallow.
   1809         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1810 
   1811         self.assertEqual(pair.read1(3), b"abc")
   1812 
   1813     def test_readinto(self):
   1814         for method in ("readinto", "readinto1"):
   1815             with self.subTest(method):
   1816                 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1817 
   1818                 data = byteslike(b'\0' * 5)
   1819                 self.assertEqual(getattr(pair, method)(data), 5)
   1820                 self.assertEqual(bytes(data), b"abcde")
   1821 
   1822     def test_write(self):
   1823         w = self.MockRawIO()
   1824         pair = self.tp(self.MockRawIO(), w)
   1825 
   1826         pair.write(b"abc")
   1827         pair.flush()
   1828         buffer = bytearray(b"def")
   1829         pair.write(buffer)
   1830         buffer[:] = b"***"  # Overwrite our copy of the data
   1831         pair.flush()
   1832         self.assertEqual(w._write_stack, [b"abc", b"def"])
   1833 
   1834     def test_peek(self):
   1835         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1836 
   1837         self.assertTrue(pair.peek(3).startswith(b"abc"))
   1838         self.assertEqual(pair.read(3), b"abc")
   1839 
   1840     def test_readable(self):
   1841         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1842         self.assertTrue(pair.readable())
   1843 
   1844     def test_writeable(self):
   1845         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1846         self.assertTrue(pair.writable())
   1847 
   1848     def test_seekable(self):
   1849         # BufferedRWPairs are never seekable, even if their readers and writers
   1850         # are.
   1851         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1852         self.assertFalse(pair.seekable())
   1853 
   1854     # .flush() is delegated to the underlying writer object and has been
   1855     # tested in the test_write method.
   1856 
   1857     def test_close_and_closed(self):
   1858         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1859         self.assertFalse(pair.closed)
   1860         pair.close()
   1861         self.assertTrue(pair.closed)
   1862 
   1863     def test_reader_close_error_on_close(self):
   1864         def reader_close():
   1865             reader_non_existing
   1866         reader = self.MockRawIO()
   1867         reader.close = reader_close
   1868         writer = self.MockRawIO()
   1869         pair = self.tp(reader, writer)
   1870         with self.assertRaises(NameError) as err:
   1871             pair.close()
   1872         self.assertIn('reader_non_existing', str(err.exception))
   1873         self.assertTrue(pair.closed)
   1874         self.assertFalse(reader.closed)
   1875         self.assertTrue(writer.closed)
   1876 
   1877     def test_writer_close_error_on_close(self):
   1878         def writer_close():
   1879             writer_non_existing
   1880         reader = self.MockRawIO()
   1881         writer = self.MockRawIO()
   1882         writer.close = writer_close
   1883         pair = self.tp(reader, writer)
   1884         with self.assertRaises(NameError) as err:
   1885             pair.close()
   1886         self.assertIn('writer_non_existing', str(err.exception))
   1887         self.assertFalse(pair.closed)
   1888         self.assertTrue(reader.closed)
   1889         self.assertFalse(writer.closed)
   1890 
   1891     def test_reader_writer_close_error_on_close(self):
   1892         def reader_close():
   1893             reader_non_existing
   1894         def writer_close():
   1895             writer_non_existing
   1896         reader = self.MockRawIO()
   1897         reader.close = reader_close
   1898         writer = self.MockRawIO()
   1899         writer.close = writer_close
   1900         pair = self.tp(reader, writer)
   1901         with self.assertRaises(NameError) as err:
   1902             pair.close()
   1903         self.assertIn('reader_non_existing', str(err.exception))
   1904         self.assertIsInstance(err.exception.__context__, NameError)
   1905         self.assertIn('writer_non_existing', str(err.exception.__context__))
   1906         self.assertFalse(pair.closed)
   1907         self.assertFalse(reader.closed)
   1908         self.assertFalse(writer.closed)
   1909 
   1910     def test_isatty(self):
   1911         class SelectableIsAtty(MockRawIO):
   1912             def __init__(self, isatty):
   1913                 MockRawIO.__init__(self)
   1914                 self._isatty = isatty
   1915 
   1916             def isatty(self):
   1917                 return self._isatty
   1918 
   1919         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
   1920         self.assertFalse(pair.isatty())
   1921 
   1922         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
   1923         self.assertTrue(pair.isatty())
   1924 
   1925         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
   1926         self.assertTrue(pair.isatty())
   1927 
   1928         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
   1929         self.assertTrue(pair.isatty())
   1930 
   1931     def test_weakref_clearing(self):
   1932         brw = self.tp(self.MockRawIO(), self.MockRawIO())
   1933         ref = weakref.ref(brw)
   1934         brw = None
   1935         ref = None # Shouldn't segfault.
   1936 
   1937 class CBufferedRWPairTest(BufferedRWPairTest):
   1938     tp = io.BufferedRWPair
   1939 
   1940 class PyBufferedRWPairTest(BufferedRWPairTest):
   1941     tp = pyio.BufferedRWPair
   1942 
   1943 
   1944 class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
   1945     read_mode = "rb+"
   1946     write_mode = "wb+"
   1947 
   1948     def test_constructor(self):
   1949         BufferedReaderTest.test_constructor(self)
   1950         BufferedWriterTest.test_constructor(self)
   1951 
   1952     def test_uninitialized(self):
   1953         BufferedReaderTest.test_uninitialized(self)
   1954         BufferedWriterTest.test_uninitialized(self)
   1955 
   1956     def test_read_and_write(self):
   1957         raw = self.MockRawIO((b"asdf", b"ghjk"))
   1958         rw = self.tp(raw, 8)
   1959 
   1960         self.assertEqual(b"as", rw.read(2))
   1961         rw.write(b"ddd")
   1962         rw.write(b"eee")
   1963         self.assertFalse(raw._write_stack) # Buffer writes
   1964         self.assertEqual(b"ghjk", rw.read())
   1965         self.assertEqual(b"dddeee", raw._write_stack[0])
   1966 
   1967     def test_seek_and_tell(self):
   1968         raw = self.BytesIO(b"asdfghjkl")
   1969         rw = self.tp(raw)
   1970 
   1971         self.assertEqual(b"as", rw.read(2))
   1972         self.assertEqual(2, rw.tell())
   1973         rw.seek(0, 0)
   1974         self.assertEqual(b"asdf", rw.read(4))
   1975 
   1976         rw.write(b"123f")
   1977         rw.seek(0, 0)
   1978         self.assertEqual(b"asdf123fl", rw.read())
   1979         self.assertEqual(9, rw.tell())
   1980         rw.seek(-4, 2)
   1981         self.assertEqual(5, rw.tell())
   1982         rw.seek(2, 1)
   1983         self.assertEqual(7, rw.tell())
   1984         self.assertEqual(b"fl", rw.read(11))
   1985         rw.flush()
   1986         self.assertEqual(b"asdf123fl", raw.getvalue())
   1987 
   1988         self.assertRaises(TypeError, rw.seek, 0.0)
   1989 
   1990     def check_flush_and_read(self, read_func):
   1991         raw = self.BytesIO(b"abcdefghi")
   1992         bufio = self.tp(raw)
   1993 
   1994         self.assertEqual(b"ab", read_func(bufio, 2))
   1995         bufio.write(b"12")
   1996         self.assertEqual(b"ef", read_func(bufio, 2))
   1997         self.assertEqual(6, bufio.tell())
   1998         bufio.flush()
   1999         self.assertEqual(6, bufio.tell())
   2000         self.assertEqual(b"ghi", read_func(bufio))
   2001         raw.seek(0, 0)
   2002         raw.write(b"XYZ")
   2003         # flush() resets the read buffer
   2004         bufio.flush()
   2005         bufio.seek(0, 0)
   2006         self.assertEqual(b"XYZ", read_func(bufio, 3))
   2007 
   2008     def test_flush_and_read(self):
   2009         self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
   2010 
   2011     def test_flush_and_readinto(self):
   2012         def _readinto(bufio, n=-1):
   2013             b = bytearray(n if n >= 0 else 9999)
   2014             n = bufio.readinto(b)
   2015             return bytes(b[:n])
   2016         self.check_flush_and_read(_readinto)
   2017 
   2018     def test_flush_and_peek(self):
   2019         def _peek(bufio, n=-1):
   2020             # This relies on the fact that the buffer can contain the whole
   2021             # raw stream, otherwise peek() can return less.
   2022             b = bufio.peek(n)
   2023             if n != -1:
   2024                 b = b[:n]
   2025             bufio.seek(len(b), 1)
   2026             return b
   2027         self.check_flush_and_read(_peek)
   2028 
   2029     def test_flush_and_write(self):
   2030         raw = self.BytesIO(b"abcdefghi")
   2031         bufio = self.tp(raw)
   2032 
   2033         bufio.write(b"123")
   2034         bufio.flush()
   2035         bufio.write(b"45")
   2036         bufio.flush()
   2037         bufio.seek(0, 0)
   2038         self.assertEqual(b"12345fghi", raw.getvalue())
   2039         self.assertEqual(b"12345fghi", bufio.read())
   2040 
   2041     def test_threads(self):
   2042         BufferedReaderTest.test_threads(self)
   2043         BufferedWriterTest.test_threads(self)
   2044 
   2045     def test_writes_and_peek(self):
   2046         def _peek(bufio):
   2047             bufio.peek(1)
   2048         self.check_writes(_peek)
   2049         def _peek(bufio):
   2050             pos = bufio.tell()
   2051             bufio.seek(-1, 1)
   2052             bufio.peek(1)
   2053             bufio.seek(pos, 0)
   2054         self.check_writes(_peek)
   2055 
   2056     def test_writes_and_reads(self):
   2057         def _read(bufio):
   2058             bufio.seek(-1, 1)
   2059             bufio.read(1)
   2060         self.check_writes(_read)
   2061 
   2062     def test_writes_and_read1s(self):
   2063         def _read1(bufio):
   2064             bufio.seek(-1, 1)
   2065             bufio.read1(1)
   2066         self.check_writes(_read1)
   2067 
   2068     def test_writes_and_readintos(self):
   2069         def _read(bufio):
   2070             bufio.seek(-1, 1)
   2071             bufio.readinto(bytearray(1))
   2072         self.check_writes(_read)
   2073 
   2074     def test_write_after_readahead(self):
   2075         # Issue #6629: writing after the buffer was filled by readahead should
   2076         # first rewind the raw stream.
   2077         for overwrite_size in [1, 5]:
   2078             raw = self.BytesIO(b"A" * 10)
   2079             bufio = self.tp(raw, 4)
   2080             # Trigger readahead
   2081             self.assertEqual(bufio.read(1), b"A")
   2082             self.assertEqual(bufio.tell(), 1)
   2083             # Overwriting should rewind the raw stream if it needs so
   2084             bufio.write(b"B" * overwrite_size)
   2085             self.assertEqual(bufio.tell(), overwrite_size + 1)
   2086             # If the write size was smaller than the buffer size, flush() and
   2087             # check that rewind happens.
   2088             bufio.flush()
   2089             self.assertEqual(bufio.tell(), overwrite_size + 1)
   2090             s = raw.getvalue()
   2091             self.assertEqual(s,
   2092                 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
   2093 
   2094     def test_write_rewind_write(self):
   2095         # Various combinations of reading / writing / seeking backwards / writing again
   2096         def mutate(bufio, pos1, pos2):
   2097             assert pos2 >= pos1
   2098             # Fill the buffer
   2099             bufio.seek(pos1)
   2100             bufio.read(pos2 - pos1)
   2101             bufio.write(b'\x02')
   2102             # This writes earlier than the previous write, but still inside
   2103             # the buffer.
   2104             bufio.seek(pos1)
   2105             bufio.write(b'\x01')
   2106 
   2107         b = b"\x80\x81\x82\x83\x84"
   2108         for i in range(0, len(b)):
   2109             for j in range(i, len(b)):
   2110                 raw = self.BytesIO(b)
   2111                 bufio = self.tp(raw, 100)
   2112                 mutate(bufio, i, j)
   2113                 bufio.flush()
   2114                 expected = bytearray(b)
   2115                 expected[j] = 2
   2116                 expected[i] = 1
   2117                 self.assertEqual(raw.getvalue(), expected,
   2118                                  "failed result for i=%d, j=%d" % (i, j))
   2119 
   2120     def test_truncate_after_read_or_write(self):
   2121         raw = self.BytesIO(b"A" * 10)
   2122         bufio = self.tp(raw, 100)
   2123         self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
   2124         self.assertEqual(bufio.truncate(), 2)
   2125         self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
   2126         self.assertEqual(bufio.truncate(), 4)
   2127 
   2128     def test_misbehaved_io(self):
   2129         BufferedReaderTest.test_misbehaved_io(self)
   2130         BufferedWriterTest.test_misbehaved_io(self)
   2131 
   2132     def test_interleaved_read_write(self):
   2133         # Test for issue #12213
   2134         with self.BytesIO(b'abcdefgh') as raw:
   2135             with self.tp(raw, 100) as f:
   2136                 f.write(b"1")
   2137                 self.assertEqual(f.read(1), b'b')
   2138                 f.write(b'2')
   2139                 self.assertEqual(f.read1(1), b'd')
   2140                 f.write(b'3')
   2141                 buf = bytearray(1)
   2142                 f.readinto(buf)
   2143                 self.assertEqual(buf, b'f')
   2144                 f.write(b'4')
   2145                 self.assertEqual(f.peek(1), b'h')
   2146                 f.flush()
   2147                 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
   2148 
   2149         with self.BytesIO(b'abc') as raw:
   2150             with self.tp(raw, 100) as f:
   2151                 self.assertEqual(f.read(1), b'a')
   2152                 f.write(b"2")
   2153                 self.assertEqual(f.read(1), b'c')
   2154                 f.flush()
   2155                 self.assertEqual(raw.getvalue(), b'a2c')
   2156 
   2157     def test_interleaved_readline_write(self):
   2158         with self.BytesIO(b'ab\ncdef\ng\n') as raw:
   2159             with self.tp(raw) as f:
   2160                 f.write(b'1')
   2161                 self.assertEqual(f.readline(), b'b\n')
   2162                 f.write(b'2')
   2163                 self.assertEqual(f.readline(), b'def\n')
   2164                 f.write(b'3')
   2165                 self.assertEqual(f.readline(), b'\n')
   2166                 f.flush()
   2167                 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
   2168 
   2169     # You can't construct a BufferedRandom over a non-seekable stream.
   2170     test_unseekable = None
   2171 
   2172 
   2173 class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
   2174     tp = io.BufferedRandom
   2175 
   2176     def test_constructor(self):
   2177         BufferedRandomTest.test_constructor(self)
   2178         # The allocation can succeed on 32-bit builds, e.g. with more
   2179         # than 2GB RAM and a 64-bit kernel.
   2180         if sys.maxsize > 0x7FFFFFFF:
   2181             rawio = self.MockRawIO()
   2182             bufio = self.tp(rawio)
   2183             self.assertRaises((OverflowError, MemoryError, ValueError),
   2184                 bufio.__init__, rawio, sys.maxsize)
   2185 
   2186     def test_garbage_collection(self):
   2187         CBufferedReaderTest.test_garbage_collection(self)
   2188         CBufferedWriterTest.test_garbage_collection(self)
   2189 
   2190     def test_args_error(self):
   2191         # Issue #17275
   2192         with self.assertRaisesRegex(TypeError, "BufferedRandom"):
   2193             self.tp(io.BytesIO(), 1024, 1024, 1024)
   2194 
   2195 
   2196 class PyBufferedRandomTest(BufferedRandomTest):
   2197     tp = pyio.BufferedRandom
   2198 
   2199 
   2200 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
   2201 # properties:
   2202 #   - A single output character can correspond to many bytes of input.
   2203 #   - The number of input bytes to complete the character can be
   2204 #     undetermined until the last input byte is received.
   2205 #   - The number of input bytes can vary depending on previous input.
   2206 #   - A single input byte can correspond to many characters of output.
   2207 #   - The number of output characters can be undetermined until the
   2208 #     last input byte is received.
   2209 #   - The number of output characters can vary depending on previous input.
   2210 
   2211 class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
   2212     """
   2213     For testing seek/tell behavior with a stateful, buffering decoder.
   2214 
   2215     Input is a sequence of words.  Words may be fixed-length (length set
   2216     by input) or variable-length (period-terminated).  In variable-length
   2217     mode, extra periods are ignored.  Possible words are:
   2218       - 'i' followed by a number sets the input length, I (maximum 99).
   2219         When I is set to 0, words are space-terminated.
   2220       - 'o' followed by a number sets the output length, O (maximum 99).
   2221       - Any other word is converted into a word followed by a period on
   2222         the output.  The output word consists of the input word truncated
   2223         or padded out with hyphens to make its length equal to O.  If O
   2224         is 0, the word is output verbatim without truncating or padding.
   2225     I and O are initially set to 1.  When I changes, any buffered input is
   2226     re-scanned according to the new I.  EOF also terminates the last word.
   2227     """
   2228 
   2229     def __init__(self, errors='strict'):
   2230         codecs.IncrementalDecoder.__init__(self, errors)
   2231         self.reset()
   2232 
   2233     def __repr__(self):
   2234         return '<SID %x>' % id(self)
   2235 
   2236     def reset(self):
   2237         self.i = 1
   2238         self.o = 1
   2239         self.buffer = bytearray()
   2240 
   2241     def getstate(self):
   2242         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
   2243         return bytes(self.buffer), i*100 + o
   2244 
   2245     def setstate(self, state):
   2246         buffer, io = state
   2247         self.buffer = bytearray(buffer)
   2248         i, o = divmod(io, 100)
   2249         self.i, self.o = i ^ 1, o ^ 1
   2250 
   2251     def decode(self, input, final=False):
   2252         output = ''
   2253         for b in input:
   2254             if self.i == 0: # variable-length, terminated with period
   2255                 if b == ord('.'):
   2256                     if self.buffer:
   2257                         output += self.process_word()
   2258                 else:
   2259                     self.buffer.append(b)
   2260             else: # fixed-length, terminate after self.i bytes
   2261                 self.buffer.append(b)
   2262                 if len(self.buffer) == self.i:
   2263                     output += self.process_word()
   2264         if final and self.buffer: # EOF terminates the last word
   2265             output += self.process_word()
   2266         return output
   2267 
   2268     def process_word(self):
   2269         output = ''
   2270         if self.buffer[0] == ord('i'):
   2271             self.i = min(99, int(self.buffer[1:] or 0)) # set input length
   2272         elif self.buffer[0] == ord('o'):
   2273             self.o = min(99, int(self.buffer[1:] or 0)) # set output length
   2274         else:
   2275             output = self.buffer.decode('ascii')
   2276             if len(output) < self.o:
   2277                 output += '-'*self.o # pad out with hyphens
   2278             if self.o:
   2279                 output = output[:self.o] # truncate to output length
   2280             output += '.'
   2281         self.buffer = bytearray()
   2282         return output
   2283 
   2284     codecEnabled = False
   2285 
   2286     @classmethod
   2287     def lookupTestDecoder(cls, name):
   2288         if cls.codecEnabled and name == 'test_decoder':
   2289             latin1 = codecs.lookup('latin-1')
   2290             return codecs.CodecInfo(
   2291                 name='test_decoder', encode=latin1.encode, decode=None,
   2292                 incrementalencoder=None,
   2293                 streamreader=None, streamwriter=None,
   2294                 incrementaldecoder=cls)
   2295 
   2296 # Register the previous decoder for testing.
   2297 # Disabled by default, tests will enable it.
   2298 codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
   2299 
   2300 
   2301 class StatefulIncrementalDecoderTest(unittest.TestCase):
   2302     """
   2303     Make sure the StatefulIncrementalDecoder actually works.
   2304     """
   2305 
   2306     test_cases = [
   2307         # I=1, O=1 (fixed-length input == fixed-length output)
   2308         (b'abcd', False, 'a.b.c.d.'),
   2309         # I=0, O=0 (variable-length input, variable-length output)
   2310         (b'oiabcd', True, 'abcd.'),
   2311         # I=0, O=0 (should ignore extra periods)
   2312         (b'oi...abcd...', True, 'abcd.'),
   2313         # I=0, O=6 (variable-length input, fixed-length output)
   2314         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
   2315         # I=2, O=6 (fixed-length input < fixed-length output)
   2316         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
   2317         # I=6, O=3 (fixed-length input > fixed-length output)
   2318         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
   2319         # I=0, then 3; O=29, then 15 (with longer output)
   2320         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
   2321          'a----------------------------.' +
   2322          'b----------------------------.' +
   2323          'cde--------------------------.' +
   2324          'abcdefghijabcde.' +
   2325          'a.b------------.' +
   2326          '.c.------------.' +
   2327          'd.e------------.' +
   2328          'k--------------.' +
   2329          'l--------------.' +
   2330          'm--------------.')
   2331     ]
   2332 
   2333     def test_decoder(self):
   2334         # Try a few one-shot test cases.
   2335         for input, eof, output in self.test_cases:
   2336             d = StatefulIncrementalDecoder()
   2337             self.assertEqual(d.decode(input, eof), output)
   2338 
   2339         # Also test an unfinished decode, followed by forcing EOF.
   2340         d = StatefulIncrementalDecoder()
   2341         self.assertEqual(d.decode(b'oiabcd'), '')
   2342         self.assertEqual(d.decode(b'', 1), 'abcd.')
   2343 
   2344 class TextIOWrapperTest(unittest.TestCase):
   2345 
   2346     def setUp(self):
   2347         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
   2348         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
   2349         support.unlink(support.TESTFN)
   2350 
   2351     def tearDown(self):
   2352         support.unlink(support.TESTFN)
   2353 
   2354     def test_constructor(self):
   2355         r = self.BytesIO(b"\xc3\xa9\n\n")
   2356         b = self.BufferedReader(r, 1000)
   2357         t = self.TextIOWrapper(b)
   2358         t.__init__(b, encoding="latin-1", newline="\r\n")
   2359         self.assertEqual(t.encoding, "latin-1")
   2360         self.assertEqual(t.line_buffering, False)
   2361         t.__init__(b, encoding="utf-8", line_buffering=True)
   2362         self.assertEqual(t.encoding, "utf-8")
   2363         self.assertEqual(t.line_buffering, True)
   2364         self.assertEqual("\xe9\n", t.readline())
   2365         self.assertRaises(TypeError, t.__init__, b, newline=42)
   2366         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   2367 
   2368     def test_uninitialized(self):
   2369         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
   2370         del t
   2371         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
   2372         self.assertRaises(Exception, repr, t)
   2373         self.assertRaisesRegex((ValueError, AttributeError),
   2374                                'uninitialized|has no attribute',
   2375                                t.read, 0)
   2376         t.__init__(self.MockRawIO())
   2377         self.assertEqual(t.read(0), '')
   2378 
   2379     def test_non_text_encoding_codecs_are_rejected(self):
   2380         # Ensure the constructor complains if passed a codec that isn't
   2381         # marked as a text encoding
   2382         # http://bugs.python.org/issue20404
   2383         r = self.BytesIO()
   2384         b = self.BufferedWriter(r)
   2385         with self.assertRaisesRegex(LookupError, "is not a text encoding"):
   2386             self.TextIOWrapper(b, encoding="hex")
   2387 
   2388     def test_detach(self):
   2389         r = self.BytesIO()
   2390         b = self.BufferedWriter(r)
   2391         t = self.TextIOWrapper(b)
   2392         self.assertIs(t.detach(), b)
   2393 
   2394         t = self.TextIOWrapper(b, encoding="ascii")
   2395         t.write("howdy")
   2396         self.assertFalse(r.getvalue())
   2397         t.detach()
   2398         self.assertEqual(r.getvalue(), b"howdy")
   2399         self.assertRaises(ValueError, t.detach)
   2400 
   2401         # Operations independent of the detached stream should still work
   2402         repr(t)
   2403         self.assertEqual(t.encoding, "ascii")
   2404         self.assertEqual(t.errors, "strict")
   2405         self.assertFalse(t.line_buffering)
   2406 
   2407     def test_repr(self):
   2408         raw = self.BytesIO("hello".encode("utf-8"))
   2409         b = self.BufferedReader(raw)
   2410         t = self.TextIOWrapper(b, encoding="utf-8")
   2411         modname = self.TextIOWrapper.__module__
   2412         self.assertEqual(repr(t),
   2413                          "<%s.TextIOWrapper encoding='utf-8'>" % modname)
   2414         raw.name = "dummy"
   2415         self.assertEqual(repr(t),
   2416                          "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
   2417         t.mode = "r"
   2418         self.assertEqual(repr(t),
   2419                          "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
   2420         raw.name = b"dummy"
   2421         self.assertEqual(repr(t),
   2422                          "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
   2423 
   2424         t.buffer.detach()
   2425         repr(t)  # Should not raise an exception
   2426 
   2427     def test_line_buffering(self):
   2428         r = self.BytesIO()
   2429         b = self.BufferedWriter(r, 1000)
   2430         t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
   2431         t.write("X")
   2432         self.assertEqual(r.getvalue(), b"")  # No flush happened
   2433         t.write("Y\nZ")
   2434         self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
   2435         t.write("A\rB")
   2436         self.assertEqual(r.getvalue(), b"XY\nZA\rB")
   2437 
   2438     def test_default_encoding(self):
   2439         old_environ = dict(os.environ)
   2440         try:
   2441             # try to get a user preferred encoding different than the current
   2442             # locale encoding to check that TextIOWrapper() uses the current
   2443             # locale encoding and not the user preferred encoding
   2444             for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
   2445                 if key in os.environ:
   2446                     del os.environ[key]
   2447 
   2448             current_locale_encoding = locale.getpreferredencoding(False)
   2449             b = self.BytesIO()
   2450             t = self.TextIOWrapper(b)
   2451             self.assertEqual(t.encoding, current_locale_encoding)
   2452         finally:
   2453             os.environ.clear()
   2454             os.environ.update(old_environ)
   2455 
   2456     @support.cpython_only
   2457     def test_device_encoding(self):
   2458         # Issue 15989
   2459         import _testcapi
   2460         b = self.BytesIO()
   2461         b.fileno = lambda: _testcapi.INT_MAX + 1
   2462         self.assertRaises(OverflowError, self.TextIOWrapper, b)
   2463         b.fileno = lambda: _testcapi.UINT_MAX + 1
   2464         self.assertRaises(OverflowError, self.TextIOWrapper, b)
   2465 
   2466     def test_encoding(self):
   2467         # Check the encoding attribute is always set, and valid
   2468         b = self.BytesIO()
   2469         t = self.TextIOWrapper(b, encoding="utf-8")
   2470         self.assertEqual(t.encoding, "utf-8")
   2471         t = self.TextIOWrapper(b)
   2472         self.assertIsNotNone(t.encoding)
   2473         codecs.lookup(t.encoding)
   2474 
   2475     def test_encoding_errors_reading(self):
   2476         # (1) default
   2477         b = self.BytesIO(b"abc\n\xff\n")
   2478         t = self.TextIOWrapper(b, encoding="ascii")
   2479         self.assertRaises(UnicodeError, t.read)
   2480         # (2) explicit strict
   2481         b = self.BytesIO(b"abc\n\xff\n")
   2482         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   2483         self.assertRaises(UnicodeError, t.read)
   2484         # (3) ignore
   2485         b = self.BytesIO(b"abc\n\xff\n")
   2486         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
   2487         self.assertEqual(t.read(), "abc\n\n")
   2488         # (4) replace
   2489         b = self.BytesIO(b"abc\n\xff\n")
   2490         t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
   2491         self.assertEqual(t.read(), "abc\n\ufffd\n")
   2492 
   2493     def test_encoding_errors_writing(self):
   2494         # (1) default
   2495         b = self.BytesIO()
   2496         t = self.TextIOWrapper(b, encoding="ascii")
   2497         self.assertRaises(UnicodeError, t.write, "\xff")
   2498         # (2) explicit strict
   2499         b = self.BytesIO()
   2500         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   2501         self.assertRaises(UnicodeError, t.write, "\xff")
   2502         # (3) ignore
   2503         b = self.BytesIO()
   2504         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
   2505                              newline="\n")
   2506         t.write("abc\xffdef\n")
   2507         t.flush()
   2508         self.assertEqual(b.getvalue(), b"abcdef\n")
   2509         # (4) replace
   2510         b = self.BytesIO()
   2511         t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
   2512                              newline="\n")
   2513         t.write("abc\xffdef\n")
   2514         t.flush()
   2515         self.assertEqual(b.getvalue(), b"abc?def\n")
   2516 
   2517     def test_newlines(self):
   2518         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
   2519 
   2520         tests = [
   2521             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
   2522             [ '', input_lines ],
   2523             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
   2524             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
   2525             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
   2526         ]
   2527         encodings = (
   2528             'utf-8', 'latin-1',
   2529             'utf-16', 'utf-16-le', 'utf-16-be',
   2530             'utf-32', 'utf-32-le', 'utf-32-be',
   2531         )
   2532 
   2533         # Try a range of buffer sizes to test the case where \r is the last
   2534         # character in TextIOWrapper._pending_line.
   2535         for encoding in encodings:
   2536             # XXX: str.encode() should return bytes
   2537             data = bytes(''.join(input_lines).encode(encoding))
   2538             for do_reads in (False, True):
   2539                 for bufsize in range(1, 10):
   2540                     for newline, exp_lines in tests:
   2541                         bufio = self.BufferedReader(self.BytesIO(data), bufsize)
   2542                         textio = self.TextIOWrapper(bufio, newline=newline,
   2543                                                   encoding=encoding)
   2544                         if do_reads:
   2545                             got_lines = []
   2546                             while True:
   2547                                 c2 = textio.read(2)
   2548                                 if c2 == '':
   2549                                     break
   2550                                 self.assertEqual(len(c2), 2)
   2551                                 got_lines.append(c2 + textio.readline())
   2552                         else:
   2553                             got_lines = list(textio)
   2554 
   2555                         for got_line, exp_line in zip(got_lines, exp_lines):
   2556                             self.assertEqual(got_line, exp_line)
   2557                         self.assertEqual(len(got_lines), len(exp_lines))
   2558 
   2559     def test_newlines_input(self):
   2560         testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
   2561         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
   2562         for newline, expected in [
   2563             (None, normalized.decode("ascii").splitlines(keepends=True)),
   2564             ("", testdata.decode("ascii").splitlines(keepends=True)),
   2565             ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   2566             ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   2567             ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
   2568             ]:
   2569             buf = self.BytesIO(testdata)
   2570             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   2571             self.assertEqual(txt.readlines(), expected)
   2572             txt.seek(0)
   2573             self.assertEqual(txt.read(), "".join(expected))
   2574 
   2575     def test_newlines_output(self):
   2576         testdict = {
   2577             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   2578             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   2579             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
   2580             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
   2581             }
   2582         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
   2583         for newline, expected in tests:
   2584             buf = self.BytesIO()
   2585             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   2586             txt.write("AAA\nB")
   2587             txt.write("BB\nCCC\n")
   2588             txt.write("X\rY\r\nZ")
   2589             txt.flush()
   2590             self.assertEqual(buf.closed, False)
   2591             self.assertEqual(buf.getvalue(), expected)
   2592 
   2593     def test_destructor(self):
   2594         l = []
   2595         base = self.BytesIO
   2596         class MyBytesIO(base):
   2597             def close(self):
   2598                 l.append(self.getvalue())
   2599                 base.close(self)
   2600         b = MyBytesIO()
   2601         t = self.TextIOWrapper(b, encoding="ascii")
   2602         t.write("abc")
   2603         del t
   2604         support.gc_collect()
   2605         self.assertEqual([b"abc"], l)
   2606 
   2607     def test_override_destructor(self):
   2608         record = []
   2609         class MyTextIO(self.TextIOWrapper):
   2610             def __del__(self):
   2611                 record.append(1)
   2612                 try:
   2613                     f = super().__del__
   2614                 except AttributeError:
   2615                     pass
   2616                 else:
   2617                     f()
   2618             def close(self):
   2619                 record.append(2)
   2620                 super().close()
   2621             def flush(self):
   2622                 record.append(3)
   2623                 super().flush()
   2624         b = self.BytesIO()
   2625         t = MyTextIO(b, encoding="ascii")
   2626         del t
   2627         support.gc_collect()
   2628         self.assertEqual(record, [1, 2, 3])
   2629 
   2630     def test_error_through_destructor(self):
   2631         # Test that the exception state is not modified by a destructor,
   2632         # even if close() fails.
   2633         rawio = self.CloseFailureIO()
   2634         def f():
   2635             self.TextIOWrapper(rawio).xyzzy
   2636         with support.captured_output("stderr") as s:
   2637             self.assertRaises(AttributeError, f)
   2638         s = s.getvalue().strip()
   2639         if s:
   2640             # The destructor *may* have printed an unraisable error, check it
   2641             self.assertEqual(len(s.splitlines()), 1)
   2642             self.assertTrue(s.startswith("Exception OSError: "), s)
   2643             self.assertTrue(s.endswith(" ignored"), s)
   2644 
   2645     # Systematic tests of the text I/O API
   2646 
   2647     def test_basic_io(self):
   2648         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
   2649             for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
   2650                 f = self.open(support.TESTFN, "w+", encoding=enc)
   2651                 f._CHUNK_SIZE = chunksize
   2652                 self.assertEqual(f.write("abc"), 3)
   2653                 f.close()
   2654                 f = self.open(support.TESTFN, "r+", encoding=enc)
   2655                 f._CHUNK_SIZE = chunksize
   2656                 self.assertEqual(f.tell(), 0)
   2657                 self.assertEqual(f.read(), "abc")
   2658                 cookie = f.tell()
   2659                 self.assertEqual(f.seek(0), 0)
   2660                 self.assertEqual(f.read(None), "abc")
   2661                 f.seek(0)
   2662                 self.assertEqual(f.read(2), "ab")
   2663                 self.assertEqual(f.read(1), "c")
   2664                 self.assertEqual(f.read(1), "")
   2665                 self.assertEqual(f.read(), "")
   2666                 self.assertEqual(f.tell(), cookie)
   2667                 self.assertEqual(f.seek(0), 0)
   2668                 self.assertEqual(f.seek(0, 2), cookie)
   2669                 self.assertEqual(f.write("def"), 3)
   2670                 self.assertEqual(f.seek(cookie), cookie)
   2671                 self.assertEqual(f.read(), "def")
   2672                 if enc.startswith("utf"):
   2673                     self.multi_line_test(f, enc)
   2674                 f.close()
   2675 
   2676     def multi_line_test(self, f, enc):
   2677         f.seek(0)
   2678         f.truncate()
   2679         sample = "s\xff\u0fff\uffff"
   2680         wlines = []
   2681         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
   2682             chars = []
   2683             for i in range(size):
   2684                 chars.append(sample[i % len(sample)])
   2685             line = "".join(chars) + "\n"
   2686             wlines.append((f.tell(), line))
   2687             f.write(line)
   2688         f.seek(0)
   2689         rlines = []
   2690         while True:
   2691             pos = f.tell()
   2692             line = f.readline()
   2693             if not line:
   2694                 break
   2695             rlines.append((pos, line))
   2696         self.assertEqual(rlines, wlines)
   2697 
   2698     def test_telling(self):
   2699         f = self.open(support.TESTFN, "w+", encoding="utf-8")
   2700         p0 = f.tell()
   2701         f.write("\xff\n")
   2702         p1 = f.tell()
   2703         f.write("\xff\n")
   2704         p2 = f.tell()
   2705         f.seek(0)
   2706         self.assertEqual(f.tell(), p0)
   2707         self.assertEqual(f.readline(), "\xff\n")
   2708         self.assertEqual(f.tell(), p1)
   2709         self.assertEqual(f.readline(), "\xff\n")
   2710         self.assertEqual(f.tell(), p2)
   2711         f.seek(0)
   2712         for line in f:
   2713             self.assertEqual(line, "\xff\n")
   2714             self.assertRaises(OSError, f.tell)
   2715         self.assertEqual(f.tell(), p2)
   2716         f.close()
   2717 
   2718     def test_seeking(self):
   2719         chunk_size = _default_chunk_size()
   2720         prefix_size = chunk_size - 2
   2721         u_prefix = "a" * prefix_size
   2722         prefix = bytes(u_prefix.encode("utf-8"))
   2723         self.assertEqual(len(u_prefix), len(prefix))
   2724         u_suffix = "\u8888\n"
   2725         suffix = bytes(u_suffix.encode("utf-8"))
   2726         line = prefix + suffix
   2727         with self.open(support.TESTFN, "wb") as f:
   2728             f.write(line*2)
   2729         with self.open(support.TESTFN, "r", encoding="utf-8") as f:
   2730             s = f.read(prefix_size)
   2731             self.assertEqual(s, str(prefix, "ascii"))
   2732             self.assertEqual(f.tell(), prefix_size)
   2733             self.assertEqual(f.readline(), u_suffix)
   2734 
   2735     def test_seeking_too(self):
   2736         # Regression test for a specific bug
   2737         data = b'\xe0\xbf\xbf\n'
   2738         with self.open(support.TESTFN, "wb") as f:
   2739             f.write(data)
   2740         with self.open(support.TESTFN, "r", encoding="utf-8") as f:
   2741             f._CHUNK_SIZE  # Just test that it exists
   2742             f._CHUNK_SIZE = 2
   2743             f.readline()
   2744             f.tell()
   2745 
   2746     def test_seek_and_tell(self):
   2747         #Test seek/tell using the StatefulIncrementalDecoder.
   2748         # Make test faster by doing smaller seeks
   2749         CHUNK_SIZE = 128
   2750 
   2751         def test_seek_and_tell_with_data(data, min_pos=0):
   2752             """Tell/seek to various points within a data stream and ensure
   2753             that the decoded data returned by read() is consistent."""
   2754             f = self.open(support.TESTFN, 'wb')
   2755             f.write(data)
   2756             f.close()
   2757             f = self.open(support.TESTFN, encoding='test_decoder')
   2758             f._CHUNK_SIZE = CHUNK_SIZE
   2759             decoded = f.read()
   2760             f.close()
   2761 
   2762             for i in range(min_pos, len(decoded) + 1): # seek positions
   2763                 for j in [1, 5, len(decoded) - i]: # read lengths
   2764                     f = self.open(support.TESTFN, encoding='test_decoder')
   2765                     self.assertEqual(f.read(i), decoded[:i])
   2766                     cookie = f.tell()
   2767                     self.assertEqual(f.read(j), decoded[i:i + j])
   2768                     f.seek(cookie)
   2769                     self.assertEqual(f.read(), decoded[i:])
   2770                     f.close()
   2771 
   2772         # Enable the test decoder.
   2773         StatefulIncrementalDecoder.codecEnabled = 1
   2774 
   2775         # Run the tests.
   2776         try:
   2777             # Try each test case.
   2778             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2779                 test_seek_and_tell_with_data(input)
   2780 
   2781             # Position each test case so that it crosses a chunk boundary.
   2782             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2783                 offset = CHUNK_SIZE - len(input)//2
   2784                 prefix = b'.'*offset
   2785                 # Don't bother seeking into the prefix (takes too long).
   2786                 min_pos = offset*2
   2787                 test_seek_and_tell_with_data(prefix + input, min_pos)
   2788 
   2789         # Ensure our test decoder won't interfere with subsequent tests.
   2790         finally:
   2791             StatefulIncrementalDecoder.codecEnabled = 0
   2792 
   2793     def test_encoded_writes(self):
   2794         data = "1234567890"
   2795         tests = ("utf-16",
   2796                  "utf-16-le",
   2797                  "utf-16-be",
   2798                  "utf-32",
   2799                  "utf-32-le",
   2800                  "utf-32-be")
   2801         for encoding in tests:
   2802             buf = self.BytesIO()
   2803             f = self.TextIOWrapper(buf, encoding=encoding)
   2804             # Check if the BOM is written only once (see issue1753).
   2805             f.write(data)
   2806             f.write(data)
   2807             f.seek(0)
   2808             self.assertEqual(f.read(), data * 2)
   2809             f.seek(0)
   2810             self.assertEqual(f.read(), data * 2)
   2811             self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
   2812 
   2813     def test_unreadable(self):
   2814         class UnReadable(self.BytesIO):
   2815             def readable(self):
   2816                 return False
   2817         txt = self.TextIOWrapper(UnReadable())
   2818         self.assertRaises(OSError, txt.read)
   2819 
   2820     def test_read_one_by_one(self):
   2821         txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
   2822         reads = ""
   2823         while True:
   2824             c = txt.read(1)
   2825             if not c:
   2826                 break
   2827             reads += c
   2828         self.assertEqual(reads, "AA\nBB")
   2829 
   2830     def test_readlines(self):
   2831         txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
   2832         self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
   2833         txt.seek(0)
   2834         self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
   2835         txt.seek(0)
   2836         self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
   2837 
   2838     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
   2839     def test_read_by_chunk(self):
   2840         # make sure "\r\n" straddles 128 char boundary.
   2841         txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
   2842         reads = ""
   2843         while True:
   2844             c = txt.read(128)
   2845             if not c:
   2846                 break
   2847             reads += c
   2848         self.assertEqual(reads, "A"*127+"\nB")
   2849 
   2850     def test_writelines(self):
   2851         l = ['ab', 'cd', 'ef']
   2852         buf = self.BytesIO()
   2853         txt = self.TextIOWrapper(buf)
   2854         txt.writelines(l)
   2855         txt.flush()
   2856         self.assertEqual(buf.getvalue(), b'abcdef')
   2857 
   2858     def test_writelines_userlist(self):
   2859         l = UserList(['ab', 'cd', 'ef'])
   2860         buf = self.BytesIO()
   2861         txt = self.TextIOWrapper(buf)
   2862         txt.writelines(l)
   2863         txt.flush()
   2864         self.assertEqual(buf.getvalue(), b'abcdef')
   2865 
   2866     def test_writelines_error(self):
   2867         txt = self.TextIOWrapper(self.BytesIO())
   2868         self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
   2869         self.assertRaises(TypeError, txt.writelines, None)
   2870         self.assertRaises(TypeError, txt.writelines, b'abc')
   2871 
   2872     def test_issue1395_1(self):
   2873         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2874 
   2875         # read one char at a time
   2876         reads = ""
   2877         while True:
   2878             c = txt.read(1)
   2879             if not c:
   2880                 break
   2881             reads += c
   2882         self.assertEqual(reads, self.normalized)
   2883 
   2884     def test_issue1395_2(self):
   2885         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2886         txt._CHUNK_SIZE = 4
   2887 
   2888         reads = ""
   2889         while True:
   2890             c = txt.read(4)
   2891             if not c:
   2892                 break
   2893             reads += c
   2894         self.assertEqual(reads, self.normalized)
   2895 
   2896     def test_issue1395_3(self):
   2897         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2898         txt._CHUNK_SIZE = 4
   2899 
   2900         reads = txt.read(4)
   2901         reads += txt.read(4)
   2902         reads += txt.readline()
   2903         reads += txt.readline()
   2904         reads += txt.readline()
   2905         self.assertEqual(reads, self.normalized)
   2906 
   2907     def test_issue1395_4(self):
   2908         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2909         txt._CHUNK_SIZE = 4
   2910 
   2911         reads = txt.read(4)
   2912         reads += txt.read()
   2913         self.assertEqual(reads, self.normalized)
   2914 
   2915     def test_issue1395_5(self):
   2916         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2917         txt._CHUNK_SIZE = 4
   2918 
   2919         reads = txt.read(4)
   2920         pos = txt.tell()
   2921         txt.seek(0)
   2922         txt.seek(pos)
   2923         self.assertEqual(txt.read(4), "BBB\n")
   2924 
   2925     def test_issue2282(self):
   2926         buffer = self.BytesIO(self.testdata)
   2927         txt = self.TextIOWrapper(buffer, encoding="ascii")
   2928 
   2929         self.assertEqual(buffer.seekable(), txt.seekable())
   2930 
   2931     def test_append_bom(self):
   2932         # The BOM is not written again when appending to a non-empty file
   2933         filename = support.TESTFN
   2934         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2935             with self.open(filename, 'w', encoding=charset) as f:
   2936                 f.write('aaa')
   2937                 pos = f.tell()
   2938             with self.open(filename, 'rb') as f:
   2939                 self.assertEqual(f.read(), 'aaa'.encode(charset))
   2940 
   2941             with self.open(filename, 'a', encoding=charset) as f:
   2942                 f.write('xxx')
   2943             with self.open(filename, 'rb') as f:
   2944                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
   2945 
   2946     def test_seek_bom(self):
   2947         # Same test, but when seeking manually
   2948         filename = support.TESTFN
   2949         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2950             with self.open(filename, 'w', encoding=charset) as f:
   2951                 f.write('aaa')
   2952                 pos = f.tell()
   2953             with self.open(filename, 'r+', encoding=charset) as f:
   2954                 f.seek(pos)
   2955                 f.write('zzz')
   2956                 f.seek(0)
   2957                 f.write('bbb')
   2958             with self.open(filename, 'rb') as f:
   2959                 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
   2960 
   2961     def test_seek_append_bom(self):
   2962         # Same test, but first seek to the start and then to the end
   2963         filename = support.TESTFN
   2964         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2965             with self.open(filename, 'w', encoding=charset) as f:
   2966                 f.write('aaa')
   2967             with self.open(filename, 'a', encoding=charset) as f:
   2968                 f.seek(0)
   2969                 f.seek(0, self.SEEK_END)
   2970                 f.write('xxx')
   2971             with self.open(filename, 'rb') as f:
   2972                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
   2973 
   2974     def test_errors_property(self):
   2975         with self.open(support.TESTFN, "w") as f:
   2976             self.assertEqual(f.errors, "strict")
   2977         with self.open(support.TESTFN, "w", errors="replace") as f:
   2978             self.assertEqual(f.errors, "replace")
   2979 
   2980     @support.no_tracing
   2981     @unittest.skipUnless(threading, 'Threading required for this test.')
   2982     def test_threads_write(self):
   2983         # Issue6750: concurrent writes could duplicate data
   2984         event = threading.Event()
   2985         with self.open(support.TESTFN, "w", buffering=1) as f:
   2986             def run(n):
   2987                 text = "Thread%03d\n" % n
   2988                 event.wait()
   2989                 f.write(text)
   2990             threads = [threading.Thread(target=run, args=(x,))
   2991                        for x in range(20)]
   2992             with support.start_threads(threads, event.set):
   2993                 time.sleep(0.02)
   2994         with self.open(support.TESTFN) as f:
   2995             content = f.read()
   2996             for n in range(20):
   2997                 self.assertEqual(content.count("Thread%03d\n" % n), 1)
   2998 
   2999     def test_flush_error_on_close(self):
   3000         # Test that text file is closed despite failed flush
   3001         # and that flush() is called before file closed.
   3002         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   3003         closed = []
   3004         def bad_flush():
   3005             closed[:] = [txt.closed, txt.buffer.closed]
   3006             raise OSError()
   3007         txt.flush = bad_flush
   3008         self.assertRaises(OSError, txt.close) # exception not swallowed
   3009         self.assertTrue(txt.closed)
   3010         self.assertTrue(txt.buffer.closed)
   3011         self.assertTrue(closed)      # flush() called
   3012         self.assertFalse(closed[0])  # flush() called before file closed
   3013         self.assertFalse(closed[1])
   3014         txt.flush = lambda: None  # break reference loop
   3015 
   3016     def test_close_error_on_close(self):
   3017         buffer = self.BytesIO(self.testdata)
   3018         def bad_flush():
   3019             raise OSError('flush')
   3020         def bad_close():
   3021             raise OSError('close')
   3022         buffer.close = bad_close
   3023         txt = self.TextIOWrapper(buffer, encoding="ascii")
   3024         txt.flush = bad_flush
   3025         with self.assertRaises(OSError) as err: # exception not swallowed
   3026             txt.close()
   3027         self.assertEqual(err.exception.args, ('close',))
   3028         self.assertIsInstance(err.exception.__context__, OSError)
   3029         self.assertEqual(err.exception.__context__.args, ('flush',))
   3030         self.assertFalse(txt.closed)
   3031 
   3032     def test_nonnormalized_close_error_on_close(self):
   3033         # Issue #21677
   3034         buffer = self.BytesIO(self.testdata)
   3035         def bad_flush():
   3036             raise non_existing_flush
   3037         def bad_close():
   3038             raise non_existing_close
   3039         buffer.close = bad_close
   3040         txt = self.TextIOWrapper(buffer, encoding="ascii")
   3041         txt.flush = bad_flush
   3042         with self.assertRaises(NameError) as err: # exception not swallowed
   3043             txt.close()
   3044         self.assertIn('non_existing_close', str(err.exception))
   3045         self.assertIsInstance(err.exception.__context__, NameError)
   3046         self.assertIn('non_existing_flush', str(err.exception.__context__))
   3047         self.assertFalse(txt.closed)
   3048 
   3049     def test_multi_close(self):
   3050         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   3051         txt.close()
   3052         txt.close()
   3053         txt.close()
   3054         self.assertRaises(ValueError, txt.flush)
   3055 
   3056     def test_unseekable(self):
   3057         txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
   3058         self.assertRaises(self.UnsupportedOperation, txt.tell)
   3059         self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
   3060 
   3061     def test_readonly_attributes(self):
   3062         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   3063         buf = self.BytesIO(self.testdata)
   3064         with self.assertRaises(AttributeError):
   3065             txt.buffer = buf
   3066 
   3067     def test_rawio(self):
   3068         # Issue #12591: TextIOWrapper must work with raw I/O objects, so
   3069         # that subprocess.Popen() can have the required unbuffered
   3070         # semantics with universal_newlines=True.
   3071         raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
   3072         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
   3073         # Reads
   3074         self.assertEqual(txt.read(4), 'abcd')
   3075         self.assertEqual(txt.readline(), 'efghi\n')
   3076         self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
   3077 
   3078     def test_rawio_write_through(self):
   3079         # Issue #12591: with write_through=True, writes don't need a flush
   3080         raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
   3081         txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
   3082                                  write_through=True)
   3083         txt.write('1')
   3084         txt.write('23\n4')
   3085         txt.write('5')
   3086         self.assertEqual(b''.join(raw._write_stack), b'123\n45')
   3087 
   3088     def test_bufio_write_through(self):
   3089         # Issue #21396: write_through=True doesn't force a flush()
   3090         # on the underlying binary buffered object.
   3091         flush_called, write_called = [], []
   3092         class BufferedWriter(self.BufferedWriter):
   3093             def flush(self, *args, **kwargs):
   3094                 flush_called.append(True)
   3095                 return super().flush(*args, **kwargs)
   3096             def write(self, *args, **kwargs):
   3097                 write_called.append(True)
   3098                 return super().write(*args, **kwargs)
   3099 
   3100         rawio = self.BytesIO()
   3101         data = b"a"
   3102         bufio = BufferedWriter(rawio, len(data)*2)
   3103         textio = self.TextIOWrapper(bufio, encoding='ascii',
   3104                                     write_through=True)
   3105         # write to the buffered io but don't overflow the buffer
   3106         text = data.decode('ascii')
   3107         textio.write(text)
   3108 
   3109         # buffer.flush is not called with write_through=True
   3110         self.assertFalse(flush_called)
   3111         # buffer.write *is* called with write_through=True
   3112         self.assertTrue(write_called)
   3113         self.assertEqual(rawio.getvalue(), b"") # no flush
   3114 
   3115         write_called = [] # reset
   3116         textio.write(text * 10) # total content is larger than bufio buffer
   3117         self.assertTrue(write_called)
   3118         self.assertEqual(rawio.getvalue(), data * 11) # all flushed
   3119 
   3120     def test_read_nonbytes(self):
   3121         # Issue #17106
   3122         # Crash when underlying read() returns non-bytes
   3123         t = self.TextIOWrapper(self.StringIO('a'))
   3124         self.assertRaises(TypeError, t.read, 1)
   3125         t = self.TextIOWrapper(self.StringIO('a'))
   3126         self.assertRaises(TypeError, t.readline)
   3127         t = self.TextIOWrapper(self.StringIO('a'))
   3128         self.assertRaises(TypeError, t.read)
   3129 
   3130     def test_illegal_decoder(self):
   3131         # Issue #17106
   3132         # Bypass the early encoding check added in issue 20404
   3133         def _make_illegal_wrapper():
   3134             quopri = codecs.lookup("quopri")
   3135             quopri._is_text_encoding = True
   3136             try:
   3137                 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
   3138                                        newline='\n', encoding="quopri")
   3139             finally:
   3140                 quopri._is_text_encoding = False
   3141             return t
   3142         # Crash when decoder returns non-string
   3143         t = _make_illegal_wrapper()
   3144         self.assertRaises(TypeError, t.read, 1)
   3145         t = _make_illegal_wrapper()
   3146         self.assertRaises(TypeError, t.readline)
   3147         t = _make_illegal_wrapper()
   3148         self.assertRaises(TypeError, t.read)
   3149 
   3150     def _check_create_at_shutdown(self, **kwargs):
   3151         # Issue #20037: creating a TextIOWrapper at shutdown
   3152         # shouldn't crash the interpreter.
   3153         iomod = self.io.__name__
   3154         code = """if 1:
   3155             import codecs
   3156             import {iomod} as io
   3157 
   3158             # Avoid looking up codecs at shutdown
   3159             codecs.lookup('utf-8')
   3160 
   3161             class C:
   3162                 def __init__(self):
   3163                     self.buf = io.BytesIO()
   3164                 def __del__(self):
   3165                     io.TextIOWrapper(self.buf, **{kwargs})
   3166                     print("ok")
   3167             c = C()
   3168             """.format(iomod=iomod, kwargs=kwargs)
   3169         return assert_python_ok("-c", code)
   3170 
   3171     @support.requires_type_collecting
   3172     def test_create_at_shutdown_without_encoding(self):
   3173         rc, out, err = self._check_create_at_shutdown()
   3174         if err:
   3175             # Can error out with a RuntimeError if the module state
   3176             # isn't found.
   3177             self.assertIn(self.shutdown_error, err.decode())
   3178         else:
   3179             self.assertEqual("ok", out.decode().strip())
   3180 
   3181     @support.requires_type_collecting
   3182     def test_create_at_shutdown_with_encoding(self):
   3183         rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
   3184                                                       errors='strict')
   3185         self.assertFalse(err)
   3186         self.assertEqual("ok", out.decode().strip())
   3187 
   3188     def test_read_byteslike(self):
   3189         r = MemviewBytesIO(b'Just some random string\n')
   3190         t = self.TextIOWrapper(r, 'utf-8')
   3191 
   3192         # TextIOwrapper will not read the full string, because
   3193         # we truncate it to a multiple of the native int size
   3194         # so that we can construct a more complex memoryview.
   3195         bytes_val =  _to_memoryview(r.getvalue()).tobytes()
   3196 
   3197         self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
   3198 
   3199     def test_issue22849(self):
   3200         class F(object):
   3201             def readable(self): return True
   3202             def writable(self): return True
   3203             def seekable(self): return True
   3204 
   3205         for i in range(10):
   3206             try:
   3207                 self.TextIOWrapper(F(), encoding='utf-8')
   3208             except Exception:
   3209                 pass
   3210 
   3211         F.tell = lambda x: 0
   3212         t = self.TextIOWrapper(F(), encoding='utf-8')
   3213 
   3214 
   3215 class MemviewBytesIO(io.BytesIO):
   3216     '''A BytesIO object whose read method returns memoryviews
   3217        rather than bytes'''
   3218 
   3219     def read1(self, len_):
   3220         return _to_memoryview(super().read1(len_))
   3221 
   3222     def read(self, len_):
   3223         return _to_memoryview(super().read(len_))
   3224 
   3225 def _to_memoryview(buf):
   3226     '''Convert bytes-object *buf* to a non-trivial memoryview'''
   3227 
   3228     arr = array.array('i')
   3229     idx = len(buf) - len(buf) % arr.itemsize
   3230     arr.frombytes(buf[:idx])
   3231     return memoryview(arr)
   3232 
   3233 
   3234 class CTextIOWrapperTest(TextIOWrapperTest):
   3235     io = io
   3236     shutdown_error = "RuntimeError: could not find io module state"
   3237 
   3238     def test_initialization(self):
   3239         r = self.BytesIO(b"\xc3\xa9\n\n")
   3240         b = self.BufferedReader(r, 1000)
   3241         t = self.TextIOWrapper(b)
   3242         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   3243         self.assertRaises(ValueError, t.read)
   3244 
   3245         t = self.TextIOWrapper.__new__(self.TextIOWrapper)
   3246         self.assertRaises(Exception, repr, t)
   3247 
   3248     def test_garbage_collection(self):
   3249         # C TextIOWrapper objects are collected, and collecting them flushes
   3250         # all data to disk.
   3251         # The Python version has __del__, so it ends in gc.garbage instead.
   3252         with support.check_warnings(('', ResourceWarning)):
   3253             rawio = io.FileIO(support.TESTFN, "wb")
   3254             b = self.BufferedWriter(rawio)
   3255             t = self.TextIOWrapper(b, encoding="ascii")
   3256             t.write("456def")
   3257             t.x = t
   3258             wr = weakref.ref(t)
   3259             del t
   3260             support.gc_collect()
   3261         self.assertIsNone(wr(), wr)
   3262         with self.open(support.TESTFN, "rb") as f:
   3263             self.assertEqual(f.read(), b"456def")
   3264 
   3265     def test_rwpair_cleared_before_textio(self):
   3266         # Issue 13070: TextIOWrapper's finalization would crash when called
   3267         # after the reference to the underlying BufferedRWPair's writer got
   3268         # cleared by the GC.
   3269         for i in range(1000):
   3270             b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
   3271             t1 = self.TextIOWrapper(b1, encoding="ascii")
   3272             b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
   3273             t2 = self.TextIOWrapper(b2, encoding="ascii")
   3274             # circular references
   3275             t1.buddy = t2
   3276             t2.buddy = t1
   3277         support.gc_collect()
   3278 
   3279 
   3280 class PyTextIOWrapperTest(TextIOWrapperTest):
   3281     io = pyio
   3282     shutdown_error = "LookupError: unknown encoding: ascii"
   3283 
   3284 
   3285 class IncrementalNewlineDecoderTest(unittest.TestCase):
   3286 
   3287     def check_newline_decoding_utf8(self, decoder):
   3288         # UTF-8 specific tests for a newline decoder
   3289         def _check_decode(b, s, **kwargs):
   3290             # We exercise getstate() / setstate() as well as decode()
   3291             state = decoder.getstate()
   3292             self.assertEqual(decoder.decode(b, **kwargs), s)
   3293             decoder.setstate(state)
   3294             self.assertEqual(decoder.decode(b, **kwargs), s)
   3295 
   3296         _check_decode(b'\xe8\xa2\x88', "\u8888")
   3297 
   3298         _check_decode(b'\xe8', "")
   3299         _check_decode(b'\xa2', "")
   3300         _check_decode(b'\x88', "\u8888")
   3301 
   3302         _check_decode(b'\xe8', "")
   3303         _check_decode(b'\xa2', "")
   3304         _check_decode(b'\x88', "\u8888")
   3305 
   3306         _check_decode(b'\xe8', "")
   3307         self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
   3308 
   3309         decoder.reset()
   3310         _check_decode(b'\n', "\n")
   3311         _check_decode(b'\r', "")
   3312         _check_decode(b'', "\n", final=True)
   3313         _check_decode(b'\r', "\n", final=True)
   3314 
   3315         _check_decode(b'\r', "")
   3316         _check_decode(b'a', "\na")
   3317 
   3318         _check_decode(b'\r\r\n', "\n\n")
   3319         _check_decode(b'\r', "")
   3320         _check_decode(b'\r', "\n")
   3321         _check_decode(b'\na', "\na")
   3322 
   3323         _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
   3324         _check_decode(b'\xe8\xa2\x88', "\u8888")
   3325         _check_decode(b'\n', "\n")
   3326         _check_decode(b'\xe8\xa2\x88\r', "\u8888")
   3327         _check_decode(b'\n', "\n")
   3328 
   3329     def check_newline_decoding(self, decoder, encoding):
   3330         result = []
   3331         if encoding is not None:
   3332             encoder = codecs.getincrementalencoder(encoding)()
   3333             def _decode_bytewise(s):
   3334                 # Decode one byte at a time
   3335                 for b in encoder.encode(s):
   3336                     result.append(decoder.decode(bytes([b])))
   3337         else:
   3338             encoder = None
   3339             def _decode_bytewise(s):
   3340                 # Decode one char at a time
   3341                 for c in s:
   3342                     result.append(decoder.decode(c))
   3343         self.assertEqual(decoder.newlines, None)
   3344         _decode_bytewise("abc\n\r")
   3345         self.assertEqual(decoder.newlines, '\n')
   3346         _decode_bytewise("\nabc")
   3347         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
   3348         _decode_bytewise("abc\r")
   3349         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
   3350         _decode_bytewise("abc")
   3351         self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
   3352         _decode_bytewise("abc\r")
   3353         self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
   3354         decoder.reset()
   3355         input = "abc"
   3356         if encoder is not None:
   3357             encoder.reset()
   3358             input = encoder.encode(input)
   3359         self.assertEqual(decoder.decode(input), "abc")
   3360         self.assertEqual(decoder.newlines, None)
   3361 
   3362     def test_newline_decoder(self):
   3363         encodings = (
   3364             # None meaning the IncrementalNewlineDecoder takes unicode input
   3365             # rather than bytes input
   3366             None, 'utf-8', 'latin-1',
   3367             'utf-16', 'utf-16-le', 'utf-16-be',
   3368             'utf-32', 'utf-32-le', 'utf-32-be',
   3369         )
   3370         for enc in encodings:
   3371             decoder = enc and codecs.getincrementaldecoder(enc)()
   3372             decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
   3373             self.check_newline_decoding(decoder, enc)
   3374         decoder = codecs.getincrementaldecoder("utf-8")()
   3375         decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
   3376         self.check_newline_decoding_utf8(decoder)
   3377 
   3378     def test_newline_bytes(self):
   3379         # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
   3380         def _check(dec):
   3381             self.assertEqual(dec.newlines, None)
   3382             self.assertEqual(dec.decode("\u0D00"), "\u0D00")
   3383             self.assertEqual(dec.newlines, None)
   3384             self.assertEqual(dec.decode("\u0A00"), "\u0A00")
   3385             self.assertEqual(dec.newlines, None)
   3386         dec = self.IncrementalNewlineDecoder(None, translate=False)
   3387         _check(dec)
   3388         dec = self.IncrementalNewlineDecoder(None, translate=True)
   3389         _check(dec)
   3390 
   3391 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
   3392     pass
   3393 
   3394 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
   3395     pass
   3396 
   3397 
   3398 # XXX Tests for open()
   3399 
   3400 class MiscIOTest(unittest.TestCase):
   3401 
   3402     def tearDown(self):
   3403         support.unlink(support.TESTFN)
   3404 
   3405     def test___all__(self):
   3406         for name in self.io.__all__:
   3407             obj = getattr(self.io, name, None)
   3408             self.assertIsNotNone(obj, name)
   3409             if name == "open":
   3410                 continue
   3411             elif "error" in name.lower() or name == "UnsupportedOperation":
   3412                 self.assertTrue(issubclass(obj, Exception), name)
   3413             elif not name.startswith("SEEK_"):
   3414                 self.assertTrue(issubclass(obj, self.IOBase))
   3415 
   3416     def test_attributes(self):
   3417         f = self.open(support.TESTFN, "wb", buffering=0)
   3418         self.assertEqual(f.mode, "wb")
   3419         f.close()
   3420 
   3421         with support.check_warnings(('', DeprecationWarning)):
   3422             f = self.open(support.TESTFN, "U")
   3423         self.assertEqual(f.name,            support.TESTFN)
   3424         self.assertEqual(f.buffer.name,     support.TESTFN)
   3425         self.assertEqual(f.buffer.raw.name, support.TESTFN)
   3426         self.assertEqual(f.mode,            "U")
   3427         self.assertEqual(f.buffer.mode,     "rb")
   3428         self.assertEqual(f.buffer.raw.mode, "rb")
   3429         f.close()
   3430 
   3431         f = self.open(support.TESTFN, "w+")
   3432         self.assertEqual(f.mode,            "w+")
   3433         self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
   3434         self.assertEqual(f.buffer.raw.mode, "rb+")
   3435 
   3436         g = self.open(f.fileno(), "wb", closefd=False)
   3437         self.assertEqual(g.mode,     "wb")
   3438         self.assertEqual(g.raw.mode, "wb")
   3439         self.assertEqual(g.name,     f.fileno())
   3440         self.assertEqual(g.raw.name, f.fileno())
   3441         f.close()
   3442         g.close()
   3443 
   3444     def test_io_after_close(self):
   3445         for kwargs in [
   3446                 {"mode": "w"},
   3447                 {"mode": "wb"},
   3448                 {"mode": "w", "buffering": 1},
   3449                 {"mode": "w", "buffering": 2},
   3450                 {"mode": "wb", "buffering": 0},
   3451                 {"mode": "r"},
   3452                 {"mode": "rb"},
   3453                 {"mode": "r", "buffering": 1},
   3454                 {"mode": "r", "buffering": 2},
   3455                 {"mode": "rb", "buffering": 0},
   3456                 {"mode": "w+"},
   3457                 {"mode": "w+b"},
   3458                 {"mode": "w+", "buffering": 1},
   3459                 {"mode": "w+", "buffering": 2},
   3460                 {"mode": "w+b", "buffering": 0},
   3461             ]:
   3462             f = self.open(support.TESTFN, **kwargs)
   3463             f.close()
   3464             self.assertRaises(ValueError, f.flush)
   3465             self.assertRaises(ValueError, f.fileno)
   3466             self.assertRaises(ValueError, f.isatty)
   3467             self.assertRaises(ValueError, f.__iter__)
   3468             if hasattr(f, "peek"):
   3469                 self.assertRaises(ValueError, f.peek, 1)
   3470             self.assertRaises(ValueError, f.read)
   3471             if hasattr(f, "read1"):
   3472                 self.assertRaises(ValueError, f.read1, 1024)
   3473             if hasattr(f, "readall"):
   3474                 self.assertRaises(ValueError, f.readall)
   3475             if hasattr(f, "readinto"):
   3476                 self.assertRaises(ValueError, f.readinto, bytearray(1024))
   3477             if hasattr(f, "readinto1"):
   3478                 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
   3479             self.assertRaises(ValueError, f.readline)
   3480             self.assertRaises(ValueError, f.readlines)
   3481             self.assertRaises(ValueError, f.seek, 0)
   3482             self.assertRaises(ValueError, f.tell)
   3483             self.assertRaises(ValueError, f.truncate)
   3484             self.assertRaises(ValueError, f.write,
   3485                               b"" if "b" in kwargs['mode'] else "")
   3486             self.assertRaises(ValueError, f.writelines, [])
   3487             self.assertRaises(ValueError, next, f)
   3488 
   3489     def test_blockingioerror(self):
   3490         # Various BlockingIOError issues
   3491         class C(str):
   3492             pass
   3493         c = C("")
   3494         b = self.BlockingIOError(1, c)
   3495         c.b = b
   3496         b.c = c
   3497         wr = weakref.ref(c)
   3498         del c, b
   3499         support.gc_collect()
   3500         self.assertIsNone(wr(), wr)
   3501 
   3502     def test_abcs(self):
   3503         # Test the visible base classes are ABCs.
   3504         self.assertIsInstance(self.IOBase, abc.ABCMeta)
   3505         self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
   3506         self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
   3507         self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
   3508 
   3509     def _check_abc_inheritance(self, abcmodule):
   3510         with self.open(support.TESTFN, "wb", buffering=0) as f:
   3511             self.assertIsInstance(f, abcmodule.IOBase)
   3512             self.assertIsInstance(f, abcmodule.RawIOBase)
   3513             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   3514             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   3515         with self.open(support.TESTFN, "wb") as f:
   3516             self.assertIsInstance(f, abcmodule.IOBase)
   3517             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   3518             self.assertIsInstance(f, abcmodule.BufferedIOBase)
   3519             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   3520         with self.open(support.TESTFN, "w") as f:
   3521             self.assertIsInstance(f, abcmodule.IOBase)
   3522             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   3523             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   3524             self.assertIsInstance(f, abcmodule.TextIOBase)
   3525 
   3526     def test_abc_inheritance(self):
   3527         # Test implementations inherit from their respective ABCs
   3528         self._check_abc_inheritance(self)
   3529 
   3530     def test_abc_inheritance_official(self):
   3531         # Test implementations inherit from the official ABCs of the
   3532         # baseline "io" module.
   3533         self._check_abc_inheritance(io)
   3534 
   3535     def _check_warn_on_dealloc(self, *args, **kwargs):
   3536         f = open(*args, **kwargs)
   3537         r = repr(f)
   3538         with self.assertWarns(ResourceWarning) as cm:
   3539             f = None
   3540             support.gc_collect()
   3541         self.assertIn(r, str(cm.warning.args[0]))
   3542 
   3543     def test_warn_on_dealloc(self):
   3544         self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
   3545         self._check_warn_on_dealloc(support.TESTFN, "wb")
   3546         self._check_warn_on_dealloc(support.TESTFN, "w")
   3547 
   3548     def _check_warn_on_dealloc_fd(self, *args, **kwargs):
   3549         fds = []
   3550         def cleanup_fds():
   3551             for fd in fds:
   3552                 try:
   3553                     os.close(fd)
   3554                 except OSError as e:
   3555                     if e.errno != errno.EBADF:
   3556                         raise
   3557         self.addCleanup(cleanup_fds)
   3558         r, w = os.pipe()
   3559         fds += r, w
   3560         self._check_warn_on_dealloc(r, *args, **kwargs)
   3561         # When using closefd=False, there's no warning
   3562         r, w = os.pipe()
   3563         fds += r, w
   3564         with support.check_no_resource_warning(self):
   3565             open(r, *args, closefd=False, **kwargs)
   3566 
   3567     def test_warn_on_dealloc_fd(self):
   3568         self._check_warn_on_dealloc_fd("rb", buffering=0)
   3569         self._check_warn_on_dealloc_fd("rb")
   3570         self._check_warn_on_dealloc_fd("r")
   3571 
   3572 
   3573     def test_pickling(self):
   3574         # Pickling file objects is forbidden
   3575         for kwargs in [
   3576                 {"mode": "w"},
   3577                 {"mode": "wb"},
   3578                 {"mode": "wb", "buffering": 0},
   3579                 {"mode": "r"},
   3580                 {"mode": "rb"},
   3581                 {"mode": "rb", "buffering": 0},
   3582                 {"mode": "w+"},
   3583                 {"mode": "w+b"},
   3584                 {"mode": "w+b", "buffering": 0},
   3585             ]:
   3586             for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
   3587                 with self.open(support.TESTFN, **kwargs) as f:
   3588                     self.assertRaises(TypeError, pickle.dumps, f, protocol)
   3589 
   3590     def test_nonblock_pipe_write_bigbuf(self):
   3591         self._test_nonblock_pipe_write(16*1024)
   3592 
   3593     def test_nonblock_pipe_write_smallbuf(self):
   3594         self._test_nonblock_pipe_write(1024)
   3595 
   3596     @unittest.skipUnless(hasattr(os, 'set_blocking'),
   3597                          'os.set_blocking() required for this test')
   3598     def _test_nonblock_pipe_write(self, bufsize):
   3599         sent = []
   3600         received = []
   3601         r, w = os.pipe()
   3602         os.set_blocking(r, False)
   3603         os.set_blocking(w, False)
   3604 
   3605         # To exercise all code paths in the C implementation we need
   3606         # to play with buffer sizes.  For instance, if we choose a
   3607         # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
   3608         # then we will never get a partial write of the buffer.
   3609         rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
   3610         wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
   3611 
   3612         with rf, wf:
   3613             for N in 9999, 73, 7574:
   3614                 try:
   3615                     i = 0
   3616                     while True:
   3617                         msg = bytes([i % 26 + 97]) * N
   3618                         sent.append(msg)
   3619                         wf.write(msg)
   3620                         i += 1
   3621 
   3622                 except self.BlockingIOError as e:
   3623                     self.assertEqual(e.args[0], errno.EAGAIN)
   3624                     self.assertEqual(e.args[2], e.characters_written)
   3625                     sent[-1] = sent[-1][:e.characters_written]
   3626                     received.append(rf.read())
   3627                     msg = b'BLOCKED'
   3628                     wf.write(msg)
   3629                     sent.append(msg)
   3630 
   3631             while True:
   3632                 try:
   3633                     wf.flush()
   3634                     break
   3635                 except self.BlockingIOError as e:
   3636                     self.assertEqual(e.args[0], errno.EAGAIN)
   3637                     self.assertEqual(e.args[2], e.characters_written)
   3638                     self.assertEqual(e.characters_written, 0)
   3639                     received.append(rf.read())
   3640 
   3641             received += iter(rf.read, None)
   3642 
   3643         sent, received = b''.join(sent), b''.join(received)
   3644         self.assertEqual(sent, received)
   3645         self.assertTrue(wf.closed)
   3646         self.assertTrue(rf.closed)
   3647 
   3648     def test_create_fail(self):
   3649         # 'x' mode fails if file is existing
   3650         with self.open(support.TESTFN, 'w'):
   3651             pass
   3652         self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
   3653 
   3654     def test_create_writes(self):
   3655         # 'x' mode opens for writing
   3656         with self.open(support.TESTFN, 'xb') as f:
   3657             f.write(b"spam")
   3658         with self.open(support.TESTFN, 'rb') as f:
   3659             self.assertEqual(b"spam", f.read())
   3660 
   3661     def test_open_allargs(self):
   3662         # there used to be a buffer overflow in the parser for rawmode
   3663         self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
   3664 
   3665 
   3666 class CMiscIOTest(MiscIOTest):
   3667     io = io
   3668 
   3669     def test_readinto_buffer_overflow(self):
   3670         # Issue #18025
   3671         class BadReader(self.io.BufferedIOBase):
   3672             def read(self, n=-1):
   3673                 return b'x' * 10**6
   3674         bufio = BadReader()
   3675         b = bytearray(2)
   3676         self.assertRaises(ValueError, bufio.readinto, b)
   3677 
   3678     @unittest.skipUnless(threading, 'Threading required for this test.')
   3679     def check_daemon_threads_shutdown_deadlock(self, stream_name):
   3680         # Issue #23309: deadlocks at shutdown should be avoided when a
   3681         # daemon thread and the main thread both write to a file.
   3682         code = """if 1:
   3683             import sys
   3684             import time
   3685             import threading
   3686 
   3687             file = sys.{stream_name}
   3688 
   3689             def run():
   3690                 while True:
   3691                     file.write('.')
   3692                     file.flush()
   3693 
   3694             thread = threading.Thread(target=run)
   3695             thread.daemon = True
   3696             thread.start()
   3697 
   3698             time.sleep(0.5)
   3699             file.write('!')
   3700             file.flush()
   3701             """.format_map(locals())
   3702         res, _ = run_python_until_end("-c", code)
   3703         err = res.err.decode()
   3704         if res.rc != 0:
   3705             # Failure: should be a fatal error
   3706             self.assertIn("Fatal Python error: could not acquire lock "
   3707                           "for <_io.BufferedWriter name='<{stream_name}>'> "
   3708                           "at interpreter shutdown, possibly due to "
   3709                           "daemon threads".format_map(locals()),
   3710                           err)
   3711         else:
   3712             self.assertFalse(err.strip('.!'))
   3713 
   3714     def test_daemon_threads_shutdown_stdout_deadlock(self):
   3715         self.check_daemon_threads_shutdown_deadlock('stdout')
   3716 
   3717     def test_daemon_threads_shutdown_stderr_deadlock(self):
   3718         self.check_daemon_threads_shutdown_deadlock('stderr')
   3719 
   3720 
   3721 class PyMiscIOTest(MiscIOTest):
   3722     io = pyio
   3723 
   3724 
   3725 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
   3726 class SignalsTest(unittest.TestCase):
   3727 
   3728     def setUp(self):
   3729         self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
   3730 
   3731     def tearDown(self):
   3732         signal.signal(signal.SIGALRM, self.oldalrm)
   3733 
   3734     def alarm_interrupt(self, sig, frame):
   3735         1/0
   3736 
   3737     @unittest.skipUnless(threading, 'Threading required for this test.')
   3738     def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
   3739         """Check that a partial write, when it gets interrupted, properly
   3740         invokes the signal handler, and bubbles up the exception raised
   3741         in the latter."""
   3742         read_results = []
   3743         def _read():
   3744             if hasattr(signal, 'pthread_sigmask'):
   3745                 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
   3746             s = os.read(r, 1)
   3747             read_results.append(s)
   3748         t = threading.Thread(target=_read)
   3749         t.daemon = True
   3750         r, w = os.pipe()
   3751         fdopen_kwargs["closefd"] = False
   3752         large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
   3753         try:
   3754             wio = self.io.open(w, **fdopen_kwargs)
   3755             t.start()
   3756             # Fill the pipe enough that the write will be blocking.
   3757             # It will be interrupted by the timer armed above.  Since the
   3758             # other thread has read one byte, the low-level write will
   3759             # return with a successful (partial) result rather than an EINTR.
   3760             # The buffered IO layer must check for pending signal
   3761             # handlers, which in this case will invoke alarm_interrupt().
   3762             signal.alarm(1)
   3763             try:
   3764                 self.assertRaises(ZeroDivisionError, wio.write, large_data)
   3765             finally:
   3766                 signal.alarm(0)
   3767                 t.join()
   3768             # We got one byte, get another one and check that it isn't a
   3769             # repeat of the first one.
   3770             read_results.append(os.read(r, 1))
   3771             self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
   3772         finally:
   3773             os.close(w)
   3774             os.close(r)
   3775             # This is deliberate. If we didn't close the file descriptor
   3776             # before closing wio, wio would try to flush its internal
   3777             # buffer, and block again.
   3778             try:
   3779                 wio.close()
   3780             except OSError as e:
   3781                 if e.errno != errno.EBADF:
   3782                     raise
   3783 
   3784     def test_interrupted_write_unbuffered(self):
   3785         self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
   3786 
   3787     def test_interrupted_write_buffered(self):
   3788         self.check_interrupted_write(b"xy", b"xy", mode="wb")
   3789 
   3790     # Issue #22331: The test hangs on FreeBSD 7.2
   3791     @support.requires_freebsd_version(8)
   3792     def test_interrupted_write_text(self):
   3793         self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
   3794 
   3795     @support.no_tracing
   3796     def check_reentrant_write(self, data, **fdopen_kwargs):
   3797         def on_alarm(*args):
   3798             # Will be called reentrantly from the same thread
   3799             wio.write(data)
   3800             1/0
   3801         signal.signal(signal.SIGALRM, on_alarm)
   3802         r, w = os.pipe()
   3803         wio = self.io.open(w, **fdopen_kwargs)
   3804         try:
   3805             signal.alarm(1)
   3806             # Either the reentrant call to wio.write() fails with RuntimeError,
   3807             # or the signal handler raises ZeroDivisionError.
   3808             with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
   3809                 while 1:
   3810                     for i in range(100):
   3811                         wio.write(data)
   3812                         wio.flush()
   3813                     # Make sure the buffer doesn't fill up and block further writes
   3814                     os.read(r, len(data) * 100)
   3815             exc = cm.exception
   3816             if isinstance(exc, RuntimeError):
   3817                 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
   3818         finally:
   3819             wio.close()
   3820             os.close(r)
   3821 
   3822     def test_reentrant_write_buffered(self):
   3823         self.check_reentrant_write(b"xy", mode="wb")
   3824 
   3825     def test_reentrant_write_text(self):
   3826         self.check_reentrant_write("xy", mode="w", encoding="ascii")
   3827 
   3828     def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
   3829         """Check that a buffered read, when it gets interrupted (either
   3830         returning a partial result or EINTR), properly invokes the signal
   3831         handler and retries if the latter returned successfully."""
   3832         r, w = os.pipe()
   3833         fdopen_kwargs["closefd"] = False
   3834         def alarm_handler(sig, frame):
   3835             os.write(w, b"bar")
   3836         signal.signal(signal.SIGALRM, alarm_handler)
   3837         try:
   3838             rio = self.io.open(r, **fdopen_kwargs)
   3839             os.write(w, b"foo")
   3840             signal.alarm(1)
   3841             # Expected behaviour:
   3842             # - first raw read() returns partial b"foo"
   3843             # - second raw read() returns EINTR
   3844             # - third raw read() returns b"bar"
   3845             self.assertEqual(decode(rio.read(6)), "foobar")
   3846         finally:
   3847             rio.close()
   3848             os.close(w)
   3849             os.close(r)
   3850 
   3851     def test_interrupted_read_retry_buffered(self):
   3852         self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
   3853                                           mode="rb")
   3854 
   3855     def test_interrupted_read_retry_text(self):
   3856         self.check_interrupted_read_retry(lambda x: x,
   3857                                           mode="r")
   3858 
   3859     @unittest.skipUnless(threading, 'Threading required for this test.')
   3860     def check_interrupted_write_retry(self, item, **fdopen_kwargs):
   3861         """Check that a buffered write, when it gets interrupted (either
   3862         returning a partial result or EINTR), properly invokes the signal
   3863         handler and retries if the latter returned successfully."""
   3864         select = support.import_module("select")
   3865 
   3866         # A quantity that exceeds the buffer size of an anonymous pipe's
   3867         # write end.
   3868         N = support.PIPE_MAX_SIZE
   3869         r, w = os.pipe()
   3870         fdopen_kwargs["closefd"] = False
   3871 
   3872         # We need a separate thread to read from the pipe and allow the
   3873         # write() to finish.  This thread is started after the SIGALRM is
   3874         # received (forcing a first EINTR in write()).
   3875         read_results = []
   3876         write_finished = False
   3877         error = None
   3878         def _read():
   3879             try:
   3880                 while not write_finished:
   3881                     while r in select.select([r], [], [], 1.0)[0]:
   3882                         s = os.read(r, 1024)
   3883                         read_results.append(s)
   3884             except BaseException as exc:
   3885                 nonlocal error
   3886                 error = exc
   3887         t = threading.Thread(target=_read)
   3888         t.daemon = True
   3889         def alarm1(sig, frame):
   3890             signal.signal(signal.SIGALRM, alarm2)
   3891             signal.alarm(1)
   3892         def alarm2(sig, frame):
   3893             t.start()
   3894 
   3895         large_data = item * N
   3896         signal.signal(signal.SIGALRM, alarm1)
   3897         try:
   3898             wio = self.io.open(w, **fdopen_kwargs)
   3899             signal.alarm(1)
   3900             # Expected behaviour:
   3901             # - first raw write() is partial (because of the limited pipe buffer
   3902             #   and the first alarm)
   3903             # - second raw write() returns EINTR (because of the second alarm)
   3904             # - subsequent write()s are successful (either partial or complete)
   3905             written = wio.write(large_data)
   3906             self.assertEqual(N, written)
   3907 
   3908             wio.flush()
   3909             write_finished = True
   3910             t.join()
   3911 
   3912             self.assertIsNone(error)
   3913             self.assertEqual(N, sum(len(x) for x in read_results))
   3914         finally:
   3915             write_finished = True
   3916             os.close(w)
   3917             os.close(r)
   3918             # This is deliberate. If we didn't close the file descriptor
   3919             # before closing wio, wio would try to flush its internal
   3920             # buffer, and could block (in case of failure).
   3921             try:
   3922                 wio.close()
   3923             except OSError as e:
   3924                 if e.errno != errno.EBADF:
   3925                     raise
   3926 
   3927     def test_interrupted_write_retry_buffered(self):
   3928         self.check_interrupted_write_retry(b"x", mode="wb")
   3929 
   3930     def test_interrupted_write_retry_text(self):
   3931         self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
   3932 
   3933 
   3934 class CSignalsTest(SignalsTest):
   3935     io = io
   3936 
   3937 class PySignalsTest(SignalsTest):
   3938     io = pyio
   3939 
   3940     # Handling reentrancy issues would slow down _pyio even more, so the
   3941     # tests are disabled.
   3942     test_reentrant_write_buffered = None
   3943     test_reentrant_write_text = None
   3944 
   3945 
   3946 def load_tests(*args):
   3947     tests = (CIOTest, PyIOTest, APIMismatchTest,
   3948              CBufferedReaderTest, PyBufferedReaderTest,
   3949              CBufferedWriterTest, PyBufferedWriterTest,
   3950              CBufferedRWPairTest, PyBufferedRWPairTest,
   3951              CBufferedRandomTest, PyBufferedRandomTest,
   3952              StatefulIncrementalDecoderTest,
   3953              CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
   3954              CTextIOWrapperTest, PyTextIOWrapperTest,
   3955              CMiscIOTest, PyMiscIOTest,
   3956              CSignalsTest, PySignalsTest,
   3957              )
   3958 
   3959     # Put the namespaces of the IO module we are testing and some useful mock
   3960     # classes in the __dict__ of each test.
   3961     mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
   3962              MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
   3963     all_members = io.__all__ + ["IncrementalNewlineDecoder"]
   3964     c_io_ns = {name : getattr(io, name) for name in all_members}
   3965     py_io_ns = {name : getattr(pyio, name) for name in all_members}
   3966     globs = globals()
   3967     c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
   3968     py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
   3969     # Avoid turning open into a bound method.
   3970     py_io_ns["open"] = pyio.OpenWrapper
   3971     for test in tests:
   3972         if test.__name__.startswith("C"):
   3973             for name, obj in c_io_ns.items():
   3974                 setattr(test, name, obj)
   3975         elif test.__name__.startswith("Py"):
   3976             for name, obj in py_io_ns.items():
   3977                 setattr(test, name, obj)
   3978 
   3979     suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
   3980     return suite
   3981 
   3982 if __name__ == "__main__":
   3983     unittest.main()
   3984