Home | History | Annotate | Download | only in test
      1 """Unit tests for the io module."""
      2 
      3 # Tests of io are scattered over the test suite:

      4 # * test_bufio - tests file buffering

      5 # * test_memoryio - tests BytesIO and StringIO

      6 # * test_fileio - tests FileIO

      7 # * test_file - tests the file interface

      8 # * test_io - tests everything else in the io module

      9 # * test_univnewlines - tests universal newline support

     10 # * test_largefile - tests operations on a file greater than 2**32 bytes

     11 #     (only enabled with -ulargefile)

     12 
     13 ################################################################################

     14 # ATTENTION TEST WRITERS!!!

     15 ################################################################################

     16 # When writing tests for io, it's important to test both the C and Python

     17 # implementations. This is usually done by writing a base test that refers to

     18 # the type it is testing as an attribute. Then it provides custom subclasses to

     19 # test both implementations. This file has lots of examples.

     20 ################################################################################

     21 
     22 from __future__ import print_function
     23 from __future__ import unicode_literals
     24 
     25 import os
     26 import sys
     27 import time
     28 import array
     29 import random
     30 import unittest
     31 import weakref
     32 import abc
     33 import signal
     34 import errno
     35 from itertools import cycle, count
     36 from collections import deque
     37 from test import test_support as support
     38 
     39 import codecs
     40 import io  # C implementation of io

     41 import _pyio as pyio # Python implementation of io

     42 try:
     43     import threading
     44 except ImportError:
     45     threading = None
     46 
# Under Python 2, a module-level __metaclass__ of `type` makes every class
# defined below without explicit bases a new-style class.
__metaclass__ = type
# Rebind `bytes` to the helper exported by test_support.
# NOTE(review): presumably this mimics the Python 3 bytes constructor --
# confirm against test_support.
bytes = support.py3k_bytes
     49 
     50 def _default_chunk_size():
     51     """Get the default TextIOWrapper chunk size"""
     52     with io.open(__file__, "r", encoding="latin1") as f:
     53         return f._CHUNK_SIZE
     54 
     55 
     56 class MockRawIOWithoutRead:
     57     """A RawIO implementation without read(), so as to exercise the default
     58     RawIO.read() which calls readinto()."""
     59 
     60     def __init__(self, read_stack=()):
     61         self._read_stack = list(read_stack)
     62         self._write_stack = []
     63         self._reads = 0
     64         self._extraneous_reads = 0
     65 
     66     def write(self, b):
     67         self._write_stack.append(bytes(b))
     68         return len(b)
     69 
     70     def writable(self):
     71         return True
     72 
     73     def fileno(self):
     74         return 42
     75 
     76     def readable(self):
     77         return True
     78 
     79     def seekable(self):
     80         return True
     81 
     82     def seek(self, pos, whence):
     83         return 0   # wrong but we gotta return something

     84 
     85     def tell(self):
     86         return 0   # same comment as above

     87 
     88     def readinto(self, buf):
     89         self._reads += 1
     90         max_len = len(buf)
     91         try:
     92             data = self._read_stack[0]
     93         except IndexError:
     94             self._extraneous_reads += 1
     95             return 0
     96         if data is None:
     97             del self._read_stack[0]
     98             return None
     99         n = len(data)
    100         if len(data) <= max_len:
    101             del self._read_stack[0]
    102             buf[:n] = data
    103             return n
    104         else:
    105             buf[:] = data[:max_len]
    106             self._read_stack[0] = data[max_len:]
    107             return max_len
    108 
    109     def truncate(self, pos=None):
    110         return pos
    111 
    112 class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    113     pass
    114 
    115 class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    116     pass
    117 
    118 
    119 class MockRawIO(MockRawIOWithoutRead):
    120 
    121     def read(self, n=None):
    122         self._reads += 1
    123         try:
    124             return self._read_stack.pop(0)
    125         except:
    126             self._extraneous_reads += 1
    127             return b""
    128 
    129 class CMockRawIO(MockRawIO, io.RawIOBase):
    130     pass
    131 
    132 class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    133     pass
    134 
    135 
    136 class MisbehavedRawIO(MockRawIO):
    137     def write(self, b):
    138         return MockRawIO.write(self, b) * 2
    139 
    140     def read(self, n=None):
    141         return MockRawIO.read(self, n) * 2
    142 
    143     def seek(self, pos, whence):
    144         return -123
    145 
    146     def tell(self):
    147         return -456
    148 
    149     def readinto(self, buf):
    150         MockRawIO.readinto(self, buf)
    151         return len(buf) * 5
    152 
    153 class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    154     pass
    155 
    156 class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    157     pass
    158 
    159 
    160 class CloseFailureIO(MockRawIO):
    161     closed = 0
    162 
    163     def close(self):
    164         if not self.closed:
    165             self.closed = 1
    166             raise IOError
    167 
    168 class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    169     pass
    170 
    171 class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    172     pass
    173 
    174 
    175 class MockFileIO:
    176 
    177     def __init__(self, data):
    178         self.read_history = []
    179         super(MockFileIO, self).__init__(data)
    180 
    181     def read(self, n=None):
    182         res = super(MockFileIO, self).read(n)
    183         self.read_history.append(None if res is None else len(res))
    184         return res
    185 
    186     def readinto(self, b):
    187         res = super(MockFileIO, self).readinto(b)
    188         self.read_history.append(res)
    189         return res
    190 
    191 class CMockFileIO(MockFileIO, io.BytesIO):
    192     pass
    193 
    194 class PyMockFileIO(MockFileIO, pyio.BytesIO):
    195     pass
    196 
    197 
    198 class MockNonBlockWriterIO:
    199 
    200     def __init__(self):
    201         self._write_stack = []
    202         self._blocker_char = None
    203 
    204     def pop_written(self):
    205         s = b"".join(self._write_stack)
    206         self._write_stack[:] = []
    207         return s
    208 
    209     def block_on(self, char):
    210         """Block when a given char is encountered."""
    211         self._blocker_char = char
    212 
    213     def readable(self):
    214         return True
    215 
    216     def seekable(self):
    217         return True
    218 
    219     def writable(self):
    220         return True
    221 
    222     def write(self, b):
    223         b = bytes(b)
    224         n = -1
    225         if self._blocker_char:
    226             try:
    227                 n = b.index(self._blocker_char)
    228             except ValueError:
    229                 pass
    230             else:
    231                 self._blocker_char = None
    232                 self._write_stack.append(b[:n])
    233                 raise self.BlockingIOError(0, "test blocking", n)
    234         self._write_stack.append(b)
    235         return len(b)
    236 
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of io.RawIOBase."""
    # Exception type raised by MockNonBlockWriterIO.write() when it blocks.
    BlockingIOError = io.BlockingIOError
    239 
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of pyio.RawIOBase."""
    # Exception type raised by MockNonBlockWriterIO.write() when it blocks.
    BlockingIOError = pyio.BlockingIOError
    242 
    243 
    244 class IOTest(unittest.TestCase):
    245 
    246     def setUp(self):
    247         support.unlink(support.TESTFN)
    248 
    249     def tearDown(self):
    250         support.unlink(support.TESTFN)
    251 
    252     def write_ops(self, f):
    253         self.assertEqual(f.write(b"blah."), 5)
    254         f.truncate(0)
    255         self.assertEqual(f.tell(), 5)
    256         f.seek(0)
    257 
    258         self.assertEqual(f.write(b"blah."), 5)
    259         self.assertEqual(f.seek(0), 0)
    260         self.assertEqual(f.write(b"Hello."), 6)
    261         self.assertEqual(f.tell(), 6)
    262         self.assertEqual(f.seek(-1, 1), 5)
    263         self.assertEqual(f.tell(), 5)
    264         self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
    265         self.assertEqual(f.seek(0), 0)
    266         self.assertEqual(f.write(b"h"), 1)
    267         self.assertEqual(f.seek(-1, 2), 13)
    268         self.assertEqual(f.tell(), 13)
    269 
    270         self.assertEqual(f.truncate(12), 12)
    271         self.assertEqual(f.tell(), 13)
    272         self.assertRaises(TypeError, f.seek, 0.0)
    273 
    274     def read_ops(self, f, buffered=False):
    275         data = f.read(5)
    276         self.assertEqual(data, b"hello")
    277         data = bytearray(data)
    278         self.assertEqual(f.readinto(data), 5)
    279         self.assertEqual(data, b" worl")
    280         self.assertEqual(f.readinto(data), 2)
    281         self.assertEqual(len(data), 5)
    282         self.assertEqual(data[:2], b"d\n")
    283         self.assertEqual(f.seek(0), 0)
    284         self.assertEqual(f.read(20), b"hello world\n")
    285         self.assertEqual(f.read(1), b"")
    286         self.assertEqual(f.readinto(bytearray(b"x")), 0)
    287         self.assertEqual(f.seek(-6, 2), 6)
    288         self.assertEqual(f.read(5), b"world")
    289         self.assertEqual(f.read(0), b"")
    290         self.assertEqual(f.readinto(bytearray()), 0)
    291         self.assertEqual(f.seek(-6, 1), 5)
    292         self.assertEqual(f.read(5), b" worl")
    293         self.assertEqual(f.tell(), 10)
    294         self.assertRaises(TypeError, f.seek, 0.0)
    295         if buffered:
    296             f.seek(0)
    297             self.assertEqual(f.read(), b"hello world\n")
    298             f.seek(6)
    299             self.assertEqual(f.read(), b"world\n")
    300             self.assertEqual(f.read(), b"")
    301 
    302     LARGE = 2**31
    303 
    304     def large_file_ops(self, f):
    305         assert f.readable()
    306         assert f.writable()
    307         self.assertEqual(f.seek(self.LARGE), self.LARGE)
    308         self.assertEqual(f.tell(), self.LARGE)
    309         self.assertEqual(f.write(b"xxx"), 3)
    310         self.assertEqual(f.tell(), self.LARGE + 3)
    311         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    312         self.assertEqual(f.truncate(), self.LARGE + 2)
    313         self.assertEqual(f.tell(), self.LARGE + 2)
    314         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    315         self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    316         self.assertEqual(f.tell(), self.LARGE + 2)
    317         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    318         self.assertEqual(f.seek(-1, 2), self.LARGE)
    319         self.assertEqual(f.read(2), b"x")
    320 
    321     def test_invalid_operations(self):
    322         # Try writing on a file opened in read mode and vice-versa.

    323         for mode in ("w", "wb"):
    324             with self.open(support.TESTFN, mode) as fp:
    325                 self.assertRaises(IOError, fp.read)
    326                 self.assertRaises(IOError, fp.readline)
    327         with self.open(support.TESTFN, "rb") as fp:
    328             self.assertRaises(IOError, fp.write, b"blah")
    329             self.assertRaises(IOError, fp.writelines, [b"blah\n"])
    330         with self.open(support.TESTFN, "r") as fp:
    331             self.assertRaises(IOError, fp.write, "blah")
    332             self.assertRaises(IOError, fp.writelines, ["blah\n"])
    333 
    334     def test_raw_file_io(self):
    335         with self.open(support.TESTFN, "wb", buffering=0) as f:
    336             self.assertEqual(f.readable(), False)
    337             self.assertEqual(f.writable(), True)
    338             self.assertEqual(f.seekable(), True)
    339             self.write_ops(f)
    340         with self.open(support.TESTFN, "rb", buffering=0) as f:
    341             self.assertEqual(f.readable(), True)
    342             self.assertEqual(f.writable(), False)
    343             self.assertEqual(f.seekable(), True)
    344             self.read_ops(f)
    345 
    346     def test_buffered_file_io(self):
    347         with self.open(support.TESTFN, "wb") as f:
    348             self.assertEqual(f.readable(), False)
    349             self.assertEqual(f.writable(), True)
    350             self.assertEqual(f.seekable(), True)
    351             self.write_ops(f)
    352         with self.open(support.TESTFN, "rb") as f:
    353             self.assertEqual(f.readable(), True)
    354             self.assertEqual(f.writable(), False)
    355             self.assertEqual(f.seekable(), True)
    356             self.read_ops(f, True)
    357 
    358     def test_readline(self):
    359         with self.open(support.TESTFN, "wb") as f:
    360             f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    361         with self.open(support.TESTFN, "rb") as f:
    362             self.assertEqual(f.readline(), b"abc\n")
    363             self.assertEqual(f.readline(10), b"def\n")
    364             self.assertEqual(f.readline(2), b"xy")
    365             self.assertEqual(f.readline(4), b"zzy\n")
    366             self.assertEqual(f.readline(), b"foo\x00bar\n")
    367             self.assertEqual(f.readline(None), b"another line")
    368             self.assertRaises(TypeError, f.readline, 5.3)
    369         with self.open(support.TESTFN, "r") as f:
    370             self.assertRaises(TypeError, f.readline, 5.3)
    371 
    372     def test_raw_bytes_io(self):
    373         f = self.BytesIO()
    374         self.write_ops(f)
    375         data = f.getvalue()
    376         self.assertEqual(data, b"hello world\n")
    377         f = self.BytesIO(data)
    378         self.read_ops(f, True)
    379 
    380     def test_large_file_ops(self):
    381         # On Windows and Mac OSX this test comsumes large resources; It takes

    382         # a long time to build the >2GB file and takes >2GB of disk space

    383         # therefore the resource must be enabled to run this test.

    384         if sys.platform[:3] == 'win' or sys.platform == 'darwin':
    385             if not support.is_resource_enabled("largefile"):
    386                 print("\nTesting large file ops skipped on %s." % sys.platform,
    387                       file=sys.stderr)
    388                 print("It requires %d bytes and a long time." % self.LARGE,
    389                       file=sys.stderr)
    390                 print("Use 'regrtest.py -u largefile test_io' to run it.",
    391                       file=sys.stderr)
    392                 return
    393         with self.open(support.TESTFN, "w+b", 0) as f:
    394             self.large_file_ops(f)
    395         with self.open(support.TESTFN, "w+b") as f:
    396             self.large_file_ops(f)
    397 
    398     def test_with_open(self):
    399         for bufsize in (0, 1, 100):
    400             f = None
    401             with self.open(support.TESTFN, "wb", bufsize) as f:
    402                 f.write(b"xxx")
    403             self.assertEqual(f.closed, True)
    404             f = None
    405             try:
    406                 with self.open(support.TESTFN, "wb", bufsize) as f:
    407                     1 // 0
    408             except ZeroDivisionError:
    409                 self.assertEqual(f.closed, True)
    410             else:
    411                 self.fail("1 // 0 didn't raise an exception")
    412 
    413     # issue 5008

    414     def test_append_mode_tell(self):
    415         with self.open(support.TESTFN, "wb") as f:
    416             f.write(b"xxx")
    417         with self.open(support.TESTFN, "ab", buffering=0) as f:
    418             self.assertEqual(f.tell(), 3)
    419         with self.open(support.TESTFN, "ab") as f:
    420             self.assertEqual(f.tell(), 3)
    421         with self.open(support.TESTFN, "a") as f:
    422             self.assertTrue(f.tell() > 0)
    423 
    424     def test_destructor(self):
    425         record = []
    426         class MyFileIO(self.FileIO):
    427             def __del__(self):
    428                 record.append(1)
    429                 try:
    430                     f = super(MyFileIO, self).__del__
    431                 except AttributeError:
    432                     pass
    433                 else:
    434                     f()
    435             def close(self):
    436                 record.append(2)
    437                 super(MyFileIO, self).close()
    438             def flush(self):
    439                 record.append(3)
    440                 super(MyFileIO, self).flush()
    441         f = MyFileIO(support.TESTFN, "wb")
    442         f.write(b"xxx")
    443         del f
    444         support.gc_collect()
    445         self.assertEqual(record, [1, 2, 3])
    446         with self.open(support.TESTFN, "rb") as f:
    447             self.assertEqual(f.read(), b"xxx")
    448 
    449     def _check_base_destructor(self, base):
    450         record = []
    451         class MyIO(base):
    452             def __init__(self):
    453                 # This exercises the availability of attributes on object

    454                 # destruction.

    455                 # (in the C version, close() is called by the tp_dealloc

    456                 # function, not by __del__)

    457                 self.on_del = 1
    458                 self.on_close = 2
    459                 self.on_flush = 3
    460             def __del__(self):
    461                 record.append(self.on_del)
    462                 try:
    463                     f = super(MyIO, self).__del__
    464                 except AttributeError:
    465                     pass
    466                 else:
    467                     f()
    468             def close(self):
    469                 record.append(self.on_close)
    470                 super(MyIO, self).close()
    471             def flush(self):
    472                 record.append(self.on_flush)
    473                 super(MyIO, self).flush()
    474         f = MyIO()
    475         del f
    476         support.gc_collect()
    477         self.assertEqual(record, [1, 2, 3])
    478 
    479     def test_IOBase_destructor(self):
    480         self._check_base_destructor(self.IOBase)
    481 
    482     def test_RawIOBase_destructor(self):
    483         self._check_base_destructor(self.RawIOBase)
    484 
    485     def test_BufferedIOBase_destructor(self):
    486         self._check_base_destructor(self.BufferedIOBase)
    487 
    488     def test_TextIOBase_destructor(self):
    489         self._check_base_destructor(self.TextIOBase)
    490 
    491     def test_close_flushes(self):
    492         with self.open(support.TESTFN, "wb") as f:
    493             f.write(b"xxx")
    494         with self.open(support.TESTFN, "rb") as f:
    495             self.assertEqual(f.read(), b"xxx")
    496 
    497     def test_array_writes(self):
    498         a = array.array(b'i', range(10))
    499         n = len(a.tostring())
    500         with self.open(support.TESTFN, "wb", 0) as f:
    501             self.assertEqual(f.write(a), n)
    502         with self.open(support.TESTFN, "wb") as f:
    503             self.assertEqual(f.write(a), n)
    504 
    505     def test_closefd(self):
    506         self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
    507                           closefd=False)
    508 
    509     def test_read_closed(self):
    510         with self.open(support.TESTFN, "w") as f:
    511             f.write("egg\n")
    512         with self.open(support.TESTFN, "r") as f:
    513             file = self.open(f.fileno(), "r", closefd=False)
    514             self.assertEqual(file.read(), "egg\n")
    515             file.seek(0)
    516             file.close()
    517             self.assertRaises(ValueError, file.read)
    518 
    519     def test_no_closefd_with_filename(self):
    520         # can't use closefd in combination with a file name

    521         self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
    522 
    523     def test_closefd_attr(self):
    524         with self.open(support.TESTFN, "wb") as f:
    525             f.write(b"egg\n")
    526         with self.open(support.TESTFN, "r") as f:
    527             self.assertEqual(f.buffer.raw.closefd, True)
    528             file = self.open(f.fileno(), "r", closefd=False)
    529             self.assertEqual(file.buffer.raw.closefd, False)
    530 
    531     def test_garbage_collection(self):
    532         # FileIO objects are collected, and collecting them flushes

    533         # all data to disk.

    534         f = self.FileIO(support.TESTFN, "wb")
    535         f.write(b"abcxxx")
    536         f.f = f
    537         wr = weakref.ref(f)
    538         del f
    539         support.gc_collect()
    540         self.assertTrue(wr() is None, wr)
    541         with self.open(support.TESTFN, "rb") as f:
    542             self.assertEqual(f.read(), b"abcxxx")
    543 
    544     def test_unbounded_file(self):
    545         # Issue #1174606: reading from an unbounded stream such as /dev/zero.

    546         zero = "/dev/zero"
    547         if not os.path.exists(zero):
    548             self.skipTest("{0} does not exist".format(zero))
    549         if sys.maxsize > 0x7FFFFFFF:
    550             self.skipTest("test can only run in a 32-bit address space")
    551         if support.real_max_memuse < support._2G:
    552             self.skipTest("test requires at least 2GB of memory")
    553         with self.open(zero, "rb", buffering=0) as f:
    554             self.assertRaises(OverflowError, f.read)
    555         with self.open(zero, "rb") as f:
    556             self.assertRaises(OverflowError, f.read)
    557         with self.open(zero, "r") as f:
    558             self.assertRaises(OverflowError, f.read)
    559 
    560     def test_flush_error_on_close(self):
    561         f = self.open(support.TESTFN, "wb", buffering=0)
    562         def bad_flush():
    563             raise IOError()
    564         f.flush = bad_flush
    565         self.assertRaises(IOError, f.close) # exception not swallowed

    566 
    567     def test_multi_close(self):
    568         f = self.open(support.TESTFN, "wb", buffering=0)
    569         f.close()
    570         f.close()
    571         f.close()
    572         self.assertRaises(ValueError, f.flush)
    573 
    574     def test_RawIOBase_read(self):
    575         # Exercise the default RawIOBase.read() implementation (which calls

    576         # readinto() internally).

    577         rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    578         self.assertEqual(rawio.read(2), b"ab")
    579         self.assertEqual(rawio.read(2), b"c")
    580         self.assertEqual(rawio.read(2), b"d")
    581         self.assertEqual(rawio.read(2), None)
    582         self.assertEqual(rawio.read(2), b"ef")
    583         self.assertEqual(rawio.read(2), b"g")
    584         self.assertEqual(rawio.read(2), None)
    585         self.assertEqual(rawio.read(2), b"")
    586 
    587 class CIOTest(IOTest):
    588     pass
    589 
    590 class PyIOTest(IOTest):
    591     test_array_writes = unittest.skip(
    592         "len(array.array) returns number of elements rather than bytelength"
    593     )(IOTest.test_array_writes)
    594 
    595 
    596 class CommonBufferedTests:
    597     # Tests common to BufferedReader, BufferedWriter and BufferedRandom

    598 
    599     def test_detach(self):
    600         raw = self.MockRawIO()
    601         buf = self.tp(raw)
    602         self.assertIs(buf.detach(), raw)
    603         self.assertRaises(ValueError, buf.detach)
    604 
    605     def test_fileno(self):
    606         rawio = self.MockRawIO()
    607         bufio = self.tp(rawio)
    608 
    609         self.assertEqual(42, bufio.fileno())
    610 
    611     def test_no_fileno(self):
    612         # XXX will we always have fileno() function? If so, kill

    613         # this test. Else, write it.

    614         pass
    615 
    616     def test_invalid_args(self):
    617         rawio = self.MockRawIO()
    618         bufio = self.tp(rawio)
    619         # Invalid whence

    620         self.assertRaises(ValueError, bufio.seek, 0, -1)
    621         self.assertRaises(ValueError, bufio.seek, 0, 3)
    622 
    623     def test_override_destructor(self):
    624         tp = self.tp
    625         record = []
    626         class MyBufferedIO(tp):
    627             def __del__(self):
    628                 record.append(1)
    629                 try:
    630                     f = super(MyBufferedIO, self).__del__
    631                 except AttributeError:
    632                     pass
    633                 else:
    634                     f()
    635             def close(self):
    636                 record.append(2)
    637                 super(MyBufferedIO, self).close()
    638             def flush(self):
    639                 record.append(3)
    640                 super(MyBufferedIO, self).flush()
    641         rawio = self.MockRawIO()
    642         bufio = MyBufferedIO(rawio)
    643         writable = bufio.writable()
    644         del bufio
    645         support.gc_collect()
    646         if writable:
    647             self.assertEqual(record, [1, 2, 3])
    648         else:
    649             self.assertEqual(record, [1, 2])
    650 
    651     def test_context_manager(self):
    652         # Test usability as a context manager

    653         rawio = self.MockRawIO()
    654         bufio = self.tp(rawio)
    655         def _with():
    656             with bufio:
    657                 pass
    658         _with()
    659         # bufio should now be closed, and using it a second time should raise

    660         # a ValueError.

    661         self.assertRaises(ValueError, _with)
    662 
    663     def test_error_through_destructor(self):
    664         # Test that the exception state is not modified by a destructor,

    665         # even if close() fails.

    666         rawio = self.CloseFailureIO()
    667         def f():
    668             self.tp(rawio).xyzzy
    669         with support.captured_output("stderr") as s:
    670             self.assertRaises(AttributeError, f)
    671         s = s.getvalue().strip()
    672         if s:
    673             # The destructor *may* have printed an unraisable error, check it

    674             self.assertEqual(len(s.splitlines()), 1)
    675             self.assertTrue(s.startswith("Exception IOError: "), s)
    676             self.assertTrue(s.endswith(" ignored"), s)
    677 
    678     def test_repr(self):
    679         raw = self.MockRawIO()
    680         b = self.tp(raw)
    681         clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
    682         self.assertEqual(repr(b), "<%s>" % clsname)
    683         raw.name = "dummy"
    684         self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
    685         raw.name = b"dummy"
    686         self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
    687 
    688     def test_flush_error_on_close(self):
    689         raw = self.MockRawIO()
    690         def bad_flush():
    691             raise IOError()
    692         raw.flush = bad_flush
    693         b = self.tp(raw)
    694         self.assertRaises(IOError, b.close) # exception not swallowed

    695 
    696     def test_multi_close(self):
    697         raw = self.MockRawIO()
    698         b = self.tp(raw)
    699         b.close()
    700         b.close()
    701         b.close()
    702         self.assertRaises(ValueError, b.flush)
    703 
    704     def test_readonly_attributes(self):
    705         raw = self.MockRawIO()
    706         buf = self.tp(raw)
    707         x = self.MockRawIO()
    708         with self.assertRaises((AttributeError, TypeError)):
    709             buf.raw = x
    710 
    711 
    712 class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    713     read_mode = "rb"
    714 
    715     def test_constructor(self):
    716         rawio = self.MockRawIO([b"abc"])
    717         bufio = self.tp(rawio)
    718         bufio.__init__(rawio)
    719         bufio.__init__(rawio, buffer_size=1024)
    720         bufio.__init__(rawio, buffer_size=16)
    721         self.assertEqual(b"abc", bufio.read())
    722         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
    723         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
    724         self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
    725         rawio = self.MockRawIO([b"abc"])
    726         bufio.__init__(rawio)
    727         self.assertEqual(b"abc", bufio.read())
    728 
    729     def test_read(self):
    730         for arg in (None, 7):
    731             rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    732             bufio = self.tp(rawio)
    733             self.assertEqual(b"abcdefg", bufio.read(arg))
    734         # Invalid args

    735         self.assertRaises(ValueError, bufio.read, -2)
    736 
    737     def test_read1(self):
    738         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    739         bufio = self.tp(rawio)
    740         self.assertEqual(b"a", bufio.read(1))
    741         self.assertEqual(b"b", bufio.read1(1))
    742         self.assertEqual(rawio._reads, 1)
    743         self.assertEqual(b"c", bufio.read1(100))
    744         self.assertEqual(rawio._reads, 1)
    745         self.assertEqual(b"d", bufio.read1(100))
    746         self.assertEqual(rawio._reads, 2)
    747         self.assertEqual(b"efg", bufio.read1(100))
    748         self.assertEqual(rawio._reads, 3)
    749         self.assertEqual(b"", bufio.read1(100))
    750         self.assertEqual(rawio._reads, 4)
    751         # Invalid args

    752         self.assertRaises(ValueError, bufio.read1, -1)
    753 
    754     def test_readinto(self):
    755         rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    756         bufio = self.tp(rawio)
    757         b = bytearray(2)
    758         self.assertEqual(bufio.readinto(b), 2)
    759         self.assertEqual(b, b"ab")
    760         self.assertEqual(bufio.readinto(b), 2)
    761         self.assertEqual(b, b"cd")
    762         self.assertEqual(bufio.readinto(b), 2)
    763         self.assertEqual(b, b"ef")
    764         self.assertEqual(bufio.readinto(b), 1)
    765         self.assertEqual(b, b"gf")
    766         self.assertEqual(bufio.readinto(b), 0)
    767         self.assertEqual(b, b"gf")
    768 
    def test_readlines(self):
        all_lines = [b"abc\n", b"d\n", b"ef"]

        def fresh_reader():
            # A new reader over a new mock stream for every assertion.
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        self.assertEqual(fresh_reader().readlines(), all_lines)
        # With a hint of 5 only the first two lines come back.
        self.assertEqual(fresh_reader().readlines(5), all_lines[:2])
        self.assertEqual(fresh_reader().readlines(None), all_lines)
    776 
    def test_buffering(self):
        data = b"abcdefghi"
        total = len(data)

        # Each case: buffer size, the sizes requested from the buffered
        # reader, and the read sizes the raw mock is expected to record.
        cases = [
            [100, [3, 1, 4, 8], [total, 0]],
            [100, [3, 3, 3], [total]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, requests, expected_raw_reads in cases:
            raw = self.MockFileIO(data)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for want in requests:
                chunk = reader.read(want)
                self.assertEqual(chunk, data[offset:offset + want])
                offset += want
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, expected_raw_reads)
    796 
    def test_read_non_blocking(self):
        # The None entries in the mock's read stack simulate EWOULDBLOCK.
        raw = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # Same check directly on the raw mock's readall().
        raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw.readall())
        self.assertIsNone(raw.readall())
    811 
    def test_read_past_eof(self):
        # Asking for far more than is available returns just what exists.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", reader.read(9000))
    817 
    def test_read_all(self):
        # read() without a size returns every chunk of the raw stream.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", reader.read())
    823 
    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # The file holds exactly the same number of 0's, 1's ... 255's,
            # in shuffled order, so content duplicated or forgotten by the
            # concurrent readers shows up in the per-value counts below.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        # Record the failure for the main thread, then let
                        # it propagate in this worker.
                        errors.append(exc)
                        raise

                workers = [threading.Thread(target=reader) for _ in range(20)]
                for worker in workers:
                    worker.start()
                time.sleep(0.02) # yield
                for worker in workers:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                everything = b''.join(results)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(everything.count(needle), N)
        finally:
            support.unlink(support.TESTFN)
    867 
    def test_misbehaved_io(self):
        # Over a misbehaving raw stream, both seek() and tell() must raise
        # IOError.
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        self.assertRaises(IOError, reader.seek, 0)
        self.assertRaises(IOError, reader.tell)
    873 
    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            scenarios = (
                # Simple case: one raw read is enough to satisfy the request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in scenarios:
                raw = self.MockRawIO(chunks)
                reader = self.tp(raw, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(raw._extraneous_reads, 0,
                                 "failed for {}: {} != 0".format(n, raw._extraneous_reads))
    893 
    894 
    class CBufferedReaderTest(BufferedReaderTest):
        # Runs the shared BufferedReader tests against io.BufferedReader (the
        # C implementation) and adds checks that only apply to it.
        tp = io.BufferedReader

        def test_constructor(self):
            BufferedReaderTest.test_constructor(self)
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel.
            if sys.maxsize > 0x7FFFFFFF:
                rawio = self.MockRawIO()
                bufio = self.tp(rawio)
                self.assertRaises((OverflowError, MemoryError, ValueError),
                    bufio.__init__, rawio, sys.maxsize)

        def test_initialization(self):
            rawio = self.MockRawIO([b"abc"])
            bufio = self.tp(rawio)
            # Each rejected buffer size must raise ValueError from __init__,
            # and a read() on the object afterwards must raise ValueError too.
            for bad_size in (0, -16, -1):
                self.assertRaises(ValueError, bufio.__init__, rawio,
                                  buffer_size=bad_size)
                self.assertRaises(ValueError, bufio.read)

        def test_misbehaved_io_read(self):
            rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            # _pyio.BufferedReader seems to implement reading different, so that
            # checking this is not so easy.
            self.assertRaises(IOError, bufio.read, 10)

        def test_garbage_collection(self):
            # C BufferedReader objects are collected.
            # The Python version has __del__, so it ends into gc.garbage instead
            #
            # Opening the FileIO below creates TESTFN on disk; register its
            # removal first so the file is not left behind, pass or fail.
            self.addCleanup(support.unlink, support.TESTFN)
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Reference cycle: only the cyclic collector can reclaim f.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
            self.assertIsNone(wr(), wr)
    935 
    class PyBufferedReaderTest(BufferedReaderTest):
        # Runs the shared BufferedReader tests against the pure-Python
        # implementation from _pyio.
        tp = pyio.BufferedReader
    938 
    939 
    class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
        # Tests shared by the buffered-writer implementations.  The class
        # under test is read from ``self.tp``; the mock raw streams, ``open``
        # and ``BlockingIOError`` are likewise looked up on ``self``.
        write_mode = "wb"

        def test_constructor(self):
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # Re-running __init__ with valid sizes must leave a usable object.
            bufio.__init__(rawio)
            bufio.__init__(rawio, buffer_size=1024)
            bufio.__init__(rawio, buffer_size=16)
            self.assertEqual(3, bufio.write(b"abc"))
            bufio.flush()
            for bad_size in (0, -16, -1):
                self.assertRaises(ValueError, bufio.__init__, rawio,
                                  buffer_size=bad_size)
            bufio.__init__(rawio)
            self.assertEqual(3, bufio.write(b"ghi"))
            bufio.flush()
            self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

        def test_detach_flush(self):
            # detach() must push pending data to the raw stream.
            raw = self.MockRawIO()
            buf = self.tp(raw)
            buf.write(b"howdy!")
            self.assertFalse(raw._write_stack)
            buf.detach()
            self.assertEqual(raw._write_stack, [b"howdy!"])

        def test_write(self):
            # Write to the buffered IO but don't overflow the buffer.
            writer = self.MockRawIO()
            bufio = self.tp(writer, 8)
            bufio.write(b"abc")
            self.assertFalse(writer._write_stack)

        def test_write_overflow(self):
            writer = self.MockRawIO()
            bufio = self.tp(writer, 8)
            contents = b"abcdefghijklmnop"
            for n in range(0, len(contents), 3):
                bufio.write(contents[n:n+3])
            flushed = b"".join(writer._write_stack)
            # At least (total - 8) bytes were implicitly flushed, perhaps more
            # depending on the implementation.
            self.assertTrue(flushed.startswith(contents[:-8]), flushed)

        def check_writes(self, intermediate_func):
            # Lots of writes, test the flushed output is as expected.
            # intermediate_func(bufio) is invoked after every write.
            contents = bytes(range(256)) * 1000
            n = 0
            writer = self.MockRawIO()
            bufio = self.tp(writer, 13)
            # Generator of write sizes: repeat each N 15 times then proceed to N+1
            def gen_sizes():
                for size in count(1):
                    for i in range(15):
                        yield size
            sizes = gen_sizes()
            while n < len(contents):
                # Never write past the end of the payload.
                size = min(next(sizes), len(contents) - n)
                self.assertEqual(bufio.write(contents[n:n+size]), size)
                intermediate_func(bufio)
                n += size
            bufio.flush()
            self.assertEqual(contents,
                b"".join(writer._write_stack))

        def test_writes(self):
            self.check_writes(lambda bufio: None)

        def test_writes_and_flushes(self):
            self.check_writes(lambda bufio: bufio.flush())

        def test_writes_and_seeks(self):
            # Absolute seeks around the current position, ending back on it.
            def _seekabs(bufio):
                pos = bufio.tell()
                bufio.seek(pos + 1, 0)
                bufio.seek(pos - 1, 0)
                bufio.seek(pos, 0)
            self.check_writes(_seekabs)
            # Relative seeks forward and back, then an absolute restore.
            def _seekrel(bufio):
                pos = bufio.seek(0, 1)
                bufio.seek(+1, 1)
                bufio.seek(-1, 1)
                bufio.seek(pos, 0)
            self.check_writes(_seekrel)

        def test_writes_and_truncates(self):
            self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

        def test_write_non_blocking(self):
            raw = self.MockNonBlockWriterIO()
            bufio = self.tp(raw, 8)

            self.assertEqual(bufio.write(b"abcd"), 4)
            self.assertEqual(bufio.write(b"efghi"), 5)
            # 1 byte will be written, the rest will be buffered
            raw.block_on(b"k")
            self.assertEqual(bufio.write(b"jklmn"), 5)

            # 8 bytes will be written, 8 will be buffered and the rest will be lost
            raw.block_on(b"0")
            try:
                bufio.write(b"opqrwxyz0123456789")
            except self.BlockingIOError as e:
                written = e.characters_written
            else:
                self.fail("BlockingIOError should have been raised")
            self.assertEqual(written, 16)
            self.assertEqual(raw.pop_written(),
                b"abcdefghijklmnopqrwxyz")

            self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
            s = raw.pop_written()
            # Previously buffered bytes were flushed
            self.assertTrue(s.startswith(b"01234567A"), s)

        def test_write_and_rewind(self):
            raw = io.BytesIO()
            bufio = self.tp(raw, 4)
            self.assertEqual(bufio.write(b"abcdef"), 6)
            self.assertEqual(bufio.tell(), 6)
            bufio.seek(0, 0)
            self.assertEqual(bufio.write(b"XY"), 2)
            bufio.seek(6, 0)
            self.assertEqual(raw.getvalue(), b"XYcdef")
            self.assertEqual(bufio.write(b"123456"), 6)
            bufio.flush()
            self.assertEqual(raw.getvalue(), b"XYcdef123456")

        def test_flush(self):
            writer = self.MockRawIO()
            bufio = self.tp(writer, 8)
            bufio.write(b"abc")
            bufio.flush()
            self.assertEqual(b"abc", writer._write_stack[0])

        def test_destructor(self):
            # Dropping the last reference must flush the pending data.
            writer = self.MockRawIO()
            bufio = self.tp(writer, 8)
            bufio.write(b"abc")
            del bufio
            support.gc_collect()
            self.assertEqual(b"abc", writer._write_stack[0])

        def test_truncate(self):
            # Truncate implicitly flushes the buffer.
            #
            # The test creates TESTFN on disk; register its removal first so
            # the file is not left behind, pass or fail.
            self.addCleanup(support.unlink, support.TESTFN)
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                bufio.write(b"abcdef")
                self.assertEqual(bufio.truncate(3), 3)
                # The position is still 6 after truncating to 3.
                self.assertEqual(bufio.tell(), 6)
            with self.open(support.TESTFN, "rb", buffering=0) as f:
                self.assertEqual(f.read(), b"abc")

        @unittest.skipUnless(threading, 'Threading required for this test.')
        @support.requires_resource('cpu')
        def test_threads(self):
            try:
                # Write out many bytes from many threads and test they were
                # all flushed.
                N = 1000
                contents = bytes(range(256)) * N
                sizes = cycle([1, 19])
                n = 0
                queue = deque()
                # Pre-slice the payload into alternating 1- and 19-byte pieces
                # that the worker threads pop and write.
                while n < len(contents):
                    size = next(sizes)
                    queue.append(contents[n:n+size])
                    n += size
                del contents
                # We use a real file object because it allows us to
                # exercise situations where the GIL is released before
                # writing the buffer to the raw streams. This is in addition
                # to concurrency issues due to switching threads in the middle
                # of Python code.
                with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                    bufio = self.tp(raw, 8)
                    errors = []
                    def f():
                        try:
                            while True:
                                try:
                                    s = queue.popleft()
                                except IndexError:
                                    # Queue drained: this worker is done.
                                    return
                                bufio.write(s)
                        except Exception as e:
                            errors.append(e)
                            raise
                    threads = [threading.Thread(target=f) for x in range(20)]
                    for t in threads:
                        t.start()
                    time.sleep(0.02) # yield
                    for t in threads:
                        t.join()
                    self.assertFalse(errors,
                        "the following exceptions were caught: %r" % errors)
                    bufio.close()
                with self.open(support.TESTFN, "rb") as f:
                    s = f.read()
                # Pieces may land in any order, so only per-value counts are
                # checked.
                for i in range(256):
                    self.assertEqual(s.count(bytes([i])), N)
            finally:
                support.unlink(support.TESTFN)

        def test_misbehaved_io(self):
            rawio = self.MisbehavedRawIO()
            bufio = self.tp(rawio, 5)
            self.assertRaises(IOError, bufio.seek, 0)
            self.assertRaises(IOError, bufio.tell)
            self.assertRaises(IOError, bufio.write, b"abcdef")

        def test_max_buffer_size_deprecation(self):
            # Passing a third positional argument (max_buffer_size) must emit
            # a DeprecationWarning.
            with support.check_warnings(("max_buffer_size is deprecated",
                                         DeprecationWarning)):
                self.tp(self.MockRawIO(), 8, 12)
   1156 
   1157 
   class CBufferedWriterTest(BufferedWriterTest):
       # Runs the shared BufferedWriter tests against io.BufferedWriter (the
       # C implementation) and adds checks that only apply to it.
       tp = io.BufferedWriter

       def test_constructor(self):
           BufferedWriterTest.test_constructor(self)
           # The allocation can succeed on 32-bit builds, e.g. with more
           # than 2GB RAM and a 64-bit kernel.
           if sys.maxsize > 0x7FFFFFFF:
               rawio = self.MockRawIO()
               bufio = self.tp(rawio)
               self.assertRaises((OverflowError, MemoryError, ValueError),
                   bufio.__init__, rawio, sys.maxsize)

       def test_initialization(self):
           rawio = self.MockRawIO()
           bufio = self.tp(rawio)
           # Each rejected buffer size must raise ValueError from __init__,
           # and a write() on the object afterwards must raise ValueError too.
           for bad_size in (0, -16, -1):
               self.assertRaises(ValueError, bufio.__init__, rawio,
                                 buffer_size=bad_size)
               self.assertRaises(ValueError, bufio.write, b"def")

       def test_garbage_collection(self):
           # C BufferedWriter objects are collected, and collecting them flushes
           # all data to disk.
           # The Python version has __del__, so it ends into gc.garbage instead
           #
           # Opening the FileIO below creates TESTFN on disk; register its
           # removal first so the file is not left behind, pass or fail.
           self.addCleanup(support.unlink, support.TESTFN)
           rawio = self.FileIO(support.TESTFN, "w+b")
           f = self.tp(rawio)
           f.write(b"123xxx")
           # Reference cycle: only the cyclic collector can reclaim f.
           f.x = f
           wr = weakref.ref(f)
           del f
           support.gc_collect()
           self.assertIsNone(wr(), wr)
           with self.open(support.TESTFN, "rb") as f:
               self.assertEqual(f.read(), b"123xxx")
   1195 
   1196 
   class PyBufferedWriterTest(BufferedWriterTest):
       # Runs the shared BufferedWriter tests against the pure-Python
       # implementation from _pyio.
       tp = pyio.BufferedWriter
   1199 
   class BufferedRWPairTest(unittest.TestCase):
       # Tests shared by the BufferedRWPair implementations.  The class under
       # test is read from ``self.tp``; it is always built from a reader
       # object followed by a writer object.

       def test_constructor(self):
           pair = self.tp(self.MockRawIO(), self.MockRawIO())
           self.assertFalse(pair.closed)

       def test_detach(self):
           pair = self.tp(self.MockRawIO(), self.MockRawIO())
           self.assertRaises(self.UnsupportedOperation, pair.detach)

       def test_constructor_max_buffer_size_deprecation(self):
           # Passing a fourth positional argument (max_buffer_size) must emit
           # a DeprecationWarning.
           with support.check_warnings(("max_buffer_size is deprecated",
                                        DeprecationWarning)):
               self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

       def test_constructor_with_not_readable(self):
           class NotReadable(MockRawIO):
               def readable(self):
                   return False

           self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

       def test_constructor_with_not_writeable(self):
           class NotWriteable(MockRawIO):
               def writable(self):
                   return False

           self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

       def test_read(self):
           pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

           self.assertEqual(pair.read(3), b"abc")
           self.assertEqual(pair.read(1), b"d")
           self.assertEqual(pair.read(), b"ef")
           pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
           self.assertEqual(pair.read(None), b"abc")

       def test_readlines(self):
           pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
           self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
           # An explicit None hint must behave like no hint at all.
           self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
           self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

       def test_read1(self):
           # .read1() is delegated to the underlying reader object, so this test
           # can be shallow.
           pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

           self.assertEqual(pair.read1(3), b"abc")

       def test_readinto(self):
           pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

           data = bytearray(5)
           self.assertEqual(pair.readinto(data), 5)
           self.assertEqual(data, b"abcde")

       def test_write(self):
           w = self.MockRawIO()
           pair = self.tp(self.MockRawIO(), w)

           pair.write(b"abc")
           pair.flush()
           pair.write(b"def")
           pair.flush()
           self.assertEqual(w._write_stack, [b"abc", b"def"])

       def test_peek(self):
           pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

           # Only the prefix is checked, so peek() may return more than asked.
           self.assertTrue(pair.peek(3).startswith(b"abc"))
           # The read after the peek still starts at b"abc".
           self.assertEqual(pair.read(3), b"abc")

       def test_readable(self):
           pair = self.tp(self.MockRawIO(), self.MockRawIO())
           self.assertTrue(pair.readable())

       def test_writeable(self):
           pair = self.tp(self.MockRawIO(), self.MockRawIO())
           self.assertTrue(pair.writable())

       def test_seekable(self):
           # BufferedRWPairs are never seekable, even if their readers and writers
           # are.
           pair = self.tp(self.MockRawIO(), self.MockRawIO())
           self.assertFalse(pair.seekable())

       # .flush() is delegated to the underlying writer object and has been
       # tested in the test_write method.

       def test_close_and_closed(self):
           pair = self.tp(self.MockRawIO(), self.MockRawIO())
           self.assertFalse(pair.closed)
           pair.close()
           self.assertTrue(pair.closed)

       def test_isatty(self):
           class SelectableIsAtty(MockRawIO):
               def __init__(self, isatty):
                   MockRawIO.__init__(self)
                   self._isatty = isatty

               def isatty(self):
                   return self._isatty

           # The pair reports a tty as soon as either side does.
           cases = (
               (False, False, False),
               (True, False, True),
               (False, True, True),
               (True, True, True),
           )
           for reader_tty, writer_tty, expected in cases:
               pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
               check = self.assertTrue if expected else self.assertFalse
               check(pair.isatty())
   1317 
   class CBufferedRWPairTest(BufferedRWPairTest):
       # Runs the shared BufferedRWPair tests against the C implementation.
       tp = io.BufferedRWPair
   1320 
   class PyBufferedRWPairTest(BufferedRWPairTest):
       # Runs the shared BufferedRWPair tests against the pure-Python
       # implementation from _pyio.
       tp = pyio.BufferedRWPair
   1323 
   1324 
   class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
       # Tests for the read/write buffered object.  Inherits both the reader
       # and the writer test suites and adds tests that interleave reads,
       # writes and seeks.
       read_mode = "rb+"
       write_mode = "wb+"

       def test_constructor(self):
           # Both parents define test_constructor; run each explicitly.
           BufferedReaderTest.test_constructor(self)
           BufferedWriterTest.test_constructor(self)

       def test_read_and_write(self):
           mock = self.MockRawIO((b"asdf", b"ghjk"))
           stream = self.tp(mock, 8)

           self.assertEqual(b"as", stream.read(2))
           stream.write(b"ddd")
           stream.write(b"eee")
           self.assertFalse(mock._write_stack) # Buffer writes
           self.assertEqual(b"ghjk", stream.read())
           self.assertEqual(b"dddeee", mock._write_stack[0])

       def test_seek_and_tell(self):
           backing = self.BytesIO(b"asdfghjkl")
           stream = self.tp(backing)

           self.assertEqual(b"as", stream.read(2))
           self.assertEqual(2, stream.tell())
           stream.seek(0, 0)
           self.assertEqual(b"asdf", stream.read(4))

           # Overwrite bytes 4..7, then re-read everything from the start.
           stream.write(b"asdf")
           stream.seek(0, 0)
           self.assertEqual(b"asdfasdfl", stream.read())
           self.assertEqual(9, stream.tell())
           stream.seek(-4, 2)
           self.assertEqual(5, stream.tell())
           stream.seek(2, 1)
           self.assertEqual(7, stream.tell())
           self.assertEqual(b"fl", stream.read(11))
           # A float offset is rejected.
           self.assertRaises(TypeError, stream.seek, 0.0)

       def check_flush_and_read(self, read_func):
           # read_func(bufio[, n]) must consume and return data from bufio;
           # the same scenario is then run with different read primitives.
           backing = self.BytesIO(b"abcdefghi")
           stream = self.tp(backing)

           self.assertEqual(b"ab", read_func(stream, 2))
           stream.write(b"12")
           self.assertEqual(b"ef", read_func(stream, 2))
           self.assertEqual(6, stream.tell())
           stream.flush()
           self.assertEqual(6, stream.tell())
           self.assertEqual(b"ghi", read_func(stream))
           # Modify the underlying object behind the buffered stream's back.
           backing.seek(0, 0)
           backing.write(b"XYZ")
           # flush() resets the read buffer
           stream.flush()
           stream.seek(0, 0)
           self.assertEqual(b"XYZ", read_func(stream, 3))

       def test_flush_and_read(self):
           def _read(bufio, *args):
               return bufio.read(*args)
           self.check_flush_and_read(_read)

       def test_flush_and_readinto(self):
           def _readinto(bufio, n=-1):
               # With no size given, use a target large enough for the rest.
               capacity = n if n >= 0 else 9999
               target = bytearray(capacity)
               filled = bufio.readinto(target)
               return bytes(target[:filled])
           self.check_flush_and_read(_readinto)

       def test_flush_and_peek(self):
           def _peek(bufio, n=-1):
               # This relies on the fact that the buffer can contain the whole
               # raw stream, otherwise peek() can return less.
               peeked = bufio.peek(n)
               if n != -1:
                   peeked = peeked[:n]
               # peek() does not advance, so skip over what was returned.
               bufio.seek(len(peeked), 1)
               return peeked
           self.check_flush_and_read(_peek)

       def test_flush_and_write(self):
           backing = self.BytesIO(b"abcdefghi")
           stream = self.tp(backing)

           for piece in (b"123", b"45"):
               stream.write(piece)
               stream.flush()
           stream.seek(0, 0)
           self.assertEqual(b"12345fghi", backing.getvalue())
           self.assertEqual(b"12345fghi", stream.read())

       def test_threads(self):
           # Both parents define test_threads; run each explicitly.
           BufferedReaderTest.test_threads(self)
           BufferedWriterTest.test_threads(self)

       def test_writes_and_peek(self):
           # First pass: peek at the current position after every write.
           def _peek_here(bufio):
               bufio.peek(1)
           self.check_writes(_peek_here)

           # Second pass: step back one byte, peek, then restore the position.
           def _peek_behind(bufio):
               saved = bufio.tell()
               bufio.seek(-1, 1)
               bufio.peek(1)
               bufio.seek(saved, 0)
           self.check_writes(_peek_behind)

       def test_writes_and_reads(self):
           # Re-read the last written byte after every write.
           def _reread(bufio):
               bufio.seek(-1, 1)
               bufio.read(1)
           self.check_writes(_reread)

       def test_writes_and_read1s(self):
           # Same as test_writes_and_reads, using read1().
           def _reread1(bufio):
               bufio.seek(-1, 1)
               bufio.read1(1)
           self.check_writes(_reread1)

       def test_writes_and_readintos(self):
           # Same as test_writes_and_reads, using readinto().
           def _rereadinto(bufio):
               bufio.seek(-1, 1)
               bufio.readinto(bytearray(1))
           self.check_writes(_rereadinto)

       def test_write_after_readahead(self):
           # Issue #6629: writing after the buffer was filled by readahead should
           # first rewind the raw stream.
           for overwrite_size in [1, 5]:
               backing = self.BytesIO(b"A" * 10)
               stream = self.tp(backing, 4)
               # Trigger readahead
               self.assertEqual(stream.read(1), b"A")
               self.assertEqual(stream.tell(), 1)
               # Overwriting should rewind the raw stream if it needs so
               stream.write(b"B" * overwrite_size)
               end = overwrite_size + 1
               self.assertEqual(stream.tell(), end)
               # If the write size was smaller than the buffer size, flush() and
               # check that rewind happens.
               stream.flush()
               self.assertEqual(stream.tell(), end)
               expected = b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)
               self.assertEqual(backing.getvalue(), expected)

       def test_write_rewind_write(self):
           # Various combinations of reading / writing / seeking backwards / writing again
           def mutate(bufio, pos1, pos2):
               assert pos2 >= pos1
               # Fill the buffer
               bufio.seek(pos1)
               bufio.read(pos2 - pos1)
               bufio.write(b'\x02')
               # This writes earlier than the previous write, but still inside
               # the buffer.
               bufio.seek(pos1)
               bufio.write(b'\x01')

           original = b"\x80\x81\x82\x83\x84"
           length = len(original)
           for i in range(length):
               for j in range(i, length):
                   backing = self.BytesIO(original)
                   stream = self.tp(backing, 100)
                   mutate(stream, i, j)
                   stream.flush()
                   # Apply the same two writes in the same order, so that
                   # the later one wins when i == j.
                   expected = bytearray(original)
                   expected[j] = 2
                   expected[i] = 1
                   self.assertEqual(backing.getvalue(), expected,
                                    "failed result for i=%d, j=%d" % (i, j))

       def test_truncate_after_read_or_write(self):
           backing = self.BytesIO(b"A" * 10)
           stream = self.tp(backing, 100)
           self.assertEqual(stream.read(2), b"AA") # the read buffer gets filled
           self.assertEqual(stream.truncate(), 2)
           self.assertEqual(stream.write(b"BB"), 2) # the write buffer increases
           self.assertEqual(stream.truncate(), 4)

       def test_misbehaved_io(self):
           # Both parents define test_misbehaved_io; run each explicitly.
           BufferedReaderTest.test_misbehaved_io(self)
           BufferedWriterTest.test_misbehaved_io(self)
   1505 
   1506 class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
   1507     tp = io.BufferedRandom
   1508 
   1509     def test_constructor(self):
   1510         BufferedRandomTest.test_constructor(self)
   1511         # The allocation can succeed on 32-bit builds, e.g. with more

   1512         # than 2GB RAM and a 64-bit kernel.

   1513         if sys.maxsize > 0x7FFFFFFF:
   1514             rawio = self.MockRawIO()
   1515             bufio = self.tp(rawio)
   1516             self.assertRaises((OverflowError, MemoryError, ValueError),
   1517                 bufio.__init__, rawio, sys.maxsize)
   1518 
   1519     def test_garbage_collection(self):
   1520         CBufferedReaderTest.test_garbage_collection(self)
   1521         CBufferedWriterTest.test_garbage_collection(self)
   1522 
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against the pure-Python implementation
    # (the _pyio module, imported as pyio).
    tp = pyio.BufferedRandom
   1525 
   1526 
   1527 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these

   1528 # properties:

   1529 #   - A single output character can correspond to many bytes of input.

   1530 #   - The number of input bytes to complete the character can be

   1531 #     undetermined until the last input byte is received.

   1532 #   - The number of input bytes can vary depending on previous input.

   1533 #   - A single input byte can correspond to many characters of output.

   1534 #   - The number of output characters can be undetermined until the

   1535 #     last input byte is received.

   1536 #   - The number of output characters can vary depending on previous input.

   1537 
   1538 class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
   1539     """
   1540     For testing seek/tell behavior with a stateful, buffering decoder.
   1541 
   1542     Input is a sequence of words.  Words may be fixed-length (length set
   1543     by input) or variable-length (period-terminated).  In variable-length
   1544     mode, extra periods are ignored.  Possible words are:
   1545       - 'i' followed by a number sets the input length, I (maximum 99).
   1546         When I is set to 0, words are space-terminated.
   1547       - 'o' followed by a number sets the output length, O (maximum 99).
   1548       - Any other word is converted into a word followed by a period on
   1549         the output.  The output word consists of the input word truncated
   1550         or padded out with hyphens to make its length equal to O.  If O
   1551         is 0, the word is output verbatim without truncating or padding.
   1552     I and O are initially set to 1.  When I changes, any buffered input is
   1553     re-scanned according to the new I.  EOF also terminates the last word.
   1554     """
   1555 
   1556     def __init__(self, errors='strict'):
   1557         codecs.IncrementalDecoder.__init__(self, errors)
   1558         self.reset()
   1559 
   1560     def __repr__(self):
   1561         return '<SID %x>' % id(self)
   1562 
   1563     def reset(self):
   1564         self.i = 1
   1565         self.o = 1
   1566         self.buffer = bytearray()
   1567 
   1568     def getstate(self):
   1569         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()

   1570         return bytes(self.buffer), i*100 + o
   1571 
   1572     def setstate(self, state):
   1573         buffer, io = state
   1574         self.buffer = bytearray(buffer)
   1575         i, o = divmod(io, 100)
   1576         self.i, self.o = i ^ 1, o ^ 1
   1577 
   1578     def decode(self, input, final=False):
   1579         output = ''
   1580         for b in input:
   1581             if self.i == 0: # variable-length, terminated with period

   1582                 if b == '.':
   1583                     if self.buffer:
   1584                         output += self.process_word()
   1585                 else:
   1586                     self.buffer.append(b)
   1587             else: # fixed-length, terminate after self.i bytes

   1588                 self.buffer.append(b)
   1589                 if len(self.buffer) == self.i:
   1590                     output += self.process_word()
   1591         if final and self.buffer: # EOF terminates the last word

   1592             output += self.process_word()
   1593         return output
   1594 
   1595     def process_word(self):
   1596         output = ''
   1597         if self.buffer[0] == ord('i'):
   1598             self.i = min(99, int(self.buffer[1:] or 0)) # set input length

   1599         elif self.buffer[0] == ord('o'):
   1600             self.o = min(99, int(self.buffer[1:] or 0)) # set output length

   1601         else:
   1602             output = self.buffer.decode('ascii')
   1603             if len(output) < self.o:
   1604                 output += '-'*self.o # pad out with hyphens

   1605             if self.o:
   1606                 output = output[:self.o] # truncate to output length

   1607             output += '.'
   1608         self.buffer = bytearray()
   1609         return output
   1610 
   1611     codecEnabled = False
   1612 
   1613     @classmethod
   1614     def lookupTestDecoder(cls, name):
   1615         if cls.codecEnabled and name == 'test_decoder':
   1616             latin1 = codecs.lookup('latin-1')
   1617             return codecs.CodecInfo(
   1618                 name='test_decoder', encode=latin1.encode, decode=None,
   1619                 incrementalencoder=None,
   1620                 streamreader=None, streamwriter=None,
   1621                 incrementaldecoder=cls)
   1622 
# Register the lookup function of the decoder above for testing.
# The lookup only answers for the name 'test_decoder' and only while
# StatefulIncrementalDecoder.codecEnabled is true, so it is disabled by
# default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
   1626 
   1627 
   1628 class StatefulIncrementalDecoderTest(unittest.TestCase):
   1629     """
   1630     Make sure the StatefulIncrementalDecoder actually works.
   1631     """
   1632 
   1633     test_cases = [
   1634         # I=1, O=1 (fixed-length input == fixed-length output)

   1635         (b'abcd', False, 'a.b.c.d.'),
   1636         # I=0, O=0 (variable-length input, variable-length output)

   1637         (b'oiabcd', True, 'abcd.'),
   1638         # I=0, O=0 (should ignore extra periods)

   1639         (b'oi...abcd...', True, 'abcd.'),
   1640         # I=0, O=6 (variable-length input, fixed-length output)

   1641         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
   1642         # I=2, O=6 (fixed-length input < fixed-length output)

   1643         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
   1644         # I=6, O=3 (fixed-length input > fixed-length output)

   1645         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
   1646         # I=0, then 3; O=29, then 15 (with longer output)

   1647         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
   1648          'a----------------------------.' +
   1649          'b----------------------------.' +
   1650          'cde--------------------------.' +
   1651          'abcdefghijabcde.' +
   1652          'a.b------------.' +
   1653          '.c.------------.' +
   1654          'd.e------------.' +
   1655          'k--------------.' +
   1656          'l--------------.' +
   1657          'm--------------.')
   1658     ]
   1659 
   1660     def test_decoder(self):
   1661         # Try a few one-shot test cases.

   1662         for input, eof, output in self.test_cases:
   1663             d = StatefulIncrementalDecoder()
   1664             self.assertEqual(d.decode(input, eof), output)
   1665 
   1666         # Also test an unfinished decode, followed by forcing EOF.

   1667         d = StatefulIncrementalDecoder()
   1668         self.assertEqual(d.decode(b'oiabcd'), '')
   1669         self.assertEqual(d.decode(b'', 1), 'abcd.')
   1670 
   1671 class TextIOWrapperTest(unittest.TestCase):
   1672 
   1673     def setUp(self):
   1674         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
   1675         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
   1676         support.unlink(support.TESTFN)
   1677 
    def tearDown(self):
        # Remove the scratch file a test may have created.
        support.unlink(support.TESTFN)
   1680 
   1681     def test_constructor(self):
   1682         r = self.BytesIO(b"\xc3\xa9\n\n")
   1683         b = self.BufferedReader(r, 1000)
   1684         t = self.TextIOWrapper(b)
   1685         t.__init__(b, encoding="latin1", newline="\r\n")
   1686         self.assertEqual(t.encoding, "latin1")
   1687         self.assertEqual(t.line_buffering, False)
   1688         t.__init__(b, encoding="utf8", line_buffering=True)
   1689         self.assertEqual(t.encoding, "utf8")
   1690         self.assertEqual(t.line_buffering, True)
   1691         self.assertEqual("\xe9\n", t.readline())
   1692         self.assertRaises(TypeError, t.__init__, b, newline=42)
   1693         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   1694 
   1695     def test_detach(self):
   1696         r = self.BytesIO()
   1697         b = self.BufferedWriter(r)
   1698         t = self.TextIOWrapper(b)
   1699         self.assertIs(t.detach(), b)
   1700 
   1701         t = self.TextIOWrapper(b, encoding="ascii")
   1702         t.write("howdy")
   1703         self.assertFalse(r.getvalue())
   1704         t.detach()
   1705         self.assertEqual(r.getvalue(), b"howdy")
   1706         self.assertRaises(ValueError, t.detach)
   1707 
   1708     def test_repr(self):
   1709         raw = self.BytesIO("hello".encode("utf-8"))
   1710         b = self.BufferedReader(raw)
   1711         t = self.TextIOWrapper(b, encoding="utf-8")
   1712         modname = self.TextIOWrapper.__module__
   1713         self.assertEqual(repr(t),
   1714                          "<%s.TextIOWrapper encoding='utf-8'>" % modname)
   1715         raw.name = "dummy"
   1716         self.assertEqual(repr(t),
   1717                          "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
   1718         raw.name = b"dummy"
   1719         self.assertEqual(repr(t),
   1720                          "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
   1721 
   1722     def test_line_buffering(self):
   1723         r = self.BytesIO()
   1724         b = self.BufferedWriter(r, 1000)
   1725         t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
   1726         t.write("X")
   1727         self.assertEqual(r.getvalue(), b"")  # No flush happened

   1728         t.write("Y\nZ")
   1729         self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed

   1730         t.write("A\rB")
   1731         self.assertEqual(r.getvalue(), b"XY\nZA\rB")
   1732 
   1733     def test_encoding(self):
   1734         # Check the encoding attribute is always set, and valid

   1735         b = self.BytesIO()
   1736         t = self.TextIOWrapper(b, encoding="utf8")
   1737         self.assertEqual(t.encoding, "utf8")
   1738         t = self.TextIOWrapper(b)
   1739         self.assertTrue(t.encoding is not None)
   1740         codecs.lookup(t.encoding)
   1741 
   1742     def test_encoding_errors_reading(self):
   1743         # (1) default

   1744         b = self.BytesIO(b"abc\n\xff\n")
   1745         t = self.TextIOWrapper(b, encoding="ascii")
   1746         self.assertRaises(UnicodeError, t.read)
   1747         # (2) explicit strict

   1748         b = self.BytesIO(b"abc\n\xff\n")
   1749         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   1750         self.assertRaises(UnicodeError, t.read)
   1751         # (3) ignore

   1752         b = self.BytesIO(b"abc\n\xff\n")
   1753         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
   1754         self.assertEqual(t.read(), "abc\n\n")
   1755         # (4) replace

   1756         b = self.BytesIO(b"abc\n\xff\n")
   1757         t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
   1758         self.assertEqual(t.read(), "abc\n\ufffd\n")
   1759 
   1760     def test_encoding_errors_writing(self):
   1761         # (1) default

   1762         b = self.BytesIO()
   1763         t = self.TextIOWrapper(b, encoding="ascii")
   1764         self.assertRaises(UnicodeError, t.write, "\xff")
   1765         # (2) explicit strict

   1766         b = self.BytesIO()
   1767         t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
   1768         self.assertRaises(UnicodeError, t.write, "\xff")
   1769         # (3) ignore

   1770         b = self.BytesIO()
   1771         t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
   1772                              newline="\n")
   1773         t.write("abc\xffdef\n")
   1774         t.flush()
   1775         self.assertEqual(b.getvalue(), b"abcdef\n")
   1776         # (4) replace

   1777         b = self.BytesIO()
   1778         t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
   1779                              newline="\n")
   1780         t.write("abc\xffdef\n")
   1781         t.flush()
   1782         self.assertEqual(b.getvalue(), b"abc?def\n")
   1783 
   1784     def test_newlines(self):
   1785         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
   1786 
   1787         tests = [
   1788             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
   1789             [ '', input_lines ],
   1790             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
   1791             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
   1792             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
   1793         ]
   1794         encodings = (
   1795             'utf-8', 'latin-1',
   1796             'utf-16', 'utf-16-le', 'utf-16-be',
   1797             'utf-32', 'utf-32-le', 'utf-32-be',
   1798         )
   1799 
   1800         # Try a range of buffer sizes to test the case where \r is the last

   1801         # character in TextIOWrapper._pending_line.

   1802         for encoding in encodings:
   1803             # XXX: str.encode() should return bytes

   1804             data = bytes(''.join(input_lines).encode(encoding))
   1805             for do_reads in (False, True):
   1806                 for bufsize in range(1, 10):
   1807                     for newline, exp_lines in tests:
   1808                         bufio = self.BufferedReader(self.BytesIO(data), bufsize)
   1809                         textio = self.TextIOWrapper(bufio, newline=newline,
   1810                                                   encoding=encoding)
   1811                         if do_reads:
   1812                             got_lines = []
   1813                             while True:
   1814                                 c2 = textio.read(2)
   1815                                 if c2 == '':
   1816                                     break
   1817                                 self.assertEqual(len(c2), 2)
   1818                                 got_lines.append(c2 + textio.readline())
   1819                         else:
   1820                             got_lines = list(textio)
   1821 
   1822                         for got_line, exp_line in zip(got_lines, exp_lines):
   1823                             self.assertEqual(got_line, exp_line)
   1824                         self.assertEqual(len(got_lines), len(exp_lines))
   1825 
   1826     def test_newlines_input(self):
   1827         testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
   1828         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
   1829         for newline, expected in [
   1830             (None, normalized.decode("ascii").splitlines(True)),
   1831             ("", testdata.decode("ascii").splitlines(True)),
   1832             ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   1833             ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
   1834             ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
   1835             ]:
   1836             buf = self.BytesIO(testdata)
   1837             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   1838             self.assertEqual(txt.readlines(), expected)
   1839             txt.seek(0)
   1840             self.assertEqual(txt.read(), "".join(expected))
   1841 
   1842     def test_newlines_output(self):
   1843         testdict = {
   1844             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   1845             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
   1846             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
   1847             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
   1848             }
   1849         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
   1850         for newline, expected in tests:
   1851             buf = self.BytesIO()
   1852             txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
   1853             txt.write("AAA\nB")
   1854             txt.write("BB\nCCC\n")
   1855             txt.write("X\rY\r\nZ")
   1856             txt.flush()
   1857             self.assertEqual(buf.closed, False)
   1858             self.assertEqual(buf.getvalue(), expected)
   1859 
   1860     def test_destructor(self):
   1861         l = []
   1862         base = self.BytesIO
   1863         class MyBytesIO(base):
   1864             def close(self):
   1865                 l.append(self.getvalue())
   1866                 base.close(self)
   1867         b = MyBytesIO()
   1868         t = self.TextIOWrapper(b, encoding="ascii")
   1869         t.write("abc")
   1870         del t
   1871         support.gc_collect()
   1872         self.assertEqual([b"abc"], l)
   1873 
   1874     def test_override_destructor(self):
   1875         record = []
   1876         class MyTextIO(self.TextIOWrapper):
   1877             def __del__(self):
   1878                 record.append(1)
   1879                 try:
   1880                     f = super(MyTextIO, self).__del__
   1881                 except AttributeError:
   1882                     pass
   1883                 else:
   1884                     f()
   1885             def close(self):
   1886                 record.append(2)
   1887                 super(MyTextIO, self).close()
   1888             def flush(self):
   1889                 record.append(3)
   1890                 super(MyTextIO, self).flush()
   1891         b = self.BytesIO()
   1892         t = MyTextIO(b, encoding="ascii")
   1893         del t
   1894         support.gc_collect()
   1895         self.assertEqual(record, [1, 2, 3])
   1896 
   1897     def test_error_through_destructor(self):
   1898         # Test that the exception state is not modified by a destructor,

   1899         # even if close() fails.

   1900         rawio = self.CloseFailureIO()
   1901         def f():
   1902             self.TextIOWrapper(rawio).xyzzy
   1903         with support.captured_output("stderr") as s:
   1904             self.assertRaises(AttributeError, f)
   1905         s = s.getvalue().strip()
   1906         if s:
   1907             # The destructor *may* have printed an unraisable error, check it

   1908             self.assertEqual(len(s.splitlines()), 1)
   1909             self.assertTrue(s.startswith("Exception IOError: "), s)
   1910             self.assertTrue(s.endswith(" ignored"), s)
   1911 
   1912     # Systematic tests of the text I/O API

   1913 
   1914     def test_basic_io(self):
   1915         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
   1916             for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":

   1917                 f = self.open(support.TESTFN, "w+", encoding=enc)
   1918                 f._CHUNK_SIZE = chunksize
   1919                 self.assertEqual(f.write("abc"), 3)
   1920                 f.close()
   1921                 f = self.open(support.TESTFN, "r+", encoding=enc)
   1922                 f._CHUNK_SIZE = chunksize
   1923                 self.assertEqual(f.tell(), 0)
   1924                 self.assertEqual(f.read(), "abc")
   1925                 cookie = f.tell()
   1926                 self.assertEqual(f.seek(0), 0)
   1927                 self.assertEqual(f.read(None), "abc")
   1928                 f.seek(0)
   1929                 self.assertEqual(f.read(2), "ab")
   1930                 self.assertEqual(f.read(1), "c")
   1931                 self.assertEqual(f.read(1), "")
   1932                 self.assertEqual(f.read(), "")
   1933                 self.assertEqual(f.tell(), cookie)
   1934                 self.assertEqual(f.seek(0), 0)
   1935                 self.assertEqual(f.seek(0, 2), cookie)
   1936                 self.assertEqual(f.write("def"), 3)
   1937                 self.assertEqual(f.seek(cookie), cookie)
   1938                 self.assertEqual(f.read(), "def")
   1939                 if enc.startswith("utf"):
   1940                     self.multi_line_test(f, enc)
   1941                 f.close()
   1942 
   1943     def multi_line_test(self, f, enc):
   1944         f.seek(0)
   1945         f.truncate()
   1946         sample = "s\xff\u0fff\uffff"
   1947         wlines = []
   1948         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
   1949             chars = []
   1950             for i in range(size):
   1951                 chars.append(sample[i % len(sample)])
   1952             line = "".join(chars) + "\n"
   1953             wlines.append((f.tell(), line))
   1954             f.write(line)
   1955         f.seek(0)
   1956         rlines = []
   1957         while True:
   1958             pos = f.tell()
   1959             line = f.readline()
   1960             if not line:
   1961                 break
   1962             rlines.append((pos, line))
   1963         self.assertEqual(rlines, wlines)
   1964 
   1965     def test_telling(self):
   1966         f = self.open(support.TESTFN, "w+", encoding="utf8")
   1967         p0 = f.tell()
   1968         f.write("\xff\n")
   1969         p1 = f.tell()
   1970         f.write("\xff\n")
   1971         p2 = f.tell()
   1972         f.seek(0)
   1973         self.assertEqual(f.tell(), p0)
   1974         self.assertEqual(f.readline(), "\xff\n")
   1975         self.assertEqual(f.tell(), p1)
   1976         self.assertEqual(f.readline(), "\xff\n")
   1977         self.assertEqual(f.tell(), p2)
   1978         f.seek(0)
   1979         for line in f:
   1980             self.assertEqual(line, "\xff\n")
   1981             self.assertRaises(IOError, f.tell)
   1982         self.assertEqual(f.tell(), p2)
   1983         f.close()
   1984 
   1985     def test_seeking(self):
   1986         chunk_size = _default_chunk_size()
   1987         prefix_size = chunk_size - 2
   1988         u_prefix = "a" * prefix_size
   1989         prefix = bytes(u_prefix.encode("utf-8"))
   1990         self.assertEqual(len(u_prefix), len(prefix))
   1991         u_suffix = "\u8888\n"
   1992         suffix = bytes(u_suffix.encode("utf-8"))
   1993         line = prefix + suffix
   1994         f = self.open(support.TESTFN, "wb")
   1995         f.write(line*2)
   1996         f.close()
   1997         f = self.open(support.TESTFN, "r", encoding="utf-8")
   1998         s = f.read(prefix_size)
   1999         self.assertEqual(s, prefix.decode("ascii"))
   2000         self.assertEqual(f.tell(), prefix_size)
   2001         self.assertEqual(f.readline(), u_suffix)
   2002 
   2003     def test_seeking_too(self):
   2004         # Regression test for a specific bug

   2005         data = b'\xe0\xbf\xbf\n'
   2006         f = self.open(support.TESTFN, "wb")
   2007         f.write(data)
   2008         f.close()
   2009         f = self.open(support.TESTFN, "r", encoding="utf-8")
   2010         f._CHUNK_SIZE  # Just test that it exists

   2011         f._CHUNK_SIZE = 2
   2012         f.readline()
   2013         f.tell()
   2014 
   2015     def test_seek_and_tell(self):
   2016         #Test seek/tell using the StatefulIncrementalDecoder.

   2017         # Make test faster by doing smaller seeks

   2018         CHUNK_SIZE = 128
   2019 
   2020         def test_seek_and_tell_with_data(data, min_pos=0):
   2021             """Tell/seek to various points within a data stream and ensure
   2022             that the decoded data returned by read() is consistent."""
   2023             f = self.open(support.TESTFN, 'wb')
   2024             f.write(data)
   2025             f.close()
   2026             f = self.open(support.TESTFN, encoding='test_decoder')
   2027             f._CHUNK_SIZE = CHUNK_SIZE
   2028             decoded = f.read()
   2029             f.close()
   2030 
   2031             for i in range(min_pos, len(decoded) + 1): # seek positions

   2032                 for j in [1, 5, len(decoded) - i]: # read lengths

   2033                     f = self.open(support.TESTFN, encoding='test_decoder')
   2034                     self.assertEqual(f.read(i), decoded[:i])
   2035                     cookie = f.tell()
   2036                     self.assertEqual(f.read(j), decoded[i:i + j])
   2037                     f.seek(cookie)
   2038                     self.assertEqual(f.read(), decoded[i:])
   2039                     f.close()
   2040 
   2041         # Enable the test decoder.

   2042         StatefulIncrementalDecoder.codecEnabled = 1
   2043 
   2044         # Run the tests.

   2045         try:
   2046             # Try each test case.

   2047             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2048                 test_seek_and_tell_with_data(input)
   2049 
   2050             # Position each test case so that it crosses a chunk boundary.

   2051             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
   2052                 offset = CHUNK_SIZE - len(input)//2
   2053                 prefix = b'.'*offset
   2054                 # Don't bother seeking into the prefix (takes too long).

   2055                 min_pos = offset*2
   2056                 test_seek_and_tell_with_data(prefix + input, min_pos)
   2057 
   2058         # Ensure our test decoder won't interfere with subsequent tests.

   2059         finally:
   2060             StatefulIncrementalDecoder.codecEnabled = 0
   2061 
   2062     def test_encoded_writes(self):
   2063         data = "1234567890"
   2064         tests = ("utf-16",
   2065                  "utf-16-le",
   2066                  "utf-16-be",
   2067                  "utf-32",
   2068                  "utf-32-le",
   2069                  "utf-32-be")
   2070         for encoding in tests:
   2071             buf = self.BytesIO()
   2072             f = self.TextIOWrapper(buf, encoding=encoding)
   2073             # Check if the BOM is written only once (see issue1753).

   2074             f.write(data)
   2075             f.write(data)
   2076             f.seek(0)
   2077             self.assertEqual(f.read(), data * 2)
   2078             f.seek(0)
   2079             self.assertEqual(f.read(), data * 2)
   2080             self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
   2081 
   2082     def test_unreadable(self):
   2083         class UnReadable(self.BytesIO):
   2084             def readable(self):
   2085                 return False
   2086         txt = self.TextIOWrapper(UnReadable())
   2087         self.assertRaises(IOError, txt.read)
   2088 
   2089     def test_read_one_by_one(self):
   2090         txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
   2091         reads = ""
   2092         while True:
   2093             c = txt.read(1)
   2094             if not c:
   2095                 break
   2096             reads += c
   2097         self.assertEqual(reads, "AA\nBB")
   2098 
   2099     def test_readlines(self):
   2100         txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
   2101         self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
   2102         txt.seek(0)
   2103         self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
   2104         txt.seek(0)
   2105         self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
   2106 
   2107     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.

   2108     def test_read_by_chunk(self):
   2109         # make sure "\r\n" straddles 128 char boundary.

   2110         txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
   2111         reads = ""
   2112         while True:
   2113             c = txt.read(128)
   2114             if not c:
   2115                 break
   2116             reads += c
   2117         self.assertEqual(reads, "A"*127+"\nB")
   2118 
   2119     def test_issue1395_1(self):
   2120         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2121 
   2122         # read one char at a time

   2123         reads = ""
   2124         while True:
   2125             c = txt.read(1)
   2126             if not c:
   2127                 break
   2128             reads += c
   2129         self.assertEqual(reads, self.normalized)
   2130 
   2131     def test_issue1395_2(self):
   2132         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2133         txt._CHUNK_SIZE = 4
   2134 
   2135         reads = ""
   2136         while True:
   2137             c = txt.read(4)
   2138             if not c:
   2139                 break
   2140             reads += c
   2141         self.assertEqual(reads, self.normalized)
   2142 
   2143     def test_issue1395_3(self):
   2144         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2145         txt._CHUNK_SIZE = 4
   2146 
   2147         reads = txt.read(4)
   2148         reads += txt.read(4)
   2149         reads += txt.readline()
   2150         reads += txt.readline()
   2151         reads += txt.readline()
   2152         self.assertEqual(reads, self.normalized)
   2153 
   2154     def test_issue1395_4(self):
   2155         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2156         txt._CHUNK_SIZE = 4
   2157 
   2158         reads = txt.read(4)
   2159         reads += txt.read()
   2160         self.assertEqual(reads, self.normalized)
   2161 
   2162     def test_issue1395_5(self):
   2163         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2164         txt._CHUNK_SIZE = 4
   2165 
   2166         reads = txt.read(4)
   2167         pos = txt.tell()
   2168         txt.seek(0)
   2169         txt.seek(pos)
   2170         self.assertEqual(txt.read(4), "BBB\n")
   2171 
   2172     def test_issue2282(self):
   2173         buffer = self.BytesIO(self.testdata)
   2174         txt = self.TextIOWrapper(buffer, encoding="ascii")
   2175 
   2176         self.assertEqual(buffer.seekable(), txt.seekable())
   2177 
   2178     def test_append_bom(self):
   2179         # The BOM is not written again when appending to a non-empty file

   2180         filename = support.TESTFN
   2181         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2182             with self.open(filename, 'w', encoding=charset) as f:
   2183                 f.write('aaa')
   2184                 pos = f.tell()
   2185             with self.open(filename, 'rb') as f:
   2186                 self.assertEqual(f.read(), 'aaa'.encode(charset))
   2187 
   2188             with self.open(filename, 'a', encoding=charset) as f:
   2189                 f.write('xxx')
   2190             with self.open(filename, 'rb') as f:
   2191                 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
   2192 
   2193     def test_seek_bom(self):
   2194         # Same test, but when seeking manually

   2195         filename = support.TESTFN
   2196         for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
   2197             with self.open(filename, 'w', encoding=charset) as f:
   2198                 f.write('aaa')
   2199                 pos = f.tell()
   2200             with self.open(filename, 'r+', encoding=charset) as f:
   2201                 f.seek(pos)
   2202                 f.write('zzz')
   2203                 f.seek(0)
   2204                 f.write('bbb')
   2205             with self.open(filename, 'rb') as f:
   2206                 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
   2207 
   2208     def test_errors_property(self):
   2209         with self.open(support.TESTFN, "w") as f:
   2210             self.assertEqual(f.errors, "strict")
   2211         with self.open(support.TESTFN, "w", errors="replace") as f:
   2212             self.assertEqual(f.errors, "replace")
   2213 
   2214     @unittest.skipUnless(threading, 'Threading required for this test.')
   2215     def test_threads_write(self):
   2216         # Issue6750: concurrent writes could duplicate data

   2217         event = threading.Event()
   2218         with self.open(support.TESTFN, "w", buffering=1) as f:
   2219             def run(n):
   2220                 text = "Thread%03d\n" % n
   2221                 event.wait()
   2222                 f.write(text)
   2223             threads = [threading.Thread(target=lambda n=x: run(n))
   2224                        for x in range(20)]
   2225             for t in threads:
   2226                 t.start()
   2227             time.sleep(0.02)
   2228             event.set()
   2229             for t in threads:
   2230                 t.join()
   2231         with self.open(support.TESTFN) as f:
   2232             content = f.read()
   2233             for n in range(20):
   2234                 self.assertEqual(content.count("Thread%03d\n" % n), 1)
   2235 
   2236     def test_flush_error_on_close(self):
   2237         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2238         def bad_flush():
   2239             raise IOError()
   2240         txt.flush = bad_flush
   2241         self.assertRaises(IOError, txt.close) # exception not swallowed

   2242 
   2243     def test_multi_close(self):
   2244         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2245         txt.close()
   2246         txt.close()
   2247         txt.close()
   2248         self.assertRaises(ValueError, txt.flush)
   2249 
   2250     def test_readonly_attributes(self):
   2251         txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
   2252         buf = self.BytesIO(self.testdata)
   2253         with self.assertRaises((AttributeError, TypeError)):
   2254             txt.buffer = buf
   2255 
   2256 class CTextIOWrapperTest(TextIOWrapperTest):
   2257 
   2258     def test_initialization(self):
   2259         r = self.BytesIO(b"\xc3\xa9\n\n")
   2260         b = self.BufferedReader(r, 1000)
   2261         t = self.TextIOWrapper(b)
   2262         self.assertRaises(TypeError, t.__init__, b, newline=42)
   2263         self.assertRaises(ValueError, t.read)
   2264         self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
   2265         self.assertRaises(ValueError, t.read)
   2266 
   2267     def test_garbage_collection(self):
   2268         # C TextIOWrapper objects are collected, and collecting them flushes

   2269         # all data to disk.

   2270         # The Python version has __del__, so it ends in gc.garbage instead.

   2271         rawio = io.FileIO(support.TESTFN, "wb")
   2272         b = self.BufferedWriter(rawio)
   2273         t = self.TextIOWrapper(b, encoding="ascii")
   2274         t.write("456def")
   2275         t.x = t
   2276         wr = weakref.ref(t)
   2277         del t
   2278         support.gc_collect()
   2279         self.assertTrue(wr() is None, wr)
   2280         with self.open(support.TESTFN, "rb") as f:
   2281             self.assertEqual(f.read(), b"456def")
   2282 
   2283 class PyTextIOWrapperTest(TextIOWrapperTest):
   2284     pass
   2285 
   2286 
   2287 class IncrementalNewlineDecoderTest(unittest.TestCase):
   2288 
   2289     def check_newline_decoding_utf8(self, decoder):
   2290         # UTF-8 specific tests for a newline decoder

   2291         def _check_decode(b, s, **kwargs):
   2292             # We exercise getstate() / setstate() as well as decode()

   2293             state = decoder.getstate()
   2294             self.assertEqual(decoder.decode(b, **kwargs), s)
   2295             decoder.setstate(state)
   2296             self.assertEqual(decoder.decode(b, **kwargs), s)
   2297 
   2298         _check_decode(b'\xe8\xa2\x88', "\u8888")
   2299 
   2300         _check_decode(b'\xe8', "")
   2301         _check_decode(b'\xa2', "")
   2302         _check_decode(b'\x88', "\u8888")
   2303 
   2304         _check_decode(b'\xe8', "")
   2305         _check_decode(b'\xa2', "")
   2306         _check_decode(b'\x88', "\u8888")
   2307 
   2308         _check_decode(b'\xe8', "")
   2309         self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
   2310 
   2311         decoder.reset()
   2312         _check_decode(b'\n', "\n")
   2313         _check_decode(b'\r', "")
   2314         _check_decode(b'', "\n", final=True)
   2315         _check_decode(b'\r', "\n", final=True)
   2316 
   2317         _check_decode(b'\r', "")
   2318         _check_decode(b'a', "\na")
   2319 
   2320         _check_decode(b'\r\r\n', "\n\n")
   2321         _check_decode(b'\r', "")
   2322         _check_decode(b'\r', "\n")
   2323         _check_decode(b'\na', "\na")
   2324 
   2325         _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
   2326         _check_decode(b'\xe8\xa2\x88', "\u8888")
   2327         _check_decode(b'\n', "\n")
   2328         _check_decode(b'\xe8\xa2\x88\r', "\u8888")
   2329         _check_decode(b'\n', "\n")
   2330 
   2331     def check_newline_decoding(self, decoder, encoding):
   2332         result = []
   2333         if encoding is not None:
   2334             encoder = codecs.getincrementalencoder(encoding)()
   2335             def _decode_bytewise(s):
   2336                 # Decode one byte at a time

   2337                 for b in encoder.encode(s):
   2338                     result.append(decoder.decode(b))
   2339         else:
   2340             encoder = None
   2341             def _decode_bytewise(s):
   2342                 # Decode one char at a time

   2343                 for c in s:
   2344                     result.append(decoder.decode(c))
   2345         self.assertEqual(decoder.newlines, None)
   2346         _decode_bytewise("abc\n\r")
   2347         self.assertEqual(decoder.newlines, '\n')
   2348         _decode_bytewise("\nabc")
   2349         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
   2350         _decode_bytewise("abc\r")
   2351         self.assertEqual(decoder.newlines, ('\n', '\r\n'))
   2352         _decode_bytewise("abc")
   2353         self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
   2354         _decode_bytewise("abc\r")
   2355         self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
   2356         decoder.reset()
   2357         input = "abc"
   2358         if encoder is not None:
   2359             encoder.reset()
   2360             input = encoder.encode(input)
   2361         self.assertEqual(decoder.decode(input), "abc")
   2362         self.assertEqual(decoder.newlines, None)
   2363 
   2364     def test_newline_decoder(self):
   2365         encodings = (
   2366             # None meaning the IncrementalNewlineDecoder takes unicode input

   2367             # rather than bytes input

   2368             None, 'utf-8', 'latin-1',
   2369             'utf-16', 'utf-16-le', 'utf-16-be',
   2370             'utf-32', 'utf-32-le', 'utf-32-be',
   2371         )
   2372         for enc in encodings:
   2373             decoder = enc and codecs.getincrementaldecoder(enc)()
   2374             decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
   2375             self.check_newline_decoding(decoder, enc)
   2376         decoder = codecs.getincrementaldecoder("utf-8")()
   2377         decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
   2378         self.check_newline_decoding_utf8(decoder)
   2379 
   2380     def test_newline_bytes(self):
   2381         # Issue 5433: Excessive optimization in IncrementalNewlineDecoder

   2382         def _check(dec):
   2383             self.assertEqual(dec.newlines, None)
   2384             self.assertEqual(dec.decode("\u0D00"), "\u0D00")
   2385             self.assertEqual(dec.newlines, None)
   2386             self.assertEqual(dec.decode("\u0A00"), "\u0A00")
   2387             self.assertEqual(dec.newlines, None)
   2388         dec = self.IncrementalNewlineDecoder(None, translate=False)
   2389         _check(dec)
   2390         dec = self.IncrementalNewlineDecoder(None, translate=True)
   2391         _check(dec)
   2392 
   2393 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
   2394     pass
   2395 
   2396 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
   2397     pass
   2398 
   2399 
   2400 # XXX Tests for open()

   2401 
   2402 class MiscIOTest(unittest.TestCase):
   2403 
   2404     def tearDown(self):
   2405         support.unlink(support.TESTFN)
   2406 
   2407     def test___all__(self):
   2408         for name in self.io.__all__:
   2409             obj = getattr(self.io, name, None)
   2410             self.assertTrue(obj is not None, name)
   2411             if name == "open":
   2412                 continue
   2413             elif "error" in name.lower() or name == "UnsupportedOperation":
   2414                 self.assertTrue(issubclass(obj, Exception), name)
   2415             elif not name.startswith("SEEK_"):
   2416                 self.assertTrue(issubclass(obj, self.IOBase))
   2417 
   2418     def test_attributes(self):
   2419         f = self.open(support.TESTFN, "wb", buffering=0)
   2420         self.assertEqual(f.mode, "wb")
   2421         f.close()
   2422 
   2423         f = self.open(support.TESTFN, "U")
   2424         self.assertEqual(f.name,            support.TESTFN)
   2425         self.assertEqual(f.buffer.name,     support.TESTFN)
   2426         self.assertEqual(f.buffer.raw.name, support.TESTFN)
   2427         self.assertEqual(f.mode,            "U")
   2428         self.assertEqual(f.buffer.mode,     "rb")
   2429         self.assertEqual(f.buffer.raw.mode, "rb")
   2430         f.close()
   2431 
   2432         f = self.open(support.TESTFN, "w+")
   2433         self.assertEqual(f.mode,            "w+")
   2434         self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?

   2435         self.assertEqual(f.buffer.raw.mode, "rb+")
   2436 
   2437         g = self.open(f.fileno(), "wb", closefd=False)
   2438         self.assertEqual(g.mode,     "wb")
   2439         self.assertEqual(g.raw.mode, "wb")
   2440         self.assertEqual(g.name,     f.fileno())
   2441         self.assertEqual(g.raw.name, f.fileno())
   2442         f.close()
   2443         g.close()
   2444 
   2445     def test_io_after_close(self):
   2446         for kwargs in [
   2447                 {"mode": "w"},
   2448                 {"mode": "wb"},
   2449                 {"mode": "w", "buffering": 1},
   2450                 {"mode": "w", "buffering": 2},
   2451                 {"mode": "wb", "buffering": 0},
   2452                 {"mode": "r"},
   2453                 {"mode": "rb"},
   2454                 {"mode": "r", "buffering": 1},
   2455                 {"mode": "r", "buffering": 2},
   2456                 {"mode": "rb", "buffering": 0},
   2457                 {"mode": "w+"},
   2458                 {"mode": "w+b"},
   2459                 {"mode": "w+", "buffering": 1},
   2460                 {"mode": "w+", "buffering": 2},
   2461                 {"mode": "w+b", "buffering": 0},
   2462             ]:
   2463             f = self.open(support.TESTFN, **kwargs)
   2464             f.close()
   2465             self.assertRaises(ValueError, f.flush)
   2466             self.assertRaises(ValueError, f.fileno)
   2467             self.assertRaises(ValueError, f.isatty)
   2468             self.assertRaises(ValueError, f.__iter__)
   2469             if hasattr(f, "peek"):
   2470                 self.assertRaises(ValueError, f.peek, 1)
   2471             self.assertRaises(ValueError, f.read)
   2472             if hasattr(f, "read1"):
   2473                 self.assertRaises(ValueError, f.read1, 1024)
   2474             if hasattr(f, "readall"):
   2475                 self.assertRaises(ValueError, f.readall)
   2476             if hasattr(f, "readinto"):
   2477                 self.assertRaises(ValueError, f.readinto, bytearray(1024))
   2478             self.assertRaises(ValueError, f.readline)
   2479             self.assertRaises(ValueError, f.readlines)
   2480             self.assertRaises(ValueError, f.seek, 0)
   2481             self.assertRaises(ValueError, f.tell)
   2482             self.assertRaises(ValueError, f.truncate)
   2483             self.assertRaises(ValueError, f.write,
   2484                               b"" if "b" in kwargs['mode'] else "")
   2485             self.assertRaises(ValueError, f.writelines, [])
   2486             self.assertRaises(ValueError, next, f)
   2487 
   2488     def test_blockingioerror(self):
   2489         # Various BlockingIOError issues

   2490         self.assertRaises(TypeError, self.BlockingIOError)
   2491         self.assertRaises(TypeError, self.BlockingIOError, 1)
   2492         self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
   2493         self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
   2494         b = self.BlockingIOError(1, "")
   2495         self.assertEqual(b.characters_written, 0)
   2496         class C(unicode):
   2497             pass
   2498         c = C("")
   2499         b = self.BlockingIOError(1, c)
   2500         c.b = b
   2501         b.c = c
   2502         wr = weakref.ref(c)
   2503         del c, b
   2504         support.gc_collect()
   2505         self.assertTrue(wr() is None, wr)
   2506 
   2507     def test_abcs(self):
   2508         # Test the visible base classes are ABCs.

   2509         self.assertIsInstance(self.IOBase, abc.ABCMeta)
   2510         self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
   2511         self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
   2512         self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
   2513 
   2514     def _check_abc_inheritance(self, abcmodule):
   2515         with self.open(support.TESTFN, "wb", buffering=0) as f:
   2516             self.assertIsInstance(f, abcmodule.IOBase)
   2517             self.assertIsInstance(f, abcmodule.RawIOBase)
   2518             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   2519             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   2520         with self.open(support.TESTFN, "wb") as f:
   2521             self.assertIsInstance(f, abcmodule.IOBase)
   2522             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   2523             self.assertIsInstance(f, abcmodule.BufferedIOBase)
   2524             self.assertNotIsInstance(f, abcmodule.TextIOBase)
   2525         with self.open(support.TESTFN, "w") as f:
   2526             self.assertIsInstance(f, abcmodule.IOBase)
   2527             self.assertNotIsInstance(f, abcmodule.RawIOBase)
   2528             self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
   2529             self.assertIsInstance(f, abcmodule.TextIOBase)
   2530 
   2531     def test_abc_inheritance(self):
   2532         # Test implementations inherit from their respective ABCs

   2533         self._check_abc_inheritance(self)
   2534 
   2535     def test_abc_inheritance_official(self):
   2536         # Test implementations inherit from the official ABCs of the

   2537         # baseline "io" module.

   2538         self._check_abc_inheritance(io)
   2539 
   2540 class CMiscIOTest(MiscIOTest):
   2541     io = io
   2542 
   2543 class PyMiscIOTest(MiscIOTest):
   2544     io = pyio
   2545 
   2546 
   2547 @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
   2548 class SignalsTest(unittest.TestCase):
   2549 
   2550     def setUp(self):
   2551         self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
   2552 
   2553     def tearDown(self):
   2554         signal.signal(signal.SIGALRM, self.oldalrm)
   2555 
   2556     def alarm_interrupt(self, sig, frame):
   2557         1 // 0
   2558 
   2559     @unittest.skipUnless(threading, 'Threading required for this test.')
   2560     def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
   2561         """Check that a partial write, when it gets interrupted, properly
   2562         invokes the signal handler, and bubbles up the exception raised
   2563         in the latter."""
   2564         read_results = []
   2565         def _read():
   2566             s = os.read(r, 1)
   2567             read_results.append(s)
   2568         t = threading.Thread(target=_read)
   2569         t.daemon = True
   2570         r, w = os.pipe()
   2571         try:
   2572             wio = self.io.open(w, **fdopen_kwargs)
   2573             t.start()
   2574             signal.alarm(1)
   2575             # Fill the pipe enough that the write will be blocking.

   2576             # It will be interrupted by the timer armed above.  Since the

   2577             # other thread has read one byte, the low-level write will

   2578             # return with a successful (partial) result rather than an EINTR.

   2579             # The buffered IO layer must check for pending signal

   2580             # handlers, which in this case will invoke alarm_interrupt().

   2581             self.assertRaises(ZeroDivisionError,
   2582                               wio.write, item * (1024 * 1024))
   2583             t.join()
   2584             # We got one byte, get another one and check that it isn't a

   2585             # repeat of the first one.

   2586             read_results.append(os.read(r, 1))
   2587             self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
   2588         finally:
   2589             os.close(w)
   2590             os.close(r)
   2591             # This is deliberate. If we didn't close the file descriptor

   2592             # before closing wio, wio would try to flush its internal

   2593             # buffer, and block again.

   2594             try:
   2595                 wio.close()
   2596             except IOError as e:
   2597                 if e.errno != errno.EBADF:
   2598                     raise
   2599 
   2600     def test_interrupted_write_unbuffered(self):
   2601         self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
   2602 
   2603     def test_interrupted_write_buffered(self):
   2604         self.check_interrupted_write(b"xy", b"xy", mode="wb")
   2605 
   2606     def test_interrupted_write_text(self):
   2607         self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
   2608 
   2609     def check_reentrant_write(self, data, **fdopen_kwargs):
   2610         def on_alarm(*args):
   2611             # Will be called reentrantly from the same thread

   2612             wio.write(data)
   2613             1/0
   2614         signal.signal(signal.SIGALRM, on_alarm)
   2615         r, w = os.pipe()
   2616         wio = self.io.open(w, **fdopen_kwargs)
   2617         try:
   2618             signal.alarm(1)
   2619             # Either the reentrant call to wio.write() fails with RuntimeError,

   2620             # or the signal handler raises ZeroDivisionError.

   2621             with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
   2622                 while 1:
   2623                     for i in range(100):
   2624                         wio.write(data)
   2625                         wio.flush()
   2626                     # Make sure the buffer doesn't fill up and block further writes

   2627                     os.read(r, len(data) * 100)
   2628             exc = cm.exception
   2629             if isinstance(exc, RuntimeError):
   2630                 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
   2631         finally:
   2632             wio.close()
   2633             os.close(r)
   2634 
   2635     def test_reentrant_write_buffered(self):
   2636         self.check_reentrant_write(b"xy", mode="wb")
   2637 
   2638     def test_reentrant_write_text(self):
   2639         self.check_reentrant_write("xy", mode="w", encoding="ascii")
   2640 
   2641     def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
   2642         """Check that a buffered read, when it gets interrupted (either
   2643         returning a partial result or EINTR), properly invokes the signal
   2644         handler and retries if the latter returned successfully."""
   2645         r, w = os.pipe()
   2646         fdopen_kwargs["closefd"] = False
   2647         def alarm_handler(sig, frame):
   2648             os.write(w, b"bar")
   2649         signal.signal(signal.SIGALRM, alarm_handler)
   2650         try:
   2651             rio = self.io.open(r, **fdopen_kwargs)
   2652             os.write(w, b"foo")
   2653             signal.alarm(1)
   2654             # Expected behaviour:

   2655             # - first raw read() returns partial b"foo"

   2656             # - second raw read() returns EINTR

   2657             # - third raw read() returns b"bar"

   2658             self.assertEqual(decode(rio.read(6)), "foobar")
   2659         finally:
   2660             rio.close()
   2661             os.close(w)
   2662             os.close(r)
   2663 
   2664     def test_interrupterd_read_retry_buffered(self):
   2665         self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
   2666                                           mode="rb")
   2667 
   2668     def test_interrupterd_read_retry_text(self):
   2669         self.check_interrupted_read_retry(lambda x: x,
   2670                                           mode="r")
   2671 
   2672     @unittest.skipUnless(threading, 'Threading required for this test.')
   2673     def check_interrupted_write_retry(self, item, **fdopen_kwargs):
   2674         """Check that a buffered write, when it gets interrupted (either
   2675         returning a partial result or EINTR), properly invokes the signal
   2676         handler and retries if the latter returned successfully."""
   2677         select = support.import_module("select")
   2678         # A quantity that exceeds the buffer size of an anonymous pipe's

   2679         # write end.

   2680         N = 1024 * 1024
   2681         r, w = os.pipe()
   2682         fdopen_kwargs["closefd"] = False
   2683         # We need a separate thread to read from the pipe and allow the

   2684         # write() to finish.  This thread is started after the SIGALRM is

   2685         # received (forcing a first EINTR in write()).

   2686         read_results = []
   2687         write_finished = False
   2688         def _read():
   2689             while not write_finished:
   2690                 while r in select.select([r], [], [], 1.0)[0]:
   2691                     s = os.read(r, 1024)
   2692                     read_results.append(s)
   2693         t = threading.Thread(target=_read)
   2694         t.daemon = True
   2695         def alarm1(sig, frame):
   2696             signal.signal(signal.SIGALRM, alarm2)
   2697             signal.alarm(1)
   2698         def alarm2(sig, frame):
   2699             t.start()
   2700         signal.signal(signal.SIGALRM, alarm1)
   2701         try:
   2702             wio = self.io.open(w, **fdopen_kwargs)
   2703             signal.alarm(1)
   2704             # Expected behaviour:

   2705             # - first raw write() is partial (because of the limited pipe buffer

   2706             #   and the first alarm)

   2707             # - second raw write() returns EINTR (because of the second alarm)

   2708             # - subsequent write()s are successful (either partial or complete)

   2709             self.assertEqual(N, wio.write(item * N))
   2710             wio.flush()
   2711             write_finished = True
   2712             t.join()
   2713             self.assertEqual(N, sum(len(x) for x in read_results))
   2714         finally:
   2715             write_finished = True
   2716             os.close(w)
   2717             os.close(r)
   2718             # This is deliberate. If we didn't close the file descriptor

   2719             # before closing wio, wio would try to flush its internal

   2720             # buffer, and could block (in case of failure).

   2721             try:
   2722                 wio.close()
   2723             except IOError as e:
   2724                 if e.errno != errno.EBADF:
   2725                     raise
   2726 
   2727     def test_interrupterd_write_retry_buffered(self):
   2728         self.check_interrupted_write_retry(b"x", mode="wb")
   2729 
   2730     def test_interrupterd_write_retry_text(self):
   2731         self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
   2732 
   2733 
   2734 class CSignalsTest(SignalsTest):
   2735     io = io
   2736 
   2737 class PySignalsTest(SignalsTest):
   2738     io = pyio
   2739 
   2740     # Handling reentrancy issues would slow down _pyio even more, so the

   2741     # tests are disabled.

   2742     test_reentrant_write_buffered = None
   2743     test_reentrant_write_text = None
   2744 
   2745 
   2746 def test_main():
   2747     tests = (CIOTest, PyIOTest,
   2748              CBufferedReaderTest, PyBufferedReaderTest,
   2749              CBufferedWriterTest, PyBufferedWriterTest,
   2750              CBufferedRWPairTest, PyBufferedRWPairTest,
   2751              CBufferedRandomTest, PyBufferedRandomTest,
   2752              StatefulIncrementalDecoderTest,
   2753              CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
   2754              CTextIOWrapperTest, PyTextIOWrapperTest,
   2755              CMiscIOTest, PyMiscIOTest,
   2756              CSignalsTest, PySignalsTest,
   2757              )
   2758 
   2759     # Put the namespaces of the IO module we are testing and some useful mock

   2760     # classes in the __dict__ of each test.

   2761     mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
   2762              MockNonBlockWriterIO, MockRawIOWithoutRead)
   2763     all_members = io.__all__ + ["IncrementalNewlineDecoder"]
   2764     c_io_ns = dict((name, getattr(io, name)) for name in all_members)
   2765     py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
   2766     globs = globals()
   2767     c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
   2768     py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
   2769     # Avoid turning open into a bound method.

   2770     py_io_ns["open"] = pyio.OpenWrapper
   2771     for test in tests:
   2772         if test.__name__.startswith("C"):
   2773             for name, obj in c_io_ns.items():
   2774                 setattr(test, name, obj)
   2775         elif test.__name__.startswith("Py"):
   2776             for name, obj in py_io_ns.items():
   2777                 setattr(test, name, obj)
   2778 
   2779     support.run_unittest(*tests)
   2780 
   2781 if __name__ == "__main__":
   2782     test_main()
   2783