Home | History | Annotate | Download | only in test
      1 """Unit tests for the io module."""
      2 
      3 # Tests of io are scattered over the test suite:
      4 # * test_bufio - tests file buffering
      5 # * test_memoryio - tests BytesIO and StringIO
      6 # * test_fileio - tests FileIO
      7 # * test_file - tests the file interface
      8 # * test_io - tests everything else in the io module
      9 # * test_univnewlines - tests universal newline support
     10 # * test_largefile - tests operations on a file greater than 2**32 bytes
     11 #     (only enabled with -ulargefile)
     12 
     13 ################################################################################
     14 # ATTENTION TEST WRITERS!!!
     15 ################################################################################
     16 # When writing tests for io, it's important to test both the C and Python
     17 # implementations. This is usually done by writing a base test that refers to
     18 # the type it is testing as a attribute. Then it provides custom subclasses to
     19 # test both implementations. This file has lots of examples.
     20 ################################################################################
     21 
     22 from __future__ import print_function
     23 from __future__ import unicode_literals
     24 
     25 import os
     26 import sys
     27 import time
     28 import array
     29 import random
     30 import unittest
     31 import weakref
     32 import abc
     33 import signal
     34 import errno
     35 from itertools import cycle, count
     36 from collections import deque
     37 from UserList import UserList
     38 from test import test_support as support
     39 import contextlib
     40 
     41 import codecs
     42 import io  # C implementation of io
     43 import _pyio as pyio # Python implementation of io
     44 try:
     45     import threading
     46 except ImportError:
     47     threading = None
     48 try:
     49     import fcntl
     50 except ImportError:
     51     fcntl = None
     52 
     53 __metaclass__ = type
     54 bytes = support.py3k_bytes
     55 
     56 def _default_chunk_size():
     57     """Get the default TextIOWrapper chunk size"""
     58     with io.open(__file__, "r", encoding="latin1") as f:
     59         return f._CHUNK_SIZE
     60 
     61 
     62 class MockRawIOWithoutRead:
     63     """A RawIO implementation without read(), so as to exercise the default
     64     RawIO.read() which calls readinto()."""
     65 
     66     def __init__(self, read_stack=()):
     67         self._read_stack = list(read_stack)
     68         self._write_stack = []
     69         self._reads = 0
     70         self._extraneous_reads = 0
     71 
     72     def write(self, b):
     73         self._write_stack.append(bytes(b))
     74         return len(b)
     75 
     76     def writable(self):
     77         return True
     78 
     79     def fileno(self):
     80         return 42
     81 
     82     def readable(self):
     83         return True
     84 
     85     def seekable(self):
     86         return True
     87 
     88     def seek(self, pos, whence):
     89         return 0   # wrong but we gotta return something
     90 
     91     def tell(self):
     92         return 0   # same comment as above
     93 
     94     def readinto(self, buf):
     95         self._reads += 1
     96         max_len = len(buf)
     97         try:
     98             data = self._read_stack[0]
     99         except IndexError:
    100             self._extraneous_reads += 1
    101             return 0
    102         if data is None:
    103             del self._read_stack[0]
    104             return None
    105         n = len(data)
    106         if len(data) <= max_len:
    107             del self._read_stack[0]
    108             buf[:n] = data
    109             return n
    110         else:
    111             buf[:] = data[:max_len]
    112             self._read_stack[0] = data[max_len:]
    113             return max_len
    114 
    115     def truncate(self, pos=None):
    116         return pos
    117 
    118 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    119     pass
    120 
    121 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    122     pass
    123 
    124 
    125 class MockRawIO(MockRawIOWithoutRead):
    126 
    127     def read(self, n=None):
    128         self._reads += 1
    129         try:
    130             return self._read_stack.pop(0)
    131         except:
    132             self._extraneous_reads += 1
    133             return b""
    134 
    135 class CMockRawIO(MockRawIO, io.RawIOBase):
    136     pass
    137 
    138 class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    139     pass
    140 
    141 
    142 class MisbehavedRawIO(MockRawIO):
    143     def write(self, b):
    144         return MockRawIO.write(self, b) * 2
    145 
    146     def read(self, n=None):
    147         return MockRawIO.read(self, n) * 2
    148 
    149     def seek(self, pos, whence):
    150         return -123
    151 
    152     def tell(self):
    153         return -456
    154 
    155     def readinto(self, buf):
    156         MockRawIO.readinto(self, buf)
    157         return len(buf) * 5
    158 
    159 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    160     pass
    161 
    162 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    163     pass
    164 
    165 
    166 class CloseFailureIO(MockRawIO):
    167     closed = 0
    168 
    169     def close(self):
    170         if not self.closed:
    171             self.closed = 1
    172             raise IOError
    173 
    174 class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    175     pass
    176 
    177 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    178     pass
    179 
    180 
    181 class MockFileIO:
    182 
    183     def __init__(self, data):
    184         self.read_history = []
    185         super(MockFileIO, self).__init__(data)
    186 
    187     def read(self, n=None):
    188         res = super(MockFileIO, self).read(n)
    189         self.read_history.append(None if res is None else len(res))
    190         return res
    191 
    192     def readinto(self, b):
    193         res = super(MockFileIO, self).readinto(b)
    194         self.read_history.append(res)
    195         return res
    196 
    197 class CMockFileIO(MockFileIO, io.BytesIO):
    198     pass
    199 
    200 class PyMockFileIO(MockFileIO, pyio.BytesIO):
    201     pass
    202 
    203 
    204 class MockNonBlockWriterIO:
    205 
    206     def __init__(self):
    207         self._write_stack = []
    208         self._blocker_char = None
    209 
    210     def pop_written(self):
    211         s = b"".join(self._write_stack)
    212         self._write_stack[:] = []
    213         return s
    214 
    215     def block_on(self, char):
    216         """Block when a given char is encountered."""
    217         self._blocker_char = char
    218 
    219     def readable(self):
    220         return True
    221 
    222     def seekable(self):
    223         return True
    224 
    225     def writable(self):
    226         return True
    227 
    228     def write(self, b):
    229         b = bytes(b)
    230         n = -1
    231         if self._blocker_char:
    232             try:
    233                 n = b.index(self._blocker_char)
    234             except ValueError:
    235                 pass
    236             else:
    237                 if n > 0:
    238                     # write data up to the first blocker
    239                     self._write_stack.append(b[:n])
    240                     return n
    241                 else:
    242                     # cancel blocker and indicate would block
    243                     self._blocker_char = None
    244                     return None
    245         self._write_stack.append(b)
    246         return len(b)
    247 
    248 class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    249     BlockingIOError = io.BlockingIOError
    250 
    251 class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    252     BlockingIOError = pyio.BlockingIOError
    253 
    254 
    255 class IOTest(unittest.TestCase):
    256 
    257     def setUp(self):
    258         support.unlink(support.TESTFN)
    259 
    260     def tearDown(self):
    261         support.unlink(support.TESTFN)
    262 
    263     def write_ops(self, f):
    264         self.assertEqual(f.write(b"blah."), 5)
    265         f.truncate(0)
    266         self.assertEqual(f.tell(), 5)
    267         f.seek(0)
    268 
    269         self.assertEqual(f.write(b"blah."), 5)
    270         self.assertEqual(f.seek(0), 0)
    271         self.assertEqual(f.write(b"Hello."), 6)
    272         self.assertEqual(f.tell(), 6)
    273         self.assertEqual(f.seek(-1, 1), 5)
    274         self.assertEqual(f.tell(), 5)
    275         self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
    276         self.assertEqual(f.seek(0), 0)
    277         self.assertEqual(f.write(b"h"), 1)
    278         self.assertEqual(f.seek(-1, 2), 13)
    279         self.assertEqual(f.tell(), 13)
    280 
    281         self.assertEqual(f.truncate(12), 12)
    282         self.assertEqual(f.tell(), 13)
    283         self.assertRaises(TypeError, f.seek, 0.0)
    284 
    285     def read_ops(self, f, buffered=False):
    286         data = f.read(5)
    287         self.assertEqual(data, b"hello")
    288         data = bytearray(data)
    289         self.assertEqual(f.readinto(data), 5)
    290         self.assertEqual(data, b" worl")
    291         self.assertEqual(f.readinto(data), 2)
    292         self.assertEqual(len(data), 5)
    293         self.assertEqual(data[:2], b"d\n")
    294         self.assertEqual(f.seek(0), 0)
    295         self.assertEqual(f.read(20), b"hello world\n")
    296         self.assertEqual(f.read(1), b"")
    297         self.assertEqual(f.readinto(bytearray(b"x")), 0)
    298         self.assertEqual(f.seek(-6, 2), 6)
    299         self.assertEqual(f.read(5), b"world")
    300         self.assertEqual(f.read(0), b"")
    301         self.assertEqual(f.readinto(bytearray()), 0)
    302         self.assertEqual(f.seek(-6, 1), 5)
    303         self.assertEqual(f.read(5), b" worl")
    304         self.assertEqual(f.tell(), 10)
    305         self.assertRaises(TypeError, f.seek, 0.0)
    306         if buffered:
    307             f.seek(0)
    308             self.assertEqual(f.read(), b"hello world\n")
    309             f.seek(6)
    310             self.assertEqual(f.read(), b"world\n")
    311             self.assertEqual(f.read(), b"")
    312 
    313     LARGE = 2**31
    314 
    315     def large_file_ops(self, f):
    316         assert f.readable()
    317         assert f.writable()
    318         self.assertEqual(f.seek(self.LARGE), self.LARGE)
    319         self.assertEqual(f.tell(), self.LARGE)
    320         self.assertEqual(f.write(b"xxx"), 3)
    321         self.assertEqual(f.tell(), self.LARGE + 3)
    322         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    323         self.assertEqual(f.truncate(), self.LARGE + 2)
    324         self.assertEqual(f.tell(), self.LARGE + 2)
    325         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    326         self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    327         self.assertEqual(f.tell(), self.LARGE + 2)
    328         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    329         self.assertEqual(f.seek(-1, 2), self.LARGE)
    330         self.assertEqual(f.read(2), b"x")
    331 
    332     def test_invalid_operations(self):
    333         # Try writing on a file opened in read mode and vice-versa.
    334         for mode in ("w", "wb"):
    335             with self.open(support.TESTFN, mode) as fp:
    336                 self.assertRaises(IOError, fp.read)
    337                 self.assertRaises(IOError, fp.readline)
    338         with self.open(support.TESTFN, "rb") as fp:
    339             self.assertRaises(IOError, fp.write, b"blah")
    340             self.assertRaises(IOError, fp.writelines, [b"blah\n"])
    341         with self.open(support.TESTFN, "r") as fp:
    342             self.assertRaises(IOError, fp.write, "blah")
    343             self.assertRaises(IOError, fp.writelines, ["blah\n"])
    344 
    345     def test_raw_file_io(self):
    346         with self.open(support.TESTFN, "wb", buffering=0) as f:
    347             self.assertEqual(f.readable(), False)
    348             self.assertEqual(f.writable(), True)
    349             self.assertEqual(f.seekable(), True)
    350             self.write_ops(f)
    351         with self.open(support.TESTFN, "rb", buffering=0) as f:
    352             self.assertEqual(f.readable(), True)
    353             self.assertEqual(f.writable(), False)
    354             self.assertEqual(f.seekable(), True)
    355             self.read_ops(f)
    356 
    357     def test_buffered_file_io(self):
    358         with self.open(support.TESTFN, "wb") as f:
    359             self.assertEqual(f.readable(), False)
    360             self.assertEqual(f.writable(), True)
    361             self.assertEqual(f.seekable(), True)
    362             self.write_ops(f)
    363         with self.open(support.TESTFN, "rb") as f:
    364             self.assertEqual(f.readable(), True)
    365             self.assertEqual(f.writable(), False)
    366             self.assertEqual(f.seekable(), True)
    367             self.read_ops(f, True)
    368 
    369     def test_readline(self):
    370         with self.open(support.TESTFN, "wb") as f:
    371             f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    372         with self.open(support.TESTFN, "rb") as f:
    373             self.assertEqual(f.readline(), b"abc\n")
    374             self.assertEqual(f.readline(10), b"def\n")
    375             self.assertEqual(f.readline(2), b"xy")
    376             self.assertEqual(f.readline(4), b"zzy\n")
    377             self.assertEqual(f.readline(), b"foo\x00bar\n")
    378             self.assertEqual(f.readline(None), b"another line")
    379             self.assertRaises(TypeError, f.readline, 5.3)
    380         with self.open(support.TESTFN, "r") as f:
    381             self.assertRaises(TypeError, f.readline, 5.3)
    382 
    383     def test_raw_bytes_io(self):
    384         f = self.BytesIO()
    385         self.write_ops(f)
    386         data = f.getvalue()
    387         self.assertEqual(data, b"hello world\n")
    388         f = self.BytesIO(data)
    389         self.read_ops(f, True)
    390 
    391     def test_large_file_ops(self):
    392         # On Windows and Mac OSX this test comsumes large resources; It takes
    393         # a long time to build the >2GB file and takes >2GB of disk space
    394         # therefore the resource must be enabled to run this test.
    395         if sys.platform[:3] == 'win' or sys.platform == 'darwin':
    396             if not support.is_resource_enabled("largefile"):
    397                 print("\nTesting large file ops skipped on %s." % sys.platform,
    398                       file=sys.stderr)
    399                 print("It requires %d bytes and a long time." % self.LARGE,
    400                       file=sys.stderr)
    401                 print("Use 'regrtest.py -u largefile test_io' to run it.",
    402                       file=sys.stderr)
    403                 return
    404         with self.open(support.TESTFN, "w+b", 0) as f:
    405             self.large_file_ops(f)
    406         with self.open(support.TESTFN, "w+b") as f:
    407             self.large_file_ops(f)
    408 
    409     def test_with_open(self):
    410         for bufsize in (0, 1, 100):
    411             f = None
    412             with self.open(support.TESTFN, "wb", bufsize) as f:
    413                 f.write(b"xxx")
    414             self.assertEqual(f.closed, True)
    415             f = None
    416             try:
    417                 with self.open(support.TESTFN, "wb", bufsize) as f:
    418                     1 // 0
    419             except ZeroDivisionError:
    420                 self.assertEqual(f.closed, True)
    421             else:
    422                 self.fail("1 // 0 didn't raise an exception")
    423 
    424     # issue 5008
    425     def test_append_mode_tell(self):
    426         with self.open(support.TESTFN, "wb") as f:
    427             f.write(b"xxx")
    428         with self.open(support.TESTFN, "ab", buffering=0) as f:
    429             self.assertEqual(f.tell(), 3)
    430         with self.open(support.TESTFN, "ab") as f:
    431             self.assertEqual(f.tell(), 3)
    432         with self.open(support.TESTFN, "a") as f:
    433             self.assertTrue(f.tell() > 0)
    434 
    435     def test_destructor(self):
    436         record = []
    437         class MyFileIO(self.FileIO):
    438             def __del__(self):
    439                 record.append(1)
    440                 try:
    441                     f = super(MyFileIO, self).__del__
    442                 except AttributeError:
    443                     pass
    444                 else:
    445                     f()
    446             def close(self):
    447                 record.append(2)
    448                 super(MyFileIO, self).close()
    449             def flush(self):
    450                 record.append(3)
    451                 super(MyFileIO, self).flush()
    452         f = MyFileIO(support.TESTFN, "wb")
    453         f.write(b"xxx")
    454         del f
    455         support.gc_collect()
    456         self.assertEqual(record, [1, 2, 3])
    457         with self.open(support.TESTFN, "rb") as f:
    458             self.assertEqual(f.read(), b"xxx")
    459 
    460     def _check_base_destructor(self, base):
    461         record = []
    462         class MyIO(base):
    463             def __init__(self):
    464                 # This exercises the availability of attributes on object
    465                 # destruction.
    466                 # (in the C version, close() is called by the tp_dealloc
    467                 # function, not by __del__)
    468                 self.on_del = 1
    469                 self.on_close = 2
    470                 self.on_flush = 3
    471             def __del__(self):
    472                 record.append(self.on_del)
    473                 try:
    474                     f = super(MyIO, self).__del__
    475                 except AttributeError:
    476                     pass
    477                 else:
    478                     f()
    479             def close(self):
    480                 record.append(self.on_close)
    481                 super(MyIO, self).close()
    482             def flush(self):
    483                 record.append(self.on_flush)
    484                 super(MyIO, self).flush()
    485         f = MyIO()
    486         del f
    487         support.gc_collect()
    488         self.assertEqual(record, [1, 2, 3])
    489 
    490     def test_IOBase_destructor(self):
    491         self._check_base_destructor(self.IOBase)
    492 
    493     def test_RawIOBase_destructor(self):
    494         self._check_base_destructor(self.RawIOBase)
    495 
    496     def test_BufferedIOBase_destructor(self):
    497         self._check_base_destructor(self.BufferedIOBase)
    498 
    499     def test_TextIOBase_destructor(self):
    500         self._check_base_destructor(self.TextIOBase)
    501 
    502     def test_close_flushes(self):
    503         with self.open(support.TESTFN, "wb") as f:
    504             f.write(b"xxx")
    505         with self.open(support.TESTFN, "rb") as f:
    506             self.assertEqual(f.read(), b"xxx")
    507 
    508     def test_array_writes(self):
    509         a = array.array(b'i', range(10))
    510         n = len(a.tostring())
    511         with self.open(support.TESTFN, "wb", 0) as f:
    512             self.assertEqual(f.write(a), n)
    513         with self.open(support.TESTFN, "wb") as f:
    514             self.assertEqual(f.write(a), n)
    515 
    516     def test_closefd(self):
    517         self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
    518                           closefd=False)
    519 
    520     def test_read_closed(self):
    521         with self.open(support.TESTFN, "w") as f:
    522             f.write("egg\n")
    523         with self.open(support.TESTFN, "r") as f:
    524             file = self.open(f.fileno(), "r", closefd=False)
    525             self.assertEqual(file.read(), "egg\n")
    526             file.seek(0)
    527             file.close()
    528             self.assertRaises(ValueError, file.read)
    529 
    530     def test_no_closefd_with_filename(self):
    531         # can't use closefd in combination with a file name
    532         self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
    533 
    534     def test_closefd_attr(self):
    535         with self.open(support.TESTFN, "wb") as f:
    536             f.write(b"egg\n")
    537         with self.open(support.TESTFN, "r") as f:
    538             self.assertEqual(f.buffer.raw.closefd, True)
    539             file = self.open(f.fileno(), "r", closefd=False)
    540             self.assertEqual(file.buffer.raw.closefd, False)
    541 
    542     def test_garbage_collection(self):
    543         # FileIO objects are collected, and collecting them flushes
    544         # all data to disk.
    545         f = self.FileIO(support.TESTFN, "wb")
    546         f.write(b"abcxxx")
    547         f.f = f
    548         wr = weakref.ref(f)
    549         del f
    550         support.gc_collect()
    551         self.assertTrue(wr() is None, wr)
    552         with self.open(support.TESTFN, "rb") as f:
    553             self.assertEqual(f.read(), b"abcxxx")
    554 
    555     def test_unbounded_file(self):
    556         # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    557         zero = "/dev/zero"
    558         if not os.path.exists(zero):
    559             self.skipTest("{0} does not exist".format(zero))
    560         if sys.maxsize > 0x7FFFFFFF:
    561             self.skipTest("test can only run in a 32-bit address space")
    562         if support.real_max_memuse < support._2G:
    563             self.skipTest("test requires at least 2GB of memory")
    564         with self.open(zero, "rb", buffering=0) as f:
    565             self.assertRaises(OverflowError, f.read)
    566         with self.open(zero, "rb") as f:
    567             self.assertRaises(OverflowError, f.read)
    568         with self.open(zero, "r") as f:
    569             self.assertRaises(OverflowError, f.read)
    570 
    571     def test_flush_error_on_close(self):
    572         f = self.open(support.TESTFN, "wb", buffering=0)
    573         def bad_flush():
    574             raise IOError()
    575         f.flush = bad_flush
    576         self.assertRaises(IOError, f.close) # exception not swallowed
    577         self.assertTrue(f.closed)
    578 
    579     def test_multi_close(self):
    580         f = self.open(support.TESTFN, "wb", buffering=0)
    581         f.close()
    582         f.close()
    583         f.close()
    584         self.assertRaises(ValueError, f.flush)
    585 
    586     def test_RawIOBase_read(self):
    587         # Exercise the default RawIOBase.read() implementation (which calls
    588         # readinto() internally).
    589         rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    590         self.assertEqual(rawio.read(2), b"ab")
    591         self.assertEqual(rawio.read(2), b"c")
    592         self.assertEqual(rawio.read(2), b"d")
    593         self.assertEqual(rawio.read(2), None)
    594         self.assertEqual(rawio.read(2), b"ef")
    595         self.assertEqual(rawio.read(2), b"g")
    596         self.assertEqual(rawio.read(2), None)
    597         self.assertEqual(rawio.read(2), b"")
    598 
    599     def test_fileio_closefd(self):
    600         # Issue #4841
    601         with self.open(__file__, 'rb') as f1, \
    602              self.open(__file__, 'rb') as f2:
    603             fileio = self.FileIO(f1.fileno(), closefd=False)
    604             # .__init__() must not close f1
    605             fileio.__init__(f2.fileno(), closefd=False)
    606             f1.readline()
    607             # .close() must not close f2
    608             fileio.close()
    609             f2.readline()
    610 
    611 
    612 class CIOTest(IOTest):
    613 
    614     def test_IOBase_finalize(self):
    615         # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
    616         # class which inherits IOBase and an object of this class are caught
    617         # in a reference cycle and close() is already in the method cache.
    618         class MyIO(self.IOBase):
    619             def close(self):
    620                 pass
    621 
    622         # create an instance to populate the method cache
    623         MyIO()
    624         obj = MyIO()
    625         obj.obj = obj
    626         wr = weakref.ref(obj)
    627         del MyIO
    628         del obj
    629         support.gc_collect()
    630         self.assertTrue(wr() is None, wr)
    631 
    632 class PyIOTest(IOTest):
    633     test_array_writes = unittest.skip(
    634         "len(array.array) returns number of elements rather than bytelength"
    635     )(IOTest.test_array_writes)
    636 
    637 
    638 class CommonBufferedTests:
    639     # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    640 
    641     def test_detach(self):
    642         raw = self.MockRawIO()
    643         buf = self.tp(raw)
    644         self.assertIs(buf.detach(), raw)
    645         self.assertRaises(ValueError, buf.detach)
    646 
    647     def test_fileno(self):
    648         rawio = self.MockRawIO()
    649         bufio = self.tp(rawio)
    650 
    651         self.assertEqual(42, bufio.fileno())
    652 
    653     def test_no_fileno(self):
    654         # XXX will we always have fileno() function? If so, kill
    655         # this test. Else, write it.
    656         pass
    657 
    658     def test_invalid_args(self):
    659         rawio = self.MockRawIO()
    660         bufio = self.tp(rawio)
    661         # Invalid whence
    662         self.assertRaises(ValueError, bufio.seek, 0, -1)
    663         self.assertRaises(ValueError, bufio.seek, 0, 3)
    664 
    665     def test_override_destructor(self):
    666         tp = self.tp
    667         record = []
    668         class MyBufferedIO(tp):
    669             def __del__(self):
    670                 record.append(1)
    671                 try:
    672                     f = super(MyBufferedIO, self).__del__
    673                 except AttributeError:
    674                     pass
    675                 else:
    676                     f()
    677             def close(self):
    678                 record.append(2)
    679                 super(MyBufferedIO, self).close()
    680             def flush(self):
    681                 record.append(3)
    682                 super(MyBufferedIO, self).flush()
    683         rawio = self.MockRawIO()
    684         bufio = MyBufferedIO(rawio)
    685         writable = bufio.writable()
    686         del bufio
    687         support.gc_collect()
    688         if writable:
    689             self.assertEqual(record, [1, 2, 3])
    690         else:
    691             self.assertEqual(record, [1, 2])
    692 
    693     def test_context_manager(self):
    694         # Test usability as a context manager
    695         rawio = self.MockRawIO()
    696         bufio = self.tp(rawio)
    697         def _with():
    698             with bufio:
    699                 pass
    700         _with()
    701         # bufio should now be closed, and using it a second time should raise
    702         # a ValueError.
    703         self.assertRaises(ValueError, _with)
    704 
    705     def test_error_through_destructor(self):
    706         # Test that the exception state is not modified by a destructor,
    707         # even if close() fails.
    708         rawio = self.CloseFailureIO()
    709         def f():
    710             self.tp(rawio).xyzzy
    711         with support.captured_output("stderr") as s:
    712             self.assertRaises(AttributeError, f)
    713         s = s.getvalue().strip()
    714         if s:
    715             # The destructor *may* have printed an unraisable error, check it
    716             self.assertEqual(len(s.splitlines()), 1)
    717             self.assertTrue(s.startswith("Exception IOError: "), s)
    718             self.assertTrue(s.endswith(" ignored"), s)
    719 
    720     def test_repr(self):
    721         raw = self.MockRawIO()
    722         b = self.tp(raw)
    723         clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
    724         self.assertEqual(repr(b), "<%s>" % clsname)
    725         raw.name = "dummy"
    726         self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
    727         raw.name = b"dummy"
    728         self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
    729 
    730     def test_flush_error_on_close(self):
    731         raw = self.MockRawIO()
    732         def bad_flush():
    733             raise IOError()
    734         raw.flush = bad_flush
    735         b = self.tp(raw)
    736         self.assertRaises(IOError, b.close) # exception not swallowed
    737         self.assertTrue(b.closed)
    738 
    739     def test_close_error_on_close(self):
    740         raw = self.MockRawIO()
    741         def bad_flush():
    742             raise IOError('flush')
    743         def bad_close():
    744             raise IOError('close')
    745         raw.close = bad_close
    746         b = self.tp(raw)
    747         b.flush = bad_flush
    748         with self.assertRaises(IOError) as err: # exception not swallowed
    749             b.close()
    750         self.assertEqual(err.exception.args, ('close',))
    751         self.assertFalse(b.closed)
    752 
    753     def test_multi_close(self):
    754         raw = self.MockRawIO()
    755         b = self.tp(raw)
    756         b.close()
    757         b.close()
    758         b.close()
    759         self.assertRaises(ValueError, b.flush)
    760 
    761     def test_readonly_attributes(self):
    762         raw = self.MockRawIO()
    763         buf = self.tp(raw)
    764         x = self.MockRawIO()
    765         with self.assertRaises((AttributeError, TypeError)):
    766             buf.raw = x
    767 
    768 
    769 class SizeofTest:
    770 
    771     @support.cpython_only
    772     def test_sizeof(self):
    773         bufsize1 = 4096
    774         bufsize2 = 8192
    775         rawio = self.MockRawIO()
    776         bufio = self.tp(rawio, buffer_size=bufsize1)
    777         size = sys.getsizeof(bufio) - bufsize1
    778         rawio = self.MockRawIO()
    779         bufio = self.tp(rawio, buffer_size=bufsize2)
    780         self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
    781 
    782 
    783 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    784     read_mode = "rb"
    785 
    786     def test_constructor(self):
    787         rawio = self.MockRawIO([b"abc"])
    788         bufio = self.tp(rawio)
    789         bufio.__init__(rawio)
    790         bufio.__init__(rawio, buffer_size=1024)
    791         bufio.__init__(rawio, buffer_size=16)
    792         self.assertEqual(b"abc", bufio.read())
    793         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
    794         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
    795         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
    796         rawio = self.MockRawIO([b"abc"])
    797         bufio.__init__(rawio)
    798         self.assertEqual(b"abc", bufio.read())
    799 
    800     def test_read(self):
    801         for arg in (None, 7):
    802             rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    803             bufio = self.tp(rawio)
    804             self.assertEqual(b"abcdefg", bufio.read(arg))
    805         # Invalid args
    806         self.assertRaises(ValueError, bufio.read, -2)
    807 
    808     def test_read1(self):
    809         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    810         bufio = self.tp(rawio)
    811         self.assertEqual(b"a", bufio.read(1))
    812         self.assertEqual(b"b", bufio.read1(1))
    813         self.assertEqual(rawio._reads, 1)
    814         self.assertEqual(b"c", bufio.read1(100))
    815         self.assertEqual(rawio._reads, 1)
    816         self.assertEqual(b"d", bufio.read1(100))
    817         self.assertEqual(rawio._reads, 2)
    818         self.assertEqual(b"efg", bufio.read1(100))
    819         self.assertEqual(rawio._reads, 3)
    820         self.assertEqual(b"", bufio.read1(100))
    821         self.assertEqual(rawio._reads, 4)
    822         # Invalid args
    823         self.assertRaises(ValueError, bufio.read1, -1)
    824 
    825     def test_readinto(self):
    826         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    827         bufio = self.tp(rawio)
    828         b = bytearray(2)
    829         self.assertEqual(bufio.readinto(b), 2)
    830         self.assertEqual(b, b"ab")
    831         self.assertEqual(bufio.readinto(b), 2)
    832         self.assertEqual(b, b"cd")
    833         self.assertEqual(bufio.readinto(b), 2)
    834         self.assertEqual(b, b"ef")
    835         self.assertEqual(bufio.readinto(b), 1)
    836         self.assertEqual(b, b"gf")
    837         self.assertEqual(bufio.readinto(b), 0)
    838         self.assertEqual(b, b"gf")
    839 
    840     def test_readlines(self):
    841         def bufio():
    842             rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
    843             return self.tp(rawio)
    844         self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
    845         self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
    846         self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
    847 
    848     def test_buffering(self):
    849         data = b"abcdefghi"
    850         dlen = len(data)
    851 
    852         tests = [
    853             [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
    854             [ 100, [ 3, 3, 3],     [ dlen ]    ],
    855             [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
    856         ]
    857 
    858         for bufsize, buf_read_sizes, raw_read_sizes in tests:
    859             rawio = self.MockFileIO(data)
    860             bufio = self.tp(rawio, buffer_size=bufsize)
    861             pos = 0
    862             for nbytes in buf_read_sizes:
    863                 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
    864                 pos += nbytes
    865             # this is mildly implementation-dependent
    866             self.assertEqual(rawio.read_history, raw_read_sizes)
    867 
    868     def test_read_non_blocking(self):
    869         # Inject some None's in there to simulate EWOULDBLOCK
    870         rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    871         bufio = self.tp(rawio)
    872         self.assertEqual(b"abcd", bufio.read(6))
    873         self.assertEqual(b"e", bufio.read(1))
    874         self.assertEqual(b"fg", bufio.read())
    875         self.assertEqual(b"", bufio.peek(1))
    876         self.assertIsNone(bufio.read())
    877         self.assertEqual(b"", bufio.read())
    878 
    879         rawio = self.MockRawIO((b"a", None, None))
    880         self.assertEqual(b"a", rawio.readall())
    881         self.assertIsNone(rawio.readall())
    882 
    883     def test_read_past_eof(self):
    884         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    885         bufio = self.tp(rawio)
    886 
    887         self.assertEqual(b"abcdefg", bufio.read(9000))
    888 
    889     def test_read_all(self):
    890         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    891         bufio = self.tp(rawio)
    892 
    893         self.assertEqual(b"abcdefg", bufio.read())
    894 
    895     @unittest.skipUnless(threading, 'Threading required for this test.')
    896     @support.requires_resource('cpu')
    897     def test_threads(self):
    898         try:
    899             # Write out many bytes with exactly the same number of 0's,
    900             # 1's... 255's. This will help us check that concurrent reading
    901             # doesn't duplicate or forget contents.
    902             N = 1000
    903             l = list(range(256)) * N
    904             random.shuffle(l)
    905             s = bytes(bytearray(l))
    906             with self.open(support.TESTFN, "wb") as f:
    907                 f.write(s)
    908             with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
    909                 bufio = self.tp(raw, 8)
    910                 errors = []
    911                 results = []
    912                 def f():
    913                     try:
    914                         # Intra-buffer read then buffer-flushing read
    915                         for n in cycle([1, 19]):
    916                             s = bufio.read(n)
    917                             if not s:
    918                                 break
    919                             # list.append() is atomic
    920                             results.append(s)
    921                     except Exception as e:
    922                         errors.append(e)
    923                         raise
    924                 threads = [threading.Thread(target=f) for x in range(20)]
    925                 for t in threads:
    926                     t.start()
    927                 time.sleep(0.02) # yield
    928                 for t in threads:
    929                     t.join()
    930                 self.assertFalse(errors,
    931                     "the following exceptions were caught: %r" % errors)
    932                 s = b''.join(results)
    933                 for i in range(256):
    934                     c = bytes(bytearray([i]))
    935                     self.assertEqual(s.count(c), N)
    936         finally:
    937             support.unlink(support.TESTFN)
    938 
    939     def test_misbehaved_io(self):
    940         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    941         bufio = self.tp(rawio)
    942         self.assertRaises(IOError, bufio.seek, 0)
    943         self.assertRaises(IOError, bufio.tell)
    944 
    945     def test_no_extraneous_read(self):
    946         # Issue #9550; when the raw IO object has satisfied the read request,
    947         # we should not issue any additional reads, otherwise it may block
    948         # (e.g. socket).
    949         bufsize = 16
    950         for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
    951             rawio = self.MockRawIO([b"x" * n])
    952             bufio = self.tp(rawio, bufsize)
    953             self.assertEqual(bufio.read(n), b"x" * n)
    954             # Simple case: one raw read is enough to satisfy the request.
    955             self.assertEqual(rawio._extraneous_reads, 0,
    956                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
    957             # A more complex case where two raw reads are needed to satisfy
    958             # the request.
    959             rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
    960             bufio = self.tp(rawio, bufsize)
    961             self.assertEqual(bufio.read(n), b"x" * n)
    962             self.assertEqual(rawio._extraneous_reads, 0,
    963                              "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
    964 
    965 
    966 class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    967     tp = io.BufferedReader
    968 
    969     def test_constructor(self):
    970         BufferedReaderTest.test_constructor(self)
    971         # The allocation can succeed on 32-bit builds, e.g. with more
    972         # than 2GB RAM and a 64-bit kernel.
    973         if sys.maxsize > 0x7FFFFFFF:
    974             rawio = self.MockRawIO()
    975             bufio = self.tp(rawio)
    976             self.assertRaises((OverflowError, MemoryError, ValueError),
    977                 bufio.__init__, rawio, sys.maxsize)
    978 
    979     def test_initialization(self):
    980         rawio = self.MockRawIO([b"abc"])
    981         bufio = self.tp(rawio)
    982         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
    983         self.assertRaises(ValueError, bufio.read)
    984         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
    985         self.assertRaises(ValueError, bufio.read)
    986         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
    987         self.assertRaises(ValueError, bufio.read)
    988 
    989     def test_misbehaved_io_read(self):
    990         rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    991         bufio = self.tp(rawio)
    992         # _pyio.BufferedReader seems to implement reading different, so that
    993         # checking this is not so easy.
    994         self.assertRaises(IOError, bufio.read, 10)
    995 
    996     def test_garbage_collection(self):
    997         # C BufferedReader objects are collected.
    998         # The Python version has __del__, so it ends into gc.garbage instead
    999         rawio = self.FileIO(support.TESTFN, "w+b")
   1000         f = self.tp(rawio)
   1001         f.f = f
   1002         wr = weakref.ref(f)
   1003         del f
   1004         support.gc_collect()
   1005         self.assertTrue(wr() is None, wr)
   1006 
   1007     def test_args_error(self):
   1008         # Issue #17275
   1009         with self.assertRaisesRegexp(TypeError, "BufferedReader"):
   1010             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1011 
   1012 
   1013 class PyBufferedReaderTest(BufferedReaderTest):
   1014     tp = pyio.BufferedReader
   1015 
   1016 
   1017 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
   1018     write_mode = "wb"
   1019 
   1020     def test_constructor(self):
   1021         rawio = self.MockRawIO()
   1022         bufio = self.tp(rawio)
   1023         bufio.__init__(rawio)
   1024         bufio.__init__(rawio, buffer_size=1024)
   1025         bufio.__init__(rawio, buffer_size=16)
   1026         self.assertEqual(3, bufio.write(b"abc"))
   1027         bufio.flush()
   1028         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1029         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1030         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1031         bufio.__init__(rawio)
   1032         self.assertEqual(3, bufio.write(b"ghi"))
   1033         bufio.flush()
   1034         self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
   1035 
   1036     def test_detach_flush(self):
   1037         raw = self.MockRawIO()
   1038         buf = self.tp(raw)
   1039         buf.write(b"howdy!")
   1040         self.assertFalse(raw._write_stack)
   1041         buf.detach()
   1042         self.assertEqual(raw._write_stack, [b"howdy!"])
   1043 
   1044     def test_write(self):
   1045         # Write to the buffered IO but don't overflow the buffer.
   1046         writer = self.MockRawIO()
   1047         bufio = self.tp(writer, 8)
   1048         bufio.write(b"abc")
   1049         self.assertFalse(writer._write_stack)
   1050 
   1051     def test_write_overflow(self):
   1052         writer = self.MockRawIO()
   1053         bufio = self.tp(writer, 8)
   1054         contents = b"abcdefghijklmnop"
   1055         for n in range(0, len(contents), 3):
   1056             bufio.write(contents[n:n+3])
   1057         flushed = b"".join(writer._write_stack)
   1058         # At least (total - 8) bytes were implicitly flushed, perhaps more
   1059         # depending on the implementation.
   1060         self.assertTrue(flushed.startswith(contents[:-8]), flushed)
   1061 
   1062     def check_writes(self, intermediate_func):
   1063         # Lots of writes, test the flushed output is as expected.
   1064         contents = bytes(range(256)) * 1000
   1065         n = 0
   1066         writer = self.MockRawIO()
   1067         bufio = self.tp(writer, 13)
   1068         # Generator of write sizes: repeat each N 15 times then proceed to N+1
   1069         def gen_sizes():
   1070             for size in count(1):
   1071                 for i in range(15):
   1072                     yield size
   1073         sizes = gen_sizes()
   1074         while n < len(contents):
   1075             size = min(next(sizes), len(contents) - n)
   1076             self.assertEqual(bufio.write(contents[n:n+size]), size)
   1077             intermediate_func(bufio)
   1078             n += size
   1079         bufio.flush()
   1080         self.assertEqual(contents,
   1081             b"".join(writer._write_stack))
   1082 
   1083     def test_writes(self):
   1084         self.check_writes(lambda bufio: None)
   1085 
   1086     def test_writes_and_flushes(self):
   1087         self.check_writes(lambda bufio: bufio.flush())
   1088 
   1089     def test_writes_and_seeks(self):
   1090         def _seekabs(bufio):
   1091             pos = bufio.tell()
   1092             bufio.seek(pos + 1, 0)
   1093             bufio.seek(pos - 1, 0)
   1094             bufio.seek(pos, 0)
   1095         self.check_writes(_seekabs)
   1096         def _seekrel(bufio):
   1097             pos = bufio.seek(0, 1)
   1098             bufio.seek(+1, 1)
   1099             bufio.seek(-1, 1)
   1100             bufio.seek(pos, 0)
   1101         self.check_writes(_seekrel)
   1102 
   1103     def test_writes_and_truncates(self):
   1104         self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
   1105 
   1106     def test_write_non_blocking(self):
   1107         raw = self.MockNonBlockWriterIO()
   1108         bufio = self.tp(raw, 8)
   1109 
   1110         self.assertEqual(bufio.write(b"abcd"), 4)
   1111         self.assertEqual(bufio.write(b"efghi"), 5)
   1112         # 1 byte will be written, the rest will be buffered
   1113         raw.block_on(b"k")
   1114         self.assertEqual(bufio.write(b"jklmn"), 5)
   1115 
   1116         # 8 bytes will be written, 8 will be buffered and the rest will be lost
   1117         raw.block_on(b"0")
   1118         try:
   1119             bufio.write(b"opqrwxyz0123456789")
   1120         except self.BlockingIOError as e:
   1121             written = e.characters_written
   1122         else:
   1123             self.fail("BlockingIOError should have been raised")
   1124         self.assertEqual(written, 16)
   1125         self.assertEqual(raw.pop_written(),
   1126             b"abcdefghijklmnopqrwxyz")
   1127 
   1128         self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
   1129         s = raw.pop_written()
   1130         # Previously buffered bytes were flushed
   1131         self.assertTrue(s.startswith(b"01234567A"), s)
   1132 
   1133     def test_write_and_rewind(self):
   1134         raw = io.BytesIO()
   1135         bufio = self.tp(raw, 4)
   1136         self.assertEqual(bufio.write(b"abcdef"), 6)
   1137         self.assertEqual(bufio.tell(), 6)
   1138         bufio.seek(0, 0)
   1139         self.assertEqual(bufio.write(b"XY"), 2)
   1140         bufio.seek(6, 0)
   1141         self.assertEqual(raw.getvalue(), b"XYcdef")
   1142         self.assertEqual(bufio.write(b"123456"), 6)
   1143         bufio.flush()
   1144         self.assertEqual(raw.getvalue(), b"XYcdef123456")
   1145 
   1146     def test_flush(self):
   1147         writer = self.MockRawIO()
   1148         bufio = self.tp(writer, 8)
   1149         bufio.write(b"abc")
   1150         bufio.flush()
   1151         self.assertEqual(b"abc", writer._write_stack[0])
   1152 
   1153     def test_writelines(self):
   1154         l = [b'ab', b'cd', b'ef']
   1155         writer = self.MockRawIO()
   1156         bufio = self.tp(writer, 8)
   1157         bufio.writelines(l)
   1158         bufio.flush()
   1159         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
   1160 
   1161     def test_writelines_userlist(self):
   1162         l = UserList([b'ab', b'cd', b'ef'])
   1163         writer = self.MockRawIO()
   1164         bufio = self.tp(writer, 8)
   1165         bufio.writelines(l)
   1166         bufio.flush()
   1167         self.assertEqual(b''.join(writer._write_stack), b'abcdef')
   1168 
   1169     def test_writelines_error(self):
   1170         writer = self.MockRawIO()
   1171         bufio = self.tp(writer, 8)
   1172         self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
   1173         self.assertRaises(TypeError, bufio.writelines, None)
   1174 
   1175     def test_destructor(self):
   1176         writer = self.MockRawIO()
   1177         bufio = self.tp(writer, 8)
   1178         bufio.write(b"abc")
   1179         del bufio
   1180         support.gc_collect()
   1181         self.assertEqual(b"abc", writer._write_stack[0])
   1182 
   1183     def test_truncate(self):
   1184         # Truncate implicitly flushes the buffer.
   1185         with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
   1186             bufio = self.tp(raw, 8)
   1187             bufio.write(b"abcdef")
   1188             self.assertEqual(bufio.truncate(3), 3)
   1189             self.assertEqual(bufio.tell(), 6)
   1190         with self.open(support.TESTFN, "rb", buffering=0) as f:
   1191             self.assertEqual(f.read(), b"abc")
   1192 
   1193     @unittest.skipUnless(threading, 'Threading required for this test.')
   1194     @support.requires_resource('cpu')
   1195     def test_threads(self):
   1196         try:
   1197             # Write out many bytes from many threads and test they were
   1198             # all flushed.
   1199             N = 1000
   1200             contents = bytes(range(256)) * N
   1201             sizes = cycle([1, 19])
   1202             n = 0
   1203             queue = deque()
   1204             while n < len(contents):
   1205                 size = next(sizes)
   1206                 queue.append(contents[n:n+size])
   1207                 n += size
   1208             del contents
   1209             # We use a real file object because it allows us to
   1210             # exercise situations where the GIL is released before
   1211             # writing the buffer to the raw streams. This is in addition
   1212             # to concurrency issues due to switching threads in the middle
   1213             # of Python code.
   1214             with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
   1215                 bufio = self.tp(raw, 8)
   1216                 errors = []
   1217                 def f():
   1218                     try:
   1219                         while True:
   1220                             try:
   1221                                 s = queue.popleft()
   1222                             except IndexError:
   1223                                 return
   1224                             bufio.write(s)
   1225                     except Exception as e:
   1226                         errors.append(e)
   1227                         raise
   1228                 threads = [threading.Thread(target=f) for x in range(20)]
   1229                 for t in threads:
   1230                     t.start()
   1231                 time.sleep(0.02) # yield
   1232                 for t in threads:
   1233                     t.join()
   1234                 self.assertFalse(errors,
   1235                     "the following exceptions were caught: %r" % errors)
   1236                 bufio.close()
   1237             with self.open(support.TESTFN, "rb") as f:
   1238                 s = f.read()
   1239             for i in range(256):
   1240                 self.assertEqual(s.count(bytes([i])), N)
   1241         finally:
   1242             support.unlink(support.TESTFN)
   1243 
   1244     def test_misbehaved_io(self):
   1245         rawio = self.MisbehavedRawIO()
   1246         bufio = self.tp(rawio, 5)
   1247         self.assertRaises(IOError, bufio.seek, 0)
   1248         self.assertRaises(IOError, bufio.tell)
   1249         self.assertRaises(IOError, bufio.write, b"abcdef")
   1250 
   1251     def test_max_buffer_size_deprecation(self):
   1252         with support.check_warnings(("max_buffer_size is deprecated",
   1253                                      DeprecationWarning)):
   1254             self.tp(self.MockRawIO(), 8, 12)
   1255 
   1256     def test_write_error_on_close(self):
   1257         raw = self.MockRawIO()
   1258         def bad_write(b):
   1259             raise IOError()
   1260         raw.write = bad_write
   1261         b = self.tp(raw)
   1262         b.write(b'spam')
   1263         self.assertRaises(IOError, b.close) # exception not swallowed
   1264         self.assertTrue(b.closed)
   1265 
   1266 
   1267 class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
   1268     tp = io.BufferedWriter
   1269 
   1270     def test_constructor(self):
   1271         BufferedWriterTest.test_constructor(self)
   1272         # The allocation can succeed on 32-bit builds, e.g. with more
   1273         # than 2GB RAM and a 64-bit kernel.
   1274         if sys.maxsize > 0x7FFFFFFF:
   1275             rawio = self.MockRawIO()
   1276             bufio = self.tp(rawio)
   1277             self.assertRaises((OverflowError, MemoryError, ValueError),
   1278                 bufio.__init__, rawio, sys.maxsize)
   1279 
   1280     def test_initialization(self):
   1281         rawio = self.MockRawIO()
   1282         bufio = self.tp(rawio)
   1283         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
   1284         self.assertRaises(ValueError, bufio.write, b"def")
   1285         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
   1286         self.assertRaises(ValueError, bufio.write, b"def")
   1287         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
   1288         self.assertRaises(ValueError, bufio.write, b"def")
   1289 
   1290     def test_garbage_collection(self):
   1291         # C BufferedWriter objects are collected, and collecting them flushes
   1292         # all data to disk.
   1293         # The Python version has __del__, so it ends into gc.garbage instead
   1294         rawio = self.FileIO(support.TESTFN, "w+b")
   1295         f = self.tp(rawio)
   1296         f.write(b"123xxx")
   1297         f.x = f
   1298         wr = weakref.ref(f)
   1299         del f
   1300         support.gc_collect()
   1301         self.assertTrue(wr() is None, wr)
   1302         with self.open(support.TESTFN, "rb") as f:
   1303             self.assertEqual(f.read(), b"123xxx")
   1304 
   1305     def test_args_error(self):
   1306         # Issue #17275
   1307         with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
   1308             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1309 
   1310 
   1311 class PyBufferedWriterTest(BufferedWriterTest):
   1312     tp = pyio.BufferedWriter
   1313 
   1314 class BufferedRWPairTest(unittest.TestCase):
   1315 
   1316     def test_constructor(self):
   1317         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1318         self.assertFalse(pair.closed)
   1319 
   1320     def test_detach(self):
   1321         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1322         self.assertRaises(self.UnsupportedOperation, pair.detach)
   1323 
   1324     def test_constructor_max_buffer_size_deprecation(self):
   1325         with support.check_warnings(("max_buffer_size is deprecated",
   1326                                      DeprecationWarning)):
   1327             self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
   1328 
   1329     def test_constructor_with_not_readable(self):
   1330         class NotReadable(MockRawIO):
   1331             def readable(self):
   1332                 return False
   1333 
   1334         self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
   1335 
   1336     def test_constructor_with_not_writeable(self):
   1337         class NotWriteable(MockRawIO):
   1338             def writable(self):
   1339                 return False
   1340 
   1341         self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
   1342 
   1343     def test_read(self):
   1344         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1345 
   1346         self.assertEqual(pair.read(3), b"abc")
   1347         self.assertEqual(pair.read(1), b"d")
   1348         self.assertEqual(pair.read(), b"ef")
   1349         pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
   1350         self.assertEqual(pair.read(None), b"abc")
   1351 
   1352     def test_readlines(self):
   1353         pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
   1354         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
   1355         self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
   1356         self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
   1357 
   1358     def test_read1(self):
   1359         # .read1() is delegated to the underlying reader object, so this test
   1360         # can be shallow.
   1361         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1362 
   1363         self.assertEqual(pair.read1(3), b"abc")
   1364 
   1365     def test_readinto(self):
   1366         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1367 
   1368         data = bytearray(5)
   1369         self.assertEqual(pair.readinto(data), 5)
   1370         self.assertEqual(data, b"abcde")
   1371 
   1372     def test_write(self):
   1373         w = self.MockRawIO()
   1374         pair = self.tp(self.MockRawIO(), w)
   1375 
   1376         pair.write(b"abc")
   1377         pair.flush()
   1378         pair.write(b"def")
   1379         pair.flush()
   1380         self.assertEqual(w._write_stack, [b"abc", b"def"])
   1381 
   1382     def test_peek(self):
   1383         pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
   1384 
   1385         self.assertTrue(pair.peek(3).startswith(b"abc"))
   1386         self.assertEqual(pair.read(3), b"abc")
   1387 
   1388     def test_readable(self):
   1389         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1390         self.assertTrue(pair.readable())
   1391 
   1392     def test_writeable(self):
   1393         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1394         self.assertTrue(pair.writable())
   1395 
   1396     def test_seekable(self):
   1397         # BufferedRWPairs are never seekable, even if their readers and writers
   1398         # are.
   1399         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1400         self.assertFalse(pair.seekable())
   1401 
   1402     # .flush() is delegated to the underlying writer object and has been
   1403     # tested in the test_write method.
   1404 
   1405     def test_close_and_closed(self):
   1406         pair = self.tp(self.MockRawIO(), self.MockRawIO())
   1407         self.assertFalse(pair.closed)
   1408         pair.close()
   1409         self.assertTrue(pair.closed)
   1410 
   1411     def test_isatty(self):
   1412         class SelectableIsAtty(MockRawIO):
   1413             def __init__(self, isatty):
   1414                 MockRawIO.__init__(self)
   1415                 self._isatty = isatty
   1416 
   1417             def isatty(self):
   1418                 return self._isatty
   1419 
   1420         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
   1421         self.assertFalse(pair.isatty())
   1422 
   1423         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
   1424         self.assertTrue(pair.isatty())
   1425 
   1426         pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
   1427         self.assertTrue(pair.isatty())
   1428 
   1429         pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
   1430         self.assertTrue(pair.isatty())
   1431 
   1432 class CBufferedRWPairTest(BufferedRWPairTest):
   1433     tp = io.BufferedRWPair
   1434 
   1435 class PyBufferedRWPairTest(BufferedRWPairTest):
   1436     tp = pyio.BufferedRWPair
   1437 
   1438 
   1439 class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
   1440     read_mode = "rb+"
   1441     write_mode = "wb+"
   1442 
   1443     def test_constructor(self):
   1444         BufferedReaderTest.test_constructor(self)
   1445         BufferedWriterTest.test_constructor(self)
   1446 
   1447     def test_read_and_write(self):
   1448         raw = self.MockRawIO((b"asdf", b"ghjk"))
   1449         rw = self.tp(raw, 8)
   1450 
   1451         self.assertEqual(b"as", rw.read(2))
   1452         rw.write(b"ddd")
   1453         rw.write(b"eee")
   1454         self.assertFalse(raw._write_stack) # Buffer writes
   1455         self.assertEqual(b"ghjk", rw.read())
   1456         self.assertEqual(b"dddeee", raw._write_stack[0])
   1457 
   1458     def test_seek_and_tell(self):
   1459         raw = self.BytesIO(b"asdfghjkl")
   1460         rw = self.tp(raw)
   1461 
   1462         self.assertEqual(b"as", rw.read(2))
   1463         self.assertEqual(2, rw.tell())
   1464         rw.seek(0, 0)
   1465         self.assertEqual(b"asdf", rw.read(4))
   1466 
   1467         rw.write(b"123f")
   1468         rw.seek(0, 0)
   1469         self.assertEqual(b"asdf123fl", rw.read())
   1470         self.assertEqual(9, rw.tell())
   1471         rw.seek(-4, 2)
   1472         self.assertEqual(5, rw.tell())
   1473         rw.seek(2, 1)
   1474         self.assertEqual(7, rw.tell())
   1475         self.assertEqual(b"fl", rw.read(11))
   1476         rw.flush()
   1477         self.assertEqual(b"asdf123fl", raw.getvalue())
   1478 
   1479         self.assertRaises(TypeError, rw.seek, 0.0)
   1480 
   1481     def check_flush_and_read(self, read_func):
   1482         raw = self.BytesIO(b"abcdefghi")
   1483         bufio = self.tp(raw)
   1484 
   1485         self.assertEqual(b"ab", read_func(bufio, 2))
   1486         bufio.write(b"12")
   1487         self.assertEqual(b"ef", read_func(bufio, 2))
   1488         self.assertEqual(6, bufio.tell())
   1489         bufio.flush()
   1490         self.assertEqual(6, bufio.tell())
   1491         self.assertEqual(b"ghi", read_func(bufio))
   1492         raw.seek(0, 0)
   1493         raw.write(b"XYZ")
   1494         # flush() resets the read buffer
   1495         bufio.flush()
   1496         bufio.seek(0, 0)
   1497         self.assertEqual(b"XYZ", read_func(bufio, 3))
   1498 
   1499     def test_flush_and_read(self):
   1500         self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
   1501 
   1502     def test_flush_and_readinto(self):
   1503         def _readinto(bufio, n=-1):
   1504             b = bytearray(n if n >= 0 else 9999)
   1505             n = bufio.readinto(b)
   1506             return bytes(b[:n])
   1507         self.check_flush_and_read(_readinto)
   1508 
   1509     def test_flush_and_peek(self):
   1510         def _peek(bufio, n=-1):
   1511             # This relies on the fact that the buffer can contain the whole
   1512             # raw stream, otherwise peek() can return less.
   1513             b = bufio.peek(n)
   1514             if n != -1:
   1515                 b = b[:n]
   1516             bufio.seek(len(b), 1)
   1517             return b
   1518         self.check_flush_and_read(_peek)
   1519 
   1520     def test_flush_and_write(self):
   1521         raw = self.BytesIO(b"abcdefghi")
   1522         bufio = self.tp(raw)
   1523 
   1524         bufio.write(b"123")
   1525         bufio.flush()
   1526         bufio.write(b"45")
   1527         bufio.flush()
   1528         bufio.seek(0, 0)
   1529         self.assertEqual(b"12345fghi", raw.getvalue())
   1530         self.assertEqual(b"12345fghi", bufio.read())
   1531 
   1532     def test_threads(self):
   1533         BufferedReaderTest.test_threads(self)
   1534         BufferedWriterTest.test_threads(self)
   1535 
   1536     def test_writes_and_peek(self):
   1537         def _peek(bufio):
   1538             bufio.peek(1)
   1539         self.check_writes(_peek)
   1540         def _peek(bufio):
   1541             pos = bufio.tell()
   1542             bufio.seek(-1, 1)
   1543             bufio.peek(1)
   1544             bufio.seek(pos, 0)
   1545         self.check_writes(_peek)
   1546 
   1547     def test_writes_and_reads(self):
   1548         def _read(bufio):
   1549             bufio.seek(-1, 1)
   1550             bufio.read(1)
   1551         self.check_writes(_read)
   1552 
   1553     def test_writes_and_read1s(self):
   1554         def _read1(bufio):
   1555             bufio.seek(-1, 1)
   1556             bufio.read1(1)
   1557         self.check_writes(_read1)
   1558 
   1559     def test_writes_and_readintos(self):
   1560         def _read(bufio):
   1561             bufio.seek(-1, 1)
   1562             bufio.readinto(bytearray(1))
   1563         self.check_writes(_read)
   1564 
   1565     def test_write_after_readahead(self):
   1566         # Issue #6629: writing after the buffer was filled by readahead should
   1567         # first rewind the raw stream.
   1568         for overwrite_size in [1, 5]:
   1569             raw = self.BytesIO(b"A" * 10)
   1570             bufio = self.tp(raw, 4)
   1571             # Trigger readahead
   1572             self.assertEqual(bufio.read(1), b"A")
   1573             self.assertEqual(bufio.tell(), 1)
   1574             # Overwriting should rewind the raw stream if it needs so
   1575             bufio.write(b"B" * overwrite_size)
   1576             self.assertEqual(bufio.tell(), overwrite_size + 1)
   1577             # If the write size was smaller than the buffer size, flush() and
   1578             # check that rewind happens.
   1579             bufio.flush()
   1580             self.assertEqual(bufio.tell(), overwrite_size + 1)
   1581             s = raw.getvalue()
   1582             self.assertEqual(s,
   1583                 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
   1584 
   1585     def test_write_rewind_write(self):
   1586         # Various combinations of reading / writing / seeking backwards / writing again
   1587         def mutate(bufio, pos1, pos2):
   1588             assert pos2 >= pos1
   1589             # Fill the buffer
   1590             bufio.seek(pos1)
   1591             bufio.read(pos2 - pos1)
   1592             bufio.write(b'\x02')
   1593             # This writes earlier than the previous write, but still inside
   1594             # the buffer.
   1595             bufio.seek(pos1)
   1596             bufio.write(b'\x01')
   1597 
   1598         b = b"\x80\x81\x82\x83\x84"
   1599         for i in range(0, len(b)):
   1600             for j in range(i, len(b)):
   1601                 raw = self.BytesIO(b)
   1602                 bufio = self.tp(raw, 100)
   1603                 mutate(bufio, i, j)
   1604                 bufio.flush()
   1605                 expected = bytearray(b)
   1606                 expected[j] = 2
   1607                 expected[i] = 1
   1608                 self.assertEqual(raw.getvalue(), expected,
   1609                                  "failed result for i=%d, j=%d" % (i, j))
   1610 
   1611     def test_truncate_after_read_or_write(self):
   1612         raw = self.BytesIO(b"A" * 10)
   1613         bufio = self.tp(raw, 100)
   1614         self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
   1615         self.assertEqual(bufio.truncate(), 2)
   1616         self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
   1617         self.assertEqual(bufio.truncate(), 4)
   1618 
   1619     def test_misbehaved_io(self):
   1620         BufferedReaderTest.test_misbehaved_io(self)
   1621         BufferedWriterTest.test_misbehaved_io(self)
   1622 
   1623     def test_interleaved_read_write(self):
   1624         # Test for issue #12213
   1625         with self.BytesIO(b'abcdefgh') as raw:
   1626             with self.tp(raw, 100) as f:
   1627                 f.write(b"1")
   1628                 self.assertEqual(f.read(1), b'b')
   1629                 f.write(b'2')
   1630                 self.assertEqual(f.read1(1), b'd')
   1631                 f.write(b'3')
   1632                 buf = bytearray(1)
   1633                 f.readinto(buf)
   1634                 self.assertEqual(buf, b'f')
   1635                 f.write(b'4')
   1636                 self.assertEqual(f.peek(1), b'h')
   1637                 f.flush()
   1638                 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
   1639 
   1640         with self.BytesIO(b'abc') as raw:
   1641             with self.tp(raw, 100) as f:
   1642                 self.assertEqual(f.read(1), b'a')
   1643                 f.write(b"2")
   1644                 self.assertEqual(f.read(1), b'c')
   1645                 f.flush()
   1646                 self.assertEqual(raw.getvalue(), b'a2c')
   1647 
   1648     def test_interleaved_readline_write(self):
   1649         with self.BytesIO(b'ab\ncdef\ng\n') as raw:
   1650             with self.tp(raw) as f:
   1651                 f.write(b'1')
   1652                 self.assertEqual(f.readline(), b'b\n')
   1653                 f.write(b'2')
   1654                 self.assertEqual(f.readline(), b'def\n')
   1655                 f.write(b'3')
   1656                 self.assertEqual(f.readline(), b'\n')
   1657                 f.flush()
   1658                 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
   1659 
   1660 
   1661 class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
   1662                           BufferedRandomTest, SizeofTest):
   1663     tp = io.BufferedRandom
   1664 
   1665     def test_constructor(self):
   1666         BufferedRandomTest.test_constructor(self)
   1667         # The allocation can succeed on 32-bit builds, e.g. with more
   1668         # than 2GB RAM and a 64-bit kernel.
   1669         if sys.maxsize > 0x7FFFFFFF:
   1670             rawio = self.MockRawIO()
   1671             bufio = self.tp(rawio)
   1672             self.assertRaises((OverflowError, MemoryError, ValueError),
   1673                 bufio.__init__, rawio, sys.maxsize)
   1674 
   1675     def test_garbage_collection(self):
   1676         CBufferedReaderTest.test_garbage_collection(self)
   1677         CBufferedWriterTest.test_garbage_collection(self)
   1678 
   1679     def test_args_error(self):
   1680         # Issue #17275
   1681         with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
   1682             self.tp(io.BytesIO(), 1024, 1024, 1024)
   1683 
   1684 
   1685 class PyBufferedRandomTest(BufferedRandomTest):
   1686     tp = pyio.BufferedRandom
   1687 
   1688 
   1689 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
   1690 # properties:
   1691 #   - A single output character can correspond to many bytes of input.
   1692 #   - The number of input bytes to complete the character can be
   1693 #     undetermined until the last input byte is received.
   1694 #   - The number of input bytes can vary depending on previous input.
   1695 #   - A single input byte can correspond to many characters of output.
   1696 #   - The number of output characters can be undetermined until the
   1697 #     last input byte is received.
   1698 #   - The number of output characters can vary depending on previous input.
   1699 
   1700 class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
   1701     """
   1702     For testing seek/tell behavior with a stateful, buffering decoder.
   1703 
   1704     Input is a sequence of words.  Words may be fixed-length (length set
   1705     by input) or variable-length (period-terminated).  In variable-length
   1706     mode, extra periods are ignored.  Possible words are:
   1707       - 'i' followed by a number sets the input length, I (maximum 99).
   1708         When I is set to 0, words are space-terminated.
   1709       - 'o' followed by a number sets the output length, O (maximum 99).
   1710       - Any other word is converted into a word followed by a period on
   1711         the output.  The output word consists of the input word truncated
   1712         or padded out with hyphens to make its length equal to O.  If O
   1713         is 0, the word is output verbatim without truncating or padding.
   1714     I and O are initially set to 1.  When I changes, any buffered input is
   1715     re-scanned according to the new I.  EOF also terminates the last word.
   1716     """
   1717 
   1718     def __init__(self, errors='strict'):
   1719         codecs.IncrementalDecoder.__init__(self, errors)
   1720         self.reset()
   1721 
   1722     def __repr__(self):
   1723         return '<SID %x>' % id(self)
   1724 
   1725     def reset(self):
   1726         self.i = 1
   1727         self.o = 1
   1728         self.buffer = bytearray()
   1729 
   1730     def getstate(self):
   1731         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
   1732         return bytes(self.buffer), i*100 + o
   1733 
   1734     def setstate(self, state):
   1735         buffer, io = state
   1736         self.buffer = bytearray(buffer)
   1737         i, o = divmod(io, 100)
   1738         self.i, self.o = i ^ 1, o ^ 1
   1739 
   1740     def decode(self, input, final=False):
   1741         output = ''
   1742         for b in input:
   1743             if self.i == 0: # variable-length, terminated with period
   1744                 if b == '.':
   1745                     if self.buffer:
   1746                         output += self.process_word()
   1747                 else:
   1748                     self.buffer.append(b)
   1749             else: # fixed-length, terminate after self.i bytes
   1750                 self.buffer.append(b)
   1751                 if len(self.buffer) == self.i:
   1752                     output += self.process_word()
   1753         if final and self.buffer: # EOF terminates the last word
   1754             output += self.process_word()
   1755         return output
   1756 
   1757     def process_word(self):
   1758         output = ''
   1759         if self.buffer[0] == ord('i'):
   1760             self.i = min(99, int(self.buffer[1:] or 0)) # set input length
   1761         elif self.buffer[0] == ord('o'):
   1762             self.o = min(99, int(self.buffer[1:] or 0)) # set output length
   1763         else:
   1764             output = self.buffer.decode('ascii')
   1765             if len(output) < self.o:
   1766                 output += '-'*self.o # pad out with hyphens
   1767             if self.o:
   1768                 output = output[:self.o] # truncate to output length
   1769             output += '.'
   1770         self.buffer = bytearray()
   1771         return output
   1772 
   1773     codecEnabled = False
   1774 
   1775     @classmethod
   1776     def lookupTestDecoder(cls, name):
   1777         if cls.codecEnabled and name == 'test_decoder':
   1778             latin1 = codecs.lookup('latin-1')
   1779             return codecs.CodecInfo(
   1780                 name='test_decoder', encode=latin1.encode, decode=None,
   1781                 incrementalencoder=None,
   1782                 streamreader=None, streamwriter=None,
   1783                 incrementaldecoder=cls)
   1784 
   1785 # Register the previous decoder for testing.
   1786 # Disabled by default, tests will enable it.
   1787 codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
   1788 
   1789 
   1790 class StatefulIncrementalDecoderTest(unittest.TestCase):
   1791     """
   1792     Make sure the StatefulIncrementalDecoder actually works.
   1793     """
   1794 
   1795     test_cases = [
   1796         # I=1, O=1 (fixed-length input == fixed-length output)
   1797         (b'abcd', False, 'a.b.c.d.'),
   1798         # I=0, O=0 (variable-length input, variable-length output)
   1799         (b'oiabcd', True, 'abcd.'),
   1800         # I=0, O=0 (should ignore extra periods)
   1801         (b'oi...abcd...', True, 'abcd.'),
   1802         # I=0, O=6 (variable-length input, fixed-length output)
   1803         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
   1804         # I=2, O=6 (fixed-length input < fixed-length output)
   1805         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
   1806         # I=6, O=3 (fixed-length input > fixed-length output)
   1807         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
   1808         # I=0, then 3; O=29, then 15 (with longer output)
   1809         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
   1810          'a----------------------------.' +
   1811          'b----------------------------.' +
   1812          'cde--------------------------.' +
   1813          'abcdefghijabcde.' +
   1814          'a.b------------.' +
   1815          '.c.------------.' +
   1816          'd.e------------.' +
   1817          'k--------------.' +
   1818          'l--------------.' +
   1819          'm--------------.')
   1820     ]
   1821 
   1822     def test_decoder(self):
   1823         # Try a few one-shot test cases.
   1824         for input, eof, output in self.test_cases:
   1825             d = StatefulIncrementalDecoder()
   1826             self.assertEqual(d.decode(input, eof), output)
   1827 
   1828         # Also test an unfinished decode, followed by forcing EOF.
   1829         d = StatefulIncrementalDecoder()
   1830         self.assertEqual(d.decode(b'oiabcd'), '')
   1831         self.assertEqual(d.decode(b'', 1), 'abcd.')
   1832 
   1833 class TextIOWrapperTest(unittest.TestCase):
   1834 
   1835     def setUp(self):
   1836         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
   1837         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
   1838         support.unlink(support.TESTFN)
   1839 
   1840     def tearDown(self):
   1841         support.unlink(support.TESTFN)
   1842 
   1843     def test_constructor(self):
   1844         r = self.BytesIO(b"\xc3\xa9\n\n")
   1845         b = self.BufferedReader(r, 1000)
   1846         t = self.TextIOWrapper(b)
   1847         t.__init__(b, encoding="latin1", newline="\r\n")
   1848         self.assertEqual(t.encoding, "latin1")
   1849         self.assertEqual(t.line_buffering, False)
   1850         t.__init__(b, encoding="utf8", line_buffering=True)
   1851         self.assertEqual(t.encoding, "utf8")
   1852         self.assertEqual(t.line_buffering, True)
   1853         self.assertEqual("\xe9\n", t.readline())
   1854         self.assertRaises(TypeError, t.__init__, b, newline=42)
   1855         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   1856 
   1857     def test_detach(self):
   1858         r = self.BytesIO()
   1859         b = self.BufferedWriter(r)
   1860         t = self.TextIOWrapper(b)
   1861         self.assertIs(t.detach(), b)
   1862 
   1863         t = self.TextIOWrapper(b, encoding="ascii")
   1864         t.write("howdy")
   1865         self.assertFalse(r.getvalue())
   1866         t.detach()
   1867         self.assertEqual(r.getvalue(), b"howdy")
   1868         self.assertRaises(ValueError, t.detach)
   1869 
   1870     def test_repr(self):
   1871         raw = self.BytesIO("hello".encode("utf-8"))
   1872         b = self.BufferedReader(raw)
   1873         t = self.TextIOWrapper(b, encoding="utf-8")
   1874         modname = self.TextIOWrapper.__module__
   1875         self.assertEqual(repr(t),
   1876                          "<%s.TextIOWrapper encoding='utf-8'>" % modname)
   1877         raw.name = "dummy"
   1878         self.assertEqual(repr(t),
   1879                          "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
   1880         raw.name = b"dummy"
   1881         self.assertEqual(repr(t),
   1882                          "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
   1883 
   1884     def test_line_buffering(self):
   1885         r = self.BytesIO()
   1886         b = self.BufferedWriter(r, 1000)
   1887         t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
   1888         t.write("X")
   1889         self.assertEqual(r.getvalue(), b"")  # No flush happened
   1890         t.write("Y\nZ")
   1891         self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
   1892         t.write("A\rB")
   1893         self.assertEqual(r.getvalue(), b"XY\nZA\rB")
   1894 
   1895     def test_encoding(self):
   1896         # Check the encoding attribute is always set, and valid
   1897         b = self.BytesIO()
   1898         t = self.TextIOWrapper(b, encoding="utf8")
   1899         self.assertEqual(t.encoding, "utf8")
   1900         t = self.TextIOWrapper(b)
   1901         self.assertTrue(t.encoding is not None)
   1902         codecs.lookup(t.encoding)
   1903 
   1904     def test_encoding_errors_reading(self):
   1905         # (1) default
   1906         b = self.BytesIO(b"abc\n\xff\n")
   1907         t = self.TextIOWrapper(b, encoding="ascii")
   1908         self.assertRaises(UnicodeError, t.read)
   1909         # (2) explicit strict
   1910         b = self.BytesIO(b"abc\n\xff\n")
   1911         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   1912         self.assertRaises(UnicodeError, t.read)
   1913         # (3) ignore
   1914         b = self.BytesIO(b"abc\n\xff\n")
   1915         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
   1916         self.assertEqual(t.read(), "abc\n\n")
   1917         # (4) replace
   1918         b = self.BytesIO(b"abc\n\xff\n")
   1919         t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
   1920         self.assertEqual(t.read(), "abc\n\ufffd\n")
   1921 
   1922     def test_encoding_errors_writing(self):
   1923         # (1) default
   1924         b = self.BytesIO()
   1925         t = self.TextIOWrapper(b, encoding="ascii")
   1926         self.assertRaises(UnicodeError, t.write, "\xff")
   1927         # (2) explicit strict
   1928         b = self.BytesIO()
   1929         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   1930         self.assertRaises(UnicodeError, t.write, "\xff")
   1931         # (3) ignore
   1932         b = self.BytesIO()
   1933         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
   1934                              newline="\n")
   1935         t.write("abc\xffdef\n")
   1936         t.flush()
   1937         self.assertEqual(b.getvalue(), b"abcdef\n")
   1938         # (4) replace
   1939         b = self.BytesIO()
   1940         t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
   1941                              newline="\n")
   1942         t.write("abc\xffdef\n")
   1943         t.flush()
   1944         self.assertEqual(b.getvalue(), b"abc?def\n")
   1945 
   1946     def test_newlines(self):
   1947         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
   1948 
   1949         tests = [
   1950             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
   1951             [ '', input_lines ],
   1952             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
   1953             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
   1954             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
   1955         ]
   1956         encodings = (
   1957             'utf-8', 'latin-1',
   1958             'utf-16', 'utf-16-le', 'utf-16-be',
   1959             'utf-32', 'utf-32-le', 'utf-32-be',
   1960         )
   1961 
   1962         # Try a range of buffer sizes to test the case where \r is the last
   1963         # character in TextIOWrapper._pending_line.
   1964         for encoding in encodings:
   1965             # XXX: str.encode() should return bytes
   1966             data = bytes(''.join(input_lines).encode(encoding))
   1967             for do_reads in (False, True):
   1968                 for bufsize in range(1, 10):
   1969                     for newline, exp_lines in tests:
   1970                         bufio = self.BufferedReader(self.BytesIO(data), bufsize)
   1971                         textio = self.TextIOWrapper(bufio, newline=newline,
   1972                                                   encoding=encoding)
   1973                         if do_reads:
   1974                             got_lines = []
   1975                             while True:
   1976                                 c2 = textio.read(2)
   1977                                 if c2 == '':
   1978                                     break
   1979                                 self.assertEqual(len(c2), 2)
   1980                                 got_lines.append(c2 + textio.readline())
   1981                         else:
   1982                             got_lines = list(textio)
   1983 
   1984                         for got_line, exp_line in zip(got_lines, exp_lines):
   1985                             self.assertEqual(got_line, exp_line)
   1986                         self.assertEqual(len(got_lines), len(exp_lines))
   1987 
   1988     def test_newlines_input(self):
   1989         testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
   1990         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
   1991         for newline, expected in [
   1992             (None, normalized.decode("ascii").splitlines(True)),
   1993             ("", testdata.decode("ascii").splitlines(True)),
   1994             ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   1995             ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   1996             ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
   1997             ]:
   1998             buf = self.BytesIO(testdata)
   1999             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   2000             self.assertEqual(txt.readlines(), expected)
   2001             txt.seek(0)
   2002             self.assertEqual(txt.read(), "".join(expected))
   2003 
   2004     def test_newlines_output(self):
   2005         testdict = {
   2006             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   2007             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   2008             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
   2009             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
   2010             }
   2011         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
   2012         for newline, expected in tests:
   2013             buf = self.BytesIO()
   2014             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   2015             txt.write("AAA\nB")
   2016             txt.write("BB\nCCC\n")
   2017             txt.write("X\rY\r\nZ")
   2018             txt.flush()
   2019             self.assertEqual(buf.closed, False)
   2020             self.assertEqual(buf.getvalue(), expected)
   2021 
   2022     def test_destructor(self):
   2023         l = []
   2024         base = self.BytesIO
   2025         class MyBytesIO(base):
   2026             def close(self):
   2027                 l.append(self.getvalue())
   2028                 base.close(self)
   2029         b = MyBytesIO()
   2030         t = self.TextIOWrapper(b, encoding="ascii")
   2031         t.write("abc")
   2032         del t
   2033         support.gc_collect()
   2034         self.assertEqual([b"abc"], l)
   2035 
   2036     def test_override_destructor(self):
   2037         record = []
   2038         class MyTextIO(self.TextIOWrapper):
   2039             def __del__(self):
   2040                 record.append(1)
   2041                 try:
   2042                     f = super(MyTextIO, self).__del__
   2043                 except AttributeError:
   2044                     pass
   2045                 else:
   2046                     f()
   2047             def close(self):
   2048                 record.append(2)
   2049                 super(MyTextIO, self).close()
   2050             def flush(self):
   2051                 record.append(3)
   2052                 super(MyTextIO, self).flush()
   2053         b = self.BytesIO()
   2054         t = MyTextIO(b, encoding="ascii")
   2055         del t
   2056         support.gc_collect()
   2057         self.assertEqual(record, [1, 2, 3])
   2058 
   2059     def test_error_through_destructor(self):
   2060         # Test that the exception state is not modified by a destructor,
   2061         # even if close() fails.
   2062         rawio = self.CloseFailureIO()
   2063         def f():
   2064             self.TextIOWrapper(rawio).xyzzy
   2065         with support.captured_output("stderr") as s:
   2066             self.assertRaises(AttributeError, f)
   2067         s = s.getvalue().strip()
   2068         if s:
   2069             # The destructor *may* have printed an unraisable error, check it
   2070             self.assertEqual(len(s.splitlines()), 1)
   2071             self.assertTrue(s.startswith("Exception IOError: "), s)
   2072             self.assertTrue(s.endswith(" ignored"), s)
   2073 
   2074     # Systematic tests of the text I/O API
   2075 
   2076     def test_basic_io(self):
   2077         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
   2078             for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
   2079                 f = self.open(support.TESTFN, "w+", encoding=enc)
   2080                 f._CHUNK_SIZE = chunksize
   2081                 self.assertEqual(f.write("abc"), 3)
   2082                 f.close()
   2083                 f = self.open(support.TESTFN, "r+", encoding=enc)
   2084                 f._CHUNK_SIZE = chunksize
   2085                 self.assertEqual(f.tell(), 0)
   2086                 self.assertEqual(f.read(), "abc")
   2087                 cookie = f.tell()
   2088                 self.assertEqual(f.seek(0), 0)
   2089                 self.assertEqual(f.read(None), "abc")
   2090                 f.seek(0)
   2091                 self.assertEqual(f.read(2), "ab")
   2092                 self.assertEqual(f.read(1), "c")
   2093                 self.assertEqual(f.read(1), "")
   2094                 self.assertEqual(f.read(), "")
   2095                 self.assertEqual(f.tell(), cookie)
   2096                 self.assertEqual(f.seek(0), 0)
   2097                 self.assertEqual(f.seek(0, 2), cookie)
   2098                 self.assertEqual(f.write("def"), 3)
   2099                 self.assertEqual(f.seek(cookie), cookie)
   2100                 self.assertEqual(f.read(), "def")
   2101                 if enc.startswith("utf"):
   2102                     self.multi_line_test(f, enc)
   2103                 f.close()
   2104 
   2105     def multi_line_test(self, f, enc):
   2106         f.seek(0)
   2107         f.truncate()
   2108         sample = "s\xff\u0fff\uffff"
   2109         wlines = []
   2110         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
   2111             chars = []
   2112             for i in range(size):
   2113                 chars.append(sample[i % len(sample)])
   2114             line = "".join(chars) + "\n"
   2115             wlines.append((f.tell(), line))
   2116             f.write(line)
   2117         f.seek(0)
   2118         rlines = []
   2119         while True:
   2120             pos = f.tell()
   2121             line = f.readline()
   2122             if not line:
   2123                 break
   2124             rlines.append((pos, line))
   2125         self.assertEqual(rlines, wlines)
   2126 
   2127     def test_telling(self):
   2128         f = self.open(support.TESTFN, "w+", encoding="utf8")
   2129         p0 = f.tell()
   2130         f.write("\xff\n")
   2131         p1 = f.tell()
   2132         f.write("\xff\n")
   2133         p2 = f.tell()
   2134         f.seek(0)
   2135         self.assertEqual(f.tell(), p0)
   2136         self.assertEqual(f.readline(), "\xff\n")
   2137         self.assertEqual(f.tell(), p1)
   2138         self.assertEqual(f.readline(), "\xff\n")
   2139         self.assertEqual(f.tell(), p2)
   2140         f.seek(0)
   2141         for line in f:
   2142             self.assertEqual(line, "\xff\n")
   2143             self.assertRaises(IOError, f.tell)
   2144         self.assertEqual(f.tell(), p2)
   2145         f.close()
   2146 
   2147     def test_seeking(self):
   2148         chunk_size = _default_chunk_size()
   2149         prefix_size = chunk_size - 2
   2150         u_prefix = "a" * prefix_size
   2151         prefix = bytes(u_prefix.encode("utf-8"))
   2152         self.assertEqual(len(u_prefix), len(prefix))
   2153         u_suffix = "\u8888\n"
   2154         suffix = bytes(u_suffix.encode("utf-8"))
   2155         line = prefix + suffix
   2156         f = self.open(support.TESTFN, "wb")
   2157         f.write(line*2)
   2158         f.close()
   2159         f = self.open(support.TESTFN, "r", encoding="utf-8")
   2160         s = f.read(prefix_size)
   2161         self.assertEqual(s, prefix.decode("ascii"))
   2162         self.assertEqual(f.tell(), prefix_size)
   2163         self.assertEqual(f.readline(), u_suffix)
   2164 
   2165     def test_seeking_too(self):
   2166         # Regression test for a specific bug
   2167         data = b'\xe0\xbf\xbf\n'
   2168         f = self.open(support.TESTFN, "wb")
   2169         f.write(data)
   2170         f.close()
   2171         f = self.open(support.TESTFN, "r", encoding="utf-8")
   2172         f._CHUNK_SIZE  # Just test that it exists
   2173         f._CHUNK_SIZE = 2
   2174         f.readline()
   2175         f.tell()
   2176 
   2177     def test_seek_and_tell(self):
   2178         #Test seek/tell using the StatefulIncrementalDecoder.
   2179         # Make test faster by doing smaller seeks
   2180         CHUNK_SIZE = 128
   2181 
   2182         def test_seek_and_tell_with_data(data, min_pos=0):
   2183             """Tell/seek to various points within a data stream and ensure
   2184             that the decoded data returned by read() is consistent."""
   2185             f = self.open(support.TESTFN, 'wb')
   2186             f.write(data)
   2187             f.close()
   2188             f = self.open(support.TESTFN, encoding='test_decoder')
   2189             f._CHUNK_SIZE = CHUNK_SIZE
   2190             decoded = f.read()
   2191             f.close()
   2192 
   2193             for i in range(min_pos, len(decoded) + 1): # seek positions
   2194                 for j in [1, 5, len(decoded) - i]: # read lengths
   2195                     f = self.open(support.TESTFN, encoding='test_decoder')
   2196                     self.assertEqual(f.read(i), decoded[:i])
   2197                     cookie = f.tell()
   2198                     self.assertEqual(f.read(j), decoded[i:i + j])
   2199                     f.seek(cookie)
   2200                     self.assertEqual(f.read(), decoded[i:])
   2201                     f.close()
   2202 
   2203         # Enable the test decoder.
   2204         StatefulIncrementalDecoder.codecEnabled = 1
   2205 
   2206         # Run the tests.
   2207         try:
   2208             # Try each test case.
   2209             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2210                 test_seek_and_tell_with_data(input)
   2211 
   2212             # Position each test case so that it crosses a chunk boundary.
   2213             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2214                 offset = CHUNK_SIZE - len(input)//2
   2215                 prefix = b'.'*offset
   2216                 # Don't bother seeking into the prefix (takes too long).
   2217                 min_pos = offset*2
   2218                 test_seek_and_tell_with_data(prefix + input, min_pos)
   2219 
   2220         # Ensure our test decoder won't interfere with subsequent tests.
   2221         finally:
   2222             StatefulIncrementalDecoder.codecEnabled = 0
   2223 
   2224     def test_encoded_writes(self):
   2225         data = "1234567890"
   2226         tests = ("utf-16",
   2227                  "utf-16-le",
   2228                  "utf-16-be",
   2229                  "utf-32",
   2230                  "utf-32-le",
   2231                  "utf-32-be")
   2232         for encoding in tests:
   2233             buf = self.BytesIO()
   2234             f = self.TextIOWrapper(buf, encoding=encoding)
   2235             # Check if the BOM is written only once (see issue1753).
   2236             f.write(data)
   2237             f.write(data)
   2238             f.seek(0)
   2239             self.assertEqual(f.read(), data * 2)
   2240             f.seek(0)
   2241             self.assertEqual(f.read(), data * 2)
   2242             self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
   2243 
   2244     def test_unreadable(self):
   2245         class UnReadable(self.BytesIO):
   2246             def readable(self):
   2247                 return False
   2248         txt = self.TextIOWrapper(UnReadable())
   2249         self.assertRaises(IOError, txt.read)
   2250 
   2251     def test_read_one_by_one(self):
   2252         txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
   2253         reads = ""
   2254         while True:
   2255             c = txt.read(1)
   2256             if not c:
   2257                 break
   2258             reads += c
   2259         self.assertEqual(reads, "AA\nBB")
   2260 
   2261     def test_readlines(self):
   2262         txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
   2263         self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
   2264         txt.seek(0)
   2265         self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
   2266         txt.seek(0)
   2267         self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
   2268 
   2269     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
   2270     def test_read_by_chunk(self):
   2271         # make sure "\r\n" straddles 128 char boundary.
   2272         txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
   2273         reads = ""
   2274         while True:
   2275             c = txt.read(128)
   2276             if not c:
   2277                 break
   2278             reads += c
   2279         self.assertEqual(reads, "A"*127+"\nB")
   2280 
   2281     def test_writelines(self):
   2282         l = ['ab', 'cd', 'ef']
   2283         buf = self.BytesIO()
   2284         txt = self.TextIOWrapper(buf)
   2285         txt.writelines(l)
   2286         txt.flush()
   2287         self.assertEqual(buf.getvalue(), b'abcdef')
   2288 
   2289     def test_writelines_userlist(self):
   2290         l = UserList(['ab', 'cd', 'ef'])
   2291         buf = self.BytesIO()
   2292         txt = self.TextIOWrapper(buf)
   2293         txt.writelines(l)
   2294         txt.flush()
   2295         self.assertEqual(buf.getvalue(), b'abcdef')
   2296 
   2297     def test_writelines_error(self):
   2298         txt = self.TextIOWrapper(self.BytesIO())
   2299         self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
   2300         self.assertRaises(TypeError, txt.writelines, None)
   2301         self.assertRaises(TypeError, txt.writelines, b'abc')
   2302 
   2303     def test_issue1395_1(self):
   2304         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2305 
   2306         # read one char at a time
   2307         reads = ""
   2308         while True:
   2309             c = txt.read(1)
   2310             if not c:
   2311                 break
   2312             reads += c
   2313         self.assertEqual(reads, self.normalized)
   2314 
   2315     def test_issue1395_2(self):
   2316         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2317         txt._CHUNK_SIZE = 4
   2318 
   2319         reads = ""
   2320         while True:
   2321             c = txt.read(4)
   2322             if not c:
   2323                 break
   2324             reads += c
   2325         self.assertEqual(reads, self.normalized)
   2326 
   2327     def test_issue1395_3(self):
   2328         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2329         txt._CHUNK_SIZE = 4
   2330 
   2331         reads = txt.read(4)
   2332         reads += txt.read(4)
   2333         reads += txt.readline()
   2334         reads += txt.readline()
   2335         reads += txt.readline()
   2336         self.assertEqual(reads, self.normalized)
   2337 
   2338     def test_issue1395_4(self):
   2339         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2340         txt._CHUNK_SIZE = 4
   2341 
   2342         reads = txt.read(4)
   2343         reads += txt.read()
   2344         self.assertEqual(reads, self.normalized)
   2345 
   2346     def test_issue1395_5(self):
   2347         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2348         txt._CHUNK_SIZE = 4
   2349 
   2350         reads = txt.read(4)
   2351         pos = txt.tell()
   2352         txt.seek(0)
   2353         txt.seek(pos)
   2354         self.assertEqual(txt.read(4), "BBB\n")
   2355 
   2356     def test_issue2282(self):
   2357         buffer = self.BytesIO(self.testdata)
   2358         txt = self.TextIOWrapper(buffer, encoding="ascii")
   2359 
   2360         self.assertEqual(buffer.seekable(), txt.seekable())
   2361 
   2362     def test_append_bom(self):
   2363         # The BOM is not written again when appending to a non-empty file
   2364         filename = support.TESTFN
   2365         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2366             with self.open(filename, 'w', encoding=charset) as f:
   2367                 f.write('aaa')
   2368                 pos = f.tell()
   2369             with self.open(filename, 'rb') as f:
   2370                 self.assertEqual(f.read(), 'aaa'.encode(charset))
   2371 
   2372             with self.open(filename, 'a', encoding=charset) as f:
   2373                 f.write('xxx')
   2374             with self.open(filename, 'rb') as f:
   2375                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
   2376 
   2377     def test_seek_bom(self):
   2378         # Same test, but when seeking manually
   2379         filename = support.TESTFN
   2380         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2381             with self.open(filename, 'w', encoding=charset) as f:
   2382                 f.write('aaa')
   2383                 pos = f.tell()
   2384             with self.open(filename, 'r+', encoding=charset) as f:
   2385                 f.seek(pos)
   2386                 f.write('zzz')
   2387                 f.seek(0)
   2388                 f.write('bbb')
   2389             with self.open(filename, 'rb') as f:
   2390                 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
   2391 
   2392     def test_errors_property(self):
   2393         with self.open(support.TESTFN, "w") as f:
   2394             self.assertEqual(f.errors, "strict")
   2395         with self.open(support.TESTFN, "w", errors="replace") as f:
   2396             self.assertEqual(f.errors, "replace")
   2397 
   2398     @unittest.skipUnless(threading, 'Threading required for this test.')
   2399     def test_threads_write(self):
   2400         # Issue6750: concurrent writes could duplicate data
   2401         event = threading.Event()
   2402         with self.open(support.TESTFN, "w", buffering=1) as f:
   2403             def run(n):
   2404                 text = "Thread%03d\n" % n
   2405                 event.wait()
   2406                 f.write(text)
   2407             threads = [threading.Thread(target=lambda n=x: run(n))
   2408                        for x in range(20)]
   2409             for t in threads:
   2410                 t.start()
   2411             time.sleep(0.02)
   2412             event.set()
   2413             for t in threads:
   2414                 t.join()
   2415         with self.open(support.TESTFN) as f:
   2416             content = f.read()
   2417             for n in range(20):
   2418                 self.assertEqual(content.count("Thread%03d\n" % n), 1)
   2419 
   2420     def test_flush_error_on_close(self):
   2421         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2422         def bad_flush():
   2423             raise IOError()
   2424         txt.flush = bad_flush
   2425         self.assertRaises(IOError, txt.close) # exception not swallowed
   2426         self.assertTrue(txt.closed)
   2427 
   2428     def test_multi_close(self):
   2429         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2430         txt.close()
   2431         txt.close()
   2432         txt.close()
   2433         self.assertRaises(ValueError, txt.flush)
   2434 
   2435     def test_readonly_attributes(self):
   2436         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2437         buf = self.BytesIO(self.testdata)
   2438         with self.assertRaises((AttributeError, TypeError)):
   2439             txt.buffer = buf
   2440 
   2441     def test_read_nonbytes(self):
   2442         # Issue #17106
   2443         # Crash when underlying read() returns non-bytes
   2444         class NonbytesStream(self.StringIO):
   2445             read1 = self.StringIO.read
   2446         class NonbytesStream(self.StringIO):
   2447             read1 = self.StringIO.read
   2448         t = self.TextIOWrapper(NonbytesStream('a'))
   2449         with self.maybeRaises(TypeError):
   2450             t.read(1)
   2451         t = self.TextIOWrapper(NonbytesStream('a'))
   2452         with self.maybeRaises(TypeError):
   2453             t.readline()
   2454         t = self.TextIOWrapper(NonbytesStream('a'))
   2455         self.assertEqual(t.read(), u'a')
   2456 
   2457     def test_illegal_decoder(self):
   2458         # Issue #17106
   2459         # Crash when decoder returns non-string
   2460         t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
   2461                                encoding='quopri_codec')
   2462         with self.maybeRaises(TypeError):
   2463             t.read(1)
   2464         t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
   2465                                encoding='quopri_codec')
   2466         with self.maybeRaises(TypeError):
   2467             t.readline()
   2468         t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
   2469                                encoding='quopri_codec')
   2470         with self.maybeRaises(TypeError):
   2471             t.read()
   2472 
   2473 
   2474 class CTextIOWrapperTest(TextIOWrapperTest):
   2475 
   2476     def test_initialization(self):
   2477         r = self.BytesIO(b"\xc3\xa9\n\n")
   2478         b = self.BufferedReader(r, 1000)
   2479         t = self.TextIOWrapper(b)
   2480         self.assertRaises(TypeError, t.__init__, b, newline=42)
   2481         self.assertRaises(ValueError, t.read)
   2482         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   2483         self.assertRaises(ValueError, t.read)
   2484 
   2485     def test_garbage_collection(self):
   2486         # C TextIOWrapper objects are collected, and collecting them flushes
   2487         # all data to disk.
   2488         # The Python version has __del__, so it ends in gc.garbage instead.
   2489         rawio = io.FileIO(support.TESTFN, "wb")
   2490         b = self.BufferedWriter(rawio)
   2491         t = self.TextIOWrapper(b, encoding="ascii")
   2492         t.write("456def")
   2493         t.x = t
   2494         wr = weakref.ref(t)
   2495         del t
   2496         support.gc_collect()
   2497         self.assertTrue(wr() is None, wr)
   2498         with self.open(support.TESTFN, "rb") as f:
   2499             self.assertEqual(f.read(), b"456def")
   2500 
   2501     def test_rwpair_cleared_before_textio(self):
   2502         # Issue 13070: TextIOWrapper's finalization would crash when called
   2503         # after the reference to the underlying BufferedRWPair's writer got
   2504         # cleared by the GC.
   2505         for i in range(1000):
   2506             b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
   2507             t1 = self.TextIOWrapper(b1, encoding="ascii")
   2508             b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
   2509             t2 = self.TextIOWrapper(b2, encoding="ascii")
   2510             # circular references
   2511             t1.buddy = t2
   2512             t2.buddy = t1
   2513         support.gc_collect()
   2514 
   2515     maybeRaises = unittest.TestCase.assertRaises
   2516 
   2517 
   2518 class PyTextIOWrapperTest(TextIOWrapperTest):
   2519     @contextlib.contextmanager
   2520     def maybeRaises(self, *args, **kwds):
   2521         yield
   2522 
   2523 
   2524 class IncrementalNewlineDecoderTest(unittest.TestCase):
   2525 
   2526     def check_newline_decoding_utf8(self, decoder):
   2527         # UTF-8 specific tests for a newline decoder
   2528         def _check_decode(b, s, **kwargs):
   2529             # We exercise getstate() / setstate() as well as decode()
   2530             state = decoder.getstate()
   2531             self.assertEqual(decoder.decode(b, **kwargs), s)
   2532             decoder.setstate(state)
   2533             self.assertEqual(decoder.decode(b, **kwargs), s)
   2534 
   2535         _check_decode(b'\xe8\xa2\x88', "\u8888")
   2536 
   2537         _check_decode(b'\xe8', "")
   2538         _check_decode(b'\xa2', "")
   2539         _check_decode(b'\x88', "\u8888")
   2540 
   2541         _check_decode(b'\xe8', "")
   2542         _check_decode(b'\xa2', "")
   2543         _check_decode(b'\x88', "\u8888")
   2544 
   2545         _check_decode(b'\xe8', "")
   2546         self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
   2547 
   2548         decoder.reset()
   2549         _check_decode(b'\n', "\n")
   2550         _check_decode(b'\r', "")
   2551         _check_decode(b'', "\n", final=True)
   2552         _check_decode(b'\r', "\n", final=True)
   2553 
   2554         _check_decode(b'\r', "")
   2555         _check_decode(b'a', "\na")
   2556 
   2557         _check_decode(b'\r\r\n', "\n\n")
   2558         _check_decode(b'\r', "")
   2559         _check_decode(b'\r', "\n")
   2560         _check_decode(b'\na', "\na")
   2561 
   2562         _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
   2563         _check_decode(b'\xe8\xa2\x88', "\u8888")
   2564         _check_decode(b'\n', "\n")
   2565         _check_decode(b'\xe8\xa2\x88\r', "\u8888")
   2566         _check_decode(b'\n', "\n")
   2567 
   2568     def check_newline_decoding(self, decoder, encoding):
   2569         result = []
   2570         if encoding is not None:
   2571             encoder = codecs.getincrementalencoder(encoding)()
   2572             def _decode_bytewise(s):
   2573                 # Decode one byte at a time
   2574                 for b in encoder.encode(s):
   2575                     result.append(decoder.decode(b))
   2576         else:
   2577             encoder = None
   2578             def _decode_bytewise(s):
   2579                 # Decode one char at a time
   2580                 for c in s:
   2581                     result.append(decoder.decode(c))
   2582         self.assertEqual(decoder.newlines, None)
   2583         _decode_bytewise("abc\n\r")
   2584         self.assertEqual(decoder.newlines, '\n')
   2585         _decode_bytewise("\nabc")
   2586         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
   2587         _decode_bytewise("abc\r")
   2588         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
   2589         _decode_bytewise("abc")
   2590         self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
   2591         _decode_bytewise("abc\r")
   2592         self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
   2593         decoder.reset()
   2594         input = "abc"
   2595         if encoder is not None:
   2596             encoder.reset()
   2597             input = encoder.encode(input)
   2598         self.assertEqual(decoder.decode(input), "abc")
   2599         self.assertEqual(decoder.newlines, None)
   2600 
   2601     def test_newline_decoder(self):
   2602         encodings = (
   2603             # None meaning the IncrementalNewlineDecoder takes unicode input
   2604             # rather than bytes input
   2605             None, 'utf-8', 'latin-1',
   2606             'utf-16', 'utf-16-le', 'utf-16-be',
   2607             'utf-32', 'utf-32-le', 'utf-32-be',
   2608         )
   2609         for enc in encodings:
   2610             decoder = enc and codecs.getincrementaldecoder(enc)()
   2611             decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
   2612             self.check_newline_decoding(decoder, enc)
   2613         decoder = codecs.getincrementaldecoder("utf-8")()
   2614         decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
   2615         self.check_newline_decoding_utf8(decoder)
   2616 
   2617     def test_newline_bytes(self):
   2618         # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
   2619         def _check(dec):
   2620             self.assertEqual(dec.newlines, None)
   2621             self.assertEqual(dec.decode("\u0D00"), "\u0D00")
   2622             self.assertEqual(dec.newlines, None)
   2623             self.assertEqual(dec.decode("\u0A00"), "\u0A00")
   2624             self.assertEqual(dec.newlines, None)
   2625         dec = self.IncrementalNewlineDecoder(None, translate=False)
   2626         _check(dec)
   2627         dec = self.IncrementalNewlineDecoder(None, translate=True)
   2628         _check(dec)
   2629 
   2630 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
   2631     pass
   2632 
   2633 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
   2634     pass
   2635 
   2636 
   2637 # XXX Tests for open()
   2638 
   2639 class MiscIOTest(unittest.TestCase):
   2640 
   2641     def tearDown(self):
   2642         support.unlink(support.TESTFN)
   2643 
   2644     def test___all__(self):
   2645         for name in self.io.__all__:
   2646             obj = getattr(self.io, name, None)
   2647             self.assertTrue(obj is not None, name)
   2648             if name == "open":
   2649                 continue
   2650             elif "error" in name.lower() or name == "UnsupportedOperation":
   2651                 self.assertTrue(issubclass(obj, Exception), name)
   2652             elif not name.startswith("SEEK_"):
   2653                 self.assertTrue(issubclass(obj, self.IOBase))
   2654 
   2655     def test_attributes(self):
   2656         f = self.open(support.TESTFN, "wb", buffering=0)
   2657         self.assertEqual(f.mode, "wb")
   2658         f.close()
   2659 
   2660         f = self.open(support.TESTFN, "U")
   2661         self.assertEqual(f.name,            support.TESTFN)
   2662         self.assertEqual(f.buffer.name,     support.TESTFN)
   2663         self.assertEqual(f.buffer.raw.name, support.TESTFN)
   2664         self.assertEqual(f.mode,            "U")
   2665         self.assertEqual(f.buffer.mode,     "rb")
   2666         self.assertEqual(f.buffer.raw.mode, "rb")
   2667         f.close()
   2668 
   2669         f = self.open(support.TESTFN, "w+")
   2670         self.assertEqual(f.mode,            "w+")
   2671         self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
   2672         self.assertEqual(f.buffer.raw.mode, "rb+")
   2673 
   2674         g = self.open(f.fileno(), "wb", closefd=False)
   2675         self.assertEqual(g.mode,     "wb")
   2676         self.assertEqual(g.raw.mode, "wb")
   2677         self.assertEqual(g.name,     f.fileno())
   2678         self.assertEqual(g.raw.name, f.fileno())
   2679         f.close()
   2680         g.close()
   2681 
   2682     def test_io_after_close(self):
   2683         for kwargs in [
   2684                 {"mode": "w"},
   2685                 {"mode": "wb"},
   2686                 {"mode": "w", "buffering": 1},
   2687                 {"mode": "w", "buffering": 2},
   2688                 {"mode": "wb", "buffering": 0},
   2689                 {"mode": "r"},
   2690                 {"mode": "rb"},
   2691                 {"mode": "r", "buffering": 1},
   2692                 {"mode": "r", "buffering": 2},
   2693                 {"mode": "rb", "buffering": 0},
   2694                 {"mode": "w+"},
   2695                 {"mode": "w+b"},
   2696                 {"mode": "w+", "buffering": 1},
   2697                 {"mode": "w+", "buffering": 2},
   2698                 {"mode": "w+b", "buffering": 0},
   2699             ]:
   2700             f = self.open(support.TESTFN, **kwargs)
   2701             f.close()
   2702             self.assertRaises(ValueError, f.flush)
   2703             self.assertRaises(ValueError, f.fileno)
   2704             self.assertRaises(ValueError, f.isatty)
   2705             self.assertRaises(ValueError, f.__iter__)
   2706             if hasattr(f, "peek"):
   2707                 self.assertRaises(ValueError, f.peek, 1)
   2708             self.assertRaises(ValueError, f.read)
   2709             if hasattr(f, "read1"):
   2710                 self.assertRaises(ValueError, f.read1, 1024)
   2711             if hasattr(f, "readall"):
   2712                 self.assertRaises(ValueError, f.readall)
   2713             if hasattr(f, "readinto"):
   2714                 self.assertRaises(ValueError, f.readinto, bytearray(1024))
   2715             self.assertRaises(ValueError, f.readline)
   2716             self.assertRaises(ValueError, f.readlines)
   2717             self.assertRaises(ValueError, f.seek, 0)
   2718             self.assertRaises(ValueError, f.tell)
   2719             self.assertRaises(ValueError, f.truncate)
   2720             self.assertRaises(ValueError, f.write,
   2721                               b"" if "b" in kwargs['mode'] else "")
   2722             self.assertRaises(ValueError, f.writelines, [])
   2723             self.assertRaises(ValueError, next, f)
   2724 
   2725     def test_blockingioerror(self):
   2726         # Various BlockingIOError issues
   2727         self.assertRaises(TypeError, self.BlockingIOError)
   2728         self.assertRaises(TypeError, self.BlockingIOError, 1)
   2729         self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
   2730         self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
   2731         b = self.BlockingIOError(1, "")
   2732         self.assertEqual(b.characters_written, 0)
   2733         class C(unicode):
   2734             pass
   2735         c = C("")
   2736         b = self.BlockingIOError(1, c)
   2737         c.b = b
   2738         b.c = c
   2739         wr = weakref.ref(c)
   2740         del c, b
   2741         support.gc_collect()
   2742         self.assertTrue(wr() is None, wr)
   2743 
   2744     def test_abcs(self):
   2745         # Test the visible base classes are ABCs.
   2746         self.assertIsInstance(self.IOBase, abc.ABCMeta)
   2747         self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
   2748         self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
   2749         self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
   2750 
   2751     def _check_abc_inheritance(self, abcmodule):
   2752         with self.open(support.TESTFN, "wb", buffering=0) as f:
   2753             self.assertIsInstance(f, abcmodule.IOBase)
   2754             self.assertIsInstance(f, abcmodule.RawIOBase)
   2755             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   2756             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   2757         with self.open(support.TESTFN, "wb") as f:
   2758             self.assertIsInstance(f, abcmodule.IOBase)
   2759             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   2760             self.assertIsInstance(f, abcmodule.BufferedIOBase)
   2761             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   2762         with self.open(support.TESTFN, "w") as f:
   2763             self.assertIsInstance(f, abcmodule.IOBase)
   2764             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   2765             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   2766             self.assertIsInstance(f, abcmodule.TextIOBase)
   2767 
   2768     def test_abc_inheritance(self):
   2769         # Test implementations inherit from their respective ABCs
   2770         self._check_abc_inheritance(self)
   2771 
   2772     def test_abc_inheritance_official(self):
   2773         # Test implementations inherit from the official ABCs of the
   2774         # baseline "io" module.
   2775         self._check_abc_inheritance(io)
   2776 
   2777     @unittest.skipUnless(fcntl, 'fcntl required for this test')
   2778     def test_nonblock_pipe_write_bigbuf(self):
   2779         self._test_nonblock_pipe_write(16*1024)
   2780 
   2781     @unittest.skipUnless(fcntl, 'fcntl required for this test')
   2782     def test_nonblock_pipe_write_smallbuf(self):
   2783         self._test_nonblock_pipe_write(1024)
   2784 
   2785     def _set_non_blocking(self, fd):
   2786         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
   2787         self.assertNotEqual(flags, -1)
   2788         res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
   2789         self.assertEqual(res, 0)
   2790 
   2791     def _test_nonblock_pipe_write(self, bufsize):
   2792         sent = []
   2793         received = []
   2794         r, w = os.pipe()
   2795         self._set_non_blocking(r)
   2796         self._set_non_blocking(w)
   2797 
   2798         # To exercise all code paths in the C implementation we need
   2799         # to play with buffer sizes.  For instance, if we choose a
   2800         # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
   2801         # then we will never get a partial write of the buffer.
   2802         rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
   2803         wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
   2804 
   2805         with rf, wf:
   2806             for N in 9999, 73, 7574:
   2807                 try:
   2808                     i = 0
   2809                     while True:
   2810                         msg = bytes([i % 26 + 97]) * N
   2811                         sent.append(msg)
   2812                         wf.write(msg)
   2813                         i += 1
   2814 
   2815                 except self.BlockingIOError as e:
   2816                     self.assertEqual(e.args[0], errno.EAGAIN)
   2817                     sent[-1] = sent[-1][:e.characters_written]
   2818                     received.append(rf.read())
   2819                     msg = b'BLOCKED'
   2820                     wf.write(msg)
   2821                     sent.append(msg)
   2822 
   2823             while True:
   2824                 try:
   2825                     wf.flush()
   2826                     break
   2827                 except self.BlockingIOError as e:
   2828                     self.assertEqual(e.args[0], errno.EAGAIN)
   2829                     self.assertEqual(e.characters_written, 0)
   2830                     received.append(rf.read())
   2831 
   2832             received += iter(rf.read, None)
   2833 
   2834         sent, received = b''.join(sent), b''.join(received)
   2835         self.assertTrue(sent == received)
   2836         self.assertTrue(wf.closed)
   2837         self.assertTrue(rf.closed)
   2838 
   2839 class CMiscIOTest(MiscIOTest):
   2840     io = io
   2841 
   2842 class PyMiscIOTest(MiscIOTest):
   2843     io = pyio
   2844 
   2845 
   2846 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
   2847 class SignalsTest(unittest.TestCase):
   2848 
   2849     def setUp(self):
   2850         self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
   2851 
   2852     def tearDown(self):
   2853         signal.signal(signal.SIGALRM, self.oldalrm)
   2854 
   2855     def alarm_interrupt(self, sig, frame):
   2856         1 // 0
   2857 
   2858     @unittest.skipUnless(threading, 'Threading required for this test.')
   2859     @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
   2860                      'issue #12429: skip test on FreeBSD <= 7')
   2861     def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
   2862         """Check that a partial write, when it gets interrupted, properly
   2863         invokes the signal handler, and bubbles up the exception raised
   2864         in the latter."""
   2865         read_results = []
   2866         def _read():
   2867             s = os.read(r, 1)
   2868             read_results.append(s)
   2869         t = threading.Thread(target=_read)
   2870         t.daemon = True
   2871         r, w = os.pipe()
   2872         try:
   2873             wio = self.io.open(w, **fdopen_kwargs)
   2874             t.start()
   2875             signal.alarm(1)
   2876             # Fill the pipe enough that the write will be blocking.
   2877             # It will be interrupted by the timer armed above.  Since the
   2878             # other thread has read one byte, the low-level write will
   2879             # return with a successful (partial) result rather than an EINTR.
   2880             # The buffered IO layer must check for pending signal
   2881             # handlers, which in this case will invoke alarm_interrupt().
   2882             self.assertRaises(ZeroDivisionError,
   2883                         wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
   2884             t.join()
   2885             # We got one byte, get another one and check that it isn't a
   2886             # repeat of the first one.
   2887             read_results.append(os.read(r, 1))
   2888             self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
   2889         finally:
   2890             os.close(w)
   2891             os.close(r)
   2892             # This is deliberate. If we didn't close the file descriptor
   2893             # before closing wio, wio would try to flush its internal
   2894             # buffer, and block again.
   2895             try:
   2896                 wio.close()
   2897             except IOError as e:
   2898                 if e.errno != errno.EBADF:
   2899                     raise
   2900 
   2901     def test_interrupted_write_unbuffered(self):
   2902         self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
   2903 
   2904     def test_interrupted_write_buffered(self):
   2905         self.check_interrupted_write(b"xy", b"xy", mode="wb")
   2906 
   2907     def test_interrupted_write_text(self):
   2908         self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
   2909 
   2910     def check_reentrant_write(self, data, **fdopen_kwargs):
   2911         def on_alarm(*args):
   2912             # Will be called reentrantly from the same thread
   2913             wio.write(data)
   2914             1//0
   2915         signal.signal(signal.SIGALRM, on_alarm)
   2916         r, w = os.pipe()
   2917         wio = self.io.open(w, **fdopen_kwargs)
   2918         try:
   2919             signal.alarm(1)
   2920             # Either the reentrant call to wio.write() fails with RuntimeError,
   2921             # or the signal handler raises ZeroDivisionError.
   2922             with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
   2923                 while 1:
   2924                     for i in range(100):
   2925                         wio.write(data)
   2926                         wio.flush()
   2927                     # Make sure the buffer doesn't fill up and block further writes
   2928                     os.read(r, len(data) * 100)
   2929             exc = cm.exception
   2930             if isinstance(exc, RuntimeError):
   2931                 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
   2932         finally:
   2933             wio.close()
   2934             os.close(r)
   2935 
   2936     def test_reentrant_write_buffered(self):
   2937         self.check_reentrant_write(b"xy", mode="wb")
   2938 
   2939     def test_reentrant_write_text(self):
   2940         self.check_reentrant_write("xy", mode="w", encoding="ascii")
   2941 
   2942     def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
   2943         """Check that a buffered read, when it gets interrupted (either
   2944         returning a partial result or EINTR), properly invokes the signal
   2945         handler and retries if the latter returned successfully."""
   2946         r, w = os.pipe()
   2947         fdopen_kwargs["closefd"] = False
   2948         def alarm_handler(sig, frame):
   2949             os.write(w, b"bar")
   2950         signal.signal(signal.SIGALRM, alarm_handler)
   2951         try:
   2952             rio = self.io.open(r, **fdopen_kwargs)
   2953             os.write(w, b"foo")
   2954             signal.alarm(1)
   2955             # Expected behaviour:
   2956             # - first raw read() returns partial b"foo"
   2957             # - second raw read() returns EINTR
   2958             # - third raw read() returns b"bar"
   2959             self.assertEqual(decode(rio.read(6)), "foobar")
   2960         finally:
   2961             rio.close()
   2962             os.close(w)
   2963             os.close(r)
   2964 
   2965     def test_interrupterd_read_retry_buffered(self):
   2966         self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
   2967                                           mode="rb")
   2968 
   2969     def test_interrupterd_read_retry_text(self):
   2970         self.check_interrupted_read_retry(lambda x: x,
   2971                                           mode="r")
   2972 
   2973     @unittest.skipUnless(threading, 'Threading required for this test.')
   2974     def check_interrupted_write_retry(self, item, **fdopen_kwargs):
   2975         """Check that a buffered write, when it gets interrupted (either
   2976         returning a partial result or EINTR), properly invokes the signal
   2977         handler and retries if the latter returned successfully."""
   2978         select = support.import_module("select")
   2979         # A quantity that exceeds the buffer size of an anonymous pipe's
   2980         # write end.
   2981         N = support.PIPE_MAX_SIZE
   2982         r, w = os.pipe()
   2983         fdopen_kwargs["closefd"] = False
   2984         # We need a separate thread to read from the pipe and allow the
   2985         # write() to finish.  This thread is started after the SIGALRM is
   2986         # received (forcing a first EINTR in write()).
   2987         read_results = []
   2988         write_finished = False
   2989         def _read():
   2990             while not write_finished:
   2991                 while r in select.select([r], [], [], 1.0)[0]:
   2992                     s = os.read(r, 1024)
   2993                     read_results.append(s)
   2994         t = threading.Thread(target=_read)
   2995         t.daemon = True
   2996         def alarm1(sig, frame):
   2997             signal.signal(signal.SIGALRM, alarm2)
   2998             signal.alarm(1)
   2999         def alarm2(sig, frame):
   3000             t.start()
   3001         signal.signal(signal.SIGALRM, alarm1)
   3002         try:
   3003             wio = self.io.open(w, **fdopen_kwargs)
   3004             signal.alarm(1)
   3005             # Expected behaviour:
   3006             # - first raw write() is partial (because of the limited pipe buffer
   3007             #   and the first alarm)
   3008             # - second raw write() returns EINTR (because of the second alarm)
   3009             # - subsequent write()s are successful (either partial or complete)
   3010             self.assertEqual(N, wio.write(item * N))
   3011             wio.flush()
   3012             write_finished = True
   3013             t.join()
   3014             self.assertEqual(N, sum(len(x) for x in read_results))
   3015         finally:
   3016             write_finished = True
   3017             os.close(w)
   3018             os.close(r)
   3019             # This is deliberate. If we didn't close the file descriptor
   3020             # before closing wio, wio would try to flush its internal
   3021             # buffer, and could block (in case of failure).
   3022             try:
   3023                 wio.close()
   3024             except IOError as e:
   3025                 if e.errno != errno.EBADF:
   3026                     raise
   3027 
   3028     def test_interrupterd_write_retry_buffered(self):
   3029         self.check_interrupted_write_retry(b"x", mode="wb")
   3030 
   3031     def test_interrupterd_write_retry_text(self):
   3032         self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
   3033 
   3034 
   3035 class CSignalsTest(SignalsTest):
   3036     io = io
   3037 
   3038 class PySignalsTest(SignalsTest):
   3039     io = pyio
   3040 
   3041     # Handling reentrancy issues would slow down _pyio even more, so the
   3042     # tests are disabled.
   3043     test_reentrant_write_buffered = None
   3044     test_reentrant_write_text = None
   3045 
   3046 
   3047 def test_main():
   3048     tests = (CIOTest, PyIOTest,
   3049              CBufferedReaderTest, PyBufferedReaderTest,
   3050              CBufferedWriterTest, PyBufferedWriterTest,
   3051              CBufferedRWPairTest, PyBufferedRWPairTest,
   3052              CBufferedRandomTest, PyBufferedRandomTest,
   3053              StatefulIncrementalDecoderTest,
   3054              CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
   3055              CTextIOWrapperTest, PyTextIOWrapperTest,
   3056              CMiscIOTest, PyMiscIOTest,
   3057              CSignalsTest, PySignalsTest,
   3058              )
   3059 
   3060     # Put the namespaces of the IO module we are testing and some useful mock
   3061     # classes in the __dict__ of each test.
   3062     mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
   3063              MockNonBlockWriterIO, MockRawIOWithoutRead)
   3064     all_members = io.__all__ + ["IncrementalNewlineDecoder"]
   3065     c_io_ns = dict((name, getattr(io, name)) for name in all_members)
   3066     py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
   3067     globs = globals()
   3068     c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
   3069     py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
   3070     # Avoid turning open into a bound method.
   3071     py_io_ns["open"] = pyio.OpenWrapper
   3072     for test in tests:
   3073         if test.__name__.startswith("C"):
   3074             for name, obj in c_io_ns.items():
   3075                 setattr(test, name, obj)
   3076         elif test.__name__.startswith("Py"):
   3077             for name, obj in py_io_ns.items():
   3078                 setattr(test, name, obj)
   3079 
   3080     support.run_unittest(*tests)
   3081 
   3082 if __name__ == "__main__":
   3083     test_main()
   3084